init: Telegram sub bot

This commit is contained in:
mango
2026-02-23 00:04:38 +08:00
commit debb0040d0
2 changed files with 521 additions and 0 deletions

65
README.md Normal file
View File

@@ -0,0 +1,65 @@
# Sub Bot — Telegram 群订阅管理机器人
群成员共享的代理订阅管理 Bot支持上传、获取、检测、自动清理。
## 功能
- `/vps` 打开交互菜单(添加/列表/获取/删除/检测)
- 直接发订阅链接自动识别入库(支持多条)
- 支持 ss/vmess/vless/trojan/hy2/tuic 协议 + Surge 格式
- 获取订阅:原始链接 / Base64 / Clash Meta 订阅 URL
- 节点存活检测TCP 连通性)
- 每 6 小时自动清理不可用节点并通知群
- 所有 Bot 消息 60 秒后自动删除
- 内置 HTTP 订阅端点,客户端可直接导入
## 部署
```bash
pip install python-telegram-bot
```
配置环境变量:
```bash
export BOT_TOKEN="your_bot_token"
export ADMIN_ID="your_telegram_id"
export SUB_SECRET="random_secret_string"
export SUB_HOST="your_server_ip:18888"
```
运行:
```bash
python3 bot.py
```
## systemd
```ini
[Unit]
Description=Sub Bot
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/sub-bot
Environment=BOT_TOKEN=your_token
Environment=ADMIN_ID=your_id
Environment=SUB_SECRET=your_secret
Environment=SUB_HOST=your_ip:18888
ExecStart=/usr/bin/python3 /opt/sub-bot/bot.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
```
## 订阅端点
```
http://your_ip:18888/{SUB_SECRET}/download?target=ClashMeta
http://your_ip:18888/{SUB_SECRET}/download?target=raw
http://your_ip:18888/{SUB_SECRET}/download?target=ClashMeta&type=ss
```

456
bot.py Normal file
View File

