Telegram 机器人的 TON Connect - Python
本指南介绍了将 TON Connect 与 Telegram 机器人集成的过时方法。如需更安全、更现代的方法,请考虑使用 Telegram 小程序。
打开演示机器人
查看 GitHub
安装库
要制作机器人,我们将使用 aiogram
3.0 Python 库。
要开始将 TON Connect 集成到 Telegram 机器人中,你需要安装 pytonconnect
软件包。
要使用 TON 原语并解析用户地址,我们需要 pytoniq-core。
为此您可以使用 pip:
pip install aiogram pytoniq-core python-dotenv
pip install pytonconnect
设置配置
在 .env
文件中指定 机器人令牌 和 TON Connect 清单文件 的链接。之后在 config.py
中加载它们:
# .env
TOKEN='1111111111:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' # your bot token here
MANIFEST_URL='https://raw.githubusercontent.com/XaBbl4/pytonconnect/main/pytonconnect-manifest.json'
# config.py
from os import environ as env
from dotenv import load_dotenv
load_dotenv()  # read the .env file so its entries are available through `env`
TOKEN = env['TOKEN']  # bot token; a missing variable raises KeyError at import time
MANIFEST_URL = env['MANIFEST_URL']  # link to the TON Connect manifest file
创建简单的机器人
创建包含机器人主代码的 main.py
文件:
# main.py
import sys
import logging
import asyncio
import config
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
logger = logging.getLogger(__file__)  # logger is named after this file's path
dp = Dispatcher()  # the handlers below register themselves on this dispatcher
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)  # HTML is passed as the bot's parse mode
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Reply to the /start command with a short greeting."""
    greeting = 'Hi!'
    await message.answer(text=greeting)
async def main() -> None:
    """Discard updates queued while the bot was offline, then start polling."""
    # Same effect as the older skip_updates=True option.
    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)
if __name__ == "__main__":
    # Send INFO-level (and above) log records to stdout, then run the bot.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())
TON Connect 存储
让我们为 TON Connect 创建一个简单的存储空间
# tc_storage.py
from pytonconnect.storage import IStorage, DefaultStorage
# Process-wide in-memory store shared by every TcStorage instance.
# Contents are lost when the process exits.
storage = {}


class TcStorage(IStorage):
    """In-memory TON Connect storage scoped to a single chat.

    Every key is prefixed with the chat id, so all chats can share the
    module-level ``storage`` dict without overwriting each other's entries.
    """

    def __init__(self, chat_id: int):
        """Remember the chat whose entries this instance reads and writes."""
        self.chat_id = chat_id

    def _get_key(self, key: str) -> str:
        """Return ``key`` prefixed with this chat's id."""
        return str(self.chat_id) + key

    async def set_item(self, key: str, value: str):
        """Store ``value`` under ``key`` for this chat."""
        storage[self._get_key(key)] = value

    async def get_item(self, key: str, default_value: str = None):
        """Return the value stored under ``key``, or ``default_value`` if absent."""
        return storage.get(self._get_key(key), default_value)

    async def remove_item(self, key: str):
        """Delete ``key`` for this chat; a key that was never stored is ignored."""
        # pop() with a default so removing a missing key does not raise KeyError.
        storage.pop(self._get_key(key), None)
连接处理程序
首先,我们需要一个为每个用户返回独立连接器实例的函数:
# connector.py
from pytonconnect import TonConnect
import config
from tc_storage import TcStorage
def get_connector(chat_id: int):
    """Build a TonConnect instance whose storage is scoped to ``chat_id``."""
    chat_storage = TcStorage(chat_id)
    return TonConnect(config.MANIFEST_URL, storage=chat_storage)
