이번에는 Reactive Stream을 이용해서 실질적으로 현업에서 사용할 만한 코드를 작성해보겠다. 

 

 요구사항 

 

   - Oracle DB , OJdbc, DBCP2

   - org.reactivesreams

   - ExecutorService

 

  A, B라는 Oracle DB User에 대해서 TB_USER로 부터 USER_ID를 가져와 출력하라는 요구 사항을 접수 받았다. 하지만 이 DB User의 수는 상황에 따라서 증가하거나 줄어들 수 있으며 그에 따라 고객이 빠른 응답을 받을 수 있도록 구성이 필요하다고 하자. 이와 같은 요구사항을 ReactiveStream(Java8)을 이용해서 구현해본다. 


1번 : Main 진입점 

 

 DataSource를 생성한뒤 각각의 Command 객체 생성시 전달하여 DB Select시에 사용하려고 한다. 

 해당 부분은 Controller 를 통해서 전달받은 파라미터 및 구분자에 의해서 동적으로 변경될 수 있는 부분이라고 가정한다. 

즉 commandHadlerList의 배열이 요청 갯수에 따라서 증가하거나 감소할 수 있다. 

public static void main(String[] args) {

        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        dataSource.setUrl("jdbc:oracle:thin:@***********:****/****");
        dataSource.setUsername("*****");
        dataSource.setPassword("*****");

        BasicDataSource dataSource2 = new BasicDataSource();
        dataSource2.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        dataSource2.setUrl("jdbc:oracle:thin:@***********:****/****");
        dataSource2.setUsername("*****");
        dataSource2.setPassword("*****");

        CommandHandler commandHandler1 = new CommandHandler(1);

        commandHandler1.add(new SelectDBCommand(dataSource));
        commandHandler1.add(new SelectDBCommand(dataSource2));

        List<CommandHandler> commandHandlerList = new ArrayList<>();

        commandHandlerList.add(commandHandler1);

        Publisher<CommandHandler> publisher = new SamplePublisher(commandHandlerList);

        Subscriber<CommandHandler> subscriber = new SampleSubscriber(commandHandlerList.size());

        publisher.subscribe(subscriber);
        
}

2번 : Publisher 

 

 Observer 패턴의  Observable과 같은 역할을 하며, Subscriber를 구독할 수 있게 설정하여 실행시 Publihser에서 Subscriber로 onNext, onComplete, onError를 처리할 수 있다. 

public class SamplePublisher implements Publisher<CommandHandler> {

    private final List<CommandHandler> commandHandlerList;

    public SamplePublisher(List<CommandHandler> commandHandlerList){
        this.commandHandlerList = commandHandlerList;
    }

    @Override
    public void subscribe(Subscriber<? super CommandHandler> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                commandHandlerList.forEach(subscriber::onNext);

                // 데이터 전송 처리 완료
                subscriber.onComplete();
            }

            @Override
            public void cancel() {

            }
        });
    }
}

3번 : Subcriber - Publisher 내부에서 호출한 각기 함수(onNext, onComplete, onError)에 맞게 진행된다. 

 

 SampleSubscriber가 생성되는 시점에 만들어진 ExectorService에 대해서  Java의 Future를 활용하여 onComplete 시점에 ExecutorService를 Shutdown 시킬 수 있게 처리하였다. 

public class SampleSubscriber implements Subscriber<CommandHandler> {

    private final ExecutorService executorService;

    List<Future<?>> futures = new ArrayList<>();

    public SampleSubscriber(int executeCount){
        this.executorService = Executors.newFixedThreadPool(executeCount);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(1);
    }

