import os import json import asyncio import logging from dotenv import load_dotenv from web3 import Web3 from aiogram import types, Bot from aiogram.utils.keyboard import InlineKeyboardBuilder load_dotenv() logging.basicConfig(level=logging.INFO) logger = logging.getLogger("GhostNotifierBot") SLEEP_TIME=10 # seconds CONNECTION_TIMEOUT=10 # seconds MAX_BLOCK_DELAY=5 # blocks class RPCManager: def __init__(self, network_name): self.network_name = network_name self.current_index = 0 self.rpcs = [] async def start(self): asyncio.create_task(self._auto_update_loop()) def next_rpc(self): if len(self.rpcs) > 0: self.current_index = (self.current_index + 1) % len(self.rpcs) def get_current_rpc(self): if len(self.rpcs) == 0: return "" return self.rpcs[self.current_index] def update_rpc_from_file(self): path = os.path.join("rpcs", f"{self.network_name.lower()}.txt") if os.path.exists(path): with open(path, "r") as f: rpcs = [line.strip() for line in f if line.strip()] self.rpcs = rpcs self.current_index %= len(self.rpcs) logger.info(f"[{self.network_name}] list of RPC updated.") else: logger.warning(f"[{self.network_name}] file with RPCs not found at {path}.") async def _auto_update_loop(self): while True: self.update_rpc_from_file() await asyncio.sleep(1800) # 30 mins class EventBot: def __init__(self, token, chat_id, thread_id, messages): self.bot = Bot(token) self.chat_id = chat_id self.thread_id = thread_id self.messages = messages def prepare_message(self, network_name, explorer_tx_link, event_name, log): msgs = self.messages.get(event_name) if msgs is None: return f"{event_name} has no explanation text yet. =(" tx_link = f"{explorer_tx_link}{log['transactionHash'].to_0x_hex()}" message = f"*{msgs['header']} #{network_name}!*\n\n" message += f"{msgs['body']}\n[{msgs['link']}]({tx_link})\n\n" message += f"_{msgs['footer']}_" return message def prepare_button(self, network_name, event_name, ftso_address, log): msgs = self.messages.get(event_name) if msgs is None: return None button_url = msgs["button_url"] button_url = button_url.replace("/#/", f"/#/{network_name}/") button = InlineKeyboardBuilder() if event_name == "Swap": button_url += ftso_address elif event_name == "Bond" or event_name == "MarketCreated": bond_id = int(log["topics"][1].hex(), 16) button_url += str(bond_id) elif event_name == "ProposalCreated": proposal_id = int(log["topics"][1].hex(), 16) button_url += str(proposal_id) button.add(types.InlineKeyboardButton( text=msgs["button_text"], url=button_url )) return button.as_markup() async def monitor_network(self, network_name, network_data): rpc_manager = RPCManager(network_name) await rpc_manager.start() last_block = None while True: checked_index = 0 current_rpc = rpc_manager.get_current_rpc() try: w3 = Web3(Web3.HTTPProvider(current_rpc, request_kwargs={'timeout': CONNECTION_TIMEOUT})) if not w3.is_connected(): raise Exception(f"[{network_name}] Could not connect to RPC: {current_rpc}.") logger.info(f"[{network_name}] Connecting to {current_rpc}.") if last_block is None: last_block = w3.eth.block_number logger.info(f"[{network_name}] Starting from {last_block} based on {current_rpc}.") while True: latest_block = w3.eth.block_number if last_block >= latest_block: await asyncio.sleep(SLEEP_TIME) continue current_block = min(last_block + MAX_BLOCK_DELAY, latest_block) logger.info(f"[{network_name}] Looking for range from {last_block + 1} to {current_block}") for index, event in enumerate(network_data["events"]): if checked_index > index: continue event_name = event["name"] logs = w3.eth.get_logs({ "fromBlock": last_block + 1, "toBlock": current_block, "address": Web3.to_checksum_address(event["address"]), "topics": [event["topic"]] }) checked_index = index + 1 logger.info(f"""[{network_name}] Found {len(logs)} {event_name} events from {current_rpc}.""") await asyncio.sleep(2) for log in logs: message = self.prepare_message( network_name, network_data["explorer_tx_link"], event_name, log ) button = self.prepare_button( network_name, event_name, network_data["ftso"], log ) await self.send_alert(message, button) checked_index = 0 last_block = current_block await asyncio.sleep(SLEEP_TIME) except Exception as e: logger.warning(f"[{network_name}] Connection failed {current_rpc}: {e}.") await asyncio.sleep(2) finally: rpc_manager.next_rpc() async def send_alert(self, message, button_reply_markup): try: await self.bot.send_message( chat_id=self.chat_id, message_thread_id=self.thread_id, text=message, parse_mode="Markdown", reply_markup=button_reply_markup ) await asyncio.sleep(0.5) except Exception as e: logger.error(f"[{network}] TG error during sending: {e}.") async def start(self, networks): tasks = [ self.monitor_network(network_name, network_data) for network_name, network_data in networks.items() ] logger.info(f"Ghost Event Bot started. Monitoring {len(tasks)} networks.") await asyncio.gather(*tasks) if __name__ == "__main__": TG_TOKEN = os.getenv("TELEGRAM_TOKEN") CHAT_ID = os.getenv("NOTIFY_CHAT_ID") THREAD_ID = os.getenv("THREAD_ID") or None with open("networks.json", "r") as f: networks = json.load(f) with open("messages.json", "r") as f: messages = json.load(f) bot = EventBot(token=TG_TOKEN, chat_id=CHAT_ID, thread_id=THREAD_ID, messages=messages) asyncio.run(bot.start(networks))