其次,让我们在 command_start_handler()
中添加连接处理程序:
# main.py
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Answer /start with buttons that match the chat's wallet state.

    A chat whose connection can be restored gets "Send Transaction" and
    "Disconnect" buttons; otherwise one button per available wallet is sent.
    """
    connector = get_connector(message.chat.id)
    is_connected = await connector.restore_connection()
    keyboard = InlineKeyboardBuilder()

    if not is_connected:
        for wallet_info in TonConnect.get_wallets():
            wallet_name = wallet_info['name']
            keyboard.button(text=wallet_name, callback_data=f'connect:{wallet_name}')
        keyboard.adjust(1)
        await message.answer(text='Choose wallet to connect', reply_markup=keyboard.as_markup())
        return

    keyboard.button(text='Send Transaction', callback_data='send_tr')
    keyboard.button(text='Disconnect', callback_data='disconnect')
    await message.answer(text='You are already connected!', reply_markup=keyboard.as_markup())
现在,对于尚未连接钱包的用户,机器人会发送一条包含所有可用钱包按钮的消息。
因此,我们需要编写函数来处理 connect:{wallet["name"]}
回调:
# main.py
async def connect_wallet(message: Message, wallet_name: str):
    """Start a TON Connect session with the chosen wallet and wait for approval.

    Sends a button with the generated connection link, then checks the
    connector once per second for up to three minutes.

    :param message: message whose chat identifies the user and receives replies
    :param wallet_name: ``name`` field of the wallet to connect
    :raises Exception: if no available wallet has that name
    """
    connector = get_connector(message.chat.id)

    # First wallet whose name matches, or None when there is no match.
    wallet = next(
        (w for w in connector.get_wallets() if w['name'] == wallet_name),
        None,
    )
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')

    generated_url = await connector.connect(wallet)

    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())

    # Keyboard attached to the final (success or timeout) message.
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Start', callback_data='start')

    # 180 one-second checks == the three minutes promised to the user.
    for _ in range(180):
        await asyncio.sleep(1)
        if connector.connected:
            if connector.account.address:
                wallet_address = Address(connector.account.address).to_str(is_bounceable=False)
                await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=mk_b.as_markup())
                logger.info(f'Connected with address: {wallet_address}')
            return

    await message.answer('Timeout error!', reply_markup=mk_b.as_markup())
@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    """Route every inline-button press to the matching handler.

    Recognised payloads are ``start``, ``send_tr``, ``disconnect`` and
    ``connect:<wallet name>``; anything else is ignored.
    """
    await call.answer()
    message = call.message
    # The Bot API marks callback data as optional; treat a missing payload
    # as unrecognised instead of failing on None.
    data = call.data or ''

    if data == "start":
        await command_start_handler(message)
    elif data == "send_tr":
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        # partition() splits on the first ':' only, so a wallet name that
        # itself contains ':' is passed on intact, and a payload without a
        # name yields an empty string instead of an IndexError.
        action, _, wallet_name = data.partition(':')
        if action == 'connect' and wallet_name:
            await connect_wallet(message, wallet_name)
机器人会给用户 3 分钟时间连接钱包,之后会报告超时错误。
实现交易请求
让我们以 Message builders 一文中的一个例子为例:
# messages.py
from base64 import urlsafe_b64encode
from pytoniq_core import begin_cell
def get_comment_message(destination_address: str, amount: int, comment: str) -> dict:
    """Build a TON Connect message dict carrying a text comment.

    :param destination_address: value placed under the ``address`` key
    :param amount: amount to send; stored as a string under ``amount``
    :param comment: text stored in the message body
    :return: dict with ``address``, ``amount`` and ``payload`` keys
    """
    # Body cell: a 32-bit zero op code (comment message) followed by the text.
    body = begin_cell().store_uint(0, 32).store_string(comment).end_cell()
    # Serialise the cell to BoC bytes, then encode them as URL-safe base64 text.
    payload = urlsafe_b64encode(body.to_boc()).decode()
    return {
        'address': destination_address,
        'amount': str(amount),
        'payload': payload,
    }
并在 main.py
文件中添加 send_transaction()
函数:
# main.py
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    """Ask the connected wallet to approve a small transfer with a comment.

    Replies 'Connect wallet first!' and stops if the chat has no restorable
    wallet connection.
    """
    connector = get_connector(message.chat.id)
    if not await connector.restore_connection():
        await message.answer('Connect wallet first!')
        return

    # The request is valid for one hour from now.
    valid_until = int(time.time() + 3600)
    comment_message = get_comment_message(
        destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
        amount=int(0.01 * 10 ** 9),
        comment='hello world!',
    )
    transaction = {
        'valid_until': valid_until,
        'messages': [comment_message],
    }

    await message.answer(text='Approve transaction in your wallet app!')
    await connector.send_transaction(transaction=transaction)
但我们还应该处理可能出现的错误,因此我们将 send_transaction
方法封装到 try - except
语句中:
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    ...
    await message.answer(text='Approve transaction in your wallet app!')
    try:
        # Wait at most 300 seconds for the wallet to answer the request.
        request = connector.send_transaction(transaction=transaction)
        await asyncio.wait_for(request, 300)
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as e:
        # Any other failure is reported to the user with its message.
        await message.answer(text=f'Unknown error: {e}')
添加断开连接处理程序
这个函数的实现非常简单:
async def disconnect_wallet(message: Message):
    """Disconnect the wallet linked to this chat, if one is connected.

    Replies 'Connect wallet first!' when there is no connection to drop,
    matching what send_transaction does in the same situation.
    """
    connector = get_connector(message.chat.id)
    connected = await connector.restore_connection()
    if not connected:
        # e.g. a stale "Disconnect" button pressed after the session is gone;
        # pytonconnect is expected to raise WalletNotConnectedError from
        # disconnect() in that state, so bail out early.
        await message.answer('Connect wallet first!')
        return
    await connector.disconnect()
    await message.answer('You have been successfully disconnected!')
目前,该项目的结构如下:
.
├── .env
├── config.py
├── connector.py
├── main.py
├── messages.py
└── tc_storage.py
而 main.py
看起来是这样的:
显示 main.py
# main.py
import sys
import logging
import asyncio
import time
import pytonconnect.exceptions
from pytoniq_core import Address
from pytonconnect import TonConnect
import config
from messages import get_comment_message
from connector import get_connector
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
logger = logging.getLogger(__file__)  # logger is named after this file's path
dp = Dispatcher()  # the handlers below register themselves on this dispatcher
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)  # HTML is passed as the bot's parse mode
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Answer /start with buttons that match the chat's wallet state.

    A chat whose connection can be restored gets "Send Transaction" and
    "Disconnect" buttons; otherwise one button per available wallet is sent.
    """
    connector = get_connector(message.chat.id)
    is_connected = await connector.restore_connection()
    keyboard = InlineKeyboardBuilder()

    if not is_connected:
        for wallet_info in TonConnect.get_wallets():
            wallet_name = wallet_info['name']
            keyboard.button(text=wallet_name, callback_data=f'connect:{wallet_name}')
        keyboard.adjust(1)
        await message.answer(text='Choose wallet to connect', reply_markup=keyboard.as_markup())
        return

    keyboard.button(text='Send Transaction', callback_data='send_tr')
    keyboard.button(text='Disconnect', callback_data='disconnect')
    await message.answer(text='You are already connected!', reply_markup=keyboard.as_markup())
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    """Ask the connected wallet to approve a small transfer with a comment.

    Replies 'Connect wallet first!' and stops if the chat has no restorable
    wallet connection. A timeout, a user rejection or any other error is
    reported back to the chat.
    """
    connector = get_connector(message.chat.id)
    if not await connector.restore_connection():
        await message.answer('Connect wallet first!')
        return

    # The request is valid for one hour from now.
    valid_until = int(time.time() + 3600)
    comment_message = get_comment_message(
        destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
        amount=int(0.01 * 10 ** 9),
        comment='hello world!',
    )
    transaction = {
        'valid_until': valid_until,
        'messages': [comment_message],
    }

    await message.answer(text='Approve transaction in your wallet app!')
    try:
        # Wait at most 300 seconds for the wallet to answer the request.
        request = connector.send_transaction(transaction=transaction)
        await asyncio.wait_for(request, 300)
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as e:
        await message.answer(text=f'Unknown error: {e}')
