diff --git a/README.md b/README.md index 34232eb..683c3f7 100644 --- a/README.md +++ b/README.md @@ -70,12 +70,14 @@ See the [streaming package README](streaming/README.md) for more details. Pulse builds on top of [replicated maps](rmap/README.md) and [streaming](streaming/README.md) to implement a dedicated worker pool where jobs -are dipatched to workers based on their key and a consistent hashing algorithm. +and messages are dispatched to workers based on their key and a consistent +hashing algorithm. Jobs create durable worker ownership; messages are +hash-routed short-lived work that does not create job state. ```mermaid %%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%% flowchart LR - A[Job Producer] + A[Producer] subgraph Pool[Pool Node] Sink end @@ -83,9 +85,9 @@ flowchart LR Reader B[Worker] end - A-->|Job+Key|Sink - Sink-.->|Job|Reader - Reader-.->|Job|B + A-->|Job or Message + Key|Sink + Sink-.->|Worker Event|Reader + Reader-.->|Worker Event|B classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC; classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6; diff --git a/examples/pool/README.md b/examples/pool/README.md index d1c2a68..ed23c2f 100644 --- a/examples/pool/README.md +++ b/examples/pool/README.md @@ -3,7 +3,8 @@ This example shows how to use the pool package to create a pool of workers. It has three parts: 1. The `worker` process registers a worker with the node and waits for jobs. -2. The `producer` process starts and stops two jobs. It also notifies the worker handling the second job. +2. The `producer` process starts and stops two jobs. It also notifies the worker + that owns the second job. 3. The `scheduler` process starts runs a schedule that starts and stops jobs alternately. ## Running the example @@ -31,8 +32,12 @@ $ source .env $ go run examples/pool/producer/main.go ``` -The above starts and stops two jobs. The first job is handled by the first worker, -and the second job is handled by the second worker. +The above starts and stops two jobs. The first job is handled by the first +worker, and the second job is handled by the second worker. The producer also +sends a job-scoped notification to the worker that owns the second job. + +For keyed work that should be routed by the worker hash ring without creating a +durable job, use `DispatchMessage` instead of `NotifyWorker`. Finally in the same terminal used above run the following command: diff --git a/examples/pool/worker/main.go b/examples/pool/worker/main.go index f8722f7..9199ddc 100644 --- a/examples/pool/worker/main.go +++ b/examples/pool/worker/main.go @@ -115,7 +115,7 @@ func (w *JobHandler) Stop(key string) error { return nil } -// Print notification. +// HandleNotification prints job-scoped notifications for jobs owned locally. func (w *JobHandler) HandleNotification(key string, payload []byte) error { log.Info(w.logctx, log.Fields{"msg": "notification", "key": key, "payload": string(payload)}) return nil diff --git a/pool/README.md b/pool/README.md index bcb2236..b9c096a 100644 --- a/pool/README.md +++ b/pool/README.md @@ -6,10 +6,10 @@ dedicated worker pools. ## Overview -A *dedicated* worker pool uses a consistent hashing algorithm to assign long -running jobs to workers. Each job is associated with a key and each worker with -a range of hashed values. The pool hashes the job key when the job is dispatched -to route the job to the proper worker. +A *dedicated* worker pool uses a consistent hashing algorithm to route keyed +work to workers. Durable jobs use the key to choose the worker that owns and +executes the job. Keyed messages use the same hash ring for short-lived work +without creating job ownership. Workers can be added or removed from the pool dynamically. Jobs get automatically re-assigned to workers when the pool grows or shrinks. This makes @@ -17,13 +17,13 @@ it possible to implement auto-scaling solutions, for example based on queueing delays. Pulse uses the [Jump Consistent Hash](https://arxiv.org/abs/1406.2294) algorithm -to assign jobs to workers which provides a good balance between load balancing +to assign keys to workers, which provides a good balance between load balancing and worker assignment stability. ```mermaid %%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%% flowchart LR - A[Job Producer] + A[Producer] subgraph Pool["Routing Pool Node"] Sink["Job Sink"] end @@ -31,9 +31,9 @@ flowchart LR Reader B[Worker] end - A-->|Job+Key|Sink - Sink-.->|Job|Reader - Reader-.->|Job|B + A-->|Job or Message + Key|Sink + Sink-.->|Worker Event|Reader + Reader-.->|Worker Event|B classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC; classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6; @@ -81,12 +81,22 @@ type JobHandler struct { } // Pulse calls this method to start a job that was assigned to this worker. -func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error { +func (h *JobHandler) Start(job *pool.Job) error { // ... } // Pulse calls this method to stop a job that was assigned to this worker. -func (h *JobHandler) Stop(ctx context.Context, key string) error { +func (h *JobHandler) Stop(key string) error { + // ... +} + +// Pulse calls this method when a message key hashes to this worker. +func (h *JobHandler) HandleMessage(key string, payload []byte) error { + // ... +} + +// Pulse calls this method when this worker owns the notified job key. +func (h *JobHandler) HandleNotification(key string, payload []byte) error { // ... } ``` @@ -156,10 +166,10 @@ handler object. [![Worker AddWorker](../snippets/pool-addworker.png)](../examples/pool/worker/main.go#L55-L57) The job handler must implement the `Start` and `Stop` methods used to start and -stop jobs. The handler may also optionally implement a `HandleNotification` -method to receive notifications. +stop durable jobs. The handler may also implement `HandleMessage` to receive +keyed messages and `HandleNotification` to receive job-scoped notifications. -[![Worker JobHandler](../snippets/worker-jobhandler.png)](worker.go#L59-L71) +[![Worker JobHandler](../snippets/worker-jobhandler.png)](worker.go#L68-L87) The `AddWorker` function returns a new worker and an error. Workers can be removed from pool nodes using the `RemoveWorker` method. @@ -171,17 +181,35 @@ input a job key and a job payload. [![Pool DispatchJob](../snippets/pool-dispatchjob.png)](../examples/pool/producer/main.go#L39-L42) -The job key is used to route the job to the proper worker. The job payload is -passed to the worker's `Start` method. +The job key is used to route the job to the proper worker. If the worker starts +the job successfully, the worker owns that key until the job stops or moves +during rebalancing. The job payload is passed to the worker's `Start` method. The `DispatchJob` method returns an error if the job could not be dispatched. This can happen if the pool is full or if the job key is invalid. +### Dispatching A Message + +The `DispatchMessage` method sends a keyed, fire-and-forget message to the +worker currently assigned by the pool hash ring. Messages are the right primitive +for short-lived work that needs stable key-based routing but must not create a +durable job. + +Messages do not write job payloads and do not require any worker to own a job +with the same key. The receiving worker must implement `HandleMessage`. A +message handler can return `ErrRequeue` to leave the message pending for +redelivery; any other error is treated as terminal. + ### Notifications -Nodes can send notifications to workers using the `NotifyWorker` method. The method -takes as input a job key and a notification payload. The notification payload -is passed to the worker's `HandleNotification` method. +Nodes can send notifications to workers using the `NotifyWorker` method. A +notification is a job control event: it targets the worker that currently owns an +existing job key and passes the notification payload to that worker's +`HandleNotification` method. + +Use `NotifyWorker` when the message only makes sense for the active owner of an +existing job. Use `DispatchMessage` when there is no durable job with the same +key. ### Stopping A Job diff --git a/pool/marshal.go b/pool/marshal.go index f5d81c4..c17eeea 100644 --- a/pool/marshal.go +++ b/pool/marshal.go @@ -134,7 +134,9 @@ func unmarshalBool(reader *bytes.Reader) bool { return value } -func marshalNotification(key string, payload []byte) []byte { +// marshalKeyedPayload marshals the shared wire shape used by events whose +// routing key is distinct from their opaque handler payload. +func marshalKeyedPayload(key string, payload []byte) []byte { var buf bytes.Buffer if err := binary.Write(&buf, binary.LittleEndian, int32(len(key))); err != nil { panic(err) @@ -151,7 +153,8 @@ func marshalNotification(key string, payload []byte) []byte { return buf.Bytes() } -func unmarshalNotification(data []byte) (string, []byte) { +// unmarshalKeyedPayload unmarshals data produced by marshalKeyedPayload. +func unmarshalKeyedPayload(data []byte) (string, []byte) { reader := bytes.NewReader(data) var keyLength int32 if err := binary.Read(reader, binary.LittleEndian, &keyLength); err != nil { diff --git a/pool/marshal_test.go b/pool/marshal_test.go index e017042..981fafa 100644 --- a/pool/marshal_test.go +++ b/pool/marshal_test.go @@ -59,3 +59,15 @@ func TestMarshalJob(t *testing.T) { }) } } + +func TestMarshalKeyedPayload(t *testing.T) { + key := "test-key" + payload := []byte("test-payload") + + marshaled := marshalKeyedPayload(key, payload) + gotKey, gotPayload := unmarshalKeyedPayload(marshaled) + + assert.Equal(t, key, gotKey) + assert.Equal(t, payload, gotPayload) + assert.Equal(t, key, unmarshalJobKey(marshaled)) +} diff --git a/pool/node.go b/pool/node.go index 8a7ed70..a3c06be 100644 --- a/pool/node.go +++ b/pool/node.go @@ -82,6 +82,8 @@ const ( evInit string = "i" // evStartJob is the event used to send new job to workers. evStartJob string = "j" + // evMessage is the event used to send a keyed message to a hash-ring worker. + evMessage string = "m" // evNotify is the event used to notify a worker running a specific job. evNotify string = "n" // evStopJob is the event used to stop a job. @@ -288,7 +290,8 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No // AddWorker adds a new worker to the pool and returns it. The worker starts // processing jobs immediately. handler can optionally implement the -// NotificationHandler interface to handle notifications. +// NotificationHandler and MessageHandler interfaces to handle job-scoped +// notifications and hash-routed messages. func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, error) { if node.IsClosed() { return nil, fmt.Errorf("AddWorker: pool %q is closed", node.PoolName) @@ -361,6 +364,20 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e return node.dispatchJob(ctx, key, job) } +// DispatchMessage sends a keyed message to the worker currently assigned by the +// pool hash ring. Messages do not create job ownership and are intended for +// fire-and-forget work that should be load-balanced by key. +func (node *Node) DispatchMessage(ctx context.Context, key string, payload []byte) error { + if node.IsClosed() { + return fmt.Errorf("DispatchMessage: pool %q is closed", node.PoolName) + } + if _, err := node.poolStream.Add(ctx, evMessage, marshalKeyedPayload(key, payload)); err != nil { + return fmt.Errorf("DispatchMessage: failed to add message to stream %q: %w", node.poolStream.Name, err) + } + node.logger.Info("message dispatched", "key", key) + return nil +} + func (node *Node) dispatchJob(ctx context.Context, key string, job []byte) error { if node.IsClosed() { return fmt.Errorf("DispatchJob: pool %q is closed", node.PoolName) @@ -520,12 +537,13 @@ func (node *Node) JobPayload(key string) ([]byte, bool) { return []byte(payload), true } -// NotifyWorker notifies the worker that handles the job with the given key. +// NotifyWorker notifies the worker that currently owns the job with the given +// key. func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error { if node.IsClosed() { return fmt.Errorf("NotifyWorker: pool %q is closed", node.PoolName) } - if _, err := node.poolStream.Add(ctx, evNotify, marshalNotification(key, payload)); err != nil { + if _, err := node.poolStream.Add(ctx, evNotify, marshalKeyedPayload(key, payload)); err != nil { return fmt.Errorf("NotifyWorker: failed to add notification to stream %q: %w", node.poolStream.Name, err) } node.logger.Info("notification sent", "key", key) @@ -699,7 +717,7 @@ func (node *Node) routeWorkerEvent(ev *streaming.Event) error { return nil } - // Compute the worker ID that will handle the job. + // Compute the worker ID that will handle the event key. key := unmarshalJobKey(ev.Payload) wid, err := node.workerForEvent(ev.EventName, key) if err != nil { @@ -838,11 +856,11 @@ func (node *Node) returnDispatchStatus(ev *streaming.Event) { val.(chan error) <- err } -// workerForEvent returns the worker that should receive a pool event. Start -// events are routed by the current consistent hash ring; stop and notification -// events target the worker that currently owns the job. +// workerForEvent returns the worker that should receive a pool event. Start and +// message events are routed by the current consistent hash ring; stop and +// notification events target the worker that currently owns the job. func (node *Node) workerForEvent(eventName, key string) (string, error) { - if eventName == evStartJob { + if eventName == evStartJob || eventName == evMessage { activeWorkers := node.activeWorkers() if len(activeWorkers) == 0 { return "", fmt.Errorf("routeWorkerEvent: no active worker in pool %q", node.PoolName) diff --git a/pool/node_test.go b/pool/node_test.go index f74236d..22f3214 100644 --- a/pool/node_test.go +++ b/pool/node_test.go @@ -650,7 +650,7 @@ func TestNotifyWorkerNoHandler(t *testing.T) { defer ptesting.CleanupRedis(t, rdb, true, testName) // Create a worker without NotificationHandler implementation - worker := newTestWorkerWithoutNotify(t, ctx, node) + worker := newTestWorkerWithoutOptionalHandlers(t, ctx, node) // Dispatch a job to ensure the worker is assigned jobKey := "test-job" @@ -678,6 +678,73 @@ func TestNotifyWorkerNoHandler(t *testing.T) { assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node") } +func TestDispatchMessageRoutesByHashWithoutJob(t *testing.T) { + testName := strings.Replace(t.Name(), "/", "_", -1) + ctx := ptesting.NewTestContext(t) + rdb := ptesting.NewRedisClient(t) + node := newTestNode(t, ctx, rdb, testName) + defer ptesting.CleanupRedis(t, rdb, true, testName) + + messageKey := "message-key" + messagePayload := []byte("message payload") + received := make(chan string, 2) + + node.h = &ptesting.Hasher{Index: 1} + handler1 := &mockMessageHandler{mockHandler: newMockHandler()} + handler2 := &mockMessageHandler{mockHandler: newMockHandler()} + worker1, err := node.AddWorker(ctx, handler1) + require.NoError(t, err) + worker2, err := node.AddWorker(ctx, handler2) + require.NoError(t, err) + handler1.messageFunc = func(key string, payload []byte) error { + assert.Equal(t, messageKey, key) + assert.Equal(t, messagePayload, payload) + received <- worker1.ID + return nil + } + handler2.messageFunc = func(key string, payload []byte) error { + assert.Equal(t, messageKey, key) + assert.Equal(t, messagePayload, payload) + received <- worker2.ID + return nil + } + + require.NoError(t, node.DispatchMessage(ctx, messageKey, messagePayload)) + select { + case got := <-received: + assert.Equal(t, worker2.ID, got) + case <-time.After(max): + t.Fatal("message was not routed to hash-ring worker") + } + + assert.Empty(t, worker1.Jobs()) + assert.Empty(t, worker2.Jobs()) + assert.Empty(t, node.JobKeys()) + _, ok := node.JobPayload(messageKey) + assert.False(t, ok) + + assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node") +} + +func TestDispatchMessageRequiresHandler(t *testing.T) { + testName := strings.Replace(t.Name(), "/", "_", -1) + ctx, buf := ptesting.NewBufferedLogContext(t) + rdb := ptesting.NewRedisClient(t) + node := newTestNode(t, ctx, rdb, testName) + defer ptesting.CleanupRedis(t, rdb, true, testName) + + worker := newTestWorkerWithoutOptionalHandlers(t, ctx, node) + assert.NoError(t, node.DispatchMessage(ctx, "message-key", []byte("message payload"))) + + assert.Eventually(t, func() bool { + return strings.Contains(buf.String(), "handler failed: worker") && + strings.Contains(buf.String(), "does not implement MessageHandler") + }, max, delay, "Expected missing MessageHandler error within the timeout period") + assert.Empty(t, worker.Jobs()) + + assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node") +} + func TestRemoveWorkerThenShutdown(t *testing.T) { ctx := ptesting.NewTestContext(t) testName := strings.Replace(t.Name(), "/", "_", -1) diff --git a/pool/testing.go b/pool/testing.go index 3163131..a44c6c9 100644 --- a/pool/testing.go +++ b/pool/testing.go @@ -10,19 +10,26 @@ import ( "goa.design/pulse/pulse" ) -// mockHandler is a mock worker mockHandler for testing +// mockHandler implements the regular test worker job and notification contract. type mockHandler struct { startFunc func(job *Job) error stopFunc func(key string) error notifyFunc func(key string, payload []byte) error } -// mockHandlerWithoutNotify is a mock handler that doesn't implement NotificationHandler -type mockHandlerWithoutNotify struct { +// mockJobHandler implements only the required job handler contract. +type mockJobHandler struct { startFunc func(job *Job) error stopFunc func(key string) error } +// mockMessageHandler extends the regular test job handler with explicit message +// support so tests only opt into the message contract when they need it. +type mockMessageHandler struct { + *mockHandler + messageFunc func(key string, payload []byte) error +} + const ( testWorkerShutdownTTL = 100 * time.Millisecond testJobSinkBlockDuration = 100 * time.Millisecond @@ -50,34 +57,45 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri // worker to the given node. func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker { t.Helper() - handler := &mockHandler{ - startFunc: func(job *Job) error { return nil }, - stopFunc: func(key string) error { return nil }, - notifyFunc: func(key string, payload []byte) error { return nil }, - } + handler := newMockHandler() worker, err := node.AddWorker(ctx, handler) require.NoError(t, err) return worker } -// newTestWorkerWithoutNotify creates a new Worker instance for testing purposes. -// It sets up a mock handler without NotificationHandler for testing. -func newTestWorkerWithoutNotify(t *testing.T, ctx context.Context, node *Node) *Worker { +// newTestWorkerWithoutOptionalHandlers creates a worker whose handler only +// implements the required job lifecycle methods. +func newTestWorkerWithoutOptionalHandlers(t *testing.T, ctx context.Context, node *Node) *Worker { t.Helper() - handler := &mockHandlerWithoutNotify{ - startFunc: func(job *Job) error { return nil }, - stopFunc: func(key string) error { return nil }, - } + handler := newMockJobHandler() worker, err := node.AddWorker(ctx, handler) require.NoError(t, err) return worker } +func newMockHandler() *mockHandler { + return &mockHandler{ + startFunc: func(job *Job) error { return nil }, + stopFunc: func(key string) error { return nil }, + notifyFunc: func(key string, payload []byte) error { return nil }, + } +} + +func newMockJobHandler() *mockJobHandler { + return &mockJobHandler{ + startFunc: func(job *Job) error { return nil }, + stopFunc: func(key string) error { return nil }, + } +} + func (w *mockHandler) Start(job *Job) error { return w.startFunc(job) } func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) } func (w *mockHandler) HandleNotification(key string, payload []byte) error { return w.notifyFunc(key, payload) } +func (w *mockMessageHandler) HandleMessage(key string, payload []byte) error { + return w.messageFunc(key, payload) +} -func (h *mockHandlerWithoutNotify) Start(job *Job) error { return h.startFunc(job) } -func (h *mockHandlerWithoutNotify) Stop(key string) error { return h.stopFunc(key) } +func (h *mockJobHandler) Start(job *Job) error { return h.startFunc(job) } +func (h *mockJobHandler) Stop(key string) error { return h.stopFunc(key) } diff --git a/pool/worker.go b/pool/worker.go index 4371cba..671b893 100644 --- a/pool/worker.go +++ b/pool/worker.go @@ -73,12 +73,20 @@ type ( Stop(key string) error } - // NotificationHandler handle job notifications. + // NotificationHandler handles notifications for jobs owned by the worker. NotificationHandler interface { - // HandleNotification handles a notification. + // HandleNotification handles a job-scoped notification. HandleNotification(key string, payload []byte) error } + // MessageHandler handles keyed messages that are routed by the pool hash ring + // without requiring a running job with the same key. Returning ErrRequeue + // leaves the message pending for redelivery; any other error is terminal. + MessageHandler interface { + // HandleMessage handles a keyed message. + HandleMessage(key string, payload []byte) error + } + // ack is a worker event acknowledgement. ack struct { // EventID is the ID of the event being acknowledged. @@ -199,12 +207,16 @@ func (w *Worker) handleEvents(ctx context.Context, c <-chan *streaming.Event) { case evStartJob: w.logger.Debug("handleEvents: received start job", "event", ev.EventName, "id", ev.ID) err = w.startJob(ctx, unmarshalJob(payload)) + case evMessage: + w.logger.Debug("handleEvents: received message", "event", ev.EventName, "id", ev.ID) + key, payload := unmarshalKeyedPayload(payload) + err = w.message(key, payload) case evStopJob: w.logger.Debug("handleEvents: received stop job", "event", ev.EventName, "id", ev.ID) err = w.stopJob(ctx, unmarshalJobKey(payload)) case evNotify: w.logger.Debug("handleEvents: received notify", "event", ev.EventName, "id", ev.ID) - key, payload := unmarshalNotification(payload) + key, payload := unmarshalKeyedPayload(payload) err = w.notify(ctx, key, payload) } if err != nil { @@ -307,7 +319,7 @@ func (w *Worker) releaseJob(ctx context.Context, key string) error { return nil } -// notify notifies the worker with the given payload. +// notify delivers a job-scoped notification after verifying local ownership. func (w *Worker) notify(_ context.Context, key string, payload []byte) error { if w.IsStopped() { w.logger.Debug("worker stopped, ignoring notification") @@ -325,6 +337,20 @@ func (w *Worker) notify(_ context.Context, key string, payload []byte) error { return nh.HandleNotification(key, payload) } +// message handles a keyed message routed by the pool hash ring. Unlike +// notifications, messages are independent of job ownership. +func (w *Worker) message(key string, payload []byte) error { + if w.IsStopped() { + return fmt.Errorf("worker %q stopped", w.ID) + } + mh, ok := w.handler.(MessageHandler) + if !ok { + return fmt.Errorf("worker %q does not implement MessageHandler", w.ID) + } + w.logger.Debug("handled message", "payload", string(payload)) + return mh.HandleMessage(key, payload) +} + // ackPoolEvent acknowledges the pool event that originated from the node with // the given ID. func (w *Worker) ackPoolEvent(ctx context.Context, nodeID, eventID string, ackerr error) {