Files
sub-bot/bot.py
2026-02-23 00:04:38 +08:00

457 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os, json, time, base64, asyncio, re, urllib.request, logging, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)
TOKEN = os.environ.get('BOT_TOKEN', '')
ADMIN_ID = int(os.environ.get('ADMIN_ID', '0'))
DATA_FILE = os.environ.get('DATA_FILE', '/opt/sub-bot/data.json')
PROTOS = ['ss://', 'vmess://', 'vless://', 'trojan://', 'hysteria2://', 'hy2://', 'tuic://']
AUTO_DEL = 60
SUB_SECRET = os.environ.get('SUB_SECRET', 'change_me_to_random_string')
SUB_HOST = os.environ.get('SUB_HOST', '0.0.0.0:18888')
WAITING_ADD = set() # user_ids waiting to add sub
def load_data():
if os.path.exists(DATA_FILE):
with open(DATA_FILE) as f: return json.load(f)
return {'subs': [], 'groups': []}
def save_data(data):
os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
with open(DATA_FILE, 'w') as f: json.dump(data, f, ensure_ascii=False, indent=2)
def detect_type(link):
for p in PROTOS:
if link.startswith(p): return p.replace('://', '')
return 'unknown'
def parse_surge_ss(text):
"""Parse Surge format: Name = ss, server, port, encrypt-method=x, password=y"""
m = re.match(r'^(.+?)\s*=\s*ss\s*,\s*([^,]+)\s*,\s*(\d+)\s*,\s*(.+)$', text.strip())
if not m: return None, None
name = m.group(1).strip()
server = m.group(2).strip()
port = m.group(3).strip()
params = dict(re.findall(r'([\w-]+)\s*=\s*([^,]+)', m.group(4)))
method = params.get('encrypt-method', '').strip()
pwd = params.get('password', '').strip()
if not method or not pwd: return None, None
# build ss:// link
raw = f'{method}:{pwd}@{server}:{port}'
encoded = base64.b64encode(raw.encode()).decode().rstrip('=')
ss_link = f'ss://{encoded}#{urllib.request.quote(name)}'
return ss_link, name
def extract_name(link):
if '#' in link: return urllib.request.unquote(link.split('#')[-1])
return link[:30]
async def auto_del(msg, delay=AUTO_DEL):
await asyncio.sleep(delay)
try: await msg.delete()
except: pass
async def send_del(target, text, delay=AUTO_DEL, **kw):
msg = await target.reply_text(text, **kw)
asyncio.ensure_future(auto_del(msg, delay))
return msg
async def is_member(update, context):
uid = update.effective_user.id
if uid == ADMIN_ID: return True
data = load_data()
groups = data.get('groups', [])
if not groups: return True
for gid in groups:
try:
m = await context.bot.get_chat_member(gid, uid)
if m.status in ('member', 'administrator', 'creator'): return True
except: pass
return False
def add_sub(link, user):
proto = detect_type(link)
if proto == 'unknown': return None
data = load_data()
for s in data['subs']:
if s['link'] == link: return 'dup'
name = extract_name(link)
data['subs'].append({'link': link, 'type': proto, 'name': name,
'added_by': user.id, 'added_name': user.first_name,
'added_at': int(time.time()), 'alive': True})
save_data(data)
return f'✅ 已添加 [{proto}] {name}'
# --- /help: main menu with buttons ---
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await is_member(update, context):
return await update.message.reply_text('⛔ 仅限群成员使用')
buttons = [
[InlineKeyboardButton("📋 订阅列表", callback_data='menu_list'),
InlineKeyboardButton("📥 获取订阅", callback_data='menu_get')],
[InlineKeyboardButton(" 添加订阅", callback_data='menu_add'),
InlineKeyboardButton("🗑 删除订阅", callback_data='menu_del')],
[InlineKeyboardButton("🔍 检测存活", callback_data='menu_check')],
]
if update.effective_user.id == ADMIN_ID:
buttons.append([InlineKeyboardButton("⚙️ 绑定当前群", callback_data='menu_setgroup')])
await send_del(update.message,
'🚀 *订阅管理 Bot*\n\n'
'直接发订阅链接自动入库\n'
'或点击下方按钮操作:',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
# --- menu callbacks ---
async def cb_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
action = q.data
data = load_data()
if action == 'menu_list':
if not data['subs']:
return await send_del(q.message, '📭 暂无订阅')
lines = [f"`{i+1}` {'🟢' if s.get('alive',True) else '🔴'} [{s['type']}] {s['name']}"
for i, s in enumerate(data['subs'])]
await send_del(q.message, '📋 *订阅列表*\n\n' + '\n'.join(lines), parse_mode='Markdown')
elif action == 'menu_get':
alive = [s for s in data['subs'] if s.get('alive', True)]
if not alive:
return await send_del(q.message, '📭 暂无可用订阅')
types = sorted(set(s['type'] for s in alive))
buttons = [[InlineKeyboardButton(f"📦 全部 ({len(alive)})", callback_data='get_all_raw')]]
for t in types:
cnt = sum(1 for s in alive if s['type'] == t)
buttons.append([
InlineKeyboardButton(f"🔗 {t} ({cnt})", callback_data=f'get_{t}_raw'),
InlineKeyboardButton(f"📄 {t} Base64", callback_data=f'get_{t}_b64')])
buttons.append([InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data='get_all_clash')])
await send_del(q.message, '选择格式:', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_del':
if not data['subs']:
return await send_del(q.message, '📭 暂无订阅')
uid = q.from_user.id
buttons = []
for i, s in enumerate(data['subs']):
if uid != ADMIN_ID and uid != s.get('added_by'): continue
st = '🟢' if s.get('alive', True) else '🔴'
buttons.append([InlineKeyboardButton(f"{st} [{s['type']}] {s['name']}", callback_data=f'del_{i}')])
if not buttons:
return await send_del(q.message, '没有可删除的订阅')
buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data='del_alldown')])
await send_del(q.message, '选择要删除的:', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_add':
WAITING_ADD.add(q.from_user.id)
await send_del(q.message, '📎 请发送订阅链接(支持多条,空格或换行分隔)\n支持: ss/vmess/vless/trojan/hy2/tuic')
elif action == 'menu_check':
msg = await send_del(q.message, '🔍 检测中...', delay=120)
results = []
for s in data['subs']:
ok = await check_node(s['link'], s['type'])
s['alive'] = ok
results.append(f"{'🟢' if ok else '🔴'} [{s['type']}] {s['name']}")
save_data(data)
try: await msg.delete()
except: pass
await send_del(q.message, '📊 *检测结果*\n\n' + '\n'.join(results), parse_mode='Markdown')
elif action == 'menu_setgroup':
if q.from_user.id != ADMIN_ID:
return await send_del(q.message, '⛔ 仅管理员可用')
cid = q.message.chat.id
if cid not in data.get('groups', []):
data.setdefault('groups', []).append(cid)
save_data(data)
await send_del(q.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
# --- get/pick/del callbacks ---
async def cb_getsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
action = q.data
if action.startswith('pick_'):
idx = int(action.split('_')[1])
if idx < len(alive):
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{idx}_raw')],
[InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{idx}_clash')],
]
await send_del(q.message, f'📍 {alive[idx]["name"]}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
return
if action.startswith('fmt_'):
parts = action.split('_')
idx, fmt = int(parts[1]), parts[2]
if idx < len(alive):
s = alive[idx]
if fmt == 'raw':
await send_del(q.message, s["link"])
else:
# single node clash meta - generate inline
cm = gen_clash_meta([s])
await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
return
if action.startswith('send_'):
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
# ask format first
if fmt == 'raw':
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'out_{proto}_raw')],
[InlineKeyboardButton("⚡ Clash Meta", callback_data=f'out_{proto}_clash')],
[InlineKeyboardButton("📄 Base64", callback_data=f'out_{proto}_b64')],
]
return await send_del(q.message, f'📦 全部 ({len(subs)}) — 选择格式:',
reply_markup=InlineKeyboardMarkup(buttons))
if action.startswith('out_'):
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
if fmt == 'clash':
url = f'http://{SUB_HOST}/{SUB_SECRET}/download?target=ClashMeta'
if proto != 'all': url += f'&type={proto}'
return await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
links = '\n'.join(s['link'] for s in subs)
if fmt == 'b64':
return await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```', parse_mode='Markdown')
if len(links) > 4000:
for s in subs:
await send_del(q.message, s['link'])
else:
await send_del(q.message, links)
return
if action == 'get_all_clash':
url = f'http://{SUB_HOST}/{SUB_SECRET}/download?target=ClashMeta'
await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
return
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
if not subs:
return await send_del(q.message, '📭 无匹配')
if fmt == 'raw' and len(subs) > 1:
buttons = []
for i, s in enumerate(subs):
gi = alive.index(s)
buttons.append([InlineKeyboardButton(
f"{'🟢' if s.get('alive',True) else '🔴'} {s['name']}", callback_data=f'pick_{gi}')])
buttons.append([InlineKeyboardButton(f"📦 全部 ({len(subs)})", callback_data=f'send_{proto}_raw')])
return await send_del(q.message, '选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
if fmt == 'raw' and len(subs) == 1:
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{alive.index(subs[0])}_raw')],
[InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{alive.index(subs[0])}_clash')],
]
return await send_del(q.message, f'📍 {subs[0]["name"]}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
links = '\n'.join(s['link'] for s in subs)
if fmt == 'b64':
await send_del(q.message, base64.b64encode(links.encode()).decode())
else:
await send_del(q.message, links)
async def cb_delsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
action = q.data
uid = q.from_user.id
if action == 'del_alldown':
dead = [s for s in data['subs'] if not s.get('alive', True)]
if uid != ADMIN_ID:
dead = [s for s in dead if s.get('added_by') == uid]
if not dead:
return await send_del(q.message, '没有不可用节点')
names = [s['name'] for s in dead]
for s in dead: data['subs'].remove(s)
save_data(data)
return await send_del(q.message, f"🗑 已删除 {len(names)} 个:\n" + '\n'.join(f'{n}' for n in names))
idx = int(action.split('_')[1])
if idx >= len(data['subs']):
return await send_del(q.message, '❌ 已失效')
s = data['subs'][idx]
if uid != ADMIN_ID and uid != s.get('added_by'):
return await send_del(q.message, '⛔ 只能删除自己上传的')
removed = data['subs'].pop(idx)
save_data(data)
await send_del(q.message, f"🗑 已删除: {removed['name']}")
# --- auto detect links in messages ---
async def auto_detect(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message or not update.message.text: return
text = update.message.text.strip()
uid = update.effective_user.id
# check standard protocol links
found = [w for w in text.split() if any(w.startswith(p) for p in PROTOS)]
# check Surge format (line-based)
surge_links = []
if not found:
for line in text.split('\n'):
ss_link, name = parse_surge_ss(line)
if ss_link: surge_links.append(ss_link)
if not found and not surge_links:
if uid in WAITING_ADD:
WAITING_ADD.discard(uid)
await send_del(update.message, '❌ 未识别到订阅链接')
return
if not await is_member(update, context): return
WAITING_ADD.discard(uid)
results = []
for link in found + surge_links:
r = add_sub(link, update.effective_user)
if r and r != 'dup': results.append(r)
elif r == 'dup': results.append(f'⚠️ 已存在: {extract_name(link)}')
if results:
await send_del(update.message, '\n'.join(results))
# --- node health check ---
async def check_node(link, proto):
try:
server, port = parse_sp(link, proto)
if not server: return False
_, w = await asyncio.wait_for(asyncio.open_connection(server, int(port)), timeout=5)
w.close(); await w.wait_closed()
return True
except: return False
def parse_sp(link, proto):
try:
if proto == 'ss':
raw = link[5:]
if '#' in raw: raw = raw.split('#')[0]
decoded = base64.b64decode(raw + '==').decode()
_, rest = decoded.split(':', 1)
_, sp = rest.rsplit('@', 1)
return sp.rsplit(':', 1)
m = re.search(r'@([^:/?#]+):(\d+)', link)
if m: return m.group(1), m.group(2)
except: pass
return None, None
def parse_to_clash_proxy(s):
try:
if s['type'] == 'ss':
raw = s['link'][5:]
name = ''
if '#' in raw: raw, name = raw.rsplit('#', 1); name = urllib.request.unquote(name)
decoded = base64.b64decode(raw + '==').decode()
method, rest = decoded.split(':', 1)
pwd, sp = rest.rsplit('@', 1)
server, port = sp.rsplit(':', 1)
return {'name': name or server, 'type': 'ss', 'server': server, 'port': int(port),
'cipher': method, 'password': pwd}
except: pass
return None
def gen_clash_meta(subs):
lines = ['proxies:']
for s in subs:
p = parse_to_clash_proxy(s)
if p: lines.append(f' - {json.dumps(p, ensure_ascii=False)}')
return '\n'.join(lines)
# --- HTTP subscription endpoint ---
class SubHandler(BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.split('?')[0]
if path != f'/{SUB_SECRET}/download':
self.send_response(404)
self.end_headers()
return
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
qs = {}
if '?' in self.path:
qs = dict(x.split('=',1) for x in self.path.split('?',1)[1].split('&') if '=' in x)
ftype = qs.get('type', '')
if ftype: alive = [s for s in alive if s['type'] == ftype]
target = qs.get('target', 'ClashMeta')
body = gen_clash_meta(alive) if target == 'ClashMeta' else '\n'.join(s['link'] for s in alive)
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(body.encode())
def log_message(self, *a): pass
def start_http():
srv = HTTPServer(('0.0.0.0', 18888), SubHandler)
log.info('Sub HTTP on :18888')
srv.serve_forever()
# --- legacy commands still work ---
async def cmd_setgroup(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_user.id != ADMIN_ID:
return await update.message.reply_text('⛔ 仅管理员可用')
cid = update.effective_chat.id
data = load_data()
if cid not in data.get('groups', []):
data.setdefault('groups', []).append(cid)
save_data(data)
await send_del(update.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
async def auto_cleanup(bot_token):
"""Every 6h: check nodes, auto-delete dead ones, notify group"""
import httpx
await asyncio.sleep(60)
while True:
data = load_data()
if data['subs']:
dead = []
for s in list(data['subs']):
ok = await check_node(s['link'], s['type'])
if not ok:
dead.append(s)
data['subs'].remove(s)
if dead:
save_data(data)
names = '\n'.join(f"• [{s['type']}] {s['name']}" for s in dead)
msg = f'🗑 自动清理 — 已删除 {len(dead)} 个不可用节点:\n\n{names}'
for gid in data.get('groups', []):
try:
async with httpx.AsyncClient() as c:
await c.post(f'https://api.telegram.org/bot{bot_token}/sendMessage',
json={'chat_id': gid, 'text': msg})
except: pass
log.info(f'Auto cleanup: removed {len(dead)} dead nodes')
await asyncio.sleep(21600)
def run_cleanup():
asyncio.run(auto_cleanup(TOKEN))
def main():
app = Application.builder().token(TOKEN).build()
app.add_handler(CommandHandler('start', cmd_help))
app.add_handler(CommandHandler('vps', cmd_help))
app.add_handler(CommandHandler('setgroup', cmd_setgroup))
app.add_handler(CallbackQueryHandler(cb_menu, pattern='^menu_'))
app.add_handler(CallbackQueryHandler(cb_delsub, pattern='^del_'))
app.add_handler(CallbackQueryHandler(cb_getsub, pattern='^(get_|pick_|send_|fmt_|out_)'))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, auto_detect))
log.info('Sub Bot starting polling...')
threading.Thread(target=start_http, daemon=True).start()
threading.Thread(target=run_cleanup, daemon=True).start()
app.run_polling(drop_pending_updates=True)
if __name__ == '__main__':
main()