Cats Effect Ref
As a newcomer to functional programming, I thought mutability was prohibited. Although I figured out how to use immutability in most situations, I found there were obvious cases where it doesn’t work such as modeling and updating in-memory state (e.g. in-memory caches and counters). I was wrong to think that functional programming mandated using only immutable values. Functional programming requires referential transparency, but as long as that holds, mutability is fine.
In some cases, in-memory state can be modeled using the state monad. It provides a way to represent state that is updated sequentially. If you have a case where state moves serially from one value to another by taking the previous state as an input, then you should look at the state monad. However, if your use case needs to update state concurrently rather than sequentially, look no further than Cats Effect’s Ref
(Additionally, you can look at Zio for another Ref
implementation if you prefer).
Ref: An Overview
Ref
is a mutable reference which:
- Is non-blocking
- Is accessed and modified concurrently
- Is modified atomically
- Is dealt with using only pure functions
Ref
gets its atomicity from Java’s AtomicReference
and its referential transparency by using Cats Effect’s Sync
type class. To show how these building blocks combine to create Ref
, I am going to walk through each of the major operations that are available on Ref:
of
get
set
update
modify
Note: I have changed some of the implementations below to simplify them for this post, but they are mostly the same. If you wish to see the real implementation of Ref
, check it out here.
Ref#of
The of
method is a constructor for Ref
.
Implementation
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] =
F.delay(new Ref[F, A](new AtomicReference[A](a)))
This constructor creates a Ref
filled with an AtomicReference
containing an initial value a
. It does this within the F
context to maintain purity.
This operation is referentially transparent since it is composed in F
using delay
from the Sync
type class. No effects are executed by calling this function. It merely returns an effect F
with the ability to run at some future point and create the Ref
. The same is true for all of the functions within Ref
. They are pure because they are taking advantage of the delay behavior provided by the Sync
type class.
Example
val ref: IO[Ref[IO, Int]] = Ref.of[IO, Int](42)
Note: This is equivalent to Ref[IO].of(42)
because Ref
takes advantage of partially applied types.
Ref#get
This operation is the simplest of all operations on Ref
. It gets the value that the AtomicReference
currently contains.
Here, ar
is the AtomicReference
supplied within the of
constructor.
Implementation
def get: F[A] = F.delay(ar.get)
The get
operation on an AtomicReference
happens in a single machine instruction. This means that any number of threads can be performing a get
on an AtomicReference
at the same time without any contention. As the name AtomicReference
implies, thread-safety holds for all of its operations. This is because AtomicReference
cleverly performs each of its operations in a single machine instruction.
Another key thing to note about AtomicReference
is that it is backed by a volatile
variable. volatile
variables will be read from and written to the host computer’s main memory rather than a CPU cache. This means that even if our program is running on different CPUs it will behave as though it is running on a single CPU. For more on volatile variables, see here.
Example
val printRef = for {
ref <- Ref[IO].of(42)
contents <- ref.get
} yield println(contents)
printRef.unsafeRunSync()
The example above does the following:
- Create a
Ref
usingRef[IO].of(42)
- Acquire the contents of the
Ref
usingref.get
- Print the contents out to the console using
println(contents)
In this example, printRef
returns an IO[Unit]
which is an effect representing a computation that can be run at some future point in time. In this case (and in all of the examples of this post), the effect is run immediately by calling .unsafeRunSync()
. Typically, you don't call this explicitly within your programs, but rather use IOApp to run your programs.
Ref#set
This operation is structured the same as get
except we are performing a set
operation this time. This operation relies on the same volatile
and thread-safe nature of AtomicReference
.
Implementation
def set(a: A): F[Unit] = F.delay(ar.set(a))
Example
val printRef = for {
ref <- Ref[IO].of(42)
_ <- ref.set(21)
contents <- ref.get
} yield println(contents)
printRef.unsafeRunSync()
Ref#update
Having get
and set
operations is great, but they alone don’t allow us to atomically update the value contained inside of the reference. For example, if we wanted to take whatever value is stored in our Ref
and increment it by one, we might be tempted to do something like:
def getThenSet(ref: Ref[IO, Int]): IO[Unit] = {
ref.get.flatMap { contents =>
ref.set(contents + 1)
}
}
val printRef = for {
ref <- Ref[IO].of(42)
_ <- NonEmptyList.of(getThenSet(ref), getThenSet(ref)).parSequence
contents <- ref.get
} yield println(contents)
The issue here is that we are going to get inconsistent outputs when running this code. We have two separate processes working in parallel to perform a getThenSet
operation. Both of these processes may get
the value in the Ref
before the other process has a chance to increment the value using the set
operation. Below is an example of the execution of this program where each of the processes gets the value 42
when calling Ref#get
.
process1: Ref#get -> returns 42
process2: Ref#get -> returns 42
process1: Ref#set -> sets to 43
process2: Ref#set -> sets to 43
We would have expected the final output of this program to be 44
, but due to get
and set
happening in two different operations, we end up potentially incrementing to the same value twice.
This is where update comes in.
Implementation
def update(f: A => A): F[Unit] =
modify(a => (f(a), ()))
This operation is logically more simple than modify
, which is why I have included it before modify
in this post. Don’t worry too much about the implementation here until after you have read the section about modify
. For now the important thing to note is what update
does.
update
takes a function f
as an argument that will be called on whatever value is currently contained inside of the Ref
. Then whatever value the function f
returns will be the new value inside of the Ref
.
How update
works will make more sense after we understand how the modify
function on Ref
works. For now the key thing to understand is that update
provides a way to atomically perform a "get then set" operation.
Example
val printRef = for {
ref <- Ref[IO].of(42)
_ <- ref.update(_ + 1)
_ <- ref.update(_ + 1)
contents <- ref.get
} yield println(contents)
printRef.unsafeRunSync()
Ref#modify
We’ve shown that update can perform a “get and set” operation (although we have yet to explain how), but we haven’t yet seen how to perform a “get and set and get” operation. Although it may seem unlikely that such an operation is needed, it is a common one. Take the example above using update
to increment a counter. This method works unless we need to atomically access the value after it’s been incremented. For example, if we want to keep track of the number of requests that an HTTP server has seen over time, we could use a Ref
with the update
function to add one to a counter for every request. However, the update
function won’t work if we want to return the new request number after updating it. We could use update
followed by a get
, but this is not atomic because the get
is performed as a separate machine instruction. This could lead to two requests getting the same ID.
This is the problem that modify solves.
Implementation
def modify[B](f: A => (A, B)): F[B] = {
@tailrec
def spin: B = {
val c = ar.get
val (u, b) = f(c)
if (!ar.compareAndSet(c, u)) spin
else b
}
F.delay(spin)
}
Before I dive into explaining the modify
method, we need to understand what the AtomicReference#compareAndSet
method does. Compare and set (CAS) takes two parameters: 1) the value c
that we expect to be contained in the AtomicReference and 2) the value u
to which we want to update the AtomicReference
. CAS will update the AtomicReference
to the value of u
if and only if the value currently held within the AtomicReference
is equal to the value of c
. If the value of c
is not equal to the value currently held in the AtomicReference, then compareAndSet does not update the reference and returns false
to indicate such. The CAS operation is atomic because both the compare and the set functionality happen in a single machine instruction.
Now let’s look at the Ref#modify
method. This method takes a function f
as a parameter. This function accepts the value that is currently stored in the Ref
and returns a tuple containing: 1) the value with which to update the Ref
and 2) the value to return from the modify
method.
The modify
method contains a tail-recursive (stack-safe) inner method called spin
. The spin
method forms what is called a compare and set loop. The reason this loop is needed is that modify
requires two operations to work: AtomicReference#get
and AtomicReference#compareAndSet
. Each of these operations on its own is atomic, but when we call both of them in sequence the resulting operation is not atomic. Thus the AtomicReference
may get updated between the call to get
and the call to compareAndSet
causing the compareAndSet
operation to fail (return false). For this reason, we need to retry this operation repeatedly until it finally succeeds. It will succeed when there are no updates to the underlying AtomicReference
between the calls to get
and compareAndSet
.
Example
val printRef = for {
ref <- Ref[IO].of(42)
contents <- ref.modify(current => (current + 20, current + 20))
} yield println(contents)
printRef.unsafeRunSync()
Full Ref Example
Here is a full example using Ref
to manage the state of a set of bank accounts.
import BankAccounts._
final class BankAccounts(ref: Ref[IO, Map[String, BankAccount]]) {
def alterAmount(accountNumber: String, amount: Int): IO[Option[Balance]] = {
ref.modify { allBankAccounts =>
val maybeBankAccount = allBankAccounts.get(accountNumber).map { bankAccount =>
bankAccount.copy(balance = bankAccount.balance + amount)
}
val newBankAccounts = allBankAccounts ++ maybeBankAccount.map(m => (m.number, m))
val maybeNewBalance = maybeBankAccount.map(_.balance)
(newBankAccounts, maybeNewBalance)
}
}
def getBalance(accountNumber: String): IO[Option[Balance]] =
ref.get.map(_.get(accountNumber).map(_.balance))
def addAccount(account: BankAccount): IO[Unit] =
ref.update(_ + (account.number -> account))
}
object BankAccounts {
type Balance = Int
final case class BankAccount(number: String, balance: Balance)
}
val example = for {
ref <- Ref[IO].of(Map.empty[String, BankAccount])
bankAccounts = new BankAccounts(ref)
_ <- bankAccounts.addAccount(BankAccount("1", 0))
_ <- bankAccounts.alterAmount("1", 50)
_ <- bankAccounts.alterAmount("1", -25)
endingBalance <- bankAccounts.getBalance("1")
} yield println(endingBalance)
example.unsafeRunSync()
This example is still fairly basic and is certainly not intended to show how you would implement real banking software. However, it gives you an idea of what is possible using Ref
.
Regions of Sharing
Here is a final note about something that can be confusing when first working with Ref
. Whenever we are working with Ref
or its operations, we are using a for-comprehension. As you may know, for-comprehensions are syntactic sugar that represent a series of flatMap
calls followed by a map
at the end. Because Ref
’s operations are contained within the F
effect context, we need to map
or flatMap
over the results of those operations to access the values. This is referred to as a region of sharing.
Ref[IO].of(42).flatMap { i =>
// This is the region of sharing
}
// We can't access the value the Ref contains from out here
Regions of sharing are convenient because they give us the ability to reason about our code locally. In other words, we can see every possible operation that could be modifying our Ref
from right inside its region of sharing. We don’t have to look at anything outside of that region since it can’t do anything to access the Ref
.
Conclusion
I hope that this overview has given you a better intuition for what Ref
is and how you can use it within your code. Ref
is a super powerful construct that can give you an easy way to manage application state in a way that is both atomic and pure. For more on Ref
I would recommend checking out these resources: