initial commit; draft implementation

Signed-off-by: Uncle Stretch <uncle.stretch@ghostchain.io>
This commit is contained in:
Uncle Stretch 2025-12-21 16:15:11 +03:00
commit 813667093e
Signed by: str3tch
GPG Key ID: 84F3190747EE79AA
18 changed files with 7869 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

5242
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

31
Cargo.toml Normal file
View File

@ -0,0 +1,31 @@
[package]
name = "ghost-echo"
version = "0.0.1"
edition = "2021"
[[bin]]
name = "ghost-gossiper"
path = "src/bin/main.rs"
[dependencies]
anyhow = "1.0.97"
async-trait = "0.1.88"
chrono = "0.4.42"
clap = { version = "4.5.32", features = ["derive", "env"] }
crossterm = "0.28.1"
futures = "0.3.31"
futures-timer = "3.0.3"
hex = "0.4.3"
libp2p = { version = "0.56", features = ["identify", "ping", "tokio", "gossipsub", "macros", "relay", "kad", "rsa", "ed25519", "quic", "request-response", "dns", "memory-connection-limits", "tcp", "noise", "yamux", "autonat", "tls", "dcutr"] }
libp2p-webrtc = { version = "0.9.0-alpha", features = ["tokio", "pem"] }
quick-protobuf = "0.8.1"
rand = "0.9.0"
rand_core = { version = "0.6.4", features = ["getrandom"] }
ratatui = "0.29.0"
serde_json = "1.0.140"
signal-hook = "0.3.17"
tokio = { version = "1.47.1", features = ["full"] }
tokio-util = { version = "0.7.14", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
unsigned-varint = "0.8.0"

110
src/bin/main.rs Normal file
View File

@ -0,0 +1,110 @@
use ghost_gossiper::prelude::*;
use anyhow::Result;
use clap::Parser;
use libp2p::{identity, PeerId};
use libp2p_webrtc::tokio::Certificate;
use std::path::{Path, PathBuf};
use tokio::{fs, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
// parse the command line arguments
let opt = Options::parse();
// initialize the tracing logger and get the receiver for log messages
let from_log = Log::init();
// create a shutdown token
let shutdown = CancellationToken::new();
// load the identity and certificate
let local_key = read_or_create_identity(&opt.local_key_path).await?;
let webrtc_cert = read_or_create_certificate(&opt.local_cert_path).await?;
// create the ui and the channels to communicate with it
let (mut ui, to_ui, from_ui) = if opt.headless {
Headless::build(local_key.public().into(), from_log, shutdown.clone())
} else {
Tui::build(local_key.public().into(), from_log, shutdown.clone())
};
// create the peer, connecting it to the ui
let mut peer = Peer::new(local_key, webrtc_cert, to_ui, from_ui, shutdown.clone()).await?;
// spawn tasks for both the swarm and the ui
let peer_task: JoinHandle<Result<()>> = tokio::spawn(async move { peer.run().await });
let ui_task: JoinHandle<Result<()>> = tokio::spawn(async move { ui.run().await });
// wait for the tasks to finish
let (ui_result, peer_result) = tokio::try_join!(peer_task, ui_task)?;
// check the inner results
ui_result?;
peer_result?;
Ok(())
}
async fn read_or_create_certificate(path: &Path) -> Result<Certificate> {
if path.exists() {
let pem = fs::read_to_string(&path).await?;
info!("Using existing certificate from {}", path.display());
return Ok(Certificate::from_pem(&pem)?);
}
let cert = Certificate::generate(&mut rand_core::OsRng)?;
fs::write(&path, &cert.serialize_pem().as_bytes()).await?;
info!(
"Generated new certificate and wrote it to {}",
path.display()
);
Ok(cert)
}
async fn read_or_create_identity(path: &Path) -> Result<identity::Keypair> {
let mut key_path = PathBuf::from(path);
let is_key = key_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "key")
.unwrap_or(false);
if !is_key {
key_path.set_extension("key");
}
let mut peer_id_path = PathBuf::from(path);
let is_peer_id = peer_id_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "peerid")
.unwrap_or(false);
if !is_peer_id {
peer_id_path.set_extension("peerid");
}
if key_path.exists() {
let bytes = fs::read(&key_path).await?;
info!("Using existing identity from {}", key_path.display());
// This only works for ed25519 but that is what we are using
return Ok(identity::Keypair::from_protobuf_encoding(&bytes)?);
}
let identity = identity::Keypair::generate_ed25519();
fs::write(&key_path, &identity.to_protobuf_encoding()?).await?;
let peer_id: PeerId = identity.public().into();
fs::write(&peer_id_path, peer_id.to_string()).await?;
info!(
"Generated new identity and wrote it to {}",
key_path.display()
);
Ok(identity)
}

57
src/chatpeer.rs Normal file
View File

