24 changes: 17 additions & 7 deletions config.env
@@ -8,17 +8,22 @@ BN_ARANGO_HOST=localhost
# Update this port in web/brightid-nginx.conf and docker-compose.yml too
BN_ARANGO_PORT=8529
BN_ARANGO_SERVER_ENDPOINT=tcp://127.0.0.1:8529
BN_CONSENSUS_INFURA_URL=wss://idchain.one/ws/
# a tx with 100 KB of data spends around 1.6M gas, while the gas limit per block is 8M
# https://bit.ly/3sKDux7
BN_CONSENSUS_MAX_DATA_SIZE=100000
BN_CONSENSUS_MAX_DATA_SIZE=1000
BN_CONSENSUS_SNAPSHOTS_PERIOD_SECONDS=1200
BN_CONSENSUS_APPLY_URL=/_db/_system/apply{v}/operations/{hash}
BN_CONSENSUS_MIRROR_NODE_URL=https://testnet.mirrornode.hedera.com/api/v1/topics/{topic_id}/messages?sequencenumber={sequence_number}&limit={limit}
BN_CONSENSUS_NETWORK=testnet
BN_CONSENSUS_TOPIC_ID=0.0.7217823
BN_CONSENSUS_DUMP_URL=/_api/replication/dump

# the following BN_CONSENSUS variables should be removed in the next update
BN_CONSENSUS_GAS=2000000
BN_CONSENSUS_GAS_PRICE=10000000000
BN_CONSENSUS_TO_ADDRESS=0xb1d1CDd5C4C541f95A73b5748392A6990cBe32b7
BN_CONSENSUS_SNAPSHOTS_PERIOD=240
BN_CONSENSUS_APPLY_URL=/_db/_system/apply{v}/operations/{hash}
BN_CONSENSUS_DUMP_URL=/_api/replication/dump
BN_CONSENSUS_IDCHAIN_RPC_URL=https://idchain.one/rpc/
BN_CONSENSUS_INFURA_URL=wss://idchain.one/ws/

BN_UPDATER_SEED_VOTING_ADDRESS=0x56741DbC203648983c359A48aaf68f25f5550B6a
BN_UPDATER_SP_ADDRESS_MAINNET=0x0aB346a16ceA1B1363b20430C414eAB7bC179324
BN_UPDATER_SP_ADDRESS_IDCHAIN=0x183C5D2d1E43A3aCC8a977023796996f8AFd2327
@@ -28,9 +33,14 @@ BN_UPDATER_SEED_GROUPS_WS_URL=wss://idchain.one/ws/
BN_ARANGO_EXTRA_OPTS=
BN_DEVELOPMENT=false
BN_PEERS=

# passwords
BN_SEED=
BN_WS_PRIVATE_KEY=
BN_WS_ETH_PRIVATE_KEY=
BN_WS_WISCHNORR_PASSWORD=
BN_CONSENSUS_PRIVATE_KEY=
BN_CONSENSUS_OPERATOR_ID=
BN_CONSENSUS_OPERATOR_KEY=

# this variable should be removed in the next update
BN_CONSENSUS_PRIVATE_KEY=
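Note on this file: the PR swaps the IDChain transaction settings (gas, gas price, destination address, WebSocket endpoint) for Hedera Consensus Service settings: a network and topic to read operations from, operator credentials, and a mirror-node URL template. As a rough sketch of how the template is meant to be filled in (this mirrors the usage in consensus/receiver.py further down; the concrete values here are placeholders):

    import requests

    # Template from BN_CONSENSUS_MIRROR_NODE_URL above
    MIRROR_NODE_URL = (
        "https://testnet.mirrornode.hedera.com/api/v1/topics/"
        "{topic_id}/messages?sequencenumber={sequence_number}&limit={limit}"
    )

    # Fetch up to 100 messages after sequence number 7 (illustrative values)
    url = MIRROR_NODE_URL.format(topic_id="0.0.7217823", sequence_number="gt:7", limit=100)
    messages = requests.get(url).json()["messages"]
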
8 changes: 4 additions & 4 deletions consensus/Dockerfile
@@ -1,17 +1,17 @@
# 1st stage
FROM python:3.7 as builder
FROM python:3.12 as builder

ADD . /code
WORKDIR /code/
# Install with the --user prefix so all installed packages are easy to copy in the next stage
RUN pip3 install --user -r requirements.txt

# 2nd stage
FROM python:3.7-slim as runner
FROM python:3.12-slim as runner
ADD . /code
WORKDIR /code/
ADD https://download.arangodb.com/arangodb36/Community/Linux/arangodb3-client_3.6.4-1_amd64.deb ./
RUN dpkg -i arangodb3-client_3.6.4-1_amd64.deb && rm arangodb3-client_3.6.4-1_amd64.deb
ADD https://download.arangodb.com/arangodb312/DEBIAN/amd64/arangodb3-client_3.12.0-1_amd64.deb ./
RUN dpkg -i arangodb3-client_3.12.0-1_amd64.deb && rm arangodb3-client_3.12.0-1_amd64.deb
COPY docker-entrypoint.sh /usr/local/bin/
# Copy installed packages from 1st stage
COPY --from=builder /root/.local /root/.local
19 changes: 14 additions & 5 deletions consensus/config.py
@@ -3,6 +3,7 @@
from eth_keys import keys
from eth_utils import decode_hex

## the following variables should be removed in the next update ##
INFURA_URL = os.environ["BN_CONSENSUS_INFURA_URL"]
PRIVATE_KEY = os.environ.get("BN_CONSENSUS_PRIVATE_KEY")
SEED = os.environ.get("BN_SEED")
@@ -13,14 +14,24 @@
if PRIVATE_KEY
else ""
)

MAX_DATA_SIZE = int(os.environ["BN_CONSENSUS_MAX_DATA_SIZE"])
GAS = int(os.environ["BN_CONSENSUS_GAS"])
GAS_PRICE = int(os.environ["BN_CONSENSUS_GAS_PRICE"])
TO_ADDRESS = os.environ["BN_CONSENSUS_TO_ADDRESS"]
DEPRECATED_TO_ADDRESS = "0x0000000000000000000000000000000000000007"
IDCHAIN_RPC_URL = os.environ["BN_CONSENSUS_IDCHAIN_RPC_URL"]
SNAPSHOTS_PERIOD = os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD"]
##############################################################

NETWORK = os.environ["BN_CONSENSUS_NETWORK"]
TOPIC_ID = os.environ["BN_CONSENSUS_TOPIC_ID"]

OPERATOR_ID = os.environ["BN_CONSENSUS_OPERATOR_ID"]
OPERATOR_KEY = os.environ["BN_CONSENSUS_OPERATOR_KEY"]
MIRROR_NODE_URL = os.environ["BN_CONSENSUS_MIRROR_NODE_URL"]

SNAPSHOTS_PERIOD = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD"])
MAX_DATA_SIZE = int(os.environ["BN_CONSENSUS_MAX_DATA_SIZE"])

SNAPSHOTS_PERIOD_SECONDS = int(os.environ["BN_CONSENSUS_SNAPSHOTS_PERIOD_SECONDS"])
SNAPSHOTS_PATH = "/snapshots/dump_{}"

BN_ARANGO_PROTOCOL = os.environ["BN_ARANGO_PROTOCOL"]
@@ -30,5 +41,3 @@

APPLY_URL = ARANGO_SERVER + os.environ["BN_CONSENSUS_APPLY_URL"]
DUMP_URL = ARANGO_SERVER + os.environ["BN_CONSENSUS_DUMP_URL"]

IDCHAIN_RPC_URL = os.environ["BN_CONSENSUS_IDCHAIN_RPC_URL"]
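Note: APPLY_URL keeps the {v} and {hash} placeholders from the template in config.env. The body of process_op is collapsed in the receiver.py diff below, but given this template it presumably posts each operation to the version-specific apply endpoint, roughly like this hedged sketch (apply_op and its argument names are illustrative, not from the PR):

    import requests

    APPLY_URL = "http://localhost:8529/_db/_system/apply{v}/operations/{hash}"

    def apply_op(op, op_hash):
        # One endpoint per protocol version; receiver.py raises
        # "Error from apply service" when the call does not succeed
        url = APPLY_URL.format(v=op["v"], hash=op_hash)
        return requests.post(url, json=op)
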
142 changes: 75 additions & 67 deletions consensus/receiver.py
@@ -1,5 +1,6 @@
import os
import time
import math
import socket
import json
import base64
@@ -8,20 +9,13 @@
import requests
import traceback
from arango import ArangoClient, errno
from web3 import Web3
from web3.middleware import geth_poa_middleware
import config

db = ArangoClient(hosts=config.ARANGO_SERVER).db("_system")
w3 = Web3(Web3.WebsocketProvider(config.INFURA_URL))
if config.INFURA_URL.count("rinkeby") > 0 or config.INFURA_URL.count("idchain") > 0:
w3.middleware_onion.inject(geth_poa_middleware, layer=0)

NUM_SEALERS = 0
variables = db.collection("variables")


def hash(op):
blockTime = op["blockTime"]
op = {
k: op[k]
for k in op
@@ -30,8 +24,8 @@ def hash(op):
if op["name"] == "Set Signing Key":
del op["id1"]
del op["id2"]
# in next release checking blockTime should be removed
if op["name"] == "Social Recovery" and op["v"] == 6 and blockTime > 1637380189000:

if op["name"] == "Social Recovery" and op["v"] == 6:
for k in ["id1", "id2", "id3", "id4", "id5"]:
op.pop(k, None)
message = json.dumps(op, sort_keys=True, separators=(",", ":"))
@@ -41,19 +35,20 @@
return h.replace("+", "-").replace("/", "_").replace("=", "")


def process(data, block_timestamp):
data_bytes = bytes.fromhex(data.strip("0x"))
data_str = data_bytes.decode("utf-8", "ignore")
def process(message):
encoded_message = message.get("message", "")
message_content = base64.b64decode(encoded_message).decode("utf-8").strip()

try:
operations = json.loads(data_str)
operations = json.loads(message_content)
except ValueError:
print("error in parsing operations", data_str)
print("error in parsing operations", message_content)
return
for op in operations:
if type(op) is not dict or op.get("v") not in (5, 6) or "name" not in op:
print("invalid operation", op)
continue
op["blockTime"] = block_timestamp * 1000
op["blockTime"] = int(float(message["consensus_timestamp"]) * 1000)
process_op(op)


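Note: the digest step between json.dumps and the character replacements in hash() is collapsed in this diff; judging from the URL-safe substitutions on its final line, it is presumably a SHA-256 digest encoded as unpadded base64url. A hedged sketch of that pipeline, together with the mirror-node message decoding the new process() performs (the sample message is illustrative):

    import json, base64, hashlib

    def op_hash(op):
        # Canonical JSON: sorted keys, compact separators (as in hash() above)
        message = json.dumps(op, sort_keys=True, separators=(",", ":"))
        # The digest/encoding line is collapsed in the diff; SHA-256 plus
        # unpadded base64url is an assumption based on the replacements below
        h = base64.b64encode(hashlib.sha256(message.encode()).digest()).decode()
        return h.replace("+", "-").replace("/", "_").replace("=", "")

    # Mirror-node messages carry a base64 payload plus a seconds-based
    # consensus timestamp; process() converts the latter to milliseconds
    message = {
        "message": base64.b64encode(b'[{"v": 6, "name": "Connect"}]').decode(),
        "consensus_timestamp": "1700000000.123456789",
    }
    ops = json.loads(base64.b64decode(message["message"]).decode("utf-8").strip())
    block_time_ms = int(float(message["consensus_timestamp"]) * 1000)
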
@@ -74,8 +69,8 @@ def process_op(op):
raise Exception("Error from apply service")


def save_snapshot(block):
dir_name = config.SNAPSHOTS_PATH.format(block)
def save_snapshot(next_snapshot_timestamp):
dir_name = config.SNAPSHOTS_PATH.format(next_snapshot_timestamp)
fnl_dir_name = f"{dir_name}_fnl"
dir_path = os.path.dirname(os.path.realpath(__file__))
collections_file = os.path.join(dir_path, "collections.json")
@@ -84,18 +79,8 @@ def save_snapshot(block):
)
assert res == 0, "dumping snapshot failed"
shutil.move(dir_name, fnl_dir_name)


def update_num_sealers():
global NUM_SEALERS
data = {"jsonrpc": "2.0", "method": "clique_status", "params": [], "id": 1}
headers = {"Content-Type": "application/json", "Cache-Control": "no-cache"}
try:
resp = requests.post(config.IDCHAIN_RPC_URL, json=data, headers=headers)
NUM_SEALERS = len(resp.json()["result"]["sealerActivity"])
except Exception as e:
print("Error from update_num_sealers", e)
update_num_sealers()
variables.update({"_key": "PREV_SNAPSHOT_TIME", "value": next_snapshot_timestamp})
remove_old_operations()


def remove_old_operations():
@@ -112,47 +97,70 @@ def remove_old_operations():
)


def get_sequence_number():
if variables.has("SEQUENCE_NUMBER"):
return variables.get("SEQUENCE_NUMBER")["value"]
else:
variables.insert({"_key": "SEQUENCE_NUMBER", "value": 7})
return 7


def get_next_snapshot_timestamp(sequence_number):
if variables.has("PREV_SNAPSHOT_TIME"):
return variables.get("PREV_SNAPSHOT_TIME")["value"] + config.SNAPSHOTS_PERIOD_SECONDS
else:
url = config.MIRROR_NODE_URL.format(
topic_id=config.TOPIC_ID, sequence_number=sequence_number, limit=1
)
r = requests.get(url)
messages = r.json()["messages"]
if len(messages) == 0:
raise Exception("no genesis message! consensus receiver stopped ...")

timestamp = float(messages[0]["consensus_timestamp"])
prev_snapshot_timestamp = (
int(timestamp / config.SNAPSHOTS_PERIOD_SECONDS) * config.SNAPSHOTS_PERIOD_SECONDS
)
variables.insert(
{"_key": "PREV_SNAPSHOT_TIME", "value": prev_snapshot_timestamp}
)
return prev_snapshot_timestamp + config.SNAPSHOTS_PERIOD_SECONDS


def main():
update_num_sealers()
variables = db.collection("variables")
last_block = variables.get("LAST_BLOCK")["value"]
sequence_number = get_sequence_number()
next_snapshot_timestamp = get_next_snapshot_timestamp(sequence_number)

while True:
# This sleep is for not calling the ethereum node endpoint
# for getting the last block number more than once per second
time.sleep(1)
current_block = w3.eth.getBlock("latest").number
confirmed_block = current_block - (NUM_SEALERS // 2 + 1)

if confirmed_block > last_block:
# Here we should go to process the block imediately, but there seems
# to be a bug in getBlock that cause error when we get the transactions
# instantly. This delay is added to avoid that error.
# When error is raised, the file will run again and no bad problem occur.
time.sleep(3)

for block_number in range(last_block + 1, confirmed_block + 1):
print("processing block {}".format(block_number))
if block_number % 100 == 0:
update_num_sealers()
block = w3.eth.getBlock(block_number, True)
for i, tx in enumerate(block["transactions"]):
if tx["to"] and tx["to"].lower() in (
config.TO_ADDRESS.lower(),
config.DEPRECATED_TO_ADDRESS.lower(),
):
process(tx["input"], block.timestamp)
if block_number % config.SNAPSHOTS_PERIOD == 0:
save_snapshot(block_number)
# PREV_SNAPSHOT_TIME is used by some verification
# algorithms to filter connections that are made
# after previous processed snapshot
variables.update(
{"_key": "PREV_SNAPSHOT_TIME", "value": block["timestamp"]}
)
remove_old_operations()
variables.update({"_key": "LAST_BLOCK", "value": block_number})
last_block = block_number
url = config.MIRROR_NODE_URL.format(
topic_id=config.TOPIC_ID,
sequence_number="gt:{}".format(sequence_number),
limit=100,
)
r = requests.get(url)

messages = r.json()["messages"]
for i, message in enumerate(messages):
consensus_timestamp = float(message["consensus_timestamp"])
if next_snapshot_timestamp <= consensus_timestamp:
save_snapshot(next_snapshot_timestamp)
while next_snapshot_timestamp <= consensus_timestamp:
next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS
print(f"next snapshot timestamp will be {next_snapshot_timestamp}")

process(message)
sequence_number = message["sequence_number"]
variables.update({"_key": "SEQUENCE_NUMBER", "value": sequence_number})

now = time.time()
allowed_delay = min(config.SNAPSHOTS_PERIOD_SECONDS / 2, 60)

if len(messages) == 0 and next_snapshot_timestamp + allowed_delay < now:
save_snapshot(next_snapshot_timestamp)
while next_snapshot_timestamp + allowed_delay < now:
next_snapshot_timestamp += config.SNAPSHOTS_PERIOD_SECONDS
print(f"next snapshot timestamp will be {next_snapshot_timestamp}")


def wait():
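Note: the rewritten main() replaces block polling with mirror-node polling. It pages through topic messages with sequencenumber=gt:N, persists SEQUENCE_NUMBER after each message, and cuts a snapshot whenever a message's consensus timestamp crosses the next period boundary, or, when the topic is idle, once a boundary is more than allowed_delay seconds in the past. The boundary arithmetic reduces to aligning timestamps to multiples of the period, as in this small sketch (the sample timestamp is illustrative):

    SNAPSHOTS_PERIOD_SECONDS = 1200  # value set in config.env above

    def align(ts, period=SNAPSHOTS_PERIOD_SECONDS):
        # Round a timestamp down to the enclosing period boundary, as
        # get_next_snapshot_timestamp does for the genesis message
        return int(ts / period) * period

    prev_snapshot = align(1700000000.5)                       # -> 1699999200
    next_snapshot = prev_snapshot + SNAPSHOTS_PERIOD_SECONDS  # -> 1700000400
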
5 changes: 3 additions & 2 deletions consensus/requirements.txt
@@ -1,3 +1,4 @@
python-arango==5.4.0
web3==5.0.0
requests
hiero_sdk_python==0.1.7
web3==6.20.4
requests