Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,57 +60,97 @@ class KyuubiBatchService(
}

override def start(): Unit = {
val UNINITIALIZED_BATCH_ID = "UNINITIALIZED_BATCH_ID"
assert(running.compareAndSet(false, true))
val submitTask: Runnable = () => {
restFrontend.waitForServerStarted()
while (running.get) {
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
case None => Thread.sleep(1000)
case Some(metadata) =>
val batchId = metadata.identifier
info(s"$batchId is picked for submission.")
val batchSession = sessionManager.createBatchSession(
metadata.username,
"anonymous",
metadata.ipAddress,
metadata.requestConf,
metadata.engineType,
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestArgs,
Some(metadata),
fromRecovery = false)
sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
true
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
}
case Some(_) =>
false
case None =>
error(s"$batchId does not existed in metastore, assume it is finished")
true
var batchId = UNINITIALIZED_BATCH_ID
try {
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
case None => Thread.sleep(1000)
case Some(metadata) =>
batchId = metadata.identifier
info(s"$batchId is picked for submission.")
val batchSession = sessionManager.createBatchSession(
metadata.username,
"anonymous",
metadata.ipAddress,
metadata.requestConf,
metadata.engineType,
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestArgs,
Some(metadata),
fromRecovery = false)
sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
true
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
}
case Some(_) =>
false
case None =>
error(s"$batchId does not exist in metastore, assume it is finished")
true
}
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted or finished.")
}
} catch {
case e: InterruptedException =>
if (batchId == UNINITIALIZED_BATCH_ID) {
error(s"Interrupted while picking batch for submission", e)
} else {
error(s"Interrupted while opening batch session for $batchId", e)
try {
metadataManager.failScheduledBatch(batchId)
} catch {
case ex: Exception =>
error(
s"Unable to modify metadata for $batchId to ERROR; " +
"an administrator may need to reset the batch state manually.",
ex)
}
}
throw e
// If the batch session failed to open, reinitialize the batch state to ERROR
// This can be due to a DB error or batch_connection_limits exceeded
case e: Exception =>
if (batchId == UNINITIALIZED_BATCH_ID) {
error(s"Error picking batch for submission", e)
} else {
error(s"Error opening batch session for $batchId", e)
try {
metadataManager.failScheduledBatch(batchId)
} catch {
case ex: Exception =>
error(
s"Unable to modify metadata for $batchId to ERROR; " +
"an administrator may need to reset the batch state manually.",
ex)
}
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted or finished.")
// sleep 1 second to avoid excessive retries during transient network/DB failures
Thread.sleep(1000)
}
Comment thread
oh0873 marked this conversation as resolved.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ class MetadataManager extends AbstractService("MetadataManager") {
_metadataStore.transformMetadataState(batchId, "INITIALIZED", "CANCELED")
}

/**
 * Marks a scheduled batch as failed by conditionally moving its metadata
 * state from PENDING to ERROR in a single guarded update.
 *
 * @param batchId identifier of the batch whose state should be transitioned
 * @return true if exactly one PENDING record was flipped to ERROR; false if
 *         the batch was no longer PENDING (or no such record exists)
 */
def failScheduledBatch(batchId: String): Boolean =
  _metadataStore.transformMetadataState(batchId, "PENDING", "ERROR")

def getBatchesRecoveryMetadata(
state: String,
kyuubiInstance: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
targetState: String): Boolean = {
val query = s"UPDATE $METADATA_TABLE SET state = ? WHERE identifier = ? AND state = ?"
JdbcUtils.withConnection { connection =>
withUpdateCount(connection, query, fromState, identifier, targetState) { updateCount =>
withUpdateCount(connection, query, targetState, identifier, fromState) { updateCount =>
updateCount == 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER}
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType}
Expand Down Expand Up @@ -72,6 +73,71 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
Utils.tryLogNonFatalError { sessionManager.closeSession(session.handle) }
}
}

// Regression test for the KyuubiBatchService submit loop's catch path: when
// opening a picked batch session fails (forced here by a MetadataManager whose
// updateMetadata always throws), the batch must end up in ERROR rather than
// staying stuck in PENDING, and the service loop must survive the exception so
// later batches are still processed.
test("KyuubiBatchService catch block when openBatchSession fails during metadata update") {
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val realMetadataManager = sessionManager.metadataManager.get

// Fault-injecting wrapper: every metadata update throws, while inserts and
// lookups delegate to the real manager so the batch can still be created
// and then observed through the REST API.
val wrapperMetadataManager = new MetadataManager {
override def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
throw new RuntimeException("test metadata update failure")
}
override def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
realMetadataManager.insertMetadata(metadata, asyncRetryOnError)
}
override def getBatch(batchId: String) = realMetadataManager.getBatch(batchId)
}

// Keep a handle to the real manager so it can be restored even on failure.
val originalMetadataManager = sessionManager.metadataManager
try {
// Swap in the failing manager before submitting the first batch.
sessionManager.metadataManager = Some(wrapperMetadataManager)

val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
val response = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
assert(response.getStatus == 200)
val batch = response.readEntity(classOf[Batch])
val batchId = batch.getId
assert(batch.getState === "INITIALIZED")

// The batch service should pick up the batch, hit the injected failure,
// and transition the batch to ERROR instead of leaving it PENDING.
eventually(timeout(15.seconds), interval(1.second)) {
val batchInfoResponse = webTarget.path(s"api/v1/batches/$batchId")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
assert(batchInfoResponse.getStatus == 200)
val batchInfo = batchInfoResponse.readEntity(classOf[Batch])
assert(
batchInfo.getState === "ERROR",
"Batch should eventually become ERROR after being picked and failed by the " +
"catch path, rather than remaining stuck in PENDING")
}

// Restore the real manager, then verify the service loop survived the
// exception: a second batch must still get picked up and make progress.
sessionManager.metadataManager = originalMetadataManager

val requestObj2 = newSparkBatchRequest(Map("spark.master" -> "local"))
val response2 = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
assert(response2.getStatus == 200)
val batch2Id = response2.readEntity(classOf[Batch]).getId
eventually(timeout(30.seconds), interval(1.second)) {
val batch2InfoResponse = webTarget.path(s"api/v1/batches/$batch2Id")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
val batch2Info = batch2InfoResponse.readEntity(classOf[Batch])
assert(
batch2Info.getState === "PENDING" || batch2Info.getState === "RUNNING",
"Second batch should be processed (batch service still running after catch)")
}
} finally {
// Always restore the real MetadataManager so later tests are unaffected.
sessionManager.metadataManager = originalMetadataManager
}
}
}

abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
Expand Down
Loading