@ -0,0 +1,57 @@
use libp2p::PeerId;
use std::fmt;
/// A wrapper for PeerId for chat peers
/// TODO: expand this to include a user-set name, and possibly a user-set avatar
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChatPeer(PeerId);
impl ChatPeer {
/// Get the peer id
pub fn id(&self) -> PeerId {
self.0
}
/// Get the peer name
pub fn name(&self) -> String {
short_id(&self.0)
}
}
impl From<ChatPeer> for PeerId {
fn from(peer: ChatPeer) -> PeerId {
peer.0
}
}
impl From<&PeerId> for ChatPeer {
fn from(peer: &PeerId) -> Self {
ChatPeer(peer.to_owned())
}
}
impl From<PeerId> for ChatPeer {
fn from(peer: PeerId) -> Self {
ChatPeer(peer)
}
}
impl fmt::Debug for ChatPeer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({})", &self.0, short_id(&self.0))
}
}
impl fmt::Display for ChatPeer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", short_id(&self.0))
}
}
// Get the last 8 characters of a PeerId
fn short_id(peer: &PeerId) -> String {
let s = peer.to_string();
s.chars()
.skip(s.chars().count().saturating_sub(7))
.collect()
}

202
src/file_exchange.rs Normal file
View File

@ -0,0 +1,202 @@
use async_trait::async_trait;
use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::{request_response, StreamProtocol};
// Simple file exchange protocol. The format that the peers support consists of two different
// messages, one to request a file and one to receive the file.
//
// To request a file a peer sends the varuint encoded length of the file id string followed by the
// file id string itself.
//
// Request:
// varuint - file id length
// bytes - file id
//
// The file response message consists of a varuint length followed by the contents of the file.
//
// Response:
// varuint - file contents length
// bytes - file contents
//
/// The codec for the file exchange protocol.
#[derive(Default, Clone)]
pub struct Codec;
/// The request message for the file exchange protocol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request {
/// The identifier of the file that is being requested.
pub file_id: String,
}
/// The response message for the file exchange protocol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
/// The contents of the file that is being sent.
pub file_body: Vec<u8>,
}
#[async_trait]
impl request_response::Codec for Codec {
type Protocol = StreamProtocol;
type Request = Request;
type Response = Response;
async fn read_request<T>(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 1_000_000).await?;
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(Request {
file_id: String::from_utf8(vec).unwrap(),
})
}
async fn read_response<T>(
&mut self,
_: &StreamProtocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(Response { file_body: vec })
}
async fn write_request<T>(
&mut self,
_: &StreamProtocol,
io: &mut T,
Request { file_id }: Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, file_id).await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &StreamProtocol,
io: &mut T,
Response { file_body }: Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, file_body).await?;
Ok(())
}
}
/// Writes a message to the given socket with a length prefix appended to it. Also flushes the socket.
///
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
/// > compatible with what [`read_length_prefixed`] expects.
pub async fn write_length_prefixed(
socket: &mut (impl AsyncWrite + Unpin),
data: impl AsRef<[u8]>,
) -> Result<(), io::Error> {
write_varint(socket, data.as_ref().len()).await?;
socket.write_all(data.as_ref()).await?;
socket.flush().await?;
Ok(())
}
/// Writes a variable-length integer to the `socket`.
///
/// > **Note**: Does **NOT** flush the socket.
pub async fn write_varint(
socket: &mut (impl AsyncWrite + Unpin),
len: usize,
) -> Result<(), io::Error> {
let mut len_data = unsigned_varint::encode::usize_buffer();
let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len();
socket.write_all(&len_data[..encoded_len]).await?;
Ok(())
}
/// Reads a variable-length integer from the `socket`.
///
/// As a special exception, if the `socket` is empty and EOFs right at the beginning, then we
/// return `Ok(0)`.
///
/// > **Note**: This function reads bytes one by one from the `socket`. It is therefore encouraged
/// > to use some sort of buffering mechanism.
async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result<usize, io::Error> {
let mut buffer = unsigned_varint::encode::usize_buffer();
let mut buffer_len = 0;
loop {
match socket.read(&mut buffer[buffer_len..buffer_len + 1]).await? {
0 => {
// Reaching EOF before finishing to read the length is an error, unless the EOF is
// at the very beginning of the substream, in which case we assume that the data is
// empty.
if buffer_len == 0 {
return Ok(0);
} else {
return Err(io::ErrorKind::UnexpectedEof.into());
}
}
n => debug_assert_eq!(n, 1),
}
buffer_len += 1;
match unsigned_varint::decode::usize(&buffer[..buffer_len]) {
Ok((len, _)) => return Ok(len),
Err(unsigned_varint::decode::Error::Overflow) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"overflow in variable-length integer",
));
}
// TODO: why do we have a `__Nonexhaustive` variant in the error? I don't know how to process it
// Err(unsigned_varint::decode::Error::Insufficient) => {}
Err(_) => {}
}
}
}
/// Reads a length-prefixed message from the given socket.
///
/// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is
/// necessary in order to avoid DoS attacks where the remote sends us a message of several
/// gigabytes.
///
/// > **Note**: Assumes that a variable-length prefix indicates the length of the message. This is
/// > compatible with what [`write_length_prefixed`] does.
async fn read_length_prefixed(
socket: &mut (impl AsyncRead + Unpin),
max_size: usize,
) -> io::Result<Vec<u8>> {
let len = read_varint(socket).await?;
if len > max_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Received data size ({len} bytes) exceeds maximum ({max_size} bytes)"),
));
}
let mut buf = vec![0; len];
socket.read_exact(&mut buf).await?;
Ok(buf)
}

