跳到主要内容

Telegram 机器人的 TON Connect - Python

弃用

本指南介绍的是将 TON Connect 与 Telegram 机器人集成的一种已过时的方法。如需更安全、更现代的方案,请考虑使用 Telegram 小程序。

打开演示机器人

查看 GitHub

安装库

安装库

要制作机器人,我们将使用 aiogram 3.0 Python 库。要将 TON Connect 集成到 Telegram 机器人中,您需要安装 pytonconnect 软件包。要使用 TON 原语(primitives)并解析用户地址,还需要 pytoniq-core;python-dotenv 则用于从 .env 文件加载配置。这些都可以通过 pip 安装:

pip install aiogram pytoniq-core python-dotenv
pip install pytonconnect

设置配置

在 .env 文件中指定机器人令牌和 TON Connect 清单文件的链接,然后在 config.py 中加载它们:

# .env

TOKEN='1111111111:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' # your bot token here
MANIFEST_URL='https://raw.githubusercontent.com/XaBbl4/pytonconnect/main/pytonconnect-manifest.json'
# config.py

from os import environ as env

from dotenv import load_dotenv
# Load the variables defined in the .env file into the process environment.
load_dotenv()

# Both settings are mandatory: a missing variable raises KeyError at import time.
TOKEN = env['TOKEN']
MANIFEST_URL = env['MANIFEST_URL']

创建简单的机器人

创建包含机器人主代码的 main.py 文件:

# main.py

import sys
import logging
import asyncio

import config

from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery


# Name the logger after the module (the logging convention) instead of the file path.
logger = logging.getLogger(__name__)

# The dispatcher owns the handlers registered with the @dp.* decorators below.
dp = Dispatcher()
# The bot is created with HTML as its parse mode.
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)


@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Reply to the /start command with a short greeting."""
    greeting = 'Hi!'
    await message.answer(text=greeting)

async def main() -> None:
    """Remove any webhook (dropping pending updates), then start polling."""
    # drop_pending_updates=True takes the place of the former skip_updates=True flag.
    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)


if __name__ == "__main__":
    # Send INFO-level (and higher) log records to stdout, then run the bot.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())

TON Connect 存储

TON 连接存储

让我们为 TON Connect 创建一个简单的存储空间

# tc_storage.py

from pytonconnect.storage import IStorage, DefaultStorage


# In-memory backing store shared by every TcStorage instance.
storage = {}


class TcStorage(IStorage):
    """Per-chat key-value storage for TON Connect kept in the module-level dict."""

    def __init__(self, chat_id: int):
        self.chat_id = chat_id

    def _get_key(self, key: str):
        """Prefix ``key`` with the chat id so each chat gets its own entries."""
        return str(self.chat_id) + key

    async def set_item(self, key: str, value: str):
        """Store ``value`` under the chat-scoped ``key``."""
        storage[self._get_key(key)] = value

    async def get_item(self, key: str, default_value: str = None):
        """Return the stored value, or ``default_value`` when the key is absent."""
        return storage.get(self._get_key(key), default_value)

    async def remove_item(self, key: str):
        """Delete the chat-scoped ``key``; removing a missing key is a no-op."""
        # Passing a default to pop() avoids a KeyError when nothing was stored.
        storage.pop(self._get_key(key), None)

连接处理程序

首先,我们需要一个函数,为每个用户(聊天)返回各自独立的连接器实例:

# connector.py

from pytonconnect import TonConnect

import config
from tc_storage import TcStorage


def get_connector(chat_id: int):
    """Build a TonConnect instance whose storage is scoped to ``chat_id``."""
    chat_storage = TcStorage(chat_id)
    return TonConnect(config.MANIFEST_URL, storage=chat_storage)

其次,让我们在 command_start_handler() 中添加连接处理程序:

# main.py

@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Handle /start: offer wallet actions if connected, else a wallet picker."""
    connector = get_connector(message.chat.id)
    keyboard = InlineKeyboardBuilder()

    if await connector.restore_connection():
        keyboard.button(text='Send Transaction', callback_data='send_tr')
        keyboard.button(text='Disconnect', callback_data='disconnect')
        await message.answer(text='You are already connected!', reply_markup=keyboard.as_markup())
        return

    # Not connected: one button per available wallet, one button per row.
    for wallet in TonConnect.get_wallets():
        name = wallet['name']
        keyboard.button(text=name, callback_data=f'connect:{name}')
    keyboard.adjust(1)
    await message.answer(text='Choose wallet to connect', reply_markup=keyboard.as_markup())

