Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 157 additions & 92 deletions core/frontend/src/components/kraken/KrakenManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts'

import zenoh from '@/libs/zenoh'
import {
ExtensionData,
Expand All @@ -9,24 +11,13 @@ import {
UploadProgressEvent,
} from '@/types/kraken'
import back_axios from '@/utils/api'
import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts'
import { createDeferred } from '@/utils/deferred'

const KRAKEN_BASE_URL = '/kraken'
const KRAKEN_API_V2_URL = `${KRAKEN_BASE_URL}/v2.0`
const KRAKEN_BASE_ZENOH = 'kraken'
const INSTALL_PROGRESS_TOPIC = `${KRAKEN_BASE_ZENOH}/extension/install/progress`

/**
* List details of all installed extensions.
* @returns {Promise<InstalledExtensionData[]>}
*/
export async function fetchInstalledExtensions(): Promise<InstalledExtensionData[]> {
const response = await back_axios({
method: 'get',
url: `${KRAKEN_API_V2_URL}/extension/`,
timeout: 10000,
})

return response.data as InstalledExtensionData[]
}

/**
* List all manifest sources from kraken, uses API v2
Expand Down Expand Up @@ -188,78 +179,160 @@ export async function setManifestSourceOrder(identifier: string, order: number):
})
}

function buildInstallQueryKey(identifier: string, tag: string | undefined, stable: boolean): string {
let key = `${KRAKEN_BASE_ZENOH}/extension/install?identifier=${encodeURIComponent(identifier)}`
if (tag) key += `;tag=${encodeURIComponent(tag)}`
if (!stable) key += ';stable=false'
return key
}

type InstallSample =
| { kind: 'error'; message: string }
| { kind: 'complete' }
| { kind: 'progress'; raw: string }
| null

function parseInstallSample(raw: string, identifier: string): InstallSample {
let data: { identifier?: string; status?: string; error?: string }
try {
data = JSON.parse(raw)
} catch {
return null
}
if (data.identifier !== identifier) return null
if (data.error) return { kind: 'error', message: data.error }
if (data.status === 'complete') return { kind: 'complete' }
return { kind: 'progress', raw }
}

/**
* Install an extension to the latest version available
* @param {InstalledExtensionData} extension The extension to be installed
* Install an extension to the latest version available.
* The backend publishes the pull progress on `INSTALL_PROGRESS_TOPIC`.
*
* @param {string} identifier The identifier of the extension
* @param {function} progressHandler The progress handler for the download
* @param {string} tag The tag of the extension
* @param {boolean} stable If true, will install the latest stable version, default is true
* @param {number} timeout The timeout for the install
*/
export async function installExtension(
extension: InstalledExtensionData,
progressHandler: (event: any) => void,
identifier: string,
progressHandler?: (fragment: string) => void,
tag?: string,
stable = true,
timeout = 600000,
): Promise<void> {
await back_axios({
url: `${KRAKEN_API_V2_URL}/extension/install`,
method: 'POST',
data: {
identifier: extension.identifier,
name: extension.name,
docker: extension.docker,
tag: extension.tag,
enabled: true,
permissions: extension?.permissions ?? '',
user_permissions: extension?.user_permissions ?? '',
},
timeout: 600000,
onDownloadProgress: progressHandler,
})
const deferred = createDeferred<void>()
let subscriber: Subscriber | null = null
let timer: ReturnType<typeof setTimeout> | null = null

async function cleanup(): Promise<void> {
if (timer !== null) {
clearTimeout(timer)
timer = null
}
try {
await subscriber?.undeclare()
} catch {
// The subscriber may already be gone. Ignore cleanup errors.
}
subscriber = null
}

async function handleSample(sample: Sample): Promise<void> {
const result = parseInstallSample(sample.payload().to_string(), identifier)
if (result === null) return
switch (result.kind) {
case 'error':
cleanup().finally(() => deferred.reject(new Error(result.message)))
break
case 'complete':
cleanup().finally(() => deferred.resolve())
break
case 'progress':
progressHandler?.(result.raw)
break
default:
break
}
}

// Subscribe before triggering the install.
subscriber = await zenoh.subscriber(INSTALL_PROGRESS_TOPIC, handleSample)
if (!subscriber) {
throw new Error('Failed to subscribe to install progress topic')
}
timer = setTimeout(
() => cleanup().finally(() => deferred.reject(new Error(`Install timed out after ${timeout}ms`))),
timeout,
)

try {
const reply = await zenoh.query(
buildInstallQueryKey(identifier, tag, stable),
QueryTarget.BestMatching,
timeout,
)
if (!reply || reply.error) {
throw new Error(reply?.error ?? 'Install query failed')
}
} catch (error) {
await cleanup()
throw error
}

return deferred.promise
}

/**
* Enable an extension by its identifier and tag, uses API v2
* Enable an extension by its identifier and tag, uses zenoh
* @param {string} identifier The identifier of the extension
* @param {string} tag The tag of the extension
*/
export async function enableExtension(identifier: string, tag: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}/enable`,
timeout: 10000,
})
await zenoh.query(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): uninstallExtension zenoh query omits a timeout, which can hang longer than the caller-supplied timeout semantics used elsewhere.

Other zenoh queries here (installExtension, enableExtension, disableExtension, restartExtension, fetchInstalledExtensions, listContainers, getContainersStats) all set a timeout, but uninstallExtension does not. If the backend or network hangs, this call can block indefinitely. Please add an explicit timeout (e.g. the same 10s used by enable/disable/restart) to this query for consistent behavior.

Suggested implementation:

export async function uninstallExtension(identifier: string, tag?: string): Promise<void> {
  let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${identifier}`
  if (tag) queryKey += `;tag=${tag}`

  const timeout = 10_000

  await zenoh.query(
    queryKey,
    QueryTarget.BestMatching,
    undefined,
    undefined,
    timeout,
  )
}

