Skip to content

Commit c141948

Browse files
committed
IBLT variants
1 parent 6c11903 commit c141948

File tree

5 files changed

+881
-102
lines changed

5 files changed

+881
-102
lines changed

include/iblt.h

Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
#pragma once
2+
3+
#include <vector>
4+
#include <xxhash.h>
5+
#include <iostream>
6+
#include <cstdint>
7+
#include <cassert>
8+
#include <cmath>
9+
#include "types.h"
10+
#include "recovery_types.h"
11+
#include "sketch.h"
12+
13+
// Template for ItemType to allow 32-bit vs 64-bit elements (e.g. node_id_t vs vec_t)
14+
template<typename ItemType = vec_t, typename HashType = vec_hash_t>
15+
class IBLT {
16+
private:
17+
size_t capacity;
18+
size_t num_hashes;
19+
size_t universe_size;
20+
size_t max_recovery_size;
21+
size_t cleanup_sketch_support;
22+
bool has_cleanup_sketch;
23+
bool owns_cleanup_sketch;
24+
long seed;
25+
26+
static constexpr double capacity_factor = 1.35;
27+
// static constexpr double capacity_factor = 1.001;
28+
29+
std::vector<ItemType> alphas;
30+
std::vector<HashType> gammas;
31+
32+
// Deterministic checking bucket
33+
ItemType det_alpha;
34+
HashType det_gamma;
35+
36+
// Hashes an item into a HashType checksum (e.g. 64-bit)
37+
inline HashType get_item_hash(const ItemType item_idx) const {
38+
return (HashType)(XXH3_128bits_withSeed(&item_idx, sizeof(ItemType), seed)).low64;
39+
}
40+
41+
// Computes k unique bucket indices for an item, re-hashing on collision.
42+
inline void get_bucket_indices(const ItemType item_idx, size_t *indices) const {
43+
assert(capacity > 0);
44+
assert(num_hashes > 0);
45+
assert(capacity > num_hashes);
46+
for (size_t i = 0; i < num_hashes; ++i) {
47+
size_t attempt = i;
48+
bool unique;
49+
do {
50+
auto hash = XXH3_128bits_withSeed(&item_idx, sizeof(ItemType), seed + attempt);
51+
indices[i] = hash.low64 % capacity;
52+
unique = true;
53+
for (size_t j = 0; j < i; ++j) {
54+
if (indices[j] == indices[i]) {
55+
unique = false;
56+
attempt += num_hashes;
57+
break;
58+
}
59+
}
60+
} while (!unique);
61+
}
62+
}
63+
64+
inline bool is_empty(size_t idx) const {
65+
return (alphas[idx] | gammas[idx]) == 0;
66+
}
67+
68+
inline bool is_good(size_t idx) const {
69+
return !is_empty(idx) && get_item_hash(alphas[idx]) == gammas[idx];
70+
}
71+
72+
// Shared recovery implementation. If allow_partial is true, may return PARTIAL_RECOVERY.
73+
RecoveryResult recover_internal(bool allow_partial) {
74+
if (cleanup_sketch != nullptr) {
75+
cleanup_sketch->reset_sample_state();
76+
}
77+
78+
std::vector<vec_t> recovered_items;
79+
std::vector<size_t> good_buckets;
80+
81+
ItemType working_det_alpha = 0;
82+
HashType working_det_gamma = 0;
83+
84+
// push good buckets onto queue
85+
for (size_t i = 0; i < capacity; ++i) {
86+
if (is_good(i)) {
87+
good_buckets.push_back(i);
88+
}
89+
}
90+
91+
// Peel good buckets
92+
while (!good_buckets.empty()) {
93+
size_t idx = good_buckets.back();
94+
good_buckets.pop_back();
95+
96+
if (!is_good(idx)) continue;
97+
98+
ItemType item = alphas[idx];
99+
HashType item_hash = get_item_hash(item);
100+
101+
recovered_items.push_back((vec_t)item);
102+
working_det_alpha ^= item;
103+
working_det_gamma ^= item_hash;
104+
105+
// Remove from IBLT buckets
106+
size_t indices[num_hashes];
107+
get_bucket_indices(item, indices);
108+
for (size_t i = 0; i < num_hashes; ++i) {
109+
alphas[indices[i]] ^= item;
110+
gammas[indices[i]] ^= item_hash;
111+
}
112+
113+
// Check if removing this item created new good buckets
114+
for (size_t i = 0; i < num_hashes; ++i) {
115+
if (is_good(indices[i])) {
116+
good_buckets.push_back(indices[i]);
117+
}
118+
}
119+
120+
// Early exit: deterministic bucket says all items recovered
121+
if (working_det_alpha == det_alpha && working_det_gamma == det_gamma) {
122+
// Undo changes so recover is non-destructive
123+
for (auto ri : recovered_items) {
124+
ItemType it = (ItemType)ri;
125+
HashType ih = get_item_hash(it);
126+
size_t ri_indices[num_hashes];
127+
get_bucket_indices(it, ri_indices);
128+
for (size_t i = 0; i < num_hashes; ++i) {
129+
alphas[ri_indices[i]] ^= it;
130+
gammas[ri_indices[i]] ^= ih;
131+
}
132+
}
133+
return {SUCCESS, recovered_items};
134+
}
135+
}
136+
137+
// Undo peeling changes so recover is non-destructive
138+
for (auto ri : recovered_items) {
139+
ItemType it = (ItemType)ri;
140+
HashType ih = get_item_hash(it);
141+
size_t ri_indices[num_hashes];
142+
get_bucket_indices(it, ri_indices);
143+
for (size_t i = 0; i < num_hashes; ++i) {
144+
alphas[ri_indices[i]] ^= it;
145+
gammas[ri_indices[i]] ^= ih;
146+
}
147+
}
148+
149+
if (working_det_alpha == det_alpha && working_det_gamma == det_gamma) {
150+
return {SUCCESS, recovered_items};
151+
}
152+
153+
// If a cleanup sketch exists, try to finish with it.
154+
if (cleanup_sketch != nullptr) {
155+
// Temporarily remove recovered items from the structure
156+
for (auto idx : recovered_items) {
157+
this->update(idx);
158+
}
159+
160+
for (size_t i = 0; i < cleanup_sketch->get_num_samples(); i++) {
161+
ExhaustiveSketchSample sample = cleanup_sketch->exhaustive_sample();
162+
if (sample.result == ZERO) {
163+
// Undo temporary removals
164+
for (auto idx : recovered_items) {
165+
this->update(idx);
166+
}
167+
return {SUCCESS, recovered_items};
168+
}
169+
for (auto idx : sample.idxs) {
170+
recovered_items.push_back(idx);
171+
this->update(idx);
172+
}
173+
}
174+
175+
// Undo the temporary removals from cleanup probing
176+
for (auto idx : recovered_items) {
177+
this->update(idx);
178+
}
179+
}
180+
181+
if (allow_partial && !recovered_items.empty()) {
182+
return {PARTIAL_RECOVERY, recovered_items};
183+
}
184+
return {FAILURE, recovered_items};
185+
}
186+
187+
public:
188+
Sketch *cleanup_sketch;
189+
190+
IBLT() : capacity(0), num_hashes(0), universe_size(0), max_recovery_size(0),
191+
cleanup_sketch_support(0), has_cleanup_sketch(false), owns_cleanup_sketch(false),
192+
seed(0), det_alpha(0), det_gamma(0), cleanup_sketch(nullptr) {}
193+
194+
IBLT(size_t universe_size, size_t max_recovery_size, double cleanup_sketch_support_factor,
195+
uint64_t seed, bool include_cleanup_sketch = true, Sketch *borrowed_cleanup_sketch = nullptr,
196+
size_t k = 3)
197+
: capacity((size_t)std::ceil(capacity_factor * max_recovery_size)),
198+
num_hashes(k), universe_size(universe_size), max_recovery_size(max_recovery_size),
199+
seed(seed),
200+
alphas((size_t)std::ceil(capacity_factor * max_recovery_size), 0),
201+
gammas((size_t)std::ceil(capacity_factor * max_recovery_size), 0),
202+
det_alpha(0), det_gamma(0) {
203+
204+
assert(capacity > 0);
205+
assert(num_hashes > 0);
206+
assert(capacity > num_hashes);
207+
208+
cleanup_sketch_support = (size_t)std::ceil(cleanup_sketch_support_factor * std::log2((double)universe_size));
209+
has_cleanup_sketch = false;
210+
owns_cleanup_sketch = false;
211+
cleanup_sketch = nullptr;
212+
if (borrowed_cleanup_sketch != nullptr) {
213+
cleanup_sketch = borrowed_cleanup_sketch;
214+
has_cleanup_sketch = true;
215+
} else if (include_cleanup_sketch && cleanup_sketch_support > 0) {
216+
cleanup_sketch = new Sketch(universe_size, seed, cleanup_sketch_support, 1);
217+
has_cleanup_sketch = true;
218+
owns_cleanup_sketch = true;
219+
}
220+
}
221+
222+
IBLT(const IBLT &other)
223+
: capacity(other.capacity), num_hashes(other.num_hashes),
224+
universe_size(other.universe_size), max_recovery_size(other.max_recovery_size),
225+
cleanup_sketch_support(other.cleanup_sketch_support),
226+
has_cleanup_sketch(other.has_cleanup_sketch), owns_cleanup_sketch(false),
227+
seed(other.seed),
228+
alphas(other.alphas), gammas(other.gammas),
229+
det_alpha(other.det_alpha), det_gamma(other.det_gamma),
230+
cleanup_sketch(nullptr) {
231+
if (other.cleanup_sketch != nullptr) {
232+
if (other.owns_cleanup_sketch) {
233+
cleanup_sketch = new Sketch(*other.cleanup_sketch);
234+
owns_cleanup_sketch = true;
235+
} else {
236+
cleanup_sketch = other.cleanup_sketch;
237+
}
238+
}
239+
}
240+
241+
IBLT &operator=(const IBLT &other) {
242+
if (this == &other) return *this;
243+
if (owns_cleanup_sketch) delete cleanup_sketch;
244+
cleanup_sketch = nullptr;
245+
246+
capacity = other.capacity;
247+
num_hashes = other.num_hashes;
248+
universe_size = other.universe_size;
249+
max_recovery_size = other.max_recovery_size;
250+
cleanup_sketch_support = other.cleanup_sketch_support;
251+
has_cleanup_sketch = other.has_cleanup_sketch;
252+
owns_cleanup_sketch = false;
253+
seed = other.seed;
254+
alphas = other.alphas;
255+
gammas = other.gammas;
256+
det_alpha = other.det_alpha;
257+
det_gamma = other.det_gamma;
258+
259+
if (other.cleanup_sketch != nullptr) {
260+
if (other.owns_cleanup_sketch) {
261+
cleanup_sketch = new Sketch(*other.cleanup_sketch);
262+
owns_cleanup_sketch = true;
263+
} else {
264+
cleanup_sketch = other.cleanup_sketch;
265+
}
266+
}
267+
return *this;
268+
}
269+
270+
IBLT(IBLT &&other) noexcept
271+
: capacity(other.capacity), num_hashes(other.num_hashes),
272+
universe_size(other.universe_size), max_recovery_size(other.max_recovery_size),
273+
cleanup_sketch_support(other.cleanup_sketch_support),
274+
has_cleanup_sketch(other.has_cleanup_sketch), owns_cleanup_sketch(other.owns_cleanup_sketch),
275+
seed(other.seed),
276+
alphas(std::move(other.alphas)), gammas(std::move(other.gammas)),
277+
det_alpha(other.det_alpha), det_gamma(other.det_gamma),
278+
cleanup_sketch(other.cleanup_sketch) {
279+
other.cleanup_sketch = nullptr;
280+
other.owns_cleanup_sketch = false;
281+
other.has_cleanup_sketch = false;
282+
}
283+
284+
IBLT &operator=(IBLT &&other) noexcept {
285+
if (this == &other) return *this;
286+
if (owns_cleanup_sketch) delete cleanup_sketch;
287+
288+
capacity = other.capacity;
289+
num_hashes = other.num_hashes;
290+
universe_size = other.universe_size;
291+
max_recovery_size = other.max_recovery_size;
292+
cleanup_sketch_support = other.cleanup_sketch_support;
293+
has_cleanup_sketch = other.has_cleanup_sketch;
294+
owns_cleanup_sketch = other.owns_cleanup_sketch;
295+
seed = other.seed;
296+
alphas = std::move(other.alphas);
297+
gammas = std::move(other.gammas);
298+
det_alpha = other.det_alpha;
299+
det_gamma = other.det_gamma;
300+
cleanup_sketch = other.cleanup_sketch;
301+
302+
other.cleanup_sketch = nullptr;
303+
other.owns_cleanup_sketch = false;
304+
other.has_cleanup_sketch = false;
305+
return *this;
306+
}
307+
308+
~IBLT() {
309+
if (owns_cleanup_sketch) {
310+
delete cleanup_sketch;
311+
}
312+
}
313+
314+
void update(const vec_t update_idx) {
315+
ItemType item = (ItemType)update_idx;
316+
HashType item_hash = get_item_hash(item);
317+
318+
det_alpha ^= item;
319+
det_gamma ^= item_hash;
320+
321+
// TODO - do we want to keep using
322+
// variable length arrays?
323+
size_t indices[num_hashes];
324+
get_bucket_indices(item, indices);
325+
for (size_t i = 0; i < num_hashes; ++i) {
326+
alphas[indices[i]] ^= item;
327+
gammas[indices[i]] ^= item_hash;
328+
}
329+
if (cleanup_sketch != nullptr) {
330+
cleanup_sketch->update(update_idx);
331+
}
332+
}
333+
334+
void reset() {
335+
det_alpha = 0;
336+
det_gamma = 0;
337+
std::fill(alphas.begin(), alphas.end(), 0);
338+
std::fill(gammas.begin(), gammas.end(), 0);
339+
if (cleanup_sketch != nullptr) {
340+
cleanup_sketch->zero_contents();
341+
}
342+
}
343+
344+
// Non-destructive recovery
345+
RecoveryResult recover() {
346+
return recover_internal(false);
347+
}
348+
349+
RecoveryResult recover(bool allow_partial) {
350+
return recover_internal(allow_partial);
351+
}
352+
353+
void merge(const IBLT &other) {
354+
assert(other.capacity == capacity);
355+
det_alpha ^= other.det_alpha;
356+
det_gamma ^= other.det_gamma;
357+
for (size_t i = 0; i < capacity; ++i) {
358+
alphas[i] ^= other.alphas[i];
359+
gammas[i] ^= other.gammas[i];
360+
}
361+
if (cleanup_sketch != nullptr && other.cleanup_sketch != nullptr) {
362+
cleanup_sketch->merge(*other.cleanup_sketch);
363+
}
364+
}
365+
366+
size_t space_usage_bytes() const {
367+
size_t total = sizeof(IBLT)
368+
+ alphas.capacity() * sizeof(ItemType)
369+
+ gammas.capacity() * sizeof(HashType);
370+
if (owns_cleanup_sketch && cleanup_sketch != nullptr) {
371+
total += sizeof(Sketch);
372+
total += cleanup_sketch->bucket_array_bytes();
373+
}
374+
return total;
375+
}
376+
377+
inline long get_seed() const { return seed; }
378+
};

0 commit comments

Comments
 (0)