"""Telegram bot that collects proxy subscription links and serves them.

The bot stores share links (ss/vmess/vless/trojan/hysteria2/hy2/tuic) in a
JSON file, lets group members list / fetch / delete them through inline
keyboards, periodically probes the nodes, and exposes the live nodes over a
small HTTP endpoint protected by a secret path component.

Three threads touch the data file: the bot's polling loop (main thread), the
HTTP server thread and the periodic clean-up thread.  All read-modify-write
sequences therefore go through ``_DATA_LOCK``.
"""
import asyncio
import base64
import hashlib
import hmac
import json
import logging
import os
import re
import threading
import time
import urllib.parse
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler

from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application,
    CommandHandler,
    CallbackQueryHandler,
    MessageHandler,
    filters,
    ContextTypes,
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)

TOKEN = os.environ.get('BOT_TOKEN', '')
ADMIN_ID = int(os.environ.get('ADMIN_ID', '0'))
DATA_FILE = os.environ.get('DATA_FILE', '/opt/sub-bot/data.json')
PROTOS = ['ss://', 'vmess://', 'vless://', 'trojan://', 'hysteria2://', 'hy2://', 'tuic://']
AUTO_DEL = 60
SUB_SECRET = os.environ.get('SUB_SECRET', 'change_me_to_random_string')
SUB_HOST = os.environ.get('SUB_HOST', '0.0.0.0:18888')
WAITING_ADD = set()  # user_ids waiting to add sub

# Placeholder secret shipped with the bot; main() warns when it is still in use.
_DEFAULT_SUB_SECRET = 'change_me_to_random_string'
# Protocols that run over QUIC/UDP: a TCP connect says nothing about them.
UDP_PROTOS = frozenset({'hysteria2', 'hy2', 'tuic'})
# Re-entrant so helpers can hold it across load_data() + save_data().
_DATA_LOCK = threading.RLock()
# Strong references to fire-and-forget tasks; the event loop only keeps weak
# ones, so an unreferenced task may be garbage-collected before it finishes.
_BG_TASKS = set()
# "@host:port" in a share link; the host may be a bracketed IPv6 literal.
_HOSTPORT_RE = re.compile(r'@(\[[^\]]+\]|[^:/?#]+):(\d+)')
# Characters that must be backslash-escaped in Telegram legacy Markdown.
_MD_SPECIAL_RE = re.compile(r'([_*`\[])')


# --- persistence ---

def load_data():
    """Return the stored state as a dict that always has 'subs' and 'groups'.

    A missing file yields an empty state.  A file that exists but is not
    valid JSON still raises, so corruption is not silently turned into an
    empty state that a later save would write back.
    """
    with _DATA_LOCK:
        if os.path.exists(DATA_FILE):
            with open(DATA_FILE, encoding='utf-8') as f:
                data = json.load(f)
        else:
            data = {}
    data.setdefault('subs', [])
    data.setdefault('groups', [])
    return data


def save_data(data):
    """Write *data* to DATA_FILE atomically (temp file + rename)."""
    folder = os.path.dirname(DATA_FILE)
    if folder:  # os.makedirs('') raises when DATA_FILE is a bare file name
        os.makedirs(folder, exist_ok=True)
    tmp_path = f'{DATA_FILE}.tmp'
    with _DATA_LOCK:
        # Explicit UTF-8: names contain non-ASCII text and ensure_ascii=False
        # would otherwise depend on the process locale.
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, DATA_FILE)


def _bind_group(cid):
    """Add chat id *cid* to the bound groups if it is not there yet."""
    with _DATA_LOCK:
        data = load_data()
        if cid not in data['groups']:
            data['groups'].append(cid)
            save_data(data)


def _update_alive(status):
    """Store probe results; *status* maps link -> bool.

    The state is re-read first so subscriptions added while the probes were
    running are not overwritten by a stale snapshot.
    """
    with _DATA_LOCK:
        data = load_data()
        for sub in data['subs']:
            if sub['link'] in status:
                sub['alive'] = status[sub['link']]
        save_data(data)


def _remove_links(links):
    """Drop every subscription whose link is in *links*; return fresh state."""
    with _DATA_LOCK:
        data = load_data()
        data['subs'] = [s for s in data['subs'] if s['link'] not in links]
        save_data(data)
        return data


# --- link parsing helpers ---

