diff --git a/bun.lock b/bun.lock index 4e7057630669..7ceb87df4079 100644 --- a/bun.lock +++ b/bun.lock @@ -688,7 +688,7 @@ "dompurify": "3.3.1", "drizzle-kit": "1.0.0-beta.19-d95b7a4", "drizzle-orm": "1.0.0-beta.19-d95b7a4", - "effect": "4.0.0-beta.59", + "effect": "4.0.0-beta.57", "fuzzysort": "3.1.0", "hono": "4.10.7", "hono-openapi": "1.1.2", @@ -3003,7 +3003,7 @@ "ee-first": ["ee-first@1.1.1", "", {}, "sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow=="], - "effect": ["effect@4.0.0-beta.59", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.6.0", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.9", "multipasta": "^0.2.7", "toml": "^4.1.1", "uuid": "^13.0.0", "yaml": "^2.8.3" } }, "sha512-xyUDLeHSe8d6lWGOvR6Fgn2HL6gYeTZ/S4Jzk9uc4ZUxMPPsNZlNXrvk0C7/utQFzeX7uAWcVnG2BjbA0SRoAA=="], + "effect": ["effect@4.0.0-beta.57", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.6.0", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.9", "multipasta": "^0.2.7", "toml": "^4.1.1", "uuid": "^13.0.0", "yaml": "^2.8.3" } }, "sha512-rg32VgXnLKaPRs9tbRDaZ5jxmzNY7ojXt85gSHGUTwdlbWH5Ik+OCUY2q14TXliygPGoHwCAvNWS4bQJOqf00g=="], "ejs": ["ejs@3.1.10", "", { "dependencies": { "jake": "^10.8.5" }, "bin": { "ejs": "bin/cli.js" } }, "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA=="], diff --git a/package.json b/package.json index 9d9207c5ea3e..12f6250262c9 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "dompurify": "3.3.1", "drizzle-kit": "1.0.0-beta.19-d95b7a4", "drizzle-orm": "1.0.0-beta.19-d95b7a4", - "effect": "4.0.0-beta.59", + "effect": "4.0.0-beta.57", "ai": "6.0.168", "cross-spawn": "7.0.6", "hono": "4.10.7", diff --git a/packages/opencode/src/cli/cmd/debug/ripgrep.ts b/packages/opencode/src/cli/cmd/debug/ripgrep.ts index 8d1cbd2b1eae..5a7fb41cc423 100644 --- a/packages/opencode/src/cli/cmd/debug/ripgrep.ts +++ b/packages/opencode/src/cli/cmd/debug/ripgrep.ts @@ -48,15 +48,15 @@ const FilesCommand = effectCmd({ const ctx = yield* InstanceRef if (!ctx) return const rg = yield* Ripgrep.Service - const files = yield* rg + const files: string[] = [] + yield* rg .files({ cwd: ctx.directory, glob: args.glob ? [args.glob] : undefined, }) .pipe( Stream.take(args.limit ?? Infinity), - Stream.runCollect, - Effect.map((c) => [...c]), + Stream.runForEach((file) => Effect.sync(() => { files.push(file) })), Effect.orDie, ) process.stdout.write(files.join(EOL) + EOL) diff --git a/packages/opencode/src/file/index.ts b/packages/opencode/src/file/index.ts index 4dd6a3ae7a69..ea70fc11d5a1 100644 --- a/packages/opencode/src/file/index.ts +++ b/packages/opencode/src/file/index.ts @@ -379,10 +379,8 @@ export const layer = Layer.effect( next.dirs = Array.from(dirs).toSorted() } else { - const files = yield* rg.files({ cwd: ctx.directory }).pipe( - Stream.runCollect, - Effect.map((chunk) => [...chunk]), - ) + const files: string[] = [] + yield* Stream.runForEach(rg.files({ cwd: ctx.directory }), (file) => Effect.sync(() => { files.push(file) })) const seen = new Set() for (const file of files) { next.files.push(file) diff --git a/packages/opencode/src/file/ripgrep.ts b/packages/opencode/src/file/ripgrep.ts index 27fd5f2323d6..1cda3fda9e12 100644 --- a/packages/opencode/src/file/ripgrep.ts +++ b/packages/opencode/src/file/ripgrep.ts @@ -383,17 +383,20 @@ export const layer: Layer.Layer line.length > 0), + Stream.mapEffect(parse), + Stream.filter((item): item is Match => item.type === "match"), + Stream.map((item) => row(item.data)), + ) + const [items, stderr, code] = yield* Effect.all( [ - Stream.decodeText(handle.stdout).pipe( - Stream.splitLines, - Stream.filter((line) => line.length > 0), - Stream.mapEffect(parse), - Stream.filter((item): item is Match => item.type === "match"), - Stream.map((item) => row(item.data)), - Stream.runCollect, - Effect.map((chunk) => [...chunk]), - ), + Effect.gen(function* () { + const acc: ReturnType[] = [] + yield* Stream.runForEach(rowStream, (item) => Effect.sync(() => { acc.push(item) })) + return acc + }), Stream.mkString(Stream.decodeText(handle.stderr)), handle.exitCode, ], @@ -416,7 +419,8 @@ export const layer: Layer.Layer Effect.sync(() => { list.push(file) })) interface Node { name: string diff --git a/packages/opencode/src/git/index.ts b/packages/opencode/src/git/index.ts index 349bbad466ec..98e45edf1bfe 100644 --- a/packages/opencode/src/git/index.ts +++ b/packages/opencode/src/git/index.ts @@ -118,23 +118,25 @@ export const layer = Layer.effect( }) const handle = yield* spawner.spawn(proc) const collect = (stream: typeof handle.stdout) => - Stream.runFold( - stream, - () => ({ chunks: [] as Uint8Array[], bytes: 0, truncated: false }), - (acc, chunk) => { - if (opts.maxOutputBytes === undefined) { - acc.chunks.push(chunk) - acc.bytes += chunk.length - return acc - } - - const remaining = opts.maxOutputBytes - acc.bytes - if (remaining > 0) acc.chunks.push(remaining >= chunk.length ? chunk : chunk.slice(0, remaining)) - acc.bytes += chunk.length - acc.truncated = acc.truncated || acc.bytes > opts.maxOutputBytes - return acc - }, - ).pipe(Effect.map((x) => ({ buffer: Buffer.concat(x.chunks), truncated: x.truncated }))) + Effect.gen(function* () { + const chunks: Uint8Array[] = [] + let bytes = 0 + let truncated = false + yield* Stream.runForEach(stream, (chunk) => + Effect.sync(() => { + if (opts.maxOutputBytes === undefined) { + chunks.push(chunk) + bytes += chunk.length + } else { + const remaining = opts.maxOutputBytes - bytes + if (remaining > 0) chunks.push(remaining >= chunk.length ? chunk : chunk.slice(0, remaining)) + bytes += chunk.length + truncated = truncated || bytes > opts.maxOutputBytes + } + }), + ) + return { buffer: Buffer.concat(chunks), truncated } + }) const [stdout, stderr] = yield* Effect.all([collect(handle.stdout), collect(handle.stderr)], { concurrency: 2 }) return { exitCode: yield* handle.exitCode, diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts index ea30f5afc7ca..9e8634376339 100644 --- a/packages/opencode/src/snapshot/index.ts +++ b/packages/opencode/src/snapshot/index.ts @@ -573,8 +573,28 @@ export const layer: Layer.Layer< stdin: Stream.make(new TextEncoder().encode(refs.map((item) => item.ref).join("\n") + "\n")), }) const handle = yield* spawner.spawn(proc) + const collectUint8 = (stream: typeof handle.stdout) => { + const chunks: Uint8Array[] = [] + let bytes = 0 + return Stream.runForEach(stream, (chunk) => + Effect.sync(() => { + chunks.push(chunk) + bytes += chunk.length + }), + ).pipe( + Effect.map(() => { + const result = new Uint8Array(bytes) + let offset = 0 + for (const chunk of chunks) { + result.set(chunk, offset) + offset += chunk.length + } + return result + }), + ) + } const [out, err] = yield* Effect.all( - [Stream.mkUint8Array(handle.stdout), Stream.mkString(Stream.decodeText(handle.stderr))], + [collectUint8(handle.stdout), Stream.mkString(Stream.decodeText(handle.stderr))], { concurrency: 2 }, ) const code = yield* handle.exitCode diff --git a/packages/opencode/src/tool/glob.ts b/packages/opencode/src/tool/glob.ts index 0c97b9cdf7c5..ca6aed5a8c6d 100644 --- a/packages/opencode/src/tool/glob.ts +++ b/packages/opencode/src/tool/glob.ts @@ -47,22 +47,24 @@ export const GlobTool = Tool.define( const limit = 100 let truncated = false - const files = yield* rg.files({ cwd: search, glob: [params.pattern], signal: ctx.abort }).pipe( - Stream.mapEffect((file) => - Effect.gen(function* () { - const full = path.resolve(search, file) - const info = yield* fs.stat(full).pipe(Effect.catch(() => Effect.succeed(undefined))) - const mtime = - info?.mtime.pipe( - Option.map((date) => date.getTime()), - Option.getOrElse(() => 0), - ) ?? 0 - return { path: full, mtime } - }), + const files: { path: string; mtime: number }[] = [] + yield* Stream.runForEach( + rg.files({ cwd: search, glob: [params.pattern], signal: ctx.abort }).pipe( + Stream.mapEffect((file) => + Effect.gen(function* () { + const full = path.resolve(search, file) + const info = yield* fs.stat(full).pipe(Effect.catch(() => Effect.succeed(undefined))) + const mtime = + info?.mtime.pipe( + Option.map((date) => date.getTime()), + Option.getOrElse(() => 0), + ) ?? 0 + return { path: full, mtime } + }), + ), + Stream.take(limit + 1), ), - Stream.take(limit + 1), - Stream.runCollect, - Effect.map((chunk) => [...chunk]), + (item) => Effect.sync(() => { files.push(item) }), ) if (files.length > limit) { diff --git a/packages/opencode/src/tool/skill.ts b/packages/opencode/src/tool/skill.ts index 8c41077be5ec..aedc5d2645db 100644 --- a/packages/opencode/src/tool/skill.ts +++ b/packages/opencode/src/tool/skill.ts @@ -39,13 +39,16 @@ export const SkillTool = Tool.define( const dir = path.dirname(info.location) const base = pathToFileURL(dir).href const limit = 10 - const files = yield* rg.files({ cwd: dir, follow: false, hidden: true, signal: ctx.abort }).pipe( - Stream.filter((file) => !file.includes("SKILL.md")), - Stream.map((file) => path.resolve(dir, file)), - Stream.take(limit), - Stream.runCollect, - Effect.map((chunk) => [...chunk].map((file) => `${file}`).join("\n")), + const fileList: string[] = [] + yield* Stream.runForEach( + rg.files({ cwd: dir, follow: false, hidden: true, signal: ctx.abort }).pipe( + Stream.filter((file) => !file.includes("SKILL.md")), + Stream.map((file) => path.resolve(dir, file)), + Stream.take(limit), + ), + (file) => Effect.sync(() => { fileList.push(file) }), ) + const files = fileList.map((file) => `${file}`).join("\n") return { title: `Loaded skill: ${info.name}`,