Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ CHANGELOG
6.4.1 (unreleased)
------------------

- Nothing changed yet.
- Deserialize pickles in an asyncio executor
[masipcat]
- Converted db functions reader() and IWriter.serialize() to async
[masipcat]


6.4.0 (2022-04-12)
Expand Down
1 change: 1 addition & 0 deletions guillotina/_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"indexer": "guillotina.catalog.index.Indexer",
"search_parser": "default",
"object_reader": "guillotina.db.reader.reader",
"async_object_read_size": 25000, # bytes
"thread_pool_workers": 32,
"server_settings": {"uvicorn": {"timeout_keep_alive": 5, "http": "h11"}},
"valid_id_characters": string.digits + string.ascii_lowercase + ".-_@$^()+ =",
Expand Down
11 changes: 9 additions & 2 deletions guillotina/db/reader.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from guillotina import app_settings
from guillotina.db.orm.interfaces import IBaseObject
from guillotina.utils import run_async

import pickle
import typing


def reader(result: dict) -> IBaseObject:
obj = typing.cast(IBaseObject, pickle.loads(result["state"]))
async def reader(result: dict) -> IBaseObject:
state = result["state"]
if len(state) > app_settings["async_object_read_size"]:
o = await run_async(pickle.loads, state)
else:
o = pickle.loads(state)
obj = typing.cast(IBaseObject, o)
obj.__uuid__ = result["zoid"]
obj.__serial__ = result["tid"]
obj.__name__ = result["id"]
Expand Down
2 changes: 1 addition & 1 deletion guillotina/db/storages/cockroach.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async def delete(self, txn, oid):
async def store(self, oid, old_serial, writer, obj, txn):
assert oid is not None

pickled = writer.serialize() # This calls __getstate__ of obj
pickled = await writer.serialize() # This calls __getstate__ of obj
if len(pickled) >= self._large_record_size:
logger.warning(f"Large object {obj.__class__}: {len(pickled)}")
part = writer.part
Expand Down
2 changes: 1 addition & 1 deletion guillotina/db/storages/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get_txn(self, txn):

async def store(self, oid, old_serial, writer, obj, txn):
assert oid is not None
p = writer.serialize() # This calls __getstate__ of obj
p = await writer.serialize() # This calls __getstate__ of obj
json = await writer.get_json()
part = writer.part
if part is None:
Expand Down
2 changes: 1 addition & 1 deletion guillotina/db/storages/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ async def load(self, txn, oid):
async def store(self, oid, old_serial, writer, obj, txn):
assert oid is not None

pickled = writer.serialize() # This calls __getstate__ of obj
pickled = await writer.serialize() # This calls __getstate__ of obj
if len(pickled) >= self._large_record_size:
log.info(f"Large object {obj.__class__}: {len(pickled)}")
if self._store_json:
Expand Down
14 changes: 7 additions & 7 deletions guillotina/db/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async def get(self, oid: str, ignore_registered: bool = False) -> IBaseObject:
if result is None:
result = await self._get(oid)

obj = app_settings["object_reader"](result)
obj = await app_settings["object_reader"](result)
obj.__txn__ = self
if obj.__immutable_cache__:
# ttl of zero means we want to provide a hard cache here
Expand Down Expand Up @@ -553,10 +553,10 @@ async def get_child(self, parent, key, ignore_registered=False):
if obj is not None:
return obj

return self._fill_object(result, parent)
return await self._fill_object(result, parent)

def _fill_object(self, item: dict, parent: IBaseObject) -> IBaseObject:
obj = app_settings["object_reader"](item)
async def _fill_object(self, item: dict, parent: IBaseObject) -> IBaseObject:
obj = await app_settings["object_reader"](item)
obj.__parent__ = parent
obj.__txn__ = self
return obj
Expand All @@ -565,7 +565,7 @@ async def _get_batch_children(self, parent: IBaseObject, keys: List[str]) -> Asy
for litem in await self._manager._storage.get_children(self, parent.__uuid__, keys):
if len(litem["state"]) < self._cache.max_cache_record_size:
await self._cache.set(litem, container=parent, id=litem["id"])
yield self._fill_object(litem, parent)
yield await self._fill_object(litem, parent)