    @Override
    public void onNext(CommandHandler commandHandler) {
        futures.add(executorService.submit(commandHandler::StartAPICall));
    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {
        for(Future<?> future : futures) {
            try {
                future.get();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }
}

4번 : Command Handler 

 

3번 항목과 처리되는 방식은 거의 유사하며, Command Handler에서 각각의 Command를 ExecutorService를 이용하여  실행시키는 구조이다. 

public class CommandHandler {

    private final List<Command> commandList = new ArrayList<>();
    List<Future<?>> futures = new ArrayList<>();

    private final ExecutorService executorService;

    public CommandHandler(int threadCount){
        executorService = Executors.newFixedThreadPool(threadCount);
    }

    public void StartAPICall() {

        commandList.forEach(command -> {
            futures.add(executorService.submit(command::execute, "Success"));
        });

        for(Future<?> future : futures) {
            try {
                future.get();
            }catch(Exception e){
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }

    public synchronized void add(Command command) {
        commandList.add(command);
    }
}

5번 : Command 

public interface Command {
    public void execute();
}

6번 : Command 구현 - SelectDBCommand 

 

 실질적으로 데이터를 조회하고, 이를 용도에 맞게 처리하는 Command의 구현체이다. SelectDBCommand 객체가 

생성되는 시점에 필요한 정보를 모두 합성 방식을 이용하여 미리 준비하고, execute가 호출 되는 시점에 활용한다. 

아래와 같은 방식으로 구성했을 때 ThreadSafe하게 구현되었다고 할 수 있다. 

public class SelectDBCommand implements Command {

    private final DataSource dataSource;

    public SelectDBCommand(DataSource dataSource){
        this.dataSource =  dataSource;
    }

    @Override
    public void execute() {

        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

        List<User> list =  jdbcTemplate.query("SELECT USER_ID FROM *********", new CustomMapper());

        System.out.println("START" + Thread.currentThread().getName());
        list.stream().forEach(item -> {
            System.out.print(item.getUSER_ID() + ",");
        });
        System.out.println("END" + Thread.currentThread().getName());
    }
}

7번 : CustomMapper - JDBC Rowmapper 

 

JdbcTemplate의 RowMapper를 구현하여 필요에 맞게 값을 바인딩 처리한다. 

public class CustomMapper implements RowMapper<User> {
    @Override
    public User mapRow(ResultSet resultSet, int i) throws SQLException {
        User user = new User();
        user.setUSER_ID(resultSet.getString("USER_ID"));
        return user;
    }
}

'알아보기' 카테고리의 다른 글

0006 WebJars  (0) 2019.12.18
0002 Reactive Programming 3  (0) 2019.12.08
0004 Reactive Programming 2  (0) 2019.12.05
0004 Reactive Programming 1  (0) 2019.12.04
0002 Realm 활용하기 2  (0) 2019.11.22

Reactive Programming 2 편에서 다루었던 개발건에 추가적인 요구사항에 생겼다고 해보자, 

 

 

 요구사항

 

 ( 2편의 요구사항 )

   일정한 시간을 주기로 2000건의 정수 배열을 받은 후 해당 각각의 결과 값을 합하는 요구사항이 발생하였다.

   생성해야할 정수 배열은 난수로 이루어진 값으로 1000개의 아이템을 가지고 있다. 

 

 추가 요구사항 

    난수로 생성되는 2000건의 정수 배열에 대해서 각 배열의 총합이  음수인 값은 제외하고, 양수인 값을 추출하여하 출력한다.  

 

 ( 사실, 자바8의 Stream API를 사용한다면 이런 힘들고 복잡한 작업을 전혀할 필요가 없다. 우리는 Reactive Stream의 기본이 되는 개념을 익히고 이를 실질적으로 활용하기 위한 Akka, RXJava, Java9 Flow API 등등을 쉽게 이해하기 위해 진행 중인 것이다. )

 


2편에서 다룬 코드 샘플을 아래와 같이 변경하였다. 

아래의 코드에서 중요한 부분은 Publisher 1, Publisher 2를 연결하면서 처리되는 Data 흐름이다.  

데이터 전달 순서 : CustomPublisher -> FilterPublisher -> CustomSubscriber  

 

1번 : Main 진입점

public static void main(String[] args) {
	// 1000개의 난수 배열을 가진 아이템 생성 - 각각을 합쳐

	BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<Command>(500);

	for (int i = 0; i < 500; i++) {

	IntStream intStream = new Random().ints();

	List<Integer> integerList = intStream.limit(1000).boxed().collect(Collectors.toList());

	Command command = new IntegerSumCommand(integerList);

	blockingQueue.add(command);
}

// Publisher 1
Publisher<Command> publisher = new CustomPublisher(blockingQueue);

// Publisher 2
Publisher<Command> publisher1 = new FilterPublisher(publisher);

// Subscriber
Subscriber subscriber = new CustomSubscriber();

// Start !
publisher1.subscribe(subscriber);
}


2번 : Publisher 1 : Publisher를 구현한 CustomPublisher 

public class CustomPublisher implements Publisher<Command> {

    private final BlockingQueue<Command> blockingQueue;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public CustomPublisher(BlockingQueue<Command> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void subscribe(Subscriber subscriber) {

        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                while (!blockingQueue.isEmpty()){
                    try {
                        Command command = blockingQueue.take();

                        subscriber.onNext(command);

                    } catch (InterruptedException e) {

                        e.printStackTrace();
                    }
                }

                scheduledExecutorService.schedule(() -> {
                    if(Counter.get() == 500){
                        subscriber.onComplete();

                        if(!scheduledExecutorService.isShutdown()){
                            scheduledExecutorService.shutdown();
                        }
                    }
                }, 1, TimeUnit.SECONDS);

            }

            @Override
            public void cancel() {

            }
        });
    }
}

 

3번 : Publisher 2 : Publisher 1을 전달 받아 값을 필터링하는 FilterPublisher

public class FilterPublisher implements Publisher<Command> {

    private Publisher<Command> publisher;

    public FilterPublisher(Publisher<Command> publisher){
        this.publisher = publisher;
    }

    @Override
    public void subscribe(Subscriber<? super Command> subscriber) {
        this.publisher.subscribe(new Subscriber<Command>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }

            @Override
            public void onNext(Command command) {
                if(!command.isMinus()){
                    subscriber.onNext(command);
                }
            }

            @Override
            public void onError(Throwable t) {
                subscriber.onError(t);
            }

            @Override
            public void onComplete() {
                subscriber.onComplete();
            }
        });
    }
}

4번 : Subscriber : 최종적으로 Publisher 2의 데이터 처리를 전달받아 최종값을 출력한다.

public class CustomSubscriber implements Subscriber<Command> {

