python(feat): Test results logging now happens in a subprocess while …#537
Conversation
    def _git_metadata() -> dict[str, str] | None:
        """Return git branch and commit hash, or None if not in a git repo."""
        try:
            branch = subprocess.check_output(
Perhaps check if we are in a git repo before running any of these. Probably should also verify that there aren't any modifications otherwise this information will be wrong/misleading.
Was using the exception to detect when we're not in a git repo:

CalledProcessError: Command '['git', 'rev-parse', '--abbrev-ref', 'HEAD']' returned non-zero exit status 128.

but I can update the rev check to `git describe --always --dirty --exclude="*"`, which will give something like `abc1234-dirty` if modified.
    parser.addoption(
        "--sift-test-results-git-metadata",
        action="store_true",
        default=True,
default should be False, otherwise this will not be disable-able
mmk, going to flip it to make it opt-out
    print(f"Measurements: {len(result.measurements)}")
    for m in result.measurements:
        print(f"  - {m.name}: passed={m.passed}")
    if result:
duplicate print statement
    # Flatten the list of lists into a single list of strings
    flat_options = [item for sublist in existing_options for item in sublist]
    if not any("--sift-test-results-log-file" in name for name in flat_options):
        parser.addoption("--sift-test-results-log-file", action="store_true", default="false")
default should be a boolean, not the string "false" (which is truthy)
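The failure mode is easy to demonstrate with plain argparse, whose `add_argument` pytest's `addoption` mirrors; only the flag name comes from the diff, the rest is illustrative:

```python
import argparse

parser = argparse.ArgumentParser()
# With action="store_true" the default must be the boolean False;
# the string "false" is truthy, so the flag could never read as "off".
parser.add_argument("--sift-test-results-log-file", action="store_true", default=False)

assert parser.parse_args([]).sift_test_results_log_file is False
assert parser.parse_args(["--sift-test-results-log-file"]).sift_test_results_log_file is True
assert bool("false") is True  # why the original default="false" was a bug
```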
            f"Error replaying log file: {self.log_file}.\n Can replay with `import-test-result-log {self.log_file}`."
        )
        self._import_proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
was it intentional to remove the previous exception handling?
    def _update_tracking(log_file: str | Path, tracking: LogTracking) -> None:
        """Rewrite the LogTracking header (line 0) in place."""
        log_path = Path(log_file)
        with open(log_path, "r+") as f:
Since we have to rewrite the file each time, this could get very slow. Instead, can we change how this is tracked?
- A separate file?
- Appending to the end and scanning for the last tracking checkpoint before resuming import?
By keeping it at line 0, this only rewrites that specific line, not the whole file, so performance is the same as having a single separate file with just that info.
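That only holds if the rewritten header can never grow past its reserved space: with `"r+"`, a longer line 0 would overwrite the start of line 1. A sketch of a fixed-width header that makes the in-place rewrite safe (the width constant and helper name are assumptions, not from the PR):

```python
import tempfile
from pathlib import Path

HEADER_WIDTH = 64  # reserved bytes for line 0; assumed, must fit any header

def rewrite_header(path: Path, text: str) -> None:
    payload = text.ljust(HEADER_WIDTH)
    if len(payload) > HEADER_WIDTH:
        raise ValueError("header exceeds reserved space")
    with open(path, "r+") as f:
        f.seek(0)
        f.write(payload)  # overwrites exactly the reserved region of line 0

log = Path(tempfile.mkdtemp()) / "results.log"
log.write_text(" " * HEADER_WIDTH + "\n[CreateTestReport] {}\n")
rewrite_header(log, '[LogTracking] {"last_uploaded_line": 3}')
lines = log.read_text().splitlines()
```

Since the header is padded to a fixed byte width, repeated rewrites touch the same region and never shift the data lines below it.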
    Args:
        log_file: Path to the log file to import.
        incremental: (internal tooling) If True, goes line by line and calls every event vs. reading the entire file at once and sending the resultant test report.
potentially misleading since our non-incremental approach is doing the same thing, just all at once. We aren't just generating a report creation request.
Yeah, hoping the docstrings help add context here. Do you have preferred terminology?
    # Incremental replay
    # ------------------------------------------------------------------

    async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult:
Some findings from Claude:
Incremental replay re-uploads every entry on each tick
The incremental path is at _internal/low_level_wrappers/test_results.py:1091-1122 (_incremental_import_log_file) and the driver loop is at scripts/import_test_result_log.py:32-41 (_incremental_import_loop).
What the code does
Subprocess loop:
    while True:
        received_signal, _, _ = select.select([sys.stdin], [], [], 1.0)
        result = client.test_results.import_log_file(log_file, incremental=True)
        if received_signal:
            break
Each tick calls import_log_file(log_file, incremental=True). Inside:
    async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult:
        with open(log_path) as f:
            first_line = f.readline()
        tracking = LogTracking.from_log_line(first_line) if first_line else LogTracking()
        id_map = tracking.id_map
        state = _ReplayState()
        for line_num, (request_type, response_id, json_str) in enumerate(
            self._iter_log_data_lines(log_path), start=tracking.last_uploaded_line + 1
        ):
            await self._import_entry(
                request_type,
                response_id,
                json_str,
                simulate=False,
                id_map=id_map,
                state=state,
            )
            tracking.last_uploaded_line = line_num
            self._update_tracking(log_path, tracking)
And the iterator:
    @staticmethod
    def _iter_log_data_lines(log_path: Path):
        line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$")
        with open(log_path) as f:
            fcntl.flock(f, fcntl.LOCK_SH)
            raw_lines = f.readlines()
        for raw_line in raw_lines:
            line = raw_line.strip()
            if not line:
                continue
            match = line_pattern.match(line)
            if not match:
                raise ValueError(f"Invalid log line: {line}")
            request_type = match.group(1)
            if request_type == "LogTracking":
                continue
            yield (request_type, match.group(2), match.group(3))
The bug
_iter_log_data_lines always yields every data line in the file from the start. It skips only the LogTracking header, not already-processed entries. The enumerate(..., start=tracking.last_uploaded_line + 1) in _incremental_import_log_file only renumbers the iteration — it does not skip anything.
So on tick N of the subprocess loop, if the file has M data lines, _import_entry is called with simulate=False for all M of them — even the M-1 that were already processed on tick N-1. simulate=False makes real gRPC calls (CreateTestReport, CreateTestStep, etc.), so every tick creates N duplicate reports, N duplicate steps, N duplicate measurements on the server.
The id_map offers no protection either: each replay pass produces a fresh server-side ID (server generates new UUIDs), and those new IDs overwrite the old mapping in id_map, so the next tick's lookups resolve to whichever duplicate was created most recently.
Concretely, consider a test suite that writes 3 entries over its run:
┌──────┬────────────────┬───────────────────┬───────────────────────────────────────────────────────┐
│ Tick │ Entries in log │ What loop uploads │ Server state after tick │
├──────┼────────────────┼───────────────────┼───────────────────────────────────────────────────────┤
│ 1 │ 1 │ entry 1 │ 1 report │
├──────┼────────────────┼───────────────────┼───────────────────────────────────────────────────────┤
│ 2 │ 2 │ entries 1, 2 │ 2 reports (1 dup) + steps/measurements on the new dup │
├──────┼────────────────┼───────────────────┼───────────────────────────────────────────────────────┤
│ 3 │ 3 │ entries 1, 2, 3 │ 3 reports (2 dups) + cascading dup children │
└──────┴────────────────┴───────────────────┴───────────────────────────────────────────────────────┘
With a ~1-second tick and a test suite that runs for minutes, you get hundreds of duplicate reports.
What the code probably meant
Either:
- Have _iter_log_data_lines accept a skip_until argument and skip data lines whose ordinal (1-based across data lines only) is ≤ tracking.last_uploaded_line, or
- Track a byte offset on the LogTracking header and f.seek() past it instead of counting lines.
Option 2 is simpler and robust under concurrent append; option 1 is easier to retrofit into the current structure.
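Option 2 can be sketched in a few lines: persist a byte offset instead of a line count, and seek past it on every tick so already-uploaded entries are never re-read. The function name and log format here are illustrative, not from the PR:

```python
import tempfile
from pathlib import Path

def read_new_entries(log_path: Path, offset: int) -> tuple[list[str], int]:
    """Return data lines appended after byte `offset`, plus the new offset."""
    entries = []
    with open(log_path, "rb") as f:
        f.seek(offset)  # skip everything already processed on earlier ticks
        while True:
            raw = f.readline()
            if not raw.endswith(b"\n"):
                break  # EOF, or a partial line mid-append; retry next tick
            offset = f.tell()
            line = raw.decode().strip()
            if line and not line.startswith("[LogTracking"):
                entries.append(line)
    return entries, offset

log = Path(tempfile.mkdtemp()) / "results.log"
log.write_text("[LogTracking] {}\n[CreateTestReport] {}\n")
tick1, offset = read_new_entries(log, 0)       # first tick: one data line
with open(log, "a") as f:
    f.write("[CreateTestStep] {}\n")            # writer appends between ticks
tick2, offset = read_new_entries(log, offset)  # second tick: only the new line
```

Breaking on a line without a trailing newline also makes the reader tolerant of a write that is still in flight, which the line-counting approach cannot distinguish from a complete entry.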
        )

    @staticmethod
    def _log_request_to_file(
Another issue related to the tracking of checkpoint in the log file
From Claude:
File corruption race between test writer and import subprocess
The log file has two independent processes touching it:
- Writer: the test process, calling _log_request_to_file after every create/update.
- Reader/rewriter: the import-test-result-log --incremental subprocess, calling _iter_log_data_lines and _update_tracking.
The race
fcntl.flock is advisory and only enforced between processes that both call it. The writer does not call flock, so the exclusive lock held by _update_tracking does nothing to stop the writer.
Timeline of a collision:
- Log file has N lines. Writer is about to append line N+1.
- Subprocess enters _update_tracking. Acquires LOCK_EX (no-op re: writer). Calls readlines() → reads N lines into memory.
- Writer opens in append mode. In POSIX append mode, each write is atomic relative to the current end-of-file, so it appends line N+1 at the file's true EOF.
- Subprocess calls f.seek(0), f.writelines(lines) — writes the N lines it read back, starting at offset 0. In-memory copy did not include line N+1, so file now has: N lines (overwritten) + the appended line N+1 starting at offset sum(len(L) for L in lines[:N]).
- Subprocess calls f.truncate(). Truncates to current file position — which is end of line N. Line N+1 is destroyed.
Even without step 5 being destructive, steps 3 and 4 interleaved at the byte level can corrupt the file, because writelines(lines) issues multiple write() calls and the writer's write() can land between them. Here the "r+" handle's writes go to explicit offsets (after seek(0)) while the "a" handle's writes always go to the current EOF; Linux/Darwin guarantee append atomicity, but nothing about atomicity against a concurrent in-place writer.
Also _iter_log_data_lines reads everything under LOCK_SH and releases the lock before yielding. So while the main replay loop is mid-iteration calling _update_tracking (LOCK_EX) on each entry, the snapshot raw_lines taken at the top is already stale — any entries appended after the initial read are not processed until the next tick, which compounds bug 1.
    @dataclass
    class LogTracking:
nit: move all of the log file code to another _internal namespace to keep the concerns separated between the wrappers (which are mainly for wrapping autogen gRPC) and this file handling
        "--sift-test-results-log-file",
        default=None,
        help="Path to write the Sift test result log file. "
        "Use 'true' (default) to auto-create a temp file, "
suggestion: have another flag for disabling logging to keep the arg concerns separate: --sift-test-results-no-log or something?
In that case there would be two competing args. And with these being added via pytest addopts and not as normal command-line args, I'm not sure if that would get complicated. But I can add it if you feel strongly.
    elif request_type == "CreateTestStep":
        create_step_req = CreateTestStepRequest()
        json_format.Parse(json_str, create_step_req)
        create_step_req.test_step.test_report_id = _map_id(
nit: improve readability by creating a util function for each of these types
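The elif chain could collapse into a dispatch table keyed by request type. A sketch with plain dicts standing in for the protobuf messages (the real handlers would call json_format.Parse and _map_id; all names here are illustrative):

```python
import json

def _import_create_report(payload: dict, id_map: dict) -> dict:
    # Real version would parse a CreateTestReportRequest here.
    return payload

def _import_create_step(payload: dict, id_map: dict) -> dict:
    # Remap the locally-logged report ID to the server-assigned one,
    # as _map_id does in the PR.
    local_id = payload["test_report_id"]
    payload["test_report_id"] = id_map.get(local_id, local_id)
    return payload

_HANDLERS = {
    "CreateTestReport": _import_create_report,
    "CreateTestStep": _import_create_step,
}

def import_entry(request_type: str, json_str: str, id_map: dict) -> dict:
    try:
        handler = _HANDLERS[request_type]
    except KeyError:
        raise ValueError(f"Unknown request type: {request_type}") from None
    return handler(json.loads(json_str), id_map)
```

Each request type gets its own small, testable function, and adding a new type is one table entry instead of another elif branch.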
See test reports from CI in dev env