initial draft for the ghost-walker

Signed-off-by: Uncle Stretch <uncle.stretch@ghostchain.io>
This commit is contained in:
Uncle Stretch 2024-12-26 14:41:16 +03:00
commit b3cd0d6386
Signed by: str3tch
GPG Key ID: 84F3190747EE79AA
18 changed files with 6736 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
target/

4083
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

24
Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "ghost-walker"
version = "0.1.0"
edition = "2021"
[dependencies]
asynchronous-codec = "0.7.0"
bytes = "1.9.0"
clap = { version = "4.5.23", features = ["derive"] }
codec = { version = "3.6.12", package = "parity-scale-codec", features = ["derive"] }
either = "1.13.0"
futures = "0.3.31"
hex = "0.4.3"
ip_network = "0.4.1"
libp2p = { version = "0.52.0", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux", "websocket", "request-response"] }
maxminddb = "0.24.0"
pin-project = "1.1.7"
primitive-types = { version = "0.13.1", default-features = false, features = ["codec", "scale-info", "serde"] }
thiserror = "2.0.9"
tokio = { version = "1.42.0", features = ["macros", "time", "rt-multi-thread"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt"] }
trust-dns-resolver = "0.23.2"
unsigned-varint = { version = "0.8.0", features = ["futures", "asynchronous_codec"] }

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 MiB

76
src/locator.rs Normal file
View File

@ -0,0 +1,76 @@
use std::{collections::HashMap, net::IpAddr};
use trust_dns_resolver::{
config::{ResolverConfig, ResolverOpts},
TokioAsyncResolver,
};
use maxminddb::{geoip2::City, Reader as GeoIpReader};
use libp2p::{
identify::Info, multiaddr::Protocol,
PeerId
};
pub struct Locator<'a> {
peers: &'a HashMap<&'a PeerId, &'a Info>,
db: maxminddb::Reader<&'static [u8]>,
}
impl<'a> Locator<'a> {
const CITY_DATA: &'static [u8] = include_bytes!("../artifacts/GeoLite2-City.mmdb");
pub fn new(peers: &'a HashMap<&'a PeerId, &'a Info>) -> Self {
Self {
peers,
db: GeoIpReader::from_source(Self::CITY_DATA)
.expect("City data is always valid; qed"),
}
}
pub async fn locate_peers(&self) {
let mut geo_peers: HashMap<PeerId, String> = HashMap::new();
let resolver = TokioAsyncResolver::tokio(
ResolverConfig::default(),
ResolverOpts::default(),
);
for (peer, info) in self.peers {
for addr in &info.listen_addrs {
let located = match addr.iter().next() {
Some(Protocol::Ip4(ip)) => self.locate(IpAddr::V4(ip)),
Some(Protocol::Ip6(ip)) => self.locate(IpAddr::V6(ip)),
Some(Protocol::Dns(dns)) |
Some(Protocol::Dns4(dns)) |
Some(Protocol::Dns6(dns)) => {
let Ok(lookup) = resolver.lookup_ip(dns.to_string()).await else {
continue;
};
lookup.iter().find_map(|ip| self.locate(ip))
}
_ => continue,
};
let Some(located) = located else { continue; };
geo_peers.insert(**peer, located);
}
}
for (peer, location) in geo_peers {
println!("\tPeer={peer} location={location}");
}
}
fn locate(&self, ip: IpAddr) -> Option<String> {
let City { city, .. } = self.db.lookup(ip).ok()?;
let city = city
.as_ref()?
.names
.as_ref()?
.get("en")?
.to_string()
.into_boxed_str();
Some(city.into_string())
}
}

54
src/main.rs Normal file
View File

@ -0,0 +1,54 @@
use clap::Parser;
mod walker;
mod locator;
mod p2p;
use walker::Walker;
#[derive(Debug, Parser)]
struct Opts {
#[clap(
short,
long,
default_value = "0x7657c8c868863dea692178d462fe9018ac8f1e16f51be31eea8f29274d85525b")]
genesis: String,
#[clap(
short,
long,
default_value = "/dns/bootnode69.chain.ghostchain.io/tcp/30334/p2p/12D3KooWF9SWxz9dmy6vfndQhoxqCa7PESaoFWEiF8Jkqh4xKDRf",
use_value_delimiter = true,
value_parser)]
bootnodes: Vec<String>,
#[clap(
short,
long,
default_value = "60",
value_parser = parse_duration,
)]
timeout: std::time::Duration,
}
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
let seconds = arg.parse()?;
Ok(std::time::Duration::from_secs(seconds))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = tracing_subscriber::filter::EnvFilter::from_default_env();
tracing_subscriber::fmt()
.with_env_filter(filter)
.init();
let args = Opts::parse();
Walker::new()
.with_genesis(args.genesis)
.with_bootnodes(args.bootnodes)
.with_timeout(args.timeout)
.build()?
.walk_around()
.await
}

58
src/p2p/discovery.rs Normal file
View File

@ -0,0 +1,58 @@
use std::time::Duration;
use libp2p::{
kad::{store::MemoryStore, Behaviour as Kademlia, Config as KademliaConfig},
PeerId, StreamProtocol,
};
pub type Discovery = Kademlia<MemoryStore>;
pub struct DiscoveryBuilder {
max_packet_size: usize,
record_ttl: Option<Duration>,
provider_ttl: Option<Duration>,
query_timeout: Duration,
}
impl DiscoveryBuilder {
pub fn new() -> DiscoveryBuilder {
DiscoveryBuilder {
max_packet_size: 8192,
record_ttl: None,
provider_ttl: None,
query_timeout: Duration::from_secs(60),
}
}
pub fn record_ttl(mut self, record_ttl: Option<Duration>) -> Self {
self.record_ttl = record_ttl;
self
}
pub fn provider_ttl(mut self, provider_ttl: Option<Duration>) -> Self {
self.provider_ttl = provider_ttl;
self
}
pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
self.query_timeout = query_timeout;
self
}
pub fn build(self, local_peer_id: PeerId, genesis_hash: &str) -> Discovery {
let mut config = KademliaConfig::default();
config.set_max_packet_size(self.max_packet_size);
config.set_record_ttl(self.record_ttl);
config.set_provider_record_ttl(self.provider_ttl);
config.set_query_timeout(self.query_timeout);
let kademlia_protocols = vec![
StreamProtocol::try_from_owned(format!("/{genesis_hash}/kad"))
.expect("Protocol name starts with '/'; qed"),
];
config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());
let store = MemoryStore::new(local_peer_id);
Kademlia::with_config(local_peer_id, store, config)
}
}

13
src/p2p/mod.rs Normal file
View File

@ -0,0 +1,13 @@
use libp2p::swarm::NetworkBehaviour;
pub mod discovery;
pub mod notifications;
pub mod peer_behaviour;
pub mod transport;
#[derive(NetworkBehaviour)]
pub struct Behaviour {
pub notifications: notifications::behaviour::Notifications,
pub peer_info: peer_behaviour::PeerBehaviour,
pub discovery: discovery::Discovery,
}

View File

