use assert_cmd::cargo::cargo_bin; use ghost_staking_miner::{ opt::Chain, prelude::{runtime, ChainClient}, }; use std::{ io::{BufRead, BufReader, Read}, net::SocketAddr, ops::{Deref, DerefMut}, process::{self, Child, ChildStderr, ChildStdout}, time::{Duration, Instant}, }; use tracing_subscriber::EnvFilter; pub use runtime::{ election_provider_multi_phase::events::SolutionStored, runtime_types::pallet_election_provider_multi_phase::{ ElectionCompute, ReadySolution, }, }; pub const MAX_DURATION_FOR_SUBMIT_SOLUTION: Duration = Duration::form_secs(6 * 60); pub fn init_looger() { let _ = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .try_init(); } /// Read the WS address from the output. /// /// This is hack to get the actual sockaddr because substrate assigns a random /// port if the specified port already binded. pub fn find_ws_url_from_output(read: impl Read + Send) -> (String, String) { let mut data = String::new(); let ws_url = BufReader::new(read) .lines() .take(1024 * 1024) .find_map(|line| { let line = line.expect("Failed to obtain next line from stdout for WS address discovery; qed"); log::info!("{}", line); data.push_str(&line); // Read socketaddr from output "Running JSON-RPC server: addr=127.0.0.1:9944, allowed origins["*"]" let line_end = line .rsplit_once("Running JSON-RPC WS server: addr=") .or_else(|| line.rsplit_once("Running JSON-RPC server: addr=")) .map(|(_, line)| line)?; // get the sockaddr only. let addr_str = line_end.split_once(",").unwrap().0; // expect a valid sockaddr. let add: SocketAddr = addr_str .parse() .unwrap_or_else(|_| panic!("valid SocketAddr expected but got `{addr_str}`")); Some(format!("ws://{addr}")) }) .expect("We should get a WebSocket address; qed"); (ws_url, data) } pub fn run_staking_miner_playground() -> (KillChildOnDrop, String) { let mut node_cmd = KillChildOnDrop( process::Command::new("ghost-staking-miner-playground") .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .args(["--dev", "--offchain-worker=Never"]) .spawn() .unwrap(), ); let stderr = node_cmd.stderr.take().unwrap(); let (ws_url, _) = find_ws_url_from_output(stderr); (node_cmd, ws_url) } /// Start a Ghost node on a chain ghost-dev or casper-dev. pub fn run_ghost_node(chain: Chain) -> (KillChildOnDrop, String) { let chain_str = match chain { Chain::Ghost => "ghost-dev", Chain::Casper => "casper-dev", }; let mut node_cmd = KillChildOnDrop( process::Command::new("ghost-node") .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .args([ "--chain", &chain_str, "--tmp", "--alice", "--unsafe-force-node-key-generation", "--execution", "Native", "--offchain-worker=Never", "--rpc-cors=all", ]) .spawn() .unwrap(), ); let stderr = node_cmd.stderr.take().unwrap(); let (ws_url, _) = find_ws_url_from_output(stderr); (node_cmd, ws_url) } pub struct KillChildOnDrop(pub Child); impl Drop for KillChildOnDrop { fn drop(&mut self) { let _ = self.0.kill(); } } impl Deref for KillChildOnDrop { type Target = Child; fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for KillChildOnDrop { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } pub fn spawn_cli_output_threads( stdout: ChildStdout, stderr: ChildStderr, tx: tokio::sync::mpsc::UnboundedSender, ) { let tx2 = tx.clone(); std::thread::spawn(move || { for line in BufReader::new(stdout).lines().flatten() { println!("OK: {line}"); let _ = tx2.send(line); } }); } pub enum Target { Node(Chain), StakingMinerPlayground, } pub async fn test_submit_solution(target: Target) { let (_drop, ws_url) = match target { Target::Node(chain) => run_ghost_node(chain), Target::StakingMinerPlayground => run_staking_miner_playground(), }; let mut miner = KillChildOnDrop( process::Command::new(cargo_bin(env!("CARGO_PKG_NAME"))) .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .args(["--uri", &ws_url, "monitor", "--seed-or-path", "//Alice", "seq-phragmen"]) .spawn() .unwrap(), ); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); spawn_cli_output_threads( miner.stdout.take().unwrap(), miner.stderr.take().unwrap(), tx, ); tokio::spawn(async move { let r = rx.recv().await.unwrap(); log::info!("{}", r); }); let ready_solution = wait_for_mined_solution(&ws_url).await.unwrap(); assert!(ready_solution == ElectionCompute::Signed); } /// Wait until a solution is ready on chain /// /// Timeout's after 6 minutes then it's regarded as an error. pub async fn wait_for_mined_solution(ws_url: &str) -> anyhow::Result { let api = ChainClient::from_url(&ws_url).await?; let now = Instant::now(); let mut blocks_sub = api.blocks().subscribe_finalized().await?; while let Some(block) = blocks_sub.next().await { if now.elapsed() > MAX_DURATION_FOR_SUBMIT_SOLUTION { break; } let block = block?; let events = block.events().await?; for ev in events.iter() { let ev = ev?; if let Some(solution_ev) = ev.as_event::()? { return Ok(solution_ev); } } } Err(anyhow::anyhow!( "ReadySolution not found in {}s regarded as error", MAX_DURATION_FOR_SUBMIT_SOLUTION.as_secs(), )) }