diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7571644..23ee452 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,4 +30,4 @@ jobs: run: sbt -v assembly - name: Test shell: bash - run: target/scala-2.13/exist-xqts-runner-assembly-*-SNAPSHOT.jar --xqts-version HEAD --test-set fn-current-date + run: java -jar target/scala-2.13/exist-xqts-runner-assembly-*-SNAPSHOT.jar --xqts-version HEAD --test-set fn-current-date diff --git a/build.sbt b/build.sbt index abe9c5f..0add55b 100644 --- a/build.sbt +++ b/build.sbt @@ -65,7 +65,7 @@ libraryDependencies ++= { "org.parboiled" %% "parboiled" % "2.5.1", "org.apache.ant" % "ant-junit" % "1.10.15", // used for formatting junit style report - "net.sf.saxon" % "Saxon-HE" % "9.9.1-8", + "net.sf.saxon" % "Saxon-HE" % "12.5", "org.exist-db" % "exist-core" % existV changing(), "org.exist-db" % "exist-expath" % existV changing(), "org.xmlunit" % "xmlunit-core" % "2.11.0", @@ -77,10 +77,16 @@ libraryDependencies ++= { autoAPIMappings := true -// we prefer Saxon over Xalan +// Exclude transitive dependencies the runner doesn't need. +// Jetty exclusions allow building against both Jetty 11 (develop) and Jetty 12 (next) — +// Ivy can't resolve Jetty 12 Maven POM constructs, and the runner doesn't use Jetty anyway. 
excludeDependencies ++= Seq( ExclusionRule("xalan", "xalan"), - ExclusionRule("org.eclipse.jetty.toolchain", "jetty-jakarta-servlet-api"), + ExclusionRule("org.eclipse.jetty"), + ExclusionRule("org.eclipse.jetty.toolchain"), + ExclusionRule("org.eclipse.jetty.websocket"), + ExclusionRule("org.eclipse.jetty.ee10"), + ExclusionRule("org.eclipse.jetty.ee10.websocket"), ExclusionRule("org.hamcrest", "hamcrest-core"), ExclusionRule("org.hamcrest", "hamcrest-library") @@ -149,7 +155,10 @@ assembly / assemblyMergeStrategy := { // make the assembly executable with basic shell scripts import sbtassembly.AssemblyPlugin.defaultUniversalScript -assemblyPrependShellScript := Some(defaultUniversalScript(shebang = false)) +// Skip prepend script in CI — the prepended shell script can corrupt the ZIP +// central directory offsets on certain platforms, causing "An unexpected error +// occurred while trying to open file" from the Java launcher. +assemblyPrependShellScript := (if (sys.env.contains("CI")) None else Some(defaultUniversalScript(shebang = false))) // Add assembly to publish step diff --git a/run-batched.sh b/run-batched.sh new file mode 100755 index 0000000..0afef8e --- /dev/null +++ b/run-batched.sh @@ -0,0 +1,410 @@ +#!/usr/bin/env bash +# +# Batch XQTS Runner — runs the exist-xqts-runner JAR in batches to avoid OOM. +# +# Each batch runs in a fresh JVM, so thread pool / BrokerPool leaks are +# cleaned up between batches. JUnit XML results accumulate in a single +# output directory across batches. 
+# +# Usage: +# ./run-batched.sh [OPTIONS] +# +# Options: +# --xqts-version VERSION 3.1, HEAD, QT4, or FTTS (default: QT4) +# --batch-size N test sets per batch (default: 50) +# --heap SIZE JVM heap size (default: 4g) +# --timeout SECS per-batch timeout in seconds (default: 300) +# --output-dir DIR output directory (default: target) +# --test-set-pattern PAT regex filter for test set names +# --exclude-test-set SETS comma-separated test sets to exclude +# --enable-feature FEATS comma-separated features to enable +# --parallel N run N batch streams in parallel (default: 1) +# --resume skip test sets that already have result XML +# --dry-run print batches without running +# -- remaining args passed through to runner JAR +# +# Examples: +# ./run-batched.sh --xqts-version QT4 --batch-size 40 --heap 6g +# ./run-batched.sh --xqts-version 3.1 --resume +# ./run-batched.sh --xqts-version QT4 --test-set-pattern 'fn-.*' --batch-size 30 + +set -euo pipefail + +# === Defaults === +XQTS_VERSION="QT4" +BATCH_SIZE=50 +HEAP="4g" +BATCH_TIMEOUT=300 +OUTPUT_DIR="target" +TEST_SET_PATTERN="" +EXCLUDE_TEST_SETS="" +ENABLE_FEATURES="" +PARALLEL=1 +RESUME=false +DRY_RUN=false +EXTRA_ARGS=() +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +JAR="$SCRIPT_DIR/exist-xqts-runner-assembly-2.0.0-SNAPSHOT.jar" +JAVA_HOME="${JAVA_HOME:-/Users/wicentowskijc/.asdf/installs/java/zulu-21.38.21}" + +# === Parse args === +while [[ $# -gt 0 ]]; do + case "$1" in + --xqts-version) XQTS_VERSION="$2"; shift 2 ;; + --batch-size) BATCH_SIZE="$2"; shift 2 ;; + --heap) HEAP="$2"; shift 2 ;; + --timeout) BATCH_TIMEOUT="$2"; shift 2 ;; + --output-dir) OUTPUT_DIR="$2"; shift 2 ;; + --test-set-pattern) TEST_SET_PATTERN="$2"; shift 2 ;; + --exclude-test-set) EXCLUDE_TEST_SETS="$2"; shift 2 ;; + --enable-feature) ENABLE_FEATURES="$2"; shift 2 ;; + --parallel) PARALLEL="$2"; shift 2 ;; + --resume) RESUME=true; shift ;; + --dry-run) DRY_RUN=true; shift ;; + --) shift; EXTRA_ARGS+=("$@"); break ;; + *) EXTRA_ARGS+=("$1"); 
shift ;; + esac +done + +# === Resolve catalog === +case "$XQTS_VERSION" in + 3.1) CATALOG="$SCRIPT_DIR/work/QT3_1_0/catalog.xml" ;; + HEAD) CATALOG="$SCRIPT_DIR/work/qt3tests-master/catalog.xml" ;; + QT4) CATALOG="$SCRIPT_DIR/work/qt4tests-master/catalog.xml" ;; + FTTS) CATALOG="$SCRIPT_DIR/work/XQFTTS_1_0_4/XQFTTSCatalog.xml" ;; + *) echo "ERROR: Unknown XQTS version: $XQTS_VERSION"; exit 1 ;; +esac + +if [[ ! -f "$CATALOG" ]]; then + echo "ERROR: Catalog not found: $CATALOG" + echo "Run the JAR once with no test sets to trigger download, or check work/ dir." + exit 1 +fi + +if [[ ! -f "$JAR" ]]; then + echo "ERROR: Runner JAR not found: $JAR" + exit 1 +fi + +# === Extract test set names from catalog === +if [[ "$XQTS_VERSION" == "FTTS" ]]; then + # XQFTTS uses a different catalog format + ALL_SETS=$(grep ' "$batch_log" 2>&1 & + local batch_pid=$! + + # Monitor: if still running near timeout, capture jstack + ( + sleep "$jstack_delay" + if kill -0 $batch_pid 2>/dev/null; then + echo " Batch $batch_num approaching timeout — capturing thread dump..." + local java_pid + java_pid=$(pgrep -P $batch_pid java 2>/dev/null | head -1 || true) + if [[ -n "$java_pid" ]]; then + "$JAVA_HOME/bin/jstack" "$java_pid" > "$jstack_file" 2>&1 || true + echo " Thread dump saved to $jstack_file" + fi + fi + ) & + local monitor_pid=$! + + # Wait for the batch to complete (or timeout) + exit_code=0 + wait $batch_pid 2>/dev/null || exit_code=$? 
+ + # Clean up monitor and any lingering Java processes + kill $monitor_pid 2>/dev/null || true + wait $monitor_pid 2>/dev/null || true + + tail -20 "$batch_log" 2>/dev/null || true + rm -f "$batch_log" 2>/dev/null || true + + # Kill any lingering Java processes from this batch (BrokerPool shutdown hangs) + pkill -9 -f "exist.home=$exist_home" 2>/dev/null || true + sleep 1 + rm -rf "$exist_home" 2>/dev/null || true + + batch_end=$(date +%s) + batch_elapsed=$((batch_end - batch_start)) + + if [[ $exit_code -eq 124 || $exit_code -eq 137 ]]; then + echo " WARNING: Batch $batch_num TIMED OUT after ${BATCH_TIMEOUT}s (exit $exit_code) [stream $stream_id]" + if [[ -f "$jstack_file" ]]; then + echo " Thread dump: $jstack_file" + fi + return 1 + elif [[ $exit_code -gt 1 && $exit_code -ne 255 ]]; then + echo " WARNING: Batch $batch_num crashed with code $exit_code (${batch_elapsed}s) [stream $stream_id]" + return 1 + else + # exit 0 = all tests passed, exit 1 = some test failures (normal), exit 255 = runner error (non-fatal) + echo " Batch $batch_num completed in ${batch_elapsed}s (exit $exit_code) [stream $stream_id]" + fi + return 0 +} + +# === Run a stream of batches sequentially === +# Args: stream_id batch_indices... 
+# Writes failure count to /tmp/xqts-stream-failures-$stream_id +run_stream() { + local stream_id=$1; shift + local failures=0 + local indices=("$@") + + for batch_idx in "${indices[@]}"; do + local start_idx=$((batch_idx * BATCH_SIZE)) + local end_idx=$((start_idx + BATCH_SIZE)) + if (( end_idx > TOTAL )); then end_idx=$TOTAL; fi + local batch_num=$((batch_idx + 1)) + + run_batch "$batch_num" "$BATCHES" "$start_idx" "$end_idx" "$stream_id" || failures=$((failures + 1)) + echo "" + done + + echo "$failures" > "/tmp/xqts-stream-failures-$stream_id" +} + +# === Dispatch batches === +mkdir -p "$OUTPUT_DIR/junit/data" +START_TIME=$(date +%s) +FAILURES=0 + +if [[ "$PARALLEL" -le 1 ]]; then + # Sequential mode (original behavior) + for (( batch_idx=0; batch_idx TOTAL )); then local_end=$TOTAL; fi + + run_batch "$((batch_idx + 1))" "$BATCHES" "$local_start" "$local_end" "1" || FAILURES=$((FAILURES + 1)) + echo "" + done +else + # Parallel mode: distribute batches round-robin across streams + echo "Starting $PARALLEL parallel streams..." 
+ echo "" + + # Build batch index arrays for each stream + declare -a STREAM_PIDS + for (( s=0; s> "$TIMING_LOG" + +# Count result files +if [[ -d "$OUTPUT_DIR/junit/data" ]]; then + RESULT_COUNT=$(ls "$OUTPUT_DIR/junit/data"/TEST-*.xml 2>/dev/null | wc -l | tr -d ' ') + echo "Results: $RESULT_COUNT XML files in $OUTPUT_DIR/junit/data/" + + # Quick aggregate: count pass/fail/error across all XML files + if command -v xmllint &>/dev/null && [[ $RESULT_COUNT -gt 0 ]]; then + TOTAL_TESTS=0 + TOTAL_FAILURES=0 + TOTAL_ERRORS=0 + TOTAL_SKIPPED=0 + for f in "$OUTPUT_DIR/junit/data"/TEST-*.xml; do + T=$(xmllint --xpath 'string(//testsuite/@tests)' "$f" 2>/dev/null || echo 0) + F=$(xmllint --xpath 'string(//testsuite/@failures)' "$f" 2>/dev/null || echo 0) + E=$(xmllint --xpath 'string(//testsuite/@errors)' "$f" 2>/dev/null || echo 0) + S=$(xmllint --xpath 'string(//testsuite/@skipped)' "$f" 2>/dev/null || echo 0) + TOTAL_TESTS=$((TOTAL_TESTS + T)) + TOTAL_FAILURES=$((TOTAL_FAILURES + F)) + TOTAL_ERRORS=$((TOTAL_ERRORS + E)) + TOTAL_SKIPPED=$((TOTAL_SKIPPED + S)) + done + PASSED=$((TOTAL_TESTS - TOTAL_FAILURES - TOTAL_ERRORS - TOTAL_SKIPPED)) + echo "" + echo "Aggregate: $TOTAL_TESTS tests, $PASSED passed, $TOTAL_FAILURES failed, $TOTAL_ERRORS errors, $TOTAL_SKIPPED skipped" + if [[ $TOTAL_TESTS -gt 0 ]]; then + PCT=$(echo "scale=1; $PASSED * 100 / $TOTAL_TESTS" | bc) + echo "Pass rate: ${PCT}% ($PASSED / $TOTAL_TESTS)" + fi + fi +fi + +# Per-test-set timing report (sorted by time, descending) +if [[ -d "$OUTPUT_DIR/junit/data" ]] && command -v python3 &>/dev/null; then + TIMING_REPORT="$OUTPUT_DIR/timing-report.txt" + python3 -c " +import xml.etree.ElementTree as ET, glob, sys +results = [] +for f in sorted(glob.glob('$OUTPUT_DIR/junit/data/TEST-*.xml')): + root = ET.parse(f).getroot() + name = root.get('name','').replace('XQTS_QT4.','').replace('XQTS_3_1.','').replace('XQTS_FTTS_1_0.','') + t = float(root.get('time','0')) + tests = int(root.get('tests','0')) + fails = 
int(root.get('failures','0')) + errs = int(root.get('errors','0')) + passed = tests - fails - errs - int(root.get('skipped','0')) + results.append((t, name, tests, passed, fails, errs)) +results.sort(reverse=True) +total_time = sum(r[0] for r in results) +print(f'Per-test-set timing report ({len(results)} sets, {total_time:.0f}s total)') +print(f'{\"Time\":>8} {\"Tests\":>6} {\"Pass\":>6} {\"Fail\":>5} {\"Err\":>4} Set') +for t, name, tests, p, f, e in results: + if t >= 1.0: + flag = ' !!!' if t > 60 else ' !' if t > 10 else '' + print(f'{t:>7.1f}s {tests:>6} {p:>6} {f:>5} {e:>4} {name}{flag}') +slow = [r for r in results if r[0] > 60] +if slow: + print(f'\n{len(slow)} test sets >60s — investigate for performance issues') +" 2>/dev/null | tee "$TIMING_REPORT" + echo "" + echo "Timing report saved to: $TIMING_REPORT" +fi + +# List test sets that were expected but produced no results (killed by timeout) +if [[ $FAILURES -gt 0 ]]; then + echo "" + echo "WARNING: $FAILURES batch(es) timed out or failed. Some test sets may have no results." +fi + +echo "" +echo "Done." diff --git a/src/main/scala/org/exist/xqts/runner/ExistServer.scala b/src/main/scala/org/exist/xqts/runner/ExistServer.scala index 91edf4d..dc3d235 100644 --- a/src/main/scala/org/exist/xqts/runner/ExistServer.scala +++ b/src/main/scala/org/exist/xqts/runner/ExistServer.scala @@ -372,10 +372,11 @@ class ExistConnection(brokerRes: Resource[IO, DBBroker], contextAttributesSuppli ): IO[Either[ExistServerException, Result]] = { IO.delay { try { - val resultSequence = xqueryService.execute(broker, compiledQuery.compiledXquery, contextSequence.orNull) - // Extract serialization properties from the query context (e.g. declare option output:method "json") + // Pass outputProperties to execute() so eXist extracts serialization + // options (e.g., declare option output:method "html") BEFORE calling + // context.reset(), which clears them. 
val serializationProps = new Properties() - compiledQuery.xqueryContext.checkOptions(serializationProps) + val resultSequence = xqueryService.execute(broker, compiledQuery.compiledXquery, contextSequence.orNull, serializationProps) val result = Result(resultSequence, compiledQuery.compilationTime, System.currentTimeMillis() - executionStartTime) result.serializationProperties = serializationProps Right(result) @@ -502,7 +503,20 @@ class ExistConnection(brokerRes: Resource[IO, DBBroker], contextAttributesSuppli context } - val source = new StringSource(query) + // If the query has no version declaration and we're running with QT4 + // (indicated by exist.xqts.default-version=4.0 system property), prepend + // "xquery version '4.0';" so XQ4 syntax (=!>, ->, etc.) is accepted. + // If the query has no version declaration and exist.xqts.default-version=4.0, + // prepend "xquery version '4.0';" so XQ4 syntax is accepted. + // Match version declaration even after leading comments. + val hasVersionDecl = query.contains("xquery version") || query.contains("module namespace") + val defaultVersion = System.getProperty("exist.xqts.default-version", "") + val effectiveQuery = if (!hasVersionDecl && defaultVersion == "4.0") { + "xquery version \"4.0\";\n" + query + } else { + query + } + val source = new StringSource(effectiveQuery) val fnConfigureContext: XQueryContext => XQueryContext = { ctx => val configured = setupContext(ctx)(staticBaseUri, availableDocuments, availableCollections, availableTextResources, namespaces, externalVariables, decimalFormats, modules, xpath1Compatibility) // Set global context attributes (e.g., ft.stopWordURIMap, ft.thesaurusURIMap from XQFTTS catalog) diff --git a/src/main/scala/org/exist/xqts/runner/TestCaseRunnerActor.scala b/src/main/scala/org/exist/xqts/runner/TestCaseRunnerActor.scala index 5e52772..4f6ad9a 100644 --- a/src/main/scala/org/exist/xqts/runner/TestCaseRunnerActor.scala +++ 
b/src/main/scala/org/exist/xqts/runner/TestCaseRunnerActor.scala @@ -1269,22 +1269,31 @@ class TestCaseRunnerActor(existServer: ExistServer, commonResourceCacheActor: Ac val serializationQuery = if (serializationProperties.isEmpty || !serializationProperties.containsKey(OutputKeys.METHOD)) { QUERY_ASSERT_XML_SERIALIZATION } else { - val method = serializationProperties.getProperty(OutputKeys.METHOD, "xml") - val indent = serializationProperties.getProperty(OutputKeys.INDENT, "no") + // Build a map with all serialization properties from the query context + val mapEntries = new StringBuilder() + val propNames = serializationProperties.propertyNames() + while (propNames.hasMoreElements) { + val key = propNames.nextElement().asInstanceOf[String] + val value = serializationProperties.getProperty(key) + if (mapEntries.nonEmpty) mapEntries.append(", ") + // Boolean-valued properties need xs:boolean, not string + val booleanProps = Set("indent", "omit-xml-declaration", "include-content-type", + "escape-uri-attributes", "undeclare-prefixes", "byte-order-mark", "allow-duplicate-names") + if (booleanProps.contains(key) && (value == "yes" || value == "no")) { + mapEntries.append(s"'$key': ${value == "yes"}") + } else { + mapEntries.append(s"'$key': '${value.replace("'", "''")}'") + } + } + // Always include omit-xml-declaration unless already set + if (!serializationProperties.containsKey("omit-xml-declaration")) { + if (mapEntries.nonEmpty) mapEntries.append(", ") + mapEntries.append("'omit-xml-declaration': true()") + } s""" - |xquery version "3.1"; - |declare namespace output = "http://www.w3.org/2010/xslt-xquery-serialization"; - | - |declare variable $$local:serialization := - | - | - | - | - | ; - | |declare variable $$result external; | - |fn:serialize($$result, $$local:serialization) + |fn:serialize($$result, map { $mapEntries }) |""".stripMargin } executeQueryWith$Result(connection, serializationQuery, true, None, actual, assertionBaseUri) match { @@ -1644,14 +1653,19 
@@ class TestCaseRunnerActor(existServer: ExistServer, commonResourceCacheActor: Ac ErrorResult(testSetName, testCaseName, compilationTime, executionTime, t) case Right(expectedRegexStr) => + // Pass regex and flags as external variables to avoid eXist parser issues + // with special characters in backtick string constructors (e.g., new StringValue(actualStr), "regex" -> new StringValue(expectedRegexStr), "flags" -> new StringValue(flags.getOrElse("")))) match { case Left(existServerException) => ErrorResult(testSetName, testCaseName, compilationTime + existServerException.compilationTime, executionTime + existServerException.executionTime, existServerException) diff --git a/src/main/scala/org/exist/xqts/runner/XQTSRunnerActor.scala b/src/main/scala/org/exist/xqts/runner/XQTSRunnerActor.scala index 6256328..1c555f6 100644 --- a/src/main/scala/org/exist/xqts/runner/XQTSRunnerActor.scala +++ b/src/main/scala/org/exist/xqts/runner/XQTSRunnerActor.scala @@ -71,10 +71,11 @@ class XQTSRunnerActor(xmlParserBufferSize: Int, existServer: ExistServer, parser private var previousStats: Stats = Stats(0, (0, 0), (0, 0), 0) private var unchangedStatsTicks = 0; - /** Number of consecutive watchdog ticks with no progress before forcing shutdown. 10s tick x 60 = 600s stall timeout. */ - private val STALL_TIMEOUT_TICKS = 60 + /** Number of consecutive watchdog ticks with no progress before forcing shutdown. 10s tick x 6 = 60s stall timeout. 
*/ + private val STALL_TIMEOUT_TICKS = 6 private var watchdogPreviousCompletedCount = 0 private var watchdogStalledTicks = 0 + private var startedTestCases: Map[TestSetRef, Set[String]] = Map.empty override def receive: Receive = { @@ -141,19 +142,17 @@ class XQTSRunnerActor(xmlParserBufferSize: Int, existServer: ExistServer, parser if (watchdogStalledTicks >= STALL_TIMEOUT_TICKS) { val totalCases = this.testCases.values.foldLeft(0)(_ + _.size) + // Identify which test cases started but never completed (hung tests) + val hungTests = for { + (testSetRef, started) <- startedTestCases + completed = completedTestCases.getOrElse(testSetRef, Map.empty).keySet + testCase <- started -- completed + } yield s"${testSetRef.name}/$testCase" logger.warn(s"Watchdog: no progress for ${STALL_TIMEOUT_TICKS * 10}s ($currentCompletedCount/$totalCases cases completed, ${unserializedTestSets.size} unserialized). Forcing shutdown.") - - // Serialize any completed but unsent test sets before shutting down - for { - (testSetRef, _) <- this.testCases - if isTestSetCompleted(testSetRef) && !unserializedTestSets.contains(testSetRef) - } { - completedTestCases.get(testSetRef).foreach { results => - resultsSerializerRouter ! TestSetResults(testSetRef, results.values.toSeq) - } + if (hungTests.nonEmpty) { + logger.warn(s"Hung test cases (started but never completed): ${hungTests.mkString(", ")}") } - - shutdown() + forceSerializeAndShutdown() } case ParseComplete(xqtsVersion, _, matchedTestSets) => @@ -171,7 +170,7 @@ class XQTSRunnerActor(xmlParserBufferSize: Int, existServer: ExistServer, parser unparsedTestSets -= testSetRef // have we completed testing an entire TestSet? NOTE: tests could have finished executing before parse complete message arrives! - if (isTestSetCompleted(testSetRef)) { + if (!unserializedTestSets.contains(testSetRef) && isTestSetCompleted(testSetRef)) { // serialize the TestSet results resultsSerializerRouter ! 
TestSetResults(testSetRef, completedTestCases(testSetRef).values.toSeq) unserializedTestSets += testSetRef @@ -180,16 +179,24 @@ class XQTSRunnerActor(xmlParserBufferSize: Int, existServer: ExistServer, parser case RunningTestCase(testSetRef, testCase) => logger.info(s"Starting execution of Test Case: ${testSetRef.name}/${testCase}...") testCases = addTestCase(testCases, testSetRef, testCase) + startedTestCases = addTestCase(startedTestCases, testSetRef, testCase) case RanTestCase(testSetRef, testResult) => logger.info(s"Finished execution of Test Case: ${testSetRef.name}/${testResult.testCase}.") completedTestCases = mergeTestCases(completedTestCases, testSetRef, testResult) // have we completed testing an entire TestSet? - if (isTestSetCompleted(testSetRef)) { + if (!unserializedTestSets.contains(testSetRef) && isTestSetCompleted(testSetRef)) { // serialize the TestSet results resultsSerializerRouter ! TestSetResults(testSetRef, completedTestCases(testSetRef).values.toSeq) unserializedTestSets += testSetRef + } else if (!unserializedTestSets.contains(testSetRef) && isTestSetCompletedByStarted(testSetRef)) { + // All started test cases completed, but ParsedTestSet hasn't been processed + // yet (still in unparsedTestSets). This happens when BrokerPool threads block + // the Pekko dispatcher, preventing the ParsedTestSet message from being delivered. + logger.info(s"Test set ${testSetRef.name} completed (all started cases finished, ParsedTestSet pending). Serializing results.") + resultsSerializerRouter ! 
TestSetResults(testSetRef, completedTestCases(testSetRef).values.toSeq) + unserializedTestSets += testSetRef } case SerializedTestSetResults(testSetRef) => @@ -205,26 +212,62 @@ class XQTSRunnerActor(xmlParserBufferSize: Int, existServer: ExistServer, parser shutdown() } + private def forceSerializeAndShutdown(): Unit = { + // Serialize any completed but unsent test sets before shutting down + for { + (testSetRef, _) <- this.testCases + if !unserializedTestSets.contains(testSetRef) + } { + completedTestCases.get(testSetRef).foreach { results => + resultsSerializerRouter ! TestSetResults(testSetRef, results.values.toSeq) + } + } + shutdown() + } + private def shutdown(): Unit = { timers.cancel(TimerWatchdogKey) if (logger.isDebugEnabled()) { timers.cancel(TimerStatsKey) } + // Hard deadline: force exit if actor system termination hangs. + // BrokerPool threads can block the Pekko dispatcher, preventing + // CoordinatedShutdown from completing. This standalone thread + // runs outside Pekko and forces JVM exit after 30 seconds. + logger.info("Starting 30-second shutdown deadline thread") + val deadline = new Thread(() => { + try { + Thread.sleep(30000) + logger.warn("Actor system shutdown did not complete within 30 seconds, forcing exit") + Runtime.getRuntime.halt(0) + } catch { + case _: InterruptedException => + logger.info("Shutdown deadline thread interrupted (clean exit)") + } + }, "xqts-shutdown-deadline") + deadline.setDaemon(true) + deadline.start() context.stop(self) context.system.terminate() } private def isTestSetCompleted(testSetRef: TestSetRef): Boolean = { unparsedTestSets.contains(testSetRef) == false && - completedTestCases.get(testSetRef).map(_.keySet) - .flatMap(completed => testCases.get(testSetRef).map(_ == completed)) - .getOrElse(false) + isTestSetCompletedByStarted(testSetRef) + } + + /** Check if all STARTED test cases have completed, ignoring ParsedTestSet status. 
*/ + private def isTestSetCompletedByStarted(testSetRef: TestSetRef): Boolean = { + completedTestCases.get(testSetRef).map(_.keySet) + .flatMap(completed => startedTestCases.get(testSetRef).map(started => started.nonEmpty && started == completed)) + .getOrElse(false) } private def allTestSetsCompleted(): Boolean = { - unserializedTestSets.isEmpty && - unparsedTestSets.isEmpty && - !testCases.keySet.map(isTestSetCompleted(_)).contains(false) + unserializedTestSets.isEmpty && { + val testSetRefs = if (startedTestCases.nonEmpty) startedTestCases.keySet else testCases.keySet + testSetRefs.forall(ref => isTestSetCompleted(ref) || isTestSetCompletedByStarted(ref)) + } } @unused diff --git a/src/main/scala/org/exist/xqts/runner/qt3/XQTS3TestSetParserActor.scala b/src/main/scala/org/exist/xqts/runner/qt3/XQTS3TestSetParserActor.scala index 83d0547..8974f11 100644 --- a/src/main/scala/org/exist/xqts/runner/qt3/XQTS3TestSetParserActor.scala +++ b/src/main/scala/org/exist/xqts/runner/qt3/XQTS3TestSetParserActor.scala @@ -580,10 +580,7 @@ class XQTS3TestSetParserActor(xmlParserBufferSize: Int, testCaseRunnerActor: Act case END_ELEMENT if (asyncReader.getLocalName == ELEM_ASSERT_TRUE) => currentResult = currentResult.map(addAssertion(_)(AssertTrue)) - case END_ELEMENT if (asyncReader.getLocalName == ELEM_ALL_OF || asyncReader.getLocalName == ELEM_ANY_OF) => - currentResult = currentResult.map(stepOutAssertions) - - case END_ELEMENT if (asyncReader.getLocalName == ELEM_ALL_OF || asyncReader.getLocalName == ELEM_NOT) => + case END_ELEMENT if (asyncReader.getLocalName == ELEM_ALL_OF || asyncReader.getLocalName == ELEM_ANY_OF || asyncReader.getLocalName == ELEM_NOT) => currentResult = currentResult.map(stepOutAssertions) case START_ELEMENT if (currentResult.nonEmpty && asyncReader.getLocalName == ELEM_ERROR) => @@ -664,9 +661,21 @@ class XQTS3TestSetParserActor(xmlParserBufferSize: Int, testCaseRunnerActor: Act def addAssertion(currentAssertions: Stack[Result])(assertion: Result): 
Stack[Result] = { currentAssertions.peekOption match { - case Some(head) if (head.isInstanceOf[Assertions] && !assertion.isInstanceOf[Assertions]) => + case Some(head: Assertions) if (!assertion.isInstanceOf[Assertions]) => // head of the stack is itself a list of assertions, and the assertion to add is not a list of assertions - currentAssertions.replace(head.asInstanceOf[Assertions] :+ assertion) + // Check if the last element in the list is a Not(None) that needs filling + head.assertions.lastOption match { + case Some(Not(None)) => + // Fill the empty Not with this assertion + val updatedAssertions = head.assertions.init :+ Not(Some(assertion)) + val updatedHead = head match { + case AllOf(_) => AllOf(updatedAssertions) + case AnyOf(_) => AnyOf(updatedAssertions) + } + currentAssertions.replace(updatedHead) + case _ => + currentAssertions.replace(head :+ assertion) + } case Some(Not(None)) => // head of the stack is a Not assertion which is empty, so wrap this assertion in the Not assertion @@ -682,7 +691,8 @@ class XQTS3TestSetParserActor(xmlParserBufferSize: Int, testCaseRunnerActor: Act def stepOutAssertions(currentAssertions: Stack[Result]): Stack[Result] = { if (currentAssertions.size >= 2) { - if (currentAssertions.peek.isInstanceOf[Assertions]) { + val top = currentAssertions.peek + if (top.isInstanceOf[Assertions] || top.isInstanceOf[Not]) { val (prevHead, stack) = currentAssertions.pop() val head = stack.peek if (head.isInstanceOf[Assertions]) {