@@ -0,0 +1,456 @@
import os, json, time, base64, asyncio, re, urllib.request, logging, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)
TOKEN = os.environ.get('BOT_TOKEN', '')
ADMIN_ID = int(os.environ.get('ADMIN_ID', '0'))
DATA_FILE = os.environ.get('DATA_FILE', '/opt/sub-bot/data.json')
PROTOS = ['ss://', 'vmess://', 'vless://', 'trojan://', 'hysteria2://', 'hy2://', 'tuic://']
AUTO_DEL = 60
SUB_SECRET = os.environ.get('SUB_SECRET', 'change_me_to_random_string')
SUB_HOST = os.environ.get('SUB_HOST', '0.0.0.0:18888')
WAITING_ADD = set() # user_ids waiting to add sub
def load_data():
if os.path.exists(DATA_FILE):
with open(DATA_FILE) as f: return json.load(f)
return {'subs': [], 'groups': []}
def save_data(data):
os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
with open(DATA_FILE, 'w') as f: json.dump(data, f, ensure_ascii=False, indent=2)
def detect_type(link):
for p in PROTOS:
if link.startswith(p): return p.replace('://', '')
return 'unknown'
def parse_surge_ss(text):
"""Parse Surge format: Name = ss, server, port, encrypt-method=x, password=y"""
m = re.match(r'^(.+?)\s*=\s*ss\s*,\s*([^,]+)\s*,\s*(\d+)\s*,\s*(.+)$', text.strip())
if not m: return None, None
name = m.group(1).strip()
server = m.group(2).strip()
port = m.group(3).strip()
params = dict(re.findall(r'([\w-]+)\s*=\s*([^,]+)', m.group(4)))
method = params.get('encrypt-method', '').strip()
pwd = params.get('password', '').strip()
if not method or not pwd: return None, None
# build ss:// link
raw = f'{method}:{pwd}@{server}:{port}'
encoded = base64.b64encode(raw.encode()).decode().rstrip('=')
ss_link = f'ss://{encoded}#{urllib.request.quote(name)}'
return ss_link, name
def extract_name(link):
if '#' in link: return urllib.request.unquote(link.split('#')[-1])
return link[:30]
async def auto_del(msg, delay=AUTO_DEL):
await asyncio.sleep(delay)
try: await msg.delete()
except: pass
async def send_del(target, text, delay=AUTO_DEL, **kw):
msg = await target.reply_text(text, **kw)
asyncio.ensure_future(auto_del(msg, delay))
return msg
async def is_member(update, context):
uid = update.effective_user.id
if uid == ADMIN_ID: return True
data = load_data()
groups = data.get('groups', [])
if not groups: return True
for gid in groups:
try:
m = await context.bot.get_chat_member(gid, uid)
if m.status in ('member', 'administrator', 'creator'): return True
except: pass
return False
def add_sub(link, user):
proto = detect_type(link)
if proto == 'unknown': return None
data = load_data()
for s in data['subs']:
if s['link'] == link: return 'dup'
name = extract_name(link)
data['subs'].append({'link': link, 'type': proto, 'name': name,
'added_by': user.id, 'added_name': user.first_name,
'added_at': int(time.time()), 'alive': True})
save_data(data)
return f'✅ 已添加 [{proto}] {name}'
# --- /help: main menu with buttons ---
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await is_member(update, context):
return await update.message.reply_text('⛔ 仅限群成员使用')
buttons = [
[InlineKeyboardButton("📋 订阅列表", callback_data='menu_list'),
InlineKeyboardButton("📥 获取订阅", callback_data='menu_get')],
[InlineKeyboardButton(" 添加订阅", callback_data='menu_add'),
InlineKeyboardButton("🗑 删除订阅", callback_data='menu_del')],
[InlineKeyboardButton("🔍 检测存活", callback_data='menu_check')],
]
if update.effective_user.id == ADMIN_ID:
buttons.append([InlineKeyboardButton("⚙️ 绑定当前群", callback_data='menu_setgroup')])
await send_del(update.message,
'🚀 *订阅管理 Bot*\n\n'
'直接发订阅链接自动入库\n'
'或点击下方按钮操作:',
parse_mode='Markdown', reply_markup=InlineKeyboardMarkup(buttons))
# --- menu callbacks ---
async def cb_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
action = q.data
data = load_data()
if action == 'menu_list':
if not data['subs']:
return await send_del(q.message, '📭 暂无订阅')
lines = [f"`{i+1}` {'🟢' if s.get('alive',True) else '🔴'} [{s['type']}] {s['name']}"
for i, s in enumerate(data['subs'])]
await send_del(q.message, '📋 *订阅列表*\n\n' + '\n'.join(lines), parse_mode='Markdown')
elif action == 'menu_get':
alive = [s for s in data['subs'] if s.get('alive', True)]
if not alive:
return await send_del(q.message, '📭 暂无可用订阅')
types = sorted(set(s['type'] for s in alive))
buttons = [[InlineKeyboardButton(f"📦 全部 ({len(alive)})", callback_data='get_all_raw')]]
for t in types:
cnt = sum(1 for s in alive if s['type'] == t)
buttons.append([
InlineKeyboardButton(f"🔗 {t} ({cnt})", callback_data=f'get_{t}_raw'),
InlineKeyboardButton(f"📄 {t} Base64", callback_data=f'get_{t}_b64')])
buttons.append([InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data='get_all_clash')])
await send_del(q.message, '选择格式:', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_del':
if not data['subs']:
return await send_del(q.message, '📭 暂无订阅')
uid = q.from_user.id
buttons = []
for i, s in enumerate(data['subs']):
if uid != ADMIN_ID and uid != s.get('added_by'): continue
st = '🟢' if s.get('alive', True) else '🔴'
buttons.append([InlineKeyboardButton(f"{st} [{s['type']}] {s['name']}", callback_data=f'del_{i}')])
if not buttons:
return await send_del(q.message, '没有可删除的订阅')
buttons.append([InlineKeyboardButton("🗑 删除所有不可用", callback_data='del_alldown')])
await send_del(q.message, '选择要删除的:', reply_markup=InlineKeyboardMarkup(buttons))
elif action == 'menu_add':
WAITING_ADD.add(q.from_user.id)
await send_del(q.message, '📎 请发送订阅链接(支持多条,空格或换行分隔)\n支持: ss/vmess/vless/trojan/hy2/tuic')
elif action == 'menu_check':
msg = await send_del(q.message, '🔍 检测中...', delay=120)
results = []
for s in data['subs']:
ok = await check_node(s['link'], s['type'])
s['alive'] = ok
results.append(f"{'🟢' if ok else '🔴'} [{s['type']}] {s['name']}")
save_data(data)
try: await msg.delete()
except: pass
await send_del(q.message, '📊 *检测结果*\n\n' + '\n'.join(results), parse_mode='Markdown')
elif action == 'menu_setgroup':
if q.from_user.id != ADMIN_ID:
return await send_del(q.message, '⛔ 仅管理员可用')
cid = q.message.chat.id
if cid not in data.get('groups', []):
data.setdefault('groups', []).append(cid)
save_data(data)
await send_del(q.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
# --- get/pick/del callbacks ---
async def cb_getsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
action = q.data
if action.startswith('pick_'):
idx = int(action.split('_')[1])
if idx < len(alive):
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{idx}_raw')],
[InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{idx}_clash')],
]
await send_del(q.message, f'📍 {alive[idx]["name"]}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
return
if action.startswith('fmt_'):
parts = action.split('_')
idx, fmt = int(parts[1]), parts[2]
if idx < len(alive):
s = alive[idx]
if fmt == 'raw':
await send_del(q.message, s["link"])
else:
# single node clash meta - generate inline
cm = gen_clash_meta([s])
await send_del(q.message, f'```yaml\n{cm}\n```', parse_mode='Markdown')
return
if action.startswith('send_'):
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
# ask format first
if fmt == 'raw':
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'out_{proto}_raw')],
[InlineKeyboardButton("⚡ Clash Meta", callback_data=f'out_{proto}_clash')],
[InlineKeyboardButton("📄 Base64", callback_data=f'out_{proto}_b64')],
]
return await send_del(q.message, f'📦 全部 ({len(subs)}) — 选择格式:',
reply_markup=InlineKeyboardMarkup(buttons))
if action.startswith('out_'):
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
if fmt == 'clash':
url = f'http://{SUB_HOST}/{SUB_SECRET}/download?target=ClashMeta'
if proto != 'all': url += f'&type={proto}'
return await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
links = '\n'.join(s['link'] for s in subs)
if fmt == 'b64':
return await send_del(q.message, f'```\n{base64.b64encode(links.encode()).decode()}\n```', parse_mode='Markdown')
if len(links) > 4000:
for s in subs:
await send_del(q.message, s['link'])
else:
await send_del(q.message, links)
return
if action == 'get_all_clash':
url = f'http://{SUB_HOST}/{SUB_SECRET}/download?target=ClashMeta'
await send_del(q.message, f'📎 Clash Meta 订阅链接:\n{url}')
return
parts = action.split('_')
proto, fmt = parts[1], parts[2]
subs = alive if proto == 'all' else [s for s in alive if s['type'] == proto]
if not subs:
return await send_del(q.message, '📭 无匹配')
if fmt == 'raw' and len(subs) > 1:
buttons = []
for i, s in enumerate(subs):
gi = alive.index(s)
buttons.append([InlineKeyboardButton(
f"{'🟢' if s.get('alive',True) else '🔴'} {s['name']}", callback_data=f'pick_{gi}')])
buttons.append([InlineKeyboardButton(f"📦 全部 ({len(subs)})", callback_data=f'send_{proto}_raw')])
return await send_del(q.message, '选择节点:', reply_markup=InlineKeyboardMarkup(buttons))
if fmt == 'raw' and len(subs) == 1:
buttons = [
[InlineKeyboardButton("🔗 原始链接", callback_data=f'fmt_{alive.index(subs[0])}_raw')],
[InlineKeyboardButton("⚡ Clash Meta 订阅", callback_data=f'fmt_{alive.index(subs[0])}_clash')],
]
return await send_del(q.message, f'📍 {subs[0]["name"]}\n选择输出格式:',
reply_markup=InlineKeyboardMarkup(buttons))
links = '\n'.join(s['link'] for s in subs)
if fmt == 'b64':
await send_del(q.message, base64.b64encode(links.encode()).decode())
else:
await send_del(q.message, links)
async def cb_delsub(update: Update, context: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
data = load_data()
action = q.data
uid = q.from_user.id
if action == 'del_alldown':
dead = [s for s in data['subs'] if not s.get('alive', True)]
if uid != ADMIN_ID:
dead = [s for s in dead if s.get('added_by') == uid]
if not dead:
return await send_del(q.message, '没有不可用节点')
names = [s['name'] for s in dead]
for s in dead: data['subs'].remove(s)
save_data(data)
return await send_del(q.message, f"🗑 已删除 {len(names)} 个:\n" + '\n'.join(f'{n}' for n in names))
idx = int(action.split('_')[1])
if idx >= len(data['subs']):
return await send_del(q.message, '❌ 已失效')
s = data['subs'][idx]
if uid != ADMIN_ID and uid != s.get('added_by'):
return await send_del(q.message, '⛔ 只能删除自己上传的')
removed = data['subs'].pop(idx)
save_data(data)
await send_del(q.message, f"🗑 已删除: {removed['name']}")
# --- auto detect links in messages ---
async def auto_detect(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message or not update.message.text: return
text = update.message.text.strip()
uid = update.effective_user.id
# check standard protocol links
found = [w for w in text.split() if any(w.startswith(p) for p in PROTOS)]
# check Surge format (line-based)
surge_links = []
if not found:
for line in text.split('\n'):
ss_link, name = parse_surge_ss(line)
if ss_link: surge_links.append(ss_link)
if not found and not surge_links:
if uid in WAITING_ADD:
WAITING_ADD.discard(uid)
await send_del(update.message, '❌ 未识别到订阅链接')
return
if not await is_member(update, context): return
WAITING_ADD.discard(uid)
results = []
for link in found + surge_links:
r = add_sub(link, update.effective_user)
if r and r != 'dup': results.append(r)
elif r == 'dup': results.append(f'⚠️ 已存在: {extract_name(link)}')
if results:
await send_del(update.message, '\n'.join(results))
# --- node health check ---
async def check_node(link, proto):
try:
server, port = parse_sp(link, proto)
if not server: return False
_, w = await asyncio.wait_for(asyncio.open_connection(server, int(port)), timeout=5)
w.close(); await w.wait_closed()
return True
except: return False
def parse_sp(link, proto):
try:
if proto == 'ss':
raw = link[5:]
if '#' in raw: raw = raw.split('#')[0]
decoded = base64.b64decode(raw + '==').decode()
_, rest = decoded.split(':', 1)
_, sp = rest.rsplit('@', 1)
return sp.rsplit(':', 1)
m = re.search(r'@([^:/?#]+):(\d+)', link)
if m: return m.group(1), m.group(2)
except: pass
return None, None
def parse_to_clash_proxy(s):
try:
if s['type'] == 'ss':
raw = s['link'][5:]
name = ''
if '#' in raw: raw, name = raw.rsplit('#', 1); name = urllib.request.unquote(name)
decoded = base64.b64decode(raw + '==').decode()
method, rest = decoded.split(':', 1)
pwd, sp = rest.rsplit('@', 1)
server, port = sp.rsplit(':', 1)
return {'name': name or server, 'type': 'ss', 'server': server, 'port': int(port),
'cipher': method, 'password': pwd}
except: pass
return None
def gen_clash_meta(subs):
lines = ['proxies:']
for s in subs:
p = parse_to_clash_proxy(s)
if p: lines.append(f' - {json.dumps(p, ensure_ascii=False)}')
return '\n'.join(lines)
# --- HTTP subscription endpoint ---
class SubHandler(BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.split('?')[0]
if path != f'/{SUB_SECRET}/download':
self.send_response(404)
self.end_headers()
return
data = load_data()
alive = [s for s in data['subs'] if s.get('alive', True)]
qs = {}
if '?' in self.path:
qs = dict(x.split('=',1) for x in self.path.split('?',1)[1].split('&') if '=' in x)
ftype = qs.get('type', '')
if ftype: alive = [s for s in alive if s['type'] == ftype]
target = qs.get('target', 'ClashMeta')
body = gen_clash_meta(alive) if target == 'ClashMeta' else '\n'.join(s['link'] for s in alive)
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(body.encode())
def log_message(self, *a): pass
def start_http():
srv = HTTPServer(('0.0.0.0', 18888), SubHandler)
log.info('Sub HTTP on :18888')
srv.serve_forever()
# --- legacy commands still work ---
async def cmd_setgroup(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_user.id != ADMIN_ID:
return await update.message.reply_text('⛔ 仅管理员可用')
cid = update.effective_chat.id
data = load_data()
if cid not in data.get('groups', []):
data.setdefault('groups', []).append(cid)
save_data(data)
await send_del(update.message, f'✅ 已绑定群 `{cid}`', parse_mode='Markdown')
async def auto_cleanup(bot_token):
"""Every 6h: check nodes, auto-delete dead ones, notify group"""
import httpx
await asyncio.sleep(60)
while True:
data = load_data()
if data['subs']:
dead = []
for s in list(data['subs']):
ok = await check_node(s['link'], s['type'])
if not ok:
dead.append(s)
data['subs'].remove(s)
if dead:
save_data(data)
names = '\n'.join(f"• [{s['type']}] {s['name']}" for s in dead)
msg = f'🗑 自动清理 — 已删除 {len(dead)} 个不可用节点:\n\n{names}'
for gid in data.get('groups', []):
try:
async with httpx.AsyncClient() as c:
await c.post(f'https://api.telegram.org/bot{bot_token}/sendMessage',
json={'chat_id': gid, 'text': msg})
except: pass
log.info(f'Auto cleanup: removed {len(dead)} dead nodes')
await asyncio.sleep(21600)
def run_cleanup():
asyncio.run(auto_cleanup(TOKEN))
def main():
app = Application.builder().token(TOKEN).build()
app.add_handler(CommandHandler('start', cmd_help))
app.add_handler(CommandHandler('vps', cmd_help))
app.add_handler(CommandHandler('setgroup', cmd_setgroup))
app.add_handler(CallbackQueryHandler(cb_menu, pattern='^menu_'))
app.add_handler(CallbackQueryHandler(cb_delsub, pattern='^del_'))
app.add_handler(CallbackQueryHandler(cb_getsub, pattern='^(get_|pick_|send_|fmt_|out_)'))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, auto_detect))
log.info('Sub Bot starting polling...')
threading.Thread(target=start_http, daemon=True).start()
threading.Thread(target=run_cleanup, daemon=True).start()
app.run_polling(drop_pending_updates=True)
if __name__ == '__main__':
main()