From 749df4bd0d6f8de71ec68086aa96615f93c03e0e Mon Sep 17 00:00:00 2001 From: Ovidiu T Date: Thu, 27 Mar 2025 12:08:23 +0200 Subject: [PATCH] add consumed capacity to the table resource --- boto3/dynamodb/table.py | 116 +++++++++++- tests/integration/test_dynamodb.py | 63 +++++++ tests/unit/dynamodb/test_table.py | 290 +++++++++++++++++++++++++++++ 3 files changed, 463 insertions(+), 6 deletions(-) diff --git a/boto3/dynamodb/table.py b/boto3/dynamodb/table.py index 931296bc09..ccf4b5c92e 100644 --- a/boto3/dynamodb/table.py +++ b/boto3/dynamodb/table.py @@ -28,7 +28,9 @@ class TableResource: def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - def batch_writer(self, overwrite_by_pkeys=None): + def batch_writer( + self, overwrite_by_pkeys=None, return_consumed_capacity=None + ): """Create a batch writer object. This method creates a context manager for writing @@ -53,10 +55,25 @@ def batch_writer(self, overwrite_by_pkeys=None): :param overwrite_by_pkeys: De-duplicate request items in buffer if match new request item on specified primary keys. i.e ``["partition_key1", "sort_key2", "sort_key3"]`` + :type return_consumed_capacity: string + :param return_consumed_capacity: Determines the level of detail + about either provisioned or on-demand throughput consumption + that is returned in the response: + INDEXES - The response includes the aggregate + ConsumedCapacity for the operation, together with + ConsumedCapacity for each table and secondary index that + was accessed. + TOTAL - The response includes only the aggregate + ConsumedCapacity for the operation. + NONE - No ConsumedCapacity details are included in the + response. """ return BatchWriter( - self.name, self.meta.client, overwrite_by_pkeys=overwrite_by_pkeys + self.name, + self.meta.client, + overwrite_by_pkeys=overwrite_by_pkeys, + return_consumed_capacity=return_consumed_capacity, ) @@ -64,7 +81,12 @@ class BatchWriter: """Automatically handle batch writes to DynamoDB for a single table.""" def __init__( - self, table_name, client, flush_amount=25, overwrite_by_pkeys=None + self, + table_name, + client, + flush_amount=25, + overwrite_by_pkeys=None, + return_consumed_capacity=None, ): """ @@ -92,12 +114,26 @@ def __init__( if match new request item on specified primary keys. i.e ``["partition_key1", "sort_key2", "sort_key3"]`` + :type return_consumed_capacity: string + :param return_consumed_capacity: Determines the level of detail + about either provisioned or on-demand throughput consumption + that is returned in the response: + INDEXES - The response includes the aggregate + ConsumedCapacity for the operation, together with + ConsumedCapacity for each table and secondary index that + was accessed. + TOTAL - The response includes only the aggregate + ConsumedCapacity for the operation. + NONE - No ConsumedCapacity details are included in the + response. """ self._table_name = table_name self._client = client self._items_buffer = [] self._flush_amount = flush_amount self._overwrite_by_pkeys = overwrite_by_pkeys + self.return_consumed_capacity = return_consumed_capacity + self.consumed_capacity = None def put_item(self, Item): self._add_request_and_process({'PutRequest': {'Item': Item}}) @@ -141,9 +177,15 @@ def _flush_if_needed(self): def _flush(self): items_to_send = self._items_buffer[: self._flush_amount] self._items_buffer = self._items_buffer[self._flush_amount :] - response = self._client.batch_write_item( - RequestItems={self._table_name: items_to_send} - ) + params = { + 'RequestItems': {self._table_name: items_to_send}, + } + if self.return_consumed_capacity is not None: + params['ReturnConsumedCapacity'] = self.return_consumed_capacity + response = self._client.batch_write_item(**params) + consumed_capacity = response.get('ConsumedCapacity') + if consumed_capacity is not None: + self._update_consumed_capacity_array(consumed_capacity) unprocessed_items = response['UnprocessedItems'] if not unprocessed_items: unprocessed_items = {} @@ -157,6 +199,68 @@ def _flush(self): len(self._items_buffer), ) + def _update_consumed_capacity_array(self, new_consumed_capacity): + if self.consumed_capacity is None: + self.consumed_capacity = new_consumed_capacity + elif new_consumed_capacity: + self.aggg_consumed_capacity_objects( + self.consumed_capacity[0], new_consumed_capacity[0] + ) + + @staticmethod + def aggg_consumed_capacity_objects( + total_consumed_capacity, consumed_capacity + ): + # Merge total capacities + BatchWriter._agg_capacity_objects( + total_consumed_capacity, consumed_capacity + ) + + # Merge table capacities + if 'Table' in consumed_capacity: + if 'Table' not in total_consumed_capacity: + total_consumed_capacity['Table'] = {} + BatchWriter._agg_capacity_objects( + total_consumed_capacity['Table'], + consumed_capacity['Table'], + ) + + # Merge indexes capacities + index_types = ['LocalSecondaryIndexes', 'GlobalSecondaryIndexes'] + for index_type in index_types: + if index_type in consumed_capacity: + if index_type not in total_consumed_capacity: + total_consumed_capacity[index_type] = consumed_capacity[ + index_type + ] + else: + for index_name in consumed_capacity[index_type]: + if ( + index_name + not in total_consumed_capacity[index_type] + ): + total_consumed_capacity[index_type][ + index_name + ] = {} + BatchWriter._agg_capacity_objects( + total_consumed_capacity[index_type][index_name], + consumed_capacity[index_type][index_name], + ) + + @staticmethod + def _agg_capacity_objects(total_consumed_capacity, consumed_capacity): + capacity_unit_keys = [ + 'CapacityUnits', + 'ReadCapacityUnits', + 'WriteCapacityUnits', + ] + for key in capacity_unit_keys: + if key in consumed_capacity: + total_consumed_capacity[key] = ( + total_consumed_capacity.get(key, 0) + + consumed_capacity[key] + ) + def __enter__(self): return self diff --git a/tests/integration/test_dynamodb.py b/tests/integration/test_dynamodb.py index 33162036d7..417f075230 100644 --- a/tests/integration/test_dynamodb.py +++ b/tests/integration/test_dynamodb.py @@ -218,3 +218,66 @@ def test_batch_write_items(self): # Verify all the items were added to dynamodb. for obj in self.table.scan(ConsistentRead=True)['Items']: self.assertIn(obj, items) + + # Verify consumed capacity is None + self.assertIs(batch.consumed_capacity, None) + + def test_batch_write_item_agg_capacity_none(self): + num_elements = 100 + items = [] + for i in range(num_elements): + items.append({'MyHashKey': f'foo{i}', 'OtherKey': f'bar{i}'}) + + with self.table.batch_writer(return_consumed_capacity='NONE') as batch: + for item in items: + batch.put_item(Item=item) + + # Verify all the items were added to dynamodb. + for obj in self.table.scan(ConsistentRead=True)['Items']: + self.assertIn(obj, items) + + # Verify consumed capacity + self.assertIs(batch.consumed_capacity, None) + + def test_batch_write_item_agg_capacity_total(self): + num_elements = 100 + items = [] + for i in range(num_elements): + items.append({'MyHashKey': f'foo{i}', 'OtherKey': f'bar{i}'}) + + with self.table.batch_writer( + return_consumed_capacity='TOTAL' + ) as batch: + for item in items: + batch.put_item(Item=item) + + # Verify all the items were added to dynamodb. + for obj in self.table.scan(ConsistentRead=True)['Items']: + self.assertIn(obj, items) + + # Verify consumed capacity + total_cu = batch.consumed_capacity[0]['CapacityUnits'] + self.assertEqual(total_cu, num_elements) + self.assertNotIn('Table', batch.consumed_capacity[0]) + + def test_batch_write_item_agg_capacity_indexes(self): + num_elements = 100 + items = [] + for i in range(num_elements): + items.append({'MyHashKey': f'foo{i}', 'OtherKey': f'bar{i}'}) + + with self.table.batch_writer( + return_consumed_capacity='INDEXES' + ) as batch: + for item in items: + batch.put_item(Item=item) + + # Verify all the items were added to dynamodb. + for obj in self.table.scan(ConsistentRead=True)['Items']: + self.assertIn(obj, items) + + # Verify consumed capacity + total_cu = batch.consumed_capacity[0]['CapacityUnits'] + table_cu = batch.consumed_capacity[0]['Table']['CapacityUnits'] + self.assertEqual(total_cu, num_elements) + self.assertEqual(table_cu, num_elements) diff --git a/tests/unit/dynamodb/test_table.py b/tests/unit/dynamodb/test_table.py index 21473dcb0f..c7b000b209 100644 --- a/tests/unit/dynamodb/test_table.py +++ b/tests/unit/dynamodb/test_table.py @@ -518,3 +518,293 @@ def test_added_unsent_request_not_flushed_delete(self): self.assert_batch_write_calls_are([batch, batch, final_batch]) # the buffer should be empty now self.assertEqual(self.batch_writer._items_buffer, []) + + def test_consumed_capacity_agg_total(self): + self.client.batch_write_item.side_effect = [ + { + 'ConsumedCapacity': [ + { + 'TableName': self.table_name, + 'CapacityUnits': 5.0, + } + ], + 'UnprocessedItems': {}, + }, + { + 'ConsumedCapacity': [ + { + 'TableName': self.table_name, + 'CapacityUnits': 5.0, + } + ], + 'UnprocessedItems': {}, + }, + ] + + # ReturnConsumedCapacity=TOTAL + with BatchWriter( + self.table_name, + self.client, + flush_amount=2, + return_consumed_capacity='TOTAL', + ) as b: + b.put_item(Item={'Hash': 'foo1'}) + b.put_item(Item={'Hash': 'foo2'}) + b.put_item(Item={'Hash': 'foo3'}) + b.put_item(Item={'Hash': 'foo4'}) + + first_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo1'}}}, + {'PutRequest': {'Item': {'Hash': 'foo2'}}}, + ] + }, + 'ReturnConsumedCapacity': 'TOTAL', + } + + second_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo3'}}}, + {'PutRequest': {'Item': {'Hash': 'foo4'}}}, + ] + }, + 'ReturnConsumedCapacity': 'TOTAL', + } + self.assert_batch_write_calls_are([first_batch, second_batch]) + + self.assertEqual( + b.consumed_capacity, + [ + { + 'TableName': self.table_name, + 'CapacityUnits': 10.0, + } + ], + ) + + def test_consumed_capacity_agg_none(self): + with BatchWriter( + self.table_name, + self.client, + flush_amount=2, + return_consumed_capacity='NONE', + ) as b: + b.put_item(Item={'Hash': 'foo1'}) + b.put_item(Item={'Hash': 'foo2'}) + b.put_item(Item={'Hash': 'foo3'}) + b.put_item(Item={'Hash': 'foo4'}) + + first_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo1'}}}, + {'PutRequest': {'Item': {'Hash': 'foo2'}}}, + ] + }, + 'ReturnConsumedCapacity': 'NONE', + } + + second_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo3'}}}, + {'PutRequest': {'Item': {'Hash': 'foo4'}}}, + ] + }, + 'ReturnConsumedCapacity': 'NONE', + } + self.assert_batch_write_calls_are([first_batch, second_batch]) + + self.assertIs(b.consumed_capacity, None) + + def test_consumed_capacity_agg_indexes(self): + # Operation is on a table so there will be no split + self.client.batch_write_item.side_effect = [ + { + 'ConsumedCapacity': [ + { + 'TableName': self.table_name, + 'CapacityUnits': 45.0, + 'ReadCapacityUnits': 18.0, + 'WriteCapacityUnits': 27.0, + 'Table': { + 'CapacityUnits': 5.0, + 'ReadCapacityUnits': 2.0, + 'WriteCapacityUnits': 3.0, + }, + 'LocalSecondaryIndexes': { + 'FirstLSI': { + 'CapacityUnits': 10.0, + 'ReadCapacityUnits': 4.0, + 'WriteCapacityUnits': 6.0, + }, + 'SecondLSI': { + 'CapacityUnits': 20.0, + 'ReadCapacityUnits': 8.0, + 'WriteCapacityUnits': 12.0, + }, + }, + 'GlobalSecondaryIndexes': { + 'FirstGSI': { + 'CapacityUnits': 10.0, + 'ReadCapacityUnits': 4.0, + 'WriteCapacityUnits': 6.0, + }, + }, + } + ], + 'UnprocessedItems': {}, + }, + { + 'ConsumedCapacity': [ + { + 'TableName': self.table_name, + 'CapacityUnits': 55.0, + 'ReadCapacityUnits': 22.0, + 'WriteCapacityUnits': 33.0, + 'Table': { + 'CapacityUnits': 5.0, + 'ReadCapacityUnits': 2.0, + 'WriteCapacityUnits': 3.0, + }, + 'LocalSecondaryIndexes': { + 'SecondLSI': { + 'CapacityUnits': 20.0, + 'ReadCapacityUnits': 8.0, + 'WriteCapacityUnits': 12.0, + }, + 'ThirdLSI': { + 'CapacityUnits': 30.0, + 'ReadCapacityUnits': 12.0, + 'WriteCapacityUnits': 18.0, + }, + }, + } + ], + 'UnprocessedItems': {}, + }, + { + 'ConsumedCapacity': [ + { + 'TableName': self.table_name, + 'CapacityUnits': 55.0, + 'ReadCapacityUnits': 22.0, + 'WriteCapacityUnits': 33.0, + 'Table': { + 'CapacityUnits': 5.0, + 'ReadCapacityUnits': 2.0, + 'WriteCapacityUnits': 3.0, + }, + 'LocalSecondaryIndexes': { + 'FourthLSI': { + 'CapacityUnits': 40.0, + 'ReadCapacityUnits': 16.0, + 'WriteCapacityUnits': 24.0, + } + }, + 'GlobalSecondaryIndexes': { + 'FirstGSI': { + 'CapacityUnits': 10.0, + 'ReadCapacityUnits': 4.0, + 'WriteCapacityUnits': 6.0, + }, + }, + } + ], + 'UnprocessedItems': {}, + }, + ] + + with BatchWriter( + self.table_name, + self.client, + flush_amount=2, + return_consumed_capacity='INDEXES', + ) as b: + b.put_item(Item={'Hash': 'foo1'}) + b.put_item(Item={'Hash': 'foo2'}) + b.put_item(Item={'Hash': 'foo3'}) + b.put_item(Item={'Hash': 'foo4'}) + b.put_item(Item={'Hash': 'foo5'}) + b.put_item(Item={'Hash': 'foo6'}) + + first_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo1'}}}, + {'PutRequest': {'Item': {'Hash': 'foo2'}}}, + ] + }, + 'ReturnConsumedCapacity': 'INDEXES', + } + + second_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo3'}}}, + {'PutRequest': {'Item': {'Hash': 'foo4'}}}, + ] + }, + 'ReturnConsumedCapacity': 'INDEXES', + } + + third_batch = { + 'RequestItems': { + self.table_name: [ + {'PutRequest': {'Item': {'Hash': 'foo5'}}}, + {'PutRequest': {'Item': {'Hash': 'foo6'}}}, + ] + }, + 'ReturnConsumedCapacity': 'INDEXES', + } + self.assert_batch_write_calls_are( + [first_batch, second_batch, third_batch] + ) + + self.assertEqual( + b.consumed_capacity, + [ + { + 'TableName': self.table_name, + 'CapacityUnits': 155.0, + 'ReadCapacityUnits': 62.0, + 'WriteCapacityUnits': 93.0, + 'Table': { + 'CapacityUnits': 15.0, + 'ReadCapacityUnits': 6.0, + 'WriteCapacityUnits': 9.0, + }, + 'LocalSecondaryIndexes': { + 'FirstLSI': { + 'CapacityUnits': 10.0, + 'ReadCapacityUnits': 4.0, + 'WriteCapacityUnits': 6.0, + }, + 'SecondLSI': { + 'CapacityUnits': 40.0, + 'ReadCapacityUnits': 16.0, + 'WriteCapacityUnits': 24.0, + }, + 'ThirdLSI': { + 'CapacityUnits': 30.0, + 'ReadCapacityUnits': 12.0, + 'WriteCapacityUnits': 18.0, + }, + 'FourthLSI': { + 'CapacityUnits': 40.0, + 'ReadCapacityUnits': 16.0, + 'WriteCapacityUnits': 24.0, + }, + }, + 'GlobalSecondaryIndexes': { + 'FirstGSI': { + 'CapacityUnits': 20.0, + 'ReadCapacityUnits': 8.0, + 'WriteCapacityUnits': 12.0, + }, + }, + } + ], + )