#!/usr/bin/env python3
"""Telegram bot that tracks VPS expiry dates and sends renewal reminders.

Records are stored in ``data.json`` next to this file.  The bot offers an
inline-keyboard UI for listing, adding, editing, deleting, renewing and
pinging servers, plus a daily job that messages ``ADMIN_ID`` when a server
is a configured number of days away from expiry.
"""
import asyncio
import json
import logging
import os
import subprocess
from datetime import datetime

from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application,
    CommandHandler,
    CallbackQueryHandler,
    MessageHandler,
    ConversationHandler,
    filters,
    ContextTypes,
)
from telegram.helpers import escape_markdown

load_dotenv()

logger = logging.getLogger(__name__)

TOKEN = os.getenv("BOT_TOKEN")
# ``or "0"`` also covers ADMIN_ID being set to an empty string.
ADMIN = int(os.getenv("ADMIN_ID") or "0")
DATA_FILE = os.path.join(os.path.dirname(__file__), "data.json")

DATE_FMT = "%Y-%m-%d"
DEFAULT_REMIND_DAYS = [1, 3, 7]
# Callback suffix -> label shown when asking for a new value of that field.
FIELD_LABELS = {
    "name": "名称",
    "provider": "商家",
    "ip": "IP",
    "date": "日期",
    "price": "价格",
    "cycle": "周期",
}
# Billing-cycle alias -> number of months added on renewal.
CYCLE_MONTHS = {
    "月": 1, "monthly": 1, "month": 1, "1m": 1,
    "季": 3, "quarterly": 3, "quarter": 3, "3m": 3,
    "半年": 6, "semi": 6, "6m": 6,
    "年": 12, "yearly": 12, "annual": 12, "12m": 12,
}


def load_data():
    """Load the persisted state from ``DATA_FILE``.

    Returns:
        A dict that always contains the keys ``"vps"`` (list of records) and
        ``"remind_days"`` (list of ints).  A missing, unreadable or malformed
        file yields the defaults.
    """
    try:
        with open(DATA_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        data = {}
    if not isinstance(data, dict):
        data = {}
    data.setdefault("vps", [])
    data.setdefault("remind_days", list(DEFAULT_REMIND_DAYS))
    return data


def save_data(data):
    """Persist ``data`` to ``DATA_FILE`` as UTF-8 JSON.

    The file is written to a temporary sibling first and then moved into
    place, so an interrupted write cannot leave a truncated data file.
    """
    tmp_path = DATA_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, DATA_FILE)


def parse_date(text):
    """Expand a date shorthand to ``YYYY-MM-DD``.

    Accepted forms (``/`` and ``.`` are treated like ``-``):
    ``0315`` and ``3-15`` / ``03-15`` use the current year; ``20260315`` and
    ``2026-3-15`` are normalised.  Anything else is returned stripped but
    otherwise unchanged; callers should check it with ``is_valid_date``.
    """
    text = text.strip().replace("/", "-").replace(".", "-")
    year = datetime.now().year
    if text.isdigit():
        if len(text) == 4:
            return f"{year}-{text[:2]}-{text[2:]}"
        if len(text) == 8:
            return f"{text[:4]}-{text[4:6]}-{text[6:]}"
        return text
    parts = text.split("-")
    if all(p.isdigit() for p in parts):
        if len(parts) == 2:
            return f"{year}-{parts[0].zfill(2)}-{parts[1].zfill(2)}"
        if len(parts) == 3:
            return f"{parts[0]}-{parts[1].zfill(2)}-{parts[2].zfill(2)}"
    return text


def is_valid_date(text):
    """Return True if ``text`` is a real calendar date in ``YYYY-MM-DD`` form."""
    try:
        datetime.strptime(text, DATE_FMT)
    except (TypeError, ValueError):
        return False
    return True


