diff --git a/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py b/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py new file mode 100644 index 000000000..4f41d7989 --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py @@ -0,0 +1,215 @@ +"""Internal log-format primitives for test-result simulation logs. + +Two files per run: + +* **Log file** (e.g. ``foo.jsonl``) - append-only record of each logged API call, + one line per call. Written by :func:`log_request_to_file` in the test process + and read by :func:`iter_log_data_lines` / the replay subprocess. Has no header: + every line is a data line. +* **Tracking sidecar** (``foo.jsonl.tracking``) - small JSON file holding the + incremental replay cursor (``lastUploadedLine``) and the simulated-to-real ID + map. Written only by the replay subprocess via :meth:`LogTracking.save` using + a temp-file + ``os.replace`` so a crash can't leave a half-written sidecar. + Read once at replay start via :meth:`LogTracking.load`. Never touched by the + test process. + +# Concurrency + +With tracking moved out of the main log, the log file becomes strictly +append-only and has exactly one in-place mutator (the writer) and one scanner +(the replay subprocess). POSIX guarantees that an ``O_APPEND`` write atomically +bumps the EOF, so parallel writers can't lose data. To keep a concurrent reader +from observing a mid-append partial final line we still take ``LOCK_EX`` on the +writer's single append and ``LOCK_SH`` on the reader's ``readlines()``; there +is never any exclusive-vs-exclusive contention because nothing rewrites the +file any more. + +The sidecar has a single writer (the replay subprocess) and no live reader, so +it needs no locking. Atomic rename is still used to keep the on-disk contents +valid across crashes. + +``flock`` is advisory, so this contract only holds for processes that use these +helpers; ad-hoc writers are not protected. +""" + +from __future__ import annotations + +import fcntl +import json +import os +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Generator + +from google.protobuf import json_format + +if TYPE_CHECKING: + from sift_client.sift_types.test_report import TestMeasurement, TestReport, TestStep + + +def _client_version() -> str: + from importlib.metadata import PackageNotFoundError, version + + try: + return version("sift_stack_py") + except PackageNotFoundError: + return "unknown" + + +@dataclass +class LogTracking: + """Incremental-replay cursor and simulated-to-real ID map. + + Persisted beside the log file (see module docstring for layout). The log + file itself is append-only and stores only API-call data lines. + + * ``last_uploaded_line`` is the count of data lines that have been + successfully replayed against the server. Each data line corresponds to a + single API call, so line granularity matches the atomic unit of work: a + line is either fully replayed or must be retried in its entirety. Data + lines are strictly append-only, so this counter is stable across runs. + * ``id_map`` maps simulated response IDs (created during the original test + run) to the real IDs assigned by the server during replay. Subsequent + ``Update*`` entries consult this map to translate IDs. + """ + + last_uploaded_line: int = 0 + id_map: dict[str, str] = field(default_factory=dict) + client_version: str = field(default_factory=_client_version) + + @staticmethod + def sidecar_path(log_path: str | Path) -> Path: + """Return the sidecar path for a given log file (``.tracking``).""" + p = Path(log_path) + return p.with_name(p.name + ".tracking") + + @classmethod + def load(cls, log_path: str | Path) -> LogTracking: + """Read tracking state for ``log_path``; return a fresh instance if missing or corrupt. + + A missing sidecar is the normal state before the first incremental tick. + A malformed sidecar is treated the same so a crash mid-write can't brick + replay; the worst case is a re-replay of already-uploaded lines, which + the server must be prepared for anyway. + """ + sidecar = cls.sidecar_path(log_path) + try: + data = json.loads(sidecar.read_text()) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return cls() + return cls( + last_uploaded_line=data.get("lastUploadedLine", 0), + id_map=data.get("idMap", {}), + client_version=data.get("clientVersion", "unknown"), + ) + + def save(self, log_path: str | Path) -> None: + """Atomically write tracking state to the sidecar for ``log_path``. + + Uses temp-file + ``os.replace`` so readers (and crash recovery) never + observe a partially written sidecar. + """ + sidecar = self.sidecar_path(log_path) + sidecar.parent.mkdir(parents=True, exist_ok=True) + payload = json.dumps( + { + "clientVersion": self.client_version, + "lastUploadedLine": self.last_uploaded_line, + "idMap": self.id_map, + }, + separators=(",", ":"), + ) + tmp = sidecar.with_name(sidecar.name + ".tmp") + tmp.write_text(payload) + os.replace(tmp, sidecar) + + +@dataclass +class _ReplayState: + """Mutable state accumulated during log replay.""" + + report: TestReport | None = None + steps_by_id: dict[str, TestStep] = field(default_factory=dict) + steps_order: list[str] = field(default_factory=list) + measurements_by_id: dict[str, TestMeasurement] = field(default_factory=dict) + measurements_order: list[str] = field(default_factory=list) + + +@dataclass +class ReplayResult: + """Result of replaying a log file.""" + + report: TestReport + steps: list[TestStep] = field(default_factory=list) + measurements: list[TestMeasurement] = field(default_factory=list) + + +def log_request_to_file( + log_file: str | Path, + request_type: str, + request: Any, + response_id: str | None = None, +) -> None: + """Append a request as a JSON-encoded line to ``log_file``. + + Takes ``LOCK_EX`` across the append so a concurrent reader holding + ``LOCK_SH`` in :func:`iter_log_data_lines` can't see a mid-write partial + final line. See the module docstring for the full concurrency model. + + Args: + log_file: Path to the log file. + request_type: Type of request being logged. + request: The protobuf request to log. + response_id: Optional ID from the simulated response, embedded in the tag + for create operations so replay can map previously simulated IDs used + by simulated updates. + """ + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + tag = f"{request_type}:{response_id}" if response_id else request_type + request_dict = json_format.MessageToDict(request) + request_json = json.dumps(request_dict, separators=(",", ":")) + line = f"[{tag}] {request_json}\n" + with open(log_path, "a") as f: + fcntl.flock(f, fcntl.LOCK_EX) + # Closing the file flushes and releases the flock atomically; no + # explicit unlock needed here. + f.write(line) + + +def iter_log_data_lines( + log_path: Path, + start_line: int = 0, +) -> Generator[tuple[str, str | None, str], None, None]: + """Parse data lines from a log file. + + Yields ``(request_type, response_id, json_str)`` tuples. Each yielded item + corresponds to one logged API call. + + ``start_line`` is the count of data lines (1-based) already uploaded; the + iterator skips the first ``start_line`` lines and yields the rest. Pass 0 + to read all data lines. + + Acquires ``LOCK_SH`` only while snapshotting the file into memory, then + releases before yielding. Lines appended by a concurrent + :func:`log_request_to_file` after the snapshot are not visible this call -- + they will be picked up on the next invocation. + """ + line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") + with open(log_path) as f: + fcntl.flock(f, fcntl.LOCK_SH) + raw_lines = f.readlines() + + data_line_count = 0 + for raw_line in raw_lines: + line = raw_line.strip() + if not line: + continue + match = line_pattern.match(line) + if not match: + raise ValueError(f"Invalid log line: {line}") + data_line_count += 1 + if data_line_count <= start_line: + continue + yield (match.group(1), match.group(2), match.group(3)) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py index 0c6499694..ac4896190 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py @@ -1,10 +1,7 @@ from __future__ import annotations -import json import logging -import re import uuid -from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, cast @@ -43,6 +40,13 @@ from sift.test_reports.v1.test_reports_pb2 import TestStep as TestStepProto from sift.test_reports.v1.test_reports_pb2_grpc import TestReportServiceStub +from sift_client._internal.low_level_wrappers._test_results_log import ( + LogTracking, + ReplayResult, + _ReplayState, + iter_log_data_lines, + log_request_to_file, +) from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.test_report import ( TestMeasurement, @@ -78,30 +82,6 @@ def __init__(self, grpc_client: GrpcClient): """ super().__init__(grpc_client) - @staticmethod - def _log_request_to_file( - log_file: str | Path, - request_type: str, - request: Any, - response_id: str | None = None, - ) -> None: - """Log a request to a file in JSON format. - - Args: - log_file: Path to the log file. - request_type: Type of request being logged. - request: The protobuf request to log. - response_id: Optional ID from the simulated response, embedded in the tag - for create operations so replay can map previously simulated IDs used by simulated updates. - """ - log_path = Path(log_file) - log_path.parent.mkdir(parents=True, exist_ok=True) - tag = f"{request_type}:{response_id}" if response_id else request_type - with open(log_path, "a") as f: - request_dict = json_format.MessageToDict(request) - request_json = json.dumps(request_dict, separators=(",", ":")) - f.write(f"[{tag}] {request_json}\n") - @staticmethod def simulate_create_test_report_response( request: CreateTestReportRequest, @@ -366,24 +346,31 @@ async def import_test_report(self, remote_file_id: str) -> TestReport: async def create_test_report( self, *, - test_report: TestReportCreate, + test_report: TestReportCreate | None = None, log_file: str | Path | None = None, + request: CreateTestReportRequest | None = None, + simulate: bool = False, ) -> TestReport: """Create a new test report. Args: test_report: The test report to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_report). + simulate: If True, return a simulated response without making an API call. Returns: The created TestReport. """ - request = test_report.to_proto() + if request is None: + if test_report is None: + raise ValueError("Either test_report or request must be provided") + request = test_report.to_proto() - if log_file is not None: + if log_file is not None or simulate: simulated_proto = self.simulate_create_test_report_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestReport", request, @@ -479,9 +466,11 @@ async def list_all_test_reports( async def update_test_report( self, - update: TestReportUpdate, + update: TestReportUpdate | None = None, log_file: str | Path | None = None, existing: TestReport | None = None, + request: UpdateTestReportRequest | None = None, + simulate: bool = False, ) -> TestReport: """Update an existing test report. @@ -490,16 +479,21 @@ async def update_test_report( log_file: If set, log the request to this file and return a simulated response. existing: The full existing TestReport for simulation merge. If not provided, the simulated response will only contain the updated fields. + request: Raw protobuf request (mutually exclusive with update). + simulate: If True, return a simulated response without making an API call. Returns: The updated TestReport. """ - test_report_proto, field_mask = update.to_proto_with_mask() - request = UpdateTestReportRequest(test_report=test_report_proto, update_mask=field_mask) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_report_proto, field_mask = update.to_proto_with_mask() + request = UpdateTestReportRequest(test_report=test_report_proto, update_mask=field_mask) - if log_file is not None: + if log_file is not None or simulate: if log_file is not None: - self._log_request_to_file(log_file, "UpdateTestReport", request) + log_request_to_file(log_file, "UpdateTestReport", request) return self.simulate_update_test_report_response(request, existing=existing) response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestReport(request) @@ -525,24 +519,31 @@ async def delete_test_report(self, test_report_id: str) -> None: async def create_test_step( self, - test_step: TestStepCreate, + test_step: TestStepCreate | None = None, log_file: str | Path | None = None, + request: CreateTestStepRequest | None = None, + simulate: bool = False, ) -> TestStep: """Create a new test step. Args: test_step: The test step to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_step). + simulate: If True, return a simulated response without making an API call. Returns: The created TestStep. """ - request = CreateTestStepRequest(test_step=test_step.to_proto()) + if request is None: + if test_step is None: + raise ValueError("Either test_step or request must be provided") + request = CreateTestStepRequest(test_step=test_step.to_proto()) - if log_file is not None: + if log_file is not None or simulate: simulated_proto = self.simulate_create_test_step_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestStep", request, @@ -618,9 +619,11 @@ async def list_all_test_steps( async def update_test_step( self, - update: TestStepUpdate, + update: TestStepUpdate | None = None, log_file: str | Path | None = None, existing: TestStep | None = None, + request: UpdateTestStepRequest | None = None, + simulate: bool = False, ) -> TestStep: """Update an existing test step. @@ -629,19 +632,24 @@ async def update_test_step( log_file: If set, log the request to this file and return a simulated response. existing: The full existing TestStep for simulation merge. If not provided, the simulated response will only contain the updated fields. + request: Raw protobuf request (mutually exclusive with update). + simulate: If True, return a simulated response without making an API call. Returns: The updated TestStep. """ - test_step_proto, field_mask = update.to_proto_with_mask() - has_error_info = test_step_proto.HasField("error_info") - if has_error_info: - field_mask.paths.append("error_info") - request = UpdateTestStepRequest(test_step=test_step_proto, update_mask=field_mask) - - if log_file is not None: + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_step_proto, field_mask = update.to_proto_with_mask() + has_error_info = test_step_proto.HasField("error_info") + if has_error_info: + field_mask.paths.append("error_info") + request = UpdateTestStepRequest(test_step=test_step_proto, update_mask=field_mask) + + if log_file is not None or simulate: if log_file is not None: - self._log_request_to_file(log_file, "UpdateTestStep", request) + log_request_to_file(log_file, "UpdateTestStep", request) return self.simulate_update_test_step_response(request, existing=existing) response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestStep(request) @@ -667,24 +675,31 @@ async def delete_test_step(self, test_step_id: str) -> None: async def create_test_measurement( self, - test_measurement: TestMeasurementCreate, + test_measurement: TestMeasurementCreate | None = None, log_file: str | Path | None = None, + request: CreateTestMeasurementRequest | None = None, + simulate: bool = False, ) -> TestMeasurement: """Create a new test measurement. Args: test_measurement: The test measurement to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_measurement). + simulate: If True, return a simulated response without making an API call. Returns: The created TestMeasurement. """ - request = CreateTestMeasurementRequest(test_measurement=test_measurement.to_proto()) + if request is None: + if test_measurement is None: + raise ValueError("Either test_measurement or request must be provided") + request = CreateTestMeasurementRequest(test_measurement=test_measurement.to_proto()) - if log_file is not None: + if log_file is not None or simulate: simulated_proto = self.simulate_create_test_measurement_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestMeasurement", request, @@ -700,25 +715,32 @@ async def create_test_measurement( async def create_test_measurements( self, - test_measurements: list[TestMeasurementCreate], + test_measurements: list[TestMeasurementCreate] | None = None, log_file: str | Path | None = None, + request: CreateTestMeasurementsRequest | None = None, + simulate: bool = False, ) -> tuple[int, list[str]]: """Create multiple test measurements in a single request. Args: test_measurements: The test measurements to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_measurements). + simulate: If True, return a simulated response without making an API call. Returns: A tuple of (measurements_created_count, measurement_ids). """ - measurement_protos = [tm.to_proto() for tm in test_measurements] - request = CreateTestMeasurementsRequest(test_measurements=measurement_protos) + if request is None: + if test_measurements is None: + raise ValueError("Either test_measurements or request must be provided") + measurement_protos = [tm.to_proto() for tm in test_measurements] + request = CreateTestMeasurementsRequest(test_measurements=measurement_protos) - if log_file is not None: + if log_file is not None or simulate: count, measurement_ids = self.simulate_create_test_measurements_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestMeasurements", request, @@ -798,9 +820,11 @@ async def list_all_test_measurements( async def update_test_measurement( self, - update: TestMeasurementUpdate, + update: TestMeasurementUpdate | None = None, log_file: str | Path | None = None, existing: TestMeasurement | None = None, + request: UpdateTestMeasurementRequest | None = None, + simulate: bool = False, ) -> TestMeasurement: """Update an existing test measurement. @@ -809,18 +833,23 @@ async def update_test_measurement( log_file: If set, log the request to this file and return a simulated response. existing: The full existing TestMeasurement for simulation merge. If not provided, the simulated response will only contain the updated fields. + request: Raw protobuf request (mutually exclusive with update). + simulate: If True, return a simulated response without making an API call. Returns: The updated TestMeasurement. """ - test_measurement_proto, field_mask = update.to_proto_with_mask() - request = UpdateTestMeasurementRequest( - test_measurement=test_measurement_proto, update_mask=field_mask - ) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_measurement_proto, field_mask = update.to_proto_with_mask() + request = UpdateTestMeasurementRequest( + test_measurement=test_measurement_proto, update_mask=field_mask + ) - if log_file is not None: + if log_file is not None or simulate: if log_file is not None: - self._log_request_to_file(log_file, "UpdateTestMeasurement", request) + log_request_to_file(log_file, "UpdateTestMeasurement", request) return self.simulate_update_test_measurement_response(request, existing=existing) response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestMeasurement( @@ -847,15 +876,24 @@ async def delete_test_measurement(self, measurement_id: str) -> None: async def import_log_file( self, log_file: str | Path, + incremental: bool = False, ) -> ReplayResult: - """Replay a log file by parsing each entry, simulating the results, then creating for real. + """Replay a log file, creating real API objects from the logged simulation data. - This method reads a log file created by the simulation logging, reconstructs - all the objects via simulation, and then creates them via the actual API. - IDs are mapped from simulated to real during the creation process. + Two modes are available: + + * **batch** (default): Parse the entire log, reconstruct objects via + simulation, then create them all via the API in one pass. The + ``LogTracking`` header on line 0 is ignored. + * **incremental** (``incremental=True``): Walk the log line-by-line, + issuing the real API call for each entry as it is encountered. + ``LogTracking.last_uploaded_line`` is advanced only after the call + succeeds, so a failure during a line causes the entire line to be + retried on the next invocation; already-uploaded lines are skipped. Args: log_file: Path to the log file to replay. + incremental: If True, use incremental mode. Returns: A ReplayResult containing the created report, steps, and measurements. @@ -864,123 +902,234 @@ async def import_log_file( if not log_path.exists(): raise FileNotFoundError(f"Log file not found: {log_file}") - simulated_report: TestReport | None = None - simulated_steps_by_id: dict[str, TestStep] = {} - simulated_steps_order: list[str] = [] - simulated_measurements_by_id: dict[str, TestMeasurement] = {} - simulated_measurements_order: list[str] = [] - - line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") - - # Parse the log file and simulate the responses (without calling the API). - with open(log_path) as f: - for line in f: - line = line.strip() - if not line: - continue - - match = line_pattern.match(line) - if not match: - raise ValueError(f"Skipping malformed log line: {line[:100]}...") - - request_type = match.group(1) - response_id = match.group(2) - json_str = match.group(3) - - if request_type == "CreateTestReport": - create_report_req = CreateTestReportRequest() - json_format.Parse(json_str, create_report_req) - report_proto = self.simulate_create_test_report_response(create_report_req) - if response_id: - report_proto.test_report_id = response_id - simulated_report = TestReport._from_proto(report_proto) - - elif request_type == "CreateTestStep": - create_step_req = CreateTestStepRequest() - json_format.Parse(json_str, create_step_req) - step_proto = self.simulate_create_test_step_response(create_step_req) - if response_id: - step_proto.test_step_id = response_id - step = TestStep._from_proto(step_proto) - simulated_steps_by_id[step._id_or_error] = step - simulated_steps_order.append(step._id_or_error) - - elif request_type == "CreateTestMeasurement": - create_meas_req = CreateTestMeasurementRequest() - json_format.Parse(json_str, create_meas_req) - meas_proto = self.simulate_create_test_measurement_response(create_meas_req) - if response_id: - meas_proto.measurement_id = response_id - measurement = TestMeasurement._from_proto(meas_proto) - simulated_measurements_by_id[measurement._id_or_error] = measurement - simulated_measurements_order.append(measurement._id_or_error) - - elif request_type == "CreateTestMeasurements": - create_batch_req = CreateTestMeasurementsRequest() - json_format.Parse(json_str, create_batch_req) - original_ids = response_id.split(",") if response_id else [] - for i, tm_proto in enumerate(create_batch_req.test_measurements): - single_request = CreateTestMeasurementRequest(test_measurement=tm_proto) - batch_meas_proto = self.simulate_create_test_measurement_response( - single_request - ) - if i < len(original_ids): - batch_meas_proto.measurement_id = original_ids[i] - measurement = TestMeasurement._from_proto(batch_meas_proto) - simulated_measurements_by_id[measurement._id_or_error] = measurement - simulated_measurements_order.append(measurement._id_or_error) - - elif request_type == "UpdateTestReport": - if simulated_report is None: - raise ValueError("UpdateTestReport found before CreateTestReport") - update_report_req = UpdateTestReportRequest() - json_format.Parse(json_str, update_report_req) - simulated_report = self.simulate_update_test_report_response( - update_report_req, existing=simulated_report - ) - - elif request_type == "UpdateTestStep": - update_step_req = UpdateTestStepRequest() - json_format.Parse(json_str, update_step_req) - step_id = update_step_req.test_step.test_step_id - if step_id not in simulated_steps_by_id: - raise ValueError(f"UpdateTestStep for unknown step: {step_id}") - simulated_steps_by_id[step_id] = self.simulate_update_test_step_response( - update_step_req, existing=simulated_steps_by_id[step_id] - ) - - elif request_type == "UpdateTestMeasurement": - update_meas_req = UpdateTestMeasurementRequest() - json_format.Parse(json_str, update_meas_req) - measurement_id = update_meas_req.test_measurement.measurement_id - if measurement_id not in simulated_measurements_by_id: - raise ValueError( - f"UpdateTestMeasurement for unknown measurement: {measurement_id}" - ) - simulated_measurements_by_id[measurement_id] = ( - self.simulate_update_test_measurement_response( - update_meas_req, - existing=simulated_measurements_by_id[measurement_id], - ) - ) - - else: - logger.warning(f"Unknown request type: {request_type}") - - # Send the test report to the server, making sure to update the IDs to real ones as we go. - if simulated_report is None: + if incremental: + return await self._incremental_import_log_file(log_path) + + return await self._batch_import_log_file(log_path) + + # ------------------------------------------------------------------ + # Shared replay dispatch + # ------------------------------------------------------------------ + + async def _import_entry( + self, + request_type: str, + response_id: str | None, + json_str: str, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + """Process a single log entry, updating *state* in place. + + When *simulate* is True the create/update methods return simulated + responses (no network call). When False they issue real gRPC calls. + *id_map* is updated so that subsequent entries can remap IDs that + were generated during the original simulation run. + """ + handlers = { + "CreateTestReport": self._replay_create_report, + "CreateTestStep": self._replay_create_step, + "CreateTestMeasurement": self._replay_create_measurement, + "CreateTestMeasurements": self._replay_create_measurements, + "UpdateTestReport": self._replay_update_report, + "UpdateTestStep": self._replay_update_step, + "UpdateTestMeasurement": self._replay_update_measurement, + } + handler = handlers.get(request_type) + if handler is None: + return + await handler(json_str, response_id, simulate=simulate, id_map=id_map, state=state) + + @staticmethod + def _map_id(id_map: dict[str, str], sid: str) -> str: + """Translate a simulated ID to its real counterpart, or return *sid* unchanged.""" + return id_map.get(sid, sid) + + async def _replay_create_report( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestReportRequest() + json_format.Parse(json_str, request) + state.report = await self.create_test_report(request=request, simulate=simulate) + if response_id: + id_map[response_id] = state.report._id_or_error + + async def _replay_create_step( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestStepRequest() + json_format.Parse(json_str, request) + request.test_step.test_report_id = self._map_id(id_map, request.test_step.test_report_id) + if request.test_step.parent_step_id: + request.test_step.parent_step_id = self._map_id( + id_map, request.test_step.parent_step_id + ) + step = await self.create_test_step(request=request, simulate=simulate) + if response_id: + id_map[response_id] = step._id_or_error + state.steps_by_id[step._id_or_error] = step + state.steps_order.append(step._id_or_error) + + async def _replay_create_measurement( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestMeasurementRequest() + json_format.Parse(json_str, request) + request.test_measurement.test_step_id = self._map_id( + id_map, request.test_measurement.test_step_id + ) + measurement = await self.create_test_measurement(request=request, simulate=simulate) + if response_id: + id_map[response_id] = measurement._id_or_error + state.measurements_by_id[measurement._id_or_error] = measurement + state.measurements_order.append(measurement._id_or_error) + + async def _replay_create_measurements( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestMeasurementsRequest() + json_format.Parse(json_str, request) + for tm in request.test_measurements: + tm.test_step_id = self._map_id(id_map, tm.test_step_id) + original_ids = response_id.split(",") if response_id else [] + if simulate: + # Batch endpoint has no simulate path; fan out to per-measurement simulate calls. + for i, tm_proto in enumerate(request.test_measurements): + single_req = CreateTestMeasurementRequest(test_measurement=tm_proto) + meas = await self.create_test_measurement(request=single_req, simulate=True) + if i < len(original_ids): + id_map[original_ids[i]] = meas._id_or_error + state.measurements_by_id[meas._id_or_error] = meas + state.measurements_order.append(meas._id_or_error) + else: + _, real_ids = await self.create_test_measurements(request=request) + for i, real_id in enumerate(real_ids): + if i < len(original_ids): + id_map[original_ids[i]] = real_id + + async def _replay_update_report( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + if state.report is None: + raise ValueError("UpdateTestReport found before CreateTestReport") + request = UpdateTestReportRequest() + json_format.Parse(json_str, request) + request.test_report.test_report_id = self._map_id( + id_map, request.test_report.test_report_id + ) + state.report = await self.update_test_report( + request=request, simulate=simulate, existing=state.report + ) + + async def _replay_update_step( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = UpdateTestStepRequest() + json_format.Parse(json_str, request) + orig_step_id = request.test_step.test_step_id + mapped_step_id = self._map_id(id_map, orig_step_id) + request.test_step.test_step_id = mapped_step_id + existing_step = state.steps_by_id.get(mapped_step_id) + if simulate and existing_step is None: + raise ValueError(f"UpdateTestStep for unknown step: {orig_step_id}") + updated_step = await self.update_test_step( + request=request, simulate=simulate, existing=existing_step + ) + if mapped_step_id in state.steps_by_id: + state.steps_by_id[mapped_step_id] = updated_step + + async def _replay_update_measurement( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = UpdateTestMeasurementRequest() + json_format.Parse(json_str, request) + orig_meas_id = request.test_measurement.measurement_id + mapped_meas_id = self._map_id(id_map, orig_meas_id) + request.test_measurement.measurement_id = mapped_meas_id + existing_meas = state.measurements_by_id.get(mapped_meas_id) + if simulate and existing_meas is None: + raise ValueError(f"UpdateTestMeasurement for unknown measurement: {orig_meas_id}") + updated_meas = await self.update_test_measurement( + request=request, simulate=simulate, existing=existing_meas + ) + if mapped_meas_id in state.measurements_by_id: + state.measurements_by_id[mapped_meas_id] = updated_meas + + # ------------------------------------------------------------------ + # Batch replay (default) + # ------------------------------------------------------------------ + + async def _batch_import_log_file(self, log_path: Path) -> ReplayResult: + id_map: dict[str, str] = {} + state = _ReplayState() + + for request_type, response_id, json_str in iter_log_data_lines(log_path): + await self._import_entry( + request_type, + response_id, + json_str, + simulate=True, + id_map=id_map, + state=state, + ) + + if state.report is None: raise ValueError("No CreateTestReport found in log file") - simulated_step_id_map: dict[str, str] = {} + real_id_map: dict[str, str] = {} - real_report = await self._create_report_from_simulated(simulated_report) + real_report = await self._create_report_from_simulated(state.report) real_report_id = real_report._id_or_error real_steps: list[TestStep] = [] - for sim_step_id in simulated_steps_order: - sim_step = simulated_steps_by_id[sim_step_id] + for sim_step_id in state.steps_order: + sim_step = state.steps_by_id[sim_step_id] real_parent_step_id = ( - simulated_step_id_map.get(sim_step.parent_step_id, sim_step.parent_step_id) + real_id_map.get(sim_step.parent_step_id, sim_step.parent_step_id) if sim_step.parent_step_id else None ) @@ -989,12 +1138,12 @@ async def import_log_file( ) real_step = await self.create_test_step(step_create) real_steps.append(real_step) - simulated_step_id_map[sim_step_id] = real_step._id_or_error + real_id_map[sim_step_id] = real_step._id_or_error real_measurements: list[TestMeasurement] = [] - for sim_measurement_id in simulated_measurements_order: - sim_measurement = simulated_measurements_by_id[sim_measurement_id] - real_step_id = simulated_step_id_map.get( + for sim_measurement_id in state.measurements_order: + sim_measurement = state.measurements_by_id[sim_measurement_id] + real_step_id = real_id_map.get( sim_measurement.test_step_id, sim_measurement.test_step_id ) measurement_create = self._measurement_create_from_simulated( @@ -1009,6 +1158,52 @@ async def import_log_file( measurements=real_measurements, ) + # ------------------------------------------------------------------ + # Incremental replay + # ------------------------------------------------------------------ + + async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: + """Replay line-by-line, issuing real API calls and updating tracking. + + Resumes from ``LogTracking.last_uploaded_line`` (loaded from the + ``.tracking`` sidecar) so already-uploaded entries are skipped on + subsequent ticks rather than re-sent to the server. Each data line is a + single atomic API call; if replay of a line fails, + ``last_uploaded_line`` is not advanced so the whole line is retried + next tick. + """ + tracking = LogTracking.load(log_path) + id_map = tracking.id_map + state = _ReplayState() + + for request_type, response_id, json_str in iter_log_data_lines( + log_path, start_line=tracking.last_uploaded_line + ): + await self._import_entry( + request_type, + response_id, + json_str, + simulate=False, + id_map=id_map, + state=state, + ) + + tracking.last_uploaded_line += 1 + tracking.save(log_path) + + if state.report is None: + raise ValueError("No CreateTestReport found in log file") + + return ReplayResult( + report=state.report, + steps=[state.steps_by_id[sid] for sid in state.steps_order], + measurements=[state.measurements_by_id[mid] for mid in state.measurements_order], + ) + + # ------------------------------------------------------------------ + # Log line parsing helpers + # ------------------------------------------------------------------ + async def _create_report_from_simulated(self, simulated: TestReport) -> TestReport: """Create a real test report from a simulated one.""" report_create = TestReportCreate( @@ -1067,19 +1262,8 @@ def _measurement_create_from_simulated( ) -def _client_version() -> str: - from importlib.metadata import PackageNotFoundError, version - - try: - return version("sift_stack_py") - except PackageNotFoundError: - return "unknown" - - -@dataclass -class ReplayResult: - """Result of replaying a log file.""" - - report: TestReport - steps: list[TestStep] = field(default_factory=list) - measurements: list[TestMeasurement] = field(default_factory=list) +__all__ = [ + "LogTracking", + "ReplayResult", + "TestResultsLowLevelClient", +] diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index 1d44bb437..5683182e5 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -78,16 +78,10 @@ def ci_pytest_tag(sift_client): return tag -from sift_client.util.test_results import ( - client_has_connection, # noqa: F401 - pytest_runtest_makereport, # noqa: F401 -) -from sift_client.util.test_results import ( - module_substep_check_connection as module_substep, # noqa: F401 -) -from sift_client.util.test_results import ( - report_context_check_connection as report_context, # noqa: F401 -) -from sift_client.util.test_results import ( - step_check_connection as step, # noqa: F401 -) +# Import the Sift test results fixtures the way we recommend to users. +from sift_client.util.test_results import * # noqa: F403 + + +def pytest_configure(config: pytest.Config) -> None: + """Enable the Sift connection-check mode for the fixtures used in this test suite since we run w/ mock client in non-integration tests.""" + config.option.sift_test_results_check_connection = True diff --git a/python/lib/sift_client/_tests/resources/test_test_results.py b/python/lib/sift_client/_tests/resources/test_test_results.py index d04609724..60941bfb9 100644 --- a/python/lib/sift_client/_tests/resources/test_test_results.py +++ b/python/lib/sift_client/_tests/resources/test_test_results.py @@ -697,24 +697,76 @@ def test_import_log_file_round_trip(self, sift_client, nostromo_run, tmp_path): compare_test_measurement_fields(replayed_m, direct_m) @pytest.mark.asyncio - async def test_malformed_log_line_raises(self, tmp_path): - """import_log_file raises ValueError on a line that doesn't match the expected format.""" + async def test_malformed_log_line_skipped(self, tmp_path): + """Malformed lines raise a ValueError during iteration.""" log_file = tmp_path / "bad.jsonl" log_file.write_text("this is not a valid log line\n") client = TestResultsLowLevelClient(grpc_client=MagicMock()) - with pytest.raises(ValueError, match="malformed log line"): + with pytest.raises(ValueError, match="Invalid log line: this is not a valid log lin"): await client.import_log_file(log_file) @pytest.mark.asyncio - async def test_malformed_line_after_valid_lines_raises(self, tmp_path): - """A malformed line after valid entries still raises.""" - log_file = tmp_path / "mixed.jsonl" - log_file.write_text( - '[CreateTestReport] {"name":"r","testCase":"c","testSystemName":"s"}\n' - "totally broken line\n" - ) + async def test_empty_log_file_raises(self, tmp_path): + """A log file with no entries raises 'No CreateTestReport'.""" + log_file = tmp_path / "empty.jsonl" + log_file.touch() client = TestResultsLowLevelClient(grpc_client=MagicMock()) - with pytest.raises(ValueError, match="malformed log line"): + with pytest.raises(ValueError, match="No CreateTestReport found"): await client.import_log_file(log_file) + + def test_concurrent_append_and_tracking_save_preserves_all_lines(self, tmp_path): + """Writer appending to the log shares no mutation point with the tracking sidecar. + + With tracking moved out of the main log into ``.tracking``, the log + file is strictly append-only -- there is no path by which concurrent + ``LogTracking.save`` calls can clobber appended lines. This test pins + that invariant: 500 writer appends run alongside a hot updater looping + on sidecar writes, and every append must survive. + """ + import threading + import time + + from sift.test_reports.v1.test_reports_pb2 import CreateTestReportRequest + + from sift_client._internal.low_level_wrappers._test_results_log import ( + LogTracking, + log_request_to_file, + ) + + log_file = tmp_path / "race.jsonl" + + n_appends = 500 + stop = threading.Event() + request = CreateTestReportRequest() + + def writer() -> None: + for i in range(n_appends): + log_request_to_file(log_file, "CreateTestReport", request, response_id=str(i)) + + def updater() -> None: + tracking = LogTracking(last_uploaded_line=0) + while not stop.is_set(): + tracking.last_uploaded_line += 1 + tracking.save(log_file) + time.sleep(0) + + t_updater = threading.Thread(target=updater) + t_writer = threading.Thread(target=writer) + t_updater.start() + t_writer.start() + t_writer.join() + stop.set() + t_updater.join() + + with open(log_file) as f: + data_lines = [line for line in f if line.strip()] + assert len(data_lines) == n_appends, ( + f"expected {n_appends} appended data lines, found {len(data_lines)}" + ) + + sidecar = LogTracking.sidecar_path(log_file) + assert sidecar.exists() + reloaded = LogTracking.load(log_file) + assert reloaded.last_uploaded_line >= 1 diff --git a/python/lib/sift_client/_tests/util/conftest.py b/python/lib/sift_client/_tests/util/conftest.py index 3d8eb07fc..45279cca6 100644 --- a/python/lib/sift_client/_tests/util/conftest.py +++ b/python/lib/sift_client/_tests/util/conftest.py @@ -1,43 +1,14 @@ -"""Override report_context to disable log file simulation for integration tests in this directory so that we can exercise the context manager when no log file is provided.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Generator - import pytest -from sift_client.util.test_results.pytest_util import _report_context_impl, _step_impl - -if TYPE_CHECKING: - from sift_client.client import SiftClient - from sift_client.util.test_results.context_manager import NewStep, ReportContext - - -@pytest.fixture(scope="session", autouse=True) -def report_context( - sift_client: SiftClient, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[ReportContext | None, None, None]: - if client_has_connection: - yield from _report_context_impl(sift_client, request, log_file=None) - else: - yield None - -@pytest.fixture(autouse=True) -def step( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None +def pytest_addoption(parser: pytest.Parser) -> None: + existing_options = [opt.names() for opt in parser._anonymous.options] + # Flatten the list of lists into a single list of strings + flat_options = [item for sublist in existing_options for item in sublist] + if not any("--sift-test-results-log-file" in name for name in flat_options): + parser.addoption("--sift-test-results-log-file", action="store_true", default=False) -@pytest.fixture(scope="module", autouse=True) -def module_substep( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None +def pytest_configure(config: pytest.Config) -> None: + """Configure the pytest configuration to disable the Sift test results log file.""" + config.option.sift_test_results_log_file = False diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 62fe9d87a..de0107068 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -2001,7 +2001,7 @@ class TestResultsAPI: """ ... - def import_log_file(self, log_file: str | Path) -> ReplayResult: + def import_log_file(self, log_file: str | Path, incremental: bool = False) -> ReplayResult: """Replay a log file by parsing each entry, simulating the results, then creating for real. This method reads a log file created by the simulation logging, reconstructs @@ -2009,7 +2009,8 @@ class TestResultsAPI: IDs are mapped from simulated to real during the creation process. Args: - log_file: Path to the log file to replay. + log_file: Path to the log file to import. + incremental: (internal tooling) If True, goes line by line and calls API every event -- keeps track of last line sent so it can be called after some updates and be additive vs. replaying the entire log file each time(i.e. when False, reads the entire log file, building a test report in memory, then sends the calls for each step/measurement to the API). Returns: A ReplayResult containing the created report, steps, and measurements. diff --git a/python/lib/sift_client/resources/test_results.py b/python/lib/sift_client/resources/test_results.py index d739b4400..a6ffec2e0 100644 --- a/python/lib/sift_client/resources/test_results.py +++ b/python/lib/sift_client/resources/test_results.py @@ -619,6 +619,7 @@ async def delete_measurement(self, *, test_measurement: str | TestMeasurement) - async def import_log_file( self, log_file: str | Path, + incremental: bool = False, ) -> ReplayResult: """Replay a log file by parsing each entry, simulating the results, then creating for real. @@ -627,12 +628,13 @@ async def import_log_file( IDs are mapped from simulated to real during the creation process. Args: - log_file: Path to the log file to replay. + log_file: Path to the log file to import. + incremental: (internal tooling) If True, goes line by line and calls API every event -- keeps track of last line sent so it can be called after some updates and be additive vs. replaying the entire log file each time(i.e. when False, reads the entire log file, building a test report in memory, then sends the calls for each step/measurement to the API). Returns: A ReplayResult containing the created report, steps, and measurements. """ - result = await self._low_level_client.import_log_file(log_file) + result = await self._low_level_client.import_log_file(log_file, incremental=incremental) result.report = self._apply_client_to_instance(result.report) result.steps = self._apply_client_to_instances(result.steps) result.measurements = self._apply_client_to_instances(result.measurements) diff --git a/python/lib/sift_client/scripts/import_test_result_log.py b/python/lib/sift_client/scripts/import_test_result_log.py index 7b01f4d29..7e14e4d59 100644 --- a/python/lib/sift_client/scripts/import_test_result_log.py +++ b/python/lib/sift_client/scripts/import_test_result_log.py @@ -3,9 +3,45 @@ from __future__ import annotations import argparse +import logging import os +import select +import sys +import tempfile +from typing import TYPE_CHECKING from sift_client import SiftClient, SiftConnectionConfig +from sift_client.util.test_results.context_manager import log_replay_instructions + +if TYPE_CHECKING: + from sift_client._internal.low_level_wrappers.test_results import ReplayResult + +logger = logging.getLogger(__name__) + + +def _print_result(result: ReplayResult) -> None: + print(f"Report: {result.report.name} (id={result.report.id_})") + print(f"Steps: {len(result.steps)}") + for step in result.steps: + print(f" - {step.step_path} [{step.status}]") + print(f"Measurements: {len(result.measurements)}") + for m in result.measurements: + print(f" - {m.name}: passed={m.passed}") + + +def _incremental_import_loop(client: SiftClient, log_file: str) -> ReplayResult | None: + """Replay incrementally in a loop until stdin is closed (EOF).""" + result = None + while True: + received_signal, _, _ = select.select([sys.stdin], [], [], 1.0) + result = client.test_results.import_log_file(log_file, incremental=True) + if received_signal: + break + logger.info(f"Replay completed: {result}") + fp = os.path.abspath(log_file) + if fp.startswith(tempfile.gettempdir()): + os.remove(fp) + return result def main() -> None: @@ -17,6 +53,9 @@ def main() -> None: parser.add_argument("--grpc-url", default=os.getenv("SIFT_GRPC_URI")) parser.add_argument("--rest-url", default=os.getenv("SIFT_REST_URI")) parser.add_argument("--api-key", default=os.getenv("SIFT_API_KEY")) + parser.add_argument( + "--incremental", action="store_true", help="Import the log file incrementally." + ) args = parser.parse_args() if not args.grpc_url or not args.rest_url or not args.api_key: @@ -33,15 +72,21 @@ def main() -> None: ) ) - result = client.test_results.import_log_file(args.log_file) + try: + if args.incremental: + result = _incremental_import_loop(client, args.log_file) + else: + result = client.test_results.import_log_file(args.log_file) + fp = os.path.abspath(args.log_file) + if fp.startswith(tempfile.gettempdir()): + os.remove(fp) + except Exception as e: + logger.error(e) + log_replay_instructions(args.log_file) + raise - print(f"Report: {result.report.name} (id={result.report.id_})") - print(f"Steps: {len(result.steps)}") - for step in result.steps: - print(f" - {step.step_path} [{step.status}]") - print(f"Measurements: {len(result.measurements)}") - for m in result.measurements: - print(f" - {m.name}: passed={m.passed}") + if result: + _print_result(result) if __name__ == "__main__": diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index 6f80ce382..e7a82866c 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -58,7 +58,15 @@ def main(self): - If you want each module(file) to be marked as a step w/ each test as a substep, import the `module_substep` fixture as well. - The `report_context` fixture requires a fixture `sift_client` returning an `SiftClient` instance to be passed in. -Note: FedRAMP users: report_context will log test results to a temp file to avoid API calls during test execution. If this is a shared environment, you should import the `report_context_no_logging` fixture instead. +Note: FedRAMP users: report_context will log test results to a temp file to avoid API calls during test execution. If this is a shared environment, you can disable logging by passing ``--sift-test-results-log-file=false``. + +#### Configuration + +Import the `pytest_addoption` function to add configuration options for Test Results to the commandline or add the options to your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). If ommitted, will use the default values described below. + +- Git metadata: Include git metadata (repo, branch, commit) in the test results. Default is True. You can disable it by passing `--no-sift-test-results-git-metadata`. +- Log file: Write test results to a file. This happens automatically but you can configure specify a specific log file by passing `--sift-test-results-log-file=` or disable logging by passing `--sift-test-results-log-file=false`. +- Check connection: Pass `--sift-test-results-check-connection` (off by default) to make the `report_context`, `step`, and `module_substep` fixtures no-op when the Sift client has no connection to the server. Requires a `client_has_connection` fixture to be available. ###### Example at top of your test file or in your conftest.py file: @@ -75,7 +83,7 @@ def sift_client() -> SiftClient: return client -from sift_client.util.test_results import pytest_runtest_makereport, report_context, step, module_substep +from sift_client.util.test_results import * ``` ###### Then in your test file: @@ -100,13 +108,10 @@ def test_example(report_context, step): from .pytest_util import ( client_has_connection, module_substep, - module_substep_check_connection, + pytest_addoption, pytest_runtest_makereport, report_context, - report_context_check_connection, - report_context_no_logging, step, - step_check_connection, ) __all__ = [ @@ -114,11 +119,8 @@ def test_example(report_context, step): "ReportContext", "client_has_connection", "module_substep", - "module_substep_check_connection", + "pytest_addoption", "pytest_runtest_makereport", "report_context", - "report_context_check_connection", - "report_context_no_logging", "step", - "step_check_connection", ] diff --git a/python/lib/sift_client/util/test_results/context_manager.py b/python/lib/sift_client/util/test_results/context_manager.py index 05e6fc3d4..4c179c060 100644 --- a/python/lib/sift_client/util/test_results/context_manager.py +++ b/python/lib/sift_client/util/test_results/context_manager.py @@ -4,9 +4,10 @@ import logging import os import socket +import subprocess import tempfile import traceback -from contextlib import AbstractContextManager +from contextlib import AbstractContextManager, contextmanager from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING @@ -37,6 +38,63 @@ logger = logging.getLogger(__name__) +def log_replay_instructions(log_file: str | Path | None) -> None: + """Log instructions for manually replaying a test result log file. + + Used when an import/replay attempt fails so the user can retry against the same file. + """ + if log_file is None: + return + logger.error( + f"Error replaying log file: {log_file}.\n" + f" Can replay with `replay-test-result-log {log_file}`." + ) + + +@contextmanager +def _quiet_fork_stderr(): + """Redirect fd 2 to /dev/null across a ``fork()`` to discard gRPC's prefork notices. + + Redirecting fd 2 at the fd level (``os.dup2``) is what gRPC's handlers actually + write to, so wrapping a fork-site in this context manager reliably swallows those + notices without touching gRPC's global state. Scope the ``with`` block as tightly + as possible since it affects every thread in the process while active. + """ + saved_fd = os.dup(2) + devnull_fd = os.open(os.devnull, os.O_WRONLY) + try: + os.dup2(devnull_fd, 2) + os.close(devnull_fd) + yield + finally: + os.dup2(saved_fd, 2) + os.close(saved_fd) + + +def _git_metadata() -> dict[str, str] | None: + """Return git branch and commit hash, or None if not in a git repo.""" + try: + with _quiet_fork_stderr(): + branch = subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + commit = subprocess.check_output( + ["git", "describe", "--always", "--dirty", "--exclude", "*"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + repo = subprocess.check_output( + ["git", "remote", "get-url", "origin"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + return {"git_repo": repo, "git_branch": branch, "git_commit": commit} + except Exception: + return None + + class ReportContext(AbstractContextManager): """Context manager for a new TestReport. See usage example in __init__.py.""" @@ -48,6 +106,7 @@ class ReportContext(AbstractContextManager): step_number_at_depth: dict[int, int] open_step_results: dict[str, bool] any_failures: bool + _import_proc: subprocess.Popen | None = None def __init__( self, @@ -57,6 +116,7 @@ def __init__( system_operator: str | None = None, test_case: str | None = None, log_file: str | Path | bool | None = None, + include_git_metadata: bool = False, ): """Initialize a new report context. @@ -68,6 +128,7 @@ def __init__( test_case: The name of the test case. Will default to the basename of the file containing the test if not provided. log_file: If True, create a temp log file. If a path, use that path. All create/update operations will be logged to this file. + include_git_metadata: If True, include git metadata in the report. """ self.client = client self.step_is_open = False @@ -97,10 +158,33 @@ def __init__( end_time=datetime.now(timezone.utc), status=TestStatus.IN_PROGRESS, system_operator=system_operator, + metadata=_git_metadata() if include_git_metadata else None, # type: ignore ) self.report = client.test_results.create(create, log_file=self.log_file) + def _open_import_proc(self): + """Open a subprocess to import the log file.""" + with _quiet_fork_stderr(): + self._import_proc = subprocess.Popen( + [ + "import-test-result-log", + "--incremental", + str(self.log_file), + "--grpc-url", + self.client.grpc_client._config.uri, + "--rest-url", + self.client.rest_client._config.base_url, + "--api-key", + self.client.grpc_client._config.api_key, + ], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + def __enter__(self): + if self.log_file: + self._open_import_proc() return self def __exit__(self, exc_type, exc_value, traceback): @@ -112,19 +196,15 @@ def __exit__(self, exc_type, exc_value, traceback): else: update["status"] = TestStatus.PASSED self.report.update(update, log_file=self.log_file) - if self.log_file: + + if self._import_proc is not None: try: - # Try replaying the log file and clean up the file if it's a temporary file. - self.client.test_results.import_log_file(self.log_file) - fp = os.path.abspath(self.log_file) - tmp_dir = tempfile.gettempdir() - if fp.startswith(tmp_dir): - os.remove(fp) - except Exception as e: - logger.error(e) - logger.error( - f"Error replaying log file: {self.log_file}.\n Can replay with `import-test-result-log {self.log_file}`." - ) + self._import_proc.communicate(timeout=1) + except subprocess.TimeoutExpired: + logger.error("Import process did not exit in 10s, killing it") + self._import_proc.kill() + self._import_proc.wait() + log_replay_instructions(self.log_file) raise return True diff --git a/python/lib/sift_client/util/test_results/pytest_util.py b/python/lib/sift_client/util/test_results/pytest_util.py index afd6e34b7..ae22fa122 100644 --- a/python/lib/sift_client/util/test_results/pytest_util.py +++ b/python/lib/sift_client/util/test_results/pytest_util.py @@ -16,6 +16,49 @@ REPORT_CONTEXT: ReportContext | None = None +def pytest_addoption(parser: pytest.Parser) -> None: + """Register Sift-specific command-line options.""" + parser.addoption( + "--sift-test-results-log-file", + default=None, + help="Path to write the Sift test result log file. " + "Use 'true' (default) to auto-create a temp file, " + "False, 'false', or 'none' to disable logging, " + "or a file path to write to a specific location.", + ) + parser.addoption( + "--no-sift-test-results-git-metadata", + action="store_false", + dest="sift_test_results_git_metadata", + default=True, + help="Exclude git metadata from the Sift test results. " + "Git metadata (repo, branch, commit) is included by default.", + ) + parser.addoption( + "--sift-test-results-check-connection", + action="store_true", + default=False, + help="Skip the sift test-result fixtures (report_context, step, module_substep) " + "when the Sift client has no connection to the server. Requires a " + "`client_has_connection` fixture to be available in the test session.", + ) + + +def _resolve_log_file(pytestconfig: pytest.Config | None) -> str | Path | bool | None: + """Determine log_file value from --sift-test-results-log-file option.""" + raw = None + if pytestconfig is not None: + raw = pytestconfig.getoption("--sift-test-results-log-file", default=None) + if raw is None: + return True + lower = str(raw).lower() + if lower in ("true", "1"): + return True + if lower in ("false", "none"): + return None + return Path(raw) + + @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo[Any]): """You should import this hook to capture any AssertionErrors that occur during the test. If not included, any assert failures in a test will not automatically fail the step.""" @@ -34,7 +77,7 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo[Any]): def _report_context_impl( sift_client: SiftClient, request: pytest.FixtureRequest, - log_file: str | Path | bool | None = True, + pytestconfig: pytest.Config | None = None, ) -> Generator[ReportContext | None, None, None]: test_path = Path(request.config.invocation_params.args[0]) base_name = ( @@ -43,11 +86,18 @@ def _report_context_impl( else "pytest " + " ".join(request.config.invocation_params.args) ) test_case = test_path if test_path.exists() else base_name + log_file = _resolve_log_file(pytestconfig) + include_git_metadata = ( + bool(pytestconfig.getoption("sift_test_results_git_metadata", default=True)) + if pytestconfig + else True + ) with ReportContext( sift_client, name=f"{base_name} {datetime.now(timezone.utc).isoformat()}", test_case=str(test_case), log_file=log_file, + include_git_metadata=include_git_metadata, ) as context: # Set a global so we can access this in pytest hooks. global REPORT_CONTEXT @@ -55,20 +105,35 @@ def _report_context_impl( yield context +def _check_connection_enabled(pytestconfig: pytest.Config | None) -> bool: + """Return True when the caller opted into `--sift-test-results-check-connection`.""" + if pytestconfig is None: + return False + return bool(pytestconfig.getoption("sift_test_results_check_connection", default=False)) + + +def _has_sift_connection(request: pytest.FixtureRequest) -> bool: + """Resolve the `client_has_connection` fixture lazily; only called when the check is enabled.""" + return bool(request.getfixturevalue("client_has_connection")) + + @pytest.fixture(scope="session", autouse=True) def report_context( - sift_client: SiftClient, request: pytest.FixtureRequest + sift_client: SiftClient, request: pytest.FixtureRequest, pytestconfig: pytest.Config ) -> Generator[ReportContext | None, None, None]: - """Create a report context for the session.""" - yield from _report_context_impl(sift_client, request, log_file=True) + """Create a report context for the session. + The log file destination is controlled by ``--sift-test-results-log-file``. + Defaults to a temp file when not set. -@pytest.fixture(scope="session", autouse=True) -def report_context_no_logging( - sift_client: SiftClient, request: pytest.FixtureRequest -) -> Generator[ReportContext | None, None, None]: - """Create a report context for the session with logging disabled.""" - yield from _report_context_impl(sift_client, request, log_file=None) + When ``--sift-test-results-check-connection`` is passed, this fixture will no-op + (yield None) if the Sift client has no connection to the server. That mode + requires a ``client_has_connection`` fixture to be available in the session. + """ + if _check_connection_enabled(pytestconfig) and not _has_sift_connection(request): + yield None + return + yield from _report_context_impl(sift_client, request, pytestconfig=pytestconfig) def _step_impl( @@ -90,17 +155,39 @@ def _step_impl( @pytest.fixture(autouse=True) def step( - report_context: ReportContext, request: pytest.FixtureRequest + report_context: ReportContext | None, + request: pytest.FixtureRequest, + pytestconfig: pytest.Config, ) -> Generator[NewStep | None, None, None]: - """Create an outer step for the function.""" + """Create an outer step for the function. + + No-ops when ``--sift-test-results-check-connection`` is set and the client + has no connection (or when the session-scoped ``report_context`` resolved to None). + """ + if report_context is None or ( + _check_connection_enabled(pytestconfig) and not _has_sift_connection(request) + ): + yield None + return yield from _step_impl(report_context, request) @pytest.fixture(scope="module", autouse=True) def module_substep( - report_context: ReportContext, request: pytest.FixtureRequest + report_context: ReportContext | None, + request: pytest.FixtureRequest, + pytestconfig: pytest.Config, ) -> Generator[NewStep | None, None, None]: - """Create a step per module.""" + """Create a step per module. + + No-ops when ``--sift-test-results-check-connection`` is set and the client + has no connection (or when the session-scoped ``report_context`` resolved to None). + """ + if report_context is None or ( + _check_connection_enabled(pytestconfig) and not _has_sift_connection(request) + ): + yield None + return yield from _step_impl(report_context, request) @@ -108,7 +195,8 @@ def module_substep( def client_has_connection(sift_client): """Check if the SiftClient has a connection to the Sift server. - Can be used to skip tests that require a connection to the Sift server. + Can be used to skip tests that require a connection to the Sift server, and is + consulted by the Sift fixtures when ``--sift-test-results-check-connection`` is set. """ has_connection = False try: @@ -117,42 +205,3 @@ def client_has_connection(sift_client): except Exception: has_connection = False return has_connection - - -######################################################## -# The following fixtures will conditionally create a report if the client has a connection to the Sift server. -# If you want to use these, you must also import or implement the client_has_connection fixture. -######################################################## - - -@pytest.fixture(scope="session", autouse=True) -def report_context_check_connection( - sift_client: SiftClient, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[ReportContext | None, None, None]: - """Create a report context for the session. Doesn't run if the client has no connection to the Sift server.""" - if client_has_connection: - yield from _report_context_impl(sift_client, request) - else: - yield None - - -@pytest.fixture(autouse=True) -def step_check_connection( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - """Create an outer step for the function. Doesn't run if the client has no connection to the Sift server.""" - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None - - -@pytest.fixture(scope="module", autouse=True) -def module_substep_check_connection( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - """Create a step per module. Doesn't run if the client has no connection to the Sift server.""" - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None