Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/system_tests/services/opa_config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
bundles:
diamond-policies:
service: ghcr
resource: ghcr.io/diamondlightsource/authz-policy:0.0.20
Comment thread
ZohebShaikh marked this conversation as resolved.
resource: ghcr.io/zohebshaikh/authz-policy:0.2.7
polling:
min_delay_seconds: 30
max_delay_seconds: 120
14 changes: 7 additions & 7 deletions tests/system_tests/services/tiled_config/dls.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
provider: str | None = None,
):
self._token_audience = token_audience
self._type_adapter = TypeAdapter(DiamondAccessBlob)
self._type_adapter = TypeAdapter(DiamondAccessBlob | int)

super().__init__(
authorization_provider=authorization_provider,
Expand All @@ -80,7 +80,7 @@ async def init_node(
decision = await self._get_external_decision(
self._create_node,
self.build_input(principal, authn_access_tags, authn_scopes, access_blob),
ResultHolder[int],
ResultHolder[str],
)
if decision and decision.result is not None:
return (True, {"tags": [decision.result]})
Expand Down Expand Up @@ -131,11 +131,11 @@ def build_input(
and "tags" in access_blob
and len(access_blob["tags"]) > 0
):
if isinstance(tags := access_blob["tags"][0], str):
blob = self._type_adapter.validate_json(tags)
blob = self._type_adapter.validate_json(access_blob["tags"][0])
if isinstance(blob, DiamondAccessBlob):
_input.update(blob.model_dump())
elif isinstance(tags, int):
_input["session"] = str(tags)
elif isinstance(blob, int):
_input["session"] = str(blob)

return json.dumps({"input": _input})

Expand All @@ -151,7 +151,7 @@ async def filters(
tags = await self._get_external_decision(
self._user_tags,
self.build_input(principal, authn_access_tags, authn_scopes),
ResultHolder[list[int | str]],
ResultHolder[list[str]],
)
if tags is not None:
if tags.result == ["*"]:
Expand Down