Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ jobs:
- name: golangci-lint (source/statemachine)
working-directory: source/statemachine
run: go run "$GOLANGCI_LINT" run --config "$GITHUB_WORKSPACE/.golangci.yml" ./...
- name: golangci-lint (tools/docsgen)
working-directory: tools/docsgen
run: go run "$GOLANGCI_LINT" run --config "$GITHUB_WORKSPACE/.golangci.yml" ./...

# The state-machine race-test matrix lives in a reusable workflow so its legs
# render as a single collapsible "state machine tests / …" tree in the checks
Expand Down
14 changes: 12 additions & 2 deletions .github/workflows/docs-deploy.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
name: Deploy docs

# Publishes the Starlight docs site to GitHub Pages on every push to main that
# touches the docs source, the suite's Go source (so regenerated API/diagrams
# stay current — see the DS2 TODO below), or this workflow.
# touches the docs source, the suite's Go source (so the regenerated API
# reference and Mermaid diagrams stay current), the generator itself, or this
# workflow. Every module whose godoc feeds the generated Reference section is
# listed so a godoc change in any of them redeploys the site, not just state.
on:
push:
branches: [main]
paths:
- 'docs/**'
- 'tools/docsgen/**'
- 'state/**'
- 'sink/**'
- 'source/**'
- 'telemetry/**'
- 'durable/**'
- 'cluster/**'
- 'transport/**'
- 'wasm/**'
- '.github/workflows/docs-deploy.yml'

# Least-privilege: read the repo, mint an OIDC token for Pages, write the
Expand Down
117 changes: 117 additions & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"errors"
"net"
"os"
"os/exec"
"path/filepath"
"testing"

"github.com/stablekernel/crucible/cluster"
"github.com/stablekernel/crucible/durable"
"github.com/stablekernel/crucible/state"
"github.com/stablekernel/crucible/state/expr"
"github.com/stablekernel/crucible/transport"
"github.com/stablekernel/crucible/wasm"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
Expand Down Expand Up @@ -84,6 +88,119 @@ func TestE2E_DurableCELAssignSurvivesRecovery(t *testing.T) {
}
}

// ---- wasm ⊗ state ⊗ durable: a WASM-backed guard survives record/replay ----

type approvalOrder struct {
Amount int64 `json:"amount"`
Status string `json:"status"`
}

// buildApprovalGuest compiles the approval guard guest to wasip1/wasm with the
// standard Go toolchain (no committed binary, no TinyGo) and returns its bytes,
// mirroring the wasm package's own guest build so the e2e joint compiles its
// guard the same proven way.
func buildApprovalGuest(t *testing.T) []byte {
t.Helper()
dir := t.TempDir()
out := filepath.Join(dir, "approval.wasm")
cmd := exec.Command("go", "build", "-buildmode=c-shared", "-o", out, "./testdata/approvalguest")
cmd.Env = append(os.Environ(), "GOOS=wasip1", "GOARCH=wasm")
if buildOut, err := cmd.CombinedOutput(); err != nil {
t.Fatalf("build approval guest: %v\n%s", err, buildOut)
}
b, err := os.ReadFile(out)
if err != nil {
t.Fatalf("read built guest: %v", err)
}
return b
}

// approvalMachine forges an order machine whose approve transition is gated by a
// WASM-backed guard (amount >= 100), authored through the full production flow
// (Forge → ToJSON → LoadFromJSON → Provide) so the foreign-engine guard resolves
// exactly like an in-tree one.
func approvalMachine(t *testing.T, mod *wasm.Module) *state.Machine[string, string, approvalOrder] {
t.Helper()
reg := state.NewRegistry[approvalOrder]()
node := wasm.Guard[string](reg, "approved", mod)

def := state.Forge[string, string, approvalOrder]("approval").
Guard("approved", func(state.GuardCtx[approvalOrder]) bool { return false }). // stub, replaced by Provide
State("pending").
State("approved").
Initial("pending").
Transition("pending").On("approve").GoTo("approved").WhenExpr(node).
Quench()

js, err := def.ToJSON()
if err != nil {
t.Fatalf("ToJSON: %v", err)
}
ir, err := state.LoadFromJSON[string, string, approvalOrder](js)
if err != nil {
t.Fatalf("LoadFromJSON: %v", err)
}
return ir.Provide(reg).Quench()
}

