Streaming

Using Spring Web Flux as a Java Client of Streamdata.io

People on mobile phones with Spring and Streamdata.io logos

Despite the fact Server-Sent Events (aka SSE) are a bit older than WebSocket, SSE is less known than WebSockets as a push technology. If you wish to get the difference between Server-Sent Events and WebSockets, you can read this article or watch one video of our great drones enchantress Audrey here.

But it seems there is a renewed interest in this technology. For instance, Spring Framework is giving it a place among its new (but not yet released) reactive stack.

Thus, in Spring Web Flux that will be part of Spring Framework 5.0, one can find a WebClient that supports Server-Sent Events. And who says SSE, says Streamdata.io!

So, let’s see how to create a great Java Client of Streamdata.io with Spring Web Flux.

Step 0: initiating a Spring Web Flux project

The best way to start a Spring Boot project is to use the Spring Initialzr (either through an IDE like IntelliJ or through the Spring website generator). So, let’s connect on the website!

Go to https://start.spring.io/ to create a SpringBoot project

As of the time of writing, neither Spring Framework 5.0 is not released nor its fellow Spring Boot 2.0.0.

So, I chose SpringBoot 2.0.0.M1 and added the “Reactive Web” as dependencies. “Reactive Web” is the new web stack of Spring based on new Spring support of reactive paradigm (see Project Reactor). By including it as dependencies, you will include a SpringBoot starter for Spring Web Flux.

Then, fill in the other information for creating a new Maven or Gradle artifact. In the rest of the blog post, I will generate a Maven project, but it’s easy to transpose what will be done for Gradle project if you’re a Gradle aficionado.

Once the project is generated, import it in your favorite IDE.

In the pom.xml (or in the gradle file), the Spring Boot starter for Spring Web Flux is declared to be used automatically:


  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>

By default, this starter starts a Web server. But we don’t want to start one: we want to use a Spring Web Flux client to consume streaming APIs through Streamdata.io! So, let’s tune our Spring Boot project a bit more.

Replace the main() of StreamdataioSpringWebfluxApplication with this code:


public static void main(String[] args) throws URISyntaxException {
    SpringApplication app = new SpringApplication(StreamdataioSpringWebfluxApplication.class);
    // prevent SpringBoot from starting a web server
    app.setWebApplicationType(WebApplicationType.NONE);
    app.run(args);
}

We will create a CommandLineRunner application, so let’s add such a bean:


@Bean
public CommandLineRunner myCommandLineRunner() {
   return args -> {
             // Our code will be here

   };
}

We will add the code that deals with getting Server-Sent Events from Streamdata.io into this bean. As we will use non-blocking code, basically Flux code, any reactive code declared in the CommandLineRunner will be executed in another thread. Thus, at the end of the CommandLineRunner, the program will exit.

Let’s block the main thread for the demo purpose. To do so, let’s add Mono/Flux code:


@Bean
public CommandLineRunner myCommandLineRunner() {
return args -> {
              // Our reactive code will be declared here

      // Add a block here because CommandLineRunner returns after the execution of the code
      // ... and make the code run 30 sec.
Mono.just("That's the end!")
        .delayElement(Duration.ofDays(1))
    .block();
   };
}

I have added a code that blocks the main thread 1 day. After 1 day, the program exits. That’s long enough to consume a streaming API for a demo! In a real program, you don’t exit that way.

So, that’s it for Step 0! The code can be found under the tag named, with lots of originality, “step0.

We are now ready to consume a streaming API thanks to Streamdata.io!

Step 1: connecting and printing the events

So, what's the plan?The plan is to consume Server-Sent Events from Streamdata.io with a Spring Web Flux client and print these events in the console. In other words, consume a streaming API through a Spring Web Flux client and thanks to Streamdata.io.

To do so, we will use an API that we use in demos: our well-known StockMarket API. This API emulates an API that provides a stock market. If you call it several times, some data changes from time to time. But wait! We want to consume a streaming API not a “simple” API! That’s the role (of course) of Streamdata.io that will convert it into a streaming API.