def days_left(d):
    """Return the number of calendar days from today until date string ``d``.

    Only dates are compared (not the time of day), so the expiry day itself
    yields 0 and past dates yield negative numbers.

    Raises:
        ValueError: if ``d`` is not in ``YYYY-MM-DD`` format.
    """
    expiry = datetime.strptime(d, DATE_FMT).date()
    return (expiry - datetime.now().date()).days


def ping_host(ip):
    """Send one ICMP ping to ``ip`` and return True if the host answered.

    Any failure (timeout, missing ``ping`` binary, bad argument) is reported
    as False.
    """
    target = str(ip or "")
    # A leading "-" would be parsed by ping as an option, not as a host.
    if not target or target.startswith("-"):
        return False
    try:
        r = subprocess.run(
            ["ping", "-c", "1", "-W", "2", target],
            capture_output=True,
            timeout=5,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return False
    return r.returncode == 0


def calc_next_date(date_str, cycle):
    """Compute the next expiry date for a billing cycle.

    Args:
        date_str: current expiry date as ``YYYY-MM-DD``.
        cycle: cycle name such as ``月``, ``quarterly`` or ``12m``.

    Returns:
        The new date as ``YYYY-MM-DD``, or None when the cycle is not
        recognised or ``date_str`` cannot be parsed.
    """
    from dateutil.relativedelta import relativedelta

    months = CYCLE_MONTHS.get((cycle or "").lower().strip())
    if months is None:
        return None
    try:
        dt = datetime.strptime(date_str, DATE_FMT)
    except (TypeError, ValueError):
        return None
    return (dt + relativedelta(months=months)).strftime(DATE_FMT)


def md(value):
    """Escape ``value`` for use inside a legacy-Markdown Telegram message."""
    return escape_markdown(str(value), version=1)


def main_kb():
    """Build the main-menu inline keyboard."""
    return InlineKeyboardMarkup([
        [InlineKeyboardButton("📋 VPS列表", callback_data="list")],
        [
            InlineKeyboardButton("➕ 添加", callback_data="add"),
            InlineKeyboardButton("✏️ 编辑", callback_data="edit"),
            InlineKeyboardButton("🗑 删除", callback_data="del"),
        ],
        [
            InlineKeyboardButton("📡 Ping全部", callback_data="ping"),
            InlineKeyboardButton("🔔 测试", callback_data="test"),
            InlineKeyboardButton("⚙️ 设置", callback_data="settings"),
        ],
    ])


def back_kb():
    """Build a keyboard holding only the "back to main menu" button."""
    return InlineKeyboardMarkup([[InlineKeyboardButton("🔙 返回", callback_data="back")]])


def _settings_kb(days):
    """Build the reminder-days keyboard with the enabled days ticked."""
    def row(options):
        return [
            InlineKeyboardButton(
                f"{'✅' if i in days else '⬜'} {i}天",
                callback_data=f"toggle_{i}",
            )
            for i in options
        ]

    return InlineKeyboardMarkup([
        row([1, 3, 7]),
        row([14, 30]),
        [InlineKeyboardButton("🔙 返回", callback_data="back")],
    ])


def _callback_index(data):
    """Return the integer after the first ``_`` in callback data, or None."""
    try:
        return int(data.split("_", 1)[1])
    except (IndexError, ValueError):
        return None


def _get_vps(d, idx):
    """Return record ``idx`` from ``d["vps"]``, or None if it is out of range."""
    vps = d.get("vps", [])
    if isinstance(idx, int) and 0 <= idx < len(vps):
        return vps[idx]
    return None


async def _vps_missing(q):
    """Tell the user that the selected record no longer exists."""
    await q.edit_message_text("❌ VPS 不存在,请返回重试", reply_markup=back_kb())


async def start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle /start and /help by showing the main menu."""
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("🖥 *VPS 到期提醒*", parse_mode="Markdown", reply_markup=main_kb())


async def _show_list(q):
    """Render all records grouped by provider, soonest expiry first."""
    vps = load_data().get("vps", [])
    if not vps:
        await q.edit_message_text("📭 暂无VPS", reply_markup=back_kb())
        return

    groups = {}
    for i, v in enumerate(vps):
        groups.setdefault(v.get("provider", "未知"), []).append((i, v))

    parts = ["📋 *VPS 列表*\n"]
    renew_btns = []
    for provider, items in groups.items():
        parts.append(f"\n🏢 *{md(provider)}*\n")
        for i, v in sorted(items, key=lambda x: days_left(x[1]["date"])):
            dl = days_left(v["date"])
            if dl < 0:
                icon, dl_text = "⚫", f"已过期{-dl}天"
            elif dl <= 3:
                icon, dl_text = "🔴", f"{dl}天"
            elif dl <= 7:
                icon, dl_text = "🟡", f"{dl}天"
            else:
                icon, dl_text = "🟢", f"{dl}天"
            # Expired or about-to-expire servers get a one-tap renew button.
            if dl <= 3:
                renew_btns.append(
                    InlineKeyboardButton(f"✅ 续费 {v['name']}", callback_data=f"renew_{i}")
                )
            parts.append(f"{icon} {md(v['name'])} - {md(v['date'])} ({dl_text})\n")

    kb = [[b] for b in renew_btns]
    kb.append([InlineKeyboardButton("🔙 返回", callback_data="back")])
    await q.edit_message_text(
        "".join(parts), parse_mode="Markdown", reply_markup=InlineKeyboardMarkup(kb)
    )


async def _show_renew(q, idx):
    """Show the renewal options (automatic / manual) for record ``idx``."""
    v = _get_vps(load_data(), idx)
    if v is None:
        await _vps_missing(q)
        return
    cycle = v.get("cycle", "")
    next_date = calc_next_date(v["date"], cycle)

    kb = []
    if next_date:
        kb.append([InlineKeyboardButton(f"🔄 自动续期 → {next_date}", callback_data=f"renewauto_{idx}")])
    kb.append([InlineKeyboardButton("📝 手动输入日期", callback_data=f"renewmanual_{idx}")])
    kb.append([InlineKeyboardButton("🔙 返回", callback_data="list")])

    msg = f"✅ 续费 *{md(v['name'])}*\n\n当前到期: {md(v['date'])}\n周期: {md(cycle)}"
    if next_date:
        msg += f"\n自动续期: {next_date}"
    await q.edit_message_text(msg, parse_mode="Markdown", reply_markup=InlineKeyboardMarkup(kb))


async def _renew_auto(q, idx):
    """Advance record ``idx`` by one billing cycle and save it."""
    d = load_data()
    v = _get_vps(d, idx)
    if v is None:
        await _vps_missing(q)
        return
    old_date = v["date"]
    new_date = calc_next_date(old_date, v.get("cycle", ""))
    if not new_date:
        await q.edit_message_text("❌ 无法计算,请手动输入", reply_markup=back_kb())
        return
    v["date"] = new_date
    save_data(d)
    await q.edit_message_text(
        f"✅ *{md(v['name'])}* 已续费\n\n旧日期: {md(old_date)}\n新日期: {new_date}",
        parse_mode="Markdown",
        reply_markup=back_kb(),
    )


async def _renew_manual(q, ctx, idx):
    """Ask the user to type a new expiry date for record ``idx``."""
    v = _get_vps(load_data(), idx)
    if v is None:
        await _vps_missing(q)
        return
    # Drop any half-finished add/edit flow so it cannot capture the reply.
    ctx.user_data.clear()
    ctx.user_data["renew_idx"] = idx
    ctx.user_data["step"] = "renew_date"
    await q.edit_message_text(
        f"📝 续费 *{md(v['name'])}*\n\n请输入新的到期日期 (如 0401 或 2026-04-01):",
        parse_mode="Markdown",
    )


async def _show_settings(q, days):
    """Show the reminder-day settings screen."""
    await q.edit_message_text(f"⚙️ 提醒天数设置\n当前: {days}", reply_markup=_settings_kb(days))


async def _toggle_day(q, day):
    """Enable or disable reminders ``day`` days before expiry."""
    d = load_data()
    days = d.get("remind_days", list(DEFAULT_REMIND_DAYS))
    if day in days:
        days.remove(day)
    else:
        days.append(day)
    days.sort()
    d["remind_days"] = days
    save_data(d)
    await _show_settings(q, days)


async def _show_picker(q, prefix, icon, title):
    """Show one button per record; used by the edit and delete menus."""
    vps = load_data().get("vps", [])
    if not vps:
        await q.edit_message_text("📭 暂无VPS", reply_markup=back_kb())
        return
    kb = [
        [InlineKeyboardButton(f"{icon} {v['name']}", callback_data=f"{prefix}_{i}")]
        for i, v in enumerate(vps)
    ]
    kb.append([InlineKeyboardButton("🔙 返回", callback_data="back")])
    await q.edit_message_text(title, reply_markup=InlineKeyboardMarkup(kb))


async def _show_edit_menu(q, ctx, idx):
    """Show the current values of record ``idx`` and the editable fields."""
    v = _get_vps(load_data(), idx)
    if v is None:
        await _vps_missing(q)
        return
    ctx.user_data["edit_idx"] = idx
    kb = [
        [InlineKeyboardButton("📝 名称", callback_data="ed_name"),
         InlineKeyboardButton("🏢 商家", callback_data="ed_provider")],
        [InlineKeyboardButton("🌐 IP", callback_data="ed_ip"),
         InlineKeyboardButton("📅 日期", callback_data="ed_date")],
        [InlineKeyboardButton("💰 价格", callback_data="ed_price"),
         InlineKeyboardButton("🔄 周期", callback_data="ed_cycle")],
        [InlineKeyboardButton("🔙 返回", callback_data="edit")],
    ]
    msg = f"✏️ 编辑 *{md(v['name'])}*\n\n"
    msg += f"🏢 商家: {md(v.get('provider', ''))}\n"
    msg += f"🌐 IP: {md(v.get('ip') or '无')}\n"
    msg += f"📅 日期: {md(v.get('date', ''))}\n"
    msg += f"💰 价格: {md(v.get('price') or '无')}\n"
    msg += f"🔄 周期: {md(v.get('cycle', ''))}"
    await q.edit_message_text(msg, parse_mode="Markdown", reply_markup=InlineKeyboardMarkup(kb))


async def _delete_vps(q, idx):
    """Delete record ``idx`` and confirm."""
    d = load_data()
    if _get_vps(d, idx) is None:
        await _vps_missing(q)
        return
    removed = d["vps"].pop(idx)
    save_data(d)
    await q.edit_message_text(f"✅ 已删除: {removed['name']}", reply_markup=back_kb())


async def _send_test_reminder(q):
    """Send a sample reminder for the record that expires soonest."""
    vps = load_data().get("vps", [])
    if not vps:
        await q.edit_message_text("📭 暂无VPS可测试", reply_markup=back_kb())
        return
    v = min(vps, key=lambda x: days_left(x["date"]))
    dl = days_left(v["date"])
    msg = (
        f"🔔 *测试提醒*\n\n{md(v['name'])} ({md(v.get('provider', '未知'))})\n"
        f"📅 {md(v['date'])} (还剩 {dl} 天)"
    )
    await q.message.reply_text(msg, parse_mode="Markdown")
    await q.edit_message_text("✅ 测试提醒已发送", reply_markup=back_kb())


async def _ping_all(q):
    """Ping every record that has an IP and report the results."""
    vps = [v for v in load_data().get("vps", []) if v.get("ip")]
    if not vps:
        await q.edit_message_text("📭 没有可ping的VPS", reply_markup=back_kb())
        return
    await q.edit_message_text("📡 正在检测...")
    # ping_host blocks on a subprocess; run the pings in the default executor
    # (concurrently) so the bot's event loop stays responsive.
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(
        *(loop.run_in_executor(None, ping_host, v["ip"]) for v in vps)
    )
    msg = "📡 *Ping 结果*\n\n"
    for v, ok in zip(vps, results):
        msg += f"{'🟢' if ok else '🔴'} {md(v['name'])} - `{v['ip']}`\n"
    await q.edit_message_text(msg, parse_mode="Markdown", reply_markup=back_kb())


async def button(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Dispatch an inline-keyboard callback to the matching action.

    Callback data is either a bare action name (``list``, ``add``, ...) or an
    action prefix followed by ``_`` and a record index / field name.
    """
    q = update.callback_query
    await q.answer()
    data = q.data or ""

    if data == "back":
        await q.edit_message_text("🖥 *VPS 到期提醒*", parse_mode="Markdown", reply_markup=main_kb())
    elif data == "list":
        await _show_list(q)
    elif data.startswith("renew_"):
        await _show_renew(q, _callback_index(data))
    elif data.startswith("renewauto_"):
        await _renew_auto(q, _callback_index(data))
    elif data.startswith("renewmanual_"):
        await _renew_manual(q, ctx, _callback_index(data))
    elif data == "settings":
        await _show_settings(q, load_data().get("remind_days", list(DEFAULT_REMIND_DAYS)))
    elif data.startswith("toggle_"):
        day = _callback_index(data)
        if day is not None:
            await _toggle_day(q, day)
    elif data == "edit":
        await _show_picker(q, "editvps", "✏️", "选择要编辑的VPS:")
    elif data.startswith("editvps_"):
        await _show_edit_menu(q, ctx, _callback_index(data))
    elif data.startswith("ed_"):
        field = data.split("_", 1)[1]
        if field not in FIELD_LABELS:
            return
        # A pending add/renew step must not swallow the upcoming reply.
        ctx.user_data.pop("step", None)
        ctx.user_data["edit_field"] = field
        await q.edit_message_text(f"请输入新的{FIELD_LABELS[field]}:")
    elif data == "del":
        await _show_picker(q, "delvps", "🗑", "选择要删除的VPS:")
    elif data.startswith("delvps_"):
        await _delete_vps(q, _callback_index(data))
    elif data == "test":
        await _send_test_reminder(q)
    elif data == "ping":
        await _ping_all(q)
    elif data == "add":
        ctx.user_data.clear()
        ctx.user_data["step"] = "name"
        await q.edit_message_text("📝 请输入 VPS 名称:")


async def handle_msg(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Consume a plain-text reply as input for the active flow.

    The active flow is tracked in ``ctx.user_data``: ``edit_field`` for
    editing one field, ``step == "renew_date"`` for a manual renewal, and
    ``step`` in name/provider/ip/cycle/date/price for the add wizard.
    Text received outside any flow is ignored.
    """
    message = update.message
    # Edited messages and channel posts arrive with update.message unset.
    if message is None or message.text is None:
        return
    step = ctx.user_data.get("step")
    text = message.text.strip()

    # Edit flow
    field = ctx.user_data.get("edit_field")
    if field:
        d = load_data()
        v = _get_vps(d, ctx.user_data.get("edit_idx", 0))
        if v is None:
            ctx.user_data.clear()
            await message.reply_text("❌ VPS 不存在,请返回重试", reply_markup=main_kb())
            return
        if field == "date":
            text = parse_date(text)
            if not is_valid_date(text):
                # Keep the flow open so the user can simply retry.
                await message.reply_text("❌ 日期格式无效,请重新输入 (如 0401 或 2026-04-01):")
                return
        v[field] = text
        save_data(d)
        ctx.user_data.clear()
        await message.reply_text("✅ 已更新", reply_markup=main_kb())
        return

    # Manual renewal flow
    if step == "renew_date":
        d = load_data()
        v = _get_vps(d, ctx.user_data.get("renew_idx", 0))
        if v is None:
            ctx.user_data.clear()
            await message.reply_text("❌ VPS 不存在,请返回重试", reply_markup=main_kb())
            return
        new_date = parse_date(text)
        if not is_valid_date(new_date):
            await message.reply_text("❌ 日期格式无效,请重新输入 (如 0401 或 2026-04-01):")
            return
        old_date = v["date"]
        v["date"] = new_date
        save_data(d)
        ctx.user_data.clear()
        await message.reply_text(
            f"✅ *{md(v['name'])}* 已续费\n旧日期: {md(old_date)}\n新日期: {new_date}",
            parse_mode="Markdown",
            reply_markup=main_kb(),
        )
        return

    # Add wizard
    if step == "name":
        ctx.user_data["name"] = text
        ctx.user_data["step"] = "provider"
        await message.reply_text("🏢 请输入商家名称:")
    elif step == "provider":
        ctx.user_data["provider"] = text
        ctx.user_data["step"] = "ip"
        await message.reply_text("🌐 请输入 IP (没有输入 0):")
    elif step == "ip":
        ctx.user_data["ip"] = None if text == "0" else text
        ctx.user_data["step"] = "cycle"
        await message.reply_text("🔄 付款周期 (月/季/年):")
    elif step == "cycle":
        ctx.user_data["cycle"] = text
        ctx.user_data["step"] = "date"
        await message.reply_text("📅 到期日期 (YYYY-MM-DD):")
    elif step == "date":
        parsed = parse_date(text)
        if not is_valid_date(parsed):
            await message.reply_text("❌ 日期格式无效,请重新输入 (YYYY-MM-DD):")
            return
        ctx.user_data["date"] = parsed
        ctx.user_data["step"] = "price"
        await message.reply_text("💰 价格 (如 /月,没有输入 0):")
    elif step == "price":
        # Read the name before the wizard state is cleared below.
        name = ctx.user_data.get("name", "VPS")
        d = load_data()
        d["vps"].append({
            "name": name,
            "provider": ctx.user_data.get("provider"),
            "ip": ctx.user_data.get("ip"),
            "cycle": ctx.user_data.get("cycle"),
            "date": ctx.user_data.get("date"),
            "price": None if text == "0" else text,
        })
        save_data(d)
        ctx.user_data.clear()
        await message.reply_text(f"✅ 已添加: {name}", reply_markup=main_kb())


async def check_expire(ctx: ContextTypes.DEFAULT_TYPE):
    """Daily job: message ADMIN about every record due in a reminder window."""
    if not ADMIN:
        logger.warning("ADMIN_ID is not set; skipping expiry reminders")
        return
    d = load_data()
    remind_days = d.get("remind_days", list(DEFAULT_REMIND_DAYS))
    for v in d.get("vps", []):
        try:
            dl = days_left(v["date"])
        except (KeyError, TypeError, ValueError):
            # One malformed record must not block reminders for the rest.
            logger.warning("Skipping record with invalid date: %r", v)
            continue
        if dl in remind_days:
            msg = (
                f"⏰ *到期提醒*\n\n{md(v.get('name', 'VPS'))} ({md(v.get('provider', '未知'))})\n"
                f"📅 {md(v['date'])} (还剩 {dl} 天)"
            )
            await ctx.bot.send_message(ADMIN, msg, parse_mode="Markdown")


def main():
    """Build the application, register handlers and start polling."""
    if not TOKEN:
        raise SystemExit("BOT_TOKEN is not set")
    app = Application.builder().token(TOKEN).build()
    # NOTE(review): the handlers below do not check the sender against ADMIN,
    # so any user who can reach the bot can view and modify the data —
    # consider restricting them (e.g. with filters.User(ADMIN)).
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("help", start))
    app.add_handler(CallbackQueryHandler(button))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_msg))
    if app.job_queue is not None:
        # NOTE(review): the time is naive; presumably interpreted in the
        # bot's default timezone (UTC unless Defaults.tzinfo is set) — confirm.
        app.job_queue.run_daily(check_expire, time=datetime.strptime("09:00", "%H:%M").time())
    else:
        logger.warning("JobQueue is unavailable; daily expiry reminders are disabled")
    print("Bot 启动")
    app.run_polling(drop_pending_updates=True)


if __name__ == "__main__":
    main()