Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,14 @@ if(BUILD_TESTS)
"1,write,3000000,primary"
)

# Perf test for the blocking endpoint variant: 128 client connections each
# issue 100 blocking writes against the primary, with --max-writes-ahead 0 so
# each client waits for the previous write's response before sending the next.
add_piccolo_test(
NAME pi_basic_blocking
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
CLIENT_BIN ./submit PERF_LABEL "Basic Blocking"
ADDITIONAL_ARGS --package "samples/apps/basic/basic" --client-def
"128,blocking_write,100,primary" --max-writes-ahead 0
)

add_piccolo_test(
NAME pi_basic_js
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
Expand Down
14 changes: 14 additions & 0 deletions samples/apps/basic/basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ namespace basicapp
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();

// Blocking variant of the records PUT endpoint: installs a
// consensus-committed callback (default_respond_on_commit_func), so the
// response appears to be deferred until the transaction is committed by
// consensus, rather than returned as soon as it is locally applied.
// NOTE(review): exact response timing depends on the framework's
// set_consensus_committed_function semantics — confirm against CCF docs.
make_endpoint(
"/records/blocking/{key}", HTTP_PUT, put, {ccf::user_cert_auth_policy})
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.set_consensus_committed_function(
ccf::endpoints::default_respond_on_commit_func)
.install();

auto get = [this](ccf::endpoints::ReadOnlyEndpointContext& ctx) {
std::string key;
std::string error;
Expand Down Expand Up @@ -95,6 +102,13 @@ namespace basicapp
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();

// Blocking variant of the records GET endpoint. Mirrors the blocking PUT
// registration: read-only handler, never forwarded, and wired with the
// default respond-on-commit function so responses are tied to consensus
// commit rather than local execution.
// NOTE(review): a respond-on-commit hook on a read-only endpoint is unusual —
// presumably intentional for the perf test's blocking read workload; verify.
make_read_only_endpoint(
"/records/blocking/{key}", HTTP_GET, get, {ccf::user_cert_auth_policy})
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.set_consensus_committed_function(
ccf::endpoints::default_respond_on_commit_func)
.install();

auto post = [](ccf::endpoints::EndpointContext& ctx) {
const nlohmann::json body =
nlohmann::json::parse(ctx.rpc_ctx->get_request_body());
Expand Down
129 changes: 99 additions & 30 deletions tests/infra/basicperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,62 @@ def read_from_key_space(
)


def blocking_write_to_key_space(
    key_space: List[str],
    iterations: int,
    msgs: generator.Messages,
    additional_headers: Dict[str, str],
):
    """Queue `iterations` PUTs against the blocking records endpoint.

    Iteration indices are visited in a random order; each index is mapped
    onto the key space round-robin, and the request body is the SHA-256
    hex digest of the key.
    """
    LOG.info(
        f"Workload: {iterations} blocking writes to a range of {len(key_space)} keys"
    )
    # Randomise the order in which the iteration indices are visited
    order = list(range(iterations))
    random.shuffle(order)
    space_size = len(key_space)
    for i in order:
        target_key = key_space[i % space_size]
        digest = hashlib.sha256(target_key.encode()).hexdigest()
        msgs.append(
            f"/records/blocking/{target_key}",
            "PUT",
            additional_headers=additional_headers,
            body=digest,
            content_type="text/plain",
        )


def blocking_read_from_key_space(
    key_space: List[str],
    iterations: int,
    msgs: generator.Messages,
    additional_headers: Dict[str, str],
):
    """Queue `iterations` GETs against the blocking records endpoint.

    Iteration indices are visited in a random order; each index is mapped
    onto the key space round-robin.
    """
    LOG.info(
        f"Workload: {iterations} blocking reads from a range of {len(key_space)} keys"
    )
    # Randomise the order in which the iteration indices are visited
    order = list(range(iterations))
    random.shuffle(order)
    space_size = len(key_space)
    for i in order:
        target_key = key_space[i % space_size]
        msgs.append(
            f"/records/blocking/{target_key}",
            "GET",
            additional_headers=additional_headers,
            content_type="text/plain",
        )


def append_to_msgs(definition, key_space, iterations, msgs, additional_headers):
if definition == "write":
return write_to_key_space(key_space, iterations, msgs, additional_headers)
elif definition == "read":
return read_from_key_space(key_space, iterations, msgs, additional_headers)
elif definition == "blocking_write":
return blocking_write_to_key_space(
key_space, iterations, msgs, additional_headers
)
elif definition == "blocking_read":
return blocking_read_from_key_space(
key_space, iterations, msgs, additional_headers
)
elif definition.startswith("rwmix:"):
_, ratio = definition.split(":")
assert iterations % 1000 == 0
Expand Down Expand Up @@ -576,13 +627,13 @@ def _shades(n, start, end):
]

def blue_shades(n):
return _shades(n, (59, 142, 234), (100, 100, 255))
return _shades(n, (59, 142, 234), (180, 180, 255))

def green_shades(n):
return _shades(n, (13, 188, 121), (100, 255, 100))
return _shades(n, (13, 188, 121), (180, 255, 180))

def red_shades(n):
return _shades(n, (205, 49, 49), (255, 100, 100))
return _shades(n, (205, 49, 49), (255, 180, 180))

def format_title(title, width):
"""Centered title bar in the style of plotext's simple_bar charts."""
Expand Down Expand Up @@ -610,11 +661,6 @@ def color_legend(names, *color_groups):
)
print(format_title(" ".join(parts), chart_width), end="")

# Generate per-client color palettes
sent_colors = blue_shades(n_clients)
rcvd_colors = green_shades(n_clients)
error_colors = red_shades(n_clients)

# Compute per-client, per-second counts in a single pass
agg_with_seconds = agg.with_columns(
((pl.col("sendTime") - start_send) / 1000000)
Expand Down Expand Up @@ -668,15 +714,43 @@ def color_legend(names, *color_groups):

seconds = sorted(all_seconds)

use_builtin_legend = n_clients <= max_builtin_legend_clients
# With many clients, bin them into groups so each group gets
# enough bar width to be visible. Target ~chart_width/4 groups max.
max_stacked_layers = chart_width // 4
if n_clients > max_stacked_layers:
group_size = (
n_clients + max_stacked_layers - 1
) // max_stacked_layers
groups = [
client_names[i : i + group_size]
for i in range(0, n_clients, group_size)
]
group_names = [
f"{g[0]}-{g[-1]}" if len(g) > 1 else g[0] for g in groups
]
else:
groups = [[cn] for cn in client_names]
group_names = client_names

# Stacked bar: sent per second, blue shades per client
sent_stacks = [
[client_sent[cn].get(s, 0) for s in seconds] for cn in client_names
]
n_groups = len(groups)
sent_colors = blue_shades(n_groups)
rcvd_colors = green_shades(n_groups)
error_colors = red_shades(n_groups)
Comment thread
eddyashton marked this conversation as resolved.

def group_stacks(per_client_dict):
    # Collapse per-client per-second counts into per-group series for the
    # stacked bar chart: for each client group (closure var `groups`), sum
    # the counts of its member clients at every second in `seconds`,
    # defaulting to 0 when a client has no entry for that second.
    # Returns one list (aligned with `seconds`) per group.
    return [
        [
            sum(per_client_dict[cn].get(s, 0) for cn in group)
            for s in seconds
        ]
        for group in groups
    ]

# Stacked bar: sent per second
sent_stacks = group_stacks(client_sent)
sent_kwargs = {"colors": sent_colors}
if use_builtin_legend:
sent_kwargs["labels"] = client_names
if n_groups <= max_builtin_legend_clients:
sent_kwargs["labels"] = group_names
plt.simple_stacked_bar(
seconds,
sent_stacks,
Expand All @@ -685,21 +759,16 @@ def color_legend(names, *color_groups):
**sent_kwargs,
)
plt.show()
if not use_builtin_legend:
color_legend(client_names, (sent_colors, ""))
if n_groups > max_builtin_legend_clients:
color_legend(group_names, (sent_colors, ""))

# Stacked bar: received per second, green shades per client + red for errors
rcvd_stacks = [
[client_rcvd[cn].get(s, 0) for s in seconds] for cn in client_names
]
error_stacks = [
[client_errors[cn].get(s, 0) for s in seconds]
for cn in client_names
]
# Stacked bar: received per second + errors
rcvd_stacks = group_stacks(client_rcvd)
error_stacks = group_stacks(client_errors)
rcvd_kwargs = {"colors": rcvd_colors + error_colors}
if use_builtin_legend:
rcvd_kwargs["labels"] = client_names + [
f"{cn} errors" for cn in client_names
if n_groups <= max_builtin_legend_clients:
rcvd_kwargs["labels"] = group_names + [
f"{gn} errors" for gn in group_names
]
plt.simple_stacked_bar(
seconds,
Expand All @@ -709,9 +778,9 @@ def color_legend(names, *color_groups):
**rcvd_kwargs,
)
plt.show()
if not use_builtin_legend:
if n_groups > max_builtin_legend_clients:
color_legend(
client_names,
group_names,
(rcvd_colors, ""),
(error_colors, " errors"),
)
Expand Down