2
src/generated/mod.rs Normal file
View File

@ -0,0 +1,2 @@
// Automatically generated mod.rs
pub mod peer;

10
src/generated/peer.proto Normal file
View File

@ -0,0 +1,10 @@
syntax = "proto3";
package peer;
message Peer {
// public key of the peer
bytes publicKey = 1;
// array of multiaddrs for the peer
repeated bytes multiAddrs = 2;
}

52
src/generated/peer.rs Normal file
View File

@ -0,0 +1,52 @@
// Automatically generated rust module for 'peer.proto' file
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(unused_imports)]
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use std::borrow::Cow;
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Peer<'a> {
pub publicKey: Cow<'a, [u8]>,
pub multiAddrs: Vec<Cow<'a, [u8]>>,
}
impl<'a> MessageRead<'a> for Peer<'a> {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.publicKey = r.read_bytes(bytes).map(Cow::Borrowed)?,
Ok(18) => msg.multiAddrs.push(r.read_bytes(bytes).map(Cow::Borrowed)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl<'a> MessageWrite for Peer<'a> {
fn get_size(&self) -> usize {
0
+ if self.publicKey == Cow::Borrowed(b"") { 0 } else { 1 + sizeof_len((&self.publicKey).len()) }
+ self.multiAddrs.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if self.publicKey != Cow::Borrowed(b"") { w.write_with_tag(10, |w| w.write_bytes(&**&self.publicKey))?; }
for s in &self.multiAddrs { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}

55
src/lib.rs Normal file
View File

@ -0,0 +1,55 @@
//! rust-libp2p-webrtc-peer crate
#![warn(missing_docs)]
#![deny(
trivial_casts,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications
)]
/// The chat peer module
pub mod chatpeer;
pub use chatpeer::ChatPeer;
/// The peer file transfer protocol
pub mod file_exchange;
pub use file_exchange::{Codec, Request, Response};
/// The peer logging module
pub mod log;
pub use log::Log;
/// The peer message module
pub mod message;
pub use message::Message;
/// The command line options module
pub mod options;
pub use options::Options;
/// The peer module
pub mod peer;
pub use peer::Peer;
/// The protobuf generated module
mod proto {
#![allow(unreachable_pub)]
include!("generated/mod.rs");
pub(crate) use self::peer::Peer;
}
/// The peer ui module
pub mod ui;
pub use ui::{Headless, Tui, Ui};
/// The misc util module
pub mod util;
pub use util::{
decode_unknown_protobuf, extract_ip_multiaddr, ipaddr_to_multiaddr, is_private_ip,
pretty_print_fields, split_peer_id, WireType,
};
/// Prelude module
pub mod prelude {
pub use super::*;
}

71
src/log.rs Normal file
View File

@ -0,0 +1,71 @@
use std::fmt;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{
field::{Field, Visit},
Event, Level, Subscriber,
};
use tracing_subscriber::{
filter::EnvFilter, layer::Context, prelude::*, registry::LookupSpan, Layer,
};
// Custom tracing layer to send log events over mpsc
struct MpscLayer {
sender: Sender<Message>,
}
/// Custom tracing event that is send and sync
#[derive(Clone, Debug)]
pub struct Message {
/// The log level of the event
pub level: Level,
/// The log message of the event
pub message: String,
}
// Implement a visitor to extract fields from the event
struct FieldVisitor {
message: Option<String>,
}
impl Visit for FieldVisitor {
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
if field.name() == "message" {
self.message = Some(format!("{:?}", value));
}
}
}
impl<S> Layer<S> for MpscLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let mut visitor = FieldVisitor { message: None };
event.record(&mut visitor);
let event_data = Message {
level: *event.metadata().level(),
message: visitor.message.unwrap_or_default(),
};
let _ = self.sender.try_send(event_data);
}
}
/// Async tracing logger wrapper that filters and feeds log messages over an mpsc channel for
/// integration into the TUI gui.
pub struct Log;
impl Log {
/// Starts the logger and returns the task handle and receiver for the log messages.
pub fn init() -> Receiver<Message> {
let (sender, receiver) = mpsc::channel(16);
let filter = EnvFilter::from_default_env();
let layer = MpscLayer { sender }.with_filter(filter);
tracing_subscriber::registry().with(layer).init();
receiver
}
}

25
src/message.rs Normal file
View File