async def connect_wallet(message: Message, wallet_name: str):
    """Start a TON Connect session with the chosen wallet and wait for approval.

    Sends a button with the generated connection link, then checks the
    connector once per second for up to three minutes.

    :param message: message whose chat identifies the user and receives replies
    :param wallet_name: ``name`` field of the wallet to connect
    :raises Exception: if no available wallet has that name
    """
    connector = get_connector(message.chat.id)

    # First wallet whose name matches, or None when there is no match.
    wallet = next(
        (w for w in connector.get_wallets() if w['name'] == wallet_name),
        None,
    )
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')

    generated_url = await connector.connect(wallet)

    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())

    # Keyboard attached to the final (success or timeout) message.
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Start', callback_data='start')

    # 180 one-second checks == the three minutes promised to the user.
    for _ in range(180):
        await asyncio.sleep(1)
        if connector.connected:
            if connector.account.address:
                wallet_address = Address(connector.account.address).to_str(is_bounceable=False)
                await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=mk_b.as_markup())
                logger.info(f'Connected with address: {wallet_address}')
            return

    await message.answer('Timeout error!', reply_markup=mk_b.as_markup())
async def disconnect_wallet(message: Message):
    """Disconnect the wallet linked to this chat, if one is connected.

    Replies 'Connect wallet first!' when there is no connection to drop,
    matching what send_transaction does in the same situation.
    """
    connector = get_connector(message.chat.id)
    connected = await connector.restore_connection()
    if not connected:
        # e.g. a stale "Disconnect" button pressed after the session is gone;
        # pytonconnect is expected to raise WalletNotConnectedError from
        # disconnect() in that state, so bail out early.
        await message.answer('Connect wallet first!')
        return
    await connector.disconnect()
    await message.answer('You have been successfully disconnected!')