To do so, you need to create an application on the intermediate.io portal to get an application token.  The README.md of the demo gives extra details and in the end, we know that it will send the Server-Sent Events to our web client with the Stock Market API changes.

So, first, we create the final URI the web client will open (1):


@Bean
public CommandLineRunner myCommandLineRunner() {
 return args -> {
    String api = "http://stockmarket.streamdata.io/prices";
    String token = "[YOUR TOKEN HERE]";
    URI streamdataUri = new URI("https://streamdata.motwin.net/"     // (1)
                                 + api + "?X-Sd-Token=" + token);  

    ResolvableType type = forClassWithGenerics(ServerSentEvent.class, JsonNode.class); // (8)
    WebClient client = WebClient.create(); // (2)
    Flux<ServerSentEvent<JsonNode>> events =
              client.get() // (3)
                    .uri(streamdataUri) // (4)
                    .accept(TEXT_EVENT_STREAM) // (5)
                    .exchange() // (6)
           .flatMapMany(response -> response.body(toFlux(type))); // (7)

     // Subscribe to the flux
     events.subscribe(System.out::println,  // (9)
      Throwable::printStackTrace);

     // ... rest of the code
   };
}

Then, we create the client (2) and initiate an HTTP get a connection (3) with our Streamdata.io URLURL (4) specifying we want to connect to a Server-Sent Event source (5). We get the response (6) that we map into a Flux of <ServerSentEvent>.

Note that Streamdata.io sends JSON for the data of the Server-Sent Event. To tell we want a Flux of <ServerSentEvent<JsonNode>>, we have to declare a ResolvableType (8) to help Spring Flux to infer the resulting generic type and we have to use the toFlux(resolvableType) method of the class BodyExtractors (7) to convert the response of the web client into a Flux of ServerSentEvent<JsonNode>.

Then, we subscribe to the Flux to print the Server-Sent Events we receive from Streamdata.io (9). That’s it!

If all works fine, you should get on your console something like that:


