Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions golpe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ tablesRaw:
## vals are prefixed with a type byte:
## 0: no compression, payload follows
## 1: zstd compression. Followed by Dictionary ID (native endian uint32) then compressed payload
## 2: zstd compression (no dictionary), compressed payload follows
EventPayload:
flags: 'MDB_INTEGERKEY'

Expand Down
84 changes: 80 additions & 4 deletions src/events.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
#include <openssl/sha.h>
#include <negentropy.h>
#include <cstdlib>
#include <algorithm>

#include "events.h"
#include "jsonParseUtils.h"


static int getPayloadZstdLevel() {
static int level = []() {
const char *s = std::getenv("STRFRY_EVENT_PAYLOAD_ZSTD_LEVEL");
if (!s) return 0;

try {
int v = std::stoi(s);
return std::max(0, std::min(v, 22));
} catch (...) {
return 0;
}
}();

return level;
}

static bool shouldRetainReplacedEvents() {
static bool retain = []() {
const char *s = std::getenv("STRFRY_RETAIN_REPLACED_EVENTS");
if (!s) return false;
return std::string(s) == "1" || std::string(s) == "true";
}();

return retain;
}


std::string nostrJsonToPackedEvent(const tao::json::value &v) {
PackedEventTagBuilder tagBuilder;

Expand Down Expand Up @@ -212,6 +241,23 @@ std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::s
if (outDictId) *outDictId = dictId;
if (outCompressedSize) *outCompressedSize = raw.size();
return buf;
} else if (raw[0] == '\x02') {
raw = raw.substr(1);

size_t frameSize = ZSTD_getFrameContentSize(raw.data(), raw.size());
if (frameSize == ZSTD_CONTENTSIZE_ERROR) throw herr("EventPayload zstd frame invalid");

size_t outSize = cfg().events__maxEventSize;
if (frameSize != ZSTD_CONTENTSIZE_UNKNOWN) outSize = std::min<size_t>(frameSize, cfg().events__maxEventSize);

decomp.reserve(outSize);

auto ret = ZSTD_decompress(decomp.buffer.data(), decomp.buffer.size(), raw.data(), raw.size());
if (ZSTD_isError(ret)) throw herr("zstd decompression failed: ", ZSTD_getErrorName(ret));

if (outDictId) *outDictId = 0;
if (outCompressedSize) *outCompressedSize = raw.size();
return std::string_view(decomp.buffer.data(), ret);
} else {
throw herr("Unexpected first byte in EventPayload");
}
Expand Down Expand Up @@ -298,9 +344,20 @@ void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vect

if (otherTimestamp < thisTimestamp ||
(otherTimestamp == thisTimestamp && packed.id() < otherPacked.id())) {
if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(otherPacked.id());
levIdsToDelete.push_back(otherEv.primaryKeyId);
// New event is newer - delete the old one (unless audit mode)
if (shouldRetainReplacedEvents()) {
// Audit mode: keep old event in storage for historical queries.
// The replace index will be updated to point to the new event,
// so NIP-01 queries return only the latest version.
// Old event remains queryable by ID for audit purposes.
if (logLevel >= 1) LI << "Retaining replaced event in storage (audit mode). id=" << to_hex(otherPacked.id());
} else {
if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(otherPacked.id());
levIdsToDelete.push_back(otherEv.primaryKeyId);
}
} else {
// New event is older - reject it (even in audit mode, to avoid index corruption).
// The JSONL audit log in the ingestion layer captures these anyway.
ev.status = EventWriteStatus::Replaced;
}
}
Expand Down Expand Up @@ -328,8 +385,27 @@ void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vect
ev.levId = env.insert_Event(txn, ev.packedStr);

tmpBuf.clear();
tmpBuf += '\x00';
tmpBuf += ev.jsonStr;

int zstdLevel = getPayloadZstdLevel();
if (zstdLevel > 0 && ev.jsonStr.size() > 64) {
std::string compressed;
compressed.resize(ZSTD_compressBound(ev.jsonStr.size()));

auto ret = ZSTD_compress(compressed.data(), compressed.size(), ev.jsonStr.data(), ev.jsonStr.size(), zstdLevel);

if (!ZSTD_isError(ret) && (ret + 1) < ev.jsonStr.size()) {
compressed.resize(ret);
tmpBuf += '\x02';
tmpBuf += compressed;
} else {
tmpBuf += '\x00';
tmpBuf += ev.jsonStr;
}
} else {
tmpBuf += '\x00';
tmpBuf += ev.jsonStr;
}

env.dbi_EventPayload.put(txn, lmdb::to_sv<uint64_t>(ev.levId), tmpBuf);

updateNegentropy(PackedEventView(ev.packedStr), true);
Expand Down
21 changes: 21 additions & 0 deletions strfry.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,27 @@ events {

# Maximum size for tag values, in bytes
maxTagValSize = 1024

# Optional event-payload zstd compression at ingest (experimental).
# Enable with env var on relay process:
# STRFRY_EVENT_PAYLOAD_ZSTD_LEVEL=3
# Valid range: 1-22 (higher = more CPU, potentially better ratio)

# Optional: Retain replaced events for audit trail.
# By default, when a newer replaceable event (kind 0, 3, 10000-19999, 30000-39999)
# arrives, the old version is deleted from storage.
#
# Enable this to KEEP old versions in storage:
# STRFRY_RETAIN_REPLACED_EVENTS=true
#
# Behavior when enabled:
# - Old versions are kept in storage (queryable by event ID)
# - Replace index still points to newest (NIP-01 queries return latest only)
# - Older incoming events are still rejected (to prevent index corruption)
# - Storage usage increases over time
#
# For complete audit including rejected older events, use an external
# JSONL audit log in your ingestion layer.
}

relay {
Expand Down