Files
TelegramContactBot/bot.py
EchoZenith 819062ae02 Feat/ban user support (#5)
* feat: add /ban command to blacklist users

- Create 'blacklist' table in database to store banned user IDs.
- Implement handle_ban to blacklist users by replying to their messages.
- Add check in handle_message to intercept messages from banned users.

* feat: add /unban command to restore user communication

- Implement handle_unban to remove user IDs from the blacklist.
- Allow admins to restore communication by replying to historic
  messages.

* docs: update README.md
2026-02-01 02:45:30 +08:00

273 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import logging
import aiosqlite
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, MessageHandler, CommandHandler, filters, ContextTypes
# --- 从环境变量读取配置 ---
BOT_TOKEN = os.getenv('BOT_TOKEN')
ADMIN_ID = int(os.getenv('ADMIN_ID', '0'))
DB_FILE = os.getenv('DB_PATH', '/app/data/messages.db')
GITHUB_URL = os.getenv('GITHUB_URL', 'https://github.com/EchoZenith/TelegramContactBot')
# 启用日志
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# 初始化数据库
async def init_db():
async with aiosqlite.connect(DB_FILE) as db:
# 记录管理员端消息ID <-> 用户端消息ID <-> 用户ID
await db.execute('''CREATE TABLE IF NOT EXISTS msg_pairs
(admin_msg_id INTEGER PRIMARY KEY, user_msg_id INTEGER, user_id INTEGER)''')
await db.execute('''CREATE TABLE IF NOT EXISTS blacklist (user_id INTEGER PRIMARY KEY)''')
await db.commit()
# 处理 /start 命令
async def handle_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_chat.id
keyboard = [[InlineKeyboardButton("🌟 查看项目源码", url=GITHUB_URL)]]
reply_markup = InlineKeyboardMarkup(keyboard)
if user_id != ADMIN_ID:
await update.message.reply_text("Hello!\n\nYou can contact us using this bot.", reply_markup=reply_markup)
else:
await update.message.reply_text("你好,管理员!有人给机器人发消息时,我会转发给你。你直接【回复】该消息即可回信。")
# 处理 /ban 命令
async def handle_ban(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_chat.id != ADMIN_ID:
return
msg = update.message
if not msg.reply_to_message:
await msg.reply_text("❌ 请回复一条要封禁的用户消息并输入 /ban")
return
target_id = msg.reply_to_message.message_id
async with aiosqlite.connect(DB_FILE) as db:
# 通过消息 ID 找到对应的用户 ID
async with db.execute("SELECT user_id FROM msg_pairs WHERE admin_msg_id = ?", (target_id,)) as cursor:
row = await cursor.fetchone()
if row:
user_id = row[0]
try:
await db.execute("INSERT OR IGNORE INTO blacklist VALUES (?)", (user_id,))
await db.commit()
await msg.reply_text(f"🚫 用户 `{user_id}` 已被加入黑名单,后续消息将不再转发。", parse_mode='Markdown')
except Exception as e:
logger.error(f"Ban failed: {e}")
else:
await msg.reply_text("⚠️ 找不到该消息的用户记录,无法执行封禁。")
# 处理 /unban 命令
async def handle_unban(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_chat.id != ADMIN_ID:
return
msg = update.message
if not msg.reply_to_message:
await msg.reply_text("❌ 请回复一条要解封的用户消息并输入 /unban")
return
target_id = msg.reply_to_message.message_id
async with aiosqlite.connect(DB_FILE) as db:
# 同样先通过消息 ID 找到对应的用户 ID
async with db.execute("SELECT user_id FROM msg_pairs WHERE admin_msg_id = ?", (target_id,)) as cursor:
row = await cursor.fetchone()
if row:
user_id = row[0]
try:
# 从黑名单表中移除
await db.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
await db.commit()
await msg.reply_text(f"✅ 用户 `{user_id}` 已解封,现在可以正常接收其消息。", parse_mode='Markdown')
except Exception as e:
logger.error(f"Unban failed: {e}")
else:
await msg.reply_text("⚠️ 找不到该消息的用户记录,无法执行解封。")
# 处理 /del 命令
async def handle_delete(update: Update, context: ContextTypes.DEFAULT_TYPE):
# 权限检查:仅限管理员
if update.effective_chat.id != ADMIN_ID:
return
message = update.message
# 必须是回复某条消息才能执行删除
if not message.reply_to_message:
await message.reply_text("请回复一条你想删除的消息并输入 /del")
return
target_id = message.reply_to_message.message_id
async with aiosqlite.connect(DB_FILE) as db:
# 查找这条消息对应的用户ID和用户端消息ID
async with db.execute(
"SELECT user_id, user_msg_id FROM msg_pairs WHERE admin_msg_id = ?",
(target_id,)
) as cursor:
row = await cursor.fetchone()
if row:
user_id, user_msg_id = row
try:
# 1. 删除用户端的消息
await context.bot.delete_message(chat_id=user_id, message_id=user_msg_id)
# 2. 删除管理员端的消息
await context.bot.delete_message(chat_id=ADMIN_ID, message_id=target_id)
# 3. 删除管理员刚才发的 "/del" 命令消息,保持界面整洁
await message.delete()
# 4. 从数据库中移除这条记录
await db.execute("DELETE FROM msg_pairs WHERE admin_msg_id = ?", (target_id,))
await db.commit()
except Exception as e:
logger.error(f"同步删除失败: {e}")
await message.reply_text(f"删除失败可能消息已超过48小时或机器人无权限{e}")
else:
await message.reply_text("未找到该消息的转发记录,无法同步删除。")
# 处理同步修改逻辑
async def handle_edit(update: Update, context: ContextTypes.DEFAULT_TYPE):
edited_msg = update.edited_message
if not edited_msg:
return
user_id = edited_msg.chat.id
async with aiosqlite.connect(DB_FILE) as db:
if user_id != ADMIN_ID:
# 1. 查找旧的转发记录
async with db.execute("SELECT admin_msg_id FROM msg_pairs WHERE user_msg_id = ? AND user_id = ?",
(edited_msg.message_id, user_id)) as cursor:
row = await cursor.fetchone()
if row:
old_admin_msg_id = row[0]
# 获取新内容(文字或媒体说明)
content = edited_msg.text or edited_msg.caption or ""
new_text = f"【此消息已修改】\n{content}"
try:
# 2. 判断是纯文本还是带媒体的消息
if edited_msg.text:
# 发送纯文字消息,并回复在旧消息上
new_msg = await context.bot.send_message(
chat_id=ADMIN_ID,
text=new_text,
reply_to_message_id=old_admin_msg_id
)
else:
# 如果是图片/视频等媒体,复制它并修改其 Caption
new_msg = await context.bot.copy_message(
chat_id=ADMIN_ID,
from_chat_id=user_id,
message_id=edited_msg.message_id,
caption=new_text,
reply_to_message_id=old_admin_msg_id
)
# 3. 更新映射关系,确保管理员回复这条带“【新消息】”提示的消息时,也能回传
await db.execute("INSERT OR REPLACE INTO msg_pairs VALUES (?, ?, ?)",
(new_msg.message_id, edited_msg.message_id, user_id))
await db.commit()
except Exception as e:
logger.error(f"处理用户修改回传失败: {e}")
else:
# 管理员修改逻辑保持不变(原地编辑用户收到的消息)
async with db.execute("SELECT user_id, user_msg_id FROM msg_pairs WHERE admin_msg_id = ?",
(edited_msg.message_id,)) as cursor:
row = await cursor.fetchone()
if row:
try:
new_content = edited_msg.text or edited_msg.caption
await context.bot.edit_message_text(chat_id=row[0], message_id=row[1], text=new_content)
except Exception as e:
logger.warning(f"管理员修改同步略过: {e}")
# 处理所有普通消息
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message: return
user_id = update.effective_chat.id
message = update.message
# 1. 用户 -> 管理员
if user_id != ADMIN_ID:
# 检查黑名单
async with aiosqlite.connect(DB_FILE) as db:
async with db.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)) as cursor:
if await cursor.fetchone():
return # 如果在黑名单里,直接无视,不执行后续转发
try:
forwarded_msg = await context.bot.forward_message(chat_id=ADMIN_ID, from_chat_id=user_id, message_id=message.message_id)
async with aiosqlite.connect(DB_FILE) as db:
await db.execute("INSERT INTO msg_pairs VALUES (?, ?, ?)",
(forwarded_msg.message_id, message.message_id, user_id))
await db.commit()
except Exception as e: logger.error(f"转发失败: {e}")
# 2. 管理员回复 -> 用户
elif message.reply_to_message:
async with aiosqlite.connect(DB_FILE) as db:
async with db.execute("SELECT user_id FROM msg_pairs WHERE admin_msg_id = ?", (message.reply_to_message.message_id,)) as cursor:
row = await cursor.fetchone()
if row:
target_user_id = row[0]
try:
sent_msg = await context.bot.copy_message(chat_id=target_user_id, from_chat_id=ADMIN_ID, message_id=message.message_id)
# 记录管理员发出的回复,以便管理员后续修改这条回复时能同步给用户
async with aiosqlite.connect(DB_FILE) as db:
await db.execute("INSERT INTO msg_pairs (admin_msg_id, user_msg_id, user_id) VALUES (?, ?, ?)",
(message.message_id, sent_msg.message_id, target_user_id))
await db.commit()
except Exception as e: await message.reply_text(f"回复失败: {e}")
async def post_init(application: Application):
await init_db()
if __name__ == '__main__':
# 检查配置
if not BOT_TOKEN or ADMIN_ID == 0:
print("错误: 请确保环境变量 BOT_TOKEN 和 ADMIN_ID 已正确设置!")
exit(1)
# 构建应用
application = Application.builder().token(BOT_TOKEN).post_init(post_init).build()
# 处理器注册
application.add_handler(CommandHandler("start", handle_start))
# 监听封禁
application.add_handler(CommandHandler("ban", handle_ban))
# 监听解禁
application.add_handler(CommandHandler("unban", handle_unban))
# 监听管理员删除消息
application.add_handler(CommandHandler("del", handle_delete))
# 监听修改消息的更新
application.add_handler(MessageHandler(filters.UpdateType.EDITED_MESSAGE & filters.TEXT, handle_edit))
# 监听普通消息
application.add_handler(MessageHandler(filters.ALL & ~filters.COMMAND, handle_message))
print(f"机器人已启动... 管理员ID: {ADMIN_ID}")
application.run_polling()