Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 100 additions & 82 deletions packages/plugins/google-discovery/src/sdk/binding-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@

import { Effect, Schema } from "effect";

import {
defineSchema,
type StorageDeps,
type StorageFailure,
} from "@executor-js/sdk/core";
import { defineSchema, type StorageDeps, type StorageFailure } from "@executor-js/sdk/core";

import {
GoogleDiscoveryMethodBinding,
Expand Down Expand Up @@ -134,18 +130,45 @@ const decodeStoredSourceData = Schema.decodeUnknownSync(GoogleDiscoveryStoredSou
const encodeBinding = Schema.encodeSync(GoogleDiscoveryMethodBinding);
const decodeBinding = Schema.decodeUnknownSync(GoogleDiscoveryMethodBinding);

const toJsonRecord = (value: unknown): Record<string, unknown> => value as Record<string, unknown>;
const JsonRecord = Schema.Record(Schema.String, Schema.Unknown);
const decodeJsonRecord = Schema.decodeUnknownSync(JsonRecord);
const decodeJsonString = Schema.decodeUnknownSync(Schema.fromJsonString(Schema.Unknown));

const decodeJson = (value: unknown): unknown => {
if (value === null || value === undefined) return value;
if (typeof value !== "string") return value;
try {
return JSON.parse(value);
} catch {
return value;
}
return decodeJsonString(value);
};

const SourceRow = Schema.Struct({
id: Schema.String,
scope_id: Schema.String,
name: Schema.String,
});
const decodeSourceRow = Schema.decodeUnknownSync(SourceRow);

const BindingRow = Schema.Struct({
id: Schema.String,
source_id: Schema.String,
});
const decodeBindingRow = Schema.decodeUnknownSync(BindingRow);

const CredentialValueRow = Schema.Struct({
name: Schema.String,
kind: Schema.Union([Schema.Literal("text"), Schema.Literal("secret")]),
text_value: Schema.optional(Schema.NullOr(Schema.String)),
secret_id: Schema.optional(Schema.NullOr(Schema.String)),
secret_prefix: Schema.optional(Schema.NullOr(Schema.String)),
});
const decodeCredentialValueRow = Schema.decodeUnknownSync(CredentialValueRow);

const CredentialLookupRow = Schema.Struct({
source_id: Schema.String,
scope_id: Schema.String,
name: Schema.String,
});
const decodeCredentialLookupRow = Schema.decodeUnknownSync(CredentialLookupRow);

// --- auth column packing/unpacking ------------------------------------------

interface AuthColumns {
Expand Down Expand Up @@ -239,14 +262,15 @@ const rowsToValueMap = (
): Record<string, GoogleDiscoveryCredentialValue> => {
const out: Record<string, GoogleDiscoveryCredentialValue> = {};
for (const row of rows) {
const name = row.name as string;
if (row.kind === "secret" && typeof row.secret_id === "string") {
const prefix = row.secret_prefix as string | undefined | null;
const decoded = decodeCredentialValueRow(row);
if (decoded.kind === "secret" && decoded.secret_id) {
const prefix = decoded.secret_prefix;
const name = decoded.name;
out[name] = prefix
? { secretId: row.secret_id, prefix }
: { secretId: row.secret_id };
} else if (row.kind === "text" && typeof row.text_value === "string") {
out[name] = row.text_value;
? { secretId: decoded.secret_id, prefix }
: { secretId: decoded.secret_id };
} else if (decoded.kind === "text" && typeof decoded.text_value === "string") {
out[decoded.name] = decoded.text_value;
}
}
return out;
Expand Down Expand Up @@ -320,9 +344,7 @@ export interface GoogleDiscoveryStore {

/** Source rows whose oauth2 auth columns reference the given secret id.
* `slot` distinguishes client_id vs client_secret. */
readonly findSourcesBySecret: (
secretId: string,
) => Effect.Effect<
readonly findSourcesBySecret: (secretId: string) => Effect.Effect<
readonly {
readonly namespace: string;
readonly scope_id: string;
Expand All @@ -333,9 +355,7 @@ export interface GoogleDiscoveryStore {
>;

/** Source rows whose oauth2 auth points at the given connection id. */
readonly findSourcesByConnection: (
connectionId: string,
) => Effect.Effect<
readonly findSourcesByConnection: (connectionId: string) => Effect.Effect<
readonly {
readonly namespace: string;
readonly scope_id: string;
Expand Down Expand Up @@ -382,7 +402,7 @@ export const makeGoogleDiscoveryStore = (
});
if (!row) return null;
const decoded = decodeBinding(decodeJson(row.binding));
return { namespace: row.source_id as string, binding: decoded };
return { namespace: decodeBindingRow(row).source_id, binding: decoded };
}),

putBinding: (toolId, sourceId, scope, binding) =>
Expand All @@ -404,7 +424,7 @@ export const makeGoogleDiscoveryStore = (
id: toolId,
scope_id: scope,
source_id: sourceId,
binding: toJsonRecord(encodeBinding(binding)),
binding: decodeJsonRecord(encodeBinding(binding)),
created_at: new Date(),
},
forceAllowId: true,
Expand All @@ -420,7 +440,7 @@ export const makeGoogleDiscoveryStore = (
{ field: "scope_id", value: scope },
],
});
const ids = rows.map((r) => r.id as string);
const ids = rows.map((r) => decodeBindingRow(r).id);
yield* db.deleteMany({
model: "google_discovery_binding",
where: [
Expand All @@ -442,7 +462,7 @@ export const makeGoogleDiscoveryStore = (
});
const out = new Map<string, GoogleDiscoveryMethodBinding>();
for (const row of rows) {
out.set(row.id as string, decodeBinding(decodeJson(row.binding)));
out.set(decodeBindingRow(row).id, decodeBinding(decodeJson(row.binding)));
}
return out;
}),
Expand All @@ -462,26 +482,22 @@ export const makeGoogleDiscoveryStore = (
yield* deleteSourceChildren(source.namespace, source.scope);

const encoded = stripExtractedFields(
encodeStoredSourceData(source.config) as Record<string, unknown>,
decodeJsonRecord(encodeStoredSourceData(source.config)),
);
yield* db.create({
model: "google_discovery_source",
data: {
id: source.namespace,
scope_id: source.scope,
name: source.name,
config: toJsonRecord(encoded),
config: encoded,
created_at: now,
updated_at: now,
...authToColumns(source.config.auth),
},
forceAllowId: true,
});
yield* writeCredentialRows(
source.namespace,
source.scope,
source.config.credentials,
);
yield* writeCredentialRows(source.namespace, source.scope, source.config.credentials);
}),

updateSourceMeta: (sourceId, scope, update) =>
Expand All @@ -502,7 +518,7 @@ export const makeGoogleDiscoveryStore = (
{ field: "scope_id", value: scope },
],
update: {
name: update.name ?? (row.name as string),
name: update.name ?? decodeSourceRow(row).name,
updated_at: new Date(),
...authToColumns(auth),
},
Expand Down Expand Up @@ -531,10 +547,11 @@ export const makeGoogleDiscoveryStore = (
],
});
if (!row) return null;
const sourceRow = decodeSourceRow(row);
return {
namespace: row.id as string,
scope: row.scope_id as string,
name: row.name as string,
namespace: sourceRow.id,
scope: sourceRow.scope_id,
name: sourceRow.name,
config: yield* hydrateStoredSourceData(row, sourceId, scope),
};
}),
Expand All @@ -558,15 +575,11 @@ export const makeGoogleDiscoveryStore = (
[
db.findMany({
model: "google_discovery_source",
where: [
{ field: "auth_client_id_secret_id", value: secretId },
],
where: [{ field: "auth_client_id_secret_id", value: secretId }],
}),
db.findMany({
model: "google_discovery_source",
where: [
{ field: "auth_client_secret_secret_id", value: secretId },
],
where: [{ field: "auth_client_secret_secret_id", value: secretId }],
}),
],
{ concurrency: "unbounded" },
Expand All @@ -578,18 +591,20 @@ export const makeGoogleDiscoveryStore = (
readonly slot: string;
}[] = [];
for (const r of byClientId) {
const row = decodeSourceRow(r);
out.push({
namespace: r.id as string,
scope_id: r.scope_id as string,
name: r.name as string,
namespace: row.id,
scope_id: row.scope_id,
name: row.name,
slot: "auth.oauth2.client_id",
});
}
for (const r of byClientSecret) {
const row = decodeSourceRow(r);
out.push({
namespace: r.id as string,
scope_id: r.scope_id as string,
name: r.name as string,
namespace: row.id,
scope_id: row.scope_id,
name: row.name,
slot: "auth.oauth2.client_secret",
});
}
Expand All @@ -604,12 +619,15 @@ export const makeGoogleDiscoveryStore = (
})
.pipe(
Effect.map((rows) =>
rows.map((r) => ({
namespace: r.id as string,
scope_id: r.scope_id as string,
name: r.name as string,
slot: "auth.oauth2.connection",
})),
rows.map((r) => {
const row = decodeSourceRow(r);
return {
namespace: row.id,
scope_id: row.scope_id,
name: row.name,
slot: "auth.oauth2.connection",
};
}),
),
),

