202 lines
7.2 KiB
Python
202 lines
7.2 KiB
Python
import os
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
|
|
from dotenv import load_dotenv
|
|
from web3 import Web3
|
|
from aiogram import types, Bot
|
|
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
|
|
|
load_dotenv()
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger("GhostNotifierBot")
|
|
|
|
SLEEP_TIME=10 # seconds
|
|
CONNECTION_TIMEOUT=10 # seconds
|
|
MAX_BLOCK_DELAY=5 # blocks
|
|
|
|
class RPCManager:
|
|
def __init__(self, network_name):
|
|
self.network_name = network_name
|
|
self.current_index = 0
|
|
self.rpcs = []
|
|
|
|
async def start(self):
|
|
asyncio.create_task(self._auto_update_loop())
|
|
|
|
def next_rpc(self):
|
|
if len(self.rpcs) > 0:
|
|
self.current_index = (self.current_index + 1) % len(self.rpcs)
|
|
|
|
def get_current_rpc(self):
|
|
if len(self.rpcs) == 0:
|
|
return ""
|
|
return self.rpcs[self.current_index]
|
|
|
|
def update_rpc_from_file(self):
|
|
path = os.path.join("rpcs", f"{self.network_name.lower()}.txt")
|
|
if os.path.exists(path):
|
|
with open(path, "r") as f:
|
|
rpcs = [line.strip() for line in f if line.strip()]
|
|
self.rpcs = rpcs
|
|
self.current_index %= len(self.rpcs)
|
|
logger.info(f"[{self.network_name}] list of RPC updated.")
|
|
else:
|
|
logger.warning(f"[{self.network_name}] file with RPCs not found at {path}.")
|
|
|
|
async def _auto_update_loop(self):
|
|
while True:
|
|
self.update_rpc_from_file()
|
|
await asyncio.sleep(1800) # 30 mins
|
|
|
|
class EventBot:
|
|
def __init__(self, token, chat_id, thread_id, messages):
|
|
self.bot = Bot(token)
|
|
self.chat_id = chat_id
|
|
self.thread_id = thread_id
|
|
self.messages = messages
|
|
|
|
def prepare_message(self, network_name, explorer_tx_link, event_name, log):
|
|
msgs = self.messages.get(event_name)
|
|
if msgs is None:
|
|
return f"{event_name} has no explanation text yet. =("
|
|
|
|
tx_link = f"{explorer_tx_link}{log['transactionHash'].to_0x_hex()}"
|
|
message = f"*{msgs['header']} #{network_name}!*\n\n"
|
|
message += f"{msgs['body']}\n[{msgs['link']}]({tx_link})\n\n"
|
|
message += f"_{msgs['footer']}_"
|
|
return message
|
|
|
|
def prepare_button(self, network_name, event_name, ftso_address, log):
|
|
msgs = self.messages.get(event_name)
|
|
if msgs is None:
|
|
return None
|
|
|
|
button_url = msgs["button_url"]
|
|
button_url = button_url.replace("/#/", f"/#/{network_name}/")
|
|
button = InlineKeyboardBuilder()
|
|
|
|
if event_name == "Swap":
|
|
button_url += ftso_address
|
|
elif event_name == "Bond" or event_name == "MarketCreated":
|
|
bond_id = int(log["topics"][1].hex(), 16)
|
|
button_url += str(bond_id)
|
|
elif event_name == "ProposalCreated":
|
|
proposal_id = int(log["topics"][1].hex(), 16)
|
|
button_url += str(proposal_id)
|
|
|
|
button.add(types.InlineKeyboardButton(
|
|
text=msgs["button_text"],
|
|
url=button_url
|
|
))
|
|
|
|
return button.as_markup()
|
|
|
|
async def monitor_network(self, network_name, network_data):
|
|
rpc_manager = RPCManager(network_name)
|
|
await rpc_manager.start()
|
|
last_block = None
|
|
|
|
while True:
|
|
checked_index = 0
|
|
current_rpc = rpc_manager.get_current_rpc()
|
|
|
|
try:
|
|
w3 = Web3(Web3.HTTPProvider(current_rpc, request_kwargs={'timeout': CONNECTION_TIMEOUT}))
|
|
if not w3.is_connected():
|
|
raise Exception(f"[{network_name}] Could not connect to RPC: {current_rpc}.")
|
|
|
|
logger.info(f"[{network_name}] Connecting to {current_rpc}.")
|
|
if last_block is None:
|
|
last_block = w3.eth.block_number
|
|
logger.info(f"[{network_name}] Starting from {last_block} based on {current_rpc}.")
|
|
|
|
while True:
|
|
latest_block = w3.eth.block_number
|
|
|
|
if last_block >= latest_block:
|
|
await asyncio.sleep(SLEEP_TIME)
|
|
continue
|
|
|
|
current_block = min(last_block + MAX_BLOCK_DELAY, latest_block)
|
|
logger.info(f"[{network_name}] Looking for range from {last_block + 1} to {current_block}")
|
|
|
|
for index, event in enumerate(network_data["events"]):
|
|
if checked_index > index:
|
|
continue
|
|
|
|
event_name = event["name"]
|
|
logs = w3.eth.get_logs({
|
|
"fromBlock": last_block + 1,
|
|
"toBlock": current_block,
|
|
"address": Web3.to_checksum_address(event["address"]),
|
|
"topics": [event["topic"]]
|
|
})
|
|
checked_index = index + 1
|
|
|
|
logger.info(f"""[{network_name}] Found {len(logs)} {event_name} events from {current_rpc}.""")
|
|
await asyncio.sleep(2)
|
|
|
|
for log in logs:
|
|
message = self.prepare_message(
|
|
network_name,
|
|
network_data["explorer_tx_link"],
|
|
event_name,
|
|
log
|
|
)
|
|
button = self.prepare_button(
|
|
network_name,
|
|
event_name,
|
|
network_data["ftso"],
|
|
log
|
|
)
|
|
await self.send_alert(network_name, message, button)
|
|
|
|
checked_index = 0
|
|
last_block = current_block
|
|
await asyncio.sleep(SLEEP_TIME)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"[{network_name}] Connection failed {current_rpc}: {e}.")
|
|
await asyncio.sleep(2)
|
|
|
|
finally:
|
|
rpc_manager.next_rpc()
|
|
|
|
async def send_alert(self, network_name, message, button_reply_markup):
|
|
try:
|
|
await self.bot.send_message(
|
|
chat_id=self.chat_id,
|
|
message_thread_id=self.thread_id,
|
|
text=message,
|
|
parse_mode="Markdown",
|
|
reply_markup=button_reply_markup
|
|
)
|
|
await asyncio.sleep(0.5)
|
|
except Exception as e:
|
|
logger.error(f"[{network_name}] TG error during sending: {e}.")
|
|
|
|
async def start(self, networks):
|
|
tasks = [
|
|
self.monitor_network(network_name, network_data)
|
|
for network_name, network_data in networks.items()
|
|
]
|
|
|
|
logger.info(f"Ghost Event Bot started. Monitoring {len(tasks)} networks.")
|
|
await asyncio.gather(*tasks)
|
|
|
|
if __name__ == "__main__":
|
|
TG_TOKEN = os.getenv("TELEGRAM_TOKEN")
|
|
CHAT_ID = os.getenv("NOTIFY_CHAT_ID")
|
|
THREAD_ID = os.getenv("THREAD_ID") or None
|
|
|
|
with open("networks.json", "r") as f:
|
|
networks = json.load(f)
|
|
|
|
with open("messages.json", "r") as f:
|
|
messages = json.load(f)
|
|
|
|
bot = EventBot(token=TG_TOKEN, chat_id=CHAT_ID, thread_id=THREAD_ID, messages=messages)
|
|
asyncio.run(bot.start(networks))
|