@ -0,0 +1,25 @@
use crate::ChatPeer;
use libp2p::core::PeerId;
/// The different types of messages sent between the UI and the Peer
#[derive(Debug)]
pub enum Message {
/// Send chat message
Chat {
/// The peer sending the message
from: Option<ChatPeer>,
/// The message sent
data: Vec<u8>,
},
/// All gossipsub peers and their topics
AllPeers {
/// The peers and their topics
peers: Vec<(PeerId, Vec<String>)>,
},
/// Add a peer
AddPeer(ChatPeer),
/// Remove a peer
RemovePeer(ChatPeer),
/// Add an event message
Event(String),
}

71
src/options.rs Normal file
View File

@ -0,0 +1,71 @@
use clap::Parser;
use std::{net::IpAddr, path::PathBuf};
const LISTEN_ADDR: [&str; 1] = ["0.0.0.0"];
const LOCAL_KEY_PATH: &str = "./local";
const LOCAL_CERT_PATH: &str = "./cert.pem";
/// The rust peer command line options
#[derive(Debug, Parser)]
#[clap(name = "ghost-gossiper p2p messenger")]
pub struct Options {
/// WebRTC UDP port for the app.
#[clap(long, env, default_value = "0")]
pub webrtc_port: u16,
/// QUIC UDP port for the app.
#[clap(long, env, default_value = "0")]
pub quic_port: u16,
/// TCP port for the app.
#[clap(long, env, default_value = "0")]
pub tcp_port: u16,
/// Address to listen on.
#[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',', default_values = LISTEN_ADDR)]
pub listen_addresses: Vec<IpAddr>,
/// If known, the external address of this node. Will be used to correctly advertise our external address across all transports.
#[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')]
pub external_addresses: Vec<IpAddr>,
/// Nodes to connect to on startup. Can be specified several times.
#[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')]
pub connect: Vec<String>,
/// If set, the path to the local certificate file.
#[clap(long, env, default_value = LOCAL_CERT_PATH)]
pub local_cert_path: PathBuf,
/// If set, the path to the local key file.
#[clap(long, env, default_value = LOCAL_KEY_PATH)]
pub local_key_path: PathBuf,
/// If set, the peer will make autonat client requests (default: true)
#[clap(long, env, default_value = "true")]
pub autonat_client: bool,
/// If set, the peer will act as an autonat server
#[clap(long, env)]
pub autonat_server: bool,
/// If set, the peer will try to upgrade connections using DCUtR (default: true)
#[clap(long, env, default_value = "true")]
pub dcutr: bool,
/// If set, the peer will not initialize the TUI and will run headless.
#[clap(long, env)]
pub headless: bool,
/// If set, the peer will use kademlia (default: true)
#[clap(long, env, default_value = "true")]
pub kademlia: bool,
/// If set, the peer will support relay client connections (default: true)
#[clap(long, env, default_value = "true")]
pub relay_client: bool,
/// If set, the peer will act as a relay server
#[clap(long, env)]
pub relay_server: bool,
}

1114
src/peer.rs Normal file

File diff suppressed because it is too large Load Diff

114
src/ui/headless.rs Normal file
View File

@ -0,0 +1,114 @@
#![allow(dead_code)]
use crate::{log::Message as LogMessage, ChatPeer, Message, Ui};
use async_trait::async_trait;
use libp2p::core::PeerId;
use signal_hook::{consts::SIGTERM, iterator::Signals};
use std::{collections::HashSet, time::Duration};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_util::sync::CancellationToken;
/// A headless UI for the peer
pub struct Headless {
// my peer id
me: ChatPeer,
// we receive log messages from the log thread
from_log: Receiver<LogMessage>,
// we send UI messages to the peer thread
to_peer: Sender<Message>,
// we receive UI messages from the peer thread
from_peer: Receiver<Message>,
// the shutdown token
shutdown: CancellationToken,
// the list of peers
peers: HashSet<ChatPeer>,
}
impl Headless {
/// Create a new UI instance
pub fn build(
me: PeerId,
from_log: Receiver<LogMessage>,
shutdown: CancellationToken,
) -> (Box<dyn Ui + Send>, Sender<Message>, Receiver<Message>) {
// create a new channels for sending/receiving messages
let (to_peer, from_ui) = mpsc::channel::<Message>(64);
let (to_ui, from_peer) = mpsc::channel::<Message>(64);
// create a new TUI instance
let ui: Box<dyn Ui> = Box::new(Self {
me: me.into(),
from_log,
to_peer,
from_peer,
shutdown,
peers: HashSet::new(),
});
(ui, to_ui, from_ui)
}
}
#[async_trait]
impl Ui for Headless {
/// Run the UI
async fn run(&mut self) -> anyhow::Result<()> {
// Register the SIGTERM signal
let mut signals = Signals::new([SIGTERM])?;
println!("Headless UI started");
println!("Press Ctrl+C to exit");
println!("My peer id: {} ({})", self.me.id(), self.me);
// Main loop
'main: loop {
// Process log messages
if let Ok(log) = self.from_log.try_recv() {
//TODO: remove this after [PR 5966](https://github.com/libp2p/rust-libp2p/pull/5966)
if !log.message.starts_with("Can't send data channel") {
println!("{}", log.message);
}
}
// Process peer messages
if let Ok(ui_message) = self.from_peer.try_recv() {
match ui_message {
Message::Chat { from, data } => {
let from = from.map_or("Unknown".to_string(), |peer| peer.to_string());
let message =
String::from_utf8(data).unwrap_or("Invalid UTF-8".to_string());
println!("{}: {}", from, message);
}
Message::AddPeer(peer) => {
if self.peers.insert(peer) {
println!(
"Adding peer:\n\tpeer id: {}\n\tname: {}",
peer.id(),
peer.name()
);
}
}
Message::RemovePeer(peer) => {
if self.peers.remove(&peer) {
println!("Removing peer: {peer:?}");
}
}
Message::Event(event) => {
println!("{}", event);
}
_ => {}
}
}
// check if we have received the shutdown signal from the OS
if signals.pending().next() == Some(SIGTERM) {
println!("Received SIGTERM, shutting down");
self.shutdown.cancel();
break 'main;
}
tokio::time::sleep(Duration::from_millis(18)).await;
}
Ok(())
}
}

