├── src └── bbperf │ ├── __init__.py │ ├── .gitignore │ ├── exceptions.py │ ├── udp_string_sender_thread.py │ ├── tcp-graph.gp │ ├── const.py │ ├── graph.py │ ├── data_sample_evaluator_class.py │ ├── udp_helper.py │ ├── udp-graph.gp │ ├── tcp_helper.py │ ├── udp_rate_manager_class.py │ ├── run_mode_manager_class.py │ ├── bbperf.py │ ├── json_output_class.py │ ├── data_receiver_thread.py │ ├── util.py │ ├── data_sender_thread.py │ ├── control_receiver_thread.py │ ├── output.py │ ├── server.py │ ├── tcp_control_connection_class.py │ └── client.py ├── .gitignore ├── bbperf.service ├── LICENSE ├── pyproject.toml ├── tests └── run_test.sh ├── DEPLOY.txt └── README.md /src/bbperf/__init__.py: -------------------------------------------------------------------------------- 1 | -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- 1 | dist 2 | .venv 3 | .vscode 4 | -------------------------------------------------------------------------------- /src/bbperf/.gitignore: -------------------------------------------------------------------------------- 1 | __pycache__ 2 | .vscode 3 | .venv* 4 | -------------------------------------------------------------------------------- /src/bbperf/exceptions.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 
class PeerDisconnectedException(Exception):
    """Raised when the TCP peer has closed the connection.

    Raised by tcp_helper.recv_exact_num_bytes() when recv() returns
    zero bytes, which is how TCP signals an orderly close by the peer.
    """
    pass
14 | 15 | -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- 1 | [build-system] 2 | requires = ["hatchling >= 1.26"] 3 | build-backend = "hatchling.build" 4 | 5 | [project] 6 | name = "bbperf" 7 | version = "0.0.30" 8 | authors = [ 9 | { name="Mike Freemon", email="mfreemon@cloudflare.com" }, 10 | ] 11 | description = "An end-to-end performance and bufferbloat measurement tool" 12 | readme = "README.md" 13 | requires-python = ">=3.9" 14 | classifiers = [ 15 | "Programming Language :: Python :: 3", 16 | "Operating System :: OS Independent", 17 | "Environment :: Console", 18 | "License :: OSI Approved :: Apache Software License", 19 | "Topic :: System :: Networking", 20 | ] 21 | dependencies = [ 22 | "numpy", 23 | ] 24 | license = "Apache-2.0" 25 | license-files = ["LICEN[CS]E*"] 26 | 27 | [project.urls] 28 | Homepage = "https://github.com/cloudflare/bbperf" 29 | Issues = "https://github.com/cloudflare/bbperf/issues" 30 | 31 | [project.scripts] 32 | bbperf = "bbperf.bbperf:mainline" 33 | -------------------------------------------------------------------------------- /tests/run_test.sh: -------------------------------------------------------------------------------- 1 | #!/bin/bash 2 | 3 | # Copyright (c) 2024 Cloudflare, Inc. 4 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 5 | 6 | do_run() { 7 | ARGS=$1 8 | 9 | #bbperf -c $ARGS 10 | sudo ip netns exec ns1 bash -c ". 
#!/bin/bash

# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

# Run one bbperf client invocation inside network namespace ns1.
# $1 is the entire client option string; it is deliberately left
# unquoted below so it word-splits into individual options.
do_run() {
    ARGS=$1

    #bbperf -c $ARGS
    sudo ip netns exec ns1 bash -c ". $HOME/.venv-314/bin/activate ; cd $HOME/bbperf/src ; python3 -m bbperf.bbperf $ARGS"
}

#SERVER_ADDR=127.0.0.1
SERVER_ADDR=10.66.30.2

# first pass: verbose, 10 seconds of valid samples per run
EXTRAARGS="-v -t 10"

set -x

# smoke-test matrix: TCP/UDP crossed with upload/download (-R)
do_run "-c $SERVER_ADDR $EXTRAARGS"

do_run "-c $SERVER_ADDR $EXTRAARGS -R"

do_run "-c $SERVER_ADDR $EXTRAARGS -u"

do_run "-c $SERVER_ADDR $EXTRAARGS -u -R"

# second pass: same matrix without -v
EXTRAARGS="-t 10"

do_run "-c $SERVER_ADDR $EXTRAARGS"

do_run "-c $SERVER_ADDR $EXTRAARGS -R"

do_run "-c $SERVER_ADDR $EXTRAARGS -u"

do_run "-c $SERVER_ADDR $EXTRAARGS -u -R"

# JSON output path: write, spot-check, then clean up
do_run "-c $SERVER_ADDR $EXTRAARGS -J /tmp/foo578439759837.out"

head /tmp/foo578439759837.out
tail /tmp/foo578439759837.out
sudo rm /tmp/foo578439759837.out
# falling off the end of this method terminates the process
def run(readyevent, doneevent, args, data_sock, peer_addr, string_to_send):
    """Periodically send string_to_send to peer_addr until doneevent is set
    or a fixed number of pings (5 seconds at 0.2s intervals) has gone out.
    """
    if args.verbosity:
        print("udp string sender thread: start of process", flush=True)

    interval_sec = 0.2
    duration_sec = 5
    max_pings = duration_sec / interval_sec

    pings_sent = 0

    # signal the parent that this sender is up and running
    readyevent.set()

    while not doneevent.is_set():
        udp_helper.sendto(data_sock, peer_addr, string_to_send.encode())
        pings_sent += 1

        time.sleep(interval_sec)

        if pings_sent > max_pings:
            break

    if args.verbosity:
        print("udp string sender thread: end of process", flush=True)
To install from test PyPI:
    python3 -m pip install --upgrade --index-url https://test.pypi.org/simple/ --no-deps bbperf
# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

"""Shared constants used by both the bbperf client and server."""

# keep in sync with the version in pyproject.toml (see DEPLOY.txt)
BBPERF_VERSION = "0.0.30"

SERVER_PORT = 5301

DEFAULT_VALID_DATA_COLLECTION_TIME_SEC = 20

# max duration for calibration phase
MAX_DURATION_CALIBRATION_TIME_SEC = 20

# cap the amount of time we will wait for valid data
MAX_DATA_COLLECTION_TIME_WITHOUT_VALID_DATA = 60

# ignore incoming data for this amount of time after starting data collection phase
DATA_SAMPLE_IGNORE_TIME_ALWAYS_SEC = 1
DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC = 5
DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC = 10

# for socket recv()
BUFSZ = (128 * 1024)

# pre-built send payloads
PAYLOAD_1K = b'a'*1024
PAYLOAD_128K = b'a'*(128 * 1024)

# run mode state machine values (driven by RunModeManagerClass)
RUN_MODE_CALIBRATING = 1
RUN_MODE_RUNNING = 2
RUN_MODE_STOP = 3

SAMPLE_INTERVAL_SEC = 0.1
STDOUT_INTERVAL_SEC = 1

# pacing for UDP sends
UDP_BATCHES_PER_SECOND = 1000
UDP_DELAY_BETWEEN_BATCH_STARTS = (1.0 / UDP_BATCHES_PER_SECOND)
UDP_NEGATIVE_DELAY_BETWEEN_BATCHES_WARNING_EVERY = UDP_BATCHES_PER_SECOND

# fixed handshake strings exchanged on the control/data connections
SETUP_COMPLETE_MSG = "setup complete"
START_MSG = " start "
UDP_STOP_MSG = "stop"
TCP_CONTROL_INITIAL_ACK = "control initial ack"
TCP_CONTROL_ARGS_ACK = "control args ack"
UDP_DATA_INITIAL_ACK = "data initial ack"

SOCKET_TIMEOUT_SEC=30

# UDP sending rates, in packets per second (see UdpRateManagerClass)
UDP_DEFAULT_INITIAL_RATE = 8000

UDP_MIN_RATE = 100
UDP_MAX_RATE = 800000
def create_graph(args, datafile1, pngfilename):
    """Render a graph for datafile1 by invoking gnuplot with the bundled
    .gp script for the current mode (UDP or TCP).

    args        - argparse.Namespace; reads args.udp, args.graph_file,
                  and args.verbosity
    datafile1   - path to the data file to plot (the .gp script derives the
                  png output path from it)
    pngfilename - used only in the graph title, unless args.graph_file is set
    """
    # prefer the user-specified output name in the title
    if args.graph_file:
        filename_in_title = args.graph_file
    else:
        filename_in_title = pngfilename

    # the .gp scripts ship alongside this module
    this_script_dir = os.path.dirname(os.path.abspath(__file__))

    if args.udp:
        gp_file = this_script_dir + "/udp-graph.gp"
        graph_title = "bbperf UDP {}".format(filename_in_title)
    else:
        gp_file = this_script_dir + "/tcp-graph.gp"
        graph_title = "bbperf TCP {}".format(filename_in_title)

    # inject variables into the shared .gp script via gnuplot's -e option
    gnuplot_script = "datafile1 = \"{}\" ; graphtitle = \"{}\" ; load \"{}\"".format(
        datafile1, graph_title, gp_file)

    result = subprocess.run(["gnuplot", "-e", gnuplot_script], capture_output=True)

    # always report failures; report detail on success only when verbose
    if args.verbosity or (result.returncode != 0):
        print("gnuplot -e {}".format(gnuplot_script), flush=True)
        print("returncode: {}".format(result.returncode), flush=True)
        print("stdout: {}".format(result.stdout), flush=True)
        print("stderr: {}".format(result.stderr), flush=True)


if __name__ == '__main__':
    # ad hoc manual test
    datafile1 = "/tmp/bbperf-tcp-data-aa97m9xl"

    # BUGFIX: create_graph() reads udp, graph_file, and verbosity, and takes
    # a third positional argument; the previous manual-test stanza supplied
    # only "udp" and called create_graph(args, datafile1), which raised.
    args_d = { "udp": True, "graph_file": None, "verbosity": 1 }

    # recreate args as if it came directly from argparse
    args = argparse.Namespace(**args_d)

    create_graph(args, datafile1, datafile1 + ".png")
import const 7 | 8 | class DataSampleEvaluatorClass: 9 | 10 | # args are client args 11 | def __init__(self, args0): 12 | self.args = args0 13 | self.valid_flag = False 14 | self.max_ramp_time = self.args.max_ramp_time 15 | 16 | if not self.max_ramp_time: 17 | if self.args.udp: 18 | self.max_ramp_time = const.DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC 19 | else: 20 | self.max_ramp_time = const.DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC 21 | 22 | if self.args.verbosity: 23 | print("max_ramp_time is {}".format(self.max_ramp_time), flush=True) 24 | 25 | 26 | # once a sample is valid then all subsequent samples are valid 27 | def is_sample_valid(self, run_mode_running_start_time, dropped_this_interval_percent, curr_time): 28 | if self.valid_flag: 29 | return True 30 | 31 | # samples are never valid until we have passed the "ignore time" 32 | if curr_time < (run_mode_running_start_time + const.DATA_SAMPLE_IGNORE_TIME_ALWAYS_SEC): 33 | return False 34 | 35 | # udp -- can we exit early? 36 | if self.args.udp: 37 | if dropped_this_interval_percent > 0: 38 | self.valid_flag = True 39 | return True 40 | 41 | if curr_time > (run_mode_running_start_time + self.max_ramp_time): 42 | self.valid_flag = True 43 | return True 44 | 45 | return False 46 | 47 | -------------------------------------------------------------------------------- /src/bbperf/udp_helper.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import time 5 | 6 | from . 
def sendto(data_sock, peer_addr, payload_bytes):
    """Send one UDP datagram to peer_addr; raise on a short or failed send."""
    num_payload_bytes = len(payload_bytes)

    num_bytes_sent = data_sock.sendto(payload_bytes, peer_addr)

    # A short send means the datagram was not transmitted intact.  Comparing
    # against the payload length also covers the failed (<= 0) case, while
    # permitting a legitimate zero-length datagram (the old "<= 0" test
    # wrongly rejected empty payloads).
    if num_bytes_sent != num_payload_bytes:
        raise Exception("ERROR: udp_helper.sendto(): send failed")


