Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.temporal.internal.nexus;

import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import io.nexusrpc.handler.HandlerException;
import io.nexusrpc.handler.OperationContext;
import io.nexusrpc.handler.OperationStartDetails;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.NexusStartWorkflowResponse;
import java.net.URISyntaxException;
import java.util.function.Function;

/**
* Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the
* operation context. Used by both {@code WorkflowRunOperationImpl} and {@code TemporalNexusClient}.
*/
public class NexusStartWorkflowHelper {

/**
* Starts a workflow via the provided invoker function, attaches workflow links to the operation
* context, and returns the response.
*
* @param ctx the operation context (links will be attached as a side-effect)
* @param details the operation start details containing requestId, callback, links
* @param invoker function that starts the workflow given a {@link NexusStartWorkflowRequest}
* @return the {@link NexusStartWorkflowResponse} containing the operation token and workflow
* execution
*/
public static NexusStartWorkflowResponse startWorkflowAndAttachLinks(
OperationContext ctx,
OperationStartDetails details,
Function<NexusStartWorkflowRequest, NexusStartWorkflowResponse> invoker) {
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();

NexusStartWorkflowRequest nexusRequest =
new NexusStartWorkflowRequest(
details.getRequestId(),
details.getCallbackUrl(),
details.getCallbackHeaders(),
nexusCtx.getTaskQueue(),
details.getLinks());

NexusStartWorkflowResponse response = invoker.apply(nexusRequest);
WorkflowExecution workflowExec = response.getWorkflowExecution();

// If the start workflow response returned a link use it, otherwise
// create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
: null;
if (workflowEventLink == null) {
workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
}
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
if (nexusLink != null) {
try {
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
} catch (URISyntaxException e) {
throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e);
}
}

return response;
}

private NexusStartWorkflowHelper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