15
src/ui/mod.rs Normal file
View File

@ -0,0 +1,15 @@
/// the async UI trait
/// the async UI trait
#[async_trait::async_trait]
pub trait Ui: Send {
/// Run the UI
async fn run(&mut self) -> anyhow::Result<()>;
}
/// the TUI implementation
pub mod tui;
pub use tui::Tui;
/// the headless implementation
pub mod headless;
pub use headless::Headless;

508
src/ui/tui.rs Normal file
View File

@ -0,0 +1,508 @@
use crate::{log::Message as LogMessage, ChatPeer, Message, Ui};
use async_trait::async_trait;
use crossterm::{
event::{
self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEvent, KeyModifiers,
MouseEvent, MouseEventKind,
},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use libp2p::core::PeerId;
use ratatui::{
backend::CrosstermBackend,
layout::{Alignment, Constraint, Direction, Layout},
prelude::{Buffer, Rect, Widget},
style::{Color, Modifier, Style},
text::{Line, Span},
widgets::{Block, Borders, List, ListItem, Paragraph},
Terminal,
};
use std::{
collections::{HashSet, VecDeque}, hash::{DefaultHasher, Hash, Hasher}, io, option::Option, time::Duration
};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
/// A simple UI for the peer
pub struct Tui {
// my peer id
me: ChatPeer,
// we receive log messages from the log thread
from_log: Receiver<LogMessage>,
// we send UI messages to the peer thread
to_peer: Sender<Message>,
// we receive UI messages from the peer thread
from_peer: Receiver<Message>,
// the shutdown token
shutdown: CancellationToken,
}
impl Tui {
/// Create a new UI instance
pub fn build(
me: PeerId,
from_log: Receiver<LogMessage>,
shutdown: CancellationToken,
) -> (Box<dyn Ui + Send>, Sender<Message>, Receiver<Message>) {
// create a new channels for sending/receiving messages
let (to_peer, from_ui) = mpsc::channel::<Message>(64);
let (to_ui, from_peer) = mpsc::channel::<Message>(64);
// create a new TUI instance
let ui: Box<dyn Ui> = Box::new(Self {
me: me.into(),
from_log,
to_peer,
from_peer,
shutdown,
});
(ui, to_ui, from_ui)
}
}
#[async_trait]
impl Ui for Tui {
/// Run the UI
async fn run(&mut self) -> anyhow::Result<()> {
// the currently selected tab
let mut selected_tab = 0;
// TUI setup
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
// Log Widget
let mut log_widget = LinesWidget::new("Log", 200);
// Chat Widget
let mut chat_widget = ChatWidget::new(&self.me);
// Main loop
loop {
// Process log messages
if let Ok(log) = self.from_log.try_recv() {
//TODO: remove this after [PR 5966](https://github.com/libp2p/rust-libp2p/pull/5966)
if !log.message.starts_with("Can't send data channel") {
log_widget.add_line(log.message);
}
}
// Process peer messages
if let Ok(ui_message) = self.from_peer.try_recv() {
match ui_message {
Message::Chat { from, data } => {
let message =
String::from_utf8(data).unwrap_or("Invalid UTF-8".to_string());
chat_widget.add_chat(from, message);
}
Message::AllPeers { peers } => {
for (peer, topics) in peers {
let mut peer_str = format!("{peer}: ");
for topic in topics {
peer_str.push_str(&format!("\n\t{}, ", topic));
}
info!("{peer_str}");
}
}
Message::AddPeer(peer) => {
if chat_widget.peers.insert(peer) {
log_widget.add_line(format!(
"Adding peer:\n\tpeer id: {}\n\tname: {}",
peer.id(),
peer.name()
));
}
}
Message::RemovePeer(peer) => {
if chat_widget.peers.remove(&peer) {
log_widget.add_line(format!("Removing peer: {peer:?}"));
}
}
Message::Event(event) => {
log_widget.add_line(event);
}
}
}
// Draw the UI
terminal.draw(|f| match selected_tab {
0 => f.render_widget(&mut chat_widget, f.area()),
1 => f.render_widget(&mut log_widget, f.area()),
_ => {}
})?;
// Handle input events
if event::poll(Duration::from_millis(18))? {
match event::read()? {
Event::Key(key) => match key {
// Handle ctrl+c
KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::CONTROL,
..
} => {
info!("Received Ctrl+C, shutting down...");
self.shutdown.cancel();
break;
}
// Handle ctrl+shift+p
KeyEvent {
code: KeyCode::Char('p'),
modifiers: KeyModifiers::CONTROL | KeyModifiers::SHIFT,
..
} => {
error!("All peers sent");
self.to_peer
.send(Message::AllPeers { peers: vec![] })
.await?;
}
// Handle all other key events
_ => match key.code {
KeyCode::Tab => {
selected_tab = (selected_tab + 1) % 2;
}
KeyCode::Char(c) if selected_tab == 0 => {
chat_widget.input.push(c);
}
KeyCode::Backspace if selected_tab == 0 => {
chat_widget.input.pop();
}
KeyCode::Enter if selected_tab == 0 => {
let input_str = chat_widget.input.as_str();
if input_str.len() > 0 {
match input_str {
"/quit" => {
info!("Received /quit command, shutting down...");
self.shutdown.cancel();
break;
}
_ => {
// send the chat message to the swarm to be gossiped
self.to_peer
.send(Message::Chat {
from: Some(self.me),
data: chat_widget.input.clone().into_bytes(),
})
.await?;
// add our chat to the local chat widget
chat_widget
.add_chat(Some(self.me), chat_widget.input.clone());
}
}
// clear the input
chat_widget.input.clear();
}
}
_ => {}
},
},
Event::Mouse(event) => match selected_tab {
0 => {
let _ = chat_widget.mouse_event(event);
}
1 => {
let _ = log_widget.mouse_event(event);
}
_ => {}
},
_ => {}
}
}
}
// Cleanup
disable_raw_mode()?;
execute!(io::stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
Ok(())
}
}
// Function to wrap text into multiple lines based on a max width
fn wrap_text<'a>(line: Line<'a>, max_width: usize) -> Vec<Line<'a>> {
let mut wrapped_lines = Vec::new();
let mut words_with_styles: Vec<(String, Style)> = Vec::new();
for span in line.spans {
let style = span.style;
for word in span.content.split_inclusive(' ') {
words_with_styles.push((word.to_string(), style));
}
}
if words_with_styles.is_empty() {
return vec![Line::from("")];
}
let leading_whitespace = words_with_styles[0].0
.chars()
.take_while(|c| c.is_whitespace())
.collect::<String>();
let mut current_line_spans: Vec<Span<'a>> = Vec::new();
let mut current_line_width = 0;
for (word_raw, style) in words_with_styles {
let word = word_raw.replace('\t', " ");
let word_len = word.chars().count();
if current_line_width + word_len > max_width {
if !current_line_spans.is_empty() {
wrapped_lines.push(Line::from(current_line_spans.drain(..).collect::<Vec<_>>()));
}
if word_len > max_width {
let mut remaining = word.clone();
while !remaining.is_empty() {
let split_point = std::cmp::min(remaining.len(), max_width);
let (chunk, rest) = remaining.split_at(split_point);
let formatted_chunk = format!("{}{}", leading_whitespace, chunk);
wrapped_lines.push(Line::from(Span::styled(formatted_chunk, style)));
remaining = rest.to_string();
}
current_line_width = 0;
} else {
let formatted_word = format!("{}{}", leading_whitespace, word.trim_start());
current_line_width = formatted_word.chars().count();
current_line_spans.push(Span::styled(formatted_word, style));
}
} else {
let content = if current_line_spans.is_empty() && current_line_width == 0 {
format!("{}{}", leading_whitespace, word.trim_start())
} else {
word.clone()
};
current_line_width += content.chars().count();
current_line_spans.push(Span::styled(content, style));
}
}
if !current_line_spans.is_empty() {
wrapped_lines.push(Line::from(current_line_spans));
}
wrapped_lines
}
// Lines Widget
struct LinesWidget<'a> {
title: String,
max: usize,
lines: VecDeque<Line<'a>>,
scroll: usize,
area: Rect,
}
impl<'a> LinesWidget<'a> {
// Create a new LogWidget instance
fn new(title: impl Into<String>, max: usize) -> Self {
Self {
title: title.into(),
max,
lines: VecDeque::new(),
scroll: 0,
area: Rect::default(),
}
}
// Handle a mouse event
fn mouse_event(&mut self, event: MouseEvent) -> bool {
// check if the event happened in our area
let x = event.column;
let y = event.row;
if x >= self.area.x
&& x < self.area.x + self.area.width
&& y >= self.area.y
&& y < self.area.y + self.area.height
{
match event.kind {
MouseEventKind::ScrollUp => {
self.scroll += 1;
}
MouseEventKind::ScrollDown => {
if self.scroll > 0 {
self.scroll -= 1;
}
}
_ => {}
}
true
} else {
false
}
}
// Add a line to the widget
fn add_line(&mut self, line: impl Into<Line<'a>>) {
self.lines.push_back(line.into());
if self.lines.len() > self.max {
self.lines.drain(0..(self.lines.len() - self.max));
}
}
}
impl<'a> Widget for &mut LinesWidget<'a> {
fn render(self, area: Rect, buf: &mut Buffer) {
let block = Block::default()
.title(self.title.as_str())
.title_alignment(Alignment::Right)
.borders(Borders::ALL)
.style(Style::default());
self.area = block.inner(area);
let inner_area = self.area;
let max_lines = inner_area.height as usize;
let mut logs: Vec<ListItem> = self
.lines
.iter()
.flat_map(|l| {
let wrapped_lines = wrap_text(l.clone(), inner_area.width as usize - 2);
wrapped_lines
.into_iter()
.map(ListItem::new)
.collect::<Vec<_>>()
})
.collect();
if logs.len() > max_lines {
if logs.len() > (max_lines + self.scroll) {
logs.drain(0..(logs.len() - max_lines - self.scroll));
} else {
self.scroll = max_lines;
}
}
List::new(logs).block(block).render(area, buf);
}
}
// Chat Widget
struct ChatWidget<'a> {
me: &'a ChatPeer,
peers: HashSet<ChatPeer>,
chat: LinesWidget<'a>,
input: String,
}
impl<'a> ChatWidget<'a> {
// Create a new ChatWidget instance
fn new(me: &'a ChatPeer) -> Self {
let mut peers = HashSet::new();
peers.insert(*me);
ChatWidget {
me,
peers,
chat: LinesWidget::new("Chat", 100),
input: String::new(),
}
}
// Handle a mouse event
fn mouse_event(&mut self, event: MouseEvent) -> bool {
self.chat.mouse_event(event) || self.chat.mouse_event(event)
}
// Add a chat message to the widget
fn add_chat(&mut self, peer: Option<ChatPeer>, message: impl Into<String>) {
use ratatui::prelude::Stylize;
let peer = peer.map_or("Unknown".to_string(), |p| p.to_string());
let time_str = chrono::Local::now().format("[%H:%M:%S]");
let mut hasher = DefaultHasher::new();
peer.hash(&mut hasher);
let hash = hasher.finish();
let color_index = (hash % 216) as u8 + 16;
self.chat.add_line(
Line::from(vec![
time_str.to_string().into(),
" ".into(),
peer.fg(Color::Indexed(color_index)),
": ".into(),
message.into().into()
])
)
}
}
impl Widget for &mut ChatWidget<'_> {
fn render(self, area: Rect, buf: &mut Buffer) {
let layout = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Length(3),
Constraint::Fill(1),
Constraint::Length(3),
]
.as_ref(),
)
.split(area);
let topic_block = Block::default()
.title("Conversation")
.title_alignment(Alignment::Right)
.borders(Borders::ALL)
.style(Style::default());
Paragraph::new("Topic: ghost-gossip-main")
.block(topic_block)
.render(layout[0], buf);
// calculate the layout for the top row
let content_layout = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(100), Constraint::Length(24)].as_ref())
.split(layout[1]);
// render the chat messages
self.chat.render(content_layout[0], buf);
// render the peers list
let peers_block = Block::default()
.title("Peers")
.title_alignment(Alignment::Right)
.borders(Borders::ALL)
.style(Style::default());
let peers: Vec<ListItem> = self
.peers
.iter()
.map(|p| {
if p == self.me {
ListItem::new(Span::styled(
format!("{} (You)", p),
Style::default().add_modifier(Modifier::ITALIC),
))
} else {
ListItem::new(Span::raw(p.to_string()))
}
})
.collect();
List::new(peers)
.block(peers_block)
.render(content_layout[1], buf);
// render the chat input
let message_block = Block::default()
.title("Type a message")
.title_alignment(Alignment::Right)
.borders(Borders::ALL)
.style(Style::default());
Paragraph::new(format!("{} > {}", self.me, self.input.clone()))
.block(message_block)
.render(layout[2], buf);
}
}

