Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,24 @@ See the [streaming package README](streaming/README.md) for more details.

Pulse builds on top of [replicated maps](rmap/README.md) and
[streaming](streaming/README.md) to implement a dedicated worker pool where jobs
are dipatched to workers based on their key and a consistent hashing algorithm.
and messages are dispatched to workers based on their key and a consistent
hashing algorithm. Jobs create durable worker ownership; messages are
hash-routed short-lived work that does not create job state.

```mermaid
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
A[Job Producer]
A[Producer]
subgraph Pool[Pool Node]
Sink
end
subgraph Worker[Pool Node]
Reader
B[Worker]
end
A-->|Job+Key|Sink
Sink-.->|Job|Reader
Reader-.->|Job|B
A-->|Job or Message + Key|Sink
Sink-.->|Worker Event|Reader
Reader-.->|Worker Event|B

classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
Expand Down
11 changes: 8 additions & 3 deletions examples/pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
This example shows how to use the pool package to create a pool of workers. It has three parts:

1. The `worker` process registers a worker with the node and waits for jobs.
2. The `producer` process starts and stops two jobs. It also notifies the worker handling the second job.
2. The `producer` process starts and stops two jobs. It also notifies the worker
that owns the second job.
3. The `scheduler` process starts runs a schedule that starts and stops jobs alternately.

## Running the example
Expand Down Expand Up @@ -31,8 +32,12 @@ $ source .env
$ go run examples/pool/producer/main.go
```

The above starts and stops two jobs. The first job is handled by the first worker,
and the second job is handled by the second worker.
The above starts and stops two jobs. The first job is handled by the first
worker, and the second job is handled by the second worker. The producer also
sends a job-scoped notification to the worker that owns the second job.

For keyed work that should be routed by the worker hash ring without creating a
durable job, use `DispatchMessage` instead of `NotifyWorker`.

Finally in the same terminal used above run the following command:

Expand Down
2 changes: 1 addition & 1 deletion examples/pool/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (w *JobHandler) Stop(key string) error {
return nil
}