Expand All @@ -629,18 +647,24 @@ export const makeGoogleDiscoveryStore = (
{ concurrency: "unbounded" },
);
return [
...headers.map((r) => ({
kind: "credential_header" as const,
source_id: r.source_id as string,
scope_id: r.scope_id as string,
name: r.name as string,
})),
...params.map((r) => ({
kind: "credential_query_param" as const,
source_id: r.source_id as string,
scope_id: r.scope_id as string,
name: r.name as string,
})),
...headers.map((r) => {
const row = decodeCredentialLookupRow(r);
return {
kind: "credential_header" as const,
source_id: row.source_id,
scope_id: row.scope_id,
name: row.name,
};
}),
...params.map((r) => {
const row = decodeCredentialLookupRow(r);
return {
kind: "credential_query_param" as const,
source_id: row.source_id,
scope_id: row.scope_id,
name: row.name,
};
}),
];
}),

Expand All @@ -651,8 +675,9 @@ export const makeGoogleDiscoveryStore = (
const requested = new Set(keys);
const out = new Map<string, string>();
for (const r of rows) {
const key = `${r.scope_id as string}:${r.id as string}`;
if (requested.has(key)) out.set(key, r.name as string);
const row = decodeSourceRow(r);
const key = `${row.scope_id}:${row.id}`;
if (requested.has(key)) out.set(key, row.name);
}
return out;
}),
Expand Down Expand Up @@ -698,11 +723,7 @@ export const makeGoogleDiscoveryStore = (
forceAllowId: true,
});
}
const paramRows = valueMapToRows(
sourceId,
scope,
credentials.queryParams,
);
const paramRows = valueMapToRows(sourceId, scope, credentials.queryParams);
if (paramRows.length > 0) {
yield* db.createMany({
model: "google_discovery_source_credential_query_param",
Expand All @@ -719,7 +740,7 @@ export const makeGoogleDiscoveryStore = (
scope: string,
): Effect.Effect<GoogleDiscoveryStoredSourceData, StorageFailure> {
return Effect.gen(function* () {
const partial = decodeJson(row.config) as Record<string, unknown>;
const partial = decodeJsonRecord(decodeJson(row.config));
const headerRows = yield* db.findMany({
model: "google_discovery_source_credential_header",
where: [
Expand All @@ -737,8 +758,7 @@ export const makeGoogleDiscoveryStore = (
const headers = rowsToValueMap(headerRows);
const queryParams = rowsToValueMap(paramRows);
const credentials =
Object.keys(headers).length === 0 &&
Object.keys(queryParams).length === 0
Object.keys(headers).length === 0 && Object.keys(queryParams).length === 0
? undefined
: {
...(Object.keys(headers).length > 0 ? { headers } : {}),
Expand All @@ -757,9 +777,7 @@ export const makeGoogleDiscoveryStore = (
// Strip auth/credentials from the encoded source-data shape. Those
// moved to columns and child tables; the remaining structural fields
// live in the `config` JSON.
const stripExtractedFields = (
encoded: Record<string, unknown>,
): Record<string, unknown> => {
const stripExtractedFields = (encoded: Record<string, unknown>): Record<string, unknown> => {
const { auth, credentials, ...rest } = encoded;
void auth;
void credentials;
Expand Down
Loading
Loading