Take 2: Threading in C++

By Dennis J. McWherter, Jr.

The last post I wrote about threading in C++ received some sharp criticism in certain circles. Trolling aside, many of these criticisms were valid. That said, the title itself may have been slightly misleading in the first place. The real goal of that post was, more or less, to introduce awareness of the updated STL (particularly as it relates to threads). In any case, I would like to address some of the concerns raised in that article. Its examples were poor, but they were simply demonstrative of particular STL features. Rather than defend that post, however, I have decided to discuss some important concepts in multithreading here.

Threads

The most primitive (intra-process) and often most error-prone method of concurrency is to use threads. In short, threads allow a user to spawn parallel execution within a single process. Since threads are spawned within the same process, they are incredibly lightweight: they share their memory space with the existing process. However, this shared state is also what tends to make threads tricky.

For instance, consider a simple increment of an integer. This is the typical “read-modify-write” problem: incrementing an integer is actually three separate, non-atomic operations (read the value, add one, write it back). When two threads interleave these steps, either thread can end up operating on stale data and an update is lost. For this particular scenario, locks are a common solution. In general, though, you would like to avoid locks (i.e. mutexes/semaphores) anywhere in your code. If you must use them, it’s best to keep those critical sections as short as possible. A minimal sketch of the race appears below.
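To make the hazard concrete, here is a minimal sketch (my own illustration, not from the original post; the helper name increment_many is hypothetical) in which two threads lose updates to a shared counter:

#include <boost/thread.hpp>

#include <iostream>

int counter = 0; // shared and unsynchronized: a data race

void increment_many() {
  for (int i = 0; i < 100000; ++i) {
    ++counter; // read-modify-write: both threads may read the same stale value
  }
}

int main() {
  boost::thread t1(increment_many);
  boost::thread t2(increment_many);
  t1.join();
  t2.join();
  // Routinely prints less than 200000 because interleaved increments are lost
  std::cout << counter << std::endl;
  return 0;
}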

Why are locks bad? The fundamental property of locks is that they block other threads of execution from entering a critical section. When your threads are blocked on a mutex, they are effectively standing in line to check out at the grocery store: while they wait, they do no computation, and your multi-threaded (i.e. supposedly parallel) process degrades into a serial one. This is in addition to all of the potential errors that can occur with locks, such as deadlock, priority inversion, and even resource starvation. A lock-based fix for the counter above is sketched below.
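For completeness, here is a sketch (again my own, using the same hypothetical names as above) of the lock-based fix. It is correct, but both threads now queue up at the mutex, serializing the critical section:

#include <boost/thread.hpp>

int counter = 0;
boost::mutex counter_mutex;

void increment_many() {
  for (int i = 0; i < 100000; ++i) {
    // Blocks every other thread for the duration of the critical
    // section, so keep that section as short as possible
    boost::lock_guard<boost::mutex> guard(counter_mutex);
    ++counter;
  }
}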

Let’s define a problem. The last time I wrote about threads, I received flak for a poor producer-consumer example. Consequently, this seems like a perfect time to use the same example (since anything embarrassingly parallel is not actually interesting). This time, however, I will give it context. Suppose we are authoring a real-time audio processing application. Basically our producer will be the thread capturing audio while our consumer will be performing some processing on the captured signal.

Let’s analyze the problem. We know what we want to do; we want to perform some computation in real-time on a captured audio signal. Fundamentally, the producer is simply passing data to the consumer. The most obvious implementation of such a process is a shared queue of sample data. Likewise, we also know that locking to protect our data is bad and will inhibit our ability to process data in real-time. Enter lock- and wait-free algorithms. In summary:

  • Lock-free – The parallel algorithm guarantees global progress (i.e. no deadlock), but individual thread starvation can still occur.
  • Wait-free – Every thread is guaranteed to complete its operation in a finite number of steps (i.e. lock-free without starvation).
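To make the distinction above concrete, here is a small sketch (my own, not from the original post) contrasting the two guarantees on a shared counter. Note that whether fetch_add is truly wait-free depends on the hardware, though it is on common architectures:

#include <boost/atomic.hpp>

boost::atomic<int> counter(0);

// Wait-free (on common architectures): every call finishes in a
// bounded number of steps, no matter what other threads are doing
void increment_wait_free() {
  counter.fetch_add(1);
}

// Lock-free but not wait-free: a failed CAS means some other thread
// succeeded (global progress), but this thread could retry indefinitely
void increment_lock_free() {
  int expected = counter.load();
  while (!counter.compare_exchange_weak(expected, expected + 1)) {
    // on failure, expected is refreshed with the current value; retry
  }
}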

Lock-free and wait-free algorithms are actually very complex, but some of the fundamental research about universality is quite interesting. Rather than go into all of the details here, I will cut to the chase with how we are going to implement our solution (if you’re interested in a more thorough introduction, see this post). We can use a wait-free (FIFO) ring buffer. Ring buffers are simply queues implemented as fixed-size arrays whose indices “wrap around” to the beginning (hence, ring or circular buffers). This works well for us for a few reasons:

  1. We have only one producer and one consumer. This means we can use a wait-free implementation (remember, this is a stronger guarantee than simply lock-free). In particular, we will be using boost’s spsc_queue (single-producer, single-consumer queue) implementation.
  2. We’re dealing with audio samples. In our application, we are just processing some audio for fun (i.e. not for a mission-critical embedded system), so if overflow is detected, it is not much of a problem to drop that data on the floor. Audio sampling is already a lossy process (i.e. discretizing a continuous sound wave), and we specify that some additional loss can be tolerated in our case.

At this point I think I have linked to enough references for you to figure out how the wait-free buffer is implemented, so I will skip the details; this is one of the more straightforward lock-/wait-free data structures to reason about. The quick summary is that you have a head and a tail pointer indexing into your array, each of which only moves in a single direction. As a result, you can always determine whether the queue is full or empty from the relative positions of the two pointers. The only other note I will make is that both boost and C++11 provide control over memory barriers, adding language support for enforcing memory ordering on particular operations (this is still lock-free). A sketch of such a buffer follows.
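Here is a minimal sketch of that head/tail design (my own simplification, assuming a fixed capacity and exactly one producer and one consumer thread; boost’s spsc_queue is considerably more careful about details like false sharing). One slot is deliberately left unused so that full and empty states are distinguishable:

#include <boost/atomic.hpp>

#include <cstddef>

template <typename T, size_t Capacity>
class RingBuffer {
public:
  RingBuffer() : head_(0), tail_(0) {}

  // Called by the producer only; returns false when the buffer is full
  bool push(const T& value) {
    size_t head = head_.load(boost::memory_order_relaxed);
    size_t next = (head + 1) % Capacity;
    if (next == tail_.load(boost::memory_order_acquire)) {
      return false; // full
    }
    data_[head] = value;
    // Publish the slot: this release pairs with the consumer's acquire
    head_.store(next, boost::memory_order_release);
    return true;
  }

  // Called by the consumer only; returns false when the buffer is empty
  bool pop(T& value) {
    size_t tail = tail_.load(boost::memory_order_relaxed);
    if (tail == head_.load(boost::memory_order_acquire)) {
      return false; // empty
    }
    value = data_[tail];
    // Free the slot: this release pairs with the producer's acquire
    tail_.store((tail + 1) % Capacity, boost::memory_order_release);
    return true;
  }

private:
  T data_[Capacity];
  boost::atomic<size_t> head_; // next slot to write; moves forward only
  boost::atomic<size_t> tail_; // next slot to read; moves forward only
};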

Last time I really wanted to show off the capabilities of pure C++11 when it comes to threading. This time, rather than reinvent the wheel, I will be using boost’s spsc_queue implementation, which I linked to above. To remain consistent, I will simply use boost’s facilities wherever possible. So let’s look at the code:

/**
 * audioprocessing.cpp
 */
#include <boost/atomic.hpp>
#include <boost/chrono.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/thread.hpp>

#include <iostream>

#include <cassert>
#include <cstdlib>
#include <ctime>

const size_t sample_rate = 44100;
const size_t process_rate = sample_rate / 10;
boost::atomic_bool quit(false);

// Our shared ringbuffer state
// 2s worth of samples to buffer
boost::lockfree::spsc_queue<float, boost::lockfree::capacity<sample_rate*2> > buffer;

// Audio capture thread
struct AudioCapture {
  void operator()() {
    while (!quit) {
      // Simulate capturing at 44.1 kHz: sleep one second, then emit
      // one second's worth of samples in a burst
      boost::this_thread::sleep_for(boost::chrono::seconds(1));
      for (size_t i = 0 ; i < sample_rate ; ++i) {
        // We aren't really doing audio capture...
        // Generate a fake signal with no imaginary components
        if (!buffer.push(0.5f * static_cast<float>(rand()))) {
          std::cout << "Overflow detected." << std::endl;
          // Let consumer catch up (i.e. throw away our "current" samples)
          boost::this_thread::sleep_for(boost::chrono::milliseconds(300));
          break;
        }
      }
    }
  }
};

// Audio processing thread
struct AudioProcess {
  void operator()() {
    // Give our producer a chance to capture some audio
    boost::this_thread::sleep_for(boost::chrono::seconds(1));
    while (!quit) {
      size_t ctr = 0;
      // We only need to process a certain number of samples each time (i.e. process_rate in number)
      for (size_t i = 0 ; i < process_rate && ctr < process_rate ;) {
        // Only increment if we grabbed a sample-- we're only processing on the full stream
        // We don't care about underrun since we can only process as fast as we have
        // data
        if (buffer.pop(samples[i])) {
          i++;
        } else {
          ctr++; // If we haven't seen any data recently, we want to check if our producer died (i.e. should we keep running)
        }
      }
      if (ctr == process_rate) {
        std::cout << "Could not retrieve any samples in previous loop (checking if we should exit)..." << std::endl;
      } else if (ctr > 0) {
        std::cout << "Buffer underrun detected." << std::endl;
      }
      // NOTE: In practice you would want to move the result to yet another thread
      //   i.e. either the main thread or a dedicated UI thread to render the result
      //   of the computation in some way.
      process();
      std::cout << "Fake processing... samples[0] == " << samples[0] << std::endl;
    }
  }

  void process() {
    // Should have an FFT library and probably add a filter or do
    // something interesting with the signal here.
    // Simulate roughly 100ms of processing by sleeping (98ms leaves a
    // little headroom so the consumer can keep pace with the producer)
    boost::this_thread::sleep_for(boost::chrono::milliseconds(98));
  }

private:
  float samples[process_rate];
};

int main() {
  // Assure ourselves that our impl is really lock-free
  assert (buffer.is_lock_free() && quit.is_lock_free());

  // Random sampling to generate some bizarre signal
  srand(time(NULL));

  // Spawn our producer
  AudioCapture p;
  boost::thread producer(p);

  // Spawn our consumer
  AudioProcess c;
  boost::thread consumer(c);

  // Consume the data until we receive input from the user
  char input;
  std::cin >> input;

  quit = true;

  producer.join();
  consumer.join();

  return 0;
}

This code is mostly straightforward, with the comments filling in some blanks. Since all of the really difficult wait-free machinery is hidden from us by boost, the only real thing to explain here is the overall design. Basically, we start up our two threads (audio capture and processing) and allow the capture thread to buffer some data before processing begins. Notice that I have tuned our production and consumption rates with our sleep calls (in lieu of performing a real computation and estimating its time); this enables us to process continuously. Keeping these rates similar is how we reap the true benefit of wait-free algorithms. In a real system, however, you may occasionally hit buffer underrun or overflow depending on the state and load of your device. This brings us to one of the problems with our wait-free approach: since a wait-free algorithm never blocks, you cannot simply wait on data when overflow or underrun occurs. As a result, if the producer and consumer get out of sync, you have three apparent options:

  1. Busy wait. This is often the wrong option, but only you know your system. For certain high-performance applications this may be the best option, if you know the waiting period will always be small. To do this you simply operate as normal and ignore that any failure has occurred: in the case of overflow, you keep trying to push data onto the queue, wasting effort as your request keeps being rejected until the consumer has consumed a sufficient amount (see the sketch after this list).
  2. Block without data dependence. Another solution is to temporarily block for a specified amount of time relative to the other thread’s rate (i.e. producer waits for consumer and vice versa), which is the solution we have implemented here. Again, suppose we detect an overflow. We can drop the current set of samples on the floor and simply allow the consumer a few milliseconds to catch up. To select an appropriate amount of time, consider how large your queue is and how fast your consumer typically pulls data from it.
  3. Overwrite existing data. Depending on which data you want to drop on the floor, you can instead overwrite the old data in favor of processing the new. See this StackOverflow answer for an idea of how to do it.
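As an illustration of option one, here is a minimal busy-wait sketch (my own; the helper name push_busy_wait is hypothetical) against the buffer and quit flag from the listing above:

// Spin until the consumer frees space (or we are told to quit).
// Burns CPU while waiting, but the algorithm remains lock-free.
void push_busy_wait(float sample) {
  while (!buffer.push(sample) && !quit) {
    // Retry immediately; each failed push means the buffer is still full
  }
}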

In going with option two, our producer now depends on the consumer to do anything interesting (i.e. the consumer must process samples). That said, the producer does still have the property that it will complete in a finite amount of time (i.e. exit when told) even if the consumer stops consuming. However, note that sleep is a blocking operation, which makes this approach somewhat unclean. More or less, the sleep acts like a bounded wait that has been calibrated for code executing on a particular device (think embedded programming), which diminishes the lock-free quality of the overall algorithm even though our data structure is still wait-free. I implemented it this way primarily because I found it more interesting than the busy-waiting case, which is the obvious way to restore lock-free guarantees to our algorithm. You can see a simple example of lock-free usage of the spsc_queue here.

As you can see, wait-free algorithms still have an inherent cost: if you want to avoid excessive busy waiting, you need some form of synchronization between threads to correct any issues whenever they fall out of sync. Running this example on my dual-core MBP, underrun only occurs at startup and overflow occurs very rarely. That said, we lose a minimum of 1.3 seconds’ worth of audio samples each time we detect overflow (1 second for the currently captured samples, plus 300ms to allow the consumer to catch up).

Another possible option is a hybrid. In some systems, the right thing to do would be to block the producer on a condition variable that the consumer signals once the buffer drains to a particular size. At that point, however, the algorithm itself is no longer lock-free even though the buffer still is. A sketch follows.
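Here is a rough sketch of that hybrid (my own, reusing the buffer, quit, and sample_rate globals from the listing above; low_water_mark and both helper names are hypothetical):

#include <boost/thread.hpp>

boost::mutex mtx;
boost::condition_variable space_available;
const size_t low_water_mark = sample_rate / 2;

// Producer side: block while the buffer is full
void blocking_push(float sample) {
  while (!buffer.push(sample) && !quit) {
    boost::unique_lock<boost::mutex> lock(mtx);
    // Use a timed wait so we wake periodically in case quit was set
    space_available.wait_for(lock, boost::chrono::milliseconds(50));
  }
}

// Consumer side: call after popping a batch of samples
void signal_space() {
  if (buffer.read_available() < low_water_mark) {
    space_available.notify_one();
  }
}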

NOTE: If this feels like a lot of work just to move your data around, you may be right. I must admit that most audio processing I have done is actually implemented via callbacks scheduled on independent threads. In any case, I wanted to provide a semi-interesting use case to give us a common task to solve here. Likewise, it is conceivable that you would want to use a similar technique to communicate results to your UI/drawing thread (say, for a real-time plot).

Some final thoughts: lock-free and wait-free solutions may not always be feasible. A wait-free solution may very well not yet have been discovered (if it even exists) for your problem, and naive lock-free solutions can end up being slower in practice than their lock-based counterparts. Similarly, lock-free algorithms are typically more complex and, depending on your system and data access patterns, can lead to detrimental starvation (see this post).

Where do I go from here?

The example I have presented is a very specific case study on how you can take data and operate on it concurrently. As you might guess, there are plenty of problems that lend themselves well to threads yet look quite different from the scenario I have just described. If you are a curious reader, I point you toward concurrency patterns. Just as there are design patterns for software construction, such patterns also exist for using threads properly based on your problem.

I want to close by drawing your attention to the fact that this post deals solely with threads. I plan to discuss other concepts more in depth in the future, such as Futures and Promises, Actors (i.e. each actor has separate state updated via message passing), and perhaps even Software Transactional Memory (STM). Since concurrency is such an important piece of computing nowadays, all developers should at least be aware of the different concurrency techniques available. Similarly, we should all understand the distinctions and trade-offs between them so we can decide which approach best fits a particular problem.
