Patch summary (text before the first "diff --git" line is not part of any hunk):
  * KyuubiBatchService.start(): wraps each pick/submit iteration in try/catch. When a batch id
    has already been picked, the handler calls MetadataManager.failScheduledBatch(batchId);
    InterruptedException is rethrown, any other Exception is followed by a 1 second sleep.
  * MetadataManager: adds failScheduledBatch, which asks the metadata store to transform the
    batch state from "PENDING" to "ERROR".
  * JDBCMetadataStore: the state-transition UPDATE now binds (targetState, identifier, fromState),
    matching the placeholder order of "SET state = ? WHERE identifier = ? AND state = ?".
  * BatchesV2ResourceSuite: adds a test that forces updateMetadata to throw and expects the
    batch to reach ERROR, then checks that a second batch is still processed.
NOTE(review): this patch was recovered from a copy whose newlines and indentation had been
collapsed. Every token below is unchanged and the hunk line counts match their headers, but
leading whitespace was reconstructed from the nesting and the project's 2-space style; verify
against the target tree (or use `git apply --ignore-whitespace`) before applying.

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 38bb999c342..db996af96fc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -60,57 +60,97 @@ class KyuubiBatchService(
   }
 
   override def start(): Unit = {
+    val UNINITIALIZED_BATCH_ID = "UNINITIALIZED_BATCH_ID"
     assert(running.compareAndSet(false, true))
     val submitTask: Runnable = () => {
       restFrontend.waitForServerStarted()
       while (running.get) {
-        metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
-          case None => Thread.sleep(1000)
-          case Some(metadata) =>
-            val batchId = metadata.identifier
-            info(s"$batchId is picked for submission.")
-            val batchSession = sessionManager.createBatchSession(
-              metadata.username,
-              "anonymous",
-              metadata.ipAddress,
-              metadata.requestConf,
-              metadata.engineType,
-              Option(metadata.requestName),
-              metadata.resource,
-              metadata.className,
-              metadata.requestArgs,
-              Some(metadata),
-              fromRecovery = false)
-            sessionManager.openBatchSession(batchSession)
-            var submitted = false
-            while (!submitted) { // block until batch job submitted
-              submitted = metadataManager.getBatchSessionMetadata(batchId) match {
-                case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
-                  true
-                case Some(metadata) if metadata.opState == OperationState.RUNNING =>
-                  metadata.appState match {
-                    // app that is not submitted to resource manager
-                    case None | Some(ApplicationState.NOT_FOUND) => false
-                    // app that is pending in resource manager while the local startup
-                    // process is alive. For example, in Spark YARN cluster mode, if set
-                    // spark.yarn.submit.waitAppCompletion=false, the local spark-submit
-                    // process exits immediately once Application goes ACCEPTED status,
-                    // even no resource could be allocated for the AM container.
-                    case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
-                      false
-                    // not sure, added for safe
-                    case Some(ApplicationState.UNKNOWN) => false
-                    case _ => true
-                  }
-                case Some(_) =>
-                  false
-                case None =>
-                  error(s"$batchId does not existed in metastore, assume it is finished")
-                  true
+        var batchId = UNINITIALIZED_BATCH_ID
+        try {
+          metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
+            case None => Thread.sleep(1000)
+            case Some(metadata) =>
+              batchId = metadata.identifier
+              info(s"$batchId is picked for submission.")
+              val batchSession = sessionManager.createBatchSession(
+                metadata.username,
+                "anonymous",
+                metadata.ipAddress,
+                metadata.requestConf,
+                metadata.engineType,
+                Option(metadata.requestName),
+                metadata.resource,
+                metadata.className,
+                metadata.requestArgs,
+                Some(metadata),
+                fromRecovery = false)
+              sessionManager.openBatchSession(batchSession)
+              var submitted = false
+              while (!submitted) { // block until batch job submitted
+                submitted = metadataManager.getBatchSessionMetadata(batchId) match {
+                  case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
+                    true
+                  case Some(metadata) if metadata.opState == OperationState.RUNNING =>
+                    metadata.appState match {
+                      // app that is not submitted to resource manager
+                      case None | Some(ApplicationState.NOT_FOUND) => false
+                      // app that is pending in resource manager while the local startup
+                      // process is alive. For example, in Spark YARN cluster mode, if set
+                      // spark.yarn.submit.waitAppCompletion=false, the local spark-submit
+                      // process exits immediately once Application goes ACCEPTED status,
+                      // even no resource could be allocated for the AM container.
+                      case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
+                        false
+                      // not sure, added for safe
+                      case Some(ApplicationState.UNKNOWN) => false
+                      case _ => true
+                    }
+                  case Some(_) =>
+                    false
+                  case None =>
+                    error(s"$batchId does not exist in metastore, assume it is finished")
+                    true
+                }
+                if (!submitted) Thread.sleep(1000)
+              }
+              info(s"$batchId is submitted or finished.")
+          }
+        } catch {
+          case e: InterruptedException =>
+            if (batchId == UNINITIALIZED_BATCH_ID) {
+              error(s"Interrupted while picking batch for submission", e)
+            } else {
+              error(s"Interrupted while opening batch session for $batchId", e)
+              try {
+                metadataManager.failScheduledBatch(batchId)
+              } catch {
+                case ex: Exception =>
+                  error(
+                    s"Unable to modify metadata for $batchId to ERROR; " +
+                      "an administrator may need to reset the batch state manually.",
+                    ex)
+              }
+            }
+            throw e
+          // If the batch session failed to open, reinitialize the batch state to ERROR
+          // This can be due to a DB error or batch_connection_limits exceeded
+          case e: Exception =>
+            if (batchId == UNINITIALIZED_BATCH_ID) {
+              error(s"Error picking batch for submission", e)
+            } else {
+              error(s"Error opening batch session for $batchId", e)
+              try {
+                metadataManager.failScheduledBatch(batchId)
+              } catch {
+                case ex: Exception =>
+                  error(
+                    s"Unable to modify metadata for $batchId to ERROR; " +
+                      "an administrator may need to reset the batch state manually.",
+                    ex)
               }
-              if (!submitted) Thread.sleep(1000)
             }
-            info(s"$batchId is submitted or finished.")
+            // sleep 1 second to avoid excessive retries during transient network/DB failures
+            Thread.sleep(1000)
         }
       }
     }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index c5182979ee7..85495c11ecd 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -181,6 +181,10 @@ class MetadataManager extends AbstractService("MetadataManager") {
     _metadataStore.transformMetadataState(batchId, "INITIALIZED", "CANCELED")
   }
 
+  def failScheduledBatch(batchId: String): Boolean = {
+    _metadataStore.transformMetadataState(batchId, "PENDING", "ERROR")
+  }
+
   def getBatchesRecoveryMetadata(
       state: String,
       kyuubiInstance: String,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index b7e83cd563d..a9ee8fa9569 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -235,7 +235,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
       targetState: String): Boolean = {
     val query = s"UPDATE $METADATA_TABLE SET state = ? WHERE identifier = ? AND state = ?"
     JdbcUtils.withConnection { connection =>
-      withUpdateCount(connection, query, fromState, identifier, targetState) { updateCount =>
+      withUpdateCount(connection, query, targetState, identifier, fromState) { updateCount =>
         updateCount == 1
       }
     }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index f836f6f1572..abba4addfd5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -44,6 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.server.KyuubiRestFrontendService
 import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER}
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
 import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils}
 import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType}
@@ -72,6 +73,71 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
       Utils.tryLogNonFatalError { sessionManager.closeSession(session.handle) }
     }
   }
