ghost-notifier-bot/main.py
Uncle Stretch 1d4853df57
update the pair address on hoodi
Signed-off-by: Uncle Stretch <uncle.stretch@ghostchain.io>
2026-04-06 18:32:12 +03:00

202 lines
7.2 KiB
Python

import os
import json
import asyncio
import logging
from dotenv import load_dotenv
from web3 import Web3
from aiogram import types, Bot
from aiogram.utils.keyboard import InlineKeyboardBuilder
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("GhostNotifierBot")
SLEEP_TIME=10 # seconds
CONNECTION_TIMEOUT=10 # seconds
MAX_BLOCK_DELAY=5 # blocks
class RPCManager:
def __init__(self, network_name):
self.network_name = network_name
self.current_index = 0
self.rpcs = []
async def start(self):
asyncio.create_task(self._auto_update_loop())
def next_rpc(self):
if len(self.rpcs) > 0:
self.current_index = (self.current_index + 1) % len(self.rpcs)
def get_current_rpc(self):
if len(self.rpcs) == 0:
return ""
return self.rpcs[self.current_index]
def update_rpc_from_file(self):
path = os.path.join("rpcs", f"{self.network_name.lower()}.txt")
if os.path.exists(path):
with open(path, "r") as f:
rpcs = [line.strip() for line in f if line.strip()]
self.rpcs = rpcs
self.current_index %= len(self.rpcs)
logger.info(f"[{self.network_name}] list of RPC updated.")
else:
logger.warning(f"[{self.network_name}] file with RPCs not found at {path}.")
async def _auto_update_loop(self):
while True:
self.update_rpc_from_file()
await asyncio.sleep(1800) # 30 mins
class EventBot:
def __init__(self, token, chat_id, thread_id, messages):
self.bot = Bot(token)
self.chat_id = chat_id
self.thread_id = thread_id
self.messages = messages
def prepare_message(self, network_name, explorer_tx_link, event_name, log):
msgs = self.messages.get(event_name)
if msgs is None:
return f"{event_name} has no explanation text yet. =("
tx_link = f"{explorer_tx_link}{log['transactionHash'].to_0x_hex()}"
message = f"*{msgs['header']} #{network_name}!*\n\n"
message += f"{msgs['body']}\n[{msgs['link']}]({tx_link})\n\n"
message += f"_{msgs['footer']}_"
return message
def prepare_button(self, network_name, event_name, ftso_address, log):
msgs = self.messages.get(event_name)
if msgs is None:
return None
button_url = msgs["button_url"]
button_url = button_url.replace("/#/", f"/#/{network_name}/")
button = InlineKeyboardBuilder()
if event_name == "Swap":
button_url += ftso_address
elif event_name == "Bond" or event_name == "MarketCreated":
bond_id = int(log["topics"][1].hex(), 16)
button_url += str(bond_id)
elif event_name == "ProposalCreated":
proposal_id = int(log["topics"][1].hex(), 16)
button_url += str(proposal_id)
button.add(types.InlineKeyboardButton(
text=msgs["button_text"],
url=button_url
))
return button.as_markup()
async def monitor_network(self, network_name, network_data):
rpc_manager = RPCManager(network_name)
await rpc_manager.start()
last_block = None
while True:
checked_index = 0
current_rpc = rpc_manager.get_current_rpc()
try:
w3 = Web3(Web3.HTTPProvider(current_rpc, request_kwargs={'timeout': CONNECTION_TIMEOUT}))
if not w3.is_connected():
raise Exception(f"[{network_name}] Could not connect to RPC: {current_rpc}.")
logger.info(f"[{network_name}] Connecting to {current_rpc}.")
if last_block is None:
last_block = w3.eth.block_number
logger.info(f"[{network_name}] Starting from {last_block} based on {current_rpc}.")
while True:
latest_block = w3.eth.block_number
if last_block >= latest_block:
await asyncio.sleep(SLEEP_TIME)
continue
current_block = min(last_block + MAX_BLOCK_DELAY, latest_block)
logger.info(f"[{network_name}] Looking for range from {last_block + 1} to {current_block}")
for index, event in enumerate(network_data["events"]):
if checked_index > index:
continue
event_name = event["name"]
logs = w3.eth.get_logs({
"fromBlock": last_block + 1,
"toBlock": current_block,
"address": Web3.to_checksum_address(event["address"]),
"topics": [event["topic"]]
})
checked_index = index + 1
logger.info(f"""[{network_name}] Found {len(logs)} {event_name} events from {current_rpc}.""")
await asyncio.sleep(2)
for log in logs:
message = self.prepare_message(
network_name,
network_data["explorer_tx_link"],
event_name,
log
)
button = self.prepare_button(
network_name,
event_name,
network_data["ftso"],
log
)
await self.send_alert(network_name, message, button)
checked_index = 0
last_block = current_block
await asyncio.sleep(SLEEP_TIME)
except Exception as e:
logger.warning(f"[{network_name}] Connection failed {current_rpc}: {e}.")
await asyncio.sleep(2)
finally:
rpc_manager.next_rpc()
async def send_alert(self, network_name, message, button_reply_markup):
try:
await self.bot.send_message(
chat_id=self.chat_id,
message_thread_id=self.thread_id,
text=message,
parse_mode="Markdown",
reply_markup=button_reply_markup
)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"[{network_name}] TG error during sending: {e}.")
async def start(self, networks):
tasks = [
self.monitor_network(network_name, network_data)
for network_name, network_data in networks.items()
]
logger.info(f"Ghost Event Bot started. Monitoring {len(tasks)} networks.")
await asyncio.gather(*tasks)
if __name__ == "__main__":
TG_TOKEN = os.getenv("TELEGRAM_TOKEN")
CHAT_ID = os.getenv("NOTIFY_CHAT_ID")
THREAD_ID = os.getenv("THREAD_ID") or None
with open("networks.json", "r") as f:
networks = json.load(f)
with open("messages.json", "r") as f:
messages = json.load(f)
bot = EventBot(token=TG_TOKEN, chat_id=CHAT_ID, thread_id=THREAD_ID, messages=messages)
asyncio.run(bot.start(networks))