def _md_escape(text):
    """Escape *text* for Telegram legacy Markdown (``_``, ``*``, `````, ``[``)."""
    return _MD_SPECIAL_RE.sub(r'\\\1', str(text))


def _link_tag(link):
    """Short stable fingerprint of a link, used to validate delete buttons."""
    return hashlib.sha1(link.encode('utf-8')).hexdigest()[:10]


def _b64decode_text(raw):
    """Decode standard or URL-safe base64 with optional padding to text.

    Raises ValueError (binascii.Error / UnicodeDecodeError) on bad input.
    """
    raw = raw.strip().replace('-', '+').replace('_', '/')
    raw += '=' * (-len(raw) % 4)
    return base64.b64decode(raw).decode()


def _parse_ss(link):
    """Split an ``ss://`` link into (method, password, server, port, name).

    Handles both the legacy form ``ss://base64(method:pwd@host:port)#name``
    and SIP002 ``ss://base64(method:pwd)@host:port/?plugin#name`` (where the
    user-info may also be percent-encoded plain text).  *port* is an int.
    Raises ValueError when the link does not fit either form.
    """
    body, _, fragment = link[len('ss://'):].partition('#')
    name = urllib.parse.unquote(fragment)
    body = body.split('?', 1)[0]
    if '@' in body:
        userinfo, hostport = body.rsplit('@', 1)
        hostport = hostport.rstrip('/')
        userinfo = urllib.parse.unquote(userinfo)
        # base64 never contains ':', so its presence means plain user-info.
        cred = userinfo if ':' in userinfo else _b64decode_text(userinfo)
        method, pwd = cred.split(':', 1)
    else:
        decoded = _b64decode_text(body)
        method, rest = decoded.split(':', 1)
        pwd, hostport = rest.rsplit('@', 1)
    server, port = hostport.rsplit(':', 1)
    return method, pwd, server.strip('[]'), int(port), name


def _vmess_config(link):
    """Return the JSON object embedded in a ``vmess://base64`` link, or None."""
    payload = link[len('vmess://'):].split('#', 1)[0].split('?', 1)[0]
    try:
        cfg = json.loads(_b64decode_text(payload))
    except ValueError:
        return None
    return cfg if isinstance(cfg, dict) else None


def detect_type(link):
    """Return the protocol name of *link* (e.g. 'ss'), or 'unknown'."""
    for p in PROTOS:
        if link.startswith(p):
            return p.replace('://', '')
    return 'unknown'


def parse_surge_ss(text):
    """Parse Surge format: Name = ss, server, port, encrypt-method=x, password=y

    Returns ``(ss_link, name)`` or ``(None, None)`` when *text* is not a
    Surge Shadowsocks line or lacks the method / password.
    """
    m = re.match(r'^(.+?)\s*=\s*ss\s*,\s*([^,]+)\s*,\s*(\d+)\s*,\s*(.+)$', text.strip())
    if not m:
        return None, None
    name = m.group(1).strip()
    server = m.group(2).strip()
    port = m.group(3).strip()
    params = dict(re.findall(r'([\w-]+)\s*=\s*([^,]+)', m.group(4)))
    method = params.get('encrypt-method', '').strip()
    pwd = params.get('password', '').strip()
    if not method or not pwd:
        return None, None
    # build ss:// link
    raw = f'{method}:{pwd}@{server}:{port}'
    encoded = base64.b64encode(raw.encode()).decode().rstrip('=')
    ss_link = f'ss://{encoded}#{urllib.parse.quote(name)}'
    return ss_link, name


def extract_name(link):
    """Return a display name for *link*.

    Uses the URL fragment when present, the ``ps`` remark of a base64 vmess
    link otherwise, and falls back to the first 30 characters of the link.
    """
    if '#' in link:
        return urllib.parse.unquote(link.split('#')[-1])
    if link.startswith('vmess://'):
        cfg = _vmess_config(link)
        if cfg and cfg.get('ps'):
            return str(cfg['ps'])
    return link[:30]


# --- messaging helpers ---

async def auto_del(msg, delay=AUTO_DEL):
    """Delete *msg* after *delay* seconds; failures are ignored (best effort)."""
    await asyncio.sleep(delay)
    try:
        await msg.delete()
    except Exception as exc:  # already deleted, no rights, network error...
        log.debug('auto delete failed: %s', type(exc).__name__)


