이번에는 Reactive Stream을 이용해서 실질적으로 현업에서 사용할 만한 코드를 작성해보겠다. 

 

 요구사항 

 

   - Oracle DB , OJdbc, DBCP2

   - org.reactivesreams

   - ExecutorService

 

  A, B라는 Oracle DB User에 대해서 TB_USER로 부터 USER_ID를 가져와 출력하라는 요구 사항을 접수 받았다. 하지만 이 DB User의 수는 상황에 따라서 증가하거나 줄어들 수 있으며 그에 따라 고객이 빠른 응답을 받을 수 있도록 구성이 필요하다고 하자. 이와 같은 요구사항을 ReactiveStream(Java8)을 이용해서 구현해본다. 


1번 : Main 진입점 

 

 DataSource를 생성한뒤 각각의 Command 객체 생성시 전달하여 DB Select시에 사용하려고 한다. 

 해당 부분은 Controller 를 통해서 전달받은 파라미터 및 구분자에 의해서 동적으로 변경될 수 있는 부분이라고 가정한다. 

즉 commandHadlerList의 배열이 요청 갯수에 따라서 증가하거나 감소할 수 있다. 

public static void main(String[] args) {

        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        dataSource.setUrl("jdbc:oracle:thin:@***********:****/****");
        dataSource.setUsername("*****");
        dataSource.setPassword("*****");

        BasicDataSource dataSource2 = new BasicDataSource();
        dataSource2.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        dataSource2.setUrl("jdbc:oracle:thin:@***********:****/****");
        dataSource2.setUsername("*****");
        dataSource2.setPassword("*****");

        CommandHandler commandHandler1 = new CommandHandler(1);

        commandHandler1.add(new SelectDBCommand(dataSource));
        commandHandler1.add(new SelectDBCommand(dataSource2));

        List<CommandHandler> commandHandlerList = new ArrayList<>();

        commandHandlerList.add(commandHandler1);

        Publisher<CommandHandler> publisher = new SamplePublisher(commandHandlerList);

        Subscriber<CommandHandler> subscriber = new SampleSubscriber(commandHandlerList.size());

        publisher.subscribe(subscriber);
        
}

2번 : Publisher 

 

 Observer 패턴의  Observable과 같은 역할을 하며, Subscriber를 구독할 수 있게 설정하여 실행시 Publihser에서 Subscriber로 onNext, onComplete, onError를 처리할 수 있다. 

public class SamplePublisher implements Publisher<CommandHandler> {

    private final List<CommandHandler> commandHandlerList;

    public SamplePublisher(List<CommandHandler> commandHandlerList){
        this.commandHandlerList = commandHandlerList;
    }

    @Override
    public void subscribe(Subscriber<? super CommandHandler> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                commandHandlerList.forEach(subscriber::onNext);

                // 데이터 전송 처리 완료
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    }
}

3번 : Subcriber - Publisher 내부에서 호출한 각기 함수(onNext, onComplete, onError)에 맞게 진행된다. 

 

 SampleSubscriber가 생성되는 시점에 만들어진 ExectorService에 대해서  Java의 Future를 활용하여 onComplete 시점에 ExecutorService를 Shutdown 시킬 수 있게 처리하였다. 

public class SampleSubscriber implements Subscriber<CommandHandler> {

    private final ExecutorService executorService;

    List<Future<?>> futures = new ArrayList<>();

    public SampleSubscriber(int executeCount){
        this.executorService = Executors.newFixedThreadPool(executeCount);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(1);
    }

