MrBoec/utils/staking-miner/tests/common.rs

216 lines
6.1 KiB
Rust
Raw Normal View History

use assert_cmd::cargo::cargo_bin;
use ghost_staking_miner::{
opt::Chain,
prelude::{runtime, ChainClient},
};
use std::{
io::{BufRead, BufReader, Read},
net::SocketAddr,
ops::{Deref, DerefMut},
process::{self, Child, ChildStderr, ChildStdout},
time::{Duration, Instant},
};
use tracing_subscriber::EnvFilter;
pub use runtime::{
election_provider_multi_phase::events::SolutionStored,
runtime_types::pallet_election_provider_multi_phase::{
ElectionCompute, ReadySolution,
},
};
/// Maximum time (6 minutes) that [`wait_for_mined_solution`] keeps looking for a
/// stored solution before it gives up.
pub const MAX_DURATION_FOR_SUBMIT_SOLUTION: Duration = Duration::from_secs(6 * 60);
/// Install a `tracing_subscriber` formatter whose filter is read from the environment.
///
/// The result of `try_init` is discarded on purpose, so an initialisation error
/// (for instance when a subscriber was already installed) does not abort the caller.
pub fn init_looger() {
    let env_filter = EnvFilter::from_default_env();
    let builder = tracing_subscriber::fmt().with_env_filter(env_filter);
    let _ = builder.try_init();
}
/// Read the WS address from the output.
///
/// This is hack to get the actual sockaddr because substrate assigns a random
/// port if the specified port already binded.
pub fn find_ws_url_from_output(read: impl Read + Send) -> (String, String) {
let mut data = String::new();
let ws_url = BufReader::new(read)
.lines()
.take(1024 * 1024)
.find_map(|line| {
let line = line.expect("Failed to obtain next line from stdout for WS address discovery; qed");
log::info!("{}", line);
data.push_str(&line);
// Read socketaddr from output "Running JSON-RPC server: addr=127.0.0.1:9944, allowed origins["*"]"
let line_end = line
.rsplit_once("Running JSON-RPC WS server: addr=")
.or_else(|| line.rsplit_once("Running JSON-RPC server: addr="))
.map(|(_, line)| line)?;
// get the sockaddr only.
let addr_str = line_end.split_once(",").unwrap().0;
// expect a valid sockaddr.
let add: SocketAddr = addr_str
.parse()
.unwrap_or_else(|_| panic!("valid SocketAddr expected but got `{addr_str}`"));
Some(format!("ws://{addr}"))
})
.expect("We should get a WebSocket address; qed");
(ws_url, data)
}
/// Spawn `ghost-staking-miner-playground` with `--dev --offchain-worker=Never`.
///
/// Both stdout and stderr of the child are piped; stderr is scanned for the
/// JSON-RPC address. Returns the child wrapped in a [`KillChildOnDrop`] guard
/// together with the discovered `ws://` URL.
///
/// # Panics
///
/// Panics if the process cannot be spawned or no address is found in its stderr.
pub fn run_staking_miner_playground() -> (KillChildOnDrop, String) {
    let mut command = process::Command::new("ghost-staking-miner-playground");
    command
        .args(["--dev", "--offchain-worker=Never"])
        .stdout(process::Stdio::piped())
        .stderr(process::Stdio::piped());

    let mut node_cmd = KillChildOnDrop(command.spawn().unwrap());
    let node_stderr = node_cmd.stderr.take().unwrap();
    let (ws_url, _output) = find_ws_url_from_output(node_stderr);
    (node_cmd, ws_url)
}
/// Start a `ghost-node` process on the `ghost-dev` or `casper-dev` chain.
///
/// The node's stdout and stderr are piped; stderr is scanned for the JSON-RPC
/// address. Returns the child wrapped in a [`KillChildOnDrop`] guard together
/// with the discovered `ws://` URL.
///
/// # Panics
///
/// Panics if the process cannot be spawned or no address is found in its stderr.
pub fn run_ghost_node(chain: Chain) -> (KillChildOnDrop, String) {
    let chain_str = match chain {
        Chain::Ghost => "ghost-dev",
        Chain::Casper => "casper-dev",
    };

    let mut command = process::Command::new("ghost-node");
    command
        .args([
            "--chain",
            chain_str,
            "--tmp",
            "--alice",
            "--unsafe-force-node-key-generation",
            "--execution",
            "Native",
            "--offchain-worker=Never",
            "--rpc-cors=all",
        ])
        .stdout(process::Stdio::piped())
        .stderr(process::Stdio::piped());

    let mut node_cmd = KillChildOnDrop(command.spawn().unwrap());
    let node_stderr = node_cmd.stderr.take().unwrap();
    let (ws_url, _output) = find_ws_url_from_output(node_stderr);
    (node_cmd, ws_url)
}
/// Guard around a spawned [`Child`] that kills and reaps the process when dropped.
///
/// The guard dereferences to the wrapped [`Child`], so the child's fields and
/// methods (`stdout`, `stderr`, `id()`, ...) can be used on it directly.
#[derive(Debug)]
pub struct KillChildOnDrop(pub Child);

