Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Add `CUSTOM_COMMAND` feature to `CommandServer`

## 2.16.0 (2026-05-26)

- Add `set_exposure_settings(shutter_time, analogue_gain)` to `LibcameraCapture`, mirroring the API of `UnicamIspCapture`. When either parameter is set to a concrete value (i.e. not `Auto.AUTO`), `AeEnable=False` is also applied so the AE algorithm does not overwrite the manual setting.
Expand Down
19 changes: 10 additions & 9 deletions actfw_core/_private/schema/agent_app_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import socket
from dataclasses import dataclass

from typing_extensions import Self
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typing_extensions が pyproject.toml に入っていなそうなため、直接の依存として入れた方が壊れにくくなるかもしれません


from ..util.result import ResultTuple


Expand Down Expand Up @@ -33,6 +35,8 @@ def next_(self) -> "RequestId":

class CommandKind(enum.Enum):
TAKE_PHOTO = 0
CHECK_CUSTOM_COMMAND_AVAILABILITY = 1
CUSTOM_COMMAND = 2


class Status(enum.Enum):
Expand All @@ -51,15 +55,12 @@ class CommandRequest:
data: bytes

@classmethod
def parse(cls, stream: socket.socket) -> ResultTuple["CommandRequest", Exception]:
try:
id_ = RequestId(_read_int(stream))
kind = CommandKind(_read_int(stream))
data_length = _read_int(stream)
data = _read_bytes(stream, data_length)
return cls(id_, kind, data), None
except Exception as e:
return None, e
def parse(cls, stream: socket.socket) -> Self:
id_ = RequestId(_read_int(stream))
kind = CommandKind(_read_int(stream))
data_length = _read_int(stream)
data = _read_bytes(stream, data_length)
return cls(id_, kind, data)

def to_bytes(self) -> bytes:
id_ = self.id_._id
Expand Down
101 changes: 69 additions & 32 deletions actfw_core/command_server.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import base64
import copy
import io
import json
import os
import socket
from threading import Lock
from typing import Optional
import sys
from threading import Lock, Thread
from typing import Callable, Optional, TypedDict

from PIL.Image import Image as PIL_Image

from .schema.agent_app_protocol import CommandKind, CommandRequest, CommandResponse, Status
from .task import Isolated


class CustomCommandRequest(TypedDict):
id: str
payload: str


class CommandServer(Isolated):
sock_path: Optional[str]
img_lock: Lock
Expand All @@ -26,7 +33,9 @@ class CommandServer(Isolated):