    @Override
    public void onNext(CommandHandler commandHandler) {
        futures.add(executorService.submit(commandHandler::StartAPICall));
    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {
        for(Future<?> future : futures) {
            try {
                future.get();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }
}

4번 : Command Handler 

 

3번 항목과 처리되는 방식은 거의 유사하며, Command Handler에서 각각의 Command를 ExecutorService를 이용하여  실행시키는 구조이다. 

public class CommandHandler {

    private final List<Command> commandList = new ArrayList<>();
    List<Future<?>> futures = new ArrayList<>();

    private final ExecutorService executorService;

    public CommandHandler(int threadCount){
        executorService = Executors.newFixedThreadPool(threadCount);
    }

    public void StartAPICall() {

        commandList.forEach(command -> {
            futures.add(executorService.submit(command::execute, "Success"));
        });

        for(Future<?> future : futures) {
            try {
                future.get();
            }catch(Exception e){
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }

    public synchronized void add(Command command) {
        commandList.add(command);
    }
}

5번 : Command 

public interface Command {
    public void execute();
}

6번 : Command 구현 - SelectDBCommand 

 

 실질적으로 데이터를 조회하고, 이를 용도에 맞게 처리하는 Command의 구현체이다. SelectDBCommand 객체가 

생성되는 시점에 필요한 정보를 모두 합성 방식을 이용하여 미리 준비하고, execute가 호출 되는 시점에 활용한다. 

아래와 같은 방식으로 구성했을 때 ThreadSafe하게 구현되었다고 할 수 있다. 

public class SelectDBCommand implements Command {

    private final DataSource dataSource;

    public SelectDBCommand(DataSource dataSource){
        this.dataSource =  dataSource;
    }

    @Override
    public void execute() {

        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

        List<User> list =  jdbcTemplate.query("SELECT USER_ID FROM *********", new CustomMapper());

        System.out.println("START" + Thread.currentThread().getName());
        list.stream().forEach(item -> {
            System.out.print(item.getUSER_ID() + ",");
        });
        System.out.println("END" + Thread.currentThread().getName());
    }
}

7번 : CustomMapper - JDBC Rowmapper 

 

JdbcTemplate의 RowMapper를 구현하여 필요에 맞게 값을 바인딩 처리한다. 

public class CustomMapper implements RowMapper<User> {
    @Override
    public User mapRow(ResultSet resultSet, int i) throws SQLException {
        User user = new User();
        user.setUSER_ID(resultSet.getString("USER_ID"));
        return user;
    }
}

'알아보기' 카테고리의 다른 글

0006 WebJars  (0) 2019.12.18
0002 Reactive Programming 3  (0) 2019.12.08
0004 Reactive Programming 2  (0) 2019.12.05
0004 Reactive Programming 1  (0) 2019.12.04
0002 Realm 활용하기 2  (0) 2019.11.22

Reactive Programming 2 편에서 다루었던 개발건에 추가적인 요구사항에 생겼다고 해보자, 

 

 

 요구사항

 

 ( 2편의 요구사항 )

   일정한 시간을 주기로 2000건의 정수 배열을 받은 후 해당 각각의 결과 값을 합하는 요구사항이 발생하였다.

   생성해야할 정수 배열은 난수로 이루어진 값으로 1000개의 아이템을 가지고 있다. 

 

 추가 요구사항 

    난수로 생성되는 2000건의 정수 배열에 대해서 각 배열의 총합이  음수인 값은 제외하고, 양수인 값을 추출하여하 출력한다.  

 

 ( 사실, 자바8의 Stream API를 사용한다면 이런 힘들고 복잡한 작업을 전혀할 필요가 없다. 우리는 Reactive Stream의 기본이 되는 개념을 익히고 이를 실질적으로 활용하기 위한 Akka, RXJava, Java9 Flow API 등등을 쉽게 이해하기 위해 진행 중인 것이다. )

 


2편에서 다룬 코드 샘플을 아래와 같이 변경하였다. 

아래의 코드에서 중요한 부분은 Publisher 1, Publisher 2를 연결하면서 처리되는 Data 흐름이다.  

데이터 전달 순서 : CustomPublisher -> FilterPublisher -> CustomSubscriber  

 

1번 : Main 진입점

public static void main(String[] args) {
	// 1000개의 난수 배열을 가진 아이템 생성 - 각각을 합쳐

	BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<Command>(500);

	for (int i = 0; i < 500; i++) {

	IntStream intStream = new Random().ints();

	List<Integer> integerList = intStream.limit(1000).boxed().collect(Collectors.toList());

	Command command = new IntegerSumCommand(integerList);

	blockingQueue.add(command);
}

// Publisher 1
Publisher<Command> publisher = new CustomPublisher(blockingQueue);

// Publisher 2
Publisher<Command> publisher1 = new FilterPublisher(publisher);

// Subscriber
Subscriber subscriber = new CustomSubscriber();

// Start !
publisher1.subscribe(subscriber);
}


2번 : Publisher 1 : Publisher를 구현한 CustomPublisher 

public class CustomPublisher implements Publisher<Command> {

    private final BlockingQueue<Command> blockingQueue;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public CustomPublisher(BlockingQueue<Command> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void subscribe(Subscriber subscriber) {

        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                while (!blockingQueue.isEmpty()){
                    try {
                        Command command = blockingQueue.take();

                        subscriber.onNext(command);

                    } catch (InterruptedException e) {

                        e.printStackTrace();
                    }
                }

                scheduledExecutorService.schedule(() -> {
                    if(Counter.get() == 500){
                        subscriber.onComplete();

                        if(!scheduledExecutorService.isShutdown()){
                            scheduledExecutorService.shutdown();
                        }
                    }
                }, 1, TimeUnit.SECONDS);

            }

            @Override
            public void cancel() {

            }
        });
    }
}

 

3번 : Publisher 2 : Publisher 1을 전달 받아 값을 필터링하는 FilterPublisher

public class FilterPublisher implements Publisher<Command> {

    private Publisher<Command> publisher;

    public FilterPublisher(Publisher<Command> publisher){
        this.publisher = publisher;
    }

    @Override
    public void subscribe(Subscriber<? super Command> subscriber) {
        this.publisher.subscribe(new Subscriber<Command>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }

            @Override
            public void onNext(Command command) {
                if(!command.isMinus()){
                    subscriber.onNext(command);
                }
            }

            @Override
            public void onError(Throwable t) {
                subscriber.onError(t);
            }

            @Override
            public void onComplete() {
                subscriber.onComplete();
            }
        });
    }
}

4번 : Subscriber : 최종적으로 Publisher 2의 데이터 처리를 전달받아 최종값을 출력한다.

public class CustomSubscriber implements Subscriber<Command> {