189
src/util.rs Normal file
View File

@ -0,0 +1,189 @@
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
use quick_protobuf::reader::BytesReader;
use std::{convert::TryFrom, fmt, net::IpAddr};
/// Define protobuf wire types since they are no longer in quick-protobuf
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WireType {
/// Varint wire type
Varint = 0,
/// Fixed64 wire type
Fixed64 = 1,
/// Length-delimited wire type
LengthDelimited = 2,
/// Start group wire type
StartGroup = 3,
/// End group wire type
EndGroup = 4,
/// Fixed32 wire type
Fixed32 = 5,
}
/// Error type for TryFrom conversion
#[derive(Debug)]
pub struct InvalidWireType(u32);
impl fmt::Display for InvalidWireType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Invalid wire type value: {}", self.0)
}
}
impl std::error::Error for InvalidWireType {}
impl TryFrom<u32> for WireType {
type Error = InvalidWireType;
fn try_from(tag: u32) -> Result<Self, Self::Error> {
// Extract wire type from the lower 3 bits
let wire_type_value = tag & 0x07;
match wire_type_value {
0 => Ok(WireType::Varint),
1 => Ok(WireType::Fixed64),
2 => Ok(WireType::LengthDelimited),
3 => Ok(WireType::StartGroup),
4 => Ok(WireType::EndGroup),
5 => Ok(WireType::Fixed32),
invalid => Err(InvalidWireType(invalid)),
}
}
}
/// Decode an unknown protobuf message into a list of fields
pub fn decode_unknown_protobuf(bytes: &[u8]) -> anyhow::Result<Vec<String>> {
let mut reader = BytesReader::from_bytes(bytes);
let mut fields = Vec::new();
// Read the next tag
while let Ok(tag) = reader.next_tag(bytes) {
// Extract field number and wire type
let field_number = tag >> 3;
let wire_type = WireType::try_from(tag).map_err(|e| {
quick_protobuf::Error::Message(format!("Invalid wire type value: {}", e.0))
})?;
// Decode the value based on wire type
let value = match wire_type {
WireType::Varint => {
let varint = reader.read_varint64(bytes)?;
format!("int64: {}", varint) // Could also be int32, uint32, etc.
}
WireType::Fixed64 => {
let fixed64 = reader.read_fixed64(bytes)?;
format!("fixed64: {}", fixed64) // Could also be double
}
WireType::LengthDelimited => {
let len = reader.read_varint32(bytes)? as usize;
let data = reader.read_bytes(bytes)?;
// Try to interpret as string; if it fails, treat as raw bytes
match std::str::from_utf8(data) {
Ok(s) => format!("string: \"{}\"", s),
Err(_) => format!("bytes({}): {}", len, hex::encode(data)),
}
}
WireType::Fixed32 => {
let fixed32 = reader.read_fixed32(bytes)?;
format!("fixed32: {}", fixed32) // Could also be float
}
WireType::StartGroup | WireType::EndGroup => {
// Groups are deprecated and rare; skip for simplicity
return Err(
quick_protobuf::Error::Message("Groups not supported".to_string()).into(),
);
}
};
fields.push(format!(
"Field {} ({:?}): {}",
field_number, wire_type, value
));
}
Ok(fields)
}
/// Pretty print a list of fields
pub fn pretty_print_fields(fields: &[String]) -> String {
let mut output = String::new();
output.push_str("Decoded Protobuf Message {\n");
for field in fields {
output.push_str(" ");
output.push_str(field);
output.push('\n');
}
output.push('}');
output
}
/// Split the PeerId from a Multiaddr
pub fn split_peer_id(multiaddr: Multiaddr) -> Option<(Multiaddr, PeerId)> {
let mut base_addr = Multiaddr::empty();
let mut peer_id = None;
// Iterate over the protocols in the Multiaddr
for protocol in multiaddr.into_iter() {
if let Protocol::P2p(id) = protocol {
peer_id = Some(id);
break; // Stop once we find the P2p component
} else {
base_addr.push(protocol); // Add non-P2p components to the base address
}
}
peer_id.map(|id| (base_addr, id))
}
/// Extract the IP address from a Multiaddr
pub fn extract_ip_multiaddr(multiaddr: &Multiaddr) -> Option<Multiaddr> {
let mut result = Multiaddr::empty();
for component in multiaddr.into_iter() {
match component {
Protocol::Ip4(addr) => {
result.push(Protocol::Ip4(addr));
return Some(result);
}
Protocol::Ip6(addr) => {
result.push(Protocol::Ip6(addr));
return Some(result);
}
_ => continue,
}
}
None
}
/// Check if a Multiaddr contains a private IP address
pub fn is_private_ip(multiaddr: &Multiaddr) -> bool {
for component in multiaddr.into_iter() {
match component {
Protocol::Ip4(addr) => {
return addr.is_private() || // 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
addr.is_loopback() || // 127.0.0.0/8
addr.is_link_local() || // 169.254.0.0/16
addr.is_unspecified(); // 0.0.0.0
}
Protocol::Ip6(addr) => {
return addr.is_loopback() || // ::1
addr.is_unspecified() || // ::
// Unique Local Address (fc00::/7 where 8th bit is 1)
(addr.segments()[0] & 0xfe00 == 0xfc00) ||
// Link-Local unicast (fe80::/10)
(addr.segments()[0] & 0xffc0 == 0xfe80);
}
_ => continue,
}
}
false
}
/// Convert an IP address to a Multiaddr
pub fn ipaddr_to_multiaddr(ip: &IpAddr) -> Multiaddr {
let multiaddr = match ip {
IpAddr::V4(ipv4) => Multiaddr::empty().with(Protocol::Ip4(*ipv4)),
IpAddr::V6(ipv6) => Multiaddr::empty().with(Protocol::Ip6(*ipv6)),
};
multiaddr
}