+
+  test("KyuubiBatchService catch block when openBatchSession fails during metadata update") {
+    val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+    val realMetadataManager = sessionManager.metadataManager.get
+
+    val wrapperMetadataManager = new MetadataManager {
+      override def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
+        throw new RuntimeException("test metadata update failure")
+      }
+      override def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
+        realMetadataManager.insertMetadata(metadata, asyncRetryOnError)
+      }
+      override def getBatch(batchId: String) = realMetadataManager.getBatch(batchId)
+    }
+
+    val originalMetadataManager = sessionManager.metadataManager
+    try {
+      sessionManager.metadataManager = Some(wrapperMetadataManager)
+
+      val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+      val response = webTarget.path("api/v1/batches")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
+        .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+      assert(response.getStatus == 200)
+      val batch = response.readEntity(classOf[Batch])
+      val batchId = batch.getId
+      assert(batch.getState === "INITIALIZED")
+
+      eventually(timeout(15.seconds), interval(1.second)) {
+        val batchInfoResponse = webTarget.path(s"api/v1/batches/$batchId")
+          .request(MediaType.APPLICATION_JSON_TYPE)
+          .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
+          .get()
+        assert(batchInfoResponse.getStatus == 200)
+        val batchInfo = batchInfoResponse.readEntity(classOf[Batch])
+        assert(
+          batchInfo.getState === "ERROR",
+          "Batch should eventually become ERROR after being picked and failed by the " +
+            "catch path, rather than remaining stuck in PENDING")
+      }
+
+      sessionManager.metadataManager = originalMetadataManager
+
+      val requestObj2 = newSparkBatchRequest(Map("spark.master" -> "local"))
+      val response2 = webTarget.path("api/v1/batches")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
+        .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
+      assert(response2.getStatus == 200)
+      val batch2Id = response2.readEntity(classOf[Batch]).getId
+      eventually(timeout(30.seconds), interval(1.second)) {
+        val batch2InfoResponse = webTarget.path(s"api/v1/batches/$batch2Id")
+          .request(MediaType.APPLICATION_JSON_TYPE)
+          .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
+          .get()
+        val batch2Info = batch2InfoResponse.readEntity(classOf[Batch])
+        assert(
+          batch2Info.getState === "PENDING" || batch2Info.getState === "RUNNING",
+          "Second batch should be processed (batch service still running after catch)")
+      }
+    } finally {
+      sessionManager.metadataManager = originalMetadataManager
+    }
+  }
 }
 
 abstract class BatchesResourceSuiteBase extends KyuubiFunSuite