From ecf552e6f879641f91fdd097f7a81d185acdce82 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Mon, 17 Nov 2025 16:49:45 +0330 Subject: [PATCH 01/14] init hedera consensus --- config.env | 19 ++-- consensus/config.py | 22 +--- consensus/receiver.py | 138 ++++++++++++----------- consensus/requirements.txt | 4 +- consensus/sender.py | 52 +++++---- web_services/foxx/brightid/db.js | 3 - web_services/foxx/brightid/encoding.js | 16 --- web_services/foxx/brightid/manifest.json | 9 +- 8 files changed, 120 insertions(+), 143 deletions(-) diff --git a/config.env b/config.env index a6ba7aae..26b97171 100644 --- a/config.env +++ b/config.env @@ -4,21 +4,17 @@ BN_WS_APPS_OPERATIONS_LIMIT=500 # Update this port in web/brightid-nginx.conf and docker-compose.yml too BN_WS_PROFILE_SERVICE_PORT=3000 BN_ARANGO_PROTOCOL=http -BN_ARANGO_HOST=localhost +BN_ARANGO_HOST=127.0.0.1 # Update this port in web/brightid-nginx.conf and docker-compose.yml too BN_ARANGO_PORT=8529 BN_ARANGO_SERVER_ENDPOINT=tcp://127.0.0.1:8529 -BN_CONSENSUS_INFURA_URL=wss://idchain.one/ws/ -# a tx with 100 KB data spends around 1.6M gas where gas limit per block is 8M -# https://bit.ly/3sKDux7 -BN_CONSENSUS_MAX_DATA_SIZE=100000 -BN_CONSENSUS_GAS=2000000 -BN_CONSENSUS_GAS_PRICE=10000000000 -BN_CONSENSUS_TO_ADDRESS=0xb1d1CDd5C4C541f95A73b5748392A6990cBe32b7 -BN_CONSENSUS_SNAPSHOTS_PERIOD=240 +BN_CONSENSUS_MAX_DATA_SIZE=1000 +BN_CONSENSUS_SNAPSHOTS_PERIOD=1200000 BN_CONSENSUS_APPLY_URL=/_db/_system/apply{v}/operations/{hash} +BN_CONSENSUS_MIRROR_NODE_URL="https://testnet.mirrornode.hedera.com/api/v1/topics/{topic_id}/messages?sequencenumber={sequence_number}&limit={limit}" +BN_CONSENSUS_NETWORK=testnet +BN_CONSENSUS_TOPIC_ID=0.0.7217823 BN_CONSENSUS_DUMP_URL=/_api/replication/dump -BN_CONSENSUS_IDCHAIN_RPC_URL=https://idchain.one/rpc/ BN_UPDATER_SEED_VOTING_ADDRESS=0x56741DbC203648983c359A48aaf68f25f5550B6a BN_UPDATER_SP_ADDRESS_MAINNET=0x0aB346a16ceA1B1363b20430C414eAB7bC179324 BN_UPDATER_SP_ADDRESS_IDCHAIN=0x183C5D2d1E43A3aCC8a977023796996f8AFd2327 @@ -33,4 +29,5 @@ BN_SEED= BN_WS_PRIVATE_KEY= BN_WS_ETH_PRIVATE_KEY= BN_WS_WISCHNORR_PASSWORD= -BN_CONSENSUS_PRIVATE_KEY= +BN_CONSENSUS_OPERATOR_ID= +BN_CONSENSUS_OPERATOR_KEY= \ No newline at end of file diff --git a/consensus/config.py b/consensus/config.py index db110b69..00fa7d85 100755 --- a/consensus/config.py +++ b/consensus/config.py @@ -3,22 +3,14 @@ from eth_keys import keys from eth_utils import decode_hex -INFURA_URL = os.environ["BN_CONSENSUS_INFURA_URL"] -PRIVATE_KEY = os.environ.get("BN_CONSENSUS_PRIVATE_KEY") -SEED = os.environ.get("BN_SEED") -if not PRIVATE_KEY and SEED: - PRIVATE_KEY = hashlib.sha256(SEED.encode("utf-8")).hexdigest() -ADDRESS = ( - keys.PrivateKey(decode_hex(PRIVATE_KEY)).public_key.to_checksum_address() - if PRIVATE_KEY - else "" -) +NETWORK = os.environ["BN_CONSENSUS_NETWORK"] +TOPIC_ID = os.environ["BN_CONSENSUS_TOPIC_ID"] + +OPERATOR_ID = os.environ["BN_CONSENSUS_OPERATOR_ID"] +OPERATOR_KEY = os.environ["BN_CONSENSUS_OPERATOR_KEY"] +MIRROR_NODE_URL = os.environ["BN_CONSENSUS_MIRROR_NODE_URL"] MAX_DATA_SIZE = int(os.environ["BN_CONSENSUS_MAX_DATA_SIZE"]) -GAS = int(os.environ["BN_CONSENSUS_GAS"]) -GAS_PRICE = int(os.environ["BN_CONSENSUS_GAS_PRICE"]) -TO_ADDRESS = os.environ["BN_CONSENSUS_TO_ADDRESS"] -DEPRECATED_TO_ADDRESS = "0x0000000000000000000000000000000000000007" SNAPSHOTS_PERIOD = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD"]) SNAPSHOTS_PATH = "/snapshots/dump_{}" @@ -30,5 +22,3 @@ APPLY_URL = ARANGO_SERVER + os.environ["BN_CONSENSUS_APPLY_URL"] DUMP_URL = ARANGO_SERVER + os.environ["BN_CONSENSUS_DUMP_URL"] - -IDCHAIN_RPC_URL = os.environ["BN_CONSENSUS_IDCHAIN_RPC_URL"] diff --git a/consensus/receiver.py b/consensus/receiver.py index 6bfccee9..0588225c 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -1,5 +1,6 @@ import os import time +import math import socket import json import base64 @@ -8,20 +9,13 @@ import requests import traceback from arango import ArangoClient, errno -from web3 import Web3 -from web3.middleware import geth_poa_middleware import config db = ArangoClient(hosts=config.ARANGO_SERVER).db("_system") -w3 = Web3(Web3.WebsocketProvider(config.INFURA_URL)) -if config.INFURA_URL.count("rinkeby") > 0 or config.INFURA_URL.count("idchain") > 0: - w3.middleware_onion.inject(geth_poa_middleware, layer=0) - -NUM_SEALERS = 0 +variables = db.collection("variables") def hash(op): - blockTime = op["blockTime"] op = { k: op[k] for k in op @@ -30,8 +24,8 @@ def hash(op): if op["name"] == "Set Signing Key": del op["id1"] del op["id2"] - # in next release checking blockTime should be removed - if op["name"] == "Social Recovery" and op["v"] == 6 and blockTime > 1637380189000: + + if op["name"] == "Social Recovery" and op["v"] == 6: for k in ["id1", "id2", "id3", "id4", "id5"]: op.pop(k, None) message = json.dumps(op, sort_keys=True, separators=(",", ":")) @@ -41,19 +35,20 @@ def hash(op): return h.replace("+", "-").replace("/", "_").replace("=", "") -def process(data, block_timestamp): - data_bytes = bytes.fromhex(data.strip("0x")) - data_str = data_bytes.decode("utf-8", "ignore") +def process(message): + encoded_message = message.get("message", "") + message_content = base64.b64decode(encoded_message).decode("utf-8").strip() + try: - operations = json.loads(data_str) + operations = json.loads(message_content) except ValueError: - print("error in parsing operations", data_str) + print("error in parsing operations", message_content) return for op in operations: if type(op) is not dict or op.get("v") not in (5, 6) or "name" not in op: print("invalid operation", op) continue - op["blockTime"] = block_timestamp * 1000 + op["blockTime"] = int(float(message["consensus_timestamp"]) * 1000) process_op(op) @@ -74,8 +69,8 @@ def process_op(op): raise Exception("Error from apply service") -def save_snapshot(block): - dir_name = config.SNAPSHOTS_PATH.format(block) +def save_snapshot(next_snapshot_timestamp): + dir_name = config.SNAPSHOTS_PATH.format(next_snapshot_timestamp) fnl_dir_name = f"{dir_name}_fnl" dir_path = os.path.dirname(os.path.realpath(__file__)) collections_file = os.path.join(dir_path, "collections.json") @@ -84,18 +79,8 @@ def save_snapshot(block): ) assert res == 0, "dumping snapshot failed" shutil.move(dir_name, fnl_dir_name) - - -def update_num_sealers(): - global NUM_SEALERS - data = {"jsonrpc": "2.0", "method": "clique_status", "params": [], "id": 1} - headers = {"Content-Type": "application/json", "Cache-Control": "no-cache"} - try: - resp = requests.post(config.IDCHAIN_RPC_URL, json=data, headers=headers) - NUM_SEALERS = len(resp.json()["result"]["sealerActivity"]) - except Exception as e: - print("Error from update_num_sealers", e) - update_num_sealers() + variables.update({"_key": "PREV_SNAPSHOT_TIME", "value": next_snapshot_timestamp}) + remove_old_operations() def remove_old_operations(): @@ -112,47 +97,66 @@ def remove_old_operations(): ) +def get_sequence_number(): + if variables.has("SEQUENCE_NUMBER"): + return variables.get("SEQUENCE_NUMBER")["value"] + else: + variables.insert({"_key": "SEQUENCE_NUMBER", "value": 3}) + return 3 + + +def get_next_snapshot_timestamp(sequence_number): + if variables.has("PREV_SNAPSHOT_TIME"): + return variables.get("PREV_SNAPSHOT_TIME")["value"] + config.SNAPSHOTS_PERIOD + else: + url = config.MIRROR_NODE_URL.format( + topic_id=config.TOPIC_ID, sequence_number=sequence_number, limit=1 + ) + r = requests.get(url) + messages = r.json()["messages"] + if len(messages) == 0: + raise Exception("no genesis message! consensus receiver stopped ...") + + timestamp = int(float(messages[0]["consensus_timestamp"]) * 1000) + prev_snapshot_timestamp = ( + int(timestamp / config.SNAPSHOTS_PERIOD) * config.SNAPSHOTS_PERIOD + ) + variables.insert( + {"_key": "PREV_SNAPSHOT_TIME", "value": prev_snapshot_timestamp} + ) + return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD + + def main(): - update_num_sealers() - variables = db.collection("variables") - last_block = variables.get("LAST_BLOCK")["value"] + sequence_number = get_sequence_number() + next_snapshot_timestamp = get_next_snapshot_timestamp(sequence_number) while True: - # This sleep is for not calling the ethereum node endpoint - # for getting the last block number more than once per second time.sleep(1) - current_block = w3.eth.getBlock("latest").number - confirmed_block = current_block - (NUM_SEALERS // 2 + 1) - - if confirmed_block > last_block: - # Here we should go to process the block imediately, but there seems - # to be a bug in getBlock that cause error when we get the transactions - # instantly. This delay is added to avoid that error. - # When error is raised, the file will run again and no bad problem occur. - time.sleep(3) - - for block_number in range(last_block + 1, confirmed_block + 1): - print("processing block {}".format(block_number)) - if block_number % 100 == 0: - update_num_sealers() - block = w3.eth.getBlock(block_number, True) - for i, tx in enumerate(block["transactions"]): - if tx["to"] and tx["to"].lower() in ( - config.TO_ADDRESS.lower(), - config.DEPRECATED_TO_ADDRESS.lower(), - ): - process(tx["input"], block.timestamp) - if block_number % config.SNAPSHOTS_PERIOD == 0: - save_snapshot(block_number) - # PREV_SNAPSHOT_TIME is used by some verification - # algorithms to filter connections that are made - # after previous processed snapshot - variables.update( - {"_key": "PREV_SNAPSHOT_TIME", "value": block["timestamp"]} - ) - remove_old_operations() - variables.update({"_key": "LAST_BLOCK", "value": block_number}) - last_block = block_number + url = config.MIRROR_NODE_URL.format( + topic_id=config.TOPIC_ID, + sequence_number="gt:{}".format(sequence_number), + limit=100, + ) + r = requests.get(url) + + messages = r.json()["messages"] + for i, message in enumerate(messages): + consensus_timestamp = int(float(message["consensus_timestamp"]) * 1000) + if consensus_timestamp >= next_snapshot_timestamp: + save_snapshot(next_snapshot_timestamp) + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD + + process(message) + sequence_number = message["sequence_number"] + variables.update({"_key": "SEQUENCE_NUMBER", "value": sequence_number}) + + now = time.time() * 1000 + allowed_delay = min(config.SNAPSHOTS_PERIOD / 2, 60 * 1000) + + if len(messages) == 0 and now > next_snapshot_timestamp + allowed_delay: + save_snapshot(next_snapshot_timestamp) + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD def wait(): diff --git a/consensus/requirements.txt b/consensus/requirements.txt index 76161229..c0185673 100644 --- a/consensus/requirements.txt +++ b/consensus/requirements.txt @@ -1,3 +1,3 @@ python-arango==5.4.0 -web3==5.0.0 -requests \ No newline at end of file +hiero_sdk_python==0.1.7 +requests diff --git a/consensus/sender.py b/consensus/sender.py index 37e9a37f..368998a3 100644 --- a/consensus/sender.py +++ b/consensus/sender.py @@ -1,29 +1,37 @@ import socket import time import json -import binascii from arango import ArangoClient -from web3 import Web3 import config +from hiero_sdk_python import ( + Client, + Network, + AccountId, + PrivateKey, + TopicMessageSubmitTransaction, + TopicId, +) + +print(f"Connecting to Hedera {config.NETWORK} network!") +network = Network(config.NETWORK) +client = Client(network) +operator_id = AccountId.from_string(config.OPERATOR_ID) +operator_key = PrivateKey.from_string_ecdsa(config.OPERATOR_KEY) +client.set_operator(operator_id, operator_key) -w3 = Web3(Web3.WebsocketProvider(config.INFURA_URL)) db = ArangoClient(hosts=config.ARANGO_SERVER).db("_system") -def sendTransaction(data): - nonce = w3.eth.getTransactionCount(config.ADDRESS, "pending") - tx = { - "to": config.TO_ADDRESS, - "value": 0, - "gas": config.GAS, - "gasPrice": config.GAS_PRICE, - "nonce": nonce, - "chainId": w3.eth.chainId, - "data": data, - } - signed = w3.eth.account.sign_transaction(tx, config.PRIVATE_KEY) - tx = w3.eth.sendRawTransaction(signed.rawTransaction).hex() - return tx +def sendMessage(data): + topic_id = TopicId.from_string(config.TOPIC_ID) + message_transaction = ( + TopicMessageSubmitTransaction() + .set_topic_id(topic_id) + .set_message(data) + .freeze_with(client) + .sign(operator_key) + ) + message_transaction.execute(client) def main(): @@ -41,12 +49,14 @@ def main(): if not operations: return - data = json.dumps(operations).encode("utf-8") - data = "0x" + binascii.hexlify(data).decode("utf-8") - transaction_hash = sendTransaction(data) + data = json.dumps(operations) + sendMessage(data) for i, op in enumerate(operations): db.collection("operations").update( - {"_key": hashes[i], "state": "sent", "transactionHash": transaction_hash}, + { + "_key": hashes[i], + "state": "sent", + }, merge=True, ) diff --git a/web_services/foxx/brightid/db.js b/web_services/foxx/brightid/db.js index 2dbb6e7e..aae230fb 100755 --- a/web_services/foxx/brightid/db.js +++ b/web_services/foxx/brightid/db.js @@ -7,7 +7,6 @@ const { priv2addr, getNaclKeyPair, getEthKeyPair, - getConsensusSenderAddress, } = require("./encoding"); const errors = require("./errors"); const wISchnorrServer = require("./WISchnorrServer"); @@ -783,7 +782,6 @@ function getState() { variablesColl.document("VERIFICATIONS_HASHES").hashes ); const conf = module.context.configuration; - const consensusSenderAddress = getConsensusSenderAddress(); const { privateKey: ethPrivateKey } = getEthKeyPair(); const { publicKey: naclSigningKey } = getNaclKeyPair(); let wISchnorrPublic = null; @@ -815,7 +813,6 @@ function getState() { wISchnorrPublic, ethSigningAddress: priv2addr(ethPrivateKey), naclSigningKey, - consensusSenderAddress, development: conf.development, version: module.context.manifest.version, appsLastUpdateBlock, diff --git a/web_services/foxx/brightid/encoding.js b/web_services/foxx/brightid/encoding.js index cd4676eb..13367000 100755 --- a/web_services/foxx/brightid/encoding.js +++ b/web_services/foxx/brightid/encoding.js @@ -114,21 +114,6 @@ function getEthKeyPair() { return { publicKey, privateKey }; } -function getConsensusSenderAddress() { - let address = null; - if (conf.consensusSenderPrivateKey) { - const uint8ArrayPrivateKey = new Uint8Array( - Buffer.from(conf.consensusSenderPrivateKey, "hex") - ); - address = priv2addr(uint8ArrayPrivateKey); - } else if (conf.seed) { - const hex32 = crypto.sha256(conf.seed); - const uint8ArrayPrivateKey = new Uint8Array(Buffer.from(hex32, "hex")); - address = priv2addr(uint8ArrayPrivateKey); - } - return address; -} - function modPow(a, exp, b) { const response = request({ method: "get", @@ -154,6 +139,5 @@ module.exports = { priv2addr, getNaclKeyPair, getEthKeyPair, - getConsensusSenderAddress, modPow, }; diff --git a/web_services/foxx/brightid/manifest.json b/web_services/foxx/brightid/manifest.json index 373d99d0..7b597cc3 100755 --- a/web_services/foxx/brightid/manifest.json +++ b/web_services/foxx/brightid/manifest.json @@ -3,14 +3,14 @@ "name": "BrightID-Node", "description": "Read and update the anonymous social graph stored on BrightID nodes.", "license": "ISC", - "version": "6.18.0", + "version": "6.19.0", "tests": ["tests/*.js"], "scripts": { "setup": "initdb.js" }, "configuration": { "seed": { - "description": "A password; is used for generating privateKey, ethPrivateKey, consensusSenderPrivateKey, and wISchnorrPassword if they haven't set (string)", + "description": "A password; is used for generating privateKey, ethPrivateKey, and wISchnorrPassword if they haven't set (string)", "type": "string", "required": false }, @@ -24,11 +24,6 @@ "type": "string", "required": false }, - "consensusSenderPrivateKey": { - "description": "Ethereum private key of consensus sender service of this server node; used for sending operations (hex representation without 0x)", - "type": "string", - "required": false - }, "wISchnorrPassword": { "description": "Public key of this server node (base64 encoded)", "type": "string", From 35951de4e7f35425df7c9413fc0ce255c18c171b Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 13:45:53 +0330 Subject: [PATCH 02/14] enable sending to both idchain and hedera --- config.env | 19 ++++++++++++++++--- consensus/config.py | 21 ++++++++++++++++++++- consensus/receiver.py | 15 ++++++++------- consensus/requirements.txt | 1 + consensus/sender.py | 23 +++++++++++++++++++++++ 5 files changed, 68 insertions(+), 11 deletions(-) diff --git a/config.env b/config.env index 26b97171..6e792e74 100644 --- a/config.env +++ b/config.env @@ -4,17 +4,26 @@ BN_WS_APPS_OPERATIONS_LIMIT=500 # Update this port in web/brightid-nginx.conf and docker-compose.yml too BN_WS_PROFILE_SERVICE_PORT=3000 BN_ARANGO_PROTOCOL=http -BN_ARANGO_HOST=127.0.0.1 +BN_ARANGO_HOST=localhost # Update this port in web/brightid-nginx.conf and docker-compose.yml too BN_ARANGO_PORT=8529 BN_ARANGO_SERVER_ENDPOINT=tcp://127.0.0.1:8529 BN_CONSENSUS_MAX_DATA_SIZE=1000 -BN_CONSENSUS_SNAPSHOTS_PERIOD=1200000 +BN_CONSENSUS_SNAPSHOTS_PERIOD_MILLISECONDS=1200000 BN_CONSENSUS_APPLY_URL=/_db/_system/apply{v}/operations/{hash} BN_CONSENSUS_MIRROR_NODE_URL="https://testnet.mirrornode.hedera.com/api/v1/topics/{topic_id}/messages?sequencenumber={sequence_number}&limit={limit}" BN_CONSENSUS_NETWORK=testnet BN_CONSENSUS_TOPIC_ID=0.0.7217823 BN_CONSENSUS_DUMP_URL=/_api/replication/dump + +# following BN_CONSENSUS variables should be removed in the next update +BN_CONSENSUS_GAS=2000000 +BN_CONSENSUS_GAS_PRICE=10000000000 +BN_CONSENSUS_TO_ADDRESS=0xb1d1CDd5C4C541f95A73b5748392A6990cBe32b7 +BN_CONSENSUS_SNAPSHOTS_PERIOD=240 +BN_CONSENSUS_IDCHAIN_RPC_URL=https://idchain.one/rpc/ +BN_CONSENSUS_INFURA_URL=wss://idchain.one/ws/ + BN_UPDATER_SEED_VOTING_ADDRESS=0x56741DbC203648983c359A48aaf68f25f5550B6a BN_UPDATER_SP_ADDRESS_MAINNET=0x0aB346a16ceA1B1363b20430C414eAB7bC179324 BN_UPDATER_SP_ADDRESS_IDCHAIN=0x183C5D2d1E43A3aCC8a977023796996f8AFd2327 @@ -24,10 +33,14 @@ BN_UPDATER_SEED_GROUPS_WS_URL=wss://idchain.one/ws/ BN_ARANGO_EXTRA_OPTS= BN_DEVELOPMENT=false BN_PEERS= + # passwords BN_SEED= BN_WS_PRIVATE_KEY= BN_WS_ETH_PRIVATE_KEY= BN_WS_WISCHNORR_PASSWORD= BN_CONSENSUS_OPERATOR_ID= -BN_CONSENSUS_OPERATOR_KEY= \ No newline at end of file +BN_CONSENSUS_OPERATOR_KEY= + +# this variable should be removed in the next update +BN_CONSENSUS_PRIVATE_KEY= \ No newline at end of file diff --git a/consensus/config.py b/consensus/config.py index 00fa7d85..fa3b0582 100755 --- a/consensus/config.py +++ b/consensus/config.py @@ -3,6 +3,25 @@ from eth_keys import keys from eth_utils import decode_hex +## following variables should be removed in the next update ## +INFURA_URL = os.environ["BN_CONSENSUS_INFURA_URL"] +PRIVATE_KEY = os.environ.get("BN_CONSENSUS_PRIVATE_KEY") +SEED = os.environ.get("BN_SEED") +if not PRIVATE_KEY and SEED: + PRIVATE_KEY = hashlib.sha256(SEED.encode("utf-8")).hexdigest() +ADDRESS = ( + keys.PrivateKey(decode_hex(PRIVATE_KEY)).public_key.to_checksum_address() + if PRIVATE_KEY + else "" +) +GAS = int(os.environ["BN_CONSENSUS_GAS"]) +GAS_PRICE = int(os.environ["BN_CONSENSUS_GAS_PRICE"]) +TO_ADDRESS = os.environ["BN_CONSENSUS_TO_ADDRESS"] +DEPRECATED_TO_ADDRESS = "0x0000000000000000000000000000000000000007" +IDCHAIN_RPC_URL = os.environ["BN_CONSENSUS_IDCHAIN_RPC_URL"] +SNAPSHOTS_PERIOD = os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD"] +############################################################## + NETWORK = os.environ["BN_CONSENSUS_NETWORK"] TOPIC_ID = os.environ["BN_CONSENSUS_TOPIC_ID"] @@ -12,7 +31,7 @@ MAX_DATA_SIZE = int(os.environ["BN_CONSENSUS_MAX_DATA_SIZE"]) -SNAPSHOTS_PERIOD = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD"]) +SNAPSHOTS_PERIOD_MILLISECONDS = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD_MILLISECONDS"]) SNAPSHOTS_PATH = "/snapshots/dump_{}" BN_ARANGO_PROTOCOL = os.environ["BN_ARANGO_PROTOCOL"] diff --git a/consensus/receiver.py b/consensus/receiver.py index 0588225c..7a0e59d7 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -107,7 +107,7 @@ def get_sequence_number(): def get_next_snapshot_timestamp(sequence_number): if variables.has("PREV_SNAPSHOT_TIME"): - return variables.get("PREV_SNAPSHOT_TIME")["value"] + config.SNAPSHOTS_PERIOD + return variables.get("PREV_SNAPSHOT_TIME")["value"] + config.SNAPSHOTS_PERIOD_MILLISECONDS else: url = config.MIRROR_NODE_URL.format( topic_id=config.TOPIC_ID, sequence_number=sequence_number, limit=1 @@ -119,12 +119,12 @@ def get_next_snapshot_timestamp(sequence_number): timestamp = int(float(messages[0]["consensus_timestamp"]) * 1000) prev_snapshot_timestamp = ( - int(timestamp / config.SNAPSHOTS_PERIOD) * config.SNAPSHOTS_PERIOD + int(timestamp / config.SNAPSHOTS_PERIOD_MILLISECONDS) * config.SNAPSHOTS_PERIOD_MILLISECONDS ) variables.insert( {"_key": "PREV_SNAPSHOT_TIME", "value": prev_snapshot_timestamp} ) - return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD + return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD_MILLISECONDS def main(): @@ -143,20 +143,21 @@ def main(): messages = r.json()["messages"] for i, message in enumerate(messages): consensus_timestamp = int(float(message["consensus_timestamp"]) * 1000) - if consensus_timestamp >= next_snapshot_timestamp: + if next_snapshot_timestamp <= consensus_timestamp: save_snapshot(next_snapshot_timestamp) - next_snapshot_timestamp += config.SNAPSHOTS_PERIOD + while next_snapshot_timestamp <= consensus_timestamp: + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS process(message) sequence_number = message["sequence_number"] variables.update({"_key": "SEQUENCE_NUMBER", "value": sequence_number}) now = time.time() * 1000 - allowed_delay = min(config.SNAPSHOTS_PERIOD / 2, 60 * 1000) + allowed_delay = min(config.SNAPSHOTS_PERIOD_MILLISECONDS / 2, 60 * 1000) if len(messages) == 0 and now > next_snapshot_timestamp + allowed_delay: save_snapshot(next_snapshot_timestamp) - next_snapshot_timestamp += config.SNAPSHOTS_PERIOD + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS def wait(): diff --git a/consensus/requirements.txt b/consensus/requirements.txt index c0185673..79249ca0 100644 --- a/consensus/requirements.txt +++ b/consensus/requirements.txt @@ -1,3 +1,4 @@ python-arango==5.4.0 hiero_sdk_python==0.1.7 +web3==6.20.4 requests diff --git a/consensus/sender.py b/consensus/sender.py index 368998a3..27c35217 100644 --- a/consensus/sender.py +++ b/consensus/sender.py @@ -1,7 +1,9 @@ import socket import time import json +import binascii from arango import ArangoClient +from web3 import Web3 import config from hiero_sdk_python import ( Client, @@ -21,6 +23,24 @@ db = ArangoClient(hosts=config.ARANGO_SERVER).db("_system") +# This method should be removed in the next update +def sendTransaction(data): + data = data.encode("utf-8") + data = "0x" + binascii.hexlify(data).decode("utf-8") + w3 = Web3(Web3.WebsocketProvider(config.INFURA_URL)) + nonce = w3.eth.get_transaction_count(config.ADDRESS, "pending") + tx = { + "to": config.TO_ADDRESS, + "value": 0, + "gas": config.GAS, + "gasPrice": config.GAS_PRICE, + "nonce": nonce, + "chainId": w3.eth.chain_id, + "data": data, + } + signed = w3.eth.account.sign_transaction(tx, config.PRIVATE_KEY) + tx = w3.eth.send_raw_transaction(signed.rawTransaction).hex() + return tx def sendMessage(data): topic_id = TopicId.from_string(config.TOPIC_ID) @@ -50,6 +70,9 @@ def main(): return data = json.dumps(operations) + + # send the operation to the both services until all nodes upgrade to hedera + sendTransaction(data) sendMessage(data) for i, op in enumerate(operations): db.collection("operations").update( From 34c2dcac1817a7e4f9c21246bc373c26f8e3ed2d Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 14:21:17 +0330 Subject: [PATCH 03/14] update arangodb --- db/Dockerfile | 39 +++++++++++++++++++++++---------------- db/docker-entrypoint.sh | 21 ++++++--------------- db/docker-foxx.sh | 5 ----- 3 files changed, 29 insertions(+), 36 deletions(-) mode change 100755 => 100644 db/docker-entrypoint.sh delete mode 100755 db/docker-foxx.sh diff --git a/db/Dockerfile b/db/Dockerfile index dfab4f24..35c55c24 100644 --- a/db/Dockerfile +++ b/db/Dockerfile @@ -1,24 +1,28 @@ -# Edited from https://github.com/arangodb/arangodb-docker/blob/official/alpine/3.9.1/Dockerfile -FROM alpine:3.14 +# Edited from https://github.com/arangodb/arangodb-docker/blob/official/alpine/3.12.4.3/Dockerfile + +FROM alpine:3.21 MAINTAINER Frank Celler -ENV ARANGO_VERSION 3.9.1 +ENV ARANGO_VERSION 3.12.4.3 ENV ARANGO_NO_AUTH true -ENV ARANGO_URL https://download.arangodb.com/arangodb39/DEBIAN/amd64 -ENV ARANGO_PACKAGE arangodb3_${ARANGO_VERSION}-1_amd64.deb -ENV ARANGO_PACKAGE_URL ${ARANGO_URL}/${ARANGO_PACKAGE} -ENV ARANGO_SIGNATURE_URL ${ARANGO_PACKAGE_URL}.asc - # see -# https://www.arangodb.com/docs/3.9/programs-arangod-server.html#managing-endpoints -# https://www.arangodb.com/docs/3.9/programs-arangod-log.html +# https://docs.arangodb.com/3.12/components/arangodb-server/options/#--serverendpoint +# https://docs.arangodb.com/3.12/components/arangodb-server/options/#log -RUN apk add --no-cache gnupg pwgen binutils numactl numactl-tools nodejs yarn && \ - yarn global add foxx-cli@2.0.1 && \ - apk del yarn && \ - gpg --batch --keyserver keys.openpgp.org --recv-keys CD8CB0F1E0AD5B52E93F41E7EA93F5E56E751E9B && \ +RUN apk add --no-cache gnupg pwgen binutils numactl numactl-tools && \ + gpg --batch --keyserver keys.openpgp.org --recv-keys 8003EDF6F05459984878D4A6C04AD0FD86FEC04D && \ mkdir /docker-entrypoint-initdb.d && \ cd /tmp && \ + arch="$(apk --print-arch)" && \ + case "$arch" in \ + x86_64) dpkgArch='amd64' ;; \ + aarch64) dpkgArch='arm64' ;; \ + *) echo >&2 "unsupported: $arch" && exit 1 ;; \ + esac && \ + ARANGO_URL="https://download.arangodb.com/arangodb312/DEBIAN/$dpkgArch" && \ + ARANGO_PACKAGE="arangodb3_${ARANGO_VERSION}-1_${dpkgArch}.deb" && \ + ARANGO_PACKAGE_URL="${ARANGO_URL}/${ARANGO_PACKAGE}" && \ + ARANGO_SIGNATURE_URL="${ARANGO_PACKAGE_URL}.asc" && \ wget ${ARANGO_SIGNATURE_URL} && \ wget ${ARANGO_PACKAGE_URL} && \ gpg --verify ${ARANGO_PACKAGE}.asc && \ @@ -30,16 +34,19 @@ RUN apk add --no-cache gnupg pwgen binutils numactl numactl-tools nodejs yarn && /etc/arangodb3/arangod.conf && \ chgrp -R 0 /var/lib/arangodb3 /var/lib/arangodb3-apps && \ chmod -R 775 /var/lib/arangodb3 /var/lib/arangodb3-apps && \ - rm -f /usr/bin/foxx && \ rm -f ${ARANGO_PACKAGE}* data.tar.gz && \ apk del gnupg # Note that Openshift runs containers by default with a random UID and GID 0. # We need that the database and apps directory are writable for this config. ENV GLIBCXX_FORCE_NEW=1 + +# Adjust TZ by default since tzdata package isn't present (BTS-913) +RUN echo "UTC" > /etc/timezone + COPY docker-entrypoint.sh /entrypoint.sh -COPY docker-foxx.sh /usr/bin/foxx ENTRYPOINT ["/entrypoint.sh"] +# standard port CMD ["arangod"] diff --git a/db/docker-entrypoint.sh b/db/docker-entrypoint.sh old mode 100755 new mode 100644 index 81feb4ff..edb5e3bd --- a/db/docker-entrypoint.sh +++ b/db/docker-entrypoint.sh @@ -1,5 +1,5 @@ #!/bin/sh -# Edited from https://github.com/arangodb/arangodb-docker/blob/official/alpine/3.9.1/docker-entrypoint.sh +# Edited from https://github.com/arangodb/arangodb-docker/blob/official/alpine/3.12.4.3/docker-entrypoint.sh set -e echo "BN_ARANGO_EXTRA_OPTS: $BN_ARANGO_EXTRA_OPTS" @@ -60,6 +60,7 @@ if [ "$1" = 'arangod' ]; then echo "Using encrypted database" sed -i /tmp/arangod.conf -e "s;^.*encryption-keyfile.*;encryption-keyfile=$ARANGO_ENCRYPTION_KEYFILE;" fi + if [ "$INIT_BRIGHTID_DB" == "1" ] || ([ ! -f /var/lib/arangodb3/SERVER ] && [ "$SKIP_DATABASE_INIT" != "1" ]); then if [ ! -z "$ARANGO_ROOT_PASSWORD_FILE" ]; then if [ -f "$ARANGO_ROOT_PASSWORD_FILE" ]; then @@ -67,7 +68,7 @@ if [ "$1" = 'arangod' ]; then else echo "WARNING: password file '$ARANGO_ROOT_PASSWORD_FILE' does not exist" fi - fi + fi # Please note that the +x in the following line is for the case # that ARANGO_ROOT_PASSWORD is set but to an empty value, please # do not remove! @@ -101,8 +102,8 @@ if [ "$1" = 'arangod' ]; then $NUMACTL arangod --config /tmp/arangod.conf \ --server.endpoint tcp://127.0.0.1:$ARANGO_INIT_PORT \ --server.authentication false \ - --log.file /tmp/init-log \ - --log.foreground-tty false & + --log.file /tmp/init-log \ + --log.foreground-tty false & pid="$!" counter=0 @@ -129,12 +130,6 @@ if [ "$1" = 'arangod' ]; then > /dev/null 2>&1 || ARANGO_UP=0 done - if [ "$(id -u)" = "0" ] ; then - foxx server set default http://127.0.0.1:$ARANGO_INIT_PORT - else - echo Not setting foxx server default because we are not root. - fi - for f in /docker-entrypoint-initdb.d/*; do case "$f" in *.sh) @@ -165,10 +160,6 @@ if [ "$1" = 'arangod' ]; then esac done - if [ "$(id -u)" = "0" ] ; then - foxx server remove default - fi - if ! kill -s TERM "$pid" || ! wait "$pid"; then echo >&2 'ArangoDB Init failed.' exit 1 @@ -183,7 +174,7 @@ if [ "$1" = 'arangod' ]; then shift if [ ! -z "$ARANGO_NO_AUTH" ]; then - AUTHENTICATION="false" + AUTHENTICATION="false" fi set -- arangod "$@" --server.endpoint=$BN_ARANGO_SERVER_ENDPOINT --server.authentication="$AUTHENTICATION" --config /tmp/arangod.conf $BN_ARANGO_EXTRA_OPTS diff --git a/db/docker-foxx.sh b/db/docker-foxx.sh deleted file mode 100755 index 78f790f6..00000000 --- a/db/docker-foxx.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh -# Edited from https://github.com/arangodb/arangodb-docker/blob/official/alpine/3.9.1/docker-foxx.sh -test -d /tmp/foxx || mkdir -m 700 /tmp/foxx -export HOME=/tmp/foxx -exec /usr/local/share/.config/yarn/global/node_modules/.bin/foxx "$@" From 44c3b532d27642a4796a6106be4c441095371785 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 14:27:40 +0330 Subject: [PATCH 04/14] upgrade consensus python to 3.12 --- consensus/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/Dockerfile b/consensus/Dockerfile index aa867ce1..d609b83e 100644 --- a/consensus/Dockerfile +++ b/consensus/Dockerfile @@ -1,5 +1,5 @@ # 1st stage -FROM python:3.7 as builder +FROM python:3.12 as builder ADD . /code WORKDIR /code/ From e21ee5ee2b1f0cec1e1a5616c9a331d5ee64bd3b Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 14:46:04 +0330 Subject: [PATCH 05/14] add +x permission to docker-entrypoint.sh --- db/docker-entrypoint.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 db/docker-entrypoint.sh diff --git a/db/docker-entrypoint.sh b/db/docker-entrypoint.sh old mode 100644 new mode 100755 From d6e7368f76f26ba056606342e365e64ab9874413 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 15:47:47 +0330 Subject: [PATCH 06/14] update consensus Dockerfile --- consensus/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/Dockerfile b/consensus/Dockerfile index d609b83e..497ec9b7 100644 --- a/consensus/Dockerfile +++ b/consensus/Dockerfile @@ -7,7 +7,7 @@ WORKDIR /code/ RUN pip3 install --user -r requirements.txt # 2nd stage -FROM python:3.7-slim as runner +FROM python:3.12-slim as runner ADD . /code WORKDIR /code/ ADD https://download.arangodb.com/arangodb36/Community/Linux/arangodb3-client_3.6.4-1_amd64.deb ./ From 7a1c3122d657917bc88fede784a0ee40e39797fd Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 16:01:58 +0330 Subject: [PATCH 07/14] update arangodb client --- config.env | 2 +- consensus/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config.env b/config.env index 6e792e74..688897da 100644 --- a/config.env +++ b/config.env @@ -11,7 +11,7 @@ BN_ARANGO_SERVER_ENDPOINT=tcp://127.0.0.1:8529 BN_CONSENSUS_MAX_DATA_SIZE=1000 BN_CONSENSUS_SNAPSHOTS_PERIOD_MILLISECONDS=1200000 BN_CONSENSUS_APPLY_URL=/_db/_system/apply{v}/operations/{hash} -BN_CONSENSUS_MIRROR_NODE_URL="https://testnet.mirrornode.hedera.com/api/v1/topics/{topic_id}/messages?sequencenumber={sequence_number}&limit={limit}" +BN_CONSENSUS_MIRROR_NODE_URL=https://testnet.mirrornode.hedera.com/api/v1/topics/{topic_id}/messages?sequencenumber={sequence_number}&limit={limit} BN_CONSENSUS_NETWORK=testnet BN_CONSENSUS_TOPIC_ID=0.0.7217823 BN_CONSENSUS_DUMP_URL=/_api/replication/dump diff --git a/consensus/Dockerfile b/consensus/Dockerfile index 497ec9b7..fc94b443 100644 --- a/consensus/Dockerfile +++ b/consensus/Dockerfile @@ -10,7 +10,7 @@ RUN pip3 install --user -r requirements.txt FROM python:3.12-slim as runner ADD . /code WORKDIR /code/ -ADD https://download.arangodb.com/arangodb36/Community/Linux/arangodb3-client_3.6.4-1_amd64.deb ./ +ADD https://download.arangodb.com/arangodb312/DEBIAN/amd64/arangodb3-client_3.12.0-1_amd64.deb ./ RUN dpkg -i arangodb3-client_3.6.4-1_amd64.deb && rm arangodb3-client_3.6.4-1_amd64.deb COPY docker-entrypoint.sh /usr/local/bin/ # Copy installed packages from 1st stage From 47e5fe21fd6164d2b5a4bc92a06d74043fd0f251 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 16:05:05 +0330 Subject: [PATCH 08/14] fix a bug --- consensus/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/Dockerfile b/consensus/Dockerfile index fc94b443..e2d2ed68 100644 --- a/consensus/Dockerfile +++ b/consensus/Dockerfile @@ -11,7 +11,7 @@ FROM python:3.12-slim as runner ADD . /code WORKDIR /code/ ADD https://download.arangodb.com/arangodb312/DEBIAN/amd64/arangodb3-client_3.12.0-1_amd64.deb ./ -RUN dpkg -i arangodb3-client_3.6.4-1_amd64.deb && rm arangodb3-client_3.6.4-1_amd64.deb +RUN dpkg -i arangodb3-client_3.12.0-1_amd64.deb && rm arangodb3-client_3.12.0-1_amd64.deb COPY docker-entrypoint.sh /usr/local/bin/ # Copy installed packages from 1st stage COPY --from=builder /root/.local /root/.local From e40ef3920c77829ecaa5fbbaeca0bfe4704aba75 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 16:19:36 +0330 Subject: [PATCH 09/14] update scorer Dockerfile arango client --- scorer/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scorer/Dockerfile b/scorer/Dockerfile index e1da0bd2..0b0a2dc1 100644 --- a/scorer/Dockerfile +++ b/scorer/Dockerfile @@ -13,8 +13,8 @@ RUN pip3 install --user -r requirements.txt FROM python:3.7-slim as runner ADD . /code WORKDIR /code/ -ADD https://download.arangodb.com/arangodb36/Community/Linux/arangodb3-client_3.6.4-1_amd64.deb ./ -RUN dpkg -i arangodb3-client_3.6.4-1_amd64.deb && rm arangodb3-client_3.6.4-1_amd64.deb +ADD https://download.arangodb.com/arangodb312/DEBIAN/amd64/arangodb3-client_3.12.0-1_amd64.deb ./ +RUN dpkg -i arangodb3-client_3.12.0-1_amd64.deb && rm arangodb3-client_3.12.0-1_amd64.deb # Copy installed packages from 1st stage COPY --from=builder /root/.local /root/.local # Make sure scripts in .local are usable: From c79b115d04ad4c57565132fa336186d534278028 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 16:59:58 +0330 Subject: [PATCH 10/14] use seconds as timestamp for snapshot filenames --- consensus/receiver.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consensus/receiver.py b/consensus/receiver.py index 7a0e59d7..8fd2c8f1 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -70,7 +70,7 @@ def process_op(op): def save_snapshot(next_snapshot_timestamp): - dir_name = config.SNAPSHOTS_PATH.format(next_snapshot_timestamp) + dir_name = config.SNAPSHOTS_PATH.format(int(next_snapshot_timestamp / 1000)) fnl_dir_name = f"{dir_name}_fnl" dir_path = os.path.dirname(os.path.realpath(__file__)) collections_file = os.path.join(dir_path, "collections.json") @@ -147,6 +147,7 @@ def main(): save_snapshot(next_snapshot_timestamp) while next_snapshot_timestamp <= consensus_timestamp: next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS + print(f"next snapshot timestamp will be {next_snapshot_timestamp}") process(message) sequence_number = message["sequence_number"] From f77a594afa16e0566d1b88476b8e199fe6e81768 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 17:02:32 +0330 Subject: [PATCH 11/14] use 7th message as first --- consensus/receiver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/receiver.py b/consensus/receiver.py index 8fd2c8f1..aa555af6 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -101,8 +101,8 @@ def get_sequence_number(): if variables.has("SEQUENCE_NUMBER"): return variables.get("SEQUENCE_NUMBER")["value"] else: - variables.insert({"_key": "SEQUENCE_NUMBER", "value": 3}) - return 3 + variables.insert({"_key": "SEQUENCE_NUMBER", "value": 7}) + return 7 def get_next_snapshot_timestamp(sequence_number): From 0988dff1c1e369e5e0841a82edc89c3a55d707c1 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Wed, 19 Nov 2025 17:18:37 +0330 Subject: [PATCH 12/14] use seconds for prev snapshot time --- consensus/receiver.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/consensus/receiver.py b/consensus/receiver.py index aa555af6..9e401a43 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -70,7 +70,7 @@ def process_op(op): def save_snapshot(next_snapshot_timestamp): - dir_name = config.SNAPSHOTS_PATH.format(int(next_snapshot_timestamp / 1000)) + dir_name = config.SNAPSHOTS_PATH.format(next_snapshot_timestamp) fnl_dir_name = f"{dir_name}_fnl" dir_path = os.path.dirname(os.path.realpath(__file__)) collections_file = os.path.join(dir_path, "collections.json") @@ -107,7 +107,7 @@ def get_sequence_number(): def get_next_snapshot_timestamp(sequence_number): if variables.has("PREV_SNAPSHOT_TIME"): - return variables.get("PREV_SNAPSHOT_TIME")["value"] + config.SNAPSHOTS_PERIOD_MILLISECONDS + return variables.get("PREV_SNAPSHOT_TIME")["value"] * 1000 + config.SNAPSHOTS_PERIOD_MILLISECONDS else: url = config.MIRROR_NODE_URL.format( topic_id=config.TOPIC_ID, sequence_number=sequence_number, limit=1 @@ -122,7 +122,7 @@ def get_next_snapshot_timestamp(sequence_number): int(timestamp / config.SNAPSHOTS_PERIOD_MILLISECONDS) * config.SNAPSHOTS_PERIOD_MILLISECONDS ) variables.insert( - {"_key": "PREV_SNAPSHOT_TIME", "value": prev_snapshot_timestamp} + {"_key": "PREV_SNAPSHOT_TIME", "value": int(prev_snapshot_timestamp / 1000)} ) return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD_MILLISECONDS @@ -144,7 +144,7 @@ def main(): for i, message in enumerate(messages): consensus_timestamp = int(float(message["consensus_timestamp"]) * 1000) if next_snapshot_timestamp <= consensus_timestamp: - save_snapshot(next_snapshot_timestamp) + save_snapshot(int(next_snapshot_timestamp / 1000)) while next_snapshot_timestamp <= consensus_timestamp: next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS print(f"next snapshot timestamp will be {next_snapshot_timestamp}") @@ -157,7 +157,7 @@ def main(): allowed_delay = min(config.SNAPSHOTS_PERIOD_MILLISECONDS / 2, 60 * 1000) if len(messages) == 0 and now > next_snapshot_timestamp + allowed_delay: - save_snapshot(next_snapshot_timestamp) + save_snapshot(int(next_snapshot_timestamp / 1000)) next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS From ccdd143967e3ba66b2ae9368edc5ae22ea1bc886 Mon Sep 17 00:00:00 2001 From: abramsymons Date: Thu, 20 Nov 2025 13:31:20 +0330 Subject: [PATCH 13/14] use seconds instead of milliseconds --- config.env | 2 +- consensus/config.py | 2 +- consensus/receiver.py | 24 ++++++++++++------------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/config.env b/config.env index 688897da..16b46152 100644 --- a/config.env +++ b/config.env @@ -9,7 +9,7 @@ BN_ARANGO_HOST=localhost BN_ARANGO_PORT=8529 BN_ARANGO_SERVER_ENDPOINT=tcp://127.0.0.1:8529 BN_CONSENSUS_MAX_DATA_SIZE=1000 -BN_CONSENSUS_SNAPSHOTS_PERIOD_MILLISECONDS=1200000 +BN_CONSENSUS_SNAPSHOTS_PERIOD_SECONDS=1200 BN_CONSENSUS_APPLY_URL=/_db/_system/apply{v}/operations/{hash} BN_CONSENSUS_MIRROR_NODE_URL=https://testnet.mirrornode.hedera.com/api/v1/topics/{topic_id}/messages?sequencenumber={sequence_number}&limit={limit} BN_CONSENSUS_NETWORK=testnet diff --git a/consensus/config.py b/consensus/config.py index fa3b0582..967a876c 100755 --- a/consensus/config.py +++ b/consensus/config.py @@ -31,7 +31,7 @@ MAX_DATA_SIZE = int(os.environ["BN_CONSENSUS_MAX_DATA_SIZE"]) -SNAPSHOTS_PERIOD_MILLISECONDS = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD_MILLISECONDS"]) +SNAPSHOTS_PERIOD_SECONDS = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD_SECONDS"]) SNAPSHOTS_PATH = "/snapshots/dump_{}" BN_ARANGO_PROTOCOL = os.environ["BN_ARANGO_PROTOCOL"] diff --git a/consensus/receiver.py b/consensus/receiver.py index 9e401a43..37fd8ea3 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -107,7 +107,7 @@ def get_sequence_number(): def get_next_snapshot_timestamp(sequence_number): if variables.has("PREV_SNAPSHOT_TIME"): - return variables.get("PREV_SNAPSHOT_TIME")["value"] * 1000 + config.SNAPSHOTS_PERIOD_MILLISECONDS + return variables.get("PREV_SNAPSHOT_TIME")["value"] + config.SNAPSHOTS_PERIOD_SECONDS else: url = config.MIRROR_NODE_URL.format( topic_id=config.TOPIC_ID, sequence_number=sequence_number, limit=1 @@ -117,14 +117,14 @@ def get_next_snapshot_timestamp(sequence_number): if len(messages) == 0: raise Exception("no genesis message! consensus receiver stopped ...") - timestamp = int(float(messages[0]["consensus_timestamp"]) * 1000) + timestamp = float(messages[0]["consensus_timestamp"]) prev_snapshot_timestamp = ( - int(timestamp / config.SNAPSHOTS_PERIOD_MILLISECONDS) * config.SNAPSHOTS_PERIOD_MILLISECONDS + int(timestamp / config.SNAPSHOTS_PERIOD_SECONDS) * config.SNAPSHOTS_PERIOD_SECONDS ) variables.insert( - {"_key": "PREV_SNAPSHOT_TIME", "value": int(prev_snapshot_timestamp / 1000)} + {"_key": "PREV_SNAPSHOT_TIME", "value": prev_snapshot_timestamp} ) - return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD_MILLISECONDS + return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD_SECONDS def main(): @@ -142,23 +142,23 @@ def main(): messages = r.json()["messages"] for i, message in enumerate(messages): - consensus_timestamp = int(float(message["consensus_timestamp"]) * 1000) + consensus_timestamp = float(message["consensus_timestamp"]) if next_snapshot_timestamp <= consensus_timestamp: - save_snapshot(int(next_snapshot_timestamp / 1000)) + save_snapshot(next_snapshot_timestamp) while next_snapshot_timestamp <= consensus_timestamp: - next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS print(f"next snapshot timestamp will be {next_snapshot_timestamp}") process(message) sequence_number = message["sequence_number"] variables.update({"_key": "SEQUENCE_NUMBER", "value": sequence_number}) - now = time.time() * 1000 - allowed_delay = min(config.SNAPSHOTS_PERIOD_MILLISECONDS / 2, 60 * 1000) + now = time.time() + allowed_delay = min(config.SNAPSHOTS_PERIOD_SECONDS / 2, 60) if len(messages) == 0 and now > next_snapshot_timestamp + allowed_delay: - save_snapshot(int(next_snapshot_timestamp / 1000)) - next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_MILLISECONDS + save_snapshot(next_snapshot_timestamp) + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS def wait(): From b1a99a7f41cf845efcf763722eac75976d45a07e Mon Sep 17 00:00:00 2001 From: abramsymons Date: Fri, 21 Nov 2025 09:36:05 +0330 Subject: [PATCH 14/14] enable skipping unused snapshots --- consensus/receiver.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/consensus/receiver.py b/consensus/receiver.py index 37fd8ea3..a44a203b 100755 --- a/consensus/receiver.py +++ b/consensus/receiver.py @@ -145,9 +145,9 @@ def main(): consensus_timestamp = float(message["consensus_timestamp"]) if next_snapshot_timestamp <= consensus_timestamp: save_snapshot(next_snapshot_timestamp) - while next_snapshot_timestamp <= consensus_timestamp: - next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS - print(f"next snapshot timestamp will be {next_snapshot_timestamp}") + while next_snapshot_timestamp <= consensus_timestamp: + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS + print(f"next snapshot timestamp will be {next_snapshot_timestamp}") process(message) sequence_number = message["sequence_number"] @@ -156,9 +156,11 @@ def main(): now = time.time() allowed_delay = min(config.SNAPSHOTS_PERIOD_SECONDS / 2, 60) - if len(messages) == 0 and now > next_snapshot_timestamp + allowed_delay: + if len(messages) == 0 and next_snapshot_timestamp + allowed_delay < now: save_snapshot(next_snapshot_timestamp) - next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS + while next_snapshot_timestamp + allowed_delay < now: + next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS + print(f"next snapshot timestamp will be {next_snapshot_timestamp}") def wait():