// TestE2E_WASMGuardedDurableTransitionSurvivesRecovery drives a durable instance
// through a transition gated by a WebAssembly-backed guard, then recovers it from
// the store — proving a foreign-engine guard composes with the durable
// record/replay seam: the approved order persists at approved and replays
// deterministically, and a below-threshold order is blocked through the same
// WASM evaluator. It is hermetic (the guest is built on demand with the Go wasm
// toolchain) and deterministic (the guard is a pure predicate over context).
func TestE2E_WASMGuardedDurableTransitionSurvivesRecovery(t *testing.T) {
ctx := context.Background()
mod, err := wasm.Compile(ctx, buildApprovalGuest(t))
if err != nil {
t.Fatalf("compile guard: %v", err)
}
t.Cleanup(func() { _ = mod.Close(ctx) })

m := approvalMachine(t, mod)
store := durable.NewMemStore()
runner := durable.NewRunner(m, store)

// An at-threshold order: the WASM guard admits it, the durable runner records
// the transition, and recovery replays it to the same approved state.
const okID = "order-ok"
okH, err := runner.Start(ctx, okID, approvalOrder{Amount: 150, Status: "pending"}, state.WithInitialState("pending"))
if err != nil {
t.Fatalf("start ok order: %v", err)
}
if _, err = okH.Fire(ctx, "approve"); err != nil {
t.Fatalf("fire approve: %v", err)
}
if got := okH.Instance().Current(); got != "approved" {
t.Fatalf("WASM guard should admit amount 150; current=%q, want approved", got)
}

rec, err := durable.Recover(ctx, m, store, okID)
if err != nil {
t.Fatalf("recover: %v", err)
}
if got := rec.Instance().Current(); got != "approved" {
t.Fatalf("recovered state = %q, want approved (WASM-guarded transition replayed)", got)
}

// A below-threshold order: the same WASM guard blocks it, so the transition is
// rejected (a GuardFailedError) and the durable instance never leaves pending.
const lowID = "order-low"
lowH, err := runner.Start(ctx, lowID, approvalOrder{Amount: 50, Status: "pending"}, state.WithInitialState("pending"))
if err != nil {
t.Fatalf("start low order: %v", err)
}
_, err = lowH.Fire(ctx, "approve")
var guardErr *state.GuardFailedError
if !errors.As(err, &guardErr) {
t.Fatalf("fire approve (low) error = %v, want a *state.GuardFailedError from the WASM guard", err)
}
if got := lowH.Instance().Current(); got != "pending" {
t.Fatalf("WASM guard should block amount 50; current=%q, want pending", got)
}
}

// ---- cluster ⊗ transport ⊗ supervisor: supervised remote actor over gRPC ----

type pinger struct{}
Expand Down
5 changes: 4 additions & 1 deletion e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ replace (
github.com/stablekernel/crucible/state/expr => ../state/expr
github.com/stablekernel/crucible/telemetry => ../telemetry
github.com/stablekernel/crucible/transport => ../transport
github.com/stablekernel/crucible/wasm => ../wasm
)

