Concurrency and data races
Much of the last two weeks has focused on the topic of memory safety—how can we avoid segfaults, double frees, and the whole litany of possible memory errors? I showed you how Rust’s ownership model plus some carefully crafted smart pointers can ensure memory safety while still permitting a wide range of low-level programs. Today, I want to discuss a different, but related problem of data races. To motivate the problem, let’s walk through a simple multithreaded program in C.
Difficulty of data races
Naive threading
Imagine for a moment we have a bank with one customer.
typedef struct {
int cash;
} Bank;
static Bank the_bank;
The customer can withdraw or deposit to the bank (where possible).
void deposit(int n) {
the_bank.cash += n;
}
void withdraw(int n) {
if (the_bank.cash >= n) {
the_bank.cash -= n;
}
}
We can simulate the customer doing some series of actions like this:
void* customer(void* args) {
for (int i = 0; i < 100; ++i) {
deposit(2);
}
for (int i = 0; i < 100; ++i) {
withdraw(2);
}
return NULL;
}
And then we can run the simulation, which should have a 0
balance at the end.
int main() {
the_bank.cash = 0;
customer(NULL);
printf("Total: %d\n", the_bank.cash);
return 0;
}
This will run as expected:
$ gcc bank.c -o bank
$ ./bank
Total: 0
Now the customer gets busy (or is having his account defrauded), and multiple people are simultaneously accessing from his account. We can simulate this with pthreads:
int main() {
the_bank.cash = 0;
// Create a thread for 32 customers.
int N = 32;
pthread_t tids[N];
for (int i = 0; i < N; ++i) {
pthread_create(&tids[i], NULL, &customer, NULL);
}
// Wait til each on is done.
for (int i = 0; i < N; ++i) {
pthread_join(tids[i], NULL);
}
printf("Total: %d\n", the_bank.cash);
return 0;
}
What happens then?
$ gcc bank.c -o bank
$ ./bank
Total: 0
$ ./bank
Total: 232
$ ./bank
Total: 0
$ ./bank
Total: 66
$ ./bank
Total: 116
Uh-oh! Our simulation should always come back to zero, even when executed in any theoretical order, since all the deposits happen before all the withdrawls. What’s happening is our first example of a data race—two simultaneous threads are attempting to touch the same data at the same time, while at least one of them is mutating it.
The first data race happens in deposit
. Think about what happens when we run the_bank.cash += n
. The +=
hides what is actually two distinct operations: reading the cash, and writing to the cash, which we could expand like this.
int cur_cash = the_bank.cash;
// what if the_bank.cash changes here?
the_bank.cash = cur_cash + n;
This is a classic data race where we accidentally ignore updates to a piece of shared data. The second data race is in withdraw
.
if (the_bank.cash >= n) {
the_bank.cash -= n;
}
This is an example of a “time of check to time of use” bug (usefully abbreviated as TOCTTOU). Even though we ensure the bank has enough cash at the beginning, if two withdrawls are happening simultaneously, we could potentially end up with negative money.
Note: Time of check to time of use is technically not a “data race” so much as a more general “concurrency bug”, since it’s not actually a problem of two threads accessing the same data at exactly the same time.
Lock attempt #1
As you learned in 110 (or equivalent), our first go-to mechanism to solve this is a mutex. I’m going to place a mutex around deposit
and around withdraw
.
typedef struct {
int cash;
pthread_mutex_t deposit_lock;
pthread_mutex_t withdraw_lock;
} Bank;
void deposit(int n) {
pthread_mutex_lock(&the_bank.deposit_lock);
the_bank.cash += n;
pthread_mutex_unlock(&the_bank.deposit_lock);
}
void withdraw(int n) {
if (the_bank.cash >= n) {
pthread_mutex_lock(&the_bank.withdraw_lock);
the_bank.cash -= n;
pthread_mutex_unlock(&the_bank.withdraw_lock);
}
}
After changing main
to initalize the mutexes, I can run my program:
$ gcc test.c -o test
$ ./test
Total: 152
$ ./test
Total: 174
$ ./test
Total: 0
$ ./test
Total: 76
Alas, still buggy! This attempted solution has two problems:
- By locking each thread independently, a concurrent deposit and withdraw can still race against each other to
the_bank.cash
, one potentially overwriting each other. Ideally, we want our locks to be associated with the data they protect, not with the functions using them. - If we don’t put the locks in the right place, we can still have data races, e.g. the time of check to time of use bug still occurs because the if statement is not protected.
Lock attempt #2
We can fix our previous bug by condensing two locks into one, and properly wrapping both functions.
typedef struct {
int cash;
pthread_mutex_t lock;
} Bank;
void deposit(int n) {
pthread_mutex_lock(&the_bank.lock);
the_bank.cash += n;
pthread_mutex_unlock(&the_bank.lock);
}
void withdraw(int n) {
pthread_mutex_lock(&the_bank.lock);
if (the_bank.cash >= n) {
the_bank.cash -= n;
}
pthread_mutex_unlock(&the_bank.lock);
}
Confirming that this seems to work:
$ gcc test.c -o test
$ ./test
Total: 0
$ ./test
Total: 0
$ ./test
Total: 0
$ ./test
Total: 0
Great! If it runs correctly a few times, surely it has no concurrency issues, right? Hopefully this convinced you (or at least reminded you) that like memory safety, avoiding data races is tricky and easy to mess up. Thankfully, Rust has a solution.
Closures
Before we talk about concurrency in Rust, I first need to introduce the basics of closures. Unlike OCaml, Rust has the restriction that by default, functions can only occur at the top-level. For example:
fn main() {
let x = 1;
fn printx() { println!("{}", x); }
printx();
}
error[E0434]: can't capture dynamic environment in a fn item
--> test.rs:3:32
|
3 | fn printx() { println!("{}", x); }
| ^
|
= help: use the `|| { ... }` closure form instead
The comment helpfully suggests that in order to write functions that access their environment, we have to use a special ||
syntax.
fn main() {
let x = 1;
let printx = || { println!("{}", x); };
printx();
}
Closures are inline (or nested) functions that are allowed to reference variables defined inside their containing function. Parts of Rust’s standard library heavily rely on higher-order functions, and subsequently closures are commonplace. For example, iterators:
let v: Vec<i32> = vec![1, 2, 3, 4, 5];
let divis_by: i32 = 2;
let v2: Vec<i32> = v.into_iter()
.filter(|x: i32| -> bool { x % divis_by == 0 }) // maximum annotation
// .filter(|x| x % divis_by == 0 ) minimum annotation
.map(|x| x + 1)
.collect();
// v2 == [3, 5]
Why are closures different than normal functions in Rust? Like many things in the language, the answer is: ownership. For example, consider a closure like this:
fn main() {
let f = {
let n = 3;
|m: i32| n + m
};
println!("{}", f(5));
}
error[E0597]: `n` does not live long enough
--> test.rs:4:14
|
4 | |m: i32| n + m
| -------- ^ borrowed value does not live long enough
| |
| capture occurs here
5 | };
| - borrowed value only lives until here
6 | println!("{}", f(5));
7 | }
| - borrowed value needs to live until here
As the compiler helpfully illustrates, we have an issue. A closure is a combination of both its code and its environment, the set of variables used in the closure (often called “captures”). By default, a closure borrows all external variables used, which means the returned closure object contains a pointer to n
. However, n
is gone at the end of the scope, meaning this is a dangling pointer! We can fix this with the move
keyword:
let f = {
let n = 3;
move |m: i32| n + m
};
This says: instead of borrowing, take ownership of any captured variables, moving n
into the closure now owned by f
. That’s the essentials of how closures differ from normal functions—for more nuance, you can read the Closures chapter of the Rust Book.
Threads and locks
Single-threaded Rust
To understand concurrency in Rust, let’s walk through re-implementing the bank example in Rust, and see what changes we have to make a long the way. First, we can naively translate a shortened version of the original C program:
struct Bank {
cash: i32
}
static mut the_bank: Bank = Bank { cash: 0 };
fn deposit(n: i32) {
the_bank.cash += n;
}
fn main() {
deposit(2);
}
error[E0133]: use of mutable static is unsafe and requires unsafe function or block
--> bank1.rs:8:5
|
8 | the_bank.cash += n;
| ^^^^^^^^^^^^^^^^^^ use of mutable static
|
= note: mutable statics can be mutated by multiple threads: aliasing violations or data races will cause undefined behavior
This uses a feature we haven’t discussed until now, which is global variables. Like C, Rust allows the declaration of globals with the static
keyword. Unfortunately, global variables break the ownership model, and so using them reqires unsafe (see Accessing or Modifying a Mutable Static Variable for more). We can resolve this by moving our global into the main
function.
struct Bank {
cash: i32
}
fn deposit(mut the_bank: Bank, n: i32) {
the_bank.cash += n;
}
fn main() {
let mut the_bank = Bank { cash: 0 };
deposit(the_bank, 2);
deposit(the_bank, 2);
}
error[E0382]: use of moved value: `the_bank`
--> bank2.rs:17:13
|
16 | deposit(the_bank, 2);
| -------- value moved here
17 | deposit(the_bank, 2);
| ^^^^^^^^ value used here after move
|
= note: move occurs because `the_bank` has type `Bank`, which does not implement the `Copy` trait
This doesn’t work, since just taking the_bank: Bank
as input will move the entire bank into the deposit
function on the first call, rendering it useless on subsequent calls. What we really want is a mutable reference:
fn deposit(the_bank: &mut Bank, n: i32) {
the_bank.cash += n;
}
fn withdraw(the_bank: &mut Bank, n: i32) {
the_bank.cash -= n;
}
fn main() {
let mut the_bank = Bank { cash: 0 };
deposit(&mut the_bank, 2);
deposit(&mut the_bank, 2);
withdraw(&mut the_bank, 2);
withdraw(&mut the_bank, 2);
println!("Total: {}", the_bank.cash); // prints 0
}
This works as intended. Now how do we make this concurrent?
Thread lifetimes
Like in C, the fundamental unit of concurrency in Rust is the thread, akin to a pthread, but with a slightly different interface. The std::thread
docs do a good job explaining the basics. To create a new thread, we can use thread::spawn
. For example:
let t = thread::spawn(|| { println!("Hello world!"); });
t.join().unwrap();
The thread::spawn
interface takes as input a function to be the new thread, which above we use a closure that takes no arguments and prints “Hello world!”. The output of thread::spawn
is a JoinHandle
that allows us to wait until the thread is done and cleanup its resources. A first attempt at using threads in our bank looks like this:
use std::thread;
fn customer(the_bank: &mut Bank) {
deposit(the_bank, 2);
}
fn main() {
let mut the_bank = Bank { cash: 0 };
thread::spawn(|| {
customer(&mut the_bank)
}).join().unwrap();
println!("Total: {}", the_bank.cash);
}
error[E0373]: closure may outlive the current function, but it borrows `the_bank`, which is owned by the current function
--> bank4.rs:26:19
|
26 | thread::spawn(|| {
| ^^ may outlive borrowed value `the_bank`
27 | customer(&mut the_bank)
| -------- `the_bank` is borrowed here
help: to force the closure to take ownership of `the_bank` (and any other referenced variables), use the `move` keyword
|
26 | thread::spawn(move || {
| ^^^^^^^
Let’s take a moment to carefully walk through the rationale for this error. Let’s think about the lifetime of the function passed to thread::spawn
. A thread may outlive the function that spawned it, so it would be illegal to hold a pointer to anything inside the main
function, as the value it points to could be deallocated before the thread exits. This logical issue materializes as the error: “closure may outlive the current function”, referring to the thing passed to thread::spawn
as the closure, and main
as the function.
Atomic pointers
So… what are we to do then? Having the_bank
as a global is unsafe, but we can’t pass around references to threads since that’s also unsafe. Recall from last lecture that we can use smart pointers to work around restrictions in Rust’s ownership system by trading off some runtime overhead. Specifically, we saw Rc<RefCell<T>>
as an idiom that allows multiple owners with mutable access to the same data. Let’s try it out.
use std::rc::Rc;
use std::cell::RefCell;
fn customer(the_bank: Rc<RefCell<Bank>>) {
deposit(&mut the_bank.borrow_mut(), 2);
}
fn main() {
let the_bank: Rc<RefCell<Bank>> =
Rc::new(RefCell::new(Bank { cash: 0 }));
let bank_ref = the_bank.clone();
thread::spawn(|| customer(bank_ref)).join().unwrap();
println!("Total: {}", the_bank.borrow().cash);
}
error[E0277]: `std::rc::Rc<std::cell::RefCell<Bank>>` cannot be sent between threads safely
--> bank5.rs:33:3
|
33 | thread::spawn(|| customer(bank_ref)).join().unwrap();
| ^^^^^^^^^^^^^ `std::rc::Rc<std::cell::RefCell<Bank>>` cannot be sent between threads safely
|
= help: within `[closure@bank5.rs:33:17: 33:38 bank_ref:std::rc::Rc<std::cell::RefCell<Bank>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<Bank>>`
= note: required because it appears within the type `[closure@bank5.rs:33:17: 33:38 bank_ref:std::rc::Rc<std::cell::RefCell<Bank>>]`
= note: required by `std::thread::spawn`
The error is a little imposing, but the summary captures the key idea: Rc<RefCell<Bank>>
cannot be sent between threads safely. Why is this unsafe, and how can the compiler possibly know that? Rust has a special trait called Send
that it tracks for all types, that determines whether a type can be safely sent between two threads. Most basic types (e.g. ints, bools, strings) are sendable, and composite types built of sendable types are also sendable (e.g. struct Point { x: f32, y: f32 }
).
However, Send exists so we can mark certain types explicitly as not sendable. Specifically, the Rc
smart pointer is marked as !Send
(indicated here in the docs). To understand why Rc
is not sendable, recall our Clone
implementation from last lecture:
impl<T> Clone for Rc<T> {
fn clone(&self) -> Rc<T> {
unsafe {
*self.count += 1;
}
Rc { count: self.count, value: self.value }
}
}
What if multiple threads cloned the same underlying Rc
at the same time? Just like in the original bank example, *self.count += 1
has a race condition where one thread could potentially overwrite the other while incrementing. Hence, Rc
is not thread-safe, so it is not Send
. To address this, Rust provides two reference-counted pointer types: Rc
and Arc
, or the atomic reference-counted pointer. On an interface level, Arc
functions identically to Rc
, except all reads and writes to the count
field are atomic, in this case through an AtomicUsize
.
Similarly, RefCell
suffers from the same issue—it’s not safe to share between threads due to potential data races on shared values. A Mutex
provides the equivalent services (in a slightly different interface) in the concurrent context. For example:
let n1 = Arc::new(Mutex::new(1));
let n2 = n1.clone();
let t = thread::spawn(move || {
let mut n2_locked = n2.lock().unwrap();
*n2_locked = *n2_locked + 1;
});
{
let mut n1_locked = n1.lock().unwrap();
*n1_locked = *n1_locked + 1;
}
t.join().unwrap();
assert_eq!(*n1.lock().unwrap(), 3);
Here, the outer Arc
allows us to share multiple references to the inner Mutex
, indicates by n1.clone()
. Then inside the thread, we first lock the mutex with n2.lock().unwrap()
, which gives us effectively a mutable borrow on the number inside the mutex, and then we can modify this number. While n2_locked
is in scope, no other thread can access the number. In our bank example, this looks like:
use std::sync::{Arc, Mutex};
fn customer(the_bank: Arc<Mutex<Bank>>) {
let mut bank_locked = the_bank.lock().unwrap();
bank_locked.cash += 1;
}
fn main() {
let mut the_bank: Arc<Mutex<Bank>> =
Arc::new(Mutex::new(Bank { cash: 0 }));
let bank_ref = the_bank.clone();
thread::spawn(|| {
customer(bank_ref)
}).join().unwrap();
}
At long last! We can finally a pointer to a single bank across a thread boundary. We don’t want the customer to be responsible for locking the bank, so we can shift this burden to the bank’s functions.
fn deposit(the_bank: &Arc<Mutex<Bank>>, n: i32) {
let mut bank_ref = the_bank.lock().unwrap();
bank_ref.cash += n;
}
fn withdraw(the_bank: &Arc<Mutex<Bank>>, n: i32) {
let mut bank_ref = the_bank.lock().unwrap();
bank_ref.cash -= n;
}
fn customer(the_bank: Arc<Mutex<Bank>>) {
for _ in 0..100 {
deposit(&the_bank, 2);
}
for _ in 0..100 {
withdraw(&the_bank, 2);
}
}
Note: what’s the difference between deposit taking
Arc<..>
vs&Arc<..>
? Here, it means within a thread, we don’t have to clone the pointer every time we call deposit and withdraw (requiring an atomic increment and decrement), so it’s slightly faster.
Lastly, we need to spawn many competing threads to stress test the system.
fn main() {
let the_bank: Arc<Mutex<Bank>> =
Arc::new(Mutex::new(Bank { cash: 0 }));
let n = 32;
let handles = (0..n).iter().map(|_| {
let bank_ref = the_bank.clone();
thread::spawn(|| {
customer(bank_ref)
})
});
for handle in handles {
handle.join().unwrap();
}
println!("Total: {}", the_bank.lock().unwrap().cash);
}
It compiles, and we’re done! We successfully ported the (safe) C example into Rust. But how we got here matters as much as the final program, since it reveals the many ways in which Rust’s type system protects us from concurrency issues.
-
Ownership precludes the basic issue of mutable shared state, which is the foundation of data races. To have two threads reading/writing the same value at the same time is impossible in a system that correctly enforces ownership. We saw this when attempting to send mutable references across thread boundaries.
-
The
Send
trait ensures that (when implemented correctly) smart pointers can appropriately indicate their level of thread-safety. You can think about this as the mechanization of documentation—while most libraries document their concurrency guarantees through plain English (e.g. see “Data races” in C++vector::insert
), theSend
trait ensures that invariants be upheld like thatRc
cannot be used across threads. -
Mutexes are associated with the data they protect. A subtle yet important shift from C/C++ to Rust is that instead of having a separate mutex and data value, the mutex becomes a container that wraps the data value itself, i.e.
Mutex<T>
. This has the consequence that the data can only be accessed while holding the lock. You can’t accidentally forget to take the mutex. Moreover, Rust’s custom destructors ensure that the lock is automatically released at the end of its scope, similar to C++std::lock_guard
.
Message passing
One of the main innovations of Rust’s concurrency model is that it ensures data race avoidance in the presence of shared memory. This matters because shared mem is often critical for some high-performance concurrent programs. By contrast, many languages promote the usage of message passing as a safer model than shared memory, most notably Golang and Erlang. Rust also has facilities for message passing that I want to briefly highlight.
Consider a simple communication protocol with a Counter
that increments a number until being asked to exit. In a single thread, this would look like:
struct Counter {
counter: i32
}
impl Counter {
pub fn new() -> Counter { Counter { counter: 0 } }
pub fn value(&self) -> i32 { self.counter }
pub fn incr(&mut self, n: i32) { self.counter += n; }
}
fn main() {
let mut ctr = Counter::new();
ctr.incr(1);
ctr.incr(2);
println!("{}", ctr.value()); // 3
}
If we wanted to move our Counter
into a separate thread, rather than accessing the counter directly through an Arc<Mutex<Counter>>
, we can use a message passing approach:
enum Message {
Incr(i32),
Exit
}
fn main() {
let (sender, receiver) = mpsc::channel();
let t = thread::spawn(move || {
let mut ctr = Counter::new();
loop {
let message = receiver.recv().unwrap();
match message {
Message::Incr(n) => { ctr.incr(n); },
Message::Exit => { return; }
};
}
});
sender.send(Message::Incr(1)).unwrap();
sender.send(Message::Incr(2)).unwrap();
sender.send(Message::Exit).unwrap();
t.join().unwrap();
}
This is a simple example that just shows communication in one direction, from the main thread to the counter thread. First, we define a sum type for messages called Message
, which for now just has Incr
and Exit
. This effectively defines a sanitized interface for communication between these threads—it’s morally similar to remote procedure calls if you’re familiar, except manually generated.
Then we create a channel using Rust’s mpsc (multiple producer, single consumer) module. A channel is a one-directional means of communication consisting of a message sender and a message receiver. Above, we keep the sender in the main thread, and move the receiver into the counter thread. The counter thread loops infinitely reading messages from the receiver until exiting, and the main thread can send as many messages as it likes. To extend this for bidirectional communication, we can create a second channel:
enum Message {
Incr(i32),
Value,
Exit
}
fn main() {
let (input_sender, input_receiver) = mpsc::channel();
let (output_sender, output_receiver) = mpsc::channel();
let t = thread::spawn(move || {
let mut ctr = Counter::new();
loop {
let message = input_receiver.recv().unwrap();
match message {
Message::Incr(n) => { ctr.incr(n); }
Message::Value => { output_sender.send(ctr.value()).unwrap(); }
Message::Exit => { return; }
};
}
});
input_sender.send(Message::Incr(1)).unwrap();
input_sender.send(Message::Incr(2)).unwrap();
input_sender.send(Message::Value).unwrap();
input_sender.send(Message::Exit).unwrap();
println!("Counter: {}", output_receiver.recv().unwrap());
t.join().unwrap();
}
Here, Rust has two main differences from other languages. First, sum types (enum
) allow the creation of rich typed messages (e.g. by contrast, Go has to fallback to dynamic typing to accomplish the same task). Second, Rust’s channels require that all values communicated must be Send
, precluding issues of accidentally unsafely sharing memory over a channel. For example:
let (sender, receiver) = mpsc::channel();
let t = thread::spawn(move || {
println!("{}", receiver.recv().unwrap());
});
let n = Rc::new(1);
sender.send(n.clone());
error[E0277]: `std::rc::Rc<i32>` cannot be sent between threads safely
--> test.rs:8:11
|
8 | let t = thread::spawn(move || {
| ^^^^^^^^^^^^^ `std::rc::Rc<i32>` cannot be sent between threads safely
|
= help: the trait `std::marker::Send` is not implemented for `std::rc::Rc<i32>`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::mpsc::Receiver<std::rc::Rc<i32>>`
= note: required because it appears within the type `[closure@test.rs:8:25: 10:4 receiver:std::sync::mpsc::Receiver<std::rc::Rc<i32>>]`
= note: required by `std::thread::spawn`