async def get_children(self, parent, keys):
"""
Expand All @@ -592,7 +592,7 @@ async def get_children(self, parent, keys):
yield litem
lookup_group = []

yield self._fill_object(item, parent)
yield await self._fill_object(item, parent)

# flush the rest
if len(lookup_group) > 0:
Expand Down Expand Up @@ -636,7 +636,7 @@ async def get_annotation(self, base_obj, id, reader=None):
if result == _EMPTY:
raise KeyError(id)
if reader is None:
obj = app_settings["object_reader"](result)
obj = await app_settings["object_reader"](result)
else:
obj = reader(result)
obj.__of__ = base_obj.__uuid__
Expand Down
2 changes: 1 addition & 1 deletion guillotina/db/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def old_serial(self):
def part(self):
return getattr(self._obj, "__partition_id__", 0)

def serialize(self):
async def serialize(self):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add the same logic as in the deserialization function (because it's expensive to know whether a Python object is big or small), but I changed the function to async in case someone wants to use custom logic to decide when to call run_async. For example:

async def serialize(obj):
    """Pickle ``obj``, going through ``run_async`` only for ``BigObject`` instances."""
    if isinstance(obj, BigObject):
        # NOTE(review): presumably run_async runs pickle.dumps in an executor so
        # the event loop is not blocked -- confirm against guillotina.utils.
        return await run_async(pickle.dumps, obj)
    else:
        # Any other object is pickled inline in the calling coroutine.
        return pickle.dumps(obj)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we feel it's not necessary, I can revert this part.

protocol = app_settings.get("pickle_protocol", pickle.HIGHEST_PROTOCOL)
return pickle.dumps(self._obj, protocol=protocol)

Expand Down
2 changes: 1 addition & 1 deletion guillotina/factory/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async def initialize(self):
await txn.initialize_tid()
root = await tm._storage.load(txn, ROOT_ID)
if root is not None:
root = app_settings["object_reader"](root)
root = await app_settings["object_reader"](root)
root.__txn__ = txn
if root.__db_id__ is None:
root.__db_id__ = self.__db_id__
Expand Down
8 changes: 4 additions & 4 deletions guillotina/tests/cache/test_cache_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def test_cache_object(guillotina_main):
cache = BasicCache(txn)
txn._cache = cache
ob = create_content()
storage.store(None, None, None, ob, txn)
await storage.store(None, None, None, ob, txn)
loaded = await txn.get(ob.__uuid__)
assert id(loaded) != id(ob)
assert loaded.__uuid__ == ob.__uuid__
Expand All @@ -119,8 +119,8 @@ async def test_cache_object_from_child(guillotina_main):
ob = create_content()
parent = create_content()
ob.__parent__ = parent
storage.store(None, None, None, parent, txn)
storage.store(None, None, None, ob, txn)
await storage.store(None, None, None, parent, txn)
await storage.store(None, None, None, ob, txn)

loaded = await txn.get_child(parent, ob.id)
assert cache._hits == 0
Expand All @@ -140,7 +140,7 @@ async def test_do_not_cache_large_object(guillotina_main):
txn._cache = cache
ob = create_content()
ob.foobar = "X" * cache.max_cache_record_size # push size above cache threshold
storage.store(None, None, None, ob, txn)
await storage.store(None, None, None, ob, txn)
loaded = await txn.get(ob.__uuid__)
assert id(loaded) != id(ob)
assert loaded.__uuid__ == ob.__uuid__
Expand Down
4 changes: 2 additions & 2 deletions guillotina/tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ async def get_child(self, txn, container_uid, key):
if oid in self._objects:
return self._objects[oid]

def store(self, oid, old_serial, writer, ob, txn):
async def store(self, oid, old_serial, writer, ob, txn):
writer = IWriter(ob)
self._objects[ob.__uuid__] = {
"state": writer.serialize(),
"state": await writer.serialize(),
"zoid": ob.__uuid__,
"tid": 1,
"id": writer.id,
Expand Down
4 changes: 3 additions & 1 deletion guillotina/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ async def test_store_object(self, metrics_registry):
txn.connection_reserved = True
txn._db_conn = conn

await storage.store("foobar", 1, MagicMock(), ob, txn)
writer = AsyncMock()
writer.serialize.return_value = b""
await storage.store("foobar", 1, writer, ob, txn)
assert (
metrics_registry.get_sample_value(
"guillotina_db_pg_ops_total", {"type": "store_object", "error": "none"}
Expand Down
2 changes: 1 addition & 1 deletion guillotina/utils/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def get_object_by_uid(uid: str, txn=None) -> IBaseObject:
if result["parent_id"] == TRASHED_ID:
raise KeyError(uid)

obj = app_settings["object_reader"](result)
obj = await app_settings["object_reader"](result)
obj.__txn__ = txn
if result["parent_id"]:
parent = await get_object_by_uid(result["parent_id"], txn)
Expand Down
8 changes: 4 additions & 4 deletions measures/pickling.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def run1():
start = time.time()
writer = get_adapter(ob, IWriter)
for _ in range(ITERATIONS):
writer.serialize()
await writer.serialize()
end = time.time()
print(f"Done with {ITERATIONS} in {end - start} seconds")

Expand All @@ -39,11 +39,11 @@ async def run2():
ob.foobar1 = "1"
ob.foobar2 = "2"
ob.foobar6 = "6"
start = time.time()
writer = get_adapter(ob, IWriter)
serialized = writer.serialize()
serialized = await writer.serialize()
start = time.time()
for _ in range(ITERATIONS):
ob = reader({"state": serialized, "zoid": 0, "tid": 0, "id": "foobar"})
ob = await reader({"state": serialized, "zoid": 0, "tid": 0, "id": "foobar"})
end = time.time()
assert ob.foobar1 == "1"
assert ob.foobar6 == "6"
Expand Down