If this file already defines or imports a shared zenoh query timeout constant (e.g. const ZENOH_QUERY_TIMEOUT = 10_000 or similar) and other functions like enableExtension / disableExtension use it, you should:

  1. Replace const timeout = 10_000 with that shared timeout constant to keep things consistent.
  2. Ensure the argument order and number for zenoh.query exactly match the pattern used in installExtension, enableExtension, disableExtension, and restartExtension (e.g. if they omit one of the undefined placeholders, do the same here).

`${KRAKEN_BASE_ZENOH}/extension/enable?identifier=${identifier};tag=${tag}`,
Comment on lines 292 to +294
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): URL parameters for zenoh queries are not encoded, which may break identifiers/tags with special characters.

Here identifier and tag are directly interpolated into the zenoh selector. If they contain ;, ?, &, spaces, or other reserved characters, the query can be malformed or misparsed. Please URL-encode these fields (e.g., with encodeURIComponent, as in buildInstallQueryKey) in enableExtension, disableExtension, uninstallExtension, and restartExtension so that arbitrary identifiers/tags are handled correctly.

QueryTarget.BestMatching,
10000,
)
}

/**
* Disable an extension by its identifier, uses API v2
* Disable an extension by its identifier, uses zenoh
* @param {string} identifier The identifier of the extension
*/
export async function disableExtension(identifier: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/disable`,
timeout: 10000,
})
await zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/disable?identifier=${identifier}`,
QueryTarget.BestMatching,
10000,
)
}