@ -0,0 +1,308 @@
#![allow(dead_code)]
use crate::p2p::notifications::{
handler::{
NotificationsHandler, NotificationsHandlerFromBehavior,
NotificationsHandlerToBehavior,
},
messages::BlockHash,
messages::ProtocolRole,
};
use bytes::BytesMut;
use futures::channel::mpsc;
use libp2p::{
core::{ConnectedPoint, Endpoint},
swarm::{
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionDenied,
ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm,
},
Multiaddr, PeerId,
};
use std::{
collections::{HashMap, HashSet, VecDeque},
task::{Poll, Waker},
};
const LOG_TARGET: &str = "ghost-wlaker-behavior";
#[derive(Debug)]
pub enum NotificationsToSwarm {
CustomProtocolOpen {
peer_id: PeerId,
index: usize,
received_handshake: Vec<u8>,
inbound: bool,
sender: mpsc::Sender<Vec<u8>>,
},
CustomProtocolClosed {
peer_id: PeerId,
index: usize,
},
Notification {
peer_id: PeerId,
index: usize,
message: BytesMut,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolsData {
pub genesis_hash: BlockHash,
pub node_role: ProtocolRole,
}
pub struct Notifications {
events: VecDeque<ToSwarm<NotificationsToSwarm, NotificationsHandlerFromBehavior>>,
peers_details: HashMap<PeerId, HashSet<ConnectionId>>,
data: ProtocolsData,
waker: Option<Waker>,
}
impl Notifications {
pub fn new(data: ProtocolsData) -> Self {
Notifications {
events: VecDeque::with_capacity(16),
peers_details: HashMap::default(),
data,
waker: None,
}
}
fn propagate_event(
&mut self,
event: ToSwarm<NotificationsToSwarm, NotificationsHandlerFromBehavior>,
) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
self.events.push_back(event);
}
}
impl NetworkBehaviour for Notifications {
type ConnectionHandler = NotificationsHandler;
type ToSwarm = NotificationsToSwarm;
fn handle_pending_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
Ok(())
}
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(Vec::new())
}
fn handle_established_inbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
peer: libp2p::PeerId,
local_addr: &libp2p::Multiaddr,
remote_addr: &libp2p::Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
tracing::debug!(target: LOG_TARGET, "Notifications new inbound for peer={:?}", peer);
let handler = NotificationsHandler::new(
peer,
ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
},
self.data.clone(),
);
Ok(handler)
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
peer: libp2p::PeerId,
addr: &libp2p::Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
tracing::debug!(target: LOG_TARGET, "Notifications new outbound for peer={:?}", peer);
let handler = NotificationsHandler::new(
peer,
ConnectedPoint::Dialer {
role_override: Endpoint::Dialer,
address: addr.clone(),
},
self.data.clone(),
);
Ok(handler)
}
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
match event {
libp2p::swarm::FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
tracing::debug!(target: LOG_TARGET,
"Notifications swarm connection established peer={:?} connection={:?}",
peer_id,
connection_id
);
self.peers_details
.entry(peer_id)
.and_modify(|entry| {
let _ = entry.insert(connection_id);
})
.or_insert_with(|| {
let mut hash = HashSet::new();
hash.insert(connection_id);
hash
});
for index in 0..2 {
self.propagate_event(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotificationsHandlerFromBehavior::Open { index },
});
}
}
libp2p::swarm::FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
..
}) => {
tracing::debug!(target: LOG_TARGET,
"Notifications swarm connection closed peer={:?} connection={:?}",
peer_id,
connection_id
);
if let Some(details) = self.peers_details.get_mut(&peer_id) {
let removed = details.remove(&connection_id);
if !removed {
tracing::trace!(target: LOG_TARGET,
"Notifications swarm connection closed for untracked connection peer={:?} connection={:?}",
peer_id,
connection_id
);
}
} else {
tracing::trace!(target: LOG_TARGET,
"Notifications swarm connection closed for untracked peer, peer={:?} connection={:?}",
peer_id,
connection_id
);
}
for index in 0..2 {
self.propagate_event(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotificationsHandlerFromBehavior::Close { index },
});
}
}
_ => (),
}
}
fn on_connection_handler_event(
&mut self,
peer_id: libp2p::PeerId,
connection_id: libp2p::swarm::ConnectionId,
event: libp2p::swarm::THandlerOutEvent<Self>,
) {
tracing::debug!(target: LOG_TARGET,
"Notifications new substream for peer {:?} {:?}",
peer_id,
event
);
match event {
NotificationsHandlerToBehavior::HandshakeCompleted {
index,
handshake,
is_inbound,
sender,
..
} => {
tracing::trace!(target: LOG_TARGET,
"Notifications handler complited handshake peer={:?} connection={:?} index={:?} handshake={:?}",
peer_id,
connection_id,
index,
handshake,
);
self.propagate_event(ToSwarm::GenerateEvent(
NotificationsToSwarm::CustomProtocolOpen {
index,
peer_id,
received_handshake: handshake,
inbound: is_inbound,
sender,
},
));
}
NotificationsHandlerToBehavior::HandshakeError { index } => {
tracing::trace!(target: LOG_TARGET,
"Notifications handler error handshake peer={:?} connection={:?} index={:?}",
peer_id,
connection_id,
index,
);
}
NotificationsHandlerToBehavior::OpenDesiredByRemote { index } => {
// Note: extend to reject protocols for specific peers in the future.
self.propagate_event(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotificationsHandlerFromBehavior::Open { index },
});
}
NotificationsHandlerToBehavior::CloseDesired { index } => {
self.propagate_event(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotificationsHandlerFromBehavior::Close { index },
});
}
NotificationsHandlerToBehavior::Close { .. } => {}
NotificationsHandlerToBehavior::Notification { bytes, index } => {
self.propagate_event(ToSwarm::GenerateEvent(NotificationsToSwarm::Notification {
peer_id,
index,
message: bytes,
}));
}
}
}
fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
_params: &mut impl libp2p::swarm::PollParameters,
) -> std::task::Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
self.waker = Some(cx.waker().clone());
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}

View File

