To get familiar with Reactive Streams, we will start from the example below and change the requirements step by step, focusing mainly on the code.

 

 Requirements

 

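   A new requirement came in: at a fixed interval we receive 2,000 integer arrays and must compute the sum of the values in each array.

   Each integer array to be generated is filled with random numbers and contains 1,000 items.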
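As a rough illustration of the requirement above, the sketch below builds one array of 1,000 random integers and sums it. The class name `RandomArraySum` and its method names are made up for this illustration. It accumulates into a `long` so the total cannot overflow, which is a choice of this sketch and not something the requirement states.

```java
import java.util.Random;
import java.util.stream.IntStream;

// Hypothetical helper for illustration only.
final class RandomArraySum {
    static final int ITEMS_PER_ARRAY = 1000;

    // Builds one array of 1,000 random ints.
    static int[] randomArray(Random random) {
        return random.ints(ITEMS_PER_ARRAY).toArray();
    }

    // Sums the values as long so that large random ints cannot overflow.
    static long sum(int[] values) {
        return IntStream.of(values).asLongStream().sum();
    }

    public static void main(String[] args) {
        int[] values = randomArray(new Random());
        System.out.println(values.length + " items, sum = " + sum(values));
    }
}
```

The full example repeats this step 2,000 times and hands each list to a worker instead of summing it inline.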

 

 

Reactive Streams, based on Java 8


1. Main: the entry point

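import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

// Assumes the org.reactivestreams API is on the classpath
// (Java 8 has no built-in java.util.concurrent.Flow).
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class Sample05_reactiveStream04 {

    public static void main(String[] args) {

        // Queue that holds the 2,000 commands to publish.
        BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<>(2000);

        Random random = new Random();

        for (int i = 0; i < 2000; i++) {
            // Each command carries a list of 1,000 random integers.
            List<Integer> integerList = random.ints(1000).boxed().collect(Collectors.toList());

            Command command = new InsertToDataBaseCommand(integerList);

            blockingQueue.add(command);
        }

        Publisher<Command> publisher = new CustomPublisher(blockingQueue);

        Subscriber<Command> subscriber = new CustomSubscriber();

        publisher.subscribe(subscriber);
    }
}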

 

2. Publisher: CustomPublisher, an implementation of Publisher

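import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// No type parameter here: declaring CustomPublisher<Command> would introduce
// a type variable that shadows the Command interface.
public class CustomPublisher implements Publisher<Command> {

    private final BlockingQueue<Command> blockingQueue;

    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public CustomPublisher(BlockingQueue<Command> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void subscribe(Subscriber<? super Command> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                // This first version ignores n and emits every queued command.
                while (!blockingQueue.isEmpty()) {
                    try {
                        Command command = blockingQueue.take();

                        subscriber.onNext(command);

                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and report the failure.
                        Thread.currentThread().interrupt();
                        subscriber.onError(e);
                        return;
                    }
                }

                // Check once per second until all 2,000 commands have run,
                // then signal completion. A single one-shot check would never
                // complete if the workers needed more than one second.
                executorService.scheduleWithFixedDelay(() -> {
                    if (Counter.get() == 2000) {
                        System.out.println("onComplete");
                        subscriber.onComplete();

                        executorService.shutdown();
                    }
                }, 1, 1, TimeUnit.SECONDS);
            }

            @Override
            public void cancel() {
                // Cancellation is not supported in this example.
            }
        });
    }
}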
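The Subscription above ignores the `n` passed to `request` and drains the whole queue, while the Reactive Streams specification expects a publisher to emit no more items than were requested. The sketch below shows one way the demand bookkeeping could look. To stay runnable without the `org.reactivestreams` dependency it uses a plain `Consumer` and `Runnable` in place of a Subscriber; `BoundedEmitter` and all of its members are hypothetical names, and throwing on a non-positive `n` is a simplification (the specification calls for signalling `onError` instead).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical helper, not part of Reactive Streams: emits at most as many
// items as have been requested so far.
final class BoundedEmitter<T> {
    private final Queue<T> source;             // assumed to be polled only by this emitter
    private final Consumer<? super T> onNext;
    private final Runnable onComplete;
    private final AtomicLong demand = new AtomicLong();
    private final AtomicInteger wip = new AtomicInteger(); // "work in progress" guard
    private volatile boolean done;

    BoundedEmitter(Queue<T> source, Consumer<? super T> onNext, Runnable onComplete) {
        this.source = source;
        this.onNext = onNext;
        this.onComplete = onComplete;
    }

    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        // Add to the outstanding demand, capping at Long.MAX_VALUE on overflow.
        demand.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
        drain();
    }

    private void drain() {
        // Only one caller drains at a time; a nested or concurrent request()
        // just records that another pass is needed.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        do {
            while (!done) {
                if (source.isEmpty()) {
                    done = true;
                    onComplete.run();
                } else if (demand.get() == 0) {
                    break; // wait for the next request(n)
                } else {
                    demand.decrementAndGet();
                    onNext.accept(source.poll());
                }
            }
        } while (wip.decrementAndGet() != 0);
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> received = new ArrayList<>();
        BoundedEmitter<Integer> emitter = new BoundedEmitter<Integer>(
                queue, received::add, () -> System.out.println("complete"));

        emitter.request(2);
        System.out.println(received); // [1, 2]
        emitter.request(10);          // emits the rest, then prints "complete"
        System.out.println(received); // [1, 2, 3, 4, 5]
    }
}
```

The `wip` counter keeps a consumer that calls `request(1)` from inside its own callback from recursing: the nested call only bumps the counter, and the outer loop picks up the new demand.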

 

3. Subscriber: CustomSubscriber, an implementation of Subscriber

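import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class CustomSubscriber implements Subscriber<Command> {

    private Subscription subscription;

    // Worker pool that runs the commands off the publishing thread.
    private final ExecutorService executorService = Executors.newFixedThreadPool(1000);

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;

        // Start the flow; the publisher above emits every queued command on this call.
        this.subscription.request(1);
    }

    @Override
    public void onNext(Command command) {
        executorService.execute(command::work);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("We Error");

        // Release the worker threads on failure as well.
        executorService.shutdown();
    }

    @Override
    public void onComplete() {
        System.out.println("We Finished");

        executorService.shutdown();
    }
}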

 

4. Command interface

public interface Command {
    // Interface methods are implicitly public.
    void work();
}

 

5. Command implementation: the concrete class

import java.util.List;

public class InsertToDataBaseCommand implements Command {

    private final List<Integer> integerList;

    public InsertToDataBaseCommand(List<Integer> listInteger) {
        integerList = listInteger;
    }

    @Override
    public void work() {
        // Sum the list as an int; like Integer::sum, this wraps around on overflow.
        // Using a plain int also avoids printing "Optional[...]".
        int sum = integerList.stream().mapToInt(Integer::intValue).sum();

        System.out.println(Thread.currentThread().getName() + " Final Result : " + sum);
        // Record that one more command has finished.
        System.out.println(Thread.currentThread().getName() + " " + Counter.incrementAndGet());
    }
}

 

6. Counter class: stores and returns the number of completed executions.

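import java.util.concurrent.atomic.AtomicInteger;

public class Counter {

    // Shared by all worker threads; AtomicInteger keeps the count exact.
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    // Adds one and returns the new count.
    public static int incrementAndGet() {
        return atomicInteger.incrementAndGet();
    }

    // Returns the current count without changing it.
    public static int get() {
        return atomicInteger.get();
    }
}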
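The publisher in this example decides that the work is done by checking on a timer whether the counter has reached 2000. As a possible alternative, the sketch below waits on a `CountDownLatch` that each task counts down when it finishes, so no polling is needed. `CompletionLatchSketch`, `runTasks`, and the 10-second timeout are assumptions made for this illustration and are not part of the original example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: wait for N tasks with a latch instead of polling a counter.
final class CompletionLatchSketch {

    // Runs taskCount trivial tasks on a pool and returns how many completed.
    static int runTasks(int taskCount, int threads) {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        counter.incrementAndGet(); // stands in for command.work()
                    } finally {
                        latch.countDown();         // always signal, even on failure
                    }
                });
            }
            // Block until every task has counted down, or give up after 10 seconds.
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return counter.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(2000, 8)); // prints 2000
    }
}
```

The latch trades the periodic check for a blocking wait, so it fits best where a thread is free to wait for the batch to end.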

 


 
