Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ jobs:
test-name: "billing"
secrets: inherit

e2e-test-thread-checkpoint:
uses: ./.github/workflows/reusable-e2e-test.yml
with:
test-name: "thread-checkpoint"
mock: true
secrets: inherit

e2e-tests-complete:
needs:
[
Expand All @@ -124,6 +131,7 @@ jobs:
e2e-test-thread-state-validation,
e2e-test-connection-drops,
e2e-test-billing,
e2e-test-thread-checkpoint,
]
if: ${{ always() && !cancelled() }}
uses: ./.github/workflows/reusable-success.yml
Expand Down
3 changes: 3 additions & 0 deletions argo/tim-api/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ env:
TIM_API_CORS_ENABLED: "false"
TIM_API_PIPEDREAM_CLIENT_ID: "HhrH_HzIBvMdW2rnNpMxkcrK7wiYKscYBzgrLu0jRiY"
TIM_API_PIPEDREAM_PROJECT_ID: "proj_1jsxmER"
# Thread Checkpoints - always enabled, uses gitignore patterns, always skips binaries
TIM_API_CHECKPOINT_MAX_DIR_SIZE_MB: "100"
TIM_API_CHECKPOINT_MAX_FILE_SIZE_MB: "10"

# Gateway API configuration
gateway:
Expand Down
3 changes: 3 additions & 0 deletions charts/tim-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ env:
TIM_API_ANALYTICS_ENABLED: "true"
TIM_API_ANALYTICS_BATCH_SIZE: "250"
TIM_API_ANALYTICS_FLUSH_INTERVAL_SECS: "5"
# Thread Checkpoints - always enabled, uses gitignore patterns, always skips binaries
TIM_API_CHECKPOINT_MAX_DIR_SIZE_MB: "100"
TIM_API_CHECKPOINT_MAX_FILE_SIZE_MB: "10"
TIM_API_NATS_URL: "nats://nats.nats.svc.cluster.local:4222"

configMap: {}
Expand Down
2 changes: 1 addition & 1 deletion policies/resources/thread_message.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ resourcePolicy:
resource: "thread-message"
importDerivedRoles: ["common"]
rules:
- actions: [ "read", "list", "stream", "create-user-message" ]
- actions: [ "read", "list", "stream", "create-user-message", "update" ]
effect: EFFECT_ALLOW
derivedRoles: [ "resource_owner" ]
71 changes: 71 additions & 0 deletions proto/tim-api/tim/api/thread/v1alpha1/thread_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "buf/validate/validate.proto";
import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "tim/api/thread/v1alpha1/thread_types.proto";
import "tim/api/tool/v1alpha1/tool_types.proto";
Expand Down Expand Up @@ -80,6 +81,24 @@ service ThreadService {
};
option (google.api.method_signature) = "parent,user_message";
}

// Edit a thread message (with checkpoint restoration support)
rpc EditThreadMessage(EditThreadMessageRequest) returns (EditThreadMessageResponse) {
option (google.api.http) = {
post: "/v1alpha1/{path=orgs/*/users/*/threads/*/messages/*}:edit"
body: "*"
};
option (google.api.method_signature) = "path,content";
}