    private ExecutorService executorService = Executors
            .newFixedThreadPool(1000);

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Command command) {
        executorService.execute(() -> {
            int total = command.getTotal();

            System.out.println(String.format("총 값은 %s", total));
        });
    }

    @Override
    public void onError(Throwable throwable) {
        subscription.cancel();
    }

    @Override
    public void onComplete() {
        if(!executorService.isShutdown()){
            executorService.shutdown();
        }
    }
}

 

5번 : Command Interface 

public interface Command {
    public int getTotal();
    public boolean isMinus();
}

 

6번 : Command Implementation 

public class IntegerHandlerCommand implements Command {

    private Optional<Integer> optionalInteger;

    public IntegerHandlerCommand(List<Integer> integerList){

        Counter.addAndGet();

        this.optionalInteger = integerList.stream().reduce(Integer::sum);
    }

    @Override
    public int getTotal() {
        return this.optionalInteger.orElse(0);
    }

    @Override
    public boolean isMinus() {
        return this.optionalInteger.orElse(0) < 0;
    }
}

 

7번 : Counter Class - Executor Service 종료를 위한 Counter 서비스 

public class Counter {
    private static int count = 0;

    public synchronized static int addAndGet(){
        return count ++;
    }

    public synchronized static int get(){
        return count;
    }
}

'알아보기' 카테고리의 다른 글

