feat(connlib): directly send wireguard traffic instead of tunneling it through WebRTC datachannels (#2643)

This PR started as part of a degradation in performance for the
gateways.

The way to test performance in a realistic enviroment is using a GCP vm
as a client and an AWS vm as a gateway with a single iperf server behind
the gateway.

Then the `iperf` results with current main:

```
Connecting to host 172.31.92.238, port 5201
Reverse mode, remote host 172.31.92.238 is sending
[  5] local 100.83.194.77 port 58426 connected to 172.31.92.238 port 5201
[ ID] Interval           Transfer     Bitrate
[  5]   0.00-1.00   sec  1.01 MBytes  8.50 Mbits/sec                  
[  5]   1.00-2.00   sec  1.14 MBytes  9.59 Mbits/sec                  
[  5]   2.00-3.00   sec   699 KBytes  5.73 Mbits/sec                  
[  5]   3.00-4.00   sec  1.11 MBytes  9.31 Mbits/sec                  
[  5]   4.00-5.00   sec   664 KBytes  5.44 Mbits/sec                  
[  5]   5.00-6.00   sec   591 KBytes  4.84 Mbits/sec                  
[  5]   6.00-7.00   sec   722 KBytes  5.91 Mbits/sec                  
[  5]   7.00-8.00   sec   833 KBytes  6.83 Mbits/sec                  
[  5]   8.00-9.00   sec   738 KBytes  6.04 Mbits/sec                  
[  5]   9.00-10.00  sec   836 KBytes  6.85 Mbits/sec                  
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.06  sec  8.78 MBytes  7.32 Mbits/sec    3             sender
[  5]   0.00-10.00  sec  8.23 MBytes  6.90 Mbits/sec                  receiver

iperf Done.
```

Most of the performance problems were due to using SCTP and DTLS.

So I created a
[fork](https://github.com/firezone/webrtc/tree/expose-new-endpoint) of
webrtc that let us circumvent those, since we don't need them because we
are depending on wireguard for encryption.

With those changes much better throughput is achieved:

```
gabriel@cloudshell:~ (firezone-personal-instances)$ iperf3 -R -c 172.31.92.238
Connecting to host 172.31.92.238, port 5201
Reverse mode, remote host 172.31.92.238 is sending
[  5] local 100.83.194.77 port 51206 connected to 172.31.92.238 port 5201
[ ID] Interval           Transfer     Bitrate
[  5]   0.00-1.00   sec  5.60 MBytes  47.0 Mbits/sec                  
[  5]   1.00-2.00   sec  17.2 MBytes   144 Mbits/sec                  
[  5]   2.00-3.00   sec  15.8 MBytes   132 Mbits/sec                  
[  5]   3.00-4.00   sec  14.8 MBytes   125 Mbits/sec                  
[  5]   4.00-5.00   sec  15.9 MBytes   133 Mbits/sec                  
[  5]   5.00-6.00   sec  15.8 MBytes   133 Mbits/sec                  
[  5]   6.00-7.00   sec  15.3 MBytes   128 Mbits/sec                  
[  5]   7.00-8.00   sec  15.6 MBytes   131 Mbits/sec                  
[  5]   8.00-9.00   sec  15.6 MBytes   131 Mbits/sec                  
[  5]   9.00-10.00  sec  16.0 MBytes   134 Mbits/sec                  
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.05  sec   151 MBytes   126 Mbits/sec   74             sender
[  5]   0.00-10.00  sec   148 MBytes   124 Mbits/sec                  receiver

iperf Done
```

However, this is still worse than it was achieved with a previous
commit(`21afdf0a9a113c996d60a63b2e8c8f32d3aeb87`):
```
gabriel@cloudshell:~ (firezone-personal-instances)$ iperf3 -R -c 172.31.92.238
Connecting to host 172.31.92.238, port 5201
Reverse mode, remote host 172.31.92.238 is sending
[  5] local 100.100.68.41 port 49762 connected to 172.31.92.238 port 5201
[ ID] Interval           Transfer     Bitrate
[  5]   0.00-1.00   sec  6.14 MBytes  51.5 Mbits/sec                  
[  5]   1.00-2.00   sec  17.1 MBytes   144 Mbits/sec                  
[  5]   2.00-3.00   sec  22.8 MBytes   191 Mbits/sec                  
[  5]   3.00-4.00   sec  23.5 MBytes   197 Mbits/sec                  
[  5]   4.00-5.00   sec  23.0 MBytes   193 Mbits/sec                  
[  5]   5.00-6.00   sec  22.1 MBytes   185 Mbits/sec                  
[  5]   6.00-7.00   sec  23.0 MBytes   193 Mbits/sec                  
[  5]   7.00-8.00   sec  22.7 MBytes   190 Mbits/sec                  
[  5]   8.00-9.00   sec  21.0 MBytes   176 Mbits/sec                  
[  5]   9.00-10.00  sec  19.9 MBytes   167 Mbits/sec                  
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.05  sec   204 MBytes   170 Mbits/sec  127             sender
[  5]   0.00-10.00  sec   201 MBytes   169 Mbits/sec                  receiver
```

My profiling suggested that this is due to reading/writing packets
happening in its own dedicated tasks. So much so that maybe in the
future we should even consider spawning their own dedicated runtime so
that those loops have a dedicated OS thread.

Also, probably using a multi-queue interface will give us huge gains if
we have a dedicated task for each queue(currently the interface is
started as a multi-queue but a single file descriptor is used) for
handling multiple concurrent clients.

However, the changes proposed in this PR are good enough for now as long
as performance don't degrade.

In that line I will create a CI that reports the throughput using the
local `docker-compose.yml` file that we should always check before
merging, that is not the be all end all of the performance story but for
smaller PRs the correlation to real world throughput should be enough.

For bigger PRs we should manually test before merging for now, until we
have a way in CI to spin up some realistic tests(note that vms should be
in separate cloud enviroments, the same-cloud links are so reliable that
we miss actual performance degradation due to dropped packets). On this
note I'll write a small manual on how to conduct those tests with full
current results that we should use always before merging new PRs that
affect the hot-path. cc @thomaseizinger

Finally, when testing these changes I found some flakiness regarding the
re-connection path. So I changed things so that we cleanup connections
only using wireguard's error(connection expiration). This is quite slow
for now (~120 seconds) but in the future we can issue an ice restart
each time wireguard keepalive expires(rekey timeout) so that we can
restart connection each ~30 seconds and we can reduce the keepalive time
out from the portal to accelerate it even more. And in the future we can
get smarter about it.

---------

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
Gabi
2023-11-15 23:59:48 -03:00
committed by GitHub
parent ce7c5198fa
commit bc8f438a56
19 changed files with 570 additions and 598 deletions

284
rust/Cargo.lock generated
View File

@@ -212,7 +212,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -414,7 +414,7 @@ dependencies = [
[[package]]
name = "boringtun"
version = "0.6.0"
source = "git+https://github.com/cloudflare/boringtun?branch=master#e1d6360d6ab4529fc942a078e4c54df107abe2ba"
source = "git+https://github.com/cloudflare/boringtun?branch=master#f672bb6c1e1e371240a8d151f15854687eb740bb"
dependencies = [
"aead",
"base64 0.13.1",
@@ -428,7 +428,7 @@ dependencies = [
"nix 0.25.1",
"parking_lot",
"rand_core",
"ring 0.16.20",
"ring 0.17.5",
"tracing",
"untrusted 0.9.0",
"x25519-dalek",
@@ -594,9 +594,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.7"
version = "4.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac495e00dcec98c83465d5ad66c5c4fabd652fd6686e7c6269b117e729a6f17b"
checksum = "2275f18819641850fa26c89acc84d465c1bf91ce57bc2748b28c420473352f64"
dependencies = [
"clap_builder",
"clap_derive",
@@ -604,9 +604,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.4.7"
version = "4.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c77ed9a32a62e6ca27175d00d29d05ca32e396ea1eb5fb01d8256b669cec7663"
checksum = "07cdf1b148b25c1e1f7a42225e30a0d99a615cd4637eae7365548dd4529b95bc"
dependencies = [
"anstream",
"anstyle",
@@ -623,7 +623,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -799,9 +799,9 @@ checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
[[package]]
name = "cpufeatures"
version = "0.2.9"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1"
checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0"
dependencies = [
"libc",
]
@@ -817,9 +817,9 @@ dependencies = [
[[package]]
name = "crc-catalog"
version = "2.2.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc32fast"
@@ -851,9 +851,9 @@ dependencies = [
[[package]]
name = "crypto-bigint"
version = "0.5.3"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740fe28e594155f10cfc383984cbefd529d7396050557148f79cb0f621204124"
checksum = "28f85c3514d2a6e64160359b45a3918c3b4178bcbf4ae5d03ab2d02e521c479a"
dependencies = [
"generic-array",
"rand_core",
@@ -909,13 +909,13 @@ dependencies = [
[[package]]
name = "curve25519-dalek-derive"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b"
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -997,7 +997,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -1070,7 +1070,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -1088,9 +1088,9 @@ dependencies = [
[[package]]
name = "errno"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e"
dependencies = [
"libc",
"windows-sys 0.48.0",
@@ -1114,9 +1114,9 @@ dependencies = [
[[package]]
name = "fiat-crypto"
version = "0.2.1"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0870c84016d4b481be5c9f323c24f65e31e901ae618f0e80f4308fb00de1d2d"
checksum = "53a56f0780318174bad1c127063fd0c5fdfb35398e3cd79ffaab931a6c79df80"
[[package]]
name = "firezone-cli-utils"
@@ -1126,6 +1126,7 @@ dependencies = [
"ctrlc",
"ip_network",
"tracing",
"tracing-log 0.2.0",
"tracing-subscriber",
"url",
]
@@ -1290,9 +1291,9 @@ dependencies = [
[[package]]
name = "futures-bounded"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a2b7bc3e71d5b3c6e1436bd600d88a7a9315b3589883018123646767ea2d522"
checksum = "43bcbe9c086773ba37692e50f578c4c459c0a5ffe07bd924d0ea485b2a46c9c7"
dependencies = [
"futures-timer",
"futures-util",
@@ -1339,7 +1340,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -1400,9 +1401,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.10"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [
"cfg-if",
"libc",
@@ -1576,9 +1577,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.9"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb"
dependencies = [
"bytes",
"fnv",
@@ -1631,7 +1632,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.4.9",
"socket2 0.4.10",
"tokio",
"tower-service",
"tracing",
@@ -1640,9 +1641,9 @@ dependencies = [
[[package]]
name = "hyper-rustls"
version = "0.24.1"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
"http",
@@ -1729,7 +1730,7 @@ dependencies = [
[[package]]
name = "interceptor"
version = "0.10.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"async-trait",
"bytes",
@@ -1793,9 +1794,9 @@ dependencies = [
[[package]]
name = "ipnet"
version = "2.8.0"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "is-terminal"
@@ -1868,9 +1869,9 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
name = "js-sys"
version = "0.3.64"
version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a"
checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8"
dependencies = [
"wasm-bindgen",
]
@@ -1927,9 +1928,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.4.10"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f"
checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829"
[[package]]
name = "lock_api"
@@ -2049,9 +2050,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.8.8"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0"
dependencies = [
"libc",
"wasi",
@@ -2424,7 +2425,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.4.1",
"redox_syscall",
"smallvec",
"windows-targets 0.48.5",
]
@@ -2500,7 +2501,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -2527,9 +2528,9 @@ dependencies = [
[[package]]
name = "platforms"
version = "3.1.2"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8"
checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0"
[[package]]
name = "pnet_base"
@@ -2549,7 +2550,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -2610,9 +2611,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "primeorder"
version = "0.13.2"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c2fcef82c0ec6eefcc179b978446c399b3cdf73c392c35604e399eee6df1ee3"
checksum = "c7dbe9ed3b56368bd99483eb32fe9c17fdd3730aebadc906918ce78d54c7eeb4"
dependencies = [
"elliptic-curve",
]
@@ -2755,15 +2756,6 @@ dependencies = [
"url",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@@ -2911,7 +2903,7 @@ dependencies = [
[[package]]
name = "rtcp"
version = "0.10.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"bytes",
"thiserror",
@@ -2939,7 +2931,7 @@ dependencies = [
[[package]]
name = "rtp"
version = "0.9.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"bytes",
"rand",
@@ -2980,9 +2972,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.19"
version = "0.38.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed"
checksum = "ffb93593068e9babdad10e4fce47dc9b3ac25315a72a59766ffd9e9a71996a04"
dependencies = [
"bitflags 2.4.1",
"errno",
@@ -2993,12 +2985,12 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.7"
version = "0.21.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c"
dependencies = [
"log",
"ring 0.16.20",
"ring 0.17.5",
"rustls-webpki",
"sct",
]
@@ -3017,21 +3009,21 @@ dependencies = [
[[package]]
name = "rustls-pemfile"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64 0.21.5",
]
[[package]]
name = "rustls-webpki"
version = "0.101.6"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
"ring 0.17.5",
"untrusted 0.9.0",
]
[[package]]
@@ -3084,18 +3076,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.0"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
"ring 0.17.5",
"untrusted 0.9.0",
]
[[package]]
name = "sdp"
version = "0.6.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"rand",
"substring",
@@ -3159,22 +3151,22 @@ checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090"
[[package]]
name = "serde"
version = "1.0.190"
version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.190"
version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -3267,9 +3259,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.11.1"
version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a"
checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970"
[[package]]
name = "smbios-lib"
@@ -3298,9 +3290,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
@@ -3353,7 +3345,7 @@ dependencies = [
"proc-macro2",
"quote",
"structmeta-derive",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -3364,13 +3356,13 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
name = "stun"
version = "0.5.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"base64 0.21.5",
"crc",
@@ -3473,9 +3465,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.38"
version = "2.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a"
dependencies = [
"proc-macro2",
"quote",
@@ -3523,22 +3515,22 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.8.0"
version = "3.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5"
dependencies = [
"cfg-if",
"fastrand",
"redox_syscall 0.3.5",
"redox_syscall",
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "termcolor"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64"
checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449"
dependencies = [
"winapi-util",
]
@@ -3552,7 +3544,7 @@ dependencies = [
"proc-macro2",
"quote",
"structmeta",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -3572,7 +3564,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -3631,9 +3623,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.33.0"
version = "1.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9"
dependencies = [
"backtrace",
"bytes",
@@ -3660,13 +3652,13 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -3805,11 +3797,12 @@ dependencies = [
[[package]]
name = "tracing-appender"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror",
"time",
"tracing-subscriber",
]
@@ -3822,7 +3815,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]
[[package]]
@@ -3837,12 +3830,23 @@ dependencies = [
[[package]]
name = "tracing-log"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2"
dependencies = [
"lazy_static",
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
@@ -3856,7 +3860,7 @@ dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.4",
"tracing-subscriber",
]
@@ -3872,7 +3876,7 @@ dependencies = [
"smallvec",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.4",
"tracing-subscriber",
]
@@ -3920,9 +3924,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -3936,7 +3940,7 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.2.0",
"tracing-serde",
]
@@ -3998,7 +4002,7 @@ dependencies = [
[[package]]
name = "turn"
version = "0.7.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"async-trait",
"base64 0.21.5",
@@ -4177,9 +4181,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.87"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342"
checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@@ -4187,24 +4191,24 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.87"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd"
checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.37"
version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
checksum = "9afec9963e3d0994cac82455b2b3502b81a7f40f9a0d32181f7528d9f4b43e02"
dependencies = [
"cfg-if",
"js-sys",
@@ -4214,9 +4218,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.87"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d"
checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -4224,22 +4228,22 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.87"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.87"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b"
[[package]]
name = "wasm-streams"
@@ -4256,9 +4260,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.64"
version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b"
checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -4273,7 +4277,7 @@ checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "webrtc"
version = "0.9.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"arc-swap",
"async-trait",
@@ -4315,7 +4319,7 @@ dependencies = [
[[package]]
name = "webrtc-data"
version = "0.8.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"bytes",
"log",
@@ -4328,7 +4332,7 @@ dependencies = [
[[package]]
name = "webrtc-dtls"
version = "0.8.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"aes",
"aes-gcm",
@@ -4363,7 +4367,7 @@ dependencies = [
[[package]]
name = "webrtc-ice"
version = "0.10.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"arc-swap",
"async-trait",
@@ -4386,7 +4390,7 @@ dependencies = [
[[package]]
name = "webrtc-mdns"
version = "0.6.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"log",
"socket2 0.5.5",
@@ -4398,7 +4402,7 @@ dependencies = [
[[package]]
name = "webrtc-media"
version = "0.7.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"byteorder",
"bytes",
@@ -4410,7 +4414,7 @@ dependencies = [
[[package]]
name = "webrtc-sctp"
version = "0.9.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"arc-swap",
"async-trait",
@@ -4426,7 +4430,7 @@ dependencies = [
[[package]]
name = "webrtc-srtp"
version = "0.11.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"aead",
"aes",
@@ -4448,7 +4452,7 @@ dependencies = [
[[package]]
name = "webrtc-util"
version = "0.8.0"
source = "git+https://github.com/firezone/webrtc?branch=master#f4c503333ac8ee01d0ad4e8ba827baa9f83f3fe2"
source = "git+https://github.com/firezone/webrtc?branch=expose-new-endpoint#4918ae812ab7e45b8ab000efbc316b5850290ddb"
dependencies = [
"async-trait",
"bitflags 1.3.2",
@@ -4744,5 +4748,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.39",
]

View File

@@ -38,7 +38,7 @@ phoenix-channel = { path = "phoenix-channel"}
[patch.crates-io]
boringtun = { git = "https://github.com/cloudflare/boringtun", branch = "master" } # Contains unreleased patches we need (bump of x25519-dalek)
webrtc = { git = "https://github.com/firezone/webrtc", branch = "master" }
webrtc = { git = "https://github.com/firezone/webrtc", branch = "expose-new-endpoint" }
[profile.release]
strip = true

View File

@@ -107,7 +107,7 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
}
#[tracing::instrument(level = "trace", skip(self))]
pub async fn connect(
pub fn connect(
&mut self,
Connect {
gateway_rtc_session_description,
@@ -116,15 +116,11 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
..
}: Connect,
) {
if let Err(e) = self
.tunnel
.received_offer_response(
resource_id,
gateway_rtc_session_description,
gateway_public_key.0.into(),
)
.await
{
if let Err(e) = self.tunnel.received_offer_response(
resource_id,
gateway_rtc_session_description,
gateway_public_key.0.into(),
) {
let _ = self.tunnel.callbacks().on_error(&e);
}
}
@@ -228,7 +224,7 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
Messages::ConnectionDetails(connection_details) => {
self.connection_details(connection_details, reference)
}
Messages::Connect(connect) => self.connect(connect).await,
Messages::Connect(connect) => self.connect(connect),
Messages::ResourceAdded(resource) => self.add_resource(resource).await,
Messages::ResourceRemoved(resource) => self.remove_resource(resource.id),
Messages::ResourceUpdated(resource) => self.update_resource(resource),

View File

@@ -1,6 +1,5 @@
use std::{collections::HashSet, net::IpAddr};
use firezone_tunnel::RTCSessionDescription;
use serde::{Deserialize, Serialize};
use connlib_shared::messages::{
@@ -8,7 +7,7 @@ use connlib_shared::messages::{
ReuseConnection,
};
use url::Url;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::ice_transport::{ice_candidate::RTCIceCandidate, ice_parameters::RTCIceParameters};
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize, Clone)]
pub struct InitClient {
@@ -32,7 +31,7 @@ pub struct ConnectionDetails {
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Connect {
pub gateway_rtc_session_description: RTCSessionDescription,
pub gateway_rtc_session_description: RTCIceParameters,
pub resource_id: ResourceId,
pub gateway_public_key: Key,
pub persistent_keepalive: u64,
@@ -70,7 +69,7 @@ pub struct BroadcastGatewayIceCandidates {
/// Gateway's id the ice candidates are meant for
pub gateway_ids: Vec<GatewayId>,
/// Actual RTC ice candidates
pub candidates: Vec<RTCIceCandidateInit>,
pub candidates: Vec<RTCIceCandidate>,
}
/// A gateway's ice candidate message.
@@ -79,7 +78,7 @@ pub struct GatewayIceCandidates {
/// Gateway's id the ice candidates are from
pub gateway_id: GatewayId,
/// Actual RTC ice candidates
pub candidates: Vec<RTCIceCandidateInit>,
pub candidates: Vec<RTCIceCandidate>,
}
/// The replies that can arrive from the channel by a client
@@ -182,8 +181,9 @@ mod test {
"resource_id": "ea6570d1-47c7-49d2-9dc3-efff1c0c9e0b",
"gateway_public_key": "dvy0IwyxAi+txSbAdT7WKgf7K4TekhKzrnYwt5WfbSM=",
"gateway_rtc_session_description": {
"sdp": "v=0\\r\\no=- 6423047867593421607 871431568 IN IP4 0.0.0.0\\r\\ns=-\\r\\nt=0 0\\r\\na=fingerprint:sha-256 65:8C:0B:EC:C5:B8:AB:2C:C7:47:F6:1A:6F:C3:4F:70:C7:06:34:84:FE:4E:FD:E5:C4:D2:4F:7C:ED:AF:0D:17\\r\\na=group:BUNDLE 0\\r\\nm=application 9 UDP/DTLS/SCTP webrtc-datachannel\\r\\nc=IN IP4 0.0.0.0\\r\\na=setup:active\\r\\na=mid:0\\r\\na=sendrecv\\r\\na=sctp-port:5000\\r\\na=ice-ufrag:zDSijpzITpzCfjbw\\r\\na=ice-pwd:QGufrJIKwqRjhDsNTdddVLFXmvGQJxke\\r\\na=candidate:167090039 1 udp 2130706431 :: 33628 typ host\\r\\na=candidate:167090039 2 udp 2130706431 :: 33628 typ host\\r\\na=candidate:1081386133 1 udp 2130706431 100.102.249.43 51575 typ host\\r\\na=candidate:1081386133 2 udp 2130706431 100.102.249.43 51575 typ host\\r\\na=candidate:1290078212 1 udp 2130706431 172.28.0.7 58698 typ host\\r\\na=candidate:1290078212 2 udp 2130706431 172.28.0.7 58698 typ host\\r\\na=candidate:349389859 1 udp 2130706431 172.20.0.3 51567 typ host\\r\\na=candidate:349389859 2 udp 2130706431 172.20.0.3 51567 typ host\\r\\na=candidate:936829106 1 udp 1694498815 172.28.0.7 35458 typ srflx raddr 0.0.0.0 rport 35458\\r\\na=candidate:936829106 2 udp 1694498815 172.28.0.7 35458 typ srflx raddr 0.0.0.0 rport 35458\\r\\na=candidate:936829106 1 udp 1694498815 172.28.0.7 46603 typ srflx raddr 0.0.0.0 rport 46603\\r\\na=candidate:936829106 2 udp 1694498815 172.28.0.7 46603 typ srflx raddr 0.0.0.0 rport 46603\\r\\na=end-of-candidates\\r\\n",
"type": "answer"
"ice_lite":false,
"password": "xEwoXEzHuSyrcgOCSRnwOXQVnbnbeGeF",
"username_fragment": "PvCPFevCOgkvVCtH"
},
"persistent_keepalive": 25
}

View File

@@ -6,7 +6,7 @@ use ip_network::IpNetwork;
use serde::{Deserialize, Serialize};
use std::{fmt, str::FromStr};
use uuid::Uuid;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::ice_transport::ice_parameters::RTCIceParameters;
mod key;
@@ -92,7 +92,7 @@ pub struct RequestConnection {
/// The preshared key the client generated for the connection that it is trying to establish.
pub client_preshared_key: SecretKey,
/// Client's local RTC Session Description that the client will use for this connection.
pub client_rtc_session_description: RTCSessionDescription,
pub client_rtc_session_description: RTCIceParameters,
}
/// Represent a request to reuse an existing gateway connection from a client to a given resource.

View File

@@ -19,6 +19,7 @@ use futures_bounded::{PushError, StreamMap};
use hickory_resolver::lookup::Lookup;
use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;
use rand_core::OsRng;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
@@ -26,7 +27,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::Instant;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
impl<CB> Tunnel<CB, ClientState>
where
@@ -128,9 +129,9 @@ where
/// [`Tunnel`] state specific to clients.
pub struct ClientState {
active_candidate_receivers: StreamMap<GatewayId, RTCIceCandidateInit>,
active_candidate_receivers: StreamMap<GatewayId, RTCIceCandidate>,
/// We split the receivers of ICE candidates into two phases because we only want to start sending them once we've received an SDP from the gateway.
waiting_for_sdp_from_gatway: HashMap<GatewayId, Receiver<RTCIceCandidateInit>>,
waiting_for_sdp_from_gatway: HashMap<GatewayId, Receiver<RTCIceCandidate>>,
// TODO: Make private
pub awaiting_connection: HashMap<ResourceId, AwaitingConnectionDetails>,
@@ -139,6 +140,7 @@ pub struct ClientState {
awaiting_connection_timers: StreamMap<ResourceId, Instant>,
pub gateway_public_keys: HashMap<GatewayId, PublicKey>,
pub gateway_preshared_keys: HashMap<GatewayId, StaticSecret>,
resources_gateways: HashMap<ResourceId, GatewayId>,
resources: ResourceTable<ResourceDescription>,
dns_queries: BoundedQueue<DnsQuery<'static>>,
@@ -236,11 +238,12 @@ impl ClientState {
pub fn on_connection_failed(&mut self, resource: ResourceId) {
self.awaiting_connection.remove(&resource);
self.awaiting_connection_timers.remove(resource);
let Some(gateway) = self.resources_gateways.remove(&resource) else {
return;
};
self.gateway_awaiting_connection.remove(&gateway);
self.awaiting_connection_timers.remove(resource);
}
pub fn on_connection_intent(&mut self, destination: IpAddr) {
@@ -298,8 +301,13 @@ impl ClientState {
&mut self,
resource: ResourceId,
gateway: GatewayId,
shared_key: StaticSecret,
) -> Result<PeerConfig, ConnlibError> {
let shared_key = self
.gateway_preshared_keys
.get(&gateway)
.ok_or(Error::ControlProtocolError)?
.clone();
let Some(public_key) = self.gateway_public_keys.remove(&gateway) else {
self.awaiting_connection.remove(&resource);
self.gateway_awaiting_connection.remove(&gateway);
@@ -312,24 +320,34 @@ impl ClientState {
.get_by_id(&resource)
.ok_or(Error::ControlProtocolError)?;
Ok(PeerConfig {
let config = PeerConfig {
persistent_keepalive: None,
public_key,
ips: desc.ips(),
preshared_key: SecretKey::new(Key(shared_key.to_bytes())),
})
};
// Tidy up state once everything succeeded.
self.gateway_awaiting_connection.remove(&gateway);
self.awaiting_connection.remove(&resource);
Ok(config)
}
pub fn gateway_by_resource(&self, resource: &ResourceId) -> Option<GatewayId> {
self.resources_gateways.get(resource).copied()
}
pub fn add_waiting_ice_receiver(
pub fn add_waiting_gateway(
&mut self,
id: GatewayId,
receiver: Receiver<RTCIceCandidateInit>,
) {
receiver: Receiver<RTCIceCandidate>,
) -> StaticSecret {
self.waiting_for_sdp_from_gatway.insert(id, receiver);
let preshared_key = StaticSecret::random_from_rng(OsRng);
self.gateway_preshared_keys
.insert(id, preshared_key.clone());
preshared_key
}
pub fn activate_ice_candidate_receiver(&mut self, id: GatewayId, key: PublicKey) {
@@ -401,6 +419,7 @@ impl Default for ClientState {
resources_gateways: Default::default(),
resources: Default::default(),
dns_queries: BoundedQueue::with_capacity(DNS_QUERIES_QUEUE_SIZE),
gateway_preshared_keys: Default::default(),
}
}
}

View File

@@ -1,25 +1,23 @@
use arc_swap::ArcSwapOption;
use bytes::Bytes;
use futures::channel::mpsc;
use futures_util::SinkExt;
use std::sync::Arc;
use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;
use std::{fmt, sync::Arc};
use connlib_shared::{
messages::{Relay, RequestConnection, ReuseConnection},
Callbacks, Error, Result,
};
use webrtc::data_channel::OnCloseHdlrFn;
use webrtc::peer_connection::OnPeerConnectionStateChangeHdlrFn;
use webrtc::{
ice_transport::{
ice_candidate::RTCIceCandidateInit, ice_credential_type::RTCIceCredentialType,
ice_server::RTCIceServer,
},
peer_connection::{
configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
RTCPeerConnection,
},
use webrtc::ice_transport::RTCIceTransport;
use webrtc::ice_transport::{
ice_candidate::RTCIceCandidate, ice_gatherer::RTCIceGatherOptions,
ice_parameters::RTCIceParameters,
};
use webrtc::ice_transport::{ice_credential_type::RTCIceCredentialType, ice_server::RTCIceServer};
use crate::{RoleState, Tunnel};
use crate::{device_channel::Device, peer::Peer, peer_handler, ConnectedPeer, RoleState, Tunnel};
mod client;
mod gateway;
@@ -44,7 +42,7 @@ where
pub async fn add_ice_candidate(
&self,
conn_id: TRoleState::Id,
ice_candidate: RTCIceCandidateInit,
ice_candidate: RTCIceCandidate,
) -> Result<()> {
let peer_connection = self
.peer_connections
@@ -52,53 +50,25 @@ where
.get(&conn_id)
.ok_or(Error::ControlProtocolError)?
.clone();
peer_connection.add_ice_candidate(ice_candidate).await?;
peer_connection
.add_remote_candidate(Some(ice_candidate))
.await?;
Ok(())
}
}
pub fn on_peer_connection_state_change_handler<TId>(
conn_id: TId,
stop_command_sender: mpsc::Sender<TId>,
) -> OnPeerConnectionStateChangeHdlrFn
where
TId: Copy + Send + Sync + 'static,
{
Box::new(move |state| {
let mut sender = stop_command_sender.clone();
tracing::trace!(?state, "peer_state_update");
Box::pin(async move {
if state == RTCPeerConnectionState::Failed {
let _ = sender.send(conn_id).await;
}
})
})
}
pub fn on_dc_close_handler<TId>(
conn_id: TId,
stop_command_sender: mpsc::Sender<TId>,
) -> OnCloseHdlrFn
where
TId: Copy + Send + Sync + 'static,
{
Box::new(move || {
let mut sender = stop_command_sender.clone();
tracing::debug!("channel_closed");
Box::pin(async move {
let _ = sender.send(conn_id).await;
})
})
pub(crate) struct IceConnection {
pub ice_params: RTCIceParameters,
pub ice_transport: Arc<RTCIceTransport>,
pub ice_candidate_rx: mpsc::Receiver<RTCIceCandidate>,
}
#[tracing::instrument(level = "trace", skip(webrtc))]
pub async fn new_peer_connection(
pub(crate) async fn new_ice_connection(
webrtc: &webrtc::api::API,
relays: Vec<Relay>,
) -> Result<(Arc<RTCPeerConnection>, mpsc::Receiver<RTCIceCandidateInit>)> {
let config = RTCConfiguration {
) -> Result<IceConnection> {
let config = RTCIceGatherOptions {
ice_servers: relays
.into_iter()
.map(|srv| match srv {
@@ -110,7 +80,6 @@ pub async fn new_peer_connection(
urls: vec![turn.uri],
username: turn.username,
credential: turn.password,
// TODO: check what this is used for
credential_type: RTCIceCredentialType::Password,
},
})
@@ -119,30 +88,71 @@ pub async fn new_peer_connection(
..Default::default()
};
let peer_connection = Arc::new(webrtc.new_peer_connection(config).await?);
let gatherer = Arc::new(webrtc.new_ice_gatherer(config)?);
let ice_transport = Arc::new(webrtc.new_ice_transport(Arc::clone(&gatherer)));
let (ice_candidate_tx, ice_candidate_rx) = mpsc::channel(ICE_CANDIDATE_BUFFER);
peer_connection.on_ice_candidate(Box::new(move |candidate| {
let Some(candidate) = candidate else {
return Box::pin(async {});
};
let mut ice_candidate_tx = ice_candidate_tx.clone();
Box::pin(async move {
let ice_candidate = match candidate.to_json() {
Ok(ice_candidate) => ice_candidate,
Err(e) => {
tracing::warn!("Failed to serialize ICE candidate to JSON: {e}",);
return;
}
gatherer.on_local_candidate({
let gatherer = gatherer.clone();
Box::new(move |candidate| {
let Some(candidate) = candidate else {
gatherer.on_local_candidate(Box::new(|_| Box::pin(async {})));
return Box::pin(async {});
};
if ice_candidate_tx.send(ice_candidate).await.is_err() {
debug_assert!(false, "receiver was dropped before sender")
}
let mut ice_candidate_tx = ice_candidate_tx.clone();
Box::pin(async move {
if ice_candidate_tx.send(candidate).await.is_err() {
debug_assert!(false, "receiver was dropped before sender");
}
})
})
}));
});
Ok((peer_connection, ice_candidate_rx))
gatherer.gather().await?;
Ok(IceConnection {
ice_params: gatherer.get_local_parameters().await?,
ice_transport,
ice_candidate_rx,
})
}
fn insert_peers<TId: Copy>(
peers_by_ip: &mut IpNetworkTable<ConnectedPeer<TId>>,
ips: &Vec<IpNetwork>,
peer: ConnectedPeer<TId>,
) {
for ip in ips {
peers_by_ip.insert(*ip, peer.clone());
}
}
fn start_handlers<TId>(
device: Arc<ArcSwapOption<Device>>,
callbacks: impl Callbacks + 'static,
peer: Arc<Peer<TId>>,
ice: Arc<RTCIceTransport>,
peer_receiver: tokio::sync::mpsc::Receiver<Bytes>,
) where
TId: Copy + Send + Sync + fmt::Debug + 'static,
{
ice.on_connection_state_change(Box::new(|_| Box::pin(async {})));
tokio::spawn({
async move {
// If this fails receiver will be dropped and the connection will expire at some point
// this will not fail though, since this is always called after start ice
let Some(ep) = ice.new_endpoint(Box::new(|_| true)).await else {
return;
};
tokio::spawn(peer_handler::start_peer_handler(
device,
callbacks,
peer,
ep.clone(),
));
tokio::spawn(peer_handler::handle_packet(ep, peer_receiver));
}
});
}

View File

@@ -1,48 +1,55 @@
use std::sync::Arc;
use boringtun::x25519::{PublicKey, StaticSecret};
use boringtun::x25519::PublicKey;
use connlib_shared::{
control::Reference,
messages::{GatewayId, Key, Relay, RequestConnection, ResourceId},
Callbacks,
};
use rand_core::OsRng;
use secrecy::Secret;
use webrtc::{
data_channel::data_channel_init::RTCDataChannelInit,
peer_connection::{
peer_connection_state::RTCPeerConnectionState,
sdp::session_description::RTCSessionDescription, RTCPeerConnection,
},
use webrtc::ice_transport::{
ice_parameters::RTCIceParameters, ice_role::RTCIceRole,
ice_transport_state::RTCIceTransportState, RTCIceTransport,
};
use crate::control_protocol::{
new_peer_connection, on_dc_close_handler, on_peer_connection_state_change_handler,
use crate::{
control_protocol::{new_ice_connection, IceConnection},
PEER_QUEUE_SIZE,
};
use crate::{peer::Peer, ClientState, ConnectedPeer, Error, Request, Result, Tunnel};
#[tracing::instrument(level = "trace", skip(tunnel))]
use super::{insert_peers, start_handlers};
#[tracing::instrument(level = "trace", skip(tunnel, ice))]
fn set_connection_state_update<CB>(
tunnel: &Arc<Tunnel<CB, ClientState>>,
peer_connection: &Arc<RTCPeerConnection>,
ice: &Arc<RTCIceTransport>,
gateway_id: GatewayId,
resource_id: ResourceId,
) where
CB: Callbacks + 'static,
{
let tunnel = Arc::clone(tunnel);
peer_connection.on_peer_connection_state_change(Box::new(
move |state: RTCPeerConnectionState| {
let tunnel = Arc::clone(&tunnel);
Box::pin(async move {
tracing::trace!("peer_state");
if state == RTCPeerConnectionState::Failed {
tunnel.role_state.lock().on_connection_failed(resource_id);
tunnel.peer_connections.lock().remove(&gateway_id);
ice.on_connection_state_change(Box::new(move |state| {
let tunnel = Arc::clone(&tunnel);
tracing::trace!(%state, "peer_state");
Box::pin(async move {
if state == RTCIceTransportState::Failed {
// There's a really unlikely race condition but this line needs to be before on_connection_failed.
// if we clear up the gateway awaiting flag before removing the connection a new connection could be
// established that replaces this one and this line removes it.
let ice = tunnel.peer_connections.lock().remove(&gateway_id);
if let Some(ice) = ice {
if let Err(err) = ice.stop().await {
tracing::warn!(%err, "couldn't stop ice transport: {err:#}");
}
}
})
},
));
tunnel.role_state.lock().on_connection_failed(resource_id);
}
})
}));
}
impl<CB> Tunnel<CB, ClientState>
@@ -84,120 +91,73 @@ where
return Ok(Request::ReuseConnection(connection));
}
let peer_connection = {
let (peer_connection, receiver) = new_peer_connection(&self.webrtc_api, relays).await?;
self.role_state
.lock()
.add_waiting_ice_receiver(gateway_id, receiver);
let peer_connection = Arc::new(peer_connection);
let mut peer_connections = self.peer_connections.lock();
peer_connections.insert(gateway_id, Arc::clone(&peer_connection));
peer_connection
};
let IceConnection {
ice_params,
ice_transport,
ice_candidate_rx,
} = new_ice_connection(&self.webrtc_api, relays).await?;
let preshared_key = self
.role_state
.lock()
.add_waiting_gateway(gateway_id, ice_candidate_rx);
self.peer_connections
.lock()
.insert(gateway_id, Arc::clone(&ice_transport));
set_connection_state_update(self, &peer_connection, gateway_id, resource_id);
let data_channel = peer_connection
.create_data_channel(
"data",
Some(RTCDataChannelInit {
ordered: Some(false),
max_retransmits: Some(0),
..Default::default()
}),
)
.await?;
let d = Arc::clone(&data_channel);
let tunnel = Arc::clone(self);
let preshared_key = StaticSecret::random_from_rng(OsRng);
let p_key = preshared_key.clone();
data_channel.on_open(Box::new(move || {
Box::pin(async move {
tracing::trace!("new_data_channel_opened");
let index = tunnel.next_index();
let peer_config = match tunnel
.role_state
.lock()
.create_peer_config_for_new_connection(resource_id, gateway_id, p_key)
{
Ok(c) => c,
Err(e) => {
tunnel.peer_connections.lock().remove(&gateway_id);
tracing::warn!(err = ?e, "channel_open");
let _ = tunnel.callbacks.on_error(&e);
return;
}
};
d.on_close(on_dc_close_handler(
gateway_id,
tunnel.stop_peer_command_sender.clone(),
));
let d = d.detach().await.expect(
"only fails if not opened or not enabled, both of which are always true for us",
);
let peer = Arc::new(Peer::new(
tunnel.private_key.clone(),
index,
peer_config.clone(),
gateway_id,
None,
tunnel.rate_limiter.clone(),
));
{
let mut peers_by_ip = tunnel.peers_by_ip.write();
for ip in peer_config.ips {
peers_by_ip.insert(
ip,
ConnectedPeer {
inner: peer.clone(),
channel: d.clone(),
},
);
}
tunnel
.role_state
.lock()
.gateway_awaiting_connection
.remove(&gateway_id);
}
if let Some(conn) = tunnel.peer_connections.lock().get(&gateway_id) {
conn.on_peer_connection_state_change(on_peer_connection_state_change_handler(
gateway_id,
tunnel.stop_peer_command_sender.clone(),
));
}
tokio::spawn(tunnel.start_peer_handler(peer, d));
tunnel
.role_state
.lock()
.awaiting_connection
.remove(&resource_id);
})
}));
let offer = peer_connection.create_offer(None).await?;
peer_connection.set_local_description(offer.clone()).await?;
set_connection_state_update(self, &ice_transport, gateway_id, resource_id);
Ok(Request::NewConnection(RequestConnection {
resource_id,
gateway_id,
client_preshared_key: Secret::new(Key(preshared_key.to_bytes())),
client_rtc_session_description: offer,
client_rtc_session_description: ice_params,
}))
}
fn new_tunnel(
&self,
resource_id: ResourceId,
gateway_id: GatewayId,
ice: Arc<RTCIceTransport>,
) -> Result<()> {
let peer_config = self
.role_state
.lock()
.create_peer_config_for_new_connection(resource_id, gateway_id)?;
let peer = Arc::new(Peer::new(
self.private_key.clone(),
self.next_index(),
peer_config.clone(),
gateway_id,
None,
self.rate_limiter.clone(),
));
let (peer_sender, peer_receiver) = tokio::sync::mpsc::channel(PEER_QUEUE_SIZE);
start_handlers(
Arc::clone(&self.device),
self.callbacks.clone(),
peer.clone(),
ice,
peer_receiver,
);
// Partial reads of peers_by_ip can be problematic in the very unlikely case of an expiration
// before inserting finishes.
insert_peers(
&mut self.peers_by_ip.write(),
&peer_config.ips,
ConnectedPeer {
inner: peer,
channel: peer_sender,
},
);
Ok(())
}
/// Called when a response to [Tunnel::request_connection] is ready.
///
/// Once this is called, if everything goes fine, a new tunnel should be started between the 2 peers.
@@ -207,10 +167,10 @@ where
/// - `rtc_sdp`: Remote SDP.
/// - `gateway_public_key`: Public key of the gateway that is handling that resource for this connection.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn received_offer_response(
pub fn received_offer_response(
self: &Arc<Self>,
resource_id: ResourceId,
rtc_sdp: RTCSessionDescription,
rtc_ice_params: RTCIceParameters,
gateway_public_key: PublicKey,
) -> Result<()> {
let gateway_id = self
@@ -224,11 +184,27 @@ where
.get(&gateway_id)
.ok_or(Error::UnknownResource)?
.clone();
peer_connection.set_remote_description(rtc_sdp).await?;
self.role_state
.lock()
.activate_ice_candidate_receiver(gateway_id, gateway_public_key);
let tunnel = self.clone();
// RTCIceTransport::start blocks until there's an ice connection.
tokio::spawn(async move {
if let Err(e) = peer_connection
.start(&rtc_ice_params, Some(RTCIceRole::Controlling))
.await
.map_err(Into::into)
.and_then(|_| tunnel.new_tunnel(resource_id, gateway_id, peer_connection))
{
tracing::warn!(%gateway_id, err = ?e, "Can't start tunnel: {e:#}");
tunnel.role_state.lock().on_connection_failed(resource_id);
let peer_connection = tunnel.peer_connections.lock().remove(&gateway_id);
if let Some(peer_connection) = peer_connection {
let _ = peer_connection.stop().await;
}
}
});
Ok(())
}

View File

@@ -1,16 +1,20 @@
use std::sync::Arc;
use crate::{
control_protocol::{insert_peers, start_handlers},
peer::Peer,
ConnectedPeer, GatewayState, PeerConfig, Tunnel, PEER_QUEUE_SIZE,
};
use chrono::{DateTime, Utc};
use connlib_shared::{
messages::{ClientId, Relay, ResourceDescription},
Callbacks, Error, Result,
};
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use crate::control_protocol::{
new_peer_connection, on_dc_close_handler, on_peer_connection_state_change_handler,
use std::sync::Arc;
use webrtc::ice_transport::{
ice_parameters::RTCIceParameters, ice_role::RTCIceRole, RTCIceTransport,
};
use crate::{peer::Peer, ConnectedPeer, GatewayState, PeerConfig, Tunnel};
use super::{new_ice_connection, IceConnection};
impl<CB> Tunnel<CB, GatewayState>
where
@@ -28,104 +32,46 @@ where
/// - `client_id`: UUID of the remote client.
///
/// # Returns
/// An [RTCSessionDescription] of the local sdp, with candidates gathered.
/// An [RTCIceParameters] of the local sdp, with candidates gathered.
pub async fn set_peer_connection_request(
self: &Arc<Self>,
sdp_session: RTCSessionDescription,
remote_params: RTCIceParameters,
peer: PeerConfig,
relays: Vec<Relay>,
client_id: ClientId,
expires_at: DateTime<Utc>,
resource: ResourceDescription,
) -> Result<RTCSessionDescription> {
let (peer_connection, receiver) = new_peer_connection(&self.webrtc_api, relays).await?;
) -> Result<RTCIceParameters> {
let IceConnection {
ice_params: local_params,
ice_transport: ice,
ice_candidate_rx,
} = new_ice_connection(&self.webrtc_api, relays).await?;
self.role_state
.lock()
.add_new_ice_receiver(client_id, receiver);
.add_new_ice_receiver(client_id, ice_candidate_rx);
let index = self.next_index();
let tunnel = Arc::clone(self);
self.peer_connections
.lock()
.insert(client_id, Arc::clone(&peer_connection));
.insert(client_id, Arc::clone(&ice));
peer_connection.on_peer_connection_state_change(on_peer_connection_state_change_handler(
client_id,
tunnel.stop_peer_command_sender.clone(),
));
let tunnel = self.clone();
tokio::spawn(async move {
if let Err(e) = ice
.start(&remote_params, Some(RTCIceRole::Controlled))
.await
.map_err(Into::into)
.and_then(|_| tunnel.new_tunnel(peer, client_id, resource, expires_at, ice))
{
tracing::warn!(%client_id, err = ?e, "Can't start tunnel: {e:#}");
let peer_connection = tunnel.peer_connections.lock().remove(&client_id);
if let Some(peer_connection) = peer_connection {
let _ = peer_connection.stop().await;
}
}
});
peer_connection.on_data_channel(Box::new(move |d| {
tracing::trace!("new_data_channel");
let data_channel = Arc::clone(&d);
let peer_config = peer.clone();
let tunnel = Arc::clone(&tunnel);
let resource = resource.clone();
Box::pin(async move {
d.on_open(Box::new(move || {
tracing::trace!(?peer_config.ips, "new_data_channel_open");
Box::pin(async move {
{
let Some(device) = tunnel.device.load().clone() else {
let e = Error::NoIface;
tracing::error!(err = ?e, "channel_open");
let _ = tunnel.callbacks().on_error(&e);
return;
};
for &ip in &peer_config.ips {
match device.add_route(ip, tunnel.callbacks()).await {
Ok(maybe_new_device) => {
assert!(maybe_new_device.is_none(), "gateway does not run on android and thus never produces a new device upon `add_route`")
}
Err(e) => {
let _ = tunnel.callbacks.on_error(&e);
}
}
}
}
data_channel
.on_close(on_dc_close_handler(client_id, tunnel.stop_peer_command_sender.clone()));
let data_channel = data_channel.detach().await.expect("only fails if not opened or not enabled, both of which are always true for us");
let peer = Arc::new(Peer::new(
tunnel.private_key.clone(),
index,
peer_config.clone(),
client_id,
Some((resource, expires_at)),
tunnel.rate_limiter.clone()
));
// Holding two mutexes here
{
let mut peers_by_ip = tunnel.peers_by_ip.write();
for ip in peer_config.ips {
peers_by_ip.insert(ip, ConnectedPeer {
inner: peer.clone(),
channel: data_channel.clone(),
});
}
}
tokio::spawn(tunnel.clone().start_peer_handler(peer, data_channel));
})
}))
})
}));
peer_connection.set_remote_description(sdp_session).await?;
// TODO: remove tunnel IP from answer
let answer = peer_connection.create_answer(None).await?;
peer_connection.set_local_description(answer).await?;
let local_desc = peer_connection
.local_description()
.await
.ok_or(Error::ConnectionEstablishError)?;
Ok(local_desc)
Ok(local_params)
}
pub fn allow_access(
@@ -143,4 +89,56 @@ where
peer.inner.add_resource(resource, expires_at);
}
}
fn new_tunnel(
&self,
peer_config: PeerConfig,
client_id: ClientId,
resource: ResourceDescription,
expires_at: DateTime<Utc>,
ice: Arc<RTCIceTransport>,
) -> Result<()> {
tracing::trace!(?peer_config.ips, "new_data_channel_open");
let device = self.device.load().clone().ok_or(Error::NoIface)?;
let callbacks = self.callbacks.clone();
let ips = peer_config.ips.clone();
// Worst thing if this is not run before peers_by_ip is that some packets are lost to the default route
tokio::spawn(async move {
for ip in ips {
if let Ok(res) = device.add_route(ip, &callbacks).await {
assert!(res.is_none(), "gateway does not run on android and thus never produces a new device upon `add_route`");
}
}
});
let peer = Arc::new(Peer::new(
self.private_key.clone(),
self.next_index(),
peer_config.clone(),
client_id,
Some((resource, expires_at)),
self.rate_limiter.clone(),
));
let (peer_sender, peer_receiver) = tokio::sync::mpsc::channel(PEER_QUEUE_SIZE);
start_handlers(
Arc::clone(&self.device),
self.callbacks.clone(),
peer.clone(),
ice,
peer_receiver,
);
insert_peers(
&mut self.peers_by_ip.write(),
&peer_config.ips,
ConnectedPeer {
inner: peer,
channel: peer_sender,
},
);
Ok(())
}
}

View File

@@ -6,7 +6,7 @@ use libc::{
IFF_MULTI_QUEUE, IFF_NO_PI, IFF_TUN, IFNAMSIZ, O_NONBLOCK, O_RDWR,
};
use netlink_packet_route::{rtnl::link::nlas::Nla, RT_SCOPE_UNIVERSE};
use rtnetlink::{new_connection, Handle};
use rtnetlink::{new_connection, Error::NetlinkError, Handle};
use std::{
ffi::{c_int, c_short, c_uchar},
io,
@@ -20,6 +20,7 @@ const TUNSETIFF: u64 = 0x4004_54ca;
const TUN_FILE: &[u8] = b"/dev/net/tun\0";
const RT_PROT_STATIC: u8 = 4;
const DEFAULT_MTU: u32 = 1280;
const FILE_ALREADY_EXISTS: i32 = -17;
#[repr(C)]
union IfrIfru {
@@ -182,35 +183,26 @@ impl IfaceDevice {
.output_interface(self.interface_index)
.protocol(RT_PROT_STATIC)
.scope(RT_SCOPE_UNIVERSE);
match route {
let res = match route {
IpNetwork::V4(ipnet) => {
req.v4()
.destination_prefix(ipnet.network_address(), ipnet.netmask())
.execute()
.await?
.await
}
IpNetwork::V6(ipnet) => {
req.v6()
.destination_prefix(ipnet.network_address(), ipnet.netmask())
.execute()
.await?
.await
}
}
/*
TODO: This works for ignoring the error but the route isn't added afterwards
let's try removing all routes on init for the given interface I think that will work.
match res {
Ok(_)
| Err(rtnetlink::Error::NetlinkError(netlink_packet_core::error::ErrorMessage {
code: NETLINK_ERROR_FILE_EXISTS,
..
})) => Ok(()),
};
match res {
Ok(_) => Ok(None),
Err(NetlinkError(err)) if err.raw_code() == FILE_ALREADY_EXISTS => Ok(None),
Err(err) => Err(err.into()),
}
*/
Ok(None)
}
#[tracing::instrument(level = "trace", skip(self))]

View File

@@ -9,7 +9,7 @@ use futures_bounded::{PushError, StreamMap};
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use std::time::Duration;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
impl<CB> Tunnel<CB, GatewayState>
where
@@ -37,11 +37,11 @@ where
/// [`Tunnel`] state specific to gateways.
pub struct GatewayState {
candidate_receivers: StreamMap<ClientId, RTCIceCandidateInit>,
pub candidate_receivers: StreamMap<ClientId, RTCIceCandidate>,
}
impl GatewayState {
pub fn add_new_ice_receiver(&mut self, id: ClientId, receiver: Receiver<RTCIceCandidateInit>) {
pub fn add_new_ice_receiver(&mut self, id: ClientId, receiver: Receiver<RTCIceCandidate>) {
match self.candidate_receivers.try_push(id, receiver) {
Ok(()) => {}
Err(PushError::BeyondCapacity(_)) => {
@@ -75,7 +75,7 @@ impl RoleState for GatewayState {
return Poll::Ready(Event::SignalIceCandidate {
conn_id,
candidate: c,
})
});
}
(id, Some(Err(e))) => {
tracing::warn!(gateway_id = %id, "ICE gathering timed out: {e}")

View File

@@ -7,6 +7,7 @@ use boringtun::{
x25519::{PublicKey, StaticSecret},
};
use bytes::Bytes;
use connlib_shared::{messages::Key, CallbackErrorFacade, Callbacks, Error};
use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;
@@ -22,22 +23,18 @@ use webrtc::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine,
setting_engine::SettingEngine, APIBuilder, API,
},
ice_transport::{ice_candidate::RTCIceCandidate, RTCIceTransport},
interceptor::registry::Registry,
peer_connection::RTCPeerConnection,
};
use arc_swap::ArcSwapOption;
use futures::channel::mpsc;
use futures_util::task::AtomicWaker;
use futures_util::{SinkExt, StreamExt};
use itertools::Itertools;
use std::collections::VecDeque;
use std::task::{ready, Context, Poll};
use std::{collections::HashMap, fmt, net::IpAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, hash::Hash};
use tokio::time::Interval;
use webrtc::data::data_channel::DataChannel;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use connlib_shared::{
messages::{GatewayId, ResourceDescription},
@@ -48,7 +45,6 @@ pub use client::ClientState;
use connlib_shared::error::ConnlibError;
pub use control_protocol::Request;
pub use gateway::GatewayState;
pub use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use crate::ip_packet::MutableIpPacket;
use connlib_shared::messages::{ClientId, SecretKey};
@@ -69,6 +65,11 @@ mod resource_table;
const MAX_UDP_SIZE: usize = (1 << 16) - 1;
const DNS_QUERIES_QUEUE_SIZE: usize = 100;
// Why do we need such big channel? I have not the slightless idea
// but if we make it smaller things get quite slower.
// Since eventually we will have a UDP socket with try_send
// I don't think there's a point to having this.
const PEER_QUEUE_SIZE: usize = 1_000;
/// For how long we will attempt to gather ICE candidates before aborting.
///
@@ -114,16 +115,13 @@ pub struct Tunnel<CB: Callbacks, TRoleState: RoleState> {
public_key: PublicKey,
#[allow(clippy::type_complexity)]
peers_by_ip: RwLock<IpNetworkTable<ConnectedPeer<TRoleState::Id>>>,
peer_connections: Mutex<HashMap<TRoleState::Id, Arc<RTCPeerConnection>>>,
peer_connections: Mutex<HashMap<TRoleState::Id, Arc<RTCIceTransport>>>,
webrtc_api: API,
callbacks: CallbackErrorFacade<CB>,
/// State that differs per role, i.e. clients vs gateways.
role_state: Mutex<TRoleState>,
stop_peer_command_receiver: Mutex<mpsc::Receiver<TRoleState::Id>>,
stop_peer_command_sender: mpsc::Sender<TRoleState::Id>,
rate_limit_reset_interval: Mutex<Interval>,
peer_refresh_interval: Mutex<Interval>,
mtu_refresh_interval: Mutex<Interval>,
@@ -189,6 +187,8 @@ where
return Poll::Ready(Ok(None));
};
tracing::trace!(target: "wire", action = "read", from = "device", dest = %packet.destination());
let mut role_state = self.role_state.lock();
let packet = match role_state.handle_dns(packet) {
@@ -267,9 +267,10 @@ where
}
}
#[derive(Clone)]
pub struct ConnectedPeer<TId> {
inner: Arc<Peer<TId>>,
channel: Arc<DataChannel>,
channel: tokio::sync::mpsc::Sender<Bytes>,
}
// TODO: For now we only use these fields with debug
@@ -314,16 +315,13 @@ where
continue;
};
let peer = peers.remove(peer_to_remove).expect("just found it");
peers.remove(peer_to_remove);
let channel = peer.channel.clone();
tokio::spawn(async move { channel.close().await });
if let Some(conn) = self.peer_connections.lock().remove(&conn_id) {
tokio::spawn({
let callbacks = self.callbacks.clone();
async move {
if let Err(e) = conn.close().await {
if let Err(e) = conn.stop().await {
tracing::warn!(%conn_id, error = ?e, "Can't close peer");
let _ = callbacks.on_error(&e.into());
}
@@ -373,19 +371,11 @@ where
}
};
let callbacks = self.callbacks.clone();
let peer_channel = peer.channel.clone();
let mut stop_command_sender = self.stop_peer_command_sender.clone();
tokio::spawn(async move {
if let Err(e) = peer_channel.write(&bytes).await {
let err = e.into();
tracing::error!("Failed to send packet to peer: {err:?}");
let _ = callbacks.on_error(&err);
if err.is_fatal_connection_error() {
let _ = stop_command_sender.send(conn_id).await;
}
if let Err(e) = peer_channel.send(bytes).await {
tracing::error!("Failed to send packet to peer: {e:#}");
}
});
}
@@ -415,14 +405,6 @@ where
return Poll::Ready(event);
}
if let Poll::Ready(Some(conn_id)) =
self.stop_peer_command_receiver.lock().poll_next_unpin(cx)
{
self.peers_to_stop.lock().push_back(conn_id);
continue;
}
return Poll::Pending;
}
}
@@ -436,20 +418,13 @@ where
) {
let peer_id = peer.inner.conn_id;
match peer.inner.encapsulate(packet, dest, write_buf) {
match peer.inner.encapsulate(packet, write_buf) {
Ok(None) => {}
Ok(Some(b)) => {
tokio::spawn({
let channel = peer.channel.clone();
let mut sender = self.stop_peer_command_sender.clone();
async move {
if let Err(e) = channel.write(&b).await {
tracing::error!(resource_address = %dest, err = ?e, "failed to handle packet {e:#}");
let _ = sender.send(peer_id).await;
}
}
});
tracing::trace!(target: "wire", action = "writing", to = "peer", %dest);
if peer.channel.try_send(b).is_err() {
tracing::warn!(target: "wire", action = "dropped", to = "peer", %dest);
}
}
Err(e) => {
tracing::error!(resource_address = %dest, err = ?e, "failed to handle packet {e:#}");
@@ -500,7 +475,7 @@ impl<'a> DnsQuery<'a> {
pub enum Event<TId> {
SignalIceCandidate {
conn_id: TId,
candidate: RTCIceCandidateInit,
candidate: RTCIceCandidate,
},
ConnectionIntent {
resource: ResourceDescription,
@@ -537,7 +512,6 @@ where
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut media_engine)?;
let mut setting_engine = SettingEngine::default();
setting_engine.detach_data_channels();
setting_engine.set_interface_filter(Box::new(|name| !name.contains("tun")));
let webrtc_api = APIBuilder::new()
@@ -546,8 +520,6 @@ where
.with_setting_engine(setting_engine)
.build();
let (stop_peer_command_sender, stop_peer_command_receiver) = mpsc::channel(10);
Ok(Self {
rate_limiter,
private_key,
@@ -561,8 +533,6 @@ where
write_buf: Mutex::new(Box::new([0u8; MAX_UDP_SIZE])),
callbacks: CallbackErrorFacade(callbacks),
role_state: Default::default(),
stop_peer_command_receiver: Mutex::new(stop_peer_command_receiver),
stop_peer_command_sender,
rate_limit_reset_interval: Mutex::new(rate_limit_reset_interval()),
peer_refresh_interval: Mutex::new(peer_refresh_interval()),
mtu_refresh_interval: Mutex::new(mtu_refresh_interval()),

View File

@@ -91,8 +91,7 @@ where
peer_config.persistent_keepalive,
index,
Some(rate_limiter),
)
.expect("never actually fails"); // See https://github.com/cloudflare/boringtun/pull/366.
);
let mut allowed_ips = IpNetworkTable::new();
for ip in peer_config.ips {
@@ -186,7 +185,6 @@ where
pub(crate) fn encapsulate(
&self,
mut packet: MutableIpPacket,
dest: IpAddr,
buf: &mut [u8],
) -> Result<Option<Bytes>> {
if let Some(resource) = self.get_translation(packet.to_immutable().source()) {
@@ -211,8 +209,6 @@ where
_ => panic!("Unexpected result from `encapsulate`"),
};
tracing::trace!(target: "wire", action = "writing", from = "iface", to = %dest);
Ok(Some(Bytes::copy_from_slice(packet)))
}

View File

@@ -1,60 +1,52 @@
use std::future::Future;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use bytes::Bytes;
use connlib_shared::Callbacks;
use futures_util::SinkExt;
use webrtc::data::data_channel::DataChannel;
use webrtc::mux::endpoint::Endpoint;
use webrtc::util::Conn;
use crate::device_channel::Device;
use crate::peer::WriteTo;
use crate::{peer::Peer, RoleState, Tunnel, MAX_UDP_SIZE};
use crate::{peer::Peer, MAX_UDP_SIZE};
impl<CB, TRoleState> Tunnel<CB, TRoleState>
where
CB: Callbacks + 'static,
TRoleState: RoleState,
pub(crate) async fn start_peer_handler<TId>(
device: Arc<ArcSwapOption<Device>>,
callbacks: impl Callbacks + 'static,
peer: Arc<Peer<TId>>,
channel: Arc<Endpoint>,
) where
TId: Copy + fmt::Debug + Send + Sync + 'static,
{
pub(crate) fn start_peer_handler(
&self,
peer: Arc<Peer<TRoleState::Id>>,
channel: Arc<DataChannel>,
) -> impl Future<Output = ()> + Send + 'static {
let device = Arc::clone(&self.device);
let callbacks = self.callbacks.clone();
let mut sender = self.stop_peer_command_sender.clone();
loop {
let Some(device) = device.load().clone() else {
tracing::debug!("Device temporarily not available");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
};
let result = peer_handler(&callbacks, &peer, channel.clone(), &device).await;
async move {
loop {
let Some(device) = device.load().clone() else {
tracing::debug!("Device temporarily not available");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
};
let result = peer_handler(&callbacks, &peer, channel.clone(), &device).await;
if matches!(result, Err(ref err) if err.raw_os_error() == Some(9)) {
tracing::warn!("bad_file_descriptor");
continue;
}
if let Err(e) = result {
tracing::error!(err = ?e, "peer_handle_error");
}
break;
}
tracing::debug!(peer = ?peer.stats(), "peer_stopped");
let _ = sender.send(peer.conn_id).await;
if matches!(result, Err(ref err) if err.raw_os_error() == Some(9)) {
tracing::warn!("bad_file_descriptor");
continue;
}
if let Err(e) = result {
tracing::error!(err = ?e, "peer_handle_error");
}
break;
}
tracing::debug!(peer = ?peer.stats(), "peer_stopped");
}
async fn peer_handler<TId>(
callbacks: &impl Callbacks,
peer: &Arc<Peer<TId>>,
channel: Arc<DataChannel>,
channel: Arc<Endpoint>,
device: &Device,
) -> std::io::Result<()>
where
@@ -62,7 +54,7 @@ where
{
let mut src_buf = [0u8; MAX_UDP_SIZE];
let mut dst_buf = [0u8; MAX_UDP_SIZE];
while let Ok(size) = channel.read(&mut src_buf[..]).await {
while let Ok(size) = channel.recv(&mut src_buf[..]).await {
tracing::trace!(target: "wire", action = "read", bytes = size, from = "peer");
// TODO: Double check that this can only happen on closed channel
@@ -78,9 +70,8 @@ where
match peer.decapsulate(src, &mut dst_buf) {
Ok(Some(WriteTo::Network(bytes))) => {
for packet in bytes {
if let Err(e) = channel.write(&packet).await {
if let Err(e) = channel.send(&packet).await {
tracing::error!("Couldn't send packet to connected peer: {e}");
let _ = callbacks.on_error(&e.into());
}
}
}
@@ -97,3 +88,20 @@ where
Ok(())
}
pub(crate) async fn handle_packet(
ep: Arc<Endpoint>,
mut receiver: tokio::sync::mpsc::Receiver<Bytes>,
) {
while let Some(packet) = receiver.recv().await {
if ep.send(&packet).await.is_err() {
tracing::warn!(target: "wire", action = "dropped", "endpoint failure");
}
}
if ep.close().await.is_err() {
tracing::warn!("failed to close endpoint");
}
tracing::trace!("closed endpoint");
}

View File

@@ -11,5 +11,6 @@ ip_network = "0.4"
url = { version = "2.3.1", default-features = false }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing = { workspace = true }
tracing-log = "0.2"
clap = { version = "4.4", features = ["derive", "env"] }
ctrlc = "3.4"

View File

@@ -1,4 +1,5 @@
use clap::Args;
use tracing_log::LogTracer;
use tracing_subscriber::{
fmt, prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer, Registry,
};
@@ -19,6 +20,7 @@ where
.with(additional_layer.with_filter(EnvFilter::from_default_env()))
.with(fmt::layer().with_filter(EnvFilter::from_default_env()));
tracing::subscriber::set_global_default(subscriber).expect("Could not set global default");
LogTracer::init().unwrap();
}
/// Arguments common to all Firezone CLI components.

View File

@@ -21,7 +21,7 @@ firezone-cli-utils = { workspace = true }
phoenix-channel = { workspace = true }
secrecy = { workspace = true }
serde = { version = "1.0", default-features = false, features = ["std", "derive"] }
tokio = { version = "1.33", default-features = false, features = ["sync", "macros"] }
tokio = { version = "1.33", default-features = false, features = ["sync", "macros", "rt-multi-thread"] }
tokio-tungstenite = { version = "0.20", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] }
tracing = { workspace = true }
tracing-subscriber = "0.3.17"

View File

@@ -12,7 +12,7 @@ use std::convert::Infallible;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::ice_transport::ice_parameters::RTCIceParameters;
pub const PHOENIX_TOPIC: &str = "gateway";
@@ -22,7 +22,7 @@ pub struct Eventloop {
// TODO: Strongly type request reference (currently `String`)
connection_request_tasks:
futures_bounded::FuturesMap<(ClientId, String), Result<RTCSessionDescription, Error>>,
futures_bounded::FuturesMap<(ClientId, String), Result<RTCIceParameters, Error>>,
add_ice_candidate_tasks: futures_bounded::FuturesSet<Result<(), Error>>,
print_stats_timer: tokio::time::Interval,
@@ -155,7 +155,7 @@ impl Eventloop {
..
}) => {
for candidate in candidates {
tracing::debug!(client = %client_id, candidate = %candidate.candidate, "Adding ICE candidate from client");
tracing::debug!(client = %client_id, %candidate, "Adding ICE candidate from client");
let tunnel = Arc::clone(&self.tunnel);
if self
@@ -178,7 +178,7 @@ impl Eventloop {
conn_id: client,
candidate,
}) => {
tracing::debug!(%client, candidate = %candidate.candidate, "Sending ICE candidate to client");
tracing::debug!(%client, %candidate, "Sending ICE candidate to client");
let _id = self.portal.send(
PHOENIX_TOPIC,

View File

@@ -4,9 +4,8 @@ use chrono::{serde::ts_seconds, DateTime, Utc};
use connlib_shared::messages::{
ActorId, ClientId, Interface, Peer, Relay, ResourceDescription, ResourceId,
};
use firezone_tunnel::RTCSessionDescription;
use serde::{Deserialize, Serialize};
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::ice_transport::{ice_candidate::RTCIceCandidate, ice_parameters::RTCIceParameters};
// TODO: Should this have a resource?
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize, Clone)]
@@ -24,7 +23,7 @@ pub struct Actor {
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Client {
pub id: ClientId,
pub rtc_session_description: RTCSessionDescription,
pub rtc_session_description: RTCIceParameters,
pub peer: Peer,
}
@@ -100,7 +99,7 @@ pub struct BroadcastClientIceCandidates {
/// Client's id the ice candidates are meant for
pub client_ids: Vec<ClientId>,
/// Actual RTC ice candidates
pub candidates: Vec<RTCIceCandidateInit>,
pub candidates: Vec<RTCIceCandidate>,
}
/// A client's ice candidate message.
@@ -109,7 +108,7 @@ pub struct ClientIceCandidates {
/// Client's id the ice candidates came from
pub client_id: ClientId,
/// Actual RTC ice candidates
pub candidates: Vec<RTCIceCandidateInit>,
pub candidates: Vec<RTCIceCandidate>,
}
// These messages can be sent from a gateway
@@ -128,7 +127,7 @@ pub enum EgressMessages {
pub struct ConnectionReady {
#[serde(rename = "ref")]
pub reference: String,
pub gateway_rtc_session_description: RTCSessionDescription,
pub gateway_rtc_session_description: RTCIceParameters,
}
#[cfg(test)]
@@ -155,8 +154,9 @@ mod test {
"preshared_key": "sMeTuiJ3mezfpVdan948CmisIWbwBZ1z7jBNnbVtfVg="
},
"rtc_session_description": {
"sdp": "v=0\r\no=- 8696424395893049643 650344226 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\na=fingerprint:sha-256 AF:57:6F:03:CA:BD:0E:6E:F0:26:BA:B4:36:FE:2E:48:2D:FA:B7:39:84:BA:9E:FB:3F:DC:1F:46:ED:18:01:40\r\na=group:BUNDLE 0\r\nm=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\nc=IN IP4 0.0.0.0\r\na=setup:actpass\r\na=mid:0\r\na=sendrecv\r\na=sctp-port:5000\r\na=ice-ufrag:KOLSoUEJdNfpgLoM\r\na=ice-pwd:WvOTEYbBZwpRgERbKVjkPGsGwZsUoyKQ\r\na=candidate:312688668 1 udp 2130706431 172.28.0.100 46924 typ host\r\na=candidate:312688668 2 udp 2130706431 172.28.0.100 46924 typ host\r\na=candidate:1090862588 1 udp 2130706431 100.114.114.30 32969 typ host\r\na=candidate:1090862588 2 udp 2130706431 100.114.114.30 32969 typ host\r\na=candidate:2835903154 1 udp 1694498815 172.28.0.100 59817 typ srflx raddr 0.0.0.0 rport 59817\r\na=candidate:2835903154 2 udp 1694498815 172.28.0.100 59817 typ srflx raddr 0.0.0.0 rport 59817\r\na=candidate:2835903154 1 udp 1694498815 172.28.0.100 45350 typ srflx raddr 0.0.0.0 rport 45350\r\na=candidate:2835903154 2 udp 1694498815 172.28.0.100 45350 typ srflx raddr 0.0.0.0 rport 45350\r\na=candidate:167090039 1 udp 2130706431 :: 55852 typ host\r\na=candidate:167090039 2 udp 2130706431 :: 55852 typ host\r\na=end-of-candidates\r\n",
"type": "offer"
"ice_lite":false,
"password": "xEwoXEzHuSyrcgOCSRnwOXQVnbnbeGeF",
"username_fragment": "PvCPFevCOgkvVCtH"
}
},
"resource": {