impl Drop for KillChildOnDrop {
    /// Kill the child and collect its exit status.
    ///
    /// `Child` has no `Drop` of its own, so without the wait the killed process
    /// would stay around unreaped for as long as the test process lives.
    fn drop(&mut self) {
        match self.0.kill() {
            // The kill signal was delivered, so waiting cannot block for long.
            Ok(()) => {
                let _ = self.0.wait();
            }
            // Killing failed; only try a non-blocking reap so `drop` can never hang.
            Err(_) => {
                let _ = self.0.try_wait();
            }
        }
    }
}

impl Deref for KillChildOnDrop {
    type Target = Child;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for KillChildOnDrop {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
/// Forward the output of a child process to `tx`, line by line.
///
/// Two detached threads are spawned, one for `stdout` and one for `stderr`.
/// Each line is echoed to this process' stdout (prefixed with `OK: ` or `ERR: `
/// respectively) and then sent on `tx`. Send errors are ignored, so the threads
/// keep draining the pipes even after the receiver is gone.
pub fn spawn_cli_output_threads(
    stdout: ChildStdout,
    stderr: ChildStderr,
    tx: tokio::sync::mpsc::UnboundedSender<String>,
) {
    let tx2 = tx.clone();
    std::thread::spawn(move || {
        for line in BufReader::new(stdout).lines().flatten() {
            println!("OK: {line}");
            let _ = tx2.send(line);
        }
    });
    // stderr must be consumed as well: dropping the handle unread closes the pipe
    // under the child.
    std::thread::spawn(move || {
        for line in BufReader::new(stderr).lines().flatten() {
            println!("ERR: {line}");
            let _ = tx.send(line);
        }
    });
}
/// The kind of node that [`test_submit_solution`] starts and points the miner at.
pub enum Target {
/// Start a node via [`run_ghost_node`] for the given chain.
Node(Chain),
/// Start the playground binary via [`run_staking_miner_playground`].
StakingMinerPlayground,
}
/// Start the requested node, run the miner against it, and check that a signed
/// solution gets stored on chain.
///
/// The miner binary of this package is started in `monitor` mode with the
/// `//Alice` seed and the `seq-phragmen` solver. Its output is forwarded through
/// [`spawn_cli_output_threads`]. Both child processes are killed when this
/// function returns or panics.
///
/// # Panics
///
/// Panics if a process cannot be spawned, if no solution is found by
/// [`wait_for_mined_solution`], or if the stored solution was not computed by a
/// signed submission.
pub async fn test_submit_solution(target: Target) {
    // Keep the guard alive for the whole test so the node is killed at the end.
    let (_drop, ws_url) = match target {
        Target::Node(chain) => run_ghost_node(chain),
        Target::StakingMinerPlayground => run_staking_miner_playground(),
    };
    let mut miner = KillChildOnDrop(
        process::Command::new(cargo_bin(env!("CARGO_PKG_NAME")))
            .stdout(process::Stdio::piped())
            .stderr(process::Stdio::piped())
            .args(["--uri", &ws_url, "monitor", "--seed-or-path", "//Alice", "seq-phragmen"])
            .spawn()
            .unwrap(),
    );
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    spawn_cli_output_threads(
        miner.stdout.take().unwrap(),
        miner.stderr.take().unwrap(),
        tx,
    );
    tokio::spawn(async move {
        // Log the first line the miner prints; a channel that closes without any
        // output is not an error worth panicking over.
        if let Some(r) = rx.recv().await {
            log::info!("{}", r);
        }
    });
    let ready_solution = wait_for_mined_solution(&ws_url).await.unwrap();
    // The event itself is not an `ElectionCompute`; its `compute` field is.
    assert!(matches!(ready_solution.compute, ElectionCompute::Signed));
}
/// Wait until a solution has been stored on chain.
///
/// Connects to `ws_url`, subscribes to finalized blocks and returns the first
/// `SolutionStored` event found in any of them.
///
/// # Errors
///
/// Returns an error if connecting, subscribing or decoding a block or event fails.
/// Also fails if the subscription ends, or if a block arrives more than
/// [`MAX_DURATION_FOR_SUBMIT_SOLUTION`] after the start, before a solution was seen.
/// The elapsed time is only checked when a new block arrives.
pub async fn wait_for_mined_solution(ws_url: &str) -> anyhow::Result<SolutionStored> {
    let client = ChainClient::from_url(ws_url).await?;
    let started = Instant::now();
    let mut finalized_blocks = client.blocks().subscribe_finalized().await?;

    while let Some(next_block) = finalized_blocks.next().await {
        if started.elapsed() > MAX_DURATION_FOR_SUBMIT_SOLUTION {
            break;
        }

        let block = next_block?;
        let block_events = block.events().await?;
        for raw_event in block_events.iter() {
            let event = raw_event?;
            match event.as_event::<SolutionStored>()? {
                Some(stored) => return Ok(stored),
                None => continue,
            }
        }
    }

    let timeout_secs = MAX_DURATION_FOR_SUBMIT_SOLUTION.as_secs();
    Err(anyhow::anyhow!(
        "ReadySolution not found in {}s regarded as error",
        timeout_secs,
    ))
}