├── src └── bbperf │ ├── __init__.py │ ├── .gitignore │ ├── exceptions.py │ ├── udp_string_sender_thread.py │ ├── tcp-graph.gp │ ├── const.py │ ├── graph.py │ ├── data_sample_evaluator_class.py │ ├── udp_helper.py │ ├── udp-graph.gp │ ├── tcp_helper.py │ ├── udp_rate_manager_class.py │ ├── run_mode_manager_class.py │ ├── bbperf.py │ ├── json_output_class.py │ ├── data_receiver_thread.py │ ├── util.py │ ├── data_sender_thread.py │ ├── control_receiver_thread.py │ ├── output.py │ ├── server.py │ ├── tcp_control_connection_class.py │ └── client.py ├── .gitignore ├── bbperf.service ├── LICENSE ├── pyproject.toml ├── tests └── run_test.sh ├── DEPLOY.txt └── README.md /src/bbperf/__init__.py: -------------------------------------------------------------------------------- 1 | -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- 1 | dist 2 | .venv 3 | .vscode 4 | -------------------------------------------------------------------------------- /src/bbperf/.gitignore: -------------------------------------------------------------------------------- 1 | __pycache__ 2 | .vscode 3 | .venv* 4 | -------------------------------------------------------------------------------- /src/bbperf/exceptions.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 
2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | class PeerDisconnectedException(Exception): 5 | pass 6 | -------------------------------------------------------------------------------- /bbperf.service: -------------------------------------------------------------------------------- 1 | [Unit] 2 | Description=bbperf 3 | After=syslog.target network.target 4 | 5 | # StartLimitIntervalSec StartLimitBurst 6 | # stop restarting if fails X times with Y interval (zero disables) 7 | # default 5 times in 10 seconds 8 | StartLimitIntervalSec=0 9 | 10 | [Service] 11 | Type=simple 12 | User=mfreemon 13 | ExecStart=/home/mfreemon/.venv-bbperf/bin/bbperf -s 14 | Restart=always 15 | 16 | # RestartSec 17 | # wait time until restart 18 | RestartSec=2 19 | 20 | [Install] 21 | WantedBy=multi-user.target 22 | 23 | -------------------------------------------------------------------------------- /LICENSE: -------------------------------------------------------------------------------- 1 | Copyright (c) 2024 Cloudflare, Inc. 2 | 3 | Licensed under the Apache License, Version 2.0 (the "License"); 4 | you may not use this file except in compliance with the License. 5 | You may obtain a copy of the License at 6 | 7 | http://www.apache.org/licenses/LICENSE-2.0 8 | 9 | Unless required by applicable law or agreed to in writing, software 10 | distributed under the License is distributed on an "AS IS" BASIS, 11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | See the License for the specific language governing permissions and 13 | limitations under the License. 
14 | 15 | -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- 1 | [build-system] 2 | requires = ["hatchling >= 1.26"] 3 | build-backend = "hatchling.build" 4 | 5 | [project] 6 | name = "bbperf" 7 | version = "0.0.30" 8 | authors = [ 9 | { name="Mike Freemon", email="mfreemon@cloudflare.com" }, 10 | ] 11 | description = "An end-to-end performance and bufferbloat measurement tool" 12 | readme = "README.md" 13 | requires-python = ">=3.9" 14 | classifiers = [ 15 | "Programming Language :: Python :: 3", 16 | "Operating System :: OS Independent", 17 | "Environment :: Console", 18 | "License :: OSI Approved :: Apache Software License", 19 | "Topic :: System :: Networking", 20 | ] 21 | dependencies = [ 22 | "numpy", 23 | ] 24 | license = "Apache-2.0" 25 | license-files = ["LICEN[CS]E*"] 26 | 27 | [project.urls] 28 | Homepage = "https://github.com/cloudflare/bbperf" 29 | Issues = "https://github.com/cloudflare/bbperf/issues" 30 | 31 | [project.scripts] 32 | bbperf = "bbperf.bbperf:mainline" 33 | -------------------------------------------------------------------------------- /tests/run_test.sh: -------------------------------------------------------------------------------- 1 | #!/bin/bash 2 | 3 | # Copyright (c) 2024 Cloudflare, Inc. 4 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 5 | 6 | do_run() { 7 | ARGS=$1 8 | 9 | #bbperf -c $ARGS 10 | sudo ip netns exec ns1 bash -c ". 
$HOME/.venv-314/bin/activate ; cd $HOME/bbperf/src ; python3 -m bbperf.bbperf $ARGS" 11 | } 12 | 13 | #SERVER_ADDR=127.0.0.1 14 | SERVER_ADDR=10.66.30.2 15 | 16 | EXTRAARGS="-v -t 10" 17 | 18 | set -x 19 | 20 | do_run "-c $SERVER_ADDR $EXTRAARGS" 21 | 22 | do_run "-c $SERVER_ADDR $EXTRAARGS -R" 23 | 24 | do_run "-c $SERVER_ADDR $EXTRAARGS -u" 25 | 26 | do_run "-c $SERVER_ADDR $EXTRAARGS -u -R" 27 | 28 | EXTRAARGS="-t 10" 29 | 30 | do_run "-c $SERVER_ADDR $EXTRAARGS" 31 | 32 | do_run "-c $SERVER_ADDR $EXTRAARGS -R" 33 | 34 | do_run "-c $SERVER_ADDR $EXTRAARGS -u" 35 | 36 | do_run "-c $SERVER_ADDR $EXTRAARGS -u -R" 37 | 38 | do_run "-c $SERVER_ADDR $EXTRAARGS -J /tmp/foo578439759837.out" 39 | 40 | head /tmp/foo578439759837.out 41 | tail /tmp/foo578439759837.out 42 | sudo rm /tmp/foo578439759837.out 43 | 44 | -------------------------------------------------------------------------------- /src/bbperf/udp_string_sender_thread.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import time 5 | 6 | from . 
# falling off the end of this method terminates the process
def run(readyevent, doneevent, args, data_sock, peer_addr, string_to_send):
    """Periodically send a short UDP string to the peer.

    Used as a thread/process body: signals readiness via readyevent, then
    sends string_to_send every ping_interval_sec until doneevent is set or
    roughly ping_duration_sec worth of pings have gone out.
    """
    if args.verbosity:
        print("udp string sender thread: start of process", flush=True)

    ping_interval_sec = 0.2
    ping_duration_sec = 5
    total_pings_to_send = ping_duration_sec / ping_interval_sec

    # the payload never changes, so encode it once up front
    payload = string_to_send.encode()
    sent_so_far = 0

    # tell the parent we are up and running
    readyevent.set()

    while not doneevent.is_set():
        udp_helper.sendto(data_sock, peer_addr, payload)
        sent_so_far += 1

        time.sleep(ping_interval_sec)

        if sent_so_far > total_pings_to_send:
            break

    if args.verbosity:
        print("udp string sender thread: end of process", flush=True)
testpypi dist/* 46 | 47 | To view: 48 | https://test.pypi.org 49 | 50 | To install from test PyPI: 51 | python3 -m pip install --upgrade --index-url https://test.pypi.org/simple/ --nodeps bbperf 52 | 53 | Installing from the local git repository: 54 | 55 | python3 -m pip install /path/to/bbperf/git/repo/dir <-- the directory with the pyproject.toml file 56 | 57 | -------------------------------------------------------------------------------- /src/bbperf/tcp-graph.gp: -------------------------------------------------------------------------------- 1 | #!/usr/bin/gnuplot 2 | 3 | # Copyright (c) 2024 Cloudflare, Inc. 4 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 5 | 6 | #datafile1 = "/tmp/bbperf-tcp-data-j9xh25q3" 7 | 8 | pngfile1 = datafile1.".png" 9 | 10 | set grid 11 | 12 | set key right top 13 | set key box opaque 14 | 15 | set style data lines 16 | 17 | # noenhanced to avoid need to escape underscores in labels 18 | set terminal pngcairo size 1200,1000 noenhanced 19 | set output pngfile1 20 | 21 | # generate stats for column 2 22 | # nooutput - do not sent to "screen" 23 | # name - prefix 24 | stats datafile1 using 1 nooutput name "XRANGE" 25 | 26 | set multiplot title graphtitle layout 3,1 27 | 28 | set lmargin 12 29 | 30 | # dt 1 (solid), dt 2 (dotted), dt 4 (dot dash) 31 | # lc 1 (purple), lc 4 (orange), lc 6 (blue), lc 7 (red), lc 8 (black) 32 | 33 | set ylabel "Mbps" 34 | 35 | plot datafile1 using ($1-XRANGE_min):6 title "receiver throughput (L7)" lw 2 lc 6, \ 36 | "" using ($1-XRANGE_min):4 title "sender throughput (L7)" lw 2 lc 1 37 | 38 | set ylabel "ms" 39 | 40 | plot datafile1 using ($1-XRANGE_min):7 title "unloaded RTT (L7)" lw 2 lc 1, \ 41 | "" using ($1-XRANGE_min):8 title "RTT (L7)" lw 2 lc 6 42 | 43 | set ylabel "bytes" 44 | 45 | plot datafile1 using ($1-XRANGE_min):9 title "BDP" lw 2 lc 1, \ 46 | "" using ($1-XRANGE_min):10 title "buffered data" lw 2 lc 6 47 | 48 | unset 
multiplot 49 | 50 | -------------------------------------------------------------------------------- /src/bbperf/const.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | BBPERF_VERSION = "0.0.30" 5 | 6 | SERVER_PORT = 5301 7 | 8 | DEFAULT_VALID_DATA_COLLECTION_TIME_SEC = 20 9 | 10 | # max duration for calibration phase 11 | MAX_DURATION_CALIBRATION_TIME_SEC = 20 12 | 13 | # cap the amount of time we will wait for valid data 14 | MAX_DATA_COLLECTION_TIME_WITHOUT_VALID_DATA = 60 15 | 16 | # ignore incoming data for this amount of time after starting data collection phase 17 | DATA_SAMPLE_IGNORE_TIME_ALWAYS_SEC = 1 18 | DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC = 5 19 | DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC = 10 20 | 21 | # for socket recv() 22 | BUFSZ = (128 * 1024) 23 | 24 | PAYLOAD_1K = b'a'*1024 25 | PAYLOAD_128K = b'a'*(128 * 1024) 26 | 27 | RUN_MODE_CALIBRATING = 1 28 | RUN_MODE_RUNNING = 2 29 | RUN_MODE_STOP = 3 30 | 31 | SAMPLE_INTERVAL_SEC = 0.1 32 | STDOUT_INTERVAL_SEC = 1 33 | 34 | # pacing for UDP sends 35 | UDP_BATCHES_PER_SECOND = 1000 36 | UDP_DELAY_BETWEEN_BATCH_STARTS = (1.0 / UDP_BATCHES_PER_SECOND) 37 | UDP_NEGATIVE_DELAY_BETWEEN_BATCHES_WARNING_EVERY = UDP_BATCHES_PER_SECOND 38 | 39 | SETUP_COMPLETE_MSG = "setup complete" 40 | START_MSG = " start " 41 | UDP_STOP_MSG = "stop" 42 | TCP_CONTROL_INITIAL_ACK = "control initial ack" 43 | TCP_CONTROL_ARGS_ACK = "control args ack" 44 | UDP_DATA_INITIAL_ACK = "data initial ack" 45 | 46 | SOCKET_TIMEOUT_SEC=30 47 | 48 | UDP_DEFAULT_INITIAL_RATE = 8000 49 | 50 | UDP_MIN_RATE = 100 51 | UDP_MAX_RATE = 800000 52 | -------------------------------------------------------------------------------- /src/bbperf/graph.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 
import os
import argparse
import subprocess


def create_graph(args, datafile1, pngfilename):
    """Run gnuplot to render a graph of the captured data file.

    args        -- parsed command line args; reads .graph_file, .udp, .verbosity
    datafile1   -- path of the data file to plot
    pngfilename -- output png path; shown in the graph title when no explicit
                   --graph-file was given
    """
    # prefer the user-specified graph file name in the title
    if args.graph_file:
        filename_in_title = args.graph_file
    else:
        filename_in_title = pngfilename

    # the .gp scripts ship alongside this module
    this_script_dir = os.path.dirname(os.path.abspath(__file__))

    if args.udp:
        gp_file = this_script_dir + "/udp-graph.gp"
        graph_title = "bbperf UDP {}".format(filename_in_title)
    else:
        gp_file = this_script_dir + "/tcp-graph.gp"
        graph_title = "bbperf TCP {}".format(filename_in_title)

    gnuplot_script = "datafile1 = \"{}\" ; graphtitle = \"{}\" ; load \"{}\"".format(
        datafile1, graph_title, gp_file)

    result = subprocess.run(["gnuplot", "-e", gnuplot_script], capture_output=True)

    # always report a gnuplot failure, even when not verbose
    if args.verbosity or (result.returncode != 0):
        print("gnuplot -e {}".format(gnuplot_script), flush=True)
        print("returncode: {}".format(result.returncode), flush=True)
        print("stdout: {}".format(result.stdout), flush=True)
        print("stderr: {}".format(result.stderr), flush=True)


if __name__ == '__main__':
    datafile1 = "/tmp/bbperf-tcp-data-aa97m9xl"

    # bug fix: the ad hoc namespace was missing the graph_file and verbosity
    # attributes that create_graph reads, and the call below was missing the
    # required pngfilename argument (it raised TypeError before this fix)
    args_d = { "udp": True, "graph_file": None, "verbosity": 1 }

    # recreate args as if it came directly from argparse
    args = argparse.Namespace(**args_d)

    create_graph(args, datafile1, datafile1 + ".png")
class DataSampleEvaluatorClass:
    """Decides when interval data samples become valid (and stay valid).

    Samples are ignored for an initial window after the run starts; a UDP
    run can become valid early once packet loss is observed, otherwise
    validity begins after max_ramp_time has elapsed.
    """

    # args are client args
    def __init__(self, args0):
        self.args = args0
        self.valid_flag = False
        self.max_ramp_time = self.args.max_ramp_time

        # fall back to the protocol-specific default ramp time
        if not self.max_ramp_time:
            self.max_ramp_time = (
                const.DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC
                if self.args.udp
                else const.DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC
            )

        if self.args.verbosity:
            print("max_ramp_time is {}".format(self.max_ramp_time), flush=True)

    # once a sample is valid then all subsequent samples are valid
    def is_sample_valid(self, run_mode_running_start_time, dropped_this_interval_percent, curr_time):
        if self.valid_flag:
            return True

        # samples are never valid until we have passed the "ignore time"
        ignore_until = run_mode_running_start_time + const.DATA_SAMPLE_IGNORE_TIME_ALWAYS_SEC
        if curr_time < ignore_until:
            return False

        # udp -- observed loss means the path is saturated, so exit early
        if self.args.udp and dropped_this_interval_percent > 0:
            self.valid_flag = True
            return True

        # otherwise wait out the full ramp time
        if curr_time > (run_mode_running_start_time + self.max_ramp_time):
            self.valid_flag = True
            return True

        return False
def sendto(data_sock, peer_addr, payload_bytes):
    """Send one datagram, raising if the kernel accepted fewer bytes than given.

    UDP sends are all-or-nothing, so a short send indicates a real problem.
    """
    num_bytes_sent = data_sock.sendto(payload_bytes, peer_addr)

    # simplified from the redundant "<= 0 or != len" pair of checks
    if num_bytes_sent != len(payload_bytes):
        raise Exception("ERROR: udp_helper.sendto(): send failed")


def send_stop_message(data_sock, peer_addr):
    """Best-effort notification to the peer that the run is over."""
    payload_bytes = const.UDP_STOP_MSG.encode()

    # 3 times just in case the first one does not make it to the destination
    for _ in range(3):
        try:
            data_sock.sendto(payload_bytes, peer_addr)
        except OSError:
            # narrowed from a bare except: the expected failure here is
            # "ConnectionRefusedError: [Errno 111] Connection refused" (an
            # OSError) if the first message was already processed successfully
            pass

        time.sleep(0.1)


def wait_for_string(data_sock, peer_addr, expected_string):
    """Block until expected_string arrives from peer_addr (up to ~60 seconds).

    Datagrams from other addresses are ignored.  NOTE(review): recvfrom() is
    called with a buffer of len(expected_string), so longer datagrams are
    truncated and simply fail the comparison; also, the 60 second deadline is
    only checked after a datagram arrives, so a totally silent peer blocks
    forever unless the socket itself carries a timeout -- confirm callers set one.
    """
    expected_bytes = expected_string.encode()
    start_time = time.time()

    while True:
        payload_bytes, pkt_from_addr = data_sock.recvfrom(len(expected_bytes))

        # only accept packets from our peer
        if pkt_from_addr != peer_addr:
            continue

        if payload_bytes.decode() == expected_string:
            break

        elapsed_time = time.time() - start_time
        if elapsed_time > 60:
            raise Exception("ERROR: failed to UDP recv string: {}".format(expected_string))
4 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 5 | 6 | #datafile1 = "/tmp/bbperf-udp-data-aa97m9xl" 7 | 8 | pngfile1 = datafile1.".png" 9 | 10 | set grid 11 | 12 | set key right top 13 | set key box opaque 14 | 15 | set style data lines 16 | 17 | # noenhanced to avoid need to escape underscores in labels 18 | set terminal pngcairo size 1200,1300 noenhanced 19 | set output pngfile1 20 | 21 | # generate stats for column 2 22 | # nooutput - do not sent to "screen" 23 | # name - prefix 24 | stats datafile1 using 1 nooutput name "XRANGE" 25 | 26 | set multiplot title graphtitle layout 5,1 27 | 28 | set lmargin 12 29 | 30 | # dt 1 (solid), dt 2 (dotted), dt 4 (dot dash) 31 | # lc 1 (purple), lc 4 (orange), lc 6 (blue), lc 7 (red), lc 8 (black) 32 | 33 | set ylabel "pps" 34 | 35 | plot datafile1 using ($1-XRANGE_min):5 title "receiver pps" lw 2 lc 6, \ 36 | "" using ($1-XRANGE_min):3 title "sender pps" lw 2 lc 1 dt 2 37 | 38 | set ylabel "Mbps" 39 | 40 | plot datafile1 using ($1-XRANGE_min):6 title "receiver throughput" lw 2 lc 6, \ 41 | "" using ($1-XRANGE_min):4 title "sender throughput" lw 2 lc 1 dt 2 42 | 43 | set ylabel "ms" 44 | 45 | plot datafile1 using ($1-XRANGE_min):7 title "unloaded RTT" lw 2 lc 1, \ 46 | "" using ($1-XRANGE_min):8 title "RTT" lw 2 lc 6 47 | 48 | set ylabel "bytes" 49 | 50 | plot datafile1 using ($1-XRANGE_min):9 title "BDP" lw 2 lc 1, \ 51 | "" using ($1-XRANGE_min):10 title "buffered data" lw 2 lc 6 52 | 53 | set ylabel "percent" 54 | 55 | plot datafile1 using ($1-XRANGE_min):13 title "pkt loss %" lw 2 lc 6 56 | 57 | unset multiplot 58 | 59 | -------------------------------------------------------------------------------- /src/bbperf/tcp_helper.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 
def recv_exact_num_bytes(data_sock, total_num_bytes_to_read):
    """Read exactly total_num_bytes_to_read bytes from a stream socket.

    Returns a bytearray.  Raises PeerDisconnectedException if the peer
    closes the connection before enough bytes have arrived.
    """
    payload_bytes = bytearray()

    while len(payload_bytes) < total_num_bytes_to_read:
        recv_bytes = data_sock.recv(total_num_bytes_to_read - len(payload_bytes))

        # empty read means the peer closed the connection
        # (the original checked this twice; once is enough)
        if not recv_bytes:
            raise PeerDisconnectedException()

        payload_bytes.extend(recv_bytes)

    return payload_bytes


def get_congestion_control(data_sock):
    """Return the name of the TCP congestion control algorithm in effect."""
    cc_algo_bytes = data_sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_CONGESTION, 1024)
    # the sockopt value is null-terminated bytes
    cc_algo_str = cc_algo_bytes.split(b'\x00')[0].decode()
    return cc_algo_str


def set_congestion_control(client_args, data_sock):
    """Set the requested congestion control algorithm, verifying it took effect."""
    if get_congestion_control(data_sock) == client_args.congestion:
        # already set, nothing to do here
        return

    data_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CONGESTION, client_args.congestion.encode())

    cc_algo_str = get_congestion_control(data_sock)
    if cc_algo_str != client_args.congestion:
        raise Exception("ERROR: unexpected congestion control in effect: {}".format(cc_algo_str))


def set_tcp_notsent_lowat(data_sock):
    """Set TCP_NOTSENT_LOWAT on the socket to 128 KiB."""
    lowat_value = 128 * 1024
    lowat_val_bytes = struct.pack('I', lowat_value)
    data_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NOTSENT_LOWAT, lowat_val_bytes)
# this is a simple congestion control algorithm for the udp test
class UdpRateManagerClass:
    """Adjusts the shared UDP sending rate from observed receiver pps.

    Tracks a sliding window of receiver pps samples and targets 1.2x the
    window's 90th percentile, clamped to [UDP_MIN_RATE, UDP_MAX_RATE].
    """

    # args are client args
    def __init__(self, args, shared_udp_sending_rate_pps):
        self.args = args
        self.shared_udp_sending_rate_pps = shared_udp_sending_rate_pps
        self.receiver_pps_list = []
        self.last_new_rate = 0

    # control receiver calls this with interval pps (every 0.1 seconds)
    def update(self, r_record):
        # gut checks to avoid updating based on bogus input
        if (r_record["r_sender_total_pkts_sent"] < 100
                or r_record["r_sender_interval_pkts_sent"] < 10
                or r_record["receiver_pps"] < 100):
            return

        # sliding window of the last 10 receiver pps observations
        self.receiver_pps_list.append(r_record["receiver_pps"])
        if len(self.receiver_pps_list) > 10:
            del self.receiver_pps_list[0]

        receiver_pps_p90 = numpy.percentile(self.receiver_pps_list, 90)

        # aim slightly above what the receiver is seeing, clamped to sane bounds
        proposed_rate = int(receiver_pps_p90 * 1.2)
        new_rate = max(const.UDP_MIN_RATE, min(const.UDP_MAX_RATE, proposed_rate))

        delta_rate = new_rate - self.last_new_rate

        # do not bother with tiny changes
        if abs(delta_rate) < 2:
            return

        if self.args.verbosity > 1:
            print("UdpRateManager: update: receiver pps {:6d} old rate {:6d} new rate {:6d} delta {:7d}".format(
                r_record["receiver_pps"],
                self.shared_udp_sending_rate_pps.value,
                new_rate,
                delta_rate),
                flush=True
            )

        self.shared_udp_sending_rate_pps.value = new_rate
        self.last_new_rate = new_rate
# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

import time

from . import const

from .data_sample_evaluator_class import DataSampleEvaluatorClass

class RunModeManagerClass:
    """Drives the run-mode state machine: CALIBRATING -> RUNNING -> STOP.

    update() is fed one interval record at a time; it tracks the minimum
    observed RTT during calibration, decides when to switch modes (via the
    shared_run_mode value shared with other processes), and annotates each
    record with interval drop counts and an is_sample_valid flag.
    """

    # args are client args
    def __init__(self, args0, shared_run_mode0):
        self.args = args0
        self.shared_run_mode = shared_run_mode0

        self.job_start_time = None
        self.run_mode_running_start_time = None
        # lowest RTT seen during calibration records ("unloaded" latency)
        self.min_rtt_ms = None
        self.last_10_rtt_list = []
        self.total_dropped_as_of_last_interval = 0
        self.data_sample_evaluator = DataSampleEvaluatorClass(self.args)
        self.first_valid_sample_time = None


    # updates shared_run_mode and r_record["is_sample_valid"]
    def update(self, r_record):
        """Process one interval record and advance the run-mode state machine.

        Mutates r_record in place: adds interval_dropped,
        interval_dropped_percent, and is_sample_valid.
        """
        curr_time = time.time()

        # first record
        if self.job_start_time is None:
            self.job_start_time = curr_time

        curr_rtt_ms = r_record["rtt_ms"]

        # update unloaded latency?
        if r_record["r_record_type"] == "cal":
            if (self.min_rtt_ms is None) or (curr_rtt_ms < self.min_rtt_ms):
                self.min_rtt_ms = curr_rtt_ms

        # CALIBRATING
        if self.shared_run_mode.value == const.RUN_MODE_CALIBRATING:

            # check to see if we should leave calibration
            # (sliding window of the 10 most recent RTT samples)
            self.last_10_rtt_list.append(curr_rtt_ms)
            if len(self.last_10_rtt_list) > 10:
                self.last_10_rtt_list = self.last_10_rtt_list[1:11]

            # are we done calibrating?
            # because either end early (all recent RTTs exceed the observed
            # minimum, i.e. latency has started rising) or hit max calibration time
            if ((min(self.last_10_rtt_list) > self.min_rtt_ms) or
                (curr_time > self.job_start_time + const.MAX_DURATION_CALIBRATION_TIME_SEC)):

                self.shared_run_mode.value = const.RUN_MODE_RUNNING
                self.run_mode_running_start_time = curr_time

            return

        # run mode is RUNNING or STOP

        # have we reached max time for data run without getting any valid data samples?
        if ((self.first_valid_sample_time is None) and (curr_time > (self.run_mode_running_start_time + const.MAX_DATA_COLLECTION_TIME_WITHOUT_VALID_DATA))):
            self.shared_run_mode.value = const.RUN_MODE_STOP

        # check to see if we should stop RUNNING

        if self.args.udp:
            # per-interval drop count derived from the cumulative counter;
            # clamped at zero in case the counter went backwards
            dropped_this_interval = r_record["total_dropped"] - self.total_dropped_as_of_last_interval
            if dropped_this_interval < 0:
                dropped_this_interval = 0
            dropped_this_interval_percent = (dropped_this_interval * 100.0) / r_record["r_sender_interval_pkts_sent"]
            # remember this for next loop:
            self.total_dropped_as_of_last_interval = r_record["total_dropped"]
        else:
            # tcp has no drop accounting; -1 marks "not applicable"
            dropped_this_interval = -1
            dropped_this_interval_percent = -1

        r_record["interval_dropped"] = dropped_this_interval
        r_record["interval_dropped_percent"] = dropped_this_interval_percent

        if self.data_sample_evaluator.is_sample_valid(
            self.run_mode_running_start_time,
            dropped_this_interval_percent,
            curr_time):

            r_record["is_sample_valid"] = 1
            if self.first_valid_sample_time is None:
                self.first_valid_sample_time = curr_time

        else:
            r_record["is_sample_valid"] = 0

        # stop once we have collected args.time seconds of valid samples
        if self.first_valid_sample_time and (curr_time > (self.first_valid_sample_time + self.args.time)):
            self.shared_run_mode.value = const.RUN_MODE_STOP
#!/usr/bin/python3

# Copyright (c) 2024 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0

import argparse

from . import client
from . import server
from . import util
from . import const

def mainline():
    """Entry point: parse the command line and run client or server mode."""
    print("bbperf version {}".format(const.BBPERF_VERSION), flush=True)

    parser = argparse.ArgumentParser(description="bbperf: end to end performance and bufferbloat measurement tool")

    parser.add_argument("-s", "--server",
                        action="store_true",
                        default=False,
                        help="run in server mode")

    parser.add_argument("-c", "--client",
                        metavar="SERVER_ADDR",
                        default=None,
                        help="run in client mode (specify either DNS name or IP address)")

    parser.add_argument("-p", "--port",
                        metavar="SERVER_PORT",
                        type=int,
                        default=const.SERVER_PORT,
                        help="server port (default: {})".format(const.SERVER_PORT))

    parser.add_argument("-u", "--udp",
                        action="store_true",
                        default=False,
                        help="run in UDP mode (default: TCP mode)")

    parser.add_argument("-R", "--reverse",
                        action="store_true",
                        default=False,
                        help="data flow in download direction (server to client)")

    parser.add_argument("--max-ramp-time",
                        metavar="SECONDS",
                        type=int,
                        default=None,
                        help="max duration in seconds before collecting data samples (tcp default: {}, udp default: {})".format(
                            const.DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC,
                            const.DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC))

    parser.add_argument("-t", "--time",
                        metavar="SECONDS",
                        type=int,
                        default=const.DEFAULT_VALID_DATA_COLLECTION_TIME_SEC,
                        help="duration in seconds to collect valid data samples (default: {})".format(const.DEFAULT_VALID_DATA_COLLECTION_TIME_SEC))

    # -v and -q are independent counters; consumers compare them separately
    parser.add_argument("-v", "--verbosity",
                        action="count",
                        default=0,
                        help="increase output verbosity (can be repeated)")

    parser.add_argument("-q", "--quiet",
                        action="count",
                        default=0,
                        help="decrease output verbosity (can be repeated)")

    parser.add_argument("-J", "--json-file",
                        default=None,
                        help="JSON output file")

    parser.add_argument("-g", "--graph",
                        action="store_true",
                        default=False,
                        help="generate graph and save in tmp file (requires gnuplot)")

    parser.add_argument("--graph-file",
                        metavar="GRAPH_FILE",
                        default=None,
                        help="generate graph and save in the specified file (requires gnuplot)")

    parser.add_argument("-k", "--keep",
                        action="store_true",
                        default=False,
                        help="keep data file")

    parser.add_argument("-B", "--bind",
                        metavar="BIND_ADDR",
                        default="0.0.0.0",
                        help="bind server sockets to address")

    parser.add_argument("--local-data-port",
                        metavar="LOCAL_DATA_PORT",
                        type=int,
                        default=0,
                        help="local port for data connection (default: ephemeral)")

    parser.add_argument("-C", "--congestion",
                        metavar="CC_ALGORITHM",
                        default="cubic",
                        help="congestion control algorithm (default: cubic)")

    args = parser.parse_args()

    # normalize/validate the parsed arguments in place
    util.validate_and_finalize_args(args)

    # -c/--client takes precedence; otherwise run as server
    if args.client:
        client.client_mainline(args)
    else:
        server.server_mainline(args)


if __name__ == '__main__':
    mainline()
class JsonOutputClass:
    """Collects per-interval result entries and emits summary/JSON output.

    Reads args.json_file (optional output path, opened eagerly in __init__)
    and args.quiet (quiet >= 2 suppresses the stdout summary).
    """

    def __init__(self, args):
        self.args = args
        self.output_dict = {}
        self.output_dict["entries"] = []
        self.unloaded_rtt_ms = None

        if self.args.json_file:
            self.json_output_file = open(self.args.json_file, 'w')

    def set_unloaded_rtt_ms(self, rtt_ms):
        self.unloaded_rtt_ms = rtt_ms

    def add_entry(self, entry):
        self.output_dict["entries"].append(entry)

    @staticmethod
    def _percentile_summary(values):
        # map the standard percentile labels to numpy.percentile results
        p1, p10, p50, p90, p99 = numpy.percentile(values, [1, 10, 50, 90, 99])
        return {"p1": p1, "p10": p10, "p50": p50, "p90": p90, "p99": p99}

    def create_aggregate_stats(self):
        """Compute summary percentiles over all valid samples.

        Stores the result in self.output_dict["summary"]; bails out (with a
        stderr message, leaving no "summary" key) when fewer than 10 valid
        samples exist.
        """
        loaded_rtt_ms_list = []
        receiver_throughput_rate_mbps_list = []
        excess_buffered_bytes_list = []
        receiver_pps_list = []
        pkt_loss_percent_list = []

        for entry in self.output_dict["entries"]:
            if entry["is_sample_valid"]:
                loaded_rtt_ms_list.append(entry["loaded_rtt_ms"])
                receiver_throughput_rate_mbps_list.append(entry["receiver_throughput_rate_mbps"])
                excess_buffered_bytes_list.append(entry["excess_buffered_bytes"])
                receiver_pps_list.append(entry["receiver_pps"])
                pkt_loss_percent_list.append(entry["pkt_loss_percent"])

        num_samples = len(loaded_rtt_ms_list)
        if num_samples < 10:
            print("ERROR: not enough valid samples for summary statistics: {} samples".format(num_samples),
                  file=sys.stderr,
                  flush=True)
            return

        summary_dict = self.output_dict["summary"] = {}

        summary_dict["num_samples"] = num_samples

        summary_dict["unloaded_rtt_ms"] = self.unloaded_rtt_ms

        # refactor: the five identical copy-pasted percentile stanzas are
        # collapsed into one helper; keys and values are unchanged
        summary_dict["loaded_rtt_ms"] = self._percentile_summary(loaded_rtt_ms_list)
        summary_dict["receiver_throughput_rate_mbps"] = self._percentile_summary(receiver_throughput_rate_mbps_list)
        summary_dict["excess_buffered_bytes"] = self._percentile_summary(excess_buffered_bytes_list)
        summary_dict["receiver_pps"] = self._percentile_summary(receiver_pps_list)
        summary_dict["pkt_loss_percent"] = self._percentile_summary(pkt_loss_percent_list)

    def write_output(self):
        """Write the summary to stdout (unless quiet) and everything to the JSON file."""
        self.create_aggregate_stats()

        # write to stdout
        if (self.args.quiet < 2) and ("summary" in self.output_dict):
            str_out = json.dumps(self.output_dict["summary"], indent=4)
            print(str_out, flush=True)

        # write to file if requested
        if self.args.json_file:
            json.dump(self.output_dict, self.json_output_file, indent=4)
            self.json_output_file.close()
num_bytes_read = len(bytes_read) 60 | 61 | if num_bytes_read == 0: 62 | # (tcp only) peer has disconnected 63 | if args.verbosity: 64 | print("peer disconnected (data socket)", flush=True) 65 | # exit process 66 | break 67 | 68 | socket_timeout_timer_active = False 69 | 70 | except socket.timeout: 71 | if socket_timeout_timer_active: 72 | if (time.time() - socket_timeout_timer_start_time) > const.SOCKET_TIMEOUT_SEC: 73 | raise Exception("FATAL: data_receiver_thread: timeout during data socket read") 74 | else: 75 | socket_timeout_timer_active = True 76 | socket_timeout_timer_start_time = time.time() 77 | 78 | if args.udp and num_bytes_read == len(const.UDP_STOP_MSG) and (bytes_read.decode() == const.UDP_STOP_MSG): 79 | if args.verbosity: 80 | print("data receiver thread: received udp stop message, exiting", flush=True) 81 | break 82 | 83 | if num_bytes_read == 0: 84 | # recv must have timed out, skip the rest of this loop 85 | continue 86 | 87 | curr_time_sec = time.time() 88 | 89 | total_recv_calls += 1 90 | 91 | interval_pkts_received += 1 # valid for udp only 92 | interval_bytes_received += num_bytes_read 93 | 94 | # end of interval 95 | # send interval record over control connection 96 | if curr_time_sec > interval_end_time: 97 | interval_time_sec = curr_time_sec - interval_start_time 98 | 99 | # find the packet send time in the user payload 100 | 101 | a_b_block = None 102 | 103 | idx_of_a = bytes_read.find(b' a ') 104 | if idx_of_a > -1: 105 | idx_of_b = bytes_read.find(b' b ', idx_of_a) 106 | if idx_of_b > -1: 107 | a_b_block = bytes_read[ idx_of_a : idx_of_b + 3 ] 108 | 109 | if a_b_block is None: 110 | # skip sending for this packet, but stay "in" sample interval 111 | continue 112 | 113 | # sending info back to client on control connection 114 | 115 | ba = bytearray() 116 | ba.extend(a_b_block) 117 | ba.extend(str(interval_time_sec).encode()) 118 | ba.extend(b' ') 119 | ba.extend(str(interval_pkts_received).encode()) 120 | ba.extend(b' ') 121 | 
ba.extend(str(interval_bytes_received).encode()) 122 | ba.extend(b' ') 123 | ba.extend(str(total_recv_calls).encode()) # num of pkts received, valid for udp only 124 | ba.extend(b' c ') 125 | 126 | control_conn.send_bytes(ba) 127 | 128 | interval_bytes_received = 0 129 | interval_pkts_received = 0 130 | 131 | interval_start_time = curr_time_sec 132 | interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC 133 | 134 | if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec): 135 | raise Exception("ERROR: max_run_time_failsafe_sec exceeded") 136 | 137 | 138 | # peer disconnected (or an error) 139 | util.done_with_socket(data_sock) 140 | control_conn.close() 141 | 142 | if args.verbosity: 143 | print("exiting data receiver process", flush=True) 144 | -------------------------------------------------------------------------------- /src/bbperf/util.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import sys 5 | import socket 6 | 7 | from . 
import const 8 | 9 | 10 | def validate_and_finalize_args(args): 11 | if args.server and args.client: 12 | raise Exception("ERROR: cannot be both client and server") 13 | 14 | if (not args.server) and (not args.client): 15 | raise Exception("ERROR: must be either a client or a server") 16 | 17 | if args.port > 65535: 18 | raise Exception("ERROR: invalid server port {}".format(args.port)) 19 | 20 | if args.verbosity and args.quiet: 21 | raise Exception("ERROR: cannot specify both verbosity and quiet") 22 | 23 | if args.congestion not in [ "cubic", "bbr"]: 24 | raise Exception("ERROR: congestion control algorithm must be either cubic or bbr, but found {}".format(args.congestion)) 25 | 26 | if args.graph_file and (not args.graph_file.endswith(".png")): 27 | raise Exception("ERROR: argument --graph-file must end with \".png\"") 28 | 29 | # set max_run_time_failsafe_sec 30 | # never run longer than this under any circumstances 31 | max_run_time_failsafe_sec = const.MAX_DURATION_CALIBRATION_TIME_SEC 32 | 33 | if args.udp: 34 | max_run_time_failsafe_sec += const.DATA_SAMPLE_IGNORE_TIME_UDP_MAX_SEC 35 | else: 36 | max_run_time_failsafe_sec += const.DATA_SAMPLE_IGNORE_TIME_TCP_MAX_SEC 37 | 38 | max_run_time_failsafe_sec += const.MAX_DATA_COLLECTION_TIME_WITHOUT_VALID_DATA 39 | max_run_time_failsafe_sec += args.time 40 | 41 | d = vars(args) 42 | d["max_run_time_failsafe_sec"] = max_run_time_failsafe_sec 43 | 44 | 45 | def convert_udp_pps_to_batch_size(packets_per_sec): 46 | 47 | batch_size = int(packets_per_sec / const.UDP_BATCHES_PER_SECOND) 48 | 49 | if batch_size < 1: 50 | batch_size = 1 51 | 52 | return batch_size 53 | 54 | 55 | def done_with_socket(mysock): 56 | try: 57 | mysock.shutdown(socket.SHUT_RDWR) 58 | except: 59 | pass 60 | 61 | try: 62 | mysock.close() 63 | except: 64 | pass 65 | 66 | 67 | def threads_are_running(thread_list): 68 | any_running = False 69 | 70 | for t in thread_list: 71 | if t.is_alive(): 72 | any_running = True 73 | else: 74 | if t.exitcode != 
0: 75 | # thread existing abnormally -- kill everything and go home 76 | raise Exception("FATAL: one of the subprocesses existed abnormally, name: {}, exitcode: {}".format(t.name, t.exitcode)) 77 | 78 | return any_running 79 | 80 | 81 | def parse_r_record(args, s1): 82 | r_record = {} 83 | 84 | swords = s1.split() 85 | 86 | # literal "a" 87 | r_record["r_record_type"] = swords[1] 88 | r_record["r_pkt_sent_time_sec"] = float(swords[2]) 89 | r_record["r_sender_interval_duration_sec"] = float(swords[3]) 90 | r_record["r_sender_interval_pkts_sent"] = int(swords[4]) # valid for udp only 91 | r_record["r_sender_interval_bytes_sent"] = int(swords[5]) 92 | r_record["r_sender_total_pkts_sent"] = int(swords[6]) # valid for udp only 93 | # literal "b" 94 | r_record["r_receiver_interval_duration_sec"] = float(swords[8]) 95 | r_record["r_receiver_interval_pkts_received"] = int(swords[9]) # valid for udp only 96 | r_record["r_receiver_interval_bytes_received"] = int(swords[10]) 97 | r_record["r_receiver_total_pkts_received"] = int(swords[11]) # valid for udp only 98 | # literal "c" 99 | r_record["r_pkt_received_time_sec"] = float(swords[13]) 100 | r_record["interval_dropped"] = int(swords[14]) 101 | r_record["interval_dropped_percent"] = float(swords[15]) 102 | r_record["is_sample_valid"] = int(swords[16]) 103 | # literal "d" 104 | 105 | r_record["rtt_sec"] = r_record["r_pkt_received_time_sec"] - r_record["r_pkt_sent_time_sec"] 106 | r_record["rtt_ms"] = r_record["rtt_sec"] * 1000 107 | 108 | try: 109 | # first record received has zeros 110 | sender_interval_rate_bps = (r_record["r_sender_interval_bytes_sent"] * 8.0) / r_record["r_sender_interval_duration_sec"] 111 | except ZeroDivisionError: 112 | sender_interval_rate_bps = 0 113 | 114 | r_record["sender_interval_rate_mbps"] = sender_interval_rate_bps / (10 ** 6) 115 | 116 | r_record["receiver_interval_rate_bytes_per_sec"] = r_record["r_receiver_interval_bytes_received"] / r_record["r_receiver_interval_duration_sec"] 117 | 118 
| receiver_interval_rate_bps = r_record["receiver_interval_rate_bytes_per_sec"] * 8 119 | r_record["receiver_interval_rate_mbps"] = receiver_interval_rate_bps / (10 ** 6) 120 | 121 | r_record["buffered_bytes"] = int( r_record["receiver_interval_rate_bytes_per_sec"] * r_record["rtt_sec"] ) 122 | 123 | if args.udp: 124 | try: 125 | # first record received has zeroes 126 | r_record["sender_pps"] = int(r_record["r_sender_interval_pkts_sent"] / r_record["r_sender_interval_duration_sec"]) 127 | except ZeroDivisionError: 128 | r_record["sender_pps"] = 0 129 | 130 | r_record["receiver_pps"] = int(r_record["r_receiver_interval_pkts_received"] / r_record["r_receiver_interval_duration_sec"]) 131 | r_record["total_dropped"] = r_record["r_sender_total_pkts_sent"] - r_record["r_receiver_total_pkts_received"] 132 | if r_record["total_dropped"] < 0: 133 | # this can happen if we happen to pick up an "early" a_b block (probably just negative by 1 or 2, not a big deal) 134 | r_record["total_dropped"] = 0 135 | 136 | else: 137 | r_record["sender_pps"] = -1 138 | r_record["receiver_pps"] = -1 139 | r_record["total_dropped"] = -1 140 | 141 | return r_record 142 | 143 | 144 | def validate_data_connection(args, run_id, data_connection_initial_str): 145 | w = data_connection_initial_str.split(" ") 146 | 147 | if w[0] != "data": 148 | raise Exception("ERROR: data connection invalid (does not start with data) run_id {} received {}".format( 149 | run_id, data_connection_initial_str)) 150 | 151 | data_connection_run_id = w[1] 152 | 153 | if data_connection_run_id != run_id: 154 | raise Exception("ERROR: data connection invalid (run_id incorrect) control run_id {} data run_id {} received {}".format( 155 | run_id, data_connection_run_id, data_connection_initial_str)) 156 | 157 | if args.verbosity: 158 | print("data run_id is valid", flush=True) 159 | -------------------------------------------------------------------------------- /src/bbperf/data_sender_thread.py: 
-------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import time 5 | import socket 6 | import select 7 | 8 | from . import util 9 | from . import const 10 | from . import udp_helper 11 | 12 | 13 | # falling off the end of this method terminates the process 14 | def run(args, data_sock, peer_addr, shared_run_mode, shared_udp_sending_rate_pps): 15 | if args.verbosity: 16 | print("data sender: start of process", flush=True) 17 | 18 | # udp autorate 19 | if args.udp: 20 | udp_pps = shared_udp_sending_rate_pps.value 21 | udp_batch_size = util.convert_udp_pps_to_batch_size(udp_pps) 22 | 23 | # start sending 24 | 25 | if args.verbosity: 26 | print("data sender: sending", flush=True) 27 | 28 | start_time_sec = time.time() 29 | calibration_start_time = start_time_sec 30 | curr_time_sec = start_time_sec 31 | 32 | interval_start_time = curr_time_sec 33 | interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC 34 | 35 | interval_time_sec = 0.0 36 | interval_send_count = 0 37 | interval_bytes_sent = 0 38 | 39 | accum_send_count = 0 40 | accum_bytes_sent = 0 41 | 42 | total_send_counter = 1 43 | num_negative_delay = 0 44 | 45 | while True: 46 | curr_time_sec = time.time() 47 | 48 | if (shared_run_mode.value == const.RUN_MODE_CALIBRATING): 49 | # double the limit to avoid a race condition with the run mode manager 50 | if curr_time_sec > (calibration_start_time + (2 * const.MAX_DURATION_CALIBRATION_TIME_SEC)): 51 | error_msg = "FATAL: data_sender_thread: time in calibration exceeded max allowed" 52 | print(error_msg, flush=True) 53 | raise Exception(error_msg) 54 | 55 | is_calibrated = False 56 | else: 57 | is_calibrated = True 58 | 59 | record_type = b'run' if is_calibrated else b'cal' 60 | 61 | # we want to be fast here, since this is data write loop, so use ba.extend 62 
| 63 | ba = bytearray(b' a ' + 64 | record_type + b' ' + 65 | str(curr_time_sec).encode() + b' ' + 66 | str(interval_time_sec).encode() + b' ' + 67 | str(interval_send_count).encode() + b' ' + 68 | str(interval_bytes_sent).encode() + b' ' + 69 | str(total_send_counter).encode() + b' b ') 70 | 71 | if args.udp: 72 | ba.extend(const.PAYLOAD_1K) 73 | elif is_calibrated: 74 | ba.extend(const.PAYLOAD_128K) 75 | else: 76 | ba.extend(const.PAYLOAD_1K) 77 | 78 | # send an entire batch 79 | 80 | batch_size = udp_batch_size if args.udp else 1 81 | 82 | try: 83 | batch_counter = 0 84 | 85 | while batch_counter < batch_size: 86 | # blocking 87 | # we want to block here, as blocked time should "count" 88 | 89 | if args.udp: 90 | num_bytes_sent = data_sock.sendto(ba, peer_addr) 91 | else: 92 | # tcp 93 | # we use select to take advantage of tcp_notsent_lowat 94 | # timeout is 20 seconds 95 | _, _, _ = select.select( [], [data_sock], [], 20.0) 96 | num_bytes_sent = data_sock.send(ba) 97 | 98 | if num_bytes_sent <= 0: 99 | raise Exception("ERROR: data_sender_thread.run(): send failed") 100 | 101 | batch_counter += 1 102 | total_send_counter += 1 103 | accum_send_count += 1 104 | accum_bytes_sent += num_bytes_sent 105 | 106 | except ConnectionResetError: 107 | print("Connection reset by peer", flush=True) 108 | # exit process 109 | break 110 | 111 | except BrokenPipeError: 112 | # this can happen at the end of a tcp reverse test 113 | print("broken pipe error", flush=True) 114 | # exit process 115 | break 116 | 117 | except BlockingIOError: 118 | # same as EAGAIN EWOULDBLOCK 119 | # we did not send, loop back up and try again 120 | continue 121 | 122 | except socket.timeout: 123 | error_msg = "FATAL: data_sender_thread: socket timeout" 124 | print(error_msg, flush=True) 125 | raise Exception(error_msg) 126 | 127 | # end of batch loop 128 | 129 | curr_time_sec = time.time() 130 | 131 | if curr_time_sec > interval_end_time: 132 | interval_time_sec = curr_time_sec - 
interval_start_time 133 | interval_send_count = accum_send_count 134 | interval_bytes_sent = accum_bytes_sent 135 | 136 | interval_start_time = curr_time_sec 137 | interval_end_time = interval_start_time + const.SAMPLE_INTERVAL_SEC 138 | accum_send_count = 0 139 | accum_bytes_sent = 0 140 | 141 | # update udp autorate 142 | if args.udp: 143 | udp_pps = shared_udp_sending_rate_pps.value 144 | udp_batch_size = util.convert_udp_pps_to_batch_size(udp_pps) 145 | 146 | # send very slowly at first to establish unloaded latency 147 | if not is_calibrated: 148 | time.sleep(0.2) 149 | if args.udp: 150 | # initialize udp batch start time here in case next loop is batch processing 151 | current_udp_batch_start_time = time.time() 152 | continue 153 | 154 | # normal end of test 155 | if shared_run_mode.value == const.RUN_MODE_STOP: 156 | break 157 | 158 | if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec): 159 | raise Exception("ERROR: max_run_time_failsafe_sec exceeded") 160 | 161 | # pause between udp batches if necessary 162 | if args.udp: 163 | delay_sec = const.UDP_DELAY_BETWEEN_BATCH_STARTS - (curr_time_sec - current_udp_batch_start_time) 164 | if delay_sec < 0: 165 | num_negative_delay += 1 166 | if (num_negative_delay % const.UDP_NEGATIVE_DELAY_BETWEEN_BATCHES_WARNING_EVERY) == 0: 167 | print("WARNING: udp sender is cpu constrained, results may be invalid: {}".format(num_negative_delay), flush=True) 168 | elif delay_sec > 0: 169 | time.sleep(delay_sec) 170 | current_udp_batch_start_time += const.UDP_DELAY_BETWEEN_BATCH_STARTS 171 | 172 | 173 | # send STOP message 174 | if args.udp: 175 | if args.verbosity: 176 | print("data sender: sending udp stop message", flush=True) 177 | udp_helper.send_stop_message(data_sock, peer_addr) 178 | 179 | util.done_with_socket(data_sock) 180 | 181 | if args.verbosity: 182 | print("data sender: end of process", flush=True) 183 | -------------------------------------------------------------------------------- 
/src/bbperf/control_receiver_thread.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import time 5 | 6 | from . import util 7 | from . import const 8 | 9 | from .exceptions import PeerDisconnectedException 10 | from .udp_rate_manager_class import UdpRateManagerClass 11 | from .run_mode_manager_class import RunModeManagerClass 12 | 13 | # direction up, runs on client 14 | # args are client args (not server args) 15 | # falling off the end of this method terminates the process 16 | def run_recv_term_queue(readyevent, args, control_conn, results_queue, shared_run_mode, shared_udp_sending_rate_pps): 17 | if args.verbosity: 18 | print("starting control receiver process: run_recv_term_queue", flush=True) 19 | 20 | run_mode_manager = RunModeManagerClass(args, shared_run_mode) 21 | udp_rate_manager = UdpRateManagerClass(args, shared_udp_sending_rate_pps) 22 | 23 | readyevent.set() 24 | 25 | start_time_sec = time.time() 26 | 27 | while True: 28 | 29 | try: 30 | bytes_read = control_conn.recv_a_c_block() 31 | 32 | except ConnectionResetError: 33 | if args.verbosity: 34 | print("connection reset error", flush=True) 35 | # exit process 36 | break 37 | 38 | except PeerDisconnectedException: 39 | if args.verbosity: 40 | print("peer disconnected (control socket)", flush=True) 41 | # exit process 42 | break 43 | 44 | curr_time_sec = time.time() 45 | curr_time_str = str(curr_time_sec) 46 | 47 | received_str = bytes_read.decode() 48 | 49 | # the zeroes will be updated below 50 | tmp_str = received_str + curr_time_str + " 0 0 0 d " 51 | 52 | r_record = util.parse_r_record(args, tmp_str) 53 | 54 | # updates shared_run_mode 55 | # r_record["interval_dropped"] 56 | # r_record["interval_dropped_percent"] 57 | # r_record["is_sample_valid"] 58 | run_mode_manager.update(r_record) 59 | 60 
| if args.udp: 61 | udp_rate_manager.update(r_record) 62 | 63 | new_str = (received_str + curr_time_str + " " + 64 | str(r_record["interval_dropped"]) + " " + 65 | str(r_record["interval_dropped_percent"]) + " " + 66 | str(r_record["is_sample_valid"]) + " d ") 67 | 68 | results_queue.put(new_str) 69 | 70 | if args.verbosity > 3: 71 | print("control receiver process: {}".format(new_str), flush=True) 72 | 73 | if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec): 74 | raise Exception("ERROR: max_run_time_failsafe_sec exceeded") 75 | 76 | control_conn.close() 77 | 78 | if args.verbosity: 79 | print("exiting control receiver process: run_recv_term_queue", flush=True) 80 | 81 | 82 | # direction down, runs on server 83 | # args are client args (not server args) 84 | # falling off the end of this method terminates the process 85 | def run_recv_term_send(readyevent, args, control_conn, shared_run_mode, shared_udp_sending_rate_pps): 86 | if args.verbosity: 87 | print("starting control receiver process: run_recv_term_send", flush=True) 88 | 89 | run_mode_manager = RunModeManagerClass(args, shared_run_mode) 90 | udp_rate_manager = UdpRateManagerClass(args, shared_udp_sending_rate_pps) 91 | 92 | readyevent.set() 93 | 94 | start_time_sec = time.time() 95 | 96 | while True: 97 | 98 | try: 99 | bytes_read = control_conn.recv_a_c_block() 100 | 101 | except ConnectionResetError: 102 | if args.verbosity: 103 | print("connection reset error", flush=True) 104 | # exit process 105 | break 106 | 107 | except PeerDisconnectedException: 108 | if args.verbosity: 109 | print("peer disconnected (control socket)", flush=True) 110 | # exit process 111 | break 112 | 113 | curr_time_sec = time.time() 114 | curr_time_str = str(curr_time_sec) 115 | 116 | received_str = bytes_read.decode() 117 | 118 | # the zeroes will be updated below 119 | tmp_str = received_str + curr_time_str + " 0 0 0 d " 120 | 121 | r_record = util.parse_r_record(args, tmp_str) 122 | 123 | # updates 
shared_run_mode 124 | # r_record["interval_dropped"] 125 | # r_record["interval_dropped_percent"] 126 | # r_record["is_sample_valid"] 127 | run_mode_manager.update(r_record) 128 | 129 | if args.udp: 130 | udp_rate_manager.update(r_record) 131 | 132 | new_str = (received_str + curr_time_str + " " + 133 | str(r_record["interval_dropped"]) + " " + 134 | str(r_record["interval_dropped_percent"]) + " " + 135 | str(r_record["is_sample_valid"]) + " d ") 136 | 137 | control_conn.send_string(new_str) 138 | 139 | if args.verbosity > 3: 140 | print("control receiver process: {}".format(new_str), flush=True) 141 | 142 | if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec): 143 | raise Exception("ERROR: max_run_time_failsafe_sec exceeded") 144 | 145 | control_conn.close() 146 | 147 | if args.verbosity: 148 | print("exiting control receiver process: run_recv_term_send", flush=True) 149 | 150 | 151 | # direction down, runs on client (passthru) 152 | # args are client args (not server args) -- this always runs on client 153 | # falling off the end of this method terminates the process 154 | def run_recv_queue(readyevent, args, control_conn, results_queue): 155 | if args.verbosity: 156 | print("starting control receiver process: run_recv_queue", flush=True) 157 | 158 | readyevent.set() 159 | 160 | start_time_sec = time.time() 161 | 162 | while True: 163 | try: 164 | bytes_read = control_conn.recv_a_d_block() 165 | 166 | except ConnectionResetError: 167 | if args.verbosity: 168 | print("connection reset error", flush=True) 169 | # exit process 170 | break 171 | 172 | except PeerDisconnectedException: 173 | if args.verbosity: 174 | print("peer disconnected (control socket)", flush=True) 175 | # exit process 176 | break 177 | 178 | curr_time_sec = time.time() 179 | 180 | received_str = bytes_read.decode() 181 | 182 | # passthru as is 183 | results_queue.put(received_str) 184 | 185 | if args.verbosity > 3: 186 | print("control receiver process: 
{}".format(received_str), flush=True) 187 | 188 | if ((curr_time_sec - start_time_sec) > args.max_run_time_failsafe_sec): 189 | raise Exception("ERROR: max_run_time_failsafe_sec exceeded") 190 | 191 | control_conn.close() 192 | 193 | if args.verbosity: 194 | print("exiting control receiver process: run_recv_queue", flush=True) 195 | -------------------------------------------------------------------------------- /src/bbperf/output.py: -------------------------------------------------------------------------------- 1 | # Copyright (c) 2024 Cloudflare, Inc. 2 | # Licensed under the Apache 2.0 license found in the LICENSE file or at https://www.apache.org/licenses/LICENSE-2.0 3 | 4 | import os 5 | import time 6 | import tempfile 7 | 8 | from . import const 9 | from . import util 10 | 11 | from .json_output_class import JsonOutputClass 12 | 13 | 14 | args = None 15 | tmpfile1 = None 16 | tmpfile2 = None 17 | last_line_to_stdout_time = 0 18 | print_header1 = True 19 | print_header2 = True 20 | print_header3 = True 21 | relative_start_time_sec = None 22 | json_output = None 23 | unloaded_latency_rtt_ms = None 24 | 25 | 26 | def init(args0): 27 | global args 28 | global tmpfile1 29 | global tmpfile2 30 | global json_output 31 | 32 | args = args0 33 | 34 | # create and open file 35 | 36 | if args.udp: 37 | tmp_graph_filename_prefix = "bbperf-graph-data-udp-" 38 | tmp_raw_filename_prefix = "bbperf-raw-data-udp-" 39 | else: 40 | tmp_graph_filename_prefix = "bbperf-graph-data-tcp-" 41 | tmp_raw_filename_prefix = "bbperf-raw-data-tcp-" 42 | 43 | tmpfile1 = tempfile.NamedTemporaryFile(prefix=tmp_graph_filename_prefix, delete=False) 44 | tmpfile2 = tempfile.NamedTemporaryFile(prefix=tmp_raw_filename_prefix, delete=False) 45 | 46 | json_output = JsonOutputClass(args) 47 | 48 | 49 | def get_graph_data_file_name(): 50 | return tmpfile1.name 51 | 52 | def get_raw_data_file_name(): 53 | return tmpfile2.name 54 | 55 | def term(): 56 | tmpfile1.close() 57 | tmpfile2.close() 58 | 59 | 
json_output.write_output() 60 | 61 | 62 | def delete_data_files(): 63 | if args.verbosity: 64 | print("deleting graph data file: {}".format(tmpfile1.name), flush=True) 65 | print("deleting raw data file: {}".format(tmpfile2.name), flush=True) 66 | 67 | os.remove(tmpfile1.name) 68 | os.remove(tmpfile2.name) 69 | 70 | 71 | def write_data_to_file(lineout, fileout): 72 | lineout_bytes = "{}\n".format(lineout).encode() 73 | fileout.file.write(lineout_bytes) 74 | 75 | def write_raw_data_to_file(lineout): 76 | write_data_to_file(lineout, tmpfile2) 77 | 78 | def write_graph_data_to_file(lineout): 79 | write_data_to_file(lineout, tmpfile1) 80 | 81 | 82 | # keep in mind here that the interval data is coming in at a faster 83 | # rate than what we want to (normally) display on stdout 84 | 85 | def print_output(s1): 86 | global args 87 | global last_line_to_stdout_time 88 | global print_header1 89 | global print_header2 90 | global print_header3 91 | global relative_start_time_sec 92 | global unloaded_latency_rtt_ms 93 | 94 | write_raw_data_to_file(s1) 95 | 96 | curr_time = time.time() 97 | 98 | r_record = util.parse_r_record(args, s1) 99 | 100 | if relative_start_time_sec is None: 101 | # first incoming result has arrived 102 | relative_start_time_sec = r_record["r_pkt_sent_time_sec"] 103 | relative_pkt_sent_time_sec = 0 104 | relative_pkt_received_time_sec = 0 105 | else: 106 | relative_pkt_sent_time_sec = r_record["r_pkt_sent_time_sec"] - relative_start_time_sec 107 | relative_pkt_received_time_sec = r_record["r_pkt_received_time_sec"] - relative_start_time_sec 108 | 109 | if r_record["r_record_type"] == "run": 110 | json_output.set_unloaded_rtt_ms(unloaded_latency_rtt_ms) 111 | 112 | bdp_bytes = int( r_record["receiver_interval_rate_bytes_per_sec"] * (unloaded_latency_rtt_ms / 1000.0) ) 113 | 114 | if bdp_bytes > 0: 115 | bloat_factor = float(r_record["buffered_bytes"]) / bdp_bytes 116 | else: 117 | bloat_factor = 0 118 | 119 | if print_header3: 120 | lineout = "sent_time 
recv_time sender_pps sender_Mbps receiver_pps receiver_Mbps unloaded_rtt_ms rtt_ms BDP_bytes buffered_bytes bloat_factor pkts_dropped pkts_dropped_percent" 121 | write_graph_data_to_file(lineout) 122 | print_header3 = False 123 | 124 | # write to file the same data and same rate as what we are receiving over the control connection 125 | lineout = "{} {} {} {} {} {} {} {} {} {} {} {} {}".format( 126 | relative_pkt_sent_time_sec, 127 | relative_pkt_received_time_sec, 128 | r_record["sender_pps"], 129 | r_record["sender_interval_rate_mbps"], 130 | r_record["receiver_pps"], 131 | r_record["receiver_interval_rate_mbps"], 132 | unloaded_latency_rtt_ms, 133 | r_record["rtt_ms"], 134 | bdp_bytes, 135 | r_record["buffered_bytes"], 136 | bloat_factor, 137 | r_record["interval_dropped"], 138 | r_record["interval_dropped_percent"] 139 | ) 140 | 141 | write_graph_data_to_file(lineout) 142 | 143 | # add to JSON output 144 | excess = r_record["buffered_bytes"] - bdp_bytes 145 | if excess < 0: 146 | excess = 0 147 | new_entry = { 148 | "sent_time_sec": r_record["r_pkt_sent_time_sec"], 149 | "received_time_sec": r_record["r_pkt_received_time_sec"], 150 | "loaded_rtt_ms": r_record["rtt_ms"], 151 | "sender_throughput_rate_mbps": r_record["sender_interval_rate_mbps"], 152 | "receiver_throughput_rate_mbps": r_record["receiver_interval_rate_mbps"], 153 | "bdp_bytes": bdp_bytes, 154 | "excess_buffered_bytes": excess, 155 | "receiver_pps": r_record["receiver_pps"], 156 | "pkt_loss_percent": r_record["interval_dropped_percent"], 157 | "is_sample_valid": r_record["is_sample_valid"] 158 | } 159 | json_output.add_entry(new_entry) 160 | 161 | # write to stdout at the rate of one line per second 162 | # each stdout line will be a 0.1s snapshot 163 | if ((curr_time > (last_line_to_stdout_time + const.STDOUT_INTERVAL_SEC)) and not args.quiet) or args.verbosity > 2: 164 | if print_header2: 165 | print(" sent_time recv_time sender_Mbps receiver_Mbps sender_pps receiver_pps unloaded_rtt_ms rtt_ms 
BDP_bytes buffered_bytes bloat pkts_dropped drop%", flush=True) 166 | print_header2 = False 167 | 168 | if r_record["interval_dropped_percent"] < 0: 169 | dropped_percent_str = " n/a" 170 | else: 171 | dropped_percent_str = "{:6.3f}%".format(r_record["interval_dropped_percent"]) 172 | 173 | print("{:11.6f} {:11.6f} {:11.3f} {:11.3f} {:8d} {:8d} {:8.3f} {:9.3f} {:9d} {:9d} {:6.1f}x {:6d} {}".format( 174 | relative_pkt_sent_time_sec, 175 | relative_pkt_received_time_sec, 176 | r_record["sender_interval_rate_mbps"], 177 | r_record["receiver_interval_rate_mbps"], 178 | r_record["sender_pps"], 179 | r_record["receiver_pps"], 180 | unloaded_latency_rtt_ms, 181 | r_record["rtt_ms"], 182 | bdp_bytes, 183 | r_record["buffered_bytes"], 184 | bloat_factor, 185 | r_record["interval_dropped"], 186 | dropped_percent_str 187 | ), 188 | flush=True) 189 | 190 | last_line_to_stdout_time = curr_time 191 | 192 | else: 193 | # calibrating 194 | # do we have a new unloaded latency? 195 | if (unloaded_latency_rtt_ms is None) or (r_record["rtt_ms"] < unloaded_latency_rtt_ms): 196 | unloaded_latency_rtt_ms = r_record["rtt_ms"] 197 | 198 | if ((curr_time > (last_line_to_stdout_time + const.STDOUT_INTERVAL_SEC)) and not args.quiet) or args.verbosity > 2: 199 | if print_header1: 200 | print("calibrating", flush=True) 201 | print(" sent_time recv_time rtt_ms", flush=True) 202 | print_header1 = False 203 | 204 | print("{:11.6f} {:11.6f} {:11.6f}".format( 205 | relative_pkt_sent_time_sec, 206 | relative_pkt_received_time_sec, 207 | unloaded_latency_rtt_ms 208 | ), 209 | flush=True) 210 | 211 | last_line_to_stdout_time = curr_time 212 | -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- 1 |
bbperf - An end-to-end performance and bufferbloat measurement tool
2 | 3 | `bbperf` measures what matters most. 4 | 5 | Traditional network performance measurement tools collect metrics such as latency and throughput regardless of the conditions that exist during the collection period. While valuable for many uses, that approach can miss reporting the actual performance that real user payloads experience on production networks. This tool only reports performance metrics when the flow is operating at "max buffer usage". Max buffer usage is when the active flow has filled any and all buffers that exist along the packet path between the endpoints. 6 | 7 | User payload is used to measure latency and throughput. This accounts for the performance impact of transparent proxies, transparent tunnels, transparent firewalls, and all the other things that are not visible to the endpoints. It also simplifies interpreting the impact of retransmissions on user performance, which is non-intuitive at best. This is because some retransmissions are due to the real loss of user payload while many are not. In this tool, the loss of user payload will show up in the latency and throughput metrics, i.e. higher latencies and lower throughput. 8 | 9 | Features: 10 | 11 | * Latency, both unloaded and loaded, is measured by the same flow that is under test. 12 | 13 | Other tools will commonly measure latency using a different flow or different protocol. One of the reasons why using different protocols and/or different flows is not desirable is because fair queuing will cause the latency of those other flows to be much lower (better) than the flow that matters. 14 | 15 | * Throughput 16 | 17 | Both sender and receiver rates are collected, but the receiver rate (a.k.a. goodput) is the important one. 18 | 19 | * Bufferbloat is calculated 20 | 21 | It is often assumed that TCP receive buffers are the only source of bufferbloat. While that is common, it misses many other locations where bufferbloat may occur. 
This tool reports the effects of all sources of bufferbloat, not just TCP receive buffers. 22 | 23 | `bbperf` calculates both the BDP (bandwidth delay product) and the total amount of buffer actually used. The difference between those two is reported as "excess buffer usage". A small number for this metric is normal and expected, but a large number, relative to BDP, is bufferbloat. Bufferbloat also appears as a large difference between unloaded and loaded latency. 24 | 25 | * Both TCP and UDP are supported 26 | 27 | Both benchmark tests will wait until the flow has reached "max buffer usage" before collecting metrics data. For TCP, it will wait for the sending and receiving rates to match. For UDP, the sending rate will be automatically adjusted to be just above the maximum packet rate without dropping packets before starting its metrics collection. 28 | 29 | * `bbperf` measures the performance of data flow in one direction only. 30 | 31 | Network routing can be asymmetric, bottleneck links are asymmetric, bufferbloat is asymmetric, all of which means that performance is asymmetric. `bbperf` allows us to see the asymmetry. 32 | 33 | Data flow in `bbperf` is one way. The direction of data flow is from the client host to the server host (unless the `-R` option is specified). That is the direction being measured, and is what is reported in the metrics. 34 | 35 | Latency is measured round trip, but the return traffic (from the data receiver back to the data sender) is low-volume and should not contribute any bufferbloat-related latency to the measurement. This cannot be guaranteed, in the same way that it cannot be guaranteed that the unloaded latency measurement does not contain any bufferbloat-induced latency. But it does ensure that no bufferbloat-induced latency is caused by `bbperf`'s own flow. 36 | 37 | * Automatic generation of graphs 38 | 39 | ### Usage 40 | 41 | To run a test: 42 | 43 | 1. Start the server on one host 44 | ``` 45 | $ bbperf.py -s 46 | ``` 47 | 48 | 2. 
Run the client on another host 49 | ``` 50 | $ bbperf.py -c