Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions rpc/rpc_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/python3

import sys

import requests
import json


MAINNET_RPC_URL = "https://mainnetrpc.num.network"
Expand All @@ -11,54 +12,98 @@


def test_connectivity(rpc_url):
"""Test that the RPC endpoint is reachable and returns HTTP 200."""
try:
response = requests.get(rpc_url)
if response.status_code == 200:
print("Node is reachable, HTTP Status Code: 200")
else:
print(f"Node is not reachable, HTTP Status Code: {response.status_code}")
assert response.status_code == 200, (
f"Node is not reachable, HTTP Status Code: {response.status_code}"
)
print(f"Node is reachable, HTTP Status Code: {response.status_code}")
except AssertionError:
raise
except Exception as e:
print(f"Connection failed, Error: {str(e)}")
raise AssertionError(f"Connection failed, Error: {str(e)}") from e


def test_functionality(rpc_url):
"""Test that the RPC endpoint handles a web3_clientVersion request successfully."""
payload = {
"jsonrpc": "2.0",
"method": "web3_clientVersion",
"params": [],
"id": 1
"id": 1,
}

try:
response = requests.post(rpc_url, json=payload)
assert response.status_code == 200, (
f"Unexpected HTTP status code: {response.status_code}"
)
response_data = response.json()
assert "error" not in response_data, (
f"RPC returned an error: {response_data['error']}"
)
assert "result" in response_data, (
f"RPC response missing 'result' field: {response_data}"
)
print("RPC request successful, Response Data:", response_data)
except AssertionError:
raise
except Exception as e:
print(f"RPC request failed, Error: {str(e)}")
raise AssertionError(f"RPC request failed, Error: {str(e)}") from e


def test_chain_id(rpc_url, chain_id):
def test_chain_id(rpc_url, expected_chain_id):
"""Test that the RPC endpoint returns the expected chain ID."""
payload = {
"jsonrpc": "2.0",
"method": "eth_chainId",
"id": chain_id,
"id": 1,
}

try:
response = requests.post(rpc_url, json=payload)
assert response.status_code == 200, (
f"Unexpected HTTP status code: {response.status_code}"
)
response_data = response.json()
assert "error" not in response_data, (
f"RPC returned an error: {response_data['error']}"
)
assert "result" in response_data, (
f"RPC response missing 'result' field: {response_data}"
)
returned_chain_id = int(response_data["result"], 16)
assert returned_chain_id == expected_chain_id, (
f"Chain ID mismatch: expected {expected_chain_id}, got {returned_chain_id}"
)
print("RPC request successful, Response Data:", response_data)
except AssertionError:
raise
except Exception as e:
print(f"RPC request failed, Error: {str(e)}")
raise AssertionError(f"RPC request failed, Error: {str(e)}") from e


def run_tests(rpc_url, chain_id, network_name):
"""Run all tests for the given network, returning True if all pass."""
print(f"Testing {network_name} RPC")
failed = False
for test_fn, args in [
(test_connectivity, (rpc_url,)),
(test_functionality, (rpc_url,)),
(test_chain_id, (rpc_url, chain_id)),
]:
try:
test_fn(*args)
except AssertionError as e:
print(f"FAIL [{test_fn.__name__}]: {e}")
failed = True
return not failed


if __name__ == '__main__':
print("Testing Mainnet RPC")
test_connectivity(MAINNET_RPC_URL)
test_functionality(MAINNET_RPC_URL)
test_chain_id(MAINNET_RPC_URL, MAINNET_CHAIN_ID)

print("Testing Testnet RPC")
test_connectivity(TESTNET_RPC_URL)
test_functionality(TESTNET_RPC_URL)
test_chain_id(TESTNET_RPC_URL, TESTNET_CHAIN_ID)
results = [
run_tests(MAINNET_RPC_URL, MAINNET_CHAIN_ID, "Mainnet"),
run_tests(TESTNET_RPC_URL, TESTNET_CHAIN_ID, "Testnet"),
]
sys.exit(0 if all(results) else 1)
102 changes: 88 additions & 14 deletions rpc/websocket_test.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,111 @@
import argparse
import asyncio
import json
import sys
import time

from websockets import connect


