Understanding Structured Concurrency with Java and Kotlin by Bored Dev | October, 2022

Discover how to run multiple tasks concurrently, safely and easily

Photo by Deepak Rautela on Unsplash

You might have heard about structured concurrency lately and wondered what exactly it is. Structured concurrency is a paradigm that seeks to make concurrent programs easier to read and understand, faster to write and, above all, safe.

Before we get into the details, why is this needed? Let’s look at the reasons behind it.

To understand the need for structured concurrency, we have to look back in time and see how concurrent programs were written a few years ago.

In an unstructured concurrency paradigm, we start threads somewhere in our code, and it is not clear where each thread starts or ends.

Consider an example in Java that starts several tasks with CompletableFuture. Despite being easier to understand than older Java idioms, it still has one major problem: it is not safe, because the failure of one task does not cancel the others, and the results are unpredictable.
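A minimal sketch of this unstructured style might look like the following. The class name matches the build output shown further down; the method names, the number of tasks and the 300ms delay are assumptions made for illustration, not the original listing. Three tasks are started with CompletableFuture, the third one fails, and nothing links the tasks together.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

class UnstructuredConcurrency {

    // Starts one task on the common pool; nothing ties it to its siblings.
    static CompletableFuture<Void> startTask(int id, long workMillis, boolean fail,
                                             List<Integer> finished) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("I'm task " + id + " running on "
                    + Thread.currentThread().getName());
            if (fail) {
                throw new RuntimeException("Something went wrong!");
            }
            try {
                Thread.sleep(workMillis); // simulated work (assumed duration)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            finished.add(id);
        });
    }

    // Returns the ids of the tasks that ran to completion even though task 3 failed.
    static List<Integer> runAll() {
        List<Integer> finished = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> task1 = startTask(1, 300, false, finished);
        CompletableFuture<Void> task2 = startTask(2, 300, false, finished);
        CompletableFuture<Void> task3 = startTask(3, 0, true, finished);
        try {
            task3.join(); // rethrows the failure wrapped in a CompletionException
        } catch (CompletionException e) {
            System.out.println("Task 3 failed: " + e.getCause().getMessage());
        }
        // The failure cancelled nothing: tasks 1 and 2 keep running to the end.
        task1.join();
        task2.join();
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("Tasks that kept running: " + runAll());
    }
}
```

In this sketch, runAll() returns both 1 and 2 (in either order): the two healthy tasks run to completion even though their sibling has failed.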

If we run the code above, we’ll see that task number 3 throws an exception immediately, yet none of the other tasks are canceled. The application continues, and the other tasks complete successfully after a few seconds. That is not what we expect. Ideally, we want the other tasks to be canceled immediately. If one of the tasks ran forever, we would leak a thread every time something failed.

> Task :UnstructuredConcurrency.main() FAILED
I'm task 3 running on ForkJoinPool.commonPool-worker-3
I'm task 2 running on ForkJoinPool.commonPool-worker-2
I'm task 1 running on ForkJoinPool.commonPool-worker-1
Deprecated Gradle features were used in this build, making it incompatible with Gradle 8.0.
You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.
See https://docs.gradle.org/7.4/userguide/command_line_interface.html#sec:command_line_warnings
2 actionable tasks: 1 executed, 1 up-to-date
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: Something went wrong!

What is the solution then? This is where structured concurrency comes into play. Let’s see how.

Structured concurrency has two main goals: making concurrent code more understandable and avoiding thread leaks. The idea is to guarantee that whenever the control flow of our code splits into multiple concurrent threads, those threads are merged again at the end of that flow, and that we control how they are executed. With structured concurrency, we ensure that no orphan threads are left spiraling out of control in our application.

Photo by Ellen Pham on Unsplash

This means that the beginning and end of our concurrent code are always clear. In this way, any developer can easily understand the concurrent code, regardless of their experience or expertise in concurrent programming. Simplicity always wins.
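To make the fork-then-merge idea concrete, here is a rough sketch in plain Java using only long-standing java.util.concurrent classes. The ScopedTasks class and its runAll method are hypothetical names invented for this illustration; they are not part of the JDK or of any structured concurrency API. All tasks are forked inside one method, and the finally block ensures that the method does not return until the remaining tasks have been interrupted and given a bounded time to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ScopedTasks {

    // Forks every task, then joins them all before returning.
    // Results come back in completion order. On the first failure the
    // remaining tasks are interrupted and the failure is rethrown.
    static <T> List<T> runAll(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            CompletionService<T> completed = new ExecutorCompletionService<>(pool);
            for (Callable<T> task : tasks) {
                completed.submit(task); // fork
            }
            List<T> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                // get() throws ExecutionException if that task failed
                results.add(completed.take().get());
            }
            return results;
        } finally {
            pool.shutdownNow();                         // interrupt whatever is still running
            pool.awaitTermination(5, TimeUnit.SECONDS); // bounded wait before leaving the scope
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(2000); return "slow task finished"; });
        tasks.add(() -> { throw new IllegalStateException("Something went wrong!"); });
        try {
            System.out.println(runAll(tasks));
        } catch (ExecutionException e) {
            System.out.println("Scope failed: " + e.getCause().getMessage());
        }
    }
}
```

This only approximates the guarantee: a task that ignores interruption can still outlive the five-second wait, which is one reason language-level support is attractive.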

Several programming languages have introduced structured concurrency in recent years; Golang, Kotlin and Python are among them. We are going to see how Kotlin, in particular, achieves structured concurrency.

Kotlin implements structured concurrency using coroutine scopes. Coroutine is the name Kotlin gives to its “virtual threads”: units of execution that need an OS thread to run but are not tied to any particular one. A coroutine can wait for IO without blocking its OS thread, and another coroutine can take over that same OS thread in the meantime. You can read more about coroutines in the official Kotlin documentation.
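The point that waiting does not have to occupy a thread can be illustrated in plain Java, as a loose analogy only: this is not how Kotlin implements coroutines. In the sketch below, NonBlockingWaits and runOnSingleThread are hypothetical names. A thousand simulated IO waits are registered as timers on a single-threaded ScheduledExecutorService, so the one OS thread is free while they are pending and all of them finish after roughly one delay rather than a thousand.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class NonBlockingWaits {

    // Schedules `tasks` simulated IO waits of `delayMillis` each on ONE thread
    // and returns the total elapsed time in milliseconds.
    static long runOnSingleThread(int tasks, long delayMillis) throws InterruptedException {
        ScheduledExecutorService oneThread = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(tasks);
        long start = System.nanoTime();
        try {
            for (int i = 0; i < tasks; i++) {
                // The wait is a timer entry, not a sleeping thread,
                // so the single thread stays free while the delay runs.
                oneThread.schedule(done::countDown, delayMillis, TimeUnit.MILLISECONDS);
            }
            done.await(); // wait until every task has resumed and finished
        } finally {
            oneThread.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        long elapsed = runOnSingleThread(1000, 200);
        System.out.println("1000 waits of 200ms on one thread took " + elapsed + "ms");
    }
}
```

Had each wait been a Thread.sleep on that single thread, the same workload would have taken about 200 seconds.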


The idea is that a coroutine scope always belongs to an entity with a limited lifetime, for example, a scheduled batch job that uses thousands of coroutines to achieve high throughput.

When that job terminates, its scope ends too, and all coroutines (virtual threads) associated with it should no longer be alive.

Kotlin does provide a GlobalScope, but its use is not recommended. Why? Because the global scope lives for the whole lifetime of the application. In most cases this is not what we want, as it can easily lead to leaked coroutines.

The only valid use case of global scope is when the lifetime of an entity is the same as that of the application or service. For example, a background task that repeats its execution forever. Let’s imagine that we are implementing a distributed system that requires running background jobs in each node to send periodic heartbeats to the rest of the nodes in the system as long as the application is running.

If we want to do this in Kotlin, we launch a coroutine with GlobalScope.launch. By doing this, we create a (virtual) thread that stops only when the application is asked to shut down.

As a general rule, you should always define a scope for your threads, whether you are running one or many. Doing so guarantees that if something goes wrong in that scope, all of its child threads are gracefully canceled, and you won’t leak any threads, no matter what!

Let’s look at an example in Kotlin to understand the behavior we just described.

In the above example, we run two background threads in the given scope. The result of running this code would be something like this:

Adding random number to 10
Adding random number to 1
Task completed with result 13232!
Process finished with exit code 0

job2 prints its value first because job1 has the longer delay, which we specified to simulate IO latency.

To prove that this code is safe when something goes wrong, we will throw an exception in the second coroutine while the first one is still waiting out its one-second delay. In the earlier example, using CompletableFuture, the other threads continued as if nothing had gone wrong when an exception was raised. In this example, that will not happen: every coroutine still running in the scope will be canceled. The following is the code used to simulate what we just described:

In the second job, we raise an exception after 500ms. At that point, job1 would still have to wait another 500ms before printing anything. However, if things are implemented correctly, the exception should propagate and the runtime should immediately cancel every coroutine still running in the scope. This is exactly what happens when we run this code:

Exception in thread "main" java.lang.RuntimeException: Something went wrong!
    at org.theboreddev.CoroutinesKt$sumNumbersToRandomNumbers$2$job2$1.invokeSuspend(Coroutines.kt:27)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

Feel free to try it yourself and play around to see if it works as expected.

We have seen how Kotlin allows us to group coroutines within a single scope and how it automatically handles any threads within that scope. But what about Java? Can we do something similar in Java?

The short answer is no, or at least not yet. The JDK team has been working on changes to the Java concurrency model for a long time under the name Project Loom, which will bring virtual threads and structured concurrency to Java. Structured concurrency is included in Java 19 through JEP 428, but it is still at the “incubator” stage. Virtual threads arrive in Java 19 as a preview feature through JEP 425, so you need to enable preview features (the --enable-preview flag) to try them.

When these changes stabilize, our understanding of the Java concurrency model and multithreading in Java will change for the better.

The existing limitation in Java that links a Java thread to an OS thread will no longer exist, and more robust, reliable and performant systems will be enabled. This is a huge step forward in the Java ecosystem!

The code below gives an example of the new proposal for structured concurrency in Java. You will be able to see that this is a completely new paradigm in Java.

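// Java 19 incubator API (JEP 428); compile and run with --add-modules jdk.incubator.concurrent
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import jdk.incubator.concurrent.StructuredTaskScope;

// findUser(), fetchOrder() and Response are placeholders for application code
Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> user = scope.fork(() -> findUser());
        Future<Integer> order = scope.fork(() -> fetchOrder());
        scope.join();          // Join both forks
        scope.throwIfFailed(); // ... and propagate errors
        // Here, both forks have succeeded, so compose their results
        return new Response(user.resultNow(), order.resultNow());
    }
}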

We’re not going to go into more detail about Java virtual threads in this article, but I promise we’ll talk more about them soon. What we’ve seen so far looks very promising, but we’ll leave it here as a short teaser so you can start getting interested in the concurrency model of the future in Java!

We have seen how some languages implement virtual threads to create more flexible systems, where an OS thread can be shared among hundreds or thousands of virtual threads because virtual threads are much lighter than existing Java threads. Even Java is now moving to this new paradigm, which makes concurrent code easier to understand and write. Very soon, we will no longer need to write error-prone concurrent code in which we have to anticipate the many ways our code might execute incorrectly!

If you’re interested in how Golang implements concurrency, you can start by looking at “goroutines” here.

That’s all from us today! We really hope you enjoyed this article and have a better understanding of what structured concurrency is and why we need it.

As always, if you like our articles, stay tuned for more.

Thanks for reading!