/**
* Uninstall an extension by its identifier, uses API v2
* Uninstall an extension by its identifier, uses zenoh
* @param {string} identifier The identifier of the extension
*/
export async function uninstallExtension(identifier: string): Promise<void> {
await back_axios({
method: 'DELETE',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}`,
})
export async function uninstallExtension(identifier: string, tag?: string): Promise<void> {
let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${identifier}`
if (tag) queryKey += `;tag=${tag}`

await zenoh.query(
queryKey,
QueryTarget.BestMatching,
)
}

/**
* Restart an extension by its identifier, uses API v2
* Restart an extension by its identifier, uses zenoh
* @param {string} identifier The identifier of the extension
*/
export async function restartExtension(identifier: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/restart`,
timeout: 10000,
})
await zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/restart?identifier=${identifier}`,
QueryTarget.BestMatching,
10000,
)
}

/**
Expand All @@ -282,42 +355,37 @@ export async function updateExtensionToVersion(
}

/**
* List all installed extensions from kraken, uses API v2
* List details of all installed extensions.
* @returns {Promise<InstalledExtensionData[]> | null}
*/
export async function getInstalledExtensions(): Promise<InstalledExtensionData[]> {
const response = await back_axios({
method: 'GET',
url: `${KRAKEN_API_V2_URL}/extension/`,
timeout: 30000,
})

return response.data as InstalledExtensionData[]
export async function fetchInstalledExtensions(): Promise<InstalledExtensionData[] | null> {
return zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/fetch`,
QueryTarget.BestMatching,
30000,
)
}

/**
* List all running containers from kraken, uses API v2
* List all running containers from kraken, uses zenoh.
*/
export async function listContainers(): Promise<RunningContainer[]> {
const response = await back_axios({
method: 'GET',
url: `${KRAKEN_API_V2_URL}/container/`,
timeout: 30000,
})

return response.data as RunningContainer[]
export async function listContainers(): Promise<RunningContainer[] | null> {
return zenoh.query(
`${KRAKEN_BASE_ZENOH}/container/fetch`,
QueryTarget.BestMatching,
10000,
)
}

/**
* List all stats of all running containers from kraken, uses API v2
* List all stats of all running containers from kraken, uses zenoh.
*/
export async function getContainersStats(): Promise<any> {
const response = await back_axios({
method: 'GET',
url: `${KRAKEN_API_V2_URL}/container/stats`,
timeout: 20000,
})

return response.data
export async function getContainersStats(): Promise<any | null> {
return zenoh.query(
`${KRAKEN_BASE_ZENOH}/container/stats`,
QueryTarget.BestMatching,
10000,
)
}

/**
Expand Down Expand Up @@ -352,11 +420,10 @@ export async function uploadExtensionTarFile(
* @returns {Promise<void>}
*/
export async function keepTemporaryExtensionAlive(tempTag: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/upload/keep-alive?temp_tag=${tempTag}`,
timeout: 10000,
})
await zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/upload/keep-alive?temp_tag=${tempTag}`,
QueryTarget.BestMatching,
)
}

/**
Expand Down Expand Up @@ -395,7 +462,7 @@ export async function finalizeExtension(
* @returns {Promise<any | null>}
*/
export async function getHistoricalLogsForExtension(identifier: string, timeout: number): Promise<any | null> {
const queryKey = `kraken/extension/logs/request?extension_name=${identifier}`
const queryKey = `${KRAKEN_BASE_ZENOH}/container/logs/request?extension_name=${identifier}`
return await zenoh.query(queryKey, QueryTarget.BestMatching, timeout)
}

Expand Down Expand Up @@ -424,9 +491,7 @@ export default {
disabledManifestSource,
setManifestSourcesOrders,
setManifestSourceOrder,
updateExtensionToVersion,
installExtension,
getInstalledExtensions,
enableExtension,
disableExtension,
uninstallExtension,
Expand Down
15 changes: 15 additions & 0 deletions core/frontend/src/utils/deferred.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
interface Deferred<T> {
promise: Promise<T>
resolve: (value: T) => void
reject: (reason: Error) => void
}

export function createDeferred<T>(): Deferred<T> {
let resolve!: (value: T) => void
let reject!: (reason: Error) => void
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}
Loading
Loading