TESTNET_RPC_WS_ENDPOINT='wss://testnetrpc.num.network/ws'
MAINNET_RPC_WS_ENDPOINT='wss://mainnetrpc.num.network/ws'
TESTNET_RPC_WS_ENDPOINT = 'wss://testnetrpc.num.network/ws'
MAINNET_RPC_WS_ENDPOINT = 'wss://mainnetrpc.num.network/ws'

DEFAULT_TIMEOUT = 120
MIN_EVENTS = 1


async def get_event(rpc_ws_endpoint, total_timeout=DEFAULT_TIMEOUT, min_events=MIN_EVENTS):
"""
Subscribe to Ethereum log events over a WebSocket connection.

Listens for events until at least ``min_events`` have been received or
``total_timeout`` seconds have elapsed since the function started.

Returns True if the minimum number of events was received, False otherwise.
"""
events_received = 0
deadline = time.monotonic() + total_timeout

async def get_event(rpc_ws_endpoint):
async with connect(rpc_ws_endpoint) as ws:
await ws.send('{"jsonrpc":"2.0", "id": 1, "method": "eth_subscribe", "params": ["logs", {"topics": []}]}')
await ws.send(
'{"jsonrpc":"2.0", "id": 1, "method": "eth_subscribe", "params": ["logs", {"topics": []}]}'
)
subscription_response = await ws.recv()
sub_data = json.loads(subscription_response)
assert "error" not in sub_data, (
f"Subscription error from {rpc_ws_endpoint}: {sub_data.get('error')}"
)
print(f'Testing WS endpoint {rpc_ws_endpoint}')
print(subscription_response)

while True:
while events_received < min_events:
remaining = deadline - time.monotonic()
if remaining <= 0:
print(
f"Timeout reached for {rpc_ws_endpoint} after {total_timeout}s "
f"({events_received}/{min_events} events received)."
)
return False
try:
message = await asyncio.wait_for(ws.recv(), timeout=60)
message = await asyncio.wait_for(ws.recv(), timeout=remaining)
print(json.loads(message))
events_received += 1
except asyncio.TimeoutError:
print(f"Timeout, no messages received for 60 seconds.")
print(
f"Timeout reached for {rpc_ws_endpoint} after {total_timeout}s "
f"({events_received}/{min_events} events received)."
)
return False
except Exception as e:
print(f'Error: {e}')
break # This will only break the loop if there is an exception that's not a TimeoutError.
print(f'Error receiving message from {rpc_ws_endpoint}: {e}')
return False

print(
f"Success: received {events_received} event(s) from {rpc_ws_endpoint}."
)
return True

async def main():
await asyncio.gather(
get_event(rpc_ws_endpoint=TESTNET_RPC_WS_ENDPOINT),
get_event(rpc_ws_endpoint=MAINNET_RPC_WS_ENDPOINT)

async def main(total_timeout=DEFAULT_TIMEOUT, min_events=MIN_EVENTS):
results = await asyncio.gather(
get_event(
rpc_ws_endpoint=TESTNET_RPC_WS_ENDPOINT,
total_timeout=total_timeout,
min_events=min_events,
),
get_event(
rpc_ws_endpoint=MAINNET_RPC_WS_ENDPOINT,
total_timeout=total_timeout,
min_events=min_events,
),
return_exceptions=True,
)
all_passed = all(r is True for r in results)
for endpoint, result in zip(
[TESTNET_RPC_WS_ENDPOINT, MAINNET_RPC_WS_ENDPOINT], results
):
if isinstance(result, Exception):
print(f"FAIL [{endpoint}]: {result}")
elif not result:
print(f"FAIL [{endpoint}]: did not receive enough events within timeout.")
return all_passed


if __name__ == '__main__':
asyncio.run(main())
parser = argparse.ArgumentParser(description="WebSocket RPC subscription test.")
parser.add_argument(
"--timeout",
type=int,
default=DEFAULT_TIMEOUT,
help=f"Total timeout in seconds for each endpoint (default: {DEFAULT_TIMEOUT})",
)
parser.add_argument(
"--min-events",
type=int,
default=MIN_EVENTS,
help=f"Minimum number of events to receive per endpoint (default: {MIN_EVENTS})",
)
args = parser.parse_args()

passed = asyncio.run(main(total_timeout=args.timeout, min_events=args.min_events))
sys.exit(0 if passed else 1)
Loading