@ -0,0 +1,663 @@
#![allow(dead_code)]
use crate::p2p::notifications::{
behaviour::ProtocolsData,
messages::BlockAnnouncesHandshake,
upgrades::{
combine_upgrades::CombineUpgrades,
handshake::{
HandshakeInbound, HandshakeInboundSubstream, HandshakeOutbound,
HandshakeOutboundSubstream,
},
},
};
use bytes::BytesMut;
use codec::Encode;
use futures::{channel::mpsc, prelude::*, SinkExt};
use libp2p::{
core::ConnectedPoint,
swarm::{
handler::{ConnectionEvent, FullyNegotiatedInbound},
ConnectionHandler, ConnectionHandlerEvent, KeepAlive,
Stream as NegotiatedSubstream, SubstreamProtocol,
},
PeerId,
};
use std::{
collections::VecDeque,
mem,
pin::Pin,
task::{Context, Poll},
};
const LOG_TARGET: &str = "ghost-walker-handler";
pub struct ProtocolDetails {
pub name: String,
pub handshake: Vec<u8>,
pub upgrade: HandshakeInbound,
pub state: State,
}
pub struct NotificationsHandler {
protocols: Vec<ProtocolDetails>,
pending_events: VecDeque<
ConnectionHandlerEvent<
HandshakeOutbound,
usize,
NotificationsHandlerToBehavior,
NotificationsHandlerError,
>,
>,
endpoint: ConnectedPoint,
peer: PeerId,
}
#[derive(Debug, Clone)]
pub enum NotificationsHandlerFromBehavior {
Open { index: usize },
Close { index: usize },
}
#[derive(Debug, Clone)]
pub enum NotificationsHandlerToBehavior {
HandshakeCompleted {
index: usize,
endpoint: ConnectedPoint,
handshake: Vec<u8>,
is_inbound: bool,
sender: mpsc::Sender<Vec<u8>>,
},
HandshakeError {
index: usize,
},
OpenDesiredByRemote {
index: usize,
},
CloseDesired {
index: usize,
},
Close {
index: usize,
},
Notification {
index: usize,
bytes: BytesMut,
},
}
pub enum State {
Closed {
pending_opening: bool,
},
OpenDesiredByRemote {
inbound_substream: HandshakeInboundSubstream<NegotiatedSubstream>,
pending_opening: bool,
},
Opening {
inbound_substream: Option<HandshakeInboundSubstream<NegotiatedSubstream>>,
inbound: bool,
},
Open {
recv: stream::Peekable<mpsc::Receiver<Vec<u8>>>,
inbound_substream: Option<HandshakeInboundSubstream<NegotiatedSubstream>>,
outbound_substream: Option<HandshakeOutboundSubstream<NegotiatedSubstream>>,
},
}
impl NotificationsHandler {
pub fn new(peer: PeerId, endpoint: ConnectedPoint, data: ProtocolsData) -> Self {
let genesis_string = hex::encode(data.genesis_hash);
let blocks = format!("/{}/block-announces/1", genesis_string);
let tx = format!("/{}/transactions/1", genesis_string);
let block_announces = BlockAnnouncesHandshake::from_genesis(data.genesis_hash);
let protocols = vec![
ProtocolDetails {
name: blocks.clone(),
handshake: block_announces.encode(),
upgrade: HandshakeInbound {
name: blocks.clone(),
},
state: State::Closed {
pending_opening: false,
},
},
ProtocolDetails {
name: tx.clone(),
handshake: vec![data.node_role.encoded()],
upgrade: HandshakeInbound { name: tx.clone() },
state: State::Closed {
pending_opening: false,
},
},
];
NotificationsHandler {
peer,
pending_events: VecDeque::with_capacity(16),
endpoint,
protocols,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum NotificationsHandlerError {}
impl ConnectionHandler for NotificationsHandler {
// Received and submitted events.
type FromBehaviour = NotificationsHandlerFromBehavior;
type ToBehaviour = NotificationsHandlerToBehavior;
type Error = NotificationsHandlerError;
// Handle handshakes.
type InboundProtocol = CombineUpgrades<HandshakeInbound>;
type OutboundProtocol = HandshakeOutbound;
// Extra information upon connections.
type OutboundOpenInfo = usize;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
let protocol_upgrades: Vec<_> = self.protocols.iter().map(|p| p.upgrade.clone()).collect();
let combine_upgrades = CombineUpgrades::from(protocol_upgrades);
SubstreamProtocol::new(combine_upgrades, ())
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
'_,
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol, ..
}) => {
let (mut stream, index) = (protocol.data, protocol.index);
tracing::debug!(target: LOG_TARGET,
"Handler negotiated inbound peer={:?} index={:?}",
self.peer,
index
);
let proto = &mut self.protocols[index];
match proto.state {
State::Closed { pending_opening } => {
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated inbound Closed -> OpenDesiredByRemote peer={:?} index={:?}",
self.peer,
index
);
self.pending_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::OpenDesiredByRemote { index },
));
proto.state = State::OpenDesiredByRemote {
inbound_substream: stream.substream,
pending_opening,
};
}
State::OpenDesiredByRemote { .. } => {
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated inbound OpenDesiredByRemote peer={:?} index={:?}",
self.peer,
index
);
}
State::Opening {
ref mut inbound_substream,
..
}
| State::Open {
ref mut inbound_substream,
..
} => {
if inbound_substream.is_some() {
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated inbound handshake already handled peer={:?} index={:?}",
self.peer,
index
);
return;
}
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated inbound setup handshake peer={:?} index={:?}",
self.peer,
index
);
let handshake_message = proto.handshake.clone();
stream.substream.set_handshake(handshake_message);
*inbound_substream = Some(stream.substream);
}
}
}
ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
let (opened, index) = (outbound.protocol, outbound.info);
tracing::debug!(
target: LOG_TARGET,
"Handler negotiated outbound peer={:?} index={:?}",
self.peer,
index
);
let proto = &mut self.protocols[index];
match proto.state {
State::Closed {
ref mut pending_opening,
}
| State::OpenDesiredByRemote {
ref mut pending_opening,
..
} => {
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated outbound Closed|OpenDesiredByRemote peer={:?} index={:?}",
self.peer,
index
);
*pending_opening = false;
}
State::Opening {
ref mut inbound_substream,
inbound,
} => {
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated outbound Opening successful peer={:?} index={:?}",
self.peer,
index
);
let (send, recv) = mpsc::channel(1024);
proto.state = State::Open {
inbound_substream: inbound_substream.take(),
outbound_substream: Some(opened.substream),
recv: recv.peekable(),
};
self.pending_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::HandshakeCompleted {
index,
endpoint: self.endpoint.clone(),
handshake: opened.handshake,
is_inbound: inbound,
sender: send,
},
));
}
State::Open { .. } => {
tracing::trace!(
target: LOG_TARGET,
"Handler negotiated outbound Open missmatch-state peer={:?} index={:?}",
self.peer,
index
);
}
}
}
ConnectionEvent::DialUpgradeError(err) => {
tracing::debug!(
target: LOG_TARGET,
"Handler DialError peer={:?} index={:?} error={:?}",
self.peer,
err.info,
err.error,
);
let proto = &mut self.protocols[err.info];
match proto.state {
State::Closed {
ref mut pending_opening,
}
| State::OpenDesiredByRemote {
ref mut pending_opening,
..
} => {
tracing::trace!(
target: LOG_TARGET,
"Handler DialError Closed|OpenDesiredByRemote peer={:?} info={:?}",
self.peer,
err.info,
);
*pending_opening = false;
}
State::Opening { .. } => {
proto.state = State::Closed {
pending_opening: false,
};
tracing::trace!(
target: LOG_TARGET,
"Handler DialError Opening -> Closed peer={:?} info={:?}",
self.peer,
err.info,
);
self.pending_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::HandshakeError { index: err.info },
));
}
State::Open { .. } => {}
}
}
_ => {}
}
}
fn on_behaviour_event(&mut self, message: NotificationsHandlerFromBehavior) {
match message {
NotificationsHandlerFromBehavior::Open { index } => {
tracing::debug!(
target: LOG_TARGET,
"Handler from behaviour Open peer={:?} index={:?}",
self.peer,
index
);
let proto = &mut self.protocols[index];
match &mut proto.state {
State::Closed { pending_opening } => {
if !*pending_opening {
let protocol = HandshakeOutbound {
name: proto.name.clone(),
handshake: proto.handshake.clone(),
};
tracing::trace!(
target: LOG_TARGET,
"Handler from behaviour Closed -> request new substream peer={:?} index={:?}",
self.peer,
index
);
self.pending_events.push_back(
ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(protocol, index),
},
)
}
tracing::trace!(
target: LOG_TARGET,
"Handler from behaviour Closed -> Opening peer={:?} index={:?}",
self.peer,
index
);
proto.state = State::Opening {
inbound_substream: None,
inbound: false,
};
}
State::OpenDesiredByRemote {
inbound_substream,
pending_opening,
} => {
if !*pending_opening {
let protocol = HandshakeOutbound {
name: proto.name.clone(),
handshake: proto.handshake.clone(),
};
tracing::trace!(
target: LOG_TARGET,
"Handler from behaviour OpenDesiredByRemote -> request new substream peer={:?} index={:?}",
self.peer,
index
);
self.pending_events.push_back(
ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(protocol, index),
},
)
}
tracing::trace!(
target: LOG_TARGET,
"Handler from behaviour OpenDesiredByRemote setup handshake peer={:?} index={:?}",
self.peer,
index
);
let handshake = proto.handshake.clone();
inbound_substream.set_handshake(handshake);
let inbound_substream = match mem::replace(
&mut proto.state,
State::Opening {
inbound_substream: None,
inbound: false,
},
) {
State::OpenDesiredByRemote {
inbound_substream, ..
} => inbound_substream,
_ => unreachable!(),
};
proto.state = State::Opening {
inbound_substream: Some(inbound_substream),
inbound: true,
};
}
State::Opening { .. } | State::Open { .. } => {
tracing::trace!(
target: LOG_TARGET,
"Handler from behaviour Opening|Open statemissmatch peer={:?} index={:?}",
self.peer,
index
);
}
}
}
NotificationsHandlerFromBehavior::Close { index } => {
tracing::debug!(
target: LOG_TARGET,
"Handler from behaviour Close peer={:?} index={:?}",
self.peer,
index
);
let proto = &mut self.protocols[index];
match proto.state {
State::Closed { .. } => {}
State::OpenDesiredByRemote {
pending_opening, ..
} => {
proto.state = State::Closed { pending_opening };
}
State::Opening { .. } => {
proto.state = State::Closed {
pending_opening: true,
};
tracing::trace!(
target: LOG_TARGET,
"Handler from behaviour Close with handshake in progress peer={:?} index={:?}",
self.peer,
index
);
self.pending_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::HandshakeError { index },
));
}
State::Open { .. } => {
proto.state = State::Closed {
pending_opening: false,
};
}
}
self.pending_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::Close { index },
));
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
if self
.protocols
.iter()
.any(|p| !matches!(p.state, State::Closed { .. }))
{
return KeepAlive::Yes;
}
KeepAlive::No
}
#[allow(deprecated)]
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ev);
}
for index in 0..self.protocols.len() {
if let State::Open {
outbound_substream: Some(outbound_substream),
recv,
..
} = &mut self.protocols[index].state
{
loop {
match Pin::new(&mut *recv).as_mut().poll_peek(cx) {
Poll::Ready(Some(..)) => {}
_ => break,
};
match outbound_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => break,
};
let message = match recv.poll_next_unpin(cx) {
Poll::Ready(Some(message)) => message,
Poll::Ready(None) | Poll::Pending => {
debug_assert!(false);
break;
}
};
tracing::trace!(
target: LOG_TARGET,
"Handler poll send message peer={:?} index={:?} message={:?}",
self.peer,
index,
message
);
let _ = outbound_substream.start_send_unpin(message);
}
}
}
for index in 0..self.protocols.len() {
if let State::Open {
outbound_substream: outbound_substream @ Some(_),
..
} = &mut self.protocols[index].state
{
match Sink::poll_flush(Pin::new(outbound_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => {
*outbound_substream = None;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::CloseDesired { index },
));
}
}
}
}
for index in 0..self.protocols.len() {
match &mut self.protocols[index].state {
State::Open {
inbound_substream: inbound_substream @ Some(_),
..
} => match Stream::poll_next(Pin::new(inbound_substream.as_mut().unwrap()), cx) {
Poll::Pending => {}
Poll::Ready(Some(Ok(bytes))) => {
let event = NotificationsHandlerToBehavior::Notification { index, bytes };
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
}
Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *inbound_substream = None,
},
State::OpenDesiredByRemote {
inbound_substream,
pending_opening,
} => match HandshakeInboundSubstream::poll_process(Pin::new(inbound_substream), cx)
{
Poll::Pending => {}
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => {
self.protocols[index].state = State::Closed {
pending_opening: *pending_opening,
};
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
NotificationsHandlerToBehavior::CloseDesired { index },
));
}
},
State::Opening {
inbound_substream: inbound_substream @ Some(_),
..
} => match HandshakeInboundSubstream::poll_process(
Pin::new(inbound_substream.as_mut().unwrap()),
cx,
) {
Poll::Pending => {}
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => *inbound_substream = None,
},
_ => (),
}
}
Poll::Pending
}
}

