Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions sigils/actors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ method addSubscription*(
obj.ensureActorReady()
doAssert not obj.isNil(), "agent is nil!"
when sigilsSlotEnvDisabled:
assert subscription.slot != nil
assert subscription.packedSlot != nil or subscription.directSlot != nil
else:
assert subscription.slot != nil or subscription.envSlot != nil
assert subscription.packedSlot != nil or subscription.directSlot != nil or
subscription.envSlot != nil

var added = false
withLock obj.lock:
Expand All @@ -125,7 +126,17 @@ method addSubscription*(
method addSubscription*(
obj: AgentActor, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc
) {.gcsafe, raises: [].} =
addSubscription(obj, sig, Subscription(tgt: tgt, slot: slot))
addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot))

method addSubscription*(
obj: AgentActor,
sig: SigilName,
tgt: WeakRef[Agent],
slot: AgentProc,
directSlot: LocalAgentProc
) {.gcsafe, raises: [].} =
addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot,
directSlot: directSlot))

method delSubscription*(
self: AgentActor, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc
Expand Down
82 changes: 61 additions & 21 deletions sigils/agents.nim
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import std/[options, tables, sequtils, sets, macros, hashes]
import std/times
import std/isolation
import std/[locks, options]
import std/[hashes, options, sets, strformat, tables]
import stack_strings

import threading/atomics

import protocol
import weakrefs
import debugs
Expand All @@ -27,7 +22,8 @@ else:

export sets, options, svariant, IndexableChars, weakrefs, protocol

import std/[terminal, strutils, strformat, sequtils]
when defined(sigilsDebugPrint):
import std/terminal
export strformat
export debugs

Expand All @@ -53,6 +49,13 @@ type

# Procedure signature accepted as an RPC call by server.
AgentProc* = proc(context: Agent, params: SigilParams) {.nimcall.}
LocalAgentProc* = proc(context: Agent, params: pointer) {.nimcall.}

SigilLocalCall*[S, A] = object
source*: S
procName*: SigilName
origin*: SigilId
args*: A

SlotEnv* = ref object of RootObj
## Owned environment for receiver-bound closure slots.
Expand All @@ -63,15 +66,18 @@ type

Subscription* = object
tgt*: WeakRef[Agent]
slot*: AgentProc
packedSlot*: AgentProc
directSlot*: LocalAgentProc
when not sigilsSlotEnvDisabled:
envSlot*: EnvAgentProc
env*: SlotEnv

AgentProcTy*[S] = AgentProc
LocalAgentProcTy*[S] = LocalAgentProc

Signal*[S] = AgentProcTy[S]
SignalTypes* = distinct object
LocalSignalTypes* = distinct object

when defined(nimscript):
proc getSigilId*(a: Agent): SigilId =
Expand Down Expand Up @@ -108,11 +114,10 @@ method removeSubscriptionsFor*(
debugPrint " removeSubscriptionsFor:agent: ", " self:id: ",
$self.unsafeWeakRef()
## Route's an rpc request.
var toDel: seq[int] = newSeq[int](self.subcriptions.len())
for idx in countdown(self.subcriptions.len() - 1, 0):
debugPrint " removeSubscriptionsFor subs sig: ", $self.subcriptions[idx].signal
if self.subcriptions[idx].subscription.tgt == subscriber:
self.subcriptions.delete(idx..idx)
self.subcriptions.delete(idx)

method unregisterSubscriber*(
self: Agent, listener: WeakRef[Agent]
Expand Down Expand Up @@ -218,7 +223,7 @@ method hasSubscription*(
for idx in 0 ..< obj.subcriptions.len():
if obj.subcriptions[idx].signal == sig and
obj.subcriptions[idx].subscription.tgt == tgt and
obj.subcriptions[idx].subscription.slot == slot:
obj.subcriptions[idx].subscription.packedSlot == slot:
return true

template hasSubscription*(obj: Agent,
Expand All @@ -230,12 +235,16 @@ template hasSubscription*(obj: Agent,

proc hasCallable(subscription: Subscription): bool =
when sigilsSlotEnvDisabled:
not subscription.slot.isNil
not subscription.packedSlot.isNil or not subscription.directSlot.isNil
else:
not subscription.slot.isNil or not subscription.envSlot.isNil
not subscription.packedSlot.isNil or not subscription.directSlot.isNil or
not subscription.envSlot.isNil

proc sameHandler(a, b: Subscription): bool =
result = a.slot == b.slot
if a.packedSlot.isNil and b.packedSlot.isNil:
result = a.directSlot == b.directSlot
else:
result = a.packedSlot == b.packedSlot
when not sigilsSlotEnvDisabled:
result = result and a.envSlot == b.envSlot and a.env == b.env

Expand Down Expand Up @@ -269,7 +278,17 @@ method addSubscription*(
method addSubscription*(
obj: Agent, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc
) {.base, gcsafe, raises: [].} =
addSubscription(obj, sig, Subscription(tgt: tgt, slot: slot))
addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot))

method addSubscription*(
obj: Agent,
sig: SigilName,
tgt: WeakRef[Agent],
slot: AgentProc,
directSlot: LocalAgentProc
) {.base, gcsafe, raises: [].} =
addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot,
directSlot: directSlot))

template addSubscription*(
obj: Agent,
Expand All @@ -280,8 +299,31 @@ template addSubscription*(
let tgtRef = tgt.unsafeWeakRef().toKind(Agent)
addSubscription(obj, sig.toSigilName(), tgtRef, slot)

template addSubscription*(
obj: Agent,
sig: IndexableChars,
tgt: Agent | WeakRef[Agent],
slot: AgentProc,
directSlot: LocalAgentProc
): void =
let tgtRef = tgt.unsafeWeakRef().toKind(Agent)
addSubscription(obj, sig.toSigilName(), tgtRef, slot, directSlot)

var printConnectionsSlotNames* = initTable[pointer, string]()

when defined(sigilsDebugPrint):
proc slotDebugName(subscription: Subscription): string =
if not subscription.packedSlot.isNil:
return printConnectionsSlotNames.getOrDefault(
subscription.packedSlot, subscription.packedSlot.repr
)
if not subscription.directSlot.isNil:
return subscription.directSlot.repr
when not sigilsSlotEnvDisabled:
if not subscription.envSlot.isNil:
return subscription.envSlot.repr
"nil"

method delSubscription*(
self: Agent, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc
) {.base, gcsafe, raises: [].} =
Expand All @@ -294,9 +336,9 @@ method delSubscription*(
if self.subcriptions[idx].signal == sig and
self.subcriptions[idx].subscription.tgt == tgt:
subsFound.inc()
if slot == nil or self.subcriptions[idx].subscription.slot == slot:
if slot == nil or self.subcriptions[idx].subscription.packedSlot == slot:
subsDeleted.inc()
self.subcriptions.delete(idx..idx)
self.subcriptions.delete(idx)

if subsFound == subsDeleted:
tgt[].delListener(self.unsafeWeakRef().asAgent())
Expand All @@ -309,7 +351,7 @@ method delSubscription*(
if self.subcriptions[idx].signal == sig and
self.subcriptions[idx].subscription.sameSubscription(subscription):
deleted = true
self.subcriptions.delete(idx..idx)
self.subcriptions.delete(idx)

if deleted and not procCall hasSubscription(self, sig, subscription.tgt):
subscription.tgt[].delListener(self.unsafeWeakRef().asAgent())
Expand Down Expand Up @@ -342,10 +384,8 @@ proc printConnections*(agent: Agent) =
brightPrint fgBlue, "connections for Agent: ", $agent.unsafeWeakRef()
brightPrint fgMagenta, "\t subscribers:", ""
for item in agent.subcriptions:
let sname = printConnectionsSlotNames.getOrDefault(
item.subscription.slot, item.subscription.slot.repr)
brightPrint fgGreen, "\t\t:", $item.signal, ": => ",
$item.subscription.tgt & " slot: " & $sname
$item.subscription.tgt & " slot: " & slotDebugName(item.subscription)
brightPrint fgMagenta, "\t listening:", ""
for listening in agent.listening:
brightPrint fgRed, "\t\t listen: ", $listening
76 changes: 65 additions & 11 deletions sigils/core.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ when not sigilsSlotEnvDisabled:
## Route a sigil request through a static slot or an env-backed closure slot.
if subscription.envSlot.isNil:
{.cast(gcsafe).}:
result = ctx.callMethod(req, subscription.slot)
result = ctx.callMethod(req, subscription.packedSlot)
else:
{.cast(gcsafe).}:
subscription.envSlot(ctx, req.params, subscription.env)
Expand All @@ -47,6 +47,18 @@ from system/ansi_c import c_raise

type AgentSlotError* = object of CatchableError

template checkSlotResponse(res: SigilResponse) =
when defined(nimscript) or defined(useJsonSerde):
discard
elif defined(sigilsCborSerde):
discard
else:
variantMatch case res.result.buf as u
of SigilError:
raise newException(AgentSlotError, $u.code & " msg: " & u.msg)
else:
discard

template callSlotsImpl(obj: Agent, req: SigilRequest, subsIter: untyped) =
for sub in subsIter:
{.cast(gcsafe).}:
Expand All @@ -60,21 +72,37 @@ template callSlotsImpl(obj: Agent, req: SigilRequest, subsIter: untyped) =
discard c_raise(11.cint)
assert sub.tgt[].freedByThread == 0
when sigilsSlotEnvDisabled:
var res: SigilResponse = sub.tgt[].callMethod(req, sub.slot)
var res: SigilResponse = sub.tgt[].callMethod(req, sub.packedSlot)
else:
var res: SigilResponse = sub.tgt[].callMethod(req, sub)

when defined(nimscript) or defined(useJsonSerde):
discard
elif defined(sigilsCborSerde):
discard
checkSlotResponse(res)

template callSlotsLocalImpl(
obj: Agent,
procName: SigilName,
origin: SigilId,
args: untyped,
subsIter: untyped
) =
var
reqReady = false
req: SigilRequest
for sub in subsIter:
{.cast(gcsafe).}:
if not sub.directSlot.isNil:
sub.directSlot(sub.tgt[], addr args)
else:
discard
variantMatch case res.result.buf as u
of SigilError:
raise newException(AgentSlotError, $u.code & " msg: " & u.msg)
if not reqReady:
req = initSigilRequest[typeof(obj), typeof(args)](
procName = procName, args = args, origin = origin
)
reqReady = true
when sigilsSlotEnvDisabled:
var res: SigilResponse = sub.tgt[].callMethod(req, sub.packedSlot)
else:
discard
var res: SigilResponse = sub.tgt[].callMethod(req, sub)
checkSlotResponse(res)

method callSlots*(obj: Agent, req: SigilRequest) {.base, gcsafe.} =
callSlotsImpl(obj, req, obj.getSubscriptions(req.procName))
Expand All @@ -87,9 +115,35 @@ method callSlots*(obj: AgentActor, req: SigilRequest) {.gcsafe.} =
subs.add(sub)
callSlotsImpl(Agent(obj), req, subs.items)

proc callSlotsLocal*[A](
obj: Agent, procName: SigilName, origin: SigilId, args: var A
) {.gcsafe.} =
if obj of AgentActor:
let actor = AgentActor(obj)
actor.ensureActorReady()
var subs: seq[Subscription]
withLock actor.lock:
for sub in actor.getSubscriptions(procName):
subs.add(sub)
callSlotsLocalImpl(Agent(actor), procName, origin, args, subs.items)
else:
callSlotsLocalImpl(obj, procName, origin, args, obj.getSubscriptions(procName))

proc emit*(call: (Agent | WeakRef[Agent], SigilRequest)) =
let (obj, req) = call
when obj is WeakRef[Agent]:
obj[].callSlots(req)
else:
obj.callSlots(req)

proc emit*[T: Agent, A](call: sink SigilLocalCall[T, A]) =
var localCall = call
localCall.source.callSlotsLocal(
localCall.procName, localCall.origin, localCall.args
)

proc emit*[T: Agent, A](call: sink SigilLocalCall[WeakRef[T], A]) =
var localCall = call
localCall.source[].callSlotsLocal(
localCall.procName, localCall.origin, localCall.args
)
2 changes: 1 addition & 1 deletion sigils/registry.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ proc collectProxyCloneSignals[T](proxy: AgentProxy[T]): seq[ProxyCloneSignal] =
# Only clone self-targeted subscriptions (proxy -> proxy).
if item.subscription.tgt == proxyRef:
result.add(ProxyCloneSignal(signal: item.signal,
slot: item.subscription.slot))
slot: item.subscription.packedSlot))

proc applyProxyCloneSignals[T](proxy: AgentProxy[T],
clones: seq[ProxyCloneSignal]) {.gcsafe.} =
Expand Down
11 changes: 8 additions & 3 deletions sigils/signals.nim
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,14 @@ template connect*(
slot: untyped,
acceptVoidSlot: static bool = false,
): void =
let agentSlot = `slot`(typeof(b))
checkSignalTypes(a, signal, b, agentSlot, acceptVoidSlot)
a.addSubscription(signalName(signal), b, agentSlot)
let packedAgentSlot = `slot`(typeof(b))
let directAgentSlot: LocalAgentProc =
when compiles(`slot`(LocalSignalTypes, typeof(b))):
`slot`(LocalSignalTypes, typeof(b))
else:
nil
checkSignalTypes(a, signal, b, packedAgentSlot, acceptVoidSlot)
a.addSubscription(signalName(signal), b, packedAgentSlot, directAgentSlot)

template connected*(
a: Agent,
Expand Down
Loading
Loading