Wednesday, December 27, 2017

Simple Reuse of org.reactivestreams in CXF

I mentioned earlier that one could link an RxJava2 Flowable to a JAX-RS AsyncResponse through a Subscriber that makes a best effort at streaming the data items as JSON array elements; see this example.

That works, but it requires the application code to refer to both the JAX-RS AsyncResponse and the CXF-specific JsonStreamingAsyncSubscriber (which was RxJava2-specific at that earlier stage), as opposed to simply returning a Flowable from the resource method.

In the meantime, John Ament added the initial Reactor integration code, and as part of this work he also provided an org.reactivestreams-compatible JsonStreamingAsyncSubscriber to be optionally used with the CXF Reactor invoker.

As a result, we found an opportunity to do some refactoring and introduce a simple org.reactivestreams utility module, which is now shared by the CXF RxJava2 invoker and the Reactor invoker. The common invoker code that both invokers delegate to checks whether JSON is expected and, if so, registers JsonStreamingAsyncSubscriber as an org.reactivestreams.Subscriber with the org.reactivestreams.Publisher, which can be either an RxJava2 Flowable or a Reactor Flux.
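To make the idea concrete, here is a minimal sketch of a subscriber that streams items as JSON array elements and only depends on the publisher/subscriber contract. It is not the CXF code: the class and method names (JsonArrayStreamingSketch, JsonArraySubscriber, fromList, render) are hypothetical, the items are assumed to be already serialized JSON strings, and it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, so that it runs without extra dependencies.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

final class JsonArrayStreamingSketch {

    /** Hypothetical subscriber: appends each item as one JSON array element. */
    static final class JsonArraySubscriber implements Flow.Subscriber<String> {
        private final StringBuilder out;
        private Flow.Subscription subscription;
        private boolean first = true;
        Throwable failure; // recorded instead of resuming an AsyncResponse

        JsonArraySubscriber(StringBuilder out) {
            this.out = out;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            out.append('[');      // open the array before the first element
            s.request(1);         // pull one item at a time
        }

        @Override
        public void onNext(String jsonElement) {
            if (!first) {
                out.append(',');  // separator between elements
            }
            first = false;
            out.append(jsonElement);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            failure = t;
        }

        @Override
        public void onComplete() {
            out.append(']');      // close the array once the stream ends
        }
    }

    /** Stand-in publisher; a Flowable or a Flux would take this role in CXF. */
    static Flow.Publisher<String> fromList(List<String> items) {
        return subscriber -> {
            Iterator<String> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private long pending;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    pending += n;
                    if (emitting) {
                        return;   // re-entrant call from onNext: the loop below picks it up
                    }
                    emitting = true;
                    while (pending > 0 && !done) {
                        if (it.hasNext()) {
                            pending--;
                            subscriber.onNext(it.next());
                        } else {
                            done = true;
                            subscriber.onComplete();
                        }
                    }
                    emitting = false;
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    /** The "common invoker" step: register the subscriber with any publisher. */
    static String render(Flow.Publisher<String> publisher) {
        StringBuilder out = new StringBuilder();
        publisher.subscribe(new JsonArraySubscriber(out));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(render(fromList(List.of("{\"id\":1}", "{\"id\":2}"))));
    }
}
```

The point of the sketch is that render never inspects the concrete publisher type, which is what allows the same subscriber to be shared by the two invokers.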

The end result is that users can now write simpler code by returning Flowable or Flux from the service methods.

It is an interesting but simple example of reusing org.reactivestreams-aware code across different org.reactivestreams implementations.