    private ExecutorService executorService = Executors
            .newFixedThreadPool(1000);

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Command command) {
        executorService.execute(() -> {
            int total = command.getTotal();

            System.out.println(String.format("총 값은 %s", total));
        });
    }

    @Override
    public void onError(Throwable throwable) {
        subscription.cancel();
    }

    @Override
    public void onComplete() {
        if(!executorService.isShutdown()){
            executorService.shutdown();
        }
    }
}

 

5번 : Command Interface 

public interface Command {
    public int getTotal();
    public boolean isMinus();
}

 

6번 : Command Implementation 

public class IntegerHandlerCommand implements Command {

    private Optional<Integer> optionalInteger;

    public IntegerHandlerCommand(List<Integer> integerList){

        Counter.addAndGet();

        this.optionalInteger = integerList.stream().reduce(Integer::sum);
    }

    @Override
    public int getTotal() {
        return this.optionalInteger.orElse(0);
    }

    @Override
    public boolean isMinus() {
        return this.optionalInteger.orElse(0) < 0;
    }
}

 

7번 : Counter Class - Executor Service 종료를 위한 Counter 서비스 

public class Counter {
    private static int count = 0;

    public synchronized static int addAndGet(){
        return count ++;
    }

    public synchronized static int get(){
        return count;
    }
}

'알아보기' 카테고리의 다른 글

