출처1 : https://www.javacodegeeks.com/2012/08/io-demystified.html
출처2 : https://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousChannelGroup.html
출처3 : https://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousChannel.html
출처4 : https://docs.oracle.com/javase/7/docs/api/java/nio/channels/CompletionHandler.html
출처5 : https://yangbox.tistory.com/28
출처6 : https://stackoverflow.com/questions/20057497/program-does-not-terminate-immediately-when-all-executorservice-tasks-are-done
출처7 : https://velog.io/@tkadks123/Java-NIO%EC%97%90-%EB%8C%80%ED%95%B4-%EC%95%8C%EC%95%84%EB%B3%B4%EC%9E%90-22

Proactor 패턴

리액터 패턴에 이어 프로액터 패턴을 알아 보겠습니다.

1. Reactor 패턴

프로액터 패턴을 알아보기 전 리액터 패턴에 대해 먼저 알아 보겠습니다.

동기 이벤트 디멀티플렉서를 활용한 리액터 패턴 구조도

이전 글에서 구현한 Reactor 패턴은 동기 이벤트 디멀티플렉서인 Selector 를 통해 Handle 을 받아오고 Handler 로 Handle 을 처리 하는 구조 입니다.

Selector 는 IO Complete 이벤트를 기다리는 동안 블로킹 됩니다. 애플리케이션 동작 과정에서 블로킹이 발생하지만 IO 작업을 위해 블로킹 되지 않습니다.
싱글스레드에서 IO 작업을 위해 블로킹이 발생하면, 이전 연결의 IO 작업이 완료되지 않았을 때 다음 연결을 처리할 수 없습니다.
하지만 동기 이벤트 디멀티플렉서(Selctor)를 사용하면 IO 작업을 위해 블로킹 되지 않기 때문에 싱글스레드에서 이전 연결의 IO 작업과 상관없이 여러 연결의 IO 를 처리할 수 있습니다.

그리고 Dispatcher 에서 Selecotr 를 호출해 Handle 을 리턴 받고 핸들러에 전달해 처리하기 때문에 동기적인 작업 흐름이라고 볼 수 있습니다.

리액터 패턴에 대한 자세한 내용은 아래 링크를 참고해 주세요.

2. Proactor 패턴

프로액터 패턴 구조도

Proactor 패턴은 비동기 IO 모델로서 애플리케이션이 블로킹 되는 부분 없이 실행됩니다. 그리고 IO 작업의 결과는 호출한 곳에서 처리되지 않고 CompletionHandler 콜백을 통해 비동기적으로 실행됩니다.

2-1. AsynchronousChannelGroup

리소스 공유를 위한 AsynchronousChannel 의 그룹입니다.
그룹에 연결된 AsynchronousChannel 이 실행한 비동기 IO 작업의 결과를 처리하는데 필요한 과정이 캡슐화 되어 있습니다.

AsynchronousChannel 는 IO 이벤트를 처리하고 AsynchronousChannel 에서 수행된 비동기 작업의 결과를 CompletionHandler 로 전달하는데 사용하는 스레드 풀을 가지고 있습니다.
스레드 풀은 비동기 IO 작업을 처리하기 위한 다른 태스크를 실행하기 위해서도 쓰일 수 있습니다.

AsynchronousChannelGroup 은 withFixedThreadPool 또는 withCachedThreadPool 메소드를 통해 생성됩니다.
AsynchronousChannel 은 특정 그룹을 지정해 연결되며 생성됩니다. 그룹을 명시하지 않은 채널은 디폴트 그룹에 등록되고 디플트 그룹도 스레드 풀을 가지고 있습니다.

디플트 그룹의 스레드 풀의 스레드는 특별한 설정이 없는 한 데몬 스레드로 만들어 집니다.
데몬 스레드는 사용자 스레드가 종료되면 작업이 끝날 때 까지 기다리지 않고 종료됩니다. 디폴트 그룹에 등록된 AsynchronousServerSocketChannel 은 연결을 accept 하기 전에 사용자 스레드가 종료되어 연결에 실패할 수 있습니다.
따라서 while (true) … 와 같은 반복문을 넣어 사용자 스레드가 종료되지 않도록 유지해야 합니다.

public class ServerInitializer {

  private static int PORT = 6000;
  private static int threadPoolSize = 8;
  private static int initialSize = 4;
  private static int backlog = 50;
  
  ...

  public static void main(String[] args) {
    ...

    try {
      AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));
      AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener);
       listener.accept(state, acceptCompletionHandler);
    } catch (IOException e) {
      e.printStackTrace();
    }
 
    // Sleep indefinitely since otherwise the JVM would terminate
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

Executors.newCachedThreadPool() 를 사용해 만들어지는 스레드 풀의 스레드는 non-daemon 스레드 입니다. 따라서 Executors.newCachedThreadPool() 를 통해 만들어진 스레드 풀을 가진 AsynchronousChannelGroup 에 등록된 AsynchronousServerSocketChannel 이 연결을 accept 하기 전에 사용자 스레드가 종료되지 않습니다.

