diff --git a/common/health/checkable.go b/common/health/checkable.go
new file mode 100644
index 00000000000..c4f9014405a
--- /dev/null
+++ b/common/health/checkable.go
@@ -0,0 +1,42 @@
+package health
+
+import (
+	enumsspb "go.temporal.io/server/api/enums/v1"
+)
+
+type (
+	// Checkable structs represent a component with some health status that changes
+	// over time. The HostHealthAggregator will periodically call CheckHealth on
+	// all registered Checkable structs and aggregate their results. When implementing
+	// CheckHealth, be aware that it is called from the aggregator's monitoring
+	// goroutine, not from the goroutine that registered the component! Use locks
+	// accordingly.
+	Checkable interface {
+		// NameForHealthReporting returns the name the component is registered
+		// under when it is passed to HostHealthAggregator.AddObserver.
+		NameForHealthReporting() string
+		// CheckHealth returns the component's current health check results.
+		CheckHealth() []HealthCheck
+	}
+
+	// HealthCheck is an internal struct that should be convertible to the
+	// HealthCheck proto from proto/internal/temporal/server/api/health/v1/message.proto.
+	// This struct is copy-friendly and avoids protobuf cruft like the internal mutex.
+	HealthCheck struct {
+		// CheckName should be a globally-unique string that describes
+		// this check in under three words. Some examples:
+		// "host_availability", "workflow_error_rate".
+		CheckName string
+		// State is the health state reported by this check.
+		State enumsspb.HealthState
+		// Value is the measured quantity and Threshold is the limit it is compared
+		// against. Checks that only report a State may leave both at zero.
+		Value     float64
+		Threshold float64
+		// Message is a human-readable description of the result.
+		// Not all results need a message! If your CheckName is "workflow_error_rate"
+		// and the value and threshold are set, you probably don't need to re-explain that
+		// the workflow error rate is above 1%.
+		Message string
+	}
+)
diff --git a/common/health/check_types.go b/common/health/expected_check_types.go
similarity index 100%
rename from common/health/check_types.go
rename to common/health/expected_check_types.go
diff --git a/common/health/health_aggregator.go b/common/health/health_aggregator.go
new file mode 100644
index 00000000000..972dd8eed08
--- /dev/null
+++ b/common/health/health_aggregator.go
@@ -0,0 +1,162 @@
+package health
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	enumsspb "go.temporal.io/server/api/enums/v1"
+)
+
+type (
+	// HostHealthAggregator periodically polls every registered Checkable and
+	// publishes the combined results as an AggregateHealth snapshot.
+	//
+	// NOTE(review): this file has no constructor, so lifetime and requiredChecks
+	// can only be set from inside this package - confirm how instances get built.
+	HostHealthAggregator struct {
+		// lifetime bounds the monitoring goroutine, which exits once it is done.
+		lifetime context.Context
+		// checkables maps a reporting name to the component polled under it. The
+		// monitoring goroutine reads it without synchronization, so every
+		// AddCheck/AddObserver call must happen before Start.
+		checkables map[string]Checkable
+		// start makes sure the monitoring goroutine is launched at most once.
+		start sync.Once
+		// latestObservation is the most recent snapshot; nil until the first poll.
+		latestObservation atomic.Pointer[AggregateHealth]
+		// requiredChecks will generate an error entry if there isn't a health
+		// check reported in the final result with the same CheckName
+		requiredChecks []string
+	}
+	// AggregateHealth is one snapshot of the host's health: an overall state
+	// plus the individual check results collected during the same poll.
+	AggregateHealth struct {
+		state  enumsspb.HealthState
+		checks []HealthCheck
+	}
+	// The singleHealthCheckWrapper enables the convenience function AddCheck
+	// so that single health checks can be added without creating a struct/component for them.
+	singleHealthCheckWrapper struct {
+		name string
+		fn   func() HealthCheck
+	}
+)
+
+// GetHealth returns a copy of the most recent snapshot. If no poll has
+// completed yet (for example because Start has not been called), the zero
+// AggregateHealth is returned instead of dereferencing a nil pointer.
+func (h *HostHealthAggregator) GetHealth() AggregateHealth {
+	latest := h.latestObservation.Load()
+	if latest == nil {
+		return AggregateHealth{}
+	}
+	return *latest
+}
+
+// Start launches the monitoring goroutine. Calling it more than once has no
+// further effect.
+func (h *HostHealthAggregator) Start() {
+	h.start.Do(func() {
+		go h.continuouslyMonitor()
+	})
+}
+
+// continuouslyMonitor pulls health check results from all registered checkables
+// and publishes them as the latest snapshot: once immediately, then every two
+// seconds until h.lifetime is done.
+func (h *HostHealthAggregator) continuouslyMonitor() {
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+	for {
+		// Reserve room for a guessed three checks per checkable, but keep the
+		// length at zero: a non-zero length would publish that many blank
+		// HealthChecks ahead of the real results.
+		observations := make([]HealthCheck, 0, 3*len(h.checkables))
+		for name, observer := range h.checkables {
+			observations = append(observations, safeObserve(name, observer)...)
+		}
+		reported := make(map[string]struct{}, len(observations))
+		for _, check := range observations {
+			reported[check.CheckName] = struct{}{}
+		}
+		// NOTE(review): the aggregate state only reflects missing required
+		// checks; the states of the individual checks are not folded in yet.
+		aggregateState := enumsspb.HEALTH_STATE_UNSPECIFIED
+		// Walk requiredChecks in order (rather than ranging over a map) so the
+		// error entries are appended in a stable order.
+		for _, reqd := range h.requiredChecks {
+			if _, ok := reported[reqd]; ok {
+				continue
+			}
+			// Mark as handled so a name listed twice is reported only once.
+			reported[reqd] = struct{}{}
+			aggregateState = enumsspb.HEALTH_STATE_INTERNAL_ERROR
+			observations = append(observations, HealthCheck{
+				CheckName: fmt.Sprintf("%s-unregistered", reqd),
+				State:     enumsspb.HEALTH_STATE_INTERNAL_ERROR,
+				Message:   fmt.Sprintf("required checkable %s not registered", reqd),
+			})
+		}
+		h.latestObservation.Store(&AggregateHealth{
+			state:  aggregateState,
+			checks: observations,
+		})
+		select {
+		case <-ticker.C:
+		case <-h.lifetime.Done():
+			return
+		}
+	}
+}
+
+// safeObserve converts panics to normal health check errors: if CheckHealth
+// panics, a single "<name>-failure" entry is returned in place of its results.
+func safeObserve(name string, observer Checkable) (checkResult []HealthCheck) {
+	defer func() {
+		if r := recover(); r != nil {
+			checkResult = []HealthCheck{{
+				CheckName: fmt.Sprintf("%s-failure", name),
+				State:     enumsspb.HEALTH_STATE_INTERNAL_ERROR,
+				Message:   fmt.Sprintf("panic'ed when trying to observe: %s", name),
+			}}
+		}
+	}()
+	return observer.CheckHealth()
+}
+
+// AddCheck adds a single health check to the aggregator.
+// This is a convenience function for adding a single health check, and it generates
+// a struct per-function registered. If you're calling this multiple times from the same
+// struct, implement Checkable and pass the struct to AddObserver.
+func (h *HostHealthAggregator) AddCheck(name string, check func() HealthCheck) {
+	// Record the name in the wrapper so NameForHealthReporting does not return "".
+	h.register(name, singleHealthCheckWrapper{name: name, fn: check})
+}
+
+// AddObserver registers observer under its NameForHealthReporting, replacing
+// any checkable previously registered under the same name.
+func (h *HostHealthAggregator) AddObserver(observer Checkable) {
+	h.register(observer.NameForHealthReporting(), observer)
+}
+
+// register stores c under name, allocating the map on first use so that a
+// zero-value aggregator does not panic on a nil-map write.
+func (h *HostHealthAggregator) register(name string, c Checkable) {
+	if h.checkables == nil {
+		h.checkables = make(map[string]Checkable)
+	}
+	h.checkables[name] = c
+}
+
+// CheckHealth runs the wrapped function and returns its result as the only check.
+func (w singleHealthCheckWrapper) CheckHealth() []HealthCheck {
+	return []HealthCheck{w.fn()}
+}
+
+// NameForHealthReporting returns the name given to AddCheck.
+func (w singleHealthCheckWrapper) NameForHealthReporting() string {
+	return w.name
+}
diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go
index 0f7cac9dcfa..ae625fd2a9a 100644
--- a/service/frontend/workflow_handler.go
+++ b/service/frontend/workflow_handler.go
@@ -30,6 +30,7 @@ import (
 	"go.temporal.io/api/workflowservice/v1"
 	batchspb "go.temporal.io/server/api/batch/v1"
 	deploymentspb "go.temporal.io/server/api/deployment/v1"
+	enumsspb "go.temporal.io/server/api/enums/v1"
 	"go.temporal.io/server/api/historyservice/v1"
 	"go.temporal.io/server/api/matchingservice/v1"
 	schedulespb "go.temporal.io/server/api/schedule/v1"
@@ -50,6 +51,7 @@ import (
 	"go.temporal.io/server/common/enums"
 	"go.temporal.io/server/common/failure"
 	"go.temporal.io/server/common/headers"
+	internalhealth "go.temporal.io/server/common/health"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/membership"
@@ -140,6 +142,7 @@ type (
 		saValidator                     *searchattribute.Validator
 		archivalMetadata                archiver.ArchivalMetadata
 		healthServer                    *health.Server
+		internalHealthState             healthCounters
 		overrides                       *Overrides
 		membershipMonitor               membership.Monitor
 		healthInterceptor               *interceptor.HealthInterceptor
@@ -149,8 +152,70 @@ type (
 		registry                        *chasm.Registry
 		workerDeploymentReadRateLimiter quotas.RequestRateLimiter
 	}
+	// healthCounters holds the timestamps used for health monitoring.
+	healthCounters struct {
+		// mu presumably guards the timestamps below - confirm against the
+		// writers; CheckHealth currently reads creationTime without taking it.
+		mu sync.Mutex
+		// heartbeatTime is updated periodically
+		heartbeatTime time.Time
+		// creationTime is set when NewWorkflowHandler got called
+		creationTime time.Time
+		// handlerStartTime is set from WorkflowHandler.Start() after
+		// it successfully starts the worker and joins RingPop membership
+		handlerStartTime time.Time
+	}
 )
 
+// CheckHealth implements health.Checkable for health.HostHealthAggregator.
+// It always reports a "handler-state" check describing the daemon status and,
+// while the handler has not been started, an "initialization-delay" check
+// comparing the time since creationTime against a five second threshold.
+func (wh *WorkflowHandler) CheckHealth() []internalhealth.HealthCheck {
+	// At most two checks are returned. Keep the length at zero: a non-zero
+	// length would return blank HealthChecks ahead of the real ones.
+	checks := make([]internalhealth.HealthCheck, 0, 2)
+	whState := internalhealth.HealthCheck{
+		CheckName: "handler-state",
+		State:     enumsspb.HEALTH_STATE_SERVING,
+	}
+	handlerState := atomic.LoadInt32(&wh.status)
+	switch handlerState {
+	case common.DaemonStatusInitialized:
+		// "DaemonStatusInitialized" is a bit of a misnomer: That's the 0-value,
+		// meaning we have no information on whether the daemon is initialized.
+		// We do know it's not started, though.
+		whState.Message = "Daemon not yet started!"
+	case common.DaemonStatusStarted:
+		whState.Message = "Daemon started"
+	case common.DaemonStatusStopped:
+		// NOTE(review): a stopped daemon is still reported as SERVING - confirm.
+		whState.Message = "Daemon stopped"
+	default:
+		whState.State = enumsspb.HEALTH_STATE_INTERNAL_ERROR
+		whState.Message = fmt.Sprintf("unknown status %v", handlerState)
+	}
+	checks = append(checks, whState)
+	if handlerState == common.DaemonStatusInitialized {
+		// NOTE(review): nothing visible in this change assigns creationTime; while
+		// it is the zero time, the delay below always exceeds the threshold.
+		initializeTimeCheck := internalhealth.HealthCheck{
+			CheckName: "initialization-delay",
+			Value:     time.Since(wh.internalHealthState.creationTime).Seconds(),
+			Threshold: (5 * time.Second).Seconds(),
+			State:     enumsspb.HEALTH_STATE_SERVING,
+			Message:   "Initialization delay",
+		}
+		if initializeTimeCheck.Value > initializeTimeCheck.Threshold {
+			// The limit was exceeded, so stop reporting this check as healthy.
+			initializeTimeCheck.State = enumsspb.HEALTH_STATE_NOT_SERVING
+			initializeTimeCheck.Message = "Exceeded limit on initialization time!"
+		}
+		checks = append(checks, initializeTimeCheck)
+	}
+	return checks
+}
+
 func (wh *WorkflowHandler) CreateWorkerDeploymentVersion(
 	ctx context.Context,
 	request *workflowservice.CreateWorkerDeploymentVersionRequest,
diff --git a/service/frontend/workflow_handler_health_test.go b/service/frontend/workflow_handler_health_test.go
new file mode 100644
index 00000000000..d698e76937f
--- /dev/null
+++ b/service/frontend/workflow_handler_health_test.go
@@ -0,0 +1 @@
+package frontend