View File

@ -0,0 +1,76 @@
#![allow(dead_code)]
use codec::{Decode, Encode, Input, Output};
use hex::FromHexError;
pub type BlockHash = primitive_types::H256;
pub type BlockNumber = u32;
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockAnnouncesHandshake {
pub roles: u8,
pub best_number: BlockNumber,
pub best_hash: BlockHash,
pub genesis_hash: BlockHash,
}
impl BlockAnnouncesHandshake {
pub fn from_genesis(genesis_hash: BlockHash) -> Self {
Self {
roles: 4,
best_number: 0,
best_hash: genesis_hash,
genesis_hash,
}
}
pub fn from_hex_genesis(genesis_hash: &str) -> Result<Self, FromHexError> {
let raw_bytes = hex::decode(genesis_hash.trim_start_matches("0x"))?;
let genesis_hash = primitive_types::H256::from_slice(raw_bytes.as_slice());
Ok(Self::from_genesis(genesis_hash))
}
}
mod role_bytes {
pub const FULL_NODE: u8 = 0b_0000_0001;
pub const LIGHT_NODE: u8 = 0b_0000_0010;
pub const AUTHORITY: u8 = 0b_0000_0100;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProtocolRole {
FullNode,
LightNode,
Authority,
}
impl ProtocolRole {
pub fn encoded(&self) -> u8 {
match self {
ProtocolRole::FullNode => role_bytes::FULL_NODE,
ProtocolRole::LightNode => role_bytes::LIGHT_NODE,
ProtocolRole::Authority => role_bytes::AUTHORITY,
}
}
}
impl Encode for ProtocolRole {
fn encode_to<T: Output + ?Sized>(&self, dest: &mut T) {
dest.push_byte(self.encoded())
}
}
impl codec::Decode for ProtocolRole {
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
let byte = input.read_byte()?;
let role = match byte {
role_bytes::FULL_NODE => ProtocolRole::FullNode,
role_bytes::LIGHT_NODE => ProtocolRole::LightNode,
role_bytes::AUTHORITY => ProtocolRole::Authority,
_ => return Err(codec::Error::from("Invalid bytes")),
};
Ok(role)
}
}

View File

@ -0,0 +1,4 @@
pub mod behaviour;
pub mod handler;
pub mod messages;
pub mod upgrades;

View File