0006 WebJars  (0) 2019.12.18
0002 Reactive Programming 4  (0) 2019.12.09
0004 Reactive Programming 2  (0) 2019.12.05
0004 Reactive Programming 1  (0) 2019.12.04
0002 Realm 활용하기 2  (0) 2019.11.22

Reactive Stream을 다루기 위해서 아래의 예제를 바탕으로 요구조건을 변경해가면서 코드 위주로 알아보겠다. 

 

 요구사항 

 

   일정한 시간을 주기로 2000건의 정수 배열을 받은 후 해당 각각의 결과 값을 합하는 요구사항이 발생하였다.

   생성해야할 정수 배열은 난수로 이루어진 값으로 1000개의 아이템을 가지고 있다. 

 

 

Reactive Stream Java 8 버전 기준 


1번, Main :  진입점

public class Sample05_reactiveStream04 {

    public static void main(String[] args) {

        BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<Command>(2000);

        for (int i = 0; i < 2000 ; i++) {
            IntStream randomValueList = new Random().ints();

            List<Integer> integerList = randomValueList.limit(1000).boxed().collect(Collectors.toList());

            Command command = new InsertToDataBaseCommand(integerList);

            blockingQueue.add(command);
        }

        Publisher publisher = new CustomPublisher(blockingQueue);

        Subscriber subscriber = new CustomSubscriber();

        publisher.subscribe(subscriber);
    }
}

 

2번, Publisher : Publisher를 구현한 CustomPublisher 

public class CustomPublisher<Command> implements Publisher<Command> {

    private final BlockingQueue<Command> blockingQueue;

    private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public CustomPublisher(BlockingQueue<Command> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void subscribe(Subscriber<? super Command> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                while (!blockingQueue.isEmpty()) {
                    try {
                        Command command = blockingQueue.take();

                        subscriber.onNext(command);

                    } catch (InterruptedException e) {
                        // Exception handling.
                    }
                }

                executorService.schedule(() -> {
                    if(Counter.get() == 2000){
                        System.out.println("onComplete");
                        subscriber.onComplete();

                        executorService.shutdown();
                    }
                }, 1, TimeUnit.SECONDS);
            }

            @Override
            public void cancel() {

            }
        });
    }
}

 

3번, Subscriber : Subscriber를 구현한 CustomSubscriber 

public class CustomSubscriber implements Subscriber<Command> {

    private Subscription subscription;

    private final ExecutorService executorService = Executors.newFixedThreadPool(1000);

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;

        this.subscription.request(1);
    }

    @Override
    public void onNext(Command command) {
        executorService.execute(command::work);
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("We Error");
    }

    @Override
    public void onComplete() {
        System.out.println("We Finished");

        executorService.shutdown();
    }
}

 

4번, Command Interface

public interface Command {
    public void work();
}

 

5번, Command Implementation : 실제 구현 객체 

public class InsertToDataBaseCommand implements Command {

    private final List<Integer> integerList;

    public InsertToDataBaseCommand(List<Integer> listInteger){
        integerList = listInteger;
    }

    @Override
    public void work() {
        Optional<Integer> sum = integerList.stream().reduce(Integer::sum);

        System.out.println(Thread.currentThread().getName() +  " Final Result : " + sum);
        System.out.println(Thread.currentThread().getName()  + " " + Counter.incrementAndGet());

    }
}

 

6번, Counter Class :  수행된 횟수를 저장하고, 가져온다. 

public class Counter {

    private static final AtomicInteger atomicInteger = new AtomicInteger();

    public static int incrementAndGet(){
        return atomicInteger.incrementAndGet();
    }

    public static int get(){
        return atomicInteger.get();
    }
}

 


 

'알아보기' 카테고리의 다른 글

0002 Reactive Programming 4  (0) 2019.12.09
0002 Reactive Programming 3  (0) 2019.12.08
0004 Reactive Programming 1  (0) 2019.12.04
0002 Realm 활용하기 2  (0) 2019.11.22
0002 Realm 활용하기 3  (0) 2019.11.22

+ Recent posts