Skip to content

Commit

Permalink
Bulk Load CDK: Exception Handler for TaskRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Sep 26, 2024
1 parent 477bcc4 commit 4bbfa4e
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.airbyte.cdk.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.DefaultImplementation
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Provider
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -182,6 +183,13 @@ class DefaultDestinationTaskLauncher(
override suspend fun handleTeardownComplete() {
stop()
}

override suspend fun handleException(t: Throwable) {
log.error(t) { "Task error: $t" }
stop()
// TODO: Execute stream cleanup and destination teardown tasks.
throw t
}
}

@Factory
Expand All @@ -200,6 +208,7 @@ class DestinationTaskLauncherFactory(
private val teardownTaskFactory: TeardownTaskFactory
) : Provider<DestinationTaskLauncher> {
@Singleton
@Secondary
override fun get(): DestinationTaskLauncher {
return DefaultDestinationTaskLauncher(
catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ interface TaskLauncher {
suspend fun stop() {
taskRunner.close()
}
suspend fun handleException(t: Throwable): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch

/**
Expand All @@ -31,11 +31,19 @@ class TaskRunner {
}

suspend fun run() = coroutineScope {
queue.consumeAsFlow().collect { task ->
launch {
log.info { "Executing task: $task" }
task.execute()
try {
queue.receiveAsFlow().collect { task ->
launch {
log.info { "Executing task: $task" }
task.execute()
}
}
} catch (t: Throwable) {
log.error { "Exception in task runner, flushing queue: $t" }
queue.receiveAsFlow().collect { task ->
log.info { "Flushing task $task due to caught exception" }
}
throw t
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.message.MessageQueueWriter
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.io.InputStream
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -55,6 +56,7 @@ interface DeserializingInputStreamConsumer<T : Any> : InputConsumer<T> {
}

@Singleton
@Secondary
class DefaultInputConsumer(
override val inputStream: InputStream,
override val deserializer: Deserializer<DestinationMessage>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package io.airbyte.cdk.write

import io.airbyte.cdk.Operation
import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.task.TaskLauncher
import io.airbyte.cdk.task.TaskRunner
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Secondary
Expand All @@ -23,18 +23,29 @@ import kotlinx.coroutines.runBlocking
@Singleton
@Requires(property = Operation.PROPERTY, value = "write")
class WriteOperation(
private val inputConsumer: InputConsumer<DestinationMessage>,
private val inputConsumer: InputConsumer<*>,
private val taskLauncher: TaskLauncher,
private val taskRunner: TaskRunner
) : Operation {
val log = KotlinLogging.logger {}

override fun execute() {
runBlocking {
launch { inputConsumer.run() }

launch { taskLauncher.start() }

launch { taskRunner.run() }
launch {
try {
taskRunner.run()
} catch (t: Throwable) {
launch { taskRunner.run() }
taskLauncher.handleException(t)
}
}
}

// TODO: Catch outer exception and pass to fancy handler.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ class MockTaskLauncher(override val taskRunner: TaskRunner) : DestinationTaskLau
override suspend fun start() {
throw NotImplementedError()
}

override suspend fun handleException(t: Throwable) {
throw NotImplementedError()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.write

import io.airbyte.cdk.Operation
import io.airbyte.cdk.task.Task
import io.airbyte.cdk.task.TaskLauncher
import io.airbyte.cdk.task.TaskRunner
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import jakarta.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest(environments = ["WriteOperationTest"])
@Property(name = Operation.PROPERTY, value = "write")
class WriteOperationTest {
@Inject lateinit var writeOperation: WriteOperation

@Singleton
@Requires(env = ["WriteOperationTest"])
class NoopInputConsumer : InputConsumer<String> {
override suspend fun run() {
// Do nothing
}
}

@Singleton
@Requires(env = ["WriteOperationTest"])
class SimpleTaskLauncher(override val taskRunner: TaskRunner) : TaskLauncher {
val handledException = CompletableDeferred<Boolean>()

override suspend fun start() {
taskRunner.enqueue(
object : Task {
override suspend fun execute() {
throw RuntimeException("Task failed.")
}
}
)
}

override suspend fun handleException(t: Throwable) {
// Do this with a task to verify that the
// exception workflow can still run tasks.
taskRunner.enqueue(
object : Task {
override suspend fun execute() {
handledException.complete(true)
}
}
)
}
}

@Test
fun testExceptionHandling(simpleTaskLauncher: SimpleTaskLauncher) = runTest {
withContext(Dispatchers.IO) {
launch { writeOperation.execute() }
Assertions.assertTrue(simpleTaskLauncher.handledException.await())
simpleTaskLauncher.stop()
}
}
}

0 comments on commit 4bbfa4e

Please sign in to comment.