@ -0,0 +1,97 @@
use futures::prelude::*;
use libp2p::core::upgrade::{InboundUpgrade, UpgradeInfo};
use std::{
pin::Pin,
task::{Context, Poll},
vec,
};
#[derive(Debug, Clone)]
pub struct CombineUpgrades<T>(pub Vec<T>);
impl<T> From<Vec<T>> for CombineUpgrades<T> {
fn from(list: Vec<T>) -> Self {
Self(list)
}
}
impl<T: UpgradeInfo> UpgradeInfo for CombineUpgrades<T> {
type Info = ProtocolResponse<T::Info>;
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.0
.iter()
.enumerate()
.flat_map(|(index, protocol)| {
protocol
.protocol_info()
.into_iter()
.map(move |data| ProtocolResponse { data, index })
})
.collect::<Vec<_>>()
.into_iter()
}
}
impl<T, C> InboundUpgrade<C> for CombineUpgrades<T>
where
T: InboundUpgrade<C>,
{
type Output = ProtocolResponse<T::Output>;
type Error = ProtocolResponse<T::Error>;
type Future = FutureProtocolResponse<T::Future>;
fn upgrade_inbound(mut self, sock: C, info: Self::Info) -> Self::Future {
// Negociated only once.
let protocol = self.0.remove(info.index);
let future = protocol.upgrade_inbound(sock, info.data);
FutureProtocolResponse {
data: future,
index: info.index,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct ProtocolResponse<T> {
pub data: T,
pub index: usize,
}
impl<T: AsRef<str>> AsRef<str> for ProtocolResponse<T> {
fn as_ref(&self) -> &str {
self.data.as_ref()
}
}
#[pin_project::pin_project]
pub struct FutureProtocolResponse<T> {
#[pin]
data: T,
index: usize,
}
impl<Out, Err, T> Future for FutureProtocolResponse<T>
where
T: Future<Output = Result<Out, Err>>,
{
type Output = Result<ProtocolResponse<Out>, ProtocolResponse<Err>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
match Future::poll(this.data, cx) {
Poll::Ready(Ok(value)) => Poll::Ready(Ok(ProtocolResponse {
data: value,
index: *this.index,
})),
Poll::Ready(Err(error)) => Poll::Ready(Err(ProtocolResponse {
data: error,
index: *this.index,
})),
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -0,0 +1,454 @@
#![allow(dead_code)]
use asynchronous_codec::Framed;
use bytes::BytesMut;
use futures::prelude::*;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use unsigned_varint::codec::UviBytes;
const LOG_TARGET: &str = "ghost-walker-upgrades";
use std::{
convert::Infallible,
io, mem,
pin::Pin,
task::{Context, Poll},
vec,
};
const MAX_HANDSHAKE_SIZE: usize = 1024;
#[derive(Debug, Clone)]
pub struct HandshakeInbound {
pub name: String,
}
#[pin_project::pin_project]
pub struct HandshakeInboundSubstream<TSubstream> {
#[pin]
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
state: HandshakeInboundSubstreamState,
negotiated_name: String,
}
#[derive(Debug)]
pub enum HandshakeInboundSubstreamState {
Waiting,
Sending(Vec<u8>),
Flush,
Done,
NeedsClose,
FullyClosed,
}
#[derive(Debug, Clone)]
pub struct HandshakeOutbound {
pub name: String,
pub handshake: Vec<u8>,
}
#[pin_project::pin_project]
pub struct HandshakeOutboundSubstream<TSubstream> {
#[pin]
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
}
impl HandshakeInbound {
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
impl UpgradeInfo for HandshakeInbound {
type Info = String;
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
vec![self.name.clone()].into_iter()
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for HandshakeInbound
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = HandshakeInboundOpen<TSubstream>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Error = HandshakeError;
fn upgrade_inbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future {
tracing::debug!(
target: LOG_TARGET,
"HandshakeInbound name={:?} current_name={:?}",
negotiated_name,
self.name
);
Box::pin(async move {
let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
tracing::trace!(
target: LOG_TARGET,
"HandshakeInbound length={:?} name={:?}",
handshake_len,
negotiated_name
);
if handshake_len > MAX_HANDSHAKE_SIZE {
return Err(HandshakeError::TooLarge {
requested: handshake_len,
max: MAX_HANDSHAKE_SIZE,
});
}
let mut handshake = vec![0u8; handshake_len];
if !handshake.is_empty() {
socket.read_exact(&mut handshake).await?;
}
tracing::trace!(
target: LOG_TARGET,
"HandshakeInbound received handshake={:?} name={:?}",
handshake,
negotiated_name
);
let mut codec: UviBytes<io::Cursor<Vec<u8>>> = UviBytes::default();
codec.set_max_len(usize::MAX);
let substream = HandshakeInboundSubstream {
socket: Framed::new(socket, codec),
state: HandshakeInboundSubstreamState::Waiting,
negotiated_name,
};
Ok(HandshakeInboundOpen {
handshake,
substream,
})
})
}
}
pub struct HandshakeInboundOpen<TSubstream> {
pub handshake: Vec<u8>,
pub substream: HandshakeInboundSubstream<TSubstream>,
}
impl<TSubstream> HandshakeInboundSubstream<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
pub fn set_handshake(&mut self, handshake: Vec<u8>) {
match &self.state {
HandshakeInboundSubstreamState::Waiting => (),
_ => return,
};
self.state = HandshakeInboundSubstreamState::Sending(handshake);
}
pub fn poll_process(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<Infallible, io::Error>> {
let mut this = self.project();
loop {
let state = mem::replace(this.state, HandshakeInboundSubstreamState::Done);
match state {
HandshakeInboundSubstreamState::Sending(handshake) => {
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {
tracing::trace!(target: LOG_TARGET, "HandshakeInboundSubstream: poll_process: Sink is ready start sending name={:?}", this.negotiated_name);
*this.state = HandshakeInboundSubstreamState::Flush;
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(handshake))
{
Ok(()) => {}
Err(err) => {
tracing::error!(target: LOG_TARGET, "HandshakeInboundSubstream: poll_process: Failed to start sending name={:?} error={:?}", this.negotiated_name, err);
return Poll::Ready(Err(err));
}
}
}
Poll::Pending => {
*this.state = HandshakeInboundSubstreamState::Sending(handshake);
return Poll::Pending;
}
}
}
HandshakeInboundSubstreamState::Flush => {
tracing::trace!(
target: LOG_TARGET,
"HandshakeInboundSubstream: poll_process: poll_flush name={:?}",
this.negotiated_name
);
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) => *this.state = HandshakeInboundSubstreamState::Done,
Poll::Pending => {
*this.state = HandshakeInboundSubstreamState::Flush;
return Poll::Pending;
}
}
}
state => {
*this.state = state;
return Poll::Pending;
}
}
}
}
}
impl<TSubstream> Stream for HandshakeInboundSubstream<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
type Item = Result<BytesMut, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let state = mem::replace(this.state, HandshakeInboundSubstreamState::Done);
tracing::trace!(
target: LOG_TARGET,
"HandshakeInboundSubstream: poll_next: state={:?} name={:?}",
state,
this.negotiated_name
);
match state {
HandshakeInboundSubstreamState::Waiting => {
*this.state = HandshakeInboundSubstreamState::Waiting;
return Poll::Pending;
}
HandshakeInboundSubstreamState::Sending(handshake) => {
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {
*this.state = HandshakeInboundSubstreamState::Flush;
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(handshake))
{
Ok(()) => (),
Err(err) => {
tracing::trace!(
target: LOG_TARGET,
"HandshakeInboundSubstream: Cannot send handshake name={:?}",
this.negotiated_name
);
return Poll::Ready(Some(Err(err)));
}
}
}
Poll::Pending => {
*this.state = HandshakeInboundSubstreamState::Sending(handshake);
return Poll::Pending;
}
}
}
HandshakeInboundSubstreamState::Flush => {
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) => *this.state = HandshakeInboundSubstreamState::Done,
Poll::Pending => {
*this.state = HandshakeInboundSubstreamState::Flush;
return Poll::Pending;
}
}
}
HandshakeInboundSubstreamState::Done => {
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Ready(None) => {
tracing::trace!(
target: LOG_TARGET,
"HandshakeInboundSubstream: poll_next: Closing in response to peer name={:?}",
this.negotiated_name
);
*this.state = HandshakeInboundSubstreamState::NeedsClose
}
Poll::Ready(Some(result)) => {
*this.state = HandshakeInboundSubstreamState::Done;
return Poll::Ready(Some(result));
}
Poll::Pending => {
*this.state = HandshakeInboundSubstreamState::Done;
return Poll::Pending;
}
}
}
HandshakeInboundSubstreamState::NeedsClose => {
match Sink::poll_close(this.socket.as_mut(), cx)? {
Poll::Ready(()) => {
tracing::trace!(
target: LOG_TARGET,
"HandshakeInboundSubstream: poll_close: fully clsoed name={:?}",
this.negotiated_name
);
*this.state = HandshakeInboundSubstreamState::FullyClosed
}
Poll::Pending => {
*this.state = HandshakeInboundSubstreamState::NeedsClose;
return Poll::Pending;
}
}
}
HandshakeInboundSubstreamState::FullyClosed => return Poll::Ready(None),
}
}
}
}
impl HandshakeOutbound {
pub fn new(name: impl Into<String>, handshake: impl Into<Vec<u8>>) -> Self {
Self {
name: name.into(),
handshake: handshake.into(),
}
}
}
impl UpgradeInfo for HandshakeOutbound {
type Info = String;
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
vec![self.name.clone()].into_iter()
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for HandshakeOutbound
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = HandshakeOutboundOpen<TSubstream>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Error = HandshakeError;
fn upgrade_outbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future {
tracing::debug!(
target: LOG_TARGET,
"HandshakeOutbound name={:?} current_name={:?}",
negotiated_name,
self.name
);
Box::pin(async move {
tracing::trace!(
target: LOG_TARGET,
"HandshakeOutbound prepare to write handshake={:?} name={:?}",
self.handshake,
negotiated_name
);
upgrade::write_length_prefixed(&mut socket, &self.handshake).await?;
tracing::trace!(
target: LOG_TARGET,
"HandshakeOutbound prepare to read handshake length name={:?}",
negotiated_name
);
let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
tracing::trace!(
target: LOG_TARGET,
"HandshakeOutbound handshake len={:?} name={:?}",
handshake_len,
negotiated_name
);
if handshake_len > MAX_HANDSHAKE_SIZE {
return Err(HandshakeError::TooLarge {
requested: handshake_len,
max: MAX_HANDSHAKE_SIZE,
});
}
let mut handshake = vec![0u8; handshake_len];
if !handshake.is_empty() {
socket.read_exact(&mut handshake).await?;
}
let mut codec = UviBytes::default();
codec.set_max_len(usize::MAX);
Ok(HandshakeOutboundOpen {
handshake,
substream: HandshakeOutboundSubstream {
socket: Framed::new(socket, codec),
},
})
})
}
}
pub struct HandshakeOutboundOpen<TSubstream> {
pub handshake: Vec<u8>,
pub substream: HandshakeOutboundSubstream<TSubstream>,
}
impl<TSubstream> Sink<Vec<u8>> for HandshakeOutboundSubstream<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
type Error = std::io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_ready(this.socket.as_mut(), cx)
}
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let mut this = self.project();
Sink::start_send(this.socket.as_mut(), io::Cursor::new(item))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_flush(this.socket.as_mut(), cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_close(this.socket.as_mut(), cx)
}
}
#[derive(Debug, thiserror::Error)]
pub enum HandshakeError {
#[error(transparent)]
Io(#[from] io::Error),
#[error("Initial message or handshake was too large: {requested}")]
TooLarge {
requested: usize,
max: usize,
},
#[error(transparent)]
VarintDecode(#[from] unsigned_varint::decode::Error),
}
impl From<unsigned_varint::io::ReadError> for HandshakeError {
fn from(err: unsigned_varint::io::ReadError) -> Self {
match err {
unsigned_varint::io::ReadError::Io(err) => Self::Io(err),
unsigned_varint::io::ReadError::Decode(err) => Self::VarintDecode(err),
_ => {
tracing::warn!(target: LOG_TARGET, "Unrecognized varint decoding error");
Self::Io(From::from(io::ErrorKind::InvalidData))
}
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum HandshakeOutboundError {
#[error(transparent)]
Io(#[from] io::Error),
}

View File

@ -0,0 +1,2 @@
pub mod handshake;
pub mod combine_upgrades;

438
src/p2p/peer_behaviour.rs Normal file
View File

@ -0,0 +1,438 @@
use std::{
collections::{HashMap, HashSet},
task::{Context, Poll},
};
use either::Either;
use libp2p::{
core::{ConnectedPoint, Endpoint},
identify::{Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent, Info as IdentifyInfo},
ping::{Behaviour as Ping, Config as PingConfig},
swarm::{
behaviour::{
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure,
FromSwarm, ListenFailure,
},
ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
},
identity::PublicKey,
Multiaddr, PeerId,
};
pub const AGENT: &str = "ghost-walker-agent";
const LOG_TARGET: &str = "ghost-walker-peer-behaviour";
pub struct PeerBehaviour {
ping: Ping,
identify: Identify,
details: HashMap<PeerId, NodeDetails>,
external_addresses: HashSet<Multiaddr>,
}
impl PeerBehaviour {
pub fn new(local_public_key: PublicKey) -> PeerBehaviour {
let identify_config = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key)
.with_agent_version(AGENT.to_string())
// Do not cache peer info.
.with_cache_size(0);
let identify = Identify::new(identify_config);
Self {
ping: Ping::new(PingConfig::new()),
identify,
details: HashMap::default(),
external_addresses: HashSet::default(),
}
}
}
struct NodeDetails {
pub connections: Vec<ConnectedPoint>,
pub protocol_version: Option<String>,
pub agent_version: Option<String>,
pub protocols: HashSet<String>,
}
impl NodeDetails {
pub fn new(connection: ConnectedPoint) -> NodeDetails {
const INITIAL_CAPACITY: usize = 16;
let mut connections = Vec::with_capacity(INITIAL_CAPACITY);
connections.push(connection);
NodeDetails {
connections,
protocol_version: None,
agent_version: None,
protocols: HashSet::new(),
}
}
}
#[derive(Debug)]
pub enum PeerInfoEvent {
Identified {
peer_id: PeerId,
info: IdentifyInfo,
},
}
impl NetworkBehaviour for PeerBehaviour {
type ConnectionHandler = ConnectionHandlerSelect<
<Ping as NetworkBehaviour>::ConnectionHandler,
<Identify as NetworkBehaviour>::ConnectionHandler,
>;
type ToSwarm = PeerInfoEvent;
fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
self.ping
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
self.identify
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
// Multiaddr is returned by other protocols.
Ok(Vec::new())
}
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let ping_handler = self.ping.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)?;
let identify_handler = self.identify.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)?;
Ok(ping_handler.select(identify_handler))
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
let ping_handler = self.ping.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
)?;
let identify_handler = self.identify.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
)?;
Ok(ping_handler.select(identify_handler))
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(
e @ ConnectionEstablished {
peer_id, endpoint, ..
},
) => {
self.ping
.on_swarm_event(FromSwarm::ConnectionEstablished(e));
self.identify
.on_swarm_event(FromSwarm::ConnectionEstablished(e));
self.details
.entry(peer_id)
.and_modify(|details| {
details.connections.push(endpoint.clone());
})
.or_insert_with(|| NodeDetails::new(endpoint.clone()));
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}) => {
let (ping_handler, identity_handler) = handler.into_inner();
self.ping
.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler: ping_handler,
remaining_established,
}));
self.identify
.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler: identity_handler,
remaining_established,
}));
if let Some(node) = self.details.get_mut(&peer_id) {
node.connections.retain(|conn| conn != endpoint)
}
}
FromSwarm::DialFailure(DialFailure {
peer_id,
error,
connection_id,
}) => {
self.ping
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
error,
connection_id,
}));
self.identify
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
error,
connection_id,
}));
}
FromSwarm::ListenerClosed(e) => {
self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
}
FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
error,
connection_id,
}) => {
self.ping
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
error,
connection_id,
}));
self.identify
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
error,
connection_id,
}));
}
FromSwarm::ListenerError(e) => {
self.ping.on_swarm_event(FromSwarm::ListenerError(e));
self.identify.on_swarm_event(FromSwarm::ListenerError(e));
}
FromSwarm::NewListener(e) => {
self.ping.on_swarm_event(FromSwarm::NewListener(e));
self.identify.on_swarm_event(FromSwarm::NewListener(e));
}
FromSwarm::ExpiredListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
self.identify
.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
}
FromSwarm::AddressChange(
e @ AddressChange {
peer_id, old, new, ..
},
) => {
self.ping.on_swarm_event(FromSwarm::AddressChange(e));
self.identify.on_swarm_event(FromSwarm::AddressChange(e));
self.details.entry(peer_id).and_modify(|details| {
details
.connections
.iter_mut()
.find(|conn| conn == &old)
.map(|conn| *conn = new.clone());
});
}
FromSwarm::NewListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
}
FromSwarm::NewExternalAddrCandidate(e) => {
self.ping
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
self.identify
.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
}
FromSwarm::ExternalAddrConfirmed(e) => {
self.ping
.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
self.identify
.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
self.external_addresses.insert(e.addr.clone());
}
FromSwarm::ExternalAddrExpired(e) => {
self.external_addresses.remove(e.addr);
}
}
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
Either::Left(event) => {
self.ping
.on_connection_handler_event(peer_id, connection_id, event)
}
Either::Right(event) => {
self.identify
.on_connection_handler_event(peer_id, connection_id, event)
}
}
}
fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
loop {
match self.ping.poll(cx, params) {
Poll::Pending => break,
Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
tracing::debug!(target: LOG_TARGET,
"PingEvent peer_id={:?} connection_id={:?} result {:?}",
ev.peer,
ev.connection,
ev.result
);
}
Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler,
event,
}) => {
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler,
event: Either::Left(event),
})
}
Poll::Ready(ToSwarm::CloseConnection {
peer_id,
connection,
}) => {
return Poll::Ready(ToSwarm::CloseConnection {
peer_id,
connection,
})
}
Poll::Ready(ToSwarm::ListenOn { opts }) => {
return Poll::Ready(ToSwarm::ListenOn { opts })
}
Poll::Ready(ToSwarm::RemoveListener { id }) => {
return Poll::Ready(ToSwarm::RemoveListener { id })
}
Poll::Ready(ToSwarm::ExternalAddrExpired(address)) => {
return Poll::Ready(ToSwarm::ExternalAddrExpired(address))
}
Poll::Ready(ToSwarm::ExternalAddrConfirmed(address)) => {
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(address))
}
Poll::Ready(ToSwarm::NewExternalAddrCandidate(address)) => {
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(address))
}
}
}
loop {
match self.identify.poll(cx, params) {
Poll::Pending => break,
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
IdentifyEvent::Received { peer_id, info, .. } => {
self.details.entry(peer_id).and_modify(|details| {
details.agent_version = Some(info.agent_version.clone());
details.protocol_version = Some(info.protocol_version.clone());
details.protocols = info
.protocols
.iter()
.map(|proto| proto.to_string())
.collect();
});
let event = PeerInfoEvent::Identified { peer_id, info };
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
IdentifyEvent::Error { peer_id, error } => {
tracing::debug!(target: LOG_TARGET, "Identification with peer={:?} error={}", peer_id, error)
}
IdentifyEvent::Pushed { .. } => {}
IdentifyEvent::Sent { .. } => {}
},
Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler,
event,
}) => {
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler,
event: Either::Right(event),
})
}
Poll::Ready(ToSwarm::ListenOn { opts }) => {
return Poll::Ready(ToSwarm::ListenOn { opts })
}
Poll::Ready(ToSwarm::RemoveListener { id }) => {
return Poll::Ready(ToSwarm::RemoveListener { id })
}
Poll::Ready(ToSwarm::ExternalAddrExpired(address)) => {
return Poll::Ready(ToSwarm::ExternalAddrExpired(address))
}
Poll::Ready(ToSwarm::ExternalAddrConfirmed(address)) => {
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(address))
}
Poll::Ready(ToSwarm::NewExternalAddrCandidate(address)) => {
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(address))
}
Poll::Ready(ToSwarm::CloseConnection {
peer_id,
connection,
}) => {
return Poll::Ready(ToSwarm::CloseConnection {
peer_id,
connection,
})
}
}
}
Poll::Pending
}
}

