-
Notifications
You must be signed in to change notification settings - Fork 65
feat: Added a batch completed callback to the data client mutations batcher #1308
Changes from 1 commit
0b225ac
4e3d8bb
f03977c
701eebb
8320f9d
9069d34
0741d0b
8993539
20fb4cb
c4da6eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |
| # | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Sequence, TYPE_CHECKING, cast | ||
| from typing import Callable, Optional, Sequence, TYPE_CHECKING, cast | ||
| import atexit | ||
| import warnings | ||
| from collections import deque | ||
|
|
@@ -24,6 +24,10 @@ | |
| from google.cloud.bigtable.data.exceptions import FailedMutationEntryError | ||
| from google.cloud.bigtable.data._helpers import _get_retryable_errors | ||
| from google.cloud.bigtable.data._helpers import _get_timeouts | ||
| from google.cloud.bigtable.data._helpers import ( | ||
| _populate_statuses_from_mutations_exception_group, | ||
| ) | ||
|
|
||
| from google.cloud.bigtable.data._helpers import TABLE_DEFAULT | ||
|
|
||
| from google.cloud.bigtable.data.mutations import ( | ||
|
|
@@ -33,6 +37,9 @@ | |
|
|
||
| from google.cloud.bigtable.data._cross_sync import CrossSync | ||
|
|
||
| from google.rpc import code_pb2 | ||
| from google.rpc import status_pb2 | ||
|
|
||
| if TYPE_CHECKING: | ||
| from google.cloud.bigtable.data.mutations import RowMutationEntry | ||
|
|
||
|
|
@@ -223,6 +230,7 @@ def __init__( | |
| batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, | ||
| batch_retryable_errors: Sequence[type[Exception]] | ||
| | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, | ||
| _batch_completed_callback: Optional[Callable[list[status_pb2.Status]]] = None, | ||
| ): | ||
| self._operation_timeout, self._attempt_timeout = _get_timeouts( | ||
| batch_operation_timeout, batch_attempt_timeout, target | ||
|
|
@@ -269,6 +277,7 @@ def __init__( | |
| self._newest_exceptions: deque[Exception] = deque( | ||
| maxlen=self._exception_list_limit | ||
| ) | ||
| self._user_batch_completed_callback = _batch_completed_callback | ||
| # clean up on program exit | ||
| atexit.register(self._on_exit) | ||
|
|
||
|
|
@@ -380,6 +389,7 @@ async def _execute_mutate_rows( | |
| list of FailedMutationEntryError objects for mutations that failed. | ||
| FailedMutationEntryError objects will not contain index information | ||
| """ | ||
| statuses = [status_pb2.Status(code=code_pb2.Code.OK)] * len(batch) | ||
|
Contributor
It feels risky to me to assume all statuses are successful before starting the operation. Can we set them to unknown at first, and then add an [inline code lost in extraction] And then maybe we can change [inline code lost in extraction]
Contributor
Author
If we change [inline code and remainder of comment lost in extraction]
Contributor
My concern is more about the sequence of events. If we wait until the operation is complete and we were given an error, it's safer to assume the others succeeded. But making the assumption before starting the operation feels like it could lead to problems down the line. To be clear, I don't see any problems with the code as it is now, but it still feels a bit risky. I had something like this in mind, but let me know what you think: [suggested code lost in extraction] |
||
| try: | ||
| operation = CrossSync._MutateRowsOperation( | ||
| self._target.client._gapic_client, | ||
|
|
@@ -391,13 +401,19 @@ async def _execute_mutate_rows( | |
| ) | ||
| await operation.start() | ||
| except MutationsExceptionGroup as e: | ||
| _populate_statuses_from_mutations_exception_group(statuses, e) | ||
|
|
||
| # strip index information from exceptions, since it is not useful in a batch context | ||
| for subexc in e.exceptions: | ||
| subexc.index = None | ||
| return list(e.exceptions) | ||
| finally: | ||
| # mark batch as complete in flow control | ||
| await self._flow_control.remove_from_flow(batch) | ||
|
|
||
| # Call batch done callback with list of statuses. | ||
| if self._user_batch_completed_callback: | ||
| self._user_batch_completed_callback(statuses) | ||
| return [] | ||
|
|
||
| def _add_exceptions(self, excs: list[Exception]): | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,10 @@ | |||||||||||||||||||||||||||
| from google.api_core import retry as retries | ||||||||||||||||||||||||||||
| from google.api_core.retry import RetryFailureReason | ||||||||||||||||||||||||||||
| from google.cloud.bigtable.data.exceptions import RetryExceptionGroup | ||||||||||||||||||||||||||||
| from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup | ||||||||||||||||||||||||||||
| from google.rpc import code_pb2 | ||||||||||||||||||||||||||||
| from google.rpc import status_pb2 | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| if TYPE_CHECKING: | ||||||||||||||||||||||||||||
| import grpc | ||||||||||||||||||||||||||||
|
|
@@ -224,6 +228,61 @@ def _align_timeouts(operation: float, attempt: float | None) -> tuple[float, flo | |||||||||||||||||||||||||||
| return operation, final_attempt | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _populate_statuses_from_mutations_exception_group( | ||||||||||||||||||||||||||||
| statuses: list[status_pb2.Status], exc_group: MutationsExceptionGroup | ||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Helper function that populates a list of Status objects with exception information from | ||||||||||||||||||||||||||||
| the exception group. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||
| statuses: The initial list of Status objects | ||||||||||||||||||||||||||||
| exc_group: The exception group from a mutate rows operation | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| # We exception handle as follows: | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
| # 1. Each exception in the error group is a FailedMutationEntryError, and its | ||||||||||||||||||||||||||||
| # cause is either a singular exception or a RetryExceptionGroup consisting of | ||||||||||||||||||||||||||||
| # multiple exceptions. | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
| # 2. In the case of a singular exception, if the error does not have a gRPC status | ||||||||||||||||||||||||||||
| # code, we return a status code of UNKNOWN. | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
| # 3. In the case of a RetryExceptionGroup, we use terminal exception in the exception | ||||||||||||||||||||||||||||
| # group and process that. | ||||||||||||||||||||||||||||
| for error in exc_group.exceptions: | ||||||||||||||||||||||||||||
| cause = error.__cause__ | ||||||||||||||||||||||||||||
| if isinstance(cause, RetryExceptionGroup): | ||||||||||||||||||||||||||||
| statuses[error.index] = _get_status(cause.exceptions[-1]) | ||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||
| statuses[error.index] = _get_status(cause) | ||||||||||||||||||||||||||||
|
Contributor
The helper function [inline code and remainder of comment lost in extraction]
Suggested change
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _get_status(exc: Exception) -> status_pb2.Status: | ||||||||||||||||||||||||||||
|
Contributor
It looks like this can be called with `None` if `error.__cause__` is `None`. Or is that not possible? |
||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Helper function that returns a Status object corresponding to the given exception. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||
| exc: An exception to be converted into a Status. | ||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||
| status_pb2.Status: A Status proto object. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||
| isinstance(exc, core_exceptions.GoogleAPICallError) | ||||||||||||||||||||||||||||
| and exc.grpc_status_code is not None | ||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||
| return status_pb2.Status( | ||||||||||||||||||||||||||||
| code=exc.grpc_status_code.value[0], | ||||||||||||||||||||||||||||
| message=exc.message, | ||||||||||||||||||||||||||||
| details=exc.details, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return status_pb2.Status( | ||||||||||||||||||||||||||||
| code=code_pb2.Code.UNKNOWN, | ||||||||||||||||||||||||||||
| message=str(exc), | ||||||||||||||||||||||||||||
|
Contributor
If `exc` can be `None`, we should probably change this message. |
||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _validate_timeouts( | ||||||||||||||||||||||||||||
| operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False | ||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
I don't think we should add hidden arguments to the public constructor args like this.