Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 86 additions & 23 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ async function callNative<T>(invoke: () => Promise<T>): Promise<T> {
}
}

function callNativeSync<T>(invoke: () => T): T {
export function callNativeSync<T>(invoke: () => T): T {
try {
return invoke();
} catch (error) {
Expand Down Expand Up @@ -1206,7 +1206,7 @@ function toActorKey(
);
}

class NativeConnAdapter {
export class NativeConnAdapter {
#runtime: CoreRuntime;
#conn: ConnHandle;
#schemas: NativeValidationConfig;
Expand Down Expand Up @@ -2368,6 +2368,88 @@ class TrackedWebSocketHandleAdapter implements UniversalWebSocket {
}
}

class NativeConnectionMap implements ReadonlyMap<string, NativeConnAdapter> {
#runtime: CoreRuntime;
#ctx: ActorContextHandle;
#schemas: NativeValidationConfig;

constructor(
runtime: CoreRuntime,
ctx: ActorContextHandle,
schemas: NativeValidationConfig,
) {
this.#runtime = runtime;
this.#ctx = ctx;
this.#schemas = schemas;
}

#connToAdapter(conn: ConnHandle): NativeConnAdapter {
return new NativeConnAdapter(
this.#runtime,
conn,
this.#schemas,
this.#ctx,
(connId) =>
callNativeSync(() =>
this.#runtime.actorQueueHibernationRemoval(
this.#ctx,
connId,
),
),
);
}

get size(): number {
return callNativeSync(() => this.#runtime.actorConns(this.#ctx)).length;
}

get(key: string): NativeConnAdapter | undefined {
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
const conn = conns.find(
(c) => this.#runtime.connId(c) === key,
);
if (!conn) return undefined;
return this.#connToAdapter(conn);
}

has(key: string): boolean {
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
return conns.some((c) => this.#runtime.connId(c) === key);
}

keys(): MapIterator<string> {
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
return conns.map((c) => this.#runtime.connId(c))[Symbol.iterator]() as MapIterator<string>;
}

values(): MapIterator<NativeConnAdapter> {
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
return conns.map((c) => this.#connToAdapter(c))[Symbol.iterator]() as MapIterator<NativeConnAdapter>;
}

entries(): MapIterator<[string, NativeConnAdapter]> {
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
return conns.map(
(c) => [this.#runtime.connId(c), this.#connToAdapter(c)] as [string, NativeConnAdapter],
)[Symbol.iterator]() as MapIterator<[string, NativeConnAdapter]>;
}

forEach(
callback: (value: NativeConnAdapter, key: string, map: ReadonlyMap<string, NativeConnAdapter>) => void,
thisArg?: unknown,
): void {
const conns = callNativeSync(() => this.#runtime.actorConns(this.#ctx));
for (const conn of conns) {
const id = this.#runtime.connId(conn);
callback.call(thisArg, this.#connToAdapter(conn), id, this);
}
}

[Symbol.iterator](): MapIterator<[string, NativeConnAdapter]> {
return this.entries();
}
}

export class ActorContextHandleAdapter {
#runtime: CoreRuntime;
#ctx: ActorContextHandle;
Expand Down Expand Up @@ -2556,27 +2638,8 @@ export class ActorContextHandleAdapter {
return callNativeSync(() => this.#runtime.actorRegion(this.#ctx));
}

get conns(): Map<string, NativeConnAdapter> {
return new Map(
callNativeSync(() => this.#runtime.actorConns(this.#ctx)).map(
(conn) => [
this.#runtime.connId(conn),
new NativeConnAdapter(
this.#runtime,
conn,
this.#schemas,
this.#ctx,
(connId) =>
callNativeSync(() =>
this.#runtime.actorQueueHibernationRemoval(
this.#ctx,
connId,
),
),
),
],
),
);
get conns(): NativeConnectionMap {
return new NativeConnectionMap(this.#runtime, this.#ctx, this.#schemas);
}

get log() {
Expand Down
Loading