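To get familiar with Reactive Streams, this post starts from the example below and changes the requirements step by step, focusing on code.
Requirements
At a fixed interval, 2,000 integer arrays are received, and the values in each array must be summed.
Each integer array to be generated consists of random values and holds 1,000 items.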
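Before any Reactive Streams types come into play, the requirement itself can be sketched as plain Java. The class name `RandomBatchSketch`, its two methods, and the fixed seed are assumptions made for illustration only; they do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class RandomBatchSketch {

    // Builds `batches` lists, each holding `itemsPerBatch` random integers.
    public static List<List<Integer>> generate(int batches, int itemsPerBatch, long seed) {
        Random random = new Random(seed); // fixed seed only to make runs repeatable
        List<List<Integer>> result = new ArrayList<>(batches);
        for (int i = 0; i < batches; i++) {
            result.add(random.ints(itemsPerBatch).boxed().collect(Collectors.toList()));
        }
        return result;
    }

    // Sums one batch, widening each value to long first.
    public static long sum(List<Integer> batch) {
        return batch.stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = generate(2000, 1000, 42L);
        System.out.println("batches=" + batches.size()
                + ", items in first=" + batches.get(0).size()
                + ", first sum=" + sum(batches.get(0)));
    }
}
```

The rest of the post wraps the same two steps, generating and summing, in a `Command` that is delivered through a publisher and a subscriber.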
Reactive Streams, based on Java 8
1. Main: the entry point
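import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

// Assumes the org.reactivestreams API, since Java 8 has no built-in Flow types.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class Sample05_reactiveStream04 {

    public static void main(String[] args) {
        // The queue is sized to hold all 2,000 commands up front.
        BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<>(2000);
        Random random = new Random();

        for (int i = 0; i < 2000; i++) {
            // Each command carries 1,000 random integers.
            List<Integer> integerList = random.ints(1000).boxed().collect(Collectors.toList());
            blockingQueue.add(new InsertToDataBaseCommand(integerList));
        }

        Publisher<Command> publisher = new CustomPublisher<>(blockingQueue);
        Subscriber<Command> subscriber = new CustomSubscriber();
        publisher.subscribe(subscriber);
    }
}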
2. Publisher: CustomPublisher, an implementation of Publisher
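import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// The type parameter is named T so that it does not shadow the Command interface.
public class CustomPublisher<T> implements Publisher<T> {

    private final BlockingQueue<T> blockingQueue;
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public CustomPublisher(BlockingQueue<T> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                // Note: n is ignored in this version; every queued item is pushed at once.
                T item;
                while ((item = blockingQueue.poll()) != null) {
                    subscriber.onNext(item);
                }
                // Check once per second until all 2,000 commands have run, then complete.
                executorService.scheduleAtFixedRate(() -> {
                    if (Counter.get() == 2000) {
                        System.out.println("onComplete");
                        subscriber.onComplete();
                        executorService.shutdown();
                    }
                }, 1, 1, TimeUnit.SECONDS);
            }

            @Override
            public void cancel() {
                // Cancellation is not handled in this example.
            }
        });
    }
}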
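In the `request(long n)` above, `n` is never consulted: every queued command is pushed to the subscriber in a single call. The Reactive Streams contract treats `n` as the maximum number of `onNext` signals the subscriber is ready to receive. The following is a minimal sketch of honoring that demand, not the original implementation. `DemandSketch` and `emit` are hypothetical names, and a plain `Consumer` stands in for `Subscriber.onNext` so that the example runs without the reactive-streams library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;

public class DemandSketch {

    // Emits at most n queued items to onNext and returns how many were emitted.
    public static <T> long emit(Queue<T> queue, long n, Consumer<? super T> onNext) {
        long emitted = 0;
        while (emitted < n) {
            T item = queue.poll(); // non-blocking: null means the queue is empty
            if (item == null) {
                break;
            }
            onNext.accept(item);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayBlockingQueue<>(5);
        for (int i = 1; i <= 5; i++) {
            queue.add(i);
        }
        List<Integer> received = new ArrayList<>();
        long first = emit(queue, 2, received::add);   // demand of 2
        long second = emit(queue, 10, received::add); // demand exceeds what is left
        // prints "2 then 3, received [1, 2, 3, 4, 5]"
        System.out.println(first + " then " + second + ", received " + received);
    }
}
```

With this shape, a subscriber that calls `request(1)` receives one item and has to request again to get the next one, which is what lets it control the pace.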
3. Subscriber: CustomSubscriber, an implementation of Subscriber
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class CustomSubscriber implements Subscriber<Command> {

    private Subscription subscription;
    // Each onNext hands its command to a thread from this pool.
    private final ExecutorService executorService = Executors.newFixedThreadPool(1000);

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Command command) {
        executorService.execute(command::work);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("We Error");
    }

    @Override
    public void onComplete() {
        System.out.println("We Finished");
        executorService.shutdown();
    }
}
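`onComplete` above calls `shutdown()`, which stops the pool from accepting new tasks but returns without waiting for the ones already submitted. When the caller needs to know that the work has actually finished, `awaitTermination` can follow the shutdown. A small sketch under stated assumptions: the class name `ShutdownSketch`, the pool size, and the 10-second timeout are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownSketch {

    // Runs every task on a fixed pool and waits for the pool to drain.
    // Returns true if all tasks finished before the timeout elapsed.
    public static boolean runAll(List<Runnable> tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Runnable task : tasks) {
            pool.execute(task);
        }
        pool.shutdown(); // no new tasks; already-submitted tasks still run
        try {
            return pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger done = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            tasks.add(done::incrementAndGet);
        }
        boolean finished = runAll(tasks, 8);
        System.out.println("finished=" + finished + ", done=" + done.get());
    }
}
```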
4. Command interface
public interface Command {
    void work();
}
5. Command implementation: the concrete class that does the work
import java.util.List;

public class InsertToDataBaseCommand implements Command {

    private final List<Integer> integerList;

    public InsertToDataBaseCommand(List<Integer> integerList) {
        this.integerList = integerList;
    }

    @Override
    public void work() {
        // Sum as long: 1,000 random int values can overflow an int accumulator.
        long sum = integerList.stream().mapToLong(Integer::longValue).sum();
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " Final Result : " + sum);
        System.out.println(threadName + " " + Counter.incrementAndGet());
    }
}
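Each list holds 1,000 unrestricted random `int` values, so the choice of accumulator type matters: an `int` sum can wrap around without any error. The sketch below contrasts `Integer::sum` with a `long` accumulator on a two-element list. The class and method names are illustrative and not part of the original code.

```java
import java.util.Arrays;
import java.util.List;

public class SumOverflowSketch {

    // int accumulator: wraps around on overflow without raising an error.
    public static int intSum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    // long accumulator: wide enough to hold the sum of 1,000 int values.
    public static long longSum(List<Integer> values) {
        return values.stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(Integer.MAX_VALUE, 1);
        System.out.println(intSum(values));  // prints -2147483648
        System.out.println(longSum(values)); // prints 2147483648
    }
}
```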
6. Counter class: stores and returns the number of commands executed so far.
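import java.util.concurrent.atomic.AtomicInteger;

public class Counter {

    // Shared across all worker threads, so an atomic type is used.
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    public static int incrementAndGet() {
        return atomicInteger.incrementAndGet();
    }

    public static int get() {
        return atomicInteger.get();
    }
}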
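The publisher decides that all work is done by comparing this counter with 2000. `java.util.concurrent.CountDownLatch` can express the same "wait until N tasks have finished" idea without reading a counter on a timer. This is an alternative technique shown for comparison, not what the original code does; `LatchSketch`, `runAndAwait`, and the timeout value are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    // Submits `tasks` jobs and blocks until each has counted down,
    // or until the timeout elapses. Returns true if all jobs completed.
    public static boolean runAndAwait(int tasks, int threads) {
        CountDownLatch latch = new CountDownLatch(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(latch::countDown); // stands in for command.work()
            }
            return latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all done: " + runAndAwait(2000, 8));
    }
}
```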