From ef69d13a4479abf65450a024db3f1829133b81e7 Mon Sep 17 00:00:00 2001 From: Uncle Stretch Date: Sun, 28 Dec 2025 15:04:38 +0300 Subject: [PATCH] network configuration is exposed by cli flags Signed-off-by: Uncle Stretch --- Cargo.lock | 718 +------------------------------------------- Cargo.toml | 3 +- src/bin/main.rs | 26 +- src/message.rs | 18 +- src/options.rs | 576 +++++++++++++++++++++++++++++++++++- src/peer.rs | 723 ++++++++++++++++++++++++++------------------- src/ui/headless.rs | 17 +- src/ui/tui.rs | 20 +- 8 files changed, 1028 insertions(+), 1073 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc65ea0..d196dc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,41 +144,19 @@ version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" -[[package]] -name = "arc-swap" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" - [[package]] name = "arrayref" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" -[[package]] -name = "asn1-rs" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048" -dependencies = [ - "asn1-rs-derive 0.5.1", - "asn1-rs-impl", - "displaydoc", - "nom", - "num-traits", - "rusticata-macros", - "thiserror 1.0.69", - "time", -] - [[package]] name = "asn1-rs" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60" dependencies = [ - "asn1-rs-derive 0.6.0", + "asn1-rs-derive", "asn1-rs-impl", "displaydoc", "nom", @@ -188,18 +166,6 @@ dependencies = [ "time", ] -[[package]] -name = "asn1-rs-derive" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "synstructure", -] - [[package]] name = "asn1-rs-derive" version = "0.6.0" @@ -339,12 +305,6 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" -[[package]] -name = "base16ct" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" - [[package]] name = "base64" version = "0.22.1" @@ -357,15 +317,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -396,15 +347,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "block-padding" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" -dependencies = [ - "generic-array", -] - [[package]] name = "bs58" version = "0.5.1" @@ -447,15 +389,6 @@ dependencies = [ "rustversion", ] -[[package]] -name = "cbc" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" -dependencies = [ - "cipher", -] - [[package]] name = "cc" version = "1.2.34" @@ -465,18 +398,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "ccm" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ae3c82e4355234767756212c570e29833699ab63e6ffd161887314cc5b43847" -dependencies = [ - "aead", - "cipher", - "ctr", - "subtle", -] - [[package]] name = "cfg-if" version = "1.0.3" @@ -646,21 +567,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "critical-section" version = "1.2.0" @@ -732,18 +638,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" -[[package]] -name = "crypto-bigint" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" -dependencies = [ - "generic-array", - "rand_core 0.6.4", - "subtle", - "zeroize", -] - [[package]] name = "crypto-common" version = "0.1.6" @@ -859,31 +753,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ "const-oid", - "pem-rfc7468", "zeroize", ] -[[package]] -name = "der-parser" -version = "9.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" -dependencies = [ - "asn1-rs 0.6.2", - "displaydoc", - "nom", - "num-bigint", - "num-traits", - "rusticata-macros", -] - [[package]] name = "der-parser" version = "10.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6" dependencies = [ - "asn1-rs 0.7.1", + "asn1-rs", "displaydoc", "nom", "num-bigint", @@ -907,7 +786,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", - "const-oid", "crypto-common", "subtle", ] @@ -929,20 +807,6 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" -[[package]] -name = "ecdsa" -version = "0.16.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" -dependencies = [ - "der", - "digest", - "elliptic-curve", - "rfc6979", - "signature", - "spki", -] - [[package]] name = "ed25519" version = "2.2.3" @@ -973,27 +837,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "elliptic-curve" -version = "0.13.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" -dependencies = [ - "base16ct", - "crypto-bigint", - "digest", - "ff", - "generic-array", - "group", - "hkdf", - "pem-rfc7468", - "pkcs8", - "rand_core 0.6.4", - "sec1", - "subtle", - "zeroize", -] - [[package]] name = "enum-as-inner" version = "0.6.1" @@ -1043,16 +886,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "ff" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b50bfb653653f9ca9095b427bed08ab8d75a137839d9ad64eb11810d5b6393" -dependencies = [ - "rand_core 0.6.4", - "subtle", -] - [[package]] name = "fiat-crypto" version = "0.2.9" @@ -1229,7 +1062,6 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", - "zeroize", ] [[package]] @@ -1271,7 +1103,7 @@ dependencies = [ [[package]] name = "ghost-echo" -version = "0.0.1" +version = "0.0.2" dependencies = [ "anyhow", "async-trait", @@ -1282,7 +1114,6 @@ dependencies = [ "futures-timer", "hex", "libp2p", - "libp2p-webrtc", "quick-protobuf", "rand 0.9.2", "rand_core 0.6.4", @@ -1302,17 +1133,6 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" -[[package]] -name = "group" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" -dependencies = [ - "ff", - "rand_core 0.6.4", - "subtle", -] - [[package]] name = "h2" version = "0.4.12" @@ -1746,7 +1566,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ - "block-padding", "generic-array", ] @@ -1763,26 +1582,6 @@ dependencies = [ "syn", ] -[[package]] -name = "interceptor" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ab04c530fd82e414e40394cabe5f0ebfe30d119f10fe29d6e3561926af412e" -dependencies = [ - "async-trait", - "bytes", - "log", - "portable-atomic", - "rand 0.8.5", - "rtcp", - "rtp", - "thiserror 1.0.69", - "tokio", - "waitgroup", - "webrtc-srtp", - "webrtc-util", -] - [[package]] name = "io-uring" version = "0.7.10" @@ -2323,7 +2122,7 @@ dependencies = [ "rustls", "rustls-webpki", "thiserror 2.0.16", - "x509-parser 0.17.0", + "x509-parser", "yasna", ] @@ -2342,54 +2141,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "libp2p-webrtc" -version = "0.9.0-alpha.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bc51d86236d33762bccf5015e4ece458c549476c362040d4e1e6f3615e41b0" -dependencies = [ - "async-trait", - "futures", - "futures-timer", - "hex", - "if-watch", - "libp2p-core", - "libp2p-identity", - "libp2p-noise", - "libp2p-webrtc-utils", - "multihash", - "rand 0.8.5", - "rcgen", - "stun", - "thiserror 2.0.16", - "tokio", - "tokio-util", - "tracing", - "webrtc", -] - -[[package]] -name = "libp2p-webrtc-utils" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490abff5ee5f9a7a77f0145c79cc97c76941231a3626f4dee18ebf2abb95618f" -dependencies = [ - "asynchronous-codec", - "bytes", - "futures", - "hex", - "libp2p-core", - "libp2p-identity", - "libp2p-noise", - "quick-protobuf", - "quick-protobuf-codec", - "rand 0.8.5", - "serde", - "sha2", - "tinytemplate", - "tracing", -] - [[package]] name = "libp2p-yamux" version = "0.47.0" @@ -2476,31 +2227,12 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "md-5" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" -dependencies = [ - "cfg-if", - "digest", -] - [[package]] name = "memchr" version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" -[[package]] -name = "memoffset" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" -dependencies = [ - "autocfg", -] - [[package]] name = "memory-stats" version = "1.2.0" @@ -2684,8 +2416,6 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", - "memoffset", - "pin-utils", ] [[package]] @@ -2776,22 +2506,13 @@ dependencies = [ "memchr", ] -[[package]] -name = "oid-registry" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d8034d9489cdaf79228eb9f6a3b8d7bb32ba00d6645ebd48eef4077ceb5bd9" -dependencies = [ - "asn1-rs 0.6.2", -] - [[package]] name = "oid-registry" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7" dependencies = [ - "asn1-rs 0.7.1", + "asn1-rs", ] [[package]] @@ -2822,30 +2543,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" -[[package]] -name = "p256" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" -dependencies = [ - "ecdsa", - "elliptic-curve", - "primeorder", - "sha2", -] - -[[package]] -name = "p384" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe42f1670a52a47d448f14b6a5c61dd78fce51856e68edaa38f7ae3a46b8d6b6" -dependencies = [ - "ecdsa", - "elliptic-curve", - "primeorder", - "sha2", -] - [[package]] name = "parking" version = "2.2.1" @@ -2891,15 +2588,6 @@ dependencies = [ "serde", ] -[[package]] -name = "pem-rfc7468" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" -dependencies = [ - "base64ct", -] - [[package]] name = "percent-encoding" version = "2.3.2" @@ -3015,15 +2703,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "primeorder" -version = "0.13.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" -dependencies = [ - "elliptic-curve", -] - [[package]] name = "proc-macro2" version = "1.0.101" @@ -3259,7 +2938,6 @@ dependencies = [ "ring", "rustls-pki-types", "time", - "x509-parser 0.16.0", "yasna", ] @@ -3322,16 +3000,6 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95325155c684b1c89f7765e30bc1c42e4a6da51ca513615660cb8a62ef9a88e3" -[[package]] -name = "rfc6979" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" -dependencies = [ - "hmac", - "subtle", -] - [[package]] name = "ring" version = "0.17.14" @@ -3346,17 +3014,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rtcp" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8306430fb118b7834bbee50e744dc34826eca1da2158657a3d6cbc70e24c2096" -dependencies = [ - "bytes", - "thiserror 1.0.69", - "webrtc-util", -] - [[package]] name = "rtnetlink" version = "0.13.1" @@ -3375,21 +3032,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "rtp" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68baca5b6cb4980678713f0d06ef3a432aa642baefcbfd0f4dd2ef9eb5ab550" -dependencies = [ - "bytes", - "memchr", - "portable-atomic", - "rand 0.8.5", - "serde", - "thiserror 1.0.69", - "webrtc-util", -] - [[package]] name = "rustc-demangle" version = "0.1.26" @@ -3516,32 +3158,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sdp" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5" -dependencies = [ - "rand 0.8.5", - "substring", - "thiserror 1.0.69", - "url", -] - -[[package]] -name = "sec1" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" -dependencies = [ - "base16ct", - "der", - "generic-array", - "pkcs8", - "subtle", - "zeroize", -] - [[package]] name = "semver" version = "1.0.26" @@ -3580,17 +3196,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha1" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sha2" version = "0.10.9" @@ -3653,7 +3258,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ - "digest", "rand_core 0.6.4", ] @@ -3669,15 +3273,6 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -[[package]] -name = "smol_str" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd538fb6910ac1099850255cf94a94df6551fbdd602454387d0adb2d1ca6dead" -dependencies = [ - "serde", -] - [[package]] name = "snow" version = "0.9.6" @@ -3765,34 +3360,6 @@ dependencies = [ "syn", ] -[[package]] -name = "stun" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea256fb46a13f9204e9dee9982997b2c3097db175a9fddaa8350310d03c4d5a3" -dependencies = [ - "base64", - "crc", - "lazy_static", - "md-5", - "rand 0.8.5", - "ring", - "subtle", - "thiserror 1.0.69", - "tokio", - "url", - "webrtc-util", -] - -[[package]] -name = "substring" -version = "1.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42ee6433ecef213b2e72f587ef64a2f5943e7cd16fbd82dbe8bc07486c534c86" -dependencies = [ - "autocfg", -] - [[package]] name = "subtle" version = "2.6.1" @@ -3952,16 +3519,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "tinytemplate" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" -dependencies = [ - "serde", - "serde_json", -] - [[package]] name = "tinyvec" version = "1.10.0" @@ -4098,27 +3655,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "turn" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0044fdae001dd8a1e247ea6289abf12f4fcea1331a2364da512f9cd680bbd8cb" -dependencies = [ - "async-trait", - "base64", - "futures", - "log", - "md-5", - "portable-atomic", - "rand 0.8.5", - "ring", - "stun", - "thiserror 1.0.69", - "tokio", - "tokio-util", - "webrtc-util", -] - [[package]] name = "typenum" version = "1.18.0" @@ -4247,15 +3783,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "waitgroup" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1f50000a783467e6c0200f9d10642f4bc424e39efc1b770203e88b488f79292" -dependencies = [ - "atomic-waker", -] - [[package]] name = "want" version = "0.3.1" @@ -4348,217 +3875,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webrtc" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30367074d9f18231d28a74fab0120856b2b665da108d71a12beab7185a36f97b" -dependencies = [ - "arc-swap", - "async-trait", - "bytes", - "cfg-if", - "hex", - "interceptor", - "lazy_static", - "log", - "pem", - "portable-atomic", - "rand 0.8.5", - "rcgen", - "regex", - "ring", - "rtcp", - "rtp", - "rustls", - "sdp", - "serde", - "serde_json", - "sha2", - "smol_str", - "stun", - "thiserror 1.0.69", - "time", - "tokio", - "turn", - "url", - "waitgroup", - "webrtc-data", - "webrtc-dtls", - "webrtc-ice", - "webrtc-mdns", - "webrtc-media", - "webrtc-sctp", - "webrtc-srtp", - "webrtc-util", -] - -[[package]] -name = "webrtc-data" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec93b991efcd01b73c5b3503fa8adba159d069abe5785c988ebe14fcf8f05d1" -dependencies = [ - "bytes", - "log", - "portable-atomic", - "thiserror 1.0.69", - "tokio", - "webrtc-sctp", - "webrtc-util", -] - -[[package]] -name = "webrtc-dtls" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7c9b89fc909f9da0499283b1112cd98f72fec28e55a54a9e352525ca65cd95c" -dependencies = [ - "aes", - "aes-gcm", - "async-trait", - "bincode", - "byteorder", - "cbc", - "ccm", - "der-parser 9.0.0", - "hkdf", - "hmac", - "log", - "p256", - "p384", - "pem", - "portable-atomic", - "rand 0.8.5", - "rand_core 0.6.4", - "rcgen", - "ring", - "rustls", - "sec1", - "serde", - "sha1", - "sha2", - "subtle", - "thiserror 1.0.69", - "tokio", - "webrtc-util", - "x25519-dalek", - "x509-parser 0.16.0", -] - -[[package]] -name = "webrtc-ice" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0348b28b593f7709ac98d872beb58c0009523df652c78e01b950ab9c537ff17d" -dependencies = [ - "arc-swap", - "async-trait", - "crc", - "log", - "portable-atomic", - "rand 0.8.5", - "serde", - "serde_json", - "stun", - "thiserror 1.0.69", - "tokio", - "turn", - "url", - "uuid", - "waitgroup", - "webrtc-mdns", - "webrtc-util", -] - -[[package]] -name = "webrtc-mdns" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6dfe9686c6c9c51428da4de415cb6ca2dc0591ce2b63212e23fd9cccf0e316b" -dependencies = [ - "log", - "socket2 0.5.10", - "thiserror 1.0.69", - "tokio", - "webrtc-util", -] - -[[package]] -name = "webrtc-media" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e153be16b8650021ad3e9e49ab6e5fa9fb7f6d1c23c213fd8bbd1a1135a4c704" -dependencies = [ - "byteorder", - "bytes", - "rand 0.8.5", - "rtp", - "thiserror 1.0.69", -] - -[[package]] -name = "webrtc-sctp" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5faf3846ec4b7e64b56338d62cbafe084aa79806b0379dff5cc74a8b7a2b3063" -dependencies = [ - "arc-swap", - "async-trait", - "bytes", - "crc", - "log", - "portable-atomic", - "rand 0.8.5", - "thiserror 1.0.69", - "tokio", - "webrtc-util", -] - -[[package]] -name = "webrtc-srtp" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "771db9993712a8fb3886d5be4613ebf27250ef422bd4071988bf55f1ed1a64fa" -dependencies = [ - "aead", - "aes", - "aes-gcm", - "byteorder", - "bytes", - "ctr", - "hmac", - "log", - "rtcp", - "rtp", - "sha1", - "subtle", - "thiserror 1.0.69", - "tokio", - "webrtc-util", -] - -[[package]] -name = "webrtc-util" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1438a8fd0d69c5775afb4a71470af92242dbd04059c61895163aa3c1ef933375" -dependencies = [ - "async-trait", - "bitflags 1.3.2", - "bytes", - "ipnet", - "lazy_static", - "libc", - "log", - "nix", - "portable-atomic", - "rand 0.8.5", - "thiserror 1.0.69", - "tokio", - "winapi", -] - [[package]] name = "widestring" version = "1.2.0" @@ -5033,36 +4349,18 @@ dependencies = [ "zeroize", ] -[[package]] -name = "x509-parser" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" -dependencies = [ - "asn1-rs 0.6.2", - "data-encoding", - "der-parser 9.0.0", - "lazy_static", - "nom", - "oid-registry 0.7.1", - "ring", - "rusticata-macros", - "thiserror 1.0.69", - "time", -] - [[package]] name = "x509-parser" version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4569f339c0c402346d4a75a9e39cf8dad310e287eef1ff56d4c68e5067f53460" dependencies = [ - "asn1-rs 0.7.1", + "asn1-rs", "data-encoding", - "der-parser 10.0.0", + "der-parser", "lazy_static", "nom", - "oid-registry 0.8.1", + "oid-registry", "rusticata-macros", "thiserror 2.0.16", "time", diff --git a/Cargo.toml b/Cargo.toml index 030b14e..82e5039 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ghost-echo" -version = "0.0.2" +version = "0.0.3" edition = "2021" [[bin]] @@ -17,7 +17,6 @@ futures = "0.3.31" futures-timer = "3.0.3" hex = "0.4.3" libp2p = { version = "0.56", features = ["identify", "ping", "tokio", "gossipsub", "macros", "relay", "kad", "rsa", "ed25519", "quic", "request-response", "dns", "memory-connection-limits", "tcp", "noise", "yamux", "autonat", "tls", "dcutr"] } -libp2p-webrtc = { version = "0.9.0-alpha", features = ["tokio", "pem"] } quick-protobuf = "0.8.1" rand = "0.9.0" rand_core = { version = "0.6.4", features = ["getrandom"] } diff --git a/src/bin/main.rs b/src/bin/main.rs index 733cc5d..036e079 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -3,7 +3,6 @@ use ghost_echo::prelude::*; use anyhow::Result; use clap::Parser; use libp2p::{identity, PeerId}; -use libp2p_webrtc::tokio::Certificate; use std::path::{Path, PathBuf}; use tokio::{fs, task::JoinHandle}; use tokio_util::sync::CancellationToken; @@ -20,9 +19,8 @@ async fn main() -> Result<()> { // create a shutdown token let shutdown = CancellationToken::new(); - // load the identity and certificate + // load the local identity let local_key = read_or_create_identity(&opt.local_key_path).await?; - let webrtc_cert = read_or_create_certificate(&opt.local_cert_path).await?; // create the ui and the channels to communicate with it let (mut ui, to_ui, from_ui) = if opt.headless { @@ -32,7 +30,7 @@ async fn main() -> Result<()> { }; // create the peer, connecting it to the ui - let mut peer = Peer::new(local_key, webrtc_cert, to_ui, from_ui, shutdown.clone()).await?; + let mut peer = Peer::new(local_key, to_ui, from_ui, shutdown.clone()).await?; // spawn tasks for both the swarm and the ui let peer_task: JoinHandle> = tokio::spawn(async move { peer.run().await }); @@ -48,26 +46,6 @@ async fn main() -> Result<()> { Ok(()) } -async fn read_or_create_certificate(path: &Path) -> Result { - if path.exists() { - let pem = fs::read_to_string(&path).await?; - - info!("Using existing certificate from {}", path.display()); - - return Ok(Certificate::from_pem(&pem)?); - } - - let cert = Certificate::generate(&mut rand_core::OsRng)?; - fs::write(&path, &cert.serialize_pem().as_bytes()).await?; - - info!( - "Generated new certificate and wrote it to {}", - path.display() - ); - - Ok(cert) -} - async fn read_or_create_identity(path: &Path) -> Result { let mut key_path = PathBuf::from(path); let is_key = key_path diff --git a/src/message.rs b/src/message.rs index 2228d59..c5113d3 100644 --- a/src/message.rs +++ b/src/message.rs @@ -17,9 +17,23 @@ pub enum Message { peers: Vec<(PeerId, Vec)>, }, /// Add a peer - AddPeer(ChatPeer), + AddPeer { + /// Topic name. + topic: Option, + /// Peer id. + chat_peer: ChatPeer, + }, /// Remove a peer - RemovePeer(ChatPeer), + RemovePeer { + /// Topic name. + topic: Option, + /// Peer id. + chat_peer: ChatPeer, + }, /// Add an event message Event(String), + /// Add new topic + AddTopic(String), + /// Remove existing topic + RemoveTopic(String), } diff --git a/src/options.rs b/src/options.rs index ef5605c..ffc33bd 100644 --- a/src/options.rs +++ b/src/options.rs @@ -1,18 +1,39 @@ -use clap::Parser; -use std::{net::IpAddr, path::PathBuf}; +use clap::{Args, Parser, ValueEnum}; +use libp2p::{ + gossipsub::ValidationMode, + identity::Keypair, + kad::{BucketInserts, Caching, StoreInserts}, + Multiaddr, PeerId, +}; +use std::{net::IpAddr, num::NonZero, path::PathBuf, time::Duration}; const LISTEN_ADDR: [&str; 1] = ["0.0.0.0"]; const LOCAL_KEY_PATH: &str = "./local"; -const LOCAL_CERT_PATH: &str = "./cert.pem"; + +/// Available authenticity options. +#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +pub enum GossipAuthenticity { + /// Message signing is enabled, the author will be the owner of the key and the sequence number + /// will be linearly increasing. + Signed, + + /// Message signing is disabled. The specified PeerId will be used as the author of all + /// published messages. The sequence number will be randomized. + Author, + + /// Message signing is disabled. A random PeerId will be used when publishing each message. The + /// sequence number will be randomized. + RandomAuthor, + + /// Message signing is disabled. The author of the message and the sequence numbers are + /// excluded from the message. + Anonymous, +} /// The rust peer command line options #[derive(Debug, Parser)] -#[clap(name = "ghost-gossiper p2p messenger")] +#[clap(author, version, about)] pub struct Options { - /// WebRTC UDP port for the app. - #[clap(long, env, default_value = "0")] - pub webrtc_port: u16, - /// QUIC UDP port for the app. #[clap(long, env, default_value = "0")] pub quic_port: u16, @@ -21,11 +42,11 @@ pub struct Options { #[clap(long, env, default_value = "0")] pub tcp_port: u16, - /// Address to listen on. + /// Addresses to listen on. #[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',', default_values = LISTEN_ADDR)] pub listen_addresses: Vec, - /// If known, the external address of this node. Will be used to correctly advertise our external address across all transports. + /// The external address of this node. Will be used to correctly advertise our external address across all transports. #[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')] pub external_addresses: Vec, @@ -33,19 +54,31 @@ pub struct Options { #[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')] pub connect: Vec, - /// If set, the path to the local certificate file. - #[clap(long, env, default_value = LOCAL_CERT_PATH)] - pub local_cert_path: PathBuf, + /// Bootnodes to be connected to bootstrap the network connectivity. + #[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')] + pub bootnodes: Option>, - /// If set, the path to the local key file. + /// Topics to be subscribed on. + #[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')] + pub topics: Option>, + + /// The path to the local key file. #[clap(long, env, default_value = LOCAL_KEY_PATH)] pub local_key_path: PathBuf, - /// If set, the peer will make autonat client requests (default: true) + /// Connection limits options. + #[command(flatten)] + pub connection_limits_options: ConnectionLimitsOptions, + + /// The peer will make autonat client requests. #[clap(long, env, default_value = "true")] pub autonat_client: bool, - /// If set, the peer will act as an autonat server + /// Autonat client options. + #[command(flatten)] + pub autonat_client_options: AutonatClientOptions, + + /// Make peer act as an autonat server. #[clap(long, env)] pub autonat_server: bool, @@ -53,6 +86,23 @@ pub struct Options { #[clap(long, env, default_value = "true")] pub dcutr: bool, + /// Gossipsub options. + #[command(flatten)] + pub gossipsub_options: GossipsubOptions, + + /// Identify options. + #[command(flatten)] + pub identify_options: IdentifyOptions, + + /// Memory connection limits options. New inbound and outbound connections will be denied + /// when the threshold is reached. + #[command(flatten)] + pub memory_connection_limits_options: MemoryConnectionLimitsOptions, + + /// Ping options. + #[command(flatten)] + pub ping_options: PingOptions, + /// If set, the peer will not initialize the TUI and will run headless. #[clap(long, env)] pub headless: bool, @@ -61,6 +111,10 @@ pub struct Options { #[clap(long, env, default_value = "true")] pub kademlia: bool, + /// Kademlia options. + #[command(flatten)] + pub kademlia_options: KademliaOptions, + /// If set, the peer will support relay client connections (default: true) #[clap(long, env, default_value = "true")] pub relay_client: bool, @@ -68,4 +122,494 @@ pub struct Options { /// If set, the peer will act as a relay server #[clap(long, env)] pub relay_server: bool, + + /// Relay server options. + #[command(flatten)] + pub relay_server_options: RelayServerOptions, + + /// Request response options. + #[command(flatten)] + pub request_response_options: RequestResponseOptions, +} + +/// Connection limits options. +#[derive(Debug, Clone, Copy, Args)] +pub struct ConnectionLimitsOptions { + /// Configure the maximum number of concurrently incoming connections being established. + #[clap(long, env, default_value = "16")] + pub pending_incoming: u32, + + /// Configure the maximum number of concurrently outgoing connections being established. + #[clap(long, env, default_value = "8")] + pub pending_outgoing: u32, + + /// Configure the maximum number of concurrent established inbound connections. + #[clap(long, env, default_value = "25")] + pub established_incoming: u32, + + /// Configure the maximum number of concurrent established outbound connections. + #[clap(long, env, default_value = "25")] + pub established_outgoing: u32, + + /// Configure the maximum number of concurrent established connections (both inbound and + /// outbound). + #[clap(long, env, default_value = "50")] + pub max_established: u32, + + /// Configure the maximum number of concurrent established connections per peer, regardless of + /// direction (incoming or outgoing). + #[clap(long, env, default_value = "1")] + pub established_per_peer: u32, +} + +/// Autonat client options. +#[derive(Debug, Clone, Copy, Args)] +#[group(requires("autonat_client"))] +pub struct AutonatClientOptions { + /// Maximum number of candidate addresses your node will try to verify at once. + #[clap(long, env, default_value = "10")] + pub max_candidates: usize, + + /// Define the time between reachibility probes in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "5")] + pub probe_interval: Duration, +} + +/// Determines if published messages should be signed or not. +#[derive(Debug, Clone, Args)] +pub struct GossipAuthenticityOptions { + /// Determines if published messages should be signed or not. + #[clap(long, env, value_enum, default_value_t = GossipAuthenticity::Signed)] + pub authenticity: GossipAuthenticity, + + /// Peer id to be used for authoring, local peer will be used by default. + #[clap(long, env, value_parser = parse_peer_id)] + pub peer_id: Option, + + /// Keypair to be used for signing, local key will be used by default. + #[clap(long, env, value_parser = parse_keypair)] + pub keypair: Option, +} +/// Gossipsub options. +#[derive(Debug, Clone, Args)] +pub struct GossipsubOptions { + /// Determines if published messages should be signed or not. + #[command(flatten)] + pub authenticity_options: GossipAuthenticityOptions, + + /// Number of heartbeats to keep in the memcache. + #[clap(long, env, default_value = "5")] + pub history_length: usize, + + /// Number of past heartbeats to gossip about. + #[clap(long, env, default_value = "3")] + pub history_gossip: usize, + + /// Target number of peers for the mesh network. + #[clap(long, env, default_value = "6")] + pub mesh_n: usize, + + /// Minimum number of peers in mesh network before adding more. + #[clap(long, env, default_value = "1")] + pub mesh_n_low: usize, + + /// Maximum number of peers in mesh network before removing some. + #[clap(long, env, default_value = "12")] + pub mesh_n_high: usize, + + /// Number of the retained peers that will be high-scoring, while the reminder are chosen + /// randomly. + #[clap(long, env, default_value = "4")] + pub retain_scores: usize, + + /// Minimum number of peers to emit gossip to during a heartbeat. + #[clap(long, env, default_value = "6")] + pub lazy: usize, + + /// Number of peers will emit gossip to at each heartbeat. "Max(gossip_factor * + /// (total_number_non_mesh_peers), gossip_lazy)". + #[clap(long, env, default_value = "0.25")] + pub factor: f64, + + /// Initial delay in each heartbeat in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "5")] + pub heartbeat_initial_delay: Duration, + + /// Time between each heartbeat in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "1")] + pub heartbeat_interval: Duration, + + /// The number of heartbeat ticks until we recheck the connection to explicit peers and + /// reconnecting if necessary. + #[clap(long, env, default_value = "300")] + pub check_explicit_peers_ticks: u64, + + /// Time to live for fanout peers in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "60")] + pub fanout_ttl: Duration, + + /// The maximum byte size for each gossip in bytes. + #[clap(long, env, default_value = "2048")] + pub max_transmit_size: usize, + + /// Time period that messages are stored in the cache in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "60")] + pub duplicate_cache_time: Duration, + + /// Allow message validation before propagation them to peers. + #[clap(long, env, default_value = "false")] + pub validate_messages: bool, + + /// Determines the level of validation used when receiving messages. + #[clap(long, env, value_parser = parse_validation_mode, default_value = "permissive")] + pub validation_mode: ValidationMode, + + /// How long a peer must wait before attempting to graft into out mesh again after being + /// pruned in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "60")] + pub prune_backoff: Duration, + + /// How long to wait before resubscribing to the topic in seconds. + #[clap(long, env, default_value = "10")] + pub unsubscribe_backoff: u64, + + /// Number of heartbeat slots considered as slack for backoff. + #[clap(long, env, default_value = "1")] + pub backoff_slack: u32, + + /// Whether to do flood publishing or not. + #[clap(long, env, default_value = "true")] + pub flood_publish: bool, + + /// Time since the last PRUNE triggers penalty in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "10")] + pub graft_flood_threshold: Duration, + + /// Minimum number of outbound peers in the mesh network before adding more. Should be smaller + /// or equal than "mesh_n / 2" and smaller than "mesh_n_low". + #[clap(long, env, default_value = "2")] + pub mesh_outbound_min: usize, + + /// Number of heartbeat ticks that specify the interval in which opportunistic grafting is + /// applied. + #[clap(long, env, default_value = "60")] + pub opportunistic_graft_ticks: u64, + + /// How many times we will allow a peer to request the same message id through IWANT gossip + /// before we start ignoring them. + #[clap(long, env, default_value = "60")] + pub gossip_retransimission: u32, + + /// The maximum number of new peers to graft to during opportunistic grafting. + #[clap(long, env, default_value = "2")] + pub opportunistic_graft_peers: usize, + + /// The maximum number of messages we will process in a given RPC. + #[clap(long, env)] + pub max_messages_per_rpc: Option, + + /// The maximum number of messages to include in an IHAVE message. + #[clap(long, env, default_value = "5000")] + pub max_ihave_length: usize, + + /// The maximum number of IHAVE messages to accept from a peer within a heartbeat. + #[clap(long, env, default_value = "10")] + pub max_ihave_messages: usize, + + /// Allow messages that are sent to us that has the same message source as we have specified + /// locally. + #[clap(long, env, default_value = "false")] + pub allow_self_origin: bool, + + /// Time to wait for a message requested through IWANT following an IHAVE advertisement in + /// seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "3")] + pub iwant_followup_time: Duration, + + /// Published message ids time cache duration in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "10")] + pub published_message_ids_cache_time: Duration, + + /// The max number of messages handler can buffer. + #[clap(long, env, default_value = "5000")] + pub connection_handler_queue_len: usize, + + /// The duration a message to be published can wait to be sent before it is abandoned in + /// seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "5")] + pub publish_queue_duration: Duration, + + /// The duration a message to be forwarded can wait to be sent before it is abandoned in + /// seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "1")] + pub forward_queue_duration: Duration, + + /// The message size threshold for which IDONTWANT messages are sent in bytes. + #[clap(long, env, default_value = "1024")] + pub idontwant_message_size_threshold: usize, + + /// Send IDONTWANT messages after publishing message on gossip. + #[clap(long, env, default_value = "false")] + pub idontwant_on_publish: bool, +} + +/// Identify options. +#[derive(Debug, Clone, Args)] +pub struct IdentifyOptions { + /// The interval at which identification requests are sent to the remote on established + /// connections after the first request, i.e. the delay between identification requests. + #[clap(long, env, value_parser = parse_seconds, default_value = "300")] + pub identify_interval: Duration, + + /// Whether new or expired listen addresses of the local node should trigger an active push of + /// an identify message to all connected peers. + #[clap(long, env, default_value = "false")] + pub push_listen_addr_updates: bool, + + /// How many entries of discovered peers to keep before we discard the least-recent used one. + #[clap(long, env, default_value = "100")] + pub cache_size: usize, + + /// Whether to include our listen addresses in our responses. If enabled, we will effectively + /// only share our external addresses. + #[clap(long, env, default_value = "false")] + pub hide_listen_addrs: bool, +} + +/// New inbound and outbound connections will be denied when the threshold is reached. +#[derive(Debug, Clone, Args)] +#[group(multiple = false)] +pub struct MemoryConnectionLimitsOptions { + /// Sets the process memory usage threshold in the percentage of the total physical memory. + #[clap(long, env, default_value = "0.9", conflicts_with = "max_bytes")] + pub max_percentage: f64, + + /// Sets the process memory usage threshold in absolute bytes. + #[clap(long, env)] + pub max_bytes: Option, +} + +/// Ping options. +#[derive(Debug, Clone, Args)] +pub struct PingOptions { + /// Sets the ping timeout. + #[clap(long, env, value_parser = parse_seconds, default_value = "20")] + pub ping_timeout: Duration, + + /// Sets the ping interval. + #[clap(long, env, value_parser = parse_seconds, default_value = "15")] + pub ping_interval: Duration, +} + +/// Kademlia options. +#[derive(Debug, Clone, Args)] +#[group(requires = "kademlia")] +pub struct KademliaOptions { + /// Sets the timeout for a single query in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "60")] + pub query_timeout: Duration, + + /// Sets the replication factor to use. + #[clap(long, env, value_parser = parse_non_zero_usize, default_value = "20")] + pub replication_factor: NonZero, + + /// Sets the allowed level of parallelismfor iterative queries. + #[clap(long, env, value_parser = parse_non_zero_usize, default_value = "3")] + pub parallelism: NonZero, + + /// Require iterative queries to use disjoint paths for increased resiliency in the presence of + /// potentially adversarial nodes. When enabled the number of disjoint paths used equals to + /// configured parallelism. + #[clap(long, env, default_value = "false")] + pub disjoint_query_paths: bool, + + /// Sets the TTL for stored records in seconds. The TTL should be significantly longer than the + /// (re-)publication interval, to avoid premature expiration of records. None means records + /// never expired. + #[clap(long, env, value_parser = parse_seconds, default_value = "172800")] + pub record_ttl: Option, + + /// Sets whether or not records should be filtered before being stored. + #[clap(long, env, value_parser = parse_store_inserts, default_value = "unfiltered")] + pub record_filtering: StoreInserts, + + /// Sets the (re-)plication interval for stored seconds. Interval should be significantly + /// shorter than the publication interval, to ensure persistence between re-publications. + #[clap(long, env, value_parser = parse_seconds, default_value = "3600")] + pub record_replication_interval: Option, + + /// Sets the (re-)publication interval of stored records in seconds. This interval should be significantly + /// shorter than the record TTL, to ensure records do not expire prematurely. None means that + /// stored records are never automatically re-published. + #[clap(long, env, value_parser = parse_seconds, default_value = "79200")] + pub record_publication_interval: Option, + + /// Sets the TTL for provider records. None means that stored provider records never expire. + /// Must be significantly larger than the provider publication interval. + #[clap(long, env, value_parser = parse_seconds, default_value = "172800")] + pub provider_record_ttl: Option, + + /// Sets the interval at which provider records for keys provided by the local node are + /// re-published. None means that stored provider records are never automatically re-published. + /// Must be significantly less than the provider record TTL. + #[clap(long, env, value_parser = parse_seconds, default_value = "43200")] + pub provider_publication_interval: Option, + + /// Modifies the timeout duration of outbound substreams in seconds. + #[clap(long, env, value_parser = parse_seconds, default_value = "10")] + pub substreams_timeout: Duration, + + /// Sets the k-bucket insertation strategy for the Kademlia routing table. + #[clap(long, env, value_parser = parse_bucket_inserts, default_value = "onconnected")] + pub kbuckets_inserts: BucketInserts, + + /// Sets the caching strategy to use for succesful lookups. + #[clap(long, env, value_parser = parse_caching, default_value = "1")] + pub caching: Caching, + + /// Sets the interval in seconds on which bootstrap behavious is called periodically. + #[clap(long, env, value_parser = parse_seconds, default_value = "300")] + pub periodic_bootstrap_interval: Option, + + /// Sets the configuration for the k-buckets. Setting a size higher that 20 may imply + /// additional memory allocations. + #[clap(long, env, value_parser = parse_non_zero_usize, default_value = "20")] + pub kbucket_size: NonZero, + + /// Sets the timeout duration after creation of a pending entry after which it becomes eligible + /// for insertation into a full bucket, replacing the least-recent (dis)connected node. + #[clap(long, env, value_parser = parse_seconds, default_value = "60")] + pub kbucket_timeout: Duration, +} + +/// Relay server options. +#[derive(Debug, Clone, Args)] +#[group(requires = "relay_server")] +pub struct RelayServerOptions { + /// The total number of unique peers relay can hold reservations for at any one time. Once + /// reached, new peers cannot use node as a relay. + #[clap(long, env, default_value = "128")] + pub max_reservations: usize, + + /// Limits how many separate reservations a single PeerId can hold. + #[clap(long, env, default_value = "4")] + pub max_reservations_per_peer: usize, + + /// How long a reservation lasts before the peer must renew it. + #[clap(long, env, value_parser = parse_seconds, default_value = "3600")] + pub reservation_duration: Duration, + + /// The total number of active data tunnels node will allow at once. This prevents node from + /// being overwhelmed by traffic. + #[clap(long, env, default_value = "16")] + pub max_circuits: usize, + + /// The maximum number of active tunnels allowed for any single PeerId. + #[clap(long, env, default_value = "4")] + pub max_circuits_per_peer: usize, + + /// A time limit for a single data session in seconds. If the connection stays open longer than + /// this, the relay will force-close it to free up resources. + #[clap(long, env, value_parser = parse_seconds, default_value = "120")] + pub max_circuit_duration: Duration, + + /// A data cap per circuit in bytes. Once this many bytes have been transferred (uplink + + /// downlink), the relay terminates the connection. + #[clap(long, env, default_value = "524288")] + pub max_circuit_bytes: u64, +} + +/// Request response options. +#[derive(Debug, Clone, Args)] +pub struct RequestResponseOptions { + /// Sets the timeout for inbound and outbound requests. + #[clap(long, env, value_parser = parse_seconds, default_value = "10")] + pub request_timeout: Duration, + + /// Sets the upper bound for the number of concurrent inbound + outbound streams. + #[clap(long, env, default_value = "100")] + pub concurrent_streams: usize, +} + +fn parse_seconds(arg: &str) -> Result { + arg.parse::() + .map(Duration::from_secs) + .map_err(|_| format!("{} is not a valid number for seconds", arg)) +} + +//fn parse_seconds(arg: &str) -> Result, String> { +// match arg.to_lowercase().as_str() { +// "" | "none" => Ok(None), +// secs => { +// let duration = parse_seconds(secs)?; +// Ok(Some(duration)) +// } +// } +//} + +fn parse_non_zero_usize(arg: &str) -> Result, String> { + arg.parse::() + .map_err(|_| format!("{} is not a valid number for usize", arg)) + .and_then(|value| { + NonZero::new(value).ok_or_else(|| format!("value for usize should be greater than 0")) + }) +} + +fn parse_store_inserts(arg: &str) -> Result { + match arg.to_lowercase().as_str() { + "unfiltered" => Ok(StoreInserts::Unfiltered), + "filter" | "filterboth" | "filter-both" | "filter_both" => Ok(StoreInserts::FilterBoth), + _ => Err(format!( + "{} is not valid filter strategy, possible values are: unfiltered and filterboth", + arg + )), + } +} + +fn parse_bucket_inserts(arg: &str) -> Result { + match arg.to_lowercase().as_str() { + "manual" => Ok(BucketInserts::Manual), + "onconnected" | "on-connected" | "on_connected" => Ok(BucketInserts::OnConnected), + _ => Err(format!( + "{} is not valid insert strategy, possible values are: manual and onconnected", + arg + )), + } +} +fn parse_caching(arg: &str) -> Result { + match arg.to_lowercase().as_str() { + "manual" => Ok(Caching::Disabled), + _ => arg + .parse::() + .map(|max_peers| Caching::Enabled { max_peers }) + .map_err(|_| { + format!( + "{} is not valid insert strategy, possible values are: manual and onconnected", + arg + ) + }), + } +} + +fn parse_validation_mode(arg: &str) -> Result { + match arg.to_lowercase().as_str() { + "strict" => Ok(ValidationMode::Strict), + "permissive" => Ok(ValidationMode::Permissive), + "anonymous" => Ok(ValidationMode::Anonymous), + "none" => Ok(ValidationMode::None), + _ => Err(format!( + "{} is not valid mode, possible values are: strict, permissive, anonymous, none", + arg + )), + } +} + +fn parse_peer_id(arg: &str) -> Result { + let bytes = hex::decode(arg).map_err(|e| e.to_string())?; + PeerId::from_bytes(&bytes).map_err(|e| e.to_string()) +} + +fn parse_keypair(arg: &str) -> Result { + let bytes = hex::decode(arg).map_err(|e| e.to_string())?; + Keypair::from_protobuf_encoding(&bytes).map_err(|e| e.to_string()) } diff --git a/src/peer.rs b/src/peer.rs index 1b51bc7..ad3c0bb 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,7 +1,6 @@ use crate::{ - decode_unknown_protobuf, ipaddr_to_multiaddr, is_private_ip, pretty_print_fields, - proto::Peer as DiscoveredPeer, split_peer_id, ChatPeer, Codec as FileExchangeCodec, Message, - Options, Request as FileRequest, + ipaddr_to_multiaddr, is_private_ip, options::GossipAuthenticity, proto::Peer as DiscoveredPeer, + split_peer_id, ChatPeer, Codec as FileExchangeCodec, Message, Options, Request as FileRequest, }; use clap::Parser; use futures::StreamExt; @@ -22,11 +21,13 @@ use libp2p::{ identity::{self, PublicKey}, kad::{ store::MemoryStore, AddProviderOk, Behaviour as Kademlia, Config as KademliaConfig, - Event as KademliaEvent, GetClosestPeersOk, GetProvidersOk, QueryId, QueryResult, RecordKey, + Event as KademliaEvent, GetClosestPeersOk, GetProvidersOk, GetRecordOk, QueryId, + QueryResult, RecordKey, }, memory_connection_limits::Behaviour as MemoryConnectionLimits, multiaddr::{Multiaddr, Protocol}, noise::Config as NoiseConfig, + ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent}, relay::{ client::{Behaviour as RelayClient, Event as RelayClientEvent}, Behaviour as RelayServer, Config as RelayServerConfig, Event as RelayServerEvent, @@ -41,8 +42,6 @@ use libp2p::{ yamux::Config as YamuxConfig, PeerId, StreamProtocol, SwarmBuilder, }; -use libp2p_webrtc as webrtc; -use libp2p_webrtc::tokio::Certificate; use quick_protobuf::{BytesReader, MessageRead}; use rand_core::OsRng; use std::{ @@ -56,6 +55,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; // Universal connectivity agent string +// bootstrap_nodes: opts.boo const UNIVERSAL_CONNECTIVITY_AGENT: &str = "universal-connectivity/0.1.0"; // Protocol Names @@ -69,8 +69,6 @@ const GOSSIPSUB_CHAT_TOPIC: &str = "universal-connectivity"; const GOSSIPSUB_CHAT_FILE_TOPIC: &str = "universal-connectivity-file"; const GOSSIPSUB_PEER_DISCOVERY: &str = "universal-connectivity-browser-peer-discovery"; -// Kademlia bootstrap interval -const KADEMLIA_BOOTSTRAP_INTERVAL: u64 = 300; const IPFS_BOOTSTRAP_NODES: [&str; 4] = [ "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", @@ -89,6 +87,7 @@ struct Behaviour { identify: Identify, kademlia: Toggle>, memory_connection_limits: MemoryConnectionLimits, + ping: Ping, relay_client: Toggle, relay_server: Toggle, request_response: RequestResponse, @@ -118,13 +117,17 @@ pub struct Peer { get_providers_query_id: Option, /// The query id for getting the closest peers to the universal connectivity agent string get_closest_peers_query_id: HashSet, + /// The permanent, publicly reachable peers that serve as the entry point for new nodes joining + /// a network. + bootstrap_nodes: Vec, + /// Vector of topics to be listen on. + subscribed_topics: HashSet, } impl Peer { /// Create a new Peer instance by initializing the swarm and peer state pub async fn new( keypair: identity::Keypair, - tls_cert: Certificate, to_ui: Sender, from_ui: Receiver, shutdown: CancellationToken, @@ -134,12 +137,6 @@ impl Peer { let mut listen_addresses = HashSet::new(); for addr in opt.listen_addresses.iter() { - // add the WebRTC address - listen_addresses.insert( - ipaddr_to_multiaddr(addr) - .with(Protocol::Udp(opt.webrtc_port)) - .with(Protocol::WebRTCDirect), - ); // add the QUIC address listen_addresses.insert( ipaddr_to_multiaddr(addr) @@ -158,122 +155,223 @@ impl Peer { // keep them as Strings because they can be PeerId's or Multiaddr's let to_dial = opt.connect; + // topics to be listen on after start + let subscribed_topics: HashSet = opt + .topics + .map(|topics| topics.into_iter().collect()) + .unwrap_or_else(|| { + let mut set = HashSet::new(); + set.insert(GOSSIPSUB_CHAT_TOPIC.to_string()); + set.insert(GOSSIPSUB_CHAT_FILE_TOPIC.to_string()); + set.insert(GOSSIPSUB_PEER_DISCOVERY.to_string()); + set + }); + + // provided list of boostrap bootnodes + let bootstrap_nodes = opt.bootnodes.unwrap_or_else(|| { + IPFS_BOOTSTRAP_NODES + .iter() + .filter_map(|s| s.parse().ok()) + .collect() + }); + // initialize the swarm let swarm = { let local_peer_id = PeerId::from(keypair.public()); debug!("Local peer id: {local_peer_id}"); // Initialize the autonat client behaviour - let autonat_client = if opt.autonat_client { - let cfg = AutonatClientConfig::default(); - Some(AutonatClient::new(OsRng, cfg)) - } else { - None - } - .into(); + let autonat_client = opt + .autonat_client + .then(|| { + let cfg = AutonatClientConfig::default() + .with_max_candidates(opt.autonat_client_options.max_candidates) + .with_probe_interval(opt.autonat_client_options.probe_interval); + AutonatClient::new(OsRng, cfg) + }) + .into(); - // Initialize the autonat server behaviour - let autonat_server = if opt.autonat_server { - Some(AutonatServer::new(OsRng)) - } else { - None - } - .into(); + // initialize the autonat server behaviour + let autonat_server = opt.autonat_server.then(|| AutonatServer::new(OsRng)).into(); - // Create the ConnectionLimits behaviour + // create the ConnectionLimits behaviour let connection_limits = { let cfg = connection_limits::ConnectionLimits::default() - .with_max_pending_incoming(Some(100)) - .with_max_pending_outgoing(Some(100)) - .with_max_established_per_peer(Some(10)) - .with_max_established(Some(1000)); + .with_max_pending_incoming(Some(opt.connection_limits_options.pending_incoming)) + .with_max_pending_outgoing(Some(opt.connection_limits_options.pending_outgoing)) + .with_max_established_incoming(Some( + opt.connection_limits_options.established_incoming, + )) + .with_max_established_outgoing(Some( + opt.connection_limits_options.established_outgoing, + )) + .with_max_established_per_peer(Some( + opt.connection_limits_options.established_per_peer, + )) + .with_max_established(Some(opt.connection_limits_options.max_established)); ConnectionLimits::new(cfg) }; - // Create the Dcutr behaviour - let dcutr = if opt.dcutr { - Some(Dcutr::new(local_peer_id)) - } else { - None - } - .into(); + // create the Dcutr behaviour + let dcutr = opt.dcutr.then(|| Dcutr::new(local_peer_id)).into(); - // Create a gossipsub behaviour + // create a gossipsub behaviour let gossipsub = { - // This closure creates a unique message id for each message by hashing its contents + // this closure creates a unique message id for each message by hashing its contents let message_id_fn = |message: &GossipsubMessage| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); GossipsubMessageId::from(s.finish().to_string()) }; - // Set a custom gossipsub configuration + // set a custom gossipsub configuration let gossipsub_config = gossipsub::ConfigBuilder::default() - // This sets the kind of message validation. The default is Strict (enforce message signing) - .validation_mode(gossipsub::ValidationMode::Permissive) - // This ensures no two messages of the same content will be propagated. .message_id_fn(message_id_fn) - .mesh_outbound_min(1) - .mesh_n_low(1) - .flood_publish(true) + .history_length(opt.gossipsub_options.history_length) + .history_gossip(opt.gossipsub_options.history_gossip) + .mesh_n(opt.gossipsub_options.mesh_n) + .mesh_n_low(opt.gossipsub_options.mesh_n_low) + .mesh_n_high(opt.gossipsub_options.mesh_n_high) + .retain_scores(opt.gossipsub_options.retain_scores) + .gossip_lazy(opt.gossipsub_options.lazy) + .gossip_factor(opt.gossipsub_options.factor) + .heartbeat_initial_delay(opt.gossipsub_options.heartbeat_initial_delay) + .check_explicit_peers_ticks(opt.gossipsub_options.check_explicit_peers_ticks) + .fanout_ttl(opt.gossipsub_options.fanout_ttl) + .max_transmit_size(opt.gossipsub_options.max_transmit_size) + .duplicate_cache_time(opt.gossipsub_options.duplicate_cache_time) + .validation_mode(opt.gossipsub_options.validation_mode) + .prune_backoff(opt.gossipsub_options.prune_backoff) + .unsubscribe_backoff(opt.gossipsub_options.unsubscribe_backoff) + .backoff_slack(opt.gossipsub_options.backoff_slack) + .flood_publish(opt.gossipsub_options.flood_publish) + .graft_flood_threshold(opt.gossipsub_options.graft_flood_threshold) + .mesh_outbound_min(opt.gossipsub_options.mesh_outbound_min) + .opportunistic_graft_ticks(opt.gossipsub_options.opportunistic_graft_ticks) + .gossip_retransimission(opt.gossipsub_options.gossip_retransimission) + .opportunistic_graft_peers(opt.gossipsub_options.opportunistic_graft_peers) + .max_messages_per_rpc(opt.gossipsub_options.max_messages_per_rpc) + .max_ihave_length(opt.gossipsub_options.max_ihave_length) + .max_ihave_messages(opt.gossipsub_options.max_ihave_messages) + .allow_self_origin(opt.gossipsub_options.allow_self_origin) + .iwant_followup_time(opt.gossipsub_options.iwant_followup_time) + .idontwant_on_publish(opt.gossipsub_options.idontwant_on_publish) + .idontwant_message_size_threshold( + opt.gossipsub_options.idontwant_message_size_threshold, + ) + .forward_queue_duration(opt.gossipsub_options.forward_queue_duration) + .publish_queue_duration(opt.gossipsub_options.publish_queue_duration) + .connection_handler_queue_len( + opt.gossipsub_options.connection_handler_queue_len, + ) + .published_message_ids_cache_time( + opt.gossipsub_options.published_message_ids_cache_time, + ) .build() - .expect("Valid config"); + .expect("gossipsub config should be valid"); + + let auth_opts = opt.gossipsub_options.authenticity_options; + let gossipsub_authenticity = match auth_opts.authenticity { + GossipAuthenticity::Signed => gossipsub::MessageAuthenticity::Signed( + auth_opts.keypair.unwrap_or(keypair.clone()), + ), + GossipAuthenticity::Author => gossipsub::MessageAuthenticity::Author( + auth_opts.peer_id.unwrap_or(local_peer_id), + ), + GossipAuthenticity::RandomAuthor => { + gossipsub::MessageAuthenticity::RandomAuthor + } + GossipAuthenticity::Anonymous => gossipsub::MessageAuthenticity::Anonymous, + }; // build a gossipsub network behaviour - Gossipsub::new( - gossipsub::MessageAuthenticity::Signed(keypair.clone()), - gossipsub_config, - ) - .expect("Correct configuration") + Gossipsub::new(gossipsub_authenticity, gossipsub_config) + .expect("gossipsub could not be instantiated") }; - // Create an Identify behaviour + // create an Identify behaviour let identify = { - let cfg = IdentifyConfig::new( - IPFS_IDENTIFY_PROTOCOL_NAME.to_string(), // bug: https://github.com/libp2p/rust-libp2p/issues/5940 - keypair.public(), - ) - .with_agent_version(UNIVERSAL_CONNECTIVITY_AGENT.to_string()); + let cfg = + IdentifyConfig::new(IPFS_IDENTIFY_PROTOCOL_NAME.to_string(), keypair.public()) + .with_agent_version(UNIVERSAL_CONNECTIVITY_AGENT.to_string()) + .with_interval(opt.identify_options.identify_interval) + .with_push_listen_addr_updates( + opt.identify_options.push_listen_addr_updates, + ) + .with_cache_size(opt.identify_options.cache_size) + .with_hide_listen_addrs(opt.identify_options.hide_listen_addrs); Identify::new(cfg) }; - // Create a Kademlia behaviour - let kademlia: Toggle> = if opt.kademlia { - let mut cfg = KademliaConfig::new(IPFS_KADEMLIA_PROTOCOL_NAME); - cfg.set_query_timeout(Duration::from_secs(60)); - cfg.set_periodic_bootstrap_interval(Some(Duration::from_secs( - KADEMLIA_BOOTSTRAP_INTERVAL, - ))); - let store = MemoryStore::new(local_peer_id); - Some(Kademlia::with_config(local_peer_id, store, cfg)) - } else { - None - } - .into(); + // create ping behaviour + let ping = { + let cfg = PingConfig::new() + .with_interval(opt.ping_options.ping_interval) + .with_timeout(opt.ping_options.ping_timeout); + Ping::new(cfg) + }; - // Create the MemoryConnectionLimits behaviour - let memory_connection_limits = MemoryConnectionLimits::with_max_percentage(0.9); + // create a Kademlia behaviour + let kademlia: Toggle> = opt + .kademlia + .then(|| { + let kad_opts = opt.kademlia_options; + let mut cfg = KademliaConfig::new(IPFS_KADEMLIA_PROTOCOL_NAME); + cfg.set_query_timeout(kad_opts.query_timeout); + cfg.set_replication_factor(kad_opts.replication_factor); + cfg.set_parallelism(kad_opts.parallelism); + cfg.disjoint_query_paths(kad_opts.disjoint_query_paths); + cfg.set_record_ttl(kad_opts.record_ttl); + cfg.set_record_filtering(kad_opts.record_filtering); + cfg.set_replication_interval(kad_opts.record_replication_interval); + cfg.set_publication_interval(kad_opts.record_publication_interval); + cfg.set_provider_record_ttl(kad_opts.provider_record_ttl); + cfg.set_provider_publication_interval(kad_opts.provider_publication_interval); + cfg.set_substreams_timeout(kad_opts.substreams_timeout); + cfg.set_kbucket_inserts(kad_opts.kbuckets_inserts); + cfg.set_periodic_bootstrap_interval(kad_opts.periodic_bootstrap_interval); + cfg.set_kbucket_size(kad_opts.kbucket_size); + cfg.set_kbucket_pending_timeout(kad_opts.kbucket_timeout); + + let store = MemoryStore::new(local_peer_id); + Kademlia::with_config(local_peer_id, store, cfg) + }) + .into(); + + // create the MemoryConnectionLimits behaviour + let mem_conn_limits = opt.memory_connection_limits_options; + let memory_connection_limits = if let Some(max_bytes) = mem_conn_limits.max_bytes { + MemoryConnectionLimits::with_max_bytes(max_bytes) + } else { + MemoryConnectionLimits::with_max_percentage(mem_conn_limits.max_percentage) + }; // Create the RelayServer behaviour - let relay_server = if opt.relay_server { - let cfg = RelayServerConfig { - max_reservations: usize::MAX, - max_reservations_per_peer: 100, - reservation_rate_limiters: Vec::default(), - circuit_src_rate_limiters: Vec::default(), - max_circuits: usize::MAX, - max_circuits_per_peer: 100, - ..Default::default() - }; - Some(RelayServer::new(local_peer_id, cfg)) - } else { - None - } - .into(); + let relay_server = opt + .relay_server + .then(|| { + let relay_opt = opt.relay_server_options; + let cfg = RelayServerConfig { + max_reservations: relay_opt.max_reservations, + max_reservations_per_peer: relay_opt.max_reservations_per_peer, + reservation_duration: relay_opt.reservation_duration, + reservation_rate_limiters: Vec::default(), + + max_circuits: relay_opt.max_circuits, + max_circuits_per_peer: relay_opt.max_circuits_per_peer, + max_circuit_duration: relay_opt.max_circuit_duration, + max_circuit_bytes: relay_opt.max_circuit_bytes, + circuit_src_rate_limiters: Vec::default(), + }; + RelayServer::new(local_peer_id, cfg) + }) + .into(); // Create the RequestResponse behaviour let request_response = { - let cfg = RequestResponseConfig::default(); + let cfg = RequestResponseConfig::default() + .with_request_timeout(opt.request_response_options.request_timeout) + .with_max_concurrent_streams(opt.request_response_options.concurrent_streams); RequestResponse::new([(FILE_EXCHANGE_PROTOCOL_NAME, ProtocolSupport::Full)], cfg) }; @@ -287,26 +385,21 @@ impl Peer { identify, kademlia, memory_connection_limits, + ping, relay_client: None.into(), relay_server, request_response, }; - // Build the swarm + // build the swarm let sb = SwarmBuilder::with_existing_identity(keypair.clone()) .with_tokio() .with_tcp( TcpConfig::new().nodelay(true), - (TlsConfig::new, NoiseConfig::new), // passes the keypair to the constructors + (TlsConfig::new, NoiseConfig::new), YamuxConfig::default, )? .with_quic() - .with_other_transport(|id_keys| { - Ok(webrtc::tokio::Transport::new( - id_keys.clone(), - tls_cert.clone(), - )) - })? .with_dns()?; // if we are to be a relay client, add the relay client behaviour @@ -330,6 +423,8 @@ impl Peer { from_ui, shutdown, swarm, + bootstrap_nodes, + subscribed_topics, bootstrap_query_id: None, start_providing_query_id: None, get_providers_query_id: None, @@ -356,7 +451,7 @@ impl Peer { /// Run the Peer pub async fn run(&mut self) -> anyhow::Result<()> { - // Listen on the given addresses + // listen on the given addresses let addrs: Vec = self.listen_addresses.iter().cloned().collect(); for addr in addrs.iter() { if let Err(e) = self.swarm.listen_on(addr.clone()) { @@ -364,21 +459,21 @@ impl Peer { } } - // Set the external address if passed in + // set the external address if passed in let addrs: Vec = self.external_addresses.drain().collect(); for addr in addrs.iter() { self.update_external_address(addr).await?; } - // Dial the given addresses...they can be PeerId's or Multiaddr's + // dial the given addresses, they can be PeerId's or Multiaddr's for addr in self.to_dial.clone().iter() { if let Ok(addr) = addr.parse::() { // attempt to dial the address - if let Err(e) = self.swarm.dial(addr.clone()) { - self.msg(format!("Failed to dial {addr}: {e}")).await?; - } else { - self.msg(format!("Dialed {addr}")).await?; + match self.swarm.dial(addr.clone()) { + Ok(_) => self.msg(format!("Successfully dialed {addr}")), + Err(e) => self.msg(format!("Failed to dial {addr}: {e}")), } + .await?; // add the address to the kademlia routing table if it is enabled if let Some((multiaddr, peerid)) = split_peer_id(addr) { @@ -386,26 +481,22 @@ impl Peer { kad.add_address(&peerid, multiaddr); } } - } else if let Ok(addr) = addr.parse::() { + } + + if let Ok(addr) = addr.parse::() { // attempt to dial the address - if let Err(e) = self.swarm.dial(addr) { - self.msg(format!("Failed to dial {addr}: {e}")).await?; - } else { - self.msg(format!("Dialed {addr}")).await?; + match self.swarm.dial(addr) { + Ok(_) => self.msg(format!("Successfully dialed {addr}")), + Err(e) => self.msg(format!("Failed to dial {addr}: {e}")), } - } else { - self.msg(format!("Failed to parse {addr}")).await?; + .await?; } } // initiate a bootstrap of kademlia if it is enabled if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { // parse the bootstrap multiaddrs - let bootstrappers: Vec = IPFS_BOOTSTRAP_NODES - .iter() - .filter_map(|s| s.parse().ok()) - .collect(); - for addr in bootstrappers.iter() { + for addr in self.bootstrap_nodes.iter() { if let Some((multiaddr, peerid)) = split_peer_id(addr.clone()) { kad.add_address(&peerid, multiaddr); } @@ -419,54 +510,47 @@ impl Peer { } Err(e) => { self.msg(format!("Failed to bootstrap Kademlia: {e}")) - .await?; - self.msg(format!( - "Don't worry, it will try again in {KADEMLIA_BOOTSTRAP_INTERVAL} seconds" - )) - .await?; + .await? } } } - // Initialize the gossipsub topics, the hashes are the same as the topic names + // initialize the gossipsub topics, the hashes are the same as the topic names let chat_topic = GossipsubIdentTopic::new(GOSSIPSUB_CHAT_TOPIC); let file_topic = GossipsubIdentTopic::new(GOSSIPSUB_CHAT_FILE_TOPIC); let peer_discovery = GossipsubIdentTopic::new(GOSSIPSUB_PEER_DISCOVERY); - // Subscribe to the gossipsub topics - info!("Subscribing to topics"); - for topic in [ - chat_topic.clone(), - file_topic.clone(), - peer_discovery.clone(), - ] { - if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) { + // subscribe to the gossipsub topics + info!("Subscribing to {} topics.", self.subscribed_topics.len()); + for topic in self.subscribed_topics.iter() { + let ident_topic = GossipsubIdentTopic::new(topic); + if let Err(e) = self.swarm.behaviour_mut().gossipsub.subscribe(&ident_topic) { debug!("Failed to subscribe to topic {topic}: {e}"); } } - // Create our loop ticker + // create our loop ticker let mut tick = tokio::time::interval(Duration::from_millis(18)); - // Run the main loop + // run the main loop loop { // process messages from the UI if let Ok(message) = self.from_ui.try_recv() { match message { - Message::Chat { data, .. } => { - error!("chat received"); + Message::Chat { data, from } => { + debug!("New message from {:?} received", from); match self .swarm .behaviour_mut() .gossipsub .publish(chat_topic.hash(), data) { - Err(e) => debug!("Failed to publish chat message: {e}"), + Err(e) => debug!("Failed to publish chat message from {:?}: {e}", from), _ => self.msg("Sent chat message from you".to_string()).await?, } } Message::AllPeers { .. } => { - error!("all peers received"); + debug!("All peers received"); let peers = self .swarm .behaviour() @@ -479,6 +563,34 @@ impl Peer { .collect(); self.to_ui.send(Message::AllPeers { peers }).await?; } + Message::AddTopic(topic) => { + let ident_topic = GossipsubIdentTopic::new(&topic); + match self.swarm.behaviour_mut().gossipsub.subscribe(&ident_topic) { + Ok(true) => { + self.subscribed_topics.insert(topic); + debug!("Subscribed to new topic '{ident_topic}'"); + } + Ok(false) => error!("Already subscribe to the topic '{topic}'"), + Err(error) => error!( + "Error occured during topic subscribion: {:#}", + anyhow::Error::from(error) + ), + } + } + Message::RemoveTopic(topic) => { + let ident_topic = GossipsubIdentTopic::new(&topic); + if self + .swarm + .behaviour_mut() + .gossipsub + .unsubscribe(&ident_topic) + { + self.subscribed_topics.remove(&topic); + debug!("Unsubscribed from the topic '{topic}'"); + } else { + error!("Unsubscribed unexisting topic '{ident_topic}'"); + } + } _ => { debug!("Unhandled message: {:?}", message); } @@ -488,97 +600,149 @@ impl Peer { tokio::select! { _ = self.shutdown.cancelled() => { info!("Unsubscribing from topics"); - // Subscribe to the gossipsub topics + // subscribe to the gossipsub topics for topic in &[chat_topic, file_topic, peer_discovery] { if !self.swarm.behaviour_mut().gossipsub.unsubscribe(topic) { debug!("Failed to unsubscribe from topic {topic}"); } } - info!("Shutting down the peer"); + info!("Shutting down the node peer"); break; } _ = tick.tick() => {} Some(event) = self.swarm.next() => match event { + // when a connection to a peer is closed + SwarmEvent::ConnectionClosed { peer_id, cause, num_established, connection_id, .. } => { + debug!("Connection to {peer_id} on {connection_id} closed remaining {num_established}: {cause:?}"); + self.to_ui.send(Message::RemovePeer{chat_peer: peer_id.into(), topic: None}).await?; - // When the swarm in initiates a dial - SwarmEvent::Dialing { peer_id, .. } => { - let peer_id = peer_id.map_or("Unknown".to_string(), |peer_id| peer_id.to_string()); - debug!("Dialing {peer_id}"); + if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { + if num_established == 0 { + kad.remove_peer(&peer_id); + } + debug!("Removed {peer_id} from the routing table (if it was in there)."); + } } - // When we have confirmed our external address + // when we successfully connect to a peer + SwarmEvent::ConnectionEstablished { peer_id, connection_id, established_in, .. } => { + debug!("Connected to {peer_id} on connection {connection_id} in {established_in:?}"); + } + + // when the swarm in initiates a dial + SwarmEvent::Dialing { peer_id, connection_id } => { + let peer_id = peer_id.map_or("Unknown".to_string(), |peer_id| peer_id.to_string()); + debug!("Dialing {peer_id} on connection {connection_id}"); + } + + // when a listener has reported the expiration of a listening address + SwarmEvent::ExpiredListenAddr { listener_id, address } => { + debug!("Connection expired for {address} with {listener_id}"); + } + + // when we have confirmed our external address SwarmEvent::ExternalAddrConfirmed { address } => { let p2p_address = address .clone() .with(Protocol::P2p(*self.swarm.local_peer_id())); - self.msg(format!("Confirmed external address: {p2p_address}")).await?; + self.msg(format!("Confirmed external address for current node: {p2p_address}")).await?; } - // When we successfully listen on an address + // when an external address of the local node expired, i.e. is no-longer + // confirmed + SwarmEvent::ExternalAddrExpired { address } => { + let p2p_address = address + .clone() + .with(Protocol::P2p(*self.swarm.local_peer_id())); + self.msg(format!("External address for current node is no-longer confirmed: {p2p_address}")).await?; + } + + // when a new connection arrived on a listener and is in the process of + // protocol negotiation. + SwarmEvent::IncomingConnection { connection_id, local_addr, .. } => { + debug!("New connection {connection_id} arrived on a local listener {local_addr}"); + } + + // when we fail to accept a connection from a peer + SwarmEvent::IncomingConnectionError { error, connection_id, peer_id, .. } => { + let peer_id = peer_id.map_or("Unknown".to_string(), |peer_id| peer_id.to_string()); + debug!("Failed to handle incoming connection {connection_id} from {peer_id}: {:#}", anyhow::Error::from(error)) + } + + // when a one of the listeners gracefully closed + SwarmEvent::ListenerClosed { listener_id, addresses, .. } => { + let len = addresses.len(); + debug!("Listener {listener_id} gracefully closed with {len} connections"); + } + + // when on of the listeners reported a non-fatal error + SwarmEvent::ListenerError { listener_id, error } => { + debug!("Listener {listener_id} reported a non-fatal error: {:#}", anyhow::Error::from(error)); + } + + // when we've discovered a new candidate for an external address for us + SwarmEvent::NewExternalAddrCandidate { address } => { + debug!("New external peer candidate found {address}"); + } + + // when we've discovered a new address of a peer + SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { + debug!("New address {address} for peer {peer_id} is discovered"); + } + + // when we successfully listen on an address SwarmEvent::NewListenAddr { address, .. } => { let p2p_address = address .clone() .with(Protocol::P2p(*self.swarm.local_peer_id())); - self.msg(format!("Listening on {p2p_address}")) - .await?; + self.msg(format!("New listening address on {p2p_address}")).await?; } - // When we successfully connect to a peer - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - debug!("Connected to {peer_id}"); + // when we fail to connect to a peer, + SwarmEvent::OutgoingConnectionError { peer_id, error, connection_id } => { + let peer_id = peer_id.map_or("Unknown".to_string(), |peer_id| peer_id.to_string()); + debug!("Failed to dial {peer_id} on connection {connection_id}: {error}"); } - // When we fail to connect to a peer - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - warn!("Failed to dial {peer_id:?}: {error}"); - } - - // When we fail to accept a connection from a peer - SwarmEvent::IncomingConnectionError { error, .. } => { - warn!("{:#}", anyhow::Error::from(error)) - } - - // When a connection to a peer is closed - SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { - warn!("Connection to {peer_id} closed: {cause:?}"); - self.to_ui.send(Message::RemovePeer(peer_id.into())).await?; - - if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { - kad.remove_peer(&peer_id); - info!("Removed {peer_id} from the routing table (if it was in there)."); - } - } - - // When we receive an autonat client event + // when we receive an autonat client event SwarmEvent::Behaviour(BehaviourEvent::AutonatClient(AutonatClientEvent { tested_addr, server, result, .. })) => { let result = result.map(|_| "Ok".to_string()).unwrap_or_else(|e| e.to_string()); debug!("NAT test to {tested_addr} with {server}: {result}"); } - // When we receive an autonat server event + // when we receive an autonat server event SwarmEvent::Behaviour(BehaviourEvent::AutonatServer(AutonatServerEvent { tested_addr, client, result, .. })) => { let result = result.map(|_| "Ok".to_string()).unwrap_or_else(|e| e.to_string()); self.msg(format!("NAT tested {tested_addr} to {client}: {result}")).await?; } - // When we receive a dcutr event + // when we receive a dcutr event SwarmEvent::Behaviour(BehaviourEvent::Dcutr(DcutrEvent { remote_peer_id, result })) => { let result = result.map(|_| "Ok".to_string()).unwrap_or_else(|e| e.to_string()); - self.msg(format!("Dcutr connection to {remote_peer_id}: {result}")).await?; + self.msg(format!("DCUtR connection to {remote_peer_id}: {result}")).await?; } - // When we receive a gossipsub event + // when we receive a ping event + SwarmEvent::Behaviour(BehaviourEvent::Ping(PingEvent { peer, connection, result })) => { + let result = result.map(|_| "Ok".to_string()).unwrap_or_else(|e| e.to_string()); + self.msg(format!("Ping to {peer} with connection {connection}: {result}")).await?; + } + + // when we receive a gossipsub event SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(event)) => match event { - GossipsubEvent::Message { .. } => { + GossipsubEvent::Message { propagation_source, .. } => { let msg = UniversalConnectivityMessage::try_from(event)?; - self.msg(format!("{msg}")).await?; + self.msg(format!("New gossipsub message from {propagation_source}: {msg}")).await?; match msg { - UniversalConnectivityMessage::Chat { from, data, ..} => { + UniversalConnectivityMessage::Chat { from, data, topic, ..} => { self.to_ui.send(Message::Chat{from, data}).await?; if let Some(peer) = from { - self.to_ui.send(Message::AddPeer(peer)).await?; + self.to_ui.send(Message::AddPeer{ + chat_peer: peer, + topic: Some(topic.into_string()), + }).await?; } } UniversalConnectivityMessage::File { from, data, .. } => { @@ -586,14 +750,12 @@ impl Peer { if let Some(peer) = from { self.swarm.behaviour_mut().request_response.send_request( &peer.into(), - FileRequest { - file_id: file_id.clone(), - }, + FileRequest { file_id: file_id.clone() }, ); self.msg(format!("Sent file request to {peer} for {file_id}")).await?; } } - UniversalConnectivityMessage::PeerDiscovery { discovered_peer, discovered_addrs, .. } => { + UniversalConnectivityMessage::PeerDiscovery { discovered_peer, discovered_addrs, topic, .. } => { let mut msg = discovered_peer .map_or("\tDialing: Unknown".to_string(), |discovered_peer| { format!("\tDialing: {} ({})", discovered_peer.id(), discovered_peer) @@ -608,23 +770,27 @@ impl Peer { } self.msg(msg).await?; if let Some(peer) = discovered_peer { - self.to_ui.send(Message::AddPeer(peer)).await?; + self.to_ui.send(Message::AddPeer { + chat_peer: peer, + topic: Some(topic.into_string()), + }).await?; } } - _ => {} } } GossipsubEvent::Subscribed { peer_id, topic } => { debug!("{peer_id} subscribed to {topic}"); - if topic.as_str() == GOSSIPSUB_CHAT_TOPIC { - self.to_ui.send(Message::AddPeer(peer_id.into())).await?; - } + self.to_ui.send(Message::AddPeer { + chat_peer: peer_id.into(), + topic: Some(topic.into_string()), + }).await?; } GossipsubEvent::Unsubscribed { peer_id, topic } => { debug!("{peer_id} unsubscribed from {topic}"); - if topic.as_str() == GOSSIPSUB_CHAT_TOPIC { - self.to_ui.send(Message::RemovePeer(peer_id.into())).await?; - } + self.to_ui.send(Message::RemovePeer { + chat_peer: peer_id.into(), + topic: Some(topic.into_string()), + }).await?; } GossipsubEvent::GossipsubNotSupported { peer_id } => { warn!("{peer_id} does not support gossipsub"); @@ -634,14 +800,17 @@ impl Peer { } } - // When we receive an identify event + // when we receive an identify event SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => match event { - IdentifyEvent::Received { info, .. } => { - //self.update_external_address(&info.observed_addr).await?; + IdentifyEvent::Received { info, peer_id, .. } => { + let agent = format!("{} version: {}", info.agent_version, info.protocol_version); if info.agent_version == UNIVERSAL_CONNECTIVITY_AGENT { - let peer_id: PeerId = info.public_key.into(); - let agent = format!("{} version: {}", info.agent_version, info.protocol_version); - let protocols = info.protocols.iter().map(|p| format!("\n\t\t{p}") ).collect::>().join(""); + let protocols = info.protocols + .iter() + .map(|p| format!("\n\t\t{p}")) + .collect::>() + .join(""); + self.msg(format!("Identify {peer_id}:\n\tagent: {agent}\n\tprotocols: {protocols}")).await?; for addr in info.listen_addrs.iter() { if !is_private_ip(addr) { @@ -650,25 +819,30 @@ impl Peer { } } } + } else { + debug!("Peer {peer_id} is on wrong network. {agent}"); } } - IdentifyEvent::Sent { .. } => { - debug!("identify::Event::Sent"); + IdentifyEvent::Sent { peer_id, connection_id } => { + debug!("Identification of the local node has been sent to a peer {peer_id} on {connection_id}"); } - IdentifyEvent::Pushed { .. } => { - debug!("identify::Event::Pushed"); + IdentifyEvent::Pushed { peer_id, connection_id, .. } => { + debug!("Identification of the local node has been actively pushed to peer {peer_id} on {connection_id}"); } IdentifyEvent::Error { peer_id, error, .. } => { match error { libp2p::swarm::StreamUpgradeError::Timeout => { - // When a browser tab closes, we don't get a swarm event + // when a browser tab closes, we don't get a swarm event // maybe there's a way to get this with TransportEvent // but for now remove the peer from routing table if there's an Identify timeout if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { kad.remove_peer(&peer_id); info!("Removed {peer_id} from the routing table (if it was in there)."); } - self.to_ui.send(Message::RemovePeer(peer_id.into())).await?; + self.to_ui.send(Message::RemovePeer { + chat_peer: peer_id.into(), + topic: None, + }).await?; } _ => { debug!("{error}"); @@ -677,7 +851,7 @@ impl Peer { } } - // When we receive a kademlia event + // when we receive a kademlia event SwarmEvent::Behaviour(BehaviourEvent::Kademlia(event)) => match event { KademliaEvent::OutboundQueryProgressed { id, result, step, .. } => match result { QueryResult::Bootstrap(result) => { @@ -687,7 +861,7 @@ impl Peer { Ok(bootstrap) => { if step.last { self.bootstrap_query_id = None; - self.msg("Kademlia bootstrapped".to_string()).await?; + self.msg("Kademlia successfully bootstrapped".to_string()).await?; let mut msgs = Vec::new(); if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { @@ -717,20 +891,14 @@ impl Peer { if self.get_closest_peers_query_id.contains(&id) { match result { Ok(GetClosestPeersOk { peers, .. }) => { - //if step.last { - self.get_closest_peers_query_id.remove(&id); - self.msg(format!("Kademlia {} potential universal connectivity peers:", peers.len())).await?; - for peer in peers.iter().cloned() { - self.msg(format!("\t{}:", peer.peer_id)).await?; - for addr in peer.addrs.iter().take(1) { - self.msg(format!("\t\t{addr}")).await?; - } + self.get_closest_peers_query_id.remove(&id); + self.msg(format!("Kademlia {} potential universal connectivity peers:", peers.len())).await?; + for peer in peers.iter().cloned() { + self.msg(format!("\t{}:", peer.peer_id)).await?; + for addr in peer.addrs.iter().take(1) { + self.msg(format!("\t\t{addr}")).await?; } - /* - } else { - self.msg(format!("Kademlia getting closest peers: {}", peers.len())).await?; } - */ } Err(e) => { self.get_closest_peers_query_id.remove(&id); @@ -744,45 +912,31 @@ impl Peer { if id == query_id { match result { Ok(GetProvidersOk::FoundProviders { providers, .. }) => { - //if step.last { - self.get_providers_query_id = None; - let mut msgs = Vec::new(); - if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { - let peers: Vec = providers.iter().cloned().collect(); - msgs.push(format!("Kademlia {} found providers", peers.len())); - for peer in peers.iter().cloned() { - self.get_closest_peers_query_id.insert(kad.get_closest_peers(peer)); - } + self.get_providers_query_id = None; + let mut msgs = Vec::new(); + if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { + let peers: Vec = providers.iter().cloned().collect(); + msgs.push(format!("Kademlia found {} new providers", peers.len())); + for peer in peers.iter().cloned() { + self.get_closest_peers_query_id.insert(kad.get_closest_peers(peer)); } - for msg in msgs.iter() { - self.msg(msg).await?; - } - /* - } else { - self.get_providers_query_id = None; - self.msg(format!("Kademlia found getting providers: {}", providers.len())).await?; } - */ + for msg in msgs.iter() { + self.msg(msg).await?; + } } Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => { - //if step.last { - self.get_providers_query_id = None; - let mut msgs = Vec::new(); - if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { - msgs.push(format!("Kademlia {} found providers", closest_peers.len())); - for peer in closest_peers.iter().cloned() { - self.get_closest_peers_query_id.insert(kad.get_closest_peers(peer)); - } + self.get_providers_query_id = None; + let mut msgs = Vec::new(); + if let Some(ref mut kad) = self.swarm.behaviour_mut().kademlia.as_mut() { + msgs.push(format!("Kademlia no additional records, {} known peers", closest_peers.len())); + for peer in closest_peers.iter().cloned() { + self.get_closest_peers_query_id.insert(kad.get_closest_peers(peer)); } - for msg in msgs.iter() { - self.msg(msg).await?; - } - /* - } else { - self.get_providers_query_id = None; - self.msg(format!("Kademlia finished getting providers: {}", closest_peers.len())).await?; } - */ + for msg in msgs.iter() { + self.msg(msg).await?; + } } Err(e) => { self.get_providers_query_id = None; @@ -794,8 +948,10 @@ impl Peer { } } QueryResult::GetRecord(result) => match result { - Ok(_record) => { - self.msg("Kademlia record retrieved".to_string()).await?; + Ok(record) => match record { + GetRecordOk::FoundRecord(peer_record) => debug!("New kademlia record found: {:?}", peer_record.peer), + GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates } => + debug!("No new record for kadmelia, {} cache candidates", cache_candidates.len()), } Err(e) => { self.msg(format!("Failed to retrieve Kademlia record: {e}")).await?; @@ -831,7 +987,7 @@ impl Peer { ref _other => {} } - // When we receive a relay client event + // when we receive a relay client event SwarmEvent::Behaviour(BehaviourEvent::RelayClient(event)) => match event { RelayClientEvent::ReservationReqAccepted { relay_peer_id, renewal, limit } => { self.msg(format!("Relay reservation request accepted:\n\tfrom: {relay_peer_id}\n\trenewed: {renewal}\n\tlimit: {limit:?}")).await?; @@ -844,7 +1000,7 @@ impl Peer { } } - // When we receive a relay server event + // when we receive a relay server event SwarmEvent::Behaviour(BehaviourEvent::RelayServer(event)) => match event { RelayServerEvent::ReservationReqAccepted { src_peer_id, renewed } => { self.msg(format!("Relay reservation request accepted:\n\tfrom: {src_peer_id}\n\trenewed: {renewed}")).await?; @@ -867,27 +1023,25 @@ impl Peer { _ => {} } - // When we receive a request_response event + // when we receive a request_response event SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(event)) => match event { RequestResponseEvent::Message { message, .. } => match message { RequestResponseMessage::Request { request, .. } => { //TODO: support ProtocolSupport::Full - debug!( + error!( "umimplemented: request_response::Message::Request: {:?}", request ); } RequestResponseMessage::Response { response, .. } => { - info!( + error!( "request_response::Message::Response: size:{}", response.file_body.len() ); // TODO: store this file (in memory or disk) and provider it via Kademlia } } - RequestResponseEvent::OutboundFailure { - request_id, error, .. - } => { + RequestResponseEvent::OutboundFailure { request_id, error, .. } => { error!( "request_response::Event::OutboundFailure for request {:?}: {:?}", request_id, error @@ -901,7 +1055,6 @@ impl Peer { } } } - Ok(()) } } @@ -929,13 +1082,6 @@ enum UniversalConnectivityMessage { seq_no: Option, topic: TopicHash, }, - Unknown { - propagation_source: PeerId, - from: Option, - data: Vec, - seq_no: Option, - topic: TopicHash, - }, } impl TryFrom for UniversalConnectivityMessage { @@ -954,13 +1100,6 @@ impl TryFrom for UniversalConnectivityMessage { let topic = message.topic.clone(); match topic.as_str() { - GOSSIPSUB_CHAT_TOPIC => Ok(Self::Chat { - propagation_source, - from, - data, - seq_no, - topic, - }), GOSSIPSUB_CHAT_FILE_TOPIC => Ok(Self::File { propagation_source, from, @@ -1001,7 +1140,7 @@ impl TryFrom for UniversalConnectivityMessage { topic, }) } - _ => Ok(Self::Unknown { + _ => Ok(Self::Chat { propagation_source, from, data, @@ -1087,28 +1226,6 @@ impl fmt::Display for UniversalConnectivityMessage { }); write!(f, "Received peer discovery:\n\tp source: {propagation_source}\n\tsource: {source}\n\tseq no: {seq_no}\n\ttopic: {topic}\n\tfrom: {chat_peer}\n\tpeer: {discovered_peer}\n\tmultiaddrs: {}", discovered_addrs.len()) } - Self::Unknown { - propagation_source, - from, - data, - seq_no, - topic, - } => { - let propagation_source = { - let ps: ChatPeer = propagation_source.into(); - format!("{} ({})", ps.id(), ps) - }; - let chat_peer = from.as_ref().map_or("Unknown".to_string(), |from| { - format!("{} ({})", from.id(), from) - }); - let source = from.as_ref().map_or("Unknown".to_string(), |peer| { - format!("{} ({})", peer.id(), peer) - }); - let seq_no = seq_no.map_or("Unknown".to_string(), |seq_no| seq_no.to_string()); - let fields = decode_unknown_protobuf(data).map_err(|_| fmt::Error)?; - let data = pretty_print_fields(&fields); - write!(f, "Received unknown message:\n\tp source: {propagation_source}\n\tsource: {source}\n\tseq no: {seq_no}\n\ttopic: {topic}\n\tfrom: {chat_peer}\n\tdata: {data}") - } } } } diff --git a/src/ui/headless.rs b/src/ui/headless.rs index 82b6164..1b72b72 100644 --- a/src/ui/headless.rs +++ b/src/ui/headless.rs @@ -78,18 +78,19 @@ impl Ui for Headless { String::from_utf8(data).unwrap_or("Invalid UTF-8".to_string()); println!("{}: {}", from, message); } - Message::AddPeer(peer) => { - if self.peers.insert(peer) { + Message::AddPeer { chat_peer, topic } => { + if self.peers.insert(chat_peer) { println!( - "Adding peer:\n\tpeer id: {}\n\tname: {}", - peer.id(), - peer.name() + "Adding peer:\n\tpeer id: {}\n\tname: {}\n\t{}", + chat_peer.id(), + chat_peer.name(), + topic.unwrap_or_default(), ); } } - Message::RemovePeer(peer) => { - if self.peers.remove(&peer) { - println!("Removing peer: {peer:?}"); + Message::RemovePeer { chat_peer, topic } => { + if self.peers.remove(&chat_peer) { + println!("Removing peer {chat_peer:?} for topic {topic:?}"); } } Message::Event(event) => { diff --git a/src/ui/tui.rs b/src/ui/tui.rs index 2214e86..c8c23cb 100644 --- a/src/ui/tui.rs +++ b/src/ui/tui.rs @@ -114,23 +114,27 @@ impl Ui for Tui { info!("{peer_str}"); } } - Message::AddPeer(peer) => { - if chat_widget.peers.insert(peer) { + Message::AddPeer { chat_peer, topic } => { + if chat_widget.peers.insert(chat_peer) { log_widget.add_line(format!( - "Adding peer:\n\tpeer id: {}\n\tname: {}", - peer.id(), - peer.name() + "Adding peer:\n\tpeer id: {}\n\tname: {}\n\ttopic: {}", + chat_peer.id(), + chat_peer.name(), + topic.unwrap_or_default(), )); } } - Message::RemovePeer(peer) => { - if chat_widget.peers.remove(&peer) { - log_widget.add_line(format!("Removing peer: {peer:?}")); + Message::RemovePeer { chat_peer, topic } => { + if chat_widget.peers.remove(&chat_peer) { + log_widget.add_line(format!( + "Removing peer {chat_peer:?} for topic {topic:?}" + )); } } Message::Event(event) => { log_widget.add_line(event); } + _ => {} } }