import os import logging import aiosqlite from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, MessageHandler, CommandHandler, filters, ContextTypes # --- 从环境变量读取配置 --- BOT_TOKEN = os.getenv('BOT_TOKEN') ADMIN_ID = int(os.getenv('ADMIN_ID', '0')) DB_FILE = os.getenv('DB_PATH', '/app/data/messages.db') GITHUB_URL = os.getenv('GITHUB_URL', 'https://github.com/EchoZenith/TelegramContactBot') # 启用日志 logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # 初始化数据库 async def init_db(): async with aiosqlite.connect(DB_FILE) as db: # 记录:管理员端消息ID <-> 用户端消息ID <-> 用户ID await db.execute('''CREATE TABLE IF NOT EXISTS msg_pairs (admin_msg_id INTEGER PRIMARY KEY, user_msg_id INTEGER, user_id INTEGER)''') await db.execute('''CREATE TABLE IF NOT EXISTS blacklist (user_id INTEGER PRIMARY KEY)''') await db.commit() # 处理 /start 命令 async def handle_start(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = update.effective_chat.id keyboard = [[InlineKeyboardButton("🌟 查看项目源码", url=GITHUB_URL)]] reply_markup = InlineKeyboardMarkup(keyboard) if user_id != ADMIN_ID: await update.message.reply_text("Hello!\n\nYou can contact us using this bot.", reply_markup=reply_markup) else: await update.message.reply_text("你好,管理员!有人给机器人发消息时,我会转发给你。你直接【回复】该消息即可回信。") # 处理 /ban 命令 async def handle_ban(update: Update, context: ContextTypes.DEFAULT_TYPE): if update.effective_chat.id != ADMIN_ID: return msg = update.message if not msg.reply_to_message: await msg.reply_text("❌ 请回复一条要封禁的用户消息并输入 /ban") return target_id = msg.reply_to_message.message_id async with aiosqlite.connect(DB_FILE) as db: # 通过消息 ID 找到对应的用户 ID async with db.execute("SELECT user_id FROM msg_pairs WHERE admin_msg_id = ?", (target_id,)) as cursor: row = await cursor.fetchone() if row: user_id = row[0] try: await db.execute("INSERT OR IGNORE INTO blacklist VALUES (?)", (user_id,)) await db.commit() await msg.reply_text(f"🚫 用户 `{user_id}` 已被加入黑名单,后续消息将不再转发。", parse_mode='Markdown') except Exception as e: logger.error(f"Ban failed: {e}") else: await msg.reply_text("⚠️ 找不到该消息的用户记录,无法执行封禁。") # 处理 /unban 命令 async def handle_unban(update: Update, context: ContextTypes.DEFAULT_TYPE): if update.effective_chat.id != ADMIN_ID: return msg = update.message if not msg.reply_to_message: await msg.reply_text("❌ 请回复一条要解封的用户消息并输入 /unban") return target_id = msg.reply_to_message.message_id async with aiosqlite.connect(DB_FILE) as db: # 同样先通过消息 ID 找到对应的用户 ID async with db.execute("SELECT user_id FROM msg_pairs WHERE admin_msg_id = ?", (target_id,)) as cursor: row = await cursor.fetchone() if row: user_id = row[0] try: # 从黑名单表中移除 await db.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,)) await db.commit() await msg.reply_text(f"✅ 用户 `{user_id}` 已解封,现在可以正常接收其消息。", parse_mode='Markdown') except Exception as e: logger.error(f"Unban failed: {e}") else: await msg.reply_text("⚠️ 找不到该消息的用户记录,无法执行解封。") # 处理 /del 命令 async def handle_delete(update: Update, context: ContextTypes.DEFAULT_TYPE): # 权限检查:仅限管理员 if update.effective_chat.id != ADMIN_ID: return message = update.message # 必须是回复某条消息才能执行删除 if not message.reply_to_message: await message.reply_text("请回复一条你想删除的消息并输入 /del") return target_id = message.reply_to_message.message_id async with aiosqlite.connect(DB_FILE) as db: # 查找这条消息对应的用户ID和用户端消息ID async with db.execute( "SELECT user_id, user_msg_id FROM msg_pairs WHERE admin_msg_id = ?", (target_id,) ) as cursor: row = await cursor.fetchone() if row: user_id, user_msg_id = row try: # 1. 删除用户端的消息 await context.bot.delete_message(chat_id=user_id, message_id=user_msg_id) # 2. 删除管理员端的消息 await context.bot.delete_message(chat_id=ADMIN_ID, message_id=target_id) # 3. 删除管理员刚才发的 "/del" 命令消息,保持界面整洁 await message.delete() # 4. 从数据库中移除这条记录 await db.execute("DELETE FROM msg_pairs WHERE admin_msg_id = ?", (target_id,)) await db.commit() except Exception as e: logger.error(f"同步删除失败: {e}") await message.reply_text(f"删除失败,可能消息已超过48小时或机器人无权限:{e}") else: await message.reply_text("未找到该消息的转发记录,无法同步删除。") # 处理同步修改逻辑 async def handle_edit(update: Update, context: ContextTypes.DEFAULT_TYPE): edited_msg = update.edited_message if not edited_msg: return user_id = edited_msg.chat.id async with aiosqlite.connect(DB_FILE) as db: if user_id != ADMIN_ID: # 1. 查找旧的转发记录 async with db.execute("SELECT admin_msg_id FROM msg_pairs WHERE user_msg_id = ? AND user_id = ?", (edited_msg.message_id, user_id)) as cursor: row = await cursor.fetchone() if row: old_admin_msg_id = row[0] # 获取新内容(文字或媒体说明) content = edited_msg.text or edited_msg.caption or "" new_text = f"【此消息已修改】\n{content}" try: # 2. 判断是纯文本还是带媒体的消息 if edited_msg.text: # 发送纯文字消息,并回复在旧消息上 new_msg = await context.bot.send_message( chat_id=ADMIN_ID, text=new_text, reply_to_message_id=old_admin_msg_id ) else: # 如果是图片/视频等媒体,复制它并修改其 Caption new_msg = await context.bot.copy_message( chat_id=ADMIN_ID, from_chat_id=user_id, message_id=edited_msg.message_id, caption=new_text, reply_to_message_id=old_admin_msg_id ) # 3. 更新映射关系,确保管理员回复这条带“【新消息】”提示的消息时,也能回传 await db.execute("INSERT OR REPLACE INTO msg_pairs VALUES (?, ?, ?)", (new_msg.message_id, edited_msg.message_id, user_id)) await db.commit() except Exception as e: logger.error(f"处理用户修改回传失败: {e}") else: # 管理员修改逻辑保持不变(原地编辑用户收到的消息) async with db.execute("SELECT user_id, user_msg_id FROM msg_pairs WHERE admin_msg_id = ?", (edited_msg.message_id,)) as cursor: row = await cursor.fetchone() if row: try: new_content = edited_msg.text or edited_msg.caption await context.bot.edit_message_text(chat_id=row[0], message_id=row[1], text=new_content) except Exception as e: logger.warning(f"管理员修改同步略过: {e}") # 处理所有普通消息 async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): if not update.message: return user_id = update.effective_chat.id message = update.message # 1. 用户 -> 管理员 if user_id != ADMIN_ID: # 检查黑名单 async with aiosqlite.connect(DB_FILE) as db: async with db.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)) as cursor: if await cursor.fetchone(): return # 如果在黑名单里,直接无视,不执行后续转发 try: forwarded_msg = await context.bot.forward_message(chat_id=ADMIN_ID, from_chat_id=user_id, message_id=message.message_id) async with aiosqlite.connect(DB_FILE) as db: await db.execute("INSERT INTO msg_pairs VALUES (?, ?, ?)", (forwarded_msg.message_id, message.message_id, user_id)) await db.commit() except Exception as e: logger.error(f"转发失败: {e}") # 2. 管理员回复 -> 用户 elif message.reply_to_message: async with aiosqlite.connect(DB_FILE) as db: async with db.execute("SELECT user_id FROM msg_pairs WHERE admin_msg_id = ?", (message.reply_to_message.message_id,)) as cursor: row = await cursor.fetchone() if row: target_user_id = row[0] try: sent_msg = await context.bot.copy_message(chat_id=target_user_id, from_chat_id=ADMIN_ID, message_id=message.message_id) # 记录管理员发出的回复,以便管理员后续修改这条回复时能同步给用户 async with aiosqlite.connect(DB_FILE) as db: await db.execute("INSERT INTO msg_pairs (admin_msg_id, user_msg_id, user_id) VALUES (?, ?, ?)", (message.message_id, sent_msg.message_id, target_user_id)) await db.commit() except Exception as e: await message.reply_text(f"回复失败: {e}") async def post_init(application: Application): await init_db() if __name__ == '__main__': # 检查配置 if not BOT_TOKEN or ADMIN_ID == 0: print("错误: 请确保环境变量 BOT_TOKEN 和 ADMIN_ID 已正确设置!") exit(1) # 构建应用 application = Application.builder().token(BOT_TOKEN).post_init(post_init).build() # 处理器注册 application.add_handler(CommandHandler("start", handle_start)) # 监听封禁 application.add_handler(CommandHandler("ban", handle_ban)) # 监听解禁 application.add_handler(CommandHandler("unban", handle_unban)) # 监听管理员删除消息 application.add_handler(CommandHandler("del", handle_delete)) # 监听修改消息的更新 application.add_handler(MessageHandler(filters.UpdateType.EDITED_MESSAGE & filters.TEXT, handle_edit)) # 监听普通消息 application.add_handler(MessageHandler(filters.ALL & ~filters.COMMAND, handle_message)) print(f"机器人已启动... 管理员ID: {ADMIN_ID}") application.run_polling()