Due: Wednesday, November 7 at 4:00pm
Submission cutoff: Saturday, November 10 at 4:00pm

In the Concurrency lecture, we discussed how low-level concurrency works in Rust, using system threads, shared memory, and channels to parallelize and synchronize simultaneous computation. However, in the same way manually allocating memory is verbose and error-prone, even with Rust’s data race protection, we would like a higher level approach to expressing concurrent programs.

This is a huge area of ongoing work in modern programming languages. One big question is: given concurrent work, how do you schedule that work onto a CPU? Assuming each piece of work is a different logical “thread”, then an underlying runtime can use:

  • 1:1 threading: map each logical thread to a physical thread. Rely on the kernel to decide when to switch between threads. This is the approach Rust and C use with threading interfaces.
  • N:1 threading: map each logical thread onto a single thread. Rely on the threads to explicitly release control, and the userspace scheduler to switch between threads. This approach is adopted by a number of libraries including OCaml’s Async.
  • M:N threading: map M logical threads on to N physical threads, hybridizing the above two approaches. This approach Go uses for its lightweight goroutines.

Note: while there are granularities of concurrency larger than physical threads, like processes (in an operating system) and machines (in a distributed system), we’re just going to think about concurrency within a single process for now.

The other major question in concurrency design is around synchronization: how do you express dependencies and handle shared resources between concurrent threads? With the rise of higher-order functions and closures, one approach that has gained rapid adoption in recent years is futures, or promises. Futures represent work to be done, and have proven to be a convenient and expressive means for writing concurrent programs. They are perhaps most popular in Javascript, but have also gained prominence in C++ and Scala. For example, in Javascript, promises turn “callback hell” code like this:

doSomething(function(result) {
  doSomethingElse(result, function(newResult) {
    doThirdThing(newResult, function(finalResult) {
      console.log('Got the final result: ' + finalResult);
    }, failureCallback);
  }, failureCallback);
}, failureCallback);

Into this:

doSomething()
.then(result => doSomethingElse(result))
.then(newResult => doThirdThing(newResult))
.then(finalResult => {
  console.log('Got the final result: ' + finalResult);
})
.catch(failureCallback);

Note: promises are frequently juxtaposed against “async/await”, an alternative syntax that allows programmers to write seemingly straight-line that effectively compiles into promises, e.g. see Javascript and Rust.

The goal of this assignment is to implement a simplified version of a futures library in Rust, inspired by the actual Rust futures library. It will involve both the concurrency API (how to write futures) and an M:N threading implementation (distributing many futures onto multiple threads).

1. Futures (45%)

The core of the futures interface is the Future trait, defining the primitive unit of concurrent work.

enum Poll<T> { Ready(T), NotReady }

trait Future: Send {
  type Item;
  fn poll(&mut self) -> Poll<Self::Item>;
}

A future is anything that produces an Item through a polling interface, where a future responds with NotReady if it’s still computing, and Ready(item) if it is completed. The syntax trait Future: Send means “anything implementing the Future trait must also implement Send.

Note: polling is just one way of implementing a futures interface. I recommend reading Zero-cost futures in Rust for more on the alternatives, and why Rust chooses polling.

For example, here’s a future that doesn’t do any work, but just immediately returns a value when polled.

// Container for the state of the future.
pub struct Immediate<T> {
  t: Option<T>,
}

// Constructor to build the future. Note that the return type just says
// "this produces a future", not specifying concretely the type Immediate.
pub fn immediate<T>(t: T) -> impl Future<Item = T>
where
  T: Send,
{
  Immediate { t: Some(t) }
}

// To treat Immediate as a future, we have to implement poll. Here it's
// relatively simple, since we return immediately with a Poll::Ready.
impl<T> Future for Immediate<T>
where
  T: Send,
{
  type Item = T;

  fn poll(&mut self) -> Poll<Self::Item> {
    Poll::Ready(self.t.take().unwrap())
  }
}

We can use this future like this:

fn execute<F>(fut: F) -> F::Item where F: Future {
  loop {
    if let Poll::Ready(f) = fut.poll() {
      return f;
    }
  }
}

fn main() {
  assert_eq!(execute(immediate(3)), 3);
}

For a more complex example, consider a map future that runs a function over the output of a future.

struct Map<Fut, Fun> {
  fut: Fut,
  fun: Option<Fun>,
}

fn map<T, Fut, Fun>(fut: Fut, fun: Fun) -> impl Future<Item=T>
where T: Send, Fut: Future, Fun: (FnOnce(Fut::Item) -> T) + Send
{
  Map { fut, fun: Some(fun) }
}

impl<T, Fut, Fun> Future for Map<Fut, Fun>
where T: Send, Fut: Future, Fun: (FnOnce(Fut::Item) -> T) + Send {
  type Item = T;

  fn poll(&mut self) -> Poll<Self::Item> {
    match self.fut.poll() {
      Poll::NotReady => Poll::NotReady,
      Poll::Ready(s) => {
        let mut f = self.fun.take();
        Poll::Ready(f.unwrap()(s))
      }
    }
  }
}

We can use the map future like this:

assert_eq!(execute(map(immediate(3), |n| n + 1)), 4);

This is an example of a future combinator, or a function that takes a future as input, and returns another future. This is also our first example of a higher order function, which you can see in the trait bound Fun: (FnOnce(Fut::Item) -> T) + Send. The syntax FnOnce(Fut::Item) -> T is a special syntax for a trait that means “can be called as a function from Fut::Item to T (i.e. it implements the call f() operator). See the Rust Book for more about function traits.

Your task is to implement two other particular future combinators, or futures that take other futures as arguments and process them.

  1. join: a future, which takes two futures, runs them to completion, and returns a tuple of their results. For example, join(immediate(1), immediate(2)) should return a future that outputs (1, 2).
  2. and_then: a future which takes a future and a function that outputs a future, and chains them together. For example, and_then(immediate(1), |n| immediate(n + 1)) should return a future that outputs 2.

In src/future.rs, we have provided you with the necessary wrapper code (type definition, constructor, and impl Future) to get started. You just need to fill in the poll method for both futures. Your poll function should not block, i.e. do not loop on sub-futures until completion. A call to poll should execute a fixed number of polls to sub-futures for any such futures.

Note that ownership around futures can get particularly tricky. Often you will need to pass off ownership of things once the future is ready to return Poll::Ready, except without consuming ownership of the wrapper future itself. For example, in both Immediate and Map, we need to consume ownership of the value/function when returning ready, so we can address this by using an Option<T> along with the Option::take operation to consume ownership of the inner value and replace it with a None.

For more complex state machines like join and and_then, a particularly useful tool is take_mut, a library included in the starter code that allows you to temporarily treat a mutable pointer as an owned value. This can help you extract fields of an enum as owned values without consuming ownership of the enum itself. For example:

enum Test { A(String), B }
let mut t: Test = Test::A(String::from("Hello"));
let tmut: &mut Test = &mut t;
let mut s: Option<String> = None;
take_mut::take(tmut, |t2: Test| -> Test {
  if let Test::A(s2) = t2 {
    s = Some(s2);
    Test::B
  } else {
    t2
  }
});
println!("{}", s.unwrap()); // Hello

2. Executors (45%)

While futures define independent units of work, we still need an underlying runtime that can execute them concurrently. This is the idea of an executor, or future-executing engine. Executors have the following trait:

trait Executor {
  fn spawn<F>(&mut self, f: F) where F: Future<Item=()> + 'static;
  fn wait(&mut self);
}

The idea is that spawn registers a future to be executed with the executor, and wait blocks until all the futures have finished completing. In this interface, all top-level futures (i.e. those passed to spawn) are expected to return the unit type (Item=()), meaning their only interaction to the world outside the executor can be through side effects. For example:

let mut exec = // make an executor ...
exec.spawn(
  map(
    immediate(5),
    |n| { println!("n: {}", n); () }));
exec.wait();

Here’s an example of a naive implementation for an executor that immediately blocks on spawn.

struct BlockingExecutor;

impl BlockingExecutor {
  fn new() -> BlockingExecutor { BlockingExecutor }
}

impl Executor for BlockingExecutor {
  fn spawn<F>(&mut self, mut f: F) where F: Future<Item=()> {
    loop {
      if let Poll::Ready(_) = f.poll() {
        break;
      }
    }
  }

  fn wait(&mut self) {}
}

Your task is to implement two executors:

  1. SingleThreadExecutor: single-threaded executor that permits concurrent work on one thread (e.g. through asynchronous I/O). A single-threaded executor contains a list of futures that it needs to make progress on:

    pub struct SingleThreadExecutor {
      futures: Vec<Box<Future<Item = ()>>>,
    }
    

    When a future is spawned on the single-threaded executor, you must call poll exactly once to start the future (making sure to discard the future if it immediately returns Ready). Otherwise, most of the work is done in wait to loop through all the futures and poll them to completion.

    Note: Box<Future<..>> is a use of trait objects which enable a single vector to contain potentially multiple different kinds of futurues.

  2. MultiThreadExecutor: a multi-threaded executor that splits future execution across multiple threads. Your implementation should follow this protocol:

    • The main thread creates an instance of a MultiThreadExecutor, which creates a pool of threads (see thread::spawn) and a shared communication channel to send work (see mpsc::channel). This is the inverse situation of mpsc (single producer, multiple consumer), so you will need to use smart pointers so each thread can access the same single receiver.
    • Each thread in the pool should create a SingleThreadExecutor, and pull work from the channel to spawn into its local executor.
    • Spawning a future on the MultiThreadExecutor should send the future over the channel.
    • Calling wait on the MultiThreadExecutor should send a signal to each thread that they each should wait on their SingleThreadExecutor, and then exit afterwards. The MultiThreadExecutor should wait until all threads have exited (see ThreadHandle::join).

3. Async I/O (10%)

Lastly, to get a sense of how to apply these futures to actual asynchronous tasks, you are going to implement a simple asynchronous file read future. Specifically, you have the following interface:

struct FileReader { .. }

impl FileReader {
  pub fn new(path: PathBuf) -> FileReader { .. }
}

impl Future for FileReader {
  type Item = io::Result<String>;

  fn poll(&mut self) -> Poll<Self::Item> { .. }
}

The file reader should, on the first poll, launch a thread to begin reading the provided PathBuf into a string (see: fs::read_to_string). The future must not block on this thread. To accomplish this, you can use an AtomicBool to act as a thread-safe flag (see AtomicBool::load and AtomicBool::store). When polled, the future should check the flag, returning Poll::NotReady if the the thread is not finished. When the thread is finished, it should set the flag to true, and return the value, and the future can access the value using JoinHandle::join.

In src/asyncio.rs, implement the FileReader::new and Future::poll functions. We have provided you a type definition for FileReader as a hint for how to implement your future.

Testing

To test your code, we have provided you a few tests in the tests folder, which you can execute using cargo test.

Submission

Run ./make_submission.sh and upload the generated assign6.zip to Gradescope.