From 7d1c09d9926fce98c03346f50cfdc375cf94e2a3 Mon Sep 17 00:00:00 2001 From: vpsbot8 Date: Mon, 16 Mar 2026 19:56:50 +0530 Subject: [PATCH] feat: initial vpsbot8 multi-vps control bot --- .gitignore | 15 + README.txt | 12 + app.py | 1737 ++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 + 4 files changed, 1766 insertions(+) create mode 100644 .gitignore create mode 100644 README.txt create mode 100644 app.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6549841 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +venv/ +__pycache__/ +*.pyc +*.pyo +*.pyd +.python-version +.env +.env.* +config.json +data/ +keys/ +*.db +*.sqlite +*.sqlite3 +*.log diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..d0f5979 --- /dev/null +++ b/README.txt @@ -0,0 +1,12 @@ +VPS 多机中控 - MVP + +当前功能: +- Telegram Bot 启动 +- 添加节点 +- 节点列表 +- SSH 测试连接 +- 系统概览 +- 删除节点 + +部署目录建议: /opt/vpsbot8 +服务名建议: vpsbot8.service diff --git a/app.py b/app.py new file mode 100644 index 0000000..2c4b418 --- /dev/null +++ b/app.py @@ -0,0 +1,1737 @@ +import asyncio +import html +import json +import math +import os +import sqlite3 +import time +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +import paramiko +from telegram import ( + BotCommand, + InlineKeyboardButton, + InlineKeyboardMarkup, + MenuButtonCommands, + ReplyKeyboardRemove, + Update, +) +from telegram.constants import ParseMode +from telegram.ext import ( + Application, + CallbackQueryHandler, + CommandHandler, + ContextTypes, + MessageHandler, + filters, +) + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +DATA_DIR = os.path.join(BASE_DIR, "data") +KEYS_DIR = os.path.join(BASE_DIR, "keys") +os.makedirs(DATA_DIR, exist_ok=True) +os.makedirs(KEYS_DIR, exist_ok=True) +DB_PATH = os.path.join(DATA_DIR, "app.db") +CONFIG_PATH = os.path.join(BASE_DIR, "config.json") + + +def esc(s: str) -> str: + return html.escape(str(s), quote=False) + + +def load_config_file(): + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + return json.load(f) + + +def save_config_file(cfg): + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(cfg, f, ensure_ascii=False, indent=2) + + +CONFIG = load_config_file() +TOKEN = CONFIG["bot_token"] +ADMIN_IDS = set(CONFIG.get("admin_ids", [])) +BOT_NAME = CONFIG.get("bot_name", "VPS 多机中控") + +SETTINGS_DEFAULTS = { + "default_user": CONFIG.get("default_user", "root"), + "default_port": str(CONFIG.get("default_port", 22)), + "default_password": CONFIG.get("default_password", ""), + "default_key_path": CONFIG.get("default_key_path", "/opt/vpsbot8/keys/koipy_key"), + "check_interval": "60", + "fail_threshold": "3", + "notify_recovery": "1", + "notify_offline": "1", + "cpu_alert_threshold": "0", + "mem_alert_threshold": "0", + "disk_alert_threshold": "0", + "expire_remind_days": "7", +} + +READ_ONLY_BATCH = { + "hostname": ("🏷 主机名", "hostname"), + "disk": ("💽 磁盘", "df -h /"), + "failed": ("🚨 失败服务", "systemctl --failed --no-pager --no-legend || true"), + "docker": ("🐳 Docker", "docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Image}}' 2>/dev/null || echo docker_not_available"), +} + +ADD_FLOW = ["name", "group_name", "host", "port", "user"] +SERVICE_ACTIONS = { + "status": "状态", + "restart": "重启", + "start": "启动", + "stop": "停止", +} +READONLY_CUSTOM_PREFIXES = ( + "uname", "hostname", "whoami", "uptime", "date", "df", "free", "cat /proc/", + "docker ps", "docker images", "systemctl status", "systemctl is-active", "systemctl list-units", + "ss ", "netstat ", "ip a", "ip r", "curl -I", "curl -sI", "ps ", "top -bn1", "lsb_release", "neofetch" +) +BLOCKED_CUSTOM_TOKENS = ( + " rm ", " reboot", " shutdown", " poweroff", " init 0", " init 6", " mkfs", " dd ", " chmod ", " chown ", + " useradd", " userdel", " passwd", " systemctl restart", " systemctl stop", " systemctl start", " docker stop", " docker restart", + " docker rm", " apt ", " apt-get", " yum ", " dnf ", " apk ", " pacman ", " pip install", " npm install", " sed -i", " tee ", + ">", "| sh", "| bash", "&&", ";", "\n" +) + + +@dataclass +class Node: + id: int + name: str + group_name: str + host: str + port: int + user: str + auth_type: str + auth_value: str + enabled: int + created_at: int + remark: str = "" + expires_at: int = 0 + monthly_price: float = 0.0 + price_currency: str = "U" + price_cycle: str = "month" + + +def db(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(): + conn = db() + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS nodes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + group_name TEXT NOT NULL DEFAULT 'default', + host TEXT NOT NULL, + port INTEGER NOT NULL DEFAULT 22, + user TEXT NOT NULL DEFAULT 'root', + password TEXT NOT NULL DEFAULT '', + auth_type TEXT DEFAULT 'password', + auth_value TEXT DEFAULT '', + enabled INTEGER NOT NULL DEFAULT 1, + created_at INTEGER NOT NULL, + remark TEXT NOT NULL DEFAULT '', + expires_at INTEGER NOT NULL DEFAULT 0, + monthly_price REAL NOT NULL DEFAULT 0, + price_currency TEXT NOT NULL DEFAULT 'U', + price_cycle TEXT NOT NULL DEFAULT 'month' + ); + + CREATE TABLE IF NOT EXISTS settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS node_state ( + node_id INTEGER PRIMARY KEY, + fail_count INTEGER NOT NULL DEFAULT 0, + is_online INTEGER NOT NULL DEFAULT 1, + last_error TEXT DEFAULT '', + last_change INTEGER NOT NULL DEFAULT 0, + FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS resource_state ( + node_id INTEGER NOT NULL, + metric TEXT NOT NULL, + alerted INTEGER NOT NULL DEFAULT 0, + last_value REAL NOT NULL DEFAULT 0, + updated_at INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (node_id, metric), + FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS expiry_state ( + node_id INTEGER PRIMARY KEY, + last_days_left INTEGER NOT NULL DEFAULT 999999, + last_notified_at INTEGER NOT NULL DEFAULT 0, + FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE + ); + """ + ) + cols = {row[1] for row in conn.execute("PRAGMA table_info(nodes)").fetchall()} + if "auth_type" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN auth_type TEXT DEFAULT 'password'") + if "auth_value" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN auth_value TEXT DEFAULT ''") + if "group_name" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN group_name TEXT DEFAULT 'default'") + if "password" in cols: + conn.execute("UPDATE nodes SET auth_value=password WHERE (auth_value='' OR auth_value IS NULL) AND password IS NOT NULL") + conn.execute("UPDATE nodes SET auth_type='password' WHERE auth_type IS NULL OR auth_type='' ") + if "remark" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN remark TEXT NOT NULL DEFAULT ''") + if "expires_at" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN expires_at INTEGER NOT NULL DEFAULT 0") + if "monthly_price" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN monthly_price REAL NOT NULL DEFAULT 0") + if "price_currency" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN price_currency TEXT NOT NULL DEFAULT 'U'") + if "price_cycle" not in cols: + conn.execute("ALTER TABLE nodes ADD COLUMN price_cycle TEXT NOT NULL DEFAULT 'month'") + for k, v in SETTINGS_DEFAULTS.items(): + conn.execute("INSERT OR IGNORE INTO settings(key,value) VALUES(?,?)", (k, v)) + conn.commit() + conn.close() + + +def get_setting(key: str) -> str: + conn = db() + row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone() + conn.close() + if row: + return row[0] + return SETTINGS_DEFAULTS[key] + + +def set_setting(key: str, value: str): + conn = db() + conn.execute("INSERT INTO settings(key,value) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET value=excluded.value", (key, str(value))) + conn.commit() + conn.close() + if key in {"default_user", "default_port", "default_password", "default_key_path"}: + cfg = load_config_file() + cfg[key] = int(value) if key == "default_port" else value + save_config_file(cfg) + + +def defaults_dict(): + return { + "user": get_setting("default_user"), + "port": int(get_setting("default_port")), + "password": get_setting("default_password"), + "key_path": get_setting("default_key_path"), + } + +def format_expiry(ts: int) -> str: + if not ts: + return "未设置" + return datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%d") + + +def days_left_text(ts: int) -> str: + if not ts: + return "未设置" + days = int((int(ts) - int(time.time())) // 86400) + if days > 0: + return f"{days} 天" + if days == 0: + return "今天到期" + return f"已过期 {abs(days)} 天" + + +def alert_settings_text() -> str: + return ( + f"巡检间隔:{get_setting('check_interval')} 秒\n" + f"失败阈值:{get_setting('fail_threshold')} 次\n" + f"掉线通知:{'开' if get_setting('notify_offline') == '1' else '关'}\n" + f"恢复通知:{'开' if get_setting('notify_recovery') == '1' else '关'}" + ) + + +def add_prompts(): + d = defaults_dict() + return { + "name": "发我节点名称,例如:HK-1", + "group_name": f"发我节点分组,例如:HK / 生产。留空默认 default\n现有分组:{', '.join([g for g,_ in list_groups()][:8]) or 'default'}", + "host": "发我节点 IP 或域名,例如:1.2.3.4", + "port": f"发我 SSH 端口,默认 {d['port']},留空也行。", + "user": f"发我 SSH 用户,默认 {d['user']},留空也行。", + "custom_password": "发我这台机器的 SSH 密码。", + "custom_key": "发我私钥路径,或者直接把私钥内容整段发过来。", + "set_default_password": "发我新的默认 SSH 密码。", + "set_default_key": "发我新的默认私钥路径,或者直接发整段私钥内容。", + "set_default_user": "发我新的默认 SSH 用户。", + "set_default_port": "发我新的默认 SSH 端口。", + "set_check_interval": "发我新的巡检间隔(秒),例如 60。", + "set_fail_threshold": "发我新的失败阈值,例如 3。", + "set_cpu_alert_threshold": "发我新的 CPU 告警阈值。建议用 load 值,例如 4。填 0 为关闭。", + "set_mem_alert_threshold": "发我新的内存告警阈值(百分比)。填 0 为关闭。", + "set_disk_alert_threshold": "发我新的磁盘告警阈值(百分比)。填 0 为关闭。", + "set_expire_remind_days": "发我新的到期提前提醒天数,例如 7。", + "edit_name": "发我新的节点名称。", + "edit_host": "发我新的 IP 或域名。", + "edit_port": "发我新的 SSH 端口。", + "edit_user": "发我新的 SSH 用户。", + "edit_auth_password": "发我新的 SSH 密码。", + "edit_auth_key": "发我新的私钥路径,或者直接发整段私钥内容。", + "edit_remark": "发我新的节点备注。留空可发一个 - 代表清空。", + "edit_monthly_price": "发我新的月付(U),例如 4.99。填 0 代表未设置。", + "edit_expires_at": "发我新的到期日期,格式:YYYY-MM-DD。填 0 代表清空。", + "batch_service_name": "发我要批量操作的 systemd 服务名,例如:nginx / docker / openclaw。", + "batch_readonly_command": "发我要批量执行的只读命令,例如:df -h /、free -m、docker ps、systemctl status nginx。", + } + + +def maybe_store_private_key(raw: str, prefix: str = "custom") -> str: + text = (raw or "").strip() + if "BEGIN OPENSSH PRIVATE KEY" in text or "BEGIN RSA PRIVATE KEY" in text or "BEGIN PRIVATE KEY" in text: + path = os.path.join(KEYS_DIR, f"{prefix}_{int(time.time()*1000)}.key") + with open(path, "w", encoding="utf-8") as f: + f.write(text) + if not text.endswith("\n"): + f.write("\n") + os.chmod(path, 0o600) + return path + return text + + +def is_admin(update: Update) -> bool: + user = update.effective_user + return bool(user and user.id in ADMIN_IDS) + + +async def guard(update: Update): + if not is_admin(update): + if update.message: + await update.message.reply_text("你不在管理员白名单里。") + elif update.callback_query: + await update.callback_query.answer("你不在管理员白名单里。", show_alert=True) + return False + return True + + +def main_menu(): + return InlineKeyboardMarkup([ + [InlineKeyboardButton("📊 总览", callback_data="nodes:overview"), InlineKeyboardButton("🖥 节点", callback_data="nodes:list")], + [InlineKeyboardButton("➕ 添加", callback_data="nodes:add"), InlineKeyboardButton("🗂 分组", callback_data="groups:list")], + [InlineKeyboardButton("🔔 告警", callback_data="settings:alerts"), InlineKeyboardButton("🔐 认证", callback_data="settings:auth")], + [InlineKeyboardButton("💰 账单", callback_data="billing:summary"), InlineKeyboardButton("⏰ 到期", callback_data="billing:expiring")], + [InlineKeyboardButton("🧰 批量操作", callback_data="batch:menu")], + ]) + + +def load_node(node_id: int) -> Optional[Node]: + conn = db() + row = conn.execute( + "SELECT id,name,group_name,host,port,user,COALESCE(auth_type,'password') auth_type, COALESCE(NULLIF(auth_value,''),password) auth_value, enabled, created_at, COALESCE(remark,'') remark, COALESCE(expires_at,0) expires_at, COALESCE(monthly_price,0) monthly_price, COALESCE(price_currency,'U') price_currency, COALESCE(price_cycle,'month') price_cycle FROM nodes WHERE id=?", + (node_id,), + ).fetchone() + conn.close() + return Node(**dict(row)) if row else None + + +def all_nodes(enabled_only=True): + conn = db() + sql = "SELECT id,name,group_name,host,port,user,COALESCE(auth_type,'password') auth_type, COALESCE(NULLIF(auth_value,''),password) auth_value, enabled, created_at, COALESCE(remark,'') remark, COALESCE(expires_at,0) expires_at, COALESCE(monthly_price,0) monthly_price, COALESCE(price_currency,'U') price_currency, COALESCE(price_cycle,'month') price_cycle FROM nodes" + if enabled_only: + sql += " WHERE enabled=1" + sql += " ORDER BY group_name ASC, id DESC" + rows = conn.execute(sql).fetchall() + conn.close() + return [Node(**dict(r)) for r in rows] + + +def nodes_by_group(group_name: str): + conn = db() + rows = conn.execute( + "SELECT id,name,group_name,host,port,user,COALESCE(auth_type,'password') auth_type, COALESCE(NULLIF(auth_value,''),password) auth_value, enabled, created_at, COALESCE(remark,'') remark, COALESCE(expires_at,0) expires_at, COALESCE(monthly_price,0) monthly_price, COALESCE(price_currency,'U') price_currency, COALESCE(price_cycle,'month') price_cycle FROM nodes WHERE enabled=1 AND group_name=? ORDER BY id DESC", + (group_name,), + ).fetchall() + conn.close() + return [Node(**dict(r)) for r in rows] + + +def list_groups(): + conn = db() + rows = conn.execute("SELECT group_name, COUNT(*) c FROM nodes WHERE enabled=1 GROUP BY group_name ORDER BY group_name ASC").fetchall() + conn.close() + return [(r[0], r[1]) for r in rows] + + +def auth_label(node: Node) -> str: + d = defaults_dict() + if node.auth_type == "key": + if node.auth_value == d["key_path"]: + return "默认Key" + return "自定义Key" + if node.auth_value == d["password"] and d["password"]: + return "默认密码" + return "自定义密码" + + +async def run_ssh(node: Node, command: str, timeout: int = 12): + def _run(): + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + kwargs = dict( + hostname=node.host, + port=node.port, + username=node.user, + timeout=timeout, + banner_timeout=timeout, + auth_timeout=timeout, + look_for_keys=False, + allow_agent=False, + ) + if node.auth_type == "key": + kwargs["key_filename"] = node.auth_value + else: + kwargs["password"] = node.auth_value + try: + client.connect(**kwargs) + stdin, stdout, stderr = client.exec_command(command, timeout=timeout) + out = stdout.read().decode("utf-8", errors="ignore") + err = stderr.read().decode("utf-8", errors="ignore") + code = stdout.channel.recv_exit_status() + return code, out.strip(), err.strip() + finally: + client.close() + return await asyncio.to_thread(_run) + + +async def collect_node_summary(node: Node): + cmd = r'''bash -lc ' +HOST=$(hostname 2>/dev/null || echo unknown) +UP=$(uptime -p 2>/dev/null || uptime) +LOAD=$(cat /proc/loadavg 2>/dev/null | awk "{print \$1,\$2,\$3}" || echo -) +MEM=$(free -m 2>/dev/null | awk "/Mem:/ {printf \"%s/%s MB\", \$3, \$2}" || echo -) +DISK=$(df -h / 2>/dev/null | awk "NR==2 {printf \"%s/%s (%s)\", \$3, \$2, \$5}" || echo -) +echo "HOST=$HOST" +echo "UP=$UP" +echo "LOAD=$LOAD" +echo "MEM=$MEM" +echo "DISK=$DISK" +' ''' + code, out, err = await run_ssh(node, cmd, timeout=12) + if code != 0: + raise RuntimeError(err or out or "SSH 执行失败") + data = {} + for line in out.splitlines(): + if "=" in line: + k, v = line.split("=", 1) + data[k] = v + return data + + +async def collect_docker_list(node: Node): + cmd = r'''bash -lc ' +if ! command -v docker >/dev/null 2>&1; then + echo "DOCKER_MISSING=1" + exit 0 +fi +docker ps -a --format "{{.Names}}|{{.Status}}|{{.Image}}" 2>/dev/null +' ''' + code, out, err = await run_ssh(node, cmd, timeout=15) + if code != 0: + raise RuntimeError(err or out or "获取 Docker 列表失败") + if "DOCKER_MISSING=1" in out: + return None + items = [] + for line in out.splitlines(): + if not line.strip() or "|" not in line: + continue + name, status, image = (line.split("|", 2) + ["", "", ""])[:3] + items.append({"name": name, "status": status, "image": image}) + return items + + +async def docker_action(node: Node, container: str, action: str): + return await run_ssh(node, f"docker {action} {container}", timeout=20) + + +async def docker_logs(node: Node, container: str): + return await run_ssh(node, f"docker logs --tail 40 {container}", timeout=20) + + +def set_node_state(node_id: int, fail_count: int, is_online: int, last_error: str): + conn = db() + conn.execute( + "INSERT INTO node_state(node_id,fail_count,is_online,last_error,last_change) VALUES(?,?,?,?,?) ON CONFLICT(node_id) DO UPDATE SET fail_count=excluded.fail_count,is_online=excluded.is_online,last_error=excluded.last_error,last_change=excluded.last_change", + (node_id, fail_count, is_online, last_error[:500], int(time.time())), + ) + conn.commit() + conn.close() + + +def get_node_state(node_id: int): + conn = db() + row = conn.execute("SELECT fail_count,is_online,last_error,last_change FROM node_state WHERE node_id=?", (node_id,)).fetchone() + conn.close() + if row: + return dict(row) + return {"fail_count": 0, "is_online": 1, "last_error": "", "last_change": 0} + +def update_node_group(node_id: int, group_name: str): + conn = db() + conn.execute("UPDATE nodes SET group_name=? WHERE id=?", (group_name or 'default', node_id)) + conn.commit() + conn.close() + +def move_group_nodes(old_group: str, new_group: str = 'default'): + conn = db() + conn.execute("UPDATE nodes SET group_name=? WHERE group_name=?", (new_group or 'default', old_group)) + conn.commit() + conn.close() + +def update_node_field(node_id: int, field: str, value): + if field not in {"name", "host", "port", "user", "auth_type", "auth_value", "group_name", "remark", "expires_at", "monthly_price", "price_currency", "price_cycle"}: + raise ValueError("bad field") + conn = db() + conn.execute(f"UPDATE nodes SET {field}=? WHERE id=?", (value, node_id)) + conn.commit() + conn.close() + +def get_resource_state(node_id: int, metric: str): + conn = db() + row = conn.execute("SELECT alerted,last_value,updated_at FROM resource_state WHERE node_id=? AND metric=?", (node_id, metric)).fetchone() + conn.close() + if row: + return dict(row) + return {"alerted": 0, "last_value": 0, "updated_at": 0} + +def set_resource_state(node_id: int, metric: str, alerted: int, last_value: float): + conn = db() + conn.execute("INSERT INTO resource_state(node_id,metric,alerted,last_value,updated_at) VALUES(?,?,?,?,?) ON CONFLICT(node_id,metric) DO UPDATE SET alerted=excluded.alerted,last_value=excluded.last_value,updated_at=excluded.updated_at", (node_id, metric, int(alerted), float(last_value), int(time.time()))) + conn.commit() + conn.close() + +def parse_resource_usage(info: dict): + result = {"cpu": None, "mem": None, "disk": None} + try: + result["cpu"] = float((info.get("LOAD") or "-").split()[0]) + except Exception: + pass + try: + mem = info.get("MEM", "") + left = mem.split(" MB")[0] + used, total = left.split("/") + result["mem"] = round(float(used) / float(total) * 100, 1) + except Exception: + pass + try: + disk = info.get("DISK", "") + pct = disk.split("(")[-1].split("%)")[0].replace("%", "") + result["disk"] = float(pct) + except Exception: + pass + return result + + +def get_expiry_state(node_id: int): + conn = db() + row = conn.execute("SELECT last_days_left,last_notified_at FROM expiry_state WHERE node_id=?", (node_id,)).fetchone() + conn.close() + if row: + return dict(row) + return {"last_days_left": 999999, "last_notified_at": 0} + + +def set_expiry_state(node_id: int, last_days_left: int): + conn = db() + conn.execute("INSERT INTO expiry_state(node_id,last_days_left,last_notified_at) VALUES(?,?,?) ON CONFLICT(node_id) DO UPDATE SET last_days_left=excluded.last_days_left,last_notified_at=excluded.last_notified_at", (node_id, int(last_days_left), int(time.time()))) + conn.commit() + conn.close() + +def choose_group_buttons(node_id: int): + buttons = [] + groups = [g for g, _ in list_groups()] + for g in groups[:12]: + buttons.append([InlineKeyboardButton(f"🗂 {g}", callback_data=f"node:setgroupto:{node_id}:{g}")]) + buttons.append([InlineKeyboardButton("✍️ 新建/输入新分组", callback_data=f"node:setgroupinput:{node_id}")]) + buttons.append([InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node_id}")]) + return InlineKeyboardMarkup(buttons) + +def expiring_nodes(limit_days: int = 30): + now = int(time.time()) + end = now + limit_days * 86400 + conn = db() + rows = conn.execute( + "SELECT id,name,group_name,host,port,user,COALESCE(auth_type,'password') auth_type, COALESCE(NULLIF(auth_value,''),password) auth_value, enabled, created_at, COALESCE(remark,'') remark, COALESCE(expires_at,0) expires_at, COALESCE(monthly_price,0) monthly_price, COALESCE(price_currency,'U') price_currency, COALESCE(price_cycle,'month') price_cycle FROM nodes WHERE enabled=1 AND expires_at>0 AND expires_at<=? ORDER BY expires_at ASC, id DESC", + (end,), + ).fetchall() + conn.close() + return [Node(**dict(r)) for r in rows] + + +def monthly_cost_total(): + conn = db() + row = conn.execute("SELECT COALESCE(SUM(monthly_price),0) s FROM nodes WHERE enabled=1").fetchone() + conn.close() + return float(row[0] or 0) + + +def monthly_cost_by_group(): + conn = db() + rows = conn.execute("SELECT group_name, COUNT(*) c, COALESCE(SUM(monthly_price),0) s FROM nodes WHERE enabled=1 GROUP BY group_name ORDER BY s DESC, group_name ASC").fetchall() + conn.close() + return [(r[0], int(r[1]), float(r[2] or 0)) for r in rows] + + +def is_safe_readonly_command(command: str) -> bool: + c = (command or "").strip().lower() + if not c: + return False + if any(tok in c for tok in BLOCKED_CUSTOM_TOKENS): + return False + return c.startswith(READONLY_CUSTOM_PREFIXES) + + +def cycle_label(cycle: str) -> str: + return {"month": "月付", "quarter": "季付", "year": "年付"}.get((cycle or "month"), "月付") + + +def format_price(node: Node) -> str: + amount = float(node.monthly_price or 0) + if amount <= 0: + return "未设置" + currency = (node.price_currency or "U").upper() + return f"{amount:.2f} {currency}/{cycle_label(node.price_cycle)}" + + +def annual_cost_total(): + return round(monthly_cost_total() * 12, 2) + + +def top_cost_nodes(limit: int = 10): + conn = db() + rows = conn.execute( + "SELECT id,name,group_name,host,port,user,COALESCE(auth_type,'password') auth_type, COALESCE(NULLIF(auth_value,''),password) auth_value, enabled, created_at, COALESCE(remark,'') remark, COALESCE(expires_at,0) expires_at, COALESCE(monthly_price,0) monthly_price, COALESCE(price_currency,'U') price_currency, COALESCE(price_cycle,'month') price_cycle FROM nodes WHERE enabled=1 ORDER BY monthly_price DESC, expires_at ASC, id DESC LIMIT ?", + (limit,), + ).fetchall() + conn.close() + return [Node(**dict(r)) for r in rows] + + +async def notify_admins(app: Application, text: str): + for uid in ADMIN_IDS: + try: + await app.bot.send_message(chat_id=uid, text=text) + except Exception: + pass + + +async def monitor_once(app: Application): + nodes = all_nodes(enabled_only=True) + threshold = max(1, int(get_setting("fail_threshold"))) + expire_days = max(0, int(get_setting("expire_remind_days") or "0")) + now = int(time.time()) + for node in nodes: + state = get_node_state(node.id) + try: + code, out, err = await run_ssh(node, "echo ok", timeout=8) + if code == 0: + if state["is_online"] == 0 and get_setting("notify_recovery") == "1": + await notify_admins(app, f"✅ 节点恢复\n{node.name} ({node.host})\n分组: {node.group_name}") + set_node_state(node.id, 0, 1, "") + else: + fail_count = state["fail_count"] + 1 + is_online = 0 if fail_count >= threshold else 1 + if is_online == 0 and state["is_online"] == 1 and get_setting("notify_offline") == "1": + await notify_admins(app, f"🚨 节点掉线\n{node.name} ({node.host})\n分组: {node.group_name}\n错误: {(err or out or 'SSH failed')[:200]}") + set_node_state(node.id, fail_count, is_online, err or out or "SSH failed") + except Exception as e: + fail_count = state["fail_count"] + 1 + is_online = 0 if fail_count >= threshold else 1 + if is_online == 0 and state["is_online"] == 1 and get_setting("notify_offline") == "1": + await notify_admins(app, f"🚨 节点掉线\n{node.name} ({node.host})\n分组: {node.group_name}\n错误: {str(e)[:200]}") + set_node_state(node.id, fail_count, is_online, str(e)) + + if int(node.expires_at or 0) > 0 and expire_days > 0: + days_left = math.floor((int(node.expires_at) - now) / 86400) + exp_state = get_expiry_state(node.id) + remind_points = sorted({expire_days, 7, 3, 0}, reverse=True) + should_notify = False + title = None + if days_left < 0: + last_at = int(exp_state.get("last_notified_at") or 0) + if last_at <= 0 or (now - last_at) >= 86400: + should_notify = True + title = "⛔ 节点已过期" + else: + for point in remind_points: + if point < 0: + continue + if days_left <= point and int(exp_state.get("last_days_left", 999999)) > point: + should_notify = True + title = "⏰ 节点即将到期" if days_left > 0 else "⚠️ 节点今天到期" + break + if should_notify: + await notify_admins(app, f"{title}\n{node.name} ({node.host})\n分组: {node.group_name}\n到期: {format_expiry(node.expires_at)}\n剩余: {days_left_text(node.expires_at)}\n账单: {format_price(node)}") + set_expiry_state(node.id, days_left) + + +async def monitor_loop(app: Application): + await asyncio.sleep(8) + while True: + try: + await monitor_once(app) + except Exception: + pass + await asyncio.sleep(max(15, int(get_setting("check_interval")))) + + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + total = len(all_nodes()) + groups = len(list_groups()) + text = ( + f"{BOT_NAME}\n" + f"多 VPS 中控台\n\n" + f"节点数:{total}\n" + f"分组数:{groups}\n" + f"告警:{'开' if get_setting('notify_offline') == '1' else '关'}" + ) + if update.message: + await update.message.reply_text(text, parse_mode=ParseMode.HTML, reply_markup=ReplyKeyboardRemove()) + await update.message.reply_text("主菜单", reply_markup=main_menu()) + else: + await update.callback_query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=main_menu()) + + +async def show_nodes(query, group_name: Optional[str] = None): + rows = nodes_by_group(group_name) if group_name else all_nodes() + title = f"分组:{group_name}" if group_name else "节点列表(按分组)" + if not rows: + await query.edit_message_text( + f"{title}\n\n还没有节点。", + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("➕ 添加节点", callback_data="nodes:add")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ]), + ) + return + buttons = [] + last_group = None + for r in rows[:50]: + if r.group_name != last_group: + buttons.append([InlineKeyboardButton(f"🗂 {r.group_name}", callback_data=f"batch:group:{r.group_name}")]) + last_group = r.group_name + buttons.append([InlineKeyboardButton(f" └ 🖥️ {r.name} ({r.host})", callback_data=f"node:view:{r.id}")]) + buttons.append([InlineKeyboardButton("🧰 批量操作", callback_data="batch:menu")]) + buttons.append([InlineKeyboardButton("🏠 首页", callback_data="home")]) + await query.edit_message_text(title, reply_markup=InlineKeyboardMarkup(buttons)) + + +async def show_groups(query): + groups = list_groups() + if not groups: + await query.edit_message_text("还没有任何分组。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 首页", callback_data="home")]])) + return + buttons = [] + for g, c in groups: + buttons.append([ + InlineKeyboardButton(f"🗂 {g} ({c})", callback_data=f"groups:view:{g}"), + InlineKeyboardButton("🧰 批量", callback_data=f"batch:group:{g}"), + InlineKeyboardButton("🗑 删除", callback_data=f"groups:delask:{g}"), + ]) + buttons.append([InlineKeyboardButton("🌐 全部节点批量", callback_data="batch:scope:all")]) + buttons.append([InlineKeyboardButton("🏠 首页", callback_data="home")]) + await query.edit_message_text("分组中心", reply_markup=InlineKeyboardMarkup(buttons)) + + +async def begin_add(query, context: ContextTypes.DEFAULT_TYPE): + context.user_data["add_node"] = {} + context.user_data["add_step"] = ADD_FLOW[0] + await query.edit_message_text( + "开始添加节点\n\n" + add_prompts()[ADD_FLOW[0]], + parse_mode=ParseMode.HTML, + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 首页", callback_data="home")]]), + ) + + +async def prompt_auth_choice(message, data): + summary = ( + f"名称:{esc(data['name'])}\n" + f"分组:{esc(data['group_name'])}\n" + f"地址:{esc(data['host'])}:{data['port']}\n" + f"用户:{esc(data['user'])}\n\n" + f"现在选 SSH 认证方式:" + ) + await message.reply_text(summary, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("🔐 默认密码", callback_data="auth:default_password"), InlineKeyboardButton("🗝️ 默认Key", callback_data="auth:default_key")], + [InlineKeyboardButton("✍️ 自定义密码", callback_data="auth:custom_password"), InlineKeyboardButton("📁 自定义Key", callback_data="auth:custom_key")], + ])) + + +async def save_node_from_context(update: Update, context: ContextTypes.DEFAULT_TYPE): + data = context.user_data.get("add_node", {}) + d = defaults_dict() + conn = db() + conn.execute( + "INSERT INTO nodes(name,group_name,host,port,user,password,auth_type,auth_value,enabled,created_at) VALUES(?,?,?,?,?,?,?,?,?,?)", + (data["name"], data["group_name"], data["host"], data["port"], data["user"], data.get("auth_value", ""), data["auth_type"], data["auth_value"], 1, int(time.time())), + ) + conn.commit() + node_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + conn.close() + context.user_data.pop("add_step", None) + context.user_data.pop("add_node", None) + auth_text = "默认密码" if data["auth_type"] == "password" and data["auth_value"] == d["password"] else ("默认Key" if data["auth_type"] == "key" and data["auth_value"] == d["key_path"] else ("自定义密码" if data["auth_type"] == "password" else "自定义Key")) + await update.effective_chat.send_message( + f"✅ 节点已添加\n\n名称:{esc(data['name'])}\n分组:{esc(data['group_name'])}\n地址:{esc(data['host'])}:{data['port']}\n用户:{esc(data['user'])}\n认证:{esc(auth_text)}", + parse_mode=ParseMode.HTML, + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("🔌 测试连接", callback_data=f"node:test:{node_id}"), InlineKeyboardButton("📄 查看节点", callback_data=f"node:view:{node_id}")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ]), + ) + + +async def show_node(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + state = get_node_state(node.id) + status = "在线" if state["is_online"] else f"离线({state['fail_count']})" + price = format_price(node) + expiry = format_expiry(node.expires_at) + expiry_left = days_left_text(node.expires_at) if int(node.expires_at or 0) > 0 else "未设置" + text = ( + f"{esc(node.name)}\n" + f"分组:{esc(node.group_name)}\n" + f"地址:{esc(node.host)}:{node.port}\n" + f"用户:{esc(node.user)}\n" + f"认证:{esc(auth_label(node))}\n" + f"状态:{esc(status)}\n" + f"账单:{esc(price)}\n" + f"到期:{esc(expiry)}\n" + f"剩余:{esc(expiry_left)}" + ) + if node.remark: + text += f"\n备注:{esc(node.remark)}" + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("🔌 测试", callback_data=f"node:test:{node.id}"), InlineKeyboardButton("📊 概览", callback_data=f"node:summary:{node.id}")], + [InlineKeyboardButton("🐳 Docker", callback_data=f"node:docker:{node.id}"), InlineKeyboardButton("💰 账单", callback_data=f"node:billing:{node.id}")], + [InlineKeyboardButton("✏️ 编辑", callback_data=f"node:edit:{node.id}"), InlineKeyboardButton("🗂 改分组", callback_data=f"node:setgroup:{node.id}")], + [InlineKeyboardButton("🧰 批量同组", callback_data=f"batch:group:{node.group_name}"), InlineKeyboardButton("🗑 删除", callback_data=f"node:delask:{node.id}")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def node_test(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text(f"正在测试 {esc(node.name)}…", parse_mode=ParseMode.HTML) + try: + code, out, err = await run_ssh(node, "echo ok && hostname && whoami", timeout=10) + if code == 0: + lines = [x for x in out.splitlines() if x.strip()] + host = lines[1] if len(lines) > 1 else "-" + user = lines[2] if len(lines) > 2 else node.user + text = f"✅ 连接成功\n\n节点:{esc(node.name)}\n主机:{esc(host)}\n用户:{esc(user)}" + else: + text = f"❌ 连接失败\n\n{esc(err or out or '未知错误')}" + except Exception as e: + text = f"❌ 连接失败\n\n{esc(str(e))}" + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node_id}")], + ])) + + +async def node_summary(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text(f"正在拉取 {esc(node.name)} 概览…", parse_mode=ParseMode.HTML) + try: + info = await collect_node_summary(node) + text = ( + f"{esc(node.name)}\n\n" + f"主机名:{esc(info.get('HOST','-'))}\n" + f"运行时间:{esc(info.get('UP','-'))}\n" + f"负载:{esc(info.get('LOAD','-'))}\n" + f"内存:{esc(info.get('MEM','-'))}\n" + f"磁盘:{esc(info.get('DISK','-'))}" + ) + except Exception as e: + text = f"❌ 拉取失败\n\n{esc(str(e))}" + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node_id}")], + ])) + + +async def nodes_overview(query): + nodes = all_nodes() + if not nodes: + await query.edit_message_text("还没有节点。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("➕ 添加节点", callback_data="nodes:add")], [InlineKeyboardButton("🏠 首页", callback_data="home")]])) + return + await query.edit_message_text("正在拉取全部节点概览…") + results = await asyncio.gather(*[collect_node_summary(n) for n in nodes], return_exceptions=True) + lines = ["批量总览"] + ok = 0 + for node, result in zip(nodes, results): + if isinstance(result, Exception): + lines.append(f"\n🔴 {esc(node.name)} [{esc(node.group_name)}]\n{esc(str(result))[:120]}") + else: + ok += 1 + lines.append(f"\n🟢 {esc(node.name)} [{esc(node.group_name)}]\n负载 {esc(result.get('LOAD','-'))} | 内存 {esc(result.get('MEM','-'))}\n磁盘 {esc(result.get('DISK','-'))}") + lines.insert(1, f"在线/可连:{ok}/{len(nodes)}\n") + await query.edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("🔄 刷新", callback_data="nodes:overview")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_auth_settings(query): + d = defaults_dict() + text = ( + "默认认证设置\n\n" + f"默认用户:{esc(d['user'])}\n" + f"默认端口:{d['port']}\n" + f"默认密码:{'已设置' if d['password'] else '未设置'}\n" + f"默认Key:{esc(d['key_path'])}" + ) + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("改默认密码", callback_data="settings:set_default_password"), InlineKeyboardButton("改默认Key", callback_data="settings:set_default_key")], + [InlineKeyboardButton("改默认用户", callback_data="settings:set_default_user"), InlineKeyboardButton("改默认端口", callback_data="settings:set_default_port")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_alert_settings(query): + text = "告警设置\n\n" + alert_settings_text() + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("改巡检间隔", callback_data="settings:set_check_interval"), InlineKeyboardButton("改失败阈值", callback_data="settings:set_fail_threshold")], + [InlineKeyboardButton("CPU 阈值", callback_data="settings:set_cpu_alert_threshold"), InlineKeyboardButton("内存阈值", callback_data="settings:set_mem_alert_threshold")], + [InlineKeyboardButton("磁盘阈值", callback_data="settings:set_disk_alert_threshold"), InlineKeyboardButton("到期提醒天数", callback_data="settings:set_expire_remind_days")], + [InlineKeyboardButton("切换掉线通知", callback_data="settings:toggle_notify_offline"), InlineKeyboardButton("切换恢复通知", callback_data="settings:toggle_notify_recovery")], + [InlineKeyboardButton("立即巡检一次", callback_data="monitor:run")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_billing_summary(query): + nodes = all_nodes() + priced = [n for n in nodes if float(n.monthly_price or 0) > 0] + upcoming = expiring_nodes(30) + group_rows = monthly_cost_by_group() + top_nodes = [n for n in top_cost_nodes(8) if float(n.monthly_price or 0) > 0] + lines = ["账单总览"] + lines.append(f"节点:{len(nodes)} | 已填账单:{len(priced)}") + lines.append("总成本:多币种,按节点明细查看") + if group_rows: + lines.append("\n分组覆盖") + for g, c, s in group_rows[:8]: + lines.append(f"• {esc(g)}{c} 台") + if top_nodes: + lines.append("\n账单节点") + for n in top_nodes[:5]: + lines.append(f"• {esc(n.name)} [{esc(n.group_name)}]:{esc(format_price(n))}") + if upcoming: + lines.append("\n30 天内到期") + for n in upcoming[:5]: + lines.append(f"• {esc(n.name)}{esc(format_expiry(n.expires_at))}({esc(days_left_text(n.expires_at))})") + else: + lines.append("\n30 天内没有到期节点。") + await query.edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⏰ 即将到期", callback_data="billing:expiring"), InlineKeyboardButton("🔄 刷新", callback_data="billing:summary")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_expiring_nodes(query, limit_days: int = 30): + rows = expiring_nodes(limit_days) + if not rows: + await query.edit_message_text(f"未来 {limit_days} 天没有到期节点。", reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("💰 返回账单", callback_data="billing:summary")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + return + lines = [f"{limit_days} 天内到期"] + for n in rows[:30]: + price = f" | {format_price(n)}" if float(n.monthly_price or 0) > 0 else "" + remark = f"\n{esc(n.remark)}" if n.remark else "" + lines.append(f"\n• {esc(n.name)} [{esc(n.group_name)}]\n{esc(format_expiry(n.expires_at))}({esc(days_left_text(n.expires_at))}){price}{remark}") + await query.edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("7 天内", callback_data="billing:expiring:7"), InlineKeyboardButton("30 天内", callback_data="billing:expiring:30")], + [InlineKeyboardButton("💰 返回账单", callback_data="billing:summary")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_batch_menu(query): + buttons = [ + [InlineKeyboardButton("🖥 预置只读命令", callback_data="batch:readonly")], + [InlineKeyboardButton("⌨️ 自定义只读命令", callback_data="batchcustom:menu")], + [InlineKeyboardButton("⚙️ 批量服务管理", callback_data="batchsvc:menu")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ] + await query.edit_message_text("选择批量操作类型:", reply_markup=InlineKeyboardMarkup(buttons)) + + +async def show_batch_actions(query, scope: str): + buttons = [[InlineKeyboardButton(title, callback_data=f"batch:run:{scope}:{action}")] for action, (title, _) in READ_ONLY_BATCH.items()] + buttons.append([InlineKeyboardButton("⬅️ 返回", callback_data="batch:menu")]) + await query.edit_message_text(f"范围:{esc(scope)}\n选择一个只读批量动作:", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup(buttons)) + + +async def run_batch_action(query, scope: str, action: str): + nodes = all_nodes() if scope == "all" else nodes_by_group(scope) + if not nodes: + await query.edit_message_text("这个范围下没有节点。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回", callback_data="batch:menu")]])) + return + title, command = READ_ONLY_BATCH[action] + await query.edit_message_text(f"正在执行 {title}…") + results = await asyncio.gather(*[run_ssh(n, command, timeout=15) for n in nodes], return_exceptions=True) + lines = [f"{esc(title)} | 范围 {esc(scope)}"] + for node, result in zip(nodes, results): + if isinstance(result, Exception): + lines.append(f"\n🔴 {esc(node.name)}\n{esc(str(result))[:250]}") + else: + code, out, err = result + body = (out or err or "(空输出)")[:350] + status = "🟢" if code == 0 else "🔴" + lines.append(f"\n{status} {esc(node.name)}\n
{esc(body)}
") + await query.edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⬅️ 返回批量菜单", callback_data="batch:menu")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_batch_custom_scope(query): + groups = list_groups() + buttons = [[InlineKeyboardButton("全部节点", callback_data="batchcustom:scope:all")]] + for g, c in groups[:20]: + buttons.append([InlineKeyboardButton(f"分组 {g} ({c})", callback_data=f"batchcustom:scope:{g}")]) + buttons.append([InlineKeyboardButton("⬅️ 返回批量菜单", callback_data="batch:menu")]) + await query.edit_message_text("选择自定义只读命令范围:", reply_markup=InlineKeyboardMarkup(buttons)) + + +async def show_batch_service_scope(query): + groups = list_groups() + buttons = [[InlineKeyboardButton("全部节点", callback_data="batchsvc:scope:all")]] + for g, c in groups[:20]: + buttons.append([InlineKeyboardButton(f"分组 {g} ({c})", callback_data=f"batchsvc:scope:{g}")]) + buttons.append([InlineKeyboardButton("⬅️ 返回批量菜单", callback_data="batch:menu")]) + await query.edit_message_text("选择批量服务管理范围:", reply_markup=InlineKeyboardMarkup(buttons)) + + +async def show_batch_service_actions(query, scope: str): + buttons = [ + [InlineKeyboardButton("查看状态", callback_data=f"batchsvc:ask:{scope}:status"), InlineKeyboardButton("重启服务", callback_data=f"batchsvc:ask:{scope}:restart")], + [InlineKeyboardButton("启动服务", callback_data=f"batchsvc:ask:{scope}:start"), InlineKeyboardButton("停止服务", callback_data=f"batchsvc:ask:{scope}:stop")], + [InlineKeyboardButton("⬅️ 返回范围选择", callback_data="batchsvc:menu")], + ] + await query.edit_message_text(f"范围:{esc(scope)}\n选择服务动作:", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup(buttons)) + + +async def run_batch_service_action(query, scope: str, service_name: str, action: str): + nodes = all_nodes() if scope == "all" else nodes_by_group(scope) + if not nodes: + await query.edit_message_text("这个范围下没有节点。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回", callback_data="batchsvc:menu")]])) + return + service_name = service_name.strip() + action_label = SERVICE_ACTIONS.get(action, action) + if action == "status": + command = f"systemctl status --no-pager --full {service_name} 2>&1 || systemctl is-active {service_name} 2>&1 || true" + timeout = 20 + else: + command = f"systemctl {action} {service_name} 2>&1" + timeout = 25 + await query.edit_message_text(f"正在对 {esc(scope)} 执行 {esc(action_label)}{esc(service_name)}", parse_mode=ParseMode.HTML) + results = await asyncio.gather(*[run_ssh(n, command, timeout=timeout) for n in nodes], return_exceptions=True) + lines = [f"批量服务 {esc(action_label)} | 范围 {esc(scope)} | 服务 {esc(service_name)}"] + for node, result in zip(nodes, results): + if isinstance(result, Exception): + lines.append(f"\n🔴 {esc(node.name)}\n{esc(str(result))[:220]}") + else: + code, out, err = result + body = (out or err or "(空输出)")[:320] + status = "🟢" if code == 0 else "🔴" + lines.append(f"\n{status} {esc(node.name)}\n
{esc(body)}
") + await query.edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⬅️ 返回服务菜单", callback_data=f"batchsvc:scope:{scope}")], + [InlineKeyboardButton("⬅️ 返回批量菜单", callback_data="batch:menu")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + + +async def show_docker(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text(f"正在拉取 {esc(node.name)} 的 Docker 列表…", parse_mode=ParseMode.HTML) + try: + items = await collect_docker_list(node) + if items is None: + await query.edit_message_text("这台机器没装 Docker。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node.id}")]])) + return + if not items: + await query.edit_message_text("这台机器当前没有容器。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node.id}")]])) + return + lines = [f"{esc(node.name)} Docker"] + buttons = [] + for item in items[:12]: + lines.append(f"\n• {esc(item['name'])}\n{esc(item['status'])}\n{esc(item['image'])}") + buttons.append([InlineKeyboardButton(f"⚙️ {item['name']}", callback_data=f"docker:menu:{node.id}:{item['name']}")]) + buttons.append([InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node.id}")]) + await query.edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup(buttons)) + except Exception as e: + await query.edit_message_text(f"❌ 获取 Docker 列表失败\n\n{esc(str(e))}", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node.id}")]])) + + +async def show_docker_menu(query, node_id: int, container: str): + await query.edit_message_text(f"容器:{esc(container)}", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("▶️ 启动", callback_data=f"docker:act:{node_id}:{container}:start"), InlineKeyboardButton("⏹️ 停止", callback_data=f"docker:act:{node_id}:{container}:stop")], + [InlineKeyboardButton("🔄 重启", callback_data=f"docker:act:{node_id}:{container}:restart")], + [InlineKeyboardButton("📜 日志", callback_data=f"docker:logs:{node_id}:{container}")], + [InlineKeyboardButton("⬅️ 返回 Docker", callback_data=f"node:docker:{node_id}")], + ])) + + +async def do_docker_action(query, node_id: int, container: str, action: str): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text(f"正在执行 {action}…") + try: + code, out, err = await docker_action(node, container, action) + text = f"{'✅' if code == 0 else '❌'} {action}\n\n{esc(out or err or '(空输出)')[:3500]}" + except Exception as e: + text = f"❌ 执行失败\n\n{esc(str(e))}" + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回容器菜单", callback_data=f"docker:menu:{node_id}:{container}")]])) + + +async def show_docker_logs(query, node_id: int, container: str): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text("正在读取日志…") + try: + code, out, err = await docker_logs(node, container) + text = f"{esc(container)} 日志\n\n
{esc((out or err or '(空日志)')[:3500])}
" + except Exception as e: + text = f"❌ 日志获取失败\n\n{esc(str(e))}" + await query.edit_message_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回容器菜单", callback_data=f"docker:menu:{node_id}:{container}")]])) + + +async def show_billing_menu(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + price = format_price(node) + expiry = format_expiry(node.expires_at) + left = days_left_text(node.expires_at) if int(node.expires_at or 0) > 0 else "未设置" + remark = node.remark or "未设置" + await query.edit_message_text( + f"账单信息\n\n节点:{esc(node.name)}\n备注:{esc(remark)}\n账单:{esc(price)}\n到期:{esc(expiry)}\n剩余:{esc(left)}", + parse_mode=ParseMode.HTML, + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("改备注", callback_data=f"node:editfield:{node_id}:remark"), InlineKeyboardButton("改金额", callback_data=f"node:editfield:{node_id}:monthly_price")], + [InlineKeyboardButton("周期", callback_data=f"node:cycle:{node_id}"), InlineKeyboardButton("货币", callback_data=f"node:currency:{node_id}")], + [InlineKeyboardButton("改到期日", callback_data=f"node:editfield:{node_id}:expires_at")], + [InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node_id}")], + ]), + ) + + +async def show_cycle_menu(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text( + f"选择账单周期\n\n节点:{esc(node.name)}\n当前:{esc(cycle_label(node.price_cycle))}", + parse_mode=ParseMode.HTML, + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("月付", callback_data=f"node:setcycle:{node_id}:month"), InlineKeyboardButton("季付", callback_data=f"node:setcycle:{node_id}:quarter")], + [InlineKeyboardButton("年付", callback_data=f"node:setcycle:{node_id}:year")], + [InlineKeyboardButton("⬅️ 返回账单", callback_data=f"node:billing:{node_id}")], + ]), + ) + + +async def show_currency_menu(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + current = (node.price_currency or "U").upper() + await query.edit_message_text( + f"选择货币单位\n\n节点:{esc(node.name)}\n当前:{esc(current)}", + parse_mode=ParseMode.HTML, + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("U", callback_data=f"node:setcurrency:{node_id}:U"), InlineKeyboardButton("CNY", callback_data=f"node:setcurrency:{node_id}:CNY")], + [InlineKeyboardButton("USD", callback_data=f"node:setcurrency:{node_id}:USD"), InlineKeyboardButton("HKD", callback_data=f"node:setcurrency:{node_id}:HKD")], + [InlineKeyboardButton("TWD", callback_data=f"node:setcurrency:{node_id}:TWD")], + [InlineKeyboardButton("⬅️ 返回账单", callback_data=f"node:billing:{node_id}")], + ]), + ) + + +async def show_edit_menu(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text( + f"编辑节点:{esc(node.name)}", + parse_mode=ParseMode.HTML, + reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("改名称", callback_data=f"node:editfield:{node_id}:name"), InlineKeyboardButton("改地址", callback_data=f"node:editfield:{node_id}:host")], + [InlineKeyboardButton("改端口", callback_data=f"node:editfield:{node_id}:port"), InlineKeyboardButton("改用户", callback_data=f"node:editfield:{node_id}:user")], + [InlineKeyboardButton("改密码", callback_data=f"node:editauth:{node_id}:password"), InlineKeyboardButton("改Key", callback_data=f"node:editauth:{node_id}:key")], + [InlineKeyboardButton("💰 账单信息", callback_data=f"node:billing:{node_id}")], + [InlineKeyboardButton("⬅️ 返回节点", callback_data=f"node:view:{node_id}")], + ]), + ) + +async def ask_delete(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + await query.edit_message_text(f"确认删除 {esc(node.name)} 吗?", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⚠️ 确认删除", callback_data=f"node:del:{node.id}")], + [InlineKeyboardButton("取消", callback_data=f"node:view:{node.id}")], + ])) + + +async def delete_node(query, node_id: int): + node = load_node(node_id) + if not node: + await query.answer("节点不存在", show_alert=True) + return + conn = db() + conn.execute("DELETE FROM node_state WHERE node_id=?", (node_id,)) + conn.execute("DELETE FROM nodes WHERE id=?", (node_id,)) + conn.commit() + conn.close() + await query.edit_message_text(f"✅ 已删除 {esc(node.name)}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 首页", callback_data="home")]])) + + +async def on_text(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + step = context.user_data.get("add_step") + if not step: + return + value = (update.message.text or "").strip() + prompts = add_prompts() + defaults = defaults_dict() + data = context.user_data.get("add_node", {}) + + if step == "port": + value = value or str(defaults["port"]) + if not value.isdigit(): + await update.message.reply_text("端口必须是数字,重新发。") + return + data["port"] = int(value) + elif step == "user": + data["user"] = value or defaults["user"] + elif step == "group_name": + data["group_name"] = value or "default" + elif step == "custom_password": + if not value: + await update.message.reply_text("密码不能为空。") + return + data["auth_type"] = "password" + data["auth_value"] = value + context.user_data["add_node"] = data + await save_node_from_context(update, context) + return + elif step == "custom_key": + if not value: + await update.message.reply_text("Key 路径或私钥内容不能为空。") + return + data["auth_type"] = "key" + data["auth_value"] = maybe_store_private_key(value, "node") + context.user_data["add_node"] = data + await save_node_from_context(update, context) + return + elif step == "edit_group_name": + node_id = context.user_data.get("edit_node_id") + if not node_id: + await update.message.reply_text("没有找到要修改的节点。") + return + group_name = value or "default" + update_node_group(int(node_id), group_name) + context.user_data.pop("add_step", None) + context.user_data.pop("edit_node_id", None) + await update.message.reply_text(f"✅ 分组已改为:{group_name}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📄 返回节点", callback_data=f"node:view:{node_id}")]])) + return + elif step in {"edit_name", "edit_host", "edit_port", "edit_user", "edit_auth_password", "edit_auth_key", "edit_remark", "edit_monthly_price", "edit_expires_at"}: + node_id = context.user_data.get("edit_node_id") + if not node_id: + await update.message.reply_text("没有找到要修改的节点。") + return + if step == "edit_port": + if not value.isdigit(): + await update.message.reply_text("端口必须是数字。") + return + update_node_field(int(node_id), "port", int(value)) + elif step == "edit_name": + update_node_field(int(node_id), "name", value) + elif step == "edit_host": + update_node_field(int(node_id), "host", value) + elif step == "edit_user": + update_node_field(int(node_id), "user", value) + elif step == "edit_auth_password": + update_node_field(int(node_id), "auth_type", "password") + update_node_field(int(node_id), "auth_value", value) + elif step == "edit_auth_key": + update_node_field(int(node_id), "auth_type", "key") + update_node_field(int(node_id), "auth_value", maybe_store_private_key(value, "edit")) + elif step == "edit_remark": + update_node_field(int(node_id), "remark", "" if value == "-" else value) + elif step == "edit_monthly_price": + try: + price = float(value) + except Exception: + await update.message.reply_text("金额格式不对,重新发,例如 4.99") + return + update_node_field(int(node_id), "monthly_price", price) + elif step == "edit_expires_at": + if value == "0": + update_node_field(int(node_id), "expires_at", 0) + else: + try: + ts = int(datetime.strptime(value, "%Y-%m-%d").timestamp()) + except Exception: + await update.message.reply_text("日期格式不对,按 YYYY-MM-DD 发。") + return + update_node_field(int(node_id), "expires_at", ts) + context.user_data.pop("add_step", None) + context.user_data.pop("edit_node_id", None) + await update.message.reply_text("✅ 节点已更新。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📄 返回节点", callback_data=f"node:view:{node_id}")]])) + return + elif step == "batch_service_name": + scope = context.user_data.get("batch_service_scope") + action = context.user_data.get("batch_service_action") + if not scope or not action: + await update.message.reply_text("批量服务上下文丢了,请重新进入批量服务菜单。") + return + if not value: + await update.message.reply_text("服务名不能为空。") + return + context.user_data.pop("add_step", None) + context.user_data.pop("batch_service_scope", None) + context.user_data.pop("batch_service_action", None) + wait = await update.message.reply_text(f"收到,开始批量{SERVICE_ACTIONS.get(action, action)}:{value}") + class _Q: + async def edit_message_text(self, *args, **kwargs): + return await wait.reply_text(*args, **kwargs) + await run_batch_service_action(_Q(), scope, value, action) + return + elif step == "batch_readonly_command": + scope = context.user_data.get("batch_custom_scope") + if not scope: + await update.message.reply_text("批量命令上下文丢了,请重新进入自定义只读命令菜单。") + return + if not is_safe_readonly_command(value): + await update.message.reply_text("这条命令不在只读白名单里。\n可用示例:df -h /、free -m、docker ps、systemctl status nginx") + return + context.user_data.pop("add_step", None) + context.user_data.pop("batch_custom_scope", None) + wait = await update.message.reply_text(f"收到,开始批量执行只读命令:{value}") + class _Q: + async def edit_message_text(self, *args, **kwargs): + return await wait.reply_text(*args, **kwargs) + nodes = all_nodes() if scope == "all" else nodes_by_group(scope) + if not nodes: + await wait.reply_text("这个范围下没有节点。") + return + results = await asyncio.gather(*[run_ssh(n, value, timeout=20) for n in nodes], return_exceptions=True) + lines = [f"批量只读命令 | 范围 {esc(scope)}", f"{esc(value)}"] + for node, result in zip(nodes, results): + if isinstance(result, Exception): + lines.append(f"\n🔴 {esc(node.name)}\n{esc(str(result))[:220]}") + else: + code, out, err = result + body = (out or err or "(空输出)")[:320] + status = "🟢" if code == 0 else "🔴" + lines.append(f"\n{status} {esc(node.name)}\n
{esc(body)}
") + await _Q().edit_message_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⬅️ 返回自定义命令", callback_data="batchcustom:menu")], + [InlineKeyboardButton("🏠 首页", callback_data="home")], + ])) + return + elif step in {"set_default_password", "set_default_key", "set_default_user", "set_default_port", "set_check_interval", "set_fail_threshold", "set_cpu_alert_threshold", "set_mem_alert_threshold", "set_disk_alert_threshold", "set_expire_remind_days"}: + if step in {"set_default_port", "set_check_interval", "set_fail_threshold", "set_cpu_alert_threshold", "set_mem_alert_threshold", "set_disk_alert_threshold", "set_expire_remind_days"} and not value.isdigit(): + await update.message.reply_text("这里必须是数字,重新发。") + return + if step == "set_default_password": + set_setting("default_password", value) + elif step == "set_default_key": + set_setting("default_key_path", maybe_store_private_key(value, "default")) + elif step == "set_default_user": + set_setting("default_user", value or "root") + elif step == "set_default_port": + set_setting("default_port", value) + elif step == "set_check_interval": + set_setting("check_interval", value) + elif step == "set_fail_threshold": + set_setting("fail_threshold", value) + elif step == "set_cpu_alert_threshold": + set_setting("cpu_alert_threshold", value) + elif step == "set_mem_alert_threshold": + set_setting("mem_alert_threshold", value) + elif step == "set_disk_alert_threshold": + set_setting("disk_alert_threshold", value) + elif step == "set_expire_remind_days": + set_setting("expire_remind_days", value) + context.user_data.pop("add_step", None) + await update.message.reply_text("✅ 设置已更新。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 首页", callback_data="home")]])) + return + else: + if not value: + await update.message.reply_text("这一项不能为空。") + return + data[step] = value + + if step in ADD_FLOW: + idx = ADD_FLOW.index(step) + if idx == len(ADD_FLOW) - 1: + context.user_data["add_node"] = data + context.user_data["add_step"] = "auth_choice" + await prompt_auth_choice(update.message, data) + return + next_step = ADD_FLOW[idx + 1] + context.user_data["add_node"] = data + context.user_data["add_step"] = next_step + await update.message.reply_text(prompts[next_step], parse_mode=ParseMode.HTML) + + +async def on_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + q = update.callback_query + await q.answer() + data = q.data or "" + + if data == "home": + await start(update, context) + elif data == "nodes:list": + await show_nodes(q) + elif data == "nodes:overview": + await nodes_overview(q) + elif data == "nodes:add": + await begin_add(q, context) + elif data == "groups:list": + await show_groups(q) + elif data.startswith("groups:view:"): + await show_nodes(q, data.split(":", 2)[2]) + elif data == "settings:auth": + await show_auth_settings(q) + elif data == "settings:alerts": + await show_alert_settings(q) + elif data == "billing:summary": + await show_billing_summary(q) + elif data == "billing:expiring": + await show_expiring_nodes(q, 30) + elif data.startswith("billing:expiring:"): + await show_expiring_nodes(q, int(data.split(":")[-1])) + elif data.startswith("settings:set_"): + step = data.split(":", 1)[1] + context.user_data["add_step"] = step + await q.edit_message_text(add_prompts()[step], parse_mode=ParseMode.HTML) + elif data == "settings:toggle_notify_offline": + set_setting("notify_offline", "0" if get_setting("notify_offline") == "1" else "1") + await show_alert_settings(q) + elif data == "settings:toggle_notify_recovery": + set_setting("notify_recovery", "0" if get_setting("notify_recovery") == "1" else "1") + await show_alert_settings(q) + elif data == "monitor:run": + await q.edit_message_text("正在手动巡检一次…") + await monitor_once(context.application) + await q.message.reply_text("✅ 已手动巡检一次。", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔔 返回告警设置", callback_data="settings:alerts")]])) + elif data.startswith("auth:"): + current = context.user_data.get("add_node") + if not current: + await q.edit_message_text("当前没有进行中的添加流程。", reply_markup=main_menu()) + return + defaults = defaults_dict() + choice = data.split(":", 1)[1] + if choice == "default_password": + current["auth_type"] = "password" + current["auth_value"] = defaults["password"] + context.user_data["add_node"] = current + context.user_data["add_step"] = None + await save_node_from_context(update, context) + elif choice == "default_key": + current["auth_type"] = "key" + current["auth_value"] = defaults["key_path"] + context.user_data["add_node"] = current + context.user_data["add_step"] = None + await save_node_from_context(update, context) + elif choice == "custom_password": + context.user_data["add_step"] = "custom_password" + await q.edit_message_text(add_prompts()["custom_password"], parse_mode=ParseMode.HTML) + elif choice == "custom_key": + context.user_data["add_step"] = "custom_key" + await q.edit_message_text(add_prompts()["custom_key"], parse_mode=ParseMode.HTML) + elif data.startswith("node:setgroupto:"): + _, _, rest = data.split(":", 2) + node_id_str, group_name = rest.split(":", 1) + node_id = int(node_id_str) + update_node_group(node_id, group_name) + await q.edit_message_text(f"✅ 分组已改为:{esc(group_name)}", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📄 返回节点", callback_data=f"node:view:{node_id}")]])) + elif data.startswith("node:setgroupinput:"): + node_id = int(data.split(":")[-1]) + node = load_node(node_id) + if not node: + await q.answer("节点不存在", show_alert=True) + return + context.user_data["add_step"] = "edit_group_name" + context.user_data["edit_node_id"] = node_id + groups = ', '.join([g for g,_ in list_groups()][:8]) or 'default' + await q.edit_message_text(f"当前节点:{esc(node.name)}\n当前分组:{esc(node.group_name)}\n\n直接发新的分组名即可。\n现有分组:{esc(groups)}", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回分组选择", callback_data=f"node:setgroup:{node_id}")]])) + elif data.startswith("node:setgroup:"): + node_id = int(data.split(":")[-1]) + node = load_node(node_id) + if not node: + await q.answer("节点不存在", show_alert=True) + return + await q.edit_message_text( + f"当前节点:{esc(node.name)}\n当前分组:{esc(node.group_name)}\n\n选一个现有分组,或者新建分组。", + parse_mode=ParseMode.HTML, + reply_markup=choose_group_buttons(node_id), + ) + elif data.startswith("node:billing:"): + await show_billing_menu(q, int(data.split(":")[-1])) + elif data.startswith("node:cycle:"): + await show_cycle_menu(q, int(data.split(":")[-1])) + elif data.startswith("node:setcycle:"): + _, _, node_id, cycle = data.split(":", 3) + update_node_field(int(node_id), "price_cycle", cycle) + await show_billing_menu(q, int(node_id)) + elif data.startswith("node:currency:"): + await show_currency_menu(q, int(data.split(":")[-1])) + elif data.startswith("node:setcurrency:"): + _, _, node_id, currency = data.split(":", 3) + update_node_field(int(node_id), "price_currency", currency.upper()) + await show_billing_menu(q, int(node_id)) + elif data.startswith("node:edit:"): + await show_edit_menu(q, int(data.split(":")[-1])) + elif data.startswith("node:editfield:"): + _, _, node_id, field = data.split(":", 3) + context.user_data["add_step"] = f"edit_{field}" + context.user_data["edit_node_id"] = int(node_id) + await q.edit_message_text(add_prompts()[f"edit_{field}"], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回编辑", callback_data=f"node:edit:{node_id}")]])) + elif data.startswith("node:editauth:"): + _, _, node_id, kind = data.split(":", 3) + step = "edit_auth_password" if kind == "password" else "edit_auth_key" + context.user_data["add_step"] = step + context.user_data["edit_node_id"] = int(node_id) + await q.edit_message_text(add_prompts()[step], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回编辑", callback_data=f"node:edit:{node_id}")]])) + elif data.startswith("node:view:"): + await show_node(q, int(data.split(":")[-1])) + elif data.startswith("node:test:"): + await node_test(q, int(data.split(":")[-1])) + elif data.startswith("node:summary:"): + await node_summary(q, int(data.split(":")[-1])) + elif data.startswith("node:docker:"): + await show_docker(q, int(data.split(":")[-1])) + elif data.startswith("node:delask:"): + await ask_delete(q, int(data.split(":")[-1])) + elif data.startswith("node:del:"): + await delete_node(q, int(data.split(":")[-1])) + elif data == "batch:menu": + await show_batch_menu(q) + elif data == "batch:readonly": + await show_batch_actions(q, "all") + elif data == "batchcustom:menu": + await show_batch_custom_scope(q) + elif data.startswith("batchcustom:scope:"): + scope = data.split(":", 2)[2] + context.user_data["add_step"] = "batch_readonly_command" + context.user_data["batch_custom_scope"] = scope + await q.edit_message_text(f"范围:{esc(scope)}\n\n发我要批量执行的只读命令。\n示例:df -h /free -mdocker pssystemctl status nginx", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回范围选择", callback_data="batchcustom:menu")]])) + elif data == "batchsvc:menu": + await show_batch_service_scope(q) + elif data.startswith("batchsvc:scope:"): + await show_batch_service_actions(q, data.split(":", 2)[2]) + elif data.startswith("batchsvc:ask:"): + _, _, scope, action = data.split(":", 3) + context.user_data["add_step"] = "batch_service_name" + context.user_data["batch_service_scope"] = scope + context.user_data["batch_service_action"] = action + await q.edit_message_text(f"范围:{esc(scope)}\n动作:{esc(SERVICE_ACTIONS.get(action, action))}\n\n发我要操作的 systemd 服务名。", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ 返回服务菜单", callback_data=f"batchsvc:scope:{scope}")]])) + elif data.startswith("batch:scope:"): + await show_batch_actions(q, data.split(":", 2)[2]) + elif data.startswith("batch:run:"): + _, _, scope, action = data.split(":", 3) + await run_batch_action(q, scope, action) + elif data.startswith("batch:group:"): + await show_batch_actions(q, data.split(":", 2)[2]) + elif data.startswith("docker:menu:"): + _, _, node_id, container = data.split(":", 3) + await show_docker_menu(q, int(node_id), container) + elif data.startswith("docker:act:"): + _, _, node_id, container, action = data.split(":", 4) + await do_docker_action(q, int(node_id), container, action) + elif data.startswith("docker:logs:"): + _, _, node_id, container = data.split(":", 3) + await show_docker_logs(q, int(node_id), container) + + +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + context.user_data.pop("add_step", None) + context.user_data.pop("add_node", None) + await update.message.reply_text("已取消当前操作。", reply_markup=ReplyKeyboardRemove()) + await update.message.reply_text("主菜单", reply_markup=main_menu()) + + +async def cmd_nodes(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + rows = all_nodes() + if not rows: + await update.message.reply_text("还没有节点,先 /add。") + return + buttons = [[InlineKeyboardButton(f"🖥️ {r.name} [{r.group_name}]", callback_data=f"node:view:{r.id}")] for r in rows[:50]] + await update.message.reply_text("节点列表", reply_markup=InlineKeyboardMarkup(buttons)) + + +async def cmd_add(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + context.user_data["add_node"] = {} + context.user_data["add_step"] = ADD_FLOW[0] + await update.message.reply_text("开始添加节点\n\n" + add_prompts()[ADD_FLOW[0]], parse_mode=ParseMode.HTML) + + +async def cmd_overview(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + nodes = all_nodes() + if not nodes: + await update.message.reply_text("还没有节点,先 /add。") + return + wait = await update.message.reply_text("正在拉取全部节点概览…") + results = await asyncio.gather(*[collect_node_summary(n) for n in nodes], return_exceptions=True) + lines = ["批量总览"] + ok = 0 + for node, result in zip(nodes, results): + if isinstance(result, Exception): + lines.append(f"\n🔴 {esc(node.name)} [{esc(node.group_name)}]\n{esc(str(result))[:120]}") + else: + ok += 1 + lines.append(f"\n🟢 {esc(node.name)} [{esc(node.group_name)}]\n负载 {esc(result.get('LOAD','-'))} | 内存 {esc(result.get('MEM','-'))}\n磁盘 {esc(result.get('DISK','-'))}") + lines.insert(1, f"在线/可连:{ok}/{len(nodes)}\n") + await wait.reply_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML) + + +async def cmd_auth(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + d = defaults_dict() + text = ( + "默认认证设置\n\n" + f"默认用户:{esc(d['user'])}\n" + f"默认端口:{d['port']}\n" + f"默认密码:{'已设置' if d['password'] else '未设置'}\n" + f"默认Key:{esc(d['key_path'])}" + ) + await update.message.reply_text(text, parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("改默认密码", callback_data="settings:set_default_password"), InlineKeyboardButton("改默认Key", callback_data="settings:set_default_key")], + [InlineKeyboardButton("改默认用户", callback_data="settings:set_default_user"), InlineKeyboardButton("改默认端口", callback_data="settings:set_default_port")], + ])) + + +async def cmd_alerts(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + await update.message.reply_text("告警设置\n\n" + alert_settings_text(), parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("改巡检间隔", callback_data="settings:set_check_interval"), InlineKeyboardButton("改失败阈值", callback_data="settings:set_fail_threshold")], + [InlineKeyboardButton("CPU 阈值", callback_data="settings:set_cpu_alert_threshold"), InlineKeyboardButton("内存阈值", callback_data="settings:set_mem_alert_threshold")], + [InlineKeyboardButton("磁盘阈值", callback_data="settings:set_disk_alert_threshold"), InlineKeyboardButton("到期提醒天数", callback_data="settings:set_expire_remind_days")], + [InlineKeyboardButton("切换掉线通知", callback_data="settings:toggle_notify_offline"), InlineKeyboardButton("切换恢复通知", callback_data="settings:toggle_notify_recovery")], + [InlineKeyboardButton("立即巡检一次", callback_data="monitor:run")], + ])) + + +async def cmd_billing(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not await guard(update): + return + nodes = all_nodes() + priced = [n for n in nodes if float(n.monthly_price or 0) > 0] + group_rows = monthly_cost_by_group() + upcoming = expiring_nodes(30) + top_nodes = [n for n in top_cost_nodes(8) if float(n.monthly_price or 0) > 0] + lines = ["账单总览"] + lines.append(f"节点:{len(nodes)} | 已填账单:{len(priced)}") + lines.append("总成本:多币种,按节点明细查看") + if group_rows: + lines.append("\n分组覆盖") + for g, c, s in group_rows[:8]: + lines.append(f"• {esc(g)}{c} 台") + if top_nodes: + lines.append("\n账单节点") + for n in top_nodes[:5]: + lines.append(f"• {esc(n.name)} [{esc(n.group_name)}]:{esc(format_price(n))}") + if upcoming: + lines.append("\n30 天内到期") + for n in upcoming[:5]: + lines.append(f"• {esc(n.name)}{esc(format_expiry(n.expires_at))}({esc(days_left_text(n.expires_at))})") + else: + lines.append("\n30 天内没有到期节点。") + await update.message.reply_text("\n".join(lines)[:3900], parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([ + [InlineKeyboardButton("⏰ 即将到期", callback_data="billing:expiring"), InlineKeyboardButton("🔄 刷新", callback_data="billing:summary")], + ])) + + +async def setup_bot(app: Application): + await app.bot.set_my_commands([ + BotCommand("start", "打开主菜单"), + BotCommand("nodes", "节点列表"), + BotCommand("add", "添加节点"), + BotCommand("overview", "批量总览"), + BotCommand("auth", "默认认证设置"), + BotCommand("alerts", "告警设置"), + BotCommand("billing", "账单总览"), + BotCommand("cancel", "取消当前操作"), + ]) + try: + await app.bot.set_chat_menu_button(menu_button=MenuButtonCommands()) + except Exception: + pass + app.bot_data["monitor_task"] = asyncio.create_task(monitor_loop(app)) + + +if __name__ == "__main__": + init_db() + app = Application.builder().token(TOKEN).post_init(setup_bot).build() + app.add_handler(CommandHandler("start", start)) + app.add_handler(CommandHandler("cancel", cancel)) + app.add_handler(CommandHandler("nodes", cmd_nodes)) + app.add_handler(CommandHandler("add", cmd_add)) + app.add_handler(CommandHandler("overview", cmd_overview)) + app.add_handler(CommandHandler("auth", cmd_auth)) + app.add_handler(CommandHandler("alerts", cmd_alerts)) + app.add_handler(CommandHandler("billing", cmd_billing)) + app.add_handler(CallbackQueryHandler(on_callback)) + app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, on_text)) + print("bot starting...") + app.run_polling(allowed_updates=Update.ALL_TYPES) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0cc0a9d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +python-telegram-bot==22.3 +paramiko==3.5.1