chore: Streaming images for pdf transcript#38263
Conversation
|
Looks like this PR is not ready to merge, because of the following issues:
Please fix the issues and try again If you have any trouble, please check the PR guidelines |
|
WalkthroughAdds streaming-based upload support: new UploadService stream processing and stream-to-file upload methods; broker routing for readable streams with details in meta; interface/type updates for upload metadata; Matrix media metadata signature change; omnichannel transcript refactored to use streams. Changes
Sequence DiagramsequenceDiagram
participant Client
participant Broker
participant UploadService
participant TempFS as Temp/FileSystem
participant Storage as UploadFS
participant DB
Client->>Broker: call upload with streamParam + details (meta)
Broker->>UploadService: route readable stream (streamParam) and meta.details
UploadService->>TempFS: pipe stream -> temp file
TempFS-->>UploadService: file persisted (size known)
UploadService->>Storage: insert file from temp into UploadFS
Storage-->>UploadService: stored identifier/path
UploadService->>DB: insert upload record with metadata
DB-->>UploadService: upload record
UploadService-->>Client: resolve Promise<IUpload>
alt optional image resize
Client->>UploadService: request streamUploadedFile(file, imageResizeOpts)
UploadService->>UploadService: apply sharp transform -> Readable
UploadService-->>Client: Promise<Readable> (resized stream)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #38263 +/- ##
===========================================
- Coverage 70.39% 70.37% -0.02%
===========================================
Files 3161 3161
Lines 110654 110688 +34
Branches 19892 19899 +7
===========================================
+ Hits 77895 77901 +6
- Misses 30731 30758 +27
- Partials 2028 2029 +1
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
9d7c70d to
9d16efd
Compare
9d16efd to
675cf71
Compare
There was a problem hiding this comment.
1 issue found across 6 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="ee/packages/omnichannel-services/src/OmnichannelTranscript.ts">
<violation number="1" location="ee/packages/omnichannel-services/src/OmnichannelTranscript.ts:420">
P1: The usage of `uploadFileFromStream` (which relies on `.pipe()`) risks causing the process to hang if the PDF generation stream emits an error. Standard `.pipe()` does not forward errors to the destination, and `uploadFileFromStream` does not appear to handle source stream errors. If the source stream fails, the upload promise will likely never resolve.
Ensure `uploadFileFromStream` handles the source stream's 'error' event (e.g., using `stream.pipeline` or explicit listeners) to reject the promise appropriately.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| return uploadService.uploadFile({ | ||
| userId: details.userId, | ||
| buffer, | ||
| return uploadService.uploadFileFromStream({ |
There was a problem hiding this comment.
P1: The usage of uploadFileFromStream (which relies on .pipe()) risks causing the process to hang if the PDF generation stream emits an error. Standard .pipe() does not forward errors to the destination, and uploadFileFromStream does not appear to handle source stream errors. If the source stream fails, the upload promise will likely never resolve.
Ensure uploadFileFromStream handles the source stream's 'error' event (e.g., using stream.pipeline or explicit listeners) to reject the promise appropriately.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At ee/packages/omnichannel-services/src/OmnichannelTranscript.ts, line 420:
<comment>The usage of `uploadFileFromStream` (which relies on `.pipe()`) risks causing the process to hang if the PDF generation stream emits an error. Standard `.pipe()` does not forward errors to the destination, and `uploadFileFromStream` does not appear to handle source stream errors. If the source stream fails, the upload promise will likely never resolve.
Ensure `uploadFileFromStream` handles the source stream's 'error' event (e.g., using `stream.pipeline` or explicit listeners) to reject the promise appropriately.</comment>
<file context>
@@ -406,23 +405,20 @@ export class OmnichannelTranscript extends ServiceClass implements IOmnichannelT
- return uploadService.uploadFile({
- userId: details.userId,
- buffer,
+ return uploadService.uploadFileFromStream({
+ streamParam,
details: {
</file context>
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ee/packages/omnichannel-services/src/OmnichannelTranscript.ts (1)
407-435:⚠️ Potential issue | 🔴 CriticalCritical: the same
Readablestream is consumed by multiple concurrent uploads — only the first will receive data.
uploadFilesmaps overroomIds(which contains 2 entries per the call site at Line 370) and passes the samestreamParaminstance to eachuploadFileFromStreamcall viaPromise.all. AReadablestream can only be consumed once — the second upload will receive an empty/errored stream.This worked before because
Bufferis freely reusable, but streams are not.🐛 Suggested approach — buffer once or duplicate the stream
One option: convert the stream to a
Bufferonce, then create freshReadableinstances per upload:private async uploadFiles({ - streamParam, + streamParam, roomIds, data, transcriptText, }: { streamParam: Readable; roomIds: string[]; data: Pick<WorkerData, 'siteName' | 'visitor'>; transcriptText: string; }): Promise<IUpload[]> { + const buffer = await streamToBuffer(streamParam); return Promise.all( roomIds.map((roomId) => { return uploadService.uploadFileFromStream({ - streamParam, + streamParam: Readable.from(buffer), details: {Alternatively, write to a single temp file first and upload from that path for each room.
🤖 Fix all issues with AI agents
In `@apps/meteor/server/services/upload/service.ts`:
- Around line 170-202: The finish handler in uploadFileFromStream uses
resolver.resolve(await FileUpload.getStore('Uploads').insert(...)) which will
leave the returned promise unresolved and leak tempFilePath if insert() throws;
wrap the async callback for writeStream.on('finish', ...) in a try/catch, on
success call resolver.resolve(...) with the insert result, and on any error call
await fs.promises.unlink(tempFilePath).catch(()=>undefined) then
resolver.reject(err); ensure the existing writeStream.on('error') behavior
remains for stream errors and only the insert failure path performs cleanup and
rejection so the temp file is always removed and the resolver always settles.
- Around line 156-164: In streamUploadedFile, avoid throwing inside the
sharp().on('error') handler because that won't reject the caller; instead
propagate stream errors by using stream.pipeline (from stream/promises) or by
piping through a PassThrough and re-emitting errors: replace the direct
stream.pipe(sharp().resize(...).on('error', ...)) with a pipeline(stream,
sharp().resize({width, height, fit: 'contain'})) so errors from the source and
transformer are forwarded to the returned Readable (or wrap sharp in a
PassThrough and call passThrough.emit('error', err) inside the sharp error
handler); ensure you remove the throw and wire error propagation so consumers of
streamUploadedFile receive 'error' events or a rejected pipeline promise.
In `@packages/core-services/src/types/IUploadService.ts`:
- Line 9: IUploadFileParams.details was tightened from Partial<IUploadDetails>
to IUploadDetails causing breaks; update every call site that constructs or
passes IUploadFileParams (search for IUploadFileParams, uploadFile, and any
usages of details) to supply the full required IUploadDetails properties: name,
size, type, rid, and userId, ensuring none are undefined, or if a site cannot
provide them yet, restore the type to Partial<IUploadDetails> or build a
complete object before calling the function; verify compilation and tests after
changing each call site.
🧹 Nitpick comments (2)
ee/packages/federation-matrix/src/services/MatrixMediaService.ts (1)
1-1: ImportIUploadDetailsas a type — it's only used in a type position.
IUploadDetailsis used solely as a parameter type annotation on Line 83. Importing it as a value causes unnecessary runtime resolution of the module.♻️ Proposed fix
-import { IUploadDetails } from '@rocket.chat/apps-engine/definition/uploads/IUploadDetails'; +import type { IUploadDetails } from '@rocket.chat/apps-engine/definition/uploads/IUploadDetails';ee/packages/network-broker/src/NetworkBroker.ts (1)
44-51: Convention-based streaming detection is fragile — any call withdata[0].streamParamtriggers it.The broker routes all service calls, so any method whose first argument happens to contain a
streamParamproperty will be treated as a streaming call. This works today because onlyuploadFileFromStreamuses this pattern, but it's an implicit convention that could cause subtle bugs if another service accidentally uses a parameter namedstreamParam.Consider making this more explicit, e.g., by using a sentinel/symbol or a dedicated streaming call method on the broker.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
apps/meteor/server/services/upload/service.tsee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.tsee/packages/network-broker/src/NetworkBroker.tsee/packages/omnichannel-services/src/OmnichannelTranscript.tspackages/core-services/src/types/IUploadService.ts
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx,js}
📄 CodeRabbit inference engine (.cursor/rules/playwright.mdc)
**/*.{ts,tsx,js}: Write concise, technical TypeScript/JavaScript with accurate typing in Playwright tests
Avoid code comments in the implementation
Files:
ee/packages/network-broker/src/NetworkBroker.tsee/packages/federation-matrix/src/events/message.tspackages/core-services/src/types/IUploadService.tsapps/meteor/server/services/upload/service.tsee/packages/federation-matrix/src/services/MatrixMediaService.tsee/packages/omnichannel-services/src/OmnichannelTranscript.ts
🧠 Learnings (4)
📚 Learning: 2025-11-16T19:09:43.823Z
Learnt from: d-gubert
Repo: RocketChat/Rocket.Chat PR: 37496
File: packages/apps-engine/tests/server/runtime/deno/LivenessManager.spec.ts:3-3
Timestamp: 2025-11-16T19:09:43.823Z
Learning: In Node.js, `EventEmitter` can be imported from either the 'events' module or the 'stream' module—both export the same reference. While 'events' is the canonical module, importing from 'stream' is valid and works correctly.
Applied to files:
ee/packages/network-broker/src/NetworkBroker.ts
📚 Learning: 2025-10-28T16:53:42.761Z
Learnt from: ricardogarim
Repo: RocketChat/Rocket.Chat PR: 37205
File: ee/packages/federation-matrix/src/FederationMatrix.ts:296-301
Timestamp: 2025-10-28T16:53:42.761Z
Learning: In the Rocket.Chat federation-matrix integration (ee/packages/federation-matrix/), the createRoom method from rocket.chat/federation-sdk will support a 4-argument signature (userId, roomName, visibility, displayName) in newer versions. Code using this 4-argument call is forward-compatible with planned library updates and should not be flagged as an error.
Applied to files:
ee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.tsee/packages/omnichannel-services/src/OmnichannelTranscript.ts
📚 Learning: 2025-12-09T20:01:00.324Z
Learnt from: sampaiodiego
Repo: RocketChat/Rocket.Chat PR: 37532
File: ee/packages/federation-matrix/src/FederationMatrix.ts:920-927
Timestamp: 2025-12-09T20:01:00.324Z
Learning: When reviewing federation invite handling in Rocket.Chat (specifically under ee/packages/federation-matrix), understand that rejecting an invite via federationSDK.rejectInvite() triggers an event-driven cleanup: a leave event is emitted and handled by handleLeave() in ee/packages/federation-matrix/src/events/member.ts, which calls Room.performUserRemoval() to remove the subscription. Do not add explicit cleanup in the reject branch of handleInvite(); rely on the existing leave-event flow for cleanup. If making changes, ensure this invariant remains and that any related paths still funnel cleanup through the leave event to avoid duplicate or missing removals.
Applied to files:
ee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.ts
📚 Learning: 2026-01-15T22:03:35.587Z
Learnt from: d-gubert
Repo: RocketChat/Rocket.Chat PR: 38071
File: apps/meteor/app/apps/server/bridges/listeners.ts:257-271
Timestamp: 2026-01-15T22:03:35.587Z
Learning: In the file upload pipeline (apps/meteor/app/apps/server/bridges/listeners.ts), temporary files are created by the server in the same filesystem, so symlinks between temp files are safe and don't require cross-filesystem fallbacks.
Applied to files:
apps/meteor/server/services/upload/service.ts
🧬 Code graph analysis (2)
packages/core-services/src/types/IUploadService.ts (1)
packages/core-typings/src/IUpload.ts (1)
IUpload(4-67)
ee/packages/omnichannel-services/src/OmnichannelTranscript.ts (2)
apps/meteor/packages/rocketchat-mongo-config/server/index.js (1)
stream(51-51)apps/meteor/app/lib/server/functions/notifications/email.js (1)
files(32-32)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: 📦 Build Packages
- GitHub Check: cubic · AI code reviewer
- GitHub Check: CodeQL-Build
- GitHub Check: CodeQL-Build
🔇 Additional comments (6)
packages/core-services/src/types/IUploadService.ts (1)
1-2: New streaming methods look well-designed.The
streamUploadedFilemethod with optional resize anduploadFileFromStreamwithOmit<IUploadDetails, 'size'>are clean additions. Omittingsizefrom the stream-based upload makes sense since it's computed from the written bytes.Also applies to: 34-41
ee/packages/federation-matrix/src/services/MatrixMediaService.ts (1)
83-109: Defaults fornameandtyperemoved — callers must now always provide them.Previously the method defaulted
nameto'unnamed'andtypeto'application/octet-stream'when metadata didn't include them. With the spread approach, those fallbacks are gone. The single caller inmessage.ts(Lines 45-51) does supply both, so this is currently safe — but the contract is less defensive.This is fine if
IUploadDetailsrequiresnameandtype, which the type system would enforce. Just flagging for awareness.ee/packages/federation-matrix/src/events/message.ts (1)
45-51: LGTM — defaults and field rename align with theIUploadDetailscontract.The fallbacks for
sizeandtyperestore the defensive behavior that was previously insidedownloadAndStoreRemoteFile, andridmatches the expectedIUploadDetailsfield name.ee/packages/omnichannel-services/src/OmnichannelTranscript.ts (1)
243-246: Stream-to-buffer conversion for images is intentional but worth a clarifying note.
streamUploadedFilereturns a stream (with on-the-fly resize via sharp), butstreamToBufferimmediately buffers it for the PDF worker'sfilesarray. The streaming benefit here is bounded to the resize pipeline — the full resized image still lands in memory. This is acceptable since the resized images are much smaller than originals, but it's worth knowing the peak-memory reduction comes from resizing during streaming, not from avoiding buffering entirely.ee/packages/network-broker/src/NetworkBroker.ts (1)
146-162: Streaming action wiring looks correct.The receiving side correctly detects a readable stream via
Stream.isReadable, reconstructs the{ streamParam, details }shape fromctx.params+ctx.meta.details, and spreads into the service method. The tracer annotation is a nice touch.apps/meteor/server/services/upload/service.ts (1)
177-177: This usage is correct—Promise.withResolvers is natively supported in the project's minimum Node.js version.The project requires Node.js 22.16.0 (verified in package.json engines and volta configuration), which fully supports Promise.withResolvers natively since the feature was introduced in Node.js 22.0.0. No polyfill is needed.
Likely an incorrect or invalid review comment.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
675cf71 to
e019bad
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@apps/meteor/server/services/upload/service.ts`:
- Around line 169-205: The uploadFileFromStream implementation lacks an error
handler on the source readable (streamParam) so upstream errors can leave the
resolver.promise unresolved or create truncated files; update
uploadFileFromStream to attach a 'error' listener to streamParam that unlinks
the tempFilePath and calls resolver.reject(err) (same cleanup as writeStream
'error' handler), or replace the manual pipe logic with
stream.pipeline/stream.promises.pipeline to pipe streamParam into writeStream so
errors and cleanup are propagated automatically (keep references to
tempFilePath, writeStream.bytesWritten, and the existing
FileUpload.getStore('Uploads').insert flow).
🧹 Nitpick comments (1)
apps/meteor/server/services/upload/service.ts (1)
144-167: Stream error propagation is correctly handled.The pattern of forwarding source stream errors via
transformer.destroy(err)is the correct approach for stream error propagation.Regarding
_storeaccess: While_storeis marked aspublicin the FileUploadClass definition, the underscore convention indicates it should be treated as an internal implementation detail. The codebase uses inconsistent patterns—some areas callstore.getReadStream()directly while this code accesses_store.getReadStream(). Consider refactoring to use a publicgetReadStreammethod on FileUploadClass or accessing the store through the publicstoreproperty to avoid coupling to the internal property name.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
apps/meteor/server/services/upload/service.tsee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.tspackages/core-services/src/types/IUploadService.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- ee/packages/federation-matrix/src/services/MatrixMediaService.ts
- ee/packages/federation-matrix/src/events/message.ts
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx,js}
📄 CodeRabbit inference engine (.cursor/rules/playwright.mdc)
**/*.{ts,tsx,js}: Write concise, technical TypeScript/JavaScript with accurate typing in Playwright tests
Avoid code comments in the implementation
Files:
packages/core-services/src/types/IUploadService.tsapps/meteor/server/services/upload/service.ts
🧠 Learnings (1)
📚 Learning: 2026-01-15T22:03:35.587Z
Learnt from: d-gubert
Repo: RocketChat/Rocket.Chat PR: 38071
File: apps/meteor/app/apps/server/bridges/listeners.ts:257-271
Timestamp: 2026-01-15T22:03:35.587Z
Learning: In the file upload pipeline (apps/meteor/app/apps/server/bridges/listeners.ts), temporary files are created by the server in the same filesystem, so symlinks between temp files are safe and don't require cross-filesystem fallbacks.
Applied to files:
apps/meteor/server/services/upload/service.ts
🧬 Code graph analysis (1)
packages/core-services/src/types/IUploadService.ts (1)
packages/core-typings/src/IUpload.ts (1)
IUpload(4-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: 📦 Build Packages
- GitHub Check: CodeQL-Build
- GitHub Check: CodeQL-Build
🔇 Additional comments (4)
packages/core-services/src/types/IUploadService.ts (1)
34-41: Well-designed streaming API additions.The new
streamUploadedFileanduploadFileFromStreammethods are well-typed. UsingOmit<IUploadDetails, 'size'>for the stream-based upload is a good design choice since the size is only known after the stream is fully consumed.apps/meteor/server/services/upload/service.ts (3)
1-4: New imports look appropriate for the streaming functionality.The additions of
fs,Stream,sharp,Random, andUploadFSare all used by the new methods. No unused imports.Also applies to: 11-12, 22-22
183-197: Previous unhandled-rejection concern has been addressed, but the.then(resolver.resolve)pattern has a subtle difference.The past review's concern about
resolver.resolve(await ...)inside an async callback is now handled with.then(resolver.resolve).catch(...). This correctly chains rejections.Note that
.then(resolver.resolve)passes the resolved value as the first argument toresolve, which works correctly sincePromise.withResolversreturns bound functions. This is fine.
176-176: No action required. The project targets Node.js 22.16.0, which fully supportsPromise.withResolvers()(available since Node.js 22.0.0). This API is already used elsewhere in the codebase without issue.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
e019bad to
57494f3
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ee/packages/omnichannel-services/src/OmnichannelTranscript.ts (1)
407-435:⚠️ Potential issue | 🔴 CriticalCritical: Same
Readablestream is consumed by multiple uploads — second upload gets empty data.
uploadFilesis called with a singlestreamParam(line 371) and fans it out acrossroomIdsviaPromise.all(line 418). A Node.jsReadablecan only be piped/consumed once. The firstuploadFileFromStreamcall will drain the stream; the second will receive an ended stream and write a 0-byte file.This is called from
doRender(line 370) withroomIds: [rid, details.rid]— always 2 rooms.🐛 Suggested fix — upload once, copy for second room
private async uploadFiles({ streamParam, roomIds, data, transcriptText, }: { streamParam: Readable; roomIds: string[]; data: Pick<WorkerData, 'siteName' | 'visitor'>; transcriptText: string; }): Promise<IUpload[]> { + const fileName = `${transcriptText}_${data.siteName}_${new Intl.DateTimeFormat('en-US').format(new Date()).replace(/\//g, '-')}_${ + data.visitor?.name || data.visitor?.username || 'Visitor' + }.pdf`; + + // Upload once from the stream + const firstUpload = await uploadService.uploadFileFromStream({ + streamParam, + details: { + name: fileName, + type: 'application/pdf', + rid: roomIds[0], + userId: 'rocket.cat', + }, + }); + + // For subsequent rooms, re-upload from the stored file's buffer + const remainingUploads = await Promise.all( + roomIds.slice(1).map(async (roomId) => { + const buffer = await uploadService.getFileBuffer({ file: firstUpload }); + return uploadService.uploadFile({ + userId: 'rocket.cat', + buffer, + details: { + name: fileName, + type: 'application/pdf', + rid: roomId, + userId: 'rocket.cat', + size: buffer.length, + }, + }); + }), + ); + + return [firstUpload, ...remainingUploads]; - return Promise.all( - roomIds.map((roomId) => { - return uploadService.uploadFileFromStream({ - streamParam, - details: { ... }, - }); - }), - ); }
🤖 Fix all issues with AI agents
In `@apps/meteor/server/services/upload/service.ts`:
- Around line 169-209: The current uploadFileFromStream consumes the provided
Readable (streamParam) once, so when OmnichannelTranscript.uploadFiles fans the
same stream to multiple calls it fails; change the implementation so the stream
is buffered once and then re-used for each consumer: e.g., in
uploadFileFromStream (or upstream in OmnichannelTranscript.uploadFiles) read the
incoming stream to a temporary buffer/file first, then for each target room
create a fresh Readable from that buffer (or call
FileUpload.getStore('Uploads').insert multiple times using the same temp file
path) instead of piping the original stream multiple times; update references to
streamParam, tempFilePath, and the write/cleanup logic to ensure the temp file
is cleaned after all inserts complete or on error.
🧹 Nitpick comments (2)
ee/packages/network-broker/src/NetworkBroker.ts (1)
44-51: Stream detection heuristic could misfire on non-streaming calls.The check
data?.[0]?.streamParamwill match any service call whose first argument happens to contain a truthystreamParamproperty, even if it's not actually a stream-based call. While unlikely today, this is a fragile convention.Consider a more explicit marker (e.g., a
Symbolor a dedicated__streaming: trueflag) or additionally verify withStream.isReadable(data?.[0]?.streamParam)on the sender side to mirror the receiver-side check at line 147.apps/meteor/server/services/upload/service.ts (1)
144-167: Use public.storeaccessor instead of accessing internal._storeproperty.
FileUploadClassprovides a public.storeaccessor (or.getStore()method) to access the underlying Store instance. Accessing._storedirectly bypasses this public interface. Change:FileUpload.getStore('Uploads')._store.getReadStream(file._id, file)to:
FileUpload.getStore('Uploads').store.getReadStream(file._id, file)This improves consistency with the intended public API design.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
apps/meteor/server/services/upload/service.tsee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.tsee/packages/network-broker/src/NetworkBroker.tsee/packages/omnichannel-services/src/OmnichannelTranscript.tspackages/core-services/src/types/IUploadService.ts
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx,js}
📄 CodeRabbit inference engine (.cursor/rules/playwright.mdc)
**/*.{ts,tsx,js}: Write concise, technical TypeScript/JavaScript with accurate typing in Playwright tests
Avoid code comments in the implementation
Files:
ee/packages/federation-matrix/src/events/message.tsapps/meteor/server/services/upload/service.tsee/packages/federation-matrix/src/services/MatrixMediaService.tsee/packages/omnichannel-services/src/OmnichannelTranscript.tspackages/core-services/src/types/IUploadService.tsee/packages/network-broker/src/NetworkBroker.ts
🧠 Learnings (5)
📚 Learning: 2025-10-28T16:53:42.761Z
Learnt from: ricardogarim
Repo: RocketChat/Rocket.Chat PR: 37205
File: ee/packages/federation-matrix/src/FederationMatrix.ts:296-301
Timestamp: 2025-10-28T16:53:42.761Z
Learning: In the Rocket.Chat federation-matrix integration (ee/packages/federation-matrix/), the createRoom method from rocket.chat/federation-sdk will support a 4-argument signature (userId, roomName, visibility, displayName) in newer versions. Code using this 4-argument call is forward-compatible with planned library updates and should not be flagged as an error.
Applied to files:
ee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.tsee/packages/omnichannel-services/src/OmnichannelTranscript.ts
📚 Learning: 2025-12-09T20:01:00.324Z
Learnt from: sampaiodiego
Repo: RocketChat/Rocket.Chat PR: 37532
File: ee/packages/federation-matrix/src/FederationMatrix.ts:920-927
Timestamp: 2025-12-09T20:01:00.324Z
Learning: When reviewing federation invite handling in Rocket.Chat (specifically under ee/packages/federation-matrix), understand that rejecting an invite via federationSDK.rejectInvite() triggers an event-driven cleanup: a leave event is emitted and handled by handleLeave() in ee/packages/federation-matrix/src/events/member.ts, which calls Room.performUserRemoval() to remove the subscription. Do not add explicit cleanup in the reject branch of handleInvite(); rely on the existing leave-event flow for cleanup. If making changes, ensure this invariant remains and that any related paths still funnel cleanup through the leave event to avoid duplicate or missing removals.
Applied to files:
ee/packages/federation-matrix/src/events/message.tsee/packages/federation-matrix/src/services/MatrixMediaService.ts
📚 Learning: 2026-01-15T22:03:35.587Z
Learnt from: d-gubert
Repo: RocketChat/Rocket.Chat PR: 38071
File: apps/meteor/app/apps/server/bridges/listeners.ts:257-271
Timestamp: 2026-01-15T22:03:35.587Z
Learning: In the file upload pipeline (apps/meteor/app/apps/server/bridges/listeners.ts), temporary files are created by the server in the same filesystem, so symlinks between temp files are safe and don't require cross-filesystem fallbacks.
Applied to files:
apps/meteor/server/services/upload/service.ts
📚 Learning: 2026-01-26T18:26:01.279Z
Learnt from: d-gubert
Repo: RocketChat/Rocket.Chat PR: 38227
File: apps/meteor/app/api/server/router.ts:44-49
Timestamp: 2026-01-26T18:26:01.279Z
Learning: In apps/meteor/app/api/server/router.ts, when retrieving bodyParams and queryParams from the Hono context via c.get(), do not add defensive defaults (e.g., ?? {}). The code should fail fast if these parameters are missing, as endpoint handlers expect them to be present and breaking here helps surface parsing problems rather than hiding them.
Applied to files:
apps/meteor/server/services/upload/service.ts
📚 Learning: 2025-11-16T19:09:43.823Z
Learnt from: d-gubert
Repo: RocketChat/Rocket.Chat PR: 37496
File: packages/apps-engine/tests/server/runtime/deno/LivenessManager.spec.ts:3-3
Timestamp: 2025-11-16T19:09:43.823Z
Learning: In Node.js, `EventEmitter` can be imported from either the 'events' module or the 'stream' module—both export the same reference. While 'events' is the canonical module, importing from 'stream' is valid and works correctly.
Applied to files:
ee/packages/network-broker/src/NetworkBroker.ts
🧬 Code graph analysis (2)
apps/meteor/server/services/upload/service.ts (2)
packages/core-typings/src/IUpload.ts (1)
IUpload(4-67)apps/meteor/app/file-upload/server/lib/FileUpload.ts (1)
FileUpload(113-699)
ee/packages/omnichannel-services/src/OmnichannelTranscript.ts (1)
packages/core-typings/src/IUpload.ts (1)
IUpload(4-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: 📦 Build Packages
- GitHub Check: CodeQL-Build
- GitHub Check: CodeQL-Build
🔇 Additional comments (6)
ee/packages/federation-matrix/src/events/message.ts (1)
45-51: LGTM!The metadata object now aligns with the
IUploadDetailsshape (ridinstead ofroomId), and the fallback defaults forsizeandtypeare sensible.ee/packages/federation-matrix/src/services/MatrixMediaService.ts (1)
83-109: LGTM!The signature now accepts the typed
IUploadDetailsand correctly spreads it into the details object, withsizeproperly overridden bybuffer.lengthto reflect the actual downloaded content size.packages/core-services/src/types/IUploadService.ts (1)
34-41: Well-designed streaming API surface.
Omit<IUploadDetails, 'size'>foruploadFileFromStreamcorrectly reflects that size is unknown until the stream completes. The optionalimageResizeOptsonstreamUploadedFilekeeps the interface clean.ee/packages/network-broker/src/NetworkBroker.ts (1)
146-162: LGTM — receiver-side stream reconstruction is correct.The
Stream.isReadablecheck on the receiver side properly gates the stream path, and the params are reconstructed to match the original method signature. The tracer label annotation with(streaming)is a nice touch for observability.ee/packages/omnichannel-services/src/OmnichannelTranscript.ts (1)
243-246: Streaming benefit for individual images is limited to resize-during-stream.The stream is immediately collected back into a buffer via
streamToBuffer. The main win here is thatsharpresizes during the stream (400×240) so only the smaller image is buffered, rather than the full-resolution original. This is a valid optimization for large images.apps/meteor/server/services/upload/service.ts (1)
176-176: No action required. The project explicitly requires Node.js 22.16.0 inpackage.jsonengines, which supportsPromise.withResolvers()natively. The code is compatible with the target runtime.Likely an incorrect or invalid review comment.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| async uploadFileFromStream({ | ||
| streamParam, | ||
| details, | ||
| }: { | ||
| streamParam: Stream.Readable; | ||
| details: Omit<IUploadDetails, 'size'>; | ||
| }): Promise<IUpload> { | ||
| const resolver = Promise.withResolvers<IUpload>(); | ||
|
|
||
| const tempFilePath = UploadFS.getTempFilePath(Random.id()); | ||
|
|
||
| const writeStream = fs.createWriteStream(tempFilePath); | ||
| streamParam.pipe(writeStream); | ||
|
|
||
| const cleanup = (err: unknown) => { | ||
| fs.promises.unlink(tempFilePath).catch(() => undefined); | ||
| resolver.reject(err); | ||
| }; | ||
|
|
||
| writeStream.on('finish', async () => { | ||
| FileUpload.getStore('Uploads') | ||
| .insert( | ||
| { | ||
| ...details, | ||
| size: writeStream.bytesWritten, | ||
| }, | ||
| tempFilePath, | ||
| ) | ||
| .then(resolver.resolve) | ||
| .catch(cleanup); | ||
| }); | ||
|
|
||
| streamParam.on('error', async (err) => { | ||
| writeStream.destroy(); | ||
| cleanup(err); | ||
| }); | ||
|
|
||
| writeStream.on('error', cleanup); | ||
|
|
||
| return resolver.promise; | ||
| } |
There was a problem hiding this comment.
Sharing a single Readable across multiple consumers will fail.
This method is called from OmnichannelTranscript.uploadFiles (line 420-432 of OmnichannelTranscript.ts), which maps over roomIds and calls uploadFileFromStream for each room using the same streamParam. A Readable stream can only be consumed once — the first .pipe() will drain it, and subsequent calls will receive an already-ended (or empty) stream.
In OmnichannelTranscript.ts lines 370-375, a single Readable.from(stream) is created and passed to uploadFiles, which then fans it out via Promise.all to multiple rooms. The second upload will get no data.
🐛 Possible approaches
Option A: Buffer the stream once, then upload from the buffer to each room:
private async uploadFiles({
- streamParam,
+ buffer,
roomIds,
...
}) {
return Promise.all(
roomIds.map((roomId) => {
- return uploadService.uploadFileFromStream({ streamParam, details: { ... } });
+ return uploadService.uploadFile({ buffer, details: { ..., size: buffer.length } });
}),
);
}Option B: Clone/tee the stream for each consumer (e.g., using stream.PassThrough or buffering once then wrapping with Readable.from()).
Option C: Upload once, then copy the upload record for the second room.
🤖 Prompt for AI Agents
In `@apps/meteor/server/services/upload/service.ts` around lines 169 - 209, The
current uploadFileFromStream consumes the provided Readable (streamParam) once,
so when OmnichannelTranscript.uploadFiles fans the same stream to multiple calls
it fails; change the implementation so the stream is buffered once and then
re-used for each consumer: e.g., in uploadFileFromStream (or upstream in
OmnichannelTranscript.uploadFiles) read the incoming stream to a temporary
buffer/file first, then for each target room create a fresh Readable from that
buffer (or call FileUpload.getStore('Uploads').insert multiple times using the
same temp file path) instead of piping the original stream multiple times;
update references to streamParam, tempFilePath, and the write/cleanup logic to
ensure the temp file is cleaned after all inserts complete or on error.

Proposed changes (including videos or screenshots)
Issue(s)
Steps to test or reproduce
Further comments
Memory Measurements
programs/server: +822.7 MiBapps/omnichannel-transcript: +980.2 MiBprograms/server: +426.8 MiBapps/omnichannel-transcript: +152.6 MiBConclusions
Extra
Summary by CodeRabbit
New Features
Improvements