Adventures in HttpContext All the stuff after 'Hello, World'

Adding Http Server-Sent Events to Akka-Streams

In my last blog post we pushed messages from RabbitMq to the console using Akka-Streams. We used the reactive-rabbit library to create an Akka-Streams Source for our streams-playground queue and mapped the stream to a println statement before dropping it into an empty Sink. All Akka-Streams need both a Source and Sink to be runnable; we created a complete stream blueprint to be run later.

Printing to the console is somewhat boring, so let’s take it up a notch. The excellent Spray web service library is being merged into Akka as Akka-Http. It’s essentially Spray built with Akka-Streams in mind. The routing DSL, immutable request/response model, and high-performance http server are all there; think of it as Spray vNext. Check out Mathias Doenitz’s excellent slide deck on akka-http from Scala Days to learn more about this evolution of Spray; it also highlights the back-pressure functionality Akka-Streams gives you for Http.

Everyone’s familiar with the Request/Response model of Http, but to show the power of Akka-Streams we’ll add Heiko Seeberger’s Akka-SSE library, which brings Server-Sent Events (SSE) to Akka-Http. Server-Sent Events are a more efficient alternative to long-polling and a lighter protocol than the bi-directional WebSocket API. They let the client register a handler that the server can then push events to. Akka-SSE adds an SSE-enabled completion marshaller to Akka-Http so your response can be SSE-aware. Instead of printing messages to the console, we’ll push those messages to the browser with SSE. This shows one of my favorite features of stream-based programming: we simply connect the specific pipes to create more complex flows, without worrying about the how; the framework handles that for us.

Changing the Original Example

If you’re interested in the code, simply clone the original repo with git clone https://github.com/mhamrah/streams-playground.git and then git checkout adding-sse to get to this step in the repo.

To modify the original example we’re going to remove the println and Sink calls from RabbitMqConsumer so we can plug in our enhanced Source to the Akka-Http sink.

def consume() = {
  Source(connection.consume("streams-playground"))
    .map(_.message.body.utf8String)
}

This is now a partial flow: we build up the original RabbitMq Source with our map function to get the message body. Now the “other end” of the stream needs to be connected, which we defer until later. This is the essence of stream composition. There are multiple ways we can cut this up: our map call could be the only thing in this function, with our RabbitMq source defined elsewhere.
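To make the "defer until later" idea concrete, here is a minimal sketch using only the Scala standard library. It is an analogy, not Akka-Streams code: a lazy Iterator stands in for the Source, and rawBodies and the decoded counter are hypothetical names made up for illustration. The point is that building the pipeline does no work until something at the other end pulls from it.

```scala
object CompositionSketch {
  // Counts how many elements the map stage has processed so far.
  var decoded = 0

  // Hypothetical stand-in for the RabbitMq Source: raw message bodies as bytes.
  def rawBodies(): Iterator[Array[Byte]] =
    Iterator("woot!", "another message!").map(_.getBytes("UTF-8"))

  // The "partial flow": a source plus a map stage, with no sink attached yet.
  def consume(): Iterator[String] =
    rawBodies().map { bytes =>
      decoded += 1
      new String(bytes, "UTF-8")
    }

  def main(args: Array[String]): Unit = {
    val partial = consume() // only a description so far; nothing is decoded
    println(s"decoded before attaching a sink: $decoded")
    partial.foreach(println) // the "sink" pulls elements through the map stage
    println(s"decoded after attaching a sink: $decoded")
  }
}
```

The real Source adds back-pressure and asynchronous boundaries on top of this, but the shape is the same: consume() returns something that still needs its other end connected.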

Adding Akka-Http

If you’re familiar with Spray, Akka-Http won’t look that much different. We want to create an Actor for our http service. There are just a few different traits we extend our original Actor from, and a different way to plug our routing functions into the Akka-Streams pipeline.

class HttpService
  extends Actor
  with Directives
  with ImplicitFlowMaterializer
  with SseMarshalling {
  // implementation
  // ...
}

Directives gives us the routing dsl, similar to spray-routing (the functions are pretty much the same). Because Akka-Http uses Akka-Streams, we need an implicit FlowMaterializer in scope to run the stream. ImplicitFlowMaterializer provides a default. Finally, the SseMarshalling trait from Heiko Seeberger’s library provides the SSE functionality we want for our app. If you’re interested in a robust Akka-Streams sample, Heiko’s Reactive-Flows is worth checking out.

Binding to Http

Within our actor body we’ll create our http stream by binding a routing function to an http port. This is a little different from Spray; there’s just some syntactic sugar so we can plug our routing function directly into the http pipeline:

    //need an ExecutionContext for Futures
    import context.dispatcher

    //There's no receive needed, this is implicit
    //by our routing dsl.
    override def receive: Receive = Actor.emptyBehavior

    //We bind to an interface and create a 
    //Flow with our routing function
    Http()(context.system)
      .bind(Config.interface, Config.port)
      .startHandlingWith(route)

    //Simple composition of basic routes
    private def route: Route = sse ~ assets

    //Defined later
    private def sse: Route = ???
    private def assets: Route = ???

If we weren’t using the Routing DSL we’d need to explicitly handle HttpRequest messages in our receive partial function. But the startHandlingWith call will do this for us; like spray-routing it takes in a routing function, and will call the appropriate route handler. New http requests will be pumped into the route handler and completed with the completion function at the end of the route.
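If the sse ~ assets composition feels opaque, the following stdlib-only sketch models the idea. It is a deliberately simplified analogy and not the Akka-Http Route type: here a route is assumed to be a function from a request path to an optional response, where None plays the role of a rejection. The response strings and the RouteOps helper are hypothetical.

```scala
object RouteSketch {
  // Simplified model: a route either handles a path (Some) or rejects it (None).
  type Route = String => Option[String]

  // Adds a `~` combinator: try the left route, fall back to the right on rejection.
  implicit class RouteOps(left: Route) {
    def ~(right: Route): Route = path => left(path).orElse(right(path))
  }

  // Stand-ins for the two routes in the post.
  val sse: Route = path => if (path == "/messages") Some("sse stream") else None
  val assets: Route = path => if (path == "/") Some("index.html") else None

  // Simple composition of basic routes, mirroring `sse ~ assets`.
  val route: Route = sse ~ assets

  def main(args: Array[String]): Unit = {
    println(route("/messages"))
    println(route("/"))
    println(route("/missing"))
  }
}
```

Each incoming request is offered to the first route; only if that route rejects it does the next one get a chance.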

Adding SSE

The last piece of the puzzle is adding a specific route for SSE. We need two pieces for SSE support. First, an implicit function which converts the type produced from our Source to an Sse.Message; in this case, we need to go from a String to an Sse.Message. Second, a route where a client can subscribe to the stream of server-sent events.

//Convert a String (our RabbitMq output) to an SSE Message
implicit def stringToSseMessage(event: String): Sse.Message = {
  Sse.Message(event, Some("published"))
}

//add a route for our sse endpoint.
private def sse: Route = {
  path("messages") {
    get {
      complete {
        RabbitMqConsumer.consume
      }
    }
  }
}

In order for SSE to work in the browser we need to produce a stream of SSE messages with a specific content-type: Content-Type: text/event-stream. That’s what Akka-SSE provides: the SSE Message case classes and serialization to text/event-stream. Our implicit function stringToSseMessage allows the Scala types to align so the “stream pipes” can be attached together. In our case, we produce a stream of Strings, our RabbitMq message bodies. We need to produce a stream of Sse.Messages, so we add a simple conversion function. When a new client connects, they’ll attach themselves to the consuming RabbitMq Source. Akka-Http lets you natively complete a route with a Flow; Akka-SSE simply completes that Flow with the proper Http response for SSE.
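To show roughly what that serialization amounts to, here is a small stdlib-only sketch. It is not Akka-SSE's implementation: the Message case class and render function below are hypothetical stand-ins, written only to illustrate the text/event-stream framing (an optional event line, one data line per line of payload, and a blank line ending the message) and how an implicit String conversion lets the types line up.

```scala
import scala.language.implicitConversions

object SseSketch {
  // Hypothetical stand-in for the library's message type: payload plus optional event name.
  final case class Message(data: String, event: Option[String] = None)

  // Same idea as stringToSseMessage in the post: use a String where a Message is expected.
  implicit def stringToMessage(body: String): Message =
    Message(body, Some("published"))

  // Render one message in the text/event-stream framing:
  // an optional "event:" line, one "data:" line per payload line, then a blank line.
  def render(message: Message): String = {
    val eventLine = message.event.map(name => s"event:$name\n").getOrElse("")
    val dataLines = message.data.split("\n", -1).map(line => s"data:$line\n").mkString
    eventLine + dataLines + "\n"
  }

  def main(args: Array[String]): Unit = {
    // The String argument is converted implicitly because render expects a Message.
    print(render("woot!"))
  }
}
```

A payload containing newlines is split across several data lines, which is how the event stream format carries multi-line data.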

Trying It Out

Fire up SBT and run ~reStart, ensuring you have RabbitMq running and set up a queue named streams-playground (see the README). In your console, try a simple curl command:

$ curl http://localhost:8080/messages

The curl command won’t return. Start sending messages via the RabbitMq Admin console and you’ll see the SSE output in action:

$ curl localhost:8080/messages
event:published
data:woot!

event:published
data:another message!

Close the curl command and fire up your browser at http://localhost:8080. You’ll see a simple web page (served from the assets route). Continue sending messages via RabbitMq, and those messages will be added to the DOM. Most modern browsers natively support SSE with the EventSource object. The following gist creates an event listener on the 'published' event, which is produced from our implicit string => sse function above:

There are also handlers for opening the initial sse connection and for any errors produced. You could also add more events; our simple conversion only goes from a String to one specific SSE event of type published. You could map a set of case classes (preferably an algebraic data type) to a set of events for the client. Because EventSource is built into the browser, there’s no need for an additional framework or library. The gist above includes a test I copied from the HTML5 Rocks page on SSE.
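As a rough sketch of the algebraic data type idea, the snippet below maps a sealed family of case classes to named events. Everything here is hypothetical: the QueueEvent hierarchy, the event names other than published, and the Message stand-in are invented for illustration and are not part of the repo or of Akka-SSE.

```scala
object EventAdtSketch {
  // Hypothetical domain events; sealed so the compiler can check the match is exhaustive.
  sealed trait QueueEvent
  final case class Published(body: String) extends QueueEvent
  final case class Deleted(id: Long) extends QueueEvent
  case object Heartbeat extends QueueEvent

  // Stand-in for an SSE message: payload plus the event name the client listens for.
  final case class Message(data: String, event: Option[String])

  // One conversion per case; each case becomes a distinct event type on the client.
  def toMessage(event: QueueEvent): Message = event match {
    case Published(body) => Message(body, Some("published"))
    case Deleted(id)     => Message(id.toString, Some("deleted"))
    case Heartbeat       => Message("", Some("heartbeat"))
  }

  def main(args: Array[String]): Unit = {
    val events = List(Published("woot!"), Deleted(42L), Heartbeat)
    events.map(toMessage).foreach(println)
  }
}
```

On the browser side each event name would get its own listener, the same way the 'published' listener is registered.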

A Naive Implementation

If you open up multiple browsers to localhost, or curl http://localhost:8080/messages a few times, you’ll notice that a published message only goes to one client. This is because our RabbitMq Source consumes each message from the queue only once, and passes it down the stream pipeline. That single message will only go to one of the connected clients; there’s no fanout or broadcasting. You can add that with either RabbitMq or Akka-Streams; try experimenting for yourself!
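The difference between the two delivery styles can be modelled in a few lines of plain Scala. This is only a toy simulation, not RabbitMq or Akka-Streams code: the Client class and both functions are hypothetical, and strict round-robin is used purely as a simple model of "each message goes to exactly one client".

```scala
import scala.collection.mutable

object FanoutSketch {
  // A client is modelled as a buffer of the messages it has received.
  final class Client {
    val received: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]
  }

  // Competing consumers: each message is delivered to exactly one client, in turn.
  def roundRobin(messages: Seq[String], clients: Seq[Client]): Unit =
    if (clients.nonEmpty)
      messages.zipWithIndex.foreach { case (message, index) =>
        clients(index % clients.size).received += message
      }

  // Broadcast: every message is copied to every client.
  def broadcast(messages: Seq[String], clients: Seq[Client]): Unit =
    for (message <- messages; client <- clients) client.received += message

  def main(args: Array[String]): Unit = {
    val messages = Seq("one", "two", "three")

    val competing = Seq(new Client, new Client)
    roundRobin(messages, competing)
    competing.foreach(client => println(client.received.toList))

    val fanout = Seq(new Client, new Client)
    broadcast(messages, fanout)
    fanout.foreach(client => println(client.received.toList))
  }
}
```

The current app behaves like the first function; getting the second behaviour means either giving each client its own queue on the RabbitMq side or sharing one consumer and broadcasting inside the stream.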