diff --git a/sigils/actors.nim b/sigils/actors.nim index 6e47c813..12a70a90 100644 --- a/sigils/actors.nim +++ b/sigils/actors.nim @@ -109,9 +109,10 @@ method addSubscription*( obj.ensureActorReady() doAssert not obj.isNil(), "agent is nil!" when sigilsSlotEnvDisabled: - assert subscription.slot != nil + assert subscription.packedSlot != nil or subscription.directSlot != nil else: - assert subscription.slot != nil or subscription.envSlot != nil + assert subscription.packedSlot != nil or subscription.directSlot != nil or + subscription.envSlot != nil var added = false withLock obj.lock: @@ -125,7 +126,17 @@ method addSubscription*( method addSubscription*( obj: AgentActor, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc ) {.gcsafe, raises: [].} = - addSubscription(obj, sig, Subscription(tgt: tgt, slot: slot)) + addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot)) + +method addSubscription*( + obj: AgentActor, + sig: SigilName, + tgt: WeakRef[Agent], + slot: AgentProc, + directSlot: LocalAgentProc +) {.gcsafe, raises: [].} = + addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot, + directSlot: directSlot)) method delSubscription*( self: AgentActor, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc diff --git a/sigils/agents.nim b/sigils/agents.nim index 6a6b6898..9de713dc 100644 --- a/sigils/agents.nim +++ b/sigils/agents.nim @@ -1,11 +1,6 @@ -import std/[options, tables, sequtils, sets, macros, hashes] -import std/times -import std/isolation -import std/[locks, options] +import std/[hashes, options, sets, strformat, tables] import stack_strings -import threading/atomics - import protocol import weakrefs import debugs @@ -27,7 +22,8 @@ else: export sets, options, svariant, IndexableChars, weakrefs, protocol -import std/[terminal, strutils, strformat, sequtils] +when defined(sigilsDebugPrint): + import std/terminal export strformat export debugs @@ -53,6 +49,13 @@ type # Procedure signature accepted as an RPC call by server. AgentProc* = proc(context: Agent, params: SigilParams) {.nimcall.} + LocalAgentProc* = proc(context: Agent, params: pointer) {.nimcall.} + + SigilLocalCall*[S, A] = object + source*: S + procName*: SigilName + origin*: SigilId + args*: A SlotEnv* = ref object of RootObj ## Owned environment for receiver-bound closure slots. @@ -63,15 +66,18 @@ type Subscription* = object tgt*: WeakRef[Agent] - slot*: AgentProc + packedSlot*: AgentProc + directSlot*: LocalAgentProc when not sigilsSlotEnvDisabled: envSlot*: EnvAgentProc env*: SlotEnv AgentProcTy*[S] = AgentProc + LocalAgentProcTy*[S] = LocalAgentProc Signal*[S] = AgentProcTy[S] SignalTypes* = distinct object + LocalSignalTypes* = distinct object when defined(nimscript): proc getSigilId*(a: Agent): SigilId = @@ -108,11 +114,10 @@ method removeSubscriptionsFor*( debugPrint " removeSubscriptionsFor:agent: ", " self:id: ", $self.unsafeWeakRef() ## Route's an rpc request. - var toDel: seq[int] = newSeq[int](self.subcriptions.len()) for idx in countdown(self.subcriptions.len() - 1, 0): debugPrint " removeSubscriptionsFor subs sig: ", $self.subcriptions[idx].signal if self.subcriptions[idx].subscription.tgt == subscriber: - self.subcriptions.delete(idx..idx) + self.subcriptions.delete(idx) method unregisterSubscriber*( self: Agent, listener: WeakRef[Agent] @@ -218,7 +223,7 @@ method hasSubscription*( for idx in 0 ..< obj.subcriptions.len(): if obj.subcriptions[idx].signal == sig and obj.subcriptions[idx].subscription.tgt == tgt and - obj.subcriptions[idx].subscription.slot == slot: + obj.subcriptions[idx].subscription.packedSlot == slot: return true template hasSubscription*(obj: Agent, @@ -230,12 +235,16 @@ template hasSubscription*(obj: Agent, proc hasCallable(subscription: Subscription): bool = when sigilsSlotEnvDisabled: - not subscription.slot.isNil + not subscription.packedSlot.isNil or not subscription.directSlot.isNil else: - not subscription.slot.isNil or not subscription.envSlot.isNil + not subscription.packedSlot.isNil or not subscription.directSlot.isNil or + not subscription.envSlot.isNil proc sameHandler(a, b: Subscription): bool = - result = a.slot == b.slot + if a.packedSlot.isNil and b.packedSlot.isNil: + result = a.directSlot == b.directSlot + else: + result = a.packedSlot == b.packedSlot when not sigilsSlotEnvDisabled: result = result and a.envSlot == b.envSlot and a.env == b.env @@ -269,7 +278,17 @@ method addSubscription*( method addSubscription*( obj: Agent, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc ) {.base, gcsafe, raises: [].} = - addSubscription(obj, sig, Subscription(tgt: tgt, slot: slot)) + addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot)) + +method addSubscription*( + obj: Agent, + sig: SigilName, + tgt: WeakRef[Agent], + slot: AgentProc, + directSlot: LocalAgentProc +) {.base, gcsafe, raises: [].} = + addSubscription(obj, sig, Subscription(tgt: tgt, packedSlot: slot, + directSlot: directSlot)) template addSubscription*( obj: Agent, @@ -280,8 +299,31 @@ template addSubscription*( let tgtRef = tgt.unsafeWeakRef().toKind(Agent) addSubscription(obj, sig.toSigilName(), tgtRef, slot) +template addSubscription*( + obj: Agent, + sig: IndexableChars, + tgt: Agent | WeakRef[Agent], + slot: AgentProc, + directSlot: LocalAgentProc +): void = + let tgtRef = tgt.unsafeWeakRef().toKind(Agent) + addSubscription(obj, sig.toSigilName(), tgtRef, slot, directSlot) + var printConnectionsSlotNames* = initTable[pointer, string]() +when defined(sigilsDebugPrint): + proc slotDebugName(subscription: Subscription): string = + if not subscription.packedSlot.isNil: + return printConnectionsSlotNames.getOrDefault( + subscription.packedSlot, subscription.packedSlot.repr + ) + if not subscription.directSlot.isNil: + return subscription.directSlot.repr + when not sigilsSlotEnvDisabled: + if not subscription.envSlot.isNil: + return subscription.envSlot.repr + "nil" + method delSubscription*( self: Agent, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc ) {.base, gcsafe, raises: [].} = @@ -294,9 +336,9 @@ method delSubscription*( if self.subcriptions[idx].signal == sig and self.subcriptions[idx].subscription.tgt == tgt: subsFound.inc() - if slot == nil or self.subcriptions[idx].subscription.slot == slot: + if slot == nil or self.subcriptions[idx].subscription.packedSlot == slot: subsDeleted.inc() - self.subcriptions.delete(idx..idx) + self.subcriptions.delete(idx) if subsFound == subsDeleted: tgt[].delListener(self.unsafeWeakRef().asAgent()) @@ -309,7 +351,7 @@ method delSubscription*( if self.subcriptions[idx].signal == sig and self.subcriptions[idx].subscription.sameSubscription(subscription): deleted = true - self.subcriptions.delete(idx..idx) + self.subcriptions.delete(idx) if deleted and not procCall hasSubscription(self, sig, subscription.tgt): subscription.tgt[].delListener(self.unsafeWeakRef().asAgent()) @@ -342,10 +384,8 @@ proc printConnections*(agent: Agent) = brightPrint fgBlue, "connections for Agent: ", $agent.unsafeWeakRef() brightPrint fgMagenta, "\t subscribers:", "" for item in agent.subcriptions: - let sname = printConnectionsSlotNames.getOrDefault( - item.subscription.slot, item.subscription.slot.repr) brightPrint fgGreen, "\t\t:", $item.signal, ": => ", - $item.subscription.tgt & " slot: " & $sname + $item.subscription.tgt & " slot: " & slotDebugName(item.subscription) brightPrint fgMagenta, "\t listening:", "" for listening in agent.listening: brightPrint fgRed, "\t\t listen: ", $listening diff --git a/sigils/core.nim b/sigils/core.nim index 8b0e5e71..0566469b 100644 --- a/sigils/core.nim +++ b/sigils/core.nim @@ -36,7 +36,7 @@ when not sigilsSlotEnvDisabled: ## Route a sigil request through a static slot or an env-backed closure slot. if subscription.envSlot.isNil: {.cast(gcsafe).}: - result = ctx.callMethod(req, subscription.slot) + result = ctx.callMethod(req, subscription.packedSlot) else: {.cast(gcsafe).}: subscription.envSlot(ctx, req.params, subscription.env) @@ -47,6 +47,18 @@ from system/ansi_c import c_raise type AgentSlotError* = object of CatchableError +template checkSlotResponse(res: SigilResponse) = + when defined(nimscript) or defined(useJsonSerde): + discard + elif defined(sigilsCborSerde): + discard + else: + variantMatch case res.result.buf as u + of SigilError: + raise newException(AgentSlotError, $u.code & " msg: " & u.msg) + else: + discard + template callSlotsImpl(obj: Agent, req: SigilRequest, subsIter: untyped) = for sub in subsIter: {.cast(gcsafe).}: @@ -60,21 +72,37 @@ template callSlotsImpl(obj: Agent, req: SigilRequest, subsIter: untyped) = discard c_raise(11.cint) assert sub.tgt[].freedByThread == 0 when sigilsSlotEnvDisabled: - var res: SigilResponse = sub.tgt[].callMethod(req, sub.slot) + var res: SigilResponse = sub.tgt[].callMethod(req, sub.packedSlot) else: var res: SigilResponse = sub.tgt[].callMethod(req, sub) - when defined(nimscript) or defined(useJsonSerde): - discard - elif defined(sigilsCborSerde): - discard + checkSlotResponse(res) + +template callSlotsLocalImpl( + obj: Agent, + procName: SigilName, + origin: SigilId, + args: untyped, + subsIter: untyped +) = + var + reqReady = false + req: SigilRequest + for sub in subsIter: + {.cast(gcsafe).}: + if not sub.directSlot.isNil: + sub.directSlot(sub.tgt[], addr args) else: - discard - variantMatch case res.result.buf as u - of SigilError: - raise newException(AgentSlotError, $u.code & " msg: " & u.msg) + if not reqReady: + req = initSigilRequest[typeof(obj), typeof(args)]( + procName = procName, args = args, origin = origin + ) + reqReady = true + when sigilsSlotEnvDisabled: + var res: SigilResponse = sub.tgt[].callMethod(req, sub.packedSlot) else: - discard + var res: SigilResponse = sub.tgt[].callMethod(req, sub) + checkSlotResponse(res) method callSlots*(obj: Agent, req: SigilRequest) {.base, gcsafe.} = callSlotsImpl(obj, req, obj.getSubscriptions(req.procName)) @@ -87,9 +115,35 @@ method callSlots*(obj: AgentActor, req: SigilRequest) {.gcsafe.} = subs.add(sub) callSlotsImpl(Agent(obj), req, subs.items) +proc callSlotsLocal*[A]( + obj: Agent, procName: SigilName, origin: SigilId, args: var A +) {.gcsafe.} = + if obj of AgentActor: + let actor = AgentActor(obj) + actor.ensureActorReady() + var subs: seq[Subscription] + withLock actor.lock: + for sub in actor.getSubscriptions(procName): + subs.add(sub) + callSlotsLocalImpl(Agent(actor), procName, origin, args, subs.items) + else: + callSlotsLocalImpl(obj, procName, origin, args, obj.getSubscriptions(procName)) + proc emit*(call: (Agent | WeakRef[Agent], SigilRequest)) = let (obj, req) = call when obj is WeakRef[Agent]: obj[].callSlots(req) else: obj.callSlots(req) + +proc emit*[T: Agent, A](call: sink SigilLocalCall[T, A]) = + var localCall = call + localCall.source.callSlotsLocal( + localCall.procName, localCall.origin, localCall.args + ) + +proc emit*[T: Agent, A](call: sink SigilLocalCall[WeakRef[T], A]) = + var localCall = call + localCall.source[].callSlotsLocal( + localCall.procName, localCall.origin, localCall.args + ) diff --git a/sigils/registry.nim b/sigils/registry.nim index 5f2f8a13..e884bdef 100644 --- a/sigils/registry.nim +++ b/sigils/registry.nim @@ -37,7 +37,7 @@ proc collectProxyCloneSignals[T](proxy: AgentProxy[T]): seq[ProxyCloneSignal] = # Only clone self-targeted subscriptions (proxy -> proxy). if item.subscription.tgt == proxyRef: result.add(ProxyCloneSignal(signal: item.signal, - slot: item.subscription.slot)) + slot: item.subscription.packedSlot)) proc applyProxyCloneSignals[T](proxy: AgentProxy[T], clones: seq[ProxyCloneSignal]) {.gcsafe.} = diff --git a/sigils/signals.nim b/sigils/signals.nim index e0ebbc14..b50634c2 100644 --- a/sigils/signals.nim +++ b/sigils/signals.nim @@ -115,9 +115,14 @@ template connect*( slot: untyped, acceptVoidSlot: static bool = false, ): void = - let agentSlot = `slot`(typeof(b)) - checkSignalTypes(a, signal, b, agentSlot, acceptVoidSlot) - a.addSubscription(signalName(signal), b, agentSlot) + let packedAgentSlot = `slot`(typeof(b)) + let directAgentSlot: LocalAgentProc = + when compiles(`slot`(LocalSignalTypes, typeof(b))): + `slot`(LocalSignalTypes, typeof(b)) + else: + nil + checkSignalTypes(a, signal, b, packedAgentSlot, acceptVoidSlot) + a.addSubscription(signalName(signal), b, packedAgentSlot, directAgentSlot) template connected*( a: Agent, diff --git a/sigils/slots.nim b/sigils/slots.nim index 41e60719..c29ad1fb 100644 --- a/sigils/slots.nim +++ b/sigils/slots.nim @@ -210,6 +210,18 @@ macro rpcImpl*(p: untyped, publish: untyped, qarg: untyped): untyped = for param in parameters[1 ..^ 1]: mcall.add param[0] + let localArgsIdent = ident("localArgs") + let localMcall = nnkCall.newTree(rpcMethod) + localMcall.add(objId) + var localArgIdx = 0 + for param in parameters[1 ..^ 1]: + discard param + localMcall.add nnkBracketExpr.newTree( + nnkBracketExpr.newTree(localArgsIdent), + newIntLitNode(localArgIdx) + ) + localArgIdx.inc() + let agentSlotImpl = quote: proc slot(context: Agent, params: SigilParams) {.nimcall.} = if context == nil: @@ -223,6 +235,17 @@ macro rpcImpl*(p: untyped, publish: untyped, qarg: untyped): untyped = `paramSetups` `mcall` + let directSlotImpl = quote: + proc directSlot(context: Agent, rawArgs: pointer) {.nimcall.} = + if context == nil: + raise newException(ValueError, "bad value") + let `objId` = `contextType`(context) + if `objId` == nil: + raise newException(ConversionError, "bad cast") + when `tupTyp` isnot tuple[]: + let `localArgsIdent` = cast[ptr `tupTyp`](rawArgs) + `localMcall` + let procTyp = quote: proc() {.nimcall.} procTyp.params = params.copyNimTree() @@ -238,6 +261,12 @@ macro rpcImpl*(p: untyped, publish: untyped, qarg: untyped): untyped = `agentSlotImpl` slot + proc `rpcMethod`( + `kd`: typedesc[LocalSignalTypes], `tp`: typedesc[`contextType`] + ): LocalAgentProcTy[`signalTyp`] = + `directSlotImpl` + directSlot + result.updateProcsSig(isPublic, genericParams, procLineInfo) elif isSignal: var construct = nnkTupleConstr.newTree() @@ -246,13 +275,17 @@ macro rpcImpl*(p: untyped, publish: untyped, qarg: untyped): untyped = let objId = ident"obj" result.add quote do: - proc `rpcMethod`(`objId`: `firstType`): (Agent, SigilRequestTy[`firstType`]) = + proc `rpcMethod`( + `objId`: `firstType` + ): SigilLocalCall[`firstType`, typeof(`construct`)] = var args = `construct` - let name: SigilName = toSigilName(`signalName`) - let req = initSigilRequest[`firstType`, typeof(args)]( - procName = name, args = ensureMove args, origin = `objId`.getSigilId() + const name: SigilName = toSigilName(`signalName`) + result = SigilLocalCall[`firstType`, typeof(args)]( + source: `objId`, + procName: name, + origin: `objId`.getSigilId(), + args: ensureMove args ) - result = (`objId`, req) for param in parameters[1 ..^ 1]: result[^1][3].add param @@ -260,13 +293,15 @@ macro rpcImpl*(p: untyped, publish: untyped, qarg: untyped): untyped = result.add quote do: proc `rpcMethod`( `objId`: WeakRef[`firstType`] - ): (WeakRef[Agent], SigilRequestTy[`firstType`]) = + ): SigilLocalCall[WeakRef[`firstType`], typeof(`construct`)] = var args = `construct` - let name: SigilName = toSigilName(`signalName`) - let req = initSigilRequest[`firstType`, typeof(args)]( - procName = name, args = ensureMove args, origin = `objId`.getSigilId() + const name: SigilName = toSigilName(`signalName`) + result = SigilLocalCall[WeakRef[`firstType`], typeof(args)]( + source: `objId`, + procName: name, + origin: `objId`.getSigilId(), + args: ensureMove args ) - result = (`objId`.asAgent(), req) for param in parameters[1 ..^ 1]: result[^1][3].add param diff --git a/sigils/threadPool.nim b/sigils/threadPool.nim index e35ceb10..f399d5cb 100644 --- a/sigils/threadPool.nim +++ b/sigils/threadPool.nim @@ -50,7 +50,8 @@ proc delSelfSubscription(actor: WeakRef[AgentActor], sub: ThreadSub) = if actor[].subcriptions[idx].signal == sub.name and actor[].subcriptions[idx].subscription.tgt == sub.tgt: subsFound.inc() - if sub.fn == nil or actor[].subcriptions[idx].subscription.slot == sub.fn: + if sub.fn == nil or actor[].subcriptions[idx].subscription.packedSlot == + sub.fn: subsDeleted.inc() actor[].subcriptions.delete(idx) if subsFound == subsDeleted: diff --git a/sigils/threadProxies.nim b/sigils/threadProxies.nim index 2b149bf8..c3269ff9 100644 --- a/sigils/threadProxies.nim +++ b/sigils/threadProxies.nim @@ -98,7 +98,7 @@ method addSubscription*( method addSubscription*( obj: AgentProxyShared, sig: SigilName, tgt: WeakRef[Agent], slot: AgentProc ) {.gcsafe, raises: [].} = - obj.addSubscription(sig, Subscription(tgt: tgt, slot: slot)) + obj.addSubscription(sig, Subscription(tgt: tgt, packedSlot: slot)) method delSubscription*( self: AgentProxyShared, sig: SigilName, subscription: Subscription @@ -139,7 +139,7 @@ method callMethod*( return if slot == localSlot or slot == remoteSlot: - debugPrint "\t proxy:callMethod:localSlot: " + debugPrint "\t proxy:callMethod:directSlot: " proxy.callSlots(req) else: var req = req.duplicate() @@ -201,7 +201,7 @@ iterator findSubscribedTo( for item in other[].subcriptions.mitems(): if item.subscription.tgt == agent: yield (item.signal, Subscription(tgt: other, - slot: item.subscription.slot)) + packedSlot: item.subscription.packedSlot)) proc moveToThread*[T: AgentActor, R: SigilThread]( agentTy: var T, thread: ptr R, inbox = 1_000 @@ -241,14 +241,14 @@ proc moveToThread*[T: AgentActor, R: SigilThread]( var listenSubs = false for item in oldListeningSubs: item.subscription.tgt[].addSubscription(item.signal, localProxy, - item.subscription.slot) + item.subscription.packedSlot) listenSubs = true # update my subcriptionsTable so agent uses the remote proxy to send events back var hasSubs = false for item in oldSubscribers: localProxy.addSubscription(item.signal, item.subscription.tgt, - item.subscription.slot) + item.subscription.packedSlot) hasSubs = true thread.send(ThreadSignal(kind: Move, item: move agentTy)) diff --git a/tests/tbenchmarks.nim b/tests/tbenchmarks.nim index b9335adb..df914219 100644 --- a/tests/tbenchmarks.nim +++ b/tests/tbenchmarks.nim @@ -1,6 +1,5 @@ import std/monotimes import std/strformat -import std/math import unittest # Core modules under test @@ -34,48 +33,79 @@ type proc bump*(tp: Emitter, val: int) {.signal.} proc onBump*(self: Counter, val: int) {.slot.} = - self.value += 1 + self.value += val -var durationMicrosEmitSlot: float +method onBumpMethod*(self: Agent, val: int) {.base.} = + discard + +method onBumpMethod*(self: Counter, val: int) = + self.value += val + +var + durationMicrosDirectProc: float + durationMicrosMethod: float const n = block: - when defined(slowbench): 1_000_000 + when defined(slowbench): 10_000_000 else: 100_000 +const expectedValue = (n * (n - 1)) div 2 suite "benchmarks": - test "emit->slot throughput (tight loop)": + template runBaseline() = + var b {.inject.} = Counter() - var a = Emitter() - var b = Counter() + let t0 = getMonoTime() + for i in 0 ..< n: + b.onBump(i) + let dt = getMonoTime() - t0 - connect(a, bump, b, onBump) + check b.value == expectedValue + + let us {.inject.} = dt.inMicroseconds.float + durationMicrosDirectProc = us + + for i in 1..10: + block: + runBaseLine() + + test "slot proc baseline (tight loop)": + runBaseLine() + let opsPerSec = (n.float * 1_000_000.0) / max(1.0, us) + echo &"[bench] slot proc baseline: n={n}, time={us:.2f} us, rate={opsPerSec:.0f} ops/s, procRatio=1.00" + + test "nim method call (tight loop)": + var b: Agent = Counter() let t0 = getMonoTime() for i in 0 ..< n: - emit a.bump(i) + b.onBumpMethod(i) let dt = getMonoTime() - t0 - check b.value == n + check Counter(b).value == expectedValue - durationMicrosEmitSlot = dt.inMicroseconds.float - let ms = dt.inMilliseconds.float - let opsPerSec = (n.float * 1000.0) / max(1.0, ms) - echo &"[bench] emit->slot: n={n}, time={ms:.2f} ms, rate={opsPerSec:.0f} ops/s, time={dt.inMicroseconds} us" + let us = dt.inMicroseconds.float + durationMicrosMethod = us + let opsPerSec = (n.float * 1_000_000.0) / max(1.0, us) + echo &"[bench] nim method call: n={n}, time={us:.2f} us, rate={opsPerSec:.0f} ops/s, procRatio={us / durationMicrosDirectProc:.2f}" - test "slot direct call (tight loop)": + test "emit->slot throughput (tight loop)": var a = Emitter() var b = Counter() + connect(a, bump, b, onBump) + let t0 = getMonoTime() for i in 0 ..< n: - b.onBump(i) + emit a.bump(i) let dt = getMonoTime() - t0 - check b.value == n + check b.value == expectedValue let us = dt.inMicroseconds.float - let opsPerSec = (n.float * 1_000_000.0) / max(1.0, us) - echo &"[bench] slot direct call: n={n}, time={us:.2f} us, rate={opsPerSec:.0f} ops/s, ratio={durationMicrosEmitSlot / us:.2f}" + let ms = dt.inMilliseconds.float + let opsPerSec = (n.float * 1000.0) / max(1.0, ms) + echo &"[bench] emit->slot: n={n}, time={ms:.2f} ms, timeUs={dt.inMicroseconds}, rate={opsPerSec:.0f} ops/s" + echo &"[bench] emit->slot ratios: procRatio={us / durationMicrosDirectProc:.2f}, methodRatio={us / durationMicrosMethod:.2f}" when not defined(sigilsCborSerde) and not defined(sigilsJsonSerde): test "reactive computed (lazy) update+read": @@ -108,4 +138,3 @@ suite "benchmarks": let ms = dt.inMilliseconds.float let itersPerSec = (n.float * 1000.0) / max(1.0, ms) echo &"[bench] reactive (eager): n={n}, time={ms:.2f} ms, rate={itersPerSec:.0f} iters/s" - diff --git a/tests/tbenchmarks2.nim b/tests/tbenchmarks2.nim index db8cbb54..c76dce1f 100644 --- a/tests/tbenchmarks2.nim +++ b/tests/tbenchmarks2.nim @@ -1,6 +1,5 @@ import std/monotimes import std/strformat -import std/math import unittest # Core modules under test @@ -31,52 +30,78 @@ type proc bump*(tp: Emitter, val: array[1024, int]) {.signal.} proc onBump*(self: Counter, val: array[1024, int]) {.slot.} = - self.value += 1 + self.value += val[0] -var durationMicrosEmitSlot: float +method onBumpMethod*(self: Agent, val: array[1024, int]) {.base.} = + discard + +method onBumpMethod*(self: Counter, val: array[1024, int]) = + self.value += val[0] + +var + durationMicrosDirectProc: float + durationMicrosMethod: float const n = block: when defined(slowbench): 1_000_000 else: 100_000 +const expectedValue = (n * (n - 1)) div 2 suite "benchmarks": - test "emit->slot throughput (tight loop)": - - var a = Emitter() + test "slot proc baseline (tight loop)": var b = Counter() - connect(a, bump, b, onBump) + let t0 = getMonoTime() + var x: array[1024, int] + for i in 0 ..< n: + x[0] = i + b.onBump(x) + let dt = getMonoTime() - t0 + + check b.value == expectedValue + + let us = dt.inMicroseconds.float + durationMicrosDirectProc = us + let opsPerSec = (n.float * 1_000_000.0) / max(1.0, us) + echo &"[bench] slot proc baseline: n={n}, time={us:.2f} us, rate={opsPerSec:.0f} ops/s, procRatio=1.00" + + test "nim method call (tight loop)": + var b: Agent = Counter() let t0 = getMonoTime() var x: array[1024, int] for i in 0 ..< n: x[0] = i - emit a.bump(x) + b.onBumpMethod(x) let dt = getMonoTime() - t0 - check b.value == n + check Counter(b).value == expectedValue - durationMicrosEmitSlot = dt.inMicroseconds.float - let ms = dt.inMilliseconds.float - let opsPerSec = (n.float * 1000.0) / max(1.0, ms) - echo &"[bench] emit->slot: n={n}, time={ms:.2f} ms, rate={opsPerSec:.0f} ops/s, time={dt.inMicroseconds} us" + let us = dt.inMicroseconds.float + durationMicrosMethod = us + let opsPerSec = (n.float * 1_000_000.0) / max(1.0, us) + echo &"[bench] nim method call: n={n}, time={us:.2f} us, rate={opsPerSec:.0f} ops/s, procRatio={us / durationMicrosDirectProc:.2f}" - test "slot direct call (tight loop)": + test "emit->slot throughput (tight loop)": var a = Emitter() var b = Counter() + connect(a, bump, b, onBump) + let t0 = getMonoTime() var x: array[1024, int] for i in 0 ..< n: x[0] = i - b.onBump(x) + emit a.bump(x) let dt = getMonoTime() - t0 - check b.value == n + check b.value == expectedValue let us = dt.inMicroseconds.float - let opsPerSec = (n.float * 1_000_000.0) / max(1.0, us) - echo &"[bench] slot direct call: n={n}, time={us:.2f} us, rate={opsPerSec:.0f} ops/s, ratio={durationMicrosEmitSlot / us:.2f}" + let ms = dt.inMilliseconds.float + let opsPerSec = (n.float * 1000.0) / max(1.0, ms) + echo &"[bench] emit->slot: n={n}, time={ms:.2f} ms, timeUs={dt.inMicroseconds}, rate={opsPerSec:.0f} ops/s" + echo &"[bench] emit->slot ratios: procRatio={us / durationMicrosDirectProc:.2f}, methodRatio={us / durationMicrosMethod:.2f}" when false: test "reactive computed (lazy) update+read": @@ -109,4 +134,3 @@ suite "benchmarks": let ms = dt.inMilliseconds.float let itersPerSec = (n.float * 1000.0) / max(1.0, ms) echo &"[bench] reactive (eager): n={n}, time={ms:.2f} ms, rate={itersPerSec:.0f} iters/s" - diff --git a/tests/tslotsThread.nim b/tests/tslotsThread.nim index 76422090..d45cb77a 100644 --- a/tests/tslotsThread.nim +++ b/tests/tslotsThread.nim @@ -7,9 +7,6 @@ import threading/atomics import sigils import sigils/threads -import std/terminal -import std/strutils - type SomeAction* = ref object of AgentActor value: int @@ -117,14 +114,15 @@ suite "threaded agent slots": b = Counter.new() c = Counter.new() - var agentResults = newChan[(WeakRef[Agent], SigilRequest)]() - connect(a, valueChanged, b, setValue) connect(a, valueChanged, c, Counter.setValue) let wa: WeakRef[SomeAction] = a.unsafeWeakRef() + type ActionCall = typeof(wa.valueChanged(137)) + var agentResults = newChan[ActionCall]() + emit wa.valueChanged(137) - check typeof(wa.valueChanged(137)) is (WeakRef[Agent], SigilRequest) + check ActionCall is SigilLocalCall[WeakRef[SomeAction], (int, )] check wa[].value == 0 check b.value == 137 @@ -199,7 +197,7 @@ suite "threaded agent slots": printConnections(bp.remote[]) let subLocalProxy = Subscription( - tgt: bp.unsafeWeakRef().asAgent(), slot: setValueGlobal(Counter) + tgt: bp.unsafeWeakRef().asAgent(), packedSlot: setValueGlobal(Counter) ) subs = a.getSubscriptions(sigName"valueChanged").toSeq() doAssert subs.len() >= 1 @@ -268,7 +266,7 @@ suite "threaded agent slots": let subLocalProxy = Subscription( - tgt: bp.unsafeWeakRef().asAgent(), slot: setValueGlobal(Counter) + tgt: bp.unsafeWeakRef().asAgent(), packedSlot: setValueGlobal(Counter) ) check a.subcriptions.len() == 0 check a.listening.len() == 1 diff --git a/tests/tweakrefs.nim b/tests/tweakrefs.nim index ac65e67b..e11f60df 100644 --- a/tests/tweakrefs.nim +++ b/tests/tweakrefs.nim @@ -149,7 +149,7 @@ suite "agent weak refs": emit x.valueChanged(137) echo "X::count:end: ", x.unsafeGcCount() echo "Y::count:end: ", y.unsafeGcCount() - check x.unsafeGcCount() == 2 + check x.unsafeGcCount() == 1 # var xx = x # check x.unsafeGcCount() == 2