Add compression to Server-Sent Events with Undertow

In this post, we explain how we added compression support to a Server-Sent Events server built with Undertow.

Our goal was to validate that this compressed flow would directly be understood by our favorite browsers: Chrome, Firefox, Safari, even IE, and possibly more…

The context

At Streamdata.io, we build an incremental API cache proxy.

What the heck is that?
What problem does this solve and how?

If you are used to reading our blog posts, you may already know, but let’s set the scene and make it clear to everyone.

If you want your app to display fresh data from your favorite REST/JSON API, you probably poll your back-end server at an interval you define (every few seconds).

[Figure: polling]

When you want this to be more efficient, you probably put a cache in front of the server (some kind of Varnish cache proxy).

[Figure: polling with a cache proxy]

Most of your clients will benefit from the cache and your server will see its load decrease. But you’re still polling from the client, potentially getting the same data from the cache (request #2).

There’s more that can be done, and we have a solution.

You can delegate the request to our service (the incremental API cache proxy thing). It takes care of the polling, optimizing both how the polling is done and the data flow.

What do we do? After each polling, we cache the retrieved data set and calculate the difference with the previous one in the form of a patch.

Thus, we are able to send only patches to client apps and drastically reduce the amount of data exchanged to the required minimum to keep data up to date.

[Figure: Server-Sent Events proxy]

We do it using the standard JSON-Patch format (RFC 6902).
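To make the patch idea concrete, here is a deliberately naive sketch that diffs two flat JSON objects into RFC 6902 operations. It is not the Streamdata.io diff algorithm: the class and method names are hypothetical, values are passed as already-serialized JSON literals, and keys are assumed to need no JSON Pointer escaping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlatJsonPatchSketch {

    /**
     * Diffs two flat JSON objects into RFC 6902 operations.
     * Values are already-serialized JSON literals (e.g. "11" or "\"open\"").
     * Assumption: keys contain no '/', '~' or characters needing JSON escaping.
     */
    static List<String> diff(Map<String, String> before, Map<String, String> after) {
        List<String> ops = new ArrayList<>();
        for (Map.Entry<String, String> entry : after.entrySet()) {
            String key = entry.getKey();
            if (!before.containsKey(key)) {
                ops.add(operation("add", key, entry.getValue()));
            } else if (!before.get(key).equals(entry.getValue())) {
                ops.add(operation("replace", key, entry.getValue()));
            }
        }
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                ops.add("{\"op\":\"remove\",\"path\":\"/" + key + "\"}");
            }
        }
        return ops;
    }

    private static String operation(String op, String key, String jsonValue) {
        return "{\"op\":\"" + op + "\",\"path\":\"/" + key + "\",\"value\":" + jsonValue + "}";
    }

    public static void main(String[] args) {
        Map<String, String> previous = new LinkedHashMap<>();
        previous.put("price", "10");
        previous.put("volume", "100");

        Map<String, String> current = new LinkedHashMap<>(previous);
        current.put("price", "11");

        // Only the changed field travels to the client
        System.out.println(diff(previous, current));
    }
}
```

With the sample data above, the only operation produced is a "replace" on /price, while the unchanged volume field is not sent at all.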

We do it using the standard HTML5 Server-Sent Events to easily integrate with browsers and any HTTP environment.
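For readers who have never looked at the wire format, the following sketch formats a single Server-Sent Events frame: an optional "event:" line, one "data:" line per payload line, and a blank line to end the event. The class name and the "patch" event name are assumptions made for illustration, not necessarily what our proxy emits.

```java
public class SseFrameSketch {

    /** Formats one event in the text/event-stream wire format. */
    static String frame(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A payload containing line breaks is sent as several "data:" lines
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // The empty line tells the browser that the event is complete
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // "patch" is a hypothetical event name used only for this example
        System.out.print(frame("patch",
                "[{\"op\":\"replace\",\"path\":\"/price\",\"value\":11}]"));
    }
}
```

Because each event is just a few lines of text pushed over a long-lived HTTP response, anything that buffers the response (such as a compressor) has to hand the bytes over as soon as an event is complete.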

The cherry on top is that it adds the push capability. This means no useless data transmitted to the client, no polling for nothing, and the connection is kept open.

Nice, isn’t it?

Our concern

Now that you have the big picture of what Streamdata.io can do for you and how, let’s dive into our topic.

At Streamdata.io, we focus on performance in order to provide the best developer experience.

We wanted to take it a step further by reducing the volume of data exchanged between clients and our proxy.

We picked an obvious idea in the first place:

“Let’s add compression”

  • this should be easy as we are using HTTP standards.
  • this should be efficient as we are talking JSON, meaning text format, which will benefit strongly from compression.
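To get a feel for the second point, here is a small sketch that gzips a made-up JSON array with the JDK and prints both sizes. The payload and its field names are invented for the example, so the ratio you see says nothing precise about real API responses.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class JsonCompressionSketch {

    /** Gzips a byte array in memory and returns the compressed bytes. */
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return out.toByteArray();
    }

    /** Builds a made-up JSON array; the field names are illustrative only. */
    static String sampleJson(int entries) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < entries; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"title\":\"Value ").append(i)
              .append("\",\"price\":").append(10 + i)
              .append(",\"param1\":\"value1\"}");
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        byte[] raw = sampleJson(50).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.println("raw bytes:     " + raw.length);
        System.out.println("gzipped bytes: " + compressed.length);
    }
}
```

Repeated keys and punctuation are exactly what deflate-based compressors handle well, which is why JSON tends to shrink noticeably.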

Okay, no sooner said than done, we started working on the topic.

But the road to a working implementation was not a quiet one.

No big surprises here for a developer, but at least a good story to tell. 🙂

The journey to deflation

Isn’t that as simple as setting an option somewhere? Where’s the “use compression” checkbox, dammit!

Well, it’s not exactly that simple. Now it’s time to get your hands dirty.

Here’s why:

We chose to use Undertow as the core component for our proxy:

  • Undertow is a flexible Web server: you can add and remove features easily.
  • Undertow is an open source project sponsored by JBoss. It’s the core web server of the WildFly application server. It can be relied on.
  • Undertow is lightweight and can be customized to be as close as possible to your needs.
  • Undertow comes with lots of feature implementations you can directly use or get inspired from.

Here’s how you can build a web server quickly.

import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;

public class HelloWorldServer {

  public static void main(final String[] args) {
    Undertow server = Undertow.builder()
      .addHttpListener(8080, "localhost")
      .setHandler(new HttpHandler() {
        @Override
        public void handleRequest(final HttpServerExchange exchange) throws Exception {
          // whatever the request is, say "Hello World"
          exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
          exchange.getResponseSender().send("Hello World");
        }
      }).build();
    // start listening on localhost:8080
    server.start();
  }
}

As you can see, Undertow offers a handler mechanism which makes it super easy to build a handler chain and process an incoming request.

How do we add processing of the response content (in our case, we would like to compress it with deflate or gzip)?

Well, thanks to the Undertow flexibility, this is possible, but it’s not that easy.

The Encoding Handler

Our first step was to ask Google. We found plenty of material on the handler mechanism, but very little about encoding.

Our second step was to go through the whole Undertow documentation: user’s guide, javadoc and source code.

We finally found an example using a class called EncodingHandler.

We found it in a TestCase: the DeflateContentEncodingTestCase. Looks like we’re on our way.

Here’s how:


// Classes come from io.undertow.server.handlers.encoding and io.undertow.predicate
EncodingHandler handler = new EncodingHandler(new ContentEncodingRepository()
  .addEncodingHandler("deflate", new DeflateEncodingProvider(), 50, Predicates.maxContentSize(5)))
  .setNext(new HttpHandler() {
    @Override
    public void handleRequest(final HttpServerExchange exchange) throws Exception {
      // whatever the request is, say "Hello World"
      exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
      exchange.getResponseSender().send("Hello World");
    }
  });

// Register the handler when building the server
Undertow server = Undertow.builder()
  .addHttpListener(8080, "localhost")
  .setHandler(handler)
  .build();
server.start();

These few lines of code add a “deflate” compression handler to the server.

But wait, what does that do exactly?

This EncodingHandler manages a repository of encoding providers (implementations of ContentEncodingProvider).

With the addEncodingHandler method you can add as many encoders as you want.

Give it:

  • the type (“deflate”),
  • the EncodingProvider to apply (Undertow provides implementations for deflate and gzip),
  • a priority (if multiple providers can be applied, the priority is used to choose between them),
  • and finally a predicate to activate the encoding (here, only responses whose Content-Length exceeds 5 bytes are encoded).

As most browsers nowadays handle both deflate and gzip compression, let’s add a gzip EncodingProvider:


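// Classes come from io.undertow.server.handlers.encoding and io.undertow.predicate
EncodingHandler handler = new EncodingHandler(new ContentEncodingRepository()
  // both providers are registered on the repository, before it is passed to the handler
  .addEncodingHandler("gzip", new GzipEncodingProvider(), 100, Predicates.maxContentSize(5))
  .addEncodingHandler("deflate", new DeflateEncodingProvider(), 50, Predicates.maxContentSize(5)))
  .setNext(new HttpHandler() {
    @Override
    public void handleRequest(final HttpServerExchange exchange) throws Exception {
      // whatever the request is, say "Hello World"
      exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
      exchange.getResponseSender().send("Hello World");
    }
  });

// Register the handler when building the server
Undertow server = Undertow.builder()
  .addHttpListener(8080, "localhost")
  .setHandler(handler)
  .build();
server.start();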

Well guys, we’re all set. The last step is testing it.

The non-working Encoding Providers

As you can imagine, the story does not end here.

We had to dig deeper into Undertow’s guts.

The EncodingProvider uses a StreamSinkConduit. This is a writable conduit for byte streams going from the server to the client.

In our case, the DeflateStreamSinkConduit is used. It implements the write method like this:


    @Override
    public int write(final ByteBuffer src) throws IOException {
        if (anyAreSet(state, SHUTDOWN | CLOSED) || currentBuffer == null) {
            throw new ClosedChannelException();
        }
        try {
            if (!performFlushIfRequired()) {
                return 0;
            }
            if (src.remaining() == 0) {
                return 0;
            }
            //we may already have some input, if so compress it
            if (!deflater.needsInput()) {
                deflateData();
                if (!deflater.needsInput()) {
                    return 0;
                }
            }
            byte[] data = new byte[src.remaining()];
            src.get(data);
            preDeflate(data);
            deflater.setInput(data);
            deflateData();
            return data.length;
        } catch (IOException e) {
            freeBuffer();
            throw e;
        }
    }

What does not work for us here is that the deflate action is performed but no flush action is done.

The flush action is delegated to a generic StreamSinkConduit flush mechanism which is called when the response connection is closed.

For a standard HTTP Request/Response, this works perfectly.

But in our case, we are handling the HTTP connection differently, keeping it open and pushing blocks of data (patches) when available. This is the principle of a Server-Sent Events connection.

We therefore had to write our own StreamSinkConduit and add a performFlush method.

This method simply mimics the generic flush mechanism, but we call it each time the write method is called.

Our conduit overrides the write method like this:


    @Override
    public int write(final ByteBuffer src) throws IOException {
        if (anyAreSet(state, SHUTDOWN | CLOSED) || currentBuffer == null) {
            throw new ClosedChannelException();
        }
        try {

            ...

            byte[] data = new byte[src.remaining()];
            src.get(data);
            preDeflate(data);
            deflater.setInput(data);
            deflateData();

            // performing flush at each write
            performFlush(deflater);

            return data.length;

        } catch (IOException e) {
            freeBuffer();
            throw e;
        }
    }
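The same idea can be tried outside Undertow with the JDK alone. The sketch below is not our conduit and does not reproduce performFlush: it only shows that calling Deflater.deflate with SYNC_FLUSH after each event lets a receiver decode that event immediately, while the compression stream stays open. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class SyncFlushSketch {

    /** Compresses one event and sync-flushes so that all of it is emitted now. */
    static byte[] writeAndFlush(Deflater deflater, String event) {
        deflater.setInput(event.getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[256];
        int n;
        do {
            // SYNC_FLUSH forces pending output out without ending the stream
            n = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH);
            out.write(buffer, 0, n);
        } while (n == buffer.length);
        return out.toByteArray();
    }

    /** Decodes whatever a chunk contains, as a browser would on reception. */
    static String readAvailable(Inflater inflater, byte[] chunk) {
        inflater.setInput(chunk);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[256];
        try {
            int n;
            while ((n = inflater.inflate(buffer)) > 0) {
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed chunk", e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Deflater deflater = new Deflater(); // one compressor for the whole connection
        Inflater inflater = new Inflater(); // one decompressor on the receiving side

        for (String event : new String[] {"data: first\n\n", "data: second\n\n"}) {
            byte[] chunk = writeAndFlush(deflater, event);
            System.out.print(readAvailable(inflater, chunk));
        }
        deflater.end();
        inflater.end();
    }
}
```

The important design point is that the same Deflater instance is reused for every event, so later events still benefit from the compression history of earlier ones.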

Are we done? Not yet!

With this implementation we were getting close, but it still did not work:

[Figure: Chrome Server-Sent Events gzip error]

While testing with our client app, the first block of data was correctly decompressed and we could log it in the console, but the second block of data led to an ERR_CONTENT_DECODING_FAILED error.

As with the flushing mechanism, the Undertow implementation assumes that we are deflating a finite response that will be flushed once, at the end.

Thus, the implementation looks like this:


@Override
public boolean flush() throws IOException {
   if (currentBuffer == null) {
        if (anyAreSet(state, NEXT_SHUTDOWN)) {
            return next.flush();
        } else {
            return true;
        }
    }
    try {
       boolean nextCreated = false;
       try {
           if (anyAreSet(state, SHUTDOWN)) {
               if (anyAreSet(state, NEXT_SHUTDOWN)) {
                   return next.flush();
               } else {
                   if (!performFlushIfRequired()) {
                       return false;
                   }
                   //if the deflater has not been fully flushed we need to flush it
                   if (!deflater.finished()) {
                       deflateData();
                       //if could not fully flush
                        if (!deflater.finished()) {
                           return false;
                        }
                    }
                    final ByteBuffer buffer = currentBuffer.getResource();
                    if (allAreClear(state, WRITTEN_TRAILER)) {
                       state |= WRITTEN_TRAILER;
                       byte[] data = getTrailer();

                       if (data != null) {
                         if(additionalBuffer != null) {
                           byte[] newData = new byte[additionalBuffer.remaining() + data.length];
                           int pos = 0;
                           while (additionalBuffer.hasRemaining()) {
                               newData[pos++] = additionalBuffer.get();
                           }
                           for (byte aData : data) {
                              newData[pos++] = aData;
                           }
                           this.additionalBuffer = ByteBuffer.wrap(newData);

It is the call to the getTrailer method here that causes trouble.


    @Override
    protected byte[] getTrailer() {
        byte[] ret = new byte[8];
        int checksum = (int) crc.getValue();
        int total = deflater.getTotalIn();
        ret[0] = (byte) ((checksum) & 0xFF);
        ret[1] = (byte) ((checksum >> 8) & 0xFF);
        ret[2] = (byte) ((checksum >> 16) & 0xFF);
        ret[3] = (byte) ((checksum >> 24) & 0xFF);
        ret[4] = (byte) ((total) & 0xFF);
        ret[5] = (byte) ((total >> 8) & 0xFF);
        ret[6] = (byte) ((total >> 16) & 0xFF);
        ret[7] = (byte) ((total >> 24) & 0xFF);
        return ret;
    }

getTrailer returns the 8-byte gzip trailer (a CRC-32 checksum followed by the total input size), which tells the browser that our compressed stream of data is over.
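To check what these 8 bytes are, the sketch below rebuilds them with the JDK (CRC-32 of the uncompressed data, then its length, both little-endian as specified by RFC 1952) and compares them with the end of a stream produced by GZIPOutputStream. The class and method names are hypothetical and the code is independent of Undertow.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.GZIPOutputStream;

public class GzipTrailerSketch {

    /** Builds the 8-byte gzip trailer: CRC-32, then input size, little-endian. */
    static byte[] trailerFor(byte[] uncompressed) {
        CRC32 crc = new CRC32();
        crc.update(uncompressed);
        return ByteBuffer.allocate(8)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt((int) crc.getValue())
                .putInt(uncompressed.length)
                .array();
    }

    /** Returns the last 8 bytes of a complete gzip stream written by the JDK. */
    static byte[] lastEightBytesOfGzip(byte[] uncompressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(uncompressed);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        byte[] all = out.toByteArray();
        return Arrays.copyOfRange(all, all.length - 8, all.length);
    }

    public static void main(String[] args) {
        byte[] event = "data: {\"price\": 10}\n\n".getBytes(StandardCharsets.UTF_8);
        // The hand-built trailer should match what the JDK writes on close
        System.out.println(Arrays.equals(trailerFor(event), lastEightBytesOfGzip(event)));
    }
}
```

Since the trailer covers everything written so far, emitting it after the first event declares the stream complete, and the bytes of the next event no longer form valid gzip content.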

Again, in a standard HTTP Request/Response scenario, it works. In our case, we needed to tweak our StreamSinkConduit to get rid of this method call and leave the compressed stream open for as long as the Server-Sent Events connection is not closed.

It was a bit of a struggle to get this one fixed.

We had great help from the Mozilla support team on this particular point. We submitted a support ticket and they led us down the correct path.
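The behaviour we were after can be illustrated with the JDK’s GZIPOutputStream created with syncFlush set to true: every flush() pushes an event out, and the final block and trailer are only written on close(). This is a stand-in for the idea, not our Undertow conduit; the class and method names are hypothetical, and readAllBytes requires Java 9 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class OpenGzipStreamSketch {

    /**
     * Writes each event, flushes, and records the wire size after each flush.
     * Returns the complete stream once it has been closed.
     */
    static byte[] streamEvents(List<String> events, List<Integer> sizesAfterFlush) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stands in for the socket
        try {
            // syncFlush = true: flush() emits all pending compressed bytes
            GZIPOutputStream gzip = new GZIPOutputStream(wire, true);
            for (String event : events) {
                gzip.write(event.getBytes(StandardCharsets.UTF_8));
                gzip.flush(); // the event is on the wire, no trailer yet
                sizesAfterFlush.add(wire.size());
            }
            gzip.close(); // only now are the final block and the trailer written
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return wire.toByteArray();
    }

    /** Decodes a complete gzip stream back to text. */
    static String gunzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        byte[] wire = streamEvents(List.of("data: first\n\n", "data: second\n\n"), sizes);
        System.out.println("bytes on the wire after each flush: " + sizes);
        System.out.println("bytes added by close(): " + (wire.length - sizes.get(sizes.size() - 1)));
        System.out.print(gunzip(wire));
    }
}
```

The sizes grow after each flush, which shows that events leave the compressor as they are written, while the end-of-stream marker is deferred until the connection is actually closed.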

Conclusion

We built our solution basically reusing what Undertow offered us:

  • we reused the provided EncodingHandler
  • we reused and tweaked the StreamSinkConduit to fit our needs

In the end, the final implementation required only a few lines of specific code but a lot of struggling.

All of our favorite browsers accept this implementation.

It works like a charm.

With this, we moved one step forward on the path to turn your APIs data into an awesome streaming experience.

You can clone or download the GitHub sample project we used above.

Give it a try with or without compression using the optional X-Sd-Compress query parameter (false by default).

For example, use http://stockmarket.streamdata.io/prices?X-Sd-Compress=true in the URL field to test the data stream with compression.

Original source: streamdata.io blog