├── autobahn ├── .gitignore ├── fuzzingserver.json ├── fuzzingclient.json ├── server-test.js └── Makefile ├── rust-toolchain ├── fuzz ├── .gitignore ├── .rustfmt.toml ├── fuzz_targets │ ├── unmask.rs │ ├── parse_frame.rs │ └── parse_frame_fragment.rs ├── Cargo.toml └── Cargo.lock ├── .rustfmt.toml ├── .gitignore ├── .cargo └── config.toml ├── tests ├── ui.rs ├── ui │ ├── 01-send-executor.rs │ └── 02-!send-executor.rs ├── upgrade.rs ├── concurrency.rs └── split.rs ├── benches ├── README.md ├── unmask.rs ├── Makefile ├── run.js ├── 100-20-chart.svg ├── load_test.c ├── 200-16384-chart.svg └── 500-16384-chart.svg ├── .github └── workflows │ └── rust.yml ├── examples ├── axum.rs ├── localhost.crt ├── echo_server.rs ├── echo_server_split.rs ├── localhost.key ├── autobahn_client.rs ├── tls_client.rs └── tls_server.rs ├── src ├── error.rs ├── handshake.rs ├── mask.rs ├── close.rs ├── upgrade.rs ├── fragment.rs ├── frame.rs └── lib.rs ├── Cargo.toml ├── README.md └── LICENSE /autobahn/.gitignore: -------------------------------------------------------------------------------- 1 | reports/ -------------------------------------------------------------------------------- /rust-toolchain: -------------------------------------------------------------------------------- 1 | 1.76.0 2 | -------------------------------------------------------------------------------- /fuzz/.gitignore: -------------------------------------------------------------------------------- 1 | target 2 | corpus 3 | artifacts 4 | coverage 5 | -------------------------------------------------------------------------------- /.rustfmt.toml: -------------------------------------------------------------------------------- 1 | max_width = 80 2 | tab_spaces = 2 3 | edition = "2021" 4 | -------------------------------------------------------------------------------- /fuzz/.rustfmt.toml: -------------------------------------------------------------------------------- 1 | max_width = 80 2 | tab_spaces = 2 3 | edition = 
"2021" -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- 1 | /target 2 | *.txt 3 | *.o 4 | .DS_Store 5 | benches/load_test 6 | -------------------------------------------------------------------------------- /.cargo/config.toml: -------------------------------------------------------------------------------- 1 | [target.x86_64-unknown-linux-gnu] 2 | rustflags = ["-C", "target-cpu=native"] 3 | -------------------------------------------------------------------------------- /tests/ui.rs: -------------------------------------------------------------------------------- 1 | #[test] 2 | fn ui() { 3 | let t = trybuild::TestCases::new(); 4 | t.pass("tests/ui/01-send-executor.rs"); 5 | t.pass("tests/ui/02-!send-executor.rs"); 6 | } 7 | -------------------------------------------------------------------------------- /fuzz/fuzz_targets/unmask.rs: -------------------------------------------------------------------------------- 1 | #![no_main] 2 | 3 | use libfuzzer_sys::fuzz_target; 4 | 5 | fuzz_target!(|data: &[u8]| { 6 | let mut data = data.to_vec(); 7 | fastwebsockets::unmask(&mut data, [1, 2, 3, 4]); 8 | }); -------------------------------------------------------------------------------- /autobahn/fuzzingserver.json: -------------------------------------------------------------------------------- 1 | { 2 | "url": "ws://127.0.0.1:9001", 3 | "outdir": "./reports/client", 4 | "cases": ["1.*", "2.*", "3.*", "4.*", "5.*", "6.*", "7.*", "9.*", "10.*"], 5 | "exclude-cases": [], 6 | "exclude-agent-cases": {} 7 | } -------------------------------------------------------------------------------- /autobahn/fuzzingclient.json: -------------------------------------------------------------------------------- 1 | { 2 | "outdir": "./reports/servers", 3 | "servers": [ 4 | { 5 | "agent": "fastwebsockets", 6 | "url": "ws://localhost:8080" 7 | } 8 | ], 9 | "cases": [ 10 
| "1.*", 11 | "2.*", 12 | "3.*", 13 | "4.*", 14 | "5.*", 15 | "6.*", 16 | "7.*", 17 | "9.*", 18 | "10.*" 19 | ], 20 | "exclude-cases": [ 21 | "11.*", 22 | "12.*", 23 | "13.*" 24 | ], 25 | "exclude-agent-cases": {} 26 | } 27 | -------------------------------------------------------------------------------- /benches/README.md: -------------------------------------------------------------------------------- 1 | ### Plaintext, echo WebSocket server benchmark 2 | 3 | ![](./200-16384-chart.svg) 4 | 5 | ![](./500-16384-chart.svg) 6 | 7 | ![](./100-20-chart.svg) 8 | 9 | Y-axis is number of messages sent per sec. (size per message = payload size) 10 | 11 | ``` 12 | Linux divy 5.19.0-1022-gcp #24~22.04.1-Ubuntu SMP x86_64 GNU/Linux 13 | 14 | 32GiB System memory 15 | Intel(R) Xeon(R) CPU @ 3.10GHz 16 | 17 | fastwebsockets 0.4.2 18 | rust-websocket 0.26.5 19 | uWebSockets (main d043038) 20 | tokio-tungstenite 0.18.0 21 | ``` 22 | -------------------------------------------------------------------------------- /benches/unmask.rs: -------------------------------------------------------------------------------- 1 | use criterion::*; 2 | 3 | fn benchmark(c: &mut Criterion) { 4 | const STREAM_SIZE: usize = 64 << 20; 5 | 6 | let mut data: Vec = (0..STREAM_SIZE).map(|_| rand::random()).collect(); 7 | let mut group = c.benchmark_group("unmask2"); 8 | group.throughput(Throughput::Bytes(STREAM_SIZE as u64)); 9 | group.bench_function("unmask 64 << 20", |b| { 10 | b.iter(|| { 11 | fastwebsockets::unmask(black_box(&mut data), [1, 2, 3, 4]); 12 | }); 13 | }); 14 | group.finish(); 15 | } 16 | 17 | criterion_group!(benches, benchmark); 18 | criterion_main!(benches); 19 | -------------------------------------------------------------------------------- /benches/Makefile: -------------------------------------------------------------------------------- 1 | PATH_TO_USOCKETS=../../uWebSockets/uSockets/src 2 | 3 | # brew link --force openssl 4 | # /opt/homebrew/opt/openssl@3/ 5 | 
OPENSSL_FLAGS=-L/opt/homebrew/opt/openssl@3/lib -I/opt/homebrew/opt/openssl@3/include 6 | 7 | OTHER_LIBS=`ls ../../uWebSockets/benchmarks/*.o | grep -E "bsd|context|loop|quic|socket|udp"` 8 | default: 9 | clang -flto -O3 -DLIBUS_USE_OPENSSL $(OPENSSL_FLAGS) -I$(PATH_TO_USOCKETS) $(PATH_TO_USOCKETS)/eventing/*.c $(PATH_TO_USOCKETS)/crypto/*.c load_test.c -c 10 | clang++ -flto -O3 -DLIBUS_USE_OPENSSL $(OPENSSL_FLAGS) -I$(PATH_TO_USOCKETS) $(PATH_TO_USOCKETS)/crypto/*.cpp -c -std=c++17 11 | 12 | clang++ -flto -O3 -DLIBUS_USE_OPENSSL $(OPENSSL_FLAGS) `ls *.o` $(OTHER_LIBS) -lssl -lcrypto -o load_test 13 | -------------------------------------------------------------------------------- /fuzz/Cargo.toml: -------------------------------------------------------------------------------- 1 | [package] 2 | name = "fastwebsockets-fuzz" 3 | version = "0.0.0" 4 | publish = false 5 | edition = "2021" 6 | 7 | [package.metadata] 8 | cargo-fuzz = true 9 | 10 | [dependencies] 11 | libfuzzer-sys = "0.4" 12 | tokio = { version = "1.0", features = ["full"] } 13 | futures = "0.3.27" 14 | 15 | [dependencies.fastwebsockets] 16 | path = ".." 
17 | 18 | # Prevent this from interfering with workspaces 19 | [workspace] 20 | members = ["."] 21 | 22 | [profile.release] 23 | debug = 1 24 | 25 | [[bin]] 26 | name = "parse_frame" 27 | path = "fuzz_targets/parse_frame.rs" 28 | test = false 29 | doc = false 30 | 31 | [[bin]] 32 | name = "parse_frame_fragment" 33 | path = "fuzz_targets/parse_frame_fragment.rs" 34 | test = false 35 | doc = false 36 | 37 | [[bin]] 38 | name = "unmask" 39 | path = "fuzz_targets/unmask.rs" 40 | test = false 41 | doc = false 42 | -------------------------------------------------------------------------------- /.github/workflows/rust.yml: -------------------------------------------------------------------------------- 1 | name: Rust 2 | 3 | on: [push, pull_request] 4 | 5 | env: 6 | CARGO_TERM_COLOR: always 7 | 8 | jobs: 9 | build: 10 | 11 | strategy: 12 | matrix: 13 | os: [ubuntu-latest, macos-latest, windows-latest] 14 | runs-on: ${{ matrix.os }} 15 | 16 | steps: 17 | - uses: actions/checkout@v3 18 | - name: Install Rust 1.76.0 toolchain 19 | uses: actions-rs/toolchain@v1 20 | with: 21 | toolchain: 1.76.0 22 | override: true 23 | components: rustfmt 24 | - uses: denoland/setup-deno@v1 25 | with: 26 | deno-version: v1.x 27 | - name: Build 28 | run: cargo build --verbose --all-features --all-targets 29 | - name: Test 30 | run: cargo test --verbose --all-features 31 | - name: Check formatting 32 | run: cargo fmt -- --check --verbose 33 | - name: Autobahn|Testsuite 34 | if: matrix.os == 'ubuntu-latest' 35 | run: | 36 | cargo build --verbose --release --all-features --example echo_server 37 | deno run -A --unstable autobahn/server-test.js 38 | -------------------------------------------------------------------------------- /autobahn/server-test.js: -------------------------------------------------------------------------------- 1 | import { $ } from "https://deno.land/x/dax/mod.ts"; 2 | 3 | const pwd = new URL(".", import.meta.url).pathname; 4 | 5 | const AUTOBAHN_TESTSUITE_DOCKER = 6 | 
"crossbario/autobahn-testsuite:0.8.2@sha256:5d4ba3aa7d6ab2fdbf6606f3f4ecbe4b66f205ce1cbc176d6cdf650157e52242"; 7 | 8 | const server = $`target/release/examples/echo_server`.spawn(); 9 | await $`docker run --name fuzzingserver -v ${pwd}/fuzzingclient.json:/fuzzingclient.json:ro -v ${pwd}/reports:/reports -p 9001:9001 --net=host --rm ${AUTOBAHN_TESTSUITE_DOCKER} wstest -m fuzzingclient -s fuzzingclient.json`.cwd(pwd); 10 | 11 | const { fastwebsockets } = JSON.parse( 12 | Deno.readTextFileSync("./autobahn/reports/servers/index.json"), 13 | ); 14 | const result = Object.values(fastwebsockets); 15 | 16 | function failed(name) { 17 | return name != "OK" && name != "INFORMATIONAL" && name != "NON-STRICT"; 18 | } 19 | 20 | const failedtests = result.filter((outcome) => failed(outcome.behavior)); 21 | 22 | console.log( 23 | `%c${result.length - failedtests.length} / ${result.length} tests OK`, 24 | `color: ${failedtests.length == 0 ? "green" : "red"}`, 25 | ); 26 | 27 | Deno.exit(failedtests.length == 0 ? 
0 : 1); 28 | -------------------------------------------------------------------------------- /examples/axum.rs: -------------------------------------------------------------------------------- 1 | use axum::{response::IntoResponse, routing::get, Router}; 2 | use fastwebsockets::upgrade; 3 | use fastwebsockets::OpCode; 4 | use fastwebsockets::WebSocketError; 5 | 6 | #[tokio::main] 7 | async fn main() { 8 | let app = Router::new().route("/", get(ws_handler)); 9 | 10 | let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); 11 | axum::serve(listener, app).await.unwrap(); 12 | } 13 | 14 | async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { 15 | let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); 16 | 17 | loop { 18 | let frame = ws.read_frame().await?; 19 | match frame.opcode { 20 | OpCode::Close => break, 21 | OpCode::Text | OpCode::Binary => { 22 | ws.write_frame(frame).await?; 23 | } 24 | _ => {} 25 | } 26 | } 27 | 28 | Ok(()) 29 | } 30 | 31 | async fn ws_handler(ws: upgrade::IncomingUpgrade) -> impl IntoResponse { 32 | let (response, fut) = ws.upgrade().unwrap(); 33 | tokio::task::spawn(async move { 34 | if let Err(e) = handle_client(fut).await { 35 | eprintln!("Error in websocket connection: {}", e); 36 | } 37 | }); 38 | response 39 | } 40 | -------------------------------------------------------------------------------- /autobahn/Makefile: -------------------------------------------------------------------------------- 1 | AUTOBAHN_TESTSUITE_DOCKER := crossbario/autobahn-testsuite:0.8.2@sha256:5d4ba3aa7d6ab2fdbf6606f3f4ecbe4b66f205ce1cbc176d6cdf650157e52242 2 | 3 | build-server: 4 | sudo cargo build --release --example echo_server --features "upgrade" 5 | 6 | run-server: build-server 7 | echo ${PWD} 8 | docker run -d \ 9 | --name fuzzingserver \ 10 | -u `id -u`:`id -g` \ 11 | -v ${PWD}/fuzzingclient.json:/fuzzingclient.json:ro \ 12 | -v ${PWD}/reports:/reports \ 13 | -p 9001:9001 \ 14 | --net=host \ 
15 | --rm \ 16 | $(AUTOBAHN_TESTSUITE_DOCKER) \ 17 | wstest -m fuzzingclient -s fuzzingclient.json 18 | ../target/release/examples/echo_server 19 | 20 | build-client: 21 | sudo cargo build --release --example autobahn_client --features "upgrade" 22 | 23 | run-client: build-client 24 | echo ${PWD} 25 | docker run -d \ 26 | --name fuzzingserver \ 27 | -u `id -u`:`id -g` \ 28 | -v ${PWD}/fuzzingserver.json:/fuzzingserver.json:ro \ 29 | -v ${PWD}/reports:/reports \ 30 | -p 9001:9001 \ 31 | --rm \ 32 | $(AUTOBAHN_TESTSUITE_DOCKER) \ 33 | wstest -m fuzzingserver -s fuzzingserver.json 34 | sleep 5 35 | ../target/release/examples/autobahn_client 36 | 37 | .PHONY: build-server run-server build-client run-client -------------------------------------------------------------------------------- /tests/ui/01-send-executor.rs: -------------------------------------------------------------------------------- 1 | use fastwebsockets::WebSocket; 2 | use hyper::header::CONNECTION; 3 | use hyper::header::UPGRADE; 4 | use hyper::upgrade::Upgraded; 5 | use hyper::body::Bytes; 6 | use http_body_util::Empty; 7 | use hyper::Request; 8 | use std::future::Future; 9 | use tokio::net::TcpStream; 10 | use hyper_util::rt::tokio::TokioIo; 11 | use anyhow::Result; 12 | 13 | struct SpawnExecutor; 14 | 15 | impl hyper::rt::Executor for SpawnExecutor 16 | where 17 | Fut: Future + Send + 'static, 18 | Fut::Output: Send + 'static, 19 | { 20 | fn execute(&self, fut: Fut) { 21 | tokio::task::spawn(fut); 22 | } 23 | } 24 | 25 | async fn connect( 26 | path: &str, 27 | ) -> Result>> { 28 | let stream = TcpStream::connect("localhost:9001").await?; 29 | 30 | let req = Request::builder() 31 | .method("GET") 32 | .uri(format!("http://localhost:9001/{}", path)) 33 | .header("Host", "localhost:9001") 34 | .header(UPGRADE, "websocket") 35 | .header(CONNECTION, "upgrade") 36 | .header( 37 | "Sec-WebSocket-Key", 38 | fastwebsockets::handshake::generate_key(), 39 | ) 40 | .header("Sec-WebSocket-Version", "13") 41 | 
.body(Empty::::new())?; 42 | 43 | let (ws, _) = 44 | fastwebsockets::handshake::client(&SpawnExecutor, req, stream).await?; 45 | Ok(ws) 46 | } 47 | 48 | fn main() {} 49 | -------------------------------------------------------------------------------- /tests/ui/02-!send-executor.rs: -------------------------------------------------------------------------------- 1 | use fastwebsockets::WebSocket; 2 | use hyper::header::CONNECTION; 3 | use hyper::header::UPGRADE; 4 | use hyper::upgrade::Upgraded; 5 | use hyper::body::Bytes; 6 | use http_body_util::Empty; 7 | use hyper::Request; 8 | use std::future::Future; 9 | use tokio::net::TcpStream; 10 | use hyper_util::rt::tokio::TokioIo; 11 | use anyhow::Result; 12 | 13 | struct SpawnLocalExecutor; 14 | 15 | impl hyper::rt::Executor for SpawnLocalExecutor 16 | where 17 | Fut: Future + 'static, 18 | Fut::Output: 'static, 19 | { 20 | fn execute(&self, fut: Fut) { 21 | tokio::task::spawn_local(fut); 22 | } 23 | } 24 | 25 | async fn connect( 26 | path: &str, 27 | ) -> Result>> { 28 | let stream = TcpStream::connect("localhost:9001").await?; 29 | 30 | let req = Request::builder() 31 | .method("GET") 32 | .uri(format!("http://localhost:9001/{}", path)) 33 | .header("Host", "localhost:9001") 34 | .header(UPGRADE, "websocket") 35 | .header(CONNECTION, "upgrade") 36 | .header( 37 | "Sec-WebSocket-Key", 38 | fastwebsockets::handshake::generate_key(), 39 | ) 40 | .header("Sec-WebSocket-Version", "13") 41 | .body(Empty::::new())?; 42 | 43 | let (ws, _) = 44 | fastwebsockets::handshake::client(&SpawnLocalExecutor, req, stream).await?; 45 | Ok(ws) 46 | } 47 | 48 | fn main() {} 49 | -------------------------------------------------------------------------------- /src/error.rs: -------------------------------------------------------------------------------- 1 | use thiserror::Error; 2 | 3 | #[derive(Error, Debug)] 4 | pub enum WebSocketError { 5 | #[error("Invalid fragment")] 6 | InvalidFragment, 7 | #[error("Invalid UTF-8")] 8 | InvalidUTF8, 
9 | #[error("Invalid continuation frame")] 10 | InvalidContinuationFrame, 11 | #[error("Invalid status code: {0}")] 12 | InvalidStatusCode(u16), 13 | #[error("Invalid upgrade header")] 14 | InvalidUpgradeHeader, 15 | #[error("Invalid connection header")] 16 | InvalidConnectionHeader, 17 | #[error("Connection is closed")] 18 | ConnectionClosed, 19 | #[error("Invalid close frame")] 20 | InvalidCloseFrame, 21 | #[error("Invalid close code")] 22 | InvalidCloseCode, 23 | #[error("Unexpected EOF")] 24 | UnexpectedEOF, 25 | #[error("Reserved bits are not zero")] 26 | ReservedBitsNotZero, 27 | #[error("Control frame must not be fragmented")] 28 | ControlFrameFragmented, 29 | #[error("Ping frame too large")] 30 | PingFrameTooLarge, 31 | #[error("Frame too large")] 32 | FrameTooLarge, 33 | #[error("Sec-Websocket-Version must be 13")] 34 | InvalidSecWebsocketVersion, 35 | #[error("Invalid value")] 36 | InvalidValue, 37 | #[error("Sec-WebSocket-Key header is missing")] 38 | MissingSecWebSocketKey, 39 | #[error(transparent)] 40 | IoError(#[from] std::io::Error), 41 | #[cfg(feature = "upgrade")] 42 | #[error(transparent)] 43 | HTTPError(#[from] hyper::Error), 44 | #[cfg(feature = "unstable-split")] 45 | #[error("Failed to send frame")] 46 | SendError(#[from] Box), 47 | } 48 | -------------------------------------------------------------------------------- /examples/localhost.crt: -------------------------------------------------------------------------------- 1 | -----BEGIN CERTIFICATE----- 2 | MIIEtDCCApwCCQDtm3HlNW4u5TANBgkqhkiG9w0BAQsFADAcMQswCQYDVQQGEwJJ 3 | TjENMAsGA1UEAwwERGl2eTAeFw0yMzA0MjIxMjAzMjZaFw0yODA0MjAxMjAzMjZa 4 | MBwxCzAJBgNVBAYTAklOMQ0wCwYDVQQDDAREaXZ5MIICIjANBgkqhkiG9w0BAQEF 5 | AAOCAg8AMIICCgKCAgEAvqQG1wPW3F53JjydkfDJSnHMJYtvqjsVIHbJWVV/Aes8 6 | OKp/JvdpzlP8YRLu6KI/mutya6iuGt+xHLXJdRJYAThoke5QML27s9raxOfl3+wO 7 | AwUtGYP9G0KcwVFVbxOD/edJ84NSwSL6o0MqfiHReydi7Gc6xyRa6R8PPpJ2ckWV 8 | nx8r/m/LCG5TxAPCU1GbGx3sWhvDJyzL7Yj/X2y7wqIVsJy/lMz765ND01LtvmlJ 9 | 
IG7N9hnmcoVgxCxrWmBQ+x4YIAJx7OWcs/vuvSjsxJuxlRl+YeiZqilvm5u3Fopn 10 | x5xzE1oN+vBU5ncDVqfoidsh5w7BkPHgHbZWE7Ba1wlp9mJqBMBe1ko9/xVJjmlb 11 | ot+EinTDYGxhUfngh7tGt45bJjNHFINPf3WSCRPUancF/lJjHoTvlVAGYMZUMwNz 12 | ENo+chYCg2Bb5c8+/OuYgtfqtSCttdw+Eo0V49zue/i7leGD1IQ+pgskdGvCa1UG 13 | bwHkSbC61U9tDwHyjju8oi0wMEYsVBMyjy25wuS/iYCte5J1pfrCIuziR/xAiYfF 14 | +oC0Hd828Tujtbii5YtXXr3Bjb+A78lnkadecXUBfIe8yqtPkgiMOPSWUM0KEuJr 15 | EYvnZX4wdhfz9AD0NmgZrIvTlXE0s0hjVHvBzgJLzmIBHLMHGcZVeDoCe9oPF0sC 16 | AwEAATANBgkqhkiG9w0BAQsFAAOCAgEAAAQNpXGAqng6YNy1AOVsPSXecVrcqxfp 17 | l28SSfDzrvhtLs5FaK9MxER5Lt3c4Fmlq+fC5mg7eAHQcVQvpvnCufMDH17yZF04 18 | X/EzIniQ4fVkhyWtukDm18orpCQfDIyJ0iRSsFDzDyi/UZFIsPkD0lumNjGZY4m/ 19 | VoELlwIAeoDgDplSDBqJ2N0dgxZYUKDjqS0hLBnp8RfETLTbXtpQCVN3Q35gJApB 20 | gGRtwOKYf5GZPIcp0iDNumRLPLtqYanT/cD8nd54Bil13925l5dqy0/ozfm2I+NT 21 | TYAd3b2q8Mexs/rJD8naCE7BM+zfbUkoUPOy1Q1y9A/5CfhjCAdJlnDnK0B+isW8 22 | HNl9U4pySDQRg66oUUZDboRGh+G7qQuPA2ewAz42KvtfS3XX6zaYxpQlrrjut7db 23 | Df0y7fYumdmQtqZQJ2MtJHI9pZQ3+zxGq5RN/xZNh53XPIurCiqBypfHj4nSFNIq 24 | VADjJEITr+oiFabDjp5jiwoewtEGCdT0PuzaY/iADvlxTOMdy6AUdRTkhLob930F 25 | 1QtKU45rwHTbaPxLdjvnKMI2ElwqVyFS5H5YNgM2xSWkRMmqPlvihh3a7M+Ux/Ri 26 | C878EKTdkCNTXUpCCJhMGhrXTzYkJ5G+Nh9ERcTGuLkw6uzkUdbAmYgOn2GN3xvl 27 | Q26ks4m/6Fs= 28 | -----END CERTIFICATE----- 29 | -------------------------------------------------------------------------------- /fuzz/fuzz_targets/parse_frame.rs: -------------------------------------------------------------------------------- 1 | #![no_main] 2 | 3 | use libfuzzer_sys::fuzz_target; 4 | 5 | use tokio::sync::oneshot::Sender; 6 | use tokio::sync::oneshot; 7 | use std::pin::Pin; 8 | use std::task::{Context, Poll}; 9 | 10 | #[derive(Debug)] 11 | struct ArbitraryByteStream { 12 | data: Vec, 13 | tx: Option>, 14 | } 15 | 16 | impl ArbitraryByteStream { 17 | fn new(data: Vec, tx: Sender<()>) -> Self { 18 | Self { 19 | data, 20 | tx: Some(tx), 21 | } 22 | } 23 | } 24 | 25 | impl tokio::io::AsyncRead for 
ArbitraryByteStream { 26 | fn poll_read( 27 | self: Pin<&mut Self>, 28 | _cx: &mut Context<'_>, 29 | buf: &mut tokio::io::ReadBuf<'_>, 30 | ) -> Poll> { 31 | let this = self.get_mut(); 32 | let len = std::cmp::min(buf.remaining(), this.data.len()); 33 | let data = this.data.drain(..len).collect::>(); 34 | buf.put_slice(&data); 35 | 36 | if this.data.is_empty() { 37 | if let Some(tx) = this.tx.take() { 38 | let _ = tx.send(()).unwrap(); 39 | } 40 | 41 | return Poll::Pending; 42 | } 43 | 44 | Poll::Ready(Ok(())) 45 | } 46 | } 47 | 48 | impl tokio::io::AsyncWrite for ArbitraryByteStream { 49 | fn poll_write( 50 | self: Pin<&mut Self>, 51 | _cx: &mut Context<'_>, 52 | buf: &[u8], 53 | ) -> Poll> { 54 | Poll::Ready(Ok(buf.len())) 55 | } 56 | 57 | fn poll_flush( 58 | self: std::pin::Pin<&mut Self>, 59 | _cx: &mut Context<'_>, 60 | ) -> Poll> { 61 | Poll::Ready(Ok(())) 62 | } 63 | 64 | fn poll_shutdown( 65 | self: std::pin::Pin<&mut Self>, 66 | _cx: &mut Context<'_>, 67 | ) -> Poll> { 68 | Poll::Ready(Ok(())) 69 | } 70 | } 71 | 72 | fuzz_target!(|data: &[u8]| { 73 | let (tx, rx) = oneshot::channel(); 74 | let stream = ArbitraryByteStream::new(data.to_vec(), tx); 75 | 76 | let mut ws = fastwebsockets::WebSocket::after_handshake(stream, fastwebsockets::Role::Server); 77 | ws.set_writev(false); 78 | ws.set_auto_close(true); 79 | ws.set_auto_pong(true); 80 | ws.set_max_message_size(u16::MAX as usize); 81 | 82 | futures::executor::block_on(async move { 83 | tokio::select! 
{ 84 | // We've read all the data, so we're done 85 | _ = rx => {} 86 | // We've read a frame, so we're done 87 | _ = ws.read_frame() => {} 88 | } 89 | }); 90 | }); 91 | -------------------------------------------------------------------------------- /fuzz/fuzz_targets/parse_frame_fragment.rs: -------------------------------------------------------------------------------- 1 | #![no_main] 2 | 3 | use libfuzzer_sys::fuzz_target; 4 | 5 | use tokio::sync::oneshot::Sender; 6 | use tokio::sync::oneshot; 7 | use std::pin::Pin; 8 | use std::task::{Context, Poll}; 9 | 10 | // Copied from fuzz/fuzz_targets/parse_frame.rs 11 | 12 | #[derive(Debug)] 13 | struct ArbitraryByteStream { 14 | data: Vec, 15 | tx: Option>, 16 | } 17 | 18 | impl ArbitraryByteStream { 19 | fn new(data: Vec, tx: Sender<()>) -> Self { 20 | Self { 21 | data, 22 | tx: Some(tx), 23 | } 24 | } 25 | } 26 | 27 | impl tokio::io::AsyncRead for ArbitraryByteStream { 28 | fn poll_read( 29 | self: Pin<&mut Self>, 30 | _cx: &mut Context<'_>, 31 | buf: &mut tokio::io::ReadBuf<'_>, 32 | ) -> Poll> { 33 | let this = self.get_mut(); 34 | let len = std::cmp::min(buf.remaining(), this.data.len()); 35 | let data = this.data.drain(..len).collect::>(); 36 | buf.put_slice(&data); 37 | 38 | if this.data.is_empty() { 39 | if let Some(tx) = this.tx.take() { 40 | let _ = tx.send(()).unwrap(); 41 | } 42 | 43 | return Poll::Pending; 44 | } 45 | 46 | Poll::Ready(Ok(())) 47 | } 48 | } 49 | 50 | impl tokio::io::AsyncWrite for ArbitraryByteStream { 51 | fn poll_write( 52 | self: Pin<&mut Self>, 53 | _cx: &mut Context<'_>, 54 | buf: &[u8], 55 | ) -> Poll> { 56 | Poll::Ready(Ok(buf.len())) 57 | } 58 | 59 | fn poll_flush( 60 | self: std::pin::Pin<&mut Self>, 61 | _cx: &mut Context<'_>, 62 | ) -> Poll> { 63 | Poll::Ready(Ok(())) 64 | } 65 | 66 | fn poll_shutdown( 67 | self: std::pin::Pin<&mut Self>, 68 | _cx: &mut Context<'_>, 69 | ) -> Poll> { 70 | Poll::Ready(Ok(())) 71 | } 72 | } 73 | 74 | fuzz_target!(|data: &[u8]| { 75 | let (tx, rx) = 
oneshot::channel(); 76 | let stream = ArbitraryByteStream::new(data.to_vec(), tx); 77 | 78 | let mut ws = fastwebsockets::WebSocket::after_handshake(stream, fastwebsockets::Role::Server); 79 | ws.set_writev(false); 80 | ws.set_auto_close(true); 81 | ws.set_auto_pong(true); 82 | ws.set_max_message_size(u16::MAX as usize); 83 | 84 | let mut ws = fastwebsockets::FragmentCollector::new(ws); 85 | 86 | futures::executor::block_on(async move { 87 | tokio::select! { 88 | // We've read all the data, so we're done 89 | _ = rx => {} 90 | // We've read a frame, so we're done 91 | _ = ws.read_frame() => {} 92 | } 93 | }); 94 | }); 95 | -------------------------------------------------------------------------------- /examples/echo_server.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 
14 | 15 | use fastwebsockets::upgrade; 16 | use fastwebsockets::OpCode; 17 | use fastwebsockets::WebSocketError; 18 | use http_body_util::Empty; 19 | use hyper::body::Bytes; 20 | use hyper::body::Incoming; 21 | use hyper::server::conn::http1; 22 | use hyper::service::service_fn; 23 | use hyper::Request; 24 | use hyper::Response; 25 | use tokio::net::TcpListener; 26 | 27 | async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { 28 | let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); 29 | 30 | loop { 31 | let frame = ws.read_frame().await?; 32 | match frame.opcode { 33 | OpCode::Close => break, 34 | OpCode::Text | OpCode::Binary => { 35 | ws.write_frame(frame).await?; 36 | } 37 | _ => {} 38 | } 39 | } 40 | 41 | Ok(()) 42 | } 43 | async fn server_upgrade( 44 | mut req: Request, 45 | ) -> Result>, WebSocketError> { 46 | let (response, fut) = upgrade::upgrade(&mut req)?; 47 | 48 | tokio::task::spawn(async move { 49 | if let Err(e) = tokio::task::unconstrained(handle_client(fut)).await { 50 | eprintln!("Error in websocket connection: {}", e); 51 | } 52 | }); 53 | 54 | Ok(response) 55 | } 56 | 57 | fn main() -> Result<(), WebSocketError> { 58 | let rt = tokio::runtime::Builder::new_current_thread() 59 | .enable_io() 60 | .build() 61 | .unwrap(); 62 | 63 | rt.block_on(async move { 64 | let listener = TcpListener::bind("127.0.0.1:8080").await?; 65 | println!("Server started, listening on {}", "127.0.0.1:8080"); 66 | loop { 67 | let (stream, _) = listener.accept().await?; 68 | println!("Client connected"); 69 | tokio::spawn(async move { 70 | let io = hyper_util::rt::TokioIo::new(stream); 71 | let conn_fut = http1::Builder::new() 72 | .serve_connection(io, service_fn(server_upgrade)) 73 | .with_upgrades(); 74 | if let Err(e) = conn_fut.await { 75 | println!("An error occurred: {:?}", e); 76 | } 77 | }); 78 | } 79 | }) 80 | } 81 | -------------------------------------------------------------------------------- 
/examples/echo_server_split.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | use fastwebsockets::upgrade; 16 | use fastwebsockets::FragmentCollectorRead; 17 | use fastwebsockets::OpCode; 18 | use fastwebsockets::WebSocketError; 19 | use http_body_util::Empty; 20 | use hyper::body::Bytes; 21 | use hyper::body::Incoming; 22 | use hyper::server::conn::http1; 23 | use hyper::service::service_fn; 24 | use hyper::Request; 25 | use hyper::Response; 26 | use tokio::net::TcpListener; 27 | 28 | async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { 29 | let ws = fut.await?; 30 | let (rx, mut tx) = ws.split(tokio::io::split); 31 | let mut rx = FragmentCollectorRead::new(rx); 32 | loop { 33 | // A send_fn that panics if called is fine here because the benchmark does not create obligated writes. 
34 | let frame = rx 35 | .read_frame::<_, WebSocketError>(&mut move |_| async { 36 | unreachable!(); 37 | }) 38 | .await?; 39 | match frame.opcode { 40 | OpCode::Close => break, 41 | OpCode::Text | OpCode::Binary => { 42 | tx.write_frame(frame).await?; 43 | } 44 | _ => {} 45 | } 46 | } 47 | 48 | Ok(()) 49 | } 50 | async fn server_upgrade( 51 | mut req: Request, 52 | ) -> Result>, WebSocketError> { 53 | let (response, fut) = upgrade::upgrade(&mut req)?; 54 | 55 | tokio::task::spawn(async move { 56 | if let Err(e) = tokio::task::unconstrained(handle_client(fut)).await { 57 | eprintln!("Error in websocket connection: {}", e); 58 | } 59 | }); 60 | 61 | Ok(response) 62 | } 63 | 64 | fn main() -> Result<(), WebSocketError> { 65 | let rt = tokio::runtime::Builder::new_current_thread() 66 | .enable_io() 67 | .build() 68 | .unwrap(); 69 | 70 | rt.block_on(async move { 71 | let listener = TcpListener::bind("127.0.0.1:8080").await?; 72 | println!("Server started, listening on {}", "127.0.0.1:8080"); 73 | loop { 74 | let (stream, _) = listener.accept().await?; 75 | println!("Client connected"); 76 | tokio::spawn(async move { 77 | let io = hyper_util::rt::TokioIo::new(stream); 78 | let conn_fut = http1::Builder::new() 79 | .serve_connection(io, service_fn(server_upgrade)) 80 | .with_upgrades(); 81 | if let Err(e) = conn_fut.await { 82 | println!("An error occurred: {:?}", e); 83 | } 84 | }); 85 | } 86 | }) 87 | } 88 | -------------------------------------------------------------------------------- /examples/localhost.key: -------------------------------------------------------------------------------- 1 | -----BEGIN PRIVATE KEY----- 2 | MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC+pAbXA9bcXncm 3 | PJ2R8MlKccwli2+qOxUgdslZVX8B6zw4qn8m92nOU/xhEu7ooj+a63JrqK4a37Ec 4 | tcl1ElgBOGiR7lAwvbuz2trE5+Xf7A4DBS0Zg/0bQpzBUVVvE4P950nzg1LBIvqj 5 | Qyp+IdF7J2LsZzrHJFrpHw8+knZyRZWfHyv+b8sIblPEA8JTUZsbHexaG8MnLMvt 6 | iP9fbLvCohWwnL+UzPvrk0PTUu2+aUkgbs32GeZyhWDELGtaYFD7HhggAnHs5Zyz 7 | 
++69KOzEm7GVGX5h6JmqKW+bm7cWimfHnHMTWg368FTmdwNWp+iJ2yHnDsGQ8eAd 8 | tlYTsFrXCWn2YmoEwF7WSj3/FUmOaVui34SKdMNgbGFR+eCHu0a3jlsmM0cUg09/ 9 | dZIJE9RqdwX+UmMehO+VUAZgxlQzA3MQ2j5yFgKDYFvlzz7865iC1+q1IK213D4S 10 | jRXj3O57+LuV4YPUhD6mCyR0a8JrVQZvAeRJsLrVT20PAfKOO7yiLTAwRixUEzKP 11 | LbnC5L+JgK17knWl+sIi7OJH/ECJh8X6gLQd3zbxO6O1uKLli1devcGNv4DvyWeR 12 | p15xdQF8h7zKq0+SCIw49JZQzQoS4msRi+dlfjB2F/P0APQ2aBmsi9OVcTSzSGNU 13 | e8HOAkvOYgEcswcZxlV4OgJ72g8XSwIDAQABAoICACH5GxrwHTcSQot22+GpFkYE 14 | 94ttSM3+T2qEoKch3EtcP1Qd1iD8kEdrohsug5LDbzBNawuSeMxjNq3WG3uYdERr 15 | Z/8xh+rXtP59LuVOKiH4cBrLrljQs6dK/KJauy3bPXde40fZDENM13uGuajWn/0h 16 | bLiSQOBCM0098rqE4UTF7772kCF8jKMI/jZ9MQEmFs0DTR5VujZd/k1rT48S0ncB 17 | 6XmaxW1gBjjZ+olLSwDWxGhaNqv3u6CG8lKjU9I8PdIyb7wsk17TIFTWvZnKFD+J 18 | O2FFtMb/63pufewuGLeUnJ/u2ncFYl5ou8iCRv8HVyJSAb2qXIZXBEhnOPmzQMyn 19 | +NEkX/3w8LpWhbd0RCsYeIQqYWTBX97kUXgbDtMCOCPH/DdDcJBQWRm5TUJt062K 20 | dDOwqg1jWOQ8F8nr3OwJ9E7NPoBQO5zWVfG6i3opxcFhoWGdTF0oIEhT42aX20ip 21 | AqQGwrW72j+qZdLt8nHK38kvnq8RinS7I2mw8QVhGFqN84x2WIUSiny9+lVD4lfT 22 | ckcSblKL9BpjYVkDvQ3s6BS92RCYqFaBwES4s5oFd2V+Ffrz3cCqp59k/5bZ6x7h 23 | ia+Hw2/Mmtp4TaAVcGrHOvAmnZcnfV9jvsdRzBeihom/gtnVrA8ctsipxhE+Ylkx 24 | q4g/xHHuBpRLMxusD3cBAoIBAQD8fR27xKLYw6aCgLW5pYMDUp8YaFW3uhsA3VPu 25 | UlErdF7QJwGHq7e4eiWp9W1lsc7Wq99PnetRxrs1HJNjExVcLSpCOdXOA0BGO8hI 26 | VgnmFr2doAmjLAXZzSDWnt7jl2FP4Casyv01/EmtpfFdnsXtdJa4s8ITZ5k9cRaV 27 | z28YkpMSXkoY83E4YOK2TrwzvgK7mwFjjV1x3xZZu6QXeMppCGxYQoCooNUvO6uO 28 | r4npHXJVBUdz+mMxci08v0sqhmakk4YRPq0A9A5b55zlSZwwE51d8wSgwfqdFygx 29 | EULkLCs881tM5jfIXcnHSxOQ23jd7bUHy8Hdll1U741AYXPLAoIBAQDBSrnORhzQ 30 | urN33OUvdYBIQT8pFDPNC7hoEPhxrk1V3N47Kvmxgc88pzrscqNI0e93XixRqjc5 31 | bCeR07qWl/xWlD8/MVu2MGHxtdjoXEDUtynUoiVe6D1hea4hcF4BkBWAhLKIz81R 32 | 5fU9p/RzZmWy8Fbc+ZV8GvX64LS/orWGmQvJx9Q0byZui6JUUrChRrYZWeFX+ehB 33 | 5Y/5BHsOL15HntqBfF3v6zK7vzQ03Aqd9vWxR3xUlbpUvxiax5Kg+fuiksmwNKIh 34 | P3/nhoP3LBtBZUx4h/Jdt0e/NFHtDXdIbxaaHO/jbTfy1tg4/wCp9OneMZF2kMVj 35 | 
PpU7wetwr3qBAoIBAFOEV0d63Zrx7KwSQworc1CwDawXJvNk/fWlQFP+qpbDIXGc 36 | 1Wa5KEY/MSIs6ojO7eoYY/+D7wjXwajp0N7euxwIXIgXdV91t9cDg1ZaD2AqeYIg 37 | I8/ziePndEtJtdR2iFvRezmA04z97Kkh0Nr03+eRvyFNZI7in8+xDpVzTf5EzZ0v 38 | zza9n9/UPGmtVZeP7Ht95FG3uwclkdEQvlB9RgbEIIJ5TPF6ccnz5OWHrwiLEvyI 39 | iIAWfKUobUpAxG5GksExgxFFOBiuoelIjZ9SX/WPJ2iiMA+02l8H/+VrHkM3UP4S 40 | SUsAg8clLs9bSBeMYUiXjmALyA6x5CFqM8Dt+00CggEAPnVgDvh27Te3MF8vq6ND 41 | XZW/zA1cI8DKyM3bChjxonIpWWMspiA1D/tVvfvZKXm08JR8q7Ld/28kZinNnEXm 42 | Yy+qNEhFw1xk+c7yFTtiM5owKSZv/vf6hZnlG6cMqWKeoBXA/xZu2Sz+jvrLsdJ/ 43 | wE+LMgJwPFcV7whXP6lbEPA5b+1jc8IK4CO8w5SowKRxyUVS3LPDSi/c0vGQtee2 44 | hlwdbUP7ssAEd8h0HTSRNbQMdkmMMmTjfej2EWW1ytCccE8QXyDS1v2G3hCIagFV 45 | mU8bY8NCHOhRhcZpRrlYNw62dfwtxAaR0qV73wb/duvN+l94CqEDN2uMm2+xHYuG 46 | gQKCAQEA76Lce+5FCR31hD3Gm0/RbA4jFDzkV5dY8JxOwzZHGzOUJpVd2sZ3a+rQ 47 | BIGC5EiSh9cdIKPqfuxjf+ufzhrA1rzxJVLPWHiUT+37ionXsUpIQQLeeaEpeHFb 48 | Dqg+vu2y+Fg9vYDXTKVZWXADp9kH+KtgpvrcaL2k4UkY2q+jKVLvTt+ezwWTWZFF 49 | QSFSMpTiAAo/kSEryG9DGnyvC5UZsgKsV9eQe7rkMg8p6TjFANcx6oDR6M6fchtn 50 | YmrKkFivZU2bhmGM1HJCIcmAIXtqsf6gb8CqqqX0NQb5m23OJU3NC7N9g34ofhCm 51 | GPx3/+N92+2q031KtpGtHOvcSrHFMA== 52 | -----END PRIVATE KEY----- 53 | -------------------------------------------------------------------------------- /examples/autobahn_client.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | use std::future::Future; 16 | 17 | use anyhow::Result; 18 | use bytes::Bytes; 19 | use fastwebsockets::FragmentCollector; 20 | use fastwebsockets::Frame; 21 | use fastwebsockets::OpCode; 22 | use http_body_util::Empty; 23 | use hyper::header::CONNECTION; 24 | use hyper::header::UPGRADE; 25 | use hyper::upgrade::Upgraded; 26 | use hyper::Request; 27 | use hyper_util::rt::TokioIo; 28 | use tokio::net::TcpStream; 29 | 30 | struct SpawnExecutor; 31 | 32 | impl hyper::rt::Executor for SpawnExecutor 33 | where 34 | Fut: Future + Send + 'static, 35 | Fut::Output: Send + 'static, 36 | { 37 | fn execute(&self, fut: Fut) { 38 | tokio::task::spawn(fut); 39 | } 40 | } 41 | 42 | async fn connect(path: &str) -> Result>> { 43 | let stream = TcpStream::connect("localhost:9001").await?; 44 | 45 | let req = Request::builder() 46 | .method("GET") 47 | .uri(format!("http://localhost:9001/{}", path)) 48 | .header("Host", "localhost:9001") 49 | .header(UPGRADE, "websocket") 50 | .header(CONNECTION, "upgrade") 51 | .header( 52 | "Sec-WebSocket-Key", 53 | fastwebsockets::handshake::generate_key(), 54 | ) 55 | .header("Sec-WebSocket-Version", "13") 56 | .body(Empty::::new())?; 57 | 58 | let (ws, _) = 59 | fastwebsockets::handshake::client(&SpawnExecutor, req, stream).await?; 60 | Ok(FragmentCollector::new(ws)) 61 | } 62 | 63 | async fn get_case_count() -> Result { 64 | let mut ws = connect("getCaseCount").await?; 65 | let msg = ws.read_frame().await?; 66 | ws.write_frame(Frame::close(1000, &[])).await?; 67 | Ok(std::str::from_utf8(&msg.payload)?.parse()?) 
/** Resolves after `ms` milliseconds. */
function wait(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Spawns the `benches/load_test` binary against `0.0.0.0:port` with its
 * stdout piped, and returns the spawned child.
 *
 * `conn` and `bytes` are forwarded as command-line arguments.
 * NOTE(review): the two literal `0` arguments are passed through unchanged;
 * their meaning is defined by load_test.c — confirm there.
 */
function load_test(conn, port, bytes) {
  return $`benches/load_test ${conn} 0.0.0.0 ${port} 0 0 ${bytes}`
    .stdout("piped").spawn();
}

// Servers under test. Each entry is started, benchmarked and stopped in turn.
const targets = [
  // https://github.com/denoland/fastwebsockets
  {
    port: 8080,
    name: "fastwebsockets",
    server: "target/release/examples/echo_server",
  },
  // https://github.com/uNetworking/uWebSockets
  { port: 9001, name: "uWebSockets", server: "../uWebSockets/EchoServer" },
  // https://github.com/snapview/tokio-tungstenite
  {
    port: 8080,
    name: "tokio-tungstenite",
    server: "../tokio-tungstenite/target/release/examples/echo-server",
  },
  // https://github.com/websockets-rs/rust-websocket
  {
    port: 9002,
    name: "rust-websocket",
    server: "../rust-websocket/target/release/examples/async-autobahn-server",
  },
];

// Benchmark matrix: number of connections x payload size in bytes.
const cases = [
  {
    conn: 100,
    bytes: 20,
  },
  {
    conn: 10,
    bytes: 1024,
  },
  {
    conn: 10,
    bytes: 16 * 1024,
  },
  {
    conn: 200,
    bytes: 16 * 1024,
  },
  {
    conn: 500,
    bytes: 16 * 1024,
  },
];

// For every case, run each target, average its reported messages/second and
// write one bar chart per case.
for (const { conn, bytes } of cases) {
  let results = {};
  for (const { port, name, server, arg } of targets) {
    let logs = [];
    try {
      const proc = $`${server} ${arg || ""}`.spawn();
      console.log(`Waiting for ${name} to start...`);
      await wait(1000);

      // An empty payload argument is passed for the 20-byte case.
      // NOTE(review): presumably load_test defaults to 20 bytes — confirm.
      const client = load_test(conn, port, bytes == 20 ? "" : bytes);
      const readable = client.stdout().pipeThrough(new TextDecoderStream())
        .pipeThrough(new TextLineStream());
      // Only the first five output lines are sampled.
      let count = 0;
      for await (const data of readable) {
        logs.push(data);
        count++;
        if (count === 5) {
          break;
        }
      }
      client.abort();
      proc.abort();
      await proc;
      await client;
    } catch (e) {
      console.log(e);
    }

    const lines = logs.filter((line) =>
      line.length > 0 && line.startsWith("Msg/sec")
    );
    // The radix belongs to parseInt; it used to be passed to `map` (where it
    // was silently taken as `thisArg`). Unparseable lines are dropped so one
    // bad line cannot poison the average.
    const mps = lines
      .map((line) => parseInt((line.split(" ")[1] ?? "").trim(), 10))
      .filter((value) => !Number.isNaN(value));
    // A target that produced no samples (e.g. failed to start) scores 0
    // instead of NaN, which would break the sort and the chart below.
    const avg = mps.length > 0
      ? mps.reduce((a, b) => a + b, 0) / mps.length
      : 0;
    results[name] = avg;
  }

  // Order targets from fastest to slowest.
  results = Object.fromEntries(
    Object.entries(results).sort(([, a], [, b]) => b - a),
  );

  const title = `Connections: ${conn}, Payload size: ${bytes}`;
  const svg = chart({
    type: "bar",
    data: {
      labels: Object.keys(results),
      datasets: [
        {
          label: title,
          data: Object.values(results),
          backgroundColor: [
            "rgba(54, 162, 235, 255)",
          ],
        },
      ],
    },
  });

  Deno.writeTextFileSync(`./benches/${conn}-${bytes}-chart.svg`, svg);
}
55 | thiserror = "1.0.40" 56 | bytes = "1.5.0" 57 | 58 | # Axum integration 59 | axum-core = { version = "0.5.0", optional = true } 60 | http = { version = "1", optional = true } 61 | async-trait = { version = "0.1", optional = true } 62 | 63 | [features] 64 | default = [] 65 | upgrade = [ 66 | "hyper", 67 | "pin-project", 68 | "base64", 69 | "sha1", 70 | "hyper-util", 71 | "http-body-util", 72 | ] 73 | unstable-split = [] 74 | # Axum integration 75 | with_axum = ["axum-core", "http", "async-trait"] 76 | 77 | [dev-dependencies] 78 | tokio = { version = "1.25.0", features = ["full", "macros"] } 79 | tokio-rustls = "0.24.0" 80 | rustls-pemfile = "1.0" 81 | hyper-util = { version = "0.1.0", features = ["tokio"] } 82 | http-body-util = { version = "0.1.0" } 83 | hyper = { version = "1", features = ["http1", "server", "client"] } 84 | assert2 = "0.3.4" 85 | trybuild = "1.0.80" 86 | criterion = "0.4.0" 87 | anyhow = "1.0.71" 88 | webpki-roots = "0.23.0" 89 | bytes = "1.4.0" 90 | axum = "0.8.1" 91 | 92 | [[test]] 93 | name = "upgrade" 94 | path = "tests/upgrade.rs" 95 | required-features = ["upgrade"] 96 | 97 | [[test]] 98 | name = "split" 99 | path = "tests/split.rs" 100 | required-features = ["upgrade", "unstable-split"] 101 | 102 | [[test]] 103 | name = "concurrency" 104 | path = "tests/concurrency.rs" 105 | required-features = ["upgrade"] 106 | 107 | [[bench]] 108 | name = "unmask" 109 | harness = false 110 | 111 | # Build release with debug symbols: cargo build --profile=release-with-debug 112 | [profile.release-with-debug] 113 | inherits = "release" 114 | debug = true 115 | 116 | [profile.release] 117 | opt-level = 3 118 | lto = true 119 | codegen-units = 1 120 | 121 | [package.metadata.docs.rs] 122 | features = ["upgrade", "with_axum"] 123 | -------------------------------------------------------------------------------- /examples/tls_client.rs: -------------------------------------------------------------------------------- 1 | use std::future::Future; 2 | use 
std::sync::Arc; 3 | 4 | use anyhow::Result; 5 | use bytes::Bytes; 6 | use fastwebsockets::FragmentCollector; 7 | use fastwebsockets::Frame; 8 | use fastwebsockets::OpCode; 9 | use http_body_util::Empty; 10 | use hyper::header::CONNECTION; 11 | use hyper::header::UPGRADE; 12 | use hyper::upgrade::Upgraded; 13 | use hyper::Request; 14 | use hyper_util::rt::TokioIo; 15 | use tokio::net::TcpStream; 16 | use tokio_rustls::rustls::ClientConfig; 17 | use tokio_rustls::rustls::OwnedTrustAnchor; 18 | use tokio_rustls::TlsConnector; 19 | 20 | struct SpawnExecutor; 21 | 22 | impl hyper::rt::Executor for SpawnExecutor 23 | where 24 | Fut: Future + Send + 'static, 25 | Fut::Output: Send + 'static, 26 | { 27 | fn execute(&self, fut: Fut) { 28 | tokio::task::spawn(fut); 29 | } 30 | } 31 | 32 | fn tls_connector() -> Result { 33 | let mut root_store = tokio_rustls::rustls::RootCertStore::empty(); 34 | 35 | root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map( 36 | |ta| { 37 | OwnedTrustAnchor::from_subject_spki_name_constraints( 38 | ta.subject, 39 | ta.spki, 40 | ta.name_constraints, 41 | ) 42 | }, 43 | )); 44 | 45 | let config = ClientConfig::builder() 46 | .with_safe_defaults() 47 | .with_root_certificates(root_store) 48 | .with_no_client_auth(); 49 | 50 | Ok(TlsConnector::from(Arc::new(config))) 51 | } 52 | 53 | async fn connect(domain: &str) -> Result>> { 54 | let mut addr = String::from(domain); 55 | addr.push_str(":9443"); // Port number for binance stream 56 | 57 | let tcp_stream = TcpStream::connect(&addr).await?; 58 | let tls_connector = tls_connector().unwrap(); 59 | let domain = 60 | tokio_rustls::rustls::ServerName::try_from(domain).map_err(|_| { 61 | std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid dnsname") 62 | })?; 63 | 64 | let tls_stream = tls_connector.connect(domain, tcp_stream).await?; 65 | 66 | let req = Request::builder() 67 | .method("GET") 68 | .uri(format!("wss://{}/ws/btcusdt@bookTicker", &addr)) //stream we want to 
subscribe to 69 | .header("Host", &addr) 70 | .header(UPGRADE, "websocket") 71 | .header(CONNECTION, "upgrade") 72 | .header( 73 | "Sec-WebSocket-Key", 74 | fastwebsockets::handshake::generate_key(), 75 | ) 76 | .header("Sec-WebSocket-Version", "13") 77 | .body(Empty::::new())?; 78 | 79 | let (ws, _) = 80 | fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?; 81 | Ok(FragmentCollector::new(ws)) 82 | } 83 | 84 | #[tokio::main(flavor = "current_thread")] 85 | async fn main() -> Result<()> { 86 | let domain = "data-stream.binance.com"; 87 | let mut ws = connect(domain).await?; 88 | 89 | loop { 90 | let msg = match ws.read_frame().await { 91 | Ok(msg) => msg, 92 | Err(e) => { 93 | println!("Error: {}", e); 94 | ws.write_frame(Frame::close_raw(vec![].into())).await?; 95 | break; 96 | } 97 | }; 98 | 99 | match msg.opcode { 100 | OpCode::Text => { 101 | let payload = 102 | String::from_utf8(msg.payload.to_vec()).expect("Invalid UTF-8 data"); 103 | // Normally deserialise from json here, print just to show it works 104 | println!("{:?}", payload); 105 | } 106 | OpCode::Close => { 107 | break; 108 | } 109 | _ => {} 110 | } 111 | } 112 | Ok(()) 113 | } 114 | -------------------------------------------------------------------------------- /examples/tls_server.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | use anyhow::Result; 16 | use fastwebsockets::upgrade; 17 | use fastwebsockets::OpCode; 18 | use http_body_util::Empty; 19 | use hyper::body::Bytes; 20 | use hyper::body::Incoming; 21 | use hyper::server::conn::http1; 22 | use hyper::service::service_fn; 23 | use hyper::Request; 24 | use hyper::Response; 25 | use std::sync::Arc; 26 | use tokio::net::TcpListener; 27 | use tokio_rustls::rustls; 28 | use tokio_rustls::rustls::Certificate; 29 | use tokio_rustls::rustls::PrivateKey; 30 | use tokio_rustls::TlsAcceptor; 31 | 32 | async fn handle_client(fut: upgrade::UpgradeFut) -> Result<()> { 33 | let mut ws = fut.await?; 34 | ws.set_writev(false); 35 | let mut ws = fastwebsockets::FragmentCollector::new(ws); 36 | 37 | loop { 38 | let frame = ws.read_frame().await?; 39 | match frame.opcode { 40 | OpCode::Close => break, 41 | OpCode::Text | OpCode::Binary => { 42 | ws.write_frame(frame).await?; 43 | } 44 | _ => {} 45 | } 46 | } 47 | 48 | Ok(()) 49 | } 50 | 51 | async fn server_upgrade( 52 | mut req: Request, 53 | ) -> Result>> { 54 | let (response, fut) = upgrade::upgrade(&mut req)?; 55 | 56 | tokio::spawn(async move { 57 | if let Err(e) = handle_client(fut).await { 58 | eprintln!("Error in websocket connection: {}", e); 59 | } 60 | }); 61 | 62 | Ok(response) 63 | } 64 | 65 | fn tls_acceptor() -> Result { 66 | static KEY: &[u8] = include_bytes!("./localhost.key"); 67 | static CERT: &[u8] = include_bytes!("./localhost.crt"); 68 | 69 | let mut keys: Vec = 70 | rustls_pemfile::pkcs8_private_keys(&mut &*KEY) 71 | .map(|mut certs| certs.drain(..).map(PrivateKey).collect()) 72 | .unwrap(); 73 | let certs = rustls_pemfile::certs(&mut &*CERT) 74 | .map(|mut certs| certs.drain(..).map(Certificate).collect()) 75 | .unwrap(); 76 | dbg!(&certs); 77 | let config = rustls::ServerConfig::builder() 78 | .with_safe_defaults() 79 | .with_no_client_auth() 80 | 
.with_single_cert(certs, keys.remove(0))?; 81 | Ok(TlsAcceptor::from(Arc::new(config))) 82 | } 83 | 84 | #[tokio::main(flavor = "current_thread")] 85 | async fn main() -> Result<()> { 86 | let acceptor = tls_acceptor()?; 87 | let listener = TcpListener::bind("127.0.0.1:8080").await?; 88 | println!("Server started, listening on {}", "127.0.0.1:8080"); 89 | loop { 90 | let (stream, _) = listener.accept().await?; 91 | println!("Client connected"); 92 | let acceptor = acceptor.clone(); 93 | tokio::spawn(async move { 94 | let stream = acceptor.accept(stream).await.unwrap(); 95 | let io = hyper_util::rt::TokioIo::new(stream); 96 | let conn_fut = http1::Builder::new() 97 | .serve_connection(io, service_fn(server_upgrade)) 98 | .with_upgrades(); 99 | if let Err(e) = conn_fut.await { 100 | println!("An error occurred: {:?}", e); 101 | } 102 | }); 103 | } 104 | } 105 | -------------------------------------------------------------------------------- /tests/upgrade.rs: -------------------------------------------------------------------------------- 1 | // https://github.com/de-vri-es/hyper-tungstenite-rs/tree/main/tests 2 | 3 | use http_body_util::Empty; 4 | use hyper::body::Bytes; 5 | use hyper::body::Incoming; 6 | use hyper::header::CONNECTION; 7 | use hyper::header::UPGRADE; 8 | use hyper::server::conn::http1; 9 | use hyper::service::service_fn; 10 | use hyper::Request; 11 | use hyper::Response; 12 | use hyper_util::rt::TokioIo; 13 | use std::future::Future; 14 | use std::net::Ipv6Addr; 15 | use tokio::net::TcpStream; 16 | 17 | use assert2::assert; 18 | use assert2::let_assert; 19 | 20 | struct TestExecutor; 21 | 22 | impl hyper::rt::Executor for TestExecutor 23 | where 24 | Fut: Future + Send + 'static, 25 | Fut::Output: Send + 'static, 26 | { 27 | fn execute(&self, fut: Fut) { 28 | tokio::spawn(fut); 29 | } 30 | } 31 | 32 | #[tokio::test] 33 | async fn hyper() { 34 | // Bind a TCP listener to an ephemeral port. 
35 | let_assert!( 36 | Ok(listener) = 37 | tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await 38 | ); 39 | let_assert!(Ok(bind_addr) = listener.local_addr()); 40 | 41 | // Spawn the server in a task. 42 | tokio::spawn(async move { 43 | loop { 44 | let (stream, _) = listener.accept().await.unwrap(); 45 | let io = TokioIo::new(stream); 46 | 47 | tokio::spawn(async move { 48 | if let Err(err) = http1::Builder::new() 49 | .serve_connection(io, service_fn(upgrade_websocket)) 50 | .with_upgrades() 51 | .await 52 | { 53 | println!("Error serving connection: {:?}", err); 54 | } 55 | }); 56 | } 57 | }); 58 | 59 | // Try to create a websocket connection with the server. 60 | let_assert!(Ok(stream) = TcpStream::connect(bind_addr).await); 61 | let_assert!( 62 | Ok(req) = Request::builder() 63 | .method("GET") 64 | .uri("ws://localhost/foo") 65 | .header("Host", "localhost") 66 | .header(UPGRADE, "websocket") 67 | .header(CONNECTION, "upgrade") 68 | .header( 69 | "Sec-WebSocket-Key", 70 | fastwebsockets::handshake::generate_key(), 71 | ) 72 | .header("Sec-WebSocket-Version", "13") 73 | .body(Empty::::new()) 74 | ); 75 | let_assert!(Ok((mut stream, _response)) = fastwebsockets::handshake::client(&TestExecutor, req, stream).await); 76 | 77 | let_assert!(Ok(message) = stream.read_frame().await); 78 | assert!(message.opcode == fastwebsockets::OpCode::Text); 79 | assert!(message.payload == b"Hello!"); 80 | 81 | let_assert!( 82 | Ok(()) = stream 83 | .write_frame(fastwebsockets::Frame::text(b"Goodbye!".to_vec().into())) 84 | .await 85 | ); 86 | let_assert!(Ok(close_frame) = stream.read_frame().await); 87 | assert!(close_frame.opcode == fastwebsockets::OpCode::Close); 88 | } 89 | 90 | async fn upgrade_websocket( 91 | mut request: Request, 92 | ) -> Result>, fastwebsockets::WebSocketError> { 93 | assert!(fastwebsockets::upgrade::is_upgrade_request(&request) == true); 94 | 95 | let (response, stream) = fastwebsockets::upgrade::upgrade(&mut request)?; 96 | 
tokio::spawn(async move { 97 | let_assert!(Ok(mut stream) = stream.await); 98 | assert!(let Ok(()) = stream.write_frame(fastwebsockets::Frame::text(b"Hello!".to_vec().into())).await); 99 | let_assert!(Ok(reply) = stream.read_frame().await); 100 | assert!(reply.opcode == fastwebsockets::OpCode::Text); 101 | assert!(reply.payload == b"Goodbye!"); 102 | 103 | assert!(let Ok(()) = stream.write_frame(fastwebsockets::Frame::close_raw(vec![].into())).await); 104 | }); 105 | 106 | Ok(response) 107 | } 108 | -------------------------------------------------------------------------------- /tests/concurrency.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 
14 | 15 | use anyhow::Result; 16 | use fastwebsockets::upgrade; 17 | use fastwebsockets::Frame; 18 | use fastwebsockets::OpCode; 19 | use http_body_util::Empty; 20 | use hyper::body::Bytes; 21 | use hyper::body::Incoming; 22 | use hyper::server::conn::http1; 23 | use hyper::service::service_fn; 24 | use hyper::Request; 25 | use hyper::Response; 26 | use hyper_util::rt::TokioIo; 27 | use tokio::net::TcpListener; 28 | 29 | use fastwebsockets::handshake; 30 | use fastwebsockets::WebSocket; 31 | use hyper::header::CONNECTION; 32 | use hyper::header::UPGRADE; 33 | use hyper::upgrade::Upgraded; 34 | use std::future::Future; 35 | use tokio::net::TcpStream; 36 | 37 | const N_CLIENTS: usize = 20; 38 | 39 | async fn handle_client( 40 | client_id: usize, 41 | fut: upgrade::UpgradeFut, 42 | ) -> Result<()> { 43 | let mut ws = fut.await?; 44 | ws.set_writev(false); 45 | let mut ws = fastwebsockets::FragmentCollector::new(ws); 46 | 47 | ws.write_frame(Frame::binary(client_id.to_ne_bytes().as_ref().into())) 48 | .await 49 | .unwrap(); 50 | 51 | Ok(()) 52 | } 53 | 54 | async fn server_upgrade( 55 | mut req: Request, 56 | ) -> Result>> { 57 | let (response, fut) = upgrade::upgrade(&mut req)?; 58 | 59 | let client_id: usize = req 60 | .headers() 61 | .get("CLIENT-ID") 62 | .unwrap() 63 | .to_str() 64 | .unwrap() 65 | .parse() 66 | .unwrap(); 67 | tokio::spawn(async move { 68 | handle_client(client_id, fut).await.unwrap(); 69 | }); 70 | 71 | Ok(response) 72 | } 73 | 74 | async fn connect(client_id: usize) -> Result>> { 75 | let stream = TcpStream::connect("localhost:8080").await?; 76 | 77 | let req = Request::builder() 78 | .method("GET") 79 | .uri("http://localhost:8080/") 80 | .header("Host", "localhost:8080") 81 | .header(UPGRADE, "websocket") 82 | .header(CONNECTION, "upgrade") 83 | .header("CLIENT-ID", &format!("{}", client_id)) 84 | .header( 85 | "Sec-WebSocket-Key", 86 | fastwebsockets::handshake::generate_key(), 87 | ) 88 | .header("Sec-WebSocket-Version", "13") 89 | 
.body(Empty::::new())?; 90 | 91 | let (ws, _) = handshake::client(&SpawnExecutor, req, stream).await?; 92 | Ok(ws) 93 | } 94 | 95 | async fn start_client(client_id: usize) -> Result<()> { 96 | let mut ws = connect(client_id).await.unwrap(); 97 | let frame = ws.read_frame().await?; 98 | match frame.opcode { 99 | OpCode::Close => {} 100 | OpCode::Binary => { 101 | let n = usize::from_ne_bytes(frame.payload[..].try_into().unwrap()); 102 | assert_eq!(n, client_id); 103 | } 104 | _ => { 105 | panic!("Unexpected"); 106 | } 107 | } 108 | Ok(()) 109 | } 110 | 111 | #[tokio::test(flavor = "multi_thread")] 112 | async fn test() -> Result<()> { 113 | let listener = TcpListener::bind("127.0.0.1:8080").await?; 114 | println!("Server started, listening on {}", "127.0.0.1:8080"); 115 | tokio::spawn(async move { 116 | loop { 117 | let (stream, _) = listener.accept().await.unwrap(); 118 | tokio::spawn(async move { 119 | let io = TokioIo::new(stream); 120 | let conn_fut = http1::Builder::new() 121 | .serve_connection(io, service_fn(server_upgrade)) 122 | .with_upgrades(); 123 | conn_fut.await.unwrap(); 124 | }); 125 | } 126 | }); 127 | let mut tasks = Vec::with_capacity(N_CLIENTS); 128 | for client in 0..N_CLIENTS { 129 | tasks.push(tokio::spawn(start_client(client))); 130 | } 131 | for handle in tasks { 132 | handle.await.unwrap().unwrap(); 133 | } 134 | Ok(()) 135 | } 136 | 137 | struct SpawnExecutor; 138 | 139 | impl hyper::rt::Executor for SpawnExecutor 140 | where 141 | Fut: Future + Send + 'static, 142 | Fut::Output: Send + 'static, 143 | { 144 | fn execute(&self, fut: Fut) { 145 | tokio::task::spawn(fut); 146 | } 147 | } 148 | -------------------------------------------------------------------------------- /tests/split.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with 
the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | use anyhow::Result; 16 | use fastwebsockets::upgrade; 17 | use fastwebsockets::Frame; 18 | use fastwebsockets::OpCode; 19 | use http_body_util::Empty; 20 | use hyper::body::Bytes; 21 | use hyper::body::Incoming; 22 | use hyper::server::conn::http1; 23 | use hyper::service::service_fn; 24 | use hyper::Request; 25 | use hyper::Response; 26 | use hyper_util::rt::TokioIo; 27 | use tokio::net::TcpListener; 28 | 29 | use fastwebsockets::handshake; 30 | use fastwebsockets::WebSocketRead; 31 | use fastwebsockets::WebSocketWrite; 32 | use hyper::header::CONNECTION; 33 | use hyper::header::UPGRADE; 34 | use hyper::upgrade::Upgraded; 35 | use tokio::sync::Mutex; 36 | 37 | use std::future::Future; 38 | use std::rc::Rc; 39 | 40 | use tokio::net::TcpStream; 41 | 42 | const N_CLIENTS: usize = 20; 43 | 44 | async fn handle_client( 45 | client_id: usize, 46 | fut: upgrade::UpgradeFut, 47 | ) -> Result<()> { 48 | let mut ws = fut.await?; 49 | ws.set_writev(false); 50 | let mut ws = fastwebsockets::FragmentCollector::new(ws); 51 | 52 | ws.write_frame(Frame::binary(client_id.to_ne_bytes().as_ref().into())) 53 | .await 54 | .unwrap(); 55 | 56 | Ok(()) 57 | } 58 | 59 | async fn server_upgrade( 60 | mut req: Request, 61 | ) -> Result>> { 62 | let (response, fut) = upgrade::upgrade(&mut req)?; 63 | 64 | let client_id: usize = req 65 | .headers() 66 | .get("CLIENT-ID") 67 | .unwrap() 68 | .to_str() 69 | .unwrap() 70 | .parse() 71 | .unwrap(); 72 | tokio::spawn(async move { 73 | handle_client(client_id, 
fut).await.unwrap(); 74 | }); 75 | 76 | Ok(response) 77 | } 78 | 79 | async fn connect( 80 | client_id: usize, 81 | ) -> Result<( 82 | WebSocketRead>>, 83 | WebSocketWrite>>, 84 | )> { 85 | let stream = TcpStream::connect("localhost:8080").await?; 86 | 87 | let req = Request::builder() 88 | .method("GET") 89 | .uri("http://localhost:8080/") 90 | .header("Host", "localhost:8080") 91 | .header(UPGRADE, "websocket") 92 | .header(CONNECTION, "upgrade") 93 | .header("CLIENT-ID", &format!("{}", client_id)) 94 | .header( 95 | "Sec-WebSocket-Key", 96 | fastwebsockets::handshake::generate_key(), 97 | ) 98 | .header("Sec-WebSocket-Version", "13") 99 | .body(Empty::::new())?; 100 | 101 | let (ws, _) = handshake::client(&SpawnExecutor, req, stream).await?; 102 | Ok(ws.split(tokio::io::split)) 103 | } 104 | 105 | async fn start_client(client_id: usize) -> Result<()> { 106 | let (mut r, w) = connect(client_id).await.unwrap(); 107 | let w = Rc::new(Mutex::new(w)); 108 | let frame = r 109 | .read_frame(&mut move |frame| { 110 | let w = w.clone(); 111 | async move { w.lock().await.write_frame(frame).await } 112 | }) 113 | .await?; 114 | match frame.opcode { 115 | OpCode::Close => {} 116 | OpCode::Binary => { 117 | let n = usize::from_ne_bytes(frame.payload[..].try_into().unwrap()); 118 | assert_eq!(n, client_id); 119 | } 120 | _ => { 121 | panic!("Unexpected"); 122 | } 123 | } 124 | Ok(()) 125 | } 126 | 127 | #[tokio::test(flavor = "multi_thread")] 128 | async fn test() -> Result<()> { 129 | let listener = TcpListener::bind("127.0.0.1:8080").await?; 130 | println!("Server started, listening on {}", "127.0.0.1:8080"); 131 | tokio::spawn(async move { 132 | loop { 133 | let (stream, _) = listener.accept().await.unwrap(); 134 | tokio::spawn(async move { 135 | let io = TokioIo::new(stream); 136 | let conn_fut = http1::Builder::new() 137 | .serve_connection(io, service_fn(server_upgrade)) 138 | .with_upgrades(); 139 | conn_fut.await.unwrap(); 140 | }); 141 | } 142 | }); 143 | let mut 
tasks = Vec::with_capacity(N_CLIENTS); 144 | for client in 0..N_CLIENTS { 145 | tasks.push(start_client(client)); 146 | } 147 | for handle in tasks { 148 | handle.await.unwrap(); 149 | } 150 | Ok(()) 151 | } 152 | 153 | struct SpawnExecutor; 154 | 155 | impl hyper::rt::Executor for SpawnExecutor 156 | where 157 | Fut: Future + Send + 'static, 158 | Fut::Output: Send + 'static, 159 | { 160 | fn execute(&self, fut: Fut) { 161 | tokio::task::spawn(fut); 162 | } 163 | } 164 | -------------------------------------------------------------------------------- /src/handshake.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | use hyper::body::Incoming; 16 | use hyper::upgrade::Upgraded; 17 | use hyper::Request; 18 | use hyper::Response; 19 | use hyper::StatusCode; 20 | 21 | use base64::engine::general_purpose::STANDARD; 22 | use base64::Engine; 23 | 24 | use hyper_util::rt::TokioIo; 25 | use tokio::io::AsyncRead; 26 | use tokio::io::AsyncWrite; 27 | 28 | use std::future::Future; 29 | use std::pin::Pin; 30 | 31 | use crate::Role; 32 | use crate::WebSocket; 33 | use crate::WebSocketError; 34 | 35 | /// Perform the client handshake. 36 | /// 37 | /// This function is used to perform the client handshake. It takes a hyper 38 | /// executor, a `hyper::Request` and a stream. 
39 | /// 40 | /// # Example 41 | /// 42 | /// ``` 43 | /// use fastwebsockets::handshake; 44 | /// use fastwebsockets::WebSocket; 45 | /// use hyper::{Request, body::Bytes, upgrade::Upgraded, header::{UPGRADE, CONNECTION}}; 46 | /// use hyper_util::rt::TokioIo; 47 | /// use http_body_util::Empty; 48 | /// use tokio::net::TcpStream; 49 | /// use std::future::Future; 50 | /// use anyhow::Result; 51 | /// 52 | /// async fn connect() -> Result>> { 53 | /// let stream = TcpStream::connect("localhost:9001").await?; 54 | /// 55 | /// let req = Request::builder() 56 | /// .method("GET") 57 | /// .uri("http://localhost:9001/") 58 | /// .header("Host", "localhost:9001") 59 | /// .header(UPGRADE, "websocket") 60 | /// .header(CONNECTION, "upgrade") 61 | /// .header( 62 | /// "Sec-WebSocket-Key", 63 | /// fastwebsockets::handshake::generate_key(), 64 | /// ) 65 | /// .header("Sec-WebSocket-Version", "13") 66 | /// .body(Empty::::new())?; 67 | /// 68 | /// let (ws, _) = handshake::client(&SpawnExecutor, req, stream).await?; 69 | /// Ok(ws) 70 | /// } 71 | /// 72 | /// // Tie hyper's executor to tokio runtime 73 | /// struct SpawnExecutor; 74 | /// 75 | /// impl hyper::rt::Executor for SpawnExecutor 76 | /// where 77 | /// Fut: Future + Send + 'static, 78 | /// Fut::Output: Send + 'static, 79 | /// { 80 | /// fn execute(&self, fut: Fut) { 81 | /// tokio::task::spawn(fut); 82 | /// } 83 | /// } 84 | /// ``` 85 | pub async fn client( 86 | executor: &E, 87 | request: Request, 88 | socket: S, 89 | ) -> Result<(WebSocket>, Response), WebSocketError> 90 | where 91 | S: AsyncRead + AsyncWrite + Send + Unpin + 'static, 92 | E: hyper::rt::Executor + Send>>>, 93 | B: hyper::body::Body + 'static + Send, 94 | B::Data: Send, 95 | B::Error: Into>, 96 | { 97 | let (mut sender, conn) = 98 | hyper::client::conn::http1::handshake(TokioIo::new(socket)).await?; 99 | let fut = Box::pin(async move { 100 | if let Err(e) = conn.with_upgrades().await { 101 | eprintln!("Error polling connection: {}", e); 
102 | } 103 | }); 104 | executor.execute(fut); 105 | 106 | let mut response = sender.send_request(request).await?; 107 | verify(&response)?; 108 | 109 | match hyper::upgrade::on(&mut response).await { 110 | Ok(upgraded) => Ok(( 111 | WebSocket::after_handshake(TokioIo::new(upgraded), Role::Client), 112 | response, 113 | )), 114 | Err(e) => Err(e.into()), 115 | } 116 | } 117 | 118 | /// Generate a random key for the `Sec-WebSocket-Key` header. 119 | pub fn generate_key() -> String { 120 | // a base64-encoded (see Section 4 of [RFC4648]) value that, 121 | // when decoded, is 16 bytes in length (RFC 6455) 122 | let r: [u8; 16] = rand::random(); 123 | STANDARD.encode(r) 124 | } 125 | 126 | // https://github.com/snapview/tungstenite-rs/blob/314feea3055a93e585882fb769854a912a7e6dae/src/handshake/client.rs#L189 127 | fn verify(response: &Response) -> Result<(), WebSocketError> { 128 | if response.status() != StatusCode::SWITCHING_PROTOCOLS { 129 | return Err(WebSocketError::InvalidStatusCode( 130 | response.status().as_u16(), 131 | )); 132 | } 133 | 134 | let headers = response.headers(); 135 | 136 | if !headers 137 | .get("Upgrade") 138 | .and_then(|h| h.to_str().ok()) 139 | .map(|h| h.eq_ignore_ascii_case("websocket")) 140 | .unwrap_or(false) 141 | { 142 | return Err(WebSocketError::InvalidUpgradeHeader); 143 | } 144 | 145 | if !headers 146 | .get("Connection") 147 | .and_then(|h| h.to_str().ok()) 148 | .map(|h| h.eq_ignore_ascii_case("Upgrade")) 149 | .unwrap_or(false) 150 | { 151 | return Err(WebSocketError::InvalidConnectionHeader); 152 | } 153 | 154 | Ok(()) 155 | } 156 | -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- 1 | [![Crates.io](https://img.shields.io/crates/v/fastwebsockets.svg)](https://crates.io/crates/fastwebsockets) 2 | 3 | [Documentation](https://docs.rs/fastwebsockets) | [Benchmarks](benches/) 4 | 5 | _fastwebsockets_ is a fast 
WebSocket protocol implementation. 6 | 7 | Passes the 8 | Autobahn|TestSuite1 9 | and fuzzed with LLVM's libfuzzer. 10 | 11 | You can use it as a raw websocket frame parser and deal with spec compliance 12 | yourself, or you can use it as a full-fledged websocket client/server. 13 | 14 | ```rust 15 | use fastwebsockets::{Frame, OpCode, WebSocket}; 16 | 17 | async fn handle_client( 18 | mut socket: TcpStream, 19 | ) -> Result<(), WebSocketError> { 20 | handshake(&mut socket).await?; 21 | 22 | let mut ws = WebSocket::after_handshake(socket); 23 | ws.set_writev(true); 24 | ws.set_auto_close(true); 25 | ws.set_auto_pong(true); 26 | 27 | loop { 28 | let frame = ws.read_frame().await?; 29 | 30 | match frame { 31 | OpCode::Close => break, 32 | OpCode::Text | OpCode::Binary => { 33 | let frame = Frame::new(true, frame.opcode, None, frame.payload); 34 | ws.write_frame(frame).await?; 35 | } 36 | } 37 | } 38 | 39 | Ok(()) 40 | } 41 | ``` 42 | 43 | **Fragmentation** 44 | 45 | By default, fastwebsockets will give the application raw frames with FIN set. 46 | Other crates like tungstenite will give you a single message with all the 47 | frames concatenated. 48 | 49 | For concatenated frames, use `FragmentCollector`: 50 | 51 | ```rust 52 | let mut ws = WebSocket::after_handshake(socket); 53 | let mut ws = FragmentCollector::new(ws); 54 | 55 | let incoming = ws.read_frame().await?; 56 | // Always returns full messages 57 | assert!(incoming.fin); 58 | ``` 59 | 60 | > permessage-deflate is not supported yet. 61 | 62 | **HTTP Upgrade** 63 | 64 | Enable the `upgrade` feature to do server-side upgrades and client-side 65 | handshakes. 66 | 67 | This feature is powered by [hyper](https://docs.rs/hyper).
68 | 69 | ```rust 70 | use fastwebsockets::upgrade::upgrade; 71 | use hyper::{Request, body::{Incoming, Bytes}, Response}; 72 | use http_body_util::Empty; 73 | use anyhow::Result; 74 | 75 | async fn server_upgrade( 76 | mut req: Request, 77 | ) -> Result>> { 78 | let (response, fut) = upgrade::upgrade(&mut req)?; 79 | 80 | tokio::spawn(async move { 81 | if let Err(e) = handle_client(fut).await { 82 | eprintln!("Error in websocket connection: {}", e); 83 | } 84 | }); 85 | 86 | Ok(response) 87 | } 88 | ``` 89 | 90 | Use the `handshake` module for client-side handshakes. 91 | 92 | ```rust 93 | use fastwebsockets::handshake; 94 | use fastwebsockets::WebSocket; 95 | use hyper::{Request, body::Bytes, upgrade::Upgraded, header::{UPGRADE, CONNECTION}}; 96 | use http_body_util::Empty; 97 | use tokio::net::TcpStream; 98 | use std::future::Future; 99 | use anyhow::Result; 100 | 101 | async fn connect() -> Result> { 102 | let stream = TcpStream::connect("localhost:9001").await?; 103 | 104 | let req = Request::builder() 105 | .method("GET") 106 | .uri("http://localhost:9001/") 107 | .header("Host", "localhost:9001") 108 | .header(UPGRADE, "websocket") 109 | .header(CONNECTION, "upgrade") 110 | .header( 111 | "Sec-WebSocket-Key", 112 | fastwebsockets::handshake::generate_key(), 113 | ) 114 | .header("Sec-WebSocket-Version", "13") 115 | .body(Empty::::new())?; 116 | 117 | let (ws, _) = handshake::client(&SpawnExecutor, req, stream).await?; 118 | Ok(ws) 119 | } 120 | 121 | // Tie hyper's executor to tokio runtime 122 | struct SpawnExecutor; 123 | 124 | impl hyper::rt::Executor for SpawnExecutor 125 | where 126 | Fut: Future + Send + 'static, 127 | Fut::Output: Send + 'static, 128 | { 129 | fn execute(&self, fut: Fut) { 130 | tokio::task::spawn(fut); 131 | } 132 | } 133 | ``` 134 | 135 | **Usage with Axum** 136 | 137 | Enable the Axum integration with `features = ["upgrade", "with_axum"]` in Cargo.toml. 
138 | 139 | ```rust 140 | use axum::{response::IntoResponse, routing::get, Router}; 141 | use fastwebsockets::upgrade; 142 | use fastwebsockets::OpCode; 143 | use fastwebsockets::WebSocketError; 144 | 145 | #[tokio::main] 146 | async fn main() { 147 | let app = Router::new().route("/", get(ws_handler)); 148 | 149 | let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); 150 | axum::serve(listener, app).await.unwrap(); 151 | } 152 | 153 | async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { 154 | let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); 155 | 156 | loop { 157 | let frame = ws.read_frame().await?; 158 | match frame.opcode { 159 | OpCode::Close => break, 160 | OpCode::Text | OpCode::Binary => { 161 | ws.write_frame(frame).await?; 162 | } 163 | _ => {} 164 | } 165 | } 166 | 167 | Ok(()) 168 | } 169 | 170 | async fn ws_handler(ws: upgrade::IncomingUpgrade) -> impl IntoResponse { 171 | let (response, fut) = ws.upgrade().unwrap(); 172 | 173 | tokio::task::spawn(async move { 174 | if let Err(e) = handle_client(fut).await { 175 | eprintln!("Error in websocket connection: {}", e); 176 | } 177 | }); 178 | 179 | response 180 | } 181 | ``` 182 | 183 | 184 | -------------------------------------------------------------------------------- /src/mask.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | #[inline] 16 | fn unmask_easy(payload: &mut [u8], mask: [u8; 4]) { 17 | payload.iter_mut().enumerate().for_each(|(i, v)| { 18 | *v ^= mask[i & 3]; 19 | }); 20 | } 21 | 22 | // TODO(@littledivy): Compiler does a good job at auto-vectorizing `unmask_fallback` with 23 | // -C target-cpu=native. Below is a manual implementation. 24 | // 25 | // #[cfg(all(target_arch = "x86_64", feature = "simd"))] 26 | // #[inline] 27 | // fn unmask_x86_64(payload: &mut [u8], mask: [u8; 4]) { 28 | // #[inline] 29 | // fn sse2(payload: &mut [u8], mask: [u8; 4]) { 30 | // const ALIGNMENT: usize = 16; 31 | // unsafe { 32 | // use std::arch::x86_64::*; 33 | // 34 | // let len = payload.len(); 35 | // if len < ALIGNMENT { 36 | // return unmask_fallback(payload, mask); 37 | // } 38 | // 39 | // let start = len - len % ALIGNMENT; 40 | // 41 | // let mut aligned_mask = [0; ALIGNMENT]; 42 | // 43 | // for j in (0..ALIGNMENT).step_by(4) { 44 | // aligned_mask[j] = mask[j % 4]; 45 | // aligned_mask[j + 1] = mask[(j % 4) + 1]; 46 | // aligned_mask[j + 2] = mask[(j % 4) + 2]; 47 | // aligned_mask[j + 3] = mask[(j % 4) + 3]; 48 | // } 49 | // 50 | // let mask_m = _mm_loadu_si128(aligned_mask.as_ptr() as *const _); 51 | // 52 | // for index in (0..start).step_by(ALIGNMENT) { 53 | // let ptr = payload.as_mut_ptr().add(index); 54 | // let mut v = _mm_loadu_si128(ptr as *const _); 55 | // v = _mm_xor_si128(v, mask_m); 56 | // _mm_storeu_si128(ptr as *mut _, v); 57 | // } 58 | // 59 | // if len != start { 60 | // unmask_fallback(&mut payload[start..], mask); 61 | // } 62 | // } 63 | // } 64 | // #[cfg(target_feature = "sse2")] 65 | // { 66 | // return sse2(payload, mask); 67 | // } 68 | // 69 | // #[cfg(not(target_feature = "sse2"))] 70 | // { 71 | // use core::mem; 72 | // use std::sync::atomic::AtomicPtr; 73 | // use std::sync::atomic::Ordering; 74 | // 75 | // type FnRaw = *mut 
(); 76 | // type FnImpl = unsafe fn(&mut [u8], [u8; 4]); 77 | // 78 | // unsafe fn get_impl(input: &mut [u8], mask: [u8; 4]) { 79 | // let fun = if std::is_x86_feature_detected!("sse2") { 80 | // sse2 81 | // } else { 82 | // unmask_fallback 83 | // }; 84 | // FN.store(fun as FnRaw, Ordering::Relaxed); 85 | // (fun)(input, mask); 86 | // } 87 | // 88 | // static FN: AtomicPtr<()> = AtomicPtr::new(get_impl as FnRaw); 89 | // 90 | // if payload.len() < 16 { 91 | // return unmask_fallback(payload, mask); 92 | // } 93 | // 94 | // let fun = FN.load(Ordering::Relaxed); 95 | // unsafe { mem::transmute::(fun)(payload, mask) } 96 | // } 97 | // } 98 | 99 | // Faster version of `unmask_easy()` which operates on 4-byte blocks: the unaligned prefix and suffix are unmasked byte by byte, and the mask is rotated by the prefix length so the aligned words and the suffix stay in phase with the payload. 100 | // https://github.com/snapview/tungstenite-rs/blob/e5efe537b87a6705467043fe44bb220ddf7c1ce8/src/protocol/frame/mask.rs#L23 101 | // 102 | // https://godbolt.org/z/EPTYo5jK8 103 | #[inline] 104 | fn unmask_fallback(buf: &mut [u8], mask: [u8; 4]) { 105 | let mask_u32 = u32::from_ne_bytes(mask); 106 | 107 | let (prefix, words, suffix) = unsafe { buf.align_to_mut::() }; 108 | unmask_easy(prefix, mask); 109 | let head = prefix.len() & 3; 110 | let mask_u32 = if head > 0 { 111 | if cfg!(target_endian = "big") { 112 | mask_u32.rotate_left(8 * head as u32) 113 | } else { 114 | mask_u32.rotate_right(8 * head as u32) 115 | } 116 | } else { 117 | mask_u32 118 | }; 119 | for word in words.iter_mut() { 120 | *word ^= mask_u32; 121 | } 122 | unmask_easy(suffix, mask_u32.to_ne_bytes()); 123 | } 124 | 125 | /// Unmask a payload in place by XORing it with the given 4-byte mask, repeated cyclically.
126 | #[inline] 127 | pub fn unmask(payload: &mut [u8], mask: [u8; 4]) { 128 | unmask_fallback(payload, mask) 129 | } 130 | 131 | #[cfg(test)] 132 | mod tests { 133 | use super::*; 134 | 135 | #[test] 136 | fn test_unmask() { 137 | let mut payload = [0u8; 33]; 138 | let mask = [1, 2, 3, 4]; 139 | unmask(&mut payload, mask); 140 | assert_eq!( 141 | &payload, 142 | &[ 143 | 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 144 | 1, 2, 3, 4, 1, 2, 3, 4, 1 145 | ] 146 | ); 147 | } 148 | 149 | #[test] 150 | fn length_variation_unmask() { 151 | for len in &[0, 2, 3, 8, 16, 18, 31, 32, 40] { 152 | let mut payload = vec![0u8; *len]; 153 | let mask = [1, 2, 3, 4]; 154 | unmask(&mut payload, mask); 155 | 156 | let expected = (0..*len).map(|i| (i & 3) as u8 + 1).collect::>(); 157 | assert_eq!(payload, expected); 158 | } 159 | } 160 | 161 | #[test] 162 | fn length_variation_unmask_2() { 163 | for len in &[0, 2, 3, 8, 16, 18, 31, 32, 40] { 164 | let mut payload = vec![0u8; *len]; 165 | let mask = rand::random::<[u8; 4]>(); 166 | unmask(&mut payload, mask); 167 | 168 | let expected = (0..*len).map(|i| mask[i & 3]).collect::>(); 169 | assert_eq!(payload, expected); 170 | } 171 | } 172 | } 173 | -------------------------------------------------------------------------------- /src/close.rs: -------------------------------------------------------------------------------- 1 | // Mostly copied from https://github.com/snapview/tungstenite-rs/blob/42b8797e8b7f39efb7d9322dc8af3e9089db4f7d/src/protocol/frame/coding.rs#L117 2 | // 3 | // Copyright (c) 2017 Alexey Galakhov 4 | // Copyright (c) 2016 Jason Housley 5 | // Dual licensed under MIT and Apache 2.0 6 | // --- 7 | // Copyright 2023 Divy Srivastava 8 | // 9 | // Licensed under the Apache License, Version 2.0 (the "License"); 10 | // you may not use this file except in compliance with the License. 
11 | // You may obtain a copy of the License at 12 | // 13 | // http://www.apache.org/licenses/LICENSE-2.0 14 | // 15 | // Unless required by applicable law or agreed to in writing, software 16 | // distributed under the License is distributed on an "AS IS" BASIS, 17 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18 | // See the License for the specific language governing permissions and 19 | // limitations under the License. 20 | 21 | use self::CloseCode::*; 22 | /// Status code used to indicate why an endpoint is closing the WebSocket connection. 23 | #[derive(Debug, Eq, PartialEq, Clone, Copy)] 24 | pub enum CloseCode { 25 | /// Indicates a normal closure, meaning that the purpose for 26 | /// which the connection was established has been fulfilled. 27 | Normal, 28 | /// Indicates that an endpoint is "going away", such as a server 29 | /// going down or a browser having navigated away from a page. 30 | Away, 31 | /// Indicates that an endpoint is terminating the connection due 32 | /// to a protocol error. 33 | Protocol, 34 | /// Indicates that an endpoint is terminating the connection 35 | /// because it has received a type of data it cannot accept (e.g., an 36 | /// endpoint that understands only text data MAY send this if it 37 | /// receives a binary message). 38 | Unsupported, 39 | /// Indicates that no status code was included in a closing frame. This 40 | /// close code makes it possible to use a single method, `on_close` to 41 | /// handle even cases where no close code was provided. 42 | Status, 43 | /// Indicates an abnormal closure. If the abnormal closure was due to an 44 | /// error, this close code will not be used. Instead, the `on_error` method 45 | /// of the handler will be called with the error. However, if the connection 46 | /// is simply dropped, without an error, this close code will be sent to the 47 | /// handler. 
48 | Abnormal, 49 | /// Indicates that an endpoint is terminating the connection 50 | /// because it has received data within a message that was not 51 | /// consistent with the type of the message (e.g., non-UTF-8 \[RFC3629\] 52 | /// data within a text message). 53 | Invalid, 54 | /// Indicates that an endpoint is terminating the connection 55 | /// because it has received a message that violates its policy. This 56 | /// is a generic status code that can be returned when there is no 57 | /// other more suitable status code (e.g., Unsupported or Size) or if there 58 | /// is a need to hide specific details about the policy. 59 | Policy, 60 | /// Indicates that an endpoint is terminating the connection 61 | /// because it has received a message that is too big for it to 62 | /// process. 63 | Size, 64 | /// Indicates that an endpoint (client) is terminating the 65 | /// connection because it has expected the server to negotiate one or 66 | /// more extension, but the server didn't return them in the response 67 | /// message of the WebSocket handshake. The list of extensions that 68 | /// are needed should be given as the reason for closing. 69 | /// Note that this status code is not used by the server, because it 70 | /// can fail the WebSocket handshake instead. 71 | Extension, 72 | /// Indicates that a server is terminating the connection because 73 | /// it encountered an unexpected condition that prevented it from 74 | /// fulfilling the request. 75 | Error, 76 | /// Indicates that the server is restarting. A client may choose to reconnect, 77 | /// and if it does, it should use a randomized delay of 5-30 seconds between attempts. 78 | Restart, 79 | /// Indicates that the server is overloaded and the client should either connect 80 | /// to a different IP (when multiple targets exist), or reconnect to the same IP 81 | /// when a user has performed an action. 
Again, 83 | #[doc(hidden)] 84 | Tls, 85 | #[doc(hidden)] 86 | Reserved(u16), 87 | #[doc(hidden)] 88 | Iana(u16), 89 | #[doc(hidden)] 90 | Library(u16), 91 | #[doc(hidden)] 92 | Bad(u16), 93 | } 94 | 95 | impl CloseCode { 96 | /// Check if this CloseCode is allowed: every code except `Bad`, `Reserved`, `Status`, `Abnormal` and `Tls`. 97 | pub fn is_allowed(self) -> bool { 98 | !matches!(self, Bad(_) | Reserved(_) | Status | Abnormal | Tls) 99 | } 100 | } 101 | 102 | impl From for CloseCode { 103 | fn from(code: u16) -> CloseCode { 104 | match code { 105 | 1000 => Normal, 106 | 1001 => Away, 107 | 1002 => Protocol, 108 | 1003 => Unsupported, 109 | 1005 => Status, 110 | 1006 => Abnormal, 111 | 1007 => Invalid, 112 | 1008 => Policy, 113 | 1009 => Size, 114 | 1010 => Extension, 115 | 1011 => Error, 116 | 1012 => Restart, 117 | 1013 => Again, 118 | 1015 => Tls, 119 | 1..=999 => Bad(code), 120 | 1016..=2999 => Reserved(code), 121 | 3000..=3999 => Iana(code), 122 | 4000..=4999 => Library(code), 123 | _ => Bad(code), 124 | } 125 | } 126 | } 127 | 128 | impl From for u16 { 129 | fn from(code: CloseCode) -> u16 { 130 | match code { 131 | Normal => 1000, 132 | Away => 1001, 133 | Protocol => 1002, 134 | Unsupported => 1003, 135 | Status => 1005, 136 | Abnormal => 1006, 137 | Invalid => 1007, 138 | Policy => 1008, 139 | Size => 1009, 140 | Extension => 1010, 141 | Error => 1011, 142 | Restart => 1012, 143 | Again => 1013, 144 | Tls => 1015, 145 | Reserved(code) => code, 146 | Iana(code) => code, 147 | Library(code) => code, 148 | Bad(code) => code, 149 | } 150 | } 151 | } 152 | -------------------------------------------------------------------------------- /src/upgrade.rs: -------------------------------------------------------------------------------- 1 | // Port of hyper_tungstenite for fastwebsockets.
2 | // https://github.com/de-vri-es/hyper-tungstenite-rs 3 | // 4 | // Copyright 2021, Maarten de Vries maarten@de-vri.es 5 | // BSD 2-Clause "Simplified" License 6 | // 7 | // Copyright 2023 Divy Srivastava 8 | // 9 | // Licensed under the Apache License, Version 2.0 (the "License"); 10 | // you may not use this file except in compliance with the License. 11 | // You may obtain a copy of the License at 12 | // 13 | // http://www.apache.org/licenses/LICENSE-2.0 14 | // 15 | // Unless required by applicable law or agreed to in writing, software 16 | // distributed under the License is distributed on an "AS IS" BASIS, 17 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18 | // See the License for the specific language governing permissions and 19 | // limitations under the License. 20 | 21 | use base64; 22 | use base64::engine::general_purpose::STANDARD; 23 | use base64::Engine; 24 | use http_body_util::Empty; 25 | use hyper::body::Bytes; 26 | use hyper::Request; 27 | use hyper::Response; 28 | use hyper_util::rt::TokioIo; 29 | use pin_project::pin_project; 30 | use sha1::Digest; 31 | use sha1::Sha1; 32 | use std::pin::Pin; 33 | use std::task::Context; 34 | use std::task::Poll; 35 | 36 | use crate::Role; 37 | use crate::WebSocket; 38 | use crate::WebSocketError; 39 | 40 | fn sec_websocket_protocol(key: &[u8]) -> String { 41 | let mut sha1 = Sha1::new(); 42 | sha1.update(key); 43 | sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); // magic string 44 | let result = sha1.finalize(); 45 | STANDARD.encode(&result[..]) 46 | } 47 | 48 | type Error = WebSocketError; 49 | 50 | pub struct IncomingUpgrade { 51 | key: String, 52 | on_upgrade: hyper::upgrade::OnUpgrade, 53 | } 54 | 55 | impl IncomingUpgrade { 56 | pub fn upgrade(self) -> Result<(Response>, UpgradeFut), Error> { 57 | let response = Response::builder() 58 | .status(hyper::StatusCode::SWITCHING_PROTOCOLS) 59 | .header(hyper::header::CONNECTION, "upgrade") 60 | 
.header(hyper::header::UPGRADE, "websocket") 61 | .header("Sec-WebSocket-Accept", self.key) 62 | .body(Empty::new()) 63 | .expect("bug: failed to build response"); 64 | 65 | let stream = UpgradeFut { 66 | inner: self.on_upgrade, 67 | }; 68 | 69 | Ok((response, stream)) 70 | } 71 | } 72 | 73 | #[cfg(feature = "with_axum")] 74 | impl axum_core::extract::FromRequestParts for IncomingUpgrade 75 | where 76 | S: Send + Sync, 77 | { 78 | type Rejection = hyper::StatusCode; 79 | 80 | async fn from_request_parts( 81 | parts: &mut http::request::Parts, 82 | _state: &S, 83 | ) -> Result { 84 | let key = parts 85 | .headers 86 | .get("Sec-WebSocket-Key") 87 | .ok_or(hyper::StatusCode::BAD_REQUEST)?; 88 | if parts 89 | .headers 90 | .get("Sec-WebSocket-Version") 91 | .map(|v| v.as_bytes()) 92 | != Some(b"13") 93 | { 94 | return Err(hyper::StatusCode::BAD_REQUEST); 95 | } 96 | 97 | let on_upgrade = parts 98 | .extensions 99 | .remove::() 100 | .ok_or(hyper::StatusCode::BAD_REQUEST)?; 101 | Ok(Self { 102 | on_upgrade, 103 | key: sec_websocket_protocol(key.as_bytes()), 104 | }) 105 | } 106 | } 107 | 108 | /// A future that resolves to a websocket stream when the associated HTTP upgrade completes. 109 | #[pin_project] 110 | #[derive(Debug)] 111 | pub struct UpgradeFut { 112 | #[pin] 113 | inner: hyper::upgrade::OnUpgrade, 114 | } 115 | 116 | /// Try to upgrade a received `hyper::Request` to a websocket connection. 117 | /// 118 | /// The function returns an HTTP response and a future that resolves to the websocket stream. 119 | /// The response body *MUST* be sent to the client before the future can be resolved. 120 | /// 121 | /// This function checks `Sec-WebSocket-Key` and `Sec-WebSocket-Version` headers. 122 | /// It does not inspect the `Origin`, `Sec-WebSocket-Protocol` or `Sec-WebSocket-Extensions` headers. 123 | /// You can inspect the headers manually before calling this function, 124 | /// and modify the response headers appropriately.
125 | /// 126 | /// This function also does not look at the `Connection` or `Upgrade` headers. 127 | /// To check if a request is a websocket upgrade request, you can use [`is_upgrade_request`]. 128 | /// Alternatively you can inspect the `Connection` and `Upgrade` headers manually. 129 | /// Fails with `WebSocketError::MissingSecWebSocketKey` if the `Sec-WebSocket-Key` header is absent, and with `WebSocketError::InvalidSecWebsocketVersion` unless `Sec-WebSocket-Version` is exactly `13`. 130 | pub fn upgrade( 131 | mut request: impl std::borrow::BorrowMut>, 132 | ) -> Result<(Response>, UpgradeFut), Error> { 133 | let request = request.borrow_mut(); 134 | 135 | let key = request 136 | .headers() 137 | .get("Sec-WebSocket-Key") 138 | .ok_or(WebSocketError::MissingSecWebSocketKey)?; 139 | if request 140 | .headers() 141 | .get("Sec-WebSocket-Version") 142 | .map(|v| v.as_bytes()) 143 | != Some(b"13") 144 | { 145 | return Err(WebSocketError::InvalidSecWebsocketVersion); 146 | } 147 | 148 | let response = Response::builder() 149 | .status(hyper::StatusCode::SWITCHING_PROTOCOLS) 150 | .header(hyper::header::CONNECTION, "upgrade") 151 | .header(hyper::header::UPGRADE, "websocket") 152 | .header( 153 | "Sec-WebSocket-Accept", 154 | &sec_websocket_protocol(key.as_bytes()), 155 | ) 156 | .body(Empty::new()) 157 | .expect("bug: failed to build response"); 158 | 159 | let stream = UpgradeFut { 160 | inner: hyper::upgrade::on(request), 161 | }; 162 | 163 | Ok((response, stream)) 164 | } 165 | 166 | /// Check if a request is a websocket upgrade request. 167 | /// 168 | /// If the `Upgrade` header lists multiple protocols, 169 | /// this function returns true if any of them is `"websocket"`. 170 | /// If the server supports multiple upgrade protocols, 171 | /// it would be more appropriate to try each listed protocol in order.
172 | pub fn is_upgrade_request(request: &hyper::Request) -> bool { 173 | header_contains_value(request.headers(), hyper::header::CONNECTION, "Upgrade") 174 | && header_contains_value( 175 | request.headers(), 176 | hyper::header::UPGRADE, 177 | "websocket", 178 | ) 179 | } 180 | 181 | /// Check if there is a header of the given name containing the wanted value. Each header value is split on `,`; every item is trimmed of ASCII whitespace and compared ASCII-case-insensitively. 182 | fn header_contains_value( 183 | headers: &hyper::HeaderMap, 184 | header: impl hyper::header::AsHeaderName, 185 | value: impl AsRef<[u8]>, 186 | ) -> bool { 187 | let value = value.as_ref(); 188 | for header in headers.get_all(header) { 189 | if header 190 | .as_bytes() 191 | .split(|&c| c == b',') 192 | .any(|x| trim(x).eq_ignore_ascii_case(value)) 193 | { 194 | return true; 195 | } 196 | } 197 | false 198 | } 199 | 200 | fn trim(data: &[u8]) -> &[u8] { 201 | trim_end(trim_start(data)) 202 | } 203 | 204 | fn trim_start(data: &[u8]) -> &[u8] { 205 | if let Some(start) = data.iter().position(|x| !x.is_ascii_whitespace()) { 206 | &data[start..]
207 | } else { 208 | b"" 209 | } 210 | } 211 | 212 | fn trim_end(data: &[u8]) -> &[u8] { 213 | if let Some(last) = data.iter().rposition(|x| !x.is_ascii_whitespace()) { 214 | &data[..last + 1] 215 | } else { 216 | b"" 217 | } 218 | } 219 | 220 | impl std::future::Future for UpgradeFut { 221 | type Output = Result>, Error>; 222 | 223 | fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { 224 | let this = self.project(); 225 | let upgraded = match this.inner.poll(cx) { 226 | Poll::Pending => return Poll::Pending, 227 | Poll::Ready(x) => x, 228 | }; 229 | Poll::Ready(Ok(WebSocket::after_handshake( 230 | TokioIo::new(upgraded?), 231 | Role::Server, 232 | ))) 233 | } 234 | } 235 | -------------------------------------------------------------------------------- /benches/100-20-chart.svg: -------------------------------------------------------------------------------- 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 32 | 33 | 34 | 35 | 36 | 37 | 38 | 39 | 40 | 41 | 42 | 43 | 44 | 45 | 46 | 47 | 48 | 49 | 50 | 51 | 52 | 53 | 54 | 55 | 56 | 57 | 58 | 59 | 60 | 61 | 62 | 63 | 64 | 65 | 66 | 67 | 68 | 69 | 70 | 71 | 72 | 73 | 74 | 75 | 76 | 77 | 78 | 79 | Connections: 100, Payload size: 20 81 | 82 | 83 | 84 | 85 | uWebSockets 87 | 88 | fastwebsockets 90 | 91 | tokio-tungstenite 93 | 94 | rust-websocket 96 | 97 | 98 | 99 | 100 | 0 102 | 103 | 50,000 105 | 106 | 100,000 108 | 109 | 150,000 111 | 112 | 200,000 114 | 115 | 250,000 117 | 118 | 119 | 120 | 121 | 122 | 123 | 124 | 125 | 126 | 127 | 128 | 129 | -------------------------------------------------------------------------------- /src/fragment.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 
5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | #[cfg(feature = "unstable-split")] 16 | use std::future::Future; 17 | 18 | use crate::error::WebSocketError; 19 | use crate::frame::Frame; 20 | use crate::OpCode; 21 | use crate::ReadHalf; 22 | use crate::WebSocket; 23 | #[cfg(feature = "unstable-split")] 24 | use crate::WebSocketRead; 25 | use crate::WriteHalf; 26 | use tokio::io::AsyncRead; 27 | use tokio::io::AsyncWrite; 28 | 29 | pub enum Fragment { 30 | Text(Option, Vec), 31 | Binary(Vec), 32 | } 33 | 34 | impl Fragment { 35 | /// Returns the payload of the fragment. 36 | fn take_buffer(self) -> Vec { 37 | match self { 38 | Fragment::Text(_, buffer) => buffer, 39 | Fragment::Binary(buffer) => buffer, 40 | } 41 | } 42 | } 43 | 44 | /// Collects fragmented messages over a WebSocket connection and returns the completed message once all fragments have been received. 45 | /// 46 | /// This is useful for applications that do not want to deal with fragmented messages and the default behavior of tungstenite. 47 | /// The payload is buffered in memory until the final fragment is received 48 | /// so use this when streaming messages is not an option. 
49 | /// 50 | /// # Example 51 | /// 52 | /// ``` 53 | /// use tokio::net::TcpStream; 54 | /// use fastwebsockets::{WebSocket, FragmentCollector, OpCode, Role}; 55 | /// use anyhow::Result; 56 | /// 57 | /// async fn handle_client( 58 | /// socket: TcpStream, 59 | /// ) -> Result<()> { 60 | /// let ws = WebSocket::after_handshake(socket, Role::Server); 61 | /// let mut ws = FragmentCollector::new(ws); 62 | /// 63 | /// loop { 64 | /// let frame = ws.read_frame().await?; 65 | /// match frame.opcode { 66 | /// OpCode::Close => break, 67 | /// OpCode::Text | OpCode::Binary => { 68 | /// ws.write_frame(frame).await?; 69 | /// } 70 | /// _ => {} 71 | /// } 72 | /// } 73 | /// Ok(()) 74 | /// } 75 | /// ``` 76 | /// 77 | pub struct FragmentCollector { 78 | stream: S, 79 | read_half: ReadHalf, 80 | write_half: WriteHalf, 81 | fragments: Fragments, 82 | } 83 | 84 | impl<'f, S> FragmentCollector { 85 | /// Creates a new `FragmentCollector` with the provided `WebSocket`. 86 | pub fn new(ws: WebSocket) -> FragmentCollector 87 | where 88 | S: AsyncRead + AsyncWrite + Unpin, 89 | { 90 | let (stream, read_half, write_half) = ws.into_parts_internal(); 91 | FragmentCollector { 92 | stream, 93 | read_half, 94 | write_half, 95 | fragments: Fragments::new(), 96 | } 97 | } 98 | 99 | /// Reads a WebSocket frame, collecting fragmented messages until the final frame is received and returns the completed message. 100 | /// 101 | /// Text frames payload is guaranteed to be valid UTF-8. 102 | pub async fn read_frame(&mut self) -> Result, WebSocketError> 103 | where 104 | S: AsyncRead + AsyncWrite + Unpin, 105 | { 106 | loop { 107 | let (res, obligated_send) = 108 | self.read_half.read_frame_inner(&mut self.stream).await; 109 | let is_closed = self.write_half.closed; 110 | if let Some(obligated_send) = obligated_send { 111 | if !is_closed { 112 | self.write_frame(obligated_send).await?; 113 | } 114 | } 115 | let Some(frame) = res? 
else { 116 | continue; 117 | }; 118 | if is_closed && frame.opcode != OpCode::Close { 119 | return Err(WebSocketError::ConnectionClosed); 120 | } 121 | if let Some(frame) = self.fragments.accumulate(frame)? { 122 | return Ok(frame); 123 | } 124 | } 125 | } 126 | 127 | /// See `WebSocket::write_frame`. 128 | pub async fn write_frame( 129 | &mut self, 130 | frame: Frame<'f>, 131 | ) -> Result<(), WebSocketError> 132 | where 133 | S: AsyncRead + AsyncWrite + Unpin, 134 | { 135 | self.write_half.write_frame(&mut self.stream, frame).await?; 136 | Ok(()) 137 | } 138 | 139 | /// Consumes the `FragmentCollector` and returns the underlying stream. 140 | #[inline] 141 | pub fn into_inner(self) -> S { 142 | self.stream 143 | } 144 | } 145 | 146 | #[cfg(feature = "unstable-split")] 147 | pub struct FragmentCollectorRead { 148 | stream: S, 149 | read_half: ReadHalf, 150 | fragments: Fragments, 151 | } 152 | 153 | #[cfg(feature = "unstable-split")] 154 | impl<'f, S> FragmentCollectorRead { 155 | /// Creates a new `FragmentCollector` with the provided `WebSocket`. 156 | pub fn new(ws: WebSocketRead) -> FragmentCollectorRead 157 | where 158 | S: AsyncRead + Unpin, 159 | { 160 | let (stream, read_half) = ws.into_parts_internal(); 161 | FragmentCollectorRead { 162 | stream, 163 | read_half, 164 | fragments: Fragments::new(), 165 | } 166 | } 167 | 168 | /// Reads a WebSocket frame, collecting fragmented messages until the final frame is received and returns the completed message. 169 | /// 170 | /// Text frames payload is guaranteed to be valid UTF-8. 171 | /// 172 | /// # Arguments 173 | /// 174 | /// * `send_fn`: Closure must ensure frames are sent by write side of split WebSocket to correctly implement auto-close and auto-pong. 
175 | pub async fn read_frame( 176 | &mut self, 177 | send_fn: &mut impl FnMut(Frame<'f>) -> R, 178 | ) -> Result, WebSocketError> 179 | where 180 | S: AsyncRead + Unpin, 181 | E: Into>, 182 | R: Future>, 183 | { 184 | loop { 185 | let (res, obligated_send) = 186 | self.read_half.read_frame_inner(&mut self.stream).await; 187 | if let Some(frame) = obligated_send { 188 | let res = send_fn(frame).await; 189 | res.map_err(|e| WebSocketError::SendError(e.into()))?; 190 | } 191 | let Some(frame) = res? else { 192 | continue; 193 | }; 194 | if let Some(frame) = self.fragments.accumulate(frame)? { 195 | return Ok(frame); 196 | } 197 | } 198 | } 199 | } 200 | 201 | /// Accumulates potentially fragmented [`Frame`]s to defragment the incoming WebSocket stream. 202 | struct Fragments { 203 | fragments: Option, 204 | opcode: OpCode, 205 | } 206 | 207 | impl Fragments { 208 | pub fn new() -> Self { 209 | Self { 210 | fragments: None, 211 | opcode: OpCode::Close, 212 | } 213 | } 214 | 215 | pub fn accumulate<'f>( 216 | &mut self, 217 | frame: Frame<'f>, 218 | ) -> Result>, WebSocketError> { 219 | match frame.opcode { 220 | OpCode::Text | OpCode::Binary => { 221 | if frame.fin { 222 | if self.fragments.is_some() { 223 | return Err(WebSocketError::InvalidFragment); 224 | } 225 | return Ok(Some(Frame::new(true, frame.opcode, None, frame.payload))); 226 | } else { 227 | self.fragments = match frame.opcode { 228 | OpCode::Text => match utf8::decode(&frame.payload) { 229 | Ok(text) => Some(Fragment::Text(None, text.as_bytes().to_vec())), 230 | Err(utf8::DecodeError::Incomplete { 231 | valid_prefix, 232 | incomplete_suffix, 233 | }) => Some(Fragment::Text( 234 | Some(incomplete_suffix), 235 | valid_prefix.as_bytes().to_vec(), 236 | )), 237 | Err(utf8::DecodeError::Invalid { .. 
}) => { 238 | return Err(WebSocketError::InvalidUTF8); 239 | } 240 | }, 241 | OpCode::Binary => Some(Fragment::Binary(frame.payload.into())), 242 | _ => unreachable!(), 243 | }; 244 | self.opcode = frame.opcode; 245 | } 246 | } 247 | OpCode::Continuation => match self.fragments.as_mut() { 248 | None => { 249 | return Err(WebSocketError::InvalidContinuationFrame); 250 | } 251 | Some(Fragment::Text(data, input)) => { 252 | let mut tail = &frame.payload[..]; 253 | if let Some(mut incomplete) = data.take() { 254 | if let Some((result, rest)) = 255 | incomplete.try_complete(&frame.payload) 256 | { 257 | tail = rest; 258 | match result { 259 | Ok(text) => { 260 | input.extend_from_slice(text.as_bytes()); 261 | } 262 | Err(_) => { 263 | return Err(WebSocketError::InvalidUTF8); 264 | } 265 | } 266 | } else { 267 | tail = &[]; 268 | data.replace(incomplete); 269 | } 270 | } 271 | 272 | match utf8::decode(tail) { 273 | Ok(text) => { 274 | input.extend_from_slice(text.as_bytes()); 275 | } 276 | Err(utf8::DecodeError::Incomplete { 277 | valid_prefix, 278 | incomplete_suffix, 279 | }) => { 280 | input.extend_from_slice(valid_prefix.as_bytes()); 281 | *data = Some(incomplete_suffix); 282 | } 283 | Err(utf8::DecodeError::Invalid { valid_prefix, .. 
}) => { 284 | input.extend_from_slice(valid_prefix.as_bytes()); 285 | return Err(WebSocketError::InvalidUTF8); 286 | } 287 | } 288 | 289 | if frame.fin { 290 | return Ok(Some(Frame::new( 291 | true, 292 | self.opcode, 293 | None, 294 | self.fragments.take().unwrap().take_buffer().into(), 295 | ))); 296 | } 297 | } 298 | Some(Fragment::Binary(data)) => { 299 | data.extend_from_slice(&frame.payload); 300 | if frame.fin { 301 | return Ok(Some(Frame::new( 302 | true, 303 | self.opcode, 304 | None, 305 | self.fragments.take().unwrap().take_buffer().into(), 306 | ))); 307 | } 308 | } 309 | }, 310 | _ => return Ok(Some(frame)), 311 | } 312 | 313 | Ok(None) 314 | } 315 | } 316 | -------------------------------------------------------------------------------- /src/frame.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | use tokio::io::AsyncWriteExt; 16 | 17 | use bytes::BytesMut; 18 | use core::ops::Deref; 19 | 20 | use crate::WebSocketError; 21 | 22 | macro_rules! 
repr_u8 { 23 | ($(#[$meta:meta])* $vis:vis enum $name:ident { 24 | $($(#[$vmeta:meta])* $vname:ident $(= $val:expr)?,)* 25 | }) => { 26 | $(#[$meta])* 27 | $vis enum $name { 28 | $($(#[$vmeta])* $vname $(= $val)?,)* 29 | } 30 | 31 | impl core::convert::TryFrom for $name { 32 | type Error = WebSocketError; 33 | 34 | fn try_from(v: u8) -> Result { 35 | match v { 36 | $(x if x == $name::$vname as u8 => Ok($name::$vname),)* 37 | _ => Err(WebSocketError::InvalidValue), 38 | } 39 | } 40 | } 41 | } 42 | } 43 | 44 | pub enum Payload<'a> { 45 | BorrowedMut(&'a mut [u8]), 46 | Borrowed(&'a [u8]), 47 | Owned(Vec), 48 | Bytes(BytesMut), 49 | } 50 | 51 | impl<'a> core::fmt::Debug for Payload<'a> { 52 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 53 | f.debug_struct("Payload").field("len", &self.len()).finish() 54 | } 55 | } 56 | 57 | impl Deref for Payload<'_> { 58 | type Target = [u8]; 59 | 60 | fn deref(&self) -> &Self::Target { 61 | match self { 62 | Payload::Borrowed(borrowed) => borrowed, 63 | Payload::BorrowedMut(borrowed_mut) => borrowed_mut, 64 | Payload::Owned(owned) => owned.as_ref(), 65 | Payload::Bytes(b) => b.as_ref(), 66 | } 67 | } 68 | } 69 | 70 | impl<'a> From<&'a mut [u8]> for Payload<'a> { 71 | fn from(borrowed: &'a mut [u8]) -> Payload<'a> { 72 | Payload::BorrowedMut(borrowed) 73 | } 74 | } 75 | 76 | impl<'a> From<&'a [u8]> for Payload<'a> { 77 | fn from(borrowed: &'a [u8]) -> Payload<'a> { 78 | Payload::Borrowed(borrowed) 79 | } 80 | } 81 | 82 | impl From> for Payload<'_> { 83 | fn from(owned: Vec) -> Self { 84 | Payload::Owned(owned) 85 | } 86 | } 87 | 88 | impl From> for Vec { 89 | fn from(cow: Payload<'_>) -> Self { 90 | match cow { 91 | Payload::Borrowed(borrowed) => borrowed.to_vec(), 92 | Payload::BorrowedMut(borrowed_mut) => borrowed_mut.to_vec(), 93 | Payload::Owned(owned) => owned, 94 | Payload::Bytes(b) => Vec::from(b), 95 | } 96 | } 97 | } 98 | 99 | impl Payload<'_> { 100 | #[inline(always)] 101 | pub fn to_mut(&mut 
self) -> &mut [u8] { 102 | match self { 103 | Payload::Borrowed(borrowed) => { 104 | *self = Payload::Owned(borrowed.to_owned()); 105 | match self { 106 | Payload::Owned(owned) => owned, 107 | _ => unreachable!(), 108 | } 109 | } 110 | Payload::BorrowedMut(borrowed) => borrowed, 111 | Payload::Owned(ref mut owned) => owned, 112 | Payload::Bytes(b) => b.as_mut(), 113 | } 114 | } 115 | } 116 | 117 | impl<'a> PartialEq<&'_ [u8]> for Payload<'a> { 118 | fn eq(&self, other: &&'_ [u8]) -> bool { 119 | self.deref() == *other 120 | } 121 | } 122 | 123 | impl<'a, const N: usize> PartialEq<&'_ [u8; N]> for Payload<'a> { 124 | fn eq(&self, other: &&'_ [u8; N]) -> bool { 125 | self.deref() == *other 126 | } 127 | } 128 | 129 | /// Represents a WebSocket frame. 130 | pub struct Frame<'f> { 131 | /// Indicates if this is the final frame in a message. 132 | pub fin: bool, 133 | /// The opcode of the frame. 134 | pub opcode: OpCode, 135 | /// The masking key of the frame, if any. 136 | mask: Option<[u8; 4]>, 137 | /// The payload of the frame. 138 | pub payload: Payload<'f>, 139 | } 140 | 141 | const MAX_HEAD_SIZE: usize = 16; 142 | 143 | impl<'f> Frame<'f> { 144 | /// Creates a new WebSocket `Frame`. 145 | pub fn new( 146 | fin: bool, 147 | opcode: OpCode, 148 | mask: Option<[u8; 4]>, 149 | payload: Payload<'f>, 150 | ) -> Self { 151 | Self { 152 | fin, 153 | opcode, 154 | mask, 155 | payload, 156 | } 157 | } 158 | 159 | /// Create a new WebSocket text `Frame`. 160 | /// 161 | /// This is a convenience method for `Frame::new(true, OpCode::Text, None, payload)`. 162 | /// 163 | /// This method does not check if the payload is valid UTF-8. 164 | pub fn text(payload: Payload<'f>) -> Self { 165 | Self { 166 | fin: true, 167 | opcode: OpCode::Text, 168 | mask: None, 169 | payload, 170 | } 171 | } 172 | 173 | /// Create a new WebSocket binary `Frame`. 174 | /// 175 | /// This is a convenience method for `Frame::new(true, OpCode::Binary, None, payload)`. 
176 | pub fn binary(payload: Payload<'f>) -> Self { 177 | Self { 178 | fin: true, 179 | opcode: OpCode::Binary, 180 | mask: None, 181 | payload, 182 | } 183 | } 184 | 185 | /// Create a new WebSocket close `Frame`. 186 | /// 187 | /// This is a convenience method for `Frame::new(true, OpCode::Close, None, payload)`. 188 | /// 189 | /// This method does not check if `code` is a valid close code and `reason` is valid UTF-8. 190 | pub fn close(code: u16, reason: &[u8]) -> Self { 191 | let mut payload = Vec::with_capacity(2 + reason.len()); 192 | payload.extend_from_slice(&code.to_be_bytes()); 193 | payload.extend_from_slice(reason); 194 | 195 | Self { 196 | fin: true, 197 | opcode: OpCode::Close, 198 | mask: None, 199 | payload: payload.into(), 200 | } 201 | } 202 | 203 | /// Create a new WebSocket close `Frame` with a raw payload. 204 | /// 205 | /// This is a convenience method for `Frame::new(true, OpCode::Close, None, payload)`. 206 | /// 207 | /// This method does not check if `payload` is valid Close frame payload. 208 | pub fn close_raw(payload: Payload<'f>) -> Self { 209 | Self { 210 | fin: true, 211 | opcode: OpCode::Close, 212 | mask: None, 213 | payload, 214 | } 215 | } 216 | 217 | /// Create a new WebSocket pong `Frame`. 218 | /// 219 | /// This is a convenience method for `Frame::new(true, OpCode::Pong, None, payload)`. 220 | pub fn pong(payload: Payload<'f>) -> Self { 221 | Self { 222 | fin: true, 223 | opcode: OpCode::Pong, 224 | mask: None, 225 | payload, 226 | } 227 | } 228 | 229 | /// Checks if the frame payload is valid UTF-8. 
230 | pub fn is_utf8(&self) -> bool { 231 | #[cfg(feature = "simd")] 232 | return simdutf8::basic::from_utf8(&self.payload).is_ok(); 233 | 234 | #[cfg(not(feature = "simd"))] 235 | return std::str::from_utf8(&self.payload).is_ok(); 236 | } 237 | 238 | pub fn mask(&mut self) { 239 | if let Some(mask) = self.mask { 240 | crate::mask::unmask(self.payload.to_mut(), mask); 241 | } else { 242 | let mask: [u8; 4] = rand::random(); 243 | crate::mask::unmask(self.payload.to_mut(), mask); 244 | self.mask = Some(mask); 245 | } 246 | } 247 | 248 | /// Unmasks the frame payload in-place. This method does nothing if the frame is not masked. 249 | /// 250 | /// Note: By default, the frame payload is unmasked by `WebSocket::read_frame`. 251 | pub fn unmask(&mut self) { 252 | if let Some(mask) = self.mask { 253 | crate::mask::unmask(self.payload.to_mut(), mask); 254 | } 255 | } 256 | 257 | /// Formats the frame header into the head buffer. Returns the size of the length field. 258 | /// 259 | /// # Panics 260 | /// 261 | /// This method panics if the head buffer is not at least n-bytes long, where n is the size of the length field (0, 2, 4, or 10) 262 | pub fn fmt_head(&mut self, head: &mut [u8]) -> usize { 263 | head[0] = (self.fin as u8) << 7 | (self.opcode as u8); 264 | 265 | let len = self.payload.len(); 266 | let size = if len < 126 { 267 | head[1] = len as u8; 268 | 2 269 | } else if len < 65536 { 270 | head[1] = 126; 271 | head[2..4].copy_from_slice(&(len as u16).to_be_bytes()); 272 | 4 273 | } else { 274 | head[1] = 127; 275 | head[2..10].copy_from_slice(&(len as u64).to_be_bytes()); 276 | 10 277 | }; 278 | 279 | if let Some(mask) = self.mask { 280 | head[1] |= 0x80; 281 | head[size..size + 4].copy_from_slice(&mask); 282 | size + 4 283 | } else { 284 | size 285 | } 286 | } 287 | 288 | pub async fn writev( 289 | &mut self, 290 | stream: &mut S, 291 | ) -> Result<(), std::io::Error> 292 | where 293 | S: AsyncWriteExt + Unpin, 294 | { 295 | use std::io::IoSlice; 296 | 297 | 
let mut head = [0; MAX_HEAD_SIZE]; 298 | let size = self.fmt_head(&mut head); 299 | 300 | let total = size + self.payload.len(); 301 | 302 | let mut b = [IoSlice::new(&head[..size]), IoSlice::new(&self.payload)]; 303 | 304 | let mut n = stream.write_vectored(&b).await?; 305 | if n == total { 306 | return Ok(()); 307 | } 308 | 309 | // Slightly more optimized than (unstable) write_all_vectored for 2 iovecs. 310 | while n <= size { 311 | b[0] = IoSlice::new(&head[n..size]); 312 | n += stream.write_vectored(&b).await?; 313 | } 314 | 315 | // Header out of the way. 316 | if n < total && n > size { 317 | stream.write_all(&self.payload[n - size..]).await?; 318 | } 319 | 320 | Ok(()) 321 | } 322 | 323 | /// Writes the frame to the buffer and returns a slice of the buffer containing the frame. 324 | pub fn write<'a>(&mut self, buf: &'a mut Vec) -> &'a [u8] { 325 | fn reserve_enough(buf: &mut Vec, len: usize) { 326 | if buf.len() < len { 327 | buf.resize(len, 0); 328 | } 329 | } 330 | let len = self.payload.len(); 331 | reserve_enough(buf, len + MAX_HEAD_SIZE); 332 | 333 | let size = self.fmt_head(buf); 334 | buf[size..size + len].copy_from_slice(&self.payload); 335 | &buf[..size + len] 336 | } 337 | } 338 | 339 | repr_u8! 
{ 340 | #[repr(u8)] 341 | #[derive(Debug, Copy, Clone, PartialEq, Eq)] 342 | pub enum OpCode { 343 | Continuation = 0x0, 344 | Text = 0x1, 345 | Binary = 0x2, 346 | Close = 0x8, 347 | Ping = 0x9, 348 | Pong = 0xA, 349 | } 350 | } 351 | 352 | #[inline] 353 | pub fn is_control(opcode: OpCode) -> bool { 354 | matches!(opcode, OpCode::Close | OpCode::Ping | OpCode::Pong) 355 | } 356 | -------------------------------------------------------------------------------- /benches/load_test.c: -------------------------------------------------------------------------------- 1 | /* This is a simple yet efficient WebSocket server benchmark much like WRK */ 2 | 3 | #define _BSD_SOURCE 4 | 5 | #ifdef __APPLE__ 6 | #include 7 | 8 | #define htobe16(x) OSSwapHostToBigInt16(x) 9 | #define htole16(x) OSSwapHostToLittleInt16(x) 10 | #define be16toh(x) OSSwapBigToHostInt16(x) 11 | #define le16toh(x) OSSwapLittleToHostInt16(x) 12 | 13 | #define htobe32(x) OSSwapHostToBigInt32(x) 14 | #define htole32(x) OSSwapHostToLittleInt32(x) 15 | #define be32toh(x) OSSwapBigToHostInt32(x) 16 | #define le32toh(x) OSSwapLittleToHostInt32(x) 17 | 18 | #define htobe64(x) OSSwapHostToBigInt64(x) 19 | #define htole64(x) OSSwapHostToLittleInt64(x) 20 | #define be64toh(x) OSSwapBigToHostInt64(x) 21 | #define le64toh(x) OSSwapLittleToHostInt64(x) 22 | #else 23 | #include 24 | #endif 25 | 26 | #include 27 | 28 | #include 29 | int SSL; 30 | 31 | #include 32 | #include 33 | #include 34 | /* Whatever type we selected (compressed or not) */ 35 | unsigned char *web_socket_request; 36 | int web_socket_request_size; 37 | 38 | char *upgrade_request; 39 | int upgrade_request_length; 40 | 41 | /* Compressed message */ 42 | unsigned char web_socket_request_deflate[13] = { 43 | 130 | 64, 128 | 7, 0, 0, 0, 0, 0xf2, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00}; 44 | 45 | /* Not compressed */ 46 | unsigned char web_socket_request_text_small[26] = {130, 128 | 20, 1, 2, 3, 4}; 47 | unsigned int web_socket_request_text_size = 26; 48 | 
unsigned char *web_socket_request_text = web_socket_request_text_small; 49 | 50 | /* Called to swap from small text message to big text message */ 51 | void init_big_message(unsigned int size) { 52 | if (size < 65536) { 53 | web_socket_request_text_size = size + 6 + 2; 54 | 55 | web_socket_request_text = malloc(web_socket_request_text_size); 56 | web_socket_request_text[0] = 130; 57 | web_socket_request_text[1] = 126 | 0x80; 58 | uint16_t msg_size = htobe16(size); 59 | memcpy(&web_socket_request_text[2], &msg_size, 2); 60 | web_socket_request_text[4] = 1; 61 | web_socket_request_text[4] = 2; 62 | web_socket_request_text[4] = 3; 63 | web_socket_request_text[4] = 4; 64 | 65 | return; 66 | } 67 | 68 | web_socket_request_text_size = size + 6 + 8; 69 | 70 | web_socket_request_text = malloc(web_socket_request_text_size); 71 | web_socket_request_text[0] = 130; 72 | web_socket_request_text[1] = 255; 73 | uint64_t msg_size = htobe64(size); 74 | memcpy(&web_socket_request_text[2], &msg_size, 8); 75 | web_socket_request_text[10] = 1; 76 | web_socket_request_text[10] = 2; 77 | web_socket_request_text[10] = 3; 78 | web_socket_request_text[10] = 4; 79 | } 80 | 81 | char request_deflate[] = 82 | "GET / HTTP/1.1\r\n" 83 | "Upgrade: websocket\r\n" 84 | "Connection: Upgrade\r\n" 85 | "Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n" 86 | "Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n" 87 | "Host: server.example.com\r\n" 88 | "Sec-WebSocket-Version: 13\r\n\r\n"; 89 | 90 | char request_text[] = 91 | "GET / HTTP/1.1\r\n" 92 | "Upgrade: websocket\r\n" 93 | "Connection: Upgrade\r\n" 94 | "Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n" 95 | //"Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n" 96 | "Host: server.example.com\r\n" 97 | "Sec-WebSocket-Version: 13\r\n\r\n"; 98 | char *host; 99 | int port; 100 | int connections; 101 | 102 | int responses; 103 | 104 | struct http_socket { 105 | /* How far we have streamed our websocket request 
*/ 106 | int offset; 107 | 108 | /* How far we have streamed our upgrade request */ 109 | int upgrade_offset; 110 | 111 | /* Whether or not we have received the upgrade response */ 112 | int is_upgraded; 113 | 114 | /* How many bytes we expect to be echoed back to us before we consider the 115 | * echo done */ 116 | int outstanding_bytes; 117 | }; 118 | 119 | /* We don't need any of these */ 120 | void on_wakeup(struct us_loop_t *loop) {} 121 | 122 | void on_pre(struct us_loop_t *loop) {} 123 | 124 | /* This is not HTTP POST, it is merely an event emitted post loop iteration */ 125 | void on_post(struct us_loop_t *loop) {} 126 | 127 | void next_connection(struct us_socket_t *s) { 128 | /* We could wait with this until properly upgraded */ 129 | if (--connections) { 130 | us_socket_context_connect(SSL, us_socket_context(SSL, s), host, port, NULL, 131 | 0, sizeof(struct http_socket)); 132 | } else { 133 | printf("Running benchmark now...\n"); 134 | 135 | us_socket_timeout(SSL, s, LIBUS_TIMEOUT_GRANULARITY); 136 | } 137 | } 138 | 139 | struct us_socket_t *on_http_socket_writable(struct us_socket_t *s) { 140 | struct http_socket *http_socket = (struct http_socket *)us_socket_ext(SSL, s); 141 | 142 | /* Are we still not upgraded yet? 
*/ 143 | if (http_socket->upgrade_offset < upgrade_request_length) { 144 | http_socket->upgrade_offset += us_socket_write( 145 | SSL, s, upgrade_request + http_socket->upgrade_offset, 146 | upgrade_request_length - http_socket->upgrade_offset, 0); 147 | 148 | /* Now we should be */ 149 | if (http_socket->upgrade_offset == upgrade_request_length) { 150 | next_connection(s); 151 | } 152 | } else { 153 | /* Stream whatever is remaining of the request */ 154 | http_socket->offset += us_socket_write( 155 | SSL, s, (char *)web_socket_request + http_socket->offset, 156 | web_socket_request_size - http_socket->offset, 0); 157 | } 158 | 159 | return s; 160 | } 161 | 162 | struct us_socket_t *on_http_socket_close(struct us_socket_t *s, int code, 163 | void *reason) { 164 | 165 | printf("Closed!\n"); 166 | 167 | return s; 168 | } 169 | 170 | struct us_socket_t *on_http_socket_end(struct us_socket_t *s) { 171 | return us_socket_close(SSL, s, 0, NULL); 172 | } 173 | 174 | struct us_socket_t *on_http_socket_data(struct us_socket_t *s, char *data, 175 | int length) { 176 | /* Get socket extension and the socket's context's extension */ 177 | struct http_socket *http_socket = (struct http_socket *)us_socket_ext(SSL, s); 178 | 179 | if (http_socket->is_upgraded) { 180 | 181 | /* If we are upgraded we now count to see if we receive the corect echo */ 182 | http_socket->outstanding_bytes -= length; 183 | 184 | if (http_socket->outstanding_bytes == 0) { 185 | /* We got exactly the correct amount of bytes back, send another message 186 | */ 187 | http_socket->offset = us_socket_write(SSL, s, (char *)web_socket_request, 188 | web_socket_request_size, 0); 189 | http_socket->outstanding_bytes = web_socket_request_size - 4; 190 | 191 | /* Increase stats */ 192 | responses++; 193 | } else if (http_socket->outstanding_bytes < 0) { 194 | /* This should never happen */ 195 | printf("ERROR: outstanding bytes negative!"); 196 | exit(0); 197 | } 198 | } else { 199 | /* We assume the last 4 bytes 
will be delivered in one chunk */ 200 | if (length >= 4 && memcmp(data + length - 4, "\r\n\r\n", 4) == 0) { 201 | /* We are upgraded so start sending the message for echoing */ 202 | http_socket->offset = us_socket_write(SSL, s, (char *)web_socket_request, 203 | web_socket_request_size, 0); 204 | 205 | /* Server will echo back the same message minus 4 bytes for mask */ 206 | http_socket->outstanding_bytes = web_socket_request_size - 4; 207 | http_socket->is_upgraded = 1; 208 | } 209 | } 210 | 211 | return s; 212 | } 213 | 214 | struct us_socket_t *on_http_socket_open(struct us_socket_t *s, int is_client, 215 | char *ip, int ip_length) { 216 | struct http_socket *http_socket = (struct http_socket *)us_socket_ext(SSL, s); 217 | 218 | /* Reset offsets */ 219 | http_socket->offset = 0; 220 | 221 | http_socket->is_upgraded = 0; 222 | 223 | /* Send an upgrade request */ 224 | http_socket->upgrade_offset = 225 | us_socket_write(SSL, s, upgrade_request, upgrade_request_length, 0); 226 | if (http_socket->upgrade_offset == upgrade_request_length) { 227 | next_connection(s); 228 | } 229 | 230 | return s; 231 | } 232 | 233 | struct us_socket_t *on_http_socket_timeout(struct us_socket_t *s) { 234 | /* Print current statistics */ 235 | printf("Msg/sec: %f\n", ((float)responses) / LIBUS_TIMEOUT_GRANULARITY); 236 | fflush(stdout); 237 | responses = 0; 238 | us_socket_timeout(SSL, s, LIBUS_TIMEOUT_GRANULARITY); 239 | 240 | return s; 241 | } 242 | 243 | int main(int argc, char **argv) { 244 | 245 | /* Parse host and port */ 246 | if (argc != 6 && argc != 7) { 247 | printf("Usage: connections host port ssl deflate [size_mb]\n"); 248 | return 0; 249 | } 250 | 251 | port = atoi(argv[3]); 252 | host = malloc(strlen(argv[2]) + 1); 253 | memcpy(host, argv[2], strlen(argv[2]) + 1); 254 | connections = atoi(argv[1]); 255 | // SSL = atoi(argv[4]); 256 | if (atoi(argv[5])) { 257 | /* Set up deflate */ 258 | web_socket_request = web_socket_request_deflate; 259 | web_socket_request_size = 
sizeof(web_socket_request_deflate); 260 | 261 | upgrade_request = request_deflate; 262 | upgrade_request_length = sizeof(request_deflate) - 1; 263 | } else { 264 | /* Only if we are NOT using defalte can we support testing with 100mb for 265 | * now */ 266 | if (argc == 7) { 267 | int size_bytes = atoi(argv[6]); 268 | printf("Using message size of %d bytes\n", size_bytes); 269 | init_big_message(size_bytes); 270 | } 271 | 272 | web_socket_request = web_socket_request_text; 273 | web_socket_request_size = web_socket_request_text_size; 274 | 275 | upgrade_request = request_text; 276 | upgrade_request_length = sizeof(request_text) - 1; 277 | } 278 | 279 | /* Create the event loop */ 280 | struct us_loop_t *loop = us_create_loop(0, on_wakeup, on_pre, on_post, 0); 281 | 282 | /* Create a socket context for HTTP */ 283 | struct us_socket_context_options_t options = {}; 284 | 285 | // options.key_file_name = 286 | //"/Users/divy/gh/fastwebsockets/examples/localhost.key", 287 | // options.cert_file_name = 288 | //"/Users/divy/gh/fastwebsockets/examples/localhost.crt", 289 | // options.passphrase = "1234"; 290 | struct us_socket_context_t *http_context = 291 | us_create_socket_context(SSL, loop, 0, options); 292 | 293 | /* Set up event handlers */ 294 | us_socket_context_on_open(SSL, http_context, on_http_socket_open); 295 | us_socket_context_on_data(SSL, http_context, on_http_socket_data); 296 | us_socket_context_on_writable(SSL, http_context, on_http_socket_writable); 297 | us_socket_context_on_close(SSL, http_context, on_http_socket_close); 298 | us_socket_context_on_timeout(SSL, http_context, on_http_socket_timeout); 299 | us_socket_context_on_end(SSL, http_context, on_http_socket_end); 300 | 301 | /* Start making HTTP connections */ 302 | us_socket_context_connect(SSL, http_context, host, port, NULL, 0, 303 | sizeof(struct http_socket)); 304 | 305 | us_loop_run(loop); 306 | } 307 | -------------------------------------------------------------------------------- 
/LICENSE: -------------------------------------------------------------------------------- 1 | Apache License 2 | Version 2.0, January 2004 3 | http://www.apache.org/licenses/ 4 | 5 | TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 6 | 7 | 1. Definitions. 8 | 9 | "License" shall mean the terms and conditions for use, reproduction, 10 | and distribution as defined by Sections 1 through 9 of this document. 11 | 12 | "Licensor" shall mean the copyright owner or entity authorized by 13 | the copyright owner that is granting the License. 14 | 15 | "Legal Entity" shall mean the union of the acting entity and all 16 | other entities that control, are controlled by, or are under common 17 | control with that entity. For the purposes of this definition, 18 | "control" means (i) the power, direct or indirect, to cause the 19 | direction or management of such entity, whether by contract or 20 | otherwise, or (ii) ownership of fifty percent (50%) or more of the 21 | outstanding shares, or (iii) beneficial ownership of such entity. 22 | 23 | "You" (or "Your") shall mean an individual or Legal Entity 24 | exercising permissions granted by this License. 25 | 26 | "Source" form shall mean the preferred form for making modifications, 27 | including but not limited to software source code, documentation 28 | source, and configuration files. 29 | 30 | "Object" form shall mean any form resulting from mechanical 31 | transformation or translation of a Source form, including but 32 | not limited to compiled object code, generated documentation, 33 | and conversions to other media types. 34 | 35 | "Work" shall mean the work of authorship, whether in Source or 36 | Object form, made available under the License, as indicated by a 37 | copyright notice that is included in or attached to the work 38 | (an example is provided in the Appendix below). 
39 | 40 | "Derivative Works" shall mean any work, whether in Source or Object 41 | form, that is based on (or derived from) the Work and for which the 42 | editorial revisions, annotations, elaborations, or other modifications 43 | represent, as a whole, an original work of authorship. For the purposes 44 | of this License, Derivative Works shall not include works that remain 45 | separable from, or merely link (or bind by name) to the interfaces of, 46 | the Work and Derivative Works thereof. 47 | 48 | "Contribution" shall mean any work of authorship, including 49 | the original version of the Work and any modifications or additions 50 | to that Work or Derivative Works thereof, that is intentionally 51 | submitted to Licensor for inclusion in the Work by the copyright owner 52 | or by an individual or Legal Entity authorized to submit on behalf of 53 | the copyright owner. For the purposes of this definition, "submitted" 54 | means any form of electronic, verbal, or written communication sent 55 | to the Licensor or its representatives, including but not limited to 56 | communication on electronic mailing lists, source code control systems, 57 | and issue tracking systems that are managed by, or on behalf of, the 58 | Licensor for the purpose of discussing and improving the Work, but 59 | excluding communication that is conspicuously marked or otherwise 60 | designated in writing by the copyright owner as "Not a Contribution." 61 | 62 | "Contributor" shall mean Licensor and any individual or Legal Entity 63 | on behalf of whom a Contribution has been received by Licensor and 64 | subsequently incorporated within the Work. 65 | 66 | 2. Grant of Copyright License. 
Subject to the terms and conditions of 67 | this License, each Contributor hereby grants to You a perpetual, 68 | worldwide, non-exclusive, no-charge, royalty-free, irrevocable 69 | copyright license to reproduce, prepare Derivative Works of, 70 | publicly display, publicly perform, sublicense, and distribute the 71 | Work and such Derivative Works in Source or Object form. 72 | 73 | 3. Grant of Patent License. Subject to the terms and conditions of 74 | this License, each Contributor hereby grants to You a perpetual, 75 | worldwide, non-exclusive, no-charge, royalty-free, irrevocable 76 | (except as stated in this section) patent license to make, have made, 77 | use, offer to sell, sell, import, and otherwise transfer the Work, 78 | where such license applies only to those patent claims licensable 79 | by such Contributor that are necessarily infringed by their 80 | Contribution(s) alone or by combination of their Contribution(s) 81 | with the Work to which such Contribution(s) was submitted. If You 82 | institute patent litigation against any entity (including a 83 | cross-claim or counterclaim in a lawsuit) alleging that the Work 84 | or a Contribution incorporated within the Work constitutes direct 85 | or contributory patent infringement, then any patent licenses 86 | granted to You under this License for that Work shall terminate 87 | as of the date such litigation is filed. 88 | 89 | 4. Redistribution. 
You may reproduce and distribute copies of the 90 | Work or Derivative Works thereof in any medium, with or without 91 | modifications, and in Source or Object form, provided that You 92 | meet the following conditions: 93 | 94 | (a) You must give any other recipients of the Work or 95 | Derivative Works a copy of this License; and 96 | 97 | (b) You must cause any modified files to carry prominent notices 98 | stating that You changed the files; and 99 | 100 | (c) You must retain, in the Source form of any Derivative Works 101 | that You distribute, all copyright, patent, trademark, and 102 | attribution notices from the Source form of the Work, 103 | excluding those notices that do not pertain to any part of 104 | the Derivative Works; and 105 | 106 | (d) If the Work includes a "NOTICE" text file as part of its 107 | distribution, then any Derivative Works that You distribute must 108 | include a readable copy of the attribution notices contained 109 | within such NOTICE file, excluding those notices that do not 110 | pertain to any part of the Derivative Works, in at least one 111 | of the following places: within a NOTICE text file distributed 112 | as part of the Derivative Works; within the Source form or 113 | documentation, if provided along with the Derivative Works; or, 114 | within a display generated by the Derivative Works, if and 115 | wherever such third-party notices normally appear. The contents 116 | of the NOTICE file are for informational purposes only and 117 | do not modify the License. You may add Your own attribution 118 | notices within Derivative Works that You distribute, alongside 119 | or as an addendum to the NOTICE text from the Work, provided 120 | that such additional attribution notices cannot be construed 121 | as modifying the License. 
122 | 123 | You may add Your own copyright statement to Your modifications and 124 | may provide additional or different license terms and conditions 125 | for use, reproduction, or distribution of Your modifications, or 126 | for any such Derivative Works as a whole, provided Your use, 127 | reproduction, and distribution of the Work otherwise complies with 128 | the conditions stated in this License. 129 | 130 | 5. Submission of Contributions. Unless You explicitly state otherwise, 131 | any Contribution intentionally submitted for inclusion in the Work 132 | by You to the Licensor shall be under the terms and conditions of 133 | this License, without any additional terms or conditions. 134 | Notwithstanding the above, nothing herein shall supersede or modify 135 | the terms of any separate license agreement you may have executed 136 | with Licensor regarding such Contributions. 137 | 138 | 6. Trademarks. This License does not grant permission to use the trade 139 | names, trademarks, service marks, or product names of the Licensor, 140 | except as required for reasonable and customary use in describing the 141 | origin of the Work and reproducing the content of the NOTICE file. 142 | 143 | 7. Disclaimer of Warranty. Unless required by applicable law or 144 | agreed to in writing, Licensor provides the Work (and each 145 | Contributor provides its Contributions) on an "AS IS" BASIS, 146 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 147 | implied, including, without limitation, any warranties or conditions 148 | of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A 149 | PARTICULAR PURPOSE. You are solely responsible for determining the 150 | appropriateness of using or redistributing the Work and assume any 151 | risks associated with Your exercise of permissions under this License. 152 | 153 | 8. Limitation of Liability. 
In no event and under no legal theory, 154 | whether in tort (including negligence), contract, or otherwise, 155 | unless required by applicable law (such as deliberate and grossly 156 | negligent acts) or agreed to in writing, shall any Contributor be 157 | liable to You for damages, including any direct, indirect, special, 158 | incidental, or consequential damages of any character arising as a 159 | result of this License or out of the use or inability to use the 160 | Work (including but not limited to damages for loss of goodwill, 161 | work stoppage, computer failure or malfunction, or any and all 162 | other commercial damages or losses), even if such Contributor 163 | has been advised of the possibility of such damages. 164 | 165 | 9. Accepting Warranty or Additional Liability. While redistributing 166 | the Work or Derivative Works thereof, You may choose to offer, 167 | and charge a fee for, acceptance of support, warranty, indemnity, 168 | or other liability obligations and/or rights consistent with this 169 | License. However, in accepting such obligations, You may act only 170 | on Your own behalf and on Your sole responsibility, not on behalf 171 | of any other Contributor, and only if You agree to indemnify, 172 | defend, and hold each Contributor harmless for any liability 173 | incurred by, or claims asserted against, such Contributor by reason 174 | of your accepting any such warranty or additional liability. 175 | 176 | END OF TERMS AND CONDITIONS 177 | 178 | APPENDIX: How to apply the Apache License to your work. 179 | 180 | To apply the Apache License to your work, attach the following 181 | boilerplate notice, with the fields enclosed by brackets "[]" 182 | replaced with your own identifying information. (Don't include 183 | the brackets!) The text should be enclosed in the appropriate 184 | comment syntax for the file format. 
We also recommend that a 185 | file or class name and description of purpose be included on the 186 | same "printed page" as the copyright notice for easier 187 | identification within third-party archives. 188 | 189 | Copyright 2023 Divy Srivastava 190 | 191 | Licensed under the Apache License, Version 2.0 (the "License"); 192 | you may not use this file except in compliance with the License. 193 | You may obtain a copy of the License at 194 | 195 | http://www.apache.org/licenses/LICENSE-2.0 196 | 197 | Unless required by applicable law or agreed to in writing, software 198 | distributed under the License is distributed on an "AS IS" BASIS, 199 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 200 | See the License for the specific language governing permissions and 201 | limitations under the License. -------------------------------------------------------------------------------- /benches/200-16384-chart.svg: -------------------------------------------------------------------------------- 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 32 | 33 | 34 | 35 | 36 | 37 | 38 | 39 | 40 | 41 | 42 | 43 | 44 | 45 | 46 | 47 | 48 | 49 | 50 | 51 | 52 | 53 | 54 | 55 | 56 | 57 | 58 | 59 | 60 | 61 | 62 | 63 | 64 | 65 | 66 | 67 | 68 | 69 | 70 | 71 | 72 | 73 | 74 | 75 | 76 | 77 | 78 | 79 | 80 | 81 | 82 | 83 | 84 | 85 | 86 | 87 | 88 | 89 | 90 | 91 | Connections: 200, Payload size: 16384 93 | 94 | 95 | 96 | 97 | uWebSockets 99 | 100 | fastwebsockets 102 | 103 | tokio-tungstenite 105 | 106 | rust-websocket 108 | 109 | 110 | 111 | 112 | 0 114 | 115 | 20,000 117 | 118 | 40,000 120 | 121 | 60,000 123 | 124 | 80,000 126 | 127 | 100,000 129 | 130 | 120,000 132 | 133 | 140,000 135 | 136 | 137 | 138 | 139 | 140 | 141 | 142 | 143 | 144 | 145 | 146 | 147 | -------------------------------------------------------------------------------- /benches/500-16384-chart.svg: 
-------------------------------------------------------------------------------- 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 32 | 33 | 34 | 35 | 36 | 37 | 38 | 39 | 40 | 41 | 42 | 43 | 44 | 45 | 46 | 47 | 48 | 49 | 50 | 51 | 52 | 53 | 54 | 55 | 56 | 57 | 58 | 59 | 60 | 61 | 62 | 63 | 64 | 65 | 66 | 67 | 68 | 69 | 70 | 71 | 72 | 73 | 74 | 75 | 76 | 77 | 78 | 79 | 80 | 81 | 82 | 83 | 84 | 85 | 86 | 87 | 88 | 89 | 90 | 91 | Connections: 500, Payload size: 16384 93 | 94 | 95 | 96 | 97 | uWebSockets 99 | 100 | fastwebsockets 102 | 103 | tokio-tungstenite 105 | 106 | rust-websocket 108 | 109 | 110 | 111 | 112 | 0 114 | 115 | 20,000 117 | 118 | 40,000 120 | 121 | 60,000 123 | 124 | 80,000 126 | 127 | 100,000 129 | 130 | 120,000 132 | 133 | 140,000 135 | 136 | 137 | 138 | 139 | 140 | 141 | 142 | 143 | 144 | 145 | 146 | 147 | -------------------------------------------------------------------------------- /fuzz/Cargo.lock: -------------------------------------------------------------------------------- 1 | # This file is automatically @generated by Cargo. 2 | # It is not intended for manual editing. 
3 | version = 3 4 | 5 | [[package]] 6 | name = "arbitrary" 7 | version = "1.3.0" 8 | source = "registry+https://github.com/rust-lang/crates.io-index" 9 | checksum = "e2d098ff73c1ca148721f37baad5ea6a465a13f9573aba8641fbbbae8164a54e" 10 | 11 | [[package]] 12 | name = "autocfg" 13 | version = "1.1.0" 14 | source = "registry+https://github.com/rust-lang/crates.io-index" 15 | checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" 16 | 17 | [[package]] 18 | name = "bitflags" 19 | version = "1.3.2" 20 | source = "registry+https://github.com/rust-lang/crates.io-index" 21 | checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" 22 | 23 | [[package]] 24 | name = "bytes" 25 | version = "1.4.0" 26 | source = "registry+https://github.com/rust-lang/crates.io-index" 27 | checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" 28 | 29 | [[package]] 30 | name = "cc" 31 | version = "1.0.79" 32 | source = "registry+https://github.com/rust-lang/crates.io-index" 33 | checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" 34 | dependencies = [ 35 | "jobserver", 36 | ] 37 | 38 | [[package]] 39 | name = "cfg-if" 40 | version = "1.0.0" 41 | source = "registry+https://github.com/rust-lang/crates.io-index" 42 | checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 43 | 44 | [[package]] 45 | name = "fastwebsockets" 46 | version = "0.2.0" 47 | dependencies = [ 48 | "cc", 49 | "rand", 50 | "simdutf8", 51 | "tokio", 52 | "utf-8", 53 | ] 54 | 55 | [[package]] 56 | name = "fastwebsockets-fuzz" 57 | version = "0.0.0" 58 | dependencies = [ 59 | "fastwebsockets", 60 | "futures", 61 | "libfuzzer-sys", 62 | "tokio", 63 | ] 64 | 65 | [[package]] 66 | name = "futures" 67 | version = "0.3.27" 68 | source = "registry+https://github.com/rust-lang/crates.io-index" 69 | checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549" 70 | dependencies = [ 71 | "futures-channel", 72 | 
"futures-core", 73 | "futures-executor", 74 | "futures-io", 75 | "futures-sink", 76 | "futures-task", 77 | "futures-util", 78 | ] 79 | 80 | [[package]] 81 | name = "futures-channel" 82 | version = "0.3.27" 83 | source = "registry+https://github.com/rust-lang/crates.io-index" 84 | checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" 85 | dependencies = [ 86 | "futures-core", 87 | "futures-sink", 88 | ] 89 | 90 | [[package]] 91 | name = "futures-core" 92 | version = "0.3.27" 93 | source = "registry+https://github.com/rust-lang/crates.io-index" 94 | checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" 95 | 96 | [[package]] 97 | name = "futures-executor" 98 | version = "0.3.27" 99 | source = "registry+https://github.com/rust-lang/crates.io-index" 100 | checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83" 101 | dependencies = [ 102 | "futures-core", 103 | "futures-task", 104 | "futures-util", 105 | ] 106 | 107 | [[package]] 108 | name = "futures-io" 109 | version = "0.3.27" 110 | source = "registry+https://github.com/rust-lang/crates.io-index" 111 | checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" 112 | 113 | [[package]] 114 | name = "futures-macro" 115 | version = "0.3.27" 116 | source = "registry+https://github.com/rust-lang/crates.io-index" 117 | checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" 118 | dependencies = [ 119 | "proc-macro2", 120 | "quote", 121 | "syn", 122 | ] 123 | 124 | [[package]] 125 | name = "futures-sink" 126 | version = "0.3.27" 127 | source = "registry+https://github.com/rust-lang/crates.io-index" 128 | checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" 129 | 130 | [[package]] 131 | name = "futures-task" 132 | version = "0.3.27" 133 | source = "registry+https://github.com/rust-lang/crates.io-index" 134 | checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" 135 | 
136 | [[package]] 137 | name = "futures-util" 138 | version = "0.3.27" 139 | source = "registry+https://github.com/rust-lang/crates.io-index" 140 | checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" 141 | dependencies = [ 142 | "futures-channel", 143 | "futures-core", 144 | "futures-io", 145 | "futures-macro", 146 | "futures-sink", 147 | "futures-task", 148 | "memchr", 149 | "pin-project-lite", 150 | "pin-utils", 151 | "slab", 152 | ] 153 | 154 | [[package]] 155 | name = "getrandom" 156 | version = "0.2.9" 157 | source = "registry+https://github.com/rust-lang/crates.io-index" 158 | checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" 159 | dependencies = [ 160 | "cfg-if", 161 | "libc", 162 | "wasi", 163 | ] 164 | 165 | [[package]] 166 | name = "hermit-abi" 167 | version = "0.2.6" 168 | source = "registry+https://github.com/rust-lang/crates.io-index" 169 | checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" 170 | dependencies = [ 171 | "libc", 172 | ] 173 | 174 | [[package]] 175 | name = "jobserver" 176 | version = "0.1.26" 177 | source = "registry+https://github.com/rust-lang/crates.io-index" 178 | checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" 179 | dependencies = [ 180 | "libc", 181 | ] 182 | 183 | [[package]] 184 | name = "libc" 185 | version = "0.2.140" 186 | source = "registry+https://github.com/rust-lang/crates.io-index" 187 | checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" 188 | 189 | [[package]] 190 | name = "libfuzzer-sys" 191 | version = "0.4.6" 192 | source = "registry+https://github.com/rust-lang/crates.io-index" 193 | checksum = "beb09950ae85a0a94b27676cccf37da5ff13f27076aa1adbc6545dd0d0e1bd4e" 194 | dependencies = [ 195 | "arbitrary", 196 | "cc", 197 | "once_cell", 198 | ] 199 | 200 | [[package]] 201 | name = "lock_api" 202 | version = "0.4.9" 203 | source = "registry+https://github.com/rust-lang/crates.io-index" 
204 | checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" 205 | dependencies = [ 206 | "autocfg", 207 | "scopeguard", 208 | ] 209 | 210 | [[package]] 211 | name = "log" 212 | version = "0.4.17" 213 | source = "registry+https://github.com/rust-lang/crates.io-index" 214 | checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" 215 | dependencies = [ 216 | "cfg-if", 217 | ] 218 | 219 | [[package]] 220 | name = "memchr" 221 | version = "2.5.0" 222 | source = "registry+https://github.com/rust-lang/crates.io-index" 223 | checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" 224 | 225 | [[package]] 226 | name = "mio" 227 | version = "0.8.6" 228 | source = "registry+https://github.com/rust-lang/crates.io-index" 229 | checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" 230 | dependencies = [ 231 | "libc", 232 | "log", 233 | "wasi", 234 | "windows-sys", 235 | ] 236 | 237 | [[package]] 238 | name = "num_cpus" 239 | version = "1.15.0" 240 | source = "registry+https://github.com/rust-lang/crates.io-index" 241 | checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" 242 | dependencies = [ 243 | "hermit-abi", 244 | "libc", 245 | ] 246 | 247 | [[package]] 248 | name = "once_cell" 249 | version = "1.17.1" 250 | source = "registry+https://github.com/rust-lang/crates.io-index" 251 | checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" 252 | 253 | [[package]] 254 | name = "parking_lot" 255 | version = "0.12.1" 256 | source = "registry+https://github.com/rust-lang/crates.io-index" 257 | checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" 258 | dependencies = [ 259 | "lock_api", 260 | "parking_lot_core", 261 | ] 262 | 263 | [[package]] 264 | name = "parking_lot_core" 265 | version = "0.9.7" 266 | source = "registry+https://github.com/rust-lang/crates.io-index" 267 | checksum = 
"9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" 268 | dependencies = [ 269 | "cfg-if", 270 | "libc", 271 | "redox_syscall", 272 | "smallvec", 273 | "windows-sys", 274 | ] 275 | 276 | [[package]] 277 | name = "pin-project-lite" 278 | version = "0.2.9" 279 | source = "registry+https://github.com/rust-lang/crates.io-index" 280 | checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" 281 | 282 | [[package]] 283 | name = "pin-utils" 284 | version = "0.1.0" 285 | source = "registry+https://github.com/rust-lang/crates.io-index" 286 | checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 287 | 288 | [[package]] 289 | name = "ppv-lite86" 290 | version = "0.2.17" 291 | source = "registry+https://github.com/rust-lang/crates.io-index" 292 | checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" 293 | 294 | [[package]] 295 | name = "proc-macro2" 296 | version = "1.0.54" 297 | source = "registry+https://github.com/rust-lang/crates.io-index" 298 | checksum = "e472a104799c74b514a57226160104aa483546de37e839ec50e3c2e41dd87534" 299 | dependencies = [ 300 | "unicode-ident", 301 | ] 302 | 303 | [[package]] 304 | name = "quote" 305 | version = "1.0.26" 306 | source = "registry+https://github.com/rust-lang/crates.io-index" 307 | checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" 308 | dependencies = [ 309 | "proc-macro2", 310 | ] 311 | 312 | [[package]] 313 | name = "rand" 314 | version = "0.8.5" 315 | source = "registry+https://github.com/rust-lang/crates.io-index" 316 | checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 317 | dependencies = [ 318 | "libc", 319 | "rand_chacha", 320 | "rand_core", 321 | ] 322 | 323 | [[package]] 324 | name = "rand_chacha" 325 | version = "0.3.1" 326 | source = "registry+https://github.com/rust-lang/crates.io-index" 327 | checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 328 | 
dependencies = [ 329 | "ppv-lite86", 330 | "rand_core", 331 | ] 332 | 333 | [[package]] 334 | name = "rand_core" 335 | version = "0.6.4" 336 | source = "registry+https://github.com/rust-lang/crates.io-index" 337 | checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 338 | dependencies = [ 339 | "getrandom", 340 | ] 341 | 342 | [[package]] 343 | name = "redox_syscall" 344 | version = "0.2.16" 345 | source = "registry+https://github.com/rust-lang/crates.io-index" 346 | checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" 347 | dependencies = [ 348 | "bitflags", 349 | ] 350 | 351 | [[package]] 352 | name = "scopeguard" 353 | version = "1.1.0" 354 | source = "registry+https://github.com/rust-lang/crates.io-index" 355 | checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" 356 | 357 | [[package]] 358 | name = "signal-hook-registry" 359 | version = "1.4.1" 360 | source = "registry+https://github.com/rust-lang/crates.io-index" 361 | checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" 362 | dependencies = [ 363 | "libc", 364 | ] 365 | 366 | [[package]] 367 | name = "simdutf8" 368 | version = "0.1.4" 369 | source = "registry+https://github.com/rust-lang/crates.io-index" 370 | checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" 371 | 372 | [[package]] 373 | name = "slab" 374 | version = "0.4.8" 375 | source = "registry+https://github.com/rust-lang/crates.io-index" 376 | checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" 377 | dependencies = [ 378 | "autocfg", 379 | ] 380 | 381 | [[package]] 382 | name = "smallvec" 383 | version = "1.10.0" 384 | source = "registry+https://github.com/rust-lang/crates.io-index" 385 | checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" 386 | 387 | [[package]] 388 | name = "socket2" 389 | version = "0.4.9" 390 | source = 
"registry+https://github.com/rust-lang/crates.io-index" 391 | checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" 392 | dependencies = [ 393 | "libc", 394 | "winapi", 395 | ] 396 | 397 | [[package]] 398 | name = "syn" 399 | version = "1.0.109" 400 | source = "registry+https://github.com/rust-lang/crates.io-index" 401 | checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" 402 | dependencies = [ 403 | "proc-macro2", 404 | "quote", 405 | "unicode-ident", 406 | ] 407 | 408 | [[package]] 409 | name = "tokio" 410 | version = "1.26.0" 411 | source = "registry+https://github.com/rust-lang/crates.io-index" 412 | checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" 413 | dependencies = [ 414 | "autocfg", 415 | "bytes", 416 | "libc", 417 | "memchr", 418 | "mio", 419 | "num_cpus", 420 | "parking_lot", 421 | "pin-project-lite", 422 | "signal-hook-registry", 423 | "socket2", 424 | "tokio-macros", 425 | "windows-sys", 426 | ] 427 | 428 | [[package]] 429 | name = "tokio-macros" 430 | version = "1.8.2" 431 | source = "registry+https://github.com/rust-lang/crates.io-index" 432 | checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" 433 | dependencies = [ 434 | "proc-macro2", 435 | "quote", 436 | "syn", 437 | ] 438 | 439 | [[package]] 440 | name = "unicode-ident" 441 | version = "1.0.8" 442 | source = "registry+https://github.com/rust-lang/crates.io-index" 443 | checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" 444 | 445 | [[package]] 446 | name = "utf-8" 447 | version = "0.7.6" 448 | source = "registry+https://github.com/rust-lang/crates.io-index" 449 | checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 450 | 451 | [[package]] 452 | name = "wasi" 453 | version = "0.11.0+wasi-snapshot-preview1" 454 | source = "registry+https://github.com/rust-lang/crates.io-index" 455 | checksum = 
"9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" 456 | 457 | [[package]] 458 | name = "winapi" 459 | version = "0.3.9" 460 | source = "registry+https://github.com/rust-lang/crates.io-index" 461 | checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" 462 | dependencies = [ 463 | "winapi-i686-pc-windows-gnu", 464 | "winapi-x86_64-pc-windows-gnu", 465 | ] 466 | 467 | [[package]] 468 | name = "winapi-i686-pc-windows-gnu" 469 | version = "0.4.0" 470 | source = "registry+https://github.com/rust-lang/crates.io-index" 471 | checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" 472 | 473 | [[package]] 474 | name = "winapi-x86_64-pc-windows-gnu" 475 | version = "0.4.0" 476 | source = "registry+https://github.com/rust-lang/crates.io-index" 477 | checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" 478 | 479 | [[package]] 480 | name = "windows-sys" 481 | version = "0.45.0" 482 | source = "registry+https://github.com/rust-lang/crates.io-index" 483 | checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" 484 | dependencies = [ 485 | "windows-targets", 486 | ] 487 | 488 | [[package]] 489 | name = "windows-targets" 490 | version = "0.42.2" 491 | source = "registry+https://github.com/rust-lang/crates.io-index" 492 | checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" 493 | dependencies = [ 494 | "windows_aarch64_gnullvm", 495 | "windows_aarch64_msvc", 496 | "windows_i686_gnu", 497 | "windows_i686_msvc", 498 | "windows_x86_64_gnu", 499 | "windows_x86_64_gnullvm", 500 | "windows_x86_64_msvc", 501 | ] 502 | 503 | [[package]] 504 | name = "windows_aarch64_gnullvm" 505 | version = "0.42.2" 506 | source = "registry+https://github.com/rust-lang/crates.io-index" 507 | checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" 508 | 509 | [[package]] 510 | name = "windows_aarch64_msvc" 511 | version = "0.42.2" 512 | source = 
"registry+https://github.com/rust-lang/crates.io-index" 513 | checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" 514 | 515 | [[package]] 516 | name = "windows_i686_gnu" 517 | version = "0.42.2" 518 | source = "registry+https://github.com/rust-lang/crates.io-index" 519 | checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" 520 | 521 | [[package]] 522 | name = "windows_i686_msvc" 523 | version = "0.42.2" 524 | source = "registry+https://github.com/rust-lang/crates.io-index" 525 | checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" 526 | 527 | [[package]] 528 | name = "windows_x86_64_gnu" 529 | version = "0.42.2" 530 | source = "registry+https://github.com/rust-lang/crates.io-index" 531 | checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" 532 | 533 | [[package]] 534 | name = "windows_x86_64_gnullvm" 535 | version = "0.42.2" 536 | source = "registry+https://github.com/rust-lang/crates.io-index" 537 | checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" 538 | 539 | [[package]] 540 | name = "windows_x86_64_msvc" 541 | version = "0.42.2" 542 | source = "registry+https://github.com/rust-lang/crates.io-index" 543 | checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" 544 | -------------------------------------------------------------------------------- /src/lib.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2023 Divy Srivastava 2 | // 3 | // Licensed under the Apache License, Version 2.0 (the "License"); 4 | // you may not use this file except in compliance with the License. 
5 | // You may obtain a copy of the License at 6 | // 7 | // http://www.apache.org/licenses/LICENSE-2.0 8 | // 9 | // Unless required by applicable law or agreed to in writing, software 10 | // distributed under the License is distributed on an "AS IS" BASIS, 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | // See the License for the specific language governing permissions and 13 | // limitations under the License. 14 | 15 | //! _fastwebsockets_ is a minimal, fast WebSocket server implementation. 16 | //! 17 | //! [https://github.com/denoland/fastwebsockets](https://github.com/denoland/fastwebsockets) 18 | //! 19 | //! Passes the _Autobahn|TestSuite_ and is fuzzed with LLVM's _libfuzzer_. 20 | //! 21 | //! You can use it as a raw websocket frame parser and deal with spec compliance yourself, or you can use it as a full-fledged websocket server. 22 | //! 23 | //! # Example 24 | //! 25 | //! ``` 26 | //! use tokio::net::TcpStream; 27 | //! use fastwebsockets::{WebSocket, OpCode, Role}; 28 | //! use anyhow::Result; 29 | //! 30 | //! async fn handle( 31 | //! socket: TcpStream, 32 | //! ) -> Result<()> { 33 | //! let mut ws = WebSocket::after_handshake(socket, Role::Server); 34 | //! ws.set_writev(false); 35 | //! ws.set_auto_close(true); 36 | //! ws.set_auto_pong(true); 37 | //! 38 | //! loop { 39 | //! let frame = ws.read_frame().await?; 40 | //! match frame.opcode { 41 | //! OpCode::Close => break, 42 | //! OpCode::Text | OpCode::Binary => { 43 | //! ws.write_frame(frame).await?; 44 | //! } 45 | //! _ => {} 46 | //! } 47 | //! } 48 | //! Ok(()) 49 | //! } 50 | //! ``` 51 | //! 52 | //! ## Fragmentation 53 | //! 54 | //! By default, fastwebsockets will give the application raw frames with FIN set. Other 55 | //! crates like tungstenite will give you a single message with all the frames 56 | //! concatenated. 57 | //! 58 | //! For concatenated frames, use `FragmentCollector`: 59 | //! ``` 60 | //!
use fastwebsockets::{FragmentCollector, WebSocket, Role}; 61 | //! use tokio::net::TcpStream; 62 | //! use anyhow::Result; 63 | //! 64 | //! async fn handle( 65 | //! socket: TcpStream, 66 | //! ) -> Result<()> { 67 | //! let mut ws = WebSocket::after_handshake(socket, Role::Server); 68 | //! let mut ws = FragmentCollector::new(ws); 69 | //! let incoming = ws.read_frame().await?; 70 | //! // Always returns full messages 71 | //! assert!(incoming.fin); 72 | //! Ok(()) 73 | //! } 74 | //! ``` 75 | //! 76 | //! _permessage-deflate is not supported yet._ 77 | //! 78 | //! ## HTTP Upgrades 79 | //! 80 | //! Enable the `upgrade` feature to do server-side upgrades and client-side 81 | //! handshakes. 82 | //! 83 | //! This feature is powered by [hyper](https://docs.rs/hyper). 84 | //! 85 | //! ``` 86 | //! use fastwebsockets::upgrade::upgrade; 87 | //! use http_body_util::Empty; 88 | //! use hyper::{Request, body::{Incoming, Bytes}, Response}; 89 | //! use anyhow::Result; 90 | //! 91 | //! async fn server_upgrade( 92 | //! mut req: Request<Incoming>, 93 | //! ) -> Result<Response<Empty<Bytes>>> { 94 | //! let (response, fut) = upgrade(&mut req)?; 95 | //! 96 | //! tokio::spawn(async move { 97 | //! let ws = fut.await; 98 | //! // Do something with the websocket 99 | //! }); 100 | //! 101 | //! Ok(response) 102 | //! } 103 | //! ``` 104 | //! 105 | //! Use the `handshake` module for client-side handshakes. 106 | //! 107 | //! ``` 108 | //! use fastwebsockets::handshake; 109 | //! use fastwebsockets::FragmentCollector; 110 | //! use hyper::{Request, body::Bytes, upgrade::Upgraded, header::{UPGRADE, CONNECTION}}; 111 | //! use http_body_util::Empty; 112 | //! use hyper_util::rt::TokioIo; 113 | //! use tokio::net::TcpStream; 114 | //! use std::future::Future; 115 | //! use anyhow::Result; 116 | //! 117 | //! async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>> { 118 | //! let stream = TcpStream::connect("localhost:9001").await?; 119 | //! 120 | //! let req = Request::builder() 121 | //! .method("GET") 122 | //!
.uri("http://localhost:9001/") 123 | //! .header("Host", "localhost:9001") 124 | //! .header(UPGRADE, "websocket") 125 | //! .header(CONNECTION, "upgrade") 126 | //! .header( 127 | //! "Sec-WebSocket-Key", 128 | //! fastwebsockets::handshake::generate_key(), 129 | //! ) 130 | //! .header("Sec-WebSocket-Version", "13") 131 | //! .body(Empty::::new())?; 132 | //! 133 | //! let (ws, _) = handshake::client(&SpawnExecutor, req, stream).await?; 134 | //! Ok(FragmentCollector::new(ws)) 135 | //! } 136 | //! 137 | //! // Tie hyper's executor to tokio runtime 138 | //! struct SpawnExecutor; 139 | //! 140 | //! impl hyper::rt::Executor for SpawnExecutor 141 | //! where 142 | //! Fut: Future + Send + 'static, 143 | //! Fut::Output: Send + 'static, 144 | //! { 145 | //! fn execute(&self, fut: Fut) { 146 | //! tokio::task::spawn(fut); 147 | //! } 148 | //! } 149 | //! ``` 150 | 151 | #![cfg_attr(docsrs, feature(doc_cfg))] 152 | 153 | mod close; 154 | mod error; 155 | mod fragment; 156 | mod frame; 157 | /// Client handshake. 158 | #[cfg(feature = "upgrade")] 159 | #[cfg_attr(docsrs, doc(cfg(feature = "upgrade")))] 160 | pub mod handshake; 161 | mod mask; 162 | /// HTTP upgrades. 
163 | #[cfg(feature = "upgrade")] 164 | #[cfg_attr(docsrs, doc(cfg(feature = "upgrade")))] 165 | pub mod upgrade; 166 | 167 | use bytes::Buf; 168 | 169 | use bytes::BytesMut; 170 | #[cfg(feature = "unstable-split")] 171 | use std::future::Future; 172 | 173 | use tokio::io::AsyncRead; 174 | use tokio::io::AsyncReadExt; 175 | use tokio::io::AsyncWrite; 176 | use tokio::io::AsyncWriteExt; 177 | 178 | pub use crate::close::CloseCode; 179 | pub use crate::error::WebSocketError; 180 | pub use crate::fragment::FragmentCollector; 181 | #[cfg(feature = "unstable-split")] 182 | pub use crate::fragment::FragmentCollectorRead; 183 | pub use crate::frame::Frame; 184 | pub use crate::frame::OpCode; 185 | pub use crate::frame::Payload; 186 | pub use crate::mask::unmask; 187 | 188 | #[derive(Copy, Clone, PartialEq)] 189 | pub enum Role { 190 | Server, 191 | Client, 192 | } 193 | 194 | pub(crate) struct WriteHalf { 195 | role: Role, 196 | closed: bool, 197 | vectored: bool, 198 | auto_apply_mask: bool, 199 | writev_threshold: usize, 200 | write_buffer: Vec, 201 | } 202 | 203 | pub(crate) struct ReadHalf { 204 | role: Role, 205 | auto_apply_mask: bool, 206 | auto_close: bool, 207 | auto_pong: bool, 208 | writev_threshold: usize, 209 | max_message_size: usize, 210 | buffer: BytesMut, 211 | } 212 | 213 | #[cfg(feature = "unstable-split")] 214 | pub struct WebSocketRead { 215 | stream: S, 216 | read_half: ReadHalf, 217 | } 218 | 219 | #[cfg(feature = "unstable-split")] 220 | pub struct WebSocketWrite { 221 | stream: S, 222 | write_half: WriteHalf, 223 | } 224 | 225 | #[cfg(feature = "unstable-split")] 226 | /// Create a split `WebSocketRead`/`WebSocketWrite` pair from a stream that has already completed the WebSocket handshake. 
227 | pub fn after_handshake_split( 228 | read: R, 229 | write: W, 230 | role: Role, 231 | ) -> (WebSocketRead, WebSocketWrite) 232 | where 233 | R: AsyncRead + Unpin, 234 | W: AsyncWrite + Unpin, 235 | { 236 | ( 237 | WebSocketRead { 238 | stream: read, 239 | read_half: ReadHalf::after_handshake(role), 240 | }, 241 | WebSocketWrite { 242 | stream: write, 243 | write_half: WriteHalf::after_handshake(role), 244 | }, 245 | ) 246 | } 247 | 248 | #[cfg(feature = "unstable-split")] 249 | impl<'f, S> WebSocketRead { 250 | /// Consumes the `WebSocketRead` and returns the underlying stream. 251 | #[inline] 252 | pub(crate) fn into_parts_internal(self) -> (S, ReadHalf) { 253 | (self.stream, self.read_half) 254 | } 255 | 256 | pub fn set_writev_threshold(&mut self, threshold: usize) { 257 | self.read_half.writev_threshold = threshold; 258 | } 259 | 260 | /// Sets whether to automatically close the connection when a close frame is received. When set to `false`, the application will have to manually send close frames. 261 | /// 262 | /// Default: `true` 263 | pub fn set_auto_close(&mut self, auto_close: bool) { 264 | self.read_half.auto_close = auto_close; 265 | } 266 | 267 | /// Sets whether to automatically send a pong frame when a ping frame is received. 268 | /// 269 | /// Default: `true` 270 | pub fn set_auto_pong(&mut self, auto_pong: bool) { 271 | self.read_half.auto_pong = auto_pong; 272 | } 273 | 274 | /// Sets the maximum message size in bytes. If a message is received that is larger than this, the connection will be closed. 275 | /// 276 | /// Default: 64 MiB 277 | pub fn set_max_message_size(&mut self, max_message_size: usize) { 278 | self.read_half.max_message_size = max_message_size; 279 | } 280 | 281 | /// Sets whether to automatically apply the mask to the frame payload. 
282 | /// 283 | /// Default: `true` 284 | pub fn set_auto_apply_mask(&mut self, auto_apply_mask: bool) { 285 | self.read_half.auto_apply_mask = auto_apply_mask; 286 | } 287 | 288 | /// Reads a frame from the stream. 289 | pub async fn read_frame( 290 | &mut self, 291 | send_fn: &mut impl FnMut(Frame<'f>) -> R, 292 | ) -> Result 293 | where 294 | S: AsyncRead + Unpin, 295 | E: Into>, 296 | R: Future>, 297 | { 298 | loop { 299 | let (res, obligated_send) = 300 | self.read_half.read_frame_inner(&mut self.stream).await; 301 | if let Some(frame) = obligated_send { 302 | let res = send_fn(frame).await; 303 | res.map_err(|e| WebSocketError::SendError(e.into()))?; 304 | } 305 | if let Some(frame) = res? { 306 | break Ok(frame); 307 | } 308 | } 309 | } 310 | } 311 | 312 | #[cfg(feature = "unstable-split")] 313 | impl<'f, S> WebSocketWrite { 314 | /// Sets whether to use vectored writes. This option does not guarantee that vectored writes will be always used. 315 | /// 316 | /// Default: `true` 317 | pub fn set_writev(&mut self, vectored: bool) { 318 | self.write_half.vectored = vectored; 319 | } 320 | 321 | pub fn set_writev_threshold(&mut self, threshold: usize) { 322 | self.write_half.writev_threshold = threshold; 323 | } 324 | 325 | /// Sets whether to automatically apply the mask to the frame payload. 
326 | /// 327 | /// Default: `true` 328 | pub fn set_auto_apply_mask(&mut self, auto_apply_mask: bool) { 329 | self.write_half.auto_apply_mask = auto_apply_mask; 330 | } 331 | 332 | pub fn is_closed(&self) -> bool { 333 | self.write_half.closed 334 | } 335 | 336 | pub async fn write_frame( 337 | &mut self, 338 | frame: Frame<'f>, 339 | ) -> Result<(), WebSocketError> 340 | where 341 | S: AsyncWrite + Unpin, 342 | { 343 | self.write_half.write_frame(&mut self.stream, frame).await 344 | } 345 | 346 | pub async fn flush(&mut self) -> Result<(), WebSocketError> 347 | where 348 | S: AsyncWrite + Unpin, 349 | { 350 | flush(&mut self.stream).await 351 | } 352 | } 353 | 354 | #[inline] 355 | async fn flush(stream: &mut S) -> Result<(), WebSocketError> 356 | where 357 | S: AsyncWrite + Unpin, 358 | { 359 | stream.flush().await.map_err(WebSocketError::IoError) 360 | } 361 | 362 | /// WebSocket protocol implementation over an async stream. 363 | pub struct WebSocket { 364 | stream: S, 365 | write_half: WriteHalf, 366 | read_half: ReadHalf, 367 | } 368 | 369 | impl<'f, S> WebSocket { 370 | /// Creates a new `WebSocket` from a stream that has already completed the WebSocket handshake. 371 | /// 372 | /// Use the `upgrade` feature to handle server upgrades and client handshakes. 373 | /// 374 | /// # Example 375 | /// 376 | /// ``` 377 | /// use tokio::net::TcpStream; 378 | /// use fastwebsockets::{WebSocket, OpCode, Role}; 379 | /// use anyhow::Result; 380 | /// 381 | /// async fn handle_client( 382 | /// socket: TcpStream, 383 | /// ) -> Result<()> { 384 | /// let mut ws = WebSocket::after_handshake(socket, Role::Server); 385 | /// // ... 
386 | /// Ok(()) 387 | /// } 388 | /// ``` 389 | pub fn after_handshake(stream: S, role: Role) -> Self 390 | where 391 | S: AsyncRead + AsyncWrite + Unpin, 392 | { 393 | Self { 394 | stream, 395 | write_half: WriteHalf::after_handshake(role), 396 | read_half: ReadHalf::after_handshake(role), 397 | } 398 | } 399 | 400 | /// Split a [`WebSocket`] into a [`WebSocketRead`] and [`WebSocketWrite`] half. Note that the split version does not 401 | /// handle fragmented packets and you may wish to create a [`FragmentCollectorRead`] over top of the read half that 402 | /// is returned. 403 | #[cfg(feature = "unstable-split")] 404 | pub fn split( 405 | self, 406 | split_fn: impl Fn(S) -> (R, W), 407 | ) -> (WebSocketRead, WebSocketWrite) 408 | where 409 | S: AsyncRead + AsyncWrite + Unpin, 410 | R: AsyncRead + Unpin, 411 | W: AsyncWrite + Unpin, 412 | { 413 | let (stream, read, write) = self.into_parts_internal(); 414 | let (r, w) = split_fn(stream); 415 | ( 416 | WebSocketRead { 417 | stream: r, 418 | read_half: read, 419 | }, 420 | WebSocketWrite { 421 | stream: w, 422 | write_half: write, 423 | }, 424 | ) 425 | } 426 | 427 | /// Consumes the `WebSocket` and returns the underlying stream. 428 | #[inline] 429 | pub fn into_inner(self) -> S { 430 | // self.write_half.into_inner().stream 431 | self.stream 432 | } 433 | 434 | /// Consumes the `WebSocket` and returns the underlying stream. 435 | #[inline] 436 | pub(crate) fn into_parts_internal(self) -> (S, ReadHalf, WriteHalf) { 437 | (self.stream, self.read_half, self.write_half) 438 | } 439 | 440 | /// Sets whether to use vectored writes. This option does not guarantee that vectored writes will be always used. 
441 | /// 442 | /// Default: `true` 443 | pub fn set_writev(&mut self, vectored: bool) { 444 | self.write_half.vectored = vectored; 445 | } 446 | 447 | pub fn set_writev_threshold(&mut self, threshold: usize) { 448 | self.read_half.writev_threshold = threshold; 449 | self.write_half.writev_threshold = threshold; 450 | } 451 | 452 | /// Sets whether to automatically close the connection when a close frame is received. When set to `false`, the application will have to manually send close frames. 453 | /// 454 | /// Default: `true` 455 | pub fn set_auto_close(&mut self, auto_close: bool) { 456 | self.read_half.auto_close = auto_close; 457 | } 458 | 459 | /// Sets whether to automatically send a pong frame when a ping frame is received. 460 | /// 461 | /// Default: `true` 462 | pub fn set_auto_pong(&mut self, auto_pong: bool) { 463 | self.read_half.auto_pong = auto_pong; 464 | } 465 | 466 | /// Sets the maximum message size in bytes. If a message is received that is larger than this, the connection will be closed. 467 | /// 468 | /// Default: 64 MiB 469 | pub fn set_max_message_size(&mut self, max_message_size: usize) { 470 | self.read_half.max_message_size = max_message_size; 471 | } 472 | 473 | /// Sets whether to automatically apply the mask to the frame payload. 474 | /// 475 | /// Default: `true` 476 | pub fn set_auto_apply_mask(&mut self, auto_apply_mask: bool) { 477 | self.read_half.auto_apply_mask = auto_apply_mask; 478 | self.write_half.auto_apply_mask = auto_apply_mask; 479 | } 480 | 481 | pub fn is_closed(&self) -> bool { 482 | self.write_half.closed 483 | } 484 | 485 | /// Writes a frame to the stream. 
486 | /// 487 | /// # Example 488 | /// 489 | /// ``` 490 | /// use fastwebsockets::{WebSocket, Frame, OpCode}; 491 | /// use tokio::net::TcpStream; 492 | /// use anyhow::Result; 493 | /// 494 | /// async fn send( 495 | /// ws: &mut WebSocket 496 | /// ) -> Result<()> { 497 | /// let mut frame = Frame::binary(vec![0x01, 0x02, 0x03].into()); 498 | /// ws.write_frame(frame).await?; 499 | /// Ok(()) 500 | /// } 501 | /// ``` 502 | pub async fn write_frame( 503 | &mut self, 504 | frame: Frame<'f>, 505 | ) -> Result<(), WebSocketError> 506 | where 507 | S: AsyncRead + AsyncWrite + Unpin, 508 | { 509 | self.write_half.write_frame(&mut self.stream, frame).await?; 510 | Ok(()) 511 | } 512 | 513 | /// Flushes the data from the underlying stream. 514 | /// 515 | /// if the underlying stream is buffered (i.e: TlsStream), it is needed to call flush 516 | /// to be sure that the written frame are correctly pushed down to the bottom stream/channel. 517 | /// 518 | pub async fn flush(&mut self) -> Result<(), WebSocketError> 519 | where 520 | S: AsyncWrite + Unpin, 521 | { 522 | flush(&mut self.stream).await 523 | } 524 | 525 | /// Reads a frame from the stream. 526 | /// 527 | /// This method will unmask the frame payload. For fragmented frames, use `FragmentCollector::read_frame`. 528 | /// 529 | /// Text frames payload is guaranteed to be valid UTF-8. 
530 | /// 531 | /// # Example 532 | /// 533 | /// ``` 534 | /// use fastwebsockets::{OpCode, WebSocket, Frame}; 535 | /// use tokio::net::TcpStream; 536 | /// use anyhow::Result; 537 | /// 538 | /// async fn echo( 539 | /// ws: &mut WebSocket 540 | /// ) -> Result<()> { 541 | /// let frame = ws.read_frame().await?; 542 | /// match frame.opcode { 543 | /// OpCode::Text | OpCode::Binary => { 544 | /// ws.write_frame(frame).await?; 545 | /// } 546 | /// _ => {} 547 | /// } 548 | /// Ok(()) 549 | /// } 550 | /// ``` 551 | pub async fn read_frame(&mut self) -> Result, WebSocketError> 552 | where 553 | S: AsyncRead + AsyncWrite + Unpin, 554 | { 555 | loop { 556 | let (res, obligated_send) = 557 | self.read_half.read_frame_inner(&mut self.stream).await; 558 | let is_closed = self.write_half.closed; 559 | if let Some(frame) = obligated_send { 560 | if !is_closed { 561 | self.write_half.write_frame(&mut self.stream, frame).await?; 562 | } 563 | } 564 | if let Some(frame) = res? { 565 | if is_closed && frame.opcode != OpCode::Close { 566 | return Err(WebSocketError::ConnectionClosed); 567 | } 568 | break Ok(frame); 569 | } 570 | } 571 | } 572 | } 573 | 574 | const MAX_HEADER_SIZE: usize = 14; 575 | 576 | impl ReadHalf { 577 | pub fn after_handshake(role: Role) -> Self { 578 | let buffer = BytesMut::with_capacity(8192); 579 | 580 | Self { 581 | role, 582 | auto_apply_mask: true, 583 | auto_close: true, 584 | auto_pong: true, 585 | writev_threshold: 1024, 586 | max_message_size: 64 << 20, 587 | buffer, 588 | } 589 | } 590 | 591 | /// Attempt to read a single frame from from the incoming stream, returning any send obligations if 592 | /// `auto_close` or `auto_pong` are enabled. Callers to this function are obligated to send the 593 | /// frame in the latter half of the tuple if one is specified, unless the write half of this socket 594 | /// has been closed. 595 | /// 596 | /// XXX: Do not expose this method to the public API. 
597 | pub(crate) async fn read_frame_inner<'f, S>( 598 | &mut self, 599 | stream: &mut S, 600 | ) -> (Result>, WebSocketError>, Option>) 601 | where 602 | S: AsyncRead + Unpin, 603 | { 604 | let mut frame = match self.parse_frame_header(stream).await { 605 | Ok(frame) => frame, 606 | Err(e) => return (Err(e), None), 607 | }; 608 | 609 | if self.role == Role::Server && self.auto_apply_mask { 610 | frame.unmask() 611 | }; 612 | 613 | match frame.opcode { 614 | OpCode::Close if self.auto_close => { 615 | match frame.payload.len() { 616 | 0 => {} 617 | 1 => return (Err(WebSocketError::InvalidCloseFrame), None), 618 | _ => { 619 | let code = close::CloseCode::from(u16::from_be_bytes( 620 | frame.payload[0..2].try_into().unwrap(), 621 | )); 622 | 623 | #[cfg(feature = "simd")] 624 | if simdutf8::basic::from_utf8(&frame.payload[2..]).is_err() { 625 | return (Err(WebSocketError::InvalidUTF8), None); 626 | }; 627 | 628 | #[cfg(not(feature = "simd"))] 629 | if std::str::from_utf8(&frame.payload[2..]).is_err() { 630 | return (Err(WebSocketError::InvalidUTF8), None); 631 | }; 632 | 633 | if !code.is_allowed() { 634 | return ( 635 | Err(WebSocketError::InvalidCloseCode), 636 | Some(Frame::close(1002, &frame.payload[2..])), 637 | ); 638 | } 639 | } 640 | }; 641 | 642 | let obligated_send = Frame::close_raw(frame.payload.to_owned().into()); 643 | (Ok(Some(frame)), Some(obligated_send)) 644 | } 645 | OpCode::Ping if self.auto_pong => { 646 | (Ok(None), Some(Frame::pong(frame.payload))) 647 | } 648 | OpCode::Text => { 649 | if frame.fin && !frame.is_utf8() { 650 | (Err(WebSocketError::InvalidUTF8), None) 651 | } else { 652 | (Ok(Some(frame)), None) 653 | } 654 | } 655 | _ => (Ok(Some(frame)), None), 656 | } 657 | } 658 | 659 | async fn parse_frame_header<'a, S>( 660 | &mut self, 661 | stream: &mut S, 662 | ) -> Result, WebSocketError> 663 | where 664 | S: AsyncRead + Unpin, 665 | { 666 | macro_rules! 
eof { 667 | ($n:expr) => {{ 668 | if $n == 0 { 669 | return Err(WebSocketError::UnexpectedEOF); 670 | } 671 | }}; 672 | } 673 | 674 | // Read the first two bytes 675 | while self.buffer.remaining() < 2 { 676 | eof!(stream.read_buf(&mut self.buffer).await?); 677 | } 678 | 679 | let fin = self.buffer[0] & 0b10000000 != 0; 680 | let rsv1 = self.buffer[0] & 0b01000000 != 0; 681 | let rsv2 = self.buffer[0] & 0b00100000 != 0; 682 | let rsv3 = self.buffer[0] & 0b00010000 != 0; 683 | 684 | if rsv1 || rsv2 || rsv3 { 685 | return Err(WebSocketError::ReservedBitsNotZero); 686 | } 687 | 688 | let opcode = frame::OpCode::try_from(self.buffer[0] & 0b00001111)?; 689 | let masked = self.buffer[1] & 0b10000000 != 0; 690 | 691 | let length_code = self.buffer[1] & 0x7F; 692 | let extra = match length_code { 693 | 126 => 2, 694 | 127 => 8, 695 | _ => 0, 696 | }; 697 | 698 | self.buffer.advance(2); 699 | while self.buffer.remaining() < extra + masked as usize * 4 { 700 | eof!(stream.read_buf(&mut self.buffer).await?); 701 | } 702 | 703 | let payload_len: usize = match extra { 704 | 0 => usize::from(length_code), 705 | 2 => self.buffer.get_u16() as usize, 706 | #[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))] 707 | 8 => self.buffer.get_u64() as usize, 708 | // On 32bit systems, usize is only 4bytes wide so we must check for usize overflowing 709 | #[cfg(any( 710 | target_pointer_width = "8", 711 | target_pointer_width = "16", 712 | target_pointer_width = "32" 713 | ))] 714 | 8 => match usize::try_from(self.buffer.get_u64()) { 715 | Ok(length) => length, 716 | Err(_) => return Err(WebSocketError::FrameTooLarge), 717 | }, 718 | _ => unreachable!(), 719 | }; 720 | 721 | let mask = if masked { 722 | Some(self.buffer.get_u32().to_be_bytes()) 723 | } else { 724 | None 725 | }; 726 | 727 | if frame::is_control(opcode) && !fin { 728 | return Err(WebSocketError::ControlFrameFragmented); 729 | } 730 | 731 | if opcode == OpCode::Ping && payload_len > 125 { 732 | return 
Err(WebSocketError::PingFrameTooLarge); 733 | } 734 | 735 | if payload_len >= self.max_message_size { 736 | return Err(WebSocketError::FrameTooLarge); 737 | } 738 | 739 | // Reserve a bit more to try to get next frame header and avoid a syscall to read it next time 740 | self.buffer.reserve(payload_len + MAX_HEADER_SIZE); 741 | while payload_len > self.buffer.remaining() { 742 | eof!(stream.read_buf(&mut self.buffer).await?); 743 | } 744 | 745 | // if we read too much it will stay in the buffer, for the next call to this method 746 | let payload = self.buffer.split_to(payload_len); 747 | let frame = Frame::new(fin, opcode, mask, Payload::Bytes(payload)); 748 | Ok(frame) 749 | } 750 | } 751 | 752 | impl WriteHalf { 753 | pub fn after_handshake(role: Role) -> Self { 754 | Self { 755 | role, 756 | closed: false, 757 | auto_apply_mask: true, 758 | vectored: true, 759 | writev_threshold: 1024, 760 | write_buffer: Vec::with_capacity(2), 761 | } 762 | } 763 | 764 | /// Writes a frame to the provided stream. 765 | pub async fn write_frame<'a, S>( 766 | &'a mut self, 767 | stream: &mut S, 768 | mut frame: Frame<'a>, 769 | ) -> Result<(), WebSocketError> 770 | where 771 | S: AsyncWrite + Unpin, 772 | { 773 | if self.role == Role::Client && self.auto_apply_mask { 774 | frame.mask(); 775 | } 776 | 777 | if frame.opcode == OpCode::Close { 778 | self.closed = true; 779 | } else if self.closed { 780 | return Err(WebSocketError::ConnectionClosed); 781 | } 782 | 783 | if self.vectored && frame.payload.len() > self.writev_threshold { 784 | frame.writev(stream).await?; 785 | } else { 786 | let text = frame.write(&mut self.write_buffer); 787 | stream.write_all(text).await?; 788 | } 789 | 790 | Ok(()) 791 | } 792 | } 793 | 794 | #[cfg(test)] 795 | mod tests { 796 | use super::*; 797 | 798 | const _: () = { 799 | const fn assert_unsync() { 800 | // Generic trait with a blanket impl over `()` for all types. 
801 | trait AmbiguousIfImpl { 802 | // Required for actually being able to reference the trait. 803 | fn some_item() {} 804 | } 805 | 806 | impl AmbiguousIfImpl<()> for T {} 807 | 808 | // Used for the specialized impl when *all* traits in 809 | // `$($t)+` are implemented. 810 | #[allow(dead_code)] 811 | struct Invalid; 812 | 813 | impl AmbiguousIfImpl for T {} 814 | 815 | // If there is only one specialized trait impl, type inference with 816 | // `_` can be resolved and this can compile. Fails to compile if 817 | // `$x` implements `AmbiguousIfImpl`. 818 | let _ = >::some_item; 819 | } 820 | assert_unsync::>(); 821 | }; 822 | } 823 | --------------------------------------------------------------------------------