Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,19 @@ typedef struct btree_node {
} btree_node;

typedef struct {
char merged_data[MAX_INLINE_MESSAGE_SIZE(MAX_PAGE_SIZE)];
} scratch_btree_add_tuple;

typedef struct {
char scratch_node[MAX_PAGE_SIZE];
char scratch_node[1]; // Actual size is btree_scratch_size(cfg)
} scratch_btree_defragment_node;

typedef struct { // Note: not a union
scratch_btree_add_tuple add_tuple;
scratch_btree_defragment_node defragment_node;
} PLATFORM_CACHELINE_ALIGNED btree_scratch;

static inline uint64
btree_scratch_size(const btree_config *cfg)
{
return cache_config_page_size(cfg->cache_cfg);
}

/*
* *************************************************************************
* BTree pivot data: Disk-resident structure
Expand Down
34 changes: 30 additions & 4 deletions src/memtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ memtable_add_tuple(memtable_context *ctxt)
ctxt->is_empty = FALSE;
}

static btree_scratch *
get_btree_scratch(memtable_context *ctxt, uint64 tid)
{
return (btree_scratch *)(ctxt->btree_scratches
+ tid * ctxt->btree_scratch_sz);
}

platform_status
memtable_insert(memtable_context *ctxt,
memtable *mt,
Expand All @@ -218,10 +225,11 @@ memtable_insert(memtable_context *ctxt,
const threadid tid = platform_get_tid();
bool32 was_unique;

platform_status rc = btree_insert(ctxt->cc,
btree_scratch *scratch = get_btree_scratch(ctxt, tid);
platform_status rc = btree_insert(ctxt->cc,
ctxt->cfg.btree_cfg,
heap_id,
&ctxt->scratch[tid],
scratch,
mt->root_addr,
&mt->mini,
tuple_key,
Expand Down Expand Up @@ -311,15 +319,31 @@ memtable_context_init(memtable_context *ctxt,
ZERO_CONTENTS(ctxt);
ctxt->cc = cc;
ctxt->cfg = *cfg;
ctxt->hid = hid;

if (MAX_MEMTABLES < cfg->max_memtables) {
platform_error_log("Configured number of memtables (%lu) exceeds max "
"supported memtables (%d)\n",
cfg->max_memtables,
MAX_MEMTABLES);
return STATUS_BAD_PARAM;
}

rc = platform_mutex_init(
&ctxt->incorporation_mutex, platform_get_module_id(), hid);
if (!SUCCESS(rc)) {
return rc;
}
batch_rwlock_init(&ctxt->rwlock);

platform_assert(cfg->max_memtables <= MAX_MEMTABLES);
ctxt->btree_scratch_sz = btree_scratch_size(cfg->btree_cfg);
ctxt->btree_scratches = TYPED_MANUAL_ZALLOC(
hid, ctxt->btree_scratches, ctxt->btree_scratch_sz * MAX_THREADS);
if (ctxt->btree_scratches == NULL) {
platform_mutex_destroy(&ctxt->incorporation_mutex);
return STATUS_NO_MEMORY;
}

batch_rwlock_init(&ctxt->rwlock);

for (uint64 mt_no = 0; mt_no < cfg->max_memtables; mt_no++) {
uint64 generation = mt_no;
Expand Down Expand Up @@ -348,6 +372,8 @@ memtable_context_deinit(memtable_context *ctxt)

platform_mutex_destroy(&ctxt->incorporation_mutex);
batch_rwlock_deinit(&ctxt->rwlock);

platform_free(ctxt->hid, ctxt->btree_scratches);
}

void
Expand Down
10 changes: 6 additions & 4 deletions src/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ typedef struct memtable_config {
} memtable_config;

typedef struct memtable_context {
cache *cc;
memtable_config cfg;
task_system *ts;
cache *cc;
memtable_config cfg;
task_system *ts;
platform_heap_id *hid;

process_fn process;
void *process_ctxt;
Expand All @@ -136,7 +137,8 @@ typedef struct memtable_context {
bool32 is_empty;

// Effectively thread local, no locking at all:
btree_scratch scratch[MAX_THREADS];
uint64 btree_scratch_sz;
char *btree_scratches;

memtable mt[MAX_MEMTABLES];
} memtable_context;
Expand Down
19 changes: 13 additions & 6 deletions tests/unit/btree_stress_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ CTEST_DATA(btree_stress)
allocator_config allocator_cfg;
clockcache_config cache_cfg;
task_system_config task_cfg;
btree_scratch test_scratch;
btree_scratch *test_scratch;
btree_config dbtree_cfg;

// To create a heap for io, allocator, cache and splinter
Expand Down Expand Up @@ -152,6 +152,11 @@ CTEST_SETUP(btree_stress)
{
ASSERT_TRUE(FALSE, "Failed to init heap\n");
}

data->test_scratch = TYPED_MANUAL_ZALLOC(
data->hid, data->test_scratch, btree_scratch_size(&data->dbtree_cfg));
ASSERT_NOT_NULL(data->test_scratch);

// Setup execution of concurrent threads
data->io = io_handle_create(&data->io_cfg, data->hid);
if (data->io == NULL
Expand Down Expand Up @@ -217,7 +222,7 @@ CTEST2(btree_stress, iterator_basics)
btree_insert((cache *)&data->cc,
&data->dbtree_cfg,
data->hid,
&data->test_scratch,
data->test_scratch,
root_addr,
&mini,
gen_key(&data->dbtree_cfg, i, keybuf, sizeof(keybuf)),
Expand Down Expand Up @@ -251,11 +256,13 @@ CTEST2(btree_stress, test_random_inserts_concurrent)
insert_thread_params *params = TYPED_ARRAY_ZALLOC(hid, params, nthreads);
platform_thread *threads = TYPED_ARRAY_ZALLOC(hid, threads, nthreads);

uint64 bt_sc_sz = btree_scratch_size(&data->dbtree_cfg);
for (uint64 i = 0; i < nthreads; i++) {
params[i].cc = (cache *)&data->cc;
params[i].cfg = &data->dbtree_cfg;
params[i].hid = data->hid;
params[i].scratch = TYPED_MALLOC(data->hid, params[i].scratch);
params[i].cc = (cache *)&data->cc;
params[i].cfg = &data->dbtree_cfg;
params[i].hid = data->hid;
params[i].scratch =
TYPED_MANUAL_MALLOC(data->hid, params[i].scratch, bt_sc_sz);
params[i].mini = &mini;
params[i].root_addr = root_addr;
params[i].start = i * (nkvs / nthreads);
Expand Down
11 changes: 7 additions & 4 deletions tests/unit/btree_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ CTEST_DATA(btree)
io_config io_cfg;
allocator_config allocator_cfg;
clockcache_config cache_cfg;
btree_scratch test_scratch;
btree_scratch *test_scratch;
btree_config dbtree_cfg;
platform_heap_id hid;
};
Expand Down Expand Up @@ -106,6 +106,9 @@ CTEST_SETUP(btree)
{
ASSERT_TRUE(FALSE, "Failed to parse args\n");
}

data->test_scratch = TYPED_MANUAL_ZALLOC(
data->hid, data->test_scratch, btree_scratch_size(&data->dbtree_cfg));
}

// Optional teardown function for suite, called after every test in suite
Expand All @@ -120,7 +123,7 @@ CTEST_TEARDOWN(btree)
*/
CTEST2(btree, test_leaf_hdr)
{
int rc = leaf_hdr_tests(&data->dbtree_cfg, &data->test_scratch, data->hid);
int rc = leaf_hdr_tests(&data->dbtree_cfg, data->test_scratch, data->hid);
ASSERT_EQUAL(0, rc);
}

Expand All @@ -138,7 +141,7 @@ CTEST2(btree, test_leaf_hdr_search)
*/
CTEST2(btree, test_index_hdr)
{
int rc = index_hdr_tests(&data->dbtree_cfg, &data->test_scratch, data->hid);
int rc = index_hdr_tests(&data->dbtree_cfg, data->test_scratch, data->hid);
ASSERT_EQUAL(0, rc);
}

Expand All @@ -158,7 +161,7 @@ CTEST2(btree, test_leaf_split)
{
for (int nkvs = 2; nkvs < 100; nkvs++) {
int rc = leaf_split_tests(
&data->dbtree_cfg, &data->test_scratch, nkvs, data->hid);
&data->dbtree_cfg, data->test_scratch, nkvs, data->hid);
ASSERT_EQUAL(0, rc);
}
}
Expand Down
Loading