async def send_del(target, text, delay=AUTO_DEL, **kw):
    """Reply to *target* with *text* and schedule the reply for deletion.

    Extra keyword arguments are passed to ``reply_text``.  Returns the sent
    message.
    """
    msg = await target.reply_text(text, **kw)
    task = asyncio.ensure_future(auto_del(msg, delay))
    _BG_TASKS.add(task)
    task.add_done_callback(_BG_TASKS.discard)
    return msg


async def is_member(update, context):
    """Return True if the user may use the bot.

    The admin always may; when no group is bound everybody may; otherwise
    the user has to be a member of at least one bound group.
    """
    uid = update.effective_user.id
    if uid == ADMIN_ID:
        return True
    groups = load_data().get('groups', [])
    if not groups:
        return True
    for gid in groups:
        try:
            m = await context.bot.get_chat_member(gid, uid)
        except Exception as exc:  # bot removed from group, user unknown, ...
            log.debug('get_chat_member(%s) failed: %s', gid, type(exc).__name__)
            continue
        if m.status in ('member', 'administrator', 'creator'):
            return True
    return False


def add_sub(link, user):
    """Store *link* on behalf of *user*.

    Returns None for an unsupported link, ``'dup'`` when it is already
    stored, otherwise a confirmation message.
    """
    proto = detect_type(link)
    if proto == 'unknown':
        return None
    name = extract_name(link)
    with _DATA_LOCK:
        data = load_data()
        if any(s['link'] == link for s in data['subs']):
            return 'dup'
        data['subs'].append({
            'link': link,
            'type': proto,
            'name': name,
            'added_by': user.id,
            'added_name': user.first_name,
            'added_at': int(time.time()),
            'alive': True,
        })
        save_data(data)
    return f'✅ 已添加 [{proto}] {name}'


# --- /start, /vps: main menu with buttons ---

