How to work with callbacks and flows in Kotlin

Basic asynchronous programming tools in Kotlin. We look at how callbacks and flows work, using code examples.

Asynchronous programming deals with events that occur independently of the application's main thread and with ways to handle them without blocking a thread to wait for them.

In programming languages without built-in support for asynchrony, it is usually implemented using two patterns: callbacks and future or promise constructs. Callbacks are the basic primitive, and asynchronous programming with futures and promises relies on them.

For example, the Java 5 Future type does not support waiting for completion without blocking. You only have the get method to wait on it. However, Java 8 added an extended CompletableFuture type with the whenComplete method, which installs a callback to wait for completion without blocking and so makes the type suitable for asynchronous programming.
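As a small illustration of the callback style, here is a minimal Kotlin sketch that registers a whenComplete callback on a CompletableFuture instead of blocking on get. The function name whenCompleteDemo and the values are ours, chosen only for this example.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical demo: reacts to completion through a callback, no get() call.
fun whenCompleteDemo(): String {
    val future = CompletableFuture<Int>()
    var report = "pending"

    // The callback receives either a value or an error; no thread waits here.
    future.whenComplete { value, error ->
        report = if (error != null) "failed: ${error.message}" else "value: $value"
    }

    // Completing the future runs the already registered callback on this thread.
    future.complete(42)
    return report
}
```

Calling whenCompleteDemo() from a main function returns the text built inside the callback, which shows that the result was delivered without a blocking wait.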

Kotlin coroutines support asynchronous programming by providing suspension without blocking. They also integrate with asynchronous libraries on the JVM through callbacks.

One-time callback

Consider a hypothetical Operation interface with a method for performing an operation asynchronously. It takes a callback as a parameter to report completion with either the result value or an error:

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
}

Let's define a suspending extension function in Kotlin that performs the operation without blocking, using suspendCoroutine from the standard library:

import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

suspend fun <T> Operation<T>.perform(): T =
    suspendCoroutine { continuation ->
        performAsync { value, exception ->
            when {
                exception != null -> // operation error
                    continuation.resumeWithException(exception)
                else -> // success, there is a value
                    continuation.resume(value as T)
            }
        }
    }

Note that perform is a cold source of values. It is not active before it is called or after it returns, because it waits for the operation to complete through a callback.
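To see the wrapper in action, here is a minimal sketch with two hypothetical Operation implementations, DelayedValue and Failing, which exist only for this example. It assumes the kotlinx.coroutines library is available for runBlocking.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
}

// Hypothetical operation: reports a fixed value from a background thread.
class DelayedValue<T>(private val value: T) : Operation<T> {
    override fun performAsync(callback: (T?, Throwable?) -> Unit) {
        thread {
            Thread.sleep(50)
            callback(value, null)
        }
    }
}

// Hypothetical operation: always reports an error.
class Failing<T>(private val error: Throwable) : Operation<T> {
    override fun performAsync(callback: (T?, Throwable?) -> Unit) {
        thread { callback(null, error) }
    }
}

suspend fun <T> Operation<T>.perform(): T =
    suspendCoroutine { continuation ->
        performAsync { value, exception ->
            if (exception != null) continuation.resumeWithException(exception)
            else continuation.resume(value as T)
        }
    }

// Returns the successful value and the message of the caught error.
fun performDemo(): Pair<Int, String?> = runBlocking {
    val value = DelayedValue(42).perform() // suspends, does not block a thread
    val message = try {
        Failing<Int>(IllegalStateException("boom")).perform()
        null
    } catch (e: IllegalStateException) {
        e.message // the callback error surfaces as a regular exception
    }
    value to message
}
```

The calling code reads like sequential code: the value comes back as a return value, and the error reported through the callback is handled with an ordinary try/catch.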

Cancellable operation

It is good engineering practice to provide a cancellation facility in an asynchronous API. For example, Operation can have a cancel method for this purpose:

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel() // cancels the current operation
}

Now let's define perform as a cancellable suspending function, using suspendCancellableCoroutine from the kotlinx.coroutines library:

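import kotlinx.coroutines.suspendCancellableCoroutine

suspend fun <T> Operation<T>.perform(): T =
    suspendCancellableCoroutine { continuation ->
        performAsync { /* ... as before ... */ }
        // propagate coroutine cancellation to the underlying operation
        continuation.invokeOnCancellation { cancel() }
    }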
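The following sketch shows the cancellation path under stated assumptions: NeverCompletes is a hypothetical operation that never invokes its callback, and withTimeoutOrNull is used only as a convenient way to cancel the waiting coroutine.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel() // cancels the current operation
}

// Hypothetical operation: never calls back, only records that it was cancelled.
class NeverCompletes : Operation<String> {
    val cancelled = AtomicBoolean(false)
    override fun performAsync(callback: (String?, Throwable?) -> Unit) {
        // intentionally never invokes the callback
    }
    override fun cancel() {
        cancelled.set(true)
    }
}

suspend fun <T> Operation<T>.perform(): T =
    suspendCancellableCoroutine { continuation ->
        performAsync { value, exception ->
            if (exception != null) continuation.resumeWithException(exception)
            else continuation.resume(value as T)
        }
        continuation.invokeOnCancellation { cancel() }
    }

// Returns true when the timeout fired and the operation was told to cancel.
fun cancelDemo(): Boolean = runBlocking {
    val operation = NeverCompletes()
    val result = withTimeoutOrNull(100L) { operation.perform() }
    result == null && operation.cancelled.get()
}
```

When the timeout cancels the suspended coroutine, the invokeOnCancellation handler runs and forwards the cancellation to the operation, so the underlying work is not left running unattended.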

Multi-shot callback

But what if Operation delivers an asynchronous stream of values and invokes the specified callback more than once? We need a signal that it has completed. For our example, suppose this is done by invoking the callback with a null value.

Do not use functions like suspendCoroutine with such an Operation, because you will get an IllegalStateException when you try to resume the continuation a second time. Suspension and resumption in Kotlin are single-shot.
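The single-shot rule can be observed directly. In this minimal sketch (the function name doubleResumeDemo is ours) the continuation is resumed twice in a row, and the second attempt is caught so that it can be reported. It assumes kotlinx.coroutines is available for runBlocking.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Returns true when the first resume delivered its value
// and the second resume was rejected with IllegalStateException.
fun doubleResumeDemo(): Boolean = runBlocking {
    var secondResumeRejected = false
    val first = suspendCoroutine<Int> { continuation ->
        continuation.resume(1)
        try {
            continuation.resume(2) // a continuation can be resumed only once
        } catch (e: IllegalStateException) {
            secondResumeRejected = true
        }
    }
    first == 1 && secondResumeRejected
}
```

A callback that fires several times would hit the same exception on its second invocation, which is why a different abstraction is needed for streams of values.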

Kotlin Flow comes to the rescue. It was designed to represent a cold asynchronous stream of multiple values. Use the callbackFlow function to convert a multi-shot callback into a flow:

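import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun <T : Any> Operation<T>.perform(): Flow<T> =
    callbackFlow {
        performAsync { value, exception ->
            when {
                exception != null -> // operation error
                    close(exception)
                value == null -> // operation succeeded
                    close()
                else -> // there is a value
                    // offer is deprecated since kotlinx.coroutines 1.5 in favor of trySend
                    offer(value)
            }
        }
        // qualified receiver, so the call cannot resolve to the CoroutineScope.cancel extension
        awaitClose { this@perform.cancel() }
    }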

Notice a number of differences. First, perform is no longer a suspending function. It does not wait for anything by itself, and it returns a cold Flow. The code inside the callbackFlow {...} block does not run until the flow is collected by a caller of a terminal operation.

As before, it installs a callback using performAsync, but now instead of a Continuation we work with a hot SendChannel that is open for sending. Therefore, the offer function is called for each value, and close is called to signal either an error or successful completion. Here awaitClose replaces invokeOnCancellation and serves the crucial function of keeping the block inside callbackFlow suspended while data is arriving.
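Here is a minimal end-to-end sketch of this conversion. Countdown is a hypothetical Operation that delivers a few values from a background thread and then signals completion with null. The sketch uses trySend, which replaced the deprecated offer in kotlinx.coroutines 1.5.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel()
}

// Hypothetical operation: counts down on a background thread, then sends null.
class Countdown(private val from: Int) : Operation<Int> {
    @Volatile private var cancelled = false

    override fun performAsync(callback: (Int?, Throwable?) -> Unit) {
        thread {
            for (i in from downTo 1) {
                if (cancelled) return@thread
                callback(i, null)
            }
            callback(null, null) // completion signal
        }
    }

    override fun cancel() {
        cancelled = true
    }
}

fun <T : Any> Operation<T>.perform(): Flow<T> = callbackFlow {
    performAsync { value, exception ->
        when {
            exception != null -> close(exception)
            value == null -> close()
            else -> trySend(value)
        }
    }
    // Keeps the block suspended until the channel is closed or the collector stops.
    awaitClose { this@perform.cancel() }
}

// Nothing runs until toList(), a terminal operation, starts collecting.
fun countdownDemo(): List<Int> = runBlocking {
    Countdown(3).perform().toList()
}
```

Only three values are sent here, so they fit into the default buffer and none of them are dropped.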

Backpressure

What happens if performAsync delivers values to the callback faster than the collecting coroutine can process them? This is where backpressure, which is ever-present when working with asynchronous data streams, comes into play. A buffer holds a few values, but when it overflows, offer returns false and the values are lost. There are several ways to avoid or control this loss.

Replace offer(value) with sendBlocking(value). In this case, the thread invoking the callback is blocked when the buffer overflows until there is room in it. This is a typical way to signal backpressure in most legacy callback-based streaming APIs, and it ensures that no values are lost.
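The blocking variant can be sketched as follows. Burst is a hypothetical Operation that pushes values from a background thread as fast as it can. The sketch uses trySendBlocking, the successor of the deprecated sendBlocking in kotlinx.coroutines 1.5 and later, and names the wrapper performBlocking only to keep it apart from the earlier version.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel()
}

// Hypothetical operation: pushes 1..count from a background thread, then null.
class Burst(private val count: Int) : Operation<Int> {
    @Volatile private var cancelled = false

    override fun performAsync(callback: (Int?, Throwable?) -> Unit) {
        thread {
            for (i in 1..count) {
                if (cancelled) return@thread
                callback(i, null)
            }
            callback(null, null)
        }
    }

    override fun cancel() {
        cancelled = true
    }
}

fun <T : Any> Operation<T>.performBlocking(): Flow<T> = callbackFlow {
    performAsync { value, exception ->
        when {
            exception != null -> close(exception)
            value == null -> close()
            // Blocks the callback thread while the buffer is full.
            else -> trySendBlocking(value)
        }
    }
    awaitClose { this@performBlocking.cancel() }
}

// Collects every value, even though the producer outpaces the default buffer.
fun blockingDemo(): List<Int> = runBlocking {
    Burst(500).performBlocking().toList()
}
```

The producer thread is slowed down to the pace of the collector, which is the price of losing nothing. Do not call the blocking variant from a thread that must stay responsive.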

If the amount of data is bounded, or it is known not to arrive too fast, use the buffer operator to configure an unlimited buffer size by adding a .buffer(Channel.UNLIMITED) call after callbackFlow {...}. In this case, offer always returns true, so no values are lost and nothing blocks. However, we run the risk of running out of memory on buffered data.
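The effect of the buffer size can be compared in a small sketch. SyncBurst is a hypothetical Operation that invokes the callback synchronously, so the collector has no chance to drain the buffer while values are being produced. The sketch uses trySend, the successor of the deprecated offer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel()
}

// Hypothetical operation: delivers 1..count synchronously, then null.
class SyncBurst(private val count: Int) : Operation<Int> {
    override fun performAsync(callback: (Int?, Throwable?) -> Unit) {
        for (i in 1..count) callback(i, null)
        callback(null, null)
    }

    override fun cancel() {}
}

fun <T : Any> Operation<T>.perform(): Flow<T> = callbackFlow {
    performAsync { value, exception ->
        when {
            exception != null -> close(exception)
            value == null -> close()
            else -> trySend(value) // fails silently when the buffer is full
        }
    }
    awaitClose { this@perform.cancel() }
}

// Returns the number of values collected with the default and the unlimited buffer.
fun bufferDemo(): Pair<Int, Int> = runBlocking {
    val withDefaultBuffer = SyncBurst(1000).perform().toList().size
    val withUnlimitedBuffer = SyncBurst(1000).perform()
        .buffer(Channel.UNLIMITED)
        .toList().size
    withDefaultBuffer to withUnlimitedBuffer
}
```

With the default buffer only part of the burst reaches the collector, while the unlimited buffer keeps all 1000 values in memory until they are collected.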

Conflation

Often the stream of values represents partial results of an operation or status updates, so only the most recent value is of interest. In that case, it is safe to conflate the values using the conflate operator on the resulting flow. It ensures that offer always returns true and that the collector sees the most recent value, while intermediate values may be dropped (conflated).
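A minimal sketch of conflation, assuming a hypothetical progressUpdates function that stands in for a callback reporting progress from 1 to 1000 faster than anyone can collect it. It uses trySend, the successor of the deprecated offer.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical source: reports all updates synchronously, then completes.
fun progressUpdates(count: Int): Flow<Int> = callbackFlow {
    for (i in 1..count) trySend(i) // stands in for repeated callback invocations
    close()
    awaitClose { }
}

// Collects the conflated stream; intermediate updates are dropped.
fun conflateDemo(): List<Int> = runBlocking {
    progressUpdates(1000).conflate().toList()
}
```

The collector receives far fewer than 1000 values, but the last one it sees is always the most recent update, which is exactly what a progress indicator needs.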

Reactive streams

When the original asynchronous data source is a reactive stream that conforms to the Reactive Streams specification, use the built-in Publisher.asFlow extension function from the kotlinx-coroutines-reactive module to convert the Publisher type to a Kotlin Flow. Don't reinvent the wheel.
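A minimal sketch of the conversion, assuming the kotlinx-coroutines-reactive module is on the classpath. To avoid writing a Publisher by hand, the example obtains one from a flow with asPublisher from the same module. In real code the Publisher would come from a reactive library.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Converts a Publisher to a Flow and collects its values.
fun publisherDemo(): List<Int> = runBlocking {
    // Stand-in for a Publisher supplied by a reactive library.
    val publisher: Publisher<Int> = flowOf(1, 2, 3).asPublisher()
    publisher.asFlow().toList()
}
```

The resulting flow is cold like any other: the subscription to the Publisher happens only when the flow is collected.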