feat(communication): Agent Mesh Network — comunicação P2P descentralizada#591
feat(communication): Agent Mesh Network — comunicação P2P descentralizada#591nikolasdehor wants to merge 1 commit intoSynkraAI:mainfrom
Conversation
…tre agentes Implementa modulo de comunicacao peer-to-peer para agentes se descobrirem, formarem grupos ad-hoc e rotearem mensagens atraves da mesh sem orquestrador central. Funcionalidades: peer discovery, direct messaging, broadcast/topics, roteamento multi-hop (BFS), heartbeat/pruning, message queue com TTL, rate limiting (token bucket), deteccao de particoes (DFS), persistencia em disco e metricas de rede.
|
@nikolasdehor is attempting to deploy a commit to the Pedro Valério Lopez's projects Team on Vercel. A member of the Team first needs to authorize it. |
WalkthroughThis PR introduces a comprehensive Agent Mesh Network implementation for peer-to-peer communication, featuring peer management, BFS-based message routing, pub/sub messaging, request/response patterns, queue management, health monitoring, and disk persistence, along with a backward-compatibility wrapper module and extensive test coverage. Changes
Sequence DiagramssequenceDiagram
participant Peer1
participant Mesh as AgentMeshNetwork
participant Peer2
participant Peer3
Peer1->>Mesh: join()
Mesh->>Mesh: Create bidirectional adjacency
Mesh->>Peer1: Auto-subscribe to topics
Peer2->>Mesh: join()
Mesh->>Mesh: Create bidirectional adjacency
Mesh->>Peer2: Auto-subscribe to topics
Peer1->>Mesh: send(message to Peer3)
Mesh->>Mesh: BFS shortest path<br/>(Peer1 → Peer2 → Peer3)
Mesh->>Peer2: Route message
Mesh->>Peer3: Deliver message
Peer3->>Peer3: Emit receive event
sequenceDiagram
participant Publisher as Peer1<br/>(Publisher)
participant Mesh as AgentMeshNetwork
participant Sub1 as Peer2<br/>(Subscriber)
participant Sub2 as Peer3<br/>(Subscriber)
Publisher->>Mesh: broadcast(topic='orders', data)
Mesh->>Mesh: Filter subscribers<br/>by topic
Mesh->>Sub1: Emit message event<br/>(topic='orders')
Mesh->>Sub2: Emit message event<br/>(topic='orders')
Sub1->>Sub1: Handle message
Sub2->>Sub2: Handle message
sequenceDiagram
participant Client as Requester<br/>(Peer1)
participant Mesh as AgentMeshNetwork
participant Server as Responder<br/>(Peer2)
Client->>Mesh: request(targetPeer, data)
Mesh->>Mesh: Create pendingRequest<br/>with timeout
Mesh->>Server: Send request message
Server->>Mesh: reply(requestId, response)
Mesh->>Mesh: Resolve pending<br/>request promise
Mesh->>Client: Return response data
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🧹 Nitpick comments (1)
.aios-core/core/communication/agent-mesh-network.js (1)
1-2: Use the repo's absolute import convention for this compatibility shim.Hardcoding a three-level relative
require()makes this re-export brittle to future moves and diverges from the repository's JS import rule. Please resolve the implementation through the package/root alias used elsewhere. As per coding guidelines, "Use absolute imports instead of relative imports in all code".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.aios-core/core/communication/agent-mesh-network.js around lines 1 - 2, Replace the brittle three-level relative require in the compatibility shim by importing via the repository's absolute/package root alias (instead of "../../../.aiox-core/..."); update the module.exports line to require the core module using the project alias used elsewhere (e.g., the package/root alias that maps to the .aiox-core package) so module.exports still re-exports the same module (agent-mesh-network) but via the absolute import convention.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.aiox-core/core/communication/agent-mesh-network.js:
- Around line 155-172: The join(agentId, meta = {}) function currently converts
meta.topics and meta.capabilities without validating types, which allows strings
like "deploy" to become sets/arrays of characters; add input validation in join
to explicitly require meta.topics to be an array (of strings) when provided and
meta.capabilities to be an array (of strings) when provided, throwing a
descriptive Error if they are malformed; only construct topics as new
Set(meta.topics) and capabilities as meta.capabilities (or []) after validation
so the peer object (id, capabilities, topics, state, joinedAt, lastSeen,
messageCount) is never populated with corrupted values.
- Around line 366-389: The broadcast() loop currently delivers directly to every
toId with PeerState.ACTIVE, bypassing routing/partition checks; change it to use
the same route-or-queue logic as send(): for each target (from the peers
map/targets array) perform the routing/path check used by send() (or call the
send() helper that encapsulates that logic) and only call _deliverMessage(msg)
when a valid route exists, otherwise call _enqueueMessage(toId, msg); make sure
to reuse the same symbols/behavior as send(), _deliverMessage, _enqueueMessage
and PeerState.ACTIVE so broadcasts respect mesh partitions and queued delivery
semantics.
- Around line 846-855: The save/load flow currently serializes only peer records
and then calls join(peerId, ...) which recreates peers by connecting them into
the active mesh and ignores saved topology and per-peer state; update save() to
serialize the mesh topology (peer adjacency/links, routing/multi-hop entries,
and each peer's saved state like offline/timeout flags) and update load() to
reconstruct Peer instances and routing tables from that data without invoking
join() for each peer; instead, restore connections by reinstating saved
adjacency/routing entries and applying saved state to each Peer (use existing
peer map this.peers, any routing tables, and state setters like setPeerState or
equivalent) so partitions, multi-hop routes, and offline/timeout statuses are
preserved after restart.
- Around line 654-675: getNetworkHealth() and getMeshStats() currently call
detectPartitions(), which has side-effects (increments partitionsDetected and
emits 'partition-detected'), so mere reads inflate counters and spam listeners;
change detectPartitions() to support a non-mutating mode (e.g., add a parameter
like emitEvents = true or create a pure helper detectPartitionsSnapshot()) that
returns the partition list without incrementing partitionsDetected or emitting
events, and update getNetworkHealth() and getMeshStats() to call this
non-mutating variant so reads do not change state or emit events (keep existing
behavior for callers that need real detection side-effects).
- Around line 910-923: _deliverMessage currently increments messageCount and
emits MESSAGE_RECEIVED but never refreshes the recipient peer's lastSeen or
triggers processing of any queued messages, so reactivated peers remain TIMEOUT
with stuck backlogs; update _deliverMessage to set toPeer.lastSeen = Date.now()
(or call the existing method that marks peers seen) when toPeer exists and,
after marking active, call this._drainQueue(toPeer) (or the generic _drainQueue
method) to flush queued messages; make the same adjustments in the other
message-delivery paths referenced (the similar blocks around the other delivery
sites) so every code path that delivers toPeer also refreshes lastSeen and
invokes _drainQueue for that peer.
- Around line 89-141: The constructor currently ignores
DEFAULT_OPTIONS.autoStart so automatic heartbeat never starts; update the
constructor (in agent-mesh-network.js) to inspect this.options.autoStart after
options merge and, if truthy, call the existing startHeartbeat() method and set
the _started flag (and ensure _heartbeatTimer is managed by startHeartbeat());
this will ensure periodic pruning/TTL expiry runs automatically for new Mesh
instances without requiring callers to manually invoke startHeartbeat().
- Around line 420-444: The request() implementation registers the pending
request after calling send(), so if delivery is synchronous and the receiver
replies immediately the pendingRequests entry won't exist; fix by generating or
capturing the request id up-front, call this.pendingRequests.set(requestId,
{resolve, reject, timer}) before invoking this.send(fromId, toId, message, {
type: MessageType.REQUEST }), and ensure the timeout/cleanup uses that same
requestId (instead of sent.id); keep using this.options.requestTimeout and
preserve the resolve/reject handlers and timer logic from request().
In `@tests/core/communication/agent-mesh-network.test.js`:
- Around line 11-17: The test currently imports the implementation directly from
the internal .aiox-core path; change the import to use the public compatibility
entrypoint (require('.aios-core')) so the suite covers the compatibility
wrapper. Replace the existing
require('../../../.aiox-core/core/communication/agent-mesh-network') with a
require('.aios-core') (or require('.aios-core').core/communication if the
package exposes nested exports) and then pull AgentMeshNetwork and its exports
(MessageType, PeerState, MeshEvent, DEFAULT_OPTIONS) from that module so the
test verifies the public compatibility path is exercised.
---
Nitpick comments:
In @.aios-core/core/communication/agent-mesh-network.js:
- Around line 1-2: Replace the brittle three-level relative require in the
compatibility shim by importing via the repository's absolute/package root alias
(instead of "../../../.aiox-core/..."); update the module.exports line to
require the core module using the project alias used elsewhere (e.g., the
package/root alias that maps to the .aiox-core package) so module.exports still
re-exports the same module (agent-mesh-network) but via the absolute import
convention.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: c5aa4d29-1676-48c3-b207-98444f5c148a
📒 Files selected for processing (4)
.aios-core/core/communication/agent-mesh-network.js.aiox-core/core/communication/agent-mesh-network.js.aiox-core/install-manifest.yamltests/core/communication/agent-mesh-network.test.js
| constructor(projectRoot, options = {}) { | ||
| super(); | ||
|
|
||
| this.projectRoot = projectRoot ?? process.cwd(); | ||
| this.options = { | ||
| ...DEFAULT_OPTIONS, | ||
| ...options, | ||
| rateLimit: { | ||
| ...DEFAULT_OPTIONS.rateLimit, | ||
| ...(options.rateLimit ?? {}), | ||
| }, | ||
| }; | ||
|
|
||
| /** @type {Map<string, Object>} Mapa de peers registrados */ | ||
| this.peers = new Map(); | ||
|
|
||
| /** @type {Map<string, Set<string>>} Lista de adjacencia do grafo */ | ||
| this.adjacency = new Map(); | ||
|
|
||
| /** @type {Map<string, Set<string>>} Mapa de topico -> subscribers */ | ||
| this.topics = new Map(); | ||
|
|
||
| /** @type {Map<string, Array>} Fila de mensagens por peer */ | ||
| this.queues = new Map(); | ||
|
|
||
| /** @type {Map<string, Object>} Token buckets para rate limiting */ | ||
| this.rateLimiters = new Map(); | ||
|
|
||
| /** @type {Map<string, Object>} Pending requests aguardando resposta */ | ||
| this.pendingRequests = new Map(); | ||
|
|
||
| /** @type {Object} Estatisticas da rede */ | ||
| this.stats = { | ||
| messagesSent: 0, | ||
| messagesReceived: 0, | ||
| messagesDropped: 0, | ||
| messagesBroadcast: 0, | ||
| messagesRouted: 0, | ||
| messagesQueued: 0, | ||
| peersJoined: 0, | ||
| peersLeft: 0, | ||
| peersTimedOut: 0, | ||
| partitionsDetected: 0, | ||
| }; | ||
|
|
||
| /** @type {number|null} Referencia do intervalo de heartbeat */ | ||
| this._heartbeatTimer = null; | ||
|
|
||
| /** @type {Promise} Cadeia de persistencia serializada */ | ||
| this._writeChain = Promise.resolve(); | ||
|
|
||
| this._started = false; | ||
| } |
There was a problem hiding this comment.
autoStart is currently a no-op.
DEFAULT_OPTIONS.autoStart and the constructor JSDoc promise automatic heartbeat startup, but the constructor never calls startHeartbeat(). New meshes will not prune timed-out peers or expire queued TTLs unless every caller remembers to start the timer manually.
🔧 Suggested fix
/** `@type` {Promise} Cadeia de persistencia serializada */
this._writeChain = Promise.resolve();
this._started = false;
+ if (this.options.autoStart) {
+ this.startHeartbeat();
+ }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| constructor(projectRoot, options = {}) { | |
| super(); | |
| this.projectRoot = projectRoot ?? process.cwd(); | |
| this.options = { | |
| ...DEFAULT_OPTIONS, | |
| ...options, | |
| rateLimit: { | |
| ...DEFAULT_OPTIONS.rateLimit, | |
| ...(options.rateLimit ?? {}), | |
| }, | |
| }; | |
| /** @type {Map<string, Object>} Mapa de peers registrados */ | |
| this.peers = new Map(); | |
| /** @type {Map<string, Set<string>>} Lista de adjacencia do grafo */ | |
| this.adjacency = new Map(); | |
| /** @type {Map<string, Set<string>>} Mapa de topico -> subscribers */ | |
| this.topics = new Map(); | |
| /** @type {Map<string, Array>} Fila de mensagens por peer */ | |
| this.queues = new Map(); | |
| /** @type {Map<string, Object>} Token buckets para rate limiting */ | |
| this.rateLimiters = new Map(); | |
| /** @type {Map<string, Object>} Pending requests aguardando resposta */ | |
| this.pendingRequests = new Map(); | |
| /** @type {Object} Estatisticas da rede */ | |
| this.stats = { | |
| messagesSent: 0, | |
| messagesReceived: 0, | |
| messagesDropped: 0, | |
| messagesBroadcast: 0, | |
| messagesRouted: 0, | |
| messagesQueued: 0, | |
| peersJoined: 0, | |
| peersLeft: 0, | |
| peersTimedOut: 0, | |
| partitionsDetected: 0, | |
| }; | |
| /** @type {number|null} Referencia do intervalo de heartbeat */ | |
| this._heartbeatTimer = null; | |
| /** @type {Promise} Cadeia de persistencia serializada */ | |
| this._writeChain = Promise.resolve(); | |
| this._started = false; | |
| } | |
| constructor(projectRoot, options = {}) { | |
| super(); | |
| this.projectRoot = projectRoot ?? process.cwd(); | |
| this.options = { | |
| ...DEFAULT_OPTIONS, | |
| ...options, | |
| rateLimit: { | |
| ...DEFAULT_OPTIONS.rateLimit, | |
| ...(options.rateLimit ?? {}), | |
| }, | |
| }; | |
| /** `@type` {Map<string, Object>} Mapa de peers registrados */ | |
| this.peers = new Map(); | |
| /** `@type` {Map<string, Set<string>>} Lista de adjacencia do grafo */ | |
| this.adjacency = new Map(); | |
| /** `@type` {Map<string, Set<string>>} Mapa de topico -> subscribers */ | |
| this.topics = new Map(); | |
| /** `@type` {Map<string, Array>} Fila de mensagens por peer */ | |
| this.queues = new Map(); | |
| /** `@type` {Map<string, Object>} Token buckets para rate limiting */ | |
| this.rateLimiters = new Map(); | |
| /** `@type` {Map<string, Object>} Pending requests aguardando resposta */ | |
| this.pendingRequests = new Map(); | |
| /** `@type` {Object} Estatisticas da rede */ | |
| this.stats = { | |
| messagesSent: 0, | |
| messagesReceived: 0, | |
| messagesDropped: 0, | |
| messagesBroadcast: 0, | |
| messagesRouted: 0, | |
| messagesQueued: 0, | |
| peersJoined: 0, | |
| peersLeft: 0, | |
| peersTimedOut: 0, | |
| partitionsDetected: 0, | |
| }; | |
| /** `@type` {number|null} Referencia do intervalo de heartbeat */ | |
| this._heartbeatTimer = null; | |
| /** `@type` {Promise} Cadeia de persistencia serializada */ | |
| this._writeChain = Promise.resolve(); | |
| this._started = false; | |
| if (this.options.autoStart) { | |
| this.startHeartbeat(); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 89 - 141,
The constructor currently ignores DEFAULT_OPTIONS.autoStart so automatic
heartbeat never starts; update the constructor (in agent-mesh-network.js) to
inspect this.options.autoStart after options merge and, if truthy, call the
existing startHeartbeat() method and set the _started flag (and ensure
_heartbeatTimer is managed by startHeartbeat()); this will ensure periodic
pruning/TTL expiry runs automatically for new Mesh instances without requiring
callers to manually invoke startHeartbeat().
| join(agentId, meta = {}) { | ||
| if (!agentId || typeof agentId !== 'string') { | ||
| throw new Error('agentId is required and must be a string'); | ||
| } | ||
|
|
||
| if (this.peers.has(agentId)) { | ||
| throw new Error(`Peer "${agentId}" already exists in the mesh`); | ||
| } | ||
|
|
||
| const peer = { | ||
| id: agentId, | ||
| capabilities: meta.capabilities ?? [], | ||
| topics: new Set(meta.topics ?? []), | ||
| state: PeerState.ACTIVE, | ||
| joinedAt: Date.now(), | ||
| lastSeen: Date.now(), | ||
| messageCount: 0, | ||
| }; |
There was a problem hiding this comment.
Fail fast on malformed join() metadata.
new Set(meta.topics ?? []) will turn 'deploy' into ['d', 'e', ...], and a string capabilities value later serializes as an array of characters. This silently corrupts mesh state and persistence instead of rejecting bad API input. As per coding guidelines, "Check for proper input validation on public API methods".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 155 - 172,
The join(agentId, meta = {}) function currently converts meta.topics and
meta.capabilities without validating types, which allows strings like "deploy"
to become sets/arrays of characters; add input validation in join to explicitly
require meta.topics to be an array (of strings) when provided and
meta.capabilities to be an array (of strings) when provided, throwing a
descriptive Error if they are malformed; only construct topics as new
Set(meta.topics) and capabilities as meta.capabilities (or []) after validation
so the peer object (id, capabilities, topics, state, joinedAt, lastSeen,
messageCount) is never populated with corrupted values.
| for (const toId of targets) { | ||
| const toPeer = this.peers.get(toId); | ||
| if (!toPeer) continue; | ||
|
|
||
| const msg = { | ||
| id: randomUUID(), | ||
| broadcastId: msgId, | ||
| from: fromId, | ||
| to: toId, | ||
| type: MessageType.BROADCAST, | ||
| topic: opts.topic ?? null, | ||
| payload: this._deepClone(message), | ||
| ttl: this.options.messageTTL, | ||
| createdAt: Date.now(), | ||
| hops: [], | ||
| }; | ||
|
|
||
| if (toPeer.state === PeerState.ACTIVE) { | ||
| this._deliverMessage(msg); | ||
| delivered.push(toId); | ||
| } else { | ||
| this._enqueueMessage(toId, msg); | ||
| } | ||
| } |
There was a problem hiding this comment.
broadcast() bypasses routing and partitions.
Every target in PeerState.ACTIVE is delivered directly here, even if there is no adjacency or path from fromId to toId. In a split mesh that leaks broadcasts across disconnected components; per-target delivery needs the same route-or-queue logic as send().
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 366 - 389,
The broadcast() loop currently delivers directly to every toId with
PeerState.ACTIVE, bypassing routing/partition checks; change it to use the same
route-or-queue logic as send(): for each target (from the peers map/targets
array) perform the routing/path check used by send() (or call the send() helper
that encapsulates that logic) and only call _deliverMessage(msg) when a valid
route exists, otherwise call _enqueueMessage(toId, msg); make sure to reuse the
same symbols/behavior as send(), _deliverMessage, _enqueueMessage and
PeerState.ACTIVE so broadcasts respect mesh partitions and queued delivery
semantics.
| request(fromId, toId, message, opts = {}) { | ||
| const timeout = opts.timeout ?? this.options.requestTimeout; | ||
|
|
||
| const sent = this.send(fromId, toId, message, { type: MessageType.REQUEST }); | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
| const timer = setTimeout(() => { | ||
| this.pendingRequests.delete(sent.id); | ||
| reject(new Error(`Request to "${toId}" timed out after ${timeout}ms`)); | ||
| }, timeout); | ||
|
|
||
| this.pendingRequests.set(sent.id, { | ||
| resolve: (response) => { | ||
| clearTimeout(timer); | ||
| this.pendingRequests.delete(sent.id); | ||
| resolve(response); | ||
| }, | ||
| reject: (err) => { | ||
| clearTimeout(timer); | ||
| this.pendingRequests.delete(sent.id); | ||
| reject(err); | ||
| }, | ||
| timer, | ||
| }); | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
fd -t f "agent-mesh-network.js" .aiox-core/Repository: SynkraAI/aiox-core
Length of output: 113
🏁 Script executed:
wc -l .aiox-core/core/communication/agent-mesh-network.jsRepository: SynkraAI/aiox-core
Length of output: 118
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '420,500p'Repository: SynkraAI/aiox-core
Length of output: 3046
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '1,100p'Repository: SynkraAI/aiox-core
Length of output: 4098
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '300,420p'Repository: SynkraAI/aiox-core
Length of output: 4430
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '260,335p'Repository: SynkraAI/aiox-core
Length of output: 2906
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '700,800p'Repository: SynkraAI/aiox-core
Length of output: 3481
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '850,950p'Repository: SynkraAI/aiox-core
Length of output: 3255
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '950,1050p'Repository: SynkraAI/aiox-core
Length of output: 3450
🏁 Script executed:
cat -n .aiox-core/core/communication/agent-mesh-network.js | sed -n '1050,1115p'Repository: SynkraAI/aiox-core
Length of output: 2296
🏁 Script executed:
find .aiox-core -name "*.test.js" -o -name "*.spec.js" | head -20Repository: SynkraAI/aiox-core
Length of output: 653
🏁 Script executed:
find .aiox-core -path "*test*" -o -path "*spec*" | grep -i "mesh\|communication" | head -20Repository: SynkraAI/aiox-core
Length of output: 44
🏁 Script executed:
fd -t f "agent-mesh" .aiox-core/ --type fRepository: SynkraAI/aiox-core
Length of output: 113
🏁 Script executed:
rg -A 5 -B 5 "MESSAGE_RECEIVED" .aiox-core/core/communication/agent-mesh-network.jsRepository: SynkraAI/aiox-core
Length of output: 636
Register pending request before calling send().
The message delivery is synchronous—if the receiver immediately calls reply() from its MESSAGE_RECEIVED handler, the pending request entry won't exist yet and the response will be lost, causing the request to timeout.
Move pendingRequests.set() before send():
Suggested fix
request(fromId, toId, message, opts = {}) {
const timeout = opts.timeout ?? this.options.requestTimeout;
const requestId = randomUUID();
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingRequests.delete(requestId);
reject(new Error(`Request to "${toId}" timed out after ${timeout}ms`));
}, timeout);
this.pendingRequests.set(requestId, {
resolve: (response) => {
clearTimeout(timer);
this.pendingRequests.delete(requestId);
resolve(response);
},
reject: (err) => {
clearTimeout(timer);
this.pendingRequests.delete(requestId);
reject(err);
},
timer,
});
const sent = this.send(fromId, toId, message, { type: MessageType.REQUEST });
});
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 420 - 444,
The request() implementation registers the pending request after calling send(),
so if delivery is synchronous and the receiver replies immediately the
pendingRequests entry won't exist; fix by generating or capturing the request id
up-front, call this.pendingRequests.set(requestId, {resolve, reject, timer})
before invoking this.send(fromId, toId, message, { type: MessageType.REQUEST }),
and ensure the timeout/cleanup uses that same requestId (instead of sent.id);
keep using this.options.requestTimeout and preserve the resolve/reject handlers
and timer logic from request().
| getNetworkHealth() { | ||
| const totalPeers = this.peers.size; | ||
| const activePeers = Array.from(this.peers.values()) | ||
| .filter(p => p.state === PeerState.ACTIVE).length; | ||
| const partitions = this.detectPartitions(); | ||
| const totalQueuedMessages = Array.from(this.queues.values()) | ||
| .reduce((sum, q) => sum + q.length, 0); | ||
|
|
||
| const healthScore = totalPeers === 0 ? 1.0 : | ||
| (activePeers / totalPeers) * (partitions.length <= 1 ? 1.0 : 0.5); | ||
|
|
||
| return { | ||
| totalPeers, | ||
| activePeers, | ||
| inactivePeers: totalPeers - activePeers, | ||
| partitionCount: partitions.length, | ||
| partitions, | ||
| totalQueuedMessages, | ||
| healthScore: Math.round(healthScore * 100) / 100, | ||
| timestamp: Date.now(), | ||
| }; | ||
| } |
There was a problem hiding this comment.
Don't let health/stat reads mutate partition metrics.
getNetworkHealth() and getMeshStats() call detectPartitions(), but detectPartitions() increments partitionsDetected and emits partition-detected every time it sees the same split. Any polling dashboard will inflate the counter and spam listeners just by reading stats.
Also applies to: 681-724
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 654 - 675,
getNetworkHealth() and getMeshStats() currently call detectPartitions(), which
has side-effects (increments partitionsDetected and emits 'partition-detected'),
so mere reads inflate counters and spam listeners; change detectPartitions() to
support a non-mutating mode (e.g., add a parameter like emitEvents = true or
create a pure helper detectPartitionsSnapshot()) that returns the partition list
without incrementing partitionsDetected or emitting events, and update
getNetworkHealth() and getMeshStats() to call this non-mutating variant so reads
do not change state or emit events (keep existing behavior for callers that need
real detection side-effects).
| if (data.schemaVersion === 'agent-mesh-v1' && Array.isArray(data.peers)) { | ||
| for (const peerData of data.peers) { | ||
| if (!this.peers.has(peerData.id)) { | ||
| this.join(peerData.id, { | ||
| capabilities: peerData.capabilities ?? [], | ||
| topics: peerData.topics ?? [], | ||
| }); | ||
| } | ||
| } | ||
| return true; |
There was a problem hiding this comment.
save() / load() do not persist the actual mesh topology.
Only peer records are serialized, and load() rebuilds them via join(), which reconnects every restored peer to every active peer and ignores the saved state. After a restart, partitions, multi-hop routes, and timeout/offline state collapse into a fully-connected active mesh.
Also applies to: 872-883
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 846 - 855,
The save/load flow currently serializes only peer records and then calls
join(peerId, ...) which recreates peers by connecting them into the active mesh
and ignores saved topology and per-peer state; update save() to serialize the
mesh topology (peer adjacency/links, routing/multi-hop entries, and each peer's
saved state like offline/timeout flags) and update load() to reconstruct Peer
instances and routing tables from that data without invoking join() for each
peer; instead, restore connections by reinstating saved adjacency/routing
entries and applying saved state to each Peer (use existing peer map this.peers,
any routing tables, and state setters like setPeerState or equivalent) so
partitions, multi-hop routes, and offline/timeout statuses are preserved after
restart.
| _deliverMessage(msg) { | ||
| const toPeer = this.peers.get(msg.to); | ||
| if (toPeer) { | ||
| toPeer.messageCount++; | ||
| } | ||
|
|
||
| this.stats.messagesReceived++; | ||
| this._emitSafe(MeshEvent.MESSAGE_RECEIVED, { | ||
| messageId: msg.id, | ||
| from: msg.from, | ||
| to: msg.to, | ||
| type: msg.type, | ||
| payload: msg.payload, | ||
| }); |
There was a problem hiding this comment.
Passive or reactivated peers never recover their queued backlog.
_deliverMessage() does not refresh the recipient's lastSeen, so a peer can keep receiving traffic and still be marked TIMEOUT. And although _drainQueue() exists, nothing calls it when a peer becomes active again, so offline messages stay stuck forever after reconnection.
Also applies to: 946-964, 986-992
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/communication/agent-mesh-network.js around lines 910 - 923,
_deliverMessage currently increments messageCount and emits MESSAGE_RECEIVED but
never refreshes the recipient peer's lastSeen or triggers processing of any
queued messages, so reactivated peers remain TIMEOUT with stuck backlogs; update
_deliverMessage to set toPeer.lastSeen = Date.now() (or call the existing method
that marks peers seen) when toPeer exists and, after marking active, call
this._drainQueue(toPeer) (or the generic _drainQueue method) to flush queued
messages; make the same adjustments in the other message-delivery paths
referenced (the similar blocks around the other delivery sites) so every code
path that delivers toPeer also refreshes lastSeen and invokes _drainQueue for
that peer.
| const AgentMeshNetwork = require('../../../.aiox-core/core/communication/agent-mesh-network'); | ||
| const { | ||
| MessageType, | ||
| PeerState, | ||
| MeshEvent, | ||
| DEFAULT_OPTIONS, | ||
| } = AgentMeshNetwork; |
There was a problem hiding this comment.
Cover the .aios-core compatibility entrypoint from this suite.
By importing the implementation directly from .aiox-core, these tests can all pass while the new .aios-core/core/communication/agent-mesh-network.js wrapper is broken. Using the public compatibility path in at least one spec would also keep the test aligned with the repo's import convention. As per coding guidelines, "Use absolute imports instead of relative imports in all code" and "Verify test coverage exists for new/modified functions".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/core/communication/agent-mesh-network.test.js` around lines 11 - 17,
The test currently imports the implementation directly from the internal
.aiox-core path; change the import to use the public compatibility entrypoint
(require('.aios-core')) so the suite covers the compatibility wrapper. Replace
the existing
require('../../../.aiox-core/core/communication/agent-mesh-network') with a
require('.aios-core') (or require('.aios-core').core/communication if the
package exposes nested exports) and then pull AgentMeshNetwork and its exports
(MessageType, PeerState, MeshEvent, DEFAULT_OPTIONS) from that module so the
test verifies the public compatibility path is exercised.
Summary
Testes
Summary by CodeRabbit
New Features
Tests
Chores