ServerSentEvent [id = 'e6194c8e-a47e-4e9c-b4ae-4f78c3ee2bb7', event='data', data=[{"title":"Value 0","price":75,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 1","price":81,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 2","price":25,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 3","price":2,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 4","price":25,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 5","price":99,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 6","price":62,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 7","price":89,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 8","price":83,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 9","price":45,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 10","price":17,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 11","price":12,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 12","price":0,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 13","price":73,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 14","price":73,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"}], retry=null, comment='null']
ServerSentEvent [id = 'e2b4c3ff-be1d-4940-ad64-736f11eaac92', event='patch', data=[{"op":"replace","path":"/2/price","value":39},{"op":"replace","path":"/5/price","value":48}], retry=null, comment='null']
ServerSentEvent [id = '163af19d-04b3-4ad7-81ab-2d96ce260746', event='patch', data=[{"op":"replace","path":"/2/price","value":74},{"op":"replace","path":"/7/price","value":91},{"op":"replace","path":"/9/price","value":4},{"op":"replace","path":"/13/price","value":54}], retry=null, comment='null']
ServerSentEvent [id = '660dbe83-ffd3-40cd-b9ac-511010ae8be9', event='patch', data=[{"op":"replace","path":"/6/price","value":33},{"op":"replace","path":"/13/price","value":59}], retry=null, comment='null']

So, the first set of data (snapshot) followed by patches (JSON-Patch).

The code is under the tag “step1” (so original name, isn’t it?).

Step 2: applying patches and printing the data

So far, we’ve received the snapshot (i.e., the full set of data) and then the patches.

But usually, once you’ve received the patches, you want to apply them to the previous snapshot to re-build the snapshot with the new modifications. To do so, we need a JSON-Patch library. Let’s use zjsonpatch which is a pretty good implementation of the RFC-6902. Just add this dependency into your pom.xml (or gradle file):

<dependency>
   <groupId>com.flipkart.zjsonpatch</groupId>
   <artifactId>zjsonpatch</artifactId>
   <version>0.3.1</version>
</dependency>

 

Step 2.1: the basic solution: consume the Flux

Streamdata.io sends 3 types of events:

– data: which is for the snapshots (the full set of data). By default, only the first set of data is sent as a snapshot. The others are patches
– patch: which is for the JSON patches sent by Streamdata.io
– error: to deal with errors (API authentication failure, API server errors, etc.)

We will focus on the 2 first for our demo. So, by default, we will receive the first snapshot, then we will receive patches. When a patch is received, we need to apply it to the previous snapshot to get a brand new snapshot with the latest modification. To do so, here is a very simple implementation of a consumer of our previous Flux:


// Create the web client and the flux of events
WebClient client = WebClient.create();
Flux<ServerSentEvent> events =
client.get()
      .uri(streamdataUri)
      .accept(TEXT_EVENT_STREAM)
      .exchange()
              .flatMapMany(response -> response.body(toFlux(type)));

// Subscribe to the flux with a consumer that applies patches
events.subscribe(new Consumer<ServerSentEvent<JsonNode>>() { // (1)
  private JsonNode current; // (2)

  @Override
  public void accept(final ServerSentEvent<JsonNode> aEvent) {
     aEvent.event()
           .ifPresent(type -> {
              switch (type) {
                case "data":
                  aEvent.data()
                        .ifPresent(data -> current = data); // (3)
                  break;

                case "patch":
                   aEvent.data()
                         .ifPresent(data -> current = JsonPatch.apply(data, current)); // (4)
                   break;

                case "error":
                   aEvent.data()
                         .ifPresent(System.err::println); // (5)
                   break;

                default:
                   throw new IllegalArgumentException("Unknown type: " + type);
              }

              System.out.println(current);
        });
    }
}, Throwable::printStackTrace);
;

We subscribe to the Flux of ServerSentEvent with a custom subscriber (1). We need to store the current snapshot (2) to be able to apply the next patch received. Depending on the type of the event received, we either update the current snapshot:

– with a new snapshot received from the event of type “data” (3) or
– with the resulting application of the patch to the current snapshot if an event of type “patch” has been received (4)

We also print events of type “error” (5).

If everything is fine, you should get something like that:


[{"title":"Value 0","price":71,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 1","price":55,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 2","price":46,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 3","price":64,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 4","price":16,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 5","price":80,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 6","price":14,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 7","price":97,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 8","price":69,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 9","price":84,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 10","price":36,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 11","price":68,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 12","price":71,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 13","price":18,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 14","price":59,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"}]
[{"title":"Value 0","price":71,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 1","price":33,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 2","price":46,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 3","price":64,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 4","price":87,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 5","price":80,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 6","price":14,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 7","price":97,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 8","price":69,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 9","price":84,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 10","price":36,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 11","price":68,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 12","price":71,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 13","price":18,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 14","price":59,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"}]
[{"title":"Value 0","price":60,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 1","price":44,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 2","price":46,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 3","price":64,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 4","price":87,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 5","price":36,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 6","price":14,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 7","price":97,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 8","price":69,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 9","price":84,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 10","price":36,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 11","price":68,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 12","price":71,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 13","price":18,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"},{"title":"Value 14","price":59,"param1":"value1","param2":"value2","param3":"value3","param4":"value4","param5":"value5","param6":"value6","param7":"value7","param8":"value8"}]

 

Note: the ServerSentEvent class uses heavily the Optional as returned type for the data, the event type, the id, etc., because you can receive partial Server-Sent Events. With Streamdata.io, you always receive a ServerSent-Events with id, data, and event.

The code is committed under the tag “step2.1” (ô, big surprise for the name!)

Can we do better?

I guess so. Depending on your use-case, applying the patch in a custom subscriber can be fine. But if you are in a more reactive mood, you probably want that applying the patch be part of the Flux so that you can get a Flux of JsonNode with the patched applied in the upstream; and thus have a Flux of snapshots. And then do whatever you want with this Flux (for instance, print it ;)). That’s probably more flexible! So, let’s have a look at this case.

Step 2.2: map to a Flux of Snapshots

The idea is pretty the same as the one for the consumer except that we want to perform a transformation instead of consuming the Flux. The map() operator can take a Function<? super T, ? extends V> that converts from type T to type V the Flux. So, why not declaring one to converts our Flux of ServerSentEvent<JsonNode> into a Flux of JsonNode and map the incoming events into snapshots?

So, let’s try:


// Create the web client and the flux of events
WebClient client = WebClient.create();
Flux<ServerSentEvent> events =
    client.get()
          .uri(streamdataUri)
          .accept(TEXT_EVENT_STREAM)
          .exchange()
          .flatMapMany(response -> response.body(toFlux(type)));

 events.map(new Function<ServerSentEvent, JsonNode>() { // (1)
   private JsonNode current; // (2)

   @Override
   public JsonNode apply(final ServerSentEvent aEvent) {
      aEvent.event()
    .ifPresent(type -> {
       switch (type) {
 case "data":
   aEvent.data()
                         .ifPresent(data -> current = data); // (3)
   break;

 case "patch":
   aEvent.data()
                         .ifPresent(data -> current = JsonPatch.apply(data, current)); // (4)
   break;

case "error":
  aEvent.data()
        .ifPresent(data -> {
      throw new RuntimeException("received an error! " + data); });//(5)
          break;

default:
   throw new IllegalArgumentException("Unknown type: " + type);
                }
    });
    return current;
    }
})
// Subscribe to the flux with a consumer that applies patches
.subscribe(System.out::println,
   Throwable::printStackTrace);

That’s pretty much the same code as the one consuming straight the events. We create a Function (2) that maps the ServerSentEvent into JsonNode that represents the snasphots. Snapshots are built depending on the type of the event: “data” (3) or “patch” (4).

Note that with errors, we choose to throw a RuntimeException to stop the Flux straight. That really depends on your business logic. You may implement a retry behavior (i.e. re-creating a new web client call) or wait a certain number of times you received an error before stopping the flux (what’s the point if you receive the same error over and over again?). In all the cases, it’s always better to handle errors and stop the SSE connection in the worse errors.

So, does this work? Well, yes… and no. It actually works and does what we want. Except in some cases. What happens if the upstream flux sent by mistake a ServerSentEvent with no type (let’s say a comment)? What will happen? Well, the current snapshot will be sent again because the upstream above map() operator will be sent to this operator which will trigger the emission of an item. In our case, the re-emission of the current snapshot. That means the subscriber will get twice the current snapshot.

How can we fix that? Well… we can filter the events on the upstream to keep only events that have data and the good type. This gives something like that:


events.filter(evt -> evt.data().isPresent())
      .filter(evt -> evt.event()
                        .map(evtType -> "data".equals(evtType)
                     || "patch".equals(evtType)
                     || "error".equals(evtType)).orElse(FALSE))
      .map(new Function<ServerSentEvent, JsonNode>(() {
         // ...
      })

We can even remove the ifPresent() statement from the mapper Function and use Optional.get() method instead (your IDE may flash some warnings). Does it really work? Yes, it does!

But these three statements are really linked to each other. They serve the same purpose: giving a Flux of snapshots. So, probably a better way to get neater and more readable code would be to wrap these statements within the same transformer. How we do that?

With the as() operator:


// ...
events.as(new PatchTransformer())
       // Subscribe to the flux with a consumer that applies patches
       .subscribe(System.out::println,
  Throwable::printStackTrace);
// ...

static class PatchTransformer 
           implements Function<Flux<ServerSentEvent<JsonNode>>, Flux<JsonNode>> {

   @Override
   public Flux<JsonNode> apply(final Flux<ServerSentEvent<JsonNode>> aFlux) {
      return aFlux
               .filter(evt -> evt.data().isPresent())
               .filter(evt -> evt.event()
                             .map(evtType -> "data".equals(evtType)
                                          || "patch".equals(evtType)
                                          || "error".equals(evtType))
                             .orElse(FALSE))
               .map(new Function<ServerSentEvent<JsonNode>, JsonNode>() {
                  private JsonNode current;

                  @Override
                  public JsonNode apply(final ServerSentEvent<JsonNode> aEvent) {
                     String type = aEvent.event().get();

                     switch (type) {
                        case "data":
                           current = aEvent.data().get();
                           break;

                        case "patch":
                           current = JsonPatch.apply(aEvent.data().get(), current);
                           break;

                        case "error":
                           aEvent.data()
                                .ifPresent(data -> {
                                   throw new RuntimeException("received an error! " + data);
                                });
                           break;

                        default:
                           throw new IllegalArgumentException("Unknown type: " + type);
                     }

                     return current;
                  }
            });
    }
}

The code is committed under the tag “step2.2“.

Could we have done that differently?

Once again the answer is yes! For instance, we could have used a handler instead of a transformer with the operator handle(). The code would have looked like that:


// use a handler to apply patches and generate JsonNodes
events.handle(new BiConsumer<ServerSentEvent<JsonNode>, SynchronousSink<JsonNode>>() { // (1)
   private JsonNode current; // (2)

   @Override
   public void accept(final ServerSentEvent<JsonNode> aEvent,
                      final SynchronousSink<JsonNode> aSink) {
      aEvent.event()
           .ifPresent(type -> {
              switch (type) {
                 case "data":
                    aEvent.data()
                        .ifPresent(data -> {
                           current = data;
                           aSink.next(current); // (3)
                        });
                    break;

                 case "patch":
                    aEvent.data()
                        .ifPresent(data -> {
                           current = JsonPatch.apply(data, current); // (4)
                           aSink.next(current); // (5)
                        });
                    break;

                 case "error":
                    aEvent.data()
                        .ifPresent(data ->
                                       aSink.error(new RuntimeException( // (6)
                                             "received an error! " + data)));
                    break;

                 default:
                    throw new IllegalArgumentException("Unknown type: " + type);
              }
           });
   }
})
// Subscribe to the flux with a consumer that applies patches
.subscribe(System.out::println,
           Throwable::printStackTrace);

The code looks pretty similar to the previous ones. This times, we create a handler (1). We still store the current snapshot (2). If we receive an event of type “data“, we store it as the current snapshot and forward it to the sink via the next() method. If we receive an event of type “patch,” we compute the patch (4), store as the current snapshot, and forward this one to the sink (5). If we get an event of type “error,” we forward the error to the sink (6). Et voilà!

This code is committed under the tag “step2.3“.

There are probably other neater possible implementations for the purpose of this demo. If you find good ones, feel free to comment under this post!

Conclusion

Spring Web Flux client is a nice client to consume Server-Sent Events from Streamdata.io.

The code discussed above must be seen as a starting point to implement your own business logic. Keep in mind to handle errors carefully and pay special attention when you need to close the connection (close it always in case of “fatal error”!).

With reactive APIs like Flux, you may have several possibilities to implement one purpose as we’ve seen. Choose the one that fits you. I hope this helps.

Takeaways

What if I want to pass some headers to my API?

Server-Sent Events specification allows text/event-stream as a header. The Spring web client seems to respect this. But Streamdata.io has a workaround to do that. The solution is on the master of the GitHub.

What if I want to get only snapshots (no patches)?

It’s possible too! You just have to pass the header Accept: application/json to Streamdata.io. That’s also described in the code of the master. In this case, the code can be simplified with a couple of filter() operators (mostly the same as these declared in the PatchTransformer). Note that in most cases, patches are useful to save bandwidth: most of the time, they are far smaller than the original set of the data.

Download the whitepaper navigating the new streaming API landscape.

**Original source: streamdata.io blog