diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 31cf8d0dc..08eec9891 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,7 +4,10 @@ CHANGELOG 6.4.1 (unreleased) ------------------ -- Nothing changed yet. +- Deserialize pickles in an asyncio executor + [masipcat] +- Converted db functions reader() and IWriter.serialize() to async + [masipcat] 6.4.0 (2022-04-12) diff --git a/guillotina/_settings.py b/guillotina/_settings.py index 58f8e7a62..9e9869694 100644 --- a/guillotina/_settings.py +++ b/guillotina/_settings.py @@ -92,6 +92,7 @@ "indexer": "guillotina.catalog.index.Indexer", "search_parser": "default", "object_reader": "guillotina.db.reader.reader", + "async_object_read_size": 25000, # bytes "thread_pool_workers": 32, "server_settings": {"uvicorn": {"timeout_keep_alive": 5, "http": "h11"}}, "valid_id_characters": string.digits + string.ascii_lowercase + ".-_@$^()+ =", diff --git a/guillotina/db/reader.py b/guillotina/db/reader.py index 0e2d55f20..dfc4824b3 100644 --- a/guillotina/db/reader.py +++ b/guillotina/db/reader.py @@ -1,11 +1,18 @@ +from guillotina import app_settings from guillotina.db.orm.interfaces import IBaseObject +from guillotina.utils import run_async import pickle import typing -def reader(result: dict) -> IBaseObject: - obj = typing.cast(IBaseObject, pickle.loads(result["state"])) +async def reader(result: dict) -> IBaseObject: + state = result["state"] + if len(state) > app_settings["async_object_read_size"]: + o = await run_async(pickle.loads, state) + else: + o = pickle.loads(state) + obj = typing.cast(IBaseObject, o) obj.__uuid__ = result["zoid"] obj.__serial__ = result["tid"] obj.__name__ = result["id"] diff --git a/guillotina/db/storages/cockroach.py b/guillotina/db/storages/cockroach.py index eb740af1d..a216e92d0 100644 --- a/guillotina/db/storages/cockroach.py +++ b/guillotina/db/storages/cockroach.py @@ -212,7 +212,7 @@ async def delete(self, txn, oid): async def store(self, oid, old_serial, writer, obj, txn): assert oid is not None - 
pickled = writer.serialize() # This calls __getstate__ of obj + pickled = await writer.serialize() # This calls __getstate__ of obj if len(pickled) >= self._large_record_size: logger.warning(f"Large object {obj.__class__}: {len(pickled)}") part = writer.part diff --git a/guillotina/db/storages/dummy.py b/guillotina/db/storages/dummy.py index ddf7aeda0..735aa164b 100644 --- a/guillotina/db/storages/dummy.py +++ b/guillotina/db/storages/dummy.py @@ -69,7 +69,7 @@ def get_txn(self, txn): async def store(self, oid, old_serial, writer, obj, txn): assert oid is not None - p = writer.serialize() # This calls __getstate__ of obj + p = await writer.serialize() # This calls __getstate__ of obj json = await writer.get_json() part = writer.part if part is None: diff --git a/guillotina/db/storages/pg.py b/guillotina/db/storages/pg.py index f52a2f3cc..0ac547329 100644 --- a/guillotina/db/storages/pg.py +++ b/guillotina/db/storages/pg.py @@ -867,7 +867,7 @@ async def load(self, txn, oid): async def store(self, oid, old_serial, writer, obj, txn): assert oid is not None - pickled = writer.serialize() # This calls __getstate__ of obj + pickled = await writer.serialize() # This calls __getstate__ of obj if len(pickled) >= self._large_record_size: log.info(f"Large object {obj.__class__}: {len(pickled)}") if self._store_json: diff --git a/guillotina/db/transaction.py b/guillotina/db/transaction.py index fc37c3f1d..e7fa3c8e7 100644 --- a/guillotina/db/transaction.py +++ b/guillotina/db/transaction.py @@ -378,7 +378,7 @@ async def get(self, oid: str, ignore_registered: bool = False) -> IBaseObject: if result is None: result = await self._get(oid) - obj = app_settings["object_reader"](result) + obj = await app_settings["object_reader"](result) obj.__txn__ = self if obj.__immutable_cache__: # ttl of zero means we want to provide a hard cache here @@ -553,10 +553,10 @@ async def get_child(self, parent, key, ignore_registered=False): if obj is not None: return obj - return 
self._fill_object(result, parent) + return await self._fill_object(result, parent) - def _fill_object(self, item: dict, parent: IBaseObject) -> IBaseObject: - obj = app_settings["object_reader"](item) + async def _fill_object(self, item: dict, parent: IBaseObject) -> IBaseObject: + obj = await app_settings["object_reader"](item) obj.__parent__ = parent obj.__txn__ = self return obj @@ -565,7 +565,7 @@ async def _get_batch_children(self, parent: IBaseObject, keys: List[str]) -> Asy for litem in await self._manager._storage.get_children(self, parent.__uuid__, keys): if len(litem["state"]) < self._cache.max_cache_record_size: await self._cache.set(litem, container=parent, id=litem["id"]) - yield self._fill_object(litem, parent) + yield await self._fill_object(litem, parent) async def get_children(self, parent, keys): """ @@ -592,7 +592,7 @@ async def get_children(self, parent, keys): yield litem lookup_group = [] - yield self._fill_object(item, parent) + yield await self._fill_object(item, parent) # flush the rest if len(lookup_group) > 0: @@ -636,7 +636,7 @@ async def get_annotation(self, base_obj, id, reader=None): if result == _EMPTY: raise KeyError(id) if reader is None: - obj = app_settings["object_reader"](result) + obj = await app_settings["object_reader"](result) else: obj = reader(result) obj.__of__ = base_obj.__uuid__ diff --git a/guillotina/db/writer.py b/guillotina/db/writer.py index 607184cba..7e3dbdb2e 100644 --- a/guillotina/db/writer.py +++ b/guillotina/db/writer.py @@ -56,7 +56,7 @@ def old_serial(self): def part(self): return getattr(self._obj, "__partition_id__", 0) - def serialize(self): + async def serialize(self): protocol = app_settings.get("pickle_protocol", pickle.HIGHEST_PROTOCOL) return pickle.dumps(self._obj, protocol=protocol) diff --git a/guillotina/factory/content.py b/guillotina/factory/content.py index 16610c9da..c7e93ecfc 100644 --- a/guillotina/factory/content.py +++ b/guillotina/factory/content.py @@ -175,7 +175,7 @@ async def 
initialize(self): await txn.initialize_tid() root = await tm._storage.load(txn, ROOT_ID) if root is not None: - root = app_settings["object_reader"](root) + root = await app_settings["object_reader"](root) root.__txn__ = txn if root.__db_id__ is None: root.__db_id__ = self.__db_id__ diff --git a/guillotina/tests/cache/test_cache_memory.py b/guillotina/tests/cache/test_cache_memory.py index 7bbaf408c..2de20e4ac 100644 --- a/guillotina/tests/cache/test_cache_memory.py +++ b/guillotina/tests/cache/test_cache_memory.py @@ -97,7 +97,7 @@ async def test_cache_object(guillotina_main): cache = BasicCache(txn) txn._cache = cache ob = create_content() - storage.store(None, None, None, ob, txn) + await storage.store(None, None, None, ob, txn) loaded = await txn.get(ob.__uuid__) assert id(loaded) != id(ob) assert loaded.__uuid__ == ob.__uuid__ @@ -119,8 +119,8 @@ async def test_cache_object_from_child(guillotina_main): ob = create_content() parent = create_content() ob.__parent__ = parent - storage.store(None, None, None, parent, txn) - storage.store(None, None, None, ob, txn) + await storage.store(None, None, None, parent, txn) + await storage.store(None, None, None, ob, txn) loaded = await txn.get_child(parent, ob.id) assert cache._hits == 0 @@ -140,7 +140,7 @@ async def test_do_not_cache_large_object(guillotina_main): txn._cache = cache ob = create_content() ob.foobar = "X" * cache.max_cache_record_size # push size above cache threshold - storage.store(None, None, None, ob, txn) + await storage.store(None, None, None, ob, txn) loaded = await txn.get(ob.__uuid__) assert id(loaded) != id(ob) assert loaded.__uuid__ == ob.__uuid__ diff --git a/guillotina/tests/mocks.py b/guillotina/tests/mocks.py index 3232ccdf7..1b2c3d464 100644 --- a/guillotina/tests/mocks.py +++ b/guillotina/tests/mocks.py @@ -122,10 +122,10 @@ async def get_child(self, txn, container_uid, key): if oid in self._objects: return self._objects[oid] - def store(self, oid, old_serial, writer, ob, txn): + async 
def store(self, oid, old_serial, writer, ob, txn): writer = IWriter(ob) self._objects[ob.__uuid__] = { - "state": writer.serialize(), + "state": await writer.serialize(), "zoid": ob.__uuid__, "tid": 1, "id": writer.id, diff --git a/guillotina/tests/test_metrics.py b/guillotina/tests/test_metrics.py index 4f442232d..94e74a5c6 100644 --- a/guillotina/tests/test_metrics.py +++ b/guillotina/tests/test_metrics.py @@ -133,7 +133,9 @@ async def test_store_object(self, metrics_registry): txn.connection_reserved = True txn._db_conn = conn - await storage.store("foobar", 1, MagicMock(), ob, txn) + writer = AsyncMock() + writer.serialize.return_value = b"" + await storage.store("foobar", 1, writer, ob, txn) assert ( metrics_registry.get_sample_value( "guillotina_db_pg_ops_total", {"type": "store_object", "error": "none"} diff --git a/guillotina/utils/content.py b/guillotina/utils/content.py index cf2a2cd47..96d415e5f 100644 --- a/guillotina/utils/content.py +++ b/guillotina/utils/content.py @@ -186,7 +186,7 @@ async def get_object_by_uid(uid: str, txn=None) -> IBaseObject: if result["parent_id"] == TRASHED_ID: raise KeyError(uid) - obj = app_settings["object_reader"](result) + obj = await app_settings["object_reader"](result) obj.__txn__ = txn if result["parent_id"]: parent = await get_object_by_uid(result["parent_id"], txn) diff --git a/measures/pickling.py b/measures/pickling.py index 96e5b41b8..5492b0777 100644 --- a/measures/pickling.py +++ b/measures/pickling.py @@ -28,7 +28,7 @@ async def run1(): start = time.time() writer = get_adapter(ob, IWriter) for _ in range(ITERATIONS): - writer.serialize() + await writer.serialize() end = time.time() print(f"Done with {ITERATIONS} in {end - start} seconds") @@ -39,11 +39,11 @@ async def run2(): ob.foobar1 = "1" ob.foobar2 = "2" ob.foobar6 = "6" - start = time.time() writer = get_adapter(ob, IWriter) - serialized = writer.serialize() + serialized = await writer.serialize() + start = time.time() for _ in range(ITERATIONS): - ob = 
reader({"state": serialized, "zoid": 0, "tid": 0, "id": "foobar"}) + ob = await reader({"state": serialized, "zoid": 0, "tid": 0, "id": "foobar"}) end = time.time() assert ob.foobar1 == "1" assert ob.foobar6 == "6"