刚接触飞机开发者 API ,写的 python 脚本用于群关键字监控,运行时报错 Attempt 1 at connecting failed: TimeoutError
。
之后使用 curl -4 -L https://api.telegram.org
测试连通性,返回的是一个 html 网页,而不是 json 。
该 IP 用于飞机登录和日常使用是没问题的。期间也换了几个,报错都一样。
请问这问题是不是出在 IP 上?
脚本:
import asyncio
from telethon import TelegramClient, events
from telethon.network import ConnectionTcpMTProxyRandomizedIntermediate
# 替换为你的 API 凭证
api_id = 'xxxx' # 你的 api_id
api_hash = 'xxxx' # 你的 api_hash
phone_number = 'xxxx' # 你的电话号码,例如 '+1234567890'
group_name = 'xxxx' # 目标群聊的用户名(例如 '@MyGroup')或 ID (例如 -100123456789 )
# 定义要监控的关键字列表
keywords = ['test', 'python'] # 替换为你想监控的关键字
# 代理设置(根据你的代理类型选择 SOCKS5 或 HTTP )
proxy = {
'proxy_type': 'http', # 可选值:'socks5', 'http', 或 'mtproxy'
'addr': '127.0.0.1', # 代理服务器地址
'port': 7897, # 代理服务器端口
'username': None, # 代理用户名(如果不需要认证,设为 None )
'password': None # 代理密码(如果不需要认证,设为 None )
}
# 创建 Telegram 客户端并应用代理设置
if proxy['proxy_type'] == 'mtproxy':
client = TelegramClient(
'session_name',
api_id,
api_hash,
connection=ConnectionTcpMTProxyRandomizedIntermediate,
proxy=(proxy['addr'], proxy['port'], proxy.get('secret')) # MTProxy 需要 secret
)
else:
client = TelegramClient(
'session_name',
api_id,
api_hash,
proxy=(proxy['proxy_type'].upper(), proxy['addr'], proxy['port'], proxy['username'], proxy['password'])
)
# 事件处理器:监听新消息
@client.on(events.NewMessage(chats=group_name))
async def handler(event):
message = event.message.text
if not message: # 确保消息内容不为空
return
message = message.lower() # 获取消息内容并转为小写
# 检查消息是否包含任意关键字
for keyword in keywords:
if keyword.lower() in message:
print(f"检测到关键字 '{keyword}' 在消息: {event.message.text}")
print(f"发送者: {event.message.sender_id}, 时间: {event.message.date}")
# 将消息保存到文件
with open('keyword_messages.txt', 'a', encoding='utf-8') as f:
f.write(f"关键字: {keyword}\n 消息: {event.message.text}\n 发送者: {event.message.sender_id}\n 时间: {event.message.date}\n\n")
async def main():
# 启动客户端并登录
try:
await client.start(phone=phone_number)
print("正在通过代理监控群聊消息...")
# 保持脚本运行
await client.run_until_disconnected()
except Exception as e:
print(f"启动客户端失败: {e}")
# 运行脚本
if __name__ == '__main__':
asyncio.run(main())