이번에는 Reactive Stream을 이용해서 실질적으로 현업에서 사용할 만한 코드를 작성해보겠다.
요구사항
- Oracle DB , OJdbc, DBCP2
- org.reactivesreams
- ExecutorService
A, B라는 Oracle DB User에 대해서 TB_USER로 부터 USER_ID를 가져와 출력하라는 요구 사항을 접수 받았다. 하지만 이 DB User의 수는 상황에 따라서 증가하거나 줄어들 수 있으며 그에 따라 고객이 빠른 응답을 받을 수 있도록 구성이 필요하다고 하자. 이와 같은 요구사항을 ReactiveStream(Java8)을 이용해서 구현해본다.
1번 : Main 진입점
DataSource를 생성한뒤 각각의 Command 객체 생성시 전달하여 DB Select시에 사용하려고 한다.
해당 부분은 Controller 를 통해서 전달받은 파라미터 및 구분자에 의해서 동적으로 변경될 수 있는 부분이라고 가정한다.
즉 commandHadlerList의 배열이 요청 갯수에 따라서 증가하거나 감소할 수 있다.
public static void main(String[] args) {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
dataSource.setUrl("jdbc:oracle:thin:@***********:****/****");
dataSource.setUsername("*****");
dataSource.setPassword("*****");
BasicDataSource dataSource2 = new BasicDataSource();
dataSource2.setDriverClassName("oracle.jdbc.driver.OracleDriver");
dataSource2.setUrl("jdbc:oracle:thin:@***********:****/****");
dataSource2.setUsername("*****");
dataSource2.setPassword("*****");
CommandHandler commandHandler1 = new CommandHandler(1);
commandHandler1.add(new SelectDBCommand(dataSource));
commandHandler1.add(new SelectDBCommand(dataSource2));
List<CommandHandler> commandHandlerList = new ArrayList<>();
commandHandlerList.add(commandHandler1);
Publisher<CommandHandler> publisher = new SamplePublisher(commandHandlerList);
Subscriber<CommandHandler> subscriber = new SampleSubscriber(commandHandlerList.size());
publisher.subscribe(subscriber);
}
2번 : Publisher
Observer 패턴의 Observable과 같은 역할을 하며, Subscriber를 구독할 수 있게 설정하여 실행시 Publihser에서 Subscriber로 onNext, onComplete, onError를 처리할 수 있다.
public class SamplePublisher implements Publisher<CommandHandler> {
private final List<CommandHandler> commandHandlerList;
public SamplePublisher(List<CommandHandler> commandHandlerList){
this.commandHandlerList = commandHandlerList;
}
@Override
public void subscribe(Subscriber<? super CommandHandler> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
commandHandlerList.forEach(subscriber::onNext);
// 데이터 전송 처리 완료
subscriber.onComplete();
}
@Override
public void cancel() {
}
});
}
}
3번 : Subcriber - Publisher 내부에서 호출한 각기 함수(onNext, onComplete, onError)에 맞게 진행된다.
SampleSubscriber가 생성되는 시점에 만들어진 ExectorService에 대해서 Java의 Future를 활용하여 onComplete 시점에 ExecutorService를 Shutdown 시킬 수 있게 처리하였다.
public class SampleSubscriber implements Subscriber<CommandHandler> {
private final ExecutorService executorService;
List<Future<?>> futures = new ArrayList<>();
public SampleSubscriber(int executeCount){
this.executorService = Executors.newFixedThreadPool(executeCount);
}
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(1);
}
@Override
public void onNext(CommandHandler commandHandler) {
futures.add(executorService.submit(commandHandler::StartAPICall));
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
e.printStackTrace();
}
}
executorService.shutdown();
}
}
4번 : Command Handler
3번 항목과 처리되는 방식은 거의 유사하며, Command Handler에서 각각의 Command를 ExecutorService를 이용하여 실행시키는 구조이다.
public class CommandHandler {
private final List<Command> commandList = new ArrayList<>();
List<Future<?>> futures = new ArrayList<>();
private final ExecutorService executorService;
public CommandHandler(int threadCount){
executorService = Executors.newFixedThreadPool(threadCount);
}
public void StartAPICall() {
commandList.forEach(command -> {
futures.add(executorService.submit(command::execute, "Success"));
});
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
e.printStackTrace();
}
}
executorService.shutdown();
}
public synchronized void add(Command command) {
commandList.add(command);
}
}
5번 : Command
public interface Command {
public void execute();
}
6번 : Command 구현 - SelectDBCommand
실질적으로 데이터를 조회하고, 이를 용도에 맞게 처리하는 Command의 구현체이다. SelectDBCommand 객체가
생성되는 시점에 필요한 정보를 모두 합성 방식을 이용하여 미리 준비하고, execute가 호출 되는 시점에 활용한다.
아래와 같은 방식으로 구성했을 때 ThreadSafe하게 구현되었다고 할 수 있다.
public class SelectDBCommand implements Command {
private final DataSource dataSource;
public SelectDBCommand(DataSource dataSource){
this.dataSource = dataSource;
}
@Override
public void execute() {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List<User> list = jdbcTemplate.query("SELECT USER_ID FROM *********", new CustomMapper());
System.out.println("START" + Thread.currentThread().getName());
list.stream().forEach(item -> {
System.out.print(item.getUSER_ID() + ",");
});
System.out.println("END" + Thread.currentThread().getName());
}
}
7번 : CustomMapper - JDBC Rowmapper
JdbcTemplate의 RowMapper를 구현하여 필요에 맞게 값을 바인딩 처리한다.
public class CustomMapper implements RowMapper<User> {
@Override
public User mapRow(ResultSet resultSet, int i) throws SQLException {
User user = new User();
user.setUSER_ID(resultSet.getString("USER_ID"));
return user;
}
}
일정한 시간을 주기로 2000건의 정수 배열을 받은 후 해당 각각의 결과 값을 합하는 요구사항이 발생하였다.
생성해야할 정수 배열은 난수로 이루어진 값으로 1000개의 아이템을 가지고 있다.
추가 요구사항
난수로 생성되는 2000건의 정수 배열에 대해서 각 배열의 총합이 음수인 값은 제외하고, 양수인 값을 추출하여하 출력한다.
( 사실, 자바8의 Stream API를 사용한다면 이런 힘들고 복잡한 작업을 전혀할 필요가 없다. 우리는 Reactive Stream의 기본이 되는 개념을 익히고 이를 실질적으로 활용하기 위한 Akka, RXJava, Java9 Flow API 등등을 쉽게 이해하기 위해 진행 중인 것이다. )
2편에서 다룬 코드 샘플을 아래와 같이 변경하였다.
아래의 코드에서 중요한 부분은 Publisher 1, Publisher 2를 연결하면서 처리되는 Data 흐름이다.
데이터 전달 순서 : CustomPublisher -> FilterPublisher -> CustomSubscriber
1번 : Main 진입점
public static void main(String[] args) {
// 1000개의 난수 배열을 가진 아이템 생성 - 각각을 합쳐
BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<Command>(500);
for (int i = 0; i < 500; i++) {
IntStream intStream = new Random().ints();
List<Integer> integerList = intStream.limit(1000).boxed().collect(Collectors.toList());
Command command = new IntegerSumCommand(integerList);
blockingQueue.add(command);
}
// Publisher 1
Publisher<Command> publisher = new CustomPublisher(blockingQueue);
// Publisher 2
Publisher<Command> publisher1 = new FilterPublisher(publisher);
// Subscriber
Subscriber subscriber = new CustomSubscriber();
// Start !
publisher1.subscribe(subscriber);
}
2번 : Publisher 1 : Publisher를 구현한 CustomPublisher
public class CustomPublisher implements Publisher<Command> {
private final BlockingQueue<Command> blockingQueue;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public CustomPublisher(BlockingQueue<Command> blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void subscribe(Subscriber subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
while (!blockingQueue.isEmpty()){
try {
Command command = blockingQueue.take();
subscriber.onNext(command);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
scheduledExecutorService.schedule(() -> {
if(Counter.get() == 500){
subscriber.onComplete();
if(!scheduledExecutorService.isShutdown()){
scheduledExecutorService.shutdown();
}
}
}, 1, TimeUnit.SECONDS);
}
@Override
public void cancel() {
}
});
}
}
3번 : Publisher 2 : Publisher 1을 전달 받아 값을 필터링하는 FilterPublisher
public class FilterPublisher implements Publisher<Command> {
private Publisher<Command> publisher;
public FilterPublisher(Publisher<Command> publisher){
this.publisher = publisher;
}
@Override
public void subscribe(Subscriber<? super Command> subscriber) {
this.publisher.subscribe(new Subscriber<Command>() {
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(Command command) {
if(!command.isMinus()){
subscriber.onNext(command);
}
}
@Override
public void onError(Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
}
4번 : Subscriber : 최종적으로 Publisher 2의 데이터 처리를 전달받아 최종값을 출력한다.
public class CustomSubscriber implements Subscriber<Command> {
private ExecutorService executorService = Executors
.newFixedThreadPool(1000);
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Command command) {
executorService.execute(() -> {
int total = command.getTotal();
System.out.println(String.format("총 값은 %s", total));
});
}
@Override
public void onError(Throwable throwable) {
subscription.cancel();
}
@Override
public void onComplete() {
if(!executorService.isShutdown()){
executorService.shutdown();
}
}
}
5번 : Command Interface
public interface Command {
public int getTotal();
public boolean isMinus();
}
6번 : Command Implementation
public class IntegerHandlerCommand implements Command {
private Optional<Integer> optionalInteger;
public IntegerHandlerCommand(List<Integer> integerList){
Counter.addAndGet();
this.optionalInteger = integerList.stream().reduce(Integer::sum);
}
@Override
public int getTotal() {
return this.optionalInteger.orElse(0);
}
@Override
public boolean isMinus() {
return this.optionalInteger.orElse(0) < 0;
}
}
7번 : Counter Class - Executor Service 종료를 위한 Counter 서비스
public class Counter {
private static int count = 0;
public synchronized static int addAndGet(){
return count ++;
}
public synchronized static int get(){
return count;
}
}
Reactive Stream을 다루기 위해서 아래의 예제를 바탕으로 요구조건을 변경해가면서 코드 위주로 알아보겠다.
요구사항
일정한 시간을 주기로 2000건의 정수 배열을 받은 후 해당 각각의 결과 값을 합하는 요구사항이 발생하였다.
생성해야할 정수 배열은 난수로 이루어진 값으로 1000개의 아이템을 가지고 있다.
Reactive Stream Java 8 버전 기준
1번, Main : 진입점
public class Sample05_reactiveStream04 {
public static void main(String[] args) {
BlockingQueue<Command> blockingQueue = new ArrayBlockingQueue<Command>(2000);
for (int i = 0; i < 2000 ; i++) {
IntStream randomValueList = new Random().ints();
List<Integer> integerList = randomValueList.limit(1000).boxed().collect(Collectors.toList());
Command command = new InsertToDataBaseCommand(integerList);
blockingQueue.add(command);
}
Publisher publisher = new CustomPublisher(blockingQueue);
Subscriber subscriber = new CustomSubscriber();
publisher.subscribe(subscriber);
}
}
2번,Publisher : Publisher를 구현한 CustomPublisher
public class CustomPublisher<Command> implements Publisher<Command> {
private final BlockingQueue<Command> blockingQueue;
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
public CustomPublisher(BlockingQueue<Command> blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void subscribe(Subscriber<? super Command> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
while (!blockingQueue.isEmpty()) {
try {
Command command = blockingQueue.take();
subscriber.onNext(command);
} catch (InterruptedException e) {
// Exception handling.
}
}
executorService.schedule(() -> {
if(Counter.get() == 2000){
System.out.println("onComplete");
subscriber.onComplete();
executorService.shutdown();
}
}, 1, TimeUnit.SECONDS);
}
@Override
public void cancel() {
}
});
}
}
3번,Subscriber : Subscriber를 구현한 CustomSubscriber
public class CustomSubscriber implements Subscriber<Command> {
private Subscription subscription;
private final ExecutorService executorService = Executors.newFixedThreadPool(1000);
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Command command) {
executorService.execute(command::work);
}
@Override
public void onError(Throwable t) {
System.out.println("We Error");
}
@Override
public void onComplete() {
System.out.println("We Finished");
executorService.shutdown();
}
}
4번, Command Interface
public interface Command {
public void work();
}
5번,Command Implementation : 실제 구현 객체
public class InsertToDataBaseCommand implements Command {
private final List<Integer> integerList;
public InsertToDataBaseCommand(List<Integer> listInteger){
integerList = listInteger;
}
@Override
public void work() {
Optional<Integer> sum = integerList.stream().reduce(Integer::sum);
System.out.println(Thread.currentThread().getName() + " Final Result : " + sum);
System.out.println(Thread.currentThread().getName() + " " + Counter.incrementAndGet());
}
}
6번,Counter Class : 수행된 횟수를 저장하고, 가져온다.
public class Counter {
private static final AtomicInteger atomicInteger = new AtomicInteger();
public static int incrementAndGet(){
return atomicInteger.incrementAndGet();
}
public static int get(){
return atomicInteger.get();
}
}
Reactive Programming은 선언적인 프로그래밍 패러다임으로써 데이터 스트림(Data Stream)과 변화의 전파에 초점을 맞추고 있다. 또한 이것의 핵심은 비동기 (Async) 이벤트와 Observer 디자인 패턴이다. 외부의 이벤트나 데이터가 발생하였을 때, 사용자에게 자연스러운 응답을 주고, 규모 탄력적으로 리소스를 사용하며 실패에 있어서 유연하게 대처할 수 있다.
Reactive Stream은 Reactive Programming을 근간으로 한 개발을 하기 위한 명세이며, Netflix, Pivotal, TypeSafe의 발의로 시작되었다.
// Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
// Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
// Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
// Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
위의 코드에서 Subscriber의 역할에 대해서 아래의 그림으로 확인해보면,
Publisher, Subscriber, Subscription 사이의 연관관계를 아래와 같이 구성해봤다.
Lines 25의 name은 Realm으로 부터 Model을 꺼내 데이터를 저장, 삭제, 수정할 수 있는 Key되는 명칭이다.
Lines 26 부터는 Model에서 사용할 Property를 지정하는데, Table의 Field라고 생각하면 되며, Field에 지정하는 Not Null, Nullable, Default Value, Primary Key , Type 등을 지정할 수 있다. 다만 데이터 타입으로 올 수 있는 것이 좀더 확장된다고 생각해도 좋을 것 같다.
Lines 34 :
- Realm 연결하기
> Realm의 새로운 객체를 생성 후 반환하다. 만약 Realm 이 아예 생성되어 있지않다면 새로 생성할 것이며, 이미 존재한다면 존재하는 Realm 객체를 반환할 것이다.
Lines 36 :
- 쓰기 트랜잭션을 사용하기 위해서 callback 메서드를 제공하는데, 동기적으로 호출 된다.
Lines 37 :
- 정의한 데이터 타입과 속성에 맞는 새로운 Realm 객체를 생성한다.
Lines 42 :
- Realm 오브젝트에 대해서 업데이트를 처리한다.
Lines 47 :
- Realm 안에 등록되어 있는 type의 모든 오브젝트를 반환한다.
- realm.objects로 부터 시작해 데이터를 조회 하고 정렬하는 등의 다양한 작업을 지원할 수 있으며, 메서드 체인으로 호출이 가능합니다.
프로세스는 컴퓨터에서 실행되고 있는 컴퓨터 프로그램으로 운영 체제로 부터 시스템 자원을 할당 받는 단위이다.
스레드는 프로세스 내에서 실행되는 여러 흐름의 단위를 이야기하고, 프로세스가 할당받은 자원을 실행하는 단위이다.
Context Switching
우리가 사용하는 프로그램들은 각기 프로세스를 가지고 있다. 한 프로세스가 CPU를 사용중일 때, 다른 프로세스가 CPU를 사용하기 위해, 이전의 프로세스 상태를 보관하고 새로운 프로세스 상태를 CPU에 적재하는 것을 Context Switching 이라고 하는데, 프로세스 단위의 Context Switching은 Data, Heap, Stack의 모든 것이 그대로 추가 된다. Process 간에는 데이터 공유가 불가능하다.
Thread Context Switching
Thread 간에도 지금 실행되고 있는 Thread에서 다른 Thread가 실행될 수 잇게 스케줄러가 현재 실행 중인 스레드를 잠시 멈추고, 이전 스레드의 Context를 저장하고 작업을 진행할 스레드의 Context를 저장하고 작업을 진행할 Thread의 Context를 읽어오는 작업을 한다. 프로세스와는 달리 Data와 Heap은 그대로 사용되어 공유하게 되고, Stack 만 추가되어 새로운 class 호출이 용이하다.
- 자바 병렬 프로그래밍 책 중,
잘 설계된 병렬 프로그램은 스레드를 사용해서 궁극적으로 성능을 향상시킬 수 있다. 하지만 스레드를 사용하면 실행 중에 어느정도 부하가 생기는 것도 사실이다.실행 중인 컨텍스트를 저장하고 다시 읽어 들여야 하며, 메모리를 읽고 쓰는 데 있어 지역성이 손실되고, 스레드를 실행하기도 버거운 CPU 시간을 스케쥴링하는데 소모해야한다. 또 스레드가 데이터를 공유할 때는 동기화 수단도 사용해야한다. 이런 동기화는 컴파일러 최적화를 방해하고, 메모리 캐시를 지우거나 무효화 하기도 한다.
이를 우리는 멀티스레드 상에서 동작시에 발생할 수 있는 성능 위험이라고 이야기할 수 있다.
멀티스레드와 관련된 사항
경쟁 조건
경쟁조건은 상대적인 시점이나 또는 JVM이 여러 스레드를 교차해서 실행하는 상황에 따라 계산의 정확성이 달라질 때 나타난다. 다시 말하자면 타이밍이 딱 맞았을 때만 정답을 얻는 경우를 말한다.
경쟁 조건 vs 데이터 경쟁
데이터 경쟁은 공유된 final이 아닌 필드에 대한 접근을 동기화로 보호하지 않았을 때 발생한다. 스레드가 다음에 다른 스레드가 읽을 수 있는 변수에 값을 쓰거나 다른 스레드가 마지막에 수정했을 수 있는 변수를 읽을 때 두 스레드 모두 동기화하지 않으면 데이터 경쟁이 생길 위험이 있다. 데이터 경쟁이 있는 코드는 자바 메모리 모델 하에선 유용한 정의된 의미가 없다. 모든 경쟁조건이 데이터 경쟁인 것 아니고, 모든 데이터 경쟁이 경쟁 조건인 것도 아니다. 하지만 경쟁조건이든 데이터 경쟁이든 병렬 프로그램을 예측할 수 없이 실패하게 만든다.
가시성 ( 메모리 가시성 )
메모리 상의 공유된 변수를 여러 스레드에서 서로 사용할 수 있게 하려먼 반드시 동기화 기능을 구현해야 한다. 공유 자원(변수)에 대해서는 반드시 적절한 동기화 작업이 필요하다.
동기화 기능을 지정하지 않으면 컴파일러나 프로세서, JVM(자바 가상 머신) 등이 프로그램 코드가 실행되는 순서를 임의로 바꿔 실행하는 이상한 경우가 발생하기도 한다. 다시 말하자면, 동기화 되지 않은 상황에서 메모리 상의 변수를 대상으로 작성해둔 코드가 '반드시 이런 순서로 동작할 것이다' 라고 단정 지을 수 없다.
내장된 락을 적절히 활용하면 특정 스레드가 특정 변수를 사용하려 할 때 이전에 동작한 스레드가 해당 변수를 사용하고 난 결과를 상식적으로 예측할 수 있는 상태에서 사용할 수 있다. 락은 상호 배제 뿐만 아니라 정상적인 메모리 가시성을 확보하기 위해서도 사용된다. 변경 가능하면서 여러 스레드가 공유해 사용하는 변수를 각 스레드에서 각자 최신의 정상적인 값으로 활용하려면 동일한 락을 사용해 모두 동기화 시켜야 한다.
여러 스레드에서 접근할 수 있고 변경 가능한 모든 변수를 대상으로 해당 변수에 접근할 때는 항상 동일한 락을 먼저 확보한 상태여야 한다. 이 경우 해당 변수는 확보된 락에 의해 보호된다고 말한다. 모든 변경할 수 있는 공유 변수는 정확하게 단 하나의 락으로 보호해야한다. 유지보수 하는 사람이 알 수 있게 어느 락으로 보호하고 있는지를 명확하게 표시하라. 락을 활용함에 있어 일반적인 사용 예는 먼저 모든 변경 가능한 변수를 객체 안에 캡슐화 하고, 해당 객체의 암묵적인 락을 사용해 캡슐화한 변수에 접근하는 모든 코드 경로를 동기화 함으로써 여러스레드가 동시에 접근하는 상태에서 내부 변수를 보호하는 방법이다
public class TestLockProtect {
private Lock lockForProtect = new Lock();
private String targetValue = "";
public String getTargetValue(){
synchronized(this){
targetValue = "Value";
return targetValue;
}
}
}
public class TestLockProtect {
private Lock lockForProtect = new Lock();
private String targetValue = "";
public String getTargetValue(){
lockForProtect.lock();
targetValue = "Value";
lockForProtect.unlock();
return targetValue;
}
}
public class Lock {
private boolean isLocked = false;
public synchronized void lock() {
throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
}
}
// synchronized의 경우 내부 함수의 synchronized에 진입이 가능하다. 그 이유는 'this' 에 대한 동기화를 의미하기 때문이다.
// 하지만 Lock에 대해서는 내부 함수 내의 lock에 대해서는 재진입이 불가능하다.
단일 동작과 복합동작
a = b ++; 를 봤을 때 동기화가 안되어 있다면 b를 증가하는 연산(a++)과 a에 대입하는 연산(a=...)이 두 개가 있으므로 단일 연산으로 볼 수 없다. 해당 연산이 동기화 되었다면 외부에서 성공과 실패로 나뉘므로 단일 연산이다.
가능하면 클래스 상태를 관리하기 위해 AtomicLog 처럼 스레드에 안전하게 이미 만들어져 있는 객체를 사용하는 편이 좋다. 스레드 안전하지 않는 상태 변수를 선언해두고 사용하는 것보다 이미 스레드 안전하게 만들어진 클래스가 가질 수 있는 가능한 상태의 변화를 파악하는 편이 휠씬 쉽고, 스레드 안전성을 더 쉽게 유지하고 검증할 수 있다.
재진입성
스레드가 다른 스레드가 가진 락을 요청하면 해당 스레드는 대기 상태에 들어간다. 하지만 암묵적인 락은 재진입 가능하기 때문에 특정 스레드가 자기가 이미 획득한 락을 다시 확보할 수 있다. 재진입성은 확보 요청의 단위가 아닌 스레드 단위로 락을 얻는다는 것을 의미한다.
public class Reentrancy {
public synchronized void getA(){
System.out.println("a");
// b 가 synchronized 로 선언되어 있지만 a 진입시 이미 락을 획득하였으므로, b를 호출할 수 있다.
b();
}
public synchronized void b(){
System.out.println("b");
}
public static void main(String[] args){
new Reentrancy().a();
}
}
컴퓨터 프로그램 또는 서브 루틴에 재진입성이 있으면, 이 서브 루틴은 동시에(병렬) 안전하게 실행 가능하다. 즉 재진입이 가능한 루틴은 동시에 접근해도 언제나 같은 실행결과를 보장한다. 재진입이 가능하려면 함수는 다음 조건을 만족해야 한다.
정적 (전역) 변수를 사용하면 안된다.
정적 (전역) 변수의 주소를 반환하면 안된다.
호출자가 호출시 제공한 매개변수만으로 동작해야한다.
싱글턴 객체의 잠금에 의존하면 안된다.
다른 비-재진입 함수를 호출하면 안된다.
원자성
CPU가 처리하는 하나의 단일 연산을 의미한다. 하나의 Thread에서 읽기와 쓰기, 다른 Thread에서는 읽기만 한다면, 원자성을 고려한 변수를 선언하고 싶은 경우, volatile 변수를 활용한다. ( 메모리 문제 고려해야함 )
Volatile 변수는 약간 다른 형태의 좀더 약한 동기화 기능을 제공하는데, 다시 말해 volatile로 선언된 변수의 값을 바꿨을 때 다른 스레드에서 항상 최신 값을 읽어갈 수 있도록 해준다. 특정 변수를 선언할 때 volatile 키워드를 지정하면 컴파일러와 런타임 모두 '이 변수는 공유해 사용하고, 따라서 실행 순서를 재배치해서는 안된다'라고 이해한다. volatile로 지정된 변수는 프로세스의 레지스터에 캐시 되지도 않고, 프로세서의 외부 캐시에도 들어가지 않기 때문에 volatile 변수의 값을 읽으면 항상 다른 스레드가 보관해둔 최신의 값을 읽어갈 수 있다.
Volatile를 사용하기 적합할 때
MultiThread 환경에서 하나의 Thread만 Read&Write를 하고 나머지 Thread가 read 하는 상황에서 가장 최신의 값을 보장합니다. 하지만 하나의 Thread에서만 write하더라도, 그 변수(메모리)에 대한 접근과 수정이 잦다면 다른 스레드에서 read할때 원자성이 보장되지 않을 것 같습니다.
Volatile를 사용하기 부적합할 때
하나의 Thread가 아닌 여러 Thread에서 write를 하는 상황에서는 적합하지 않습니다. 이와 같은 경우에는 synchronized를 통해서 read & write의 원자성을 보장해야 합니다.
스레드 한정
스택 한정 기법은 특정 객체를 로컬 변수를 통해서만 사용할 수 있는 특별한 경우의 스레드 한정 기법이라고 할 수 있다. 변수를 클래스 내부에 숨겨두면 변경상태를 관리하기가 쉬운데, 또한 클래스 내부에 숨겨둔 변수는 특정 스레드에 쉽게 한정시킬 수도 있다. 로컬 변수는 모두 암묵적으로 현재 실행 중인 스레드에 한정되어 있다고 볼 수 있다.
public int loadTheArk(Collection<Animal> candidates) {
SortedSet<Animal> animals;
int numPairs = 0;
Animal candidate = null;
// animals 변수는 메소드에 한정되어 있으며, 유출돼서는 안된다.
animals = new TreeSet<Animal> ( new SpeciesGenderComparator());
animals.addAll(candidates)
for ( Animal a : animals){
if(candidate == null || !candidate.isPotentialMate(a))
candidate = a;
else {
ark.load(new AnimalPair(candidate, a));
++numPairs;
candidate = null;
}
}
return numPairs;
}
Final에 대하여
final을 지정한 변수의 값을 변경할 수 없다. 물론 변수가 가리키는 객체가 불변 객체가 아니라면 해당 객체에 들어있는 값은 변경할 수 있다. final 키워드를 적절하게 사용하면 초기화 안정성을 보장하기 때문에 별다른 동기화 작업 없이 불변 객체를 자유롭게 사용하고 공유할 수 있다.
@Immutable
class OneValueCache {
private final BigInteger lastNumber;
private final BigInteger[] lastFactors;
public OneValueCache(BigInteger i, BigInteger[] factors){
lastNumber = i;
lastFactors = Arrays.copyOf(factors, factors.length);
}
public BigInteger[] getFactors(BigInteger i) {
if ( lastNumber == null || !lastNumber.equals(i))
return null;
else
return Arrays.copyOf(lastFactors, lastFactors.leghth);
}
}
불변 객체의 요구조건
상태를 변경할 수 없어야 하고,
모든 필드의 값이 final로 선언되어야 하며,
적절한 방법으로 생성되어야 한다.
멀티스레드에 대한 스레드 안정성
동기화를 처리할 수 있는 3가지 방식
만약 여러 스레드가 변경할 수 있는 하나의 상태 변수를 적절한 동기화 없이 접근하면 그 프로그램은 잘못된 것이다. 이렇게 잘못된 프로그램을 고치는 데는 세가지 방법이 있다.
해당 상태 변수를 스레드 간에 공유되지 않거나
해당 상태 변수를 변경할 수 없도록 만들거나
해당 상태 변수에 접근할 때는 언제나 동기화
스레드 안전한 클래스를 설계할 땐, 바람직한 객체 지향 기법이 왕도다. 캡슐화와 불변객체를 잘 활용하고, 불변 조건을 명확하게 기술해야 한다.
여러 스레드가 클래스에 접근할 때, 실행 환경이 해당 스레드들의 실행을 어떻게 스케쥴하든 어디에 끼워 넣든, 호출하는 쪽에서 추가적인 동기화나 다른 조율 없이도 정확하게 동작하면 해당 클래스는 스레드 안전하다고 말한다.
애당초 단일 스레드 환경에서도 제대로 동작하지 않으면 스레드 안전할 수 없다. 객체가 제대로 구현됬으면 어떤 일련의 작업도 해당 객체의 불변 조건이나 후조건에 위배될 수 없다.
스레드 안전한 클래스는 클라이언트 쪽에서 별도로 동기화할 필요가 없도록 동기화 기능도 캡슐화 한다.
// 상태가 없는 항상 안전한 객체
// 상태 없는 객체에 접근하는 스레드가 어떤 일을 하든 다른 스레드가 수행하는 동작의 정확성에 영향을 끼칠 수 없기 때문에 객체는 항상 스레드에 안전하다.
@ThreadSafe
public class StatelessFactorizer implements Servlet {
public void service(ServletRequest req, SevletRespones resp){
BigInteger i = extractFromRequest(req);
BigInteger[] factors = factor(i);
encodeIntoResponse(resp, factors);
}
}
안전한 공개 방법의 특성
객체를 안전하게 공개하려면 해당 객체에 대한 참조와 객체 내부의 상태를 외부의 스레드에게 동시에 볼 수 있어야 한다. 올바르게 생성 메소드가 실행되고 난 개체는 아래와 같이 처리할 수 있다.
객체에 대한 참조를 static 메서드로 초기화 시킨다.
객체에 대한 참조를 volatile 변수 또는 AtomicReference 클래스에 보관한다.
객체에 대한 참조를 올바르게 생성된 클래스 내부의 final 변수에 보관한다.
락을 사용해 올바르게 막혀 있는 변수에 객체에 대한 참조를 보관한다.
HashTable, ConcurrentMap, synchronizedMap을 사용해 만든 Map 객체를 사용하면 그 안에 보관하고 있는 키와 값 모두를 어느 스레드에서라도 항상 안전하게 사용할 수 있다.
스레드 한정
읽기 전용 개체를 공유
스레드에 안전한 객체를 공유
동기화 방법 적용
스레드 안전한 클래스 설계
클래스가 스레드 안정성을 확보하도록 설계하고자 할 때에는 아래를 고려해야한다.
객체의 상태를 보관하는 변수가 어떤 것인가?
객체의 상태를 보관하는 변수가 가질 수 있는 값이 어떤 종류, 어떤 범위에 해당하는가?
객체 내부의 값을 동시에 사용하고자 할 때, 그 과정을 관리할 수 잇는 정책
/*
primitive type을 사용할 경우 아래와 같이 객체의 상태를 완벽하기 동기화 처리 가능하다.
*/
@ThreadSafe
public final class Counter {
@GuardedBy("this") private long value = 0;
public synchronized long getValue() {
return value;
}
public synchronized long increment() {
if ( value == Long.MAX_VALUE )
throw new IllegalStateException("counter overflow");
return ++value;
}
}
인스턴스 한정
객체를 적절하게 캡슐화하는 것으로도 스레드 안정성을 확보할 수 있는데, 이런 경우 흔히 '한정' 이라고 단순하게 부르기도 하는 '인스턴스 한정' 기법을 활용하는 셈이다.
@ThreadSafe
public class PersonSet {
@GuardedBy("this")
private final Set<Person> mySet = new HashSet<Person>();
public synchronized void addPerson(Person p) {
mySet.add(p);
}
public synchronized boolean containsPerson(Person p) {
return mySet.contains(p);
}
}
스레드의 안정성을 확보하는 방법으로 대부분 데코레이터(장식자) 패턴을 활용한다.
자바 모니터 패턴
자바 모니터 패턴을 따르는 객체는 변경 가능한 데이터를 모두 객체 내부에 숨긴 다음 객체의 암묵적인 락으로 데이터에 대한 동시 접근을 막는다. 자바 모니터 패턴은 단순한 관례에 불과하며 일정한 형태로 스레드 안전성을 확보할 수 있다면 어떤 형태의 락을 사용해도 무방하다.
public class PrivateLock {
private final Object myLock = new Object();
@GuardedBy("myLock") Widget Widget;
void someMethod() {
synchronized (myLock){
}
}
}