4 changes: 4 additions & 0 deletions .changeset/fix-discovered-properties-source-reconcile.md
@@ -0,0 +1,4 @@
---
---

Add `source` column to `discovered_properties` to enable property-removal reconciliation in the hosted-property sync. Previously, the sync could only do additive upserts because it could not distinguish its own rows from crawler-written rows. Now, rows written by the hosted sync carry `source='aao_hosted'` and are reconciled (deleted when removed from the manifest) inside a domain-scoped advisory-lock transaction, preventing concurrent sync races.
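The source label on an upsert conflict can be pictured as a small pure function. This is a minimal sketch that mirrors the `CASE` expression in the `ON CONFLICT` clause of `upsertProperty` in this PR; `PropertySource` and `resolveSource` are hypothetical names used only for illustration and do not exist in the codebase.

```typescript
// Hypothetical type alias; the two values match the CHECK constraint in migration 467.
type PropertySource = 'crawler' | 'aao_hosted';

// Hypothetical helper: which source label ends up stored after an upsert.
// `existing` is the label on the row already in the table (undefined if none),
// `incoming` is the label carried by the write being applied.
function resolveSource(
  existing: PropertySource | undefined,
  incoming: PropertySource,
): PropertySource {
  // No conflicting row: a plain INSERT stores the incoming label.
  if (existing === undefined) return incoming;
  // A crawler label is sticky; any other existing label is replaced.
  return existing === 'crawler' ? 'crawler' : incoming;
}
```

Under this rule a hosted sync never relabels a crawler-written row, while a crawler write over a hosted-written row relabels it as `'crawler'`.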
8 changes: 4 additions & 4 deletions .changeset/hosted-property-fed-index-sync.md
@@ -34,10 +34,10 @@ Reconciliation semantics:
- `discovered_publishers` row uses a stable AAO sentinel value
(`aao://hosted`) for `discovered_by_agent` so re-syncs collapse to one
row regardless of agent ordering.
- `discovered_properties` is additive only — the table has no source
column, so we cannot safely distinguish hosted-written rows from
crawler-written rows. Removed properties persist until manually cleared.
Tracked as a follow-up.
- `discovered_properties` now has full reconcile support (see PR #4111).
Properties removed from the hosted manifest are deleted on re-sync.
The `source` column added in migration 467 enables source-aware
conflict handling; the reconcile runs under a domain-scoped advisory lock.

Also: rate-limit the per-agent rollup on `/api/registry/publisher` to
50 agents per request to bound fan-out on an unauthenticated endpoint.
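
The set of rows removed on re-sync can be sketched in memory. This is an illustration only, assuming the behavior of the `DELETE ... NOT EXISTS` query in `hosted-property-sync.ts` in this PR, which matches on `(name, property_type)` for the domain and does not filter on `source`. `PropertyKey`, `keyOf`, and `rowsToDelete` are hypothetical names, not part of the codebase.

```typescript
// Hypothetical shape: the two columns the reconcile matches on.
interface PropertyKey {
  name: string;
  property_type: string;
}

// Encode the pair as a JSON tuple so that names containing a separator
// character cannot collide with a different (name, type) pair.
const keyOf = (p: PropertyKey): string => JSON.stringify([p.name, p.property_type]);

// Hypothetical helper: rows for one publisher_domain that the reconcile
// would delete, given the entries in the current manifest.
function rowsToDelete<T extends PropertyKey>(existing: T[], manifest: PropertyKey[]): T[] {
  const keep = new Set(manifest.map(keyOf));
  return existing.filter((row) => !keep.has(keyOf(row)));
}

// Usage: a manifest that dropped the mobile app leaves only the website row.
const stale = rowsToDelete(
  [
    { name: 'example.com', property_type: 'website' },
    { name: 'example.com app', property_type: 'mobile_app' },
  ],
  [{ name: 'example.com', property_type: 'website' }],
);
```

Because the match uses both fields, a property whose type changes in the manifest has its old row selected for deletion, and an empty manifest selects every row for the domain.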
9 changes: 6 additions & 3 deletions server/src/db/federated-index-db.ts
@@ -76,6 +76,7 @@ export interface DiscoveredProperty {
name: string;
identifiers: PropertyIdentifier[];
tags?: string[];
source?: 'crawler' | 'aao_hosted'; // Write-path discriminator (migration 467)
discovered_at?: Date;
last_validated?: Date;
expires_at?: Date;
@@ -548,14 +549,15 @@ export class FederatedIndexDatabase {
async upsertProperty(property: DiscoveredProperty): Promise<DiscoveredProperty> {
const result = await query<DiscoveredProperty>(
`INSERT INTO discovered_properties (
property_id, publisher_domain, property_type, name, identifiers, tags, expires_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)
property_id, publisher_domain, property_type, name, identifiers, tags, expires_at, source
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (publisher_domain, name, property_type) DO UPDATE SET
property_id = COALESCE(EXCLUDED.property_id, discovered_properties.property_id),
identifiers = EXCLUDED.identifiers,
tags = EXCLUDED.tags,
last_validated = NOW(),
expires_at = EXCLUDED.expires_at
expires_at = EXCLUDED.expires_at,
source = CASE WHEN discovered_properties.source = 'crawler' THEN 'crawler' ELSE EXCLUDED.source END
RETURNING *`,
[
property.property_id || null,
@@ -565,6 +567,7 @@ export class FederatedIndexDatabase {
JSON.stringify(property.identifiers),
property.tags || [],
property.expires_at || null,
property.source || 'crawler',
]
);
return this.deserializeProperty(result.rows[0]);
16 changes: 16 additions & 0 deletions server/src/db/migrations/467_discovered_properties_source.sql
@@ -0,0 +1,16 @@
-- Migration: 467_discovered_properties_source.sql
-- Purpose: Add source column to discovered_properties to distinguish crawler-written
-- rows from hosted-sync-written rows, enabling full property reconciliation on re-sync.
--
-- Prior to this migration, discovered_properties had no write-path discriminator, so
-- hosted-property-sync.ts could only do additive upserts — it could not safely delete
-- rows it no longer owns without risking crawler-written rows. This column fixes that:
-- only rows with source='aao_hosted' are owned (and reconciled) by the sync job.

ALTER TABLE discovered_properties
ADD COLUMN source TEXT NOT NULL DEFAULT 'crawler'
CHECK (source IN ('crawler', 'aao_hosted'));

-- Index for reconcile queries: delete WHERE publisher_domain=$1 AND source='aao_hosted'
CREATE INDEX idx_properties_by_publisher_source
ON discovered_properties(publisher_domain, source);
118 changes: 95 additions & 23 deletions server/src/services/hosted-property-sync.ts
@@ -23,11 +23,14 @@
* this publisher_domain. We own that source label exclusively;
* crawler-written `adagents_json` rows for the same domain are left
* untouched (they represent verified origin facts).
* - Properties: additive only. `discovered_properties` has no source
* column, so we cannot safely distinguish hosted-written rows from
* crawler-written rows. Removed properties persist until manually
* cleared. Tracked as a follow-up — adding a `source` column to
* `discovered_properties` would let us reconcile here too.
* - Properties: full reconcile. This sync is the authoritative source
* for the publisher_domain's property list — any row not in the current
* manifest is deleted, regardless of `source`. Source is still written
* as `'aao_hosted'` on new rows; on conflict with a crawler row, the
* crawler's source label is preserved (origin-verified > hosted-only)
* but the sync still owns the reconcile (the publisher's manifest is
* the truth for which properties exist). Runs inside a domain-scoped
* advisory-lock transaction to prevent concurrent-sync interleave races.
* - Publisher row: keyed by stable sentinel (`AAO_HOSTED_SENTINEL`) so
* re-syncs collapse to the same row regardless of which agent is first
* in the manifest.
@@ -39,7 +42,7 @@
*/
import type { HostedProperty } from '../types.js';
import { FederatedIndexDatabase } from '../db/federated-index-db.js';
import { query } from '../db/client.js';
import { query, getClient } from '../db/client.js';
import { createLogger } from '../logger.js';

const logger = createLogger('hosted-property-sync');
@@ -49,6 +52,7 @@ export const AAO_HOSTED_SENTINEL = 'aao://hosted';

export interface HostedSyncResult {
properties_synced: number;
properties_removed: number;
agents_synced: number;
authorizations_reconciled: number;
authorizations_removed: number;
@@ -87,6 +91,7 @@ export async function syncHostedPropertyToFederatedIndex(
): Promise<HostedSyncResult> {
const result: HostedSyncResult = {
properties_synced: 0,
properties_removed: 0,
agents_synced: 0,
authorizations_reconciled: 0,
authorizations_removed: 0,
@@ -99,26 +104,93 @@ export async function syncHostedPropertyToFederatedIndex(
const agents = readAgents(adagents);
const properties = readProperties(adagents);

// Properties: additive upsert (see file-level comment for why removal
// is not yet supported).
for (const p of properties) {
if (typeof p.name !== 'string' || !p.name) continue;
const propType = typeof p.type === 'string' && p.type ? p.type : 'website';
// Properties: upsert + full reconcile under a domain-scoped advisory lock.
// The lock prevents two concurrent syncs for the same domain from interleaving
// their upserts with the trailing DELETE, which would cause the second sync's
// delete to remove rows the first sync just wrote (and vice versa).
{
const client = await getClient();
try {
await fedDb.upsertProperty({
property_id: typeof p.property_id === 'string' ? p.property_id : undefined,
publisher_domain: domain,
property_type: propType,
name: p.name,
identifiers: Array.isArray(p.identifiers)
? (p.identifiers as Array<{ type: string; value: string }>)
: [],
tags: Array.isArray(p.tags) ? (p.tags as string[]) : [],
});
result.properties_synced++;
await client.query('BEGIN');
await client.query(`SET LOCAL lock_timeout = '5000ms'`);
await client.query(`SET LOCAL statement_timeout = '30000ms'`);
await client.query('SELECT pg_advisory_xact_lock(hashtext($1))', [`dp:${domain}`]);

const propertyNames: string[] = [];
const propertyTypes: string[] = [];
let synced = 0;
for (const p of properties) {
if (typeof p.name !== 'string' || !p.name) continue;
const propType = typeof p.type === 'string' && p.type ? p.type : 'website';
propertyNames.push(p.name);
propertyTypes.push(propType);
await client.query(
`INSERT INTO discovered_properties (
property_id, publisher_domain, property_type, name, identifiers, tags, source
) VALUES ($1, $2, $3, $4, $5, $6, 'aao_hosted')
ON CONFLICT (publisher_domain, name, property_type) DO UPDATE SET
property_id = COALESCE(EXCLUDED.property_id, discovered_properties.property_id),
-- Preserve crawler-attested identifiers/tags: they represent origin-verified
-- facts and take precedence over hosted-manifest values.
identifiers = CASE WHEN discovered_properties.source = 'crawler'
THEN discovered_properties.identifiers
ELSE EXCLUDED.identifiers END,
tags = CASE WHEN discovered_properties.source = 'crawler'
THEN discovered_properties.tags
ELSE EXCLUDED.tags END,
last_validated = NOW(),
source = CASE WHEN discovered_properties.source = 'crawler'
THEN 'crawler'
ELSE 'aao_hosted' END`,
[
typeof p.property_id === 'string' ? p.property_id : null,
domain,
propType,
p.name,
JSON.stringify(Array.isArray(p.identifiers) ? p.identifiers : []),
Array.isArray(p.tags) ? p.tags : [],
]
);
synced++; // after await: counts only confirmed writes
}

// Reconcile: the hosted manifest is authoritative for this publisher's
// property list. Delete every row for the domain, regardless of source,
// whose (name, property_type) pair is absent from the current manifest.
// That covers aao_hosted rows written by earlier syncs as well as
// crawler-written rows for properties the manifest no longer lists (the
// publisher's intent takes precedence). The match is on (name,
// property_type), not name alone, so a property reclassified to a
// different type has its old row removed.
const deleteResult = propertyNames.length > 0
? await client.query(
`DELETE FROM discovered_properties
WHERE publisher_domain = $1
AND NOT EXISTS (
SELECT 1 FROM unnest($2::text[], $3::text[]) AS m(mname, mtype)
WHERE m.mname = discovered_properties.name
AND m.mtype = discovered_properties.property_type
)`,
[domain, propertyNames, propertyTypes],
)
: await client.query(
// Empty manifest: publisher has declared zero properties. Delete all
// rows for the domain — the hosted manifest is authoritative and the
// publisher's intent (empty list) takes precedence over any
// crawler-attested rows that may exist.
`DELETE FROM discovered_properties WHERE publisher_domain = $1`,
[domain],
);

await client.query('COMMIT');
// Only update counters after commit so partial-rollback doesn't skew them.
result.properties_synced = synced;
result.properties_removed = deleteResult.rowCount ?? 0;
} catch (err) {
try { await client.query('ROLLBACK'); } catch { /* ignore rollback failures */ }
result.errors++;
logger.warn({ err, domain, name: p.name }, 'Failed to upsert hosted property row');
logger.warn({ err, domain }, 'Failed to sync/reconcile hosted property rows');
} finally {
client.release();
}
}

36 changes: 36 additions & 0 deletions server/tests/integration/hosted-property-fed-index-sync.test.ts
@@ -221,6 +221,42 @@ describe('Hosted property → federated index sync', () => {
expect(res.body.authorized_agents[0].url).toBe(AGENT_X);
});

it('reconciles properties on re-sync: properties removed from the manifest are deleted', async () => {
const initial = await propertyDb.createHostedProperty({
publisher_domain: PUB,
adagents_json: {
authorized_agents: [{ url: AGENT_X }],
properties: [
{ type: 'website', name: PUB, identifiers: [{ type: 'domain', value: PUB }] },
{ type: 'mobile_app', name: `${PUB} app` },
],
},
is_public: true,
source_type: 'community',
});
await syncHostedPropertyToFederatedIndex(initial);

let res = await request(app).get(`/api/registry/publisher?domain=${encodeURIComponent(PUB)}`);
expect(res.body.properties).toHaveLength(2);

// Re-sync with only one property — the removed one should be deleted.
await pool.query(
`UPDATE hosted_properties SET adagents_json = $2 WHERE publisher_domain = $1`,
[PUB, JSON.stringify({
authorized_agents: [{ url: AGENT_X }],
properties: [{ type: 'website', name: PUB, identifiers: [{ type: 'domain', value: PUB }] }],
})],
);
const updated = await propertyDb.getHostedPropertyByDomain(PUB);
if (!updated) throw new Error('expected hosted property to exist');
const result = await syncHostedPropertyToFederatedIndex(updated);
expect(result.properties_removed).toBe(1);

res = await request(app).get(`/api/registry/publisher?domain=${encodeURIComponent(PUB)}`);
expect(res.body.properties).toHaveLength(1);
expect(res.body.properties[0].name).toBe(PUB);
});

it('skips entries without required fields without throwing', async () => {
const hosted = await propertyDb.createHostedProperty({
publisher_domain: PUB,