use tokio::sync::mpsc::UnboundedSender; use color_eyre::Result; use subxt::{ backend::rpc::RpcClient, tx::{TxProgress, TxStatus}, utils::H256, OnlineClient, }; mod legacy_rpc_calls; mod predefined_calls; mod predefined_txs; mod subscriptions; mod miscellaneous; mod raw_calls; pub use miscellaneous::{prepare_perbill_fraction_string, calculate_for_fraction}; use crate::{ types::{ActionLevel, ActionTarget}, action::Action, casper::CasperConfig, }; pub use subscriptions::{FinalizedSubscription, BestSubscription}; struct TxToWatch { tx_progress: TxProgress>, sender: String, target: ActionTarget, } pub struct Network { action_tx: UnboundedSender, online_client_api: OnlineClient, rpc_client: RpcClient, best_hash: Option, finalized_hash: Option, stash_to_watch: Option<[u8; 32]>, accounts_to_watch: std::collections::HashSet<[u8; 32]>, transactions_to_watch: Vec, senders: std::collections::HashMap, } impl Network { pub fn new( action_tx: UnboundedSender, online_client_api: OnlineClient, rpc_client: RpcClient, ) -> Self { Self { action_tx, online_client_api, rpc_client, best_hash: None, finalized_hash: None, stash_to_watch: None, accounts_to_watch: Default::default(), transactions_to_watch: Default::default(), senders: Default::default(), } } fn store_stash_if_possible(&mut self, new_stash: [u8; 32]) { match self.stash_to_watch { Some(stash) if stash == new_stash => {}, _ => self.stash_to_watch = Some(new_stash), } } fn store_sender_nonce(&mut self, seed: &str, maybe_nonce: Option) { if let Some(current_nonce) = maybe_nonce { self.senders .entry(seed.to_string()) .and_modify(|stored_nonce| { if *stored_nonce < current_nonce { *stored_nonce = current_nonce; } }) .or_insert(current_nonce); } else { let _ = self.senders.remove(seed); } } fn remove_transaction_and_decrement_nonce(&mut self, index: usize) { let removed = self.transactions_to_watch.remove(index); self.senders .get_mut(&removed.sender) .map(|nonce| *nonce = nonce.saturating_sub(1)); } pub async fn handle_network_event(&mut self, io_event: Action) -> Result<()> { match io_event { Action::NewBestHash(hash) => { self.best_hash = Some(hash); Ok(()) }, Action::NewFinalizedHash(hash) => { self.finalized_hash = Some(hash); if let Some(stash_to_watch) = self.stash_to_watch { predefined_calls::get_session_keys(&self.action_tx, &self.online_client_api, &self.rpc_client, &stash_to_watch).await?; predefined_calls::get_queued_session_keys(&self.action_tx, &self.online_client_api, &self.rpc_client, &stash_to_watch).await?; predefined_calls::get_nominators_by_validator(&self.action_tx, &self.online_client_api, &stash_to_watch).await?; predefined_calls::get_validator_prefs(&self.action_tx, &self.online_client_api, &stash_to_watch).await?; predefined_calls::get_staking_value_ratio(&self.action_tx, &self.online_client_api, &stash_to_watch).await?; predefined_calls::get_is_stash_bonded(&self.action_tx, &self.online_client_api, &stash_to_watch).await?; predefined_calls::get_validators_ledger(&self.action_tx, &self.online_client_api, &stash_to_watch).await?; } for account_id in self.accounts_to_watch.iter() { predefined_calls::get_balance(&self.action_tx, &self.online_client_api, &account_id).await?; } Ok(()) }, Action::CheckPendingTransactions => { let length = self.transactions_to_watch.len(); for i in (0..length).rev() { let pending_tx = &mut self.transactions_to_watch[i]; let ext_hash = pending_tx.tx_progress.extrinsic_hash(); let log_target = pending_tx.target.clone(); match (*pending_tx).tx_progress.next().await { Some(Ok(status)) => { match status { TxStatus::Validated => self.action_tx.send(Action::EventLog(format!("transaction {} is part of future queue", ext_hash), ActionLevel::Info, log_target))?, TxStatus::Broadcasted { num_peers } => self.action_tx.send(Action::EventLog(format!("transaction {} has been broardcasted to {} nodes", ext_hash, num_peers), ActionLevel::Info, log_target))?, TxStatus::NoLongerInBestBlock => self.action_tx.send(Action::EventLog(format!("transaction {} is no longer in a best block", ext_hash), ActionLevel::Warn, log_target))?, TxStatus::InBestBlock(b) => self.action_tx.send(Action::EventLog(format!("transaction {} included in the block header {}", b.extrinsic_hash(), b.block_hash()), ActionLevel::Info, log_target))?, TxStatus::InFinalizedBlock(b) => { self.action_tx.send(Action::EventLog(format!("transaction {} has been finalized in block header {}", b.extrinsic_hash(), b.block_hash()), ActionLevel::Info, log_target))?; self.transactions_to_watch.remove(i); } TxStatus::Error { message } => { self.action_tx.send(Action::EventLog(format!("transaction {} error, something get wrong: {message}", ext_hash), ActionLevel::Error, log_target))?; self.remove_transaction_and_decrement_nonce(i); } TxStatus::Invalid { message } => { self.action_tx.send(Action::EventLog(format!("transaction {} invalid: {message}", ext_hash), ActionLevel::Error, log_target))?; self.remove_transaction_and_decrement_nonce(i); } TxStatus::Dropped { message } => { self.action_tx.send(Action::EventLog(format!("transaction {} was dropped: {message}", ext_hash), ActionLevel::Error, log_target))?; self.remove_transaction_and_decrement_nonce(i); } } }, _ => { self.action_tx.send(Action::EventLog(format!("transaction {} was dropped", ext_hash), ActionLevel::Error, log_target))?; self.remove_transaction_and_decrement_nonce(i); } } } Ok(()) }, Action::GetSystemHealth => legacy_rpc_calls::get_system_health(&self.action_tx, &self.rpc_client).await, Action::GetNodeName => legacy_rpc_calls::get_node_name(&self.action_tx, &self.rpc_client).await, Action::GetGenesisHash => legacy_rpc_calls::get_genesis_hash(&self.action_tx, &self.rpc_client).await, Action::GetChainName => legacy_rpc_calls::get_chain_name(&self.action_tx, &self.rpc_client).await, Action::GetChainVersion => legacy_rpc_calls::get_system_version(&self.action_tx, &self.rpc_client).await, Action::GetPendingExtrinsics => legacy_rpc_calls::get_pending_extrinsics(&self.action_tx, &self.rpc_client).await, Action::GetConnectedPeers => legacy_rpc_calls::get_connected_peers(&self.action_tx, &self.rpc_client).await, Action::GetListenAddresses => legacy_rpc_calls::get_listen_addresses(&self.action_tx, &self.rpc_client).await, Action::GetLocalIdentity => legacy_rpc_calls::get_local_identity(&self.action_tx, &self.rpc_client).await, Action::GetBlockAuthor(hash, logs) => predefined_calls::get_block_author(&self.action_tx, &self.online_client_api, &logs, &hash).await, Action::GetActiveEra => predefined_calls::get_active_era(&self.action_tx, &self.online_client_api).await, Action::GetCurrentEra => predefined_calls::get_current_era(&self.action_tx, &self.online_client_api).await, Action::GetEpochProgress => predefined_calls::get_epoch_progress(&self.action_tx, &self.online_client_api).await, Action::GetExistentialDeposit => predefined_calls::get_existential_deposit(&self.action_tx, &self.online_client_api).await, Action::GetTotalIssuance => predefined_calls::get_total_issuance(&self.action_tx, &self.online_client_api).await, Action::GetValidatorsNumber => predefined_calls::get_validators_number(&self.action_tx, &self.online_client_api).await, Action::GetNominatorsNumber => predefined_calls::get_nominators_number(&self.action_tx, &self.online_client_api).await, Action::GetInflation => predefined_calls::get_inflation(&self.action_tx, &self.online_client_api).await, Action::GetCurrentValidatorEraRewards => predefined_calls::get_current_validator_reward_in_era(&self.action_tx, &self.online_client_api).await, Action::SetSender(seed, maybe_nonce) => { self.store_sender_nonce(&seed, maybe_nonce); Ok(()) } Action::GetValidatorLedger(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_validators_ledger(&self.action_tx, &self.online_client_api, &stash).await } Action::GetIsStashBonded(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_is_stash_bonded(&self.action_tx, &self.online_client_api, &stash).await }, Action::GetErasStakersOverview(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_staking_value_ratio(&self.action_tx, &self.online_client_api, &stash).await }, Action::GetValidatorPrefs(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_validator_prefs(&self.action_tx, &self.online_client_api, &stash).await }, Action::GetValidatorAllRewards(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_validator_staking_results(&self.action_tx, &self.online_client_api, &stash).await }, Action::GetNominatorsByValidator(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_nominators_by_validator(&self.action_tx, &self.online_client_api, &stash).await }, Action::GetQueuedSessionKeys(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_queued_session_keys(&self.action_tx, &self.online_client_api, &self.rpc_client, &stash).await }, Action::GetSessionKeys(stash) => { self.store_stash_if_possible(stash); predefined_calls::get_session_keys(&self.action_tx, &self.online_client_api, &self.rpc_client, &stash).await }, Action::BalanceRequest(account_id, remove) => { if remove { let _ = self.accounts_to_watch.remove(&account_id); Ok(()) } else { let _ = self.accounts_to_watch.insert(account_id); predefined_calls::get_balance(&self.action_tx, &self.online_client_api, &account_id).await } } Action::TransferBalance(sender, receiver, amount) => { let sender_cloned = sender.clone(); let maybe_nonce = self.senders.get_mut(&sender); let sender: [u8; 32] = hex::decode(sender) .expect("stored seed is valid hex string; qed") .as_slice() .try_into() .expect("stored seed is valid length; qed"); if let Ok(tx_progress) = predefined_txs::transfer_balance( &self.action_tx, &self.online_client_api, &sender, &receiver, &amount, maybe_nonce, ).await { self.transactions_to_watch.push(TxToWatch { tx_progress, sender: sender_cloned, target: ActionTarget::WalletLog, }); } Ok(()) } _ => Ok(()) } } }