Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/pixels-retina/include/RGVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
template<size_t CAPACITY>
class RGVisibility : public pixels::RetinaBase<RGVisibility<CAPACITY>> {
public:
explicit RGVisibility(uint64_t rgRecordNum);
explicit RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap);
explicit RGVisibility(uint64_t rgRecordNum, uint64_t timestamp = 0,
const std::vector<uint64_t>* initialBitmap = nullptr);
~RGVisibility() override;

void deleteRGRecord(uint32_t rowId, uint64_t timestamp);
uint64_t* getRGVisibilityBitmap(uint64_t timestamp);

void collectRGGarbage(uint64_t timestamp);
std::vector<uint64_t> collectRGGarbage(uint64_t timestamp);

uint64_t getBitmapSize() const;

Expand Down
14 changes: 3 additions & 11 deletions cpp/pixels-retina/include/RGVisibilityJni.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 25 additions & 10 deletions cpp/pixels-retina/include/TileVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ struct VersionedData : public pixels::RetinaBase<VersionedData<CAPACITY>> {
uint64_t baseTimestamp;
DeleteIndexBlock* head; // Delete chain head, part of the version

VersionedData() : baseTimestamp(0), head(nullptr) {
std::memset(baseBitmap, 0, sizeof(baseBitmap));
}

VersionedData(uint64_t ts, const uint64_t* bitmap, DeleteIndexBlock* h)
// timestamp defaults to 0; bitmap defaults to all-zeros.
explicit VersionedData(uint64_t ts = 0, const uint64_t* bitmap = nullptr, DeleteIndexBlock* h = nullptr)
: baseTimestamp(ts), head(h) {
std::memcpy(baseBitmap, bitmap, NUM_WORDS * sizeof(uint64_t));
if (bitmap)
std::memcpy(baseBitmap, bitmap, NUM_WORDS * sizeof(uint64_t));
else
std::memset(baseBitmap, 0, sizeof(baseBitmap));
}
};

Expand All @@ -92,12 +92,12 @@ template<size_t CAPACITY>
class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
static constexpr size_t NUM_WORDS = BITMAP_WORDS(CAPACITY);
public:
TileVisibility();
TileVisibility(uint64_t ts, const uint64_t* bitmap);
// timestamp defaults to 0; bitmap defaults to all-zeros.
explicit TileVisibility(uint64_t timestamp = 0, const uint64_t* bitmap = nullptr);
~TileVisibility() override;
void deleteTileRecord(uint16_t rowId, uint64_t ts);
void getTileVisibilityBitmap(uint64_t ts, uint64_t* outBitmap) const;
void collectTileGarbage(uint64_t ts);
void collectTileGarbage(uint64_t ts, uint64_t* gcSnapshotBitmap);

private:
TileVisibility(const TileVisibility &) = delete;
Expand All @@ -108,7 +108,22 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
std::atomic<VersionedData<CAPACITY>*> currentVersion;
std::atomic<DeleteIndexBlock *> tail;
std::atomic<size_t> tailUsed;
std::vector<RetiredVersion<CAPACITY>> retired; // Protected by GC (single writer)

// Retired versions awaiting epoch-based reclamation. Only the GC thread
// (collectTileGarbage / reclaimRetiredVersions) reads and writes this vector,
// so no locking is needed.
std::vector<RetiredVersion<CAPACITY>> retired;

// Lock-free staging slot between deleteTileRecord (CDC threads) and GC.
// deleteTileRecord's empty-chain path replaces currentVersion but cannot
// write `retired` directly — that would race with the GC thread. Instead
// it atomically stores oldVer here. The GC thread drains this slot at the
// start of collectTileGarbage, moving it into `retired` with a proper epoch.
// Flow: deleteTileRecord → pendingRetire.store → collectTileGarbage →
// pendingRetire.exchange(nullptr) → retired.emplace_back → reclaimRetiredVersions
// At most one version is pending per GC cycle (the empty-chain path fires
// at most once between consecutive GC compactions).
std::atomic<VersionedData<CAPACITY>*> pendingRetire{nullptr};
};

#endif // PIXELS_RETINA_TILE_VISIBILITY_H
51 changes: 21 additions & 30 deletions cpp/pixels-retina/lib/RGVisibility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,23 @@
#include <cstring>
#include <thread>

