Add dataproxy client selector #959

Open
katrogan wants to merge 6 commits into main from katrina/eng26-386-sdk-create-and-cache-per-cluster-conns-for-dataproxy-svc

@katrogan (Contributor) commented Apr 10, 2026

NOT TO BE MERGED UNTIL BE CHANGES ARE IN.

This PR adds a new dependency on the flyte remote client for dataproxy endpoints:

  1. Use a client connection cache for dataproxy operations, with 'operation' and 'resource' as the cache key.
  2. On a cache miss, call SelectCluster to get the corresponding cluster endpoint, then initialize and cache a new client connection.

Going forward, all new dataproxy calls should use this pattern.

Today, this PR updates the only uses of dataproxy in the SDK: UploadInputs and CreateUploadLocation.
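For readers skimming the description, the cache-then-select pattern above can be sketched roughly as follows. This is only an illustration and not the code in this PR: `DataProxySelector`, `FakeConn`, and the `select_cluster` callable are hypothetical stand-ins, and the sketch assumes the resource has already been reduced to a hashable key (for example a string or serialized bytes).

```python
import asyncio
from typing import Awaitable, Callable, Dict, Hashable, Tuple


class FakeConn:
    """Hypothetical stand-in for a per-cluster dataproxy client connection."""

    def __init__(self, endpoint: str) -> None:
        self.endpoint = endpoint


class DataProxySelector:
    """Caches one connection per (operation, resource) key."""

    def __init__(self, select_cluster: Callable[[int, Hashable], Awaitable[str]]) -> None:
        # Hypothetical callable standing in for the SelectCluster RPC:
        # it maps (operation, resource) to a cluster endpoint string.
        self._select_cluster = select_cluster
        self._cache: Dict[Tuple[int, Hashable], FakeConn] = {}

    async def get(self, operation: int, resource: Hashable) -> FakeConn:
        key = (operation, resource)
        conn = self._cache.get(key)
        if conn is None:
            # Cache miss: resolve the endpoint, then build and cache a connection.
            endpoint = await self._select_cluster(operation, resource)
            conn = FakeConn(endpoint)
            self._cache[key] = conn
        return conn
```

The sketch has no locking, so two concurrent misses for the same key could each call the selector once; a real implementation may want to guard against that.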


Testing

Verified that the CreateRun path (which now calls UploadInputs) works with a local cluster:

flyte -vvv --config .flyte/config-oss-local.yaml run -p flytesnacks -d development examples/basics/hello.py main
...
╭────────────────────────────────────────── Remote Run ───────────────────────────────────────────╮
│ Created Run: rmj7wwft78l69stlj69j                                                               │
│ URL: http://localhost:30080/v2/domain/development/project/flytesnacks/runs/rmj7wwft78l69stlj69j │
╰─────────────────────────────────────────────────────────────────────────────────────────────────╯

ref 26-353

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>

@EngHabu (Contributor) left a comment

LGTM, @pingsutw @wild-endeavor should advise though

def cluster_service(self) -> ClusterService:
    return self._cluster_service

async def get_dataproxy_for_resource(self, operation: int, resource: object) -> DataProxyService:
Contributor

Should this operation be an enum instead?

Contributor Author

done
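As a rough illustration of the suggestion above, an enum-typed operation could look like the sketch below. The enum name, its members, and their values are hypothetical; the real operation type presumably comes from the generated protobuf code.

```python
import enum


class DataProxyOperation(enum.IntEnum):
    # Hypothetical members and values, for illustration only.
    UNSPECIFIED = 0
    CREATE_UPLOAD_LOCATION = 1
    UPLOAD_INPUTS = 2


def describe(operation: int) -> str:
    # Converting through the enum rejects unknown integers with ValueError,
    # which a bare `int` parameter would silently accept.
    return DataProxyOperation(operation).name
```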

Comment on lines +260 to +272
# Build the SelectClusterRequest with the right oneof field
req = cluster_payload_pb2.SelectClusterRequest(operation=operation)
if hasattr(resource, "DESCRIPTOR"):
    field_map = {
        "OrgIdentifier": "org_id",
        "ProjectIdentifier": "project_id",
        "TaskIdentifier": "task_id",
        "ActionIdentifier": "action_id",
        "ActionAttemptIdentifier": "action_attempt_id",
    }
    field_name = field_map.get(type(resource).__name__)
    if field_name:
        getattr(req, field_name).CopyFrom(resource)
Contributor

Is this the best way to create the oneof 😬, @wild-endeavor @pingsutw?

Member

we can do something like

req = SelectClusterRequest(operation=operation)
if hasattr(resource, "DESCRIPTOR"):
    oneof = req.DESCRIPTOR.oneofs_by_name["resource"]  # replace with actual oneof name
    for field in oneof.fields:
        if field.message_type is resource.DESCRIPTOR:
            getattr(req, field.name).CopyFrom(resource)
            break

Contributor Author

nice thanks. done

        getattr(req, field_name).CopyFrom(resource)

resp = await self._cluster_service.select_cluster(req)
cluster_endpoint = resp.cluster_endpoint
Contributor

Are you going to normalize this here (stripping/adding http(s):// or dns:///)?

Contributor Author

not necessary! create_session_config already calls normalize_rpc_endpoint
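For context, the kind of normalization being discussed might look like the sketch below. It does not reproduce what normalize_rpc_endpoint actually does; `normalize_endpoint_sketch` and its exact rules (strip an http/https scheme, ensure a dns:/// prefix) are assumptions made for illustration.

```python
def normalize_endpoint_sketch(endpoint: str) -> str:
    """Hypothetical normalizer: strip http(s):// and ensure a dns:/// prefix."""
    endpoint = endpoint.strip()
    if endpoint.startswith("dns:///"):
        # Already in gRPC target form; leave it untouched.
        return endpoint
    for scheme in ("https://", "http://"):
        if endpoint.startswith(scheme):
            endpoint = endpoint[len(scheme):]
            break
    # Drop any trailing slash left over from a URL-style endpoint.
    return "dns:///" + endpoint.rstrip("/")
```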

    if field_name:
        getattr(req, field_name).CopyFrom(resource)

resp = await self._cluster_service.select_cluster(req)
Contributor

Let's make sure we throw informative errors here for the infamous "no healthy clusters" case. We now have a good place to catch that and raise a good error to the user.

Contributor Author

updated
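One possible shape for such a wrapper is sketched below. It is not the code that was pushed: `ClusterSelectionError`, `FakeRpcError`, and `select_cluster_or_explain` are hypothetical names, and the real RPC layer will raise its own error type.

```python
import asyncio
from typing import Awaitable, Callable


class FakeRpcError(Exception):
    """Hypothetical stand-in for the error type raised by the RPC layer."""


class ClusterSelectionError(RuntimeError):
    """Hypothetical user-facing error raised when no cluster can be selected."""


async def select_cluster_or_explain(
    select_cluster: Callable[[int, object], Awaitable[str]],
    operation: int,
    resource: object,
) -> str:
    try:
        endpoint = await select_cluster(operation, resource)
    except FakeRpcError as exc:
        # Re-raise with context the user can act on, keeping the original cause.
        raise ClusterSelectionError(
            f"Could not select a cluster for operation {operation!r} "
            f"on resource {resource!r}: {exc}. "
            "Check that at least one cluster is registered and healthy."
        ) from exc
    if not endpoint:
        raise ClusterSelectionError(
            f"SelectCluster returned an empty endpoint for operation {operation!r}."
        )
    return endpoint
```

Chaining with `from exc` keeps the original RPC error available in the traceback while the message itself names the operation and resource involved.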

EngHabu previously approved these changes Apr 10, 2026
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>