Assignment 7: Futures
Due: Wednesday, November 13 at 11:59pm
Submission cutoff: Saturday, November 16 at 11:59pm
A key design challenge in modern programming languages is representing asynchrony. Most imperative languages assume a sequential model of computation, where there is only a single thread of control, and only one kind of work is done at a time. However, many application domains can be usefully represented as multiple concurrent processes. For example:
- User interfaces: network requests in a webpage, like fetching the result of a database query from a backend, are often performed asynchronously with changes to the user interface. A webpage shouldn’t completely freeze while the request is loading. In JavaScript, this is done with Promises and Events.
- Servers: servers often handle hundreds of concurrent requests to a particular web page, and one user’s request shouldn’t prevent another’s from completing. Web servers use non-blocking I/O, e.g. with Rust’s Tokio library.
- Operating systems: an OS needs to switch between hundreds of running processes, allocating I/O and compute resources as necessary.
Designing asynchronous computation has two parts: representation and execution. What is the API for expressing asynchrony, and how does the runtime execute the API on the hardware? For example, one representation of asynchrony is the thread model, where computation is launched on independent threads of control, often synchronized through constructs like barriers, mutexes, and condition variables. Threads are the conventional asynchrony model because they map closely to how a modern OS/CPU actually works.
However, with the rise of higher-order functions and closures, one approach that has gained rapid adoption in recent years is futures, or promises. Futures are objects that represent work to be done, and they can be combined through operators (or “combinators”) like then, map, join, and many others. They are perhaps most popular in JavaScript, but have also gained prominence in C++ and Scala. For example, in JavaScript, promises turn “callback hell” code like this:
doSomething(function(result) {
  doSomethingElse(result, function(newResult) {
    doThirdThing(newResult, function(finalResult) {
      console.log('Got the final result: ' + finalResult);
    }, failureCallback);
  }, failureCallback);
}, failureCallback);
Into this:
doSomething()
  .then(result => doSomethingElse(result))
  .then(newResult => doThirdThing(newResult))
  .then(finalResult => {
    console.log('Got the final result: ' + finalResult);
  })
  .catch(failureCallback);
Note: promises are frequently juxtaposed against “async/await”, an alternative syntax that allows programmers to write seemingly straight-line code that effectively compiles into promises, e.g. see JavaScript and Rust.
More generally, futures are often preferable to threads because they simplify synchronization. Manually synchronizing multiple threads with mutexes and condition variables is tricky and easy to get wrong (deadlocks, livelocks, etc.). This is one reason many modern programming languages have added futures-based models alongside threads.
The goal of this assignment is to understand how to implement type-safe, high-performance futures in Rust. Inspired by the actual Rust futures library, you will use Rust’s many functional features and traits to recreate a number of future combinators. Then you will get a taste of concurrency in Rust by executing futures in parallel.
1. Futures (40%)
The core of the futures interface is the Future trait, defining the primitive unit of concurrent work.
enum Poll<T> { Ready(T), NotReady }
trait Future: Send {
    type Item;
    fn poll(&mut self) -> Poll<Self::Item>;
}
A future is anything that produces an Item through a polling interface, where a future responds with NotReady if it’s still computing, and Ready(item) if it is completed. The syntax trait Future: Send means “anything implementing the Future trait must also implement Send.”
Polling is just one way of implementing a futures interface. I recommend reading Zero-cost futures in Rust for more on the alternatives, and why Rust chooses polling.
For example, here’s a future that doesn’t do any work, but just immediately returns a value when polled.
// Container for the state of the future.
pub struct Immediate<T> {
    t: Option<T>,
}
// Constructor to build the future. The return type means "this function
// returns a type implementing the trait Future producing Item = T."
// See: https://doc.rust-lang.org/edition-guide/rust-2018/trait-system/impl-trait-for-returning-complex-types-with-ease.html
pub fn immediate<T: Send>(t: T) -> impl Future<Item = T> {
    Immediate { t: Some(t) }
}
// To treat Immediate as a future, we have to implement poll. Here it's
// relatively simple, since we return immediately with a Poll::Ready.
impl<T: Send> Future for Immediate<T> {
    type Item = T;
    fn poll(&mut self) -> Poll<Self::Item> {
        // take replaces the option with a None, returning ownership of the value
        // on the inside. This poll would fail if called again, but we are
        // guaranteed that poll will never be called after Ready is returned.
        Poll::Ready(self.t.take().unwrap())
    }
}
We can use this future like this:
// `fut` must be declared `mut` because poll takes `&mut self`.
fn execute_future<F: Future>(mut fut: F) -> F::Item {
    loop {
        match fut.poll() {
            Poll::Ready(f) => { return f; }
            Poll::NotReady => {}
        }
    }
}
fn main() {
    assert_eq!(execute_future(immediate(3)), 3);
}
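Immediate never returns NotReady, so it does not exercise the other half of the polling interface. The following is a minimal sketch of a future that needs several polls before it completes. Countdown, countdown, and execute_counting are hypothetical names that are not part of the starter code, and the Poll and Future definitions are repeated only so the sketch compiles on its own.

```rust
// Same definitions as in the handout, repeated so the sketch is self-contained.
pub enum Poll<T> { Ready(T), NotReady }

pub trait Future: Send {
    type Item;
    fn poll(&mut self) -> Poll<Self::Item>;
}

// Hypothetical future: reports NotReady `remaining` times, then yields `value`.
pub struct Countdown<T> {
    remaining: u32,
    value: Option<T>,
}

pub fn countdown<T: Send>(remaining: u32, value: T) -> impl Future<Item = T> {
    Countdown { remaining, value: Some(value) }
}

impl<T: Send> Future for Countdown<T> {
    type Item = T;
    fn poll(&mut self) -> Poll<Self::Item> {
        if self.remaining > 0 {
            // Still "working": record one step of progress and return right away.
            self.remaining -= 1;
            Poll::NotReady
        } else {
            // Done: hand ownership of the value to the caller.
            Poll::Ready(self.value.take().unwrap())
        }
    }
}

// Polls until the future is ready and also reports how many polls it took.
pub fn execute_counting<F: Future>(mut fut: F) -> (F::Item, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(item) = fut.poll() {
            return (item, polls);
        }
    }
}
```

Calling execute_counting(countdown(3, "done")) polls four times: three NotReady responses followed by one Ready. Note that each call to poll returns immediately instead of waiting for the work to finish.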
Next, we will look at a future combinator, or a future that takes another future as input. Consider a map future that runs a function over the output of a future.
struct Map<Fut, Fun> {
    fut: Fut,
    fun: Option<Fun>,
}
fn map<T, Fut, Fun>(fut: Fut, fun: Fun) -> impl Future<Item=T>
where
    T: Send,
    Fut: Future,
    Fun: (FnOnce(Fut::Item) -> T) + Send
{
    Map { fut, fun: Some(fun) }
}
impl<T, Fut, Fun> Future for Map<Fut, Fun>
where
    T: Send,
    Fut: Future,
    Fun: (FnOnce(Fut::Item) -> T) + Send
{
    type Item = T;
    fn poll(&mut self) -> Poll<Self::Item> {
        match self.fut.poll() {
            Poll::NotReady => Poll::NotReady,
            Poll::Ready(s) => {
                // Take ownership of the function so it can be called by value.
                let f = self.fun.take().unwrap();
                Poll::Ready(f(s))
            }
        }
    }
}
We can use the map future like this:
assert_eq!(execute_future(map(immediate(3), |n| n + 1)), 4);
This is an example of a future combinator, or a function that takes a future as input, and returns another future. This is also our first example of a higher-order function in Rust, which you can see in the trait bound Fun: (FnOnce(Fut::Item) -> T) + Send. The syntax FnOnce(Fut::Item) -> T is a special syntax for a trait that means “can be called as a function from Fut::Item to T” (i.e. it implements the call operator f()). See the Rust Book for more about function traits.
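To make the FnOnce bound concrete outside of futures, here is a small sketch of a higher-order function that calls its argument exactly once. The names apply_once and greet are made up for illustration and do not appear in the starter code.

```rust
// Calls `f` exactly once. Because `f` is taken by value, the closure may
// consume (move out of) anything it captured.
fn apply_once<A, B, F>(f: F, a: A) -> B
where
    F: FnOnce(A) -> B,
{
    f(a)
}

// Hypothetical helper showing a closure that can only be FnOnce.
fn greet(name: String) -> String {
    let greeting = String::from("hello, ");
    // `greeting` is moved into the closure and then moved out of it when the
    // closure runs, so the closure cannot be called a second time.
    let add_name = move |n: String| -> String {
        let mut s = greeting;
        s.push_str(&n);
        s
    };
    apply_once(add_name, name)
}
```

A closure that only reads its captures (for example |n: i32| n + 1) also satisfies FnOnce, which is why FnOnce is the least restrictive bound to ask of a caller when the function is only invoked once, as in Map.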
Your task is to implement two other particular future combinators:
- join: a future which takes two futures, runs them to completion, and returns a tuple of their results. For example, join(immediate(1), immediate(2)) should return a future that outputs (1, 2).
- and_then: a future which takes a future and a function that outputs a future, and chains them together. For example, and_then(immediate(1), |n| immediate(n + 1)) should return a future that outputs 2.
In src/future.rs, we have provided you with the necessary wrapper code (type definition, constructor, and impl Future) to get started. You just need to fill in the poll method for both futures. Your poll function should not block, i.e. do not loop on sub-futures until completion.
Note that ownership around futures can get particularly tricky. Often you will need to pass off ownership of things once the future is ready to return Poll::Ready, except without consuming ownership of the future itself. For example, in both Immediate and Map, we need to consume ownership of the value/function when returning ready, so we can address this by using an Option<T> along with the Option::take operation to consume ownership of the inner value and replace it with a None.
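The Option::take pattern can be seen in isolation in the sketch below. Slot and claim_twice are hypothetical names used only for this illustration.

```rust
// Hypothetical one-shot slot: hands out its value once through `&mut self`.
struct Slot<T> {
    value: Option<T>,
}

impl<T> Slot<T> {
    fn new(value: T) -> Slot<T> {
        Slot { value: Some(value) }
    }

    // We only have `&mut self`, so we can't move `self.value` out directly.
    // `take` swaps in `None` and returns whatever was there before.
    fn claim(&mut self) -> Option<T> {
        self.value.take()
    }
}

// Claims the same slot twice to show that only the first claim succeeds.
fn claim_twice() -> (Option<String>, Option<String>) {
    let mut slot = Slot::new(String::from("payload"));
    let first = slot.claim();
    let second = slot.claim();
    (first, second)
}
```

The first claim returns Some("payload") and the second returns None, which is why calling unwrap after a second take (as in a second poll of Immediate) would panic.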
For more complex state machines like join and and_then, a particularly useful tool is take_mut, a library included in the starter code that allows you to temporarily treat a mutable pointer as an owned value. This allows you to avoid the kind of problem you encountered on Assignment 6 where mutating a binary tree in place required careful surgery with mem::replace. For example, we can rewrite a prepend function on lists:
use take_mut;
use std::mem;
enum List<T> { Nil, Cons(T, Box<List<T>>) }
impl<T> List<T> {
    fn prepend_replace(&mut self, t: T) {
        // You normally have to do this...
        let self_owned = mem::replace(self, List::Nil);
        *self = List::Cons(t, Box::new(self_owned));
    }
    fn prepend_takemut(&mut self, t: T) {
        // This makes `l` an owned version of self. Then it places
        // the return value of the closure back into self.
        take_mut::take(self, |l: List<T>| -> List<T> {
            List::Cons(t, Box::new(l))
        });
    }
}
fn main() {
    let mut l = List::Cons(1, Box::new(List::Nil));
    l.prepend_replace(2);
    l.prepend_takemut(3);
    match l {
        List::Cons(n, _) => assert_eq!(n, 3),
        List::Nil => panic!("expected a Cons cell")
    }
}
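The same ownership problem shows up whenever a state machine is stored in an enum and has to change variants through &mut self. The sketch below uses only the standard library (mem::replace with a placeholder variant) rather than take_mut, and the Stage type is a made-up example unrelated to the join and and_then types in the starter code.

```rust
use std::mem;

// Hypothetical two-stage state machine that owns data in each state.
enum Stage {
    Collecting(Vec<u32>),
    Finished(u32),
    // Placeholder left behind while ownership is temporarily moved out.
    Empty,
}

impl Stage {
    // Adds `n` to the buffer. Once three items are collected, the machine
    // switches to Finished with their sum and ignores further input.
    fn step(&mut self, n: u32) {
        // Move the current state out so its contents can be matched by value.
        let current = mem::replace(self, Stage::Empty);
        *self = match current {
            Stage::Collecting(mut items) => {
                items.push(n);
                if items.len() == 3 {
                    Stage::Finished(items.into_iter().sum())
                } else {
                    Stage::Collecting(items)
                }
            }
            // Any other state is put back unchanged.
            other => other,
        };
    }
}

// Feeds all inputs to a fresh machine and reports the sum if it finished.
fn run_stage(inputs: &[u32]) -> Option<u32> {
    let mut stage = Stage::Collecting(Vec::new());
    for &n in inputs {
        stage.step(n);
    }
    match stage {
        Stage::Finished(total) => Some(total),
        _ => None,
    }
}
```

With take_mut the placeholder variant would be unnecessary, since the closure receives the owned state and returns the next one directly.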
2. Executors (40%)
While futures define independent units of work, we still need an underlying runtime that can execute them concurrently. This is the idea of an executor, or future-executing engine. Executors have the following trait:
trait Executor {
    fn spawn<F>(&mut self, f: F) where F: Future<Item=()> + 'static;
    fn wait(&mut self);
}
The idea is that spawn registers a future to be executed with the executor, and wait blocks until all the futures have completed. In this interface, all top-level futures (i.e. those passed to spawn) are expected to return the unit type (Item=()), meaning their only interaction with the world outside the executor can be through side effects. For example:
let mut exec = // make an executor ...
exec.spawn(
    map(
        immediate(5),
        |n| { println!("n: {}", n) }));
exec.wait();
Here’s an example of a naive implementation for an executor that immediately blocks on spawn.
struct BlockingExecutor;
impl BlockingExecutor {
    fn new() -> BlockingExecutor { BlockingExecutor }
}
impl Executor for BlockingExecutor {
    fn spawn<F>(&mut self, mut f: F) where F: Future<Item=()> {
        loop {
            if let Poll::Ready(_) = f.poll() {
                break;
            }
        }
    }
    fn wait(&mut self) {}
}
Your task is to implement two executors:
- SingleThreadExecutor: a single-threaded executor that permits concurrent work on one thread (e.g. through asynchronous I/O). A single-threaded executor contains a list of futures that it needs to make progress on:
  pub struct SingleThreadExecutor {
      futures: Vec<Box<Future<Item = ()>>>,
  }
  When a future is spawned on the single-threaded executor, you must call poll exactly once to start the future (making sure to discard the future if it immediately returns Ready). Otherwise, most of the work is done in wait, which loops through all the futures and polls them to completion.
  Note: Box<Future<..>> is a use of trait objects, which enable a single vector to contain potentially multiple different kinds of futures.
- MultiThreadExecutor: a multi-threaded executor that splits future execution across multiple threads. Your implementation should follow this protocol:
  - The main thread creates an instance of a MultiThreadExecutor, which creates a pool of threads (see thread::spawn) and a shared communication channel to send work (see mpsc::channel). This is the inverse of the situation mpsc is named for (multiple producer, single consumer): here there is a single producer and multiple consumers, so you will need to use smart pointers so each thread can access the same single receiver.
  - Each thread in the pool should create a SingleThreadExecutor, and pull work from the channel to spawn into its local executor.
  - Spawning a future on the MultiThreadExecutor should send the future over the channel.
  - Calling wait on the MultiThreadExecutor should send a signal to each thread telling it to wait on its SingleThreadExecutor and then exit afterwards. The MultiThreadExecutor should wait until all threads have exited (see JoinHandle::join).
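The "single producer, multiple consumer" arrangement described above can be sketched with plain numbers in place of futures. This is a generic illustration of sharing one Receiver behind Arc<Mutex<..>>, not the executor itself. The Message type and the sum_of_squares function are assumptions made up for this example.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical message type: either a unit of work or a request to stop.
enum Message {
    Work(u64),
    Stop,
}

// Spawns `workers` threads that share one receiver, sends `inputs` to them,
// and returns the sum of the squares the workers computed.
fn sum_of_squares(workers: usize, inputs: Vec<u64>) -> u64 {
    let (tx, rx): (Sender<Message>, Receiver<Message>) = mpsc::channel();
    // Receiver is not Clone, so wrap it: Arc for shared ownership across
    // threads, Mutex so that only one thread receives at a time.
    let rx = Arc::new(Mutex::new(rx));

    let mut handles = Vec::new();
    for _ in 0..workers {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || {
            let mut local_sum = 0;
            loop {
                // The lock guard is a temporary dropped at the end of this
                // statement, so other workers can receive while this one
                // processes its message.
                let msg = rx.lock().unwrap().recv().unwrap();
                match msg {
                    Message::Work(n) => local_sum += n * n,
                    Message::Stop => break,
                }
            }
            local_sum
        }));
    }

    for n in inputs {
        tx.send(Message::Work(n)).unwrap();
    }
    // One Stop per worker: each worker consumes exactly one and then exits.
    for _ in 0..workers {
        tx.send(Message::Stop).unwrap();
    }

    // JoinHandle::join blocks until the thread exits and returns its result.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Because the channel delivers messages in the order they were sent, every Work message is received before any Stop message, so no work is lost when the workers shut down.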
3. Async I/O (10%)
To get a sense of how to apply these futures to actual asynchronous tasks, you are going to implement a simple asynchronous file read future. Specifically, you have the following interface:
struct FileReader { .. }
impl FileReader {
    pub fn new(path: PathBuf) -> FileReader { .. }
}
impl Future for FileReader {
    type Item = io::Result<String>;
    fn poll(&mut self) -> Poll<Self::Item> { .. }
}
The file reader should, on the first poll, launch a thread to begin reading the provided PathBuf into a string (see: fs::read_to_string). The future must not block on this thread. To accomplish this, you can use an AtomicBool to act as a thread-safe flag (see AtomicBool::load and AtomicBool::store). When polled, the future should check the flag, returning Poll::NotReady if the thread is not finished. When the thread finishes its work, it should set the flag to true and return the value as the thread’s result; the future can then access the value using JoinHandle::join.
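The flag-plus-JoinHandle idea can be tried out on a simpler task than file reading. The sketch below sums numbers on a background thread. The Background type and its methods are hypothetical and are not the FileReader definition from the starter code, and Ordering::SeqCst is chosen here simply as the most conservative memory ordering.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical background task: a worker thread plus a "finished" flag.
struct Background {
    done: Arc<AtomicBool>,
    handle: Option<JoinHandle<u64>>,
}

impl Background {
    // Starts summing 1..=n on another thread and returns immediately.
    fn start(n: u64) -> Background {
        let done = Arc::new(AtomicBool::new(false));
        let done_in_thread = Arc::clone(&done);
        let handle = thread::spawn(move || {
            let total: u64 = (1..=n).sum();
            // Publish completion as the thread's last action before returning.
            done_in_thread.store(true, Ordering::SeqCst);
            total
        });
        Background { done, handle: Some(handle) }
    }

    // Non-blocking check: None while the thread is still running (or if the
    // result was already taken), Some(result) once the flag is set.
    fn try_result(&mut self) -> Option<u64> {
        if !self.done.load(Ordering::SeqCst) {
            return None;
        }
        // The flag is set, so the thread is about to exit and join returns
        // almost immediately.
        self.handle.take().map(|h| h.join().unwrap())
    }
}

// Keeps checking (yielding the CPU in between) until the result is available.
fn sum_in_background(n: u64) -> u64 {
    let mut task = Background::start(n);
    loop {
        if let Some(total) = task.try_result() {
            return total;
        }
        thread::yield_now();
    }
}
```

The Arc is what lets the spawning side and the worker thread share one AtomicBool, and storing the JoinHandle in an Option allows it to be consumed by join through &mut self.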
In src/asyncio.rs, implement the FileReader::new and Future::poll functions. We have provided you a type definition for FileReader as a hint for how to implement your future.
4. Pointer profiling (10%)
In performance-oriented systems (like a futures library), we often want to instrument the language with profiling constructs to help understand hotspots in the code. Specifically, we want to know: how many times was a reference to a value created?
One way to accomplish this task is with smart pointers. Smart pointers offer a means of wrapping a value with metadata while enabling the value to be used as if it’s not in a wrapper. You are going to implement a UseCounter smart pointer that tracks the number of times a pointer is used (referenced). As an example:
#[test]
fn usecount_basic() {
    let mut uc: UseCounter<i32> = UseCounter::new(1);
    // You can use the counter as a standard pointer, so
    // &mut uc becomes &mut i32, NOT &mut UseCounter<i32>
    let n: &mut i32 = &mut uc;
    *n += 1;
    let n: &i32 = &uc;
    let x = *n + 1;
    assert_eq!(x, 3);
    assert_eq!(uc.count(), 2);
}
In src/usecount.rs, we have provided you with a blank file to write your UseCounter struct in. For this exercise, you get to design the type signatures for the struct and methods, as well as what traits to implement. Your implementation just needs to support all the provided tests in tests/usecount_tests.rs, meaning you need a new function, a count function, and implementations of the Deref and DerefMut traits.
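As a reminder of how Deref and DerefMut make a wrapper behave like the value inside it, here is a sketch of a smart pointer whose metadata is just a label. Labeled and labeled_demo are hypothetical names, and this wrapper deliberately does no counting.

```rust
use std::ops::{Deref, DerefMut};

// Hypothetical smart pointer: a value plus a label as its metadata.
struct Labeled<T> {
    label: &'static str,
    value: T,
}

impl<T> Labeled<T> {
    fn new(label: &'static str, value: T) -> Labeled<T> {
        Labeled { label, value }
    }

    fn label(&self) -> &'static str {
        self.label
    }
}

// Deref lets `&Labeled<T>` coerce to `&T` and makes `*labeled` work.
impl<T> Deref for Labeled<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.value
    }
}

// DerefMut does the same for `&mut Labeled<T>` to `&mut T`.
impl<T> DerefMut for Labeled<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}

// Uses the wrapper as if it were the String it contains.
fn labeled_demo() -> (usize, String, &'static str) {
    let mut s = Labeled::new("greeting", String::from("hi"));
    // The method call reaches String::push_str through DerefMut.
    s.push_str(" there");
    // Deref coercion: &Labeled<String> becomes &String.
    let inner: &String = &s;
    (inner.len(), inner.clone(), s.label())
}
```

Methods defined on the wrapper itself (like label) remain callable alongside the methods of the wrapped type, which is how a wrapper can expose its metadata.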
Note that you need to manually uncomment the tests in tests/usecount_tests.rs once you get to this part. Don’t forget to do this, otherwise your code won’t actually be tested!
Testing
To test your code, we have provided you a few tests in the tests folder, which you can execute using cargo test.
Submission
Run ./make_submission.sh and upload the generated assign7.zip to Gradescope.