现在,对于尚未连接钱包的用户,机器人会发送一条包含所有可用钱包按钮的消息。 因此,我们需要编写函数来处理 connect:{wallet["name"]} 回调:

# main.py

async def connect_wallet(message: Message, wallet_name: str):
    """Start a TON Connect session with the named wallet and wait for approval.

    Sends a button carrying the generated connection URL, then checks the
    connector once per second for up to 3 minutes. Replies with the connected
    address on success, or with a timeout message otherwise.

    Raises:
        Exception: if ``wallet_name`` is not in the connector's wallet list.
    """
    connector = get_connector(message.chat.id)

    # Stop at the first wallet with a matching name instead of scanning the whole list.
    wallet = next((w for w in connector.get_wallets() if w['name'] == wallet_name), None)
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')

    generated_url = await connector.connect(wallet)

    connect_kb = InlineKeyboardBuilder()
    connect_kb.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=connect_kb.as_markup())

    restart_kb = InlineKeyboardBuilder()
    restart_kb.button(text='Start', callback_data='start')

    # 180 one-second polls cover the full 3 minutes promised to the user
    # (range(1, 180) stopped one poll short).
    for _ in range(180):
        await asyncio.sleep(1)
        if connector.connected and connector.account.address:
            wallet_address = Address(connector.account.address).to_str(is_bounceable=False)
            await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=restart_kb.as_markup())
            logger.info(f'Connected with address: {wallet_address}')
            return

    await message.answer('Timeout error!', reply_markup=restart_kb.as_markup())


@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    """Route every inline-button press to the matching action by its callback data."""
    # Acknowledge the callback query before doing the (possibly long) work.
    await call.answer()
    message = call.message
    # Fall back to an empty string so a callback without data is simply ignored.
    data = call.data or ''
    if data == "start":
        await command_start_handler(message)
    elif data == "send_tr":
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        # partition() never raises when the separator is missing and keeps a
        # wallet name that itself contains ':' intact.
        action, _, wallet_name = data.partition(':')
        if action == 'connect' and wallet_name:
            await connect_wallet(message, wallet_name)

机器人会给用户 3 分钟时间连接钱包,之后会报告超时错误。

实现交易请求

让我们以 Message builders 一文中的一个例子为例:

# messages.py

from base64 import urlsafe_b64encode

from pytoniq_core import begin_cell


def get_comment_message(destination_address: str, amount: int, comment: str) -> dict:
    """Build a message dict for a transfer that carries a text comment.

    The payload is a cell holding a 32-bit zero op code followed by the
    comment, serialized to BoC and encoded as URL-safe base64 text.
    """
    comment_cell = (
        begin_cell()
        .store_uint(0, 32)  # op code for comment message
        .store_string(comment)  # store comment
        .end_cell()
    )
    # BoC bytes -> URL-safe base64 bytes -> str
    payload = urlsafe_b64encode(comment_cell.to_boc()).decode()

    return {
        'address': destination_address,
        'amount': str(amount),
        'payload': payload,
    }

并在 main.py 文件中添加 send_transaction() 函数:

# main.py

@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    """Ask the connected wallet to approve a demo transfer with a comment."""
    connector = get_connector(message.chat.id)
    if not await connector.restore_connection():
        await message.answer('Connect wallet first!')
        return

    # The request is valid for one hour from now.
    expires_at = int(time.time() + 3600)
    demo_message = get_comment_message(
        destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
        amount=int(0.01 * 10 ** 9),
        comment='hello world!'
    )
    transaction = {
        'valid_until': expires_at,
        'messages': [demo_message],
    }

    await message.answer(text='Approve transaction in your wallet app!')
    await connector.send_transaction(transaction=transaction)

