Files
sub-bot/bot.py

951 lines
42 KiB
Python
Raw Normal View History

2026-02-25 17:18:33 +08:00
import os, json, time, base64, asyncio, re, urllib.request, logging, threading, secrets
2026-02-23 00:04:38 +08:00
from http.server import HTTPServer, BaseHTTPRequestHandler
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
# Configure logging at INFO level with a timestamp prefix on every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)
# Telegram bot token, read from the BOT_TOKEN environment variable ('' when unset).
TOKEN = os.environ.get('BOT_TOKEN', '')
# Telegram user id that is treated as the administrator (0 when ADMIN_ID is unset).
ADMIN_ID = int(os.environ.get('ADMIN_ID', '0'))
# JSON file that load_data() reads and save_data() writes.
DATA_FILE = '/opt/sub-bot/data.json'
2026-02-23 00:04:38 +08:00
# URI scheme prefixes that are recognised as proxy share links.
PROTOS = ['ss://', 'vmess://', 'vless://', 'trojan://', 'hysteria2://', 'hy2://', 'tuic://']
# Default delay, in seconds, before a bot reply is deleted again (see auto_del).
AUTO_DEL = 60
# Fallback secret for the default subscription URL, used by get_default_secret()
# when data.json does not contain a 'default_secret' entry.
SUB_SECRET = os.environ.get('SUB_SECRET', 'changeme')
2026-02-25 17:18:33 +08:00
# Host name used when building the subscription URLs shown to users.
SUB_HOST = os.environ.get('SUB_HOST', 'substore.mjjtop.com')
2026-02-23 00:04:38 +08:00
# Filled by the 'menu_add' callback and cleared by auto_detect() once the
# user's next text message has been processed.
WAITING_ADD = set() # user_ids waiting to add sub
def _status_icon(s):
    """Return the status emoji for a node record.

    ``alive`` is True -> 🟢, False -> 🔴, anything else (including a missing
    key or None) -> 🟡.
    """
    state = s.get('alive')
    if state is True:
        return '🟢'
    return '🔴' if state is False else '🟡'
def get_default_secret():
    """Return the secret of the default node pool.

    The ``default_secret`` value stored in data.json takes precedence; the
    ``SUB_SECRET`` module constant is used when that key is absent.
    """
    return load_data().get('default_secret', SUB_SECRET)
2026-02-23 00:04:38 +08:00
def load_data():
    """Load the bot state from ``DATA_FILE``.

    Returns:
        dict: the parsed JSON document, with the ``subs``, ``groups`` and
        ``sub_groups`` lists guaranteed to be present. A fresh empty
        structure is returned when the file does not exist.
    """
    try:
        # EAFP: open directly instead of exists()+open(), which can race with
        # the file being replaced or removed in between.
        with open(DATA_FILE) as f:
            d = json.load(f)
    except FileNotFoundError:
        return {'subs': [], 'groups': [], 'sub_groups': []}
    # Callers index data['subs'] directly, so every top-level list must exist
    # even in files written before a key was introduced.
    for key in ('subs', 'groups', 'sub_groups'):
        d.setdefault(key, [])
    return d
2026-02-23 00:04:38 +08:00
def save_data(data):
    """Persist *data* to ``DATA_FILE`` as indented JSON.

    The document is first written to a temporary sibling file and then moved
    into place with ``os.replace``. A reader of ``DATA_FILE`` therefore sees
    either the previous or the new complete document, never a half-written
    one, even if the process dies mid-write.

    Args:
        data: JSON-serialisable state dictionary.
    """
    os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
    tmp_path = f'{DATA_FILE}.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, DATA_FILE)
def detect_type(link):
    """Return the protocol name of *link*.

    The name is the first matching ``PROTOS`` prefix without its ``://``
    suffix; ``'unknown'`` is returned when no prefix matches.
    """
    matched = next((prefix for prefix in PROTOS if link.startswith(prefix)), None)
    if matched is None:
        return 'unknown'
    return matched.replace('://', '')
def parse_surge_ss(text):
    """Convert one Surge-style ``ss`` proxy line into an ``ss://`` link.

    Expected input: ``Name = ss, server, port, encrypt-method=x, password=y``.

    Returns:
        tuple: ``(ss_link, name)`` on success, ``(None, None)`` when the line
        does not match or lacks the cipher or the password.
    """
    match = re.match(r'^(.+?)\s*=\s*ss\s*,\s*([^,]+)\s*,\s*(\d+)\s*,\s*(.+)$', text.strip())
    if match is None:
        return None, None
    name, server, port = (match.group(i).strip() for i in (1, 2, 3))
    options = dict(re.findall(r'([\w-]+)\s*=\s*([^,]+)', match.group(4)))
    cipher = options.get('encrypt-method', '').strip()
    secret = options.get('password', '').strip()
    if not (cipher and secret):
        return None, None
    # The link body is the unpadded base64 of "method:password@server:port".
    userinfo = f'{cipher}:{secret}@{server}:{port}'.encode()
    token = base64.b64encode(userinfo).decode().rstrip('=')
    return f'ss://{token}#{urllib.request.quote(name)}', name
def extract_name(link):
    """Return a display name for *link*.

    The percent-decoded text after the last ``#`` is used when the link has a
    fragment; otherwise the first 30 characters of the link are returned.
    """
    _, marker, fragment = link.rpartition('#')
    if marker:
        return urllib.request.unquote(fragment)
    return link[:30]
async def auto_del(msg, delay=AUTO_DEL):
    """Delete *msg* after *delay* seconds, on a best-effort basis.

    Args:
        msg: object exposing an awaitable ``delete()`` method.
        delay: seconds to sleep before deleting; defaults to ``AUTO_DEL``.
    """
    await asyncio.sleep(delay)
    try:
        await msg.delete()
    except Exception as exc:
        # Deletion failures are ignored on purpose. Catching Exception rather
        # than using a bare except lets asyncio.CancelledError propagate, so
        # the task can still be cancelled while delete() is in flight.
        log.debug('auto-delete failed: %s', exc)
# Strong references to pending auto-delete tasks. The asyncio documentation
# advises keeping a reference to fire-and-forget tasks so that they cannot be
# garbage-collected before they finish.
_AUTO_DEL_TASKS = set()


async def send_del(target, text, delay=AUTO_DEL, **kw):
    """Reply to *target* with *text* and schedule the reply for deletion.

    Args:
        target: object exposing an awaitable ``reply_text(text, **kw)``.
        text: message text to send.
        delay: seconds until the reply is deleted; defaults to ``AUTO_DEL``.
        **kw: forwarded unchanged to ``reply_text``.

    Returns:
        The value returned by ``target.reply_text``.
    """
    msg = await target.reply_text(text, **kw)
    task = asyncio.ensure_future(auto_del(msg, delay))
    _AUTO_DEL_TASKS.add(task)
    # Drop the reference again as soon as the task has completed.
    task.add_done_callback(_AUTO_DEL_TASKS.discard)
    return msg
async def is_member(update, context):
    """Return True when the acting user may use the bot.

    The administrator (``ADMIN_ID``) is always allowed. When no group is
    bound in the stored data, everybody is allowed. Otherwise the user must
    have the status ``member``, ``administrator`` or ``creator`` in at least
    one bound group.
    """
    uid = update.effective_user.id
    if uid == ADMIN_ID:
        return True
    groups = load_data().get('groups', [])
    if not groups:
        return True
    for gid in groups:
        try:
            member = await context.bot.get_chat_member(gid, uid)
        except Exception as exc:
            # A failing lookup for one group must not block the others; log it
            # instead of silently discarding it. A bare except would also
            # swallow asyncio.CancelledError.
            log.warning('get_chat_member(%s, %s) failed: %s', gid, uid, exc)
            continue
        if member.status in ('member', 'administrator', 'creator'):
            return True
    return False
def add_sub(link, user):
    """Store *link* in the default node pool.

    Args:
        link: share link to store.
        user: object with ``id`` and ``first_name`` attributes, recorded as
            the uploader.

    Returns:
        None for an unrecognised protocol, ``'dup'`` when the link is already
        stored, otherwise a confirmation message.
    """
    proto = detect_type(link)
    if proto == 'unknown':
        return None
    data = load_data()
    if any(entry['link'] == link for entry in data['subs']):
        return 'dup'
    name = extract_name(link)
    record = {
        'link': link,
        'type': proto,
        'name': name,
        'added_by': user.id,
        'added_name': user.first_name,
        'added_at': int(time.time()),
        'alive': True,
    }
    data['subs'].append(record)
    save_data(data)
    return f'✅ 已添加 [{proto}] {name}'
2026-02-25 17:18:33 +08:00
def add_sub_to_group(link, user, group_id):
    """Store *link* in the sub-group identified by *group_id*.

    Returns:
        None for an unrecognised protocol, an error message when the group
        does not exist, ``'dup'`` when the group already holds the link,
        otherwise a confirmation message.
    """
    proto = detect_type(link)
    if proto == 'unknown':
        return None
    data = load_data()
    target = None
    for group in data.get('sub_groups', []):
        if group['id'] == group_id:
            target = group
            break
    if not target:
        return '❌ 分组不存在'
    if any(node['link'] == link for node in target['nodes']):
        return 'dup'
    name = extract_name(link)
    record = {
        'link': link,
        'type': proto,
        'name': name,
        'added_by': user.id,
        'added_name': user.first_name,
        'added_at': int(time.time()),
        'alive': True,
    }
    target['nodes'].append(record)
    save_data(data)
    return f'✅ 已添加到 [{target["name"]}] [{proto}] {name}'
2026-02-23 00:04:38 +08:00
# --- main menu with buttons (registered for /start and /vps) ---
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send the main menu to a permitted user.

    Users who fail is_member() get a refusal message instead. The
    administrator additionally gets a button for binding the current chat.
    """
    if not await is_member(update, context):
        return await update.message.reply_text('⛔ 仅限群成员使用')
    data = load_data()
    alive_total = sum(1 for s in data['subs'] if s.get('alive', True))
    group_total = len(data.get('sub_groups', []))

    rows = [
        [InlineKeyboardButton(f"📦 默认节点池 ({alive_total})", callback_data='menu_default')],
        [InlineKeyboardButton(f"📁 分组管理 ({group_total}个分组)", callback_data='sg_list')],
        [InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
    ]
    if update.effective_user.id == ADMIN_ID:
        rows.append([InlineKeyboardButton("⚙️ 绑定当前群", callback_data='menu_setgroup')])

    menu_text = (
        '🚀 *订阅管理 Bot*\n\n'
        '发送订阅链接自动入库到默认节点池\n'
        '或通过分组管理创建独立订阅'
    )
    await send_del(update.message, menu_text,
                   parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(rows))
# --- menu callbacks ---
async def _menu_edit_or_send(q, text, **kw):
    """Edit the pressed button's message in place; on failure send a new auto-deleting reply."""
    try:
        await q.edit_message_text(text, **kw)
    except Exception as exc:
        # Catch Exception, not everything: asyncio.CancelledError must propagate.
        log.debug('edit_message_text failed, sending a new message: %s', exc)
        await send_del(q.message, text, **kw)


async def cb_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the ``menu_*`` inline-button callbacks of the default node pool.

    Actions (value of ``callback_query.data``):
        menu_default       show pool summary and subscription URL
        menu_reset_secret  generate and store a new default secret
        menu_home          redraw the main menu
        menu_list          list every stored node
        menu_get           offer the export formats
        menu_del           offer the nodes the user may delete
        menu_add           mark the user as about to send links
        menu_check         probe every node (default pool and sub-groups)
        menu_pick_multi    start a multi-selection of alive nodes
        menu_setgroup      bind the current chat (administrator only)
    """
    q = update.callback_query
    await q.answer()
    action = q.data
    data = load_data()

    if action == 'menu_default':
        cnt = len(data['subs'])
        alive_cnt = len([s for s in data['subs'] if s.get('alive', True)])
        url = f"https://{SUB_HOST}/{get_default_secret()}/download?target=ClashMeta"
        buttons = [
            [InlineKeyboardButton("📋 节点列表", callback_data='menu_list'),
             InlineKeyboardButton("📥 获取订阅", callback_data='menu_get')],
            [InlineKeyboardButton(" 添加节点", callback_data='menu_add'),
             InlineKeyboardButton("🗑 删除节点", callback_data='menu_del')],
            [InlineKeyboardButton("🔄 重置订阅链接", callback_data='menu_reset_secret')],
            [InlineKeyboardButton("◀️ 返回主菜单", callback_data='menu_home')],
        ]
        await _menu_edit_or_send(
            q,
            f'📦 *默认节点池*\n\n总计 {cnt} 个节点,{alive_cnt} 个可用\n🔗 订阅链接:\n`{url}`',
            parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_reset_secret':
        new_secret = secrets.token_hex(16)
        data['default_secret'] = new_secret
        save_data(data)
        url = f"https://{SUB_HOST}/{new_secret}/download?target=ClashMeta"
        buttons = [[InlineKeyboardButton("◀️ 返回", callback_data='menu_default')]]
        await _menu_edit_or_send(
            q, f"✅ 默认订阅链接已重置\n\n旧链接已失效\n新链接:\n`{url}`",
            parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_home':
        default_cnt = len([s for s in data['subs'] if s.get('alive', True)])
        sg_cnt = len(data.get('sub_groups', []))
        buttons = [
            [InlineKeyboardButton(f"📦 默认节点池 ({default_cnt})", callback_data='menu_default')],
            [InlineKeyboardButton(f"📁 分组管理 ({sg_cnt}个分组)", callback_data='sg_list')],
            [InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
        ]
        try:
            await q.edit_message_text(
                '🚀 *订阅管理 Bot*\n\n发送订阅链接自动入库到默认节点池\n或通过分组管理创建独立订阅',
                parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
        except Exception as exc:
            # No fallback message here, matching the previous behaviour.
            log.debug('could not redraw main menu: %s', exc)

    elif action == 'menu_list':
        if not data['subs']:
            return await send_del(q.message, '📭 暂无订阅')
        lines = [f"`{i+1}` {_status_icon(s)} [{s['type']}] {s['name']}"
                 for i, s in enumerate(data['subs'])]
        await send_del(q.message, '📋 *订阅列表*\n\n' + '\n'.join(lines), parse_mode='Markdown')

    elif action == 'menu_get':
        alive = [s for s in data['subs'] if s.get('alive', True)]
        if not alive:
            return await send_del(q.message, '📭 暂无可用订阅')
        types = sorted(set(s['type'] for s in alive))
        buttons = [[InlineKeyboardButton(f"📦 全部 ({len(alive)})", callback_data='get_all_raw')]]
        for t in types:
            cnt = sum(1 for s in alive if s['type'] == t)
            buttons.append([
                InlineKeyboardButton(f"🔗 {t} ({cnt})", callback_data=f'get_{t}_raw'),
                InlineKeyboardButton(f"📄 {t} Base64", callback_data=f'get_{t}_b64')])
        buttons.append([InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data='get_all_clash')])
        buttons.append([InlineKeyboardButton("🎯 自选节点", callback_data='menu_pick_multi')])
        await send_del(q.message, '选择格式:', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_del':
        if not data['subs']:
            return await send_del(q.message, '📭 暂无订阅')
        uid = q.from_user.id
        buttons = []
        for i, s in enumerate(data['subs']):
            # Non-admins are only offered the nodes they uploaded themselves.
            if uid != ADMIN_ID and uid != s.get('added_by'):
                continue
            st = _status_icon(s)
            buttons.append([InlineKeyboardButton(f"{st} [{s['type']}] {s['name']}", callback_data=f'del_{i}')])
        if not buttons:
            return await send_del(q.message, '没有可删除的订阅')
        buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data='del_alldown')])
        await send_del(q.message, '选择要删除的:', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_add':
        WAITING_ADD.add(q.from_user.id)
        await send_del(q.message, '📎 请发送订阅链接(支持多条,空格或换行分隔)\n支持: ss/vmess/vless/trojan/hy2/tuic')

    elif action == 'menu_check':
        msg = await send_del(q.message, '🔍 检测中...', delay=120)
        results = []
        # Default pool first.
        if data['subs']:
            results.append('📦 *默认分组*')
            for s in data['subs']:
                s['alive'] = await check_node(s['link'], s['type'])
                results.append(f"{_status_icon(s)} [{s['type']}] {s['name']}")
        # Then every sub-group that has nodes.
        for sg in data.get('sub_groups', []):
            if not sg.get('nodes'):
                continue
            results.append(f"\n📁 *{sg['name']}*")
            for s in sg['nodes']:
                s['alive'] = await check_node(s['link'], s['type'])
                results.append(f"{_status_icon(s)} [{s['type']}] {s['name']}")
        save_data(data)
        try:
            await msg.delete()
        except Exception as exc:
            log.debug('could not delete progress message: %s', exc)
        if not results:
            await send_del(q.message, '📭 暂无订阅')
        else:
            results.append('\n⚠️ _检测从服务器发起专线节点可能显示不通但实际可用_')
            await send_del(q.message, '📊 *检测结果*\n\n' + '\n'.join(results), parse_mode='Markdown')

    elif action == 'menu_pick_multi':
        alive = [s for s in data['subs'] if s.get('alive', True)]
        if not alive:
            return await send_del(q.message, '📭 暂无可用订阅')
        context.user_data['multi_sel'] = set()
        buttons = [[InlineKeyboardButton(f"{s['name']}", callback_data=f'msel_{i}')] for i, s in enumerate(alive)]
        buttons.append([InlineKeyboardButton("✅ 确认", callback_data='msel_done')])
        await send_del(q.message, '🎯 点击选择节点:', reply_markup=InlineKeyboardMarkup(buttons), delay=120)

    elif action == 'menu_setgroup':
        if q.from_user.id != ADMIN_ID:
            return await send_del(q.message, '⛔ 仅管理员可用')
        cid = q.message.chat.id
        if cid not in data.get('groups', []):
            data.setdefault('groups', []).append(cid)
            save_data(data)
        await send_del(q.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
# --- get/pick/del callbacks ---
def _getsub_clash_url():
    """Build the default pool's Clash Meta subscription URL from SUB_HOST and the current secret."""
    return f'https://{SUB_HOST}/{get_default_secret()}/download?target=ClashMeta'


async def cb_getsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the export callbacks ``get_*``, ``pick_*``, ``send_*``, ``fmt_*`` and ``out_*``.

    Callback data formats:
        pick_<idx>          choose an output format for one alive node
        fmt_<idx>_<fmt>     emit one node as ``raw``, ``clash`` or ``url``
        send_<proto>_<fmt>  choose an output format for a whole protocol
        out_<proto>_<fmt>   emit a whole protocol as ``raw``, ``clash`` or ``b64``
        get_all_clash       emit the Clash Meta subscription URL
        get_<proto>_<fmt>   entry point coming from the format menu

    ``<idx>`` indexes the list of nodes whose ``alive`` value is truthy;
    ``<proto>`` is a node type or ``all``. Subscription URLs are built from
    ``SUB_HOST`` so they follow the configured host like the other menus do.
    """
    q = update.callback_query
    await q.answer()
    data = load_data()
    alive = [s for s in data['subs'] if s.get('alive', True)]
    action = q.data

    if action.startswith('pick_'):
        idx = int(action.split('_')[1])
        if idx < len(alive):
            buttons = [
                [InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{idx}_raw')],
                [InlineKeyboardButton("⚡ Clash Meta", callback_data=f'fmt_{idx}_clash')],
                [InlineKeyboardButton("📎 Clash Meta 订阅URL", callback_data=f'fmt_{idx}_url')],
            ]
            await send_del(q.message, f'📍 {alive[idx]["name"]}\n选择输出格式:',
                           reply_markup=InlineKeyboardMarkup(buttons))
        return

    if action.startswith('fmt_'):
        parts = action.split('_')
        idx, fmt = int(parts[1]), parts[2]
        if idx < len(alive):
            s = alive[idx]
            if fmt == 'raw':
                await send_del(q.message, s["link"])
            elif fmt == 'clash':
                cm = gen_clash_meta([s])
                await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
            elif fmt == 'url':
                await send_del(q.message, f'📎 订阅URL:\n{_getsub_clash_url()}')
        return

    if action.startswith('send_'):
        parts = action.split('_')
        proto, fmt = parts[1], parts[2]
        subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
        # Ask for the output format first; other formats fall through to the
        # generic handling at the end of this function.
        if fmt == 'raw':
            buttons = [
                [InlineKeyboardButton("🔗 原始链接", callback_data=f'out_{proto}_raw')],
                [InlineKeyboardButton("⚡ Clash Meta", callback_data=f'out_{proto}_clash')],
                [InlineKeyboardButton("📄 Base64", callback_data=f'out_{proto}_b64')],
            ]
            return await send_del(q.message, f'📦 全部 ({len(subs)}) — 选择格式:',
                                  reply_markup=InlineKeyboardMarkup(buttons))

    if action.startswith('out_'):
        parts = action.split('_')
        proto, fmt = parts[1], parts[2]
        subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
        if fmt == 'clash':
            url = _getsub_clash_url()
            if proto != 'all':
                url += f'&type={proto}'
            return await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
        links = '\n'.join(s['link'] for s in subs)
        if fmt == 'b64':
            return await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```', parse_mode='Markdown')
        if len(links) > 4000:
            # Too long for a single message: send one link per message.
            for s in subs:
                await send_del(q.message, s['link'])
        else:
            await send_del(q.message, links)
        return

    if action == 'get_all_clash':
        await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{_getsub_clash_url()}')
        return

    # Generic handling: <prefix>_<proto>_<fmt>
    parts = action.split('_')
    proto, fmt = parts[1], parts[2]
    subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
    if not subs:
        return await send_del(q.message, '📭 无匹配')
    if fmt == 'raw' and len(subs) > 1:
        buttons = []
        for s in subs:
            gi = alive.index(s)
            buttons.append([InlineKeyboardButton(
                f"{_status_icon(s)} {s['name']}", callback_data=f'pick_{gi}')])
        buttons.append([InlineKeyboardButton(f"📦 全部 ({len(subs)})", callback_data=f'send_{proto}_raw')])
        return await send_del(q.message, '选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
    if fmt == 'raw' and len(subs) == 1:
        only_idx = alive.index(subs[0])
        buttons = [
            [InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{only_idx}_raw')],
            [InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{only_idx}_clash')],
        ]
        return await send_del(q.message, f'📍 {subs[0]["name"]}\n选择输出格式:',
                              reply_markup=InlineKeyboardMarkup(buttons))
    links = '\n'.join(s['link'] for s in subs)
    if fmt == 'b64':
        await send_del(q.message, base64.b64encode(links.encode()).decode())
    else:
        await send_del(q.message, links)
async def cb_multisel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the multi-selection callbacks ``msel_<idx>`` and ``msel_done``.

    ``msel_<idx>`` toggles index ``<idx>`` (into the list of nodes whose
    ``alive`` value is truthy) in ``context.user_data['multi_sel']`` and
    redraws the keyboard. ``msel_done`` stores the chosen node records in
    ``context.user_data['multi_chosen']`` and offers the output formats.
    """
    q = update.callback_query
    await q.answer()
    data = load_data()
    alive = [s for s in data['subs'] if s.get('alive', True)]
    sel = context.user_data.get('multi_sel', set())
    action = q.data

    if action == 'msel_done':
        if not sel:
            return await send_del(q.message, '⚠️ 未选择任何节点')
        # Indexes that no longer exist in the current list are dropped.
        chosen = [alive[i] for i in sorted(sel) if i < len(alive)]
        if not chosen:
            return await send_del(q.message, '⚠️ 选择已失效')
        context.user_data['multi_sel'] = set()
        names = ', '.join(s['name'] for s in chosen)
        buttons = [
            [InlineKeyboardButton("🔗 原始链接", callback_data='mout_raw')],
            [InlineKeyboardButton("⚡ Clash Meta", callback_data='mout_clash')],
            [InlineKeyboardButton("📎 订阅URL", callback_data='mout_url')],
            [InlineKeyboardButton("📄 Base64", callback_data='mout_b64')],
        ]
        context.user_data['multi_chosen'] = chosen
        await send_del(q.message, f'已选 {len(chosen)} 个: {names}\n选择输出格式:',
                       reply_markup=InlineKeyboardMarkup(buttons))
        return

    idx = int(action.split('_')[1])
    if idx in sel:
        sel.discard(idx)
    else:
        sel.add(idx)
    context.user_data['multi_sel'] = sel

    buttons = []
    for i, s in enumerate(alive):
        # Both marks used to be empty strings, so the keyboard never showed
        # which nodes were selected.
        mark = '✅' if i in sel else '⬜'
        buttons.append([InlineKeyboardButton(f"{mark} {s['name']}", callback_data=f'msel_{i}')])
    buttons.append([InlineKeyboardButton(f"✅ 确认 ({len(sel)})", callback_data='msel_done')])
    try:
        await q.message.edit_text('🎯 点击选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
    except Exception as exc:
        # Redrawing is cosmetic; the selection itself is already stored.
        log.debug('could not redraw selection keyboard: %s', exc)
async def cb_multiout(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Emit the nodes stored in ``context.user_data['multi_chosen']``.

    The format is the part after ``mout_`` in the callback data: ``raw``,
    ``clash``, ``url`` or ``b64``. The stored selection is removed afterwards.
    The subscription URL is built from ``SUB_HOST`` so it follows the
    configured host like the other menus do.
    """
    q = update.callback_query
    await q.answer()
    chosen = context.user_data.get('multi_chosen', [])
    if not chosen:
        return await send_del(q.message, '⚠️ 选择已过期,请重新操作')
    fmt = q.data.split('_')[1]
    if fmt == 'raw':
        links = '\n'.join(s['link'] for s in chosen)
        await send_del(q.message, links)
    elif fmt == 'clash':
        cm = gen_clash_meta(chosen)
        await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
    elif fmt == 'url':
        # Names are percent-encoded individually and joined with commas.
        names = ','.join(urllib.request.quote(s['name']) for s in chosen)
        url = f'https://{SUB_HOST}/{get_default_secret()}/download?target=ClashMeta&name={names}'
        await send_del(q.message, f'📎 订阅URL:\n{url}')
    elif fmt == 'b64':
        links = '\n'.join(s['link'] for s in chosen)
        await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```', parse_mode='Markdown')
    context.user_data.pop('multi_chosen', None)
2026-02-23 00:04:38 +08:00
async def cb_delsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the delete callbacks ``del_alldown`` and ``del_<idx>``.

    ``del_alldown`` removes every node whose ``alive`` value is falsy;
    non-admins only lose the nodes they uploaded. ``del_<idx>`` removes the
    node at position ``<idx>`` of the default pool when the user is the
    administrator or the uploader.
    """
    q = update.callback_query
    await q.answer()
    data = load_data()
    action = q.data
    uid = q.from_user.id
    is_admin = uid == ADMIN_ID

    if action == 'del_alldown':
        dead = []
        for node in data['subs']:
            if node.get('alive', True):
                continue
            if is_admin or node.get('added_by') == uid:
                dead.append(node)
        if not dead:
            return await send_del(q.message, '没有不可用节点')
        names = [node['name'] for node in dead]
        for node in dead:
            data['subs'].remove(node)
        save_data(data)
        listing = '\n'.join(f'{n}' for n in names)
        return await send_del(q.message, f"🗑 已删除 {len(names)} 个:\n" + listing)

    idx = int(action.split('_')[1])
    if idx >= len(data['subs']):
        return await send_del(q.message, '❌ 已失效')
    candidate = data['subs'][idx]
    if not is_admin and uid != candidate.get('added_by'):
        return await send_del(q.message, '⛔ 只能删除自己上传的')
    removed = data['subs'].pop(idx)
    save_data(data)
    await send_del(q.message, f"🗑 已删除: {removed['name']}")
# --- auto detect links in messages ---
def _links_in_text(text):
    """Return the share links contained in *text*.

    Whitespace-separated tokens that start with a ``PROTOS`` prefix win; only
    when there are none is each line tried as a Surge ``ss`` definition.
    """
    found = [w for w in text.split() if any(w.startswith(p) for p in PROTOS)]
    if found:
        return found
    converted = []
    for line in text.split('\n'):
        ss_link, _ = parse_surge_ss(line)
        if ss_link:
            converted.append(ss_link)
    return converted


def _add_links_report(links, add_one):
    """Call ``add_one(link)`` for every link and collect the user-facing result lines."""
    report = []
    for link in links:
        outcome = add_one(link)
        if outcome == 'dup':
            report.append(f'⚠️ 已存在: {extract_name(link)}')
        elif outcome:
            report.append(outcome)
    return report


async def auto_detect(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process a plain text message.

    Pending sub-group inputs stored in ``context.user_data`` are served first
    (``waiting_sg_name``, ``waiting_sg_rename``, ``waiting_sg_add``).
    Otherwise any share links in the text are added to the default pool,
    provided the sender passes is_member().
    """
    if not update.message or not update.message.text:
        return
    text = update.message.text.strip()
    uid = update.effective_user.id

    # --- sub-group text input ---
    # Create group: the text is the new group's name.
    if context.user_data.get('waiting_sg_name'):
        del context.user_data['waiting_sg_name']
        data = load_data()
        gid = secrets.token_hex(4)
        sec = secrets.token_hex(16)
        data.setdefault('sub_groups', []).append({
            'id': gid, 'name': text, 'secret': sec,
            'nodes': [], 'created_at': int(time.time())
        })
        save_data(data)
        url = f"https://{SUB_HOST}/{sec}/download?target=ClashMeta"
        return await send_del(update.message,
                              f"✅ 分组 *{text}* 已创建\n\n🔗 订阅链接:\n`{url}`", parse_mode='Markdown')

    # Rename group: the text is the new name.
    if context.user_data.get('waiting_sg_rename'):
        gid = context.user_data.pop('waiting_sg_rename')
        data = load_data()
        sg = next((g for g in data.get('sub_groups', []) if g['id'] == gid), None)
        if sg:
            old = sg['name']
            sg['name'] = text
            save_data(data)
            # The old and new name used to be printed back to back with no
            # separator, which made the message unreadable.
            return await send_del(update.message, f"✅ 分组已重命名: {old} → {text}")
        return await send_del(update.message, '❌ 分组不存在')

    # Add nodes to a group: the text contains the links.
    if context.user_data.get('waiting_sg_add'):
        gid = context.user_data.pop('waiting_sg_add')
        links = _links_in_text(text)
        if not links:
            return await send_del(update.message, '❌ 未识别到订阅链接')
        report = _add_links_report(
            links, lambda link: add_sub_to_group(link, update.effective_user, gid))
        if report:
            return await send_del(update.message, '\n'.join(report))
        return

    # --- default pool ---
    links = _links_in_text(text)
    if not links:
        # Only complain when the user was explicitly asked to send links.
        if uid in WAITING_ADD:
            WAITING_ADD.discard(uid)
            await send_del(update.message, '❌ 未识别到订阅链接')
        return
    if not await is_member(update, context):
        return
    WAITING_ADD.discard(uid)
    report = _add_links_report(links, lambda link: add_sub(link, update.effective_user))
    if report:
        await send_del(update.message, '\n'.join(report))
# --- node health check ---
async def check_node(link, proto):
    """Probe a node with a direct TCP connection.

    Returns:
        True when a connection to the node's server and port is established
        within 5 seconds; None ("unknown") when the address cannot be parsed
        or the connection fails. False is never returned: an unreachable node
        is deliberately not judged dead.

    NOTE(review): callers store this value in ``s['alive']`` and filter with
    ``s.get('alive', True)``, which treats None like False. Confirm that
    "unknown" nodes are really meant to be excluded by those filters.
    """
    try:
        server, port = parse_sp(link, proto)
        if not server:
            return None
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(server, int(port)), timeout=5)
    except (OSError, asyncio.TimeoutError, ValueError, TypeError):
        # Connection and lookup errors, timeouts and malformed addresses all
        # mean "unknown". asyncio.CancelledError is intentionally not caught.
        return None
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # The connection was established; an error while closing it does not
        # change the verdict.
        pass
    return True
2026-02-23 00:04:38 +08:00
def _sp_b64_text(token):
    """Decode standard or URL-safe base64 text, tolerating missing padding."""
    token = token.strip().replace('-', '+').replace('_', '/')
    token += '=' * (-len(token) % 4)
    return base64.b64decode(token).decode()


def _sp_split_hostport(hostport):
    """Split ``host:port[/path][?query]`` into ``(host, port)`` or return None."""
    hostport = re.split(r'[/?]', hostport, maxsplit=1)[0]
    server, sep, port = hostport.rpartition(':')
    server = server.strip('[]')  # IPv6 literals are written in brackets
    if not sep or not server or not port.isdigit():
        return None
    return server, port


def _sp_from_ss(link):
    """Return ``(server, port)`` for an ``ss://`` link, or None."""
    body = link[5:].split('#', 1)[0]
    if '@' in body:
        # userinfo@host:port, optionally followed by /?plugin=...
        return _sp_split_hostport(body.rsplit('@', 1)[1])
    # Legacy form: the whole body is base64("method:password@host:port").
    decoded = _sp_b64_text(body.split('?', 1)[0])
    return _sp_split_hostport(decoded.rsplit('@', 1)[1])


def _sp_from_vmess(link):
    """Return ``(server, port)`` from a base64-JSON ``vmess://`` link, or None."""
    cfg = json.loads(_sp_b64_text(link[len('vmess://'):].split('#', 1)[0]))
    server, port = cfg.get('add'), cfg.get('port')
    if server and port and str(port).isdigit():
        return str(server), str(port)
    return None


def parse_sp(link, proto):
    """Extract the server and port of a share link.

    Args:
        link: share link (``ss://``, ``vmess://``, ``vless://`` ...).
        proto: protocol name as returned by detect_type().

    Returns:
        tuple: ``(server, port)`` as strings, or ``(None, None)`` when the
        link cannot be parsed. ``ss`` links are handled in both the
        ``userinfo@host:port`` and the fully base64-encoded form, ``vmess``
        links in their base64-JSON form; everything else is matched with a
        generic ``@host:port`` pattern.
    """
    parsers = {'ss': _sp_from_ss, 'vmess': _sp_from_vmess}
    parser = parsers.get(proto)
    if parser is not None:
        try:
            endpoint = parser(link)
        except (ValueError, IndexError, AttributeError, TypeError):
            # Undecodable payload: fall back to the generic pattern below.
            endpoint = None
        if endpoint:
            return endpoint
    # Generic form; '@' is excluded from the host so that a password
    # containing '@' does not end up in the server name.
    m = re.search(r'@(\[[^\]]+\]|[^:/?#@]+):(\d+)', link)
    if m:
        return m.group(1).strip('[]'), m.group(2)
    return None, None
def _clash_b64_text(token):
    """Strictly decode standard or URL-safe base64 text, tolerating missing padding."""
    token = token.strip().replace('-', '+').replace('_', '/')
    token += '=' * (-len(token) % 4)
    return base64.b64decode(token, validate=True).decode()


def parse_to_clash_proxy(s):
    """Convert a stored ``ss`` node into a Clash proxy mapping.

    Args:
        s: node record with at least the ``type`` and ``link`` keys.

    Returns:
        dict with ``name``, ``type``, ``server``, ``port``, ``cipher`` and
        ``password``, or None when the node is not of type ``ss`` or its link
        cannot be parsed. The name falls back to the server when the link has
        no fragment.
    """
    try:
        if s['type'] != 'ss':
            return None
        body = s['link'][5:]
        name = ''
        if '#' in body:
            body, name = body.rsplit('#', 1)
            name = urllib.request.unquote(name)
        if '@' in body:
            userinfo, hostport = body.rsplit('@', 1)
            try:
                creds = _clash_b64_text(userinfo)
            except ValueError:
                # Not base64: treat the userinfo as a plain, percent-encoded
                # "method:password" pair.
                creds = urllib.request.unquote(userinfo)
        else:
            # Legacy form: base64("method:password@host:port").
            decoded = _clash_b64_text(body.split('?', 1)[0])
            creds, hostport = decoded.rsplit('@', 1)
        method, pwd = creds.split(':', 1)
        # Drop a trailing "/?plugin=..." part before reading the port.
        hostport = re.split(r'[/?]', hostport, maxsplit=1)[0]
        server, port = hostport.rsplit(':', 1)
        server = server.strip('[]')  # IPv6 literals are written in brackets
        return {'name': name or server, 'type': 'ss', 'server': server, 'port': int(port),
                'cipher': method, 'password': pwd}
    except (KeyError, TypeError, ValueError):
        # binascii.Error and UnicodeDecodeError are ValueError subclasses.
        return None
def gen_clash_meta(subs):
    """Render *subs* as a Clash ``proxies:`` text fragment.

    Every node that parse_to_clash_proxy() can convert becomes one list item
    holding the JSON form of the proxy; other nodes are skipped.
    """
    converted = (parse_to_clash_proxy(s) for s in subs)
    entries = [f' - {json.dumps(p, ensure_ascii=False)}' for p in converted if p]
    return '\n'.join(['proxies:', *entries])
# --- HTTP subscription endpoint ---
class SubHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves subscription content at ``/<secret>/download``.

    ``<secret>`` is either the default pool's secret or the secret of a
    sub-group; any other path answers 404.

    Query parameters (values are used as received; only ``name`` entries are
    percent-decoded):
        type:   keep only nodes whose ``type`` equals this value.
        name:   comma-separated, percent-encoded node names to keep.
        target: ``ClashMeta`` (default) renders via gen_clash_meta(); any
                other value returns the raw links, one per line.
    """

    @staticmethod
    def _secret_matches(candidate, expected):
        """Compare two strings in constant time so the secret cannot be probed via timing."""
        return secrets.compare_digest(candidate.encode(), expected.encode())

    def _select_nodes(self, path):
        """Return the alive nodes addressed by *path*, or None when nothing matches."""
        if self._secret_matches(path, f'/{get_default_secret()}/download'):
            return [s for s in load_data()['subs'] if s.get('alive', True)]
        # Otherwise try to match a sub-group secret.
        parts = path.strip('/').split('/')
        if len(parts) != 2 or parts[1] != 'download':
            return None
        for group in load_data().get('sub_groups', []):
            if self._secret_matches(parts[0], group.get('secret', '')):
                return [s for s in group.get('nodes', []) if s.get('alive', True)]
        return None

    def do_GET(self):
        """Answer a subscription download request."""
        path, _, query = self.path.partition('?')
        qs = dict(x.split('=', 1) for x in query.split('&') if '=' in x)

        alive = self._select_nodes(path)
        if alive is None:
            self.send_response(404)
            self.end_headers()
            return

        ftype = qs.get('type', '')
        if ftype:
            alive = [s for s in alive if s['type'] == ftype]
        fname = qs.get('name', '')
        if fname:
            names = [urllib.request.unquote(n) for n in fname.split(',')]
            alive = [s for s in alive if s['name'] in names]

        target = qs.get('target', 'ClashMeta')
        body = gen_clash_meta(alive) if target == 'ClashMeta' else '\n'.join(s['link'] for s in alive)
        payload = body.encode()
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        # Announce the body size so clients know where the response ends.
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *a):
        """Suppress the default per-request logging."""
        pass
def start_http(host='0.0.0.0', port=18888):
    """Run the subscription HTTP server until the process exits.

    Args:
        host: interface address to bind; defaults to all interfaces.
        port: TCP port to listen on; defaults to 18888.

    This call blocks in ``serve_forever``; main() runs it in a daemon thread.
    """
    srv = HTTPServer((host, port), SubHandler)
    log.info('Sub HTTP on :%s', port)
    srv.serve_forever()
# --- legacy commands still work ---
async def cmd_setgroup(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Bind the current chat as an authorised group (administrator only)."""
    if update.effective_user.id != ADMIN_ID:
        return await update.message.reply_text('⛔ 仅管理员可用')
    chat_id = update.effective_chat.id
    data = load_data()
    already_bound = chat_id in data.get('groups', [])
    if not already_bound:
        data.setdefault('groups', []).append(chat_id)
        save_data(data)
    await send_del(update.message, f'✅ 已绑定群 `{chat_id}`', parse_mode='Markdown')
2026-02-25 17:18:33 +08:00
# --- sub-group management callbacks ---
async def _sg_edit_or_send(q, text, **kw):
    """Edit the pressed button's message in place; on failure send the same text as a new auto-deleting reply."""
    try:
        await q.edit_message_text(text, **kw)
    except Exception as exc:
        # Catch Exception, not everything: asyncio.CancelledError must propagate.
        log.debug('edit_message_text failed, sending a new message: %s', exc)
        await send_del(q.message, text, **kw)


def _sg_find(groups, gid):
    """Return the group whose ``id`` equals *gid*, or None."""
    return next((g for g in groups if g['id'] == gid), None)


def _sg_back_markup(gid):
    """Return a keyboard holding a single 'back to group detail' button."""
    return InlineKeyboardMarkup(
        [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]])


async def cb_subgroups(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the ``sg_*`` inline-button callbacks of sub-group management.

    Actions (``<gid>`` is a group id, ``<idx>`` a position in its node list):
        sg_list                  list all groups
        sg_create                ask for the name of a new group
        sg_detail_<gid>          show one group and its action buttons
        sg_nodes_<gid>           list the group's nodes
        sg_add_<gid>             ask for links to add to the group
        sg_check_<gid>           probe the group's nodes
        sg_delnodes_<gid>        offer the group's nodes for deletion
        sg_delnode_<gid>_<idx>   delete one node
        sg_deldown_<gid>         delete all nodes whose ``alive`` is falsy
        sg_getsub_<gid>          show the subscription URL
        sg_reset_<gid>           replace the group's secret
        sg_rename_<gid>          ask for a new group name
        sg_delconfirm_<gid>      ask for confirmation before deleting
        sg_dodel_<gid>           delete the group
        sg_back                  redraw the main menu
    """
    q = update.callback_query
    await q.answer()
    action = q.data
    data = load_data()
    sgs = data.get('sub_groups', [])

    if action == 'sg_list':
        buttons = []
        for sg in sgs:
            cnt = len(sg.get('nodes', []))
            buttons.append([InlineKeyboardButton(f"📁 {sg['name']} ({cnt}个节点)", callback_data=f"sg_detail_{sg['id']}")])
        buttons.append([InlineKeyboardButton(" 创建分组", callback_data='sg_create')])
        buttons.append([InlineKeyboardButton("◀️ 返回主菜单", callback_data='sg_back')])
        text = f"📁 *分组管理*\n\n{len(sgs)} 个分组" if sgs else "📁 *分组管理*\n\n暂无分组,点击创建"
        await _sg_edit_or_send(q, text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'sg_create':
        context.user_data['waiting_sg_name'] = True
        await _sg_edit_or_send(q, '📝 请发送分组名称:')

    elif action.startswith('sg_detail_'):
        gid = action[len('sg_detail_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return await send_del(q.message, '❌ 分组不存在')
        cnt = len(sg.get('nodes', []))
        url = f"https://{SUB_HOST}/{sg['secret']}/download?target=ClashMeta"
        text = (f"📁 *{sg['name']}*\n\n"
                f"📊 节点数: {cnt}\n"
                f"🔗 订阅链接:\n`{url}`")
        buttons = [
            [InlineKeyboardButton("📋 节点列表", callback_data=f"sg_nodes_{gid}"),
             InlineKeyboardButton(" 添加节点", callback_data=f"sg_add_{gid}")],
            [InlineKeyboardButton("🗑 删除节点", callback_data=f"sg_delnodes_{gid}"),
             InlineKeyboardButton("🔍 检测节点", callback_data=f"sg_check_{gid}")],
            [InlineKeyboardButton("📥 获取订阅", callback_data=f"sg_getsub_{gid}"),
             InlineKeyboardButton("🔄 重置链接", callback_data=f"sg_reset_{gid}")],
            [InlineKeyboardButton("✏️ 重命名", callback_data=f"sg_rename_{gid}"),
             InlineKeyboardButton("🗑 删除分组", callback_data=f"sg_delconfirm_{gid}")],
            [InlineKeyboardButton("◀️ 返回分组列表", callback_data='sg_list')],
        ]
        await _sg_edit_or_send(q, text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_nodes_'):
        gid = action[len('sg_nodes_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        nodes = sg.get('nodes', [])
        if not nodes:
            text = f"📁 *{sg['name']}* — 暂无节点"
        else:
            lines = [f"📁 *{sg['name']}* 节点列表\n"]
            for i, s in enumerate(nodes):
                lines.append(f"`{i+1}` {_status_icon(s)} [{s['type']}] {s['name']}")
            text = '\n'.join(lines)
        await _sg_edit_or_send(q, text, parse_mode='Markdown', reply_markup=_sg_back_markup(gid))

    elif action.startswith('sg_add_'):
        gid = action[len('sg_add_'):]
        context.user_data['waiting_sg_add'] = gid
        sg = _sg_find(sgs, gid)
        name = sg['name'] if sg else '?'
        await _sg_edit_or_send(q, f'📎 请发送要添加到 [{name}] 的订阅链接:')

    elif action.startswith('sg_check_'):
        gid = action[len('sg_check_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        nodes = sg.get('nodes', [])
        if not nodes:
            return await send_del(q.message, f"📁 *{sg['name']}* 暂无节点",
                                  parse_mode='Markdown', reply_markup=_sg_back_markup(gid))
        msg = await send_del(q.message, f"🔍 检测 *{sg['name']}* 中...", parse_mode='Markdown', delay=120)
        results = []
        for s in nodes:
            s['alive'] = await check_node(s['link'], s['type'])
            results.append(f"{_status_icon(s)} [{s['type']}] {s['name']}")
        save_data(data)
        try:
            await msg.delete()
        except Exception as exc:
            log.debug('could not delete progress message: %s', exc)
        await send_del(q.message, f"📊 *{sg['name']}* 检测结果\n\n" + '\n'.join(results),
                       parse_mode='Markdown', reply_markup=_sg_back_markup(gid))

    elif action.startswith('sg_delnodes_'):
        gid = action[len('sg_delnodes_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        nodes = sg.get('nodes', [])
        if not nodes:
            return await send_del(q.message, f"📁 *{sg['name']}* 暂无节点",
                                  parse_mode='Markdown', reply_markup=_sg_back_markup(gid))
        buttons = []
        for i, s in enumerate(nodes):
            buttons.append([InlineKeyboardButton(
                f"{_status_icon(s)} [{s['type']}] {s['name']}", callback_data=f"sg_delnode_{gid}_{i}")])
        buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data=f"sg_deldown_{gid}")])
        buttons.append([InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")])
        await _sg_edit_or_send(q, f"🗑 *{sg['name']}* — 选择要删除的节点:",
                               parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_delnode_'):
        # sg_delnode_{gid}_{idx}
        parts = action[len('sg_delnode_'):].rsplit('_', 1)
        if len(parts) != 2:
            return
        gid, idx_str = parts
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        # Ignore callback data whose index is not a plain number.
        if not idx_str.isdigit():
            return
        idx = int(idx_str)
        nodes = sg.get('nodes', [])
        if idx >= len(nodes):
            return await send_del(q.message, '❌ 已失效')
        removed = nodes.pop(idx)
        save_data(data)
        await _sg_edit_or_send(q, f"✅ 已删除: [{removed['type']}] {removed['name']}",
                               reply_markup=_sg_back_markup(gid))

    elif action.startswith('sg_deldown_'):
        gid = action[len('sg_deldown_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        dead = [s for s in sg.get('nodes', []) if not s.get('alive', True)]
        if not dead:
            return await send_del(q.message, '没有不可用节点', reply_markup=_sg_back_markup(gid))
        names = [s['name'] for s in dead]
        sg['nodes'] = [s for s in sg['nodes'] if s.get('alive', True)]
        save_data(data)
        await _sg_edit_or_send(
            q, f"🗑 已删除 {len(names)} 个不可用:\n" + '\n'.join(f'{n}' for n in names),
            reply_markup=_sg_back_markup(gid))

    elif action.startswith('sg_getsub_'):
        gid = action[len('sg_getsub_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        url = f"https://{SUB_HOST}/{sg['secret']}/download?target=ClashMeta"
        buttons = [
            [InlineKeyboardButton("📋 复制链接", callback_data=f"sg_detail_{gid}")],
            [InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")],
        ]
        await send_del(q.message, f"📎 *{sg['name']}* 订阅链接:\n\n`{url}`",
                       parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_reset_'):
        gid = action[len('sg_reset_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        sg['secret'] = secrets.token_hex(16)
        save_data(data)
        url = f"https://{SUB_HOST}/{sg['secret']}/download?target=ClashMeta"
        await _sg_edit_or_send(q, f"✅ 链接已重置\n\n新链接:\n`{url}`",
                               parse_mode='Markdown', reply_markup=_sg_back_markup(gid))

    elif action.startswith('sg_rename_'):
        gid = action[len('sg_rename_'):]
        context.user_data['waiting_sg_rename'] = gid
        await _sg_edit_or_send(q, '✏️ 请发送新的分组名称:')

    elif action.startswith('sg_delconfirm_'):
        gid = action[len('sg_delconfirm_'):]
        sg = _sg_find(sgs, gid)
        if not sg:
            return
        buttons = [
            [InlineKeyboardButton("⚠️ 确认删除", callback_data=f"sg_dodel_{gid}"),
             InlineKeyboardButton("取消", callback_data=f"sg_detail_{gid}")],
        ]
        # The fallback message used to omit the data-loss warning; both paths
        # now show the same text.
        await _sg_edit_or_send(q, f"⚠️ 确认删除分组 *{sg['name']}*\n节点数据将全部丢失!",
                               parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_dodel_'):
        gid = action[len('sg_dodel_'):]
        data['sub_groups'] = [g for g in sgs if g['id'] != gid]
        save_data(data)
        buttons = [[InlineKeyboardButton("◀️ 返回分组列表", callback_data='sg_list')]]
        await _sg_edit_or_send(q, "✅ 分组已删除", reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'sg_back':
        # Back to the main menu.
        default_cnt = len([s for s in data['subs'] if s.get('alive', True)])
        sg_cnt = len(data.get('sub_groups', []))
        buttons = [
            [InlineKeyboardButton(f"📦 默认节点池 ({default_cnt})", callback_data='menu_default')],
            [InlineKeyboardButton(f"📁 分组管理 ({sg_cnt}个分组)", callback_data='sg_list')],
            [InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
        ]
        try:
            await q.edit_message_text(
                '🚀 *订阅管理 Bot*\n\n发送订阅链接自动入库到默认节点池\n或通过分组管理创建独立订阅',
                parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
        except Exception as exc:
            # No fallback message here, matching the previous behaviour.
            log.debug('could not redraw main menu: %s', exc)
2026-02-23 00:04:38 +08:00
def main():
    """Build the Telegram application, register all handlers and start serving.

    The subscription HTTP server runs in a daemon thread; the bot itself
    polls Telegram in the calling thread and discards pending updates.
    """
    app = Application.builder().token(TOKEN).build()
    handlers = [
        CommandHandler('start', cmd_help),
        CommandHandler('vps', cmd_help),
        CommandHandler('setgroup', cmd_setgroup),
        CallbackQueryHandler(cb_menu, pattern='^menu_'),
        CallbackQueryHandler(cb_delsub, pattern='^del_'),
        CallbackQueryHandler(cb_multisel, pattern='^msel_'),
        CallbackQueryHandler(cb_multiout, pattern='^mout_'),
        CallbackQueryHandler(cb_getsub, pattern='^(get_|pick_|send_|fmt_|out_)'),
        CallbackQueryHandler(cb_subgroups, pattern='^sg_'),
        MessageHandler(filters.TEXT & ~filters.COMMAND, auto_detect),
    ]
    for handler in handlers:
        app.add_handler(handler)
    log.info('Sub Bot starting polling...')
    http_thread = threading.Thread(target=start_http, daemon=True)
    http_thread.start()
    app.run_polling(drop_pending_updates=True)


if __name__ == '__main__':
    main()