Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,57 +60,77 @@ class KyuubiBatchService(
}

override def start(): Unit = {
val UNINITIALIZED_BATCH_ID = "UNINITIALIZED_BATCH_ID"
assert(running.compareAndSet(false, true))
val submitTask: Runnable = () => {
restFrontend.waitForServerStarted()
while (running.get) {
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
case None => Thread.sleep(1000)
case Some(metadata) =>
val batchId = metadata.identifier
info(s"$batchId is picked for submission.")
val batchSession = sessionManager.createBatchSession(
metadata.username,
"anonymous",
metadata.ipAddress,
metadata.requestConf,
metadata.engineType,
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestArgs,
Some(metadata),
fromRecovery = false)
sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
true
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
}
case Some(_) =>
false
case None =>
error(s"$batchId does not existed in metastore, assume it is finished")
true
var batchId = UNINITIALIZED_BATCH_ID
try {
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
case None => Thread.sleep(1000)
case Some(metadata) =>
batchId = metadata.identifier
info(s"$batchId is picked for submission.")
val batchSession = sessionManager.createBatchSession(
metadata.username,
"anonymous",
metadata.ipAddress,
metadata.requestConf,
metadata.engineType,
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestArgs,
Some(metadata),
fromRecovery = false)
sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
true
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
}
case Some(_) =>
false
case None =>
error(s"$batchId does not existed in metastore, assume it is finished")
Comment thread
oh0873 marked this conversation as resolved.
Outdated
true
}
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted or finished.")
}
} catch {
// If the batch session failed to open, reinitialize the batch state to ERROR
// This can be due to a DB error or batch_connection_limits exceeded
case e: Exception =>
if (batchId == UNINITIALIZED_BATCH_ID) {
error(s"Error picking batch for submission", e)
} else {
error(s"Error opening batch session for $batchId", e)
try {
metadataManager.failScheduledBatch(batchId)
} catch {
case ex: Exception =>
error(s"Unable to modify metadata for $batchId to ERROR", ex)
}
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted or finished.")
Thread.sleep(1000)
}
Comment thread
oh0873 marked this conversation as resolved.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ class MetadataManager extends AbstractService("MetadataManager") {
_metadataStore.transformMetadataState(batchId, "INITIALIZED", "CANCELED")
}

/**
 * Marks a scheduled batch as failed.
 *
 * Delegates to the metadata store, which performs the state transition only when the
 * batch is currently in the scheduled ("PENDING") state — a batch in any other state
 * (e.g. INITIALIZED, RUNNING, terminal) is left untouched.
 *
 * @param batchId identifier of the batch whose state should be transitioned
 * @return whether the PENDING -> ERROR transition was actually applied
 */
def failScheduledBatch(batchId: String): Boolean = {
  val scheduledState = "PENDING"
  val failedState = "ERROR"
  _metadataStore.transformMetadataState(batchId, scheduledState, failedState)
}

def getBatchesRecoveryMetadata(
state: String,
kyuubiInstance: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
targetState: String): Boolean = {
val query = s"UPDATE $METADATA_TABLE SET state = ? WHERE identifier = ? AND state = ?"
JdbcUtils.withConnection { connection =>
withUpdateCount(connection, query, fromState, identifier, targetState) { updateCount =>
withUpdateCount(connection, query, targetState, identifier, fromState) { updateCount =>
updateCount == 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER}
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType}
Expand Down Expand Up @@ -72,6 +73,72 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
Utils.tryLogNonFatalError { sessionManager.closeSession(session.handle) }
}
}

/**
 * Regression test for the KyuubiBatchService catch path: when `openBatchSession` fails
 * because the metadata update throws, the batch must remain INITIALIZED (the recovery
 * call `failScheduledBatch` only transitions PENDING -> ERROR), and the batch service
 * thread must survive the exception and continue processing later batches.
 *
 * NOTE(review): the original paste contained GitHub review-UI text embedded inside this
 * method body; that non-code text has been removed — the Scala statements are unchanged.
 */
test("KyuubiBatchService catch block when openBatchSession fails during metadata update") {
  val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
  val realMetadataManager = sessionManager.metadataManager.get

  // Wrap the real manager so that only updateMetadata fails (simulating a DB error during
  // batch submission) while insert/get still delegate to the real store.
  val wrapperMetadataManager = new MetadataManager {
    override def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
      throw new RuntimeException("test metadata update failure")
    }
    override def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
      realMetadataManager.insertMetadata(metadata, asyncRetryOnError)
    }
    override def getBatch(batchId: String) = realMetadataManager.getBatch(batchId)
  }
  wrapperMetadataManager.initialize(sessionManager.getConf)

  val originalMetadataManager = sessionManager.metadataManager
  try {
    sessionManager.metadataManager = Some(wrapperMetadataManager)

    // Submit a batch whose openBatchSession will fail inside the service loop.
    val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
    val response = webTarget.path("api/v1/batches")
      .request(MediaType.APPLICATION_JSON_TYPE)
      .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
    assert(response.getStatus == 200)
    val batch = response.readEntity(classOf[Batch])
    val batchId = batch.getId
    assert(batch.getState === "INITIALIZED")

    eventually(timeout(15.seconds), interval(1.second)) {
      val batchInfoResponse = webTarget.path(s"api/v1/batches/$batchId")
        .request(MediaType.APPLICATION_JSON_TYPE)
        .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
        .get()
      assert(batchInfoResponse.getStatus == 200)
      val batchInfo = batchInfoResponse.readEntity(classOf[Batch])
      assert(
        batchInfo.getState === "INITIALIZED",
        "Batch should remain INITIALIZED after openBatchSession failed and catch block ran " +
          "(failScheduledBatch only transitions PENDING->ERROR)")
    }

    // Restore the real manager and submit a second batch to prove the service loop
    // is still alive after the caught exception.
    sessionManager.metadataManager = originalMetadataManager

    val requestObj2 = newSparkBatchRequest(Map("spark.master" -> "local"))
    val response2 = webTarget.path("api/v1/batches")
      .request(MediaType.APPLICATION_JSON_TYPE)
      .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
      .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
    assert(response2.getStatus == 200)
    val batch2Id = response2.readEntity(classOf[Batch]).getId
    eventually(timeout(30.seconds), interval(1.second)) {
      val batch2InfoResponse = webTarget.path(s"api/v1/batches/$batch2Id")
        .request(MediaType.APPLICATION_JSON_TYPE)
        .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
        .get()
      val batch2Info = batch2InfoResponse.readEntity(classOf[Batch])
      assert(
        batch2Info.getState === "PENDING" || batch2Info.getState === "RUNNING",
        "Second batch should be processed (batch service still running after catch)")
    }
  } finally {
    // Always restore the real metadata manager even if an assertion above failed.
    sessionManager.metadataManager = originalMetadataManager
  }
}
}

abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
Expand Down
Loading