"""

def __init__(self, sock_path: Optional[str] = None) -> None:
def __init__(
self, sock_path: Optional[str] = None, custom_command_handler: Optional[Callable[[CustomCommandRequest], str]] = None
) -> None:
super().__init__()
self.sock_path = None
env = "ACTCAST_COMMAND_SOCK"
Expand All @@ -37,6 +46,7 @@ def __init__(self, sock_path: Optional[str] = None) -> None:
self.running = True
self.img_lock = Lock()
self.img = None
self.custom_command_handler = custom_command_handler

def run(self) -> None:
"""Run and start the activity"""
Expand All @@ -54,39 +64,40 @@ def run(self) -> None:
while self.running:
try:
conn, _ = s.accept()
request, err = CommandRequest.parse(conn)

if request is None:
raise RuntimeError("couldn't parse a request from actcast agent: `CommandRequest.parse()` failed")

# FIXME: this error handling is unreachable because `CommandRequest.parse()` returns `None` if parsing fails
if err:
error_response = CommandResponse(
copy.copy(request.id_),
Status.GENERAL_ERROR,
b"",
)
conn.sendall(error_response.to_bytes())
conn.shutdown(socket.SHUT_RDWR)
conn.close()
continue
Thread(target=self._handle_request, args=(conn,), daemon=True).start()
except socket.timeout:
pass
except Exception as e:
print(f"Unexpected CommandServer error: {e}", file=sys.stderr, flush=True)
pass
os.remove(self.sock_path)

response = None
def _handle_request(self, conn: socket.socket) -> None:
try:
request = CommandRequest.parse(conn)
except Exception as e:
print(f"Failed to parse command request: {e}", file=sys.stderr, flush=True)
conn.shutdown(socket.SHUT_RDWR)
conn.close()
return

if request.kind == CommandKind.TAKE_PHOTO:
response = self._handle_take_photo(request)
response = None

if response is None:
conn.shutdown(socket.SHUT_RDWR)
conn.close()
continue
if request.kind == CommandKind.TAKE_PHOTO:
response = self._handle_take_photo(request)
elif request.kind == CommandKind.CHECK_CUSTOM_COMMAND_AVAILABILITY:
response = self._handle_custom_command_availability(request)
elif request.kind == CommandKind.CUSTOM_COMMAND:
response = self._handle_custom_command(request)

conn.sendall(response.to_bytes())
conn.shutdown(socket.SHUT_RDWR)
conn.close()
except socket.timeout:
pass
os.remove(self.sock_path)
if response is None:
conn.shutdown(socket.SHUT_RDWR)
conn.close()
return

conn.sendall(response.to_bytes())
conn.shutdown(socket.SHUT_RDWR)
conn.close()

def _handle_take_photo(self, request: CommandRequest) -> Optional[CommandResponse]:
# Wait photo
Expand All @@ -104,6 +115,32 @@ def _handle_take_photo(self, request: CommandRequest) -> Optional[CommandRespons

return None

def _handle_custom_command_availability(self, request: CommandRequest) -> CommandResponse:
if self.custom_command_handler is None:
return CommandResponse(copy.copy(request.id_), Status.UNIMPLEMENTED, b"Custom command handler is not set")
else:
return CommandResponse(copy.copy(request.id_), Status.OK, b"")

def _handle_custom_command(self, request: CommandRequest) -> CommandResponse:
if self.custom_command_handler is None:
return CommandResponse(
copy.copy(request.id_), Status.GENERAL_ERROR, b"Unreachable Error: custom command handler is not set"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unreachable という語は、コード上で到達する方法がないことをイメージすることが多い気がします。
どちらかというと、先に availability を check して OK が返ることを確認していない protocol error のイメージの方が近いかもしれません。

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

基本的にはここには入ってきてはいけないという意味で Unreachable にしてました

このコメントとしてはProtocol Error に変更してエラーメッセージを変えるべき的なことですかね👀
(私としてもめちゃくちゃ強いこだわりがある訳ではないです!!)

)

try:
command_server_request: CustomCommandRequest = json.loads(request.data.decode())
Copy link
Copy Markdown
Member

@ny-a ny-a May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json.loads は Python の型をランタイムで確認しないはずなので、ここで保証されるのは json として valid なことまでな気がします。
(agent 側の動作に問題がなければほぼありえないはずですが、id を持っていることまでは確認してもよいかもしれません。)

(あと、ここで落ちる場合は agent がおかしい場合だと思うため、エラーメッセージではユーザーにとっては対処のしようがないということは明示しておきたい気がします)

Copy link
Copy Markdown
Contributor Author

@umadein umadein May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json.loads は Python の型をランタイムで確認しないはずなので、ここで保証されるのは json として valid なことまでな気がします。

はい、これは基本的に payload が JSON であるということのみを確認していて, ここで落ちるということは agent の request の形式がおかしいので GENERAL_ERROR として返してます!
id チェックとかしたいならもしかしたら pydantic とか導入した方が幸せな気がしないでもないですね🤔
一旦は可能な限り標準に寄せようとしてますが.

(あと、ここで落ちる場合は agent がおかしい場合だと思うため、エラーメッセージではユーザーにとっては対処のしようがないということは明示しておきたい気がします)

基本的には我々がバグを発見するためのもので, ユーザーに何かを求める用途でないのはそうです.
このエラーメッセージ自体は agent 側から syslog に出るだけとかじゃないんですかね?
そうすると他のエラーメッセージの扱いとあまり違わないような気がするんですが, とはいえエラーメッセージ自体に気にしなくていいよ的なのを書いた方がいいってことですかね?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

と思ったけど、エラーメッセージなんてあっても別に困らないし書いてもいい気がしてきた

なんなら agent が JSON を渡すことを前提に try-catch しないでもありかもしれない.
handle_request のトップレベルに try-catch を書いて GENERAL_ERROR で吸収するようにして

except Exception as e:
return CommandResponse(
copy.copy(request.id_), Status.GENERAL_ERROR, f"Failed to parse custom command payload: {e}".encode()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error の種類が分かるようにしたいです。

Suggested change
copy.copy(request.id_), Status.GENERAL_ERROR, f"Failed to parse custom command payload: {e}".encode()
copy.copy(request.id_), Status.GENERAL_ERROR, f"Failed to parse custom command payload: {e!r}".encode()

)

try:
payload = self.custom_command_handler(command_server_request)
return CommandResponse(copy.copy(request.id_), Status.OK, payload.encode())
except Exception as e:
error_message = str(e)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error の種類が分かるようにしたいです。

Suggested change
error_message = str(e)
error_message = repr(e)

return CommandResponse(copy.copy(request.id_), Status.APP_ERROR, error_message.encode())

def update_image(self, image: PIL_Image) -> None:
"""

Expand Down
16 changes: 14 additions & 2 deletions tests/unit_test/schema/test_agent_app_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,19 @@ def is_consumed_all(self) -> bool:
return self._data.read(1) == b""


def test_roundtrip() -> None:
def test_command_request_roundtrip() -> None:
HUGE_DATA_SIZE = 2**13 # 8KB
HUGE_DATA = f"0 0 {HUGE_DATA_SIZE} ".encode() + b"a" * HUGE_DATA_SIZE

DATAS = [b"0 0 0 ", b"0 0 1 a", HUGE_DATA]
for data in DATAS:
sock = DummySocket(data)
x = CommandRequest.parse(sock)
assert sock.is_consumed_all()
assert x.to_bytes() == data


def test_result_tuple_parse_roundtrip() -> None:
def roundtrip(cls: Any, data: bytes) -> None:
sock = DummySocket(data)
x, err = cls.parse(sock)
Expand All @@ -29,7 +41,7 @@ def roundtrip(cls: Any, data: bytes) -> None:
HUGE_DATA_SIZE = 2**13 # 8KB
HUGE_DATA = f"0 0 {HUGE_DATA_SIZE} ".encode() + b"a" * HUGE_DATA_SIZE

CLASSES = [CommandRequest, CommandResponse, ServiceRequest, ServiceResponse]
CLASSES = [CommandResponse, ServiceRequest, ServiceResponse]
DATAS = [b"0 0 0 ", b"0 0 1 a", HUGE_DATA]
for cls, data in itertools.product(CLASSES, DATAS):
roundtrip(cls, data)
Loading