MrBoec/utils/staking-miner/src/epm.rs
Uncle Stretch 66719626bb
inital commit, which is clearly not initial
Signed-off-by: Uncle Stretch <uncle.stretch@ghostchain.io>
2024-10-03 15:38:52 +03:00

531 lines
16 KiB
Rust

use crate::{
    error::Error,
    helpers::{storage_at, RuntimeDispatchInfo},
    opt::{BalanceIterations, Balancing, Solver},
    prelude::*,
    prometheus,
    static_types::{self},
};
use std::{
    collections::{BTreeMap, BTreeSet},
    marker::PhantomData,
};
use codec::{Decode, Encode};
use frame_election_provider_support::{
    Get, NposSolution, PhragMMS, SequentialPhragmen,
};
use frame_support::{weights::Weight, BoundedVec};
use pallet_election_provider_multi_phase::{
    unsigned::TrimmingStatus, RawSolution, ReadySolution, SolutionOf,
    SolutionOrSnapshotSize,
};
use scale_info::{PortableRegistry, TypeInfo};
use scale_value::scale::decode_as_type;
use sp_npos_elections::{ElectionScore, VoteWeight};
use subxt::{dynamic::Value, tx::DynamicPayload};
/// Pallet name used for every dynamic constant, storage and transaction lookup in this file.
const EPM_PALLET_NAME: &str = "ElectionProviderMultiPhase";
/// Numeric id of a type registered in a `PortableRegistry`.
type TypeId = u32;
/// Voter entry as stored in the snapshot, bounded by the miner's `MaxVotesPerVoter`.
type MinerVoterOf = frame_election_provider_support::Voter<AccountId, crate::static_types::MaxVotesPerVoter>;
/// Round snapshot of the EPM pallet specialised to the miner's account and voter types.
type RoundSnapshot = pallet_election_provider_multi_phase::RoundSnapshot<AccountId, MinerVoterOf>;
/// List of `(voter account, stake, accounts the voter supports)` tuples.
type Voters = Vec<(AccountId, VoteWeight, BoundedVec<AccountId, crate::static_types::MaxVotesPerVoter>)>;
/// A metadata constant of the EPM pallet, addressed by pallet name and constant name.
///
/// Both parts are `&'static str`, so the value is `Copy` and can be used in `const` items.
#[derive(Copy, Clone, Debug)]
struct EpmConstant {
    /// Name of the pallet that owns the constant.
    epm: &'static str,
    /// Name of the constant inside the pallet.
    constant: &'static str,
}
impl EpmConstant {
const fn new(constant: &'static str) -> Self {
Self { epm: EPM_PALLET_NAME, constant }
}
const fn to_parts(self) -> (&'static str, &'static str) {
(self.epm, self.constant)
}
}
/// Renders the constant as `<pallet>::<constant>`.
impl std::fmt::Display for EpmConstant {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { epm, constant } = self;
        write!(f, "{epm}::{constant}")
    }
}
/// Voter list together with an index of voters ordered by stake.
#[derive(Debug)]
pub struct State {
// Voters in their original order; the index stored in `voters_by_stake` points into this list.
voters: Voters,
// Maps a stake to the position of a voter in `voters`. Because the stake is the key,
// voters with an identical stake share one entry (the last one inserted wins).
voters_by_stake: BTreeMap<VoteWeight, usize>,
}
impl State {
    /// Number of entries left in the stake index (not the length of the voter list).
    fn len(&self) -> usize {
        let Self { voters_by_stake, .. } = self;
        voters_by_stake.len()
    }

