briantheprogrammer

Home of the O2 Programming Language

The Bounded Buffer Problem In O2

Posted by Brian Andersen on April 26, 2012

A few weeks back I did a post on solving the Producer-Consumer Problem. In that post I introduced the dispatch operator, which takes a vector of symbols in its left argument ($L) and a number in its right argument ($R). It either returns $R rows of data for each symbol in $L, or it suspends the calling fiber until it can return all of the data requested and no more. And it does this in such a way that no other fiber will read the same rows again using dispatch. In other words, dispatch allows you to treat the O2 blackboard like a FIFO queue. Because O2's fibers are not based on operating system threads, the number you can create is limited only by the memory available to store each fiber's stack. I also used the dispatch operator in some fancier scenarios in my posts on The Cigarette Smoker's Problem and the Building H2O Problem.

When I solved the Producer-Consumer Problem I only dealt with the unbounded version, in which producers may create a queue whose size is limited only by available memory. But if items in the queue represent work to be done, we may want to control the rate at which new work can be requested, or avoid building up too long a list of unfinished tasks. The Bounded-Buffer Problem deals with this situation: producers are asked to check the queue before writing, and if the queue is too big, to suspend until it has shrunk. You might think we could use read for this, but there are two problems:

  • read doesn't know which rows have been consumed via dispatch; you tell it exactly where to read from.
  • read will only help you suspend until something is written to the blackboard, not until something is read from it.

To solve this problem in an appealing way I had to introduce a new operator, throttle, into the language.

Introducing Throttle

throttle takes exactly the same arguments as dispatch, and interprets them in the same way, with one critical exception: if dispatch would yield, then throttle will suspend, and if dispatch would suspend, then throttle will yield. throttle does not return the entire result set, just a number indicating the last line of the blackboard that would have been consumed. When the blackboard reaches a state where dispatch would no longer yield, a suspended throttle wakes up and yields. It might sound complicated, but when you see it in action it is actually pretty straightforward.
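As a rough analogy (this is not O2), the relationship between dispatch and throttle can be modeled in Python, treating the blackboard's rows for a single symbol as a plain list guarded by a condition variable. All names here are my own invention for illustration:

```python
import threading

class Blackboard:
    """Toy single-symbol model of the blackboard: rows live in a
    list, and consumed rows are simply removed (an assumption made
    for illustration; the real blackboard keeps history)."""
    def __init__(self):
        self.rows = []
        self.cond = threading.Condition()

    def dispatch(self, n):
        # Suspend until n rows are available, then consume and
        # return exactly those n rows.
        with self.cond:
            self.cond.wait_for(lambda: len(self.rows) >= n)
            taken, self.rows = self.rows[:n], self.rows[n:]
            self.cond.notify_all()
            return taken

    def throttle(self, n):
        # The inverse: suspend while dispatch COULD return n rows,
        # i.e. wait until fewer than n unconsumed rows remain.
        # (O2's throttle returns a line number; the toy omits that.)
        with self.cond:
            self.cond.wait_for(lambda: len(self.rows) < n)

    def write(self, item):
        with self.cond:
            self.rows.append(item)
            self.cond.notify_all()
```

The point of the sketch is only the inversion: dispatch blocks on "not enough rows," while throttle blocks on "too many rows."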

The Bounded Buffer Solution

This program is going to look almost exactly the same as the Producer-Consumer example with one slight twist.

{
  consumer:
  {
    item=#item dispatch 1l
    =cwrite (string $R) + " consuming item " + string $item.i
    =sleep randoml 1 0 3000l
    <-consumer $R
  }

The consumer fiber should be pretty clear: it uses dispatch to remove one line under the symbol #item. In the Producer-Consumer post I used incrementing symbols like #item,0l #item,1l. In this example everything will be written under the single symbol #item, which makes the output easy to follow when there are multiple producers. All of my Producer-Consumer examples work for multiple producers and consumers. Then we sleep for a random period of time and repeat.

  producer:
  {
    =sleep randoml 1 0 500l

The producer fiber sleeps at the beginning of its lap to simulate the time cost of production.

    item=#throttle take
    {
      =#item throttle 10l

Now we see throttle in action. Before writing to the blackboard, the producer fiber calls throttle with the symbol #item and the (long) number 10l. This achieves the following: if dispatch could return 10l #items from the blackboard, then throttle will suspend until this is no longer the case. If dispatch would suspend because there are fewer than 10l unconsumed items, then throttle will yield immediately. Bounded buffer problem solved. Also note that I am using a take block around this section, which means that only one producer fiber can be in this section at a time. The reason for the reduced concurrency is that throttle releases all waiting fibers as soon as the queue dips below the limit. If there are multiple producers waiting, they will all jump ahead and write at once, so you could end up with more than 10 items in the queue. If you are willing to risk a few extra unconsumed items (one per waiting producer fiber) then there is no harm in getting rid of this take block. But I am going to be pedantic here; we want a max of 10 items in the queue.
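The effect of the take block can be sketched in the same Python terms (again an analogy, not O2, with invented names): holding one lock across both the capacity check and the write makes them atomic, so even if many producers are woken at once, each one re-checks the limit before it appends and the queue can never exceed it.

```python
import threading

MAX_ITEMS = 10
queue = []
cond = threading.Condition()  # one lock guards both check and write

def produce(item):
    # Analog of wrapping throttle in a take block: wait_for
    # re-evaluates the capacity predicate while holding the lock,
    # so no second producer can slip in between the check and the
    # append.
    with cond:
        cond.wait_for(lambda: len(queue) < MAX_ITEMS)
        queue.append(item)
        cond.notify_all()

def consume():
    with cond:
        cond.wait_for(lambda: len(queue) > 0)
        item = queue.pop(0)
        cond.notify_all()  # wake producers waiting on capacity
        return item
```

Without the shared lock, a producer that passed the check could be overtaken by others that passed the same check, which is exactly the overshoot the take block prevents.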

      item=#item write {i=++}
      <-#item read ($item - 1l) append 1l
    }

Now write to the blackboard. Use my new IncrOp data type to increment the variable $i, which is just a sequence number for the #item queue. I introduced IncrOp in my post on the Building H2O Problem. Then read from the blackboard to find out what $i was. I think I am going to change write so that it returns the actual data that was written, but at the moment it returns the count of the blackboard up to the point of the write. When you call read with two numbers on the right, the first is the line number to start at and the second is the number of items to read per symbol, as with dispatch.

    =cwrite (string $R) + " produced item " + string $item.i
    <-producer $R
  }

Write the item number out to the console so that we can see what is happening.

  p0=fiber {<-producer #p,0l}
  p1=fiber {<-producer #p,1l}
  p2=fiber {<-producer #p,2l}
  c0=fiber {<-consumer #c,0l}
  c1=fiber {<-consumer #c,1l}
  c2=fiber {<-consumer #c,2l}
  <-wait $p0
}

Start up three producer fibers and three consumer fibers and go. Here is the complete program.

{
  consumer:
  {
    item=#item dispatch 1l
    =cwrite (string $R) + " consuming item " + string $item.i
    =sleep randoml 1 0 3000l
    <-consumer $R
  }
  producer:
  {
    =sleep randoml 1 0 500l
    item=#throttle take
    {
      =#item throttle 10l
      item=#item write {i=++}
      <-#item read ($item - 1l) append 1l
    }
    =cwrite (string $R) + " produced item " + string $item.i
    <-producer $R
  }
  p0=fiber {<-producer #p,0l}
  p1=fiber {<-producer #p,1l}
  p2=fiber {<-producer #p,2l}
  c0=fiber {<-consumer #c,0l}
  c1=fiber {<-consumer #c,1l}
  c2=fiber {<-consumer #c,2l}
  <-wait $p0
}

Now let's look at some output.

Output of the Bounded-Buffer Solution

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/bounded.o2"
#c,0l consuming item 0
#p,1l produced item 0
#c,1l consuming item 1
#p,2l produced item 1
#c,2l consuming item 2
#p,0l produced item 2
#c,1l consuming item 3
#p,1l produced item 3
#p,0l produced item 4
#p,2l produced item 5
#p,2l produced item 6
#p,1l produced item 7
#p,1l produced item 8
#c,0l consuming item 4
#p,0l produced item 9
#p,2l produced item 10
#p,1l produced item 11
#p,0l produced item 12
#p,1l produced item 13
#p,1l produced item 14
#c,1l consuming item 5
#p,2l produced item 15
#c,2l consuming item 6
#p,0l produced item 16
#c,2l consuming item 7
#p,1l produced item 17
#c,1l consuming item 8
#p,2l produced item 18
#c,0l consuming item 9
#p,1l produced item 19
#c,0l consuming item 10
#p,0l produced item 20
#c,1l consuming item 11
#p,2l produced item 21
#c,2l consuming item 12
#p,1l produced item 22
#c,2l consuming item 13
#p,0l produced item 23
#c,1l consuming item 14
#p,2l produced item 24
#c,0l consuming item 15
#p,1l produced item 25
#c,0l consuming item 16
#p,0l produced item 26
#c,1l consuming item 17

Since the producer fibers produce items at an average rate of one every 250 ms (500l/2l) and the consumers consume at the much slower average rate of one every 1500 ms, we quickly see a queue develop, after which all producers are eagerly lined up at the barrier created by the throttle operator. Every time a consumer removes an item, a producer immediately adds a new one to the queue. You can also see that the console printing is sometimes out of order; for example, item 0l appears to be consumed before it was produced. This is just random variation in the console output, not a problem with the program itself. One way to fix it might be to drive the console output from a separate fiber that reads records from the blackboard in the same order they were written. I may explore this in a future example.

A Word On the O2 Design Process

The first time I developed a programming language I rushed into trying to build real software with it, hoping that the process would force me to see all of its issues. It didn't work very well. In the end the application code was unappealing, and the libraries I created to serve the application were brittle and of little use outside the application they were written for. It also took a very long time. And to make matters worse, it became very hard to change the language once applications depended on it.

This time I am taking a different approach. I decided to build up collections of example programs revolving around a particular theme, making certain that the resulting programs were good programs according to my taste, which mostly means short and easy to follow. At this point I have done eight (out of twelve) examples of classic concurrency problems in O2. On the surface it might seem like I had to introduce a lot of operators, but when I review them I feel pretty happy with the results. You read and write the blackboard with read and write. When you want to treat the blackboard as a queue, you use dispatch instead of read. When you need a critical section, you use take. throttle and peek (to be introduced shortly) are just variants on the same conceptual and algorithmic plumbing as dispatch. While not as mathematically elegant as Dijkstra's P() and V() operators, I think people will find O2's concurrency facilities much easier to use and understand.
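For comparison, here is the textbook semaphore formulation of the bounded buffer that Dijkstra's P() and V() give you, written in Python with acquire and release standing in for P and V:

```python
import threading
from collections import deque

CAPACITY = 10
buffer = deque()
empty = threading.Semaphore(CAPACITY)  # free slots; P() before producing
full = threading.Semaphore(0)          # filled slots; P() before consuming
mutex = threading.Lock()               # guards the buffer itself

def put(item):
    empty.acquire()        # P(empty): wait for a free slot
    with mutex:
        buffer.append(item)
    full.release()         # V(full): signal one item available

def get():
    full.acquire()         # P(full): wait for an available item
    with mutex:
        item = buffer.popleft()
    empty.release()        # V(empty): signal one slot freed
    return item
```

The semaphore version counts slots explicitly in both directions, whereas the O2 version lets the blackboard itself hold the count and expresses the two waits as dispatch and throttle.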
