Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ manager.RegisterTaskHandler(taskKind, handler,
oncetask.WithRetryPolicy(policy),
oncetask.WithLeaseDuration(duration),
oncetask.WithConcurrency(n),
oncetask.WithPollInterval(interval),
)
```

Expand Down Expand Up @@ -113,6 +114,26 @@ oncetask.WithConcurrency(3)
- Keep low for CPU-intensive tasks
- Consider your downstream service rate limits

## Poll Interval

Controls the idle-poll fallback interval for the consumer loop:

```go
oncetask.WithPollInterval(15 * time.Second)
```

**Default:** 1 minute

**Behavior:**
- New task creations wake the loop immediately, regardless of this setting
- Only affects time-based readiness (waitUntil expiry, retry backoffs, recurrence occurrences)
- Lower values reduce scheduling latency at the cost of more Firestore reads

**Recommendations:**
- Default is fine for most workloads
- Reduce when scheduled tasks need to fire close to their `waitUntil`
- Increase to reduce Firestore reads when latency is not critical

## Cancellation Handler

Register a cleanup handler for cancelled tasks:
Expand Down
121 changes: 121 additions & 0 deletions oncetask/handler_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package oncetask

import (
"context"
"testing"
"time"
)

type testHandlerCfgKind string

func TestWithRetryPolicy(t *testing.T) {
config := defaultHandlerConfig
policy := FixedDelayPolicy{MaxAttempts: 7, Delay: 3 * time.Second}
WithRetryPolicy(policy)(&config)
if config.RetryPolicy != policy {
t.Errorf("RetryPolicy: got %v, want %v", config.RetryPolicy, policy)
}
}

func TestWithNoRetry(t *testing.T) {
config := defaultHandlerConfig
WithNoRetry()(&config)
if _, ok := config.RetryPolicy.(NoRetryPolicy); !ok {
t.Errorf("RetryPolicy: got %T, want NoRetryPolicy", config.RetryPolicy)
}
}

func TestWithLeaseDuration(t *testing.T) {
config := defaultHandlerConfig
WithLeaseDuration(7 * time.Minute)(&config)
if config.LeaseDuration != 7*time.Minute {
t.Errorf("LeaseDuration: got %v, want 7m", config.LeaseDuration)
}
}

func TestWithConcurrency(t *testing.T) {
tests := []struct {
name string
n int
want int
}{
{"positive", 5, 5},
{"one", 1, 1},
{"zero ignored", 0, 1},
{"negative ignored", -3, 1},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := defaultHandlerConfig
WithConcurrency(tt.n)(&config)
if config.Concurrency != tt.want {
t.Errorf("Concurrency: got %d, want %d", config.Concurrency, tt.want)
}
})
}
}

func TestWithCancellationHandler(t *testing.T) {
config := defaultHandlerConfig
if config.cancellationTaskHandler != nil {
t.Fatalf("default cancellationTaskHandler should be nil, got %v", config.cancellationTaskHandler)
}
handler := func(ctx context.Context, task *OnceTask[testHandlerCfgKind]) (any, error) {
return nil, nil
}
WithCancellationHandler(handler)(&config)
if config.cancellationTaskHandler == nil {
t.Errorf("cancellationTaskHandler not set")
}
}

func TestWithCancellationRetryPolicy(t *testing.T) {
config := defaultHandlerConfig
policy := FixedDelayPolicy{MaxAttempts: 4, Delay: 2 * time.Second}
WithCancellationRetryPolicy(policy)(&config)
if config.CancellationRetryPolicy != policy {
t.Errorf("CancellationRetryPolicy: got %v, want %v", config.CancellationRetryPolicy, policy)
}
}

func TestWithPollInterval(t *testing.T) {
tests := []struct {
name string
d time.Duration
want time.Duration
}{
{"positive", 30 * time.Second, 30 * time.Second},
{"zero ignored", 0, 1 * time.Minute},
{"negative ignored", -1 * time.Second, 1 * time.Minute},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := defaultHandlerConfig
WithPollInterval(tt.d)(&config)
if config.PollInterval != tt.want {
t.Errorf("PollInterval: got %v, want %v", config.PollInterval, tt.want)
}
})
}
}

func TestDefaultHandlerConfig(t *testing.T) {
if defaultHandlerConfig.LeaseDuration != 10*time.Minute {
t.Errorf("LeaseDuration default: got %v, want 10m", defaultHandlerConfig.LeaseDuration)
}
if defaultHandlerConfig.Concurrency != 1 {
t.Errorf("Concurrency default: got %d, want 1", defaultHandlerConfig.Concurrency)
}
if defaultHandlerConfig.PollInterval != 1*time.Minute {
t.Errorf("PollInterval default: got %v, want 1m", defaultHandlerConfig.PollInterval)
}
if defaultHandlerConfig.cancellationTaskHandler != nil {
t.Errorf("cancellationTaskHandler default: got %v, want nil", defaultHandlerConfig.cancellationTaskHandler)
}
if _, ok := defaultHandlerConfig.RetryPolicy.(ExponentialBackoffPolicy); !ok {
t.Errorf("RetryPolicy default: got %T, want ExponentialBackoffPolicy", defaultHandlerConfig.RetryPolicy)
}
if _, ok := defaultHandlerConfig.CancellationRetryPolicy.(ExponentialBackoffPolicy); !ok {
t.Errorf("CancellationRetryPolicy default: got %T, want ExponentialBackoffPolicy", defaultHandlerConfig.CancellationRetryPolicy)
}
}
2 changes: 2 additions & 0 deletions oncetask/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Manager[TaskKind ~string] interface {
// - WithCancellationRetryPolicy: Configure retry behavior for cancellation handlers
// - WithLeaseDuration: Set how long a task is leased during execution
// - WithConcurrency: Set number of concurrent workers
// - WithPollInterval: Set the idle-poll fallback interval for the consumer loop
RegisterTaskHandler(taskType TaskKind, handler Handler[TaskKind], opts ...HandlerOption) error

// RegisterResourceKeyHandler listens for new tasks and executes the handler for all tasks with the same resource key.
Expand All @@ -103,6 +104,7 @@ type Manager[TaskKind ~string] interface {
// - WithCancellationRetryPolicy: Configure retry behavior for cancellation handlers
// - WithLeaseDuration: Set how long a task is leased during execution
// - WithConcurrency: Set number of concurrent workers
// - WithPollInterval: Set the idle-poll fallback interval for the consumer loop
RegisterResourceKeyHandler(taskType TaskKind, handler ResourceKeyHandler[TaskKind], opts ...HandlerOption) error

// GetTasksByResourceKey retrieves all tasks with the given resource key.
Expand Down
Loading