Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ VISIBILITY_DB ?= temporal_visibility
# The `disable_grpc_modules` build tag excludes gRPC dependencies from cloud.google.com/go/storage,
# reducing binary size by 16MB since we only use the REST client (storage.NewClient), not the
# gRPC client (storage.NewGRPCClient). Related issue: https://github.com/googleapis/google-cloud-go/issues/12343
#
ALL_BUILD_TAGS := disable_grpc_modules,$(BUILD_TAG)
ALL_TEST_TAGS := $(ALL_BUILD_TAGS),test_dep,$(TEST_TAG)
BUILD_TAG_FLAG := -tags $(ALL_BUILD_TAGS)
Expand Down
20 changes: 20 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,26 @@ values in system search attributes.`,
query.`,
)

// FrontendAPIVariant selects which frontend API variant the
// frontend exposes at startup. Empty (default) registers stable
// WorkflowService only. A non-empty value (e.g. "ping", "tinker") looks
// up the named variant in the experimental registry and registers it
// IN PLACE OF stable WorkflowService at the same wire path; the variant
// delegates stable methods to the existing stable handler. Toggling
// requires a frontend restart.
//
// A variant name is only resolvable if the corresponding build tag was
// set (e.g. -tags experimental). With no tag, the registry is
// empty and any non-empty value here causes Start() to log Fatal —
// production binaries cannot accidentally expose experimental surface.
FrontendAPIVariant = NewGlobalStringSetting(
"frontend.apiVariant",
"",
`FrontendAPIVariant selects which frontend API variant the
frontend exposes. Empty = stable only. See service/frontend/services/
for the list of supported variants in this build.`,
)

HistoryArchivalState = NewGlobalStringSetting(
"system.historyArchivalState",
"", // actual default is from static config
Expand Down
36 changes: 36 additions & 0 deletions common/expguard/expguard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Package expguard checks that experimental proto fields don't cross stable
// service boundaries. The compile-time boundary (experimental symbols gated
// behind //go:build experimental) prevents stable code from *constructing*
// experimental values, but the protobuf wire format preserves them as unknown
// fields when an experimental client talks to a stable server. expguard
// detects that case at the boundary.
package expguard

import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/softassert"
"google.golang.org/protobuf/proto"
)

// Check fires a softassert if msg carries any unknown proto fields.
//
// Unknown fields on an inbound request indicate an experimental client sent
// fields that this build doesn't recognise. In stable prod binaries this
// should never happen; in test binaries the test runner flags the assertion.
//
// site is a short static string describing the call site (e.g.
// "frontend.inbound", "replication.outbound") used for log grouping.
func Check(logger log.Logger, msg proto.Message, site string) {
if msg == nil {
return
}
if len(msg.ProtoReflect().GetUnknown()) == 0 {
return
}
softassert.Fail(logger,
"experimental unknown fields at stable boundary",
tag.NewStringTag("site", site),
tag.NewStringTag("message", string(msg.ProtoReflect().Descriptor().FullName())),
)
}
42 changes: 42 additions & 0 deletions common/rpc/interceptor/experimental_guard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package interceptor

import (
"context"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/expguard"
"go.temporal.io/server/common/log"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)

// ExperimentalGuardInterceptor checks inbound requests for unknown proto fields
// when no experimental API variant is active. An unknown field on an inbound
// request means an experimental client sent a field this binary doesn't know
// about. When a variant IS active the server expects experimental traffic, so
// the check is skipped.
type ExperimentalGuardInterceptor struct {
apiVariant dynamicconfig.StringPropertyFn
logger log.Logger
}

func NewExperimentalGuardInterceptor(
apiVariant dynamicconfig.StringPropertyFn,
logger log.Logger,
) *ExperimentalGuardInterceptor {
return &ExperimentalGuardInterceptor{apiVariant: apiVariant, logger: logger}
}