def send_stop_message(data_sock, peer_addr):
    """Best-effort notification to the peer to stop; errors are ignored."""
    payload_bytes = const.UDP_STOP_MSG.encode()

    # 3 times just in case the first one does not make it to the destination
    for i in range(3):
        try:
            data_sock.sendto(payload_bytes, peer_addr)
        except OSError:
            # probable "ConnectionRefusedError: [Errno 111] Connection refused"
            # here if first message was processed successfully.
            # (narrowed from a bare except so KeyboardInterrupt etc. propagate)
            pass

        time.sleep(0.1)


def wait_for_string(data_sock, peer_addr, expected_string):
    """Block until expected_string is received from peer_addr.

    Datagrams from any other address are ignored.  Raises after ~60s of
    receiving non-matching traffic.
    NOTE(review): if no datagrams arrive at all, recvfrom() blocks
    indefinitely unless the caller set a socket timeout -- the 60s check
    only runs after each received packet.  Confirm callers set a timeout.
    """
    expected_bytes = expected_string.encode()
    start_time = time.time()

    while True:
        payload_bytes, pkt_from_addr = data_sock.recvfrom(len(expected_bytes))

        if pkt_from_addr != peer_addr:
            continue

        payload_str = payload_bytes.decode()

        if payload_str == expected_string:
            break

        elapsed_time = time.time() - start_time
        if elapsed_time > 60:
            raise Exception("ERROR: failed to UDP recv string: {}".format(expected_string))
#!/usr/bin/gnuplot

# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

# Expects two variables injected via "gnuplot -e" by graph.py:
#   datafile1  - path to the data file (also used to derive the png name)
#   graphtitle - overall title for the multiplot
#datafile1 = "/tmp/bbperf-udp-data-aa97m9xl"

pngfile1 = datafile1.".png"

set grid

set key right top
set key box opaque

set style data lines

# noenhanced to avoid need to escape underscores in labels
set terminal pngcairo size 1200,1300 noenhanced
set output pngfile1

# generate stats for column 1 (the x-axis values), to obtain XRANGE_min
# nooutput - do not send to "screen"
# name - prefix
stats datafile1 using 1 nooutput name "XRANGE"

set multiplot title graphtitle layout 5,1

set lmargin 12

# dt 1 (solid), dt 2 (dotted), dt 4 (dot dash)
# lc 1 (purple), lc 4 (orange), lc 6 (blue), lc 7 (red), lc 8 (black)

set ylabel "pps"

plot datafile1 using ($1-XRANGE_min):5 title "receiver pps" lw 2 lc 6, \
     "" using ($1-XRANGE_min):3 title "sender pps" lw 2 lc 1 dt 2

set ylabel "Mbps"

plot datafile1 using ($1-XRANGE_min):6 title "receiver throughput" lw 2 lc 6, \
     "" using ($1-XRANGE_min):4 title "sender throughput" lw 2 lc 1 dt 2

set ylabel "ms"

plot datafile1 using ($1-XRANGE_min):7 title "unloaded RTT" lw 2 lc 1, \
     "" using ($1-XRANGE_min):8 title "RTT" lw 2 lc 6

set ylabel "bytes"

plot datafile1 using ($1-XRANGE_min):9 title "BDP" lw 2 lc 1, \
     "" using ($1-XRANGE_min):10 title "buffered data" lw 2 lc 6

set ylabel "percent"

plot datafile1 using ($1-XRANGE_min):13 title "pkt loss %" lw 2 lc 6

unset multiplot
def recv_exact_num_bytes(data_sock, total_num_bytes_to_read):
    """Read exactly total_num_bytes_to_read bytes from a TCP socket.

    Returns a bytearray of exactly that length.
    Raises PeerDisconnectedException if the peer closes the connection first.
    """
    payload_bytes = bytearray()

    # the accumulated buffer length doubles as the running byte count,
    # removing the previous redundant counter and the duplicated
    # zero-length check
    while len(payload_bytes) < total_num_bytes_to_read:

        recv_bytes = data_sock.recv(total_num_bytes_to_read - len(payload_bytes))

        # recv() returning b"" is how TCP signals an orderly close by the peer
        if not recv_bytes:
            raise PeerDisconnectedException()

        payload_bytes.extend(recv_bytes)

    return payload_bytes


def get_congestion_control(data_sock):
    """Return the name of the congestion control algorithm in effect on data_sock."""
    cc_algo_bytes = data_sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_CONGESTION, 1024)
    # cc_algo is null-terminated bytes
    cc_algo_str = cc_algo_bytes.split(b'\x00')[0].decode()
    return cc_algo_str


def set_congestion_control(client_args, data_sock):
    """Set the congestion control algorithm requested by the client, then
    read it back and raise if the requested algorithm did not take effect.
    """
    if get_congestion_control(data_sock) == client_args.congestion:
        # already set, nothing to do here
        return

    data_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CONGESTION, client_args.congestion.encode())

    # verify: read back what is actually in effect
    cc_algo_str = get_congestion_control(data_sock)
    if cc_algo_str != client_args.congestion:
        raise Exception("ERROR: unexpected congestion control in effect: {}".format(cc_algo_str))


def set_tcp_notsent_lowat(data_sock):
    """Set TCP_NOTSENT_LOWAT to 128 KiB to bound not-yet-sent data queued in the kernel."""
    lowat_value = 128 * 1024
    lowat_val_bytes = struct.pack('I', lowat_value)
    data_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NOTSENT_LOWAT, lowat_val_bytes)
# this is a simple congestion control algorithm for the udp test
class UdpRateManagerClass:
    """Adjusts the shared UDP sending rate from observed receiver pps.

    Keeps a short window of recent receiver pps readings and targets
    120% of their 90th percentile, clamped to the configured bounds.
    """

    # args are client args
    def __init__(self, args, shared_udp_sending_rate_pps):
        self.args = args
        self.shared_udp_sending_rate_pps = shared_udp_sending_rate_pps
        self.receiver_pps_list = []
        self.last_new_rate = 0

    # control receiver calls this with interval pps (every 0.1 seconds)
    def update(self, r_record):
        # gut checks to avoid updating based on bogus input
        if (r_record["r_sender_total_pkts_sent"] < 100
                or r_record["r_sender_interval_pkts_sent"] < 10
                or r_record["receiver_pps"] < 100):
            return

        # sliding window of the 10 most recent readings
        self.receiver_pps_list.append(r_record["receiver_pps"])
        del self.receiver_pps_list[:-10]

        receiver_pps_p90 = numpy.percentile(self.receiver_pps_list, 90)

        # aim slightly above what the receiver has shown it can absorb,
        # clamped to the configured floor and ceiling
        new_rate = int(receiver_pps_p90 * 1.2)
        new_rate = max(const.UDP_MIN_RATE, min(new_rate, const.UDP_MAX_RATE))

        delta_rate = new_rate - self.last_new_rate

        if abs(delta_rate) < 2:
            # do not bother with tiny changes
            return

        if self.args.verbosity > 1:
            print("UdpRateManager: update: receiver pps {:6d} old rate {:6d} new rate {:6d} delta {:7d}".format(
                r_record["receiver_pps"],
                self.shared_udp_sending_rate_pps.value,
                new_rate,
                delta_rate),
                flush=True
            )

        self.shared_udp_sending_rate_pps.value = new_rate
        self.last_new_rate = new_rate
class RunModeManagerClass:
    """Drives the test's run-mode state machine: CALIBRATING -> RUNNING -> STOP.

    update() is fed one interval record (r_record dict) at a time; it mutates
    the shared run mode value and annotates the record with drop statistics
    and an "is_sample_valid" flag.
    """

    # args are client args
    def __init__(self, args0, shared_run_mode0):
        self.args = args0
        self.shared_run_mode = shared_run_mode0

        self.job_start_time = None                  # time of the first record seen
        self.run_mode_running_start_time = None     # when we entered RUNNING
        self.min_rtt_ms = None                      # lowest RTT seen on "cal" records (unloaded RTT)
        self.last_10_rtt_list = []                  # sliding window of recent RTTs
        self.total_dropped_as_of_last_interval = 0
        self.data_sample_evaluator = DataSampleEvaluatorClass(self.args)
        self.first_valid_sample_time = None         # when the first valid sample arrived


    # updates shared_run_mode and r_record["is_sample_valid"]
    def update(self, r_record):
        curr_time = time.time()

        # first record
        if self.job_start_time is None:
            self.job_start_time = curr_time

        curr_rtt_ms = r_record["rtt_ms"]

        # update unloaded latency?
        if r_record["r_record_type"] == "cal":
            if (self.min_rtt_ms is None) or (curr_rtt_ms < self.min_rtt_ms):
                self.min_rtt_ms = curr_rtt_ms

        # CALIBRATING
        if self.shared_run_mode.value == const.RUN_MODE_CALIBRATING:

            # check to see if we should leave calibration
            self.last_10_rtt_list.append(curr_rtt_ms)
            if len(self.last_10_rtt_list) > 10:
                self.last_10_rtt_list = self.last_10_rtt_list[1:11]

            # are we done calibrating?
            # either the last 10 RTTs are all above the minimum (RTT has
            # stopped improving) or we hit max calibration time
            if ((min(self.last_10_rtt_list) > self.min_rtt_ms) or
                    (curr_time > self.job_start_time + const.MAX_DURATION_CALIBRATION_TIME_SEC)):

                self.shared_run_mode.value = const.RUN_MODE_RUNNING
                self.run_mode_running_start_time = curr_time

            return

        # run mode is RUNNING or STOP

        # have we reached max time for data run without getting any valid data samples?
        if ((self.first_valid_sample_time is None) and
                (curr_time > (self.run_mode_running_start_time + const.MAX_DATA_COLLECTION_TIME_WITHOUT_VALID_DATA))):
            self.shared_run_mode.value = const.RUN_MODE_STOP

        # check to see if we should stop RUNNING

        if self.args.udp:
            dropped_this_interval = r_record["total_dropped"] - self.total_dropped_as_of_last_interval
            if dropped_this_interval < 0:
                dropped_this_interval = 0

            interval_pkts_sent = r_record["r_sender_interval_pkts_sent"]
            if interval_pkts_sent > 0:
                dropped_this_interval_percent = (dropped_this_interval * 100.0) / interval_pkts_sent
            else:
                # BUGFIX: guard against ZeroDivisionError when no packets
                # were sent during the interval; treat as no observed loss
                dropped_this_interval_percent = 0.0

            # remember this for next loop:
            self.total_dropped_as_of_last_interval = r_record["total_dropped"]
        else:
            # tcp: drop accounting does not apply
            dropped_this_interval = -1
            dropped_this_interval_percent = -1

        r_record["interval_dropped"] = dropped_this_interval
        r_record["interval_dropped_percent"] = dropped_this_interval_percent

        if self.data_sample_evaluator.is_sample_valid(
                self.run_mode_running_start_time,
                dropped_this_interval_percent,
                curr_time):

            r_record["is_sample_valid"] = 1
            if self.first_valid_sample_time is None:
                self.first_valid_sample_time = curr_time

        else:
            r_record["is_sample_valid"] = 0

        # stop once we have collected valid samples for the requested duration
        if self.first_valid_sample_time and (curr_time > (self.first_valid_sample_time + self.args.time)):
            self.shared_run_mode.value = const.RUN_MODE_STOP