require (
Expand All @@ -26,6 +27,7 @@ require (
github.com/stablekernel/crucible/state/expr v0.0.0-00010101000000-000000000000
github.com/stablekernel/crucible/telemetry v0.0.0
github.com/stablekernel/crucible/transport v0.0.0-00010101000000-000000000000
github.com/stablekernel/crucible/wasm v0.0.0-00010101000000-000000000000
google.golang.org/grpc v1.81.1
)

Expand All @@ -34,10 +36,11 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/google/cel-go v0.28.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/tetratelabs/wazero v1.12.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/exp v0.0.0-20260209203927-2842357ff358 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/sys v0.45.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect
Expand Down
14 changes: 8 additions & 6 deletions e2e/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/tetratelabs/wazero v1.12.0 h1:DuWcpNu/FzgEXgGBDp8J1Spc+CWOvvtvVyjKlaZopYU=
github.com/tetratelabs/wazero v1.12.0/go.mod h1:LvKtzl2RqO4gyF27BiXU+nKAjcV8f38U+kP/q2vgxh0=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU=
go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc=
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI=
go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA=
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
Expand All @@ -41,8 +43,8 @@ golang.org/x/exp v0.0.0-20260209203927-2842357ff358 h1:kpfSV7uLwKJbFSEgNhWzGSL47
golang.org/x/exp v0.0.0-20260209203927-2842357ff358/go.mod h1:R3t0oliuryB5eenPWl3rrQxwnNM3WTwnsRZZiXLAAW8=
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
Expand Down
19 changes: 16 additions & 3 deletions e2e/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,22 @@ import (
// through the Store seam without any core importing another.

// shipment is the entity a fulfillment instance advances. Funds gates the pay
// transition so a guard rejection (invalid-for-state) is reachable.
// transition so a guard rejection (invalid-for-state) is reachable. Stage
// records the lifecycle state reached; CurrentStateFn reads it so a shipment
// cast from an in-flight value resumes at its real state, not at pending.
type shipment struct {
Funds bool `json:"funds"`
Funds bool `json:"funds"`
Stage string `json:"stage"`
}

// currentStage derives a shipment's current state for CurrentStateFn. A nil
// entity (a fresh cast with no record) or an empty Stage starts at pending;
// otherwise the recorded stage is honored so resume lands on the real state.
func currentStage(s *shipment) string {
if s == nil || s.Stage == "" {
return "pending"
}
return s.Stage
}

// shipEvent is the decoded inbound command carried in each message's JSON body.
Expand All @@ -51,7 +64,7 @@ func fulfillmentMachine() *state.Machine[string, string, *shipment] {
State("shipped").
State("delivered").
Initial("pending").
CurrentStateFn(func(*shipment) string { return "pending" }).
CurrentStateFn(currentStage).
Transition("pending").On("pay").GoTo("shipped").When("funded").
Transition("shipped").On("deliver").GoTo("delivered").
Quench(state.Strict())
Expand Down
78 changes: 78 additions & 0 deletions e2e/testdata/approvalguest/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//go:build wasip1

// Command approvalguest is a WebAssembly guest implementing a Crucible behavior
// guard over the JSON ABI: it reads a {"context": {"amount": N}} request and
// returns {"ok": bool}, admitting an order whose amount is at or above the
// approval threshold. The e2e wasm joint compiles it to wasip1/wasm on demand
// (GOOS=wasip1 GOARCH=wasm, -buildmode=c-shared) and runs it through wazero, so a
// guard whose truth lives in a foreign module gates a state-machine transition
// exactly like an in-tree guard. No binary is committed; the test builds it.
package main

import (
"encoding/json"
"unsafe"
)

func main() {}

// approvalThreshold is the amount at or above which an order is approved. The
// host test drives one order above it and one below it so both verdicts are
// exercised through the WASM evaluator.
const approvalThreshold = 100

// Fixed input and output buffers at stable linear-memory addresses, so the host
// writes the request to alloc's pointer and reads the response from eval's
// returned pointer without a real allocator — the convention the wasm package's
// own reference guest uses.
var (
inBuf [16 << 10]byte
outBuf [16 << 10]byte
)

func ptrOf(p *byte) uint32 { return uint32(uintptr(unsafe.Pointer(p))) }

// alloc returns the address of the input buffer for the host to write size bytes
// into. The buffer is fixed; size must not exceed it.
//
//nolint:unparam // ABI: alloc must accept the requested size even though the buffer is fixed.
//go:wasmexport alloc
func alloc(size uint32) uint32 {
_ = size
return ptrOf(&inBuf[0])
}

// request is the guard envelope: the read-only context the guard evaluates.
type request struct {
Context struct {
Amount int64 `json:"amount"`
} `json:"context"`
}

// response is the guard verdict envelope.
type response struct {
OK bool `json:"ok"`
}

// eval reads the JSON request the host wrote at the input buffer, evaluates the
// approval predicate amount >= threshold, writes the JSON response into the
// output buffer, and returns a packed (outPtr<<32 | outLen). A malformed request
// is fail-safe: it returns ok=false rather than erroring.
//
//go:wasmexport eval
func eval(ptr, size uint32) uint64 {
_ = ptr // input is always at inBuf; the host wrote it there via alloc.
var req request
if err := json.Unmarshal(inBuf[:size], &req); err != nil {
return write(response{OK: false})
}
return write(response{OK: req.Context.Amount >= approvalThreshold})
}

// write marshals the response into the output buffer and returns the packed
// pointer and length the host unpacks.
func write(resp response) uint64 {
b, _ := json.Marshal(resp)
n := copy(outBuf[:], b)
return uint64(ptrOf(&outBuf[0]))<<32 | uint64(n)
}
37 changes: 37 additions & 0 deletions examples/dispatch/durable_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,43 @@ func TestStartActiveOrder_ExistingInstance(t *testing.T) {
}
}

// TestStartActiveOrder_SubmitError covers the submit-fire error branch: Start
// succeeds, then the Submit step's Append fails, so startActiveOrder surfaces a
// wrapped submit-order error rather than returning a half-started handle.
func TestStartActiveOrder_SubmitError(t *testing.T) {
ctx := context.Background()
// No Appends allowed: the first recorded step (Submit) fails. Loads are open so
// Start's existence probe succeeds.
store := &failingStore{Store: durable.NewMemStore(), allowLoads: 5, allowAppends: 0}

_, _, err := startActiveOrder(ctx, store, durable.InstanceID("order-submit-fail"),
durableOptions(state.NewFakeClock(fixedClockStart)))
if err == nil {
t.Fatal("expected a submit error from the failing store, got nil")
}
if !strings.Contains(err.Error(), "submit order") {
t.Fatalf("error = %v, want a submit-order message", err)
}
}

// TestStartActiveOrder_RunServiceError covers the run-authorize-service error
// branch: Start and the Submit step succeed, then the authorize service's Append
// fails, so startActiveOrder surfaces a wrapped run-authorize-service error.
func TestStartActiveOrder_RunServiceError(t *testing.T) {
ctx := context.Background()
// One Append allowed (Submit); the authorize service's record then fails.
store := &failingStore{Store: durable.NewMemStore(), allowLoads: 5, allowAppends: 1}

_, _, err := startActiveOrder(ctx, store, durable.InstanceID("order-service-fail"),
durableOptions(state.NewFakeClock(fixedClockStart)))
if err == nil {
t.Fatal("expected a run-authorize-service error from the failing store, got nil")
}
if !strings.Contains(err.Error(), "run authorize service") {
t.Fatalf("error = %v, want a run-authorize-service message", err)
}
}

// failingStore wraps a durable.Store and fails Load / History after a configured
// number of successful Loads, so the harness's recovery and reconstruction error
// branches are exercised without disturbing the live recording run (whose only Load
Expand Down
Loading
Loading