When I finally figured out Clojure.core.async

I’ve been playing with Clojure for a long time now, but I’ve always had a hard time figuring out Clojure.core.async. Until it finally clicked!

In case you’re not familiar with Clojure.core.async, it’s a library for async programming in Clojure. (You: Really? I never would’ve guessed.)

Its main abstraction is the channel. We put messages into a channel and take messages out of it. A channel may have a buffer. If the buffer has room left, a put completes right away. If the channel doesn’t have a buffer, or if its buffer is full, the put has to wait: the calling thread blocks until someone takes a message out of the channel, making room in the buffer (or, for an unbuffered channel, until someone is there to take the new message directly).

Conversely, if we try to take a message out of the channel and there’s none available, the calling thread will also block.
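Since experimenting with blocking puts can freeze the repl, here’s a minimal sketch of these rules using offer! and poll!, two core.async functions that attempt a put or a take only if it can complete immediately and return nil instead of waiting. The buffer size and the names below are just illustrative assumptions.

```clojure
(require '[clojure.core.async :as async])

;; A channel with room for two messages (size chosen for illustration)
(def small-channel (async/chan 2))

;; offer! puts only if it can do so right away; it never blocks
(def first-offer  (async/offer! small-channel :a)) ; true, buffer has room
(def second-offer (async/offer! small-channel :b)) ; true, buffer is now full
(def third-offer  (async/offer! small-channel :c)) ; nil, a regular put would block here

;; poll! takes only if a message is available right away; it never blocks
(def first-poll  (async/poll! small-channel)) ; :a
(def second-poll (async/poll! small-channel)) ; :b
(def third-poll  (async/poll! small-channel)) ; nil, a regular take would block here
```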

Another main component of Clojure.core.async is the go block. A go block takes a block of code as its input and runs it asynchronously on a thread pool. Under the hood, the go macro rewrites that code so its execution can be suspended at channel operations (puts and takes) and resumed later.

The code being run by a go block is called a process. Because processes can be suspended, a single thread (or a small thread pool) can run many processes concurrently by interleaving them.

Whenever a go block reaches a channel operation that can’t complete yet, it parks the process and releases the actual thread of execution so it’s free to take on some other work. When the channel operation can complete, the process resumes on the thread pool (not necessarily on the same thread as before).
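To make parking and resuming a bit more concrete, here’s a small sketch (channel and var names are hypothetical). It also relies on one detail not mentioned above: go returns immediately with a channel that will eventually receive the value of its body.

```clojure
(require '[clojure.core.async :as async :refer [go <!]])

(def numbers (async/chan))

;; This process parks on <! because numbers is empty. Parking hands the
;; pool thread back; go itself returns right away with a result channel.
(def result-channel
  (go (let [n (<! numbers)] ; parks here until a value arrives
        (* n 10))))

;; Putting a value lets the parked process resume on some pool thread
(async/>!! numbers 4)

;; Block the calling thread until the go block delivers its result
(def result (async/<!! result-channel))
```

Here result ends up as 40 once the resumed process finishes.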

Ok, enough theory, let me tell you what my question was and how I answered it.

So, what got me confused at first was that Clojure.core.async has both blocking and non-blocking put and take calls (>!!, <!! and >!, <!), while at the same time a channel will always block a put if its buffer is full (or if it has no buffer) and block a take if there are no messages available.

Also, if the channel’s buffer is not full, it won’t block a put operation even if it’s a blocking one (>!!). Conversely, if the channel’s buffer is not empty, it won’t block a take operation even if it’s a blocking one (<!!).
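A minimal sketch of that second point, assuming a channel with a buffer of three (the name roomy-channel is hypothetical): none of these blocking calls actually waits, because the buffer never fills up and never runs dry.

```clojure
(require '[clojure.core.async :as async])

;; A channel with a buffer of three
(def roomy-channel (async/chan 3))

;; Each blocking put returns right away because the buffer still has room
(dotimes [i 3]
  (async/>!! roomy-channel i))

;; Each blocking take returns right away because messages are waiting
(def taken (vec (repeatedly 3 #(async/<!! roomy-channel))))
```

The values come back in the order they were put, so taken is [0 1 2].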

So, what’s the point of having blocking and non-blocking operations if in the end the blocking behavior is defined by the channel itself?

In order to answer my question, let’s play with some Clojure.core.async code.

For the examples below I’m assuming we have Leiningen installed.

In order to start a repl with Clojure.core.async available and without having to create a project we can install the lein-try plugin.

Once we have everything installed, we can run the following command to start a repl:

lein try org.clojure/core.async

Then run the following in the repl to make the Clojure.core.async functions available in the current namespace:

(require '[clojure.core.async :refer :all])

We’ll get some warnings about core.async functions (such as reduce and into) replacing the clojure.core functions of the same name, but for the sake of this example it’s ok to ignore them.
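If the warnings bother you, one option is to alias the namespace and refer only the names you need, which leaves clojure.core alone. A sketch; the namespace name my-app.core is hypothetical:

```clojure
(ns my-app.core
  ;; Alias the namespace and refer only the operators used in this post,
  ;; so nothing from clojure.core gets shadowed (and no warnings appear)
  (:require [clojure.core.async :as async :refer [chan go >! <! >!! <!!]]))

;; Everything else stays reachable through the alias, e.g. async/timeout
(def greeting-channel (chan 1))
(>!! greeting-channel :hello)
(def greeting (<!! greeting-channel))
```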

Now we can finally start writing some Clojure.core.async code.

The following code snippet creates a channel:

(def a-channel (chan))

The channel above is unbuffered, which means that if we put something into it, the put will block until “someone” takes from it.

The following code snippet puts a value into the channel and will block if the channel’s buffer is full (or doesn’t exist, which is the case here):

(>!! a-channel 1)

If you’re entering those commands in a repl, it probably just froze. Sorry about that.
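If you’d rather not freeze the repl, one way to attempt the same put with an escape hatch is alts!!, which performs whichever of several channel operations can complete first, combined with a timeout channel. A sketch, assuming a 100 ms timeout and a fresh unbuffered channel (names are illustrative):

```clojure
(require '[clojure.core.async :as async])

(def unbuffered-channel (async/chan))

;; alts!! takes a vector of operations: [channel value] means "put value",
;; a bare channel means "take". It blocks until the first one completes
;; and returns [value channel-that-completed].
(def timeout-channel (async/timeout 100)) ; closes after roughly 100 ms
(def outcome (async/alts!! [[unbuffered-channel 1] timeout-channel]))

;; Nobody is taking from unbuffered-channel, so the timeout should win
(def timed-out? (= timeout-channel (second outcome)))
```

Since nobody is taking from the channel, the timeout completes first and alts!! returns [nil timeout-channel] instead of hanging.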

After restarting the repl, we can try the following (don’t forget to re-import the core.async functions):

(def b-channel (chan 1))

This time we created a channel with a buffer of size one. We can try putting a value into it and see what happens this time:

(>!! b-channel 1)

Yeah! No frozen repl this time! Let’s try it again:

(>!! b-channel 2)

Oh s@#$! But ok, it makes sense. We’ve created a channel with buffer size one and tried to put two values into it. Since we didn’t take the first value out, the second put blocked because the buffer was already full.
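To watch that blocked put get unstuck without losing the repl, we can do the second put on another thread and then take the first value out. A sketch using Clojure’s future (the var names are illustrative):

```clojure
(require '[clojure.core.async :as async])

(def one-slot-channel (async/chan 1))
(async/>!! one-slot-channel 1) ; fills the single buffer slot

;; future runs its body on a separate thread and returns immediately,
;; so this blocking put only ties up that other thread
(def second-put (future (async/>!! one-slot-channel 2)))

;; Taking the first value makes room, which lets the pending put complete
(def first-value (async/<!! one-slot-channel))
(def put-result @second-put)                    ; >!! returns true once the put goes through
(def second-value (async/<!! one-slot-channel))
```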

Let’s see something cool now:

(def c-channel (chan))
(go (>!! c-channel 1) (do (println "\nPut 1")))

We’ve created another unbuffered channel and put a value into it like we did before. Then we used the do special form to print the message “Put 1”. Since c-channel is unbuffered, the call to >!! should block and we shouldn’t see the message being printed. Finally, we’ve wrapped both statements (>!! and do) within a go block so we don’t block our main thread.

As we saw before, a go block executes the code we pass to it asynchronously, so the calling thread doesn’t block (which means the repl won’t block, yeah!).

Now we can take a value out of the channel and see what happens:

(<!! c-channel)
1
Put 1

We can see in the output that the value that was put into the channel (integer one) was taken. This unblocks the call to >!! allowing the do statement (that was waiting for the >!! call to complete) to proceed and, finally, print the message.

So far we’ve covered the blocking >!! and <!! functions. What about the non-blocking versions, >! and <!? Let’s repeat the same example, but with the non-blocking version of the put operation (>!) this time:

(def d-channel (chan))
(go (>! d-channel 1) (do (println "\nPut 1")))
(<!! d-channel)
1
Put 1

The code is pretty much the same and so is the result. That means that even with the non-blocking put operation, the code inside the go block still waits for us to take the value out of the channel before it can proceed to the do expression and print the message.

So, why is the non-blocking put operation actually blocking?

It turns out that non-blocking is a misleading term for the behavior of >! and <!.

Instead of blocking, >! and <! park the code block being executed. The end result might look the same, but it’s actually really different: >!! and <!! block the thread of execution, while >! and <! park the code block (process) being executed and free the thread to do some other work. That’s also why >! and <! can only be used inside a go block: parking is something only a go block knows how to do.
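One way to see that parked processes don’t hold on to threads is to park many more go blocks than the pool has threads (8 by default) and check that a new go block still runs. A sketch; the count of 100 and the names are arbitrary choices:

```clojure
(require '[clojure.core.async :as async :refer [go <!]])

(def gate (async/chan))

;; Start far more go blocks than the pool has threads.
;; Each one parks on <! and hands its thread back to the pool.
(def parked (doall (for [i (range 100)]
                     (go (<! gate) i))))

;; The pool is not exhausted: a brand-new go block still completes
(def still-responsive (async/<!! (go :pool-is-free)))

;; Closing the channel makes every parked take return nil,
;; so all the processes resume and finish with their own i
(async/close! gate)
(def finished (mapv async/<!! parked))
```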

The difference becomes clearer if you compare the following two snippets:

(def e-channel (chan))
(doseq [i (range 1000)] (go (>! e-channel i)))
(doseq [i (range 1000)] (println (<!! e-channel)))

(def f-channel (chan))
(doseq [i (range 1000)] (go (>!! f-channel i)))
(doseq [i (range 1000)] (println (<!! f-channel)))

The first code snippet will launch a thousand processes that will attempt to put a message into the channel and then will be parked until we start taking messages from it. The thread pool will remain free to take on some other work.

The end result is that we’ll see all integers from 0 up to 999 printed to the console. The numbers may come out of order, since the processes run concurrently on the thread pool (up to eight at a time) and the order in which their puts reach the channel isn’t guaranteed.
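To check that claim without reading a thousand printed lines, here’s a variant of the first snippet that collects the values instead. One detail worth knowing: a channel accepts at most 1024 pending puts, so this count of 1000 stays just below that limit.

```clojure
(require '[clojure.core.async :as async :refer [go >!]])

(def e-channel (async/chan))

;; 1000 processes park on their puts until someone takes
(doseq [i (range 1000)]
  (go (>! e-channel i)))

;; Take every value on the calling thread
(def received (vec (repeatedly 1000 #(async/<!! e-channel))))

;; Arrival order isn't guaranteed, but every number shows up exactly once
(def all-arrived? (= (range 1000) (sort received)))
```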

In the second code snippet, we’re starving the thread pool. A given Clojure process has a single thread pool shared by all go blocks, and by default it contains only a few threads (eight, unless configured otherwise through the clojure.core.async.pool-size system property). Because >!! blocks the whole thread, the first 8 go blocks each occupy one of those threads while waiting for a taker. Our first 8 takes receive their values, but those threads stay blocked: the notification that would wake each put up is itself dispatched onto the same exhausted pool, behind the other 992 go blocks. Those 992 never get to run, since no thread is ever released to process them.

The end result is that we’ll only see the integers from 0 up to 7 printed to the console. They may also be printed out of order (because the 8 threads run in parallel), but we only get the first 8 values because the put calls are blocking ones. Also, since we’re trying to take 1000 messages from the channel with a blocking operation and only 8 ever arrive, the main thread will block (and so will the repl, sorry).
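When we really do want blocking puts, one common way to keep them away from the go pool is async/thread, which runs its body on a separate thread (outside the go thread pool) and returns a channel with the body’s result. A sketch; it uses 20 puts instead of 1000 because each waiting put now ties up a real thread:

```clojure
(require '[clojure.core.async :as async])

(def f-channel (async/chan))

;; Each blocking put runs on its own thread, not on the go pool,
;; so blocking here doesn't starve the go blocks
(doseq [i (range 20)]
  (async/thread (async/>!! f-channel i)))

(def received-from-threads (vec (repeatedly 20 #(async/<!! f-channel))))
(def all-arrived? (= (range 20) (sort received-from-threads)))
```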

So, the answer to the question “what’s the point of having blocking and non-blocking operations if in the end the blocking behavior is defined by the channel itself?” is the following:

A put will always have to wait when the channel’s buffer is full or non-existent (unless someone is already waiting to take), and a take will always have to wait when there are no messages left in the channel.

When we choose between blocking and non-blocking put and/or take operations what we’re really telling Clojure.core.async is how we want it to behave when a channel operation blocks.

Do we want it to block the entire thread or do we want it to park our code block and release the thread so it can work on something else?

There are cases where we want the former and other cases where we want the latter. It’s beyond the scope of this post to get into the details of each scenario.

For more on Clojure.core.async, I recommend the core.async chapter of Clojure for the Brave and True and Clojure.core.async’s official documentation.

Let me know your thoughts on this post, and feel free to point out any mistakes I might have made.