def mainline():
    """Entry point for the "bbperf" console script.

    Parses command-line arguments, finalizes/validates them via
    util.validate_and_finalize_args(), then dispatches to client or
    server mode.
    """
    print("bbperf version {}".format(const.BBPERF_VERSION), flush=True)

    parser = argparse.ArgumentParser(description="bbperf: end to end performance and bufferbloat measurement tool")

    # mode selection: -s (server) or -c SERVER_ADDR (client)
    parser.add_argument("-s", "--server",
        action="store_true",
        default=False,
        help="run in server mode")

    parser.add_argument("-c", "--client",
        metavar="SERVER_ADDR",
        default=None,
        help="run in client mode (specify either DNS name or IP address)")

    parser.add_argument("-p", "--port",
        metavar="SERVER_PORT",
        type=int,
        default=const.SERVER_PORT,
        help="server port (default: {})".format(const.SERVER_PORT))

    parser.add_argument("-u", "--udp",
        action="store_true",
        default=False,
        help="run in UDP mode (default: TCP mode)")

    parser.add_argument("-R", "--reverse",
        action="store_true",
        default=False,
        help="data flow in download direction (server to client)")

    parser.add_argument("--max-ramp-time",
        metavar="SECONDS",
        type=int,
        default=None,
        help="max duration in seconds before collecting data samples (tcp default: {}, udp default: {})".format(
            const.DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC,
            const.DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC))

    parser.add_argument("-t", "--time",
        metavar="SECONDS",
        type=int,
        default=const.DEFAULT_VALID_DATA_COLLECTION_TIME_SEC,
        help="duration in seconds to collect valid data samples (default: {})".format(const.DEFAULT_VALID_DATA_COLLECTION_TIME_SEC))

    # -v and -q are counters so they may be repeated for more effect
    parser.add_argument("-v", "--verbosity",
        action="count",
        default=0,
        help="increase output verbosity (can be repeated)")

    parser.add_argument("-q", "--quiet",
        action="count",
        default=0,
        help="decrease output verbosity (can be repeated)")

    parser.add_argument("-J", "--json-file",
        default=None,
        help="JSON output file")

    parser.add_argument("-g", "--graph",
        action="store_true",
        default=False,
        help="generate graph and save in tmp file (requires gnuplot)")

    parser.add_argument("--graph-file",
        metavar="GRAPH_FILE",
        default=None,
        help="generate graph and save in the specified file (requires gnuplot)")

    parser.add_argument("-k", "--keep",
        action="store_true",
        default=False,
        help="keep data file")

    parser.add_argument("-B", "--bind",
        metavar="BIND_ADDR",
        default="0.0.0.0",
        help="bind server sockets to address")

    parser.add_argument("--local-data-port",
        metavar="LOCAL_DATA_PORT",
        type=int,
        default=0,
        help="local port for data connection (default: ephemeral)")

    parser.add_argument("-C", "--congestion",
        metavar="CC_ALGORITHM",
        default="cubic",
        help="congestion control algorithm (default: cubic)")

    args = parser.parse_args()

    util.validate_and_finalize_args(args)

    # server mode is the fallback whenever -c was not given
    if args.client:
        client.client_mainline(args)
    else:
        server.server_mainline(args)
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

import sys
import json
import numpy

class JsonOutputClass:
    """Accumulate per-interval result entries and emit JSON summary statistics.

    Entries are added via add_entry(); create_aggregate_stats() computes
    percentile summaries over the entries flagged valid; write_output()
    prints the summary to stdout (unless quieted) and optionally writes
    the full result dict to args.json_file.
    """

    # percentile points reported for every aggregated metric
    _PERCENTILE_POINTS = (1, 10, 50, 90, 99)

    # entry keys aggregated into the summary (only valid samples count)
    _METRIC_KEYS = (
        "loaded_rtt_ms",
        "receiver_throughput_rate_mbps",
        "excess_buffered_bytes",
        "receiver_pps",
        "pkt_loss_percent",
    )

    def __init__(self, args):
        self.args = args
        self.output_dict = {}
        self.output_dict["entries"] = []
        self.unloaded_rtt_ms = None

        # open the output file early so a bad path fails fast,
        # before the measurement run starts
        if self.args.json_file:
            self.json_output_file = open(self.args.json_file, 'w')

    def set_unloaded_rtt_ms(self, rtt_ms):
        """Record the unloaded (calibration-phase) RTT in milliseconds."""
        self.unloaded_rtt_ms = rtt_ms

    def add_entry(self, entry):
        """Append one per-interval result dict (see output.print_output for keys)."""
        self.output_dict["entries"].append(entry)

    def _percentile_summary(self, values):
        """Return {"p1": ..., "p10": ..., "p50": ..., "p90": ..., "p99": ...} for values."""
        pcts = numpy.percentile(values, list(self._PERCENTILE_POINTS))
        return {"p{}".format(q): v for q, v in zip(self._PERCENTILE_POINTS, pcts)}

    def create_aggregate_stats(self):
        """Compute self.output_dict["summary"] from the valid entries.

        Leaves output_dict unchanged (no "summary" key) and prints an error
        to stderr when fewer than 10 valid samples were collected.
        """
        samples = {key: [] for key in self._METRIC_KEYS}

        for entry in self.output_dict["entries"]:
            if entry["is_sample_valid"]:
                for key in self._METRIC_KEYS:
                    samples[key].append(entry[key])

        num_samples = len(samples["loaded_rtt_ms"])
        if num_samples < 10:
            print("ERROR: not enough valid samples for summary statistics: {} samples".format(num_samples),
                  file=sys.stderr,
                  flush=True)
            return

        summary_dict = self.output_dict["summary"] = {}

        summary_dict["num_samples"] = num_samples

        summary_dict["unloaded_rtt_ms"] = self.unloaded_rtt_ms

        for key in self._METRIC_KEYS:
            summary_dict[key] = self._percentile_summary(samples[key])

    def write_output(self):
        """Emit the summary to stdout and, if requested, the full dict to file."""
        self.create_aggregate_stats()

        # write to stdout
        if (self.args.quiet < 2) and ("summary" in self.output_dict):
            str_out = json.dumps(self.output_dict["summary"], indent=4)
            print(str_out, flush=True)

        # write to file if requested
        if self.args.json_file:
            json.dump(self.output_dict, self.json_output_file, indent=4)
            self.json_output_file.close()
# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

import time
import socket

from . import const
from . import util

# args are client args
def run(readyevent, args, control_conn, data_sock, peer_addr):
    """Receive the data stream and report per-interval stats on the control connection.

    Runs as a dedicated process; signals readiness via readyevent, then loops
    reading from data_sock (UDP datagrams or a TCP stream) until the peer
    disconnects (TCP), a UDP stop message arrives, or a timeout/failsafe fires.
    For each elapsed sample interval it extracts the sender's " a ... b "
    header block from the payload, appends receiver-side counters, and sends
    the combined record back over control_conn.
    """
    if args.verbosity:
        print("starting data receiver process", flush=True)

    # do not block for very long on the below recv calls as we want to do these things even
    # if the flow of data packets has stopped:
    #   1. to send our interval stats on the intervals
    #   2. to exit the process when the "end of run" timer expires
    data_sock.settimeout(0.01)

    total_recv_calls = 0

    start_time_sec = time.time()

    interval_start_time = start_time_sec
    interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC

    interval_pkts_received = 0
    interval_bytes_received = 0

    # tracks how long we have gone with nothing but recv timeouts;
    # exceeding const.SOCKET_TIMEOUT_SEC of continuous silence is fatal
    socket_timeout_timer_active = False
    socket_timeout_timer_start_time = None

    readyevent.set()

    # do until end of test duration
    # we will not get a connection close with udp
    while True:
        num_bytes_read = 0

        try:
            if args.udp:
                # recv with short timeout
                bytes_read, pkt_from_addr = data_sock.recvfrom(const.BUFSZ)

                # validate peer address
                # only accept packets from our client
                if pkt_from_addr != peer_addr:
                    # ignore this datagram
                    continue

            else:
                # tcp
                # recv with short timeout
                bytes_read = data_sock.recv(const.BUFSZ)

            num_bytes_read = len(bytes_read)

            if num_bytes_read == 0:
                # (tcp only) peer has disconnected
                if args.verbosity:
                    print("peer disconnected (data socket)", flush=True)
                # exit process
                break

            # any successful read resets the silence timer
            socket_timeout_timer_active = False

        except socket.timeout:
            if socket_timeout_timer_active:
                if (time.time() - socket_timeout_timer_start_time) > const.SOCKET_TIMEOUT_SEC:
                    raise Exception("FATAL: data_receiver_thread: timeout during data socket read")
            else:
                socket_timeout_timer_active = True
                socket_timeout_timer_start_time = time.time()

        # NOTE: bytes_read is only examined when num_bytes_read > 0,
        # so the timeout path (bytes_read possibly unset) is safe here
        if args.udp and num_bytes_read == len(const.UDP_STOP_MSG) and (bytes_read.decode() == const.UDP_STOP_MSG):
            if args.verbosity:
                print("data receiver thread: received udp stop message, exiting", flush=True)
            break

        if num_bytes_read == 0:
            # recv must have timed out, skip the rest of this loop
            continue

        curr_time_sec = time.time()

        total_recv_calls += 1

        interval_pkts_received += 1  # valid for udp only
        interval_bytes_received += num_bytes_received if False else num_bytes_read  # see note below

        # end of interval
        # send interval record over control connection
        if curr_time_sec > interval_end_time:
            interval_time_sec = curr_time_sec - interval_start_time

            # find the packet send time in the user payload
            # (the sender embeds " a ... b " at the start of each record)
            a_b_block = None

            idx_of_a = bytes_read.find(b' a ')
            if idx_of_a > -1:
                idx_of_b = bytes_read.find(b' b ', idx_of_a)
                if idx_of_b > -1:
                    a_b_block = bytes_read[ idx_of_a : idx_of_b + 3 ]

            if a_b_block is None:
                # skip sending for this packet, but stay "in" sample interval
                continue

            # sending info back to client on control connection

            ba = bytearray()
            ba.extend(a_b_block)
            ba.extend(str(interval_time_sec).encode())
            ba.extend(b' ')
            ba.extend(str(interval_pkts_received).encode())
            ba.extend(b' ')
            ba.extend(str(interval_bytes_received).encode())
            ba.extend(b' ')
            ba.extend(str(total_recv_calls).encode())  # num of pkts received, valid for udp only
            ba.extend(b' c ')

            control_conn.send_bytes(ba)

            interval_bytes_received = 0
            interval_pkts_received = 0

            interval_start_time = curr_time_sec
            interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC

        if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec):
            raise Exception("ERROR: max_run_time_failsafe_sec exceeded")


    # peer disconnected (or an error)
    util.done_with_socket(data_sock)
    control_conn.close()

    if args.verbosity:
        print("exiting data receiver process", flush=True)
def validate_and_finalize_args(args):
    """Validate the parsed CLI arguments and attach derived settings.

    Raises Exception with an "ERROR: ..." message on any invalid value or
    combination. Also computes and stores args.max_run_time_failsafe_sec,
    the absolute upper bound on run time used by all worker processes.
    """
    if args.server and args.client:
        raise Exception("ERROR: cannot be both client and server")

    if (not args.server) and (not args.client):
        raise Exception("ERROR: must be either a client or a server")

    if args.port > 65535:
        raise Exception("ERROR: invalid server port {}".format(args.port))

    if args.verbosity and args.quiet:
        raise Exception("ERROR: cannot specify both verbosity and quiet")

    if args.congestion not in [ "cubic", "bbr"]:
        raise Exception("ERROR: congestion control algorithm must be either cubic or bbr, but found {}".format(args.congestion))

    if args.graph_file and (not args.graph_file.endswith(".png")):
        raise Exception("ERROR: argument --graph-file must end with \".png\"")

    # set max_run_time_failsafe_sec
    # never run longer than this under any circumstances
    max_run_time_failsafe_sec = const.MAX_DURATION_CALIBRATION_TIME_SEC

    if args.udp:
        max_run_time_failsafe_sec += const.DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC
    else:
        max_run_time_failsafe_sec += const.DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC

    max_run_time_failsafe_sec += const.MAX_DATA_COLLECTION_TIME_WITHOUT_VALID_DATA
    max_run_time_failsafe_sec += args.time

    # argparse.Namespace: add the computed field through vars()
    d = vars(args)
    d["max_run_time_failsafe_sec"] = max_run_time_failsafe_sec


