From b34109244d267006278be910b6a1d40870eac471 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 27 May 2026 14:29:16 +0200 Subject: [PATCH 1/4] parse GetKeyviMergerBin to get args --- keyvi/include/keyvi/index/internal/merge_job.h | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/merge_job.h b/keyvi/include/keyvi/index/internal/merge_job.h index 9d2f0a278..9bc768595 100644 --- a/keyvi/include/keyvi/index/internal/merge_job.h +++ b/keyvi/include/keyvi/index/internal/merge_job.h @@ -22,6 +22,7 @@ #include //NOLINT #include #include +#include #include #include // NOLINT #include @@ -157,7 +158,16 @@ class MergeJob final { void DoExternalProcessMerge(boost::asio::io_context* external_process_ctx) { payload_.start_time_ = std::chrono::system_clock::now(); + std::istringstream iss(payload_.settings_.GetKeyviMergerBin()); + std::string executable; + iss >> executable; + std::vector args; + std::string token; + while (iss >> token) { + args.push_back(std::move(token)); + } + args.push_back("-m"); args.push_back("5242880"); @@ -169,8 +179,7 @@ class MergeJob final { args.push_back("-o"); args.push_back(payload_.output_filename_.string()); - external_process_.reset( - new boost::process::v2::process(*external_process_ctx, payload_.settings_.GetKeyviMergerBin(), args)); + external_process_.reset(new boost::process::v2::process(*external_process_ctx, executable, args)); } bool TryFinalizeMerge() { From f3879b4c0cdcef42787087ebeee8eb8aad217f52 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 27 May 2026 14:29:35 +0200 Subject: [PATCH 2/4] fix deprecated call --- python/src/py/keyvi/_pycore/keyvimerger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/py/keyvi/_pycore/keyvimerger.py b/python/src/py/keyvi/_pycore/keyvimerger.py index a526c2597..45eaa10de 100644 --- a/python/src/py/keyvi/_pycore/keyvimerger.py +++ b/python/src/py/keyvi/_pycore/keyvimerger.py @@ -41,7 +41,7 @@ merger = keyvi.compiler.JsonDictionaryMerger(params) for f in args.input_file: - merger.Add(f) + merger.add(f) merger.Merge(args.output_file) else: From 9778e5cccbd2ed66c6e93ea5852a2702c3747f8a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 28 May 2026 15:40:46 +0200 Subject: [PATCH 3/4] fix deprecation --- python/src/py/keyvi/_pycore/keyvimerger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/py/keyvi/_pycore/keyvimerger.py b/python/src/py/keyvi/_pycore/keyvimerger.py index 45eaa10de..e2425d807 100644 --- a/python/src/py/keyvi/_pycore/keyvimerger.py +++ b/python/src/py/keyvi/_pycore/keyvimerger.py @@ -43,7 +43,7 @@ for f in args.input_file: merger.add(f) - merger.Merge(args.output_file) + merger.merge(args.output_file) else: print ("ERROR: arguments wrong or missing.") sys.exit(1) From 0586dcb919adf799216226dd94a2b2dcea95e23a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 28 May 2026 17:55:29 +0200 Subject: [PATCH 4/4] add test --- .../tests/index/index_external_merge_test.py | 129 ++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 python/tests/index/index_external_merge_test.py diff --git a/python/tests/index/index_external_merge_test.py b/python/tests/index/index_external_merge_test.py new file mode 100644 index 000000000..568ec6e32 --- /dev/null +++ b/python/tests/index/index_external_merge_test.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# Usage: py.test tests + +from keyvi.index import Index, ReadOnlyIndex +import os +import tempfile +import time + + +def test_external_merge(): + with tempfile.TemporaryDirectory() as test_dir: + index_dir = os.path.join(test_dir, "index") + index = Index(index_dir, {"segment_external_merge_key_threshold": "100"}) + + for batch in range(10): + key_values = [] + for i in range(50): + key = "key-{:05d}".format(batch * 50 + i) + value = "value-{}".format(batch * 50 + i) + key_values.append((key, value)) + index.bulk_set(key_values) + index.flush() + + # wait for external merge to complete + time.sleep(5) + + # verify all keys are still accessible + for i in range(500): + key = "key-{:05d}".format(i) + assert key in index, "missing key: {}".format(key) + match = index[key] + assert match.value == "value-{}".format(i) + + + +def test_external_merge_with_deletes(): + with tempfile.TemporaryDirectory() as test_dir: + index_dir = os.path.join(test_dir, "index") + index = Index(index_dir, {"segment_external_merge_key_threshold": "100"}) + + for batch in range(10): + key_values = [] + for i in range(50): + key = "key-{:05d}".format(batch * 50 + i) + value = "value-{}".format(batch * 50 + i) + key_values.append((key, value)) + index.bulk_set(key_values) + index.flush() + + # delete every other key + for i in range(0, 500, 2): + index.delete("key-{:05d}".format(i)) + index.flush() + + # wait for external merge to complete + time.sleep(5) + + for i in range(500): + key = "key-{:05d}".format(i) + if i % 2 == 0: + assert key not in index, "key should be deleted: {}".format(key) + else: + assert key in index, "missing key: {}".format(key) + match = index[key] + assert match.value == "value-{}".format(i) + + + +def test_external_merge_with_overwrite(): + with tempfile.TemporaryDirectory() as test_dir: + index_dir = os.path.join(test_dir, "index") + index = Index(index_dir, {"segment_external_merge_key_threshold": "100"}) + + # write initial data across multiple segments + for batch in range(5): + key_values = [] + for i in range(50): + key = "key-{:05d}".format(batch * 50 + i) + value = "value-v1-{}".format(batch * 50 + i) + key_values.append((key, value)) + index.bulk_set(key_values) + index.flush() + + # overwrite some keys with new values in new segments + for batch in range(5): + key_values = [] + for i in range(50): + key = "key-{:05d}".format(batch * 50 + i) + value = "value-v2-{}".format(batch * 50 + i) + key_values.append((key, value)) + index.bulk_set(key_values) + index.flush() + + # wait for external merge to complete + time.sleep(5) + + # verify that the latest values are returned + for i in range(250): + key = "key-{:05d}".format(i) + assert key in index, "missing key: {}".format(key) + match = index[key] + assert match.value == "value-v2-{}".format(i), \ + "expected v2 value for {}, got {}".format(key, match.value) + + + +def test_external_merge_read_only_index(): + with tempfile.TemporaryDirectory() as test_dir: + index_dir = os.path.join(test_dir, "index") + index = Index(index_dir, {"segment_external_merge_key_threshold": "100"}) + + for batch in range(10): + key_values = [] + for i in range(50): + key = "key-{:05d}".format(batch * 50 + i) + value = "value-{}".format(batch * 50 + i) + key_values.append((key, value)) + index.bulk_set(key_values) + index.flush() + + # wait for external merge to complete + time.sleep(5) + + # open a read-only index and verify all data + reader = ReadOnlyIndex(index_dir) + for i in range(500): + key = "key-{:05d}".format(i) + assert key in reader, "missing key in reader: {}".format(key) +