public class WorkflowRunOperationToken {
/** Deserialized representation of a Nexus operation token. */
public class OperationToken {
@JsonProperty("v")
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Integer version;
Expand All @@ -17,7 +18,7 @@ public class WorkflowRunOperationToken {
@JsonProperty("wid")
private final String workflowId;

public WorkflowRunOperationToken(
public OperationToken(
@JsonProperty("t") Integer type,
@JsonProperty("ns") String namespace,
@JsonProperty("wid") String workflowId,
Expand All @@ -28,8 +29,8 @@ public WorkflowRunOperationToken(
this.version = version;
}

public WorkflowRunOperationToken(String namespace, String workflowId) {
this.type = OperationTokenType.WORKFLOW_RUN;
public OperationToken(OperationTokenType type, String namespace, String workflowId) {
this.type = type;
this.namespace = namespace;
this.workflowId = workflowId;
this.version = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,45 @@ public class OperationTokenUtil {
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();

/**
* Load a workflow run operation token from an operation token.
* Load and validate an operation token without asserting the token type. Use this for cancel
* dispatch where the token type determines the cancel behavior.
*
* @throws IllegalArgumentException if the operation token is invalid
* @throws IllegalArgumentException if the operation token is malformed or has invalid structure
*/
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) {
WorkflowRunOperationToken token;
public static OperationToken loadOperationToken(String operationToken) {
OperationToken token;
try {
JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class);
JavaType reference = mapper.getTypeFactory().constructType(OperationToken.class);
token = mapper.readValue(decoder.decode(operationToken), reference);
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage());
}
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
throw new IllegalArgumentException(
"Invalid workflow run token: incorrect operation token type: " + token.getType());
}
if (token.getVersion() != null && token.getVersion() != 0) {
throw new IllegalArgumentException("Invalid workflow run token: unexpected version field");
throw new IllegalArgumentException("Invalid operation token: unexpected version field");
}
if (Strings.isNullOrEmpty(token.getWorkflowId())) {
throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)");
throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)");
}
return token;
Comment thread
Quinn-With-Two-Ns marked this conversation as resolved.
}

/**
* Load a workflow run operation token, asserting that the token type is {@link
* OperationTokenType#WORKFLOW_RUN}.
*
* @throws IllegalArgumentException if the operation token is invalid or not a workflow run token
*/
public static OperationToken loadWorkflowRunOperationToken(String operationToken) {
OperationToken token = loadOperationToken(operationToken);
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
throw new IllegalArgumentException(
"Invalid workflow run token: incorrect operation token type: " + token.getType());
}
return token;
}

/**
* Attempt to extract the workflow Id from an operation token.
* Extract the workflow ID from a workflow run operation token.
*
* @throws IllegalArgumentException if the operation token is invalid
*/
Expand All @@ -52,7 +64,9 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) {
/** Generate a workflow run operation token from a workflow ID and namespace. */
public static String generateWorkflowRunOperationToken(String workflowId, String namespace)
throws JsonProcessingException {
String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId));
String json =
ow.writeValueAsString(
new OperationToken(OperationTokenType.WORKFLOW_RUN, namespace, workflowId));
return encoder.encodeToString(json.getBytes());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package io.temporal.nexus;

import io.nexusrpc.handler.OperationContext;
import io.nexusrpc.handler.OperationStartDetails;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.Experimental;
import io.temporal.internal.client.NexusStartWorkflowResponse;
import io.temporal.internal.nexus.NexusStartWorkflowHelper;
import io.temporal.workflow.Functions;
import java.util.Objects;

/**
* Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with
* Temporal workflows from within a Nexus operation handler.
*
* <p>Obtained via the {@link TemporalOperationHandler.StartFunction} parameter. The client creates
* workflow stubs internally — users pass the workflow class, a lambda that calls the workflow
* method, and workflow options.
*
* <p>Usage example:
*
* <pre>{@code
* @OperationImpl
* public OperationHandler<OrderInput, OrderResult> createOrder() {
* return TemporalOperationHandler.from((context, client, input) -> {
* return client.startWorkflow(
* OrderWorkflow.class,
* wf -> wf.processOrder(input),
* WorkflowOptions.newBuilder()
* .setWorkflowId("order-" + context.getRequestId())
* .build());
* });
* }
* }</pre>
*
* <p>For advanced use cases, the underlying {@link WorkflowClient} can be accessed via {@link
* #getWorkflowClient()}. For example, to send a signal and return a synchronous result:
*
* <pre>{@code
* @OperationImpl
* public OperationHandler<CancelOrderInput, Void> cancelOrder() {
* return TemporalOperationHandler.from((context, client, input) -> {
* client.getWorkflowClient()
* .newUntypedWorkflowStub("order-" + input.getOrderId())
* .signal("requestCancellation", input);
* return TemporalOperationResult.sync(null);
* });
* }
* }</pre>
*/
@Experimental
public final class TemporalNexusClient {

private final WorkflowClient client;
private final OperationContext operationContext;
private final OperationStartDetails operationStartDetails;

TemporalNexusClient(
WorkflowClient client,
OperationContext operationContext,
OperationStartDetails operationStartDetails) {
this.client = Objects.requireNonNull(client);
this.operationContext = Objects.requireNonNull(operationContext);
this.operationStartDetails = Objects.requireNonNull(operationStartDetails);
}

/** Returns the underlying {@link WorkflowClient} for advanced use cases. */
public WorkflowClient getWorkflowClient() {
return client;
}

/**
* Starts a workflow by invoking a returning method on a workflow stub. The client creates the
* stub from the given class and options, then invokes the workflow method via the provided
* function.
*
* <p>Example:
*
* <pre>{@code
* client.startWorkflow(MyWorkflow.class, wf -> wf.run(input), options)
* }</pre>
*
* <p>For void-returning workflow methods, use a block lambda that returns null:
*
* <pre>{@code
* client.startWorkflow(MyWorkflow.class, wf -> { wf.execute(input); return null; }, options)
* }</pre>
*
* @param workflowClass the workflow interface class
* @param workflowMethod receives the workflow stub and calls exactly one workflow method
* @param options workflow start options (must include workflowId)
* @param <T> the workflow interface type
* @param <R> the workflow return type
* @return an async {@link TemporalOperationResult} with the workflow-run operation token
*/
public <T, R> TemporalOperationResult<R> startWorkflow(
Class<T> workflowClass, Functions.Func1<T, R> workflowMethod, WorkflowOptions options) {
T stub = client.newWorkflowStub(workflowClass, options);
Functions.Func<R> bound = () -> workflowMethod.apply(stub);
return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(bound));
}

/**
* Starts a workflow using an untyped workflow type name.
*
* @param workflowType the workflow type name string
* @param resultClass the expected result class
* @param args workflow arguments
* @param options workflow start options (must include workflowId)
* @param <R> the workflow return type
* @return an async {@link TemporalOperationResult} with the workflow-run operation token
*/
public <R> TemporalOperationResult<R> startWorkflow(
String workflowType, Class<R> resultClass, Object[] args, WorkflowOptions options) {
WorkflowStub stub = client.newUntypedWorkflowStub(workflowType, options);
WorkflowHandle<R> handle = WorkflowHandle.fromWorkflowStub(stub, resultClass, args);
return invokeAndReturn(handle);
}

private <R> TemporalOperationResult<R> invokeAndReturn(WorkflowHandle<R> handle) {
NexusStartWorkflowResponse response =
NexusStartWorkflowHelper.startWorkflowAndAttachLinks(
operationContext,
operationStartDetails,
request -> handle.getInvoker().invoke(request));
return TemporalOperationResult.async(response.getOperationToken());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.temporal.nexus;

import io.nexusrpc.handler.OperationCancelDetails;
import io.nexusrpc.handler.OperationContext;
import io.temporal.common.Experimental;
import java.util.Objects;

/**
* Context for a Nexus cancel operation. Combines the {@link OperationContext} and {@link
* OperationCancelDetails} into a single object passed to cancel methods on {@link
* TemporalOperationHandler}.
*/
@Experimental
public final class TemporalOperationCancelContext {

private final OperationContext operationContext;
private final OperationCancelDetails operationCancelDetails;

TemporalOperationCancelContext(
OperationContext operationContext, OperationCancelDetails operationCancelDetails) {
this.operationContext = Objects.requireNonNull(operationContext);
this.operationCancelDetails = Objects.requireNonNull(operationCancelDetails);
}

/** Returns the service name for this operation. */
public String getService() {
return operationContext.getService();
}

/** Returns the operation name. */
public String getOperation() {
return operationContext.getOperation();
}

/** Returns the operation token identifying the operation to cancel. */
public String getOperationToken() {
return operationCancelDetails.getOperationToken();
}

/** Returns the underlying {@link OperationContext} for advanced use cases. */
public OperationContext getOperationContext() {
return operationContext;
}

/** Returns the underlying {@link OperationCancelDetails} for advanced use cases. */
public OperationCancelDetails getOperationCancelDetails() {
return operationCancelDetails;
}
}
Loading
Loading