但我们还应该处理可能出现的错误,因此我们将 send_transaction 方法封装到 try - except 语句中:

@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    ...  # connector and transaction are prepared as in the previous listing
    await message.answer(text='Approve transaction in your wallet app!')
    try:
        # Give the user at most 300 seconds to react in the wallet app.
        await asyncio.wait_for(
            connector.send_transaction(transaction=transaction),
            timeout=300,
        )
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as exc:
        # Last-resort handler: report anything unexpected back to the chat.
        await message.answer(text=f'Unknown error: {exc}')

添加断开连接处理程序

这个函数的实现非常简单:

async def disconnect_wallet(message: Message):
    """Drop the chat's wallet session, if there is one, and confirm it to the user."""
    connector = get_connector(message.chat.id)
    # Only call disconnect() when a session was actually restored, so a stale
    # "Disconnect" button does not fail when there is nothing left to drop.
    # NOTE(review): pytonconnect is expected to raise WalletNotConnectedError
    # from disconnect() without a session — confirm against the library.
    if await connector.restore_connection():
        await connector.disconnect()
    await message.answer('You have been successfully disconnected!')

目前,该项目的结构如下:

.
├── .env
├── config.py
├── connector.py
├── main.py
├── messages.py
└── tc_storage.py

main.py 看起来是这样的:

显示 main.py
# main.py

import sys
import logging
import asyncio
import time

import pytonconnect.exceptions
from pytoniq_core import Address
from pytonconnect import TonConnect

import config
from messages import get_comment_message
from connector import get_connector

from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder


# Name the logger after the module (the logging convention) instead of the file path.
logger = logging.getLogger(__name__)

# The dispatcher owns the handlers registered with the @dp.* decorators below.
dp = Dispatcher()
# The bot is created with HTML as its parse mode.
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)


@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Handle /start: offer wallet actions if connected, else a wallet picker."""
    connector = get_connector(message.chat.id)
    keyboard = InlineKeyboardBuilder()

    if await connector.restore_connection():
        keyboard.button(text='Send Transaction', callback_data='send_tr')
        keyboard.button(text='Disconnect', callback_data='disconnect')
        await message.answer(text='You are already connected!', reply_markup=keyboard.as_markup())
        return

    # Not connected: one button per available wallet, one button per row.
    for wallet in TonConnect.get_wallets():
        name = wallet['name']
        keyboard.button(text=name, callback_data=f'connect:{name}')
    keyboard.adjust(1)
    await message.answer(text='Choose wallet to connect', reply_markup=keyboard.as_markup())


@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    """Ask the connected wallet to approve a demo transfer and report the outcome."""
    connector = get_connector(message.chat.id)
    if not await connector.restore_connection():
        await message.answer('Connect wallet first!')
        return

    # The request is valid for one hour from now.
    expires_at = int(time.time() + 3600)
    demo_message = get_comment_message(
        destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
        amount=int(0.01 * 10 ** 9),
        comment='hello world!'
    )
    transaction = {
        'valid_until': expires_at,
        'messages': [demo_message],
    }

    await message.answer(text='Approve transaction in your wallet app!')
    try:
        # Give the user at most 300 seconds to react in the wallet app.
        await asyncio.wait_for(
            connector.send_transaction(transaction=transaction),
            timeout=300,
        )
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as exc:
        # Last-resort handler: report anything unexpected back to the chat.
        await message.answer(text=f'Unknown error: {exc}')


async def connect_wallet(message: Message, wallet_name: str):
    """Start a TON Connect session with the named wallet and wait for approval.

    Sends a button carrying the generated connection URL, then checks the
    connector once per second for up to 3 minutes. Replies with the connected
    address on success, or with a timeout message otherwise.

    Raises:
        Exception: if ``wallet_name`` is not in the connector's wallet list.
    """
    connector = get_connector(message.chat.id)

    # Stop at the first wallet with a matching name instead of scanning the whole list.
    wallet = next((w for w in connector.get_wallets() if w['name'] == wallet_name), None)
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')

    generated_url = await connector.connect(wallet)

    connect_kb = InlineKeyboardBuilder()
    connect_kb.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=connect_kb.as_markup())

    restart_kb = InlineKeyboardBuilder()
    restart_kb.button(text='Start', callback_data='start')

    # 180 one-second polls cover the full 3 minutes promised to the user
    # (range(1, 180) stopped one poll short).
    for _ in range(180):
        await asyncio.sleep(1)
        if connector.connected and connector.account.address:
            wallet_address = Address(connector.account.address).to_str(is_bounceable=False)
            await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=restart_kb.as_markup())
            logger.info(f'Connected with address: {wallet_address}')
            return

    await message.answer('Timeout error!', reply_markup=restart_kb.as_markup())


async def disconnect_wallet(message: Message):
    """Drop the chat's wallet session, if there is one, and confirm it to the user."""
    connector = get_connector(message.chat.id)
    # Only call disconnect() when a session was actually restored, so a stale
    # "Disconnect" button does not fail when there is nothing left to drop.
    # NOTE(review): pytonconnect is expected to raise WalletNotConnectedError
    # from disconnect() without a session — confirm against the library.
    if await connector.restore_connection():
        await connector.disconnect()
    await message.answer('You have been successfully disconnected!')


