Reactive Programming은 선언적인 프로그래밍 패러다임으로써 데이터 스트림(Data Stream)과 변화의 전파에 초점을 맞추고 있다. 또한 이것의 핵심은 비동기 (Async) 이벤트와 Observer 디자인 패턴이다. 외부의 이벤트나 데이터가 발생하였을 때, 사용자에게 자연스러운 응답을 주고, 규모 탄력적으로 리소스를 사용하며 실패에 있어서 유연하게 대처할 수 있다.
Reactive Stream은 Reactive Programming을 근간으로 한 개발을 하기 위한 명세이며, Netflix, Pivotal, TypeSafe의 발의로 시작되었다.
Reactive Streams : https://www.reactive-streams.org/
Reactive Stream은 비동기적인 스트림 프로세싱을 막힘없이 ( Non Blocking) 처리하기 위한 하나의 표준이다.
기본적인 API 컴포넌트 구성
- Publisher
- Subscriber
- Subscription
- Processor
코드로 작성된 추상화 수준으로 확인을 해보면,
// Publisher public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } // Subscriber public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } // Subscription public interface Subscription { public void request(long n); public void cancel(); } // Processor public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
위의 코드에서 Subscriber의 역할에 대해서 아래의 그림으로 확인해보면,
Publisher, Subscriber, Subscription 사이의 연관관계를 아래와 같이 구성해봤다.
실제로 위의 구성 관계를 이용해서 코드를 작성해보면, 아래와 같이 작성해볼 수 있다.
package com.reactive.reactive.basic11; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class PublishOnWithSubscribeOnBoth { private static Logger logger = LoggerFactory.getLogger(PublishOnWithSubscribeOnBoth.class); public static void main(String[] args) { Publisher<Integer> publisher = subscriber -> { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { logger.debug("request()"); subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); subscriber.onNext(4); subscriber.onNext(5); subscriber.onComplete(); } @Override public void cancel() { } }); }; Publisher<Integer> subOnPub = subscriber -> { ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){ @Override public String getThreadNamePrefix() { return "subOn-"; } }); executorService.execute(() -> publisher.subscribe(subscriber)); executorService.shutdown(); }; Publisher<Integer> pubOnPub = subscriber -> { subOnPub.subscribe(new Subscriber<Integer>() { ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){ @Override public String getThreadNamePrefix() { return "pubOn-"; } }); @Override public void onSubscribe(Subscription s) { subscriber.onSubscribe(s); } @Override public void onNext(Integer integer) { executorService.execute(() -> { subscriber.onNext(integer); }); } @Override public void onError(Throwable t) { executorService.execute(() -> { subscriber.onError(t); }); executorService.shutdown(); } @Override public void onComplete() { executorService.execute(() -> { subscriber.onComplete(); }); executorService.shutdown(); } }); }; pubOnPub.subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription subscription) { logger.debug("OnSubscribe"); subscription.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { logger.debug("onNext :{}", integer); } @Override public void onError(Throwable t) { logger.debug("onError:{}" , t); } @Override public void onComplete() { logger.debug("onComplete"); } }); } }
이번편은 개요로써 간단히 기본 항목들에 대해서 알아보았다.
Java 8 부터 시작해서 차근차근 알아보도록 하겠다.
'알아보기' 카테고리의 다른 글
0002 Reactive Programming 3 (0) | 2019.12.08 |
---|---|
0004 Reactive Programming 2 (0) | 2019.12.05 |
0002 Realm 활용하기 2 (0) | 2019.11.22 |
0002 Realm 활용하기 3 (0) | 2019.11.22 |
0003 자바를 이용한 병렬 프로그래밍 1 (0) | 2019.11.20 |