rate limit for the rpc endpoint based on network_id, storage guard lock for the current network_id and other minor improvements

Signed-off-by: Uncle Stinky <uncle.stinky@ghostchain.io>
This commit is contained in:
Uncle Stinky 2025-06-25 18:09:29 +03:00
parent 0c3636fe79
commit 8464da831f
Signed by: st1nky
GPG Key ID: 016064BD97603B40
2 changed files with 201 additions and 155 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ghost-slow-clap" name = "ghost-slow-clap"
version = "0.3.30" version = "0.3.31"
description = "Applause protocol for the EVM bridge" description = "Applause protocol for the EVM bridge"
license.workspace = true license.workspace = true
authors.workspace = true authors.workspace = true

View File

@ -27,6 +27,7 @@ use sp_runtime::{
offchain::{ offchain::{
self as rt_offchain, HttpError, self as rt_offchain, HttpError,
storage::{MutateStorageError, StorageRetrievalError, StorageValueRef}, storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
storage_lock::{StorageLock, Time},
}, },
traits::{BlockNumberProvider, Convert, Saturating}, traits::{BlockNumberProvider, Convert, Saturating},
}; };
@ -204,10 +205,14 @@ enum OffchainErr<NetworkId> {
RequestUncompleted, RequestUncompleted,
HttpResponseNotOk(u16), HttpResponseNotOk(u16),
ErrorInEvmResponse, ErrorInEvmResponse,
NoStoredNetworks,
NotValidator,
StorageRetrievalError(NetworkId), StorageRetrievalError(NetworkId),
ConcurrentModificationError(NetworkId), ConcurrentModificationError(NetworkId),
UtxoNotImplemented(NetworkId), UtxoNotImplemented(NetworkId),
UnknownNetworkType(NetworkId), UnknownNetworkType(NetworkId),
OffchainTimeoutPeriod(NetworkId),
TooManyRequests(NetworkId),
} }
impl<NetworkId: core::fmt::Debug> core::fmt::Debug for OffchainErr<NetworkId> { impl<NetworkId: core::fmt::Debug> core::fmt::Debug for OffchainErr<NetworkId> {
@ -220,15 +225,20 @@ impl<NetworkId: core::fmt::Debug> core::fmt::Debug for OffchainErr<NetworkId> {
OffchainErr::HttpRequestError(http_error) => match http_error { OffchainErr::HttpRequestError(http_error) => match http_error {
HttpError::DeadlineReached => write!(fmt, "Requested action couldn't been completed within a deadline."), HttpError::DeadlineReached => write!(fmt, "Requested action couldn't been completed within a deadline."),
HttpError::IoError => write!(fmt, "There was an IO error while processing the request."), HttpError::IoError => write!(fmt, "There was an IO error while processing the request."),
HttpError::Invalid => write!(fmt, "The ID of the request is invalid in this context"), HttpError::Invalid => write!(fmt, "The ID of the request is invalid in this context."),
}, },
OffchainErr::ConcurrentModificationError(ref network_id) => write!(fmt, "The underlying DB failed to update due to a concurrent modification for network #{:?}", network_id), OffchainErr::ConcurrentModificationError(ref network_id) => write!(fmt, "The underlying DB failed to update due to a concurrent modification for network #{:?}.", network_id),
OffchainErr::StorageRetrievalError(ref network_id) => write!(fmt, "Storage value found for network #{:?} but it's undecodable", network_id), OffchainErr::StorageRetrievalError(ref network_id) => write!(fmt, "Storage value found for network #{:?} but it's undecodable.", network_id),
OffchainErr::RequestUncompleted => write!(fmt, "Failed to complete request."), OffchainErr::RequestUncompleted => write!(fmt, "Failed to complete request."),
OffchainErr::HttpResponseNotOk(code) => write!(fmt, "Http response returned code {:?}", code), OffchainErr::HttpResponseNotOk(code) => write!(fmt, "Http response returned code {:?}.", code),
OffchainErr::ErrorInEvmResponse => write!(fmt, "Error in evm reponse."), OffchainErr::ErrorInEvmResponse => write!(fmt, "Error in evm reponse."),
OffchainErr::NoStoredNetworks => write!(fmt, "No networks stored for the offchain slow claps."),
OffchainErr::NotValidator => write!(fmt, "Not a validator for slow clap, `--validator` flag needed."),
OffchainErr::UtxoNotImplemented(ref network_id) => write!(fmt, "Network #{:?} is marked as UTXO, which is not implemented yet.", network_id), OffchainErr::UtxoNotImplemented(ref network_id) => write!(fmt, "Network #{:?} is marked as UTXO, which is not implemented yet.", network_id),
OffchainErr::UnknownNetworkType(ref network_id) => write!(fmt, "Unknown type for network #{:?}.", network_id), OffchainErr::UnknownNetworkType(ref network_id) => write!(fmt, "Unknown type for network #{:?}.", network_id),
OffchainErr::OffchainTimeoutPeriod(ref network_id) => write!(fmt, "Offchain request should be in-flight for network #{:?}.", network_id),
OffchainErr::TooManyRequests(ref network_id) => write!(fmt, "Too many requests over RPC endpoint for network #{:?}.", network_id),
} }
} }
} }
@ -438,33 +448,23 @@ pub mod pallet {
#[pallet::hooks] #[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> { impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn offchain_worker(now: BlockNumberFor<T>) { fn offchain_worker(now: BlockNumberFor<T>) {
// Only send messages of we are a potential validator. match Self::start_slow_clapping(now) {
if sp_io::offchain::is_validator() { Ok(iter) => for result in iter.into_iter() {
let networks_len = T::NetworkDataHandler::iter().count();
if networks_len == 0 {
log::info!(
target: LOG_TARGET,
"👏 Skipping slow clap at {:?}: no evm networks",
now,
);
} else {
for result in Self::start_slow_clapping(now, networks_len).into_iter().flatten() {
if let Err(e) = result { if let Err(e) = result {
log::info!( log::info!(
target: LOG_TARGET, target: LOG_TARGET,
"👏 Skipping slow clap at {:?}: {:?}", "👏 Skipping slow clap at {:?}: {:?}",
now, now,
e, e,
); )
} }
} },
} Err(e) => log::info!(
} else {
log::info!(
target: LOG_TARGET, target: LOG_TARGET,
"🤥 Not a validator, skipping slow clap at {:?}.", "👏 Could not start slow clap at {:?}: {:?}",
now, now,
); e,
),
} }
} }
} }
@ -503,6 +503,21 @@ pub mod pallet {
} }
impl<T: Config> Pallet<T> { impl<T: Config> Pallet<T> {
fn create_storage_key(first: &[u8], second: &[u8]) -> Vec<u8> {
let mut key = DB_PREFIX.to_vec();
key.extend(first);
key.extend(second);
key
}
fn read_persistent_offchain_storage<R: codec::Decode>(storage_key: &[u8], default_value: R) -> R {
StorageValueRef::persistent(&storage_key)
.get::<R>()
.ok()
.flatten()
.unwrap_or(default_value)
}
fn generate_unique_hash( fn generate_unique_hash(
receiver: &T::AccountId, receiver: &T::AccountId,
amount: &BalanceOf<T>, amount: &BalanceOf<T>,
@ -515,6 +530,30 @@ impl<T: Config> Pallet<T> {
H256::from_slice(&sp_io::hashing::keccak_256(&clap_args_str)[..]) H256::from_slice(&sp_io::hashing::keccak_256(&clap_args_str)[..])
} }
fn u64_to_hexadecimal_bytes(value: u64) -> Vec<u8> {
let mut hex_str = Vec::new();
hex_str.push(b'0');
hex_str.push(b'x');
if value == 0 {
hex_str.push(b'0');
return hex_str;
}
for i in (0..16).rev() {
let nibble = (value >> (i * 4)) & 0xF;
if nibble != 0 || hex_str.len() > 2 {
hex_str.push(match nibble {
0..=9 => b'0' + nibble as u8,
10..=15 => b'a' + (nibble - 10) as u8,
_ => unreachable!(),
});
}
}
hex_str
}
fn try_slow_clap(clap: &Clap<T::AccountId, NetworkIdOf<T>, BalanceOf<T>>) -> DispatchResult { fn try_slow_clap(clap: &Clap<T::AccountId, NetworkIdOf<T>, BalanceOf<T>>) -> DispatchResult {
let authorities = Authorities::<T>::get(&clap.session_index); let authorities = Authorities::<T>::get(&clap.session_index);
ensure!(authorities.get(clap.authority_index as usize).is_some(), Error::<T>::NotAnAuthority); ensure!(authorities.get(clap.authority_index as usize).is_some(), Error::<T>::NotAnAuthority);
@ -565,13 +604,12 @@ impl<T: Config> Pallet<T> {
) > Perbill::from_percent(T::ApplauseThreshold::get()); ) > Perbill::from_percent(T::ApplauseThreshold::get());
if enough_authorities { if enough_authorities {
if let Err(error_msg) = Self::try_applause(&clap, &received_claps_key) { let _ = Self::try_applause(&clap, &received_claps_key)
log::info!( .inspect_err(|error_msg| log::info!(
target: LOG_TARGET, target: LOG_TARGET,
"👏 Could not applause because of: {:?}", "👏 Could not applause because of: {:?}",
error_msg, error_msg,
); ));
}
} }
Ok(()) Ok(())
@ -651,15 +689,57 @@ impl<T: Config> Pallet<T> {
} }
fn start_slow_clapping( fn start_slow_clapping(
block_number: BlockNumberFor<T>, block_number: BlockNumberFor<T>
networks_len: usize,
) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> { ) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> {
let session_index = T::ValidatorSet::session_index(); sp_io::offchain::is_validator()
let network_in_use = T::NetworkDataHandler::iter() .then(|| ())
.nth(block_number.into().as_usize() % networks_len) .ok_or(OffchainErr::NotValidator)?;
.expect("network should exist; qed");
Ok(Self::local_authorities().map(move |(authority_index, authority_key)| { let session_index = T::ValidatorSet::session_index();
let networks_len = T::NetworkDataHandler::iter().count();
let network_in_use = T::NetworkDataHandler::iter()
.nth(block_number.into()
.as_usize()
.checked_rem(networks_len)
.unwrap_or_default())
.ok_or(OffchainErr::NoStoredNetworks)?;
let network_id_encoded = network_in_use.0.encode();
let last_timestamp_key = Self::create_storage_key(b"last-timestamp-", &network_id_encoded);
let rate_limit_delay_key = Self::create_storage_key(b"rate-limit-", &network_id_encoded);
let rate_limit_delay = Self::read_persistent_offchain_storage(
&rate_limit_delay_key,
network_in_use.1.rate_limit_delay,
);
let network_lock_key = Self::create_storage_key(b"network-lock-", &network_id_encoded);
let block_until = rt_offchain::Duration::from_millis(networks_len as u64 * FETCH_TIMEOUT_PERIOD);
let mut network_lock = StorageLock::<Time>::with_deadline(&network_lock_key, block_until);
network_lock.try_lock().map_err(|_| {
OffchainErr::OffchainTimeoutPeriod(network_in_use.0)
})?;
StorageValueRef::persistent(&last_timestamp_key)
.mutate(|result_timestamp: Result<Option<u64>, StorageRetrievalError>| {
let current_timestmap = sp_io::offchain::timestamp().unix_millis();
match result_timestamp {
Ok(option_timestamp) => match option_timestamp {
Some(stored_timestamp) if stored_timestamp > current_timestmap =>
Err(OffchainErr::TooManyRequests(network_in_use.0).into()),
_ => Ok(current_timestmap.saturating_add(rate_limit_delay)),
},
Err(_) => Err(OffchainErr::StorageRetrievalError(network_in_use.0).into()),
}
})
.map_err(|error| match error {
MutateStorageError::ValueFunctionFailed(offchain_error) => offchain_error,
MutateStorageError::ConcurrentModification(_) =>
OffchainErr::ConcurrentModificationError(network_in_use.0).into(),
})?;
Ok(Self::local_authorities(&session_index).map(move |(authority_index, authority_key)| {
Self::do_evm_claps_or_save_block( Self::do_evm_claps_or_save_block(
authority_index, authority_index,
authority_key, authority_key,
@ -670,13 +750,6 @@ impl<T: Config> Pallet<T> {
})) }))
} }
fn create_storage_key(first: &[u8], second: &[u8]) -> Vec<u8> {
let mut key = DB_PREFIX.to_vec();
key.extend(first);
key.extend(second);
key
}
fn do_evm_claps_or_save_block( fn do_evm_claps_or_save_block(
authority_index: AuthIndex, authority_index: AuthIndex,
authority_key: T::AuthorityId, authority_key: T::AuthorityId,
@ -690,19 +763,17 @@ impl<T: Config> Pallet<T> {
let block_distance_key = Self::create_storage_key(b"block-distance-", &network_id_encoded); let block_distance_key = Self::create_storage_key(b"block-distance-", &network_id_encoded);
let endpoint_key = Self::create_storage_key(b"endpoint-", &network_id_encoded); let endpoint_key = Self::create_storage_key(b"endpoint-", &network_id_encoded);
let rpc_endpoint = StorageValueRef::persistent(&endpoint_key) let rpc_endpoint = Self::read_persistent_offchain_storage(
.get() &endpoint_key,
.ok() network_data.default_endpoint.clone(),
.flatten() );
.unwrap_or(network_data.default_endpoint.clone()); let max_block_distance = Self::read_persistent_offchain_storage(
&block_distance_key,
network_data.block_distance,
);
let max_block_distance = StorageValueRef::persistent(&block_distance_key) StorageValueRef::persistent(&block_number_key)
.get() .mutate(|result_block_range: Result<Option<(u64, u64)>, StorageRetrievalError>| {
.ok()
.flatten()
.unwrap_or(network_data.block_distance);
let mutation_result = StorageValueRef::persistent(&block_number_key).mutate(|result_block_range: Result<Option<(u64, u64)>, StorageRetrievalError>| {
match result_block_range { match result_block_range {
Ok(maybe_block_range) => { Ok(maybe_block_range) => {
let request_body = match maybe_block_range { let request_body = match maybe_block_range {
@ -747,13 +818,13 @@ impl<T: Config> Pallet<T> {
} }
Err(_) => Err(OffchainErr::StorageRetrievalError(network_id).into()) Err(_) => Err(OffchainErr::StorageRetrievalError(network_id).into())
} }
}); })
.map_err(|error| match error {
match mutation_result { MutateStorageError::ValueFunctionFailed(offchain_error) => offchain_error,
Ok(_) => Ok(()), MutateStorageError::ConcurrentModification(_) =>
Err(MutateStorageError::ValueFunctionFailed(offchain_error)) => Err(offchain_error), OffchainErr::ConcurrentModificationError(network_id).into(),
Err(MutateStorageError::ConcurrentModification(_)) => Err(OffchainErr::ConcurrentModificationError(network_id).into()), })
} .map(|_| ())
} }
fn apply_evm_response( fn apply_evm_response(
@ -818,9 +889,8 @@ impl<T: Config> Pallet<T> {
} }
} }
fn local_authorities() -> impl Iterator<Item = (u32, T::AuthorityId)> { fn local_authorities(session_index: &SessionIndex) -> impl Iterator<Item = (u32, T::AuthorityId)> {
let session_index = T::ValidatorSet::session_index(); let authorities = Authorities::<T>::get(session_index);
let authorities = Authorities::<T>::get(&session_index);
let mut local_authorities = T::AuthorityId::all(); let mut local_authorities = T::AuthorityId::all();
local_authorities.sort(); local_authorities.sort();
@ -860,30 +930,6 @@ impl<T: Config> Pallet<T> {
Ok(response.body().collect::<Vec<u8>>()) Ok(response.body().collect::<Vec<u8>>())
} }
fn u64_to_hexadecimal_bytes(value: u64) -> Vec<u8> {
let mut hex_str = Vec::new();
hex_str.push(b'0');
hex_str.push(b'x');
if value == 0 {
hex_str.push(b'0');
return hex_str;
}
for i in (0..16).rev() {
let nibble = (value >> (i * 4)) & 0xF;
if nibble != 0 || hex_str.len() > 2 {
hex_str.push(match nibble {
0..=9 => b'0' + nibble as u8,
10..=15 => b'a' + (nibble - 10) as u8,
_ => unreachable!(),
});
}
}
hex_str
}
fn prepare_request_body_for_latest_block(network_data: &NetworkData) -> Vec<u8> { fn prepare_request_body_for_latest_block(network_data: &NetworkData) -> Vec<u8> {
match network_data.network_type { match network_data.network_type {
NetworkType::Evm => b"{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\"}".to_vec(), NetworkType::Evm => b"{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\"}".to_vec(),
@ -924,6 +970,28 @@ impl<T: Config> Pallet<T> {
Ok(response_result.result.ok_or(OffchainErr::ErrorInEvmResponse)?) Ok(response_result.result.ok_or(OffchainErr::ErrorInEvmResponse)?)
} }
fn calculate_median_claps(session_index: &SessionIndex) -> u32 {
let mut claps_in_session = ClapsInSession::<T>::get(session_index)
.values()
.filter_map(|value| (!value.disabled).then(|| value.claps))
.collect::<Vec<_>>();
if claps_in_session.is_empty() {
return 0;
}
claps_in_session.sort();
let number_of_claps = claps_in_session.len();
if number_of_claps % 2 == 0 {
let mid_left = claps_in_session[number_of_claps / 2 - 1];
let mid_right = claps_in_session[number_of_claps / 2];
(mid_left + mid_right) / 2
} else {
claps_in_session[number_of_claps / 2]
}
}
fn is_good_actor( fn is_good_actor(
authority_index: usize, authority_index: usize,
session_index: SessionIndex, session_index: SessionIndex,
@ -968,28 +1036,6 @@ impl<T: Config> Pallet<T> {
debug_assert!(cursor.maybe_cursor.is_none()); debug_assert!(cursor.maybe_cursor.is_none());
} }
fn calculate_median_claps(session_index: &SessionIndex) -> u32 {
let mut claps_in_session = ClapsInSession::<T>::get(session_index)
.values()
.filter_map(|value| (!value.disabled).then(|| value.claps))
.collect::<Vec<_>>();
if claps_in_session.is_empty() {
return 0;
}
claps_in_session.sort();
let number_of_claps = claps_in_session.len();
if number_of_claps % 2 == 0 {
let mid_left = claps_in_session[number_of_claps / 2 - 1];
let mid_right = claps_in_session[number_of_claps / 2];
(mid_left + mid_right) / 2
} else {
claps_in_session[number_of_claps / 2]
}
}
#[cfg(test)] #[cfg(test)]
fn set_test_authorities(session_index: SessionIndex, authorities: Vec<T::AuthorityId>) { fn set_test_authorities(session_index: SessionIndex, authorities: Vec<T::AuthorityId>) {
let bounded_authorities = WeakBoundedVec::<_, T::MaxAuthorities>::try_from(authorities) let bounded_authorities = WeakBoundedVec::<_, T::MaxAuthorities>::try_from(authorities)