// Print notification.
// HandleNotification prints job-scoped notifications for jobs owned locally.
func (w *JobHandler) HandleNotification(key string, payload []byte) error {
log.Info(w.logctx, log.Fields{"msg": "notification", "key": key, "payload": string(payload)})
return nil
Expand Down
66 changes: 47 additions & 19 deletions pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,34 @@ dedicated worker pools.

## Overview

A *dedicated* worker pool uses a consistent hashing algorithm to assign long
running jobs to workers. Each job is associated with a key and each worker with
a range of hashed values. The pool hashes the job key when the job is dispatched
to route the job to the proper worker.
A *dedicated* worker pool uses a consistent hashing algorithm to route keyed
work to workers. Durable jobs use the key to choose the worker that owns and
executes the job. Keyed messages use the same hash ring for short-lived work
without creating job ownership.

Workers can be added or removed from the pool dynamically. Jobs get
automatically re-assigned to workers when the pool grows or shrinks. This makes
it possible to implement auto-scaling solutions, for example based on queueing
delays.

Pulse uses the [Jump Consistent Hash](https://arxiv.org/abs/1406.2294) algorithm
to assign jobs to workers which provides a good balance between load balancing
to assign keys to workers, which provides a good balance between load balancing
and worker assignment stability.

```mermaid
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
A[Job Producer]
A[Producer]
subgraph Pool["<span style='margin: 0 10px;'>Routing Pool Node</span>"]
Sink["Job Sink"]
end
subgraph Worker[Worker Pool Node]
Reader
B[Worker]
end
A-->|Job+Key|Sink
Sink-.->|Job|Reader
Reader-.->|Job|B
A-->|Job or Message + Key|Sink
Sink-.->|Worker Event|Reader
Reader-.->|Worker Event|B

classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
Expand Down Expand Up @@ -81,12 +81,22 @@ type JobHandler struct {
}

// Pulse calls this method to start a job that was assigned to this worker.
func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error {
func (h *JobHandler) Start(job *pool.Job) error {
// ...
}

// Pulse calls this method to stop a job that was assigned to this worker.
func (h *JobHandler) Stop(ctx context.Context, key string) error {
func (h *JobHandler) Stop(key string) error {
// ...
}

// Pulse calls this method when a message key hashes to this worker.
func (h *JobHandler) HandleMessage(key string, payload []byte) error {
// ...
}

// Pulse calls this method when this worker owns the notified job key.
func (h *JobHandler) HandleNotification(key string, payload []byte) error {
// ...
}
```
Expand Down Expand Up @@ -156,10 +166,10 @@ handler object.
[![Worker AddWorker](../snippets/pool-addworker.png)](../examples/pool/worker/main.go#L55-L57)

The job handler must implement the `Start` and `Stop` methods used to start and
stop jobs. The handler may also optionally implement a `HandleNotification`
method to receive notifications.
stop durable jobs. The handler may also implement `HandleMessage` to receive
keyed messages and `HandleNotification` to receive job-scoped notifications.

[![Worker JobHandler](../snippets/worker-jobhandler.png)](worker.go#L59-L71)
[![Worker JobHandler](../snippets/worker-jobhandler.png)](worker.go#L68-L87)

The `AddWorker` function returns a new worker and an error. Workers can be
removed from pool nodes using the `RemoveWorker` method.
Expand All @@ -171,17 +181,35 @@ input a job key and a job payload.

[![Pool DispatchJob](../snippets/pool-dispatchjob.png)](../examples/pool/producer/main.go#L39-L42)

The job key is used to route the job to the proper worker. The job payload is
passed to the worker's `Start` method.
The job key is used to route the job to the proper worker. If the worker starts
the job successfully, the worker owns that key until the job stops or moves
during rebalancing. The job payload is passed to the worker's `Start` method.

The `DispatchJob` method returns an error if the job could not be dispatched.
This can happen if the pool is full or if the job key is invalid.

### Dispatching A Message

The `DispatchMessage` method sends a keyed, fire-and-forget message to the
worker currently assigned by the pool hash ring. Messages are the right primitive
for short-lived work that needs stable key-based routing but must not create a
durable job.

Messages do not write job payloads and do not require any worker to own a job
with the same key. The receiving worker must implement `HandleMessage`. A
message handler can return `ErrRequeue` to leave the message pending for
redelivery; any other error is treated as terminal.

### Notifications

Nodes can send notifications to workers using the `NotifyWorker` method. The method
takes as input a job key and a notification payload. The notification payload
is passed to the worker's `HandleNotification` method.
Nodes can send notifications to workers using the `NotifyWorker` method. A
notification is a job control event: it targets the worker that currently owns an
existing job key and passes the notification payload to that worker's
`HandleNotification` method.

Use `NotifyWorker` when the message only makes sense for the active owner of an
existing job. Use `DispatchMessage` when there is no durable job with the same
key.

### Stopping A Job

Expand Down
7 changes: 5 additions & 2 deletions pool/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func unmarshalBool(reader *bytes.Reader) bool {
return value
}

func marshalNotification(key string, payload []byte) []byte {
// marshalKeyedPayload marshals the shared wire shape used by events whose
// routing key is distinct from their opaque handler payload.
func marshalKeyedPayload(key string, payload []byte) []byte {
var buf bytes.Buffer
if err := binary.Write(&buf, binary.LittleEndian, int32(len(key))); err != nil {
panic(err)
Expand All @@ -151,7 +153,8 @@ func marshalNotification(key string, payload []byte) []byte {
return buf.Bytes()
}

func unmarshalNotification(data []byte) (string, []byte) {
// unmarshalKeyedPayload unmarshals data produced by marshalKeyedPayload.
func unmarshalKeyedPayload(data []byte) (string, []byte) {
reader := bytes.NewReader(data)
var keyLength int32
if err := binary.Read(reader, binary.LittleEndian, &keyLength); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pool/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,15 @@ func TestMarshalJob(t *testing.T) {
})
}
}

func TestMarshalKeyedPayload(t *testing.T) {
key := "test-key"
payload := []byte("test-payload")

marshaled := marshalKeyedPayload(key, payload)
gotKey, gotPayload := unmarshalKeyedPayload(marshaled)

assert.Equal(t, key, gotKey)
assert.Equal(t, payload, gotPayload)
assert.Equal(t, key, unmarshalJobKey(marshaled))
}
34 changes: 26 additions & 8 deletions pool/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ const (
evInit string = "i"
// evStartJob is the event used to send new job to workers.
evStartJob string = "j"
// evMessage is the event used to send a keyed message to a hash-ring worker.
evMessage string = "m"
// evNotify is the event used to notify a worker running a specific job.
evNotify string = "n"
// evStopJob is the event used to stop a job.
Expand Down Expand Up @@ -288,7 +290,8 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No

// AddWorker adds a new worker to the pool and returns it. The worker starts
// processing jobs immediately. handler can optionally implement the
// NotificationHandler interface to handle notifications.
// NotificationHandler and MessageHandler interfaces to handle job-scoped
// notifications and hash-routed messages.
func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, error) {
if node.IsClosed() {
return nil, fmt.Errorf("AddWorker: pool %q is closed", node.PoolName)
Expand Down Expand Up @@ -361,6 +364,20 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e
return node.dispatchJob(ctx, key, job)
}

// DispatchMessage sends a keyed message to the worker currently assigned by the
// pool hash ring. Messages do not create job ownership and are intended for
// fire-and-forget work that should be load-balanced by key.
func (node *Node) DispatchMessage(ctx context.Context, key string, payload []byte) error {
if node.IsClosed() {
return fmt.Errorf("DispatchMessage: pool %q is closed", node.PoolName)
}
if _, err := node.poolStream.Add(ctx, evMessage, marshalKeyedPayload(key, payload)); err != nil {
return fmt.Errorf("DispatchMessage: failed to add message to stream %q: %w", node.poolStream.Name, err)
}
node.logger.Info("message dispatched", "key", key)
return nil
}

func (node *Node) dispatchJob(ctx context.Context, key string, job []byte) error {
if node.IsClosed() {
return fmt.Errorf("DispatchJob: pool %q is closed", node.PoolName)
Expand Down Expand Up @@ -520,12 +537,13 @@ func (node *Node) JobPayload(key string) ([]byte, bool) {
return []byte(payload), true
}

// NotifyWorker notifies the worker that handles the job with the given key.
// NotifyWorker notifies the worker that currently owns the job with the given
// key.
func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error {
if node.IsClosed() {
return fmt.Errorf("NotifyWorker: pool %q is closed", node.PoolName)
}
if _, err := node.poolStream.Add(ctx, evNotify, marshalNotification(key, payload)); err != nil {
if _, err := node.poolStream.Add(ctx, evNotify, marshalKeyedPayload(key, payload)); err != nil {
return fmt.Errorf("NotifyWorker: failed to add notification to stream %q: %w", node.poolStream.Name, err)
}
node.logger.Info("notification sent", "key", key)
Expand Down Expand Up @@ -699,7 +717,7 @@ func (node *Node) routeWorkerEvent(ev *streaming.Event) error {
return nil
}

// Compute the worker ID that will handle the job.
// Compute the worker ID that will handle the event key.
key := unmarshalJobKey(ev.Payload)
wid, err := node.workerForEvent(ev.EventName, key)
if err != nil {
Expand Down Expand Up @@ -838,11 +856,11 @@ func (node *Node) returnDispatchStatus(ev *streaming.Event) {
val.(chan error) <- err
}

// workerForEvent returns the worker that should receive a pool event. Start
// events are routed by the current consistent hash ring; stop and notification
// events target the worker that currently owns the job.
// workerForEvent returns the worker that should receive a pool event. Start and
// message events are routed by the current consistent hash ring; stop and
// notification events target the worker that currently owns the job.
func (node *Node) workerForEvent(eventName, key string) (string, error) {
if eventName == evStartJob {
if eventName == evStartJob || eventName == evMessage {
activeWorkers := node.activeWorkers()
if len(activeWorkers) == 0 {
return "", fmt.Errorf("routeWorkerEvent: no active worker in pool %q", node.PoolName)
Expand Down
69 changes: 68 additions & 1 deletion pool/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func TestNotifyWorkerNoHandler(t *testing.T) {
defer ptesting.CleanupRedis(t, rdb, true, testName)

// Create a worker without NotificationHandler implementation
worker := newTestWorkerWithoutNotify(t, ctx, node)
worker := newTestWorkerWithoutOptionalHandlers(t, ctx, node)

// Dispatch a job to ensure the worker is assigned
jobKey := "test-job"
Expand Down Expand Up @@ -678,6 +678,73 @@ func TestNotifyWorkerNoHandler(t *testing.T) {
assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
}

func TestDispatchMessageRoutesByHashWithoutJob(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx := ptesting.NewTestContext(t)
rdb := ptesting.NewRedisClient(t)
node := newTestNode(t, ctx, rdb, testName)
defer ptesting.CleanupRedis(t, rdb, true, testName)

messageKey := "message-key"
messagePayload := []byte("message payload")
received := make(chan string, 2)

node.h = &ptesting.Hasher{Index: 1}
handler1 := &mockMessageHandler{mockHandler: newMockHandler()}
handler2 := &mockMessageHandler{mockHandler: newMockHandler()}
worker1, err := node.AddWorker(ctx, handler1)
require.NoError(t, err)
worker2, err := node.AddWorker(ctx, handler2)
require.NoError(t, err)
handler1.messageFunc = func(key string, payload []byte) error {
assert.Equal(t, messageKey, key)
assert.Equal(t, messagePayload, payload)
received <- worker1.ID
return nil
}
handler2.messageFunc = func(key string, payload []byte) error {
assert.Equal(t, messageKey, key)
assert.Equal(t, messagePayload, payload)
received <- worker2.ID
return nil
}

require.NoError(t, node.DispatchMessage(ctx, messageKey, messagePayload))
select {
case got := <-received:
assert.Equal(t, worker2.ID, got)
case <-time.After(max):
t.Fatal("message was not routed to hash-ring worker")
}

assert.Empty(t, worker1.Jobs())
assert.Empty(t, worker2.Jobs())
assert.Empty(t, node.JobKeys())
_, ok := node.JobPayload(messageKey)
assert.False(t, ok)

assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
}

func TestDispatchMessageRequiresHandler(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx, buf := ptesting.NewBufferedLogContext(t)
rdb := ptesting.NewRedisClient(t)
node := newTestNode(t, ctx, rdb, testName)
defer ptesting.CleanupRedis(t, rdb, true, testName)

worker := newTestWorkerWithoutOptionalHandlers(t, ctx, node)
assert.NoError(t, node.DispatchMessage(ctx, "message-key", []byte("message payload")))

assert.Eventually(t, func() bool {
return strings.Contains(buf.String(), "handler failed: worker") &&
strings.Contains(buf.String(), "does not implement MessageHandler")
}, max, delay, "Expected missing MessageHandler error within the timeout period")
assert.Empty(t, worker.Jobs())

assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
}

func TestRemoveWorkerThenShutdown(t *testing.T) {
ctx := ptesting.NewTestContext(t)
testName := strings.Replace(t.Name(), "/", "_", -1)
Expand Down
Loading
Loading