Skip to content

Commit 1fc7664

Browse files
committed
working with light testing
1 parent 0a2ac07 commit 1fc7664

File tree

8 files changed

+215
-160
lines changed

8 files changed

+215
-160
lines changed

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ add_library(GutterTree
8181
include/buffer_flusher.h
8282
src/standalone_gutters.cpp
8383
include/standalone_gutters.h
84-
src/cache_guttering.cpp
85-
include/cache_guttering.h
84+
src/pht.cpp
85+
include/pht.h
8686
src/numa_pht.cpp
8787
include/numa_pht.h
8888
include/types.h)

experiment/cache_exp.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include <chrono>
44
#include <atomic>
55
#include <fstream>
6-
#include "../include/cache_guttering.h"
6+
#include "../include/pht.h"
77

88
static bool shutdown = false;
99
static constexpr uint32_t prime = 100000007;
@@ -52,7 +52,7 @@ static void run_randomized(const int nodes, const unsigned long updates, const u
5252
.gutter_bytes(gutter_size)
5353
.wq_batch_per_elm(wq_batch);
5454

55-
CacheGuttering *gutters = new CacheGuttering(nodes, num_workers, nthreads, conf);
55+
PipelineHyperTree *gutters = new PipelineHyperTree(nodes, num_workers, nthreads, conf);
5656

5757
// create queriers
5858
#ifndef EARLY_EXIT
@@ -130,7 +130,7 @@ static void run_test(const int nodes, const unsigned long updates, const unsigne
130130
.num_flushers(2)
131131
.gutter_bytes(32 * 1024)
132132
.wq_batch_per_elm(8);
133-
CacheGuttering *gutters = new CacheGuttering(nodes, num_workers, nthreads, conf);
133+
PipelineHyperTree *gutters = new PipelineHyperTree(nodes, num_workers, nthreads, conf);
134134

135135
// create queriers
136136
#ifndef EARLY_EXIT

include/numa_pht.h

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
2+
#include "guttering_system.h"
3+
#include "pht.h"
4+
/**
5+
* The idea of NumaPHT is to limit the amount of tree space that each thread touches without
6+
* giving up on performance. If updates are uniformly distributed then this is easy, simply divide
7+
* the gutter space among the threads evenly and allow each to operate independently.
8+
*
9+
* However, if updates are not distributed uniformly, or arrive in a non-uniform order, then this
10+
* strategy will reduce performance as only some of the threads will be performing significant
11+
* work.
12+
*
13+
* TODO: Solve this problem at some point I guess
14+
*/
15+
class NumaPHT : public PipelineHyperTree {
16+
private:
17+
const node_id_t num_gutters;
18+
19+
const size_t num_inserters;
20+
const size_t num_trees;
21+
const size_t tree_bits;
22+
const size_t vert_bits;
23+
24+
constexpr static size_t thread_buffer_size = 2048;
25+
26+
struct ThreadBuffer {
27+
size_t size = 0;
28+
size_t capacity;
29+
update_t *buffer = nullptr;
30+
31+
ThreadBuffer() : capacity(thread_buffer_size), buffer(new update_t[capacity]) {}
32+
ThreadBuffer(ThreadBuffer&& oth) : size(oth.size), capacity(oth.capacity), buffer(oth.buffer) {
33+
oth.size = 0;
34+
oth.capacity = 0;
35+
oth.buffer = nullptr;
36+
}
37+
ThreadBuffer& operator=(ThreadBuffer&& oth) {
38+
size = oth.size;
39+
capacity = oth.capacity;
40+
buffer = oth.buffer;
41+
42+
oth.size = 0;
43+
oth.capacity = 0;
44+
oth.buffer = nullptr;
45+
46+
return *this;
47+
}
48+
49+
~ThreadBuffer() { if (buffer != nullptr) delete[] buffer; }
50+
};
51+
52+
// thread buffers for its roots and updates it wants to route to other trees
53+
ThreadBuffer *thread_buffers;
54+
55+
WorkQueue<ThreadBuffer> **tree_queues;
56+
public:
57+
/**
58+
* NumaPHT constructor
59+
*
60+
* @param num_gutters number of gutters to construct
61+
* @param num_consumers number of threads that will be removing leaf gutters from the tree
62+
* @param num_inserters number of threads that will be performing updates
63+
* @param num_trees number of independent trees to divide the gutters into
64+
*/
65+
NumaPHT(node_id_t num_gutters, size_t num_consumers, size_t num_inserters, size_t num_trees,
66+
GutteringConfiguration conf);
67+
68+
~NumaPHT();
69+
70+
insert_ret_t insert(const update_t &upd, size_t thr_id);
71+
72+
insert_ret_t batch_insert(const update_t *batch, size_t num_updates, size_t thr_id);
73+
74+
insert_ret_t process_stream_upd_batch(const GraphStreamUpdate *batch, size_t num_updates,
75+
size_t thr_id);
76+
77+
insert_ret_t force_flush();
78+
79+
insert_ret_t insert(const update_t &upd) {
80+
insert(upd, 0);
81+
}
82+
83+
thread_local static size_t thr_insert_num;
84+
};
Lines changed: 3 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ constexpr int log2_constexpr(size_t num) {
1212
return power;
1313
}
1414

15-
class PipelineHyperTree {
15+
16+
class PipelineHyperTree : public GutteringSystem {
1617
private:
1718
const size_t inserters;
1819
const node_id_t num_nodes;
@@ -252,16 +253,14 @@ class PipelineHyperTree {
252253
friend class InsertThread;
253254

254255
std::vector<InsertThread> insert_threads; // vector of InsertThreads
255-
VertexBatchQueue &wq;
256256
public:
257257
/**
258258
* Constructs a new guttering systems using a tree like structure for cache efficiency.
259259
* @param nodes number of nodes in the graph.
260260
* @param workers the number of workers which will be removing batches
261261
* @param inserters the number of inserter buffers
262262
*/
263-
PipelineHyperTree(node_id_t nodes, size_t inserters, GutteringConfiguration &conf,
264-
VertexBatchQueue &wq);
263+
PipelineHyperTree(node_id_t num_gutters, size_t num_consumers, size_t num_inserters, GutteringConfiguration conf);
265264

266265
~PipelineHyperTree();
267266

@@ -313,59 +312,4 @@ class PipelineHyperTree {
313312
*/
314313
void print_r_to_l(node_id_t src);
315314
void print_fanouts();
316-
317-
// number of batches per work queue element
318-
const size_t wq_batch_per_elm;
319-
const size_t leaf_gutter_size;
320-
};
321-
322-
// The CacheGuttering class adds the GutteringSystem base class to the PipelineHyperTree
323-
class CacheGuttering : public GutteringSystem {
324-
private:
325-
PipelineHyperTree pht;
326-
public:
327-
CacheGuttering(node_id_t nodes, size_t workers, size_t inserters, GutteringConfiguration conf)
328-
: GutteringSystem(nodes, workers, conf), pht(nodes, inserters, conf, wq){};
329-
330-
331-
/**
332-
* Puts an update into the data structure.
333-
* @param upd the edge update.
334-
* @param which, which thread is inserting this update
335-
* @return nothing.
336-
*/
337-
insert_ret_t insert(const update_t &upd, size_t which) override {
338-
pht.insert(upd, which);
339-
}
340-
341-
insert_ret_t batch_insert(const update_t *batch, size_t num_updates, size_t which) override {
342-
pht.batch_insert(batch, num_updates, which);
343-
}
344-
345-
insert_ret_t process_stream_upd_batch(const GraphStreamUpdate *batch, size_t num_updates,
346-
size_t which) override {
347-
pht.process_stream_upd_batch(batch, num_updates, which);
348-
}
349-
350-
// pure virtual functions don't like default params, so default to 'which' of 0
351-
insert_ret_t insert(const update_t &upd) { pht.insert(upd); }
352-
353-
/**
354-
* Flushes all pending buffers. When this function returns there are no more updates in the
355-
* guttering system
356-
* @return nothing.
357-
*/
358-
flush_ret_t force_flush() {
359-
pht.force_flush();
360-
}
361-
362-
/**
363-
* Set the "offset" for incoming edges. That is, if we set an offset of x, an incoming edge
364-
* {i,j} will be stored internally as an edge {i - x, j}. Use only for integration with
365-
* distributed guttering. If you don't know what that means, don't use this function!
366-
*
367-
* @param offset
368-
* @return a reference to the parent PipelineHyperTree object.
369-
*/
370-
void set_offset(node_id_t offset) { pht.set_offset(offset); }
371315
};

include/work_queue.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ class WorkQueue {
6363
consumer_list_lock.unlock();
6464
}
6565

66+
WorkQueue(WorkQueue &) = delete;
67+
WorkQueue& operator=(WorkQueue &) = delete;
68+
6669
/**
6770
* TODO: Rewrite this description
6871
* Initialize the queue pointers to point at actual data instead of nullptrs
@@ -89,12 +92,16 @@ class WorkQueue {
8992
* @param push_data the data the user wants to add to the queue. When this function returns,
9093
* this reference will hold the data that was in the "empty" queue node it replaced
9194
*/
92-
void push(T &push_data) {
95+
bool push(T &push_data, bool block = true) {
9396
std::unique_lock<std::mutex> lk(producer_list_lock);
94-
producer_condition.wait(lk, [this]{return !full();});
97+
producer_condition.wait(lk, [this, block]{return !full() || !block;});
9598

9699
// printf("WQ: Push:\n");
97100
// print();
101+
if (full()) {
102+
lk.unlock();
103+
return false;
104+
}
98105

99106
// remove head from produce_list
100107
DataNode *node = producer_list;
@@ -110,18 +117,19 @@ class WorkQueue {
110117
consumer_list = node;
111118
consumer_list_lock.unlock();
112119
consumer_condition.notify_one();
120+
return true;
113121
}
114122

115123
/**
116124
* Get data from the queue for processing
117125
* @param data where to place the Data
118126
* @return true if we were able to get good data, false otherwise
119127
*/
120-
bool pop(DataNode *&data) {
128+
bool pop(DataNode *&data, bool block = true) {
121129
// wait while queue is empty
122130
// printf("waiting to peek\n");
123131
std::unique_lock<std::mutex> lk(consumer_list_lock);
124-
consumer_condition.wait(lk, [this]{return !empty() || non_block;});
132+
consumer_condition.wait(lk, [this, block]{return !empty() || !block || non_block;});
125133

126134
// printf("WQ: Peek\n");
127135
// print();

0 commit comments

Comments
 (0)