Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions keyvi/include/keyvi/index/internal/merge_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <chrono> //NOLINT
#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <thread> // NOLINT
#include <vector>
Expand Down Expand Up @@ -157,7 +158,16 @@ class MergeJob final {
void DoExternalProcessMerge(boost::asio::io_context* external_process_ctx) {
payload_.start_time_ = std::chrono::system_clock::now();

std::istringstream iss(payload_.settings_.GetKeyviMergerBin());
std::string executable;
iss >> executable;

std::vector<std::string> args;
std::string token;
while (iss >> token) {
args.push_back(std::move(token));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: no header providing "std::move" is directly included [misc-include-cleaner]

keyvi/include/keyvi/index/internal/merge_job.h:27:

- #include <vector>
+ #include <utility>
+ #include <vector>

}

args.push_back("-m");
args.push_back("5242880");

Expand All @@ -169,8 +179,7 @@ class MergeJob final {
args.push_back("-o");
args.push_back(payload_.output_filename_.string());

external_process_.reset(
new boost::process::v2::process(*external_process_ctx, payload_.settings_.GetKeyviMergerBin(), args));
external_process_.reset(new boost::process::v2::process(*external_process_ctx, executable, args));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: initializing non-owner argument of type 'boost::process::basic_process<> *' with a newly created 'gsl::owner<>' [cppcoreguidelines-owning-memory]

    external_process_.reset(new boost::process::v2::process(*external_process_ctx, executable, args));
                            ^

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use std::make_shared instead [modernize-make-shared]

Suggested change
external_process_.reset(new boost::process::v2::process(*external_process_ctx, executable, args));
external_process_ = std::make_shared<boost::process::v2::process>(*external_process_ctx, executable, args);

}

bool TryFinalizeMerge() {
Expand Down
4 changes: 2 additions & 2 deletions python/src/py/keyvi/_pycore/keyvimerger.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@

merger = keyvi.compiler.JsonDictionaryMerger(params)
for f in args.input_file:
merger.Add(f)
merger.add(f)

merger.Merge(args.output_file)
merger.merge(args.output_file)
else:
print ("ERROR: arguments wrong or missing.")
sys.exit(1)
Expand Down
129 changes: 129 additions & 0 deletions python/tests/index/index_external_merge_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
# Usage: py.test tests

from keyvi.index import Index, ReadOnlyIndex
import os
import tempfile
import time


def test_external_merge():
    """Fill an index in ten flushed batches of 50 keys, wait, then check every key and value."""
    settings = {"segment_external_merge_key_threshold": "100"}
    with tempfile.TemporaryDirectory() as tmp_root:
        index = Index(os.path.join(tmp_root, "index"), settings)

        # One bulk_set + flush per batch of 50 consecutive key numbers.
        for start in range(0, 500, 50):
            index.bulk_set(
                [
                    ("key-{:05d}".format(n), "value-{}".format(n))
                    for n in range(start, start + 50)
                ]
            )
            index.flush()

        # Fixed pause intended to let the external merge finish.
        time.sleep(5)

        # Every key written above must still resolve to its original value.
        for n in range(500):
            key = "key-{:05d}".format(n)
            assert key in index, "missing key: {}".format(key)
            assert index[key].value == "value-{}".format(n)



def test_external_merge_with_deletes():
    """Write 500 keys, delete the even-numbered ones, wait, then check only odd keys remain."""
    settings = {"segment_external_merge_key_threshold": "100"}
    with tempfile.TemporaryDirectory() as tmp_root:
        index = Index(os.path.join(tmp_root, "index"), settings)

        # One bulk_set + flush per batch of 50 consecutive key numbers.
        for start in range(0, 500, 50):
            index.bulk_set(
                [
                    ("key-{:05d}".format(n), "value-{}".format(n))
                    for n in range(start, start + 50)
                ]
            )
            index.flush()

        # Remove every even-numbered key, then flush once.
        for n in range(0, 500, 2):
            index.delete("key-{:05d}".format(n))
        index.flush()

        # Fixed pause intended to let the external merge finish.
        time.sleep(5)

        for n in range(500):
            key = "key-{:05d}".format(n)
            if n % 2:
                # Odd keys were never deleted and must keep their value.
                assert key in index, "missing key: {}".format(key)
                assert index[key].value == "value-{}".format(n)
            else:
                assert key not in index, "key should be deleted: {}".format(key)



def test_external_merge_with_overwrite():
    """Write 250 keys with v1 values, rewrite them with v2 values, wait, then expect v2 everywhere."""
    settings = {"segment_external_merge_key_threshold": "100"}
    with tempfile.TemporaryDirectory() as tmp_root:
        index = Index(os.path.join(tmp_root, "index"), settings)

        # First pass stores the v1 values, second pass overwrites the same keys with v2.
        for value_template in ("value-v1-{}", "value-v2-{}"):
            for start in range(0, 250, 50):
                index.bulk_set(
                    [
                        ("key-{:05d}".format(n), value_template.format(n))
                        for n in range(start, start + 50)
                    ]
                )
                index.flush()

        # Fixed pause intended to let the external merge finish.
        time.sleep(5)

        # Only the most recently written value may be visible for each key.
        for n in range(250):
            key = "key-{:05d}".format(n)
            assert key in index, "missing key: {}".format(key)
            match = index[key]
            assert match.value == "value-v2-{}".format(n), \
                "expected v2 value for {}, got {}".format(key, match.value)



def test_external_merge_read_only_index():
    """Write 500 keys through Index, wait, then check key presence through a ReadOnlyIndex."""
    settings = {"segment_external_merge_key_threshold": "100"}
    with tempfile.TemporaryDirectory() as tmp_root:
        index_path = os.path.join(tmp_root, "index")
        index = Index(index_path, settings)

        # One bulk_set + flush per batch of 50 consecutive key numbers.
        for start in range(0, 500, 50):
            index.bulk_set(
                [
                    ("key-{:05d}".format(n), "value-{}".format(n))
                    for n in range(start, start + 50)
                ]
            )
            index.flush()

        # Fixed pause intended to let the external merge finish.
        time.sleep(5)

        # Open the same directory read-only; only membership is checked here, not values.
        reader = ReadOnlyIndex(index_path)
        for n in range(500):
            key = "key-{:05d}".format(n)
            assert key in reader, "missing key in reader: {}".format(key)

Loading