Skip to content

Commit 0a2ac07

Browse files
committed
some progress. Still not tested
1 parent 6ac8d25 commit 0a2ac07

File tree

8 files changed

+327
-137
lines changed

8 files changed

+327
-137
lines changed

experiment/cache_exp.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ static std::atomic<size_t> num_updates_processed;
1616
// queries the guttering system
1717
// Should be run in a seperate thread
1818
static void querier(GutteringSystem *gts) {
19-
WorkQueue<update_batch>::DataNode *data;
19+
VertexBatchQueue::DataNode *data;
2020
while(true) {
2121
bool valid = gts->get_data(data);
2222
if (valid) {
2323
size_t updates = 0;
24-
for (auto batch : data->get_batches())
24+
for (auto batch : data->get_data())
2525
updates += batch.upd_vec.size();
2626
num_updates_processed += updates;
2727
}

experiment/standalone_exp.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ static constexpr uint32_t prime = 100000007;
1111
// queries the guttering system
1212
// Should be run in a seperate thread
1313
void querier(GutteringSystem *gts) {
14-
WorkQueue<update_batch>::DataNode *data;
14+
VertexBatchQueue::DataNode *data;
1515
while(true) {
1616
bool valid = gts->get_data(data);
1717
if(!valid && shutdown)

include/cache_guttering.h

Lines changed: 77 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ constexpr int log2_constexpr(size_t num) {
1212
return power;
1313
}
1414

15-
class CacheGuttering : public GutteringSystem {
15+
class PipelineHyperTree {
1616
private:
1717
const size_t inserters;
1818
const node_id_t num_nodes;
@@ -102,7 +102,7 @@ class CacheGuttering : public GutteringSystem {
102102

103103
class SharedGutter {
104104
private:
105-
CacheGuttering &CGsystem;
105+
PipelineHyperTree &CGsystem;
106106
public:
107107
update_t *data;
108108
std::atomic<size_t> insert_pos;
@@ -112,7 +112,7 @@ class CacheGuttering : public GutteringSystem {
112112
const size_t level;
113113

114114
// true init
115-
SharedGutter(CacheGuttering &CGsystem, size_t size, size_t level, size_t index)
115+
SharedGutter(PipelineHyperTree &CGsystem, size_t size, size_t level, size_t index)
116116
: CGsystem(CGsystem),
117117
data(new update_t[size]),
118118
insert_pos(0),
@@ -124,31 +124,31 @@ class CacheGuttering : public GutteringSystem {
124124
delete[] data;
125125
}
126126

127-
bool batch_insert(CacheGuttering::InsertThread &thr, SharedGutter *&gut_ptr,
127+
bool batch_insert(PipelineHyperTree::InsertThread &thr, SharedGutter *&gut_ptr,
128128
const std::vector<update_t> &updates);
129129
void flush(InsertThread &thr, SharedGutter *&gut_ptr, size_t num_upd_flush,
130130
const std::vector<update_t> &updates);
131131
};
132132

133133
class LeafGutter {
134134
private:
135-
CacheGuttering &CGsystem;
135+
PipelineHyperTree &CGsystem;
136136
public:
137137
std::vector<node_id_t> data;
138138
std::atomic<size_t> insert_pos;
139139
std::atomic<int> active_inserts;
140140
node_id_t index;
141141
const size_t capacity;
142142

143-
LeafGutter(CacheGuttering &CGsystem, size_t size, size_t index)
143+
LeafGutter(PipelineHyperTree &CGsystem, size_t size, size_t index)
144144
: CGsystem(CGsystem),
145145
data(size),
146146
insert_pos(0),
147147
active_inserts(0),
148148
index(index),
149149
capacity(size) {}
150150

151-
bool batch_insert(CacheGuttering::InsertThread &thr, LeafGutter *&gut_ptr,
151+
bool batch_insert(PipelineHyperTree::InsertThread &thr, LeafGutter *&gut_ptr,
152152
const std::vector<node_id_t> &updates);
153153
void flush(InsertThread &thr, LeafGutter *&gut_ptr, size_t num_upd_flush,
154154
const std::vector<node_id_t> &updates);
@@ -163,15 +163,15 @@ class CacheGuttering : public GutteringSystem {
163163
private:
164164
static constexpr size_t root_buffer_capacity = 256;
165165
size_t root_buffer_size = 0;
166-
CacheGuttering &CGsystem; // reference to associated CacheGuttering system
166+
PipelineHyperTree &CGsystem; // reference to associated PipelineHyperTree system
167167

168168
// thread local gutters
169169
update_t root_buffer[root_buffer_capacity];
170170
std::array<LocalGutter<level1_elms_per_buf>, level1_bufs> level1_gutters;
171171
std::array<LocalGutter<level2_elms_per_buf>, level2_bufs> level2_gutters;
172172

173173
public:
174-
InsertThread(CacheGuttering &CGsystem)
174+
InsertThread(PipelineHyperTree &CGsystem)
175175
: CGsystem(CGsystem),
176176
l3_insert_bufs(local_fanout),
177177
l4_insert_bufs(global_fanout),
@@ -239,6 +239,9 @@ class CacheGuttering : public GutteringSystem {
239239
InsertThread (InsertThread &&) = default;
240240
};
241241

242+
void flush_leaf(PipelineHyperTree::InsertThread &thr, LeafGutter *&gut_ptr,
243+
const std::vector<node_id_t> &updates);
244+
242245
// buffers shared amongst all threads
243246
SharedGutter **level3_gutters = nullptr;
244247
SharedGutter **level4_gutters = nullptr;
@@ -249,48 +252,44 @@ class CacheGuttering : public GutteringSystem {
249252
friend class InsertThread;
250253

251254
std::vector<InsertThread> insert_threads; // vector of InsertThreads
255+
VertexBatchQueue &wq;
252256
public:
253257
/**
254258
* Constructs a new guttering systems using a tree like structure for cache efficiency.
255259
* @param nodes number of nodes in the graph.
256260
* @param workers the number of workers which will be removing batches
257261
* @param inserters the number of inserter buffers
258262
*/
259-
CacheGuttering(node_id_t nodes, uint32_t workers, uint32_t inserters,
260-
GutteringConfiguration conf);
261-
CacheGuttering(node_id_t nodes, uint32_t workers, uint32_t inserters) :
262-
CacheGuttering(nodes, workers, inserters, GutteringConfiguration()) {};
263+
PipelineHyperTree(node_id_t nodes, size_t inserters, GutteringConfiguration &conf,
264+
VertexBatchQueue &wq);
263265

264-
~CacheGuttering();
266+
~PipelineHyperTree();
265267

266268
/**
267269
* Puts an update into the data structure.
268-
* @param upd the edge update.1
270+
* @param upd the edge update.
269271
* @param which, which thread is inserting this update
270272
* @return nothing.
271273
*/
272-
insert_ret_t insert(const update_t &upd, size_t which) override {
274+
insert_ret_t insert(const update_t &upd, size_t which) {
273275
assert(which < inserters);
274276
insert_threads[which].insert(upd);
275277
}
276278

277-
insert_ret_t batch_insert(const update_t *batch, size_t num_updates, size_t which) override {
279+
insert_ret_t batch_insert(const update_t *batch, size_t num_updates, size_t which) {
278280
assert(which < inserters);
279281
insert_threads[which].batch_insert(batch, num_updates);
280282
}
281283

282284
insert_ret_t process_stream_upd_batch(const GraphStreamUpdate *batch, size_t num_updates,
283-
size_t which) override {
285+
size_t which) {
284286
assert(which < inserters);
285287
insert_threads[which].process_stream_upd_batch(batch, num_updates);
286288
}
287289

288290
// pure virtual functions don't like default params, so default to 'which' of 0
289291
insert_ret_t insert(const update_t &upd) { insert_threads[0].insert(upd); }
290292

291-
void flush_leaf(CacheGuttering::InsertThread &thr, LeafGutter *&gut_ptr,
292-
const std::vector<node_id_t> &updates);
293-
294293
/**
295294
* Flushes all pending buffers. When this function returns there are no more updates in the
296295
* guttering system
@@ -304,14 +303,69 @@ class CacheGuttering : public GutteringSystem {
304303
* distributed guttering. If you don't know what that means, don't use this function!
305304
*
306305
* @param offset
307-
* @return a reference to the parent CacheGuttering object.
306+
* @return a reference to the parent PipelineHyperTree object.
308307
*/
309-
CacheGuttering& set_offset(node_id_t offset) { relabelling_offset = offset; return *this; }
308+
void set_offset(node_id_t offset) { relabelling_offset = offset; }
310309

311310
/*
312311
* Helper function for tracing a root to leaf path. Prints path to stdout
313312
* @param src the node id to trace
314313
*/
315314
void print_r_to_l(node_id_t src);
316315
void print_fanouts();
316+
317+
// number of batches per work queue element
318+
const size_t wq_batch_per_elm;
319+
const size_t leaf_gutter_size;
320+
};
321+
322+
// The CacheGuttering class adds the GutteringSystem base class to the PipelineHyperTree
323+
class CacheGuttering : public GutteringSystem {
324+
private:
325+
PipelineHyperTree pht;
326+
public:
327+
CacheGuttering(node_id_t nodes, size_t workers, size_t inserters, GutteringConfiguration conf)
328+
: GutteringSystem(nodes, workers, conf), pht(nodes, inserters, conf, wq){};
329+
330+
331+
/**
332+
* Puts an update into the data structure.
333+
* @param upd the edge update.
334+
* @param which, which thread is inserting this update
335+
* @return nothing.
336+
*/
337+
insert_ret_t insert(const update_t &upd, size_t which) override {
338+
pht.insert(upd, which);
339+
}
340+
341+
insert_ret_t batch_insert(const update_t *batch, size_t num_updates, size_t which) override {
342+
pht.batch_insert(batch, num_updates, which);
343+
}
344+
345+
insert_ret_t process_stream_upd_batch(const GraphStreamUpdate *batch, size_t num_updates,
346+
size_t which) override {
347+
pht.process_stream_upd_batch(batch, num_updates, which);
348+
}
349+
350+
// pure virtual functions don't like default params, so default to 'which' of 0
351+
insert_ret_t insert(const update_t &upd) { pht.insert(upd); }
352+
353+
/**
354+
* Flushes all pending buffers. When this function returns there are no more updates in the
355+
* guttering system
356+
* @return nothing.
357+
*/
358+
flush_ret_t force_flush() {
359+
pht.force_flush();
360+
}
361+
362+
/**
363+
* Set the "offset" for incoming edges. That is, if we set an offset of x, an incoming edge
364+
* {i,j} will be stored internally as an edge {i - x, j}. Use only for integration with
365+
* distributed guttering. If you don't know what that means, don't use this function!
366+
*
367+
* @param offset
368+
* @return a reference to the parent PipelineHyperTree object.
369+
*/
370+
void set_offset(node_id_t offset) { pht.set_offset(offset); }
317371
};

include/guttering_system.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ struct update_batch {
1212
std::vector<node_id_t> upd_vec;
1313
};
1414

15+
typedef WorkQueue<std::vector<update_batch>> VertexBatchQueue;
16+
1517
class GutteringSystem {
1618
public:
1719
// Constructor for programmatic configuration
18-
GutteringSystem(node_id_t num_nodes, int workers, GutteringConfiguration conf,
20+
GutteringSystem(node_id_t num_nodes, int workers, GutteringConfiguration &conf,
1921
bool page_slots = false)
2022
: page_size((conf.set_defaults())._page_size), // set defaults first to default init params
2123
buffer_size(conf._buffer_size),
@@ -25,7 +27,7 @@ class GutteringSystem {
2527
wq_batch_per_elm(conf._wq_batch_per_elm),
2628
num_nodes(num_nodes),
2729
leaf_gutter_size(conf._gutter_bytes / sizeof(node_id_t)),
28-
wq(workers * queue_factor, wq_batch_per_elm) {
30+
wq(workers * queue_factor) {
2931
size_t batch_len =
3032
page_slots ? leaf_gutter_size + page_size / sizeof(node_id_t) : leaf_gutter_size;
3133
std::vector<std::vector<update_batch>> wq_data;
@@ -75,8 +77,12 @@ class GutteringSystem {
7577
size_t gutter_size() { return leaf_gutter_size * sizeof(node_id_t); }
7678

7779
// get data out of the guttering system either one gutter at a time or in a batched fashion
78-
bool get_data(WorkQueue<update_batch>::DataNode *&data) { return wq.pop(data); }
79-
void get_data_callback(WorkQueue<update_batch>::DataNode *data) { wq.pop_callback(data); }
80+
bool get_data(VertexBatchQueue::DataNode *&data) {
81+
return wq.pop(data);
82+
}
83+
void get_data_callback(VertexBatchQueue::DataNode *data) {
84+
wq.pop_callback(data);
85+
}
8086
void set_non_block(bool block) { wq.set_non_block(block); } // set non-blocking calls in wq
8187
protected:
8288
// parameters of the GutteringSystem, defined by the GutteringConfiguration param or config file
@@ -89,5 +95,5 @@ class GutteringSystem {
8995

9096
const node_id_t num_nodes;
9197
const node_id_t leaf_gutter_size;
92-
WorkQueue<update_batch> wq;
98+
VertexBatchQueue wq;
9399
};

include/work_queue.h

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,38 @@
77
#include <exception>
88
#include "types.h"
99

10-
template<class T> // templatized by data type we're storing
10+
/**
11+
* WorkQueue is templatized by data type we're storing.
12+
* This data-type must be: 1. default constructable, 2. able to use operator=
13+
* Ideally it should also have fast std::swap() performance (e.g. a std::vector just swaps
14+
* metadata/pointers)
15+
*/
16+
template<class T>
1117
class WorkQueue {
1218
public:
1319
class DataNode {
1420
private:
1521
// LL next pointer
1622
DataNode *next = nullptr;
17-
std::vector<T> data_batch;
23+
T data;
1824

1925
friend class WorkQueue;
2026
public:
21-
const std::vector<T>& get_batches() { return data_batch; }
27+
const T& get_data() { return data; }
2228
};
2329

2430
/**
2531
* Construct a work queue
26-
* @param num_queue_elements the rough number of batches to have in the queue
27-
* @param data_per_elm number of batches per queue element.
32+
* @param num_queue_elements the rough number of data elements to have in the queue
2833
*/
29-
WorkQueue(size_t num_queue_elements, size_t data_per_elm)
30-
: len(num_queue_elements), batch_per_elm(data_per_elm) {
34+
WorkQueue(size_t num_queue_elements)
35+
: len(num_queue_elements) {
3136
non_block = false;
3237

3338
// place all nodes of linked list in the producer queue and reserve
3439
// memory for the vectors
3540
for (size_t i = 0; i < len; i++) {
36-
// create and reserve space for updates
41+
// create and reserve space for queue elements
3742
DataNode *node = new DataNode();
3843
node->next = producer_list; // next of node is head
3944
producer_list = node; // set head to new node
@@ -59,37 +64,32 @@ class WorkQueue {
5964
}
6065

6166
/**
67+
* TODO: Rewrite this description
6268
* Initialize the queue pointers to point at actual data instead of nullptrs
6369
* If this function is called, IT MUST be called before performing any operations with the queue
6470
* The queue can also work without initializing pointers, so long as the pointers returned from
6571
* push being null is acceptable. (i.e. user initializes after push or does not need the returned
6672
* pointer)
67-
* @param data_batches a vector of data batches that will start in the queue but is swapped with
73+
* @param new_data a vector of data that will start in the queue but is swapped with
6874
* data that is pushed into the queue.
6975
*/
70-
void populate_queue(std::vector<std::vector<T>> data_batches) {
71-
if (data_batches.size() != len) {
76+
void populate_queue(const std::vector<T> &new_data) {
77+
if (new_data.size() != len) {
7278
throw std::invalid_argument("WQ: Error number of initialized data batches incorrect");
7379
}
7480
DataNode *data = producer_list; // head of producer list
7581
for (size_t i = 0; i < len; i++) {
76-
if (data_batches[i].size() != batch_per_elm) {
77-
throw std::invalid_argument("WQ: Error number of data elements per batch incorrect");
78-
}
79-
data->data_batch = data_batches[i];
82+
data->data = new_data[i];
8083
data = data->next;
8184
}
8285
}
8386

8487
/**
85-
* Add a data element to the queue
86-
* @param upd_vec_batch vector of graph node id the associated updates
88+
* Adds a data element to the queue
89+
* @param push_data the data the user wants to add to the queue. When this function returns,
90+
* this reference will hold the data that was in the "empty" queue node it replaced
8791
*/
88-
void push(std::vector<T> &upd_vec_batch) {
89-
if (upd_vec_batch.size() > batch_per_elm) {
90-
throw std::runtime_error("WQ: Too many batches in call to push " +
91-
std::to_string(upd_vec_batch.size()) + " > " + std::to_string(batch_per_elm));
92-
}
92+
void push(T &push_data) {
9393
std::unique_lock<std::mutex> lk(producer_list_lock);
9494
producer_condition.wait(lk, [this]{return !full();});
9595

@@ -102,7 +102,7 @@ class WorkQueue {
102102
lk.unlock();
103103

104104
// swap the batch vectors to perform the update
105-
std::swap(node->data_batch, upd_vec_batch);
105+
std::swap(node->data, push_data);
106106

107107
// add this block to the consumer queue for processing
108108
consumer_list_lock.lock();
@@ -198,7 +198,6 @@ class WorkQueue {
198198
DataNode *consumer_list = nullptr; // list of nodes with data for reading
199199

200200
const size_t len; // number of elments in queue
201-
const size_t batch_per_elm; // number of batches per work queue element
202201

203202
// locks and condition variables for producer list
204203
std::condition_variable producer_condition;

0 commit comments

Comments
 (0)