def convert_udp_pps_to_batch_size(packets_per_sec):
    """Return the number of packets per send batch for a target UDP rate.

    Always at least 1 so the sender makes forward progress at low rates.
    """
    batch_size = int(packets_per_sec / const.UDP_BATCHES_PER_SECOND)

    if batch_size < 1:
        batch_size = 1

    return batch_size


def done_with_socket(mysock):
    """Best-effort shutdown and close of a socket; never raises.

    shutdown() legitimately fails on unconnected/already-closed sockets,
    so OS-level errors are deliberately ignored.
    """
    try:
        mysock.shutdown(socket.SHUT_RDWR)
    except OSError:
        pass

    try:
        mysock.close()
    except OSError:
        pass


def threads_are_running(thread_list):
    """Return True if any worker in thread_list is still alive.

    Despite the name these are multiprocessing.Process-like objects (they
    expose exitcode). Raises if any finished worker exited abnormally.
    """
    any_running = False

    for t in thread_list:
        if t.is_alive():
            any_running = True
        else:
            if t.exitcode != 0:
                # worker exited abnormally -- kill everything and go home
                raise Exception("FATAL: one of the subprocesses exited abnormally, name: {}, exitcode: {}".format(t.name, t.exitcode))

    return any_running
def parse_r_record(args, s1):
    """Parse one whitespace-delimited result record into a dict of metrics.

    The record layout is:
      a <type> <sent_time> <snd_dur> <snd_pkts> <snd_bytes> <snd_total>
      b <rcv_dur> <rcv_pkts> <rcv_bytes> <rcv_total>
      c <recv_time> <dropped> <dropped_pct> <valid> d
    Packet-count fields are only meaningful for UDP runs.
    """
    tokens = s1.split()

    # (token index, output key, converter); tokens 0/7/12/17 are the
    # literal "a"/"b"/"c"/"d" markers and are skipped
    field_map = (
        (1,  "r_record_type", str),
        (2,  "r_pkt_sent_time_sec", float),
        (3,  "r_sender_interval_duration_sec", float),
        (4,  "r_sender_interval_pkts_sent", int),        # valid for udp only
        (5,  "r_sender_interval_bytes_sent", int),
        (6,  "r_sender_total_pkts_sent", int),           # valid for udp only
        (8,  "r_receiver_interval_duration_sec", float),
        (9,  "r_receiver_interval_pkts_received", int),  # valid for udp only
        (10, "r_receiver_interval_bytes_received", int),
        (11, "r_receiver_total_pkts_received", int),     # valid for udp only
        (13, "r_pkt_received_time_sec", float),
        (14, "interval_dropped", int),
        (15, "interval_dropped_percent", float),
        (16, "is_sample_valid", int),
    )

    rec = {key: conv(tokens[idx]) for idx, key, conv in field_map}

    rec["rtt_sec"] = rec["r_pkt_received_time_sec"] - rec["r_pkt_sent_time_sec"]
    rec["rtt_ms"] = rec["rtt_sec"] * 1000

    # the first record of a run arrives with zeroed sender-side fields
    send_dur = rec["r_sender_interval_duration_sec"]
    sender_interval_rate_bps = (rec["r_sender_interval_bytes_sent"] * 8.0) / send_dur if send_dur else 0

    rec["sender_interval_rate_mbps"] = sender_interval_rate_bps / (10 ** 6)

    recv_bytes_per_sec = rec["r_receiver_interval_bytes_received"] / rec["r_receiver_interval_duration_sec"]
    rec["receiver_interval_rate_bytes_per_sec"] = recv_bytes_per_sec

    rec["receiver_interval_rate_mbps"] = (recv_bytes_per_sec * 8) / (10 ** 6)

    # bytes "in flight" implied by the measured rate and RTT
    rec["buffered_bytes"] = int( recv_bytes_per_sec * rec["rtt_sec"] )

    if not args.udp:
        # packet counters are meaningless for tcp
        rec["sender_pps"] = -1
        rec["receiver_pps"] = -1
        rec["total_dropped"] = -1
        return rec

    rec["sender_pps"] = int(rec["r_sender_interval_pkts_sent"] / send_dur) if send_dur else 0
    rec["receiver_pps"] = int(rec["r_receiver_interval_pkts_received"] / rec["r_receiver_interval_duration_sec"])

    dropped = rec["r_sender_total_pkts_sent"] - rec["r_receiver_total_pkts_received"]
    # a stale "early" a_b block can make this slightly negative; clamp to zero
    rec["total_dropped"] = dropped if dropped > 0 else 0

    return rec


def validate_data_connection(args, run_id, data_connection_initial_str):
    """Check the handshake line of a new data connection against our run_id.

    Raises Exception on any mismatch; returns None when valid.
    """
    fields = data_connection_initial_str.split(" ")

    if fields[0] != "data":
        raise Exception("ERROR: data connection invalid (does not start with data) run_id {} received {}".format(
            run_id, data_connection_initial_str))

    data_connection_run_id = fields[1]

    if data_connection_run_id != run_id:
        raise Exception("ERROR: data connection invalid (run_id incorrect) control run_id {} data run_id {} received {}".format(
            run_id, data_connection_run_id, data_connection_initial_str))

    if args.verbosity:
        print("data run_id is valid", flush=True)
# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

import time
import socket
import select

from . import util
from . import const
from . import udp_helper


# falling off the end of this method terminates the process
def run(args, data_sock, peer_addr, shared_run_mode, shared_udp_sending_rate_pps):
    """Send the data stream (TCP or paced UDP batches) until told to stop.

    While shared_run_mode is RUN_MODE_CALIBRATING, sends small payloads
    slowly (one per 0.2 s) so the receiver can measure unloaded latency;
    afterwards sends at full rate (TCP) or at the autorated pps held in
    shared_udp_sending_rate_pps (UDP). Each record starts with an
    " a <type> <timestamps/counters> b " header the receiver parses.
    """
    if args.verbosity:
        print("data sender: start of process", flush=True)

    # udp autorate: snapshot the shared rate, refreshed each interval
    if args.udp:
        udp_pps = shared_udp_sending_rate_pps.value
        udp_batch_size = util.convert_udp_pps_to_batch_size(udp_pps)

    # start sending

    if args.verbosity:
        print("data sender: sending", flush=True)

    start_time_sec = time.time()
    calibration_start_time = start_time_sec
    curr_time_sec = start_time_sec

    interval_start_time = curr_time_sec
    interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC

    # stats for the most recently *completed* interval; embedded in each record
    interval_time_sec = 0.0
    interval_send_count = 0
    interval_bytes_sent = 0

    # accumulators for the interval in progress
    accum_send_count = 0
    accum_bytes_sent = 0

    total_send_counter = 1
    num_negative_delay = 0

    while True:
        curr_time_sec = time.time()

        if (shared_run_mode.value == const.RUN_MODE_CALIBRATING):
            # double the limit to avoid a race condition with the run mode manager
            if curr_time_sec > (calibration_start_time + (2 * const.MAX_DURATION_CALIBRATION_TIME_SEC)):
                error_msg = "FATAL: data_sender_thread: time in calibration exceeded max allowed"
                print(error_msg, flush=True)
                raise Exception(error_msg)

            is_calibrated = False
        else:
            is_calibrated = True

        record_type = b'run' if is_calibrated else b'cal'

        # we want to be fast here, since this is data write loop, so use ba.extend

        ba = bytearray(b' a ' +
                       record_type + b' ' +
                       str(curr_time_sec).encode() + b' ' +
                       str(interval_time_sec).encode() + b' ' +
                       str(interval_send_count).encode() + b' ' +
                       str(interval_bytes_sent).encode() + b' ' +
                       str(total_send_counter).encode() + b' b ')

        # small payload during calibration (and always for udp);
        # large payload for full-rate tcp
        if args.udp:
            ba.extend(const.PAYLOAD_1K)
        elif is_calibrated:
            ba.extend(const.PAYLOAD_128K)
        else:
            ba.extend(const.PAYLOAD_1K)

        # send an entire batch (batching applies to udp only)

        batch_size = udp_batch_size if args.udp else 1

        try:
            batch_counter = 0

            while batch_counter < batch_size:
                # blocking
                # we want to block here, as blocked time should "count"

                if args.udp:
                    num_bytes_sent = data_sock.sendto(ba, peer_addr)
                else:
                    # tcp
                    # we use select to take advantage of tcp_notsent_lowat
                    # timeout is 20 seconds
                    _, _, _ = select.select( [], [data_sock], [], 20.0)
                    num_bytes_sent = data_sock.send(ba)

                if num_bytes_sent <= 0:
                    raise Exception("ERROR: data_sender_thread.run(): send failed")

                batch_counter += 1
                total_send_counter += 1
                accum_send_count += 1
                accum_bytes_sent += num_bytes_sent

        except ConnectionResetError:
            print("Connection reset by peer", flush=True)
            # exit process
            break

        except BrokenPipeError:
            # this can happen at the end of a tcp reverse test
            print("broken pipe error", flush=True)
            # exit process
            break

        except BlockingIOError:
            # same as EAGAIN EWOULDBLOCK
            # we did not send, loop back up and try again
            continue

        except socket.timeout:
            error_msg = "FATAL: data_sender_thread: socket timeout"
            print(error_msg, flush=True)
            raise Exception(error_msg)

        # end of batch loop

        curr_time_sec = time.time()

        # roll the accumulators into the published interval stats
        if curr_time_sec > interval_end_time:
            interval_time_sec = curr_time_sec - interval_start_time
            interval_send_count = accum_send_count
            interval_bytes_sent = accum_bytes_sent

            interval_start_time = curr_time_sec
            interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC
            accum_send_count = 0
            accum_bytes_sent = 0

            # update udp autorate
            if args.udp:
                udp_pps = shared_udp_sending_rate_pps.value
                udp_batch_size = util.convert_udp_pps_to_batch_size(udp_pps)

        # send very slowly at first to establish unloaded latency
        if not is_calibrated:
            time.sleep(0.2)
            if args.udp:
                # initialize udp batch start time here in case next loop is batch processing
                current_udp_batch_start_time = time.time()
            continue

        # normal end of test
        if shared_run_mode.value == const.RUN_MODE_STOP:
            break

        if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec):
            raise Exception("ERROR: max_run_time_failsafe_sec exceeded")

        # pause between udp batches if necessary
        # NOTE(review): assumes the run starts in RUN_MODE_CALIBRATING so that
        # current_udp_batch_start_time is set before first use -- confirm
        if args.udp:
            delay_sec = const.UDP_DELAY_BETWEEN_BATCH_STARTS - (curr_time_sec - current_udp_batch_start_time)
            if delay_sec < 0:
                num_negative_delay += 1
                if (num_negative_delay % const.UDP_NEGATIVE_DELAY_BETWEEN_BATCHES_WARNING_EVERY) == 0:
                    print("WARNING: udp sender is cpu constrained, results may be invalid: {}".format(num_negative_delay), flush=True)
            elif delay_sec > 0:
                time.sleep(delay_sec)
            current_udp_batch_start_time += const.UDP_DELAY_BETWEEN_BATCH_STARTS


    # send STOP message
    if args.udp:
        if args.verbosity:
            print("data sender: sending udp stop message", flush=True)
        udp_helper.send_stop_message(data_sock, peer_addr)

    util.done_with_socket(data_sock)

    if args.verbosity:
        print("data sender: end of process", flush=True)
# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

import time

from . import util
from . import const

from .exceptions import PeerDisconnectedException
from .udp_rate_manager_class import UdpRateManagerClass
from .run_mode_manager_class import RunModeManagerClass


def _run_managed_relay(proc_name, readyevent, args, control_conn, emit, shared_run_mode, shared_udp_sending_rate_pps):
    """Shared receive/annotate/emit loop for the two "terminating" receivers.

    Receives " a ... c " blocks from control_conn, appends the local receive
    time plus drop/validity fields (producing a full " a ... d " record),
    feeds the parsed record to the run-mode and (udp) rate managers, then
    delivers the record string via emit(). Exits when the peer disconnects.
    """
    if args.verbosity:
        print("starting control receiver process: {}".format(proc_name), flush=True)

    run_mode_manager = RunModeManagerClass(args, shared_run_mode)
    udp_rate_manager = UdpRateManagerClass(args, shared_udp_sending_rate_pps)

    readyevent.set()

    start_time_sec = time.time()

    while True:

        try:
            bytes_read = control_conn.recv_a_c_block()

        except ConnectionResetError:
            if args.verbosity:
                print("connection reset error", flush=True)
            # exit process
            break

        except PeerDisconnectedException:
            if args.verbosity:
                print("peer disconnected (control socket)", flush=True)
            # exit process
            break

        curr_time_sec = time.time()
        curr_time_str = str(curr_time_sec)

        received_str = bytes_read.decode()

        # the zeroes will be updated below
        tmp_str = received_str + curr_time_str + " 0 0 0 d "

        r_record = util.parse_r_record(args, tmp_str)

        # updates shared_run_mode and fills in
        #   r_record["interval_dropped"]
        #   r_record["interval_dropped_percent"]
        #   r_record["is_sample_valid"]
        run_mode_manager.update(r_record)

        if args.udp:
            udp_rate_manager.update(r_record)

        new_str = (received_str + curr_time_str + " " +
                   str(r_record["interval_dropped"]) + " " +
                   str(r_record["interval_dropped_percent"]) + " " +
                   str(r_record["is_sample_valid"]) + " d ")

        emit(new_str)

        if args.verbosity > 3:
            print("control receiver process: {}".format(new_str), flush=True)

        if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec):
            raise Exception("ERROR: max_run_time_failsafe_sec exceeded")

    control_conn.close()

    if args.verbosity:
        print("exiting control receiver process: {}".format(proc_name), flush=True)


# direction up, runs on client
# args are client args (not server args)
# falling off the end of this method terminates the process
def run_recv_term_queue(readyevent, args, control_conn, results_queue, shared_run_mode, shared_udp_sending_rate_pps):
    """Terminating receiver that delivers completed records to results_queue."""
    _run_managed_relay("run_recv_term_queue", readyevent, args, control_conn,
                       results_queue.put, shared_run_mode, shared_udp_sending_rate_pps)


# direction down, runs on server
# args are client args (not server args)
# falling off the end of this method terminates the process
def run_recv_term_send(readyevent, args, control_conn, shared_run_mode, shared_udp_sending_rate_pps):
    """Terminating receiver that sends completed records back over the control connection."""
    _run_managed_relay("run_recv_term_send", readyevent, args, control_conn,
                       control_conn.send_string, shared_run_mode, shared_udp_sending_rate_pps)


# direction down, runs on client (passthru)
# args are client args (not server args) -- this always runs on client
# falling off the end of this method terminates the process
def run_recv_queue(readyevent, args, control_conn, results_queue):
    """Passthru receiver: forwards already-complete " a ... d " records to results_queue."""
    if args.verbosity:
        print("starting control receiver process: run_recv_queue", flush=True)

    readyevent.set()

    start_time_sec = time.time()

    while True:
        try:
            bytes_read = control_conn.recv_a_d_block()

        except ConnectionResetError:
            if args.verbosity:
                print("connection reset error", flush=True)
            # exit process
            break

        except PeerDisconnectedException:
            if args.verbosity:
                print("peer disconnected (control socket)", flush=True)
            # exit process
            break

        curr_time_sec = time.time()

        received_str = bytes_read.decode()

        # passthru as is
        results_queue.put(received_str)

        if args.verbosity > 3:
            print("control receiver process: {}".format(received_str), flush=True)

        if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec):
            raise Exception("ERROR: max_run_time_failsafe_sec exceeded")

    control_conn.close()

    if args.verbosity:
        print("exiting control receiver process: run_recv_queue", flush=True)
{}".format(received_str), flush=True) 187 | 188 | if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec): 189 | raise Exception("ERROR: max_run_time_failsafe_sec exceeded") 190 | 191 | control_conn.close() 192 | 193 | if args.verbosity: 194 | print("exiting control receiver process: run_recv_queue", flush=True) 195 | -------------------------------------------------------------------------------- /src/bbperf/output.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import os 5 | import time 6 | import tempfile 7 | 8 | from . import const 9 | from . import util 10 | 11 | from .json_output_class import JsonOutputClass 12 | 13 | 14 | args = None 15 | tmpfile1 = None 16 | tmpfile2 = None 17 | last_line_to_stdout_time = 0 18 | print_header1 = True 19 | print_header2 = True 20 | print_header3 = True 21 | relative_start_time_sec = None 22 | json_output = None 23 | unloaded_latency_rtt_ms = None 24 | 25 | 26 | def init(args0): 27 | global args 28 | global tmpfile1 29 | global tmpfile2 30 | global json_output 31 | 32 | args = args0 33 | 34 | # create and open file 35 | 36 | if args.udp: 37 | tmp_graph_filename_prefix = "bbperf-graph-data-udp-" 38 | tmp_raw_filename_prefix = "bbperf-raw-data-udp-" 39 | else: 40 | tmp_graph_filename_prefix = "bbperf-graph-data-tcp-" 41 | tmp_raw_filename_prefix = "bbperf-raw-data-tcp-" 42 | 43 | tmpfile1 = tempfile.NamedTemporaryFile(prefix=tmp_graph_filename_prefix, delete=False) 44 | tmpfile2 = tempfile.NamedTemporaryFile(prefix=tmp_raw_filename_prefix, delete=False) 45 | 46 | json_output = JsonOutputClass(args) 47 | 48 | 49 | def get_graph_data_file_name(): 50 | return tmpfile1.name 51 | 52 | def get_raw_data_file_name(): 53 | return tmpfile2.name 54 | 55 | def term(): 56 | tmpfile1.close() 57 | tmpfile2.close() 58 | 59 | 
json_output.write_output() 60 | 61 | 62 | def delete_data_files(): 63 | if args.verbosity: 64 | print("deleting graph data file: {}".format(tmpfile1.name), flush=True) 65 | print("deleting raw data file: {}".format(tmpfile2.name), flush=True) 66 | 67 | os.remove(tmpfile1.name) 68 | os.remove(tmpfile2.name) 69 | 70 | 71 | def write_data_to_file(lineout, fileout): 72 | lineout_bytes = "{}\n".format(lineout).encode() 73 | fileout.file.write(lineout_bytes) 74 | 75 | def write_raw_data_to_file(lineout): 76 | write_data_to_file(lineout, tmpfile2) 77 | 78 | def write_graph_data_to_file(lineout): 79 | write_data_to_file(lineout, tmpfile1) 80 | 81 | 82 | # keep in mind here that the interval data is coming in at a faster 83 | # rate than what we want to (normally) display on stdout 84 | 85 | def print_output(s1): 86 | global args 87 | global last_line_to_stdout_time 88 | global print_header1 89 | global print_header2 90 | global print_header3 91 | global relative_start_time_sec 92 | global unloaded_latency_rtt_ms 93 | 94 | write_raw_data_to_file(s1) 95 | 96 | curr_time = time.time() 97 | 98 | r_record = util.parse_r_record(args, s1) 99 | 100 | if relative_start_time_sec is None: 101 | # first incoming result has arrived 102 | relative_start_time_sec = r_record["r_pkt_sent_time_sec"] 103 | relative_pkt_sent_time_sec = 0 104 | relative_pkt_received_time_sec = 0 105 | else: 106 | relative_pkt_sent_time_sec = r_record["r_pkt_sent_time_sec"] - relative_start_time_sec 107 | relative_pkt_received_time_sec = r_record["r_pkt_received_time_sec"] - relative_start_time_sec 108 | 109 | if r_record["r_record_type"] == "run": 110 | json_output.set_unloaded_rtt_ms(unloaded_latency_rtt_ms) 111 | 112 | bdp_bytes = int( r_record["receiver_interval_rate_bytes_per_sec"] * (unloaded_latency_rtt_ms / 1000.0) ) 113 | 114 | if bdp_bytes > 0: 115 | bloat_factor = float(r_record["buffered_bytes"]) / bdp_bytes 116 | else: 117 | bloat_factor = 0 118 | 119 | if print_header3: 120 | lineout = "sent_time 
# keep in mind here that the interval data is coming in at a faster
# rate than what we want to (normally) display on stdout

def print_output(s1):
    """Process one incoming result record string.

    Always appends to the raw data file; for run-phase records also writes
    a row to the graph data file and an entry to the JSON collector, and
    (throttled) prints a line to stdout. Calibration records update the
    unloaded-latency minimum instead.
    NOTE(review): assumes at least one calibration record arrives before the
    first "run" record, so unloaded_latency_rtt_ms is not None -- confirm.
    """
    global args
    global last_line_to_stdout_time
    global print_header1
    global print_header2
    global print_header3
    global relative_start_time_sec
    global unloaded_latency_rtt_ms

    write_raw_data_to_file(s1)

    curr_time = time.time()

    r_record = util.parse_r_record(args, s1)

    if relative_start_time_sec is None:
        # first incoming result has arrived
        relative_start_time_sec = r_record["r_pkt_sent_time_sec"]
        relative_pkt_sent_time_sec = 0
        relative_pkt_received_time_sec = 0
    else:
        relative_pkt_sent_time_sec = r_record["r_pkt_sent_time_sec"] - relative_start_time_sec
        relative_pkt_received_time_sec = r_record["r_pkt_received_time_sec"] - relative_start_time_sec

    if r_record["r_record_type"] == "run":
        json_output.set_unloaded_rtt_ms(unloaded_latency_rtt_ms)

        # bandwidth-delay product implied by the receive rate and unloaded RTT
        bdp_bytes = int( r_record["receiver_interval_rate_bytes_per_sec"] * (unloaded_latency_rtt_ms / 1000.0) )

        # bloat factor: buffered bytes relative to the BDP (1.0x == no excess buffering)
        if bdp_bytes > 0:
            bloat_factor = float(r_record["buffered_bytes"]) / bdp_bytes
        else:
            bloat_factor = 0

        if print_header3:
            lineout = "sent_time recv_time sender_pps sender_Mbps receiver_pps receiver_Mbps unloaded_rtt_ms rtt_ms BDP_bytes buffered_bytes bloat_factor pkts_dropped pkts_dropped_percent"
            write_graph_data_to_file(lineout)
            print_header3 = False

        # write to file the same data and same rate as what we are receiving over the control connection
        lineout = "{} {} {} {} {} {} {} {} {} {} {} {} {}".format(
            relative_pkt_sent_time_sec,
            relative_pkt_received_time_sec,
            r_record["sender_pps"],
            r_record["sender_interval_rate_mbps"],
            r_record["receiver_pps"],
            r_record["receiver_interval_rate_mbps"],
            unloaded_latency_rtt_ms,
            r_record["rtt_ms"],
            bdp_bytes,
            r_record["buffered_bytes"],
            bloat_factor,
            r_record["interval_dropped"],
            r_record["interval_dropped_percent"]
        )

        write_graph_data_to_file(lineout)

        # add to JSON output
        excess = r_record["buffered_bytes"] - bdp_bytes
        if excess < 0:
            excess = 0
        new_entry = {
            "sent_time_sec": r_record["r_pkt_sent_time_sec"],
            "received_time_sec": r_record["r_pkt_received_time_sec"],
            "loaded_rtt_ms": r_record["rtt_ms"],
            "sender_throughput_rate_mbps": r_record["sender_interval_rate_mbps"],
            "receiver_throughput_rate_mbps": r_record["receiver_interval_rate_mbps"],
            "bdp_bytes": bdp_bytes,
            "excess_buffered_bytes": excess,
            "receiver_pps": r_record["receiver_pps"],
            "pkt_loss_percent": r_record["interval_dropped_percent"],
            "is_sample_valid": r_record["is_sample_valid"]
        }
        json_output.add_entry(new_entry)

        # write to stdout at the rate of one line per second
        # each stdout line will be a 0.1s snapshot
        if ((curr_time > (last_line_to_stdout_time + const.STDOUT_INTERVAL_SEC)) and not args.quiet) or args.verbosity > 2:
            if print_header2:
                print(" sent_time recv_time sender_Mbps receiver_Mbps sender_pps receiver_pps unloaded_rtt_ms rtt_ms BDP_bytes buffered_bytes bloat pkts_dropped drop%", flush=True)
                print_header2 = False

            # negative drop percent means "not applicable" (tcp runs)
            if r_record["interval_dropped_percent"] < 0:
                dropped_percent_str = " n/a"
            else:
                dropped_percent_str = "{:6.3f}%".format(r_record["interval_dropped_percent"])

            print("{:11.6f} {:11.6f} {:11.3f} {:11.3f} {:8d} {:8d} {:8.3f} {:9.3f} {:9d} {:9d} {:6.1f}x {:6d} {}".format(
                relative_pkt_sent_time_sec,
                relative_pkt_received_time_sec,
                r_record["sender_interval_rate_mbps"],
                r_record["receiver_interval_rate_mbps"],
                r_record["sender_pps"],
                r_record["receiver_pps"],
                unloaded_latency_rtt_ms,
                r_record["rtt_ms"],
                bdp_bytes,
                r_record["buffered_bytes"],
                bloat_factor,
                r_record["interval_dropped"],
                dropped_percent_str
                ),
                flush=True)

            last_line_to_stdout_time = curr_time

    else:
        # calibrating
        # do we have a new unloaded latency?
        if (unloaded_latency_rtt_ms is None) or (r_record["rtt_ms"] < unloaded_latency_rtt_ms):
            unloaded_latency_rtt_ms = r_record["rtt_ms"]

        if ((curr_time > (last_line_to_stdout_time + const.STDOUT_INTERVAL_SEC)) and not args.quiet) or args.verbosity > 2:
            if print_header1:
                print("calibrating", flush=True)
                print(" sent_time recv_time rtt_ms", flush=True)
                print_header1 = False

            print("{:11.6f} {:11.6f} {:11.6f}".format(
                relative_pkt_sent_time_sec,
                relative_pkt_received_time_sec,
                unloaded_latency_rtt_ms
                ),
                flush=True)

            last_line_to_stdout_time = curr_time

