|
3 | 3 |
|
4 | 4 | //! Native |
5 | 5 |
|
6 | | -#[cfg(feature = "socks")] |
7 | 6 | use std::net::SocketAddr; |
8 | 7 | #[cfg(feature = "tor")] |
9 | 8 | use std::path::PathBuf; |
| 9 | +use std::time::Duration; |
10 | 10 |
|
11 | 11 | #[cfg(feature = "tor")] |
12 | 12 | use arti_client::DataStream; |
13 | 13 | use tokio::io::{AsyncRead, AsyncWrite}; |
14 | | -#[cfg(feature = "socks")] |
15 | 14 | use tokio::net::TcpStream; |
16 | 15 | use tokio_tungstenite::tungstenite::protocol::Role; |
17 | 16 | pub use tokio_tungstenite::tungstenite::Message; |
@@ -40,13 +39,156 @@ pub async fn connect(url: &Url, mode: &ConnectionMode) -> Result<WebSocket, Erro |
40 | 39 | } |
41 | 40 | } |
42 | 41 |
|
| 42 | +/// Happy Eyeballs connection delay (RFC 8305). |
| 43 | +const HAPPY_EYEBALLS_DELAY: Duration = Duration::from_millis(250); |
| 44 | + |
43 | 45 | async fn connect_direct(url: &Url) -> Result<WebSocket, Error> { |
| 46 | + let host: &str = url.host_str().ok_or_else(Error::empty_host)?; |
| 47 | + let port: u16 = url |
| 48 | + .port_or_known_default() |
| 49 | + .ok_or_else(Error::invalid_port)?; |
| 50 | + |
| 51 | + let tcp_stream = happy_eyeballs_connect(host, port).await?; |
| 52 | + |
44 | 53 | // NOT REMOVE `Box::pin`! |
45 | 54 | // Use `Box::pin` to fix stack overflow on windows targets due to large `Future` |
46 | | - let (stream, _) = Box::pin(tokio_tungstenite::connect_async(url.as_str())).await?; |
| 55 | + let (stream, _) = Box::pin(tokio_tungstenite::client_async_tls( |
| 56 | + url.as_str(), |
| 57 | + tcp_stream, |
| 58 | + )) |
| 59 | + .await?; |
47 | 60 | Ok(WebSocket::tokio(Box::new(stream))) |
48 | 61 | } |
49 | 62 |
|
| 63 | +/// Connect to a host using the Happy Eyeballs algorithm (RFC 8305). |
| 64 | +/// |
| 65 | +/// When DNS returns both IPv6 and IPv4 addresses, tries the preferred family |
| 66 | +/// first and starts the other family after a 250ms delay if the first hasn't |
| 67 | +/// connected yet. Uses whichever connection succeeds first. |
| 68 | +async fn happy_eyeballs_connect(host: &str, port: u16) -> Result<TcpStream, Error> { |
| 69 | + let addrs: Vec<SocketAddr> = tokio::net::lookup_host(format!("{host}:{port}")) |
| 70 | + .await? |
| 71 | + .collect(); |
| 72 | + |
| 73 | + if addrs.is_empty() { |
| 74 | + return Err(std::io::Error::new( |
| 75 | + std::io::ErrorKind::AddrNotAvailable, |
| 76 | + "DNS resolution returned no addresses", |
| 77 | + ) |
| 78 | + .into()); |
| 79 | + } |
| 80 | + |
| 81 | + // Separate into IPv6 and IPv4, preserving order within each group |
| 82 | + let mut ipv6: Vec<SocketAddr> = Vec::new(); |
| 83 | + let mut ipv4: Vec<SocketAddr> = Vec::new(); |
| 84 | + for addr in addrs { |
| 85 | + if addr.is_ipv6() { |
| 86 | + ipv6.push(addr); |
| 87 | + } else { |
| 88 | + ipv4.push(addr); |
| 89 | + } |
| 90 | + } |
| 91 | + |
| 92 | + // If only one family, try addresses sequentially |
| 93 | + if ipv4.is_empty() { |
| 94 | + return try_addrs_sequential(&ipv6).await; |
| 95 | + } |
| 96 | + if ipv6.is_empty() { |
| 97 | + return try_addrs_sequential(&ipv4).await; |
| 98 | + } |
| 99 | + |
| 100 | + // Both families available: Happy Eyeballs |
| 101 | + // Try first IPv6 address, after delay start first IPv4 in parallel |
| 102 | + let ipv6_first = ipv6[0]; |
| 103 | + let ipv4_first = ipv4[0]; |
| 104 | + |
| 105 | + // Pin the IPv6 future so it survives across select boundaries |
| 106 | + let ipv6_fut = TcpStream::connect(ipv6_first); |
| 107 | + tokio::pin!(ipv6_fut); |
| 108 | + |
| 109 | + // Phase 1: Give IPv6 a 250ms head start |
| 110 | + tokio::select! { |
| 111 | + result = &mut ipv6_fut => { |
| 112 | + match result { |
| 113 | + Ok(stream) => return Ok(stream), |
| 114 | + // IPv6 failed fast, try IPv4 directly |
| 115 | + Err(_) => return try_addrs_sequential(&ipv4).await, |
| 116 | + } |
| 117 | + } |
| 118 | + _ = tokio::time::sleep(HAPPY_EYEBALLS_DELAY) => { |
| 119 | + // Timer fired, IPv6 still pending. Start IPv4 and race both. |
| 120 | + } |
| 121 | + } |
| 122 | + |
| 123 | + // Phase 2: Race the still-pending IPv6 against a new IPv4 attempt. |
| 124 | + // Use a loop so that if one fails, we keep waiting for the other. |
| 125 | + let ipv4_fut = TcpStream::connect(ipv4_first); |
| 126 | + tokio::pin!(ipv4_fut); |
| 127 | + |
| 128 | + let mut ipv6_done = false; |
| 129 | + let mut ipv4_done = false; |
| 130 | + |
| 131 | + loop { |
| 132 | + tokio::select! { |
| 133 | + result = &mut ipv6_fut, if !ipv6_done => { |
| 134 | + match result { |
| 135 | + Ok(stream) => return Ok(stream), |
| 136 | + Err(_) => { ipv6_done = true; } |
| 137 | + } |
| 138 | + } |
| 139 | + result = &mut ipv4_fut, if !ipv4_done => { |
| 140 | + match result { |
| 141 | + Ok(stream) => return Ok(stream), |
| 142 | + Err(_) => { ipv4_done = true; } |
| 143 | + } |
| 144 | + } |
| 145 | + } |
| 146 | + if ipv6_done && ipv4_done { |
| 147 | + break; |
| 148 | + } |
| 149 | + } |
| 150 | + |
| 151 | + // Both initial attempts failed, try remaining addresses sequentially |
| 152 | + // Interleave remaining IPv4 and IPv6 per RFC 8305 |
| 153 | + let mut remaining = Vec::new(); |
| 154 | + let ipv4_remaining = ipv4.iter().skip(1); |
| 155 | + let ipv6_remaining = ipv6.iter().skip(1); |
| 156 | + |
| 157 | + let mut ipv4_iter = ipv4_remaining.peekable(); |
| 158 | + let mut ipv6_iter = ipv6_remaining.peekable(); |
| 159 | + |
| 160 | + // Interleave: take one from ipv4, then one from ipv6, alternating |
| 161 | + while ipv4_iter.peek().is_some() || ipv6_iter.peek().is_some() { |
| 162 | + if let Some(addr) = ipv4_iter.next() { |
| 163 | + remaining.push(*addr); |
| 164 | + } |
| 165 | + if let Some(addr) = ipv6_iter.next() { |
| 166 | + remaining.push(*addr); |
| 167 | + } |
| 168 | + } |
| 169 | + |
| 170 | + try_addrs_sequential(&remaining).await |
| 171 | +} |
| 172 | + |
| 173 | +/// Try connecting to addresses sequentially, returning the first success. |
| 174 | +async fn try_addrs_sequential(addrs: &[SocketAddr]) -> Result<TcpStream, Error> { |
| 175 | + let mut last_err = None; |
| 176 | + for addr in addrs { |
| 177 | + match TcpStream::connect(addr).await { |
| 178 | + Ok(stream) => return Ok(stream), |
| 179 | + Err(e) => last_err = Some(e), |
| 180 | + } |
| 181 | + } |
| 182 | + Err(last_err |
| 183 | + .unwrap_or_else(|| { |
| 184 | + std::io::Error::new( |
| 185 | + std::io::ErrorKind::AddrNotAvailable, |
| 186 | + "no addresses to connect to", |
| 187 | + ) |
| 188 | + }) |
| 189 | + .into()) |
| 190 | +} |
| 191 | + |
50 | 192 | #[cfg(feature = "socks")] |
51 | 193 | async fn connect_proxy(url: &Url, proxy: SocketAddr) -> Result<WebSocket, Error> { |
52 | 194 | let host: &str = url.host_str().ok_or_else(Error::empty_host)?; |
|
0 commit comments