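To get a feel for Reactive Streams, this post starts from the example below and changes the requirements step by step, focusing on the code.
Requirements
At a regular interval we receive 2,000 integer arrays, and we need to sum the values in each array.
Each integer array holds 1,000 randomly generated values.
Reactive Streams, based on Java 8
1. Main: entry point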
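import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
// Assumes the org.reactivestreams API, since Java 8 has no java.util.concurrent.Flow.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class Sample05_reactiveStream04 {
    public static void main(String[] args) {
        // The queue holds exactly 2,000 commands; add() throws if it is full.
        BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<>(2000);
        Random random = new Random();
        for (int i = 0; i < 2000; i++) {
            // One batch of 1,000 random ints.
            List<Integer> integerList = random.ints(1000).boxed().collect(Collectors.toList());
            blockingQueue.add(new InsertToDataBaseCommand(integerList));
        }
        Publisher<Command> publisher = new CustomPublisher<>(blockingQueue);
        Subscriber<Command> subscriber = new CustomSubscriber();
        publisher.subscribe(subscriber);
    }
}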
2. Publisher: CustomPublisher, an implementation of Publisher
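import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// The type parameter is named T; naming it Command would shadow the Command interface.
public class CustomPublisher<T> implements Publisher<T> {
    private final BlockingQueue<T> blockingQueue;
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public CustomPublisher(BlockingQueue<T> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                // NOTE: this version ignores n and pushes every queued item at once.
                T item;
                while ((item = blockingQueue.poll()) != null) { // poll() never blocks
                    subscriber.onNext(item);
                }
                // Check once per second until all 2,000 commands have run, then complete.
                executorService.scheduleWithFixedDelay(() -> {
                    if (Counter.get() == 2000) {
                        System.out.println("onComplete");
                        subscriber.onComplete();
                        executorService.shutdown(); // also stops further checks
                    }
                }, 1, 1, TimeUnit.SECONDS);
            }

            @Override
            public void cancel() {
                // Cancellation is not supported in this version.
            }
        });
    }
}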
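The Subscription above ignores the n passed to request and hands over the whole queue at once, whereas the Reactive Streams contract expects a publisher to signal no more onNext calls than were requested. As a rough sketch of what honoring demand could look like, the example below emits at most n items per request. To run on the JDK alone it uses java.util.concurrent.Flow (Java 9+) in place of org.reactivestreams; DemandAwareDemo, QueuePublisher, and SummingSubscriber are hypothetical names that are not part of the original code, and the sketch is single-threaded and not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical sketch: all names here are illustrative, not from the original post.
class DemandAwareDemo {

    /** Emits at most n queued items per request(n) and completes when the queue is empty. */
    static final class QueuePublisher<T> implements Flow.Publisher<T> {
        private final Queue<T> queue;

        QueuePublisher(Queue<T> queue) {
            this.queue = queue;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;      // items requested but not yet delivered
                private boolean emitting; // true while the loop below is running
                private boolean finished; // set after onComplete, onError, or cancel

                @Override
                public void request(long n) {
                    if (finished) {
                        return;
                    }
                    if (n <= 0) {
                        finished = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    // Accumulate demand, capping at Long.MAX_VALUE on overflow.
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop picks this up
                    }
                    emitting = true;
                    try {
                        while (demand > 0 && !finished) {
                            T item = queue.poll();
                            if (item != null) {
                                demand--;
                                subscriber.onNext(item);
                            }
                            if (queue.isEmpty() && !finished) {
                                finished = true;
                                subscriber.onComplete();
                            }
                        }
                    } finally {
                        emitting = false;
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    /** Sums each batch and asks for exactly one more batch after each delivery. */
    static final class SummingSubscriber implements Flow.Subscriber<List<Integer>> {
        final List<Long> sums = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(List<Integer> batch) {
            sums.add(batch.stream().mapToLong(Integer::longValue).sum());
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    static SummingSubscriber run() {
        Queue<List<Integer>> queue = new ArrayDeque<>();
        queue.add(Arrays.asList(1, 2, 3));
        queue.add(Arrays.asList(10, 20));
        queue.add(Arrays.asList(-5, 5));
        SummingSubscriber subscriber = new SummingSubscriber();
        new QueuePublisher<>(queue).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        SummingSubscriber subscriber = run();
        System.out.println(subscriber.sums + " completed=" + subscriber.completed);
    }
}
```

Here the subscriber controls the pace: each onNext asks for one more batch, so the publisher never gets ahead of it. The emitting flag keeps the nested request calls from recursing once per item.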
3. Subscriber: CustomSubscriber, an implementation of Subscriber
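import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class CustomSubscriber implements Subscriber<Command> {
    private Subscription subscription;
    // Worker pool that runs the commands (up to 1,000 threads, created on demand).
    private final ExecutorService executorService = Executors.newFixedThreadPool(1000);

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // kicks off delivery
    }

    @Override
    public void onNext(Command command) {
        // Hand the work to the pool so the publisher's thread is not blocked.
        executorService.execute(command::work);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("We Error: " + t);
        executorService.shutdown(); // release the worker threads on failure too
    }

    @Override
    public void onComplete() {
        System.out.println("We Finished");
        executorService.shutdown();
    }
}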
4. Command interface
public interface Command {
    void work();
}
5. Command implementation: the concrete class
import java.util.List;

public class InsertToDataBaseCommand implements Command {
    private final List<Integer> integerList;

    public InsertToDataBaseCommand(List<Integer> integerList) {
        this.integerList = integerList;
    }

    @Override
    public void work() {
        // int arithmetic: the sum silently wraps around on overflow.
        int sum = integerList.stream().mapToInt(Integer::intValue).sum();
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " Final Result : " + sum);
        System.out.println(threadName + " " + Counter.incrementAndGet());
    }
}
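Because the random values span the full int range, adding 1,000 of them with int arithmetic can overflow and wrap around without raising any error. The following is a minimal sketch of the difference, assuming a long result is acceptable for the requirement; LongSumSketch, wrappingSum, and wideSum are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: compares int and long accumulation for the same input.
class LongSumSketch {

    /** Sums with int arithmetic, which wraps around on overflow. */
    static int wrappingSum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    /** Widens each value to long before summing, so 1,000 ints cannot overflow. */
    static long wideSum(List<Integer> values) {
        return values.stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(Integer.MAX_VALUE, 1);
        System.out.println(wrappingSum(values)); // prints -2147483648
        System.out.println(wideSum(values));     // prints 2147483648
    }
}
```

If exact overflow detection matters more than a wider type, Math.addExact is another option, since it throws ArithmeticException instead of wrapping.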
6. Counter class: stores and returns the number of completed executions.
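import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    // Shared across all worker threads; AtomicInteger keeps the increments thread-safe.
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    public static int incrementAndGet() {
        return atomicInteger.incrementAndGet();
    }

    public static int get() {
        return atomicInteger.get();
    }
}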
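The counter is read from outside to decide when all the work has finished. As an alternative sketch, and not the design used in this post, a java.util.concurrent.CountDownLatch lets the caller block until a known number of tasks are done instead of checking a counter on a timer. LatchSketch and runTasks are hypothetical names, and the pool size of 4 and the 10-second timeout are arbitrary assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: waits for completion with a latch instead of polling a counter.
class LatchSketch {

    /** Runs taskCount trivial tasks on a small pool and returns how many completed. */
    static int runTasks(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        completed.incrementAndGet(); // stands in for the real work
                    } finally {
                        latch.countDown(); // always signal, even if the work fails
                    }
                });
            }
            // Block until every task has counted down, or give up after 10 seconds.
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for tasks");
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(2000)); // prints 2000
    }
}
```

The trade-off is that the latch needs the task count up front, which fits this example because the number of commands is fixed at 2,000.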