// Validates before allocation: any throw leaves tileVisibilities as nullptr,
// so the incomplete constructor does not invoke the destructor (no memory leak).
template<size_t CAPACITY>
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
size_t allocSize = tileCount * sizeof(TileVisibility<CAPACITY>);
void* rawMemory = operator new[](allocSize);
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(rawMemory);
for (uint64_t i = 0; i < tileCount; ++i) {
new (&tileVisibilities[i]) TileVisibility<CAPACITY>();
}
}
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp,
const std::vector<uint64_t>* initialBitmap)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY),
tileVisibilities(nullptr) {
if (initialBitmap && initialBitmap->size() < tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY)
throw std::invalid_argument("Initial bitmap size is too small for the given record number.");

template<size_t CAPACITY>
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
size_t allocSize = tileCount * sizeof(TileVisibility<CAPACITY>);
void* rawMemory = operator new[](allocSize);
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(
operator new[](tileCount * sizeof(TileVisibility<CAPACITY>)));

if (initialBitmap.size() < tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY) {
operator delete[](rawMemory);
throw std::runtime_error("Initial bitmap size is too small for the given record number.");
}

tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(rawMemory);
for (uint64_t i = 0; i < tileCount; ++i) {
// Each tile takes 4 uint64_t
const uint64_t* tileBitmap = &initialBitmap[i * BITMAP_SIZE_PER_TILE_VISIBILITY];
// We use timestamp 0 for restored checkpoints to serve as the base state
new (&tileVisibilities[i]) TileVisibility<CAPACITY>(timestamp, tileBitmap);
}
for (uint64_t i = 0; i < tileCount; ++i)
new (&tileVisibilities[i]) TileVisibility<CAPACITY>(
timestamp,
initialBitmap ? initialBitmap->data() + i * BITMAP_SIZE_PER_TILE_VISIBILITY : nullptr);
}

template<size_t CAPACITY>
Expand All @@ -62,11 +50,14 @@ RGVisibility<CAPACITY>::~RGVisibility() {
}

template<size_t CAPACITY>
void RGVisibility<CAPACITY>::collectRGGarbage(uint64_t timestamp) {
// TileVisibility::collectTileGarbage uses COW + Epoch, so it's safe to call concurrently
for (uint64_t i = 0; i < tileCount; i++) {
tileVisibilities[i].collectTileGarbage(timestamp);
std::vector<uint64_t> RGVisibility<CAPACITY>::collectRGGarbage(uint64_t timestamp) {
size_t totalWords = tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY;
std::vector<uint64_t> rgSnapshot(totalWords, 0);
for (uint32_t t = 0; t < tileCount; t++) {
tileVisibilities[t].collectTileGarbage(timestamp,
rgSnapshot.data() + t * BITMAP_SIZE_PER_TILE_VISIBILITY);
}
return rgSnapshot;
}

template<size_t CAPACITY>
Expand Down
56 changes: 24 additions & 32 deletions cpp/pixels-retina/lib/RGVisibilityJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,28 @@
/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: createNativeObject
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObject
(JNIEnv* env, jobject, jlong rgRecordNum) {
try {
auto* rgVisibility = new RGVisibilityInstance(rgRecordNum);
return reinterpret_cast<jlong>(rgVisibility);
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
return 0;
}
}

/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: createNativeObjectInitialized
* Signature: (JJ[J)J
*
* Converts the Java bitmap array to a native vector when present, then
* forwards to a single RGVisibility constructor call.
*/
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObjectInitialized
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObject
(JNIEnv* env, jobject, jlong rgRecordNum, jlong timestamp, jlongArray bitmap) {
try {
jsize len = env->GetArrayLength(bitmap);
jlong *body = env->GetLongArrayElements(bitmap, nullptr);

std::vector<uint64_t> bitmapData;
bitmapData.reserve(len);
for (int i = 0; i < len; i++) {
bitmapData.push_back((uint64_t)body[i]);
const std::vector<uint64_t>* bitmapPtr = nullptr;
if (bitmap != nullptr) {
jsize len = env->GetArrayLength(bitmap);
jlong* body = env->GetLongArrayElements(bitmap, nullptr);
bitmapData.assign(reinterpret_cast<uint64_t*>(body),
reinterpret_cast<uint64_t*>(body) + len);
env->ReleaseLongArrayElements(bitmap, body, JNI_ABORT);
bitmapPtr = &bitmapData;
}

env->ReleaseLongArrayElements(bitmap, body, JNI_ABORT);

RGVisibilityInstance *rgVisibility = new RGVisibilityInstance(rgRecordNum, timestamp, bitmapData);
return reinterpret_cast<jlong>(rgVisibility);
return reinterpret_cast<jlong>(new RGVisibilityInstance(
static_cast<uint64_t>(rgRecordNum),
static_cast<uint64_t>(timestamp),
bitmapPtr));
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
return 0;
Expand Down Expand Up @@ -129,15 +117,20 @@ JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getVisi
/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: garbageCollect
* Signature: (JJ)V
* Signature: (JJ)[J
*/
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_garbageCollect
JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_garbageCollect
(JNIEnv* env, jobject, jlong timestamp, jlong handle) {
try {
auto* rgVisibility = reinterpret_cast<RGVisibilityInstance*>(handle);
rgVisibility->collectRGGarbage(timestamp);
std::vector<uint64_t> snapshot = rgVisibility->collectRGGarbage(timestamp);
jlongArray result = env->NewLongArray(snapshot.size());
env->SetLongArrayRegion(result, 0, snapshot.size(),
reinterpret_cast<const jlong*>(snapshot.data()));
return result;
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
return nullptr;
}
}

Expand Down Expand Up @@ -190,6 +183,5 @@ JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getRetinaTra
*/
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getRetinaObjectCount
(JNIEnv *env, jclass clazz) {
// Read the atomic object counter from RetinaBase namespace
return static_cast<jlong>(pixels::g_retina_object_count.load(std::memory_order_relaxed));
}
Loading
Loading