@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    """Route every inline-button press to the matching action by its callback data."""
    # Acknowledge the callback query before doing the (possibly long) work.
    await call.answer()
    message = call.message
    # Fall back to an empty string so a callback without data is simply ignored.
    data = call.data or ''
    if data == "start":
        await command_start_handler(message)
    elif data == "send_tr":
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        # partition() never raises when the separator is missing and keeps a
        # wallet name that itself contains ':' intact.
        action, _, wallet_name = data.partition(':')
        if action == 'connect' and wallet_name:
            await connect_wallet(message, wallet_name)


async def main() -> None:
    """Remove any webhook (dropping pending updates), then start polling."""
    # drop_pending_updates=True takes the place of the former skip_updates=True flag.
    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)


if __name__ == "__main__":
    # Send INFO-level (and higher) log records to stdout, then run the bot.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())

添加持久存储 - Redis

添加永久存储 - Redis

启动 Redis 数据库后,安装用于与之交互的 Python 库:

pip install redis

并更新 tc_storage.py 中的 TcStorage 类:

import redis.asyncio as redis

# Module-level Redis client shared by all TcStorage instances (localhost:6379).
client = redis.Redis(host='localhost', port=6379)


class TcStorage(IStorage):
    """Per-chat key-value storage for TON Connect persisted in Redis."""

    def __init__(self, chat_id: int):
        self.chat_id = chat_id

    def _get_key(self, key: str):
        """Prefix ``key`` with the chat id so each chat gets its own entries."""
        return str(self.chat_id) + key

    async def set_item(self, key: str, value: str):
        """Store ``value`` under the chat-scoped ``key``."""
        await client.set(name=self._get_key(key), value=value)

    async def get_item(self, key: str, default_value: str = None):
        """Return the stored value decoded to ``str``, or ``default_value`` if absent."""
        value = await client.get(name=self._get_key(key))
        # Compare against None so a stored empty value is returned as ''
        # instead of being mistaken for a missing key.
        return default_value if value is None else value.decode()

    async def remove_item(self, key: str):
        """Delete the chat-scoped ``key``."""
        await client.delete(self._get_key(key))

添加 QR 码

安装 python qrcode 软件包来生成它们:

pip install qrcode

更改 connect_wallet() 函数,使其生成 qrcode 并以照片形式发送给用户:

from io import BytesIO
import qrcode
from aiogram.types import BufferedInputFile


async def connect_wallet(message: Message, wallet_name: str):
    ...  # wallet lookup and URL generation stay as before

    # Render the connection URL as a QR image into an in-memory buffer.
    qr_buffer = BytesIO()
    qrcode.make(generated_url).save(qr_buffer)
    file = BufferedInputFile(file=qr_buffer.getvalue(), filename='qrcode')

    await message.answer_photo(photo=file, caption='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())

    ...  # the rest of the function stays as before

摘要

下一步是什么?

  • 您可以在机器人中添加更好的错误处理功能。
  • 您可以添加欢迎文本,以及类似 /connect_wallet 的命令。

另请参见