Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions common/health/checkable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package health

import (
enumsspb "go.temporal.io/server/api/enums/v1"
)

type (
// Checkable structs represent a component with some health status that changes
// over time. The HostHealthAggregator will periodically call CheckHealth on
// all registered Checkable structs and aggregate their results. When implementing CheckHealth,
// be aware that this is called from a different goroutine! Use locks accordingly.
Checkable interface {
NameForHealthReporting() string
CheckHealth() []HealthCheck
}

// HealthCheck is an internal struct that should be convertable to the
// HealthCheck proto from proto/internal/temporal/server/api/health/v1/message.proto.
// This struct is copy-friendly and avoids protobuf cruft like the internal mutex
HealthCheck struct {
// CheckName should be a globally-unique string that describes
// this check in under three words. Some examples:
// "host_availability", "
CheckName string
State enumsspb.HealthState
Value float64
Threshold float64
// Message is a human-readable description of the result.
// Not all results need a message! If your CheckName is "workflow_error_rate"
// and the value and threshold are set, you probably don't need to re-explain that
// the workflow error rate is above 1%.
Message string
}
)
File renamed without changes.
118 changes: 118 additions & 0 deletions common/health/health_aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package health

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

enumsspb "go.temporal.io/server/api/enums/v1"
)

type (
HostHealthAggregator struct {
lifetime context.Context
checkables map[string]Checkable
start sync.Once
latestObservation atomic.Pointer[AggregateHealth]
// requiredChecks will generate an error entry if there isn't a health
// check reported in the final result with the same CheckName
requiredChecks []string
}
AggregateHealth struct {
state enumsspb.HealthState
checks []HealthCheck
}
// The singleHealthCheckWrapper enables the convenience function AddCheck
// so that single health checks can be added without creating a struct/component for them.
singleHealthCheckWrapper struct {
name string
fn func() HealthCheck
}
)

func (h *HostHealthAggregator) GetHealth() AggregateHealth {
return *h.latestObservation.Load()
}

func (h *HostHealthAggregator) Start() {
h.start.Do(func() {
go h.continuouslyMonitor()
})
}

// continuouslyMonitor pulls health check results from all registered checkables
func (h *HostHealthAggregator) continuouslyMonitor() {
ticker := time.NewTicker(2 * time.Second)
for {
// Make a guess at initial capacity for the health check slice
observations := make([]HealthCheck, len(h.checkables)+2*len(h.checkables))
aggregateState := enumsspb.HEALTH_STATE_UNSPECIFIED
for name, observer := range h.checkables {
observations = append(observations, safeObserve(name, observer)...)
}
checkValidation := make(map[string]bool, len(h.checkables))
for _, reqd := range h.requiredChecks {
checkValidation[reqd] = false
}
for _, check := range observations {
checkValidation[check.CheckName] = true
}
for check, present := range checkValidation {
if !present {
aggregateState = enumsspb.HEALTH_STATE_INTERNAL_ERROR
observations = append(observations, HealthCheck{
CheckName: fmt.Sprintf("%s-unregistered", check),
State: enumsspb.HEALTH_STATE_INTERNAL_ERROR,
Message: fmt.Sprintf("required checkable %s not registered", check),
})
}
}
h.latestObservation.Store(&AggregateHealth{
state: aggregateState,
checks: observations,
})
select {
case <-ticker.C:
case <-h.lifetime.Done():
ticker.Stop()
return
}
}
}

// safeObserve converts panics to normal health check errors
func safeObserve(name string, observer Checkable) (checkResult []HealthCheck) {
defer func() {
if r := recover(); r != nil {
checkResult = []HealthCheck{{
CheckName: fmt.Sprintf("%s-failure", name),
State: enumsspb.HEALTH_STATE_INTERNAL_ERROR,
Message: fmt.Sprintf("panic'ed when trying to observe: %s", name),
}}
}
}()
checkResult = observer.CheckHealth()
return
}

// AddCheck adds a single health check to the aggregator.
// This is a convenience function for adding a single health check, and it generates
// a struct per-function registered. If you're calling this multiple times from the same
// struct, implement Checkable and pass the struct to AddObserver.
func (h *HostHealthAggregator) AddCheck(name string, check func() HealthCheck) {
h.checkables[name] = singleHealthCheckWrapper{fn: check}
}

func (h *HostHealthAggregator) AddObserver(observer Checkable) {
h.checkables[observer.NameForHealthReporting()] = observer
}

func (w singleHealthCheckWrapper) CheckHealth() []HealthCheck {
return []HealthCheck{w.fn()}
}

func (w singleHealthCheckWrapper) NameForHealthReporting() string {
return w.name
}
54 changes: 54 additions & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
batchspb "go.temporal.io/server/api/batch/v1"
deploymentspb "go.temporal.io/server/api/deployment/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
schedulespb "go.temporal.io/server/api/schedule/v1"
Expand All @@ -50,6 +51,7 @@ import (
"go.temporal.io/server/common/enums"
"go.temporal.io/server/common/failure"
"go.temporal.io/server/common/headers"
internalhealth "go.temporal.io/server/common/health"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
Expand Down Expand Up @@ -140,6 +142,7 @@ type (
saValidator *searchattribute.Validator
archivalMetadata archiver.ArchivalMetadata
healthServer *health.Server
internalHealthState healthCounters
overrides *Overrides
membershipMonitor membership.Monitor
healthInterceptor *interceptor.HealthInterceptor
Expand All @@ -149,8 +152,59 @@ type (
registry *chasm.Registry
workerDeploymentReadRateLimiter quotas.RequestRateLimiter
}
// Counters for health monitoring.
healthCounters struct {
mu sync.Mutex
// heartbeatTime is updated periodically
heartbeatTime time.Time
// creationTime is set when NewWorkflowHandler got called
creationTime time.Time
// handlerStartTime is set from WorkflowHandler.Start() after
// it successfully starts the worker and joins RingPop membership
handlerStartTime time.Time
}
)

// CheckHealth implements health.Checkable for health.HostHealthAggregator
func (wh *WorkflowHandler) CheckHealth() []internalhealth.HealthCheck {
checks := make([]internalhealth.HealthCheck, 10)
whState := internalhealth.HealthCheck{
CheckName: "handler-state",
State: enumsspb.HEALTH_STATE_SERVING,
}
handlerState := atomic.LoadInt32(&wh.status)
switch handlerState {
case common.DaemonStatusInitialized:
// "DaemonStatusInitialized" is a bit of a misnomer: That's the 0-value,
// meaning we have no information on whether the daemon is initialized.
// We do know it's not started, though.
whState.Message = "Daemon not yet started!"
case common.DaemonStatusStarted:
whState.Message = "Daemon started"
case common.DaemonStatusStopped:
whState.Message = "Daemon stopped"
default:
whState.State = enumsspb.HEALTH_STATE_INTERNAL_ERROR
whState.Message = fmt.Sprintf("unknown status %v", handlerState)
}
checks = append(checks, whState)
if handlerState == common.DaemonStatusInitialized {
initializeTimeCheck := internalhealth.HealthCheck{
CheckName: "initialization-delay",
Value: time.Since(wh.internalHealthState.creationTime).Seconds(),
Threshold: (5 * time.Second).Seconds(),
State: enumsspb.HEALTH_STATE_SERVING,
Message: "Initialization delay",
}
if initializeTimeCheck.Value > initializeTimeCheck.Threshold {
initializeTimeCheck.State = enumsspb.HEALTH_STATE_SERVING
initializeTimeCheck.Message = "Exceeded limit on initialization time!"
}
checks = append(checks, initializeTimeCheck)
}
return checks
}

func (wh *WorkflowHandler) CreateWorkerDeploymentVersion(
ctx context.Context,
request *workflowservice.CreateWorkerDeploymentVersionRequest,
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflow_handler_health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package frontend
Loading