public class ServerInitializer {

  private static int PORT = 6000;
  private static int threadPoolSize = 8;
  private static int initialSize = 4;
  private static int backlog = 50;

  ...

  public static void main(String[] args) {
    ...

    ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);

    try {
      AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(executor, initialSize);
      AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
      listener.bind(new InetSocketAddress(PORT), backlog);
      listener.accept(listener, new Dispatcher(handleMap));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

AsynchronousChannelGroup 에 연결된 AsynchronousChannel 의 IO 작업 완료 핸들러는 그룹에 풀링된 스레드 중 하나를 통해 실행됩니다. IO 작업이 즉시 완료되고 실행 중인 스레드가 그룹에 풀링된 스레드 중 하나인 경우 완료 핸들러는 실행 중인 스레드에서 실행 될 수도 있습니다.

2-2. AsynchronousChannel

비동기 IO 작업을 지원하는 채널입니다. 비동기 IO 작업은 두 가지 방법으로 처리됩니다.

  • Future operation(...)
  • void opertaion(… A attachment, CompletionHandler<V, ? super A> handler)

opertion 은 read, write 같은 IO 작업의 이름이고 V 는 IO 작업 결과의 유형입니다. 그리고 A 는 IO 작업에 컨텍스트를 제공하기 사용되는 개체입니다.

Future 인터페이스에 의해 정의된 메서드는 작업이 완료 되었는지 폴링하고 작업의 결과를 확인하는데 사용됩니다.
CompletionHandler 는 IO 작업의 결과를 콜백으로 처리하기 위해 사용됩니다.

AsynchronousChannel 은 멀티스레드 환경에서 Thread-Safe 합니다.

2-3 CompletionHandler

비동기 IO 작업의 결과를 사용하기 위한 핸들러 입니다.
IO 작업이 성공하면 completed 메소드가 실행되고 실패하면 failed 메소드가 실행됩니다.

2-4. 코드

Java NIO 2 API 를 사용해 만든 에코 서버 입니다.

public class ProactorInitiator {
  static int ASYNC_SERVER_PORT = 4333;
 
  public void initiateProactiveServer(int port)
    throws IOException {
 
    final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));
     AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener);
     SessionState state = new SessionState();
     listener.accept(state, acceptCompletionHandler);
  }
 
  public static void main(String[] args) {
    try {
       System.out.println('Async server listening on port : ' + ASYNC_SERVER_PORT);
       new ProactorInitiator().initiateProactiveServer(ASYNC_SERVER_PORT);
    } catch (IOException e) {
      e.printStackTrace();
    }
 
    // Sleep indefinitely since otherwise the JVM would terminate
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
 
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SessionState> {

  private AsynchronousServerSocketChannel listener;
 
  public AcceptCompletionHandler(AsynchronousServerSocketChannel listener) {
    this.listener = listener;
  }
 
  @Override
  public void completed(AsynchronousSocketChannel socketChannel, SessionState sessionState) {
    // accept the next connection
    SessionState newSessionState = new SessionState();
    listener.accept(newSessionState, this);
 
    // handle this connection
    ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
    ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(socketChannel, inputBuffer);
    socketChannel.read(inputBuffer, sessionState, readCompletionHandler);
  }
 
  @Override
  public void failed(Throwable exc, SessionState sessionState) {
   // Handle connection failure...
  }
}
 
public class ReadCompletionHandler implements CompletionHandler<Integer, SessionState> {
 
  private AsynchronousSocketChannel socketChannel;
  private ByteBuffer inputBuffer;

  public ReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer inputBuffer) {
   this.socketChannel = socketChannel;
   this.inputBuffer = inputBuffer;
  }
 
  @Override
  public void completed(Integer bytesRead, SessionState sessionState) {
    byte[] buffer = new byte[bytesRead];
    inputBuffer.rewind();
    // Rewind the input buffer to read from the beginning
 
    inputBuffer.get(buffer);
    String message = new String(buffer);
 
    System.out.println('Received message from client : ' + message);
 
    // Echo the message back to client
    WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(socketChannel);
 
    ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);
 
    socketChannel.write(outputBuffer, sessionState, writeCompletionHandler);
  }
 
  @Override
  public void failed(Throwable exc, SessionState attachment) {
    //Handle read failure.....
  }
}
 
public class WriteCompletionHandler implements CompletionHandler<Integer, SessionState> {
 
  private AsynchronousSocketChannel socketChannel;
 
  public WriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
    this.socketChannel = socketChannel;
  }
 
  @Override
  public void completed(Integer bytesWritten, SessionState attachment) {
    try {
      socketChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
 
  @Override
  public void failed(Throwable exc, SessionState attachment) {
   // Handle write failure.....
  }
}
 
public class SessionState {
 
  private Map<String, String> sessionProps = new ConcurrentHashMap<String, String>();
 
  public String getProperty(String key) {
    return sessionProps.get(key);
  }
 
  public void setProperty(String key, String value) {
    sessionProps.put(key, value);
  }
 
}