bbperf - An end-to-end performance and bufferbloat measurement tool

2 | 3 | `bbperf` measures what matters most. 4 | 5 | Traditional network performance measurement tools collect metrics such as latency and throughput regardless of the conditions that exist during the collection period. While valuable for many uses, that approach can miss reporting the actual performance that real user payloads experience on production networks. This tool only reports performance metrics when the flow is operating at "max buffer usage". Max buffer usage is when the active flow has filled any and all buffers that exist along the packet path between the endpoints. 6 | 7 | User payload is used to measure latency and throughput. This accounts for the performance impact of transparent proxies, transparent tunnels, transparent firewalls, and all the other things that are not visible to the endpoints. It also simplifies the interpretation of retransmissions on user performance, which is non-intuitive at best. This is because some retransmissions are due to the real loss of user payload while many are not. In this tool, the loss of user payload will show up in the latency and throughput metrics, i.e. higher latencies and lower throughput. 8 | 9 | Features: 10 | 11 | * Latency, both unloaded and loaded, is measured by the same flow that is under test. 12 | 13 | Other tools will commonly measure latency using a different flow or different protocol. One of the reasons why using different protocols and/or different flows is not desirable is because fair queuing will cause the latency of those other flows to be much lower (better) than the flow that matters. 14 | 15 | * Throughput 16 | 17 | Both sender and receiver rates are collected, but the receiver rate (a.k.a. goodput) is the important one. 18 | 19 | * Bufferbloat is calculated 20 | 21 | It is often assumed that TCP receive buffers are the only source of bufferbloat. While that is common, it misses many other locations where bufferbloat may occur. 
This tool reports the effects of all sources of bufferbloat, not just TCP receive buffers. 22 | 23 | `bbperf` calculates both the BDP (bandwidth delay product) and the total amount of buffer actually used. The difference between those two is reported as "excess buffer usage". A small number for this metric is normal and expected, but a large number, relative to BDP, is bufferbloat. Bufferbloat also appears as a large difference between unloaded and loaded latency. 24 | 25 | * Both TCP and UDP are supported 26 | 27 | Both benchmark tests will wait until the flow has reached "max buffer usage" before collecting metrics data. For TCP, it will wait for the sending and receiving rates to match. For UDP, the sending rate will be automatically adjusted to be just above the maximum packet rate without dropping packets before starting its metrics collection. 28 | 29 | * `bbperf` measures the performance of data flow in one direction only. 30 | 31 | Network routing can be asymmetric, bottleneck links are asymmetric, bufferbloat is asymmetric, all of which means that performance is asymmetric. `bbperf` allows us to see the asymmetry. 32 | 33 | Data flow in `bbperf` is one way. The direction of data flow is from the client host to the server host (unless the `-R` option is specified). That is the direction being measured, and is what is reported in the metrics. 34 | 35 | Latency is measured round trip, but the return traffic (from the data receiver back to the data sender) is low-volume and should not contribute any bufferbloat-related latency to the measurement. This cannot be guaranteed, in the same way that it cannot be guaranteed that the unloaded latency measurement does not contain any bufferbloat-induced latency. But it does ensure that no bufferbloat-induced latency is caused by `bbperf`'s own flow. 36 | 37 | * Automatic generation of graphs 38 | 39 | ### Usage 40 | 41 | To run a test: 42 | 43 | 1. Start the server on one host 44 | ``` 45 | $ bbperf.py -s 46 | ``` 47 | 48 | 2.
Run the client on another host 49 | ``` 50 | $ bbperf.py -c SERVER_ADDR [additional options as desired] 51 | ``` 52 | 53 | `bbperf` will use port 5301 between the client and server (by default). 54 | 55 | The first few seconds perform a calibration, during which it captures the unloaded latency between endpoints. 56 | 57 | The direction of data flow is from the client to the server. That is reversed when the "-R" option is specified. 58 | 59 | The duration of this tool is non-deterministic. The time option (`-t`/`--time`) specifies how long to run _after_ valid data samples are observed. `bbperf` will automatically detect when it has enough data samples for the calibration, which establishes the unloaded latency value. It will also not collect data samples during initial ramp up of the flow. 60 | 61 | Should `bbperf` not detect any valid data samples for 60 seconds after calibration is complete, the tool will exit without results. An example of when that might happen is if the sending host is cpu constrained such that no bottleneck is created on the network.
62 | 63 | ``` 64 | $ bbperf.py --help 65 | usage: bbperf.py [-h] [-s] [-c SERVER_ADDR] [-p SERVER_PORT] [-u] [-R] [--max-ramp-time SECONDS] [-t SECONDS] [-v] [-q] [-J JSON_FILE] [-g] 66 | [--graph-file GRAPH_FILE] [-k] [-B BIND_ADDR] [--local-data-port LOCAL_DATA_PORT] [-C CC_ALGORITHM] 67 | 68 | bbperf: end to end performance and bufferbloat measurement tool 69 | 70 | options: 71 | -h, --help show this help message and exit 72 | -s, --server run in server mode 73 | -c SERVER_ADDR, --client SERVER_ADDR 74 | run in client mode (specify either DNS name or IP address) 75 | -p SERVER_PORT, --port SERVER_PORT 76 | server port (default: 5301) 77 | -u, --udp run in UDP mode (default: TCP mode) 78 | -R, --reverse data flow in download direction (server to client) 79 | --max-ramp-time SECONDS 80 | max duration in seconds before collecting data samples (tcp default: 5, udp default: 10) 81 | -t SECONDS, --time SECONDS 82 | duration in seconds to collect valid data samples (default: 20) 83 | -v, --verbosity increase output verbosity (can be repeated) 84 | -q, --quiet decrease output verbosity (can be repeated) 85 | -J JSON_FILE, --json-file JSON_FILE 86 | JSON output file 87 | -g, --graph generate graph and save in tmp file (requires gnuplot) 88 | --graph-file GRAPH_FILE 89 | generate graph and save in the specified file (requires gnuplot) 90 | -k, --keep keep data file 91 | -B BIND_ADDR, --bind BIND_ADDR 92 | bind server sockets to address 93 | --local-data-port LOCAL_DATA_PORT 94 | local port for data connection (default: ephemeral) 95 | -C CC_ALGORITHM, --congestion CC_ALGORITHM 96 | congestion control algorithm (default: cubic) 97 | ``` 98 | 99 | Output from `bbperf` includes the following information: 100 | ``` 101 | sent_time time when a packet was sent 102 | recv_time time when a packet was received 103 | sender_pps packets per second sent 104 | sender_Mbps bits per second sent 105 | receiver_pps packets per second received 106 | receiver_Mbps bits per second received 
107 | unloaded_rtt_ms unloaded RTT in milliseconds (determined during calibration) 108 | rtt_ms RTT in milliseconds 109 | BDP_bytes Calculated BDP in bytes 110 | buffered_bytes Actual bytes in flight 111 | bloat Ratio of buffered bytes to BDP 112 | pkts_dropped number of packets dropped (UDP only) 113 | drop% percentage of packets dropped (UDP only) 114 | ``` 115 | 116 | Output to standard out is controlled via the `--verbosity` and `--quiet` options as follows: 117 | ``` 118 | -qq nothing to stdout except errors 119 | -q run summary in json format only (no interval output) 120 | (neither option) progress update once per second plus run summary in json format (default) 121 | -v plus one-time messages showing progress setting up and running the test 122 | -vv plus rate change events (udp only) 123 | -vvv plus interval output at the rate of one per 0.1 seconds 124 | -vvvv plus all control connection messages 125 | ``` 126 | 127 | ### Installation 128 | 129 | `bbperf` is available via PyPI repository (pypi.org) and can be installed using pip. 130 | 131 | ``` 132 | python3 -m venv bbperf-venv 133 | . bbperf-venv/bin/activate 134 | pip install bbperf 135 | 136 | bbperf.py [options] 137 | ``` 138 | 139 | In the event python3 is not already installed on the host: 140 | 141 | ``` 142 | apt-get install python3 python3-pip (Debian/Ubuntu) 143 | dnf install python3 python3-pip (Fedora/RHEL) 144 | ``` 145 | 146 | --- 147 | Copyright (c) 2024 Cloudflare, Inc.
def server_mainline(args):
    """Server entry point: accept clients in an infinite loop and run one
    bbperf test per client.

    For each client the server:
      1. accepts the TCP control connection and performs the run_id handshake,
      2. receives the client's args (JSON over the control connection) and
         switches to using those args for the rest of the run,
      3. accepts the data connection (UDP or TCP), validating it against the
         run_id so a stray connection cannot be paired with this test,
      4. spawns worker processes depending on direction (-R / reverse),
      5. waits for the workers to finish (with a failsafe wall-clock timeout),
         then cleans up and goes back to accepting the next client.

    Parameters:
        args: argparse.Namespace from the server command line (bind address,
              port, verbosity, ...). Note that most per-run settings come from
              the *client's* args received over the control connection.

    Never returns under normal operation (infinite accept loop).
    Raises Exception if a worker process fails to become ready or the
    failsafe run-time limit is exceeded.
    """
    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # allow quick server restarts without waiting for TIME_WAIT to expire
    listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    server_addr = (args.bind, args.port)

    print("binding tcp control socket to local address {}".format(server_addr), flush=True)
    listen_sock.bind(server_addr)

    listen_sock.listen(32) # listen backlog
    listen_sock.setblocking(True)

    server_port = listen_sock.getsockname()[1]

    # one client served at a time; loop forever accepting clients
    while True:
        print("server listening on port {}".format(server_port), flush=True)

        # accept control connection

        # blocking
        control_sock, _ = listen_sock.accept()

        client_control_addr = control_sock.getpeername()

        print("client connected (control socket): client addr {}, server addr {}".format(
            client_control_addr, server_addr), flush=True)

        control_conn = TcpControlConnectionClass(control_sock)
        control_conn.set_args(args)

        # used to report total setup latency once the test starts
        curr_client_start_time = time.time()

        # handshake: client sends "control <uuid>"; uuid identifies this run
        run_id = control_conn.wait_for_control_initial_string()

        control_conn.send_control_initial_ack()

        client_args = control_conn.wait_for_args_from_client()

        control_conn.send_control_args_ack()

        # from here on, the client's args drive the run (direction, udp, verbosity, ...)
        control_conn.set_args(client_args)

        # accept data connection

        # "data " + uuid of 36 characters
        len_data_connection_initial_string = 5 + 36

        if client_args.udp:
            # data connection is udp
            if client_args.verbosity:
                print("creating udp data connection", flush=True)

            # unconnected socket to catch just the first packet
            # we need to do it this way so we can figure out the client addr for our connected socket
            data_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

            print("binding udp data socket to local address {}".format(server_addr), flush=True)
            data_sock.bind(server_addr)
            data_sock.settimeout(const.SOCKET_TIMEOUT_SEC)
            if client_args.verbosity:
                print("created udp data connection, no client addr, server addr {}".format(server_addr), flush=True)

            if client_args.verbosity:
                print("waiting to receive data initial string", flush=True)

            # first datagram reveals the client's data-path address
            payload_bytes, client_data_addr = data_sock.recvfrom(len_data_connection_initial_string)
            payload_str = payload_bytes.decode()

            if client_args.verbosity:
                print("received data initial string: client data addr: {} string: {}".format(client_data_addr, payload_str), flush=True)

            # check run_id
            util.validate_data_connection(client_args, run_id, payload_str)

            if client_args.verbosity:
                print("sending data initial ack (async udp)", flush=True)

            # start and keep sending the data initial ack asynchronously
            # (UDP is lossy, so the ack is repeated until doneevent is set)
            readyevent = multiprocessing.Event()
            doneevent = multiprocessing.Event()
            udp_data_initial_ack_sender_process = multiprocessing.Process(
                name = "udpdatainitialacksender",
                target = udp_string_sender_thread.run,
                args = (readyevent, doneevent, client_args, data_sock, client_data_addr, const.UDP_DATA_INITIAL_ACK),
                daemon = True)
            udp_data_initial_ack_sender_process.start()
            if not readyevent.wait(timeout=60):
                raise Exception("ERROR: process failed to become ready")

        else:
            # data connection is tcp
            if client_args.verbosity:
                print("creating data connection (tcp), waiting for accept", flush=True)

            data_sock, _ = listen_sock.accept()
            data_sock.settimeout(const.SOCKET_TIMEOUT_SEC)
            tcp_helper.set_congestion_control(client_args, data_sock)
            tcp_helper.set_tcp_notsent_lowat(data_sock)
            client_data_addr = data_sock.getpeername()
            if client_args.verbosity:
                print("accepted tcp data connection, client {}, server {}".format(
                    client_data_addr, server_addr), flush=True)

            if client_args.verbosity:
                print("waiting to receive data initial string", flush=True)

            payload_bytes = tcp_helper.recv_exact_num_bytes(data_sock, len_data_connection_initial_string)
            payload_str = payload_bytes.decode()

            if client_args.verbosity:
                print("received data initial string: {}".format(payload_str), flush=True)

            # check run_id
            util.validate_data_connection(client_args, run_id, payload_str)


        # shared between this process and the worker processes
        shared_run_mode = multiprocessing.Value('i', const.RUN_MODE_CALIBRATING)
        shared_udp_sending_rate_pps = multiprocessing.Value('i', const.UDP_DEFAULT_INITIAL_RATE)

        if client_args.reverse:
            # direction down
            # (server sends data; it also receives rate/mode commands on the
            # control connection via the control receiver process)

            control_conn.send_setup_complete_message()

            readyevent = multiprocessing.Event()

            control_receiver_process = multiprocessing.Process(
                name = "controlreceiver",
                target = control_receiver_thread.run_recv_term_send,
                args = (readyevent, client_args, control_conn, shared_run_mode, shared_udp_sending_rate_pps),
                daemon = True)

            data_sender_process = multiprocessing.Process(
                name = "datasender",
                target = data_sender_thread.run,
                args = (client_args, data_sock, client_data_addr, shared_run_mode, shared_udp_sending_rate_pps),
                daemon = True)

            # do not start sending data until the client says it is ready
            control_conn.wait_for_start_message()

            if client_args.udp:
                # stop sending UDP data init acks
                if client_args.verbosity:
                    print("stopping sending udp data initial acks to client", flush=True)
                doneevent.set()

            control_receiver_process.start()
            if not readyevent.wait(timeout=60):
                raise Exception("ERROR: process failed to become ready")

            data_sender_process.start()

            thread_list = []
            thread_list.append(control_receiver_process)
            thread_list.append(data_sender_process)

        else:
            # direction up
            # (server receives data and reports samples back over the control conn)
            # NOTE(review): in the udp up direction, doneevent is not set here;
            # presumably the ack sender is stopped elsewhere once data flows —
            # confirm against udp_string_sender_thread / data_receiver_thread.

            readyevent = multiprocessing.Event()

            data_receiver_process = multiprocessing.Process(
                name = "datareceiver",
                target = data_receiver_thread.run,
                args = (readyevent, client_args, control_conn, data_sock, client_data_addr),
                daemon = True)

            data_receiver_process.start()
            if not readyevent.wait(timeout=60):
                raise Exception("ERROR: process failed to become ready")

            thread_list = []
            thread_list.append(data_receiver_process)

            control_conn.send_setup_complete_message()

        print("test running, {} {}, control conn addr {}, data conn addr {}, server addr {}, elapsed startup time {} seconds".format(
            "udp" if client_args.udp else "tcp",
            "down" if client_args.reverse else "up",
            client_control_addr,
            client_data_addr,
            server_addr,
            (time.time() - curr_client_start_time)),
            flush=True)

        start_time_sec = time.time()

        # monitor the workers; enforce the failsafe wall-clock limit
        while True:
            if util.threads_are_running(thread_list):
                time.sleep(0.01)
            else:
                break

            curr_time_sec = time.time()

            if ((curr_time_sec - start_time_sec) > client_args.max_run_time_failsafe_sec):
                raise Exception("ERROR: max_run_time_failsafe_sec exceeded")

        if client_args.verbosity:
            print("test finished, cleaning up", flush=True)

        util.done_with_socket(data_sock)
        control_conn.close()

        print("client ended", flush=True)
    # --- control-protocol handshake and framed-receive methods ---

    def wait_for_control_initial_string(self):
        """Server side: block until the client's "control <uuid>" greeting
        arrives, then return the 36-character run_id (the uuid portion).
        """

        print("waiting to receive control initial string from client", flush=True)

        # "control " + uuid of 36 characters
        len_str = 8 + 36

        received_bytes = self.recv_exact_num_bytes(len_str)

        received_str = received_bytes.decode()

        # strip the "control " prefix; the remainder is the run_id
        uuid = received_str[8:]

        print("received control initial string: run_id: {}".format(uuid), flush=True)

        return uuid


    def send_control_initial_ack(self):
        """Server side: acknowledge the client's control greeting."""

        print("sending control initial ack", flush=True)

        self.send_string(const.TCP_CONTROL_INITIAL_ACK)

        print("sent control initial ack", flush=True)


    def wait_for_control_initial_ack(self):
        """Client side: block until the server acks the control greeting.

        Raises Exception if the received bytes are not the expected ack.
        """

        if self.args.verbosity:
            print("waiting for control initial ack", flush=True)

        received_bytes = self.recv_exact_num_bytes(len(const.TCP_CONTROL_INITIAL_ACK))

        received_str = received_bytes.decode()

        if received_str != const.TCP_CONTROL_INITIAL_ACK:
            raise Exception("ERROR: received invalid control initial ack: {}".format(received_str))

        if self.args.verbosity:
            print("received control initial ack", flush=True)


    def send_args_to_server(self, args):
        """Client side: serialize the argparse namespace as JSON and send it
        over the control connection.

        args: argparse.Namespace to transmit (may differ from self.args).
        """

        if self.args.verbosity:
            print("sending args to server: {}".format(vars(args)), flush=True)

        args_json = json.dumps(vars(args))

        self.send_string(args_json)

        if self.args.verbosity:
            print("sent args to server", flush=True)


    def wait_for_args_from_client(self):
        """Server side: receive the client's JSON-encoded args and rebuild an
        argparse.Namespace from them.

        NOTE(review): the first '}' is treated as end-of-payload, which assumes
        the JSON object is flat (argparse values are scalars) — confirm if
        nested values are ever added to args.
        """

        print("waiting for args from client", flush=True)

        # starts with "{" and ends with "}"

        substr_idx = self.recv_into_buffer_until_substr_found(b'}')

        # consume the JSON object from the buffer, keep any trailing bytes
        received_bytes = self.read_buffer[ 0 : substr_idx + 1 ]
        self.read_buffer = self.read_buffer[ substr_idx + 1 : ]

        received_str = received_bytes.decode()

        args_d = json.loads(received_str)

        # recreate args as if it came directly from argparse
        args = argparse.Namespace(**args_d)

        print("received args from client: {}".format(vars(args)), flush=True)

        return args


    def send_control_args_ack(self):
        """Server side: acknowledge receipt of the client's args."""

        print("sending control args ack", flush=True)

        self.send_string(const.TCP_CONTROL_ARGS_ACK)

        print("sent control args ack", flush=True)


    def wait_for_control_args_ack(self):
        """Client side: block until the server acks the args message.

        Raises Exception if the received bytes are not the expected ack.
        """

        if self.args.verbosity:
            print("waiting for control args ack", flush=True)

        received_bytes = self.recv_exact_num_bytes(len(const.TCP_CONTROL_ARGS_ACK))

        received_str = received_bytes.decode()

        if received_str != const.TCP_CONTROL_ARGS_ACK:
            raise Exception("ERROR: received invalid control args ack: {}".format(received_str))

        if self.args.verbosity:
            print("received control args ack", flush=True)


    def send_setup_complete_message(self):
        """Server side: tell the client that connection setup is finished."""

        if self.args.verbosity:
            print("sending setup complete message to client", flush=True)

        self.send_string(const.SETUP_COMPLETE_MSG)

        if self.args.verbosity:
            print("sent setup complete message to client", flush=True)


    def wait_for_setup_complete_message(self):
        """Client side: block until the server reports setup complete.

        Raises Exception if the received bytes are not the expected message.
        """

        if self.args.verbosity:
            print("waiting for connection setup complete message from server", flush=True)

        received_bytes = self.recv_exact_num_bytes(len(const.SETUP_COMPLETE_MSG))

        received_str = received_bytes.decode()

        if received_str != const.SETUP_COMPLETE_MSG:
            raise Exception("ERROR: client_mainline: setup complete message was not received")

        if self.args.verbosity:
            print("connection setup complete message received from server", flush=True)


    def send_start_message(self):
        """Client side: tell the server to begin the test."""

        if self.args.verbosity:
            print("sending start message to server", flush=True)

        self.send_string(const.START_MSG)

        if self.args.verbosity:
            print("sent start message to server", flush=True)


    def wait_for_start_message(self):
        """Server side: block until the client sends the start message.

        Raises Exception if the received bytes are not the expected message.
        """

        if self.args.verbosity:
            print("waiting for start message from client", flush=True)

        received_bytes = self.recv_exact_num_bytes(len(const.START_MSG))

        received_str = received_bytes.decode()

        if received_str != const.START_MSG:
            raise Exception("ERROR: failed to receive start message")

        if self.args.verbosity:
            print("received start message from client", flush=True)


    def recv(self, max_bytes_to_read):
        """Read up to max_bytes_to_read from the control socket and append the
        bytes to self.read_buffer.

        Uses select() with a timeout so the recv() cannot hang indefinitely.
        Raises Exception on timeout; PeerDisconnectedException on EOF.
        """
        # block here because we don't want the recv() to block indefinitely
        rlist, _, _ = select.select( [self.control_sock], [], [], const.SOCKET_TIMEOUT_SEC)

        if len(rlist) == 0:
            raise Exception("ERROR: select() timed out")

        recv_bytes = self.control_sock.recv(max_bytes_to_read)

        # zero-length read on a TCP socket means the peer closed the connection
        if len(recv_bytes) == 0:
            raise PeerDisconnectedException()

        self.read_buffer.extend(recv_bytes)


    def recv_into_buffer_until_minimum_size(self, minimum_buffer_size):
        """Keep receiving until self.read_buffer holds at least
        minimum_buffer_size bytes (may already be satisfied on entry).
        """

        while len(self.read_buffer) < minimum_buffer_size:

            num_bytes_remaining = minimum_buffer_size - len(self.read_buffer)

            self.recv(num_bytes_remaining)


    def recv_exact_num_bytes(self, exact_num_bytes_to_read):
        """Return exactly exact_num_bytes_to_read bytes from the stream,
        consuming them from self.read_buffer (receiving more as needed).
        """

        self.recv_into_buffer_until_minimum_size(exact_num_bytes_to_read)

        received_bytes = self.read_buffer[ 0 : exact_num_bytes_to_read ]
        self.read_buffer = self.read_buffer[ exact_num_bytes_to_read : ]

        return received_bytes


    def recv_into_buffer_until_substr_found(self, substr_bytes):
        """Receive until substr_bytes appears in self.read_buffer; return the
        index of its first occurrence (bytes are left in the buffer).
        """

        while True:

            substr_idx = self.read_buffer.find(substr_bytes)
            if substr_idx > -1:
                # found
                break

            self.recv(const.BUFSZ)

        return substr_idx


    def recv_a_c_block(self):
        """Receive one record framed as " a ... c " from the control stream
        and return it (including the framing markers).

        Raises Exception if the consumed bytes are not properly framed.
        """
        start_bytes = b' a '
        end_bytes = b' c '

        substr_idx = self.recv_into_buffer_until_substr_found(end_bytes)

        # consume through the 3-byte end marker
        received_bytes = self.read_buffer[ 0 : substr_idx + 3 ]
        self.read_buffer = self.read_buffer[ substr_idx + 3 : ]

        if not (received_bytes.startswith(start_bytes) and received_bytes.endswith(end_bytes)):
            raise Exception("recv_a_c_block failed")

        return received_bytes


    def recv_a_d_block(self):
        """Receive one record framed as " a ... d " from the control stream
        and return it (including the framing markers).

        Raises Exception if the consumed bytes are not properly framed.
        """
        start_bytes = b' a '
        end_bytes = b' d '

        substr_idx = self.recv_into_buffer_until_substr_found(end_bytes)

        # consume through the 3-byte end marker
        received_bytes = self.read_buffer[ 0 : substr_idx + 3 ]
        self.read_buffer = self.read_buffer[ substr_idx + 3 : ]

        if not (received_bytes.startswith(start_bytes) and received_bytes.endswith(end_bytes)):
            raise Exception("recv_a_d_block failed")

        return received_bytes


    def close(self):
        """Shut down and close the underlying control socket."""
        util.done_with_socket(self.control_sock)
