Popular Posts

Spring boot RxJava Integration

What is RxJava

What is RxJava

RxJava is a Java implementation of ReactiveX library for composing asynchronous and event-based programs by using observable sequences.

The building blocks of RxJava are Observables and Subscribers.

Observable is used for emitting items and Subscriber is used for consuming those items.

How it works

RxJava works like this.

Subscriber subscribes to Observable, then Observable calls Subscriber.onNext() for any number of items, if something goes wrong here is Subsciber.onError() and if all finishes fine, here is Subscriber.onCompleted().

Read more about RxJava at their official documentation

Types of Observables and Observers

The following are the different types of Observables in RxJava:

  1. Single

  2. Observable

  3. Flowable

  4. Maybe

  5. Completable

For each Observable there is one Observer

Following are the different types of Observers in RxJava:

  1. Observer

  2. SingleObserver

  3. MaybeObserver

  4. CompletableObserver

Single Observable

This tutorial focus on Single observable , which helps to make parallel calls to downstream systems.

Consider a scenario where you need construct api response by reading information from different down-steam systems.

parallel_calls
                         __(10 sec)__ SERVICE-B
                       /
                      /
CLIENT --> API ------
      <1>       <2>   \
                       \ __(14 sec)__ SERVICE-C

<1> CLIENT making call to API

<2> API makes calls to downstream service SERVICE-B and SERVICE-C

One option is, to make sequence of calls to SERVICE-B and then SERVICE-C , which takes (10+ 14= 24) seconds to respond back to CLIENT.

Other option is , to make parallel calls so that max (10, 14) = 10 seconds could be response time to your CLIENT.

Different ways we can achieve this

  1. by initiating direct threads.

  2. using executor service

  3. Reactive libraries such as RxJava.

RxJava library helps you to achieve this in seamless way to so that you dont need worry about thread pool management and cleaning up thread once task is done.

Parallel calls with Single

Single Observable is best option to make network calls

Single is used when the Observable has to emit only one value like a response from a network call.

ParallelCallsWithRxJava.java
        Single<CityStateRes> csRes = cityClient.findCityNameAsync(zipCode)
                                        .subscribeOn(Schedulers.io()); (1)

        Single<AlternateCities> csRes2 = alternateCitiesClient
                                        .findAlternateCitiesAsync(zipCode)
                                        .subscribeOn(Schedulers.io()); (2)

        CityFinder cityFinderResponse= Single.zip(csRes, csRes2,
                            (res, res2) ->  (3)
                                {
                                    CityFinder cityFinder = new CityFinder();
                                    cityFinder.setCityStateRes(res);
                                    cityFinder.setAlternateCities(res2);
                                    return cityFinder;
                                }
        ).subscribeOn(Schedulers.io()).toBlocking().value();

Single.zip will helps us to initiate parallel calls on IO Scheduler.

Schedulers

If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.

  • IO — This scheduler used for IO related stuff. Such as network requests, file system operations. IO scheduler is backed by thread-pool which are re-used based on availability.

Spring boot RxJava Integration

RxJavaHooks helpful to pass parent Thread context(Tomcat worker thread) to Child (RxJava )Threads.

RxJavaHooks useful

  • Passing Parent Thread Context(custom headers) to child Threads(RxJava Subscribers)

  • Sleuth Integration with RxJava (To pass TraceId and spanId)

Create RxJava Action0 instance

ContextHandoverAction.java
package com.tvajjala.reactive.spring.hooks;

import brave.Span;
import brave.Tracer;
import com.tvajjala.reactive.spring.context.ThreadContext;
import com.tvajjala.reactive.spring.context.ThreadContextHolder;
import rx.functions.Action0;

public class ContextHandoverAction implements Action0{


    private Action0 actual;
    private final Tracer tracer;
    private final Span parent;
    private final ThreadContext threadContext;


    public ContextHandoverAction(Action0 actual, ThreadContext threadContext, Tracer tracer ){
        this.actual=actual;
        this.threadContext=threadContext;
        this.tracer = tracer;
        this.parent = this.tracer.currentSpan();
    }


    @Override
    public void call() {

        Span span = this.parent;
        boolean created = false;
        if (span != null) {
            span = this.tracer.toSpan(this.parent.context());
        } else {
            span = this.tracer.nextSpan().name("rxjava").start();
            span.tag("thread", Thread.currentThread().getName());
            created = true;
        }

        try {
            Tracer.SpanInScope ws = this.tracer.withSpanInScope(span);
            Throwable var4 = null;

            try {
                ThreadContextHolder.setContext(threadContext); //(1)
                this.actual.call();
            } catch (Throwable var20) {
                var4 = var20;
                throw var20;
            } finally {
                if (ws != null) {
                    if (var4 != null) {
                        try {
                            ws.close();
                        } catch (Throwable var19) {
                            var4.addSuppressed(var19);
                        }
                    } else {
                        ws.close();
                    }
                }

            }
        } finally {
            ThreadContextHolder.clear(); //(2)
            if (created) {
                span.finish();
            }

        }

    }
}
  1. Passing ThreadContext

  2. Must clear after thread execution to avoid Memory issues

Registering RxJavaHooks

Register RxJavaHooks after application startup

filename.java
package com.tvajjala.reactive.spring;

import brave.Tracer;
import com.tvajjala.reactive.spring.context.ThreadContextHolder;
import com.tvajjala.reactive.spring.hooks.ApplicationPostInitializationHook;
import com.tvajjala.reactive.spring.hooks.ApplicationPreInitializationHook;
import com.tvajjala.reactive.spring.hooks.ContextHandoverAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import rx.plugins.RxJavaHooks;


/**
 * @author ThirupathiReddy Vajjala
 */
@SpringBootApplication
public class ReactiveSpringApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveSpringApplication.class);

    @Autowired
    Tracer tracer;

    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(ReactiveSpringApplication.class);
        springApplication.run(args);

    }

    @Override
    public void run(String... args) {

        LOGGER.info("Application initialization completed ");
        /* Any scheduler actions (Scheduler.io) this code executes */
        RxJavaHooks.setOnScheduleAction(action0-> new ContextHandoverAction(action0, ThreadContextHolder.getThreadContext(), tracer)); (1)
    }
}
  1. Registering RxJavaHooks

No comments:

Post a Comment