// Configure the working directory for a thread (for checkpoint creation)
rpc ConfigureThreadWorkingDirectory(ConfigureThreadWorkingDirectoryRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/v1alpha1/{path=orgs/*/users/*/threads/*}:configureWorkingDirectory"
body: "*"
};
option (google.api.method_signature) = "path,working_directory";
}
}

// GetThreadRequest is used to get a specific thread by its UID
Expand Down Expand Up @@ -289,3 +308,55 @@ message UserMessage {
(buf.validate.field).string.max_len = 32768
];
}

// EditThreadMessageRequest is used to edit a message (e.g., edit user message content)
message EditThreadMessageRequest {
// The resource path of the message being updated.
string path = 1 [
(google.api.field_behavior) = REQUIRED,
(aep.api.field_info).resource_reference = "tim.settlerlabs.com/llm-message",
(buf.validate.field).string.pattern = "^orgs/[a-fA-F0-9-]{36}/users/[a-fA-F0-9-]{36}/threads/[a-fA-F0-9-]{36}/messages/[a-fA-F0-9-]{36}$"
];

// The new content for the message
string content = 2 [
(buf.validate.field).required = true,
(buf.validate.field).string.min_len = 1,
(buf.validate.field).string.max_len = 32768
];

// Whether to restore files from checkpoint (if one exists).
// If not set: returns error when checkpoint exists (to prompt user).
// If true: restores files from checkpoint and returns file restoration data.
// If false: saves without restoring (new checkpoint replaces old one).
optional bool restore = 3;
}

// EditThreadMessageResponse contains the edited message and any file restoration data
message EditThreadMessageResponse {
// The updated message
LlmMessage message = 1 [
(buf.validate.field).required = true,
(google.api.field_behavior) = REQUIRED
];

// Files to restore from checkpoint (if applicable)
// The client should restore these files before continuing
repeated FileRestoration file_restorations = 2;
}

// ConfigureThreadWorkingDirectoryRequest configures the working directory for checkpoint creation
message ConfigureThreadWorkingDirectoryRequest {
// The resource path of the thread
string path = 1 [
(google.api.field_behavior) = REQUIRED,
(aep.api.field_info).resource_reference = "tim.settlerlabs.com/thread",
(buf.validate.field).string.pattern = "^orgs/[a-fA-F0-9-]{36}/users/[a-fA-F0-9-]{36}/threads/[a-fA-F0-9-]{36}$"
];

// The working directory path
string working_directory = 2 [
(buf.validate.field).required = true,
(buf.validate.field).string.min_len = 1
];
}
20 changes: 20 additions & 0 deletions proto/tim-api/tim/api/thread/v1alpha1/thread_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ message LlmMessage {
// Token usage for this message (optional, set after LLM response)
tim.api.llm_response.v1alpha1.TokenUsage token_usage = 10 [(google.api.field_behavior) = OUTPUT_ONLY];

// Whether a checkpoint exists for this message
bool has_checkpoint = 11 [(google.api.field_behavior) = OUTPUT_ONLY];

// Validation rules to ensure that the correct data is set based on the role type
option (buf.validate.message).cel = {
id: "llm-message.role.valid_data_for_user_role"
Expand Down Expand Up @@ -235,6 +238,23 @@ message StreamErrorEvent {
string error = 1;
}

// FileRestoration contains information about a file to restore from a checkpoint.
// This is a data transfer object, not an API resource.
// buf:lint:ignore AEP_0004_RESOURCE_ANNOTATION
message FileRestoration {
// The absolute path to the file
string path = 1 [
(buf.validate.field).required = true,
(google.api.field_behavior) = REQUIRED
];

// The content to restore
bytes content = 2 [
(buf.validate.field).required = true,
(google.api.field_behavior) = REQUIRED
];
}

// An actor who may participate in creating LLM messages
enum LlmMessageRole {
// Default unspecified
Expand Down
37 changes: 37 additions & 0 deletions tests/system/framework/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ def submit_user_message(self, thread_path: str, text: str) -> dict[str, Any]:
response.raise_for_status()
return response.json()

def edit_thread_message(
self, message_path: str, content: str, restore: bool = False
) -> dict[str, Any]:
"""Edit a thread message.

Args:
message_path: Path to the message (e.g., "orgs/{org}/users/{user}/threads/{thread}/messages/{message}")
content: New content for the message
restore: If true, restores files from checkpoint and returns file restoration data.
If false, saves without restoring (new checkpoint replaces old one).
If not set, returns error when checkpoint exists (to prompt user).

Returns:
The updated message with optional file_restorations field
"""
response = self.client.post(
self._url(f"{message_path}:edit"),
headers=self._default_headers,
json={"content": content, "restore": restore},
)
response.raise_for_status()
return response.json()

def configure_thread_working_directory(self, thread_path: str, working_directory: str) -> None:
"""Configure the working directory for a thread.

Args:
thread_path: Path to the thread
working_directory: Absolute path to the working directory
"""
response = self.client.post(
self._url(f"{thread_path}:configureWorkingDirectory"),
headers=self._default_headers,
json={"working_directory": working_directory},
)
response.raise_for_status()

def submit_tool_result(
self,
thread_path: str,
Expand Down
Loading
Loading