kraken: use zenoh to show extension logs#3803
kraken: use zenoh to show extension logs#3803nicoschmdt wants to merge 4 commits intobluerobotics:masterfrom
Conversation
Reviewer's GuideSwitches extension log viewing from an HTTP streaming endpoint to a zenoh-based flow with a dedicated logs modal, shared frontend zenoh session, and a backend zenoh queryable that serves historical logs and provides the topic for live streaming. Sequence diagram for zenoh-based extension log retrieval and streamingsequenceDiagram
actor User
participant ExtensionManagerView
participant ExtensionLogsModal
participant KrakenManager
participant ZenohManager
participant ZenohRouter_backend
participant ExtensionHandlers
participant ContainerManager
participant ExtensionLogPublisher
User->>ExtensionManagerView: Click showLogs for extension
ExtensionManagerView->>ExtensionLogsModal: Open with extensionIdentifier, extensionName
ExtensionLogsModal->>KrakenManager: getHistoricalLogsForExtension(identifier, timeout)
KrakenManager->>ZenohManager: query(kraken/extension/logs/request?extension_name=identifier, BestMatching, timeout)
ZenohManager->>ZenohRouter_backend: zenoh get on key kraken/extension/logs/request
ZenohRouter_backend->>ExtensionHandlers: logs_request_handler(extension_name)
ExtensionHandlers->>ContainerManager: get_container_historical_logs(container_name)
ContainerManager-->>ExtensionHandlers: List[str] raw_logs
ExtensionHandlers->>ExtensionLogPublisher: _topic_for(extension)
ExtensionLogPublisher-->>ExtensionHandlers: topic string
ExtensionHandlers-->>ZenohRouter_backend: {messages, total_lines, topic}
ZenohRouter_backend-->>ZenohManager: JSON payload
ZenohManager-->>KrakenManager: Parsed response object
KrakenManager-->>ExtensionLogsModal: {messages, topic}
ExtensionLogsModal->>ExtensionLogsModal: Render historical log lines
ExtensionLogsModal->>KrakenManager: createExtensionLogsSubscriber(topic, handler)
KrakenManager->>ZenohManager: subscriber(topic, handler)
ZenohManager->>ExtensionLogPublisher: declare_subscriber(topic)
ExtensionLogPublisher-->>ZenohManager: Subscriber
ZenohManager-->>KrakenManager: Subscriber instance
KrakenManager-->>ExtensionLogsModal: modal_subscriber set
loop For each live log line
ExtensionLogPublisher-->>ZenohManager: Sample payload(message)
ZenohManager-->>ExtensionLogsModal: Invoke handler(sample)
ExtensionLogsModal->>ExtensionLogsModal: Buffer and flush to modal_messages
ExtensionLogsModal->>ExtensionLogsModal: Scroll to bottom if follow_logs
end
User->>ExtensionLogsModal: Toggle follow_logs / Download / Close
ExtensionLogsModal->>ExtensionLogsModal: downloadCurrentLog() build file
ExtensionLogsModal->>ExtensionLogsModal: cleanup(), undeclare subscriber
Class diagram for updated frontend and backend log handlingclassDiagram
class ExtensionManagerView {
+boolean show_log
+string selected_log_extension_identifier
+string selected_log_extension_name
+Session session
+mounted()
+destroyed()
+initializeZenohSession()
+showLogs(extension)
}
class ExtensionLogsModal {
<<VueComponent>>
+boolean value
+string extensionIdentifier
+string extensionName
+LogMessage[] modal_messages
+Subscriber modal_subscriber
+string current_modal_topic
+string modal_error
+boolean requesting_logs
+number query_timeout
+boolean follow_logs
+boolean scroll_pending
+LogMessage[] message_buffer
+number buffer_flush_timer
+openModal()
+closeModal()
+cleanup()
+flushMessageBuffer()
+setupModalSubscriber(topic)
+handleSubscriber(sample)
+requestHistoricalLogsForExtension(identifier)
+isMessageEmpty(msg) bool
+extractLogMessage(msg) string
+formatLogMessage(msg) string
+scheduleScroll()
+scrollToBottom()
+setErrorAndStop(message)
+downloadCurrentLog()
}
class ZenohManager {
-static ZenohManager instance
-Promise~Session~ sessionPromise
+static getInstance() ZenohManager
+getSession() Promise~Session~
+query(key, target, timeout) Promise~any~
+subscriber(topic, handler) Promise~Subscriber~
}
class KrakenManager {
+getHistoricalLogsForExtension(identifier, timeout) Promise~any~
+createExtensionLogsSubscriber(extensionIdentifier, subscriberHandler) Promise~Subscriber~
}
class ContainerManager {
+get_container_log_by_name(container_name) AsyncGenerator~str~
+get_container_historical_logs(container_name) List~str~
+get_containers_stats() Dict
}
class ExtensionSettings {
+string identifier
+string name
+boolean enabled
+container_name() string
}
class SettingsV2 {
+List~ExtensionSettings~ extensions
}
class SettingsFunctions {
+get_extension_settings() List~ExtensionSettings~
}
class ExtensionLogPublisher {
+_collect_desired_streams()
+_topic_for(extension) string
+_extract_level(raw_line) (level, rest)
}
class ZenohRouter {
+add_routes_to_zenoh(application)
+add_queryable(key, handler)
}
class ExtensionHandlers {
-ZenohRouter router
+ExtensionHandlers(router)
+logs_request_handler(extension_name) dict
+register_queryables()
}
class ZenohApp {
+ZenohSession session
+ZenohRouter router
+ExtensionHandlers extension_handlers
}
ExtensionManagerView --> ExtensionLogsModal : uses
ExtensionManagerView --> ZenohManager : initializes Session
ExtensionLogsModal --> KrakenManager : uses
KrakenManager --> ZenohManager : delegates query, subscriber
SettingsFunctions --> SettingsV2 : reads
SettingsFunctions --> ExtensionSettings : returns list
ExtensionLogPublisher --> ContainerManager : reads container logs (streaming)
ExtensionHandlers --> ContainerManager : get_container_historical_logs
ExtensionHandlers --> ExtensionLogPublisher : _topic_for, _extract_level
ExtensionHandlers --> SettingsFunctions : get_extension_settings
ExtensionHandlers --> ZenohRouter : register_queryables
ZenohApp --> ZenohRouter
ZenohApp --> ZenohSession
ZenohApp --> ExtensionHandlers
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The historical logs path (
get_container_historical_logs→logs_request_handler) returns the full log list and sends all lines over Zenoh in one response; consider enforcing a maximum number of lines or a time/size window to prevent excessive memory usage and very large payloads for long-running containers. - In
ExtensionLogsModal.cleanup,undeclare()is called without awaiting while other code paths (setupModalSubscriber) await it; for consistency and to avoid dangling subscriptions, consider centralizing subscriber teardown in a single async helper that is always awaited from async contexts and called synchronously only when absolutely necessary.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The historical logs path (`get_container_historical_logs` → `logs_request_handler`) returns the full log list and sends all lines over Zenoh in one response; consider enforcing a maximum number of lines or a time/size window to prevent excessive memory usage and very large payloads for long-running containers.
- In `ExtensionLogsModal.cleanup`, `undeclare()` is called without awaiting while other code paths (`setupModalSubscriber`) await it; for consistency and to avoid dangling subscriptions, consider centralizing subscriber teardown in a single async helper that is always awaited from async contexts and called synchronously only when absolutely necessary.
## Individual Comments
### Comment 1
<location> `core/services/kraken/harbor/container.py:127-134` </location>
<code_context>
logger.info(f"Finished streaming logs for {container_name}")
+ @classmethod
+ async def get_container_historical_logs(cls, container_name: str) -> List[str]:
+ async with DockerCtx() as client:
+ try:
+ container = await cls.get_raw_container_by_name(client, container_name)
+ except ContainerNotFound as error:
+ raise StackedHTTPException(status_code=status.HTTP_404_NOT_FOUND, error=error) from error
+
+ return await container.log(stdout=True, stderr=True, follow=False, stream=False) # type: ignore
+
@classmethod
</code_context>
<issue_to_address>
**issue (bug_risk):** The type hint suggests a list of strings, but `container.log(..., stream=False)` may return bytes or a single string, which can break downstream processing.
`get_container_historical_logs` is annotated as `List[str]`, and `logs_request_handler` iterates over `raw_logs` assuming each element is a full line of text. However, `container.log(..., stream=False)` may return a single `str`/`bytes` blob or `bytes`, not a list of lines. That can cause iteration over bytes/characters and type issues.
To keep the contract accurate, normalize the result before returning:
- Decode `bytes` to `str` (e.g., UTF-8).
- If you get a single string, split it into lines (e.g., `splitlines()`) so the method always returns `List[str]`.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
core/frontend/src/components/kraken/modals/ExtensionLogsModal.vue
Outdated
Show resolved
Hide resolved
3388171 to
8b904aa
Compare
8b904aa to
5ba4952
Compare
5ba4952 to
6253510
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
f5f2909 to
83e6771
Compare
83e6771 to
c9613c2
Compare
There was a problem hiding this comment.
Hey - I've found 4 issues, and left some high level feedback:
- In
ExtensionLogsModal.setupModalSubscriberyou pass atopicstring intokraken.createExtensionLogsSubscriber, which itself prefixesextensions/logs/; when the backend returns a full topic (via_topic_for), this will produceextensions/logs/extensions/logs/..., so consider changingcreateExtensionLogsSubscriberto accept a full topic or adding a separate helper that does not re-prefix. - In
ExtensionLogsModal.downloadCurrentLogthe filename usesthis.extensionName, which can be empty; it would be more robust to fall back toextensionIdentifier(or a sanitized combination) so the downloaded log file always has a meaningful name. - The
ZenohManager.querymethod assumesgetSession()has been called elsewhere; to avoid intermittentsession not initializederrors when used from new callers, consider lazily initializing the session insidequery()(e.g., callinggetSession()ifsessionPromiseis null) instead of just logging and returning null.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `ExtensionLogsModal.setupModalSubscriber` you pass a `topic` string into `kraken.createExtensionLogsSubscriber`, which itself prefixes `extensions/logs/`; when the backend returns a full topic (via `_topic_for`), this will produce `extensions/logs/extensions/logs/...`, so consider changing `createExtensionLogsSubscriber` to accept a full topic or adding a separate helper that does not re-prefix.
- In `ExtensionLogsModal.downloadCurrentLog` the filename uses `this.extensionName`, which can be empty; it would be more robust to fall back to `extensionIdentifier` (or a sanitized combination) so the downloaded log file always has a meaningful name.
- The `ZenohManager.query` method assumes `getSession()` has been called elsewhere; to avoid intermittent `session not initialized` errors when used from new callers, consider lazily initializing the session inside `query()` (e.g., calling `getSession()` if `sessionPromise` is null) instead of just logging and returning null.
## Individual Comments
### Comment 1
<location path="core/frontend/src/libs/zenoh/index.ts" line_range="54-65" />
<code_context>
+ setTimeout(() => resolve(null), timeout)
+ })
+
+ const replyPromise = receiver.receive()
+ const reply = await Promise.race([replyPromise, timeoutPromise])
+
+ if (reply === null || reply === RecvErr.Disconnected) {
+ console.error('Query timeout: No response from zenoh queryable. '
+ + 'The service may be unavailable or the extension may not exist.')
+ return null
+ }
+
+ const payload = (reply as { result: () => Sample }).result()
+ try {
+ return JSON.parse(payload.payload().to_string())
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Defensive handling of `receiver.receive()` result would avoid crashes when timeouts or errors occur.
Since `reply` can also be `RecvErr.Disconnected` (and possibly other error variants), casting it to `{ result: () => Sample }` and calling `.result()` risks a runtime throw when `reply` is an error. Please branch on the actual shape/value first (e.g., distinguish success vs error/timeout cases and only call `.result()` and parse JSON when a valid `Sample` is present).
```suggestion
const replyPromise = receiver.receive()
const reply = await Promise.race([replyPromise, timeoutPromise])
if (reply === null || reply === RecvErr.Disconnected) {
console.error('Query timeout: No response from zenoh queryable. '
+ 'The service may be unavailable or the extension may not exist.')
return null
}
// Defensive handling: ensure reply is a successful result before accessing .result()
if (!reply || typeof (reply as any).result !== 'function') {
console.error('Unexpected reply from zenoh queryable. '
+ 'Expected a successful result but received:', reply)
return null
}
const payload = (reply as { result: () => Sample }).result()
try {
return JSON.parse(payload.payload().to_string())
```
</issue_to_address>
### Comment 2
<location path="core/frontend/src/components/kraken/modals/ExtensionLogsModal.vue" line_range="239-245" />
<code_context>
+ }, BUFFER_FLUSH_INTERVAL_MS)
+ }
+ },
+ async requestHistoricalLogsForExtension(identifier: string) {
+ this.requesting_logs = true
+ this.modal_error = null
+ try {
+ const response = await kraken.getHistoricalLogsForExtension(identifier, this.query_timeout)
+
+ if (response.error) {
+ const errorSuffix = response.error_type ? ` (${response.error_type})` : ''
+ this.setErrorAndStop(`Error from queryable: ${response.error}${errorSuffix}`)
</code_context>
<issue_to_address>
**issue:** Handle the case where the Zenoh query returns `null` (e.g. timeout) before dereferencing `response`.
Since `kraken.getHistoricalLogsForExtension` may return `null` on timeout/failure, directly accessing `response.error`/`response.messages` can throw. Add a null check (e.g. `if (!response) { this.setErrorAndStop('No response from logs service (timeout or connection issue)'); return }`) before using the response to keep the modal resilient to network/service issues.
</issue_to_address>
### Comment 3
<location path="core/frontend/src/components/kraken/modals/ExtensionLogsModal.vue" line_range="308-314" />
<code_context>
+ this.modal_error = message
+ this.requesting_logs = false
+ },
+ downloadCurrentLog() {
+ const logContent = this.modal_messages
+ .map((msg) => String(msg.message || ''))
+ .join('\n')
+ const file = new File([logContent], `${this.extensionName}.log`, { type: 'text/plain' })
+ saveAs(file)
+ },
</code_context>
<issue_to_address>
**suggestion:** Fallback to the extension identifier when building the log file name to avoid empty base names.
Given the default allows `extensionName` to be an empty string, this will generate `.log`. Consider falling back to `extensionIdentifier`, e.g.:
```js
const base = this.extensionName || this.extensionIdentifier
const file = new File([logContent], `${base}.log`, { type: 'text/plain' })
```
This avoids empty/hidden filenames and produces more meaningful log names.
```suggestion
downloadCurrentLog() {
const logContent = this.modal_messages
.map((msg) => String(msg.message || ''))
.join('\n')
const baseName = this.extensionName || this.extensionIdentifier
const file = new File([logContent], `${baseName}.log`, { type: 'text/plain' })
saveAs(file)
},
```
</issue_to_address>
### Comment 4
<location path="core/services/kraken/harbor/container.py" line_range="127-134" />
<code_context>
logger.info(f"Finished streaming logs for {container_name}")
+ @classmethod
+ async def get_container_historical_logs(cls, container_name: str) -> List[str]:
+ async with DockerCtx() as client:
+ try:
+ container = await cls.get_raw_container_by_name(client, container_name)
+ except ContainerNotFound as error:
+ raise StackedHTTPException(status_code=status.HTTP_404_NOT_FOUND, error=error) from error
+
+ return await container.log(stdout=True, stderr=True, follow=False, stream=False) # type: ignore
+
@classmethod
</code_context>
<issue_to_address>
**issue (bug_risk):** Align the expected type of `container.log` with how it is consumed (string vs list of lines).
`get_container_historical_logs` is typed and used as returning `List[str]` (and `logs_request_handler` iterates `for raw_line in raw_logs`), but `container.log(..., stream=False)` usually returns a single string/bytes. That means consumers will iterate over characters, not lines. Either use `stream=True` and collect the async iterator into a list of lines, or keep `stream=False` and call `splitlines()` before returning, and then update/confirm the return type accordingly.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| const replyPromise = receiver.receive() | ||
| const reply = await Promise.race([replyPromise, timeoutPromise]) | ||
|
|
||
| if (reply === null || reply === RecvErr.Disconnected) { | ||
| console.error('Query timeout: No response from zenoh queryable. ' | ||
| + 'The service may be unavailable or the extension may not exist.') | ||
| return null | ||
| } | ||
|
|
||
| const payload = (reply as { result: () => Sample }).result() | ||
| try { | ||
| return JSON.parse(payload.payload().to_string()) |
There was a problem hiding this comment.
suggestion (bug_risk): Defensive handling of receiver.receive() result would avoid crashes when timeouts or errors occur.
Since reply can also be RecvErr.Disconnected (and possibly other error variants), casting it to { result: () => Sample } and calling .result() risks a runtime throw when reply is an error. Please branch on the actual shape/value first (e.g., distinguish success vs error/timeout cases and only call .result() and parse JSON when a valid Sample is present).
| const replyPromise = receiver.receive() | |
| const reply = await Promise.race([replyPromise, timeoutPromise]) | |
| if (reply === null || reply === RecvErr.Disconnected) { | |
| console.error('Query timeout: No response from zenoh queryable. ' | |
| + 'The service may be unavailable or the extension may not exist.') | |
| return null | |
| } | |
| const payload = (reply as { result: () => Sample }).result() | |
| try { | |
| return JSON.parse(payload.payload().to_string()) | |
| const replyPromise = receiver.receive() | |
| const reply = await Promise.race([replyPromise, timeoutPromise]) | |
| if (reply === null || reply === RecvErr.Disconnected) { | |
| console.error('Query timeout: No response from zenoh queryable. ' | |
| + 'The service may be unavailable or the extension may not exist.') | |
| return null | |
| } | |
| // Defensive handling: ensure reply is a successful result before accessing .result() | |
| if (!reply || typeof (reply as any).result !== 'function') { | |
| console.error('Unexpected reply from zenoh queryable. ' | |
| + 'Expected a successful result but received:', reply) | |
| return null | |
| } | |
| const payload = (reply as { result: () => Sample }).result() | |
| try { | |
| return JSON.parse(payload.payload().to_string()) |
| async requestHistoricalLogsForExtension(identifier: string) { | ||
| this.requesting_logs = true | ||
| this.modal_error = null | ||
| try { | ||
| const response = await kraken.getHistoricalLogsForExtension(identifier, this.query_timeout) | ||
|
|
||
| if (response.error) { |
There was a problem hiding this comment.
issue: Handle the case where the Zenoh query returns null (e.g. timeout) before dereferencing response.
Since kraken.getHistoricalLogsForExtension may return null on timeout/failure, directly accessing response.error/response.messages can throw. Add a null check (e.g. if (!response) { this.setErrorAndStop('No response from logs service (timeout or connection issue)'); return }) before using the response to keep the modal resilient to network/service issues.
| downloadCurrentLog() { | ||
| const logContent = this.modal_messages | ||
| .map((msg) => String(msg.message || '')) | ||
| .join('\n') | ||
| const file = new File([logContent], `${this.extensionName}.log`, { type: 'text/plain' }) | ||
| saveAs(file) | ||
| }, |
There was a problem hiding this comment.
suggestion: Fallback to the extension identifier when building the log file name to avoid empty base names.
Given the default allows extensionName to be an empty string, this will generate .log. Consider falling back to extensionIdentifier, e.g.:
const base = this.extensionName || this.extensionIdentifier
const file = new File([logContent], `${base}.log`, { type: 'text/plain' })This avoids empty/hidden filenames and produces more meaningful log names.
| downloadCurrentLog() { | |
| const logContent = this.modal_messages | |
| .map((msg) => String(msg.message || '')) | |
| .join('\n') | |
| const file = new File([logContent], `${this.extensionName}.log`, { type: 'text/plain' }) | |
| saveAs(file) | |
| }, | |
| downloadCurrentLog() { | |
| const logContent = this.modal_messages | |
| .map((msg) => String(msg.message || '')) | |
| .join('\n') | |
| const baseName = this.extensionName || this.extensionIdentifier | |
| const file = new File([logContent], `${baseName}.log`, { type: 'text/plain' }) | |
| saveAs(file) | |
| }, |
| async def get_container_historical_logs(cls, container_name: str) -> List[str]: | ||
| async with DockerCtx() as client: | ||
| try: | ||
| container = await cls.get_raw_container_by_name(client, container_name) | ||
| except ContainerNotFound as error: | ||
| raise StackedHTTPException(status_code=status.HTTP_404_NOT_FOUND, error=error) from error | ||
|
|
||
| return await container.log(stdout=True, stderr=True, follow=False, stream=False) # type: ignore |
There was a problem hiding this comment.
issue (bug_risk): Align the expected type of container.log with how it is consumed (string vs list of lines).
get_container_historical_logs is typed and used as returning List[str] (and logs_request_handler iterates for raw_line in raw_logs), but container.log(..., stream=False) usually returns a single string/bytes. That means consumers will iterate over characters, not lines. Either use stream=True and collect the async iterator into a list of lines, or keep stream=False and call splitlines() before returning, and then update/confirm the return type accordingly.
Summary by Sourcery
Switch extension log viewing from an HTTP container log stream to a Zenoh-based flow that serves historical logs via a queryable and live logs via subscriptions, with a new dedicated frontend modal for log display and download.
New Features:
Enhancements:
Note
Medium Risk
Touches both UI and backend messaging paths for logs and introduces a new Zenoh queryable, so failures could break log visibility or add load when fetching large historical logs.
Overview
Switches extension log viewing from the
GET /container/{name}/logstreaming path to a Zenoh-based flow that queries historical logs and subscribes to live log topics.Frontend adds a dedicated
ExtensionLogsModal(follow + download, buffering/limits, error handling) and refactorsExtensionManagerViewto open it and manage a shared ZenohSession; the oldgetContainerLogsAPI call and client-side streaming/parsing code are removed.Backend registers a new Zenoh queryable (
extension/logs/request) that resolves an extension, returns recent container logs plus the topic to subscribe to, and addsContainerManager.get_container_historical_logsto support non-streaming log retrieval.Written by Cursor Bugbot for commit 6253510. This will update automatically on new commits. Configure here.