Solving a parallel programming problem with a list homomorphism

Posted on January 27, 2023 by Troels Henriksen

One programming task I greatly enjoy is solving problems by devising new reduction operators. Futhark is slightly unusual among parallel languages as it allows arbitrary reduction operators instead of predefining just a few common cases (e.g. sum, product, maximum). From time to time it is nice to be reminded that it is worth the bother to maintain this capability. However, most programmers do not have experience solving problems in this way, and don’t know how to go about it. In this post I will go through an example of how I analyse a simple problem and devise a map-reduce composition that solves it.

The problem we will look at was devised by Samuel Lampa and is called gccontent-benchmark. We are given a big file that looks something like this:

>Y dna_rm:chromosome chromosome:GRCh37:Y:2649521:59034049:1

Our task is to count the percentage of C/G characters relative to all C/G/A/T characters. Further, lines beginning with > should be skipped. Finally, these files can be very large and it should not be assumed that they will fit entirely in memory. All the existing implementations read it line by line, but I will admit up front that we are going to cheat and read more at a time for performance reasons. Also, we will slightly extend the problem and treat > at any position as a comment extending to the end of line.

At first glance, this problem looks ill suited for Futhark. IO and text processing is not what the language was made for. On second glance, this assessment remains correct. Futhark is most certainly not necessary or perhaps even useful here. But sometimes, life is about doing unnecessary or useless things because they are fun. In fact, I would say life ought mostly be about that.


Let us consider a file to be an array of ASCII characters. We want to compute a value of type

type count = {gc: i32, total: i32}

from which we can easily compute the percentage:

def pct (c: count) = f64.i32 c.gc / f64.i32 * 100

We will eventually need a function for adding together counts, as well as a zero count:

def count_add (x: count) (y: count) = {gc=x.gc+y.gc,}
def count0 : count = {gc=0,total=0}

To figure out how to parallelise something like this, I like to start by considering the case where the input is split up into two chunks, each chunk processed independently into a partial result called a summary, and these summaries then combined to form the final result. Intuitively, a summary contains the partial result for a contiguous chunk of the input (which as a special case can be the entire input). The main question is what these summaries should contain, and how they should be combined. (Later we’ll talk about how to compute the summaries in the first place.)

Suppose we define the summary as being just the count:

type summary = count

Unsurprisingly, this is not good enough. Consider the following input split in two at the blank space:


The summary for the first chunk is {gc=1,total=4}, because the two characters following the comment are ignored. The summary for the second chunk is {gc=2,total=5}, because it has no way of knowing that it is part of a comment begun in the first chunk. This kind of non-local information can only be propagated when we combine the summaries, but in this case that it is impossible. There is nothing in the summary that records that the chunk ends in a comment. We can fix that by adding more information to the summary type:

type summary = {c: count, comment: bool}

For the example above, the two chunks now have these summaries:

{c={gc=1,total=4}, comment=true}
{c={gc=2,total=5}, comment=false}

Now we can define a function for combining two summaries:

def redop (x: summary) (y: summary) =
  {c = if x.comment then x.c else x.c `add_count` y.c,
   comment = x.comment || y.comment}

This computes the correct summary for the example above: {c={gc=1,total=4}, comment=true}. Unfortunately, it is still not fully correct. The > comments are line comments as is proper and should extend only to the next newline. Ponder a new example, where we write newlines as \n:


The comment starting in the first chunk shouldn’t completely nullify the second chunk!

To handle this, we modify our summary to contain two counts:

Further, the summary should record whether the chunk contains a newline at all, so we can determine whether a comment starting in the left chunk extends to the end of the right chunk. The full type is as follows:

type summary = { befnl: count,
                 aftnl: count,
                 hasnl: bool,
                 comment: bool

For the most recent example we have these summaries:

{befnl={gc=1,total=4}, aftnl={gc=0,total=0}, hasnl=false, comment=true}
{befnl={gc=1,total=3}, aftnl={gc=1,total=4}, hasnl=true,  comment=false}

Out combination function is now:

def redop (x: summary) (y: summary) =
  let join = if x.comment then x.aftnl else x.aftnl `count_add` y.befnl
  in {befnl = if x.hasnl then x.befnl else x.befnl `count_add` join,
      aftnl = if x.hasnl then join `count_add` y.aftnl else y.aftnl,
      hasnl = x.hasnl || y.hasnl,
      comment = y.comment || x.comment && (!y.hasnl)}

Note how we use the hasnl field of the second chunk to decide whether to propagate the comment field of the first chunk.

By repeatedly applying redop we can combine many summaries rather than just two. If redop is associative (it is), we can do so in parallel with a parallel reduction.

We can now combine a bunch of summaries. How do we compute them in the first place? Intuitively, we need to split the input into chunks, compute a summary for each chunk, then combine them. It’s essentially divide-and-conquer. How about simply splitting as much as possible, into single-element chunks? Since we can combine any number of summaries, this should work fine. We can compute these summaries simply by maping this function over the input:

def mapop (b: u8) =
    {befnl = match b
             case 'G' -> {gc=1,total=1}
             case 'C' -> {gc=1,total=1}
             case 'A' -> {gc=0,total=1}
             case 'T' -> {gc=0,total=1}
             case _   ->  count0,
    aftnl = count0,
    hasnl = b == '\n',
    comment =   b == '>'

That means the entire thing ends up being a map-reduce composition, where the neutral element is the summary for an empty chunk:

def summary0 = {befnl=count0, aftnl=count0, hasnl=false, comment=false}

def gc (str: []u8) = reduce redop summary0 (map mapop str)

In the original problem description it was mentioned that the program should process single lines at a time, because the dataset may be too large to fit in memory. Now, individual lines are probably too small to be worth processing in parallel, but we can still abide by the spirit of the rule by letting the user pass in a summary of a preceding chunk:

entry gc_chunk (s: summary) (str: []u8) : summary = s `redop` gc str

A driver program can then read chunks of the input file of whatever size it wants and gradually feed them to the Futhark program through its C API. The driver program can be seen here. (It is not really very interesting.) I use a chunk size of 1MiB. Larger is better, but not substantially so for reasons we’re about to get to.


So how fast is it? Just like last time I did something like this, I must emphasise that Futhark is a poor fit for writing command line programs with short runtimes. Initialising the GPU easily takes hundreds of milliseconds, which is going to be a significant fraction of the total runtime.

Using the time(1) command to measure total process runtime, this hand-written C program takes 0.631s on my Ryzen 1700X. This is going to be our yardstick. The Futhark program, compiled to sequential CPU code, takes 1.114s - almost twice as long. Not great, but also not a bad showing for a high level language and an algorithm designed for maximum parallelism.

If we ask the Futhark compiler to generate parallel multicore CPU code, the runtime drops to 0.233s - not too bad!

If we compile the Futhark program for GPU execution, then my AMD RX 7900 takes 0.630s to run it - same as sequential execution! This is clearly terrible, and the problem is that GPU initialisation alone takes 400ms. This is not a problem for long running processes, but rather crippling here. Even if we ignore the initialisation time, the remaining 200ms are largely spent on ferrying data from main memory to the GPU, rather than doing actual computation. If we use Futhark’s own benchmarking methodology, which ignores startup costs and ensures the initial input data is already GPU-resident, we can measure that the actual computation time is a mere 0.023s.

Again, let me emphasise that Futhark is not well suited for this kind of IO-oriented programming. However, the core algorithmic problem turned out to be nicely expressible as a map-reduce composition, and I wanted to show how I go about constructing the necessary operators.

Some theory

The theory that links divide-and-conquer parallelism to map-reduce compositions is the theory of list homomorphisms. This problem, and most interesting instances of map-reduce, are known as near homomorphisms because they require us to carry around extra information, in addition to the desired result, in order for the reduction operator to work (e.g. the comment and hasnl fields). In this post I used an intuitive, informal, example-driven approach to derive the operator, but there is also a principled technique for deriving it from a specification. In practice, I find that the principled approach much too tedious to actually carry out when doing real programming (although I naturally still ask my students to do so in our course on data parallel programming).