import graph
from . import tcp_helper
from . import udp_helper

from .tcp_control_connection_class import TcpControlConnectionClass


def _wait_until_ready(readyevent, timeout_sec=60):
    # Helper: block until a child process signals readiness via its Event,
    # raising if it never comes up.
    if not readyevent.wait(timeout=timeout_sec):
        raise Exception("ERROR: process failed to become ready")


def client_mainline(args):
    """Run the client side of a bbperf test.

    Establishes the TCP control connection and the (TCP or UDP) data
    connection, starts the sender/receiver worker processes for the
    requested direction, drains the results queue to stdout, and finally
    generates graph/raw-data output files.
    """
    client_start_time = time.time()

    if args.verbosity:
        print("args: {}".format(args), flush=True)

    if not args.client:
        # fail fast with a clear message instead of a NameError on
        # server_ip below when no server host was supplied
        raise Exception("ERROR: no server host specified")

    try:
        # is the arg already an IP address?
        ipaddress.ip_address(args.client)
        server_ip = args.client

    except ValueError:
        # not an ip address, must be a hostname
        try:
            server_ip = socket.gethostbyname(args.client)

        except socket.gaierror as e:
            raise Exception("ERROR: unable to resolve hostname {}, {}".format(args.client, e))

    server_port = args.port
    server_addr = (server_ip, server_port)

    # create control connection

    if args.verbosity:
        print("creating control connection to server at {}".format(server_addr), flush=True)

    control_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    control_sock.connect(server_addr)

    client_control_addr = control_sock.getsockname()

    if args.verbosity:
        print("created control connection, client {}, server {}".format(
            client_control_addr, server_addr), flush=True)

    control_conn = TcpControlConnectionClass(control_sock)
    control_conn.set_args(args)

    # generate a random UUID (36 character string) identifying this run
    run_id = str(uuid.uuid4())

    control_conn.send_control_initial_string(run_id)

    control_conn.wait_for_control_initial_ack()

    control_conn.send_args_to_server(args)

    control_conn.wait_for_control_args_ack()

    # create data connection

    if args.verbosity:
        print("creating data connection to server at {}".format(server_addr), flush=True)

    data_initial_string = "data " + run_id

    if args.udp:
        data_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # bind client data connection to specific local port
        if args.local_data_port > 0:
            data_sock.bind(('0.0.0.0', args.local_data_port))
        data_sock.settimeout(const.SOCKET_TIMEOUT_SEC)
        # must send something just to bind a local addr
        # this packet is not used by the server
        data_sock.sendto("foo".encode(), (server_ip, 65535))
        client_data_addr = data_sock.getsockname()
        if args.verbosity:
            print("created udp data connection, client {}, no server addr".format(client_data_addr), flush=True)

        if args.verbosity:
            print("sending data initial string (async udp): {}".format(data_initial_string), flush=True)

        # start and keep sending the data connection initial string asynchronously
        # (UDP is lossy, so the string is repeated until the server acks)
        readyevent = multiprocessing.Event()
        doneevent = multiprocessing.Event()
        udp_data_initial_string_sender_process = multiprocessing.Process(
            name = "udpdatainitialstringsender",
            target = udp_string_sender_thread.run,
            args = (readyevent, doneevent, args, data_sock, server_addr, data_initial_string),
            daemon = True)
        udp_data_initial_string_sender_process.start()
        _wait_until_ready(readyevent)

        if args.verbosity:
            print("waiting for data initial ack", flush=True)

        # wait for data init ack
        udp_helper.wait_for_string(data_sock, server_addr, const.UDP_DATA_INITIAL_ACK)

        if args.verbosity:
            print("received data initial ack", flush=True)

        # stop sending data initial string
        doneevent.set()

    else:
        data_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # bind client data connection to specific local port
        if args.local_data_port > 0:
            data_sock.bind(('0.0.0.0', args.local_data_port))
        tcp_helper.set_congestion_control(args, data_sock)
        tcp_helper.set_tcp_notsent_lowat(data_sock)
        data_sock.connect(server_addr)
        data_sock.settimeout(const.SOCKET_TIMEOUT_SEC)
        client_data_addr = data_sock.getsockname()
        if args.verbosity:
            print("created tcp data connection, client {}, server {}".format(
                client_data_addr, server_addr), flush=True)

        if args.verbosity:
            print("sending data initial string (tcp): {}".format(data_initial_string), flush=True)
        data_sock.sendall(data_initial_string.encode())
        if args.verbosity:
            print("sent data initial string (tcp)", flush=True)

    control_conn.wait_for_setup_complete_message()

    # shared state between worker processes
    shared_run_mode = multiprocessing.Value('i', const.RUN_MODE_CALIBRATING)
    shared_udp_sending_rate_pps = multiprocessing.Value('i', const.UDP_DEFAULT_INITIAL_RATE)
    control_receiver_results_queue = multiprocessing.Queue()

    if args.reverse:
        # direction down: server sends, client receives

        readyevent = multiprocessing.Event()

        data_receiver_process = multiprocessing.Process(
            name = "datareceiver",
            target = data_receiver_thread.run,
            args = (readyevent, args, control_conn, data_sock, server_addr),
            daemon = True)

        data_receiver_process.start()
        _wait_until_ready(readyevent)

        readyevent = multiprocessing.Event()

        control_receiver_process = multiprocessing.Process(
            name = "controlreceiver",
            target = control_receiver_thread.run_recv_queue,
            args = (readyevent, args, control_conn, control_receiver_results_queue),
            daemon = True)

        control_receiver_process.start()
        _wait_until_ready(readyevent)

        # test starts here

        control_conn.send_start_message()

        thread_list = []
        thread_list.append(data_receiver_process)
        thread_list.append(control_receiver_process)


    else:
        # direction up: client sends, server receives

        readyevent = multiprocessing.Event()

        control_receiver_process = multiprocessing.Process(
            name = "controlreceiver",
            target = control_receiver_thread.run_recv_term_queue,
            args = (readyevent, args, control_conn, control_receiver_results_queue, shared_run_mode, shared_udp_sending_rate_pps),
            daemon = True)

        control_receiver_process.start()
        _wait_until_ready(readyevent)

        data_sender_process = multiprocessing.Process(
            name = "datasender",
            target = data_sender_thread.run,
            args = (args, data_sock, server_addr, shared_run_mode, shared_udp_sending_rate_pps),
            daemon = True)

        # test starts here
        data_sender_process.start()

        thread_list = []
        thread_list.append(control_receiver_process)
        thread_list.append(data_sender_process)


    if args.verbosity:
        print("test running, {} {}, control conn addr {}, data conn addr {}, server addr {}, elapsed startup time {} seconds".format(
            "udp" if args.udp else "tcp",
            "down" if args.reverse else "up",
            client_control_addr,
            client_data_addr,
            server_addr,
            (time.time() - client_start_time)),
            flush=True)

    # output loop

    output.init(args)

    start_time_sec = time.time()

    while True:
        # Failsafe first: previously this check sat after a `continue`, so a
        # continuously busy results queue would starve it and the run could
        # exceed the hard limit indefinitely.
        if ((time.time() - start_time_sec) > args.max_run_time_failsafe_sec):
            raise Exception("ERROR: max_run_time_failsafe_sec exceeded")

        try:
            s1 = control_receiver_results_queue.get_nowait()
        except queue.Empty:
            s1 = None

        if s1:
            output.print_output(s1)
            continue

        if util.threads_are_running(thread_list):
            # nothing in queues, but test is still running
            time.sleep(0.01)
        else:
            break

    if args.verbosity:
        print("test finished, generating output", flush=True)

    output.term()

    util.done_with_socket(data_sock)
    control_conn.close()

    graphdatafilename = output.get_graph_data_file_name()
    rawdatafilename = output.get_raw_data_file_name()

    if (args.graph or args.graph_file) and not args.quiet:
        pngfilename = graphdatafilename + ".png"

        graph.create_graph(args, graphdatafilename, pngfilename)

        if args.graph_file:
            try:
                # move the pngfile to the user specified destination
                shutil.move(pngfilename, args.graph_file)
                print("created graph: {}".format(args.graph_file), flush=True)

            except Exception as e:
                print("ERROR: during move of graph png file: {}".format(e), flush=True)

        else:
            print("created graph: {}".format(pngfilename), flush=True)

    if args.keep and not args.quiet:
        print("keeping graph data file: {}".format(graphdatafilename), flush=True)
        print("keeping raw data file: {}".format(rawdatafilename), flush=True)
    else:
        output.delete_data_files()

    if args.verbosity:
        print("test complete, exiting")