    /// Returns an owned copy of the voter list.
    fn to_voters(&self) -> Voters {
        self.voters.to_vec()
    }
}
/// Voters that have been pre-trimmed and can be trimmed further on demand.
///
/// Trimming pops the entry with the smallest stake from the stake index and removes
/// that account from every voter's support list; the voter list itself keeps its order.
#[derive(Debug)]
pub struct TrimmedVoters<T> {
// Voter list and stake index after the weight-based pre-trim done in `new`.
state: State,
// Ties the value to the miner configuration `T` without storing one.
_marker: PhantomData<T>,
}
impl<T> TrimmedVoters<T>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
+ Send
+ Sync
+ 'static,
T::Solution: Send,
{
pub async fn new(mut voters: Voters, desired_targets: u32) -> Rseult<Self, Error> {
let mut voters_by_stake = BTreeMap::new();
let mut targets = BTreeSet::new();
for (idx, (_voter, stake, supports)) in voters.iter().enumerate() {
voters_by_stake.insert(*stake, idx);
targets.extend(supports.iter.cloned());
}
loop {
let targets_len = targets.len() as u32;
let active_voters = voters_by_stake.len() as u32;
let est_weight: Weight = tokio::task::spawn_blocking(move || {
T::solution_weight(active_voters, targets_len, active_voters, desired_targets)
}).await?;
let max_weight: Weight = T::MaxWeight::get();
if est_weight.all_lt(max_weight) {
return Ok(Self {
state: State { voters, voters_by_stake },
_marker: PhantomData
});
}
let Some((_, idx)) = voters_by_stake.pop_first() else { break };
let rm = voters[idx].0.clone();
for (_voter, _stake, supports) in &mut voters {
supports.retain(|a| a != &rm);
}
targets.remove(&rm);
}
return Err(Error::Feasibility("Failed to pre-trim weight < T::MaxLength".to_string()));
}
pub fn trim(&mut self, n: usize) -> Result<State, Error> {
let mut voters = self.state.voters.clone();
let mut voters_by_stake = self.state.voters_by_stake.clone();
for _ in 0..n {
let Some((_, idx)) = voters_by_stake.pop_first() else {
return Err(Error::Feasibility("Failed to pre-trim len".to_string()));
};
let rm = voters[idx].0.clone();
for (_voter, _stake, support) in &mut voters {
supports.retain(|a| a != &rm);
}
}
Ok(State { voters, voters_by_stake })
}
pub fn to_voters(&self) -> Voters {
self.state.voters.clone()
}
pub fn len(&self) -> usize {
self.state.len()
}
}
/// Reads the EPM limits from the chain metadata and stores them in the miner's static types.
///
/// The constants read are `SignedMaxWeight`, `MinerMaxLength`, `MinerMaxVotesPerVoter` and
/// `MaxWinners`; each value is logged at trace level before being stored.
///
/// # Errors
///
/// Propagates the error of [`read_constant`] for the first constant that cannot be read or
/// decoded; in that case none of the static types is updated.
pub(crate) fn update_metadata_constants(api: &ChainClient) -> Result<(), Error> {
    const SIGNED_MAX_WEIGHT: EpmConstant = EpmConstant::new("SignedMaxWeight");
    const MAX_LENGTH: EpmConstant = EpmConstant::new("MinerMaxLength");
    const MAX_VOTES_PER_VOTER: EpmConstant = EpmConstant::new("MinerMaxVotesPerVoter");
    const MAX_WINNERS: EpmConstant = EpmConstant::new("MaxWinners");

    fn log_metadata(metadata: EpmConstant, val: impl std::fmt::Display) {
        log::trace!(target: LOG_TARGET, "updating metadata constant `{metadata}`: `{val}`",);
    }

    // Read everything first so a failure leaves the static types untouched.
    let max_weight = read_constant::<Weight>(api, SIGNED_MAX_WEIGHT)?;
    let max_length: u32 = read_constant(api, MAX_LENGTH)?;
    let max_votes_per_voter: u32 = read_constant(api, MAX_VOTES_PER_VOTER)?;
    let max_winners: u32 = read_constant(api, MAX_WINNERS)?;

    log_metadata(SIGNED_MAX_WEIGHT, max_weight);
    log_metadata(MAX_LENGTH, max_length);
    log_metadata(MAX_VOTES_PER_VOTER, max_votes_per_voter);
    log_metadata(MAX_WINNERS, max_winners);

    static_types::MaxWeight::set(max_weight);
    static_types::MaxLength::set(max_length);
    static_types::MaxVotesPerVoter::set(max_votes_per_voter);
    static_types::MaxWinners::set(max_winners);

    Ok(())
}
fn invalid_metadata_error<E: std::error::Error>(item: String, err: E) -> Error {
Error::InvalidMetadata(format!("{} failed: {}", item, err))
}
/// Looks up `constant` in the chain metadata and deserializes it into `T`.
///
/// # Errors
///
/// * `Error::InvalidMetadata` when the constant is not found or cannot be deserialized
///   into `T`.
/// * `Error::Subxt` when the raw constant cannot be turned into a dynamic value.
fn read_constant<'a, T: serde::Deserialize<'a>>(
    api: &ChainClient,
    constant: EpmConstant,
) -> Result<T, Error> {
    // From here on `constant` is the bare constant name.
    let (epm_name, constant) = constant.to_parts();

    let val = api
        .constants()
        .at(&subxt::dynamic::constant(epm_name, constant))
        .map_err(|e| invalid_metadata_error(constant.to_string(), e))?
        .to_value()
        .map_err(|e| Error::Subxt(e.into()))?;

    scale_value::serde::from_value::<_, T>(val).map_err(|e| {
        Error::InvalidMetadata(
            format!("Decoding `{}` failed {}", std::any::type_name::<T>(), e)
        )
    })
}
pub(crate) fn set_emergency_result<A: Encode + TypeInfo + 'static>(
supports: frame_election_provider_support::Supports<A>,
) -> Result<DynamicPayload, Error> {
let scale_result = to_scale_value(supports).map_err(|e| {
Error::DynamicTransaction(format!("Failed to encode `Supports`: {:?}", e))
})?;
Ok(subxt::dynamic::tx(EPM_PALLET_NAME, "set_emergency_election_result", vec![scale_result]))
}
pub fn signed_solution<S: NposSolution + Encode + TypeInfo + 'static>(
solution: RawSolution<S>,
) -> Result<DynamicPayload, Error> {
let scale_solution = to_scale_value(solution).map_err(|e| {
Error::DynamicTransaction(format!("Failed to encode `RawSolution`: {:?}", e))
})?;
Ok(subxt::dynamic::tx(EPM_PALLET_NAME, "submit", vec![scale_solution]))
}
/// Builds the dynamic `submit_unsigned` transaction from a solution and its size witness.
///
/// The solution is converted before the witness; the first conversion error is returned
/// unchanged.
pub fn unsigned_solution<S: NposSolution + Encode + TypeInfo + 'static>(
    solution: RawSolution<S>,
    witness: SolutionOrSnapshotSize,
) -> Result<DynamicPayload, Error> {
    let call_args = vec![to_scale_value(solution)?, to_scale_value(witness)?];

    Ok(subxt::dynamic::tx(EPM_PALLET_NAME, "submit_unsigned", call_args))
}
/// Fetches entry `idx` of the `SignedSubmissionsMap` storage at `block_hash`.
///
/// Returns `Ok(None)` when the storage has no entry for `idx`.
///
/// # Errors
///
/// Propagates failures of the storage lookup and of decoding the stored bytes.
pub async fn signed_submission<S: NposSolution + Decode + TypeInfo + 'static>(
    idx: u32,
    block_hash: Option<Hash>,
    api: &ChainClient,
) -> Result<Option<SignedSubmission<S>>, Error> {
    let scale_idx = Value::u128(u128::from(idx));
    let addr = subxt::dynamic::storage(EPM_PALLET_NAME, "SignedSubmissionsMap", vec![scale_idx]);
    let storage = storage_at(block_hash, api).await?;

    let Some(val) = storage.fetch(&addr).await? else {
        return Ok(None);
    };

    // Decode straight from the SCALE bytes returned by the node.
    let submission = Decode::decode(&mut val.encoded())?;
    Ok(Some(submission))
}
pub async fn snapshot_at(
block_hash: Option<Hash>,
api: &ChainClient,
) -> Result<RoundSnapshot, Error> {
let empty = Vec::<Value>::new();
let addr = subxt::dynamic::storage(EPM_PALLET_NAME, "Snapshot", empty);
let storage = storage_at(block_hash, api).await?;
match storage.fetch(&addr).await {
Ok(Some(val)) => {
let snapshot = Decode::decode(&mut val.encode())?;
Ok(Some(snapshot))
},
Ok(None) => Err(Error::EmptySnapshot),
Err(err) => Err(err.into()),
}
}
pub async fn mine_solution<T>(
solver: Solver,
targets: Vec<AccountId>,
voters: Voters,
desired_targets: u32,
) -> Result<(SolutionOf<T>, ElectionScore, SolutionOrSnapshotSize, TrimmingStatus), Error>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
+ Send
+ Sync
+ 'static,
T::Solution: Send,
{
match tokio::task::spawn_blocking(move || match solver {
Solver::SeqPragmen { iterations } => {
BalanceIterations::set(iterations);
Miner::<T>::mine_solution_with_snapshot::<
SequentialPhragmen<AccountId, Accuracy, Balancing>,
>(voters, targets, desired_targets)
},
Solver::PhragMMS { iterations } => {
BalanceIterations::set(iterations);
Miner::<T>::mine_solution_with_snapshot::<
PhragMMS<AccountId, Accuracy, Balancing>,
>(voters, targets, desired_targets)
},
}).await {
Ok(Ok(s)) => Ok(s),
Err(e) => Err(e.into()),
Ok(Err(e)) => Err(Error::Other(format!("{:?}", e))),
}
}
/// Fetches the snapshot at `block_hash` and mines a solution that needs no trimming.
///
/// Steps:
/// 1. Read the snapshot, the desired targets (unless `forced_desired_targets` is given) and
///    the minimum untrusted score from storage.
/// 2. Pre-trim the voters by weight and mine a solution.
/// 3. If the miner reports the solution as trimmed, binary-search the number of additional
///    voters to trim (between 1 and `voters.len()`) for the smallest count that yields an
///    untrimmed solution.
///
/// # Errors
///
/// Propagates storage, pre-trim and mining errors, and returns `Error::Feasibility` when no
/// trim count produces an untrimmed solution.
///
/// # Panics
///
/// Panics when `forced_desired_targets` is `None` and the `desired_targets` storage item is
/// missing.
pub async fn fetch_snapshot_and_mine_solution<T>(
    api: &ChainClient,
    block_hash: Option<Hash>,
    solver: Solver,
    round: u32,
    forced_desired_targets: Option<u32>,
) -> Result<MinedSolution<T>, Error>
where
    T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
        + Send
        + Sync
        + 'static,
    T::Solution: Send,
{
    let snapshot = snapshot_at(block_hash, api).await?;
    let storage = storage_at(block_hash, api).await?;

    let desired_targets = match forced_desired_targets {
        Some(x) => x,
        None => storage
            .fetch(&runtime::storage().election_provider_multi_phase().desired_targets())
            .await?
            .expect("Snapshot is non-empty; `desired_target` should exist; qed"),
    };

    let minimum_untrusted_score = storage
        .fetch(&runtime::storage().election_provider_multi_phase().minimum_untrusted_score())
        .await?
        .map(|score| score.0);

    let mut voters = TrimmedVoters::<T>::new(snapshot.voters.clone(), desired_targets).await?;

    let (solution, score, solution_or_snapshot_size, trim_status) = mine_solution::<T>(
        solver.clone(),
        snapshot.targets.clone(),
        voters.to_voters(),
        desired_targets,
    )
    .await?;

    if !trim_status.is_trimmed() {
        return Ok(MinedSolution {
            round,
            desired_targets,
            snapshot,
            minimum_untrusted_score,
            solution,
            score,
            solution_or_snapshot_size,
        });
    }

    prometheus::on_trim_attempt();

    // Binary search over the number of voters to trim. `l >= 1` always holds, so
    // `mid - 1` below cannot underflow.
    let mut l = 1;
    let mut h = voters.len();
    let mut best_solution = None;

    while l <= h {
        // Midpoint of `[l, h]`; it must stay inside the range so that `trim` is never asked
        // to remove more entries than the search interval allows.
        let mid = l + (h - l) / 2;
        let next_state = voters.trim(mid)?;

        let (solution, score, solution_or_snapshot_size, trim_status) = mine_solution::<T>(
            solver.clone(),
            snapshot.targets.clone(),
            next_state.to_voters(),
            desired_targets,
        )
        .await?;

        if !trim_status.is_trimmed() {
            // Untrimmed: remember it and try to trim fewer voters.
            best_solution = Some((solution, score, solution_or_snapshot_size));
            h = mid - 1;
        } else {
            // Still trimmed by the miner: more voters have to go.
            l = mid + 1;
        }
    }

    if let Some((solution, score, solution_or_snapshot_size)) = best_solution {
        prometheus::on_trim_success();
        Ok(MinedSolution {
            round,
            desired_targets,
            snapshot,
            minimum_untrusted_score,
            solution,
            score,
            solution_or_snapshot_size,
        })
    } else {
        Err(Error::Feasibility("Failed pre-trim length".to_string()))
    }
}
/// A mined solution together with the data needed to run a feasibility check on it.
pub struct MinedSolution<T: MinerConfig> {
// Election round passed to the feasibility check and embedded in the raw solution.
round: u32,
// Number of targets the solution was mined for.
desired_targets: u32,
// Snapshot the solution was mined from.
snapshot: RoundSnapshot,
// Minimum untrusted score read from storage, if one is set.
minimum_untrusted_score: Option<ElectionScore>,
// The mined solution.
solution: T::Solution,
// Score reported by the miner for `solution`.
score: ElectionScore,
// Size information reported by the miner alongside `solution`.
solution_or_snapshot_size: SolutionOrSnapshotSize,
}
impl<T> MinedSolution<T>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
+ Send
+ Sync
+ 'static,
T::Solution: Send,
{
pub fn solution(&self) -> T::Solution {
self.solution.clone()
}
pub fn score(&self) -> ElectionScore {
self.score
}
pub fn size(&self) -> SolutionOrSnapshotSize {
self.solution_or_snapshot_size
}
pub fn feasibility_check(&self) => Result<ReadySolution<AccountId, T::MaxWinners>, Error> {
match Miner::<T>::feasibility_check(
RawSolution { solution: self.solution.clone(), score: self.score, round: self.round },
pallet_election_provider_multi_phase::ElectionCompute::Signed,
self.desired_targets,
self.snapshot.clone(),
self.round,
self.minimum_untrusted_score,
) {
Ok(ready_solution) => Ok(ready_solution),
Err(e) => {
log::error!(target: LOG_TARGET, "Solution feasibility error {:?}", e);
Err(Error::Feasibility(format!("{:?}", e)))
},
}
}
}
/// Compact debug output: only the round, the desired targets and the score are printed;
/// the snapshot and the solution are left out.
impl<T: MinerConfig> std::fmt::Debug for MinedSolution<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The name printed here must match the type name.
        f.debug_struct("MinedSolution")
            .field("round", &self.round)
            .field("desired_targets", &self.desired_targets)
            .field("score", &self.score)
            .finish()
    }
}
fn make_type<T: scale_info::TypeInfo + 'static>() -> (TypeId, PortableRegistry) {
let m = scale_info::MetaType::new::<T>();
let mut types = scale_info::Registry::new();
let id = types.register_type(&m);
let portable_registry: PortableRegistry = types.into();
(id.id, portable_registry)
}
/// Converts a statically typed value into a dynamic [`Value`].
///
/// The value is SCALE encoded and the bytes are decoded again using the type information
/// registered for `T`; the type context attached to the decoded value is then dropped.
///
/// # Errors
///
/// Returns `Error::DynamicTransaction` when the bytes cannot be decoded as `T`.
fn to_scale_value<T: scale_info::TypeInfo + 'static + Encode>(val: T) -> Result<Value, Error> {
    let (ty_id, types) = make_type::<T>();
    let bytes = val.encode();

    decode_as_type(&mut bytes.as_ref(), ty_id, &types)
        .map(|v| v.remove_context())
        .map_err(|e| {
            Error::DynamicTransaction(format!(
                "Failed to decode {}: {:?}",
                std::any::type_name::<T>(),
                e
            ))
        })
}
/// Asks the remote node for the weight of submitting `raw_solution` as an unsigned solution.
///
/// The unsigned transaction is encoded and passed, followed by its encoded length, to the
/// `TransactionPaymentCallApi_query_call_info` runtime API.
///
/// # Errors
///
/// Propagates failures to build or encode the transaction, to perform the state call and to
/// decode the returned dispatch info.
///
/// # Panics
///
/// Panics when the shared client has not been initialised.
pub async fn runtime_api_solution_weight<S: Encode + NposSolution + TypeInfo + 'static>(
    raw_solution: RawSolution<S>,
    witness: SolutionOrSnapshotSize,
) -> Result<Weight, Error> {
    let tx = unsigned_solution(raw_solution, witness)?;
    let client = SHARED_CLIENT.get().expect("shared client is configured as start; qed");

    let call_data = {
        let mut buffer = Vec::new();

        // Encoding the call is fallible; report the failure instead of panicking.
        let encoded_call = client.chain_api().tx().call_data(&tx)?;
        let encoded_len = encoded_call.len() as u32;

        // Runtime API arguments: the call itself, then its length.
        buffer.extend(encoded_call);
        encoded_len.encode_to(&mut buffer);
        buffer
    };

    let bytes = client
        .rpc()
        .state_call("TransactionPaymentCallApi_query_call_info", Some(&call_data), None)
        .await?;

    let info: RuntimeDispatchInfo = Decode::decode(&mut bytes.as_ref())?;

    log::trace!(
        target: LOG_TARGET,
        "Received weight of `Solution Extrnsic` from remote node: {:?}",
        info.weight
    );

    Ok(info.weight)
}
/// Builds mock `(voter, target)` pairs by assigning the voters `0..voters` to the targets
/// `0..desired_targets` in round-robin order.
///
/// Returns `None` when there are fewer voters than desired targets.
pub fn mock_voters(voters: u32, desired_targets: u16) -> Option<Vec<(u32, u16)>> {
    (voters >= u32::from(desired_targets))
        .then(|| (0..voters).zip((0..desired_targets).cycle()).collect())
}
/// Checks the round-robin assignment of `mock_voters` and the too-few-voters case.
#[cfg(test)]
#[test]
fn mock_votes_works() {
    let cases = [
        ((3, 2), Some(vec![(0, 0), (1, 1), (2, 0)])),
        ((3, 3), Some(vec![(0, 0), (1, 1), (2, 2)])),
        ((2, 3), None),
    ];

    for ((voters, desired_targets), expected) in cases {
        assert_eq!(mock_voters(voters, desired_targets), expected);
    }
}