@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    """Route every inline-button press to the matching handler.

    Recognised payloads are ``start``, ``send_tr``, ``disconnect`` and
    ``connect:<wallet name>``; anything else is ignored.
    """
    await call.answer()
    message = call.message
    # The Bot API marks callback data as optional; treat a missing payload
    # as unrecognised instead of failing on None.
    data = call.data or ''

    if data == "start":
        await command_start_handler(message)
    elif data == "send_tr":
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        # partition() splits on the first ':' only, so a wallet name that
        # itself contains ':' is passed on intact, and a payload without a
        # name yields an empty string instead of an IndexError.
        action, _, wallet_name = data.partition(':')
        if action == 'connect' and wallet_name:
            await connect_wallet(message, wallet_name)
async def main() -> None:
    """Discard updates queued while the bot was offline, then start polling."""
    # Same effect as the older skip_updates=True option.
    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)
if __name__ == "__main__":
    # Send INFO-level (and above) log records to stdout, then run the bot.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())
添加持久存储 - Redis
启动 Redis 数据库后,安装用于与之交互的 Python 库:
pip install redis
并更新 tc_storage.py
中的 TcStorage
类:
import redis.asyncio as redis
# One shared async Redis client for all TcStorage instances.
client = redis.Redis(host='localhost', port=6379)


class TcStorage(IStorage):
    """Redis-backed TON Connect storage scoped to a single chat.

    Every key is prefixed with the chat id so that all chats can share the
    same Redis database.
    """

    def __init__(self, chat_id: int):
        """Remember the chat whose entries this instance reads and writes."""
        self.chat_id = chat_id

    def _get_key(self, key: str) -> str:
        """Return ``key`` prefixed with this chat's id."""
        return str(self.chat_id) + key

    async def set_item(self, key: str, value: str):
        """Store ``value`` under ``key`` for this chat."""
        await client.set(name=self._get_key(key), value=value)

    async def get_item(self, key: str, default_value: str = None):
        """Return the decoded value stored under ``key``, or ``default_value``."""
        value = await client.get(name=self._get_key(key))
        # Compare against None explicitly: a stored empty string comes back
        # as b'' (falsy) and must not be mistaken for a missing key.
        return value.decode() if value is not None else default_value

    async def remove_item(self, key: str):
        """Delete ``key`` for this chat."""
        await client.delete(self._get_key(key))
添加 QR 码
安装 python qrcode
软件包来生成它们:
pip install qrcode
更改 connect_wallet()
函数,使其生成 qrcode 并以照片形式发送给用户:
from io import BytesIO
import qrcode
from aiogram.types import BufferedInputFile
async def connect_wallet(message: Message, wallet_name: str):
    ...
    # Render the connection link as a QR code into an in-memory buffer.
    qr_image = qrcode.make(generated_url)
    buffer = BytesIO()
    qr_image.save(buffer)
    photo = BufferedInputFile(file=buffer.getvalue(), filename='qrcode')

    # Send the QR code as a photo; the caption carries the original prompt.
    await message.answer_photo(photo=photo, caption='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())
    ...
摘要
下一步是什么?
- 您可以在机器人中添加更好的错误处理功能。
- 您可以添加欢迎文本,以及类似 /connect_wallet 的命令。