Reactive Programming은 선언적인 프로그래밍 패러다임으로써 데이터 스트림(Data Stream)과 변화의 전파에 초점을 맞추고 있다. 또한 이것의 핵심은  비동기 (Async) 이벤트와 Observer 디자인 패턴이다. 외부의 이벤트나 데이터가 발생하였을 때, 사용자에게 자연스러운 응답을 주고, 규모 탄력적으로 리소스를 사용하며 실패에 있어서 유연하게 대처할 수 있다. 

 

 

 Reactive Stream은 Reactive Programming을 근간으로 한 개발을 하기 위한 명세이며, Netflix, Pivotal, TypeSafe의 발의로 시작되었다. 

 

Reactive Streams : https://www.reactive-streams.org/

 

 Reactive Stream은 비동기적인 스트림 프로세싱을 막힘없이 ( Non Blocking) 처리하기 위한 하나의 표준이다. 

 

Reactive Stream의 상세 스펙 보기

 

 

기본적인 API 컴포넌트 구성 


  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

코드로 작성된 추상화 수준으로 확인을 해보면, 

// Publisher
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

// Subscriber
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

// Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}

// Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

 

 

 

위의 코드에서 Subscriber의 역할에 대해서 아래의 그림으로 확인해보면, 

 

Subscriber 의 동작흐름

 

 

Publisher,  Subscriber, Subscription 사이의 연관관계를 아래와 같이 구성해봤다.

 

Reactive Stream 구성

 

실제로 위의 구성 관계를 이용해서 코드를 작성해보면, 아래와 같이 작성해볼 수 있다. 

 

package com.reactive.reactive.basic11;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PublishOnWithSubscribeOnBoth {

    private static Logger logger = LoggerFactory.getLogger(PublishOnWithSubscribeOnBoth.class);

    public static void main(String[] args) {
        Publisher<Integer> publisher = subscriber -> {
            subscriber.onSubscribe(new Subscription() { 
                @Override
                public void request(long n) {
                    logger.debug("request()");
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                    subscriber.onNext(4);
                    subscriber.onNext(5);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        };

        Publisher<Integer> subOnPub = subscriber -> {
            ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                @Override
                public String getThreadNamePrefix() {
                    return "subOn-";
                }
            });

            executorService.execute(() -> publisher.subscribe(subscriber));

            executorService.shutdown();
        };

        Publisher<Integer> pubOnPub = subscriber -> {
            subOnPub.subscribe(new Subscriber<Integer>() {

                ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "pubOn-";
                    }
                });

                @Override
                public void onSubscribe(Subscription s) {
                    subscriber.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    executorService.execute(() -> {
                        subscriber.onNext(integer);
                    });
                }

                @Override
                public void onError(Throwable t) {
                    executorService.execute(() -> {
                        subscriber.onError(t);
                    });

                    executorService.shutdown();
                }

                @Override
                public void onComplete() {
                    executorService.execute(() -> {
                        subscriber.onComplete();
                    });

                    executorService.shutdown();
                }
            });
        };

        pubOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                logger.debug("OnSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                logger.debug("onNext :{}", integer);
            }

            @Override
            public void onError(Throwable t) {
                logger.debug("onError:{}" , t);
            }

            @Override
            public void onComplete() {
                logger.debug("onComplete");
            }
        });
    }
}

 

이번편은 개요로써 간단히 기본 항목들에 대해서 알아보았다.

 

 Java 8 부터 시작해서 차근차근 알아보도록 하겠다. 

 

다음편 보기

'알아보기' 카테고리의 다른 글

0002 Reactive Programming 3  (0) 2019.12.08
0004 Reactive Programming 2  (0) 2019.12.05
0002 Realm 활용하기 2  (0) 2019.11.22
0002 Realm 활용하기 3  (0) 2019.11.22
0003 자바를 이용한 병렬 프로그래밍 1  (0) 2019.11.20

+ Recent posts