0006 WebJars  (0) 2019.12.18
0002 Reactive Programming 4  (0) 2019.12.09
0004 Reactive Programming 2  (0) 2019.12.05
0004 Reactive Programming 1  (0) 2019.12.04
0002 Realm 활용하기 2  (0) 2019.11.22

 

 Reactive Programming은 선언적인 프로그래밍 패러다임으로써 데이터 스트림(Data Stream)과 변화의 전파에 초점을 맞추고 있다. 또한 이것의 핵심은  비동기 (Async) 이벤트와 Observer 디자인 패턴이다. 외부의 이벤트나 데이터가 발생하였을 때, 사용자에게 자연스러운 응답을 주고, 규모 탄력적으로 리소스를 사용하며 실패에 있어서 유연하게 대처할 수 있다. 

 

 

 Reactive Stream은 Reactive Programming을 근간으로 한 개발을 하기 위한 명세이며, Netflix, Pivotal, TypeSafe의 발의로 시작되었다. 

 

Reactive Streams : https://www.reactive-streams.org/

 

 Reactive Stream은 비동기적인 스트림 프로세싱을 막힘없이 ( Non Blocking) 처리하기 위한 하나의 표준이다. 

 

Reactive Stream의 상세 스펙 보기

 

 

기본적인 API 컴포넌트 구성 


  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

코드로 작성된 추상화 수준으로 확인을 해보면, 

// Publisher
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

// Subscriber
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

// Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}

// Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

 

 

 

위의 코드에서 Subscriber의 역할에 대해서 아래의 그림으로 확인해보면, 

 

Subscriber 의 동작흐름

 

 

Publisher,  Subscriber, Subscription 사이의 연관관계를 아래와 같이 구성해봤다.

 

Reactive Stream 구성

 

실제로 위의 구성 관계를 이용해서 코드를 작성해보면, 아래와 같이 작성해볼 수 있다. 

 

package com.reactive.reactive.basic11;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PublishOnWithSubscribeOnBoth {

    private static Logger logger = LoggerFactory.getLogger(PublishOnWithSubscribeOnBoth.class);

    public static void main(String[] args) {
        Publisher<Integer> publisher = subscriber -> {
            subscriber.onSubscribe(new Subscription() { 
                @Override
                public void request(long n) {
                    logger.debug("request()");
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                    subscriber.onNext(4);
                    subscriber.onNext(5);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        };

        Publisher<Integer> subOnPub = subscriber -> {
            ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                @Override
                public String getThreadNamePrefix() {
                    return "subOn-";
                }
            });

            executorService.execute(() -> publisher.subscribe(subscriber));

            executorService.shutdown();
        };

        Publisher<Integer> pubOnPub = subscriber -> {
            subOnPub.subscribe(new Subscriber<Integer>() {

                ExecutorService executorService = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "pubOn-";
                    }
                });

                @Override
                public void onSubscribe(Subscription s) {
                    subscriber.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    executorService.execute(() -> {
                        subscriber.onNext(integer);
                    });
                }

                @Override
                public void onError(Throwable t) {
                    executorService.execute(() -> {
                        subscriber.onError(t);
                    });

                    executorService.shutdown();
                }

                @Override
                public void onComplete() {
                    executorService.execute(() -> {
                        subscriber.onComplete();
                    });

                    executorService.shutdown();
                }
            });
        };

        pubOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                logger.debug("OnSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                logger.debug("onNext :{}", integer);
            }

            @Override
            public void onError(Throwable t) {
                logger.debug("onError:{}" , t);
            }

            @Override
            public void onComplete() {
                logger.debug("onComplete");
            }
        });
    }
}

 

이번편은 개요로써 간단히 기본 항목들에 대해서 알아보았다.

 

 Java 8 부터 시작해서 차근차근 알아보도록 하겠다. 

 

다음편 보기

'알아보기' 카테고리의 다른 글

0002 Reactive Programming 3  (0) 2019.12.08
0004 Reactive Programming 2  (0) 2019.12.05
0002 Realm 활용하기 2  (0) 2019.11.22
0002 Realm 활용하기 3  (0) 2019.11.22
0003 자바를 이용한 병렬 프로그래밍 1  (0) 2019.11.20

+ Recent posts