ghost-node/pallets/slow-clap/src/lib.rs
Uncle Stinky 591cce1fb1
rustfmt the ghost-slow-clap pallet
Signed-off-by: Uncle Stinky <uncle.stinky@ghostchain.io>
2025-06-25 18:20:10 +03:00

1258 lines
46 KiB
Rust

// Ensure we're `no_std` when compiling for Wasm.
#![cfg_attr(not(feature = "std"), no_std)]
use codec::{Decode, Encode, MaxEncodedLen};
use scale_info::TypeInfo;
use serde::{Deserialize, Deserializer};
use frame_support::{
pallet_prelude::*,
traits::{
tokens::fungible::{Inspect, Mutate},
EstimateNextSessionRotation, Get, OneSessionHandler, ValidatorSet,
ValidatorSetWithIdentification,
},
WeakBoundedVec,
};
use frame_system::{
offchain::{SendTransactionTypes, SubmitTransaction},
pallet_prelude::*,
};
pub use pallet::*;
use sp_core::H256;
use sp_runtime::{
offchain::{
self as rt_offchain,
storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
storage_lock::{StorageLock, Time},
HttpError,
},
traits::{BlockNumberProvider, Convert, Saturating},
Perbill, RuntimeAppPublic, RuntimeDebug, SaturatedConversion,
};
use sp_staking::{
offence::{Kind, Offence, ReportOffence},
SessionIndex,
};
use sp_std::{collections::btree_map::BTreeMap, prelude::*, vec::Vec};
use ghost_networks::{
NetworkData, NetworkDataBasicHandler, NetworkDataInspectHandler, NetworkDataMutateHandler,
NetworkType,
};
pub mod weights;
pub use crate::weights::WeightInfo;
mod benchmarking;
mod mock;
mod tests;
/// Application-specific sr25519 crypto types used by slow clap authorities.
pub mod sr25519 {
mod app_sr25519 {
use sp_application_crypto::{app_crypto, sr25519, KeyTypeId};
// Key type identifier under which slow clap keys are registered.
const SLOW_CLAP: KeyTypeId = KeyTypeId(*b"slow");
app_crypto!(sr25519, SLOW_CLAP);
}
sp_application_crypto::with_pair! {
/// Key pair of a slow clap authority.
pub type AuthorityPair = app_sr25519::Pair;
}
/// Signature produced by a slow clap authority.
pub type AuthoritySignature = app_sr25519::Signature;
/// Public key identifying a slow clap authority.
pub type AuthorityId = app_sr25519::Public;
}
/// Target used for every log line emitted by this pallet.
const LOG_TARGET: &str = "runtime::ghost-slow-clap";
/// Prefix prepended to every offchain storage key built by `create_storage_key`.
const DB_PREFIX: &[u8] = b"slow_clap::";
/// Timeout, in milliseconds, for a single HTTP request; it also scales the
/// per-network lock deadline in `start_slow_clapping`.
const FETCH_TIMEOUT_PERIOD: u64 = 3_000;
/// Longevity assigned to unsigned `slow_clap` transactions in `validate_unsigned`.
const LOCK_BLOCK_EXPIRATION: u64 = 10;
/// Exact number of topics a log must carry to be considered by `Log::is_sufficient`.
const NUMBER_OF_TOPICS: usize = 3;
/// Position of an authority inside the per-session authorities list.
pub type AuthIndex = u32;
/// JSON-RPC response envelope as parsed from the remote endpoint.
///
/// Every field falls back to its default (`None`) when absent from the JSON.
#[derive(RuntimeDebug, Clone, PartialEq, Deserialize, Encode, Decode)]
struct EvmResponse {
/// Request identifier echoed by the endpoint.
#[serde(default)]
id: Option<u32>,
/// JSON-RPC version string, kept as raw bytes.
#[serde(default, deserialize_with = "de_string_to_bytes")]
jsonrpc: Option<Vec<u8>>,
/// Error payload; its presence makes `parse_evm_response` fail.
#[serde(default, deserialize_with = "de_string_to_bytes")]
error: Option<Vec<u8>>,
/// Successful payload, either a block number or a list of logs.
#[serde(default)]
result: Option<EvmResponseType>,
}
/// The two shapes of `result` this pallet understands.
///
/// The enum is untagged, so serde tries the variants in declaration order.
#[derive(RuntimeDebug, Clone, PartialEq, Deserialize, Encode, Decode)]
#[serde(untagged)]
enum EvmResponseType {
/// A hexadecimal string holding a block number.
#[serde(deserialize_with = "de_string_to_u64_pure")]
BlockNumber(u64),
/// A list of event logs.
TransactionLogs(Vec<Log>),
}
/// A single event log entry; JSON field names are expected in camelCase.
#[derive(RuntimeDebug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Encode, Decode)]
#[serde(rename_all = "camelCase")]
struct Log {
/// Hash of the transaction that emitted the log.
#[serde(default, deserialize_with = "de_string_to_h256")]
transaction_hash: Option<H256>,
/// Block in which the log was emitted; `None` when the value is not valid hex.
#[serde(default, deserialize_with = "de_string_to_u64")]
block_number: Option<u64>,
/// Hex-decoded topics; `apply_evm_response` reads the receiver from topic 1
/// and the amount from topic 2.
#[serde(default, deserialize_with = "de_string_to_vec_of_bytes")]
topics: Vec<Vec<u8>>,
/// Address field of the log, kept as the raw bytes of the JSON string.
#[serde(default, deserialize_with = "de_string_to_bytes")]
address: Option<Vec<u8>>,
/// Log data split into 64-hex-character chunks, each stored as a
/// (first 32 characters, last 32 characters) pair of `u128`.
#[serde(default, deserialize_with = "de_string_to_btree_map")]
data: BTreeMap<u128, u128>,
/// Copied into `Clap::removed`. This field has no serde default, so it must
/// be present in the JSON.
removed: bool,
}
impl Log {
    /// Tells whether the log carries everything required to build a clap:
    /// a transaction hash, a block number and exactly `NUMBER_OF_TOPICS` topics.
    fn is_sufficient(&self) -> bool {
        let has_origin = self.transaction_hash.is_some() && self.block_number.is_some();
        has_origin && self.topics.len() == NUMBER_OF_TOPICS
    }
}
/// Deserializes a string and returns its raw UTF-8 bytes wrapped in `Some`.
///
/// # Errors
///
/// Fails when the underlying value cannot be deserialized as a borrowed string.
pub fn de_string_to_bytes<'de, D>(de: D) -> Result<Option<Vec<u8>>, D::Error>
where
    D: Deserializer<'de>,
{
    let raw: &str = Deserialize::deserialize(de)?;
    let bytes = Vec::from(raw.as_bytes());
    Ok(Some(bytes))
}
/// Deserializes a hexadecimal string, with or without a leading `0x`, into a `u64`.
///
/// Returns `Ok(None)` when the digits do not form a valid hexadecimal `u64`.
///
/// # Errors
///
/// Fails when the underlying value cannot be deserialized as a borrowed string.
pub fn de_string_to_u64<'de, D>(de: D) -> Result<Option<u64>, D::Error>
where
    D: Deserializer<'de>,
{
    let raw: &str = Deserialize::deserialize(de)?;
    let digits = raw.strip_prefix("0x").unwrap_or(raw);
    let parsed = u64::from_str_radix(digits, 16);
    Ok(parsed.ok())
}
/// Deserializes a hexadecimal string, with or without a leading `0x`, into a `u64`.
///
/// Unlike `de_string_to_u64`, invalid digits silently produce `0`.
///
/// # Errors
///
/// Fails when the underlying value cannot be deserialized as a borrowed string.
pub fn de_string_to_u64_pure<'de, D>(de: D) -> Result<u64, D::Error>
where
    D: Deserializer<'de>,
{
    let raw: &str = Deserialize::deserialize(de)?;
    let digits = raw.strip_prefix("0x").unwrap_or(raw);
    match u64::from_str_radix(digits, 16) {
        Ok(value) => Ok(value),
        Err(_) => Ok(0),
    }
}
/// Deserializes a hexadecimal string, with or without a leading `0x`, into an `H256`.
///
/// The input comes from a remote RPC endpoint and is therefore untrusted. A
/// value that is not exactly 64 hexadecimal characters yields `Ok(None)`
/// instead of panicking, mirroring how `de_string_to_u64` treats invalid
/// digits. A log without a transaction hash is later rejected by
/// `Log::is_sufficient`.
///
/// # Errors
///
/// Fails when the underlying value cannot be deserialized as a borrowed string.
pub fn de_string_to_h256<'de, D>(de: D) -> Result<Option<H256>, D::Error>
where
    D: Deserializer<'de>,
{
    const HASH_BYTES: usize = 32;

    let s: &str = Deserialize::deserialize(de)?;
    let hex = s.strip_prefix("0x").unwrap_or(s);
    if hex.len() != 2 * HASH_BYTES {
        return Ok(None);
    }

    let mut bytes = [0u8; HASH_BYTES];
    for (index, byte) in bytes.iter_mut().enumerate() {
        // `str::get` returns `None` rather than panicking when the range is
        // not on a character boundary (non-ASCII garbage).
        let parsed = hex
            .get(2 * index..2 * index + 2)
            .and_then(|pair| u8::from_str_radix(pair, 16).ok());
        match parsed {
            Some(value) => *byte = value,
            None => return Ok(None),
        }
    }
    Ok(Some(H256::from_slice(&bytes)))
}
pub fn de_string_to_vec_of_bytes<'de, D>(de: D) -> Result<Vec<Vec<u8>>, D::Error>
where
D: Deserializer<'de>,
{
let strings: Vec<&str> = Deserialize::deserialize(de)?;
Ok(strings
.iter()
.map(|s| {
let start_index = if s.starts_with("0x") { 2 } else { 0 };
(start_index..s.len())
.step_by(2)
.map(|i| u8::from_str_radix(&s[i..i + 2], 16).expect("valid u8 symbol; qed"))
.collect::<Vec<u8>>()
})
.collect::<Vec<Vec<u8>>>())
}
pub fn de_string_to_btree_map<'de, D>(de: D) -> Result<BTreeMap<u128, u128>, D::Error>
where
D: Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(de)?;
let start_index = if s.starts_with("0x") { 2 } else { 0 };
Ok(BTreeMap::from_iter((start_index..s.len()).step_by(64).map(
|i| {
(
u128::from_str_radix(&s[i..i + 32], 16).expect("valid u8 symbol; qed"),
u128::from_str_radix(&s[i + 32..i + 64], 16).expect("valid u8 symbol; qed"),
)
},
)))
}
/// A single authority's attestation about one transfer observed on an external network.
#[derive(
RuntimeDebug, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, TypeInfo, MaxEncodedLen,
)]
pub struct Clap<AccountId, NetworkId, Balance> {
/// Session whose authority set the clap is checked against.
pub session_index: SessionIndex,
/// Index of the clapping authority inside that session's authority list.
pub authority_index: AuthIndex,
/// Hash of the observed transaction.
pub transaction_hash: H256,
/// Block in which the transaction was observed.
pub block_number: u64,
/// When `true`, the clap withdraws a previously registered clap instead of adding one.
pub removed: bool,
/// Network on which the transaction was observed.
pub network_id: NetworkId,
/// Account that receives the minted amount once the transfer is applauded.
pub receiver: AccountId,
/// Amount of the transfer, before commission.
pub amount: Balance,
}
/// Per-session statistics kept for a single authority.
#[derive(Default, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub struct SessionAuthorityInfo {
/// Number of clap extrinsics accepted from the authority during the session.
pub claps: u32,
/// Set by `on_disabled`; claps from a disabled authority are rejected.
pub disabled: bool,
}
/// Errors that can interrupt the offchain worker; human-readable messages
/// live in the `Debug` implementation.
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr<NetworkId> {
/// The authority key could not sign a clap.
FailedSigning,
/// The unsigned transaction could not be submitted.
SubmitTransaction,
/// The response body is not the expected JSON.
HttpJsonParsingError,
/// The response body is not valid UTF-8.
HttpBytesParsingError,
/// The HTTP request could not be sent.
HttpRequestError(HttpError),
/// The HTTP request did not complete before its deadline or failed while pending.
RequestUncompleted,
/// The endpoint answered with a status code other than 200.
HttpResponseNotOk(u16),
/// The response carries an `error` field or lacks a `result`.
ErrorInEvmResponse,
/// The network data handler yielded no network.
NoStoredNetworks,
/// The node is not running as a validator.
NotValidator,
/// An offchain storage value exists but could not be decoded.
StorageRetrievalError(NetworkId),
/// An offchain storage value was modified concurrently.
ConcurrentModificationError(NetworkId),
/// The network is of the UTXO type, which is not supported.
UtxoNotImplemented(NetworkId),
/// The network type is neither EVM nor UTXO.
UnknownNetworkType(NetworkId),
/// The per-network lock could not be acquired.
OffchainTimeoutPeriod(NetworkId),
/// The stored rate-limit timestamp has not been reached yet.
TooManyRequests(NetworkId),
}
// Hand-written `Debug` that renders every error as a full sentence so that it
// can be logged directly by the offchain worker.
impl<NetworkId: core::fmt::Debug> core::fmt::Debug for OffchainErr<NetworkId> {
fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
match *self {
OffchainErr::FailedSigning => write!(fmt, "Failed to sign clap."),
OffchainErr::SubmitTransaction => write!(fmt, "Failed to submit transaction."),
OffchainErr::HttpJsonParsingError => write!(fmt, "Failed to parse evm response as JSON."),
OffchainErr::HttpBytesParsingError => write!(fmt, "Failed to parse evm response as bytes."),
OffchainErr::HttpRequestError(http_error) => match http_error {
HttpError::DeadlineReached => write!(fmt, "Requested action couldn't been completed within a deadline."),
HttpError::IoError => write!(fmt, "There was an IO error while processing the request."),
HttpError::Invalid => write!(fmt, "The ID of the request is invalid in this context."),
},
OffchainErr::ConcurrentModificationError(ref network_id) => write!(fmt, "The underlying DB failed to update due to a concurrent modification for network #{:?}.", network_id),
OffchainErr::StorageRetrievalError(ref network_id) => write!(fmt, "Storage value found for network #{:?} but it's undecodable.", network_id),
OffchainErr::RequestUncompleted => write!(fmt, "Failed to complete request."),
OffchainErr::HttpResponseNotOk(code) => write!(fmt, "Http response returned code {:?}.", code),
OffchainErr::ErrorInEvmResponse => write!(fmt, "Error in evm reponse."),
OffchainErr::NoStoredNetworks => write!(fmt, "No networks stored for the offchain slow claps."),
OffchainErr::NotValidator => write!(fmt, "Not a validator for slow clap, `--validator` flag needed."),
OffchainErr::UtxoNotImplemented(ref network_id) => write!(fmt, "Network #{:?} is marked as UTXO, which is not implemented yet.", network_id),
OffchainErr::UnknownNetworkType(ref network_id) => write!(fmt, "Unknown type for network #{:?}.", network_id),
OffchainErr::OffchainTimeoutPeriod(ref network_id) => write!(fmt, "Offchain request should be in-flight for network #{:?}.", network_id),
OffchainErr::TooManyRequests(ref network_id) => write!(fmt, "Too many requests over RPC endpoint for network #{:?}.", network_id),
}
}
}
/// Network identifier type of the configured network data handler.
pub type NetworkIdOf<T> = <<T as Config>::NetworkDataHandler as NetworkDataBasicHandler>::NetworkId;
/// Balance type of the configured currency.
pub type BalanceOf<T> =
<<T as Config>::Currency as Inspect<<T as frame_system::Config>::AccountId>>::Balance;
/// Validator identifier type of the configured validator set.
pub type ValidatorId<T> = <<T as Config>::ValidatorSet as ValidatorSet<
<T as frame_system::Config>::AccountId,
>>::ValidatorId;
/// Validator identifier paired with its full identification, as used when
/// reporting offences.
pub type IdentificationTuple<T> = (
ValidatorId<T>,
<<T as Config>::ValidatorSet as ValidatorSetWithIdentification<
<T as frame_system::Config>::AccountId,
>>::Identification,
);
/// Result type of the offchain worker routines.
type OffchainResult<T, A> = Result<A, OffchainErr<NetworkIdOf<T>>>;
#[frame_support::pallet]
pub mod pallet {
use super::*;
/// Current storage version of the pallet.
const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
/// The slow clap pallet.
#[pallet::pallet]
#[pallet::storage_version(STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);
/// Configuration of the slow clap pallet.
#[pallet::config]
pub trait Config: SendTransactionTypes<Call<Self>> + frame_system::Config {
/// The overarching runtime event type.
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
/// Public key type used by authorities to sign claps.
type AuthorityId: Member
+ Parameter
+ RuntimeAppPublic
+ Ord
+ MaybeSerializeDeserialize
+ MaxEncodedLen;
/// Estimator of the next session rotation.
type NextSessionRotation: EstimateNextSessionRotation<BlockNumberFor<Self>>;
/// Source of the current session index, validators and their identification.
type ValidatorSet: ValidatorSetWithIdentification<Self::AccountId>;
/// Currency in which applauded amounts are minted.
type Currency: Inspect<Self::AccountId> + Mutate<Self::AccountId>;
/// Provider of the external networks' data and bookkeeping of bridged amounts.
type NetworkDataHandler: NetworkDataBasicHandler
+ NetworkDataInspectHandler<NetworkData>
+ NetworkDataMutateHandler<NetworkData, BalanceOf<Self>>;
/// Block number provider the pallet's own `BlockNumberProvider` delegates to.
type BlockNumberProvider: BlockNumberProvider<BlockNumber = BlockNumberFor<Self>>;
/// Sink for `ThrottlingOffence` reports produced at the end of a session.
type ReportUnresponsiveness: ReportOffence<
Self::AccountId,
IdentificationTuple<Self>,
ThrottlingOffence<IdentificationTuple<Self>>,
>;
/// Upper bound for the stored authority list and for the set of claps per transaction.
#[pallet::constant]
type MaxAuthorities: Get<u32>;
/// Percentage of authorities whose claps must be exceeded before an applause.
#[pallet::constant]
type ApplauseThreshold: Get<u32>;
/// Percentage of deviation from the median clap count at or above which an
/// authority is reported.
#[pallet::constant]
type OffenceThreshold: Get<u32>;
/// Priority given to unsigned `slow_clap` transactions.
#[pallet::constant]
type UnsignedPriority: Get<TransactionPriority>;
/// Distance, in sessions, after which clap history is cleared.
#[pallet::constant]
type HistoryDepth: Get<SessionIndex>;
/// Weights of the pallet's extrinsics.
type WeightInfo: WeightInfo;
}
/// Events emitted by the slow clap pallet.
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
/// The session ended without any authority being reported.
AuthoritiesEquilibrium,
/// The session ended and the listed validators failed the clap deviation check.
SomeAuthoritiesTrottling {
throttling: Vec<IdentificationTuple<T>>,
},
/// An authority's clap was registered.
Clapped {
authority_id: AuthIndex,
network_id: NetworkIdOf<T>,
transaction_hash: H256,
receiver: T::AccountId,
amount: BalanceOf<T>,
},
/// A transfer was applauded; `received_amount` is the amount after commission.
Applaused {
network_id: NetworkIdOf<T>,
receiver: T::AccountId,
received_amount: BalanceOf<T>,
},
}
/// Errors returned by the slow clap pallet.
#[pallet::error]
pub enum Error<T> {
/// The applause threshold has not been exceeded for the transaction.
NotEnoughClaps,
/// The authority index does not exist in the session's authority list.
NotAnAuthority,
/// The clapping authority is marked as disabled for the session.
CurrentValidatorIsDisabled,
/// The authority has already clapped for this transaction.
AlreadyClapped,
/// A clap removal was requested for a clap that was never registered.
UnregisteredClapRemove,
/// The set of claps for the transaction cannot take another authority.
TooMuchAuthorities,
/// The network data handler failed to accumulate the commission.
CouldNotAccumulateCommission,
/// The network data handler failed to accumulate the incoming imbalance.
CouldNotAccumulateIncomingImbalance,
/// The network data handler failed to increase the gatekeeper amount.
CouldNotIncreaseGatekeeperAmount,
}
/// Authorities that clapped for a transfer, keyed by session index,
/// transaction hash and the unique hash of (receiver, amount, network).
#[pallet::storage]
#[pallet::getter(fn received_claps)]
pub(super) type ReceivedClaps<T: Config> = StorageNMap<
_,
(
NMapKey<Twox64Concat, SessionIndex>,
NMapKey<Twox64Concat, H256>,
NMapKey<Twox64Concat, H256>,
),
BoundedBTreeSet<AuthIndex, T::MaxAuthorities>,
ValueQuery,
>;
/// Whether a transfer has already been applauded; uses the same key as `ReceivedClaps`.
#[pallet::storage]
#[pallet::getter(fn applauses_for_transaction)]
pub(super) type ApplausesForTransaction<T: Config> = StorageNMap<
_,
(
NMapKey<Twox64Concat, SessionIndex>,
NMapKey<Twox64Concat, H256>,
NMapKey<Twox64Concat, H256>,
),
bool,
ValueQuery,
>;
/// Per-session clap statistics, keyed by authority index.
#[pallet::storage]
#[pallet::getter(fn claps_in_session)]
pub(super) type ClapsInSession<T: Config> = StorageMap<
_,
Twox64Concat,
SessionIndex,
BTreeMap<AuthIndex, SessionAuthorityInfo>,
ValueQuery,
>;
/// Authority keys of each session; a clap's `authority_index` points into this list.
#[pallet::storage]
#[pallet::getter(fn authorities)]
pub(super) type Authorities<T: Config> = StorageMap<
_,
Twox64Concat,
SessionIndex,
WeakBoundedVec<T::AuthorityId, T::MaxAuthorities>,
ValueQuery,
>;
/// Genesis configuration of the slow clap pallet.
#[pallet::genesis_config]
#[derive(frame_support::DefaultNoBound)]
pub struct GenesisConfig<T: Config> {
/// Initial authority keys; nothing is initialized when the list is empty.
pub authorities: Vec<T::AuthorityId>,
}
#[pallet::genesis_build]
impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
// Stores the configured authorities for the current session, if any were given.
fn build(&self) {
if !self.authorities.is_empty() {
Pallet::<T>::initialize_authorities(self.authorities.clone());
}
}
}
#[pallet::call]
impl<T: Config> Pallet<T> {
/// Registers a clap from an authority.
///
/// Must be submitted as an unsigned transaction; the origin is checked with
/// `ensure_none`. The call is free of charge (`Pays::No`).
#[pallet::call_index(0)]
#[pallet::weight((T::WeightInfo::slow_clap(), DispatchClass::Normal, Pays::No))]
pub fn slow_clap(
origin: OriginFor<T>,
clap: Clap<T::AccountId, NetworkIdOf<T>, BalanceOf<T>>,
// since signature verification is done in `validate_unsigned`
// we can skip doing it here again.
_signature: <T::AuthorityId as RuntimeAppPublic>::Signature,
) -> DispatchResult {
ensure_none(origin)?;
Self::try_slow_clap(&clap)?;
Ok(())
}
/// Triggers the applause for a transfer that already gathered enough claps.
///
/// Callable by any signed origin. Fails with `NotEnoughClaps` when the
/// applause threshold has not been exceeded.
#[pallet::call_index(1)]
#[pallet::weight(T::WeightInfo::self_applause())]
pub fn self_applause(
origin: OriginFor<T>,
network_id: NetworkIdOf<T>,
session_index: SessionIndex,
transaction_hash: H256,
receiver: T::AccountId,
amount: BalanceOf<T>,
) -> DispatchResult {
let _ = ensure_signed(origin)?;
Self::applause_if_posible(
network_id,
session_index,
transaction_hash,
receiver,
amount,
)
}
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
// Runs one slow clap round for the block and logs every failure; errors
// never propagate out of the offchain worker.
fn offchain_worker(now: BlockNumberFor<T>) {
match Self::start_slow_clapping(now) {
Ok(iter) => {
// The iterator is lazy: the per-authority work happens while it is consumed here.
for result in iter.into_iter() {
if let Err(e) = result {
log::info!(
target: LOG_TARGET,
"👏 Skipping slow clap at {:?}: {:?}",
now,
e,
)
}
}
}
Err(e) => log::info!(
target: LOG_TARGET,
"👏 Could not start slow clap at {:?}: {:?}",
now,
e,
),
}
}
}
#[pallet::validate_unsigned]
impl<T: Config> ValidateUnsigned for Pallet<T> {
type Call = Call<T>;
// Accepts only `slow_clap` calls whose signature verifies against the
// authority found at `clap.authority_index` in the clap's session.
fn validate_unsigned(_source: TransactionSource, call: &Self::Call) -> TransactionValidity {
if let Call::slow_clap { clap, signature } = call {
let authorities = Authorities::<T>::get(&clap.session_index);
let authority = match authorities.get(clap.authority_index as usize) {
Some(authority) => authority,
None => return InvalidTransaction::BadSigner.into(),
};
// The signature must cover the SCALE-encoded clap.
let signature_valid =
clap.using_encoded(|encoded_clap| authority.verify(&encoded_clap, signature));
if !signature_valid {
return InvalidTransaction::BadProof.into();
}
// The signature is used as the `provides` tag of the transaction.
ValidTransaction::with_tag_prefix("SlowClap")
.priority(T::UnsignedPriority::get())
.and_provides(signature)
.longevity(LOCK_BLOCK_EXPIRATION)
.propagate(true)
.build()
} else {
InvalidTransaction::Call.into()
}
}
}
}
impl<T: Config> Pallet<T> {
/// Builds an offchain storage key as `DB_PREFIX ++ first ++ second`.
fn create_storage_key(first: &[u8], second: &[u8]) -> Vec<u8> {
    [DB_PREFIX, first, second].concat()
}
/// Reads a value from persistent offchain storage.
///
/// Returns `default_value` when the key is absent or when the stored bytes
/// cannot be retrieved as `R`.
fn read_persistent_offchain_storage<R: codec::Decode>(
    storage_key: &[u8],
    default_value: R,
) -> R {
    match StorageValueRef::persistent(storage_key).get::<R>() {
        Ok(Some(stored)) => stored,
        Ok(None) | Err(_) => default_value,
    }
}
/// Computes the Keccak-256 hash of the SCALE encodings of `receiver`,
/// `amount` and `network_id`, concatenated in that order.
fn generate_unique_hash(
    receiver: &T::AccountId,
    amount: &BalanceOf<T>,
    network_id: &NetworkIdOf<T>,
) -> H256 {
    let preimage = [receiver.encode(), amount.encode(), network_id.encode()].concat();
    H256::from(sp_io::hashing::keccak_256(&preimage))
}
/// Renders `value` as ASCII bytes of a lowercase hexadecimal literal with a
/// `0x` prefix and no leading zeros (`0` becomes `0x0`).
fn u64_to_hexadecimal_bytes(value: u64) -> Vec<u8> {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut rendered = vec![b'0', b'x'];
    if value == 0 {
        rendered.push(b'0');
        return rendered;
    }

    // Every four leading zero bits correspond to one nibble that is skipped.
    let significant_nibbles = 16 - (value.leading_zeros() / 4) as usize;
    rendered.extend(
        (0..significant_nibbles)
            .rev()
            .map(|position| DIGITS[((value >> (position * 4)) & 0xF) as usize]),
    );
    rendered
}
/// Registers (or withdraws) a clap and applauds the transfer once the
/// applause threshold is exceeded.
///
/// Errors: `NotAnAuthority`, `AlreadyClapped`, `UnregisteredClapRemove`,
/// `TooMuchAuthorities` and `CurrentValidatorIsDisabled`. A failure of the
/// applause itself is only logged and does not fail the call.
fn try_slow_clap(clap: &Clap<T::AccountId, NetworkIdOf<T>, BalanceOf<T>>) -> DispatchResult {
let authorities = Authorities::<T>::get(&clap.session_index);
ensure!(
authorities.get(clap.authority_index as usize).is_some(),
Error::<T>::NotAnAuthority
);
let clap_unique_hash =
Self::generate_unique_hash(&clap.receiver, &clap.amount, &clap.network_id);
let received_claps_key = (
clap.session_index,
&clap.transaction_hash,
&clap_unique_hash,
);
// Update the set of clapping authorities and keep the resulting set size.
let number_of_received_claps =
ReceivedClaps::<T>::try_mutate(&received_claps_key, |tree_of_claps| {
let number_of_claps = tree_of_claps.len();
// First element: did this authority already clap; second: is this a removal.
match (tree_of_claps.contains(&clap.authority_index), clap.removed) {
(true, true) => tree_of_claps
.remove(&clap.authority_index)
.then(|| number_of_claps.saturating_sub(1))
.ok_or(Error::<T>::UnregisteredClapRemove),
(true, false) => Err(Error::<T>::AlreadyClapped),
(false, true) => Err(Error::<T>::UnregisteredClapRemove),
(false, false) => tree_of_claps
.try_insert(clap.authority_index)
.map(|_| number_of_claps.saturating_add(1))
.map_err(|_| Error::<T>::TooMuchAuthorities),
}
})?;
// Count the clap towards the authority's session statistics; removals are
// counted as well. Disabled authorities are rejected here.
ClapsInSession::<T>::try_mutate(&clap.session_index, |claps_details| {
if claps_details
.get(&clap.authority_index)
.map(|x| x.disabled)
.unwrap_or_default()
{
return Err(Error::<T>::CurrentValidatorIsDisabled);
}
(*claps_details)
.entry(clap.authority_index)
.and_modify(|individual| (*individual).claps.saturating_inc())
.or_insert(SessionAuthorityInfo {
claps: 1u32,
disabled: false,
});
Ok(())
})?;
Self::deposit_event(Event::<T>::Clapped {
authority_id: clap.authority_index,
network_id: clap.network_id,
transaction_hash: clap.transaction_hash,
receiver: clap.receiver.clone(),
amount: clap.amount,
});
// The share of clapping authorities must be strictly greater than the threshold.
let enough_authorities =
Perbill::from_rational(number_of_received_claps as u32, authorities.len() as u32)
> Perbill::from_percent(T::ApplauseThreshold::get());
if enough_authorities {
// Best effort: the clap stays registered even if the applause fails.
let _ = Self::try_applause(&clap, &received_claps_key).inspect_err(|error_msg| {
log::info!(
target: LOG_TARGET,
"👏 Could not applause because of: {:?}",
error_msg,
)
});
}
Ok(())
}
/// Applauds a transfer: books the amounts with the network data handler,
/// mints the amount after commission to the receiver and marks the transfer
/// as applauded.
///
/// Does nothing and returns `Ok(())` when the transfer was already applauded
/// or when the handler reports a nullification period.
///
/// NOTE(review): the handler calls run in sequence inside `try_mutate`; whether
/// an earlier handler update is rolled back when a later one fails depends on
/// the caller running inside a storage transaction — confirm.
fn try_applause(
clap: &Clap<T::AccountId, NetworkIdOf<T>, BalanceOf<T>>,
received_claps_key: &(SessionIndex, &H256, &H256),
) -> DispatchResult {
ApplausesForTransaction::<T>::try_mutate(received_claps_key, |is_applaused| {
if *is_applaused || T::NetworkDataHandler::is_nullification_period() {
return Ok(());
}
// Commission is the network's incoming fee (parts per billion) of the
// amount, rounded up; an unknown network yields zero commission.
let commission = T::NetworkDataHandler::get(&clap.network_id)
.map(|network_data| Perbill::from_parts(network_data.incoming_fee))
.unwrap_or_default()
.mul_ceil(clap.amount);
let final_amount = clap.amount.saturating_sub(commission);
let _ =
T::NetworkDataHandler::increase_gatekeeper_amount(&clap.network_id, &clap.amount)
.map_err(|_| Error::<T>::CouldNotIncreaseGatekeeperAmount)?;
let _ = T::NetworkDataHandler::accumulate_incoming_imbalance(&final_amount)
.map_err(|_| Error::<T>::CouldNotAccumulateIncomingImbalance)?;
let _ = T::NetworkDataHandler::accumulate_commission(&commission)
.map_err(|_| Error::<T>::CouldNotAccumulateCommission)?;
// Amounts not above the minimum balance are not minted, yet the transfer
// is still marked as applauded below.
if final_amount > T::Currency::minimum_balance() {
T::Currency::mint_into(&clap.receiver, final_amount)?;
}
*is_applaused = true;
Self::deposit_event(Event::<T>::Applaused {
network_id: clap.network_id,
receiver: clap.receiver.clone(),
received_amount: final_amount,
});
Ok(())
})
}
/// Applauds a transfer on request, provided that the share of authorities
/// that clapped for it is strictly greater than `ApplauseThreshold`.
///
/// # Errors
///
/// Returns `NotEnoughClaps` when the threshold is not exceeded, or whatever
/// error `try_applause` produces.
fn applause_if_posible(
    network_id: NetworkIdOf<T>,
    session_index: SessionIndex,
    transaction_hash: H256,
    receiver: T::AccountId,
    amount: BalanceOf<T>,
) -> DispatchResult {
    let clap_unique_hash = Self::generate_unique_hash(&receiver, &amount, &network_id);
    let received_claps_key = (session_index, &transaction_hash, &clap_unique_hash);

    let claps_count = ReceivedClaps::<T>::get(&received_claps_key).len() as u32;
    let authorities_count = Authorities::<T>::get(session_index).len() as u32;
    let threshold = Perbill::from_percent(T::ApplauseThreshold::get());
    ensure!(
        Perbill::from_rational(claps_count, authorities_count) > threshold,
        Error::<T>::NotEnoughClaps
    );

    // Only the fields read by `try_applause` matter; the rest are defaults.
    let clap = Clap {
        session_index,
        authority_index: Default::default(),
        transaction_hash,
        block_number: Default::default(),
        removed: false,
        network_id,
        receiver,
        amount,
    };
    Self::try_applause(&clap, &received_claps_key)
}
/// Prepares one slow clap round for the given block.
///
/// Picks the network at position `block_number % number_of_networks`, applies
/// the per-network lock and rate limit, and returns a lazy iterator that
/// performs the actual work for every locally available authority key when it
/// is consumed.
///
/// Errors: `NotValidator`, `NoStoredNetworks`, `OffchainTimeoutPeriod`,
/// `TooManyRequests`, `StorageRetrievalError`, `ConcurrentModificationError`.
fn start_slow_clapping(
block_number: BlockNumberFor<T>,
) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> {
sp_io::offchain::is_validator()
.then(|| ())
.ok_or(OffchainErr::NotValidator)?;
let session_index = T::ValidatorSet::session_index();
let networks_len = T::NetworkDataHandler::iter().count();
// `checked_rem` is `None` when there are no networks; `nth(0)` then fails below.
let network_in_use = T::NetworkDataHandler::iter()
.nth(
block_number
.into()
.as_usize()
.checked_rem(networks_len)
.unwrap_or_default(),
)
.ok_or(OffchainErr::NoStoredNetworks)?;
let network_id_encoded = network_in_use.0.encode();
let last_timestamp_key = Self::create_storage_key(b"last-timestamp-", &network_id_encoded);
let rate_limit_delay_key = Self::create_storage_key(b"rate-limit-", &network_id_encoded);
// A locally stored delay overrides the one from the on-chain network data.
let rate_limit_delay = Self::read_persistent_offchain_storage(
&rate_limit_delay_key,
network_in_use.1.rate_limit_delay,
);
let network_lock_key = Self::create_storage_key(b"network-lock-", &network_id_encoded);
let block_until =
rt_offchain::Duration::from_millis(networks_len as u64 * FETCH_TIMEOUT_PERIOD);
let mut network_lock = StorageLock::<Time>::with_deadline(&network_lock_key, block_until);
// NOTE(review): the guard returned by `try_lock` is a temporary that is dropped
// at the end of this statement; if dropping the guard releases the lock, it is
// not held while the returned iterator performs the requests — confirm intent.
network_lock
.try_lock()
.map_err(|_| OffchainErr::OffchainTimeoutPeriod(network_in_use.0))?;
// Rate limit: fail while the stored timestamp is still in the future,
// otherwise store `now + rate_limit_delay` as the next allowed moment.
StorageValueRef::persistent(&last_timestamp_key)
.mutate(
|result_timestamp: Result<Option<u64>, StorageRetrievalError>| {
let current_timestmap = sp_io::offchain::timestamp().unix_millis();
match result_timestamp {
Ok(option_timestamp) => match option_timestamp {
Some(stored_timestamp) if stored_timestamp > current_timestmap => {
Err(OffchainErr::TooManyRequests(network_in_use.0).into())
}
_ => Ok(current_timestmap.saturating_add(rate_limit_delay)),
},
Err(_) => Err(OffchainErr::StorageRetrievalError(network_in_use.0).into()),
}
},
)
.map_err(|error| match error {
MutateStorageError::ValueFunctionFailed(offchain_error) => offchain_error,
MutateStorageError::ConcurrentModification(_) => {
OffchainErr::ConcurrentModificationError(network_in_use.0).into()
}
})?;
Ok(
Self::local_authorities(&session_index).map(move |(authority_index, authority_key)| {
Self::do_evm_claps_or_save_block(
authority_index,
authority_key,
session_index,
network_in_use.0,
&network_in_use.1,
)
}),
)
}
/// Performs one request against the network's RPC endpoint and advances the
/// locally stored `(from_block, to_block)` range.
///
/// When a stored range with `from_block < to_block - 1` exists, the logs of
/// `from_block..=to_block - 1` are requested and turned into claps; otherwise
/// the latest block number is requested. The new range is written back only
/// when the closure succeeds.
///
/// Errors: everything `fetch_from_remote` and `apply_evm_response` return,
/// plus `UtxoNotImplemented`, `UnknownNetworkType`, `StorageRetrievalError`
/// and `ConcurrentModificationError`.
fn do_evm_claps_or_save_block(
authority_index: AuthIndex,
authority_key: T::AuthorityId,
session_index: SessionIndex,
network_id: NetworkIdOf<T>,
network_data: &NetworkData,
) -> OffchainResult<T, ()> {
let network_id_encoded = network_id.encode();
let block_number_key = Self::create_storage_key(b"block-", &network_id_encoded);
let block_distance_key = Self::create_storage_key(b"block-distance-", &network_id_encoded);
let endpoint_key = Self::create_storage_key(b"endpoint-", &network_id_encoded);
// Locally stored endpoint and block distance override the on-chain defaults.
let rpc_endpoint = Self::read_persistent_offchain_storage(
&endpoint_key,
network_data.default_endpoint.clone(),
);
let max_block_distance = Self::read_persistent_offchain_storage(
&block_distance_key,
network_data.block_distance,
);
StorageValueRef::persistent(&block_number_key)
.mutate(
|result_block_range: Result<Option<(u64, u64)>, StorageRetrievalError>| {
match result_block_range {
Ok(maybe_block_range) => {
// Ask for logs when there is a non-trivial range to scan,
// otherwise ask for the latest block number.
let request_body = match maybe_block_range {
Some((from_block, to_block))
if from_block < to_block.saturating_sub(1) =>
{
Self::prepare_request_body_for_latest_transfers(
from_block,
to_block.saturating_sub(1),
network_data,
)
}
_ => Self::prepare_request_body_for_latest_block(network_data),
};
let response_bytes =
Self::fetch_from_remote(&rpc_endpoint, &request_body)?;
match network_data.network_type {
NetworkType::Evm => {
// `Some(block)` for a block number response, `None` after logs
// were processed.
let maybe_new_evm_block = Self::apply_evm_response(
&response_bytes,
authority_index,
authority_key,
session_index,
network_id,
)?;
// Stay `finality_delay` blocks behind the reported head.
let estimated_block = maybe_new_evm_block
.map(|new_evm_block| {
new_evm_block
.saturating_sub(network_data.finality_delay)
})
.unwrap_or_default();
Ok(match maybe_block_range {
Some((from_block, to_block)) => match maybe_new_evm_block {
// New head known: extend the range, but by no more than
// `max_block_distance` blocks from `from_block`.
Some(_) => {
match estimated_block.checked_sub(from_block) {
Some(current_distance)
if current_distance
< max_block_distance =>
{
(from_block, estimated_block)
}
_ => (
from_block,
from_block
.saturating_add(max_block_distance)
.min(estimated_block),
),
}
}
// Logs were processed: collapse the range to its end.
None => (to_block, to_block),
},
// Nothing stored yet: start from the estimated block.
None => (estimated_block, estimated_block),
})
}
NetworkType::Utxo => {
Err(OffchainErr::UtxoNotImplemented(network_id).into())
}
_ => Err(OffchainErr::UnknownNetworkType(network_id).into()),
}
}
Err(_) => Err(OffchainErr::StorageRetrievalError(network_id).into()),
}
},
)
.map_err(|error| match error {
MutateStorageError::ValueFunctionFailed(offchain_error) => offchain_error,
MutateStorageError::ConcurrentModification(_) => {
OffchainErr::ConcurrentModificationError(network_id).into()
}
})
.map(|_| ())
}
/// Interprets a raw RPC response.
///
/// * A block number response is logged and returned as `Ok(Some(block))`.
/// * A logs response is turned into claps, each of which is signed with
///   `authority_key` and submitted as an unsigned `slow_clap` transaction;
///   `Ok(None)` is returned afterwards.
///
/// The receiver is decoded from the first 32 bytes of topic 1 and the amount
/// from bytes 16..32 of topic 2 (big endian). The logs come from a remote
/// endpoint and are untrusted: a log that is insufficient, has topics shorter
/// than 32 bytes or carries an undecodable receiver is skipped instead of
/// panicking the offchain worker.
///
/// # Errors
///
/// Propagates `parse_evm_response` errors and returns `FailedSigning` or
/// `SubmitTransaction` when a clap cannot be signed or submitted.
fn apply_evm_response(
    response_bytes: &[u8],
    authority_index: AuthIndex,
    authority_key: T::AuthorityId,
    session_index: SessionIndex,
    network_id: NetworkIdOf<T>,
) -> OffchainResult<T, Option<u64>> {
    match Self::parse_evm_response(response_bytes)? {
        EvmResponseType::BlockNumber(new_evm_block) => {
            log::info!(
                target: LOG_TARGET,
                "🧐 New evm block #{:?} found for network {:?}",
                new_evm_block,
                network_id,
            );
            Ok(Some(new_evm_block))
        }
        EvmResponseType::TransactionLogs(evm_logs) => {
            let claps: Vec<_> = evm_logs
                .iter()
                .filter(|log| log.is_sufficient())
                .filter_map(|log| {
                    // Checked access: topic lengths are not validated anywhere else.
                    let mut receiver_bytes: &[u8] = log.topics.get(1)?.get(0..32)?;
                    let amount_bytes: [u8; 16] =
                        log.topics.get(2)?.get(16..32)?.try_into().ok()?;
                    Some(Clap {
                        authority_index,
                        session_index,
                        network_id,
                        removed: log.removed,
                        receiver: T::AccountId::decode(&mut receiver_bytes).ok()?,
                        amount: u128::from_be_bytes(amount_bytes)
                            .saturated_into::<BalanceOf<T>>(),
                        transaction_hash: log.transaction_hash?,
                        block_number: log.block_number?,
                    })
                })
                .collect();
            log::info!(
                target: LOG_TARGET,
                "🧐 {:?} evm logs found for network {:?}",
                claps.len(),
                network_id,
            );
            for clap in claps {
                let signature = authority_key
                    .sign(&clap.encode())
                    .ok_or(OffchainErr::FailedSigning)?;
                let call = Call::slow_clap { clap, signature };
                SubmitTransaction::<T, Call<T>>::submit_unsigned_transaction(call.into())
                    .map_err(|_| OffchainErr::SubmitTransaction)?;
            }
            Ok(None)
        }
    }
}
/// Yields `(index, key)` for every authority of the session whose key is
/// also returned by `T::AuthorityId::all()`, in authority list order.
fn local_authorities(
    session_index: &SessionIndex,
) -> impl Iterator<Item = (u32, T::AuthorityId)> {
    let session_authorities = Authorities::<T>::get(session_index);
    // Sorted once so that every membership test is a binary search.
    let mut local_keys = T::AuthorityId::all();
    local_keys.sort();

    session_authorities
        .into_iter()
        .enumerate()
        .filter_map(move |(position, authority)| {
            let found = local_keys.binary_search(&authority).ok()?;
            Some((position as u32, local_keys[found].clone()))
        })
}
/// Sends `request_body` as a JSON POST request to `rpc_endpoint` and returns
/// the response body.
///
/// The request and the wait for its response share one deadline of
/// `FETCH_TIMEOUT_PERIOD` milliseconds from now.
///
/// # Errors
///
/// `HttpRequestError` when the request cannot be sent, `RequestUncompleted`
/// when it does not finish successfully before the deadline, and
/// `HttpResponseNotOk` for any status code other than 200.
///
/// # Panics
///
/// Panics when either argument is not valid UTF-8.
fn fetch_from_remote(rpc_endpoint: &[u8], request_body: &[u8]) -> OffchainResult<T, Vec<u8>> {
    let url = core::str::from_utf8(rpc_endpoint).expect("rpc endpoint valid str; qed");
    let payload = core::str::from_utf8(request_body).expect("request body valid str: qed");

    let timeout = rt_offchain::Duration::from_millis(FETCH_TIMEOUT_PERIOD);
    let deadline = sp_io::offchain::timestamp().add(timeout);

    let request = rt_offchain::http::Request::post(url, vec![payload])
        .add_header("Accept", "application/json")
        .add_header("Content-Type", "application/json")
        .deadline(deadline);
    let pending = request.send().map_err(OffchainErr::HttpRequestError)?;

    // Both a missed deadline and a failed request end up as `RequestUncompleted`.
    let response = match pending.try_wait(deadline) {
        Ok(Ok(response)) => response,
        _ => return Err(OffchainErr::RequestUncompleted),
    };
    if response.code != 200 {
        return Err(OffchainErr::HttpResponseNotOk(response.code));
    }
    Ok(response.body().collect())
}
/// Builds the JSON-RPC body that asks for the latest block number.
///
/// Returns an empty body for every network type other than EVM.
fn prepare_request_body_for_latest_block(network_data: &NetworkData) -> Vec<u8> {
    if matches!(network_data.network_type, NetworkType::Evm) {
        b"{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\"}".to_vec()
    } else {
        Vec::new()
    }
}
/// Builds the JSON-RPC `eth_getLogs` body for the inclusive block range
/// `from_block..=to_block`, filtered by the network's gatekeeper address and
/// topic name.
///
/// Returns an empty body for every network type other than EVM.
fn prepare_request_body_for_latest_transfers(
    from_block: u64,
    to_block: u64,
    network_data: &NetworkData,
) -> Vec<u8> {
    if !matches!(network_data.network_type, NetworkType::Evm) {
        return Vec::new();
    }

    let mut body = Vec::new();
    body.extend_from_slice(b"{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"eth_getLogs\",\"params\":[{");
    body.extend_from_slice(b"\"fromBlock\":\"");
    body.extend(Self::u64_to_hexadecimal_bytes(from_block));
    body.extend_from_slice(b"\",\"toBlock\":\"");
    body.extend(Self::u64_to_hexadecimal_bytes(to_block));
    body.extend_from_slice(b"\",\"address\":\"");
    body.extend(network_data.gatekeeper.to_vec());
    body.extend_from_slice(b"\",\"topics\":[\"");
    body.extend(network_data.topic_name.to_vec());
    body.extend_from_slice(b"\"]}]}");
    body
}
/// Parses a raw RPC response and extracts its `result`.
///
/// # Errors
///
/// `HttpBytesParsingError` when the bytes are not UTF-8,
/// `HttpJsonParsingError` when they are not the expected JSON, and
/// `ErrorInEvmResponse` when the response carries an `error` or no `result`.
fn parse_evm_response(response_bytes: &[u8]) -> OffchainResult<T, EvmResponseType> {
    let text = sp_std::str::from_utf8(response_bytes)
        .map_err(|_| OffchainErr::HttpBytesParsingError)?;
    let parsed: EvmResponse =
        serde_json::from_str(text).map_err(|_| OffchainErr::HttpJsonParsingError)?;

    match (parsed.error, parsed.result) {
        (None, Some(payload)) => Ok(payload),
        _ => Err(OffchainErr::ErrorInEvmResponse),
    }
}
/// Returns the median clap count among the authorities of the session that
/// are not disabled, or `0` when there are none.
///
/// For an even number of authorities the median is the mean of the two
/// middle values, rounded down.
fn calculate_median_claps(session_index: &SessionIndex) -> u32 {
    let mut active_claps = ClapsInSession::<T>::get(session_index)
        .values()
        .filter(|info| !info.disabled)
        .map(|info| info.claps)
        .collect::<Vec<_>>();
    active_claps.sort_unstable();

    match active_claps.len() {
        0 => 0,
        len if len % 2 == 0 => {
            let lower = active_claps[len / 2 - 1];
            let upper = active_claps[len / 2];
            // Clap counters saturate at `u32::MAX`, so `lower + upper` could
            // overflow; the list is sorted, hence `upper >= lower` and this
            // form yields the same rounded-down mean without overflow.
            lower + (upper - lower) / 2
        }
        len => active_claps[len / 2],
    }
}
/// Tells whether an authority's clap count is close enough to the median.
///
/// The authority is good when the median is zero, or when the absolute
/// difference between its clap count and the median, relative to the median,
/// is strictly below `OffenceThreshold` percent. An authority without an
/// entry counts as having zero claps.
fn is_good_actor(
    authority_index: usize,
    session_index: SessionIndex,
    median_claps: u32,
) -> bool {
    if median_claps == 0 {
        return true;
    }

    let claps = ClapsInSession::<T>::get(session_index)
        .get(&(authority_index as AuthIndex))
        .map(|info| info.claps)
        .unwrap_or_default();
    let deviation = Perbill::from_rational(claps.abs_diff(median_claps), median_claps);
    let allowed = Perbill::from_percent(T::OffenceThreshold::get());
    deviation < allowed
}
/// Stores the authority list for the current session, resets the session's
/// clap statistics and clears the history that fell out of `HistoryDepth`.
///
/// This runs inside session hooks, where a panic would halt block
/// production. A list longer than `MaxAuthorities` is therefore stored
/// anyway via `WeakBoundedVec::force_from`, which logs a warning, instead of
/// panicking.
///
/// # Panics
///
/// Panics when authorities are already stored for the current session.
fn initialize_authorities(authorities: Vec<T::AuthorityId>) {
    let session_index = T::ValidatorSet::session_index();
    assert!(
        Authorities::<T>::get(&session_index).is_empty(),
        "Authorities are already initilized!"
    );
    let bounded_authorities = WeakBoundedVec::<_, T::MaxAuthorities>::force_from(
        authorities,
        Some("Warning: the number of slow clap authorities exceeds `MaxAuthorities`."),
    );
    // `checked_sub` is `None` until `HistoryDepth` sessions have passed.
    if let Some(target_session_index) = session_index.checked_sub(T::HistoryDepth::get()) {
        Self::clear_history(&target_session_index);
    }
    Authorities::<T>::set(&session_index, bounded_authorities);
    ClapsInSession::<T>::set(&session_index, Default::default());
}
/// Removes the clap statistics, received claps and applause flags stored for
/// the given session.
fn clear_history(target_session_index: &SessionIndex) {
ClapsInSession::<T>::remove(target_session_index);
// No removal limit is imposed (`u32::MAX`), so no cursor is expected back.
let mut cursor = ReceivedClaps::<T>::clear_prefix((target_session_index,), u32::MAX, None);
debug_assert!(cursor.maybe_cursor.is_none());
cursor =
ApplausesForTransaction::<T>::clear_prefix((target_session_index,), u32::MAX, None);
debug_assert!(cursor.maybe_cursor.is_none());
}
/// Test helper: overwrites the authority list of an arbitrary session.
///
/// Panics when the list is longer than `MaxAuthorities`.
#[cfg(test)]
fn set_test_authorities(session_index: SessionIndex, authorities: Vec<T::AuthorityId>) {
let bounded_authorities = WeakBoundedVec::<_, T::MaxAuthorities>::try_from(authorities)
.expect("more than the maximum number of authorities");
Authorities::<T>::set(session_index, bounded_authorities);
}
/// Benchmark helper: forwards to `NetworkDataHandler::trigger_nullification`.
#[cfg(feature = "runtime-benchmarks")]
fn trigger_nullification_for_benchmark() {
T::NetworkDataHandler::trigger_nullification();
}
}
// Binds the pallet to its authority key type.
impl<T: Config> sp_runtime::BoundToRuntimeAppPublic for Pallet<T> {
type Public = T::AuthorityId;
}
// The pallet exposes the block number of the configured provider as its own.
impl<T: Config> BlockNumberProvider for Pallet<T> {
type BlockNumber = BlockNumberFor<T>;
fn current_block_number() -> Self::BlockNumber {
T::BlockNumberProvider::current_block_number()
}
}
// Session integration: keeps the authority list in sync with the session's
// validators and reports authorities whose clap count deviates too much.
impl<T: Config> OneSessionHandler<T::AccountId> for Pallet<T> {
type Key = T::AuthorityId;
// Stores the genesis validators' keys as the authorities of the current session.
fn on_genesis_session<'a, I: 'a>(validators: I)
where
I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
{
let authorities = validators.map(|x| x.1).collect::<Vec<_>>();
Self::initialize_authorities(authorities);
}
// Stores the new validators' keys as the authorities of the current session;
// queued validators are ignored.
fn on_new_session<'a, I: 'a>(_changed: bool, validators: I, _queued_validators: I)
where
I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
{
let authorities = validators.map(|x| x.1).collect::<Vec<_>>();
Self::initialize_authorities(authorities);
}
// Collects every validator that fails `is_good_actor` against the session's
// median clap count and reports them as a `ThrottlingOffence`.
fn on_before_session_ending() {
let session_index = T::ValidatorSet::session_index();
let validators = T::ValidatorSet::validators();
let authorities = Authorities::<T>::get(&session_index);
let median_claps = Self::calculate_median_claps(&session_index);
// The validator's position in the set is used as its authority index.
// Validators without a full identification are left out.
let offenders = validators
.into_iter()
.enumerate()
.filter(|(index, _)| !Self::is_good_actor(*index, session_index, median_claps))
.filter_map(|(_, id)| {
<T::ValidatorSet as ValidatorSetWithIdentification<T::AccountId>>::IdentificationOf::convert(
id.clone(),
).map(|full_id| (id, full_id))
})
.collect::<Vec<IdentificationTuple<T>>>();
if offenders.is_empty() {
Self::deposit_event(Event::<T>::AuthoritiesEquilibrium);
} else {
Self::deposit_event(Event::<T>::SomeAuthoritiesTrottling {
throttling: offenders.clone(),
});
let validator_set_count = authorities.len() as u32;
let offence = ThrottlingOffence {
session_index,
validator_set_count,
offenders,
};
// A failed report is only printed; it does not interrupt the session change.
if let Err(e) = T::ReportUnresponsiveness::report_offence(vec![], offence) {
sp_runtime::print(e)
}
}
}
// Marks the validator as disabled for the current session, creating its
// statistics entry with zero claps when it has none yet.
fn on_disabled(validator_index: u32) {
let session_index = T::ValidatorSet::session_index();
ClapsInSession::<T>::mutate(&session_index, |claps_details| {
(*claps_details)
.entry(validator_index as AuthIndex)
.and_modify(|individual| (*individual).disabled = true)
.or_insert(SessionAuthorityInfo {
claps: 0u32,
disabled: true,
});
});
}
}
/// Offence reported for authorities whose clap count deviates too much from
/// the session's median.
#[derive(RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(Clone, PartialEq, Eq))]
pub struct ThrottlingOffence<Offender> {
/// Session in which the offence happened.
pub session_index: SessionIndex,
/// Number of authorities stored for that session.
pub validator_set_count: u32,
/// Authorities that committed the offence.
pub offenders: Vec<Offender>,
}
// Offence description consumed by the offence reporting machinery; the
// session index doubles as the time slot.
impl<Offender: Clone> Offence<Offender> for ThrottlingOffence<Offender> {
    const ID: Kind = *b"slow-clap:throtl";
    type TimeSlot = SessionIndex;

    fn offenders(&self) -> Vec<Offender> {
        self.offenders.clone()
    }

    fn session_index(&self) -> SessionIndex {
        self.session_index
    }

    fn validator_set_count(&self) -> u32 {
        self.validator_set_count
    }

    fn time_slot(&self) -> Self::TimeSlot {
        self.session_index
    }

    /// Slash fraction for the given number of offenders.
    ///
    /// Nothing is slashed while the offenders number at most a tenth of the
    /// validator set; above that the fraction is
    /// `from_rational(3 * excess, validator_set_count)` scaled by 7%, where
    /// `excess = offenders_count - (validator_set_count / 10 + 1)`.
    fn slash_fraction(&self, offenders_count: u32) -> Perbill {
        let tolerated = self.validator_set_count / 10 + 1;
        match offenders_count.checked_sub(tolerated) {
            Some(excess) => Perbill::from_rational(3 * excess, self.validator_set_count)
                .saturating_mul(Perbill::from_percent(7)),
            None => Perbill::default(),
        }
    }
}