async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show the main menu (members only)."""
    message = update.effective_message  # update.message is None for edits
    if not await is_member(update, context):
        return await message.reply_text('⛔ 仅限群成员使用')
    buttons = [
        [InlineKeyboardButton("📋 订阅列表", callback_data='menu_list'),
         InlineKeyboardButton("📥 获取订阅", callback_data='menu_get')],
        [InlineKeyboardButton("➕ 添加订阅", callback_data='menu_add'),
         InlineKeyboardButton("🗑 删除订阅", callback_data='menu_del')],
        [InlineKeyboardButton("🔍 检测存活", callback_data='menu_check')],
    ]
    if update.effective_user.id == ADMIN_ID:
        buttons.append([InlineKeyboardButton("⚙️ 绑定当前群", callback_data='menu_setgroup')])
    await send_del(
        message,
        '🚀 *订阅管理 Bot*\n\n'
        '直接发订阅链接自动入库\n'
        '或点击下方按钮操作:',
        parse_mode='Markdown',
        reply_markup=InlineKeyboardMarkup(buttons),
    )


# --- menu callbacks ---

async def cb_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the ``menu_*`` buttons of the main menu."""
    q = update.callback_query
    await q.answer()
    action = q.data
    data = load_data()

    if action == 'menu_list':
        if not data['subs']:
            return await send_del(q.message, '📭 暂无订阅')
        # Names are user supplied: escape them so Markdown parsing cannot fail.
        lines = [
            f"`{i+1}` {'🟢' if s.get('alive',True) else '🔴'} [{s['type']}] {_md_escape(s['name'])}"
            for i, s in enumerate(data['subs'])
        ]
        await send_del(q.message, '📋 *订阅列表*\n\n' + '\n'.join(lines), parse_mode='Markdown')

    elif action == 'menu_get':
        alive = [s for s in data['subs'] if s.get('alive', True)]
        if not alive:
            return await send_del(q.message, '📭 暂无可用订阅')
        types = sorted(set(s['type'] for s in alive))
        buttons = [[InlineKeyboardButton(f"📦 全部 ({len(alive)})", callback_data='get_all_raw')]]
        for t in types:
            cnt = sum(1 for s in alive if s['type'] == t)
            buttons.append([
                InlineKeyboardButton(f"🔗 {t} ({cnt})", callback_data=f'get_{t}_raw'),
                InlineKeyboardButton(f"📄 {t} Base64", callback_data=f'get_{t}_b64'),
            ])
        buttons.append([InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data='get_all_clash')])
        await send_del(q.message, '选择格式:', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_del':
        if not data['subs']:
            return await send_del(q.message, '📭 暂无订阅')
        uid = q.from_user.id
        buttons = []
        for i, s in enumerate(data['subs']):
            if uid != ADMIN_ID and uid != s.get('added_by'):
                continue
            st = '🟢' if s.get('alive', True) else '🔴'
            # The tag lets cb_delsub detect that the list changed meanwhile.
            buttons.append([InlineKeyboardButton(
                f"{st} [{s['type']}] {s['name']}",
                callback_data=f"del_{i}_{_link_tag(s['link'])}")])
        if not buttons:
            return await send_del(q.message, '没有可删除的订阅')
        buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data='del_alldown')])
        await send_del(q.message, '选择要删除的:', reply_markup=InlineKeyboardMarkup(buttons))

    elif action == 'menu_add':
        WAITING_ADD.add(q.from_user.id)
        await send_del(q.message, '📎 请发送订阅链接(支持多条,空格或换行分隔)\n支持: ss/vmess/vless/trojan/hy2/tuic')

    elif action == 'menu_check':
        msg = await send_del(q.message, '🔍 检测中...', delay=120)
        subs = data['subs']
        states = await _check_all(subs)
        _update_alive({s['link']: ok for s, ok in zip(subs, states)})
        results = [
            f"{'🟢' if ok else '🔴'} [{s['type']}] {_md_escape(s['name'])}"
            for s, ok in zip(subs, states)
        ]
        try:
            await msg.delete()
        except Exception as exc:  # progress message may already be gone
            log.debug('could not delete progress message: %s', type(exc).__name__)
        await send_del(q.message, '📊 *检测结果*\n\n' + '\n'.join(results), parse_mode='Markdown')

    elif action == 'menu_setgroup':
        if q.from_user.id != ADMIN_ID:
            return await send_del(q.message, '⛔ 仅管理员可用')
        cid = q.message.chat.id
        _bind_group(cid)
        await send_del(q.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')


# --- get/pick/del callbacks ---

async def cb_getsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the ``get_*``, ``pick_*``, ``fmt_*``, ``send_*``, ``out_*`` buttons.

    Indices in callback data refer to positions in the list of alive
    subscriptions at the time the button was created.
    """
    q = update.callback_query
    await q.answer()
    data = load_data()
    alive = [s for s in data['subs'] if s.get('alive', True)]
    action = q.data
    parts = action.split('_')

    def of_proto(proto):
        return alive if proto == 'all' else [s for s in alive if s['type'] == proto]

    if action.startswith('pick_'):
        idx = int(parts[1])
        if 0 <= idx < len(alive):
            buttons = [
                [InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{idx}_raw')],
                [InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{idx}_clash')],
            ]
            await send_del(q.message, f'📍 {alive[idx]["name"]}\n选择输出格式:',
                           reply_markup=InlineKeyboardMarkup(buttons))
        return

    # Every remaining action has the shape "<verb>_<selector>_<format>".
    if len(parts) < 3:
        return

    if action.startswith('fmt_'):
        idx, fmt = int(parts[1]), parts[2]
        if 0 <= idx < len(alive):
            s = alive[idx]
            if fmt == 'raw':
                await send_del(q.message, s["link"])
            else:
                # single node clash meta - generate inline
                cm = gen_clash_meta([s])
                await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
        return

    if action.startswith('send_'):
        proto, fmt = parts[1], parts[2]
        subs = of_proto(proto)
        # ask format first
        if fmt == 'raw':
            buttons = [
                [InlineKeyboardButton("🔗 原始链接", callback_data=f'out_{proto}_raw')],
                [InlineKeyboardButton("⚡ Clash Meta", callback_data=f'out_{proto}_clash')],
                [InlineKeyboardButton("📄 Base64", callback_data=f'out_{proto}_b64')],
            ]
            return await send_del(q.message, f'📦 全部 ({len(subs)}) — 选择格式:',
                                  reply_markup=InlineKeyboardMarkup(buttons))
        # any other format falls through to the generic output below

    if action.startswith('out_'):
        proto, fmt = parts[1], parts[2]
        subs = of_proto(proto)
        if fmt == 'clash':
            url = f'http://{SUB_HOST}/{SUB_SECRET}/download?target=ClashMeta'
            if proto != 'all':
                url += f'&type={proto}'
            return await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
        if not subs:  # Telegram rejects empty message texts
            return await send_del(q.message, '📭 无匹配')
        links = '\n'.join(s['link'] for s in subs)
        if fmt == 'b64':
            return await send_del(
                q.message,
                f'```\n{base64.b64encode(links.encode()).decode()}\n```',
                parse_mode='Markdown')
        if len(links) > 4000:  # stay below Telegram's message size limit
            for s in subs:
                await send_del(q.message, s['link'])
        else:
            await send_del(q.message, links)
        return

    if action == 'get_all_clash':
        url = f'http://{SUB_HOST}/{SUB_SECRET}/download?target=ClashMeta'
        await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
        return

    proto, fmt = parts[1], parts[2]
    # Keep each node's position in `alive`; pick_/fmt_ buttons refer to it.
    indexed = [(gi, s) for gi, s in enumerate(alive) if proto == 'all' or s['type'] == proto]
    subs = [s for _, s in indexed]
    if not subs:
        return await send_del(q.message, '📭 无匹配')
    if fmt == 'raw' and len(subs) > 1:
        buttons = [
            [InlineKeyboardButton(
                f"{'🟢' if s.get('alive',True) else '🔴'} {s['name']}",
                callback_data=f'pick_{gi}')]
            for gi, s in indexed
        ]
        buttons.append([InlineKeyboardButton(f"📦 全部 ({len(subs)})", callback_data=f'send_{proto}_raw')])
        return await send_del(q.message, '选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
    if fmt == 'raw' and len(subs) == 1:
        gi = indexed[0][0]
        buttons = [
            [InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{gi}_raw')],
            [InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{gi}_clash')],
        ]
        return await send_del(q.message, f'📍 {subs[0]["name"]}\n选择输出格式:',
                              reply_markup=InlineKeyboardMarkup(buttons))
    links = '\n'.join(s['link'] for s in subs)
    if fmt == 'b64':
        await send_del(q.message, base64.b64encode(links.encode()).decode())
    else:
        await send_del(q.message, links)


def _delete_dead(uid):
    """Remove the dead subscriptions *uid* may delete; return their names."""
    with _DATA_LOCK:
        data = load_data()
        kept, names = [], []
        for s in data['subs']:
            removable = (not s.get('alive', True)
                         and (uid == ADMIN_ID or s.get('added_by') == uid))
            if removable:
                names.append(s['name'])
            else:
                kept.append(s)
        if names:
            data['subs'] = kept
            save_data(data)
        return names


def _delete_one(idx, tag, uid):
    """Remove one subscription on behalf of *uid*.

    *idx* is the position the button was created for and *tag* the link
    fingerprint stored with it (None for buttons without one).  When the
    entry at *idx* no longer matches *tag*, the list is searched for the
    tagged link instead, so a shifted list never deletes the wrong entry.
    Returns ``('ok', sub)``, ``('gone', None)`` or ``('denied', None)``.
    """
    with _DATA_LOCK:
        data = load_data()
        subs = data['subs']
        pos = idx if 0 <= idx < len(subs) else None
        if tag is not None and (pos is None or _link_tag(subs[pos]['link']) != tag):
            pos = next((i for i, s in enumerate(subs) if _link_tag(s['link']) == tag), None)
        if pos is None:
            return 'gone', None
        if uid != ADMIN_ID and uid != subs[pos].get('added_by'):
            return 'denied', None
        removed = subs.pop(pos)
        save_data(data)
        return 'ok', removed


async def cb_delsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the ``del_*`` buttons (single delete and "delete all dead")."""
    q = update.callback_query
    await q.answer()
    action = q.data
    uid = q.from_user.id

    if action == 'del_alldown':
        names = _delete_dead(uid)
        if not names:
            return await send_del(q.message, '没有不可用节点')
        return await send_del(
            q.message,
            f"🗑 已删除 {len(names)} 个:\n" + '\n'.join(f'• {n}' for n in names))

    parts = action.split('_')
    try:
        idx = int(parts[1])
    except (IndexError, ValueError):
        return await send_del(q.message, '❌ 已失效')
    tag = parts[2] if len(parts) > 2 else None
    status, removed = _delete_one(idx, tag, uid)
    if status == 'gone':
        return await send_del(q.message, '❌ 已失效')
    if status == 'denied':
        return await send_del(q.message, '⛔ 只能删除自己上传的')
    await send_del(q.message, f"🗑 已删除: {removed['name']}")


# --- auto detect links in messages ---

async def auto_detect(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store every subscription link found in a plain text message.

    Recognises whitespace-separated share links and, when none is present,
    Surge-style Shadowsocks lines.  Users who pressed "add" get an error
    reply when nothing is recognised; everybody else is ignored silently.
    """
    if not update.message or not update.message.text:
        return
    text = update.message.text.strip()
    uid = update.effective_user.id
    # check standard protocol links
    found = [w for w in text.split() if w.startswith(tuple(PROTOS))]
    # check Surge format (line-based)
    surge_links = []
    if not found:
        for line in text.split('\n'):
            ss_link, _name = parse_surge_ss(line)
            if ss_link:
                surge_links.append(ss_link)
    if not found and not surge_links:
        if uid in WAITING_ADD:
            WAITING_ADD.discard(uid)
            await send_del(update.message, '❌ 未识别到订阅链接')
        return
    if not await is_member(update, context):
        return
    WAITING_ADD.discard(uid)
    results = []
    for link in found + surge_links:
        r = add_sub(link, update.effective_user)
        if r == 'dup':
            results.append(f'⚠️ 已存在: {extract_name(link)}')
        elif r:
            results.append(r)
    if results:
        await send_del(update.message, '\n'.join(results))


# --- node health check ---

async def check_node(link, proto):
    """Return True if the node behind *link* looks reachable.

    TCP protocols are probed with a 5 s connect.  QUIC/UDP protocols
    (hysteria2, hy2, tuic) cannot be probed over TCP, so for them only name
    resolution is verified; treating a failed TCP connect as "dead" would
    get every such node purged by auto_cleanup().
    """
    server, port = parse_sp(link, proto)
    if not server:
        return False
    try:
        port = int(port)
        if proto in UDP_PROTOS:
            loop = asyncio.get_running_loop()
            await asyncio.wait_for(loop.getaddrinfo(server, port), timeout=5)
            return True
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(server, port), timeout=5)
    except Exception:  # refused, timeout, DNS failure, bad host/port, ...
        return False
    writer.close()
    try:
        await writer.wait_closed()
    except Exception:  # the connect succeeded; a noisy close does not matter
        pass
    return True


async def _check_all(subs, limit=20):
    """Probe *subs* concurrently (at most *limit* at once); return bools in order."""
    gate = asyncio.Semaphore(limit)

    async def probe(sub):
        async with gate:
            return await check_node(sub['link'], sub['type'])

    return await asyncio.gather(*(probe(s) for s in subs))


def parse_sp(link, proto):
    """Return ``(server, port)`` as strings for *link*, or ``(None, None)``."""
    try:
        if proto == 'ss':
            _, _, server, port, _ = _parse_ss(link)
            return server, str(port)
        if proto == 'vmess':
            # The common vmess form is base64(JSON) and has no "@host:port".
            cfg = _vmess_config(link)
            if cfg and cfg.get('add') and cfg.get('port'):
                return str(cfg['add']), str(cfg['port'])
        m = _HOSTPORT_RE.search(link)
        if m:
            return m.group(1).strip('[]'), m.group(2)
    except (TypeError, ValueError):
        pass
    return None, None


def parse_to_clash_proxy(s):
    """Convert a stored subscription to a Clash proxy dict.

    Only Shadowsocks entries are supported; anything else (or an
    unparsable link) yields None.
    """
    try:
        if s['type'] != 'ss':
            return None
        method, pwd, server, port, name = _parse_ss(s['link'])
    except (KeyError, TypeError, ValueError):
        return None
    return {'name': name or server, 'type': 'ss', 'server': server,
            'port': port, 'cipher': method, 'password': pwd}


def gen_clash_meta(subs):
    """Render *subs* as a Clash ``proxies:`` YAML document.

    Each proxy is emitted as a JSON object, which is valid YAML flow syntax.
    Entries parse_to_clash_proxy() cannot convert are skipped.
    """
    lines = ['proxies:']
    for s in subs:
        p = parse_to_clash_proxy(s)
        if p:
            lines.append(f'  - {json.dumps(p, ensure_ascii=False)}')
    return '\n'.join(lines)


# --- HTTP subscription endpoint ---

class SubHandler(BaseHTTPRequestHandler):
    """Serve alive subscriptions at ``/<SUB_SECRET>/download``.

    Query parameters: ``type`` filters by protocol; ``target`` selects the
    output (``ClashMeta``, the default, or anything else for raw links).
    """

    def do_GET(self):
        path, _, query = self.path.partition('?')
        expected = f'/{SUB_SECRET}/download'
        # Constant-time comparison: the path carries the access secret.
        if not hmac.compare_digest(path.encode('utf-8'), expected.encode('utf-8')):
            self.send_response(404)
            self.end_headers()
            return
        pairs = (item.split('=', 1) for item in query.split('&') if '=' in item)
        qs = {urllib.parse.unquote_plus(k): urllib.parse.unquote_plus(v) for k, v in pairs}
        alive = [s for s in load_data()['subs'] if s.get('alive', True)]
        ftype = qs.get('type', '')
        if ftype:
            alive = [s for s in alive if s['type'] == ftype]
        target = qs.get('target', 'ClashMeta')
        if target == 'ClashMeta':
            body = gen_clash_meta(alive)
        else:
            body = '\n'.join(s['link'] for s in alive)
        payload = body.encode()
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *a):
        # Silence per-request logging: request lines contain the secret.
        pass


def start_http():
    """Run the subscription HTTP server on port 18888 (blocks forever)."""
    srv = HTTPServer(('0.0.0.0', 18888), SubHandler)
    log.info('Sub HTTP on :18888')
    srv.serve_forever()


# --- legacy commands still work ---

async def cmd_setgroup(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """``/setgroup``: bind the current chat as a member group (admin only)."""
    message = update.effective_message  # update.message is None for edits
    if update.effective_user.id != ADMIN_ID:
        return await message.reply_text('⛔ 仅管理员可用')
    cid = update.effective_chat.id
    _bind_group(cid)
    await send_del(message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')


async def auto_cleanup(bot_token):
    """Every 6h: check nodes, auto-delete dead ones, notify group

    Runs in its own thread and event loop (see run_cleanup), hence the
    direct Bot API call through httpx instead of the Application's bot.
    A node is removed after a single failed probe.
    """
    import httpx
    await asyncio.sleep(60)
    while True:
        try:
            subs = load_data()['subs']
            states = await _check_all(subs)
            dead = [s for s, ok in zip(subs, states) if not ok]
            if dead:
                # Remove by link from freshly loaded state so subscriptions
                # added while probing are not lost.
                data = _remove_links({s['link'] for s in dead})
                names = '\n'.join(f"• [{s['type']}] {s['name']}" for s in dead)
                msg = f'🗑 自动清理 — 已删除 {len(dead)} 个不可用节点:\n\n{names}'
                async with httpx.AsyncClient() as c:
                    for gid in data.get('groups', []):
                        try:
                            await c.post(
                                f'https://api.telegram.org/bot{bot_token}/sendMessage',
                                json={'chat_id': gid, 'text': msg})
                        except Exception as exc:
                            # Log the type only: the message may embed the
                            # request URL, which contains the bot token.
                            log.warning('cleanup notice to %s failed: %s',
                                        gid, type(exc).__name__)
                log.info('Auto cleanup: removed %d dead nodes', len(dead))
        except Exception:
            # Keep the loop alive; one bad round must not end clean-up forever.
            log.exception('Auto cleanup round failed')
        await asyncio.sleep(21600)


def run_cleanup():
    """Thread entry point: run auto_cleanup() in a private event loop."""
    asyncio.run(auto_cleanup(TOKEN))


def main():
    """Register handlers, start the helper threads and poll Telegram."""
    if SUB_SECRET == _DEFAULT_SUB_SECRET:
        log.warning('SUB_SECRET is the built-in default; anyone who knows it '
                    'can download all subscriptions. Set SUB_SECRET.')
    app = Application.builder().token(TOKEN).build()
    app.add_handler(CommandHandler('start', cmd_help))
    app.add_handler(CommandHandler('vps', cmd_help))
    app.add_handler(CommandHandler('setgroup', cmd_setgroup))
    app.add_handler(CallbackQueryHandler(cb_menu, pattern='^menu_'))
    app.add_handler(CallbackQueryHandler(cb_delsub, pattern='^del_'))
    app.add_handler(CallbackQueryHandler(cb_getsub, pattern='^(get_|pick_|send_|fmt_|out_)'))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, auto_detect))
    log.info('Sub Bot starting polling...')
    threading.Thread(target=start_http, daemon=True).start()
    threading.Thread(target=run_cleanup, daemon=True).start()
    app.run_polling(drop_pending_updates=True)


if __name__ == '__main__':
    main()