briantheprogrammer

Home of the O2 Programming Language

A Publish-Subscribe Server In O2

Posted by Brian Andersen on August 29, 2012

Today’s post is the third part in my new series on distributed computing applications in O2. In the first post I demonstrated O2′s messaging operators using a Simple Echo Server. In the second post I demonstrated how to use O2′s messaging facility to build a basic point-to-point Chat Server. In this post I will take these ideas one step further and build a Publish-Subscribe server.

A Publish-Subscribe system is a little like the chat system but with an extra level of indirection. Like the chat model, clients may publish messages to a central server, but rather than explicitly specifying who the recipient is they direct their message at a particular “topic”. Then, other clients can subscribe on particular topics. This model is analogous to a chat room or news service. The code for the Publish-Subscribe server will be a little more complicated than the chat server, but not excessively so.

The Code

{
  server:{
    publish:{

We begin with the definition of the server, which will be a bot, and publish, which will be a fiber running inside the server bot.

      message:$L accept 1l
      :$message.body.verb switch {

The publish fiber accepts one message at a time from the left argument which is an integer handle to a server connection created with the listen operator. This is following the same pattern as my chat demo. The server accepts two types of messages indicated using the “verb” variable on the message body. The two allowed verbs are #publish and #listen.

        publish: {
          start: (#publish + $message.body.topic) write eval
            {i:++ text:$message.body.text}

The handler for the verb #publish will write the message that was published into the blackboard under the topic specified in the message body.

          : ((#listen + $message.body.topic) peek 0l) switch {

Now we need to check to see if there are any listeners waiting for messages on the topic that that has just been published. If there are, we must deliver the messages to the waiting clients. Now comes the part that is a little bit tricky to implement using the blackboard.

            :{ listeners:(#listen + $message.body.topic) gawk 0l
               : ($listeners.S part 2 3 4l) reply eval
                 {topic:$message.body.topic start:$start messages:$message.body.text}
               all:(#ids + $listeners.S part 2 3 4l) dispatch 0l
               topics:(#listen + $all.S part 4 1 2 3l) dispatch 0l
             }

A few quick points will help me to explain this section.

  • The #listen messages that arrive from clients may contain multiple topics.
  • Likewise, the resulting content messages that are delivered may contain multiple messages spanning one or more topics.
  • To keep matters simple and robust, I want each listen request to get exactly one response, but the response may be delayed until appropriate content arrives.
  • You can dispatch records from the blackboard using prefixes. For example #x dispatch 1l will dispatch 1 record even if the the symbol is #x,a #x,b or #x,c
  • But you cannot dispatch records using suffixes. For example if the blackboard contains symbols #a,x #b,x #c,x there is no way to dispatch all the records with symbols ending in “x”.

So for each listen request we have multiple topics and a single id. When a request comes in we need to write multiple entries into the blackboard. There will be one entry for combination of id and topic and a second set of entries for each combination of topic and ids. This is so that you can fetch all the topics for an id, or all the ids for a given topic with equal ease. So in this section we first find all the listeners for the topic on the incoming message. Those listeners get a reply. Then we need to dispatch those requests and all the other records having the same topic. Otherwise the server would later try to reply to the same request multiple times. The part operator is to rearrange the individual elements in a vector of symbols.

          }
          :$message.id reply {ack=true}
        }
        listen:{

Reply back to the publish request. This is a good practice so the client can know that their request was accepted by the server. Now let’s look at the listen fiber in the server.

          messages:(#publish + $message.body.topic) poll $message.body.start

Poll is a new, special operator. It is the only operator for accessing the blackboard that will return an empty set of results, and will never suspend. Instead of returning an empty set, read and dispatch will wait until they have has a result that meets the specified constraints. Poll however, will always yield immediately and will return an empty series if there are no matching rows available. But there is more. read let’s you specify a line in the blackboard to start reading from and the number of rows per symbol you want. Poll lets you specify a different starting row for each and every symbol! This is how clients of the Publish-Subscribe server can make sure they get every message for their chosen topics, and get each message only once. They maintain a list of starting points for each topic.

          : (0l < count $messages) switch {
            :$message.id reply eval
              {topic:$message.body.topic start:lines $messages messages:$messages.text}

If poll returns at least one message (ie there is at least one message matching the client's subscription) the server replies back to that request.

            :{  : (#listen + $message.body.topic + $message.id) write eval {
                  i: (count $message.body.topic) repeat ++
                  start:$message.body.start
                }
                : (#ids + $message.id + $message.body.topic) write eval {
                  i: (count $message.body.topic) repeat ++
                }
             }

Otherwise the request goes into a holding queue. This is that part where we have to write records for all topics by id and all ids by topic.

          }
        }
      }
      <-$L publish $R
    }
    :fiber {<-(listen $L) publish 0l}
    <-wait 0l
  }

Then we finish off the publish fiber and the server bot in the usual fashion. Now to discuss the client bot.

  client:{
    outbox:{
      topic:0l from shuffle $R.pub
      text:"This is a message about " + string $topic
      :cwrite "(" + (format $R.pub) + ") SEND:" + $text
      :receive $L send eval {verb:#publish topic:$topic text:$text}
      <-$L outbox $R
    }

This is the client outbox fiber. The outbox is a loop that publishes a message on a randomly selected topic on each turn. The topics are selected from a list supplied in the right argument of the outbox operator.

    inbox:{
      starts:$R last 0l
      sub:eval {verb:#listen topic:$starts.topic start:$starts.start}
      message:receive $L send $sub
      :cwrite "(" + (format $R) + ") RECV:" + $message.body.messages
      :$message.body.topic write eval {
        topic:$message.body.topic start:(count $message.body.topic) repeat $message.body.start}
      <-$L inbox $R
    }

The inbox sends requests to the server in a loop. Each request specifies a list of topics to subscribe and also a list of starting points which indicate how far back in time the server should look for events with that topic. The responses sent by the server always provide a line number so that clients can be sure to get all of the messages. In this example I wrote that number into the blackboard between iterations. I could have passed the line numbers as a parameter to the inbox operator as well.

    :fiber {
      :$R.sub write eval {topic:$R.sub start:(count $R.sub) repeat 0l}
      <-(open $L) inbox $R.sub
    }

At the end of the client bot definition we launch the inbox fiber...

    :fiber {<-(open $L) outbox $R}
    <-wait 0l
  }

And the outbox fiber.

  :bot {<-#tcp,9090l server 0l}
  :bot {<-#tcp,localhost,9090l client {pub:#sports #weather sub:#news #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#news #sports sub:#sports #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#weather #news sub:#sports #news}}
  <-wait 0l
}

At the top level start the server bot and three client bots. Each client is configured to publish on two topics and to subscribe on two different topics. Here is the entire program.

{
  server:{
    publish:{
      message:$L accept 1l
      :$message.body.verb switch {
        publish: {
          start:(#publish + $message.body.topic) write eval
            {i:++ text:$message.body.text}
          : ((#listen + $message.body.topic) peek 0l) switch {
            :{ listeners:(#listen + $message.body.topic) gawk 0l
               : ($listeners.S part 2 3 4l) reply eval
                 {topic:$message.body.topic start:$start messages:$message.body.text}
               all:(#ids + $listeners.S part 2 3 4l) dispatch 0l
               topics:(#listen + $all.S part 4 1 2 3l) dispatch 0l
             }
          }
          :$message.id reply {ack=true}
        }
        listen:{
          messages:(#publish + $message.body.topic) poll $message.body.start
          : (0l < count $messages) switch {
            :$message.id reply eval
              {topic:$message.body.topic start:lines $messages messages:$messages.text}
            :{  : (#listen + $message.body.topic + $message.id) write eval {
                  i:(count $message.body.topic) repeat ++
                  start:$message.body.start
                }
                : (#ids + $message.id + $message.body.topic) write eval {
                  i:(count $message.body.topic) repeat ++
                }
             }
          }
        }
      }
      <-$L publish $R
    }
    :fiber {<-(listen $L) publish 0l}
    <-wait 0l
  }
  client:{
    outbox:{
      topic:0l from shuffle $R.pub
      text:"This is a message about " + string $topic
      :cwrite "(" + (format $R.pub) + ") SEND:" + $text
      :receive $L send eval {verb:#publish topic:$topic text:$text}
      <-$L outbox $R
    }
    inbox:{
      starts:$R last 0l
      sub:eval {verb:#listen topic:$starts.topic start:$starts.start}
      message:first receive $L send $sub
      :cwrite "(" + (format $R) + ") RECV:" + $message.body.messages
      :$message.body.topic write eval {
        topic:$message.body.topic start:(count $message.body.topic) repeat $message.body.start}
      <-$L inbox $R
    }
    :fiber {
      :$R.sub write eval {topic:$R.sub start:(count $R.sub) repeat 0l}
      <-(open $L) inbox $R.sub
    }
    :fiber {<-(open $L) outbox $R}
    <-wait 0l
  }
  :bot {<-#tcp,9090l server 0l}
  :bot {<-#tcp,localhost,9090l client {pub:#sports #weather sub:#news #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#news #sports sub:#sports #weather}}
  :bot {<-#tcp,localhost,9090l client {pub:#weather #news sub:#sports #news}}
  <-wait 0l
}

As you can see this is one of the bigger O2 programs so far. But it is still under 70 lines incorporating both the client and server code.

The Output

Welcome to O2, ask brian if you need help
O2>eval fload "../../demo/messaging/publish.o2"
(#sports #weather) SEND:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#weather #news) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#sports #weather) SEND:This is a message about #weather
(#sports #news) RECV:This is a message about #news
(#news #sports) SEND:This is a message about #news
(#weather #news) SEND:This is a message about #weather
(#sports #weather) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#sports #news) RECV:This is a message about #news
(#weather #news) SEND:This is a message about #news
(#sports #weather) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#sports #news) RECV:This is a message about #news
(#weather #news) SEND:This is a message about #news
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) SEND:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather
(#news #sports) SEND:This is a message about #news
(#sports #news) RECV:This is a message about #news
(#weather #news) SEND:This is a message about #news
(#sports #weather) SEND:This is a message about #weather
(#news #weather) RECV:This is a message about #news
(#news #weather) RECV:This is a message about #weather
(#sports #weather) RECV:This is a message about #weather

In this example I used the subscription to identify the client rather than my usual practice of assigning a number. This way it is easy to see that the messages are being delivered to the correct client and each message is delivered exactly once.

Leave a Comment

Your email address will not be published. Required fields are marked *

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>