move extension and container endpoints to zenoh #3907

Open
nicoschmdt wants to merge 7 commits into bluerobotics:master from nicoschmdt:add-zenoh-extension-methods

@nicoschmdt (Collaborator) commented Apr 28, 2026

Summary by Sourcery

Migrate kraken extension and container management from HTTP endpoints to zenoh-based queryables and streaming, updating both backend handlers and frontend consumers.

New Features:

  • Add zenoh-based streaming install progress channel for extensions with backend publishers and frontend subscription via a deferred-based installer API.

Enhancements:

  • Replace kraken extension and container CRUD and metrics REST calls with zenoh queries on both backend and frontend.
  • Introduce an asyncio-driven zenoh session loop with coroutine submission utilities and reusable publishers in the common zenoh helper.
  • Refactor kraken backend into distinct extension and container zenoh handlers for installs, lifecycle operations, logs, container listing, and stats.
  • Simplify the extension manager view to rely solely on kraken APIs for zenoh access, removing direct zenoh session handling and centralizing install progress tracking.
  • Add a generic deferred utility on the frontend to coordinate async install completion and timeouts.
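The deferred utility named in the last item is not shown in this thread. As a rough sketch of the pattern only, assuming a hypothetical `createDeferred` helper (its name, shape, and timeout behaviour are illustrative and not taken from the PR), a promise can expose its `resolve` and `reject` so that a stream subscriber settles it later, with an optional timer rejecting it if nothing arrives in time:

```typescript
// Hypothetical helper, not the PR's implementation.
interface Deferred<T> {
  promise: Promise<T>
  resolve: (value: T) => void
  reject: (reason: Error) => void
}

function createDeferred<T>(timeoutMs?: number): Deferred<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  let resolve!: (value: T) => void
  let reject!: (reason: Error) => void

  const promise = new Promise<T>((res, rej) => {
    // Settling either way cancels the pending timeout, if any.
    resolve = (value: T) => {
      if (timer !== undefined) clearTimeout(timer)
      res(value)
    }
    reject = (reason: Error) => {
      if (timer !== undefined) clearTimeout(timer)
      rej(reason)
    }
  })

  if (timeoutMs !== undefined) {
    // Reject if nobody settles the promise before the deadline.
    timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs} ms`)), timeoutMs)
  }
  return { promise, resolve, reject }
}
```

An installer built this way could create one deferred per install, call `resolve` when the backend stream reports completion and `reject` on an error message, while callers simply `await deferred.promise`.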

@nicoschmdt nicoschmdt marked this pull request as draft April 28, 2026 13:21

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 2 issues, and left some high level feedback:

  • ExtensionManagerView now calls kraken.fetchInstalledExtensions(), but fetchInstalledExtensions is not included in the default export of KrakenManager, so this will be undefined; either add fetchInstalledExtensions to the default export or adjust the call site to use the exported name.
  • The frontend installExtension helper always returns { status: 'success' } without waiting for an explicit success signal from the backend stream; consider having the backend install_handler yield a final success message and wiring the frontend to resolve only when that is received (and to fail on stream timeout/no replies).
  • Error handling for fetchRunningContainers and fetchContainersStats was simplified to silently ignore failures (other than setting flags); if user feedback on failures is still desired, consider reintroducing notifier calls or another visible error path for these operations.

## Individual Comments

### Comment 1
<location path="core/frontend/src/components/kraken/KrakenManager.ts" line_range="311-314" />
<code_context>
-    url: `${KRAKEN_API_V2_URL}/extension/upload/keep-alive?temp_tag=${tempTag}`,
-    timeout: 10000,
-  })
+export async function keepTemporaryExtensionAlive(tempTag: string): Promise<any | null> {
+  await zenoh.query(
+    `kraken/extension/upload/keep-alive?temp_tag=${tempTag}`, 
+    QueryTarget.BestMatching
+  )
 }
</code_context>
<issue_to_address>
**issue (bug_risk):** Return type of `keepTemporaryExtensionAlive` no longer matches the actual return value.

The function is typed as `Promise<any | null>`, but it only `await`s `zenoh.query` and never returns its result, so callers actually get `void`. Either return the result of `zenoh.query(...)` (e.g. `return await zenoh.query(...)`) or change the return type to `Promise<void>` to match the current behavior.
</issue_to_address>
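The two options in this comment can be sketched side by side. The `QueryFn` type below is a stand-in for the project's zenoh client, whose real signature is not shown in this thread, and both function names are hypothetical:

```typescript
// Stand-in for the project's zenoh client; its real signature is not shown here.
type QueryFn = (selector: string) => Promise<unknown>

// Option 1: keep the fire-and-forget behaviour and declare it honestly.
async function keepAliveIgnoringReply(query: QueryFn, tempTag: string): Promise<void> {
  await query(`kraken/extension/upload/keep-alive?temp_tag=${tempTag}`)
}

// Option 2: hand the reply back so callers can inspect it.
async function keepAliveReturningReply(query: QueryFn, tempTag: string): Promise<unknown> {
  return query(`kraken/extension/upload/keep-alive?temp_tag=${tempTag}`)
}
```

Either form makes the declared return type agree with what callers actually receive; which one fits depends on whether any caller needs the reply.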

### Comment 2
<location path="core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py" line_range="135-136" />
<code_context>
-                    if response is not None:
-                        query.reply(query.selector.key_expr, json.dumps(response, default=str))
+                    payload_args = (bytes(query.payload) if query.payload else b"",) if has_payload else ()
+                    result = func(*payload_args, **params)
+                    if inspect.isasyncgen(result):
+                        async for item in result:
+                            if item is not None:
</code_context>
<issue_to_address>
**issue (bug_risk):** `inspect.isasyncgen` usage will fail without importing `inspect`.

`inspect.isasyncgen(result)` is used here but `inspect` isn’t imported in this module, so this will raise a `NameError` at runtime for any queryable call and break all zenoh handlers registered via this router. Please ensure `inspect` is imported (e.g. `import inspect`) or otherwise made available in this scope.
</issue_to_address>

Comment thread core/frontend/src/components/kraken/KrakenManager.ts Outdated
Comment thread core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py Outdated
@nicoschmdt nicoschmdt force-pushed the add-zenoh-extension-methods branch 14 times, most recently from d4e1b43 to e6ab490 Compare April 30, 2026 16:28
@nicoschmdt nicoschmdt force-pushed the add-zenoh-extension-methods branch 2 times, most recently from 36d6d64 to 36a3630 Compare April 30, 2026 18:06
@nicoschmdt nicoschmdt force-pushed the add-zenoh-extension-methods branch from 36a3630 to 86f9d8d Compare April 30, 2026 18:59
@nicoschmdt nicoschmdt marked this pull request as ready for review April 30, 2026 19:39

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 3 issues, and left some high level feedback:

  • In KrakenManager.ts, the new installExtension implementation relies on Subscriber, Sample, QueryTarget, and a zenoh client that are not imported in the shown diff; make sure these symbols are still imported (or re-exported) after the refactor or the file will no longer type-check.
  • Frontend zenoh calls now inconsistently handle error payloads (e.g., getContainersStats checks for stats.error while enableExtension/disableExtension/uninstallExtension ignore any reply); consider standardizing how zenoh query responses and error fields are handled so user-facing operations can surface backend failures reliably.
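One way to standardize the handling described in the second point is to route every reply through a single check. This is a sketch only: `unwrapReply` is a hypothetical name, and it assumes the backend signals failure with an `error` field on the reply object, as the `stats.error` check suggests:

```typescript
// Hypothetical helper; assumes failures arrive as `{ error: ... }` on the reply.
function unwrapReply<T>(reply: unknown, operation: string): T {
  const candidate = reply as { error?: unknown } | null | undefined
  if (candidate === null || candidate === undefined) {
    // No reply at all is treated as a failure rather than silent success.
    throw new Error(`${operation}: no reply received`)
  }
  if (typeof candidate === 'object' && candidate.error !== undefined) {
    throw new Error(`${operation} failed: ${String(candidate.error)}`)
  }
  return reply as T
}
```

With something like this in place, `enableExtension`, `disableExtension`, and `uninstallExtension` could surface backend failures through the same path as `getContainersStats` instead of discarding the reply.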

## Individual Comments

### Comment 1
<location path="core/frontend/src/components/kraken/KrakenManager.ts" line_range="292-294" />
<code_context>
  * @param {string} identifier The identifier of the extension
  * @param {string} tag The tag of the extension
  */
 export async function enableExtension(identifier: string, tag: string): Promise<void> {
-  await back_axios({
-    method: 'POST',
-    url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}/enable`,
-    timeout: 10000,
-  })
+  await zenoh.query(
+    `${KRAKEN_BASE_ZENOH}/extension/enable?identifier=${identifier};tag=${tag}`,
+    QueryTarget.BestMatching,
+    10000,
</code_context>
<issue_to_address>
**issue (bug_risk):** URL parameters for zenoh queries are not encoded, which may break identifiers/tags with special characters.

Here `identifier` and `tag` are directly interpolated into the zenoh selector. If they contain `;`, `?`, `&`, spaces, or other reserved characters, the query can be malformed or misparsed. Please URL-encode these fields (e.g., with `encodeURIComponent`, as in `buildInstallQueryKey`) in `enableExtension`, `disableExtension`, `uninstallExtension`, and `restartExtension` so that arbitrary identifiers/tags are handled correctly.
</issue_to_address>
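A minimal sketch of the encoding this comment asks for, using the `key?name=value;name=value` selector shape seen in the diff. `buildSelector` is a hypothetical helper, not the PR's `buildInstallQueryKey`, and it assumes the backend percent-decodes parameter values:

```typescript
// Hypothetical helper: percent-encodes each value so `;`, `?`, `&` and spaces
// inside an identifier or tag cannot be mistaken for selector syntax.
function buildSelector(keyExpr: string, params: Record<string, string | undefined>): string {
  const parts: string[] = []
  for (const name of Object.keys(params)) {
    const value = params[name]
    // Skip optional parameters that were not supplied (e.g. a missing tag).
    if (value !== undefined) parts.push(`${name}=${encodeURIComponent(value)}`)
  }
  return parts.length > 0 ? `${keyExpr}?${parts.join(';')}` : keyExpr
}
```

For example, `buildSelector('kraken/extension/enable', { identifier: 'a;b', tag: 'v1 beta' })` yields `kraken/extension/enable?identifier=a%3Bb;tag=v1%20beta`, so the embedded `;` no longer splits the parameter list.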

### Comment 2
<location path="core/frontend/src/components/kraken/KrakenManager.ts" line_range="316-293" />
<code_context>
-    method: 'DELETE',
-    url: `${KRAKEN_API_V2_URL}/extension/${identifier}`,
-  })
+export async function uninstallExtension(identifier: string, tag?: string): Promise<void> {
+  let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${identifier}`
+  if (tag) queryKey += `;tag=${tag}`
+
+  await zenoh.query(
+    queryKey,
+    QueryTarget.BestMatching,
</code_context>
<issue_to_address>
**suggestion (bug_risk):** uninstallExtension zenoh query omits a timeout, which can hang longer than the caller-supplied timeout semantics used elsewhere.

Other zenoh queries here (`installExtension`, `enableExtension`, `disableExtension`, `restartExtension`, `fetchInstalledExtensions`, `listContainers`, `getContainersStats`) all set a timeout, but `uninstallExtension` does not. If the backend or network hangs, this call can block indefinitely. Please add an explicit timeout (e.g. the same 10s used by enable/disable/restart) to this query for consistent behavior.

Suggested implementation:

```typescript
export async function uninstallExtension(identifier: string, tag?: string): Promise<void> {
  let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${identifier}`
  if (tag) queryKey += `;tag=${tag}`

  const timeout = 10_000

  await zenoh.query(
    queryKey,
    QueryTarget.BestMatching,
    undefined,
    undefined,
    timeout,
  )
}

```

If this file already defines or imports a shared zenoh query timeout constant (e.g. `const ZENOH_QUERY_TIMEOUT = 10_000` or similar) and other functions like `enableExtension` / `disableExtension` use it, you should:
1. Replace `const timeout = 10_000` with that shared timeout constant to keep things consistent.
2. Ensure the argument order and number for `zenoh.query` exactly match the pattern used in `installExtension`, `enableExtension`, `disableExtension`, and `restartExtension` (e.g. if they omit one of the `undefined` placeholders, do the same here).
</issue_to_address>

### Comment 3
<location path="core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py" line_range="33-40" />
<code_context>
-        self._executor = ThreadPoolExecutor(
-            max_workers=4,
-            thread_name_prefix="zenoh-",
+        self._loop_ready = threading.Event()
+        self._loop_thread = threading.Thread(
+            target=self._run_loop,
+            name="zenoh-loop",
+            daemon=True,
         )
-
-    def submit_to_executor(self, func: Callable[..., Any]) -> None:
-        if self._executor is None:
-            logger.warning("Zenoh session executor is not available, task will not be initialized.")
-            return
+        self._loop_thread.start()
+        self._loop_ready.wait()
+
+    def _run_loop(self) -> None:
</code_context>
<issue_to_address>
**issue (bug_risk):** Waiting on _loop_ready without a timeout can block process startup if the loop thread fails to initialize.

If `_loop_thread` never calls `_loop_ready.set()` (e.g., it fails to start or errors before that point), `self._loop_ready.wait()` will block forever and can deadlock startup.

Consider adding a bounded timeout on the wait and then logging/raising a clear error if the event isn’t set in time, so you get a fast, diagnosable failure instead of a silent hang.
</issue_to_address>