73
src/p2p/transport.rs Normal file
View File

@ -0,0 +1,73 @@
#![allow(dead_code)]
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed, upgrade},
dns, identity, noise, tcp, websocket, PeerId, Transport,
};
use std::time::Duration;
pub const KIB: usize = 1024;
pub const MIB: usize = 1024 * KIB;
pub struct TransportBuilder {
timeout: Duration,
yamux_window_size: u32,
yamux_maximum_buffer_size: usize,
}
impl TransportBuilder {
pub fn new() -> TransportBuilder {
TransportBuilder {
timeout: Duration::from_secs(20),
yamux_window_size: 256 * (KIB as u32),
yamux_maximum_buffer_size: MIB,
}
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn yamux_window_size(mut self, yamux_window_size: u32) -> Self {
self.yamux_window_size = yamux_window_size;
self
}
pub fn yamux_maximum_buffer_size(mut self, yamux_maximum_buffer_size: usize) -> Self {
self.yamux_maximum_buffer_size = yamux_maximum_buffer_size;
self
}
pub fn build(self, keypair: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox)> {
let tcp_config = tcp::Config::new().nodelay(true);
let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone());
let dns = dns::tokio::Transport::system(tcp_trans).expect("Can construct DNS; qed");
let tcp_trans = tcp::tokio::Transport::new(tcp_config);
let dns_for_wss = dns::tokio::Transport::system(tcp_trans)
.expect("Valid config provided; qed");
let transport = websocket::WsConfig::new(dns_for_wss).or_transport(dns);
let authentication_config =
noise::Config::new(&keypair).expect("Can create noise config; qed");
let multiplexing_config = {
let mut yamux_config = libp2p::yamux::Config::default();
yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read());
yamux_config.set_max_buffer_size(self.yamux_maximum_buffer_size);
yamux_config.set_receive_window_size(self.yamux_window_size);
yamux_config
};
transport
.upgrade(upgrade::Version::V1Lazy)
.authenticate(authentication_config)
.multiplex(multiplexing_config)
.timeout(self.timeout)
.boxed()
}
}

312
src/walker.rs Normal file
View File

@ -0,0 +1,312 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
time::Duration,
error::Error,
};
use futures::StreamExt;
use codec::Decode;
use ip_network::IpNetwork;
use primitive_types::H256;
use libp2p::{
identify::Info, identity, kad::{
Event as KademliaEvent, GetClosestPeersError, GetClosestPeersOk,
QueryId, QueryResult,
},
multiaddr::Protocol,
swarm::{Config as SwarmConfig, SwarmEvent},
Multiaddr, PeerId, Swarm
};
use crate::{
locator::Locator,
p2p::{notifications::behaviour::NotificationsToSwarm, BehaviourEvent},
};
use super::p2p::{
discovery::DiscoveryBuilder,
peer_behaviour::{PeerInfoEvent, PeerBehaviour},
transport::{TransportBuilder, MIB},
notifications::{
behaviour::{Notifications, ProtocolsData},
messages::ProtocolRole,
},
Behaviour,
};
pub struct Walker {
genesis: String,
timeout: Duration,
swarm: Swarm<Behaviour>,
queries: HashSet<QueryId>,
discovered_with_address: HashMap<PeerId, HashSet<Multiaddr>>,
peer_details: HashMap<PeerId, Info>,
peer_role: HashMap<PeerId, ProtocolRole>,
dialed_peers: HashMap<PeerId, usize>,
}
impl Walker {
pub fn new() -> WalkerBuilder {
WalkerBuilder::default()
}
fn insert_queries(&mut self, num: usize) {
for _ in 0..num {
self.queries.insert(
self.swarm
.behaviour_mut()
.discovery
.get_closest_peers(PeerId::random()));
}
}
fn dialed_peer(&mut self, peer_id: Option<PeerId>) {
let Some(peer_id) = peer_id else { return };
self.dialed_peers
.entry(peer_id)
.and_modify(|num| *num += 1)
.or_insert(0);
}
async fn inner_walk(&mut self) {
self.insert_queries(128);
let mut previous_tracing_time = std::time::Instant::now();
loop {
let event = self.swarm.select_next_some().await;
match event {
SwarmEvent::Dialing { peer_id, .. } => self.dialed_peer(peer_id),
SwarmEvent::Behaviour(BehaviourEvent::Discovery(event)) => match event {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetClosestPeers(result),
..
} => {
self.queries.remove(&id);
let peers = match result {
Ok(GetClosestPeersOk { peers, ..}) => peers,
Err(GetClosestPeersError::Timeout { peers, .. }) => peers
};
let num_discovered = peers.len();
let now = std::time::Instant::now();
if now.duration_since(previous_tracing_time) > Duration::from_secs(10) {
previous_tracing_time = now;
tracing::info!("...Discovery in progress last query #{num_discovered}");
}
if self.queries.is_empty() {
self.insert_queries(128);
}
}
KademliaEvent::RoutingUpdated { peer, addresses, .. } => {
match self.discovered_with_address.entry(peer) {
Entry::Occupied(mut occupied) => {
occupied.get_mut().extend(addresses.into_vec());
},
Entry::Vacant(vacant) => {
vacant.insert(addresses.iter().cloned().collect());
},
};
}
KademliaEvent::RoutablePeer { peer, address } | KademliaEvent::PendingRoutablePeer { peer, address } => {
match self.discovered_with_address.entry(peer) {
Entry::Occupied(mut occupied) => {
occupied.get_mut().insert(address);
},
Entry::Vacant(vacant) => {
let mut set = HashSet::new();
set.insert(address);
vacant.insert(set);
},
};
}
_ => (),
},
SwarmEvent::Behaviour(BehaviourEvent::PeerInfo(info_event)) => match info_event {
PeerInfoEvent::Identified { peer_id, info } => {
tracing::debug!("Identified peer_id={:?} info={:?}", peer_id, info);
self.peer_details.insert(peer_id, info);
}
},
SwarmEvent::Behaviour(BehaviourEvent::Notifications(
NotificationsToSwarm::CustomProtocolOpen {
peer_id,
index,
received_handshake,
inbound,
..
}
)) => {
if let Ok(role) = ProtocolRole::decode(&mut &received_handshake[..]) {
tracing::debug!("Identified peer_id={:?} role={:?}", peer_id, role);
self.peer_role.insert(peer_id, role);
}
tracing::debug!(
"Protocol open peer={:?} index={:?} handshake={:?} inbound={:?}",
peer_id,
index,
received_handshake,
inbound,
);
}
_ => (),
}
}
}
fn is_public_adress(&self, addr: &Multiaddr) -> bool {
let ip = match addr.iter().next() {
Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => return true,
_ => return false,
};
ip.is_global()
}
pub async fn walk_around(&mut self) -> Result<(), Box<dyn Error>> {
let _ = tokio::time::timeout(self.timeout, self.inner_walk()).await;
println!("Number of dialed peers: {}", self.dialed_peers.len());
println!("Total peers discovered: {}", self.discovered_with_address.len());
println!("Peers with local iden.: {}", self.peer_details.len());
let infos: HashMap<_, _> = self
.peer_details
.iter()
.filter(|(_, info)| {
info.protocols
.iter()
.any(|stream_proto| stream_proto.as_ref().contains(&self.genesis))
})
.collect();
println!("Support correct genesis {}: {}", &self.genesis, infos.len());
let peers_with_public: HashMap<_, _> = infos
.iter()
.filter(|(_, info)| info.listen_addrs
.iter()
.any(|a| self.is_public_adress(&a)))
.collect();
println!("Peers with public address: {}", peers_with_public.len());
println!("Peers with private address: {}", infos.len() - peers_with_public.len());
println!("Peers with associated role: {}", self.peer_role.len());
Locator::new(&infos).locate_peers().await;
Ok(())
}
}
pub struct WalkerBuilder {
genesis: String,
bootnodes: Vec<String>,
timeout: Duration,
}
impl Default for WalkerBuilder {
fn default() -> Self {
Self {
genesis: Default::default(),
bootnodes: Default::default(),
timeout: Default::default(),
}
}
}
impl WalkerBuilder {
fn create_swarm(&self) -> Result<Swarm<Behaviour>, Box<dyn Error>> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
tracing::info!("Local peer Id {:?}", local_peer_id);
let bootnodes = self.bootnodes
.iter()
.map(|bootnode| {
let parts: Vec<_> = bootnode.split('/').collect();
let peer = parts.last().expect("Bootnode peer should be valid; qed");
let multiaddress: Multiaddr = bootnode
.parse()
.expect("Bootnode multiaddress should be valid; qed");
let peer_id: PeerId = peer
.parse()
.expect("Bootnode should have valid peer ID; qed");
tracing::info!("Bootnode peer={:?}", peer_id);
(peer_id, multiaddress)
})
.collect::<Vec<_>>();
let protocol_data = ProtocolsData {
genesis_hash: H256::from_slice(hex::decode(self.genesis.clone())?.as_slice()),
node_role: ProtocolRole::FullNode,
};
let mut swarm: Swarm<Behaviour> = {
let transport = TransportBuilder::new()
.yamux_maximum_buffer_size(256 * MIB)
.build(local_key.clone());
let discovery = DiscoveryBuilder::new()
.record_ttl(Some(Duration::from_secs(0)))
.provider_ttl(Some(Duration::from_secs(0)))
.query_timeout(Duration::from_secs(5 * 60))
.build(local_peer_id, &self.genesis);
let peer_info = PeerBehaviour::new(local_key.public());
let notifications = Notifications::new(protocol_data);
let behavior = Behaviour {
notifications,
peer_info,
discovery,
};
let config = SwarmConfig::with_tokio_executor();
Swarm::new(transport, behavior, local_peer_id, config)
};
for (peer, multiaddress) in &bootnodes {
swarm
.behaviour_mut()
.discovery
.add_address(peer, multiaddress.clone());
}
Ok(swarm)
}
pub fn with_genesis(mut self, genesis: String) -> Self {
self.genesis = genesis.trim_start_matches("0x").to_string();
self
}
pub fn with_bootnodes(mut self, bootnodes: Vec<String>) -> Self {
self.bootnodes = bootnodes;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn build(&self) -> Result<Walker, Box<dyn Error>> {
let swarm = self.create_swarm()?;
Ok(Walker {
swarm,
genesis: self.genesis.clone(),
timeout: self.timeout,
queries: HashSet::with_capacity(1024),
discovered_with_address: HashMap::with_capacity(1024),
peer_details: HashMap::with_capacity(1024),
peer_role: HashMap::with_capacity(1024),
dialed_peers: HashMap::with_capacity(1024),
})
}
}