func (g *ExperimentalGuardInterceptor) UnaryIntercept(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
if g.apiVariant() == "" {
if msg, ok := req.(proto.Message); ok {
expguard.Check(g.logger, msg, "frontend.inbound")
}
}
return handler(ctx, req)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.temporal.io/api v1.62.12-0.20260424184119-9015efabce8d
go.temporal.io/api v1.62.12-0.20260501020957-6987518fc4e7
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
go.temporal.io/sdk v1.41.1
go.uber.org/fx v1.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R
go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4=
go.temporal.io/api v1.62.12-0.20260424184119-9015efabce8d h1:On7TmNeQ/mm1fxkXCn2Aqqf9Sy8GgcKPJUZunqA7Wpo=
go.temporal.io/api v1.62.12-0.20260424184119-9015efabce8d/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.62.12-0.20260501020957-6987518fc4e7 h1:1GDzeBs5BYccON5UIVuEMrFAY4/Cjkxjc+95YXyooq8=
go.temporal.io/api v1.62.12-0.20260501020957-6987518fc4e7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=
Expand Down
1 change: 1 addition & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func GrpcServerOptionsProvider(
slowRequestLoggerInterceptor.Intercept,
chasmRequestVisibilityInterceptor.Intercept,
contextMetadataInterceptor.Intercept,
interceptor.NewExperimentalGuardInterceptor(serviceConfig.APIVariant, logger).UnaryIntercept,
}
if len(customInterceptors) > 0 {
// TODO: Deprecate WithChainedFrontendGrpcInterceptors and provide a inner custom interceptor
Expand Down
22 changes: 16 additions & 6 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"sync"
"time"

"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/chasm/lib/activity"
chasmnexus "go.temporal.io/server/chasm/lib/nexusoperation"
"go.temporal.io/server/common/dynamicconfig"
Expand All @@ -20,6 +17,7 @@ import (
"go.temporal.io/server/common/retrypolicy"
"go.temporal.io/server/components/callbacks"
"go.temporal.io/server/components/nexusoperations"
"go.temporal.io/server/service/frontend/services"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -232,6 +230,10 @@ type Config struct {
HTTPAllowedHosts dynamicconfig.TypedPropertyFn[*regexp.Regexp]
AllowedExperiments dynamicconfig.TypedPropertyFnWithNamespaceFilter[[]string]

// APIVariant selects which frontend API variant the
// frontend exposes at startup. See service/frontend/services/.
APIVariant dynamicconfig.StringPropertyFn

// CHASM archetypes
Activity *activity.Config
}
Expand Down Expand Up @@ -400,6 +402,8 @@ func NewConfig(
HTTPAllowedHosts: dynamicconfig.FrontendHTTPAllowedHosts.Get(dc),
AllowedExperiments: dynamicconfig.FrontendAllowedExperiments.Get(dc),

APIVariant: dynamicconfig.FrontendAPIVariant.Get(dc),

Activity: activity.ConfigProvider(dc),
}
}
Expand Down Expand Up @@ -456,13 +460,19 @@ func NewService(
}

// Start starts the service

func (s *Service) Start() {
s.logger.Info("frontend starting")

healthpb.RegisterHealthServer(s.server, s.healthServer)
workflowservice.RegisterWorkflowServiceServer(s.server, s.handler)
adminservice.RegisterAdminServiceServer(s.server, s.adminHandler)
operatorservice.RegisterOperatorServiceServer(s.server, s.operatorHandler)
services.Register(services.Registration{
Server: s.server,
Workflow: s.handler,
Admin: s.adminHandler,
Operator: s.operatorHandler,
Variant: s.config.APIVariant(),
Logger: s.logger,
})

reflection.Register(s.server)

Expand Down
53 changes: 53 additions & 0 deletions service/frontend/services/api_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package services

import (
"context"

expenums "github.com/temporalio/api-go/experimental/enums/v1"
expworkflowservice "github.com/temporalio/api-go/experimental/workflowservice/v1"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// exampleHandler implements the experimental Echo RPC (experimental_method,
// experimental_message). Types come from the experimental module — no build
// tags required.
type exampleHandler struct{}

func (exampleHandler) Echo(_ context.Context, req *expworkflowservice.EchoRequest) (*expworkflowservice.EchoResponse, error) {
return &expworkflowservice.EchoResponse{Payload: req.GetPayload()}, nil
}

// exampleWorkflowWrapper intercepts stable RPCs to add experimental behaviour.
// Embedding WorkflowServiceServer means every other method passes through.
type exampleWorkflowWrapper struct {
workflowservice.WorkflowServiceServer
}

func (w *exampleWorkflowWrapper) StartWorkflowExecution(
ctx context.Context,
req *workflowservice.StartWorkflowExecutionRequest,
) (*workflowservice.StartWorkflowExecutionResponse, error) {
// experimental_enum_value: react to the FOO conflict policy.
if req.GetWorkflowIdConflictPolicy() == expenums.WORKFLOW_ID_CONFLICT_POLICY_FOO {
return nil, status.Error(codes.InvalidArgument, "WORKFLOW_ID_CONFLICT_POLICY_FOO is not yet supported")
}

// experimental_field (overlay): read foo_text from the request overlay.
if overlay, ok, err := expworkflowservice.GetStartWorkflowExecutionRequestOverlay(req); err != nil {
return nil, err
} else if ok {
_ = overlay.GetFooText() // a real feature would use this
}

return w.WorkflowServiceServer.StartWorkflowExecution(ctx, req)
}

func init() {
register("example", variant{registerWorkflow: func(server *grpc.Server, workflow workflowservice.WorkflowServiceServer) {
wrapped := &exampleWorkflowWrapper{workflow}
registerServiceOverlay(server, workflowservice.WorkflowService_ServiceDesc, wrapped, expworkflowservice.ExampleWorkflowService_ServiceDesc, exampleHandler{})
}})
}
101 changes: 101 additions & 0 deletions service/frontend/services/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package services

import (
"fmt"

"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"google.golang.org/grpc"
)

var entries = map[string]variant{}
var registryErr error

func register(name string, v variant) {
if _, dup := entries[name]; dup {
registryErr = fmt.Errorf("%w; duplicate registration for variant %s", registryErr, name)
return
}
entries[name] = v
}

func get(name string) (variant, bool) { v, ok := entries[name]; return v, ok }
func names() []string { out := make([]string, 0, len(entries)); for n := range entries { out = append(out, n) }; return out }
func registryError() error { return registryErr }

type variant struct {
registerWorkflow func(*grpc.Server, workflowservice.WorkflowServiceServer)
registerAdmin func(*grpc.Server, adminservice.AdminServiceServer)
registerOperator func(*grpc.Server, operatorservice.OperatorServiceServer)
}

type Registration struct {
Server *grpc.Server
Workflow workflowservice.WorkflowServiceServer
Admin adminservice.AdminServiceServer
Operator operatorservice.OperatorServiceServer
Variant string
Logger log.Logger
}

// Register installs the stable frontend gRPC services, or an experimental
// variant compiled into this binary.
func Register(r Registration) {
if err := registryError(); err != nil {
r.Logger.Fatal("invalid experimental API registry", tag.Error(err))
}

selected := variant{}
if r.Variant == "" {
registerVariant(r, selected)
return
}

var ok bool
selected, ok = get(r.Variant)
if !ok {
r.Logger.Fatal(
"frontend.apiVariant set but variant not wired into this binary",
tag.NewStringTag("variant", r.Variant),
tag.NewStringsTag("compiled_in", names()),
)
}

registerVariant(r, selected)
r.Logger.Info("Experimental API variant active",
tag.NewStringTag("variant", r.Variant))
}

func registerVariant(r Registration, v variant) {
registerWorkflow := registerStableWorkflow
if v.registerWorkflow != nil {
registerWorkflow = v.registerWorkflow
}
registerAdmin := registerStableAdmin
if v.registerAdmin != nil {
registerAdmin = v.registerAdmin
}
registerOperator := registerStableOperator
if v.registerOperator != nil {
registerOperator = v.registerOperator
}

registerWorkflow(r.Server, r.Workflow)
registerAdmin(r.Server, r.Admin)
registerOperator(r.Server, r.Operator)
}

func registerStableWorkflow(server *grpc.Server, workflow workflowservice.WorkflowServiceServer) {
workflowservice.RegisterWorkflowServiceServer(server, workflow)
}

func registerStableAdmin(server *grpc.Server, admin adminservice.AdminServiceServer) {
adminservice.RegisterAdminServiceServer(server, admin)
}

func registerStableOperator(server *grpc.Server, operator operatorservice.OperatorServiceServer) {
operatorservice.RegisterOperatorServiceServer(server, operator)
}
12 changes: 12 additions & 0 deletions service/frontend/services/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package services

import (
"sort"
"testing"
)

func TestRegisteredVariants(t *testing.T) {
names := names()
sort.Strings(names)
t.Logf("variants compiled in: %v", names)
}
Loading