Files
sub-bot/bot.py

958 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os, json, time, base64, asyncio, re, urllib.request, logging, threading, secrets
from http.server import HTTPServer, BaseHTTPRequestHandler
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)
TOKEN = os.environ.get('BOT_TOKEN', '')
ADMIN_ID = int(os.environ.get('ADMIN_ID', '0'))
DATA_FILE = '/opt/sub-bot/data.json'
PROTOS = ['ss://', 'vmess://', 'vless://', 'trojan://', 'hysteria2://', 'hy2://', 'tuic://']
AUTO_DEL = 60
SUB_SECRET = os.environ.get('SUB_SECRET', 'changeme')
SUB_HOST = os.environ.get('SUB_HOST', 'substore.mjjtop.com')
WAITING_ADD = set() # user_ids waiting to add sub
def load_data():
if os.path.exists(DATA_FILE):
with open(DATA_FILE) as f:
d = json.load(f)
d.setdefault('sub_groups', [])
return d
return {'subs': [], 'groups': [], 'sub_groups': []}
def save_data(data):
os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
with open(DATA_FILE, 'w') as f: json.dump(data, f, ensure_ascii=False, indent=2)
def detect_type(link):
for p in PROTOS:
if link.startswith(p): return p.replace('://', '')
return 'unknown'
def parse_surge_ss(text):
"""Parse Surge format: Name = ss, server, port, encrypt-method=x, password=y"""
m = re.match(r'^(.+?)\s*=\s*ss\s*,\s*([^,]+)\s*,\s*(\d+)\s*,\s*(.+)$', text.strip())
if not m: return None, None
name = m.group(1).strip()
server = m.group(2).strip()
port = m.group(3).strip()
params = dict(re.findall(r'([\w-]+)\s*=\s*([^,]+)', m.group(4)))
method = params.get('encrypt-method', '').strip()
pwd = params.get('password', '').strip()
if not method or not pwd: return None, None
# build ss:// link
raw = f'{method}:{pwd}@{server}:{port}'
encoded = base64.b64encode(raw.encode()).decode().rstrip('=')
ss_link = f'ss://{encoded}#{urllib.request.quote(name)}'
return ss_link, name
def extract_name(link):
if '#' in link: return urllib.request.unquote(link.split('#')[-1])
return link[:30]
async def auto_del(msg, delay=AUTO_DEL):
await asyncio.sleep(delay)
try: await msg.delete()
except: pass
async def send_del(target, text, delay=AUTO_DEL, **kw):
msg = await target.reply_text(text, **kw)
asyncio.ensure_future(auto_del(msg, delay))
return msg
async def is_member(update, context):
uid = update.effective_user.id
if uid == ADMIN_ID: return True
data = load_data()
groups = data.get('groups', [])
if not groups: return True
for gid in groups:
try:
m = await context.bot.get_chat_member(gid, uid)
if m.status in ('member', 'administrator', 'creator'): return True
except: pass
return False
def add_sub(link, user):
proto = detect_type(link)
if proto == 'unknown': return None
data = load_data()
for s in data['subs']:
if s['link'] == link: return 'dup'
name = extract_name(link)
data['subs'].append({'link': link, 'type': proto, 'name': name,
'added_by': user.id, 'added_name': user.first_name,
'added_at': int(time.time()), 'alive': True})
save_data(data)
return f'✅ 已添加 [{proto}] {name}'
def add_sub_to_group(link, user, group_id):
"""添加节点到指定分组"""
proto = detect_type(link)
if proto == 'unknown': return None
data = load_data()
sg = next((g for g in data.get('sub_groups', []) if g['id'] == group_id), None)
if not sg: return '❌ 分组不存在'
for s in sg['nodes']:
if s['link'] == link: return 'dup'
name = extract_name(link)
sg['nodes'].append({'link': link, 'type': proto, 'name': name,
'added_by': user.id, 'added_name': user.first_name,
'added_at': int(time.time()), 'alive': True})
save_data(data)
return f'✅ 已添加到 [{sg["name"]}] [{proto}] {name}'
# --- /help: main menu with buttons ---
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await is_member(update, context):
return await update.message.reply_text('⛔ 仅限群成员使用')
data = load_data()
default_cnt = len([s for s in data['subs'] if s.get('alive', True)])
sg_cnt = len(data.get('sub_groups', []))
buttons = [
[InlineKeyboardButton(f"📦 默认节点池 ({default_cnt})", callback_data='menu_default')],
[InlineKeyboardButton(f"📁 分组管理 ({sg_cnt}个分组)", callback_data='sg_list')],
[InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
]
if update.effective_user.id == ADMIN_ID:
buttons.append([InlineKeyboardButton("⚙️ 绑定当前群", callback_data='menu_setgroup')])
await send_del(update.message,
'🚀 *订阅管理 Bot*\n\n'
'发送订阅链接自动入库到默认节点池\n'
'或通过分组管理创建独立订阅',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
# --- menu callbacks ---
async def cb_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
action = q.data
data = load_data()
if action == 'menu_default':
cnt = len(data['subs'])
alive_cnt = len([s for s in data['subs'] if s.get('alive', True)])
buttons = [
[InlineKeyboardButton("📋 节点列表", callback_data='menu_list'),
InlineKeyboardButton("📥 获取订阅", callback_data='menu_get')],
[InlineKeyboardButton(" 添加节点", callback_data='menu_add'),
InlineKeyboardButton("🗑 删除节点", callback_data='menu_del')],
[InlineKeyboardButton("◀️ 返回主菜单", callback_data='menu_home')],
]
try:
await q.edit_message_text(
f'📦 *默认节点池*\n\n总计 {cnt} 个节点,{alive_cnt} 个可用',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message,
f'📦 *默认节点池*\n\n总计 {cnt} 个节点,{alive_cnt} 个可用',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_home':
default_cnt = len([s for s in data['subs'] if s.get('alive', True)])
sg_cnt = len(data.get('sub_groups', []))
buttons = [
[InlineKeyboardButton(f"📦 默认节点池 ({default_cnt})", callback_data='menu_default')],
[InlineKeyboardButton(f"📁 分组管理 ({sg_cnt}个分组)", callback_data='sg_list')],
[InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
]
try:
await q.edit_message_text(
'🚀 *订阅管理 Bot*\n\n发送订阅链接自动入库到默认节点池\n或通过分组管理创建独立订阅',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except: pass
elif action == 'menu_list':
if not data['subs']:
return await send_del(q.message, '📭 暂无订阅')
lines = [f"`{i+1}` {'🟢' if s.get('alive',True) else '🔴'} [{s['type']}] {s['name']}"
for i, s in enumerate(data['subs'])]
await send_del(q.message, '📋 *订阅列表*\n\n' + '\n'.join(lines), parse_mode='Markdown')
elif action == 'menu_get':
alive = [s for s in data['subs'] if s.get('alive', True)]
if not alive:
return await send_del(q.message, '📭 暂无可用订阅')
types = sorted(set(s['type'] for s in alive))
buttons = [[InlineKeyboardButton(f"📦 全部 ({len(alive)})", callback_data='get_all_raw')]]
for t in types:
cnt = sum(1 for s in alive if s['type'] == t)
buttons.append([
InlineKeyboardButton(f"🔗 {t} ({cnt})", callback_data=f'get_{t}_raw'),
InlineKeyboardButton(f"📄 {t} Base64", callback_data=f'get_{t}_b64')])
buttons.append([InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data='get_all_clash')])
buttons.append([InlineKeyboardButton("🎯 自选节点", callback_data='menu_pick_multi')])
await send_del(q.message, '选择格式:', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_del':
if not data['subs']:
return await send_del(q.message, '📭 暂无订阅')
uid = q.from_user.id
buttons = []
for i, s in enumerate(data['subs']):
if uid != ADMIN_ID and uid != s.get('added_by'): continue
st = '🟢' if s.get('alive', True) else '🔴'
buttons.append([InlineKeyboardButton(f"{st} [{s['type']}] {s['name']}", callback_data=f'del_{i}')])
if not buttons:
return await send_del(q.message, '没有可删除的订阅')
buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data='del_alldown')])
await send_del(q.message, '选择要删除的:', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_add':
WAITING_ADD.add(q.from_user.id)
await send_del(q.message, '📎 请发送订阅链接(支持多条,空格或换行分隔)\n支持: ss/vmess/vless/trojan/hy2/tuic')
elif action == 'menu_check':
msg = await send_del(q.message, '🔍 检测中...', delay=120)
results = []
# 检测默认分组
if data['subs']:
results.append('📦 *默认分组*')
for s in data['subs']:
ok = await check_node(s['link'], s['type'])
s['alive'] = ok
results.append(f"{'🟢' if ok else '🔴'} [{s['type']}] {s['name']}")
# 检测所有订阅分组
for sg in data.get('sub_groups', []):
if not sg.get('nodes'): continue
results.append(f"\n📁 *{sg['name']}*")
for s in sg['nodes']:
ok = await check_node(s['link'], s['type'])
s['alive'] = ok
results.append(f"{'🟢' if ok else '🔴'} [{s['type']}] {s['name']}")
save_data(data)
try: await msg.delete()
except: pass
if not results:
await send_del(q.message, '📭 暂无订阅')
else:
await send_del(q.message, '📊 *检测结果*\n\n' + '\n'.join(results), parse_mode='Markdown')
elif action == 'menu_pick_multi':
alive = [s for s in data['subs'] if s.get('alive', True)]
if not alive:
return await send_del(q.message, '📭 暂无可用订阅')
context.user_data['multi_sel'] = set()
buttons = [[InlineKeyboardButton(f"{s['name']}", callback_data=f'msel_{i}')] for i, s in enumerate(alive)]
buttons.append([InlineKeyboardButton("✅ 确认", callback_data='msel_done')])
await send_del(q.message, '🎯 点击选择节点:', reply_markup=InlineKeyboardMarkup(buttons), delay=120)
elif action == 'menu_setgroup':
if q.from_user.id != ADMIN_ID:
return await send_del(q.message, '⛔ 仅管理员可用')
cid = q.message.chat.id
if cid not in data.get('groups', []):
data.setdefault('groups', []).append(cid)
save_data(data)
await send_del(q.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
# --- get/pick/del callbacks ---
async def cb_getsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
action = q.data
if action.startswith('pick_'):
idx = int(action.split('_')[1])
if idx < len(alive):
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{idx}_raw')],
[InlineKeyboardButton("⚡ Clash Meta", callback_data=f'fmt_{idx}_clash')],
[InlineKeyboardButton("📎 Clash Meta 订阅URL", callback_data=f'fmt_{idx}_url')],
]
await send_del(q.message, f'📍 {alive[idx]["name"]}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
return
if action.startswith('fmt_'):
parts = action.split('_')
idx, fmt = int(parts[1]), parts[2]
if idx < len(alive):
s = alive[idx]
if fmt == 'raw':
await send_del(q.message, s["link"])
elif fmt == 'clash':
cm = gen_clash_meta([s])
await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
elif fmt == 'url':
url = f'https://substore.mjjtop.com/{SUB_SECRET}/download?target=ClashMeta'
await send_del(q.message, f'📎 订阅URL:\n{url}')
return
if action.startswith('send_'):
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
# ask format first
if fmt == 'raw':
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'out_{proto}_raw')],
[InlineKeyboardButton("⚡ Clash Meta", callback_data=f'out_{proto}_clash')],
[InlineKeyboardButton("📄 Base64", callback_data=f'out_{proto}_b64')],
]
return await send_del(q.message, f'📦 全部 ({len(subs)}) — 选择格式:',
reply_markup=InlineKeyboardMarkup(buttons))
if action.startswith('out_'):
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
if fmt == 'clash':
url = f'https://substore.mjjtop.com/{SUB_SECRET}/download?target=ClashMeta'
if proto != 'all': url += f'&type={proto}'
return await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
links = '\n'.join(s['link'] for s in subs)
if fmt == 'b64':
return await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```', parse_mode='Markdown')
if len(links) > 4000:
for s in subs:
await send_del(q.message, s['link'])
else:
await send_del(q.message, links)
return
if action == 'get_all_clash':
url = f'https://substore.mjjtop.com/{SUB_SECRET}/download?target=ClashMeta'
await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
return
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
if not subs:
return await send_del(q.message, '📭 无匹配')
if fmt == 'raw' and len(subs) > 1:
buttons = []
for i, s in enumerate(subs):
gi = alive.index(s)
buttons.append([InlineKeyboardButton(
f"{'🟢' if s.get('alive',True) else '🔴'} {s['name']}", callback_data=f'pick_{gi}')])
buttons.append([InlineKeyboardButton(f"📦 全部 ({len(subs)})", callback_data=f'send_{proto}_raw')])
return await send_del(q.message, '选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
if fmt == 'raw' and len(subs) == 1:
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{alive.index(subs[0])}_raw')],
[InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{alive.index(subs[0])}_clash')],
]
return await send_del(q.message, f'📍 {subs[0]["name"]}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
links = '\n'.join(s['link'] for s in subs)
if fmt == 'b64':
await send_del(q.message, base64.b64encode(links.encode()).decode())
else:
await send_del(q.message, links)
async def cb_multisel(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
sel = context.user_data.get('multi_sel', set())
action = q.data
if action == 'msel_done':
if not sel:
return await send_del(q.message, '⚠️ 未选择任何节点')
chosen = [alive[i] for i in sorted(sel) if i < len(alive)]
if not chosen:
return await send_del(q.message, '⚠️ 选择已失效')
context.user_data['multi_sel'] = set()
names = ', '.join(s['name'] for s in chosen)
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data='mout_raw')],
[InlineKeyboardButton("⚡ Clash Meta", callback_data='mout_clash')],
[InlineKeyboardButton("📎 订阅URL", callback_data='mout_url')],
[InlineKeyboardButton("📄 Base64", callback_data='mout_b64')],
]
context.user_data['multi_chosen'] = chosen
await send_del(q.message, f'已选 {len(chosen)} 个: {names}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
return
idx = int(action.split('_')[1])
if idx in sel:
sel.discard(idx)
else:
sel.add(idx)
context.user_data['multi_sel'] = sel
buttons = []
for i, s in enumerate(alive):
mark = '' if i in sel else ''
buttons.append([InlineKeyboardButton(f"{mark} {s['name']}", callback_data=f'msel_{i}')])
buttons.append([InlineKeyboardButton(f"✅ 确认 ({len(sel)})", callback_data='msel_done')])
try:
await q.message.edit_text('🎯 点击选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
except: pass
async def cb_multiout(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
chosen = context.user_data.get('multi_chosen', [])
if not chosen:
return await send_del(q.message, '⚠️ 选择已过期,请重新操作')
fmt = q.data.split('_')[1]
if fmt == 'raw':
links = '\n'.join(s['link'] for s in chosen)
await send_del(q.message, links)
elif fmt == 'clash':
cm = gen_clash_meta(chosen)
await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
elif fmt == 'url':
names = ','.join(urllib.request.quote(s['name']) for s in chosen)
url = f'https://substore.mjjtop.com/{SUB_SECRET}/download?target=ClashMeta&name={names}'
await send_del(q.message, f'📎 订阅URL:\n{url}')
elif fmt == 'b64':
links = '\n'.join(s['link'] for s in chosen)
await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```', parse_mode='Markdown')
context.user_data.pop('multi_chosen', None)
async def cb_delsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
action = q.data
uid = q.from_user.id
if action == 'del_alldown':
dead = [s for s in data['subs'] if not s.get('alive', True)]
if uid != ADMIN_ID:
dead = [s for s in dead if s.get('added_by') == uid]
if not dead:
return await send_del(q.message, '没有不可用节点')
names = [s['name'] for s in dead]
for s in dead: data['subs'].remove(s)
save_data(data)
return await send_del(q.message, f"🗑 已删除 {len(names)} 个:\n" + '\n'.join(f'{n}' for n in names))
idx = int(action.split('_')[1])
if idx >= len(data['subs']):
return await send_del(q.message, '❌ 已失效')
s = data['subs'][idx]
if uid != ADMIN_ID and uid != s.get('added_by'):
return await send_del(q.message, '⛔ 只能删除自己上传的')
removed = data['subs'].pop(idx)
save_data(data)
await send_del(q.message, f"🗑 已删除: {removed['name']}")
# --- auto detect links in messages ---
async def auto_detect(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message or not update.message.text: return
text = update.message.text.strip()
uid = update.effective_user.id
# --- 分组相关文本输入处理 ---
# 创建分组:等待输入名称
if context.user_data.get('waiting_sg_name'):
del context.user_data['waiting_sg_name']
data = load_data()
gid = secrets.token_hex(4)
sec = secrets.token_hex(16)
data['sub_groups'].append({
'id': gid, 'name': text, 'secret': sec,
'nodes': [], 'created_at': int(time.time())
})
save_data(data)
host = SUB_HOST
url = f"https://{host}/{sec}/download?target=ClashMeta"
return await send_del(update.message,
f"✅ 分组 *{text}* 已创建\n\n🔗 订阅链接:\n`{url}`", parse_mode='Markdown')
# 重命名分组:等待输入新名称
if context.user_data.get('waiting_sg_rename'):
gid = context.user_data.pop('waiting_sg_rename')
data = load_data()
sg = next((g for g in data.get('sub_groups', []) if g['id'] == gid), None)
if sg:
old = sg['name']
sg['name'] = text
save_data(data)
return await send_del(update.message, f"✅ 分组已重命名: {old}{text}")
return await send_del(update.message, '❌ 分组不存在')
# 添加节点到分组:等待输入链接
if context.user_data.get('waiting_sg_add'):
gid = context.user_data.pop('waiting_sg_add')
found = [w for w in text.split() if any(w.startswith(p) for p in PROTOS)]
surge_links = []
if not found:
for line in text.split('\n'):
ss_link, name = parse_surge_ss(line)
if ss_link: surge_links.append(ss_link)
if not found and not surge_links:
return await send_del(update.message, '❌ 未识别到订阅链接')
results = []
for link in found + surge_links:
r = add_sub_to_group(link, update.effective_user, gid)
if r and r != 'dup': results.append(r)
elif r == 'dup': results.append(f'⚠️ 已存在: {extract_name(link)}')
if results:
return await send_del(update.message, '\n'.join(results))
return
# --- 原有逻辑 ---
# check standard protocol links
found = [w for w in text.split() if any(w.startswith(p) for p in PROTOS)]
# check Surge format (line-based)
surge_links = []
if not found:
for line in text.split('\n'):
ss_link, name = parse_surge_ss(line)
if ss_link: surge_links.append(ss_link)
if not found and not surge_links:
if uid in WAITING_ADD:
WAITING_ADD.discard(uid)
await send_del(update.message, '❌ 未识别到订阅链接')
return
if not await is_member(update, context): return
WAITING_ADD.discard(uid)
results = []
for link in found + surge_links:
r = add_sub(link, update.effective_user)
if r and r != 'dup': results.append(r)
elif r == 'dup': results.append(f'⚠️ 已存在: {extract_name(link)}')
if results:
await send_del(update.message, '\n'.join(results))
# --- node health check ---
async def check_node(link, proto):
try:
server, port = parse_sp(link, proto)
if not server: return False
_, w = await asyncio.wait_for(asyncio.open_connection(server, int(port)), timeout=5)
w.close(); await w.wait_closed()
return True
except: return False
def parse_sp(link, proto):
try:
if proto == "ss":
raw = link[5:]
if "#" in raw: raw = raw.split("#")[0]
if "@" in raw:
sp = raw.split("@")[-1]
return sp.rsplit(":", 1)
try:
decoded = base64.b64decode(raw + "==").decode()
_, rest = decoded.split(":", 1)
_, sp = rest.rsplit("@", 1)
return sp.rsplit(":", 1)
except: pass
m = re.search(r'@([^:/?#]+):(\d+)', link)
if m: return m.group(1), m.group(2)
except: pass
return None, None
def parse_to_clash_proxy(s):
try:
if s['type'] == 'ss':
raw = s['link'][5:]
name = ''
if '#' in raw: raw, name = raw.rsplit('#', 1); name = urllib.request.unquote(name)
if "@" in raw:
userinfo, sp = raw.rsplit("@", 1)
server, port = sp.rsplit(":", 1)
decoded = base64.b64decode(userinfo + "==").decode()
method, pwd = decoded.split(":", 1)
else:
decoded = base64.b64decode(raw + "==").decode()
method, rest = decoded.split(":", 1)
pwd, sp = rest.rsplit("@", 1)
server, port = sp.rsplit(":", 1)
return {'name': name or server, 'type': 'ss', 'server': server, 'port': int(port),
'cipher': method, 'password': pwd}
except: pass
return None
def gen_clash_meta(subs):
lines = ['proxies:']
for s in subs:
p = parse_to_clash_proxy(s)
if p: lines.append(f' - {json.dumps(p, ensure_ascii=False)}')
return '\n'.join(lines)
# --- HTTP subscription endpoint ---
class SubHandler(BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.split('?')[0]
# 解析查询参数
qs = {}
if '?' in self.path:
qs = dict(x.split('=',1) for x in self.path.split('?',1)[1].split('&') if '=' in x)
# 匹配默认分组或自定义分组
alive = []
if path == f'/{SUB_SECRET}/download':
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
else:
# 尝试匹配分组 secret
parts = path.strip('/').split('/')
if len(parts) == 2 and parts[1] == 'download':
group_secret = parts[0]
data = load_data()
sg = next((g for g in data.get('sub_groups', []) if g['secret'] == group_secret), None)
if sg:
alive = [s for s in sg.get('nodes', []) if s.get('alive', True)]
else:
self.send_response(404)
self.end_headers()
return
else:
self.send_response(404)
self.end_headers()
return
ftype = qs.get('type', '')
if ftype: alive = [s for s in alive if s['type'] == ftype]
fname = qs.get('name', '')
if fname:
names = [urllib.request.unquote(n) for n in fname.split(',')]
alive = [s for s in alive if s['name'] in names]
target = qs.get('target', 'ClashMeta')
body = gen_clash_meta(alive) if target == 'ClashMeta' else '\n'.join(s['link'] for s in alive)
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(body.encode())
def log_message(self, *a): pass
def start_http():
srv = HTTPServer(('0.0.0.0', 18888), SubHandler)
log.info('Sub HTTP on :18888')
srv.serve_forever()
# --- legacy commands still work ---
async def cmd_setgroup(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_user.id != ADMIN_ID:
return await update.message.reply_text('⛔ 仅管理员可用')
cid = update.effective_chat.id
data = load_data()
if cid not in data.get('groups', []):
data.setdefault('groups', []).append(cid)
save_data(data)
await send_del(update.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
# --- 分组管理回调 ---
async def cb_subgroups(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
action = q.data
data = load_data()
sgs = data.get('sub_groups', [])
if action == 'sg_list':
buttons = []
for sg in sgs:
cnt = len(sg.get('nodes', []))
buttons.append([InlineKeyboardButton(f"📁 {sg['name']} ({cnt}个节点)", callback_data=f"sg_detail_{sg['id']}")])
buttons.append([InlineKeyboardButton(" 创建分组", callback_data='sg_create')])
buttons.append([InlineKeyboardButton("◀️ 返回主菜单", callback_data='sg_back')])
text = f"📁 *分组管理*\n\n{len(sgs)} 个分组" if sgs else "📁 *分组管理*\n\n暂无分组,点击创建"
try:
await q.edit_message_text(text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'sg_create':
context.user_data['waiting_sg_name'] = True
try:
await q.edit_message_text('📝 请发送分组名称:')
except:
await send_del(q.message, '📝 请发送分组名称:')
elif action.startswith('sg_detail_'):
gid = action[len('sg_detail_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg:
return await send_del(q.message, '❌ 分组不存在')
cnt = len(sg.get('nodes', []))
host = SUB_HOST
url = f"https://{host}/{sg['secret']}/download?target=ClashMeta"
text = (f"📁 *{sg['name']}*\n\n"
f"📊 节点数: {cnt}\n"
f"🔗 订阅链接:\n`{url}`")
buttons = [
[InlineKeyboardButton("📋 节点列表", callback_data=f"sg_nodes_{gid}"),
InlineKeyboardButton(" 添加节点", callback_data=f"sg_add_{gid}")],
[InlineKeyboardButton("🗑 删除节点", callback_data=f"sg_delnodes_{gid}"),
InlineKeyboardButton("🔍 检测节点", callback_data=f"sg_check_{gid}")],
[InlineKeyboardButton("📥 获取订阅", callback_data=f"sg_getsub_{gid}"),
InlineKeyboardButton("🔄 重置链接", callback_data=f"sg_reset_{gid}")],
[InlineKeyboardButton("✏️ 重命名", callback_data=f"sg_rename_{gid}"),
InlineKeyboardButton("🗑 删除分组", callback_data=f"sg_delconfirm_{gid}")],
[InlineKeyboardButton("◀️ 返回分组列表", callback_data='sg_list')],
]
try:
await q.edit_message_text(text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_nodes_'):
gid = action[len('sg_nodes_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
nodes = sg.get('nodes', [])
if not nodes:
text = f"📁 *{sg['name']}* — 暂无节点"
else:
lines = [f"📁 *{sg['name']}* 节点列表\n"]
for i, s in enumerate(nodes):
st = '🟢' if s.get('alive', True) else '🔴'
lines.append(f"`{i+1}` {st} [{s['type']}] {s['name']}")
text = '\n'.join(lines)
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
try:
await q.edit_message_text(text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, text, parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_add_'):
gid = action[len('sg_add_'):]
context.user_data['waiting_sg_add'] = gid
sg = next((g for g in sgs if g['id'] == gid), None)
name = sg['name'] if sg else '?'
try:
await q.edit_message_text(f'📎 请发送要添加到 [{name}] 的订阅链接:')
except:
await send_del(q.message, f'📎 请发送要添加到 [{name}] 的订阅链接:')
elif action.startswith('sg_check_'):
gid = action[len('sg_check_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
nodes = sg.get('nodes', [])
if not nodes:
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
return await send_del(q.message, f"📁 *{sg['name']}* 暂无节点",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
msg = await send_del(q.message, f"🔍 检测 *{sg['name']}* 中...", parse_mode='Markdown', delay=120)
results = []
for s in nodes:
ok = await check_node(s['link'], s['type'])
s['alive'] = ok
results.append(f"{'🟢' if ok else '🔴'} [{s['type']}] {s['name']}")
save_data(data)
try: await msg.delete()
except: pass
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
await send_del(q.message, f"📊 *{sg['name']}* 检测结果\n\n" + '\n'.join(results),
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_delnodes_'):
gid = action[len('sg_delnodes_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
nodes = sg.get('nodes', [])
if not nodes:
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
return await send_del(q.message, f"📁 *{sg['name']}* 暂无节点",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
buttons = []
for i, s in enumerate(nodes):
st = '🟢' if s.get('alive', True) else '🔴'
buttons.append([InlineKeyboardButton(
f"{st} [{s['type']}] {s['name']}", callback_data=f"sg_delnode_{gid}_{i}")])
buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data=f"sg_deldown_{gid}")])
buttons.append([InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")])
try:
await q.edit_message_text(f"🗑 *{sg['name']}* — 选择要删除的节点:",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, f"🗑 *{sg['name']}* — 选择要删除的节点:",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_delnode_'):
# sg_delnode_{gid}_{idx}
parts = action[len('sg_delnode_'):].rsplit('_', 1)
if len(parts) != 2: return
gid, idx_str = parts
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
idx = int(idx_str)
nodes = sg.get('nodes', [])
if idx >= len(nodes):
return await send_del(q.message, '❌ 已失效')
removed = nodes.pop(idx)
save_data(data)
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
try:
await q.edit_message_text(f"✅ 已删除: [{removed['type']}] {removed['name']}",
reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, f"✅ 已删除: [{removed['type']}] {removed['name']}",
reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_deldown_'):
gid = action[len('sg_deldown_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
dead = [s for s in sg.get('nodes', []) if not s.get('alive', True)]
if not dead:
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
return await send_del(q.message, '没有不可用节点', reply_markup=InlineKeyboardMarkup(buttons))
names = [s['name'] for s in dead]
sg['nodes'] = [s for s in sg['nodes'] if s.get('alive', True)]
save_data(data)
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
try:
await q.edit_message_text(f"🗑 已删除 {len(names)} 个不可用:\n" + '\n'.join(f'{n}' for n in names),
reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, f"🗑 已删除 {len(names)} 个不可用:\n" + '\n'.join(f'{n}' for n in names),
reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_getsub_'):
gid = action[len('sg_getsub_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
host = SUB_HOST
url = f"https://{host}/{sg['secret']}/download?target=ClashMeta"
buttons = [
[InlineKeyboardButton("📋 复制链接", callback_data=f"sg_detail_{gid}")],
[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")],
]
await send_del(q.message, f"📎 *{sg['name']}* 订阅链接:\n\n`{url}`",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_reset_'):
gid = action[len('sg_reset_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
sg['secret'] = secrets.token_hex(16)
save_data(data)
host = SUB_HOST
url = f"https://{host}/{sg['secret']}/download?target=ClashMeta"
buttons = [[InlineKeyboardButton("◀️ 返回", callback_data=f"sg_detail_{gid}")]]
try:
await q.edit_message_text(f"✅ 链接已重置\n\n新链接:\n`{url}`",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, f"✅ 链接已重置\n\n新链接:\n`{url}`",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_rename_'):
gid = action[len('sg_rename_'):]
context.user_data['waiting_sg_rename'] = gid
try:
await q.edit_message_text('✏️ 请发送新的分组名称:')
except:
await send_del(q.message, '✏️ 请发送新的分组名称:')
elif action.startswith('sg_delconfirm_'):
gid = action[len('sg_delconfirm_'):]
sg = next((g for g in sgs if g['id'] == gid), None)
if not sg: return
buttons = [
[InlineKeyboardButton("⚠️ 确认删除", callback_data=f"sg_dodel_{gid}"),
InlineKeyboardButton("取消", callback_data=f"sg_detail_{gid}")],
]
try:
await q.edit_message_text(f"⚠️ 确认删除分组 *{sg['name']}*\n节点数据将全部丢失!",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, f"⚠️ 确认删除分组 *{sg['name']}*",
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
elif action.startswith('sg_dodel_'):
gid = action[len('sg_dodel_'):]
data['sub_groups'] = [g for g in sgs if g['id'] != gid]
save_data(data)
buttons = [[InlineKeyboardButton("◀️ 返回分组列表", callback_data='sg_list')]]
try:
await q.edit_message_text("✅ 分组已删除", reply_markup=InlineKeyboardMarkup(buttons))
except:
await send_del(q.message, "✅ 分组已删除", reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'sg_back':
# 返回主菜单
default_cnt = len([s for s in data['subs'] if s.get('alive', True)])
sg_cnt = len(data.get('sub_groups', []))
buttons = [
[InlineKeyboardButton(f"📦 默认节点池 ({default_cnt})", callback_data='menu_default')],
[InlineKeyboardButton(f"📁 分组管理 ({sg_cnt}个分组)", callback_data='sg_list')],
[InlineKeyboardButton("🔍 全部检测", callback_data='menu_check')],
]
try:
await q.edit_message_text(
'🚀 *订阅管理 Bot*\n\n发送订阅链接自动入库到默认节点池\n或通过分组管理创建独立订阅',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
except: pass
async def auto_cleanup(bot_token):
"""Every 6h: check nodes, auto-delete dead ones from default subs only, check group nodes but don't delete"""
import httpx
await asyncio.sleep(60)
while True:
data = load_data()
# 默认分组:检测并删除不可用
if data['subs']:
dead = []
for s in list(data['subs']):
ok = await check_node(s['link'], s['type'])
if not ok:
dead.append(s)
data['subs'].remove(s)
if dead:
save_data(data)
names = '\n'.join(f"• [{s['type']}] {s['name']}" for s in dead)
msg = f'🗑 自动清理 — 已删除 {len(dead)} 个不可用节点:\n\n{names}'
for gid in data.get('groups', []):
try:
async with httpx.AsyncClient() as c:
await c.post(f'https://api.telegram.org/bot{bot_token}/sendMessage',
json={'chat_id': gid, 'text': msg})
except: pass
log.info(f'Auto cleanup: removed {len(dead)} dead nodes')
# 订阅分组:只更新存活状态,不删除
changed = False
for sg in data.get('sub_groups', []):
for s in sg.get('nodes', []):
ok = await check_node(s['link'], s['type'])
if s.get('alive', True) != ok:
s['alive'] = ok
changed = True
if changed:
save_data(data)
await asyncio.sleep(21600)
def run_cleanup():
asyncio.run(auto_cleanup(TOKEN))
def main():
app = Application.builder().token(TOKEN).build()
app.add_handler(CommandHandler('start', cmd_help))
app.add_handler(CommandHandler('vps', cmd_help))
app.add_handler(CommandHandler('setgroup', cmd_setgroup))
app.add_handler(CallbackQueryHandler(cb_menu, pattern='^menu_'))
app.add_handler(CallbackQueryHandler(cb_delsub, pattern='^del_'))
app.add_handler(CallbackQueryHandler(cb_multisel, pattern='^msel_'))
app.add_handler(CallbackQueryHandler(cb_multiout, pattern='^mout_'))
app.add_handler(CallbackQueryHandler(cb_getsub, pattern='^(get_|pick_|send_|fmt_|out_)'))
app.add_handler(CallbackQueryHandler(cb_subgroups, pattern='^sg_'))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, auto_detect))
log.info('Sub Bot starting polling...')
threading.Thread(target=start_http, daemon=True).start()
threading.Thread(target=run_cleanup, daemon=True).start()
app.run_polling(drop_pending_updates=True)
if __name__ == '__main__':
main()