briantheprogrammer

Home of the O2 Programming Language

A Publish-Subscribe Server In O2

Posted by Brian Andersen on August 29, 2012

Today’s post is the third part in my new series on distributed computing applications in O2. In the first post I demonstrated O2’s messaging operators using a Simple Echo Server. In the second post I demonstrated how to use O2’s messaging facility to build a basic point-to-point Chat Server. In this post I will take these ideas one step further and build a Publish-Subscribe server.

A Publish-Subscribe system is a little like the chat system but with an extra level of indirection. As in the chat model, clients publish messages to a central server, but rather than explicitly specifying the recipient, they direct each message at a particular “topic”. Other clients can then subscribe to particular topics. This model is analogous to a chat room or news service. The code for the Publish-Subscribe server will be a little more complicated than the chat server, but not excessively so.

The Code

{
  server:{
    publish:{

We begin with the definition of the server, which will be a bot, and publish, which will be a fiber running inside the server bot.

      message:$L accept 1l
      :$message.body.verb switch {

The publish fiber accepts one message at a time from the left argument which is an integer handle to a server connection created with the listen operator. This is following the same pattern as my chat demo. The server accepts two types of messages indicated using the “verb” variable on the message body. The two allowed verbs are #publish and #listen.

        publish: {
          start: (#publish + $message.body.topic) write eval
            {i:++ text:$message.body.text}

The handler for the verb #publish will write the message that was published into the blackboard under the topic specified in the message body.

          : ((#listen + $message.body.topic) peek 0l) switch {

Now we need to check whether there are any listeners waiting for messages on the topic that has just been published. If there are, we must deliver the messages to the waiting clients. Now comes the part that is a little bit tricky to implement using the blackboard.

            :{ listeners:(#listen + $message.body.topic) gawk 0l
               : ($listeners.S part 2 3 4l) reply eval
                 {topic:$message.body.topic start:$start messages:$message.body.text}
               all:(#ids + $listeners.S part 2 3 4l) dispatch 0l
               topics:(#listen + $all.S part 4 1 2 3l) dispatch 0l
             }

A few quick points will help me to explain this section.

  • The #listen messages that arrive from clients may contain multiple topics.
  • Likewise, the resulting content messages that are delivered may contain multiple messages spanning one or more topics.
  • To keep matters simple and robust, I want each listen request to get exactly one response, but the response may be delayed until appropriate content arrives.
  • You can dispatch records from the blackboard using prefixes. For example, #x dispatch 1l will dispatch 1 record even if the symbol is #x,a #x,b or #x,c
  • But you cannot dispatch records using suffixes. For example if the blackboard contains symbols #a,x #b,x #c,x there is no way to dispatch all the records with symbols ending in “x”.

So for each listen request we have multiple topics and a single id. When a request comes in we need to write multiple entries into the blackboard. There will be one set of entries for each combination of id and topic, and a second set of entries for each combination of topic and id. This is so that you can fetch all the topics for an id, or all the ids for a given topic, with equal ease. So in this section we first find all the listeners for the topic on the incoming message. Those listeners get a reply. Then we need to dispatch those requests and all the other records having the same topic. Otherwise the server would later try to reply to the same request multiple times. The part operator is used to rearrange the individual elements in a vector of symbols.
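The two-way bookkeeping described above can be sketched outside O2. The following is a minimal Python analogy, not the blackboard itself: the class name ListenIndex and its methods are hypothetical, and plain dictionaries stand in for the #listen and #ids records. It shows why both directions are needed: once a request is answered for one topic, it must be removed under every topic it was registered for.

```python
from collections import defaultdict


class ListenIndex:
    """Pending listen requests, indexed by topic and by request id.

    Hypothetical stand-in for the #listen (topic -> id) and
    #ids (id -> topic) records; not O2's blackboard.
    """

    def __init__(self):
        self.by_topic = defaultdict(set)  # topic -> request ids waiting on it
        self.by_id = defaultdict(set)     # request id -> topics it asked for

    def add(self, request_id, topics):
        # One entry per (topic, id) pair, recorded in both directions.
        for topic in topics:
            self.by_topic[topic].add(request_id)
            self.by_id[request_id].add(topic)

    def take_listeners(self, topic):
        """Return the ids waiting on `topic` and remove all their entries."""
        ids = sorted(self.by_topic.get(topic, ()))
        for request_id in ids:
            # Remove the request under all of its topics, not only this one,
            # so that it can never be answered a second time.
            for other in self.by_id.pop(request_id, ()):
                self.by_topic[other].discard(request_id)
                if not self.by_topic[other]:
                    del self.by_topic[other]
        return ids


index = ListenIndex()
index.add("req1", ["news", "weather"])
index.add("req2", ["weather"])

woken = index.take_listeners("news")     # req1 is answered once
rest = index.take_listeners("weather")   # only req2 is still waiting
print(woken, rest)
```

Without the id-to-topics direction, answering req1 on "news" would leave its "weather" entry behind, and a later weather message would trigger a second reply to the same request.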

          }
          :$message.id reply {ack=true}
        }
        listen:{

Reply back to the publish request. This is a good practice so the client can know that their request was accepted by the server. Now let’s look at the listen fiber in the server.

          messages:(#publish + $message.body.topic) poll $message.body.start

Poll is a new, special operator. It is the only operator for accessing the blackboard that can return an empty set of results, and it never suspends. Rather than return an empty set, read and dispatch wait until they have a result that meets the specified constraints. Poll, however, always yields immediately and returns an empty series if there are no matching rows available. But there is more. read lets you specify a line in the blackboard to start reading from and the number of rows per symbol you want. Poll lets you specify a different starting row for each and every symbol! This is how clients of the Publish-Subscribe server can make sure they get every message for their chosen topics, and get each message only once. They maintain a list of starting points for each topic.
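To make the idea of per-topic starting points concrete, here is a small Python sketch. It is an analogy under stated assumptions rather than O2's poll: the function poll, the list-based log, and the choice to treat the start line as inclusive are all made up for illustration.

```python
def poll(log, starts):
    """Return every (line, topic, text) at or after each topic's start line.

    `log` is a list of (topic, text) rows whose list index plays the role
    of a line number. `starts` maps topic -> first line wanted (inclusive,
    which is an assumption of this sketch). Never blocks: the result is an
    empty list when nothing matches.
    """
    return [
        (line, topic, text)
        for line, (topic, text) in enumerate(log)
        if topic in starts and line >= starts[topic]
    ]


log = [("news", "n0"), ("weather", "w0"), ("news", "n1")]
starts = {"news": 0, "weather": 0}

first = poll(log, starts)
# Advance each topic's cursor past the last line seen for that topic.
for line, topic, _ in first:
    starts[topic] = line + 1

second = poll(log, starts)        # nothing new yet, returns immediately
log.append(("weather", "w1"))
third = poll(log, starts)         # only the row added since the last poll
print(first, second, third)
```

Because the client advances its own cursors, a repeated request never returns a row twice, and a row published between two requests is picked up by the next one.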

          : (0l < count $messages) switch {
            :$message.id reply eval
              {topic:$message.body.topic start:lines $messages messages:$messages.text}

If poll returns at least one message (i.e., there is at least one message matching the client's subscription), the server replies to that request.

            :{  : (#listen + $message.body.topic + $message.id) write eval {
                  i: (count $message.body.topic) repeat ++
                  start:$message.body.start
                }
                : (#ids + $message.id + $message.body.topic) write eval {
                  i: (count $message.body.topic) repeat ++
                }
             }

Otherwise the request goes into a holding queue. This is the part where we have to write records for all topics by id and all ids by topic.

          }
        }
      }
      <-$L publish $R
    }
    :fiber {<-(listen $L) publish 0l}
    <-wait 0l
  }

Then we finish off the publish fiber and the server bot in the usual fashion. Now to discuss the client bot.

  client:{
    outbox:{
      topic:0l from shuffle $R.pub
      text:"This is a message about " + string $topic
      :cwrite "(" + (format $R.pub) + ") SEND:" + $text
      :receive $L send eval {verb:#publish topic:$topic text:$text}
      <-$L outbox $R
    }

This is the client outbox fiber. The outbox is a loop that publishes a message on a randomly selected topic on each turn. The topics are selected from a list supplied in the right argument of the outbox operator.

    inbox:{
      starts:$R last 0l
      sub:eval {verb:#listen topic:$starts.topic start:$starts.start}
      message:receive $L send $sub
      :cwrite "(" + (format $R) + ") RECV:" + $message.body.messages
      :$message.body.topic write eval {
        topic:$message.body.topic start:(count $message.body.topic) repeat $message.body.start}
      <-$L inbox $R
    }

The inbox sends requests to the server in a loop. Each request specifies a list of topics to subscribe to and a list of starting points which indicate how far back in time the server should look for events with each topic. The responses sent by the server always provide a line number so that clients can be sure to get all of the messages. In this example I wrote that number into the blackboard between iterations. I could have passed the line numbers as a parameter to the inbox operator as well.

    :fiber {
      :$R.sub write eval {topic:$R.sub start:(count $R.sub) repeat 0l}
      <-(open $L) inbox $R.sub
    }

At the end of the client bot definition we launch the inbox fiber...

    :fiber {<-(open $L) outbox $R}
    <-wait 0l
  }

And the outbox fiber.

  :bot {<-#tcp,9090l server 0l}
  :bot {<-#tcp,localhost,9090l client {pub:#sports #weather sub:#news #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#news #sports sub:#sports #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#weather #news sub:#sports #news}}
  <-wait 0l
}

At the top level we start the server bot and three client bots. Each client is configured to publish on two topics and to subscribe to two topics. Here is the entire program.

{
  server:{
    publish:{
      message:$L accept 1l
      :$message.body.verb switch {
        publish: {
          start:(#publish + $message.body.topic) write eval
            {i:++ text:$message.body.text}
          : ((#listen + $message.body.topic) peek 0l) switch {
            :{ listeners:(#listen + $message.body.topic) gawk 0l
               : ($listeners.S part 2 3 4l) reply eval
                 {topic:$message.body.topic start:$start messages:$message.body.text}
               all:(#ids + $listeners.S part 2 3 4l) dispatch 0l
               topics:(#listen + $all.S part 4 1 2 3l) dispatch 0l
             }
          }
          :$message.id reply {ack=true}
        }
        listen:{
          messages:(#publish + $message.body.topic) poll $message.body.start
          : (0l < count $messages) switch {
            :$message.id reply eval
              {topic:$message.body.topic start:lines $messages messages:$messages.text}
            :{  : (#listen + $message.body.topic + $message.id) write eval {
                  i:(count $message.body.topic) repeat ++
                  start:$message.body.start
                }
                : (#ids + $message.id + $message.body.topic) write eval {
                  i:(count $message.body.topic) repeat ++
                }
             }
          }
        }
      }
      <-$L publish $R
    }
    :fiber {<-(listen $L) publish 0l}
    <-wait 0l
  }
  client:{
    outbox:{
      topic:0l from shuffle $R.pub
      text:"This is a message about " + string $topic
      :cwrite "(" + (format $R.pub) + ") SEND:" + $text
      :receive $L send eval {verb:#publish topic:$topic text:$text}
      <-$L outbox $R
    }
    inbox:{
      starts:$R last 0l
      sub:eval {verb:#listen topic:$starts.topic start:$starts.start}
      message:first receive $L send $sub
      :cwrite "(" + (format $R) + ") RECV:" + $message.body.messages
      :$message.body.topic write eval {
        topic:$message.body.topic start:(count $message.body.topic) repeat $message.body.start}
      <-$L inbox $R
    }
    :fiber {
      :$R.sub write eval {topic:$R.sub start:(count $R.sub) repeat 0l}
      <-(open $L) inbox $R.sub
    }
    :fiber {<-(open $L) outbox $R}
    <-wait 0l
  }
  :bot {<-#tcp,9090l server 0l}
  :bot {<-#tcp,localhost,9090l client {pub:#sports #weather sub:#news #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#news #sports sub:#sports #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#weather #news sub:#sports #news}}
  <-wait 0l
}

As you can see this is one of the bigger O2 programs so far. But it is still under 70 lines incorporating both the client and server code.

The Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../demo/messaging/publish.o2"
(#sports #weather) SEND:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#weather #news) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#sports #weather) SEND:This is a message about #weather
(#sports #news) RECV:This is a message about #news
(#news #sports) SEND:This is a message about #news
(#weather #news) SEND:This is a message about #weather
(#sports #weather) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#sports #news) RECV:This is a message about #news
(#weather #news) SEND:This is a message about #news
(#sports #weather) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#sports #news) RECV:This is a message about #news
(#weather #news) SEND:This is a message about #news
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) SEND:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#sports #news) RECV:This is a message about #news
(#weather #news) SEND:This is a message about #news
(#sports #weather) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather

In this example I used the subscription to identify the client rather than my usual practice of assigning a number. This way it is easy to see that the messages are being delivered to the correct client and each message is delivered exactly once.

Posted in Examples

A Chat Server In O2

Posted by Brian Andersen on June 19, 2012

In my prior post I introduced O2’s basic operators for distributed computing using the example of a simple tcp Echo Server. The echo server received messages from connected clients and spat those same messages back to the same clients. Now it’s time to use those same operators to build something slightly harder. Today’s demo is going to be a point-to-point chat server. Like the echo server, the chat server will accept client connections followed by messages from each connected client. But instead of responding back to the same client that sent the message, in the chat demo the messages are addressed specifically to another connected client. So messages travel from one client to a different client rather than back to the same one that sent the message.

This example is slightly more complex than the echo server. In the echo demo the client bots only used a single fiber to send requests and get responses back. Today’s client will use two fibers, one to send messages (the outbox) and another to receive messages asynchronously from other clients (the inbox). The chat server will still use a single fiber, but it will use the blackboard in order to keep track of state between requests. So in this demo you will see how the concurrency operators from the prior series and the new messaging operators can work together.

The Code

{
  server:
  { deliver:
    { waiter:(#wait #chat + $R) dispatch 1l
      :$waiter.id reply eval {to:$waiter.to from:$waiter.from text:$waiter.text}
    }

Let’s start with the definition of the server bot. The server requires a subroutine that will be invoked in two places. We will call this operator “deliver”. There are going to be two types of messages added to the blackboard. #wait messages indicate that a client is waiting for the next message to be delivered via its inbox, as discussed above. #chat messages are produced by a client outbox and are waiting to be delivered to a different client. In order to deliver a message there must be two events queued in the blackboard, a #wait message and a #chat message. Every #wait event needs to be paired with exactly one #chat event and vice versa. We use dispatch to pair the two requests and to ensure that none of the requests are processed multiple times. Then we reply to the id which will have been previously recorded by the waiter when it sent its #wait request. All of this should become more concrete as I continue.
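The pairing rule can be illustrated with a short Python sketch. This is an analogy only: ChatHub, on_chat, on_wait and the delivered list are hypothetical names, and two in-memory queues per user stand in for the #wait and #chat records on the blackboard. Delivery happens only when both queues for a user are non-empty, and popping from both is what keeps any request from being processed twice.

```python
from collections import defaultdict, deque


class ChatHub:
    """Pairs each pending wait with exactly one pending chat, per user."""

    def __init__(self):
        self.waits = defaultdict(deque)  # user -> ids of pending wait requests
        self.chats = defaultdict(deque)  # user -> undelivered chat messages
        self.delivered = []              # (wait_id, message) pairs replied to

    def deliver(self, user):
        # Needs one event of each kind; removing both prevents reuse.
        if self.waits[user] and self.chats[user]:
            wait_id = self.waits[user].popleft()
            message = self.chats[user].popleft()
            self.delivered.append((wait_id, message))

    def on_chat(self, to, sender, text):
        self.chats[to].append({"to": to, "from": sender, "text": text})
        self.deliver(to)

    def on_wait(self, user, request_id):
        self.waits[user].append(request_id)
        self.deliver(user)


hub = ChatHub()
hub.on_chat("bob", "alice", "hi")   # nobody waiting yet, message is queued
hub.on_wait("bob", "w1")            # pairs with the queued message
hub.on_wait("bob", "w2")            # stays outstanding, nothing to deliver
print(hub.delivered)
```

Calling deliver from both handlers mirrors the fact that either event can arrive second, so whichever one completes the pair triggers the reply.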

    chat:
    { message:$L accept 1l

This is the main loop in which the server processes both #wait and #chat events. Accept one request at a time from the listening server.

      :$message.body.verb switch
      { chat:
        { : (#chat + $message.body.to) write eval
            {i:++ to:$message.body.to from:$message.body.from text:$message.body.text}
          : ((#wait + $message.body.to) peek 1l) switch {:deliver $message.body.to}
          :$message.id reply {ack=true}
        }

Each message body contains a “verb” which is either #wait or #chat depending on whether it is sent from the inbox or the outbox respectively. In this section we handle the #chat requests. First, write a line in the blackboard whose symbol indicates that there is a chat message awaiting delivery to a specific user. The “to” symbol appears both in the body of the data and in the symbol by which it is indexed in the blackboard. This is so that it can be easily dispatched when that user comes along to retrieve their message. Next, we check to see if the “to” user is already waiting using the peek operator. If they are already waiting we invoke the deliver operator as discussed above. The server also replies directly to the chat request with an ack message. It is a good practice for the server to ack all client requests and for the client to wait for those acks.

        wait:
        { : (#wait + $message.body.user) write eval {i:++ id:$message.id}
          : ((#chat + $message.body.user) peek 1l) switch {:deliver $message.body.user}
        }

Now let’s consider what happens when the server receives a #wait request from the inbox. Again the fact of the request is recorded in the blackboard, this time using the name of the user who is waiting for the request. The body of the blackboard data contains the id which will later be used to reply to the #wait request. Following a similar pattern to the #chat block, use peek to see if there is already a message pending delivery and invoke the deliver operator if this is the case.

      }
      <-$L chat $R + 1l
    }

Reenter the chat operator using a tail recursion to loop forever.

    :fiber {<-(listen $L) chat 0l}
    <-wait 0l
  }

Now we are back in the main block of the server bot definition. This is where the chat fiber is launched. Use the wait 0l idiom to wait forever in the main fiber. This concludes the definition of the server bot.

  client:
  { outbox:
    { to:$R.others at randoml 1 0l append count $R.others
      text:"hey " + (string $to) + ", it's " + string $R.me
      :cwrite (string $R.me) + ":" + $text
      :receive $L send eval {verb:#chat to:$to from:$R.me text:$text}
      <-$L outbox $R
    }

This is the client code, beginning with the outbox that I have been promising all along. The client code is configured with a block in its right argument. The block has two variables: $me, which is the name of the current bot, and $others, which is the list of possible recipients for chat messages. In the first line of the outbox the client selects another client to direct its message to. The selection is random. Then it sends the message using the verb #chat to indicate to the server how this message should be processed, along with the to and from information. As discussed in the echo demonstration, the send operator yields a symbol which can be used as a token to the receive operator, which will then wait for a response to the given request. The outbox loops forever.

    inbox:
    { message:receive $L send eval {verb:#wait user:$R.me}
      :cwrite (string $R.me) + ": got a message from " + (string $message.body.from)
      <-$L inbox $R
    }

Finally the inbox. The inbox sends a message to the server with the verb #wait and the name of the user waiting. This request will remain outstanding until someone sends a #chat message directed at that user. The inbox, like the outbox, loops forever.

    :fiber {<-(open $L) inbox $R}
    :fiber {<-(open $L) outbox $R}
    <-wait 0l
  }

This is where the client bot launches its inbox and outbox fibers. In this case I created separate connections for the inbox and outbox fibers, but they could just as easily share a single connection.

  :bot {<-#tcp,9090l server #hub,0l}
  names:#client + symbol 0l to 4l
  :{ config:eval {me:$R others:$names at find $names != $R}
     <-bot {<-#tcp,localhost,9090l client $config}
   } each $names
  <-wait 0l
}

This last section of code configures and starts the client and server bots. The server bot is set to use the tcp protocol on port 9090. Each client bot gets a right argument that contains its name in the $me variable and a vector containing the names of the others in the $others variable. The main program fiber then waits forever while the bots chat with each other. Here is the complete program.

{
  server:
  { deliver:
    { waiter:(#wait #chat + $R) dispatch 1l
      :$waiter.id reply eval {to:$waiter.to from:$waiter.from text:$waiter.text}
    }
    chat:
    { message:$L accept 1l
      :$message.body.verb switch
      { chat:
        { : (#chat + $message.body.to) write eval
            {i:++ to:$message.body.to from:$message.body.from text:$message.body.text}
          : ((#wait + $message.body.to) peek 1l) switch {:deliver $message.body.to}
          :$message.id reply {ack=true}
        }
        wait:
        { : (#wait + $message.body.user) write eval {i:++ id:$message.id}
          : ((#chat + $message.body.user) peek 1l) switch {:deliver $message.body.user}
        }
      }
      <-$L chat $R + 1l
    }
    :fiber {<-(listen $L) chat 0l}
    <-wait 0l
  }
  client:
  { outbox:
    { to:$R.others at randoml 1 0l append count $R.others
      text:"hey " + (string $to) + ", it's " + string $R.me
      :cwrite (string $R.me) + ":" + $text
      :receive $L send eval {verb:#chat to:$to from:$R.me text:$text}
      <-$L outbox $R
    }
    inbox:
    { message:receive $L send eval {verb:#wait user:$R.me}
      :cwrite (string $R.me) + ": got a message from " + (string $message.body.from)
      <-$L inbox $R
    }
    :fiber {<-(open $L) inbox $R}
    :fiber {<-(open $L) outbox $R}
    <-wait 0l
  }
  :bot {<-#tcp,9090l server #server,0l}
  names:#client + symbol 0l to 4l
  :{ config:eval {me:$R others:$names at find $names != $R}
     <-bot {<-#tcp,localhost,9090l client $config}
   } each $names
  <-wait 0l
}

The Output

Let's look at some example output.

Welcome to O2, ask brian if you need help
O2>eval fload "../../demo/messaging/chat.o2"
#client,0l:hey #client,4l, it's #client,0l
#client,1l:hey #client,3l, it's #client,1l
#client,2l:hey #client,3l, it's #client,2l
#client,3l:hey #client,2l, it's #client,3l
#client,4l:hey #client,0l, it's #client,4l
#client,0l:hey #client,2l, it's #client,0l
#client,1l:hey #client,4l, it's #client,1l
#client,2l:hey #client,0l, it's #client,2l
#client,3l: got a message from #client,1l
#client,2l: got a message from #client,3l
#client,3l:hey #client,2l, it's #client,3l
#client,4l: got a message from #client,0l
#client,0l: got a message from #client,4l
#client,4l:hey #client,0l, it's #client,4l
#client,0l:hey #client,2l, it's #client,0l
#client,1l:hey #client,3l, it's #client,1l
#client,2l:hey #client,1l, it's #client,2l
#client,3l: got a message from #client,2l
#client,2l: got a message from #client,0l
#client,3l:hey #client,1l, it's #client,3l
#client,4l: got a message from #client,1l
#client,0l: got a message from #client,2l
#client,4l:hey #client,1l, it's #client,4l
#client,0l:hey #client,3l, it's #client,0l
#client,1l:hey #client,4l, it's #client,1l
#client,1l: got a message from #client,2l
#client,2l:hey #client,4l, it's #client,2l
#client,3l: got a message from #client,1l
#client,2l: got a message from #client,3l
#client,3l:hey #client,1l, it's #client,3l
#client,0l: got a message from #client,4l
#client,4l:hey #client,1l, it's #client,4l
#client,4l: got a message from #client,1l
#client,0l:hey #client,2l, it's #client,0l
#client,1l:hey #client,0l, it's #client,1l
#client,1l: got a message from #client,3l
#client,2l:hey #client,1l, it's #client,2l
#client,3l: got a message from #client,0l
#client,2l: got a message from #client,0l
#client,3l:hey #client,0l, it's #client,3l
#client,4l:hey #client,3l, it's #client,4l
#client,4l: got a message from #client,2l
#client,0l: got a message from #client,1l
#client,0l:hey #client,3l, it's #client,0l
#client,1l:hey #client,2l, it's #client,1l
#client,1l: got a message from #client,4l
#client,2l:hey #client,0l, it's #client,2l
#client,2l: got a message from #client,0l
#client,3l:hey #client,4l, it's #client,3l

OK, you get the idea. The server allows clients to send messages directly to each other by name.

Discussion

In this post you got to see how bots can make productive use of multiple fibers and how the blackboard can be used in conjunction with O2's messaging facilities to create distributed data servers and clients. One aspect of this design that merits discussion is the way the client inbox continually requests messages from the server. In the past when I have developed messaging systems this way I have been accused of polling, a strategy which is considered undesirable in these types of systems. It's undesirable because polling is busy; clients keep checking for results and consuming resources repeatedly. Also, polling is slow because you might have received a message already but you won't see it until the next polling cycle. In this application I'm using more of a "long-polling" design where clients make a request once and then the requests can remain outstanding for any length of time. I dislike the term "long-polling" because it contains the word "polling" which to me means busy, and this is not a busy waiting system.

The alternative to this approach is one where the inbox would send a single request and that request could get multiple responses back. The server implements subscriptions that persist across multiple responses. I have implemented this a number of times and I do not like it at all. First, it is deceptively hard to ensure that your server subscription state remains coherent in the face of dropped connections, firewalls that time connections out, and hung clients. A common remedy for these problems is to have the client send regular "keep-alive" messages which indicate to the server that the client still cares about the subscription. But a solution where the client sends a new request for each update is just as easy to implement AND robust as a polling solution AND just as real-time as a stateful subscription. Neither does it have the busy waiting problems of a classic polling architecture. The only case where this does not work is for extreme low latency systems where you cannot afford to wait for the message to travel from client to server to start processing the next request. But in these situations it is normally better to move your client closer to the source of the data anyway.
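The one-request-one-reply design discussed above can be sketched in a few lines of Python. This is a hedged illustration, not the chat server: LongPollServer, request and publish are hypothetical names, and a concurrent.futures.Future stands in for an outstanding request. The server keeps no subscription state beyond the requests currently waiting, and a waiting client consumes nothing while it waits.

```python
import threading
from concurrent.futures import Future


class LongPollServer:
    """Each request gets exactly one reply, possibly much later."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = []   # futures for requests still outstanding
        self._backlog = []   # messages that arrived with nobody waiting

    def request(self):
        """Ask for the next message; the returned future completes later."""
        reply = Future()
        with self._lock:
            if self._backlog:
                reply.set_result(self._backlog.pop(0))
            else:
                self._pending.append(reply)
        return reply

    def publish(self, text):
        with self._lock:
            if self._pending:
                # Answer the oldest outstanding request and forget it.
                self._pending.pop(0).set_result(text)
            else:
                self._backlog.append(text)


server = LongPollServer()
reply = server.request()            # outstanding, no busy loop anywhere
still_waiting = not reply.done()
server.publish("hello")
got = reply.result(timeout=1)       # completed by the publish above
server.publish("early")
late = server.request().result(timeout=1)  # answered from the backlog
print(still_waiting, got, late)
```

If a client disappears, the only thing left on the server is a single pending request, which is the property that makes this design easier to keep coherent than a persistent subscription.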

So that about covers the chat server. In my next post we will increase the complexity one more notch and do a pubsub server!

Posted in Examples

An Echo Server In O2

Posted by Brian Andersen on June 12, 2012

Hello everyone, I am back after a brief posting hiatus. I have spent the last few weeks working on O2’s facility for distributed computing. In my last series on concurrency I demonstrated how O2 can be used to build highly concurrent programs that share state using O2’s blackboard. O2’s concurrency facility makes it possible to write programs that communicate with themselves. But what about programs that communicate with the outside world, especially other O2 programs? In the next series of posts I will demonstrate how O2 programs can communicate with each other from a distance, that is, without use of a common blackboard.

Introducing Bots

The basic unit of distributed computation in O2 is a bot. The Wikipedia article on bots lists some examples of how the term bot is used in computing:

  • Web crawler or Web spider, a computer program that does automated tasks
  • Internet bot, a computer program that does automated tasks
  • Chatterbot or Chat bot, a computer program that converses in natural language
  • Internet Relay Chat bot, a computer program connected to an Internet Relay Chat server as a user, but providing special services or performing special functions
  • Computer game bot, a computer-controlled player or opponent
  • Bots (edi), an open-source EDI software
  • BOTS, a computer game
  • Bot, a line of budget desktop PCs manufactured by Alienware

So basically, a bot is some program that sits on a computer network, sends and receives messages with other programs, and hopefully does some useful and beneficial task. In terms specific to O2, a bot is a collection of fibers all connected to a single blackboard. You can create multiple bots within a single instance of O2 running on a single computer, but they will not be able to communicate using the blackboard because each will have its own blackboard that is isolated from the others. Communication between bots needs to happen via other means.

Introducing Send and Receive, Accept and Reply, Listen and Open

In my concurrency programs I made heavy use of the operators read and write, along with their variants like dispatch, throttle, peek, and gawk. In a similar vein, I am creating a suite of operators for inter-bot messaging. It is based on a paradigm of asynchronous request and response. A bot can initiate communication with another bot using the send operator to send a message, and the receive operator to get a response back. Or it can wait for someone to send a message to it using the accept operator, and can then respond using the reply operator. This is an asynchronous paradigm because sending messages out is a separate operation from receiving responses. This affords more control to the programmer than the synchronous, Remote Procedure Call model where sending messages out and getting responses back is a single operation. For example, a single bot can fire off several requests to other bots and let them work in parallel while waiting for all the results to come back, rather than doing one request at a time. But O2’s innovative syntax allows you to keep the code concise regardless of which messaging pattern you want to follow.
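The difference between separate send and receive steps and a single RPC-style call can be shown with a small Python analogy. Nothing here is O2: remote_square is a made-up stand-in for work done by another bot, and a thread pool plays the role of the remote bots. The point is only the shape of the code, where all requests go out before any response is awaited.

```python
from concurrent.futures import ThreadPoolExecutor


def remote_square(n):
    """Hypothetical stand-in for work done by another bot."""
    return n * n


with ThreadPoolExecutor(max_workers=3) as pool:
    # "send": fire off every request first; each submit returns a token.
    tokens = [pool.submit(remote_square, n) for n in (2, 3, 4)]
    # "receive": only now wait for the responses, in request order.
    results = [token.result() for token in tokens]

print(results)
```

In an RPC-style loop each call would block before the next request was even sent; here the three pieces of work can overlap, and the caller still collects the results in a predictable order.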

In the example below you will see two other operators: listen and open, which allow you to listen for client connections and open connections to a server. I will show you how all of these operators work together in the context of a simple echo client and server. The client sends out requests to the server and the server echoes the messages back as it sees them.

The Code

{ server:

This is going to be the definition of the server bot. Creating a new bot is just as easy as creating a fiber.

  { echo:
    { message:$L accept 1l
      :$message.id reply eval {text:"ECHO:" + $message.body.text}
      <-$L echo $R
    }

As I mentioned above, a bot is just a collection of fibers tied to a single blackboard. So the definition of a bot is where you define and start the fibers for that bot. This is the echo operator which will be launched as a new fiber within the server bot. The left argument to the echo operator is the integer handle to the server socket. So the accept operator waits for messages arriving on that server socket. The message that is received is a block that has at least two children: an id, which is a symbol that you can use to reply back to that message, and a body, which is the content sent from the client. So echo uses the reply operator, passing the id on the left and the body of the reply on the right. At the end it calls itself in order to process the next message.

    :fiber {<-(listen $L) echo $R}
    <-wait 0l
  }

Whenever you define a bot, the initial fiber that is created for that bot will have the number zero. So in this bot we launch a new child fiber for the echo process, and wait forever for the initial fiber to finish. Notice that we call the listen operator using the left argument to the bot, which will be a symbol indicating the network protocol and port number to use. Now let's look at the definition of the client bot.

  client:
  { talk:
    { message:"hello? is anyone there?"
      :cwrite (string $R) + " SEND:" + $message
      response:receive $L send eval {text:$message}
      :cwrite (string $R) + " RECV:" + $response.body.text
      <-$L talk $R
    }

The client follows a similar pattern to the server. The client bot has one child fiber called "talk". It uses the send operator to send the message in a block to the server. In this case the left argument $L contains the integer handle of the client connection, which is set up before entering the talk fiber. The receive operator then waits for a response to the message id returned by the send operator. Once the response arrives, the talk fiber prints the text of the response on the console and repeats.
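The way the message id ties send, accept, reply, and receive together can be sketched in a few lines of Python. This is a toy, in-process stand-in and not how O2 implements it: the Connection class and its internals are hypothetical, and a real receive would suspend the fiber instead of doing a lookup.

```python
import itertools
from collections import deque

class Connection:
    """Toy in-process stand-in for a client/server connection (hypothetical)."""

    def __init__(self):
        self._ids = itertools.count()
        self._requests = deque()   # messages waiting for the server
        self._responses = {}       # message id -> reply body

    # Client side
    def send(self, body):
        msg_id = next(self._ids)
        self._requests.append({"id": msg_id, "body": body})
        return msg_id              # the id is what receive asks for later

    def receive(self, msg_id):
        # A real receive would wait; this toy version just looks the reply up.
        return {"id": msg_id, "body": self._responses.pop(msg_id)}

    # Server side
    def accept(self):
        return self._requests.popleft()

    def reply(self, msg_id, body):
        self._responses[msg_id] = body

def echo_once(conn):
    # Same shape as the echo fiber: accept one message, reply to its id.
    message = conn.accept()
    conn.reply(message["id"], {"text": "ECHO:" + message["body"]["text"]})
```

Because replies are keyed by id, a client can send two messages and pick up the responses in whichever order it likes.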

    :fiber {<-(open $L) talk $R}
    <-wait 0l
  }

The client bot creates a child fiber for the talk operator and then waits forever in its main fiber, whose number is always 0l. open is used to create a connection to the server host and port configured in the left argument of the client bot. You will see how this works below.

  :bot {<-#tcp,localhost,9090l client #talk,0l}
  :bot {<-#tcp,localhost,9090l client #talk,1l}
  :bot {<-#tcp,localhost,9090l client #talk,2l}
  :bot {<-#tcp,9090l server #echo,0l}
  <-wait 0l
}

This is where we create the four bots: three clients and one server. As you can see, creating a bot is very much like creating a fiber. You just pass a block to the bot operator containing the code which you want the new bot to execute. The listen and open operators both take symbols which describe the network protocol to be used along with additional information such as the host and port number. In this case we are using the tcp interface. I am working on other interfaces for http, disk files, and so on. Here is the complete echo server program, client and server:

{
  server:
  { echo:
    { message:$L accept 1l
      :$message.id reply eval {text:"ECHO:" + $message.body.text}
      <-$L echo $R
    }
    :fiber {<-(listen $L) echo $R}
    <-wait 0l
  }
  client:
  { talk:
    { message:"hello? is anyone there?"
      :cwrite (string $R) + " SEND:" + $message
      response:receive $L send eval {text:$message}
      :cwrite (string $R) + " RECV:" + $response.body.text
      <-$L talk $R
    }
    :fiber {<-(open $L) talk $R}
    <-wait 0l
  }
  :bot {<-#tcp,localhost,9090l client #talk,0l}
  :bot {<-#tcp,localhost,9090l client #talk,1l}
  :bot {<-#tcp,localhost,9090l client #talk,2l}
  :bot {<-#tcp,9090l server #echo,0l}
  <-wait 0l
}

Example Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../demo/messaging/echo.o2"
#talk,0l SEND:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?
#talk,1l RECV:ECHO:hello? is anyone there?
#talk,0l SEND:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l RECV:ECHO:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?
#talk,1l RECV:ECHO:hello? is anyone there?
#talk,0l SEND:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l RECV:ECHO:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?
#talk,0l SEND:hello? is anyone there?
#talk,1l RECV:ECHO:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l RECV:ECHO:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?
#talk,0l SEND:hello? is anyone there?
#talk,1l RECV:ECHO:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l RECV:ECHO:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?
#talk,1l RECV:ECHO:hello? is anyone there?
#talk,0l SEND:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l RECV:ECHO:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?
#talk,0l SEND:hello? is anyone there?
#talk,1l RECV:ECHO:hello? is anyone there?
#talk,1l SEND:hello? is anyone there?
#talk,2l RECV:ECHO:hello? is anyone there?
#talk,2l SEND:hello? is anyone there?
#talk,0l RECV:ECHO:hello? is anyone there?

Conclusion

In this demo we explored O2's distributed messaging operators and the basic unit of distributed computation: bots. An echo server is about the closest I can get to a "Hello World" of distributed computing. Using bots and the basic messaging operators introduced in this article, we can develop some sophisticated distributed applications.

Posted in Examples | Tagged: | 1 Comment »

The Room Party Problem In O2

Posted by Brian Andersen on May 21, 2012

Since this is the final post in my series of twelve concurrency problems in O2, I decided to finish with a celebration. The Room Party Problem is one of the tougher problems in Allen Downey’s Little Book of Semaphores. I owe the guy a thank you note. Here is the problem as he described it.

I wrote this problem while I was at Colby College. One semester there was a controversy over an allegation by a student that someone from the Dean of Students Office had searched his room in his absence. Although the allegation was public, the Dean of Students wasn’t able to comment on the case, so we never found out what really happened. I wrote this problem to tease a friend of mine, who was the Dean of Student Housing. The following synchronization constraints apply to students and the Dean of Students:

  • Any number of students can be in a room at the same time.
  • The Dean of Students can only enter a room if there are no students in the room (to conduct a search) or if there are more than 50 students in the room (to break up the party).
  • While the Dean of Students is in the room, no additional students may enter, but students may leave.
  • The Dean of Students may not leave the room until all students have left.
  • There is only one Dean of Students, so you do not have to enforce exclusion among multiple deans.

Puzzle: write synchronization code for students and for the Dean of Students that enforces all of these constraints.

Thankfully this solution doesn’t require any new operators or functionality that I haven’t discussed already.

The Code

{
  student:
  {
    =sleep randoml 1 0 3000l
    =#room take
    {
      =#room,dean throttle 1l
      =(#room + $R) write {i=++}
      =cwrite (string $R) + " entered the room"
    }

Begin with the definition for the student fiber. Randomize the student's entry into the room. Use a take block for entering the room. Inside it, the throttle on #room,dean suspends the student for as long as the dean is in the room. There is no limit on the number of students in the room. The purpose of this take block is to ensure that the dean does not leave the room and then reenter before the student has a chance to enter the room, violating the constraint that students may not enter while the dean is in the room.

    =sleep randoml 1 0 1000l
    =(#room + $R) dispatch 1l
    =cwrite (string $R) + " left the room"
    <-student $R
  }

Once in the room, the student stays for a random amount of time before leaving. Then repeat.

  dean:
  {
    case=#room take
    {
      empty=not #room peek 1l
      full=#room peek 5l
      case=#empty #full at find $empty append $full
      =(0l < count $case) switch {=(#room + $R) write {i=++}}
      <-$case
    }

Now for the dean fiber. Each lap for the dean is broken into two parts: determining the state of the room (empty, full, or neither), and taking the appropriate action (conducting a search, breaking up the party, or nothing). Inside the take block the dean checks to see if the room is empty, then checks to see if the room is full. In either case the dean enters the room; otherwise he stays away.
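The first half of the dean's lap boils down to a three-way classification of the room. Here it is as a small Python function, purely as an illustration. The function name and the party_size parameter are my own, and I am assuming the peek with 5l means "at least five occupants", which matches the output shown later.

```python
from typing import Optional

def dean_case(students_in_room: int, party_size: int = 5) -> Optional[str]:
    """Return "empty", "full", or None, mirroring the dean's first step."""
    if students_in_room == 0:
        return "empty"   # room is free: conduct a search
    if students_in_room >= party_size:
        return "full"    # enough students for a party: break it up
    return None          # neither case: the dean stays away
```

In the O2 version this check and the dean's entry happen inside the same take block, so no student can slip in between the decision and the entry.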

    =$case switch
    {
      empty=
      {
        =cwrite (string $R) + " is conducting a search"
        =sleep randoml 1 0 1000l
        =cwrite (string $R) + " is done conducting a search"
        =(#room + $R) dispatch 1l
      }

The empty and full cases are of course mutually exclusive. Use the switch block introduced in my last post to choose between the three possible actions. In the empty case the dean has entered the room. He conducts a search, thus preventing students from entering the room for a period of time, then leaves.

      full=
      {
        =cwrite (string $R) + " is breaking up the party"
        =#room throttle 2l
        =cwrite (string $R) + " is done breaking up the party"
        =(#room + $R) dispatch 1l
      }
    }
    <-dean $R
  }

On the other hand, if the room is full, the dean has also entered the room. He uses the throttle operator to wait until everyone except himself has left the room. The right argument to throttle is 2l so that the fiber will suspend until there is only one occupant: the dean. The dean leaves the room by dispatching his own message from the blackboard using a pattern that is now familiar.
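For readers who think in threads and condition variables, here is a toy Python blackboard that imitates the operators used in this example. It is only a sketch of the behavior as I have described it in these posts, not the O2 implementation: the class, the tuple symbols, and the prefix matching are assumptions made for illustration.

```python
import threading
from collections import Counter

class Blackboard:
    """Toy blackboard: counts messages per symbol (a tuple), matched by prefix."""

    def __init__(self):
        self._cond = threading.Condition()
        self._counts = Counter()

    def _count(self, prefix):
        # Total messages whose symbol starts with `prefix`.
        return sum(n for sym, n in self._counts.items()
                   if sym[:len(prefix)] == prefix)

    def write(self, symbol):
        with self._cond:
            self._counts[symbol] += 1
            self._cond.notify_all()

    def dispatch(self, symbol):
        # Block until a message exists under `symbol`, then consume one.
        with self._cond:
            self._cond.wait_for(lambda: self._counts[symbol] > 0)
            self._counts[symbol] -= 1
            self._cond.notify_all()

    def peek(self, prefix, n):
        # True if at least n messages are present under `prefix`.
        with self._cond:
            return self._count(prefix) >= n

    def throttle(self, prefix, n, timeout=None):
        # Block until fewer than n messages remain under `prefix`.
        # Returns False if the optional timeout expires first.
        with self._cond:
            return self._cond.wait_for(lambda: self._count(prefix) < n, timeout)
```

With the dean and one student written under ("room", ...), a throttle on ("room",) with 2 keeps waiting until the student's message has been dispatched, which is the "everyone except himself" condition described above.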

  d0=fiber {<-dean #dean,0l}
  s0=fiber {<-student #student,0l}
  s1=fiber {<-student #student,1l}
  s2=fiber {<-student #student,2l}
  s3=fiber {<-student #student,3l}
  s4=fiber {<-student #student,4l}
  s5=fiber {<-student #student,5l}
  s6=fiber {<-student #student,6l}
  <-wait $d0
}

Set up fibers for the single dean and 7 students. To keep the output brief I used 5 students as the room limit for this example, rather than 50. Here is the complete program.

{
  student:
  {
    =sleep randoml 1 0 3000l
    =#room take
    {
      =#room,dean throttle 1l
      =(#room + $R) write {i=++}
      =cwrite (string $R) + " entered the room"
    }
    =sleep randoml 1 0 1000l
    =(#room + $R) dispatch 1l
    =cwrite (string $R) + " left the room"
    <-student $R
  }
  dean:
  {
    case=#room take
    {
      empty=not #room peek 1l
      full=#room peek 5l
      case=#empty #full at find $empty append $full
      =(0l < count $case) switch {=(#room + $R) write {i=++}}
      <-$case
    }
    =$case switch
    {
      empty=
      {
        =cwrite (string $R) + " is conducting a search"
        =sleep randoml 1 0 1000l
        =cwrite (string $R) + " is done conducting a search"
        =(#room + $R) dispatch 1l
      }
      full=
      {
        =cwrite (string $R) + " is breaking up the party"
        =#room throttle 2l
        =cwrite (string $R) + " is done breaking up the party"
        =(#room + $R) dispatch 1l
      }
    }
    <-dean $R
  }
  d0=fiber {<-dean #dean,0l}
  s0=fiber {<-student #student,0l}
  s1=fiber {<-student #student,1l}
  s2=fiber {<-student #student,2l}
  s3=fiber {<-student #student,3l}
  s4=fiber {<-student #student,4l}
  s5=fiber {<-student #student,5l}
  s6=fiber {<-student #student,6l}
  <-wait $d0
}

The Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/party.o2"
#dean,0l is conducting a search
#dean,0l is done conducting a search
#student,6l entered the room
#student,5l entered the room
#student,4l entered the room
#student,0l entered the room
#student,3l entered the room
#dean,0l is breaking up the party
#student,6l left the room
#student,0l left the room
#student,3l left the room
#student,5l left the room
#student,4l left the room
#dean,0l is done breaking up the party
#student,6l entered the room
#student,4l entered the room
#student,2l entered the room
#student,4l left the room
#student,2l left the room
#student,6l left the room
#dean,0l is conducting a search
#dean,0l is done conducting a search
#student,1l entered the room
#student,0l entered the room
#student,5l entered the room
#student,0l left the room
#student,1l left the room
#student,5l left the room
#dean,0l is conducting a search
#dean,0l is done conducting a search
#student,3l entered the room
#student,4l entered the room
#student,4l left the room
#student,3l left the room
#dean,0l is conducting a search
#dean,0l is done conducting a search
#student,6l entered the room
#student,2l entered the room
#student,2l left the room
#student,1l entered the room
#student,0l entered the room
#student,3l entered the room
#student,6l left the room
#student,4l entered the room
#student,5l entered the room
#dean,0l is breaking up the party
#student,4l left the room
#student,1l left the room
#student,3l left the room
#student,0l left the room
#student,5l left the room
#dean,0l is done breaking up the party
#dean,0l is conducting a search

Whenever the room happens to be empty the dean does his search, during which time no students enter. When the room reaches 5 occupants, the dean enters and waits until everyone has left the room.

Discussion

And with that I am done with my first twelve concurrency demos in O2. I would love to keep going but I need this language to do more than just solve concurrency problems, so it's time to move on. The next series of demos will be on distributed computing topics. It's gonna be a great year.


The River Crossing Problem In O2

Posted by Brian Andersen on May 14, 2012

I am now getting into the more obscure problems. These are all out of Allen Downey’s Little Book of Semaphores. Whereas most textbooks take the form of a collection of algorithms, disembodied from any particular problem, this one is organized in a problem-first way which is really fun and refreshing. I wish there were more textbooks written in this style. I am afraid that when I am done with this set of twelve I will need to start inventing my own problems. The river crossing problem as he describes it:

This is from a problem set written by Anthony Joseph at U.C. Berkeley, but I don’t know if he is the original author. It is similar to the H2O problem in the sense that it is a peculiar sort of barrier that only allows threads to pass in certain combinations.


Somewhere near Redmond, Washington there is a rowboat that is used by both Linux hackers and Microsoft employees (serfs) to cross a river. The ferry can hold exactly four people; it won’t leave the shore with more or fewer. To guarantee the safety of the passengers, it is not permissible to put one hacker in the boat with three serfs, or to put one serf with three hackers. Any other combination is safe.

As each thread boards the boat it should invoke a function called board. You must guarantee that all four threads from each boatload invoke board before any of the threads from the next boatload do. After all four threads have invoked board, exactly one of them should call a function named rowBoat, indicating that that thread will take the oars. It doesn’t matter which thread calls the function, as long as one does. Don’t worry about the direction of travel. Assume we are only interested in traffic going in one of the directions.

Colin Angus crossed the Atlantic in a rowboat. And I thought this problem was hard.

The Code

{
  programmer:
  {
    =sleep randoml 1 0 1000l
    self=$R part 0l
    other=$self switch {hacker=#serf serf=#hacker}

All of the players in this game are programmers and we will know which type we are based on the fiber name passed in the right argument ($R), as in many of my previous examples.

    crew=#line take
    {
      case=((#line + $self) peek 3l) switch
      {
        =#allsame
        =(((#line + $self) peek 1l) and (#line + $other) peek 2l) switch {=#twoeach =#none}
      }

As programmers arrive at the dock they examine the line to see whether their arrival would form a complete crew to cross the river. The two relevant cases are #allsame, when there are already three of the same type (#serf or #hacker) as the person arriving, and #twoeach, when there is one of the same type and two of the opposite type. Otherwise the new arrival will have to get in line until a crew can form.
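The decision each new arrival makes can be written as a tiny pure function. This Python version is just an illustration of the rule: the function names are mine, and I am treating each peek as "at least that many waiting". The second function checks a boatload against the safety rule from the problem statement.

```python
def arrival_case(same_waiting: int, other_waiting: int) -> str:
    """Classify a new arrival given how many of each type are already in line."""
    if same_waiting >= 3:
        return "allsame"   # three of my type waiting: four of a kind with me
    if same_waiting >= 1 and other_waiting >= 2:
        return "twoeach"   # one of mine plus two of theirs: two and two with me
    return "none"          # no crew yet: get in line

def is_safe(hackers: int, serfs: int) -> bool:
    """A full boat is safe with 4-0, 0-4, or 2-2, never 3-1 or 1-3."""
    return hackers + serfs == 4 and hackers != 1 and serfs != 1
```

Both crew-forming cases produce a safe boat: #allsame gives four of one type, and #twoeach gives two of each.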

      <-$case switch
      {
        allsame=
        {
          crew=(#line + $self) dispatch 3l
          <-$crew.S part 1 2l
        }

Remember we are still inside of the take block here, so only one new arrival can be in this section at once. If the crew is #allsame then the new arrival dispatches those three programmers out of the waiting line and yields the names of his fellow crew members (not including himself) out of the take block.

        twoeach=
        {
          crew0=(#line + $other) dispatch 2l
          crew1=(#line + $self) dispatch 1l
          <-($crew0.S append $crew1.S) part 1 2l
        }

In the #twoeach case the logic is much the same but we have to call dispatch twice and append the results together to get the full list of crew members, again, not including the new arrival, who will become the captain for the journey.

        none=
        {
          =(#line + $R) write {i=++}
          <-#nothing
        }
      }
    }

In the final case there is no full crew and the programmer writes a message on the blackboard indicating that she is waiting in line. Note that she does not actually suspend here, because the current fiber is still in the critical section and suspending would prevent the next fiber from inspecting the line.

    =(not $crew equals #nothing) switch
    {
      =#boat take
      {
        =cwrite "captain " + (string $R) + "'s crew is " + " " delimit string $crew
        =(#crew + $crew) write {i=++ ++ ++}

If this fiber established a full crew then the names of the crew members will be in the variable $crew otherwise it will contain #nothing. In this case the current fiber will be the captain. Note that I am using another mutually-exclusive take block around this section. That is to ensure that only one crew may occupy the boat at a time. It is possible that there could be multiple captains and crews that have exited #line and are now waiting to get into the #boat. So the captain will occupy the #boat take block for the entire period of the journey. The captain writes #crew messages to the members of $crew to signal them to get in the boat. You will see how this works in a moment.

        =(#boat + $R) write {i=++}
        =#boat gawk 4l

Next the captain himself gets in the boat, and waits for the other three crew members using the gawk operator. gawk was just introduced in my post on the Collegial Facility/Unisex Bathroom Problem. It works just like dispatch except without actually consuming the messages.

        =sleep randoml 1 0 1000l
        =(#land + $crew) write {i=++ ++ ++}
        =#boat throttle 2l

Simulate the time taken to paddle across the river. Then write #land messages to the crew members to let them know it's time to get off the boat. Finally wait for the entire crew to get off the boat. Using 2l as the right argument of throttle waits until there are fewer than two members, since the captain is one.

        =cwrite "captain " + (string $R) + " is getting off the boat"
        =(#boat + $R) dispatch 1l
      }

Finally the captain gets off the boat.

      =
      {
        =(#line + $R) throttle 1l
        =(#crew + $R) dispatch 1l
        =(#boat + $R) write {i=++}
        =cwrite (string $R) + " got on the boat"

In the alternative case this fiber is a passenger. Wait to be taken out of line by the captain. Then wait for the #crew message, which is the passenger's signal that the captain has control of the boat. Finally board the boat by writing a #boat message.

        =(#land + $R) dispatch 1l
        =(#boat + $R) dispatch 1l
        =cwrite (string $R) + " got off the boat"

Wait for the #land message to be sent by the captain. Then dispatch the #boat message that represents occupancy of the boat above. This signals to the captain that the passenger is off the boat.
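The captain and passenger halves form a small handshake: everyone boards, the captain rows, the passengers get off, and the captain leaves last. Here is a rough Python sketch of that ordering. It swaps in a threading.Barrier and a threading.Event for the gawk, throttle, and #land messages, so it illustrates the sequence rather than translating the O2 code. The function name and the "is rowing" log line are my own additions.

```python
import threading

def boat_trip(captain, crew):
    """Run one crossing and return the log lines in the order they happened."""
    log = []
    log_lock = threading.Lock()
    aboard = threading.Barrier(len(crew) + 1)   # captain plus passengers
    landed = threading.Event()                  # stands in for the #land messages
    ashore = threading.Barrier(len(crew) + 1)

    def say(line):
        with log_lock:
            log.append(line)

    def passenger(name):
        say(f"{name} got on the boat")
        aboard.wait()      # nobody leaves the dock until all four are aboard
        landed.wait()      # wait for the captain's landing signal
        say(f"{name} got off the boat")
        ashore.wait()      # tell the captain this seat is now empty

    threads = [threading.Thread(target=passenger, args=(n,)) for n in crew]
    for t in threads:
        t.start()
    aboard.wait()          # the captain waits for the full crew
    say(f"captain {captain} is rowing")
    landed.set()
    ashore.wait()          # the captain waits until the passengers are off
    say(f"captain {captain} is getting off the boat")
    for t in threads:
        t.join()
    return log
```

The order of the three passengers within each phase can vary from run to run, but the phases themselves cannot overlap.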

      }
    }
    <-programmer $R
  }

Repeat forever.

  h0=fiber {<-programmer #hacker,0l}
  h1=fiber {<-programmer #hacker,1l}
  h2=fiber {<-programmer #hacker,2l}
  h3=fiber {<-programmer #hacker,3l}
  s0=fiber {<-programmer #serf,0l}
  s1=fiber {<-programmer #serf,1l}
  s2=fiber {<-programmer #serf,2l}
  s3=fiber {<-programmer #serf,3l}
  <-wait $h0
}

This is just the standard initialization seen in many of my examples. Here is the complete program.

{
  programmer:
  {
    =sleep randoml 1 0 1000l
    self=$R part 0l
    other=$self switch {hacker=#serf serf=#hacker}
    crew=#line take
    {
      case=((#line + $self) peek 3l) switch
      {
        =#allsame
        =(((#line + $self) peek 1l) and (#line + $other) peek 2l) switch {=#twoeach =#none}
      }
      <-$case switch
      {
        allsame=
        {
          crew=(#line + $self) dispatch 3l
          <-$crew.S part 1 2l
        }
        twoeach=
        {
          crew0=(#line + $other) dispatch 2l
          crew1=(#line + $self) dispatch 1l
          <-($crew0.S append $crew1.S) part 1 2l
        }
        none=
        {
          =(#line + $R) write {i=++}
          <-#nothing
        }
      }
    }
    =(not $crew equals #nothing) switch
    {
      =#boat take
      {
        =cwrite "captain " + (string $R) + "'s crew is " + " " delimit string $crew
        =(#crew + $crew) write {i=++ ++ ++}
        =(#boat + $R) write {i=++}
        =#boat gawk 4l
        =sleep randoml 1 0 1000l
        =(#land + $crew) write {i=++ ++ ++}
        =#boat throttle 2l
        =cwrite "captain " + (string $R) + " is getting off the boat"
        =(#boat + $R) dispatch 1l
      }
      =
      {
        =(#line + $R) throttle 1l
        =(#crew + $R) dispatch 1l
        =(#boat + $R) write {i=++}
        =cwrite (string $R) + " got on the boat"
        =(#land + $R) dispatch 1l
        =(#boat + $R) dispatch 1l
        =cwrite (string $R) + " got off the boat"
      }
    }
    <-programmer $R
  }
  h0=fiber {<-programmer #hacker,0l}
  h1=fiber {<-programmer #hacker,1l}
  h2=fiber {<-programmer #hacker,2l}
  h3=fiber {<-programmer #hacker,3l}
  s0=fiber {<-programmer #serf,0l}
  s1=fiber {<-programmer #serf,1l}
  s2=fiber {<-programmer #serf,2l}
  s3=fiber {<-programmer #serf,3l}
  <-wait $h0
}

Example Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/river.o2"
captain #serf,0l's crew is #hacker,3l #hacker,1l #serf,3l
#hacker,3l got on the boat
#hacker,1l got on the boat
#serf,3l got on the boat
#hacker,3l got off the boat
#hacker,1l got off the boat
#serf,3l got off the boat
captain #serf,0l is getting off the boat
captain #hacker,2l's crew is #serf,2l #serf,1l #hacker,0l
#serf,2l got on the boat
#serf,1l got on the boat
#hacker,0l got on the boat
#serf,2l got off the boat
#serf,1l got off the boat
#hacker,0l got off the boat
captain #hacker,2l is getting off the boat
captain #serf,0l's crew is #hacker,3l #hacker,1l #serf,3l
#hacker,3l got on the boat
#hacker,1l got on the boat
#serf,3l got on the boat
#hacker,3l got off the boat
#hacker,1l got off the boat
#serf,3l got off the boat
captain #serf,0l is getting off the boat
captain #hacker,2l's crew is #serf,2l #serf,1l #hacker,0l
#serf,2l got on the boat
#serf,1l got on the boat
#hacker,0l got on the boat
#serf,2l got off the boat
#serf,1l got off the boat
#hacker,0l got off the boat
captain #hacker,2l is getting off the boat
captain #hacker,1l's crew is #serf,0l #serf,3l #hacker,3l
#serf,0l got on the boat
#serf,3l got on the boat
#hacker,3l got on the boat
#serf,0l got off the boat
#serf,3l got off the boat
#hacker,3l got off the boat
captain #hacker,1l is getting off the boat
captain #hacker,2l's crew is #serf,1l #serf,2l #hacker,0l
#serf,1l got on the boat
#serf,2l got on the boat
#hacker,0l got on the boat
#serf,1l got off the boat
#serf,2l got off the boat
#hacker,0l got off the boat
captain #hacker,2l is getting off the boat
captain #hacker,3l's crew is #serf,3l #serf,0l #hacker,1l
#serf,3l got on the boat
#serf,0l got on the boat
#hacker,1l got on the boat
#serf,3l got off the boat
#serf,0l got off the boat
#hacker,1l got off the boat
captain #hacker,3l is getting off the boat
captain #serf,0l's crew is #hacker,0l #hacker,2l #serf,2l
#hacker,0l got on the boat
#hacker,2l got on the boat
#serf,2l got on the boat
#hacker,0l got off the boat
#hacker,2l got off the boat
#serf,2l got off the boat
captain #serf,0l is getting off the boat
captain #hacker,3l's crew is #serf,1l #serf,0l #hacker,1l
#serf,1l got on the boat
#serf,0l got on the boat
#hacker,1l got on the boat
#serf,1l got off the boat
#serf,0l got off the boat
#hacker,1l got off the boat
captain #hacker,3l is getting off the boat
captain #hacker,0l's crew is #serf,3l #serf,2l #hacker,2l
#serf,3l got on the boat
#serf,2l got on the boat
#hacker,2l got on the boat
#serf,3l got off the boat
#serf,2l got off the boat
#hacker,2l got off the boat
captain #hacker,0l is getting off the boat
captain #serf,1l's crew is #hacker,1l #hacker,3l #serf,0l
#hacker,1l got on the boat
#hacker,3l got on the boat
#serf,0l got on the boat
#hacker,1l got off the boat
#hacker,3l got off the boat
#serf,0l got off the boat
captain #serf,1l is getting off the boat
captain #hacker,2l's crew is #serf,2l #serf,3l #hacker,1l
#serf,2l got on the boat
#serf,3l got on the boat
#hacker,1l got on the boat
#serf,2l got off the boat
#serf,3l got off the boat
#hacker,1l got off the boat
captain #hacker,2l is getting off the boat
captain #serf,1l's crew is #hacker,3l #hacker,0l #serf,0l
#hacker,3l got on the boat
#hacker,0l got on the boat
#serf,0l got on the boat
#hacker,3l got off the boat
#hacker,0l got off the boat
#serf,0l got off the boat
captain #serf,1l is getting off the boat
captain #hacker,2l's crew is #serf,3l #serf,2l #hacker,1l
#serf,3l got on the boat
#serf,2l got on the boat
#hacker,1l got on the boat
#serf,3l got off the boat
#serf,2l got off the boat
#hacker,1l got off the boat
captain #hacker,2l is getting off the boat
captain #hacker,0l's crew is #serf,1l #serf,0l #hacker,3l
#serf,1l got on the boat
#serf,0l got on the boat
#hacker,3l got on the boat
#serf,1l got off the boat
#serf,0l got off the boat
#hacker,3l got off the boat
captain #hacker,0l is getting off the boat
captain #hacker,1l's crew is #serf,2l #serf,1l #hacker,2l
#serf,2l got on the boat
#serf,1l got on the boat
#hacker,2l got on the boat
#serf,2l got off the boat
#serf,1l got off the boat
#hacker,2l got off the boat
captain #hacker,1l is getting off the boat
captain #hacker,0l's crew is #serf,3l #serf,0l #hacker,3l
#serf,3l got on the boat
#serf,0l got on the boat
#hacker,3l got on the boat
#serf,3l got off the boat
#serf,0l got off the boat
#hacker,3l got off the boat
captain #hacker,0l is getting off the boat
captain #hacker,3l's crew is #serf,0l #serf,2l #hacker,1l
#serf,0l got on the boat
#serf,2l got on the boat
#hacker,1l got on the boat
#serf,0l got off the boat
#serf,2l got off the boat
#hacker,1l got off the boat
captain #hacker,3l is getting off the boat
captain #hacker,0l's crew is #serf,3l #serf,1l #hacker,2l
#serf,3l got on the boat
#serf,1l got on the boat
#hacker,2l got on the boat
#serf,3l got off the boat
#serf,1l got off the boat
#hacker,2l got off the boat
captain #hacker,0l is getting off the boat
captain #hacker,2l's crew is #serf,0l #serf,2l #hacker,3l
#serf,0l got on the boat
#serf,2l got on the boat
#hacker,3l got on the boat
#serf,0l got off the boat
#serf,2l got off the boat
#hacker,3l got off the boat
captain #hacker,2l is getting off the boat
captain #hacker,0l's crew is #serf,3l #serf,1l #hacker,2l
#serf,3l got on the boat
#serf,1l got on the boat
#hacker,2l got on the boat
#serf,3l got off the boat
#serf,1l got off the boat
#hacker,2l got off the boat
captain #hacker,0l is getting off the boat
captain #serf,1l's crew is #hacker,1l #hacker,2l #serf,0l
#hacker,1l got on the boat
#hacker,2l got on the boat
#serf,0l got on the boat
#hacker,1l got off the boat
#hacker,2l got off the boat
#serf,0l got off the boat
captain #serf,1l is getting off the boat
captain #hacker,0l's crew is #serf,2l #serf,3l #hacker,3l
#serf,2l got on the boat
#serf,3l got on the boat
#hacker,3l got on the boat
#serf,2l got off the boat
#serf,3l got off the boat
#hacker,3l got off the boat
captain #hacker,0l is getting off the boat
captain #serf,2l's crew is #hacker,3l #hacker,1l #serf,1l
#hacker,3l got on the boat
#hacker,1l got on the boat
#serf,1l got on the boat
#hacker,3l got off the boat
#hacker,1l got off the boat
#serf,1l got off the boat
captain #serf,2l is getting off the boat
captain #hacker,0l's crew is #serf,0l #serf,3l #hacker,2l
#serf,0l got on the boat
#serf,3l got on the boat
#hacker,2l got on the boat
#serf,0l got off the boat
#serf,3l got off the boat
#hacker,2l got off the boat
captain #hacker,0l is getting off the boat
captain #serf,2l's crew is #hacker,3l #hacker,1l #serf,1l
#hacker,3l got on the boat
#hacker,1l got on the boat
#serf,1l got on the boat
#hacker,3l got off the boat
#hacker,1l got off the boat
#serf,1l got off the boat
captain #serf,2l is getting off the boat
captain #hacker,2l's crew is #serf,0l #serf,3l #hacker,0l
#serf,0l got on the boat
#serf,3l got on the boat
#hacker,0l got on the boat
#serf,0l got off the boat
#serf,3l got off the boat
#hacker,0l got off the boat
captain #hacker,2l is getting off the boat
captain #hacker,0l's crew is #hacker,3l #hacker,2l #hacker,1l
#hacker,3l got on the boat
#hacker,2l got on the boat
#hacker,1l got on the boat
#hacker,3l got off the boat
#hacker,2l got off the boat
#hacker,1l got off the boat
captain #hacker,0l is getting off the boat
captain #serf,0l's crew is #serf,3l #serf,1l #serf,2l
#serf,3l got on the boat
#serf,1l got on the boat
#serf,2l got on the boat
#serf,3l got off the boat
#serf,1l got off the boat
#serf,2l got off the boat

I included a little more output than usual in order to prove that this program will form both types of crews: those with all hackers or all serfs, and those with two of each.

Discussion

At this stage all I can really say is that I'm getting tired of working on concurrency problems. Which is how I should feel at this point. This is number eleven out of my set of twelve. One of the best techniques I learned in jazz school was to practice at one idea, in as many variations as I could come up with, until it was completely beaten into the ground. This happens way after the idea stops being fun and new and interesting. You have to keep going until you are totally sick of it. I had way more patience for this style of practice than most people. And it works. I'm finding that it works for programming as well, especially in the process of developing a language. Anyway it feels like O2 is getting pretty good at expressing concurrency problems. I'm going to bang out one more of these puppies and then we're going to try something different.


The Collegial Facility Problem In O2

Posted by Brian Andersen on May 12, 2012

Today’s problem tested my wits, my patience, and of course O2, my new programming language. I started with The Unisex Bathroom Problem as described in Allen Downey’s Little Book of Semaphores. He describes the problem as follows:

I wrote this problem when a friend of mine left her position teaching physics at Colby College and took a job at Xerox. She was working in a cubicle in the basement of a concrete monolith, and the nearest women’s bathroom was two floors up. She proposed to the Uberboss that they convert the men’s bathroom on her floor to a unisex bathroom, sort of like on Ally McBeal. The Uberboss agreed, provided that the following synchronization constraints can be maintained:

  • There cannot be men and women in the bathroom at the same time.
  • There should never be more than three employees squandering company time in the bathroom.

Of course the solution should avoid deadlock. For now, though, don’t worry about starvation. You may assume that the bathroom is equipped with all the semaphores you need.


Downey then provides two concise solutions to the problem using semaphores. The first one is highly concurrent but prone to starvation. It involves a LightSwitch (a higher-level abstraction of a semaphore) on the outside of a Multiplex. The lightswitch controls whether men or women are allowed in the room at a given time, and the multiplex ensures that no more than three occupants are in the room concurrently. The problem is that while one sex is occupying the room, if the other sex is waiting outside they may be skipped over and over again by new arrivals of the same sex that is in the room.

The second one is starvation free but suffers from reduced concurrency. Basically everyone goes in first-in-first-out order. There will only be multiple occupants in the bathroom if multiple men or women entered the line together. In the degenerate case, men and women alternate perfectly, and only one person can use the bathroom at a time.

I could have implemented either of these solutions in O2. But I felt that since I am introducing a new language that aims to be good at solving concurrency problems I should have a solution that is highly concurrent and free of starvation. So I added the following constraints to my version of the problem. I call it the Collegial Facility.

  • If the room is occupied by men/women and a woman/man arrives at the door, she/he should step aside and let any man/woman that arrives go first, in order to maximize the throughput of the bathroom.
  • If there are one or more women/men waiting in this “skip line”, then they should only be skipped by at most three new arrivals.
  • Once they have been skipped the third time, all of the people that have been skipped get to go before any new arrival can access the bathroom.
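The three rules above can be read as a small admission policy. Here is a minimal sketch of that policy in Python rather than O2, just to make the skip counting concrete. The names `Door`, `arrive`, `drain` and `MAX_SKIPS` are hypothetical, and the sketch deliberately ignores the three-person capacity limit and all concurrency.

```python
from dataclasses import dataclass, field
from typing import List, Optional

MAX_SKIPS = 3  # a waiting group may be passed by at most three new arrivals


@dataclass
class Door:
    room_sex: Optional[str] = None   # sex currently inside, None if the room is empty
    skip_line: List[str] = field(default_factory=list)
    skips: int = 0                   # how many times the skip line has been passed

    def arrive(self, name: str, sex: str) -> str:
        if self.skips >= MAX_SKIPS:
            return "wait for drain"  # rule 3: the skipped people go first
        if self.room_sex in (None, sex):
            # Same sex (or empty room): go straight in, passing anyone who is waiting.
            self.skips = self.skips + 1 if self.skip_line else 0
            self.room_sex = sex
            return "enter"
        self.skip_line.append(name)  # rule 1: step aside
        return "skip line"

    def drain(self, sex: str) -> List[str]:
        """The room has emptied: admit the whole skip line and reset the count."""
        admitted, self.skip_line = self.skip_line, []
        self.room_sex, self.skips = sex, 0
        return admitted


door = Door()
events = [door.arrive("m0", "man"), door.arrive("w0", "woman")]
events += [door.arrive(f"m{i}", "man") for i in range(1, 5)]
print(events)
print(door.drain("woman"))
```

In this run the first man enters, the woman steps aside, three more men pass her, and the fourth is told to wait until the skip line has been drained.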

The Code

My solution requires a separate “bathroom controller” fiber to coordinate entries and exits from the bathroom. In general I like to avoid using these kinds of helper threads, but sometimes they are necessary. In this case the bathroom fiber can suspend at the necessary times in a way that would not work for one of the person fibers if they were inside a mutually-exclusive take block. Also, many real systems such as GUIs and transaction processing systems involve serial event-processing loops, so it is valuable to show how those can be implemented in O2.

{
  person:
  {
    me=$R
    mysex=$R part 0l

We shall start with the definition of the person fiber. The right argument is a symbol (tuple) consisting of the sex of the person, #man or #woman, and an integer unique to that fiber.

    =sleep randoml 1 0 1000l

Randomize the attempts to enter the bathroom.

    =(#door,entry + $me) write {i=++}
    =(#door,entry + $me) throttle 1l
    =(#skip + $me) throttle 1l

This is a new pattern you haven’t seen before. The throttle operator can be used to implement a kind of request response between fibers. The person writes a message under the symbol #door,entry and then uses throttle to wait until another fiber dispatches that same message from the blackboard. The room fiber will decide whether this person should enter the skip line. All that is necessary to make the person respect the skip line is to introduce the second throttle using the symbol #skip with $me appended.
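For readers who think in threads, the request-response shape described here can be sketched in Python. This is an analogy, not how O2 implements throttle: a `threading.Event` stands in for "wait until my message has been dispatched", and the names `requests`, `person` and `controller` are hypothetical.

```python
import queue
import threading

requests = queue.Queue()   # stands in for rows written under #door,entry
log = []                   # list.append is atomic in CPython


def person(name):
    admitted = threading.Event()
    requests.put((name, admitted))   # roughly: (#door,entry + $me) write {i=++}
    admitted.wait()                  # roughly: (#door,entry + $me) throttle 1l
    log.append(f"{name} admitted")


def controller(expected):
    for _ in range(expected):
        name, admitted = requests.get()   # oldest pending request
        log.append(f"controller saw {name}")
        admitted.set()                    # consuming the request releases the waiter


names = ["man,0", "woman,0", "man,1"]
threads = [threading.Thread(target=person, args=(n,)) for n in names]
threads.append(threading.Thread(target=controller, args=(len(names),)))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(log)
```

The order in which the three people are served varies from run to run, but each person is only ever admitted after the controller has seen that person's request.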

    =(#room + $me) write {i=++}
    =(#ack + $me) write {i=++}
    =cwrite (string $me) + " entered the bathroom"

Once allowed through the entry and possibly removed from the skip line, the person enters the bathroom. After entering the room the person writes back on the symbol #ack. I will show you when I get to the room code why this is important. After letting a person in, the room has to wait for the person to actually enter in order to avoid potential race conditions.

    =sleep randoml 1 0 1000l
    =cwrite (string $me) + " left the bathroom"
    =(#room + $me) dispatch 1l
    =(#door,exit + $me) write {i=++}

Person uses the bathroom for a random period of time, then exits by dispatching the line previously written. In addition to removing themselves from the room, the person writes an additional message explicitly indicating that they exited the room.

    <-person $R
  }

Repeat from the top. Now for the room code:

  room:
  {
    =#room throttle 3l

Each lap for the room fiber begins by waiting until the room is not full. throttle handles this. Recall that the person fiber writes a message to the #room symbol in order to indicate that the person is in the room, and dispatches that same symbol upon leaving.

    next=#door gawk 1l

The gawk operator is yet another variation of dispatch, along with throttle and peek. gawk lets you actually look at the messages that dispatch would yield if you invoked it with the same arguments. The only difference is that gawk does not remove the message from the queue. In this way the room fiber can act like a kind of lightweight server, inspecting messages and then deciding how to act upon them. Once done it will dispatch the message, indicating to the person fiber that it is ok to continue.
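The difference between looking and taking can be shown with a plain Python queue. This is only an analogy for the inspect-then-consume pattern. The functions `gawk` and `dispatch` below are hypothetical stand-ins that share nothing with the O2 operators except the idea, and the message strings are made up.

```python
from collections import deque

door = deque(["entry,man,4", "entry,woman,1"])   # pending door messages, oldest first


def gawk(messages):
    """Return the oldest message but leave it in the queue."""
    return messages[0]


def dispatch(messages):
    """Return the oldest message and remove it from the queue."""
    return messages.popleft()


seen = gawk(door)            # inspect first ...
kind = seen.split(",")[0]    # ... decide what to do ("entry" or "exit") ...
taken = dispatch(door)       # ... and only then consume it
print(kind, seen == taken, len(door))
```

Because nothing is removed until the decision has been made, the sender of the message stays suspended for exactly as long as the controller needs.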

    <-($next.S part 1l) switch
    {
      entry=($R > 2l) switch
      {

In prior posts I have used the if operator to do conditional branches. I hate conditional logic. I prefer code that proceeds in a straight line, such as the person fiber above. But I cannot always have my way. So I introduced switch, which is a less ugly version of conditional branching. On the left argument it takes a single boolean or symbol value. In the case of a boolean it will execute the first or second branch on the right depending on whether the value is true or false. In the case of a symbol it chooses the branch to execute using its name in the block on the right. So in this case if the event message symbol is #entry then it chooses the entry branch; otherwise it chooses the exit branch, as you shall see. In the entry block there is a nested conditional which checks to see if the right argument ($R) is greater than 2l. This is how the room handles the case where people have been skipped too many times.
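To make the two modes of switch concrete, here is a rough Python analogue. It is a sketch under assumptions: branches are zero-argument callables, a boolean selects by position from a list, and a symbol selects by name from a dict. Returning None when a boolean has no matching branch is my guess at a reasonable behaviour, not something taken from the O2 implementation, and `on_door_event` is a hypothetical helper.

```python
def switch(value, branches):
    """Rough analogue of O2's switch; branches hold zero-argument callables."""
    if isinstance(value, bool):
        # Boolean: first branch when true, second when false (if there is one).
        index = 0 if value else 1
        return branches[index]() if index < len(branches) else None
    # Symbol: pick the branch by name.
    return branches[value]()


def on_door_event(kind, skip_count):
    return switch(kind, {
        "entry": lambda: switch(skip_count > 2, [
            lambda: "drain the skip line first",
            lambda: "handle the arrival",
        ]),
        "exit": lambda: "handle the departure",
    })


print(on_door_event("entry", 3))
print(on_door_event("entry", 0))
print(on_door_event("exit", 0))
```

Wrapping each branch in a callable keeps the unchosen branches from being evaluated, which is the property that matters when a branch suspends or has side effects.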

        ={=#room throttle 1l =drain {} <-room 0l}

Wait until the room is empty. Then invoke the drain operator, which is defined below. The drain operator lets everyone in the skip line use the restroom before continuing. Then reenter the room operator with the right argument, the skip count, set to zero.

        =
        {
          othersex=($next.S part 2l) switch {man=#woman woman=#man}
          sexok=not (#room + $othersex) peek 1l
          =cwrite (string $next.S) + " at the entry " + ($sexok switch {="(ok)" ="(not ok)"})

Now begin the case where a person is at the entry and the people in the skip line have not been skipped too many times. Here the switch operator is used again to determine the symbol for the other sex. Then we calculate the $sexok flag which is false if there are members of the other sex in the room, according to the peek operator.

          skip=$sexok switch
          {
            =(#skip peek 1l) switch {=$R + 1l =0l}
            ={=(#skip + $next.S part 2 3l) write {i=++} <-$R}
          }

Here is another use of switch; you can see why I had to do something about the aesthetics of if before posting this example. The new skip count will be stored in the variable $skip. If $sexok is true, then check to see if someone is in the skip line: if so, return the skip count incremented by one, otherwise return zero. If $sexok is false, we put the person into the skip line by writing a new message to the symbol #skip appended with the name of the fiber that is at the entry. In this case the skip count remains the same.

          =$next.S dispatch 1l
          =($sexok) switch {=#ack dispatch 1l}
          <-room $skip
        }
      }

After deciding whether the person is to enter the room or go into the skip line, we dispatch the symbol of the next message that was obtained from the gawk operator. This will cause the person fiber to advance to the next stage, which will either be the skip line or actually entering the bathroom (finally!). If they entered the room then the room fiber should wait for the #ack, which will only happen after the person has entered the room; that is, after the person has written to the #room symbol. Another way to accomplish the same thing would be to read from the line given by the result of $next.S dispatch 1l.

      exit=((not #room peek 1l) and #skip peek 1l) switch
      {
        ={=$next.S dispatch 1l =drain {} <-room 0l}
        ={=$next.S dispatch 1l <-room $R}
      }
    }
  }

This is the block of code that will evaluate if the room receives a #door,exit message. If the room is now empty and there are people in the skip line then all of the people in the skip line should get to go next. Otherwise just take it from the top.

  drain:
  {
    <-(#skip peek 1l) switch
    {
      =
      {
        =#skip dispatch 1l
        =#ack dispatch 1l
        =#room throttle 3l
        <-drain {}
      }
    }
  }

This is the drain routine as promised. It is recursive. It removes a person from the skip line, waiting for the #ack as they enter the room. Then it waits as long as it takes until there is a seat free in the room and lets the next person in, until there is no one left in the skip line.

  test0:
  {
    b=fiber {<-#none room 0l}
    m0=fiber {<-person #man,0l}
    m1=fiber {<-person #man,1l}
    m2=fiber {<-person #man,2l}
    m3=fiber {<-person #man,3l}
    m4=fiber {<-person #man,4l}
    m5=fiber {<-person #man,5l}
    m6=fiber {<-person #man,6l}
    m7=fiber {<-person #man,7l}
    m8=fiber {<-person #man,8l}
    w0=fiber {<-person #woman,0l}
    w1=fiber {<-person #woman,1l}
    w2=fiber {<-person #woman,2l}
    w3=fiber {<-person #woman,3l}
    w4=fiber {<-person #woman,4l}
    w5=fiber {<-person #woman,5l}
    w6=fiber {<-person #woman,6l}
    w7=fiber {<-person #woman,7l}
    w8=fiber {<-person #woman,8l}
    <-wait $b
  }
  <-test0 {}
}

I configured this test with nine man and nine woman threads. I used a few extra configurations to test this program but I will omit them here in the name of brevity (for some definition of brevity anyway ;). Here is the full text of the program. It is the most complex O2 example I have published yet.

{
  person:
  {
    me=$R
    mysex=$R part 0l
    othersex=$mysex switch {man=#woman woman=#man}
    =sleep randoml 1 0 1000l
    =(#door,entry + $me) write {i=++}
    =(#door,entry + $me) throttle 1l
    =(#skip + $me) throttle 1l
    =(#room + $me) write {i=++}
    =(#ack + $me) write {i=++}
    =cwrite (string $me) + " entered the bathroom"
    =sleep randoml 1 0 1000l
    =cwrite (string $me) + " left the bathroom"
    =(#room + $me) dispatch 1l
    =(#door,exit + $me) write {i=++}
    <-person $R
  }
  room:
  {
    =#room throttle 3l
    next=#door gawk 1l
    <-($next.S part 1l) switch
    {
      entry=($R > 2l) switch
      {
        ={=#room throttle 1l =drain {} <-room 0l}
        =
        {
          othersex=($next.S part 2l) switch {man=#woman woman=#man}
          sexok=not (#room + $othersex) peek 1l
          =cwrite (string $next.S) + " at the entry " + ($sexok switch {="(ok)" ="(not ok)"})
          skip=$sexok switch
          {
            =(#skip peek 1l) switch {=$R + 1l =0l}
            ={=(#skip + $next.S part 2 3l) write {i=++} <-$R}
          }
          =$next.S dispatch 1l
          =($sexok) switch {=#ack dispatch 1l}
          <-room $skip
        }
      }
      exit=((not #room peek 1l) and #skip peek 1l) switch
      {
        ={=$next.S dispatch 1l =drain {} <-room 0l}
        ={=$next.S dispatch 1l <-room $R}
      }
    }
  }
  drain:
  {
    <-(#skip peek 1l) switch
    {
      =
      {
        =#skip dispatch 1l
        =#ack dispatch 1l
        =#room throttle 3l
        <-drain {}
      }
    }
  }
  test0:
  {
    b=fiber {<-#none room 0l}
    m0=fiber {<-person #man,0l}
    m1=fiber {<-person #man,1l}
    m2=fiber {<-person #man,2l}
    m3=fiber {<-person #man,3l}
    m4=fiber {<-person #man,4l}
    m5=fiber {<-person #man,5l}
    m6=fiber {<-person #man,6l}
    m7=fiber {<-person #man,7l}
    m8=fiber {<-person #man,8l}
    w0=fiber {<-person #woman,0l}
    w1=fiber {<-person #woman,1l}
    w2=fiber {<-person #woman,2l}
    w3=fiber {<-person #woman,3l}
    w4=fiber {<-person #woman,4l}
    w5=fiber {<-person #woman,5l}
    w6=fiber {<-person #woman,6l}
    w7=fiber {<-person #woman,7l}
    w8=fiber {<-person #woman,8l}
    <-wait $b
  }
  <-test0 {}
}

The Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/facility.o2"
#door,entry,man,4l at the entry (ok)
#man,4l entered the bathroom
#door,entry,man,7l at the entry (ok)
#man,7l entered the bathroom
#door,entry,woman,1l at the entry (not ok)
#door,entry,man,2l at the entry (ok)
#man,2l entered the bathroom
#man,7l left the bathroom
#door,entry,woman,0l at the entry (not ok)
#door,entry,man,8l at the entry (ok)
#man,8l entered the bathroom
#man,2l left the bathroom
#door,entry,man,5l at the entry (ok)
#man,5l entered the bathroom
#man,4l left the bathroom
#man,8l left the bathroom
#man,5l left the bathroom
#woman,1l entered the bathroom
#woman,0l entered the bathroom
#door,entry,man,3l at the entry (not ok)
#door,entry,man,6l at the entry (not ok)
#door,entry,woman,3l at the entry (ok)
#woman,3l entered the bathroom
#woman,0l left the bathroom
#door,entry,woman,7l at the entry (ok)
#woman,7l entered the bathroom
#woman,7l left the bathroom
#door,entry,woman,8l at the entry (ok)
#woman,8l entered the bathroom
#woman,3l left the bathroom
#woman,8l left the bathroom
#woman,1l left the bathroom
#man,3l entered the bathroom
#man,6l entered the bathroom
#door,entry,man,4l at the entry (ok)
#man,4l entered the bathroom
#man,6l left the bathroom
#door,entry,woman,4l at the entry (not ok)
#door,entry,woman,2l at the entry (not ok)
#door,entry,man,0l at the entry (ok)
#man,0l entered the bathroom
#man,3l left the bathroom
#door,entry,woman,6l at the entry (not ok)
#door,entry,woman,5l at the entry (not ok)
#door,entry,man,1l at the entry (ok)
#man,1l entered the bathroom
#man,1l left the bathroom
#door,entry,man,2l at the entry (ok)
#man,2l entered the bathroom
#man,4l left the bathroom
#man,0l left the bathroom
#man,2l left the bathroom
#woman,4l entered the bathroom
#woman,2l entered the bathroom
#woman,6l entered the bathroom
#woman,2l left the bathroom
#woman,5l entered the bathroom
#woman,6l left the bathroom
#door,entry,man,7l at the entry (not ok)
#door,entry,man,8l at the entry (not ok)
#door,entry,woman,7l at the entry (ok)
#woman,7l entered the bathroom
#woman,4l left the bathroom
#door,entry,woman,0l at the entry (ok)
#woman,0l entered the bathroom
#woman,5l left the bathroom
#door,entry,man,5l at the entry (not ok)
#door,entry,woman,3l at the entry (ok)
#woman,3l entered the bathroom
#woman,7l left the bathroom
#woman,3l left the bathroom
#woman,0l left the bathroom
#man,7l entered the bathroom
#man,8l entered the bathroom
#man,5l entered the bathroom
#man,5l left the bathroom
#door,entry,woman,1l at the entry (not ok)
#door,entry,man,6l at the entry (ok)
#man,6l entered the bathroom
#man,8l left the bathroom
#door,entry,woman,8l at the entry (not ok)
#door,entry,man,4l at the entry (ok)
#man,4l entered the bathroom

I won't break this down line by line but if you do you can see that the bathroom remains highly concurrent while avoiding starvation and obeying the constraints set out in the original problem.

Discussion

When I first added the throttle and peek operators into O2 I thought that they would not be used much in practice. But now that I have them I find myself using them all over. I don't believe I could have solved this problem at all without them. Also, I think that a number of my earlier posts could have been written more elegantly with the benefit of these operators. In this post I introduced yet another operator in the same family: gawk. gawk lets you look at the data that would be dispatched without incurring the side effect of dispatch. I think that this operator is going to come in handy in my upcoming series on distributed computing in O2. But who knows? I don't really know what pieces I need until I start trying to build stuff with them. The new switch operator is another example. I think I am going to be able to get rid of ugly old if entirely. It is pretty exciting to feel the language getting better and stronger through repeated use and practice, which is the whole purpose of this blog.


The Sleeping Barber Problem In O2

Posted by Brian Andersen on April 30, 2012

In an earlier post I wrote about the Producer-Consumer Problem and used it to introduce the dispatch operator in O2. In my most recent post I wrote about the Bounded-Buffer variation of this problem and how it calls for the throttle operator. Whereas dispatch suspends the calling fiber until it can yield the requested data, throttle suspends until the requested data has been dispatched. The Sleeping Barber is one more variation on the theme of Producer-Consumer problems, which arise regularly in concurrent and distributed software. Fortunately, if you understood the other two problems, then this one should be a walk in the park.

The Sleeping Barber Problem

A barber operates a shop with a single barber chair and n seats for waiting customers. As customers enter the shop, they look at the number of available seats. If the shop is empty then they have to wake up the barber who goes to sleep whenever he doesn’t have customers. On the other hand, if all the waiting seats are occupied then the customer decides to leave without a haircut.

In thinking about this problem you have to remember that the customer is the producer and the barber is the consumer. As far as managing a queue, having the barber go to sleep and wake up when the customer enters, there is nothing new in this problem. It is exactly like the Bounded-Buffer Problem. The only distinction worth making is that in this example the customer doesn’t want to wait for a seat and then take it when it becomes available. He wants to turn around and leave. It’s an important distinction!

Introducing Peek

This time I would like to introduce the peek operator, which lets you simply check whether dispatch would yield for the arguments given. peek never suspends; it simply returns the line number of the last item that would have been seen by dispatch, or a value less than zero if dispatch would have suspended. The line number can be quite useful in some cases which I will come to later. That’s it, nothing fancy in today’s post.
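As a rough illustration of that contract, here is a single-threaded Python sketch. It is not the O2 implementation: `peek`, `pending` and `customer_arrives` are hypothetical names, and the pending rows are modelled as a simple queue of (line number, value) pairs.

```python
from collections import deque


def peek(pending, n):
    """Line number of the n-th pending row, or -1 if fewer than n rows are pending."""
    return pending[n - 1][0] if len(pending) >= n else -1


pending = deque()   # unconsumed (line_number, customer) rows
next_line = 0


def customer_arrives(seats=10):
    global next_line
    if peek(pending, seats) >= 0:    # `seats` rows could be dispatched: the shop is full
        return "left"
    pending.append((next_line, f"customer {next_line}"))
    next_line += 1
    return "entered"


results = [customer_arrives() for _ in range(12)]
pending.popleft()                    # the barber takes one customer
results.append(customer_arrives())
print(results)
```

The first ten arrivals find a seat, the next two walk away, and once the barber has taken a customer the following arrival gets in again.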

The Code

{
  consumer:
  {
    item=#item dispatch 1l
    =cwrite (string $R) + " cutting hair for customer " + string $item.i
    =sleep randoml 1 0 3000l
    <-consumer $R
  }

There is nothing new here, just the barber code which uses dispatch to read one customer at a time off the blackboard.

  producer:
  {
    =sleep randoml 1 0 1000l
    =if
    {
      test:0l >= #item peek 10l

This is where we test the result of peek to decide what to do. Remember, always read from right to left! There is no precedence among operators in O2; operators are processed from right to left no matter what. The arguments to peek are exactly the same as those to dispatch. In this case, dispatch would return immediately if there were at least 10 unconsumed rows with symbol #item in the blackboard and would suspend otherwise. So peek returns the last line number that would have been yielded by dispatch, or a number less than zero if dispatch would have suspended.

      then:
      {
        item=#item write {i=++}
        item=#item read ($item - 1l) append 1l
        =cwrite (string $R) + " customer " + (string $item.i) + " entered the shop"
      }

So, if there are fewer than 10 people already waiting, our customer enters the shop. He records the fact that he entered the shop by writing a line to the blackboard with the symbol #item. Again we use the IncrOp notation ++ to ensure that a new record is written to the blackboard. I introduced this feature in the post about the Building H2O Problem. Then we read back the row that was just written so we can see which customer number it was.

      else:cwrite (string $R) + " found waiting room full, going away"
    }
    <-producer $R
  }

If there are 10 or more unprocessed #items in the blackboard, the customer walks away without a haircut. Once again this is exactly like last week's Bounded-Buffer Problem with the minor exception that we make a choice based on the state of the buffer rather than having that choice made for us by the throttle or dispatch operator.

  p0=fiber {<-producer #p,0l}
  p1=fiber {<-producer #p,1l}
  p2=fiber {<-producer #p,2l}
  c0=fiber {<-consumer #c,0l}
  c1=fiber {<-consumer #c,1l}
  c2=fiber {<-consumer #c,2l}
  <-wait $p0
}

Now fire off three producer and three consumer fibers. As I have said, all of these examples work with multiple consumers and multiple producers. Here is the full program.

{
  consumer:
  {
    item=#item dispatch 1l
    =cwrite (string $R) + " cutting hair for customer " + string $item.i
    =sleep randoml 1 0 3000l
    <-consumer $R
  }
  producer:
  {
    =sleep randoml 1 0 1000l
    =if
    {
      test:0l >= #item peek 10l
      then:
      {
        item=#item write {i=++}
        item=#item read ($item - 1l) append 1l
        =cwrite (string $R) + " customer " + (string $item.i) + " entered the shop"
      }
      else:cwrite (string $R) + " found waiting room full, going away"
    }
    <-producer $R
  }
  p0=fiber {<-producer #p,0l}
  p1=fiber {<-producer #p,1l}
  p2=fiber {<-producer #p,2l}
  c0=fiber {<-consumer #c,0l}
  c1=fiber {<-consumer #c,1l}
  c2=fiber {<-consumer #c,2l}
  <-wait $p0
}

The Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/barber.o2"
#c,0l cutting hair for customer 0
#p,1l customer 0 entered the shop
#c,2l cutting hair for customer 1
#p,2l customer 1 entered the shop
#c,1l cutting hair for customer 2
#p,0l customer 2 entered the shop
#c,2l cutting hair for customer 3
#p,1l customer 3 entered the shop
#c,0l cutting hair for customer 4
#p,0l customer 4 entered the shop
#p,2l customer 5 entered the shop
#p,1l customer 6 entered the shop
#p,1l customer 7 entered the shop
#c,0l cutting hair for customer 5
#p,2l customer 8 entered the shop
#p,1l customer 9 entered the shop
#p,0l customer 10 entered the shop
#p,0l customer 11 entered the shop
#p,2l customer 12 entered the shop
#p,2l customer 13 entered the shop
#p,1l customer 14 entered the shop
#p,1l customer 15 entered the shop
#p,0l found waiting room full, going away
#p,2l found waiting room full, going away
#p,1l found waiting room full, going away
#c,2l cutting hair for customer 6
#p,2l customer 16 entered the shop
#c,1l cutting hair for customer 7
#p,0l customer 17 entered the shop
#p,1l found waiting room full, going away
#p,2l found waiting room full, going away
#c,1l cutting hair for customer 8
#c,1l cutting hair for customer 9
#p,1l customer 18 entered the shop
#c,2l cutting hair for customer 10
#p,0l customer 19 entered the shop
#c,0l cutting hair for customer 11
#p,2l customer 20 entered the shop
#p,1l customer 21 entered the shop
#c,0l cutting hair for customer 12
#p,1l customer 22 entered the shop
#p,0l found waiting room full, going away
#p,1l found waiting room full, going away
#c,2l cutting hair for customer 13
#p,2l customer 23 entered the shop
#c,1l cutting hair for customer 14
#p,0l customer 24 entered the shop
#p,2l found waiting room full, going away
#p,1l found waiting room full, going away
#c,1l cutting hair for customer 15
#p,2l customer 25 entered the shop
#c,1l cutting hair for customer 16
#p,0l customer 26 entered the shop
#c,2l cutting hair for customer 17
#p,1l customer 27 entered the shop
#p,0l found waiting room full, going away
#p,2l found waiting room full, going away
#c,0l cutting hair for customer 18
#p,2l customer 28 entered the shop
#p,1l found waiting room full, going away
#p,2l found waiting room full, going away

I tuned this example so that the customers arrive at three times the rate at which the barbers do the haircuts. This is so that it would quickly get to a point where customers were walking away from the shop. This is number nine in my twelve-part series of concurrent programming examples in O2. Check back later this week for a much tougher problem.


The Bounded Buffer Problem In O2

Posted by Brian Andersen on April 26, 2012

A few weeks back I did a post on solving the Producer-Consumer Problem. In that post I introduced the dispatch operator, which takes a vector of symbols in its left argument ($L) and a number in its right argument ($R). It either returns $R rows of data for each symbol in $L or it suspends the calling fiber until it can return all of the data requested and no more. And it does this in such a way that no other fiber will read the same rows again using dispatch. In other words dispatch allows you to treat the O2 blackboard like a FIFO queue. Because O2′s fibers are not based on operating system threads, the number you can create is limited only by the memory available to store the current stack for each fiber. I also used the dispatch operator in some fancier scenarios in my posts on The Cigarette Smoker’s Problem and the Building H2O Problem.

When I solved the Producer-Consumer Problem I only dealt with the unbounded version. In the unbounded version of the problem producers may create a queue whose size is only limited by available memory. But if items in the queue represent work to be done, it may be the case that we want to control the rate at which new work can be requested, or avoid creating too big of a list of unfinished tasks. The Bounded-Buffer Problem deals with this situation. In the Bounded-Buffer problem producers are asked to check the queue before writing and if the queue is too big, suspend until it has shrunk. You might think we could use read for this but there are two problems:

  • read doesn’t know which rows have been consumed via dispatch, you tell it exactly where to read from.
  • read will only help you suspend until something is written to the blackboard, not until something is read from it.

To solve this problem in an appealing way I had to introduce a new operator, throttle, into the language.

Introducing Throttle

throttle takes the exact same arguments as dispatch, and interprets them in the same way as dispatch with one critical exception. If dispatch would yield, then throttle will suspend. And if dispatch would suspend, throttle will yield. throttle does not return the entire result set, just a number indicating the last line of the blackboard that would have been consumed. When the blackboard reaches a state where dispatch will not yield, then throttle will yield. It might sound complicated but when you see it in action it is actually pretty straightforward.
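The mirror-image relationship between the two operators can be sketched with a condition variable in Python. This is an analogy under assumptions, not O2's implementation: the class `Board` and its methods are hypothetical, there is only one queue instead of one per symbol, and throttle here returns nothing rather than a line number.

```python
import threading
from collections import deque


class Board:
    def __init__(self):
        self._rows = deque()
        self._changed = threading.Condition()

    def write(self, row):
        with self._changed:
            self._rows.append(row)
            self._changed.notify_all()

    def dispatch(self, n):
        """Suspend until n rows are pending, then remove and return them."""
        with self._changed:
            self._changed.wait_for(lambda: len(self._rows) >= n)
            taken = [self._rows.popleft() for _ in range(n)]
            self._changed.notify_all()
            return taken

    def throttle(self, n):
        """Suspend while n rows are pending, i.e. while dispatch(n) could yield."""
        with self._changed:
            self._changed.wait_for(lambda: len(self._rows) < n)


board = Board()
board.write("a")
board.write("b")
log = []


def waiter():
    board.throttle(2)
    log.append("released")


t = threading.Thread(target=waiter)
t.start()
before = list(log)          # always empty: two rows are pending, so throttle(2) cannot pass
taken = board.dispatch(1)   # consuming one row is what releases the waiter
t.join()
print(before, taken, log)
```

The two methods wait on exactly opposite predicates over the same state, which is the whole trick.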

The Bounded Buffer Solution

This program is going to look almost exactly the same as the Producer-Consumer example with one slight twist.

{
  consumer:
  {
    item=#item dispatch 1l
    =cwrite (string $R) + " consuming item " + string $item.i
    =sleep randoml 1 0 3000l
    <-consumer $R
  }

The consumer fiber should be pretty clear: it uses dispatch to remove one line under the symbol #item, then sleeps for a random period of time and repeats. In the Producer-Consumer post I used incrementing symbols like #item,0l #item,1l. In this example everything will be written using the symbol #item. This way the output is easy to follow when there are multiple producers. All of my Producer-Consumer examples will work for multiple producers and consumers.

  producer:
  {
    =sleep randoml 1 0 500l

The producer fiber sleeps at the beginning of its lap to simulate the time cost of production.

    item=#throttle take
    {
      =#item throttle 10l

Now we see throttle in action. Before writing to the blackboard, the producer fiber calls throttle with the symbol #item and the (long) number 10l. This achieves the following: if dispatch could return 10l #items from the blackboard then throttle will suspend until this is no longer the case. If dispatch would suspend because there are fewer than 10l unconsumed items, then throttle will yield immediately. Bounded buffer problem solved. Also note that I am using a take block around this section. This means that only one producer fiber will be in this section at one time. The reason for the reduced concurrency here is that throttle will release all waiting fibers as soon as dispatch could no longer return 10l items. If there are multiple producers waiting they will all jump ahead and write at once. The result is that you could end up with more than 10 items in the queue. If you are willing to run the risk of having up to 13 (10 + 3 producer fibers) unconsumed items in the queue then there is no harm in getting rid of this take block. But I am going to be pedantic here; we want a max of 10 items in the queue.
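The point about keeping the check and the write inside one critical section can be illustrated in Python. This is a sketch, not a translation: the lock inside `threading.Condition` plays the role of the take block, `LIMIT`, `high_water`, `producer` and `consumer` are hypothetical names, and unlike an O2 fiber suspended inside take, a Python thread waiting on the condition releases the lock and rechecks the predicate when it wakes.

```python
import threading
from collections import deque

LIMIT = 10
items = deque()
cond = threading.Condition()   # its lock stands in for the take block
high_water = 0                 # largest queue length ever observed


def producer(count):
    global high_water
    for _ in range(count):
        with cond:                                     # one producer at a time
            cond.wait_for(lambda: len(items) < LIMIT)  # the "throttle" check
            items.append(object())                     # write while still holding the lock
            high_water = max(high_water, len(items))
            cond.notify_all()


def consumer(count):
    for _ in range(count):
        with cond:
            cond.wait_for(lambda: len(items) >= 1)     # the "dispatch" wait
            items.popleft()
            cond.notify_all()


threads = [threading.Thread(target=producer, args=(20,)) for _ in range(3)]
threads += [threading.Thread(target=consumer, args=(20,)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(high_water)
```

Because no producer can write between another producer's check and its write, the recorded high-water mark never exceeds the limit, whatever the scheduling.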

      item=#item write {i=++}
      <-#item read ($item - 1l) append 1l
    }

Now write to the blackboard. Use my new IncrOp data type to increment the variable $i which is just a sequence number for the #item queue. I introduced the IncrOp in my post on the Building H2O Problem. Then read from the blackboard to find out what $i was. I think I am going to change write so that it returns the actual data that was written. But at the moment it returns the count of the blackboard up to the point of the write. When you call read with two numbers on the right the first one is the line number to start at and the second one is the number of items you want to read per symbol, like dispatch.

    =cwrite (string $R) + " produced item " + string $item.i
    <-producer $R
  }

Write the item number out to the console so that we can see what is happening.

  p0=fiber {<-producer #p,0l}
  p1=fiber {<-producer #p,1l}
  p2=fiber {<-producer #p,2l}
  c0=fiber {<-consumer #c,0l}
  c1=fiber {<-consumer #c,1l}
  c2=fiber {<-consumer #c,2l}
  <-wait $p0
}

Start up three producer fibers and three consumer fibers and go. Here is the complete program.

{
  consumer:
  {
    item=#item dispatch 1l
    =cwrite (string $R) + " consuming item " + string $item.i
    =sleep randoml 1 0 3000l
    <-consumer $R
  }
  producer:
  {
    =sleep randoml 1 0 500l
    item=#throttle take
    {
      =#item throttle 10l
      item=#item write {i=++}
      <-#item read ($item - 1l) append 1l
    }
    =cwrite (string $R) + " produced item " + string $item.i
    <-producer $R
  }
  p0=fiber {<-producer #p,0l}
  p1=fiber {<-producer #p,1l}
  p2=fiber {<-producer #p,2l}
  c0=fiber {<-consumer #c,0l}
  c1=fiber {<-consumer #c,1l}
  c2=fiber {<-consumer #c,2l}
  <-wait $p0
}

Now let's look at some output.

Output of the Bounded-Buffer Solution

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/bounded.o2"
#c,0l consuming item 0
#p,1l produced item 0
#c,1l consuming item 1
#p,2l produced item 1
#c,2l consuming item 2
#p,0l produced item 2
#c,1l consuming item 3
#p,1l produced item 3
#p,0l produced item 4
#p,2l produced item 5
#p,2l produced item 6
#p,1l produced item 7
#p,1l produced item 8
#c,0l consuming item 4
#p,0l produced item 9
#p,2l produced item 10
#p,1l produced item 11
#p,0l produced item 12
#p,1l produced item 13
#p,1l produced item 14
#c,1l consuming item 5
#p,2l produced item 15
#c,2l consuming item 6
#p,0l produced item 16
#c,2l consuming item 7
#p,1l produced item 17
#c,1l consuming item 8
#p,2l produced item 18
#c,0l consuming item 9
#p,1l produced item 19
#c,0l consuming item 10
#p,0l produced item 20
#c,1l consuming item 11
#p,2l produced item 21
#c,2l consuming item 12
#p,1l produced item 22
#c,2l consuming item 13
#p,0l produced item 23
#c,1l consuming item 14
#p,2l produced item 24
#c,0l consuming item 15
#p,1l produced item 25
#c,0l consuming item 16
#p,0l produced item 26
#c,1l consuming item 17

Since the producer fibers produce items at a rate of 250 ms average (500l/2l) and the consumers consume items at a much slower rate of 1500 ms, we quickly see a queue develop, after which all producers are eagerly lined up to pass the barrier created by the throttle operator. Every time a consumer removes an item, the producer immediately adds a new item into the queue. You can also see in the output that the console printing is sometimes out of order. For example item 0l appears to be consumed before it was produced. This is just random variation in the console output and not a problem with the program itself. One way to fix this might be to drive the console output from a separate fiber reading records from the blackboard in the same order they are written. I may explore this in a future example.

A Word On the O2 Design Process

The first time I developed a programming language I rushed into trying to build real software using it, hoping that the process would force me to see all of the issues with it. It didn't work very well. In the end the application code was very unappealing, and the base of libraries that I created to serve the application were brittle and didn't work very well outside the application they were created for. It also took a very long time. And to make matters worse, it became very hard to change the language after I started depending on applications written in it.

This time I am taking a different approach. I decided to build up collections of example programs revolving around a particular theme, making certain that the resulting programs were good programs according to my taste, which mostly means short and easy to follow. At this point I have done eight (out of twelve) examples of classic concurrency problems in O2. On the surface it might seem like I had to introduce a lot of operators, but when I review them I feel pretty happy with the results. You read and write from the blackboard with read and write. When you want to treat the blackboard as a queue you use dispatch instead of read. When you need a critical section you use take. throttle and peek (to be introduced shortly) are just variants on the same conceptual and algorithmic plumbing as dispatch. While not as mathematically elegant as Dijkstra's P() and V() operators, I think people will find O2's concurrency facilities much easier to use and understand.


The Building H2O Problem In O2

Posted by Brian Andersen on April 23, 2012

Today I will continue my journey through Allen Downey’s Little Book of Semaphores, by tackling one of the less familiar problems called “Building H2O”. According to him, and a little googling on my own, this problem is mostly known as an exam problem in college operating system classes. Of course my solution won’t use semaphores, so you won’t be able to pass the test using what follows. But it does provide a very interesting test bed for O2's concurrency operators. Downey specifies the problem as follows:

There are two kinds of threads, oxygen and hydrogen. In order to assemble these threads into water molecules, we have to create a barrier that makes each thread wait until a complete molecule is ready to proceed. As each thread passes the barrier, it should invoke bond. You must guarantee that all the threads from one molecule invoke bond before any of the threads from the next molecule do. In other words:

  • If an oxygen thread arrives at the barrier when no hydrogen threads are present, it has to wait for two hydrogen threads.
  • If a hydrogen thread arrives at the barrier when no other threads are present, it has to wait for an oxygen thread and another hydrogen thread.

We don’t have to worry about matching the threads up explicitly; that is, the threads do not necessarily know which other threads they are paired up with. The key is just that threads pass the barrier in complete sets; thus, if we examine the sequence of threads that invoke bond and divide them into groups of three, each group should contain one oxygen and two hydrogen threads. Puzzle: Write synchronization code for oxygen and hydrogen molecules that enforces these constraints.

Since I am no longer in college and am in the “real world” as they call it, I decided to add a third constraint to my solution. Each hydrogen fiber will have to record which oxygen fiber it is bound to, and each oxygen fiber will have to record the names of its two hydrogen fibers on each lap. Why did I do this? Because in building real software you can encounter similarly complex transactional requirements, for example in matching buyers and sellers, where you will always need, or at least want, to know who is buying from which seller and vice versa. So in building a language for production software, not homework problems, it is important to go the extra mile. So let’s get right into it, shall we?

The Building H2O Solution

{
  oxygen:
  {

Let’s begin by looking at the oxygen fiber, which serves as the “master” in this case: it needs two hydrogens, whereas each hydrogen only needs one oxygen to proceed.

    h=#wait dispatch 2l

The dispatch operator easily takes care of the first constraint: getting the oxygen fiber to wait until two hydrogens are available. The hydrogen fibers will write to a symbol that begins with #wait and ends with the unique name of the fiber, which is passed into the right argument ($R) of the hydrogen operator. For example, the symbol #wait,h,0l records the fact that fiber #h,0l is waiting for an oxygen. Passing 2l on the right-hand side causes dispatch to suspend until there are two lines for symbols beginning with #wait sitting on the blackboard. The dispatch operator ensures that each line will only be seen by one of any number of callers to dispatch.
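
For readers who think in threads and condition variables, here is a rough Python analogy of what dispatch is described as doing. It is only a sketch under my own assumptions, not the O2 implementation: the Blackboard class and its method names are invented, and symbols are modeled as tuples so that a prefix match is a tuple slice comparison.

```python
import threading
from collections import deque

class Blackboard:
    """Hypothetical stand-in for the O2 blackboard, holding unclaimed lines."""

    def __init__(self):
        self._cond = threading.Condition()
        self._lines = deque()   # (symbol, row) pairs in write order

    def write(self, symbol, row):
        with self._cond:
            self._lines.append((symbol, row))
            self._cond.notify_all()   # wake any caller blocked in dispatch

    def dispatch(self, prefix, count):
        """Block until `count` unclaimed lines match `prefix`, then claim them."""
        with self._cond:
            while True:
                matches = [ln for ln in self._lines
                           if ln[0][:len(prefix)] == prefix]
                if len(matches) >= count:
                    claimed = matches[:count]
                    for ln in claimed:
                        self._lines.remove(ln)   # each line is seen by one caller only
                    return claimed
                self._cond.wait()
```

Claiming both lines in one step under the same lock matters: if two oxygens each grabbed one hydrogen and then waited for a second, they could wait on each other forever.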

    =#bond take
    {

What follows happens inside a take block, which means that only one oxygen fiber can be inside this section at a time. This is necessary to enforce the constraint that all three fibers involved in the transaction move through the barrier (invoke bond) before any fibers from the next molecule do. Please note, however, that the take block is not necessary for this program to work correctly from a transactional perspective. If we removed the take operator, the program would allow H2O molecules to form concurrently rather than one at a time.

      =cwrite (string $R) + " initiating bonds with " + " " delimit string $h.fiber

Remember to always read from right to left! $h.fiber will be a vector with two elements, the names of the two hydrogen fibers. Recall that dispatch returned $h, which is a signal. Signals are kind of like tables with rows and columns, so the dot notation can be used to access the columns as vectors. The string operator turns those vector elements into strings, leaving you with a vector of two strings. The delimit operator takes a vector of strings on the right and a delimiter on the left and creates a single string (a vector of one element) with whatever delimiter you chose in between the original elements. So if the oxygen fiber was bound to fibers named #h,0l and #h,1l, the expression " " delimit string $h.fiber would give you "#h,0l #h,1l". The rest is just concatenating strings.

      =(#init + $h.fiber) write eval {i=++ ++ fiber=$R append $R}

Now the oxygen fiber writes back to the two hydrogen fibers that it received from the dispatch operator. There is no direct messaging between fibers; they need to agree on a protocol to coordinate over the blackboard. In this case the protocol is that the hydrogen fiber will wait for a message using its own fiber name (#h,0l, #h,1l and so on) prefixed with the symbol #init. You might be wondering what happens when you apply the + operator to the value #init, which is a vector of one element, and $h.fiber, which is a vector of two elements (as discussed above). The answer is that all the basic arithmetic operators are overloaded in such a way that if either operand is a vector of one element then that element will be applied to all the elements of the other operand. This is to spare you from having to create a really long vector of some repeated element just so that you can do something simple like add one to all the elements in a vector. It also spares me from having to formally introduce scalar values into the language, which would clutter up the syntax and the runtime implementation considerably, and for no good reason. So to recap, the expression #init + #h,0l #h,1l will give you the vector of symbols #init,h,0l #init,h,1l. So this line of code is writing two messages to the blackboard whose purpose is to notify the two hydrogen fibers that this oxygen fiber is bonding with them.
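
The one-element rule is easy to state in a few lines of Python. This is an illustrative sketch only: broadcast_add is a made-up helper, lists stand in for O2 vectors, and tuples stand in for symbols so that + concatenates them the way #init + #h,0l does.

```python
def broadcast_add(left, right):
    """Element-wise +, stretching a one-element operand to match the other."""
    if len(left) == 1:
        left = left * len(right)      # repeat the single element
    elif len(right) == 1:
        right = right * len(left)
    if len(left) != len(right):
        raise ValueError("operands must have equal length, or one must have length 1")
    return [a + b for a, b in zip(left, right)]

# Symbols modeled as tuples: #init + #h,0l #h,1l
symbols = broadcast_add([("init",)], [("h", 0), ("h", 1)])
# Plain numbers work the same way: add one to every element.
numbers = broadcast_add([10, 20, 30], [1])
```

Here symbols ends up as [("init", "h", 0), ("init", "h", 1)] and numbers as [11, 21, 31].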

Extra Credit Assignment: Solve the Building Caffeine Problem


I also need to explain the expression to the right of the write operator: {i=++ ++ fiber=$R append $R}. If you want to write to two symbols in one shot then you need to pass a block where all the elements in the block are vectors with two elements. The first element in each vector corresponds to the first row/symbol and so on. But what on earth is “++ ++”? This is a little bit of magic that I invented to get around a problem I have been having repeatedly while working these concurrency problems. When I created signals and the O2 blackboard I designed it to avoid storing duplicate data, meaning that if you write to the blackboard and you write the same values that were already there before (for a particular symbol) it becomes a no-op. This makes it possible to handle huge numbers of symbols and columns in the blackboard. There are other reasons as well, which I will discuss in a future post dedicated to the blackboard. But it has become a problem in trying to use the blackboard for concurrency and messaging between fibers. In this case nothing will change between laps, so writing to the blackboard would be a no-op, which would cause the whole program to reach a deadlock state. To solve this problem I created a new datatype that I am calling an IncrOp. It is just another kind of vector like long, string, or double, but it has a special behavior in conjunction with the write operator, where it will increment whatever value was there prior, for that symbol. You can use this whenever you need to ensure that you don’t write duplicate data to the blackboard by accident.
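
To make the duplicate-suppression problem concrete, here is a small Python model of the behavior described above. Everything in it is an assumption made for illustration: the class names, the dict-based rows, and in particular the choice to start an incremented column at 0, which the post does not specify.

```python
class Incr:
    """Marker value meaning 'store the previous value plus one' (models ++)."""

INCR = Incr()

class Blackboard:
    """Hypothetical model: a write that changes nothing is dropped."""

    def __init__(self):
        self.state = {}   # symbol -> latest row (column -> value)
        self.lines = []   # append-only log of rows that actually changed

    def write(self, symbol, row):
        prev = self.state.get(symbol, {})
        new = dict(prev)
        for col, val in row.items():
            if val is INCR:
                new[col] = prev.get(col, -1) + 1   # assumed to start at 0
            else:
                new[col] = val
        if new == prev:
            return False        # duplicate data: no-op, nobody is woken up
        self.state[symbol] = new
        self.lines.append((symbol, new))
        return True
```

Writing {"fiber": "o0"} twice produces one line, so a fiber waiting for the second message would wait forever. Adding an INCR column makes every write differ from the last one.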

      =(#bound + $h.fiber) dispatch 1l

Now wait for both fibers to write back. The left hand argument to dispatch is a vector of both symbols. The right hand argument tells dispatch to wait for one response on each symbol. Waiting on both fibers here ensures that we do not release the take block until both hydrogens acknowledge the binding relationship.

      =cwrite (string $R) + " done"
    }
    <-oxygen $R
  }

Exit the take block, letting the next oxygen go, and repeat.

  hydrogen:
  {
    =(#wait + $R) write eval {i=++ fiber=$R}

Now we move on to the hydrogen fiber. This line writes to the blackboard signaling that this hydrogen fiber is waiting to bond.

    o=(#init + $R) dispatch 1l

Wait for a response from the oxygen fiber.

    =cwrite (string $R) + " getting bound to " + string $o.fiber
    =(#bound + $R) write eval {i=++ fiber=$R}

Once we get a signal from the oxygen fiber, write back.

    <-hydrogen $R
  }

Rinse, wash and repeat.

  h0=fiber {<-hydrogen #h,0l}
  h1=fiber {<-hydrogen #h,1l}
  h2=fiber {<-hydrogen #h,2l}
  h3=fiber {<-hydrogen #h,3l}
  h4=fiber {<-hydrogen #h,4l}
  o0=fiber {<-oxygen #o,0l}
  o1=fiber {<-oxygen #o,1l}
  o2=fiber {<-oxygen #o,2l}
  o3=fiber {<-oxygen #o,3l}
  <-wait $h0
}

Set up a bunch of hydrogen and oxygen fibers and let 'em rip. Here is the entire program without commentary. Even though the explanations can be very verbose, the program itself is very concise.

{
  oxygen:
  {
    h=#wait dispatch 2l
    =#bond take
    {
      =cwrite (string $R) + " initiating bonds with " + " " delimit string $h.fiber
      =(#init + $h.fiber) write eval {i=++ ++ fiber=$R append $R}
      =(#bound + $h.fiber) dispatch 1l
      =cwrite (string $R) + " done"
    }
    <-oxygen $R
  }
  hydrogen:
  {
    =(#wait + $R) write eval {i=++ fiber=$R}
    o=(#init + $R) dispatch 1l
    =cwrite (string $R) + " getting bound to " + string $o.fiber
    =(#bound + $R) write eval {i=++ fiber=$R}
    <-hydrogen $R
  }
  h0=fiber {<-hydrogen #h,0l}
  h1=fiber {<-hydrogen #h,1l}
  h2=fiber {<-hydrogen #h,2l}
  h3=fiber {<-hydrogen #h,3l}
  h4=fiber {<-hydrogen #h,4l}
  o0=fiber {<-oxygen #o,0l}
  o1=fiber {<-oxygen #o,1l}
  o2=fiber {<-oxygen #o,2l}
  o3=fiber {<-oxygen #o,3l}
  <-wait $h0
}

Output of the Building H2O Solution

When you run this program you will get output that looks like this:

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/water.o2"
#o,0l initiating bonds with #h,1l #h,0l
#h,1l getting bound to #o,0l
#h,0l getting bound to #o,0l
#o,0l done
#o,1l initiating bonds with #h,2l #h,3l
#h,2l getting bound to #o,1l
#h,3l getting bound to #o,1l
#o,1l done
#o,3l initiating bonds with #h,4l #h,1l
#h,4l getting bound to #o,3l
#h,1l getting bound to #o,3l
#o,3l done
#o,0l initiating bonds with #h,0l #h,2l
#h,0l getting bound to #o,0l
#h,2l getting bound to #o,0l
#o,0l done
#o,2l initiating bonds with #h,3l #h,4l
#h,3l getting bound to #o,2l
#h,4l getting bound to #o,2l
#o,2l done
#o,3l initiating bonds with #h,1l #h,0l
#h,1l getting bound to #o,3l
#h,0l getting bound to #o,3l
#o,3l done
#o,0l initiating bonds with #h,2l #h,3l
#h,2l getting bound to #o,0l
#h,3l getting bound to #o,0l
#o,0l done
#o,2l initiating bonds with #h,4l #h,1l
#h,4l getting bound to #o,2l
#h,1l getting bound to #o,2l
#o,2l done
#o,1l initiating bonds with #h,0l #h,2l
#h,0l getting bound to #o,1l
#h,2l getting bound to #o,1l
#o,1l done
#o,3l initiating bonds with #h,3l #h,4l
#h,3l getting bound to #o,3l
#h,4l getting bound to #o,3l
#o,3l done
#o,0l initiating bonds with #h,1l #h,0l
#h,1l getting bound to #o,0l
#h,0l getting bound to #o,0l
#o,0l done
#o,2l initiating bonds with #h,2l #h,3l
#h,2l getting bound to #o,2l
#h,3l getting bound to #o,2l
#o,2l done
#o,1l initiating bonds with #h,4l #h,1l
#h,4l getting bound to #o,1l
#h,1l getting bound to #o,1l
#o,1l done
#o,3l initiating bonds with #h,0l #h,3l
#h,0l getting bound to #o,3l
#h,3l getting bound to #o,3l
#o,3l done
#o,0l initiating bonds with #h,2l #h,4l
#h,2l getting bound to #o,0l
#h,4l getting bound to #o,0l
#o,0l done
#o,1l initiating bonds with #h,1l #h,0l
#h,1l getting bound to #o,1l
#h,0l getting bound to #o,1l
#o,1l done
#o,3l initiating bonds with #h,3l #h,2l
#h,3l getting bound to #o,3l
#h,2l getting bound to #o,3l
#o,3l done
#o,0l initiating bonds with #h,4l #h,0l
#h,4l getting bound to #o,0l
#h,0l getting bound to #o,0l
#o,0l done
#o,2l initiating bonds with #h,1l #h,2l
#h,1l getting bound to #o,2l
#h,2l getting bound to #o,2l
#o,2l done

The program is forming the water molecules as expected. You can see that the three required fibers are passing through the barrier together and that they are accurately recording which other fibers they are interacting with. Awesome.
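
For comparison, the same request/response protocol can be sketched with Python threads and queues. This is an analogy under stated assumptions rather than a translation of the O2 program: build_water and all of its local names are invented, per-hydrogen queues play the role of the #init and #bound symbols, the pairing lock imitates dispatch claiming two hydrogens in one step, and the run stops after a fixed number of laps instead of looping forever.

```python
import queue
import threading

def build_water(laps=3, n_hydrogen=5, n_oxygen=2):
    waiting = queue.Queue()                               # models #wait
    init = [queue.Queue() for _ in range(n_hydrogen)]     # models #init,h,N
    bound = [queue.Queue() for _ in range(n_hydrogen)]    # models #bound,h,N
    pairing = threading.Lock()   # claim two hydrogens as one step
    bond = threading.Lock()      # models the #bond take block
    log = []

    def hydrogen(h):
        while True:
            waiting.put(h)              # announce that we are waiting
            o = init[h].get()           # wait for an oxygen to pick us
            if o is None:               # shutdown sentinel
                return
            log.append(("H", h, o))     # record which oxygen we bound to
            bound[h].put(h)             # acknowledge

    def oxygen(o):
        for _ in range(laps):
            with pairing:
                pair = (waiting.get(), waiting.get())
            with bond:
                log.append(("O", o, pair))
                for h in pair:
                    init[h].put(o)
                for h in pair:
                    bound[h].get()      # hold the lock until both acknowledge

    hs = [threading.Thread(target=hydrogen, args=(h,)) for h in range(n_hydrogen)]
    os_ = [threading.Thread(target=oxygen, args=(o,)) for o in range(n_oxygen)]
    for t in hs + os_:
        t.start()
    for t in os_:
        t.join()
    for q in init:
        q.put(None)                     # release hydrogens still waiting
    for t in hs:
        t.join()
    return log
```

Reading the returned log in groups of three gives one oxygen entry followed by its two hydrogens, which is the same shape as the console output above.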

Conclusion - O2 Fibers Can Do Request/Response

In this program we got to see how O2's concurrency operators write, dispatch and take can be used to implement complex transactional messaging between fibers. This solution rests on the ability to have either fiber waiting on the other one using the dispatch operator, and also the dispatch operator's unique ability to wait on messages from multiple fibers. This is yet another example of how array programming concepts can be usefully applied to concurrency problems.

This solution also shows how write and dispatch can be used together to implement a request-response style of communication between fibers. Here I used symbols to create a crude protocol for correlating the requests and responses. I might come up with a simpler way to do this in the future. This is post seven in my twelve-part series on concurrency problems in O2.


The Readers-Writers Problem In O2 (Part II)

Posted by Brian Andersen on April 7, 2012

In my previous post I solved the first variation of the Readers-Writers Problem. To recall, there are multiple reader fibers that can operate concurrently (at the same time), but there is a writer fiber that must block all of the readers before it can proceed. Solving the basic version of this problem in O2 is quite easy because O2's take operator allows you to acquire multiple locks in an atomic all-or-none fashion. However, all is not yet well, because this solution would expose your writer fiber to the risk of starvation. If the reader fibers are too aggressive, it is possible that the writer would never or rarely be able to gain all of the locks required in order to enter its critical section.

In a database application this could mean that all the readers are getting data, but the data they are getting is stale because the writer could not update the database. So let’s assume you have an application where you want to serve incoming readers as quickly as possible, but you also want any changes (from the writer) to be seen by readers as quickly as possible. The first thing you want to do is make sure readers do not wait for each other. We did that in Part I. The second thing you want to do is ensure that when the writer arrives, any readers already in progress can finish their work, but new readers will wait until the writer is done. In traditional solutions this is accomplished using shared counter variables and several semaphores to manage concurrency. The O2 solution will have the same effect but is, in my opinion, easier to understand and implement correctly. We will use the blackboard as a communication channel for the writer to tell the readers that they need to wait.

Solution to the Readers-Writers Problem

{
  writer:
  {
    =#wait write {hold=true}

We begin by defining an operator for the writer. The first thing the writer does is write to the blackboard under the symbol #wait. There is nothing magical about the symbol #wait, but it has to be agreed upon by the reader and the writer. The message written has a variable “hold” with the boolean value true. This is going to signal to the reader that the writer is present and that it should wait.

    =$R take
    {
      =cwrite "writer begins writing"
      =sleep randoml 1 0 1000l
      =cwrite "writer done writing"
      =#wait write {hold=false}
    }

In this case the right argument $R receives a vector containing multiple symbols, one for each reader. This vector becomes the left argument to the take operator, which suspends evaluation until the writer fiber can gain exclusive access to all of the symbols in $R. This means that it will wait for any readers that are already in progress. When the writer is done, it writes the value false to the hold variable under the symbol #wait. This signals waiting readers that they may now proceed, since the writer is done.
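
The all-or-none behavior of take on a vector of symbols can be approximated in Python with one condition variable guarding a set of held symbols. This is a sketch under my own assumptions: SymbolLocks and its take method are invented names, and unlike the real take operator this version makes no promise about the order in which waiting callers are served.

```python
import threading
from contextlib import contextmanager

class SymbolLocks:
    """Hypothetical model of take: claim every requested symbol or none."""

    def __init__(self):
        self._cond = threading.Condition()
        self._held = set()

    @contextmanager
    def take(self, *symbols):
        wanted = set(symbols)
        with self._cond:
            # Wait until none of the wanted symbols is held, then claim them
            # all in the same step, so partial acquisition never happens.
            self._cond.wait_for(lambda: not (wanted & self._held))
            self._held |= wanted
        try:
            yield
        finally:
            with self._cond:
                self._held -= wanted
                self._cond.notify_all()
```

A reader would call take with its own symbol, while the writer calls take with all five. While any reader is inside its block the writer holds nothing at all, so it cannot block the other readers by sitting on a subset of the locks.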

    =sleep randoml 1 0 1000l
    <-writer $R
  }

Let the writer sleep for a while and then continue by reentering. This is the end of the writer operator. A quick note about nomenclature. Operators are equivalent to functions or subroutines in other languages. I call them operators because they are allowed to receive two arguments. One on the right, called $R, and one on the left, called $L. $L is always optional but the syntax requires that you always pass something into $R, even if it is not referenced. Traditionally in functional programming, functions can only receive a single argument, but that argument is sometimes a list or a dictionary containing multiple values. In O2 we have monadic operators that take one argument on the right ($R) and dyadic operators that take arguments on both the right and the left ($R and $L respectively).

  reader:
  {
    signal=#wait last 0l
    =if
    {
      test:$signal.hold
      then:#wait last lines $signal
      else:{}
    }

The reader begins every iteration by reading the most recent state of the #wait symbol in the blackboard. It does this by using the last operator, which takes one or more symbols on the left and, on the right, the line number in the blackboard where you want to start reading. By passing 0l (zero) here, the reader ensures that last will always return immediately with the most recent state. If it received a right argument greater than 0l, last would block until the blackboard had an appropriate value at or beyond the requested line number. The value returned from last will be a signal, which is a tabular data structure that can have multiple rows and columns. In this case there will be only one column, hold, and one row for the symbol #wait. Then we use the if operator to test the value of the hold column, using the dot notation to reference columns in the signal value ($signal.hold). If hold is set to true, reader will call last again, but this time the goal is to block until the value of #wait changes. The lines operator is used to figure out how many lines were in the blackboard at the point when $signal was created. This ensures that the reader will not miss any updates to #wait even though there is no concurrency control between the two calls to last. It's just a couple of lines of code, but a lot of explanation is required. That is why I broke this into two posts. The net result of all of this is that the reader will now check to see if the writer is trying to write. If the writer is trying to write, the reader will wait until the writer sets hold=false again.
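
The two calls to last can be modeled in Python as reads against an append-only log with line numbers. Treat this as a sketch of the idea only: the Blackboard class, the tuple returned by last, and the wait_for_writer helper are my own inventions, and here the line count is returned alongside the row instead of being recovered later with a separate lines operator.

```python
import threading

class Blackboard:
    """Hypothetical append-only log; a line number is an index into the log."""

    def __init__(self):
        self._cond = threading.Condition()
        self._log = []

    def write(self, symbol, row):
        with self._cond:
            self._log.append((symbol, dict(row)))
            self._cond.notify_all()

    def last(self, symbol, start_line=0):
        """Newest row for `symbol` at or beyond `start_line`, plus the line count.

        Blocks until such a row exists.
        """
        with self._cond:
            while True:
                for line in range(len(self._log) - 1, start_line - 1, -1):
                    sym, row = self._log[line]
                    if sym == symbol:
                        return row, len(self._log)
                self._cond.wait()

def wait_for_writer(bb):
    """What the reader does before entering its critical section."""
    row, lines = bb.last("wait", 0)          # returns at once: newest state
    if row["hold"]:
        row, lines = bb.last("wait", lines)  # block until #wait changes
    return row
```

Because the second call starts from the line count observed by the first, an update written between the two calls is still found rather than missed.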

    =$R take
    {
      =cwrite "reader " + (string $R) + " begins reading"
      =sleep randoml 1 0 1000l
      =cwrite "reader " + (string $R) + " done reading"
    }
    <-reader $R
  }

At this point the reader is assured that the writer is either not waiting or, if it is, has not been waiting for long, because hold was set to false just moments ago when the reader checked on it. So the reader will go again and use take to enter its critical section, acquiring the lock that corresponds to just this reader fiber. This code is identical to the reader code from the prior post with one exception. In last week's example the reader would sleep for a while after the take block. This was to allow the writer fiber a chance to occasionally get all the locks. With the improved concurrency control to avoid starving the writer, this is now unnecessary. So I have removed the sleep call to simulate a very aggressive reader.

  s:#readers,0l #readers,1l #readers,2l #readers,3l #readers,4l
  =#wait write {hold=false}

$s is a vector of symbols that correspond to each reader fiber. Note that symbols are tuples (ordered lists of typed scalar values). The elements of each tuple are separated by commas (,). This allows the blackboard to represent hierarchical data structures and also gives you a neat way to keep many symbols and groups of symbols organized.

  w=fiber {<-writer $s}
  r={<-fiber {<-reader $R}} each $s
  <-wait $w
}

Now launch a single writer and multiple reader fibers. Wait indefinitely for the writer to finish. Here is the entire piece without commentary.

{
  writer:
  {
    =#wait write {hold=true}
    =$R take
    {
      =cwrite "writer begins writing"
      =sleep randoml 1 0 1000l
      =cwrite "writer done writing"
      =#wait write {hold=false}
    }
    =sleep randoml 1 0 10000l
    <-writer $R
  }
  reader:
  {
    signal=#wait last 0l
    =if
    {
      test:$signal.hold
      then:#wait last lines $signal
      else:{}
    }
    =$R take
    {
      =cwrite "reader " + (string $R) + " begins reading"
      =sleep randoml 1 0 1000l
      =cwrite "reader " + (string $R) + " done reading"
    }
    <-reader $R
  }
  s:#readers,0l #readers,1l #readers,2l #readers,3l #readers,4l
  =#wait write {hold=false}
  w=fiber {<-writer $s}
  r={<-fiber {<-reader $R}} each $s
  <-wait $w
}

Example Output of the Readers-Writers Solution

Now let's run this program and see how it compares to the old version without the extra layer of communication to avoid starving the writer thread.

Welcome to O2, ask brian if you need help
O2>eval fload "../../src/rwnostarve.o2"
writer begins writing
{:2l :3l :4l :5l :6l}
O2>writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l done reading
reader #readers,4l done reading
writer begins writing
writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l done reading
reader #readers,4l done reading
writer begins writing
writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l done reading
reader #readers,4l done reading
writer begins writing
writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l done reading
reader #readers,4l done reading
writer begins writing
writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l done reading
reader #readers,4l done reading
writer begins writing
writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l done reading
reader #readers,4l done reading
writer begins writing
writer done writing
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l done reading
reader #readers,4l done reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
writer begins writing
writer done writing
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l done reading
reader #readers,4l done reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l done reading
reader #readers,4l done reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l done reading
reader #readers,4l done reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
reader #readers,3l begins reading
reader #readers,4l begins reading
reader #readers,0l begins reading
reader #readers,1l begins reading
reader #readers,2l begins reading
reader #readers,3l done reading
reader #readers,4l done reading
reader #readers,0l done reading
reader #readers,1l done reading
reader #readers,2l done reading
writer begins writing
writer done writing

Discussion

So does it work or what?

Yes.

  • The reader fibers are able to read concurrently (at the same time).
  • The writer fiber never begins writing until all readers are done reading.
  • The writer fiber never waits very long to get all of the locks even though the readers are very aggressive.

Why are the readers synchronized with each other?

You might also notice that the readers seem to be synchronized with each other. In most cases all five readers will finish before the first one begins again. After doing some research I found that this is due to a bug in my randoml operator, which is based on the .NET Random class. You can pass your own seed into the left argument of randoml, but for simplicity's sake I don't do that in these examples. What happens is that each fiber creates its own instance of the Random class using the default constructor. The default constructor uses a seed that is based on the current time. But because the clock has a limited resolution, the Random instances created by each fiber can end up having the same seed and then produce the same values. This causes the reader fibers to sleep for the same amount of time and wake up at the same time. The recommended fix for this in the .NET documentation is to share an instance of the Random class. I might do that, but at the moment I don't have a good strategy for sharing instances of values across operators. Operators are always stateless unless they interact with common infrastructure like the blackboard. So I need to meditate a bit on how to proceed. It's kind of a tangent to this post, but a question that could come up.
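
The effect is easy to reproduce in any language with a seedable generator. The Python sketch below imitates it by passing the same seed explicitly, since Python's own default seeding does not depend on a coarse clock the way the behavior described above does. The names sleep_times and same_tick are made up for the example.

```python
import random

def sleep_times(seed, n=3):
    """Each call builds a private generator, like one Random per fiber."""
    rng = random.Random(seed)
    return [rng.randrange(1000) for _ in range(n)]

# Two "fibers" that happened to read the same clock tick as their seed
# draw exactly the same sleep times, so they wake up in lockstep.
same_tick = 1234
fiber_a = sleep_times(same_tick)
fiber_b = sleep_times(same_tick)

# The shared-instance fix: every fiber draws from one stream instead,
# so each draw advances the state seen by the next caller.
shared = random.Random(same_tick)

def shared_sleep_times(n=3):
    return [shared.randrange(1000) for _ in range(n)]

fiber_c = shared_sleep_times()
fiber_d = shared_sleep_times()
```

fiber_a and fiber_b are identical lists, while fiber_c and fiber_d are consecutive slices of a single sequence.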

What about the Third Readers-Writers Problem?

In the CS literature there is also a discussion of a Third Readers-Writers problem. The first one is the naive version that I did in my prior post. The second one is the problem of giving preference to writers, which we have just discussed. But what happens if the writer is too greedy and prevents readers from reading? This could be just as bad as seeing stale data in your database. Instead of stale data, you get no data at all because the writer keeps updating the database and never gives you a chance to read! If I were using classical concurrency control primitives like semaphores I would have to do a third post to solve this problem, and the solution could get very hairy. Fortunately O2's take operator always enforces first-in-first-out semantics. This means that if a symbol has been released that would allow two fibers to proceed, the one that has been waiting the longest will get to go first. This ensures that when the writer releases its symbols, any waiting readers will get to go before the writer goes again.
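
First-in-first-out hand-off is the property doing the work here, and it can be illustrated with a small fair lock in Python. This is a sketch of the general technique, not a description of how take is implemented: FifoLock is an invented class, and it handles a single lock rather than a vector of symbols.

```python
import threading
from collections import deque

class FifoLock:
    """A lock that is granted strictly in arrival order."""

    def __init__(self):
        self._cond = threading.Condition()
        self._queue = deque()    # one token per waiting caller, oldest first
        self._held = False

    def acquire(self):
        me = object()            # unique token marking our place in line
        with self._cond:
            self._queue.append(me)
            # Proceed only when the lock is free AND we are at the head.
            self._cond.wait_for(
                lambda: not self._held and self._queue[0] is me)
            self._queue.popleft()
            self._held = True

    def release(self):
        with self._cond:
            self._held = False
            self._cond.notify_all()
```

With this discipline a writer that releases and immediately asks again joins the back of the line, behind any readers that queued up while it was writing.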

Where would I use a solution like this?

I would like to point out that you don't ever need to use O2's concurrency control facilities like take and dispatch to safely read and write from the blackboard. Those operations will always be thread-safe to the extreme. The purpose of fiber-level concurrency control is to give you control over how your fibers interact with or avoid interfering with each other. For example, you could have a need to update a database, filesystem, or other external hardware while other fibers are using it. Fibers are not heavy like threads. Potentially they can be used to keep tabs and coordinate things out in the real world, like robots in a factory, soldiers on a battlefield, orders on a trading floor, etc... Object Oriented Programming encouraged us to think about the world as a collection of objects sending messages to each other. I would rather think of the world as a collection of processes that communicate, coordinate, and compete in order to reach their goals. That is why I am spending so much time on concurrency in O2. It has to work and has to be done right. This is post number six which means I am halfway done.
