"""Telegram bot that stores proxy share links and serves them as subscriptions.

The bot keeps a default node pool plus any number of named groups in a JSON
file, lets chat members add/list/delete/check nodes through inline keyboards,
and exposes each pool over a small HTTP endpoint keyed by a secret path.
"""
import asyncio
import base64
import json
import logging
import os
import re
import secrets
import threading
import time
import urllib.parse
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler

from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application,
    CommandHandler,
    CallbackQueryHandler,
    MessageHandler,
    filters,
    ContextTypes,
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)

TOKEN = os.environ.get('BOT_TOKEN', '')
ADMIN_ID = int(os.environ.get('ADMIN_ID', '0'))
DATA_FILE = '/opt/sub-bot/data.json'
PROTOS = ['ss://', 'vmess://', 'vless://', 'trojan://', 'hysteria2://', 'hy2://', 'tuic://']
AUTO_DEL = 60
# NOTE(review): 'changeme' is a guessable fallback secret; set SUB_SECRET in production.
SUB_SECRET = os.environ.get('SUB_SECRET', 'changeme')
SUB_HOST = os.environ.get('SUB_HOST', 'substore.mjjtop.com')
WAITING_ADD = set()  # user_ids waiting to add sub

MAIN_MENU_TEXT = ('🚀 *订阅管理 Bot*\n\n'
                  '发送订阅链接自动入库到默认节点池\n'
                  '或通过分组管理创建独立订阅')

# Serialises writers: the bot thread and the cleanup thread both call save_data().
_DATA_LOCK = threading.Lock()
# Strong references to pending auto-delete tasks; the event loop itself only
# keeps weak references, so an unreferenced task may be collected before it runs.
_BG_TASKS = set()


# --- storage ---

def get_default_secret():
    """Return the default pool's secret: data.json value first, env var otherwise."""
    data = load_data()
    return data.get('default_secret', SUB_SECRET)


def load_data():
    """Load the JSON store, guaranteeing the 'subs', 'groups' and 'sub_groups' lists.

    Returns a fresh dict on every call. A missing file yields empty lists; a
    corrupt file raises the underlying ``json`` error.
    """
    try:
        with open(DATA_FILE, encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        data = {}
    for key in ('subs', 'groups', 'sub_groups'):
        data.setdefault(key, [])
    return data


def save_data(data):
    """Persist *data* atomically so concurrent readers never see a partial file."""
    os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
    tmp_path = DATA_FILE + '.tmp'
    with _DATA_LOCK:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # os.replace is atomic on the same filesystem: readers get old or new, never half.
        os.replace(tmp_path, DATA_FILE)


def _sub_url(secret, extra=''):
    """Build the public Clash Meta subscription URL for *secret* on SUB_HOST."""
    return f"https://{SUB_HOST}/{secret}/download?target=ClashMeta{extra}"


def _alive(nodes):
    """Return the nodes not marked dead (a missing 'alive' flag counts as alive)."""
    return [s for s in nodes if s.get('alive', True)]


def _dot(ok):
    """Return the status emoji for a boolean health flag."""
    return '🟢' if ok else '🔴'


def _find_group(groups, gid):
    """Return the group dict whose 'id' equals *gid*, or None."""
    return next((g for g in groups if g['id'] == gid), None)


def _iter_all_nodes(data):
    """Yield every node dict from the default pool and from all groups."""
    yield from data.get('subs', [])
    for sg in data.get('sub_groups', []):
        yield from sg.get('nodes', [])


# --- link parsing ---

def detect_type(link):
    """Return the protocol name of *link* (e.g. 'ss'), or 'unknown'."""
    for p in PROTOS:
        if link.startswith(p):
            return p.replace('://', '')
    return 'unknown'


def parse_surge_ss(text):
    """Parse Surge format: Name = ss, server, port, encrypt-method=x, password=y

    Returns ``(ss_link, name)`` or ``(None, None)`` when the line does not match
    or lacks a method/password.
    """
    m = re.match(r'^(.+?)\s*=\s*ss\s*,\s*([^,]+)\s*,\s*(\d+)\s*,\s*(.+)$', text.strip())
    if not m:
        return None, None
    name = m.group(1).strip()
    server = m.group(2).strip()
    port = m.group(3).strip()
    params = dict(re.findall(r'([\w-]+)\s*=\s*([^,]+)', m.group(4)))
    method = params.get('encrypt-method', '').strip()
    pwd = params.get('password', '').strip()
    if not method or not pwd:
        return None, None
    # build ss:// link
    raw = f'{method}:{pwd}@{server}:{port}'
    encoded = base64.b64encode(raw.encode()).decode().rstrip('=')
    ss_link = f'ss://{encoded}#{urllib.parse.quote(name)}'
    return ss_link, name


def extract_name(link):
    """Return the URL-decoded fragment of *link*, or its first 30 chars if none."""
    if '#' in link:
        return urllib.parse.unquote(link.split('#')[-1])
    return link[:30]


def _extract_links(text):
    """Return share links found in *text*.

    Whitespace-separated tokens starting with a known scheme win; only when
    none are present is each line tried as a Surge ``ss`` definition.
    """
    found = [w for w in text.split() if w.startswith(tuple(PROTOS))]
    if found:
        return found
    surge_links = []
    for line in text.split('\n'):
        ss_link, _ = parse_surge_ss(line)
        if ss_link:
            surge_links.append(ss_link)
    return surge_links


def _b64decode_loose(text):
    """Decode base64 that may be unpadded and may use the URL-safe alphabet."""
    normalized = text.strip().replace('-', '+').replace('_', '/')
    return base64.b64decode(normalized + '==').decode()


def _split_host_port(hostport):
    """Split 'host:port[/][?query]' into (host, port); raises ValueError if no port."""
    hostport = re.split(r'[/?]', hostport, maxsplit=1)[0]
    host, port = hostport.rsplit(':', 1)
    return host.strip('[]'), port


# --- message helpers ---

async def auto_del(msg, delay=AUTO_DEL):
    """Delete *msg* after *delay* seconds; deletion failures are ignored."""
    await asyncio.sleep(delay)
    try:
        await msg.delete()
    except Exception:
        # Best effort: the message may already be gone or too old to delete.
        pass


async def send_del(target, text, delay=AUTO_DEL, **kw):
    """Reply to *target* with *text* and schedule the reply for auto-deletion."""
    msg = await target.reply_text(text, **kw)
    task = asyncio.ensure_future(auto_del(msg, delay))
    _BG_TASKS.add(task)
    task.add_done_callback(_BG_TASKS.discard)
    return msg


async def _edit_or_send(q, text, **kw):
    """Edit the callback's message in place; fall back to a new auto-deleting reply."""
    try:
        await q.edit_message_text(text, **kw)
    except Exception:
        await send_del(q.message, text, **kw)


def _main_menu_buttons(data, include_admin=False):
    """Build the main-menu keyboard rows; *include_admin* adds the bind-group row."""
    default_cnt = len(_alive(data['subs']))
    sg_cnt = len(data.get('sub_groups', []))
    buttons = [
        [InlineKeyboardButton(f"📦 默认节点池 ({default_cnt})", callback_data='menu_default')],
        [InlineKeyboardButton(f"📁 分组管理 ({sg_cnt}个分组)", callback_data='sg_list')],
        [InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
    ]
    if include_admin:
        buttons.append([InlineKeyboardButton("⚙️ 绑定当前群", callback_data='menu_setgroup')])
    return buttons


async def _show_main_menu(q, data):
    """Replace the callback's message with the main menu; edit failures are ignored."""
    try:
        await q.edit_message_text(
            MAIN_MENU_TEXT, parse_mode='Markdown',
            reply_markup=InlineKeyboardMarkup(_main_menu_buttons(data)))
    except Exception:
        pass


def _back_markup(gid):
    """Return a one-button keyboard leading back to the detail view of group *gid*."""
    return InlineKeyboardMarkup(
        [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]])


async def is_member(update, context):
    """Return True if the user is the admin or belongs to any bound group.

    With no bound groups everybody is accepted. Lookup errors for a group are
    treated as "not a member of that group".
    """
    uid = update.effective_user.id
    if uid == ADMIN_ID:
        return True
    groups = load_data().get('groups', [])
    if not groups:
        return True
    for gid in groups:
        try:
            m = await context.bot.get_chat_member(gid, uid)
        except Exception:
            continue
        if m.status in ('member', 'administrator', 'creator'):
            return True
    return False


# --- adding nodes ---

def _new_node(link, proto, user):
    """Build the stored record for a freshly added link."""
    return {'link': link, 'type': proto, 'name': extract_name(link),
            'added_by': user.id, 'added_name': user.first_name,
            'added_at': int(time.time()), 'alive': True}


def add_sub(link, user):
    """Add *link* to the default pool.

    Returns None for an unknown scheme, 'dup' if already stored, otherwise a
    confirmation message.
    """
    proto = detect_type(link)
    if proto == 'unknown':
        return None
    data = load_data()
    if any(s['link'] == link for s in data['subs']):
        return 'dup'
    node = _new_node(link, proto, user)
    data['subs'].append(node)
    save_data(data)
    return f'✅ 已添加 [{proto}] {node["name"]}'


def add_sub_to_group(link, user, group_id):
    """Add *link* to the group *group_id*.

    Returns None for an unknown scheme, 'dup' if already stored, an error
    message if the group is missing, otherwise a confirmation message.
    """
    proto = detect_type(link)
    if proto == 'unknown':
        return None
    data = load_data()
    sg = _find_group(data.get('sub_groups', []), group_id)
    if not sg:
        return '❌ 分组不存在'
    nodes = sg.setdefault('nodes', [])
    if any(s['link'] == link for s in nodes):
        return 'dup'
    node = _new_node(link, proto, user)
    nodes.append(node)
    save_data(data)
    return f'✅ 已添加到 [{sg["name"]}] [{proto}] {node["name"]}'


def _add_links(links, adder):
    """Run *adder(link)* for each link and collect the user-facing result lines."""
    results = []
    for link in links:
        r = adder(link)
        if r == 'dup':
            results.append(f'⚠️ 已存在: {extract_name(link)}')
        elif r:
            results.append(r)
    return results


# --- /help: main menu with buttons ---

async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show the main menu (members only); the admin also gets the bind-group button."""
    if not await is_member(update, context):
        return await update.message.reply_text('⛔ 仅限群成员使用')
    data = load_data()
    buttons = _main_menu_buttons(data, include_admin=update.effective_user.id == ADMIN_ID)
    await send_del(update.message, MAIN_MENU_TEXT,
                   parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))


# --- menu callbacks ---

async def cb_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle every ``menu_*`` callback of the default node pool."""
    q = update.callback_query
    await q.answer()
    action = q.data
    data = load_data()

    if action == 'menu_default':
        cnt = len(data['subs'])
        alive_cnt = len(_alive(data['subs']))
        url = _sub_url(data.get('default_secret', SUB_SECRET))
        buttons = [
            [InlineKeyboardButton("📋 节点列表", callback_data='menu_list'),
             InlineKeyboardButton("📥 获取订阅", callback_data='menu_get')],
            [InlineKeyboardButton("➕ 添加节点", callback_data='menu_add'),
             InlineKeyboardButton("🗑 删除节点", callback_data='menu_del')],
            [InlineKeyboardButton("🔄 重置订阅链接", callback_data='menu_reset_secret')],
            [InlineKeyboardButton("◀️ 返回主菜单", callback_data='menu_home')],
        ]
        await _edit_or_send(
            q, f'📦 *默认节点池*\n\n总计 {cnt} 个节点,{alive_cnt} 个可用\n🔗 订阅链接:\n`{url}`',
            parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_reset_secret':
        # NOTE(review): no admin check here — any user who can press the button
        # invalidates the shared default link. Confirm this is intended.
        new_secret = secrets.token_hex(16)
        data['default_secret'] = new_secret
        save_data(data)
        url = _sub_url(new_secret)
        buttons = [[InlineKeyboardButton("◀️ 返回", callback_data='menu_default')]]
        await _edit_or_send(
            q, f"✅ 默认订阅链接已重置\n\n旧链接已失效\n新链接:\n`{url}`",
            parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_home':
        await _show_main_menu(q, data)

    elif action == 'menu_list':
        if not data['subs']:
            return await send_del(q.message, '📭 暂无订阅')
        # NOTE(review): node names are interpolated into legacy Markdown unescaped;
        # a name containing '_' or '*' may make Telegram reject the message.
        lines = [f"`{i+1}` {_dot(s.get('alive', True))} [{s['type']}] {s['name']}"
                 for i, s in enumerate(data['subs'])]
        await send_del(q.message, '📋 *订阅列表*\n\n' + '\n'.join(lines), parse_mode='Markdown')

    elif action == 'menu_get':
        alive = _alive(data['subs'])
        if not alive:
            return await send_del(q.message, '📭 暂无可用订阅')
        buttons = [[InlineKeyboardButton(f"📦 全部 ({len(alive)})", callback_data='get_all_raw')]]
        for t in sorted({s['type'] for s in alive}):
            cnt = sum(1 for s in alive if s['type'] == t)
            buttons.append([
                InlineKeyboardButton(f"🔗 {t} ({cnt})", callback_data=f'get_{t}_raw'),
                InlineKeyboardButton(f"📄 {t} Base64", callback_data=f'get_{t}_b64')])
        buttons.append([InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data='get_all_clash')])
        buttons.append([InlineKeyboardButton("🎯 自选节点", callback_data='menu_pick_multi')])
        await send_del(q.message, '选择格式:', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_del':
        if not data['subs']:
            return await send_del(q.message, '📭 暂无订阅')
        uid = q.from_user.id
        # NOTE(review): callback data carries list positions, so a button pressed
        # after the pool changed can target a different node.
        buttons = [
            [InlineKeyboardButton(f"{_dot(s.get('alive', True))} [{s['type']}] {s['name']}",
                                  callback_data=f'del_{i}')]
            for i, s in enumerate(data['subs'])
            if uid == ADMIN_ID or uid == s.get('added_by')
        ]
        if not buttons:
            return await send_del(q.message, '没有可删除的订阅')
        buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data='del_alldown')])
        await send_del(q.message, '选择要删除的:', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_add':
        WAITING_ADD.add(q.from_user.id)
        await send_del(q.message, '📎 请发送订阅链接(支持多条,空格或换行分隔)\n支持: ss/vmess/vless/trojan/hy2/tuic')

    elif action == 'menu_check':
        msg = await send_del(q.message, '🔍 检测中...', delay=120)
        results = []
        # Probe the default pool first.
        if data['subs']:
            results.append('📦 *默认分组*')
            for s in data['subs']:
                ok = await check_node(s['link'], s['type'])
                s['alive'] = ok
                results.append(f"{_dot(ok)} [{s['type']}] {s['name']}")
        # Then every group that has nodes.
        for sg in data.get('sub_groups', []):
            if not sg.get('nodes'):
                continue
            results.append(f"\n📁 *{sg['name']}*")
            for s in sg['nodes']:
                ok = await check_node(s['link'], s['type'])
                s['alive'] = ok
                results.append(f"{_dot(ok)} [{s['type']}] {s['name']}")
        save_data(data)
        try:
            await msg.delete()
        except Exception:
            pass
        if not results:
            await send_del(q.message, '📭 暂无订阅')
        else:
            results.append('\n⚠️ _检测从服务器发起,专线节点可能显示不通但实际可用_')
            await send_del(q.message, '📊 *检测结果*\n\n' + '\n'.join(results), parse_mode='Markdown')

    elif action == 'menu_pick_multi':
        alive = _alive(data['subs'])
        if not alive:
            return await send_del(q.message, '📭 暂无可用订阅')
        context.user_data['multi_sel'] = set()
        buttons = [[InlineKeyboardButton(f"☐ {s['name']}", callback_data=f'msel_{i}')]
                   for i, s in enumerate(alive)]
        buttons.append([InlineKeyboardButton("✅ 确认", callback_data='msel_done')])
        await send_del(q.message, '🎯 点击选择节点:',
                       reply_markup=InlineKeyboardMarkup(buttons), delay=120)

    elif action == 'menu_setgroup':
        if q.from_user.id != ADMIN_ID:
            return await send_del(q.message, '⛔ 仅管理员可用')
        cid = q.message.chat.id
        if cid not in data.get('groups', []):
            data.setdefault('groups', []).append(cid)
            save_data(data)
        await send_del(q.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')


# --- get/pick/del callbacks ---

async def cb_getsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``get_*``, ``pick_*``, ``send_*``, ``fmt_*`` and ``out_*`` callbacks.

    All indices in callback data refer to positions in the current list of
    alive default-pool nodes.
    """
    q = update.callback_query
    await q.answer()
    data = load_data()
    alive = _alive(data['subs'])
    default_url = _sub_url(data.get('default_secret', SUB_SECRET))
    action = q.data
    parts = action.split('_')
    if len(parts) < 2:
        return

    if action.startswith('pick_'):
        idx = int(parts[1])
        if idx < len(alive):
            buttons = [
                [InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{idx}_raw')],
                [InlineKeyboardButton("⚡ Clash Meta", callback_data=f'fmt_{idx}_clash')],
                [InlineKeyboardButton("📎 Clash Meta 订阅URL", callback_data=f'fmt_{idx}_url')],
            ]
            await send_del(q.message, f'📍 {alive[idx]["name"]}\n选择输出格式:',
                           reply_markup=InlineKeyboardMarkup(buttons))
        return

    if len(parts) < 3:
        return

    if action.startswith('fmt_'):
        idx, fmt = int(parts[1]), parts[2]
        if idx < len(alive):
            s = alive[idx]
            if fmt == 'raw':
                await send_del(q.message, s["link"])
            elif fmt == 'clash':
                cm = gen_clash_meta([s])
                await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
            elif fmt == 'url':
                await send_del(q.message, f'📎 订阅URL:\n{default_url}')
        return

    proto, fmt = parts[1], parts[2]
    # Keep each node's position in `alive` so callback indices stay exact even
    # when two stored records compare equal.
    indexed = [(i, s) for i, s in enumerate(alive) if proto == 'all' or s['type'] == proto]
    subs = [s for _, s in indexed]

    if action.startswith('send_') and fmt == 'raw':
        # ask format first
        buttons = [
            [InlineKeyboardButton("🔗 原始链接", callback_data=f'out_{proto}_raw')],
            [InlineKeyboardButton("⚡ Clash Meta", callback_data=f'out_{proto}_clash')],
            [InlineKeyboardButton("📄 Base64", callback_data=f'out_{proto}_b64')],
        ]
        return await send_del(q.message, f'📦 全部 ({len(subs)}) — 选择格式:',
                              reply_markup=InlineKeyboardMarkup(buttons))

    if action.startswith('out_'):
        if fmt == 'clash':
            url = default_url
            if proto != 'all':
                url += f'&type={proto}'
            return await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
        if not subs:
            # Telegram rejects empty messages; the pool may have emptied meanwhile.
            return await send_del(q.message, '📭 无匹配')
        links = '\n'.join(s['link'] for s in subs)
        if fmt == 'b64':
            return await send_del(
                q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```',
                parse_mode='Markdown')
        if len(links) > 4000:
            # Stay below Telegram's message size limit by sending one link per message.
            for s in subs:
                await send_del(q.message, s['link'])
        else:
            await send_del(q.message, links)
        return

    if action == 'get_all_clash':
        await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{default_url}')
        return

    if not subs:
        return await send_del(q.message, '📭 无匹配')
    if fmt == 'raw' and len(subs) > 1:
        buttons = [[InlineKeyboardButton(f"{_dot(s.get('alive', True))} {s['name']}",
                                         callback_data=f'pick_{gi}')]
                   for gi, s in indexed]
        buttons.append([InlineKeyboardButton(f"📦 全部 ({len(subs)})",
                                             callback_data=f'send_{proto}_raw')])
        return await send_del(q.message, '选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
    if fmt == 'raw' and len(subs) == 1:
        gi = indexed[0][0]
        buttons = [
            [InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{gi}_raw')],
            [InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{gi}_clash')],
        ]
        return await send_del(q.message, f'📍 {subs[0]["name"]}\n选择输出格式:',
                              reply_markup=InlineKeyboardMarkup(buttons))
    links = '\n'.join(s['link'] for s in subs)
    if fmt == 'b64':
        await send_del(q.message, base64.b64encode(links.encode()).decode())
    else:
        await send_del(q.message, links)


async def cb_multisel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Toggle nodes in the per-user multi-selection, or confirm it on ``msel_done``."""
    q = update.callback_query
    await q.answer()
    data = load_data()
    alive = _alive(data['subs'])
    sel = context.user_data.get('multi_sel', set())
    action = q.data

    if action == 'msel_done':
        if not sel:
            return await send_del(q.message, '⚠️ 未选择任何节点')
        chosen = [alive[i] for i in sorted(sel) if i < len(alive)]
        if not chosen:
            return await send_del(q.message, '⚠️ 选择已失效')
        context.user_data['multi_sel'] = set()
        names = ', '.join(s['name'] for s in chosen)
        buttons = [
            [InlineKeyboardButton("🔗 原始链接", callback_data='mout_raw')],
            [InlineKeyboardButton("⚡ Clash Meta", callback_data='mout_clash')],
            [InlineKeyboardButton("📎 订阅URL", callback_data='mout_url')],
            [InlineKeyboardButton("📄 Base64", callback_data='mout_b64')],
        ]
        context.user_data['multi_chosen'] = chosen
        await send_del(q.message, f'已选 {len(chosen)} 个: {names}\n选择输出格式:',
                       reply_markup=InlineKeyboardMarkup(buttons))
        return

    idx = int(action.split('_')[1])
    if idx in sel:
        sel.discard(idx)
    else:
        sel.add(idx)
    context.user_data['multi_sel'] = sel
    buttons = [[InlineKeyboardButton(f"{'☑' if i in sel else '☐'} {s['name']}",
                                     callback_data=f'msel_{i}')]
               for i, s in enumerate(alive)]
    buttons.append([InlineKeyboardButton(f"✅ 确认 ({len(sel)})", callback_data='msel_done')])
    try:
        await q.message.edit_text('🎯 点击选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
    except Exception:
        pass


async def cb_multiout(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Emit the confirmed multi-selection in the format named by ``mout_<fmt>``."""
    q = update.callback_query
    await q.answer()
    chosen = context.user_data.get('multi_chosen', [])
    if not chosen:
        return await send_del(q.message, '⚠️ 选择已过期,请重新操作')
    fmt = q.data.split('_')[1]
    links = '\n'.join(s['link'] for s in chosen)
    if fmt == 'raw':
        await send_del(q.message, links)
    elif fmt == 'clash':
        cm = gen_clash_meta(chosen)
        await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
    elif fmt == 'url':
        # Names are percent-encoded individually so ',' can separate them safely.
        names = ','.join(urllib.parse.quote(s['name']) for s in chosen)
        url = _sub_url(get_default_secret(), f'&name={names}')
        await send_del(q.message, f'📎 订阅URL:\n{url}')
    elif fmt == 'b64':
        await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```',
                       parse_mode='Markdown')
    context.user_data.pop('multi_chosen', None)


async def cb_delsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Delete one default-pool node (``del_<idx>``) or all dead ones (``del_alldown``).

    Non-admins may only delete nodes they added themselves.
    """
    q = update.callback_query
    await q.answer()
    data = load_data()
    action = q.data
    uid = q.from_user.id

    if action == 'del_alldown':
        dead = [s for s in data['subs'] if not s.get('alive', True)]
        if uid != ADMIN_ID:
            dead = [s for s in dead if s.get('added_by') == uid]
        if not dead:
            return await send_del(q.message, '没有不可用节点')
        names = [s['name'] for s in dead]
        for s in dead:
            data['subs'].remove(s)
        save_data(data)
        return await send_del(q.message,
                              f"🗑 已删除 {len(names)} 个:\n" + '\n'.join(f'• {n}' for n in names))

    idx = int(action.split('_')[1])
    if idx >= len(data['subs']):
        return await send_del(q.message, '❌ 已失效')
    s = data['subs'][idx]
    if uid != ADMIN_ID and uid != s.get('added_by'):
        return await send_del(q.message, '⛔ 只能删除自己上传的')
    removed = data['subs'].pop(idx)
    save_data(data)
    await send_del(q.message, f"🗑 已删除: {removed['name']}")


# --- auto detect links in messages ---

async def auto_detect(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Route a plain text message.

    Pending group prompts (name, rename, add-to-group) are served first;
    otherwise any share links in the text are added to the default pool.
    """
    if not update.message or not update.message.text:
        return
    text = update.message.text.strip()
    uid = update.effective_user.id

    # --- text input for group management ---
    # Create group: waiting for its name.
    if context.user_data.get('waiting_sg_name'):
        del context.user_data['waiting_sg_name']
        data = load_data()
        sec = secrets.token_hex(16)
        data['sub_groups'].append({
            'id': secrets.token_hex(4), 'name': text, 'secret': sec,
            'nodes': [], 'created_at': int(time.time())
        })
        save_data(data)
        return await send_del(update.message,
                              f"✅ 分组 *{text}* 已创建\n\n🔗 订阅链接:\n`{_sub_url(sec)}`",
                              parse_mode='Markdown')

    # Rename group: waiting for the new name.
    if context.user_data.get('waiting_sg_rename'):
        gid = context.user_data.pop('waiting_sg_rename')
        data = load_data()
        sg = _find_group(data.get('sub_groups', []), gid)
        if sg:
            old = sg['name']
            sg['name'] = text
            save_data(data)
            return await send_del(update.message, f"✅ 分组已重命名: {old} → {text}")
        return await send_del(update.message, '❌ 分组不存在')

    # Add nodes to a group: waiting for links.
    if context.user_data.get('waiting_sg_add'):
        gid = context.user_data.pop('waiting_sg_add')
        links = _extract_links(text)
        if not links:
            return await send_del(update.message, '❌ 未识别到订阅链接')
        results = _add_links(
            links, lambda link: add_sub_to_group(link, update.effective_user, gid))
        if results:
            return await send_del(update.message, '\n'.join(results))
        return

    # --- default pool ---
    links = _extract_links(text)
    if not links:
        # Only complain when the user was explicitly asked for links.
        if uid in WAITING_ADD:
            WAITING_ADD.discard(uid)
            await send_del(update.message, '❌ 未识别到订阅链接')
        return
    if not await is_member(update, context):
        return
    WAITING_ADD.discard(uid)
    results = _add_links(links, lambda link: add_sub(link, update.effective_user))
    if results:
        await send_del(update.message, '\n'.join(results))


# --- node health check ---

async def check_node(link, proto):
    """Return True if a TCP connection to the node's host:port opens within 5 s."""
    try:
        server, port = parse_sp(link, proto)
        if not server:
            return False
        _, w = await asyncio.wait_for(
            asyncio.open_connection(server, int(port)), timeout=5)
    except Exception:
        return False
    # The connection succeeded; a failure while closing must not flip the result.
    w.close()
    try:
        await w.wait_closed()
    except Exception:
        pass
    return True


def parse_sp(link, proto):
    """Extract ``(server, port)`` as strings from a share link, or ``(None, None)``."""
    try:
        if proto == "ss":
            raw = link[5:].split("#")[0]
            if "@" in raw:
                return _split_host_port(raw.split("@")[-1])
            try:
                # Legacy form: the whole 'method:pwd@host:port' is base64-encoded.
                decoded = _b64decode_loose(raw)
                _, rest = decoded.split(":", 1)
                _, sp = rest.rsplit("@", 1)
                return _split_host_port(sp)
            except Exception:
                pass
        elif proto == "vmess":
            try:
                # Presumably the common base64-JSON share format with 'add'/'port'
                # keys; anything else falls through to the generic pattern below.
                cfg = json.loads(_b64decode_loose(link[len('vmess://'):].split('#')[0]))
                return str(cfg['add']), str(cfg['port'])
            except Exception:
                pass
        m = re.search(r'@([^:/?#]+):(\d+)', link)
        if m:
            return m.group(1), m.group(2)
    except Exception:
        pass
    return None, None


def parse_to_clash_proxy(s):
    """Convert a stored node to a Clash proxy dict; only 'ss' is supported.

    Returns None for other protocols or when the link cannot be parsed.
    """
    try:
        if s['type'] == 'ss':
            raw = s['link'][5:]
            name = ''
            if '#' in raw:
                raw, name = raw.rsplit('#', 1)
                name = urllib.parse.unquote(name)
            if "@" in raw:
                userinfo, sp = raw.rsplit("@", 1)
                method, pwd = _b64decode_loose(userinfo).split(":", 1)
            else:
                method, rest = _b64decode_loose(raw).split(":", 1)
                pwd, sp = rest.rsplit("@", 1)
            server, port = _split_host_port(sp)
            return {'name': name or server, 'type': 'ss', 'server': server,
                    'port': int(port), 'cipher': method, 'password': pwd}
    except Exception:
        pass
    return None


def gen_clash_meta(subs):
    """Render *subs* as a Clash 'proxies:' YAML list, skipping unconvertible nodes."""
    lines = ['proxies:']
    for s in subs:
        p = parse_to_clash_proxy(s)
        if p:
            lines.append(f'  - {json.dumps(p, ensure_ascii=False)}')
    return '\n'.join(lines)


# --- HTTP subscription endpoint ---

def _same_secret(a, b):
    """Compare two strings in constant time (bytes, so non-ASCII input is accepted)."""
    return secrets.compare_digest(str(a).encode(), str(b).encode())


class SubHandler(BaseHTTPRequestHandler):
    """Serve ``/<secret>/download`` for the default pool and for each group."""

    def _nodes_for_path(self, path, data):
        """Return the alive nodes addressed by *path*, or None if nothing matches."""
        default_secret = data.get('default_secret', SUB_SECRET)
        if _same_secret(path, f'/{default_secret}/download'):
            return _alive(data['subs'])
        parts = path.strip('/').split('/')
        if len(parts) == 2 and parts[1] == 'download':
            for sg in data.get('sub_groups', []):
                if _same_secret(sg['secret'], parts[0]):
                    return _alive(sg.get('nodes', []))
        return None

    def do_GET(self):
        """Answer a subscription download, honouring ``type``, ``name`` and ``target``."""
        path, _, query = self.path.partition('?')
        # Values stay percent-encoded here; 'name' is decoded per item below.
        qs = dict(x.split('=', 1) for x in query.split('&') if '=' in x)

        alive = self._nodes_for_path(path, load_data())
        if alive is None:
            self.send_response(404)
            self.end_headers()
            return

        ftype = qs.get('type', '')
        if ftype:
            alive = [s for s in alive if s['type'] == ftype]
        fname = qs.get('name', '')
        if fname:
            names = {urllib.parse.unquote(n) for n in fname.split(',')}
            alive = [s for s in alive if s['name'] in names]
        target = qs.get('target', 'ClashMeta')
        if target == 'ClashMeta':
            body = gen_clash_meta(alive)
        else:
            body = '\n'.join(s['link'] for s in alive)
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.end_headers()
        self.wfile.write(body.encode())

    def log_message(self, *a):
        """Silence per-request logging."""
        pass


def start_http():
    """Run the subscription HTTP server on port 18888 (blocks forever)."""
    srv = HTTPServer(('0.0.0.0', 18888), SubHandler)
    log.info('Sub HTTP on :18888')
    srv.serve_forever()


# --- legacy commands still work ---

async def cmd_setgroup(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Admin-only: bind the current chat as a group whose members may use the bot."""
    if update.effective_user.id != ADMIN_ID:
        return await update.message.reply_text('⛔ 仅管理员可用')
    cid = update.effective_chat.id
    data = load_data()
    if cid not in data.get('groups', []):
        data.setdefault('groups', []).append(cid)
        save_data(data)
    await send_del(update.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')


# --- group management callbacks ---

async def cb_subgroups(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle every ``sg_*`` callback of the group-management menus.

    Branch order matters: ``sg_delnodes_`` must be tested before its prefix
    ``sg_delnode_``.
    """
    q = update.callback_query
    await q.answer()
    action = q.data
    data = load_data()
    sgs = data.get('sub_groups', [])

    if action == 'sg_list':
        buttons = [[InlineKeyboardButton(f"📁 {sg['name']} ({len(sg.get('nodes', []))}个节点)",
                                         callback_data=f"sg_detail_{sg['id']}")]
                   for sg in sgs]
        buttons.append([InlineKeyboardButton("➕ 创建分组", callback_data='sg_create')])
        buttons.append([InlineKeyboardButton("◀️ 返回主菜单", callback_data='sg_back')])
        text = f"📁 *分组管理*\n\n共 {len(sgs)} 个分组" if sgs else "📁 *分组管理*\n\n暂无分组,点击创建"
        await _edit_or_send(q, text, parse_mode='Markdown',
                            reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'sg_create':
        context.user_data['waiting_sg_name'] = True
        await _edit_or_send(q, '📝 请发送分组名称:')

    elif action.startswith('sg_detail_'):
        gid = action[len('sg_detail_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return await send_del(q.message, '❌ 分组不存在')
        cnt = len(sg.get('nodes', []))
        url = _sub_url(sg['secret'])
        text = (f"📁 *{sg['name']}*\n\n"
                f"📊 节点数: {cnt}\n"
                f"🔗 订阅链接:\n`{url}`")
        buttons = [
            [InlineKeyboardButton("📋 节点列表", callback_data=f"sg_nodes_{gid}"),
             InlineKeyboardButton("➕ 添加节点", callback_data=f"sg_add_{gid}")],
            [InlineKeyboardButton("🗑 删除节点", callback_data=f"sg_delnodes_{gid}"),
             InlineKeyboardButton("🔍 检测节点", callback_data=f"sg_check_{gid}")],
            [InlineKeyboardButton("📥 获取订阅", callback_data=f"sg_getsub_{gid}"),
             InlineKeyboardButton("🔄 重置链接", callback_data=f"sg_reset_{gid}")],
            [InlineKeyboardButton("✏️ 重命名", callback_data=f"sg_rename_{gid}"),
             InlineKeyboardButton("🗑 删除分组", callback_data=f"sg_delconfirm_{gid}")],
            [InlineKeyboardButton("◀️ 返回分组列表", callback_data='sg_list')],
        ]
        await _edit_or_send(q, text, parse_mode='Markdown',
                            reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_nodes_'):
        gid = action[len('sg_nodes_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        nodes = sg.get('nodes', [])
        if not nodes:
            text = f"📁 *{sg['name']}* — 暂无节点"
        else:
            lines = [f"📁 *{sg['name']}* 节点列表\n"]
            lines.extend(f"`{i+1}` {_dot(s.get('alive', True))} [{s['type']}] {s['name']}"
                         for i, s in enumerate(nodes))
            text = '\n'.join(lines)
        await _edit_or_send(q, text, parse_mode='Markdown', reply_markup=_back_markup(gid))

    elif action.startswith('sg_add_'):
        gid = action[len('sg_add_'):]
        context.user_data['waiting_sg_add'] = gid
        sg = _find_group(sgs, gid)
        name = sg['name'] if sg else '?'
        await _edit_or_send(q, f'📎 请发送要添加到 [{name}] 的订阅链接:')

    elif action.startswith('sg_check_'):
        gid = action[len('sg_check_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        nodes = sg.get('nodes', [])
        if not nodes:
            return await send_del(q.message, f"📁 *{sg['name']}* 暂无节点",
                                  parse_mode='Markdown', reply_markup=_back_markup(gid))
        msg = await send_del(q.message, f"🔍 检测 *{sg['name']}* 中...",
                             parse_mode='Markdown', delay=120)
        results = []
        for s in nodes:
            ok = await check_node(s['link'], s['type'])
            s['alive'] = ok
            results.append(f"{_dot(ok)} [{s['type']}] {s['name']}")
        save_data(data)
        try:
            await msg.delete()
        except Exception:
            pass
        await send_del(q.message, f"📊 *{sg['name']}* 检测结果\n\n" + '\n'.join(results),
                       parse_mode='Markdown', reply_markup=_back_markup(gid))

    elif action.startswith('sg_delnodes_'):
        gid = action[len('sg_delnodes_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        nodes = sg.get('nodes', [])
        if not nodes:
            return await send_del(q.message, f"📁 *{sg['name']}* 暂无节点",
                                  parse_mode='Markdown', reply_markup=_back_markup(gid))
        buttons = [[InlineKeyboardButton(
            f"{_dot(s.get('alive', True))} [{s['type']}] {s['name']}",
            callback_data=f"sg_delnode_{gid}_{i}")] for i, s in enumerate(nodes)]
        buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data=f"sg_deldown_{gid}")])
        buttons.append([InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")])
        await _edit_or_send(q, f"🗑 *{sg['name']}* — 选择要删除的节点:",
                            parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_delnode_'):
        # sg_delnode_{gid}_{idx}
        parts = action[len('sg_delnode_'):].rsplit('_', 1)
        if len(parts) != 2 or not parts[1].isdigit():
            return
        gid, idx_str = parts
        sg = _find_group(sgs, gid)
        if not sg:
            return
        idx = int(idx_str)
        nodes = sg.get('nodes', [])
        if idx >= len(nodes):
            return await send_del(q.message, '❌ 已失效')
        removed = nodes.pop(idx)
        save_data(data)
        await _edit_or_send(q, f"✅ 已删除: [{removed['type']}] {removed['name']}",
                            reply_markup=_back_markup(gid))

    elif action.startswith('sg_deldown_'):
        gid = action[len('sg_deldown_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        dead = [s for s in sg.get('nodes', []) if not s.get('alive', True)]
        if not dead:
            return await send_del(q.message, '没有不可用节点', reply_markup=_back_markup(gid))
        names = [s['name'] for s in dead]
        sg['nodes'] = _alive(sg['nodes'])
        save_data(data)
        await _edit_or_send(
            q, f"🗑 已删除 {len(names)} 个不可用:\n" + '\n'.join(f'• {n}' for n in names),
            reply_markup=_back_markup(gid))

    elif action.startswith('sg_getsub_'):
        gid = action[len('sg_getsub_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        url = _sub_url(sg['secret'])
        buttons = [
            [InlineKeyboardButton("📋 复制链接", callback_data=f"sg_detail_{gid}")],
            [InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")],
        ]
        await send_del(q.message, f"📎 *{sg['name']}* 订阅链接:\n\n`{url}`",
                       parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_reset_'):
        gid = action[len('sg_reset_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        sg['secret'] = secrets.token_hex(16)
        save_data(data)
        await _edit_or_send(q, f"✅ 链接已重置\n\n新链接:\n`{_sub_url(sg['secret'])}`",
                            parse_mode='Markdown', reply_markup=_back_markup(gid))

    elif action.startswith('sg_rename_'):
        gid = action[len('sg_rename_'):]
        context.user_data['waiting_sg_rename'] = gid
        await _edit_or_send(q, '✏️ 请发送新的分组名称:')

    elif action.startswith('sg_delconfirm_'):
        gid = action[len('sg_delconfirm_'):]
        sg = _find_group(sgs, gid)
        if not sg:
            return
        buttons = [
            [InlineKeyboardButton("⚠️ 确认删除", callback_data=f"sg_dodel_{gid}"),
             InlineKeyboardButton("取消", callback_data=f"sg_detail_{gid}")],
        ]
        # The data-loss warning is shown on the fallback path too.
        await _edit_or_send(q, f"⚠️ 确认删除分组 *{sg['name']}*?\n节点数据将全部丢失!",
                            parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))

    elif action.startswith('sg_dodel_'):
        gid = action[len('sg_dodel_'):]
        data['sub_groups'] = [g for g in sgs if g['id'] != gid]
        save_data(data)
        buttons = [[InlineKeyboardButton("◀️ 返回分组列表", callback_data='sg_list')]]
        await _edit_or_send(q, "✅ 分组已删除", reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'sg_back':
        # Back to the main menu.
        await _show_main_menu(q, data)


async def auto_cleanup(bot_token):
    """Every 6h: check all nodes, update alive status only (no auto-delete).

    Probing can take minutes, so results are applied to a freshly loaded copy
    of the store; nodes or groups added by the bot during the pass are kept.
    *bot_token* is accepted for interface compatibility and not used.
    """
    await asyncio.sleep(60)
    while True:
        try:
            status = {}
            for s in _iter_all_nodes(load_data()):
                status[s['link']] = await check_node(s['link'], s['type'])

            fresh = load_data()
            changed = False
            for s in _iter_all_nodes(fresh):
                ok = status.get(s['link'])
                if ok is not None and s.get('alive', True) != ok:
                    s['alive'] = ok
                    changed = True
            if changed:
                save_data(fresh)
                log.info('Auto check: updated node status')
        except Exception:
            # One failed pass (e.g. unreadable data file) must not end the loop.
            log.exception('Auto check failed')
        await asyncio.sleep(21600)


def run_cleanup():
    """Thread entry point: run auto_cleanup() on its own event loop."""
    asyncio.run(auto_cleanup(TOKEN))


def main():
    """Register handlers, start the HTTP and cleanup threads, then poll Telegram."""
    app = Application.builder().token(TOKEN).build()
    app.add_handler(CommandHandler('start', cmd_help))
    app.add_handler(CommandHandler('vps', cmd_help))
    app.add_handler(CommandHandler('setgroup', cmd_setgroup))
    app.add_handler(CallbackQueryHandler(cb_menu, pattern='^menu_'))
    app.add_handler(CallbackQueryHandler(cb_delsub, pattern='^del_'))
    app.add_handler(CallbackQueryHandler(cb_multisel, pattern='^msel_'))
    app.add_handler(CallbackQueryHandler(cb_multiout, pattern='^mout_'))
    app.add_handler(CallbackQueryHandler(cb_getsub, pattern='^(get_|pick_|send_|fmt_|out_)'))
    app.add_handler(CallbackQueryHandler(cb_subgroups, pattern='^sg_'))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, auto_detect))
    log.info('Sub Bot starting polling...')
    threading.Thread(target=start_http, daemon=True).start()
    threading.Thread(target=run_cleanup, daemon=True).start()
    app.run_polling(drop_pending_updates=True)


if __name__ == '__main__':
    main()