지난 글에서 HttpClient에 대해서 살펴봤다. 이번 글에서는 HttpAsyncClient에 대해서 살펴보도록 하겠다.
먼저 기본적인 설정은 아래와 같이 할 수 있다. httpClient와 유사하며 IOReactor 설정이 추가됐다.
HttpAsyncClient httpAsyncClient() {
// 전체 커넥션 풀
int maxPool = 200;
// 호스트 당 커넥션 풀에서 쓸 수 있는 커넥션 개수
int maxPerRoute = maxPool;
// 연결을 설정한 후 데이터를 기다리는 시간(패킷 간 기다리는 시간(요청 한번에 패킷이 여러개 가는데 각 패킷 사이의 거리 같은 느낌))
int socketTimeoutMilliSec = 2000;
// 유휴 커넥션이 이 시간이 지나면 커넥션 반납
int idleConnectionTimeoutSec = 30;
// 커넥션 풀로부터 커넥션 받기를 대기하는 시간
long requestConnectionTimeoutMilliSec = 5000;
// 서버로부터 응답을 받기 위한 최대 대기 시간
long responseTimeoutMilliSec = 5000;
// 원격 호스트와의 연결을 설정하는 데 걸리는 시간
long connectionTimeoutMilliSec = 3000;
// 어떠한 예외로 인해 처리하지 못한 요청에 대한 retry 횟수
int retryCount = 1;
// retry 간 back off
long backoff = 1000;
PoolingAsyncClientConnectionManager cm = new PoolingAsyncClientConnectionManager();
cm.setMaxTotal(maxPool);
cm.setDefaultMaxPerRoute(maxPerRoute);
CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom()
.setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()) // default 값 서버에서 받은 keep-alive
.setRetryStrategy(new DefaultHttpRequestRetryStrategy(retryCount, TimeValue.ofMilliseconds(backoff))) // 몇번 재시도 할지
.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE) // default로 켜져 있는데 일단 명시함
.setDefaultRequestConfig(
RequestConfig.custom()
//.setContentCompressionEnabled(true) 서버에서 압축 요청을 처리 가능한지
.setResponseTimeout(responseTimeoutMilliSec, TimeUnit.MILLISECONDS)
.setConnectionRequestTimeout(requestConnectionTimeoutMilliSec, TimeUnit.MILLISECONDS)
.build()
)
.setConnectionManager(cm)
.evictExpiredConnections() // 이거 뭔가 문제 생기면 끄기. proactively evict expired connections from the connection pool using a background thread.
.evictIdleConnections(TimeValue.ofSeconds(idleConnectionTimeoutSec)) // 유휴 커넥션이 이 시간(초)동안 존재하면 커넥션 종료
.setIOReactorConfig(IOReactorConfig.DEFAULT)
.build();
/* default 설정은 아래와 같음
IOReactorConfig.custom()
.setSoTimeout(Timeout.ZERO_MILLISECONDS) // 패킷 간 거리 느낌
.setSoReuseAddress(false) //
.setSoLinger(TimeValue.NEG_ONE_SECOND) // 소켓 연결 종료 보류 시간 (소켓 close 호출 후 이 시간 만큼 보류)
.setSoKeepAlive(false) // 이거 http 계층에서의 keep-alive 와 어떻게 다른지 모르겠음
.setTcpNoDelay(true) // tcp 패킷 스트림으로 보내는거
.setSndBufSize(0) // 전송 버퍼 크기
.setRcvBufSize(0) // 수신 버퍼 크기
.setIoThreadCount(Runtime.getRuntime().availableProcessors()) // IO 스레드 개수
.setBacklogSize(0) // 연결 대기 큐 크기
// 0으로 설정된 값들은 적당한 값을 자동으로 설정하는 듯..?(포트 0 할당할 때 알아서 포트 찾는거 마냥)
.build();
*/
return httpAsyncClient;
}
사용법은 아래와 같다. futureCallback 클래스에 응답 콜백이 담겨서 오는데, 각각 실패, 성공, 취소 케이스에 대해 어떤 작업을 할지 정의 할 수 있다.
client.start();
SimpleHttpRequest httpGet = SimpleRequestBuilder.get("https://google.com").build();
client.execute(httpGet, new FutureCallback<>() {
@Override
public void completed(SimpleHttpResponse result) {
// 응답 수신을 완료했을 때의 동작
}
@Override
public void failed(Exception ex) {
// 요청을 보내는데 실패했을 때의 동작
}
@Override
public void cancelled() {
// 요청이 취소되었을 때의 동작
}
});
이제 이 비동기 요청이 내부적으로 어떻게 흘러가는지 살펴보자.
먼저 HttpAsyncClient 클래스 내의 start() 메소드는 아래와 같이 구성되어 있다.
public final void start() {
if (this.status.compareAndSet(AbstractHttpAsyncClientBase.Status.READY, AbstractHttpAsyncClientBase.Status.RUNNING)) {
this.executorService.execute(new Runnable() {
public void run() {
AbstractHttpAsyncClientBase.this.ioReactor.start();
}
});
}
}
위 코드를 보면 IOReactor 라는 클래스가 start() 실행되는 것을 확인 가능하다.
그렇다면 start() 메소드는 어떤 행위를 할까?
클래스를 타고타고 들어가보니
MultiCoreIOReactor의 start() 메소드가 호출되는데, 여기서는 내부적으로 배열로 선언된 thread의 run() 메소드를 호출한다.
public final void start() {
if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
for(int i = 0; i < this.threads.length; ++i) {
this.threads[i].start();
}
}
}
그렇다면 이 스레드는 어떻게 생성될까?
코드를 추적한 결과 IOReactor는 DefaultConnectingIOReactor 를 기본 값으로 사용하는데, 이 클래스의 생성자는 다음과 같이 구현되어 있다.
public DefaultConnectingIOReactor(IOEventHandlerFactory eventHandlerFactory, IOReactorConfig ioReactorConfig, ThreadFactory threadFactory, Decorator<IOSession> ioSessionDecorator, Callback<Exception> exceptionCallback, IOSessionListener sessionListener, Callback<IOSession> sessionShutdownCallback) {
Args.notNull(eventHandlerFactory, "Event handler factory");
this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
this.workers = new SingleCoreIOReactor[this.workerCount];
Thread[] threads = new Thread[this.workerCount];
for(int i = 0; i < this.workers.length; ++i) {
SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(exceptionCallback, eventHandlerFactory, ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT, ioSessionDecorator, sessionListener, sessionShutdownCallback);
this.workers[i] = dispatcher;
threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
}
this.ioReactor = new MultiCoreIOReactor(this.workers, threads);
this.workerSelector = IOWorkers.newSelector(this.workers);
}
즉, MultiCoreIOReactor에서 호출한 threads는 SingleCoreIoReactor 인스턴스를 여러개 갖고 있는 IOReactorWorker라는 객체가 들어있는데, 이 클래스는 다음과 같이 구현되어 있다.
final class IOReactorWorker implements Runnable {
private final AbstractSingleCoreIOReactor ioReactor;
private volatile Throwable throwable;
public IOReactorWorker(AbstractSingleCoreIOReactor ioReactor) {
this.ioReactor = ioReactor;
}
public void run() {
try {
this.ioReactor.execute();
} catch (Error var2) {
this.throwable = var2;
throw var2;
} catch (Exception var3) {
this.throwable = var3;
}
}
public Throwable getThrowable() {
return this.throwable;
}
}
즉 실행할 때 ioReactor의 execute 라는 메소드를 실행하며 이 메소드를 추적한 결과 SingleCoreIOReactor 클래스의 doExecute() 메소드가 수행되는 것을 확인할 수 있다.
근데 여기서 doExecute()를 이해하기 위해서는 java의 Selector라는 클래스를 이해해야 코드가 읽힐 것이다.
Selector란 자바에서 하나의 스레드로 여러 요청을 처리할 수 있게 도와주는 자바 클래스다. Selector는 마치 일급 컬렉션처럼 구현되어 있다. 내부적으로 소켓 채널이라는 객체를 Set으로 갖고 있으며 이 Set을 탐색하며 연결 여부를 확인하고 데이터를 읽고 쓰는 작업을 수행한다. 이를 코드로 보면 아래와 같다. (작성 방법은 Selector 클래스 위에 작성되어 있는 javadoc을 읽으면 어느정도 도움이 될 것이다.)
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.io.IOException;
public class NonBlockingEchoServer {
public static void main(String[] args) throws IOException {
// 서버 소켓 채널 생성 및 설정
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(8080));
serverChannel.configureBlocking(false); // 논블로킹 모드 설정
// Selector 생성 및 서버 소켓 채널을 selector에 등록
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server started on port 8080...");
// 버퍼 준비
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
// 준비된 I/O 이벤트가 있는지 확인
selector.select();
// 준비된 이벤트들의 키 셋을 가져옴
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 연결 수락 준비가 되었으면
if (key.isAcceptable()) {
register(selector, serverChannel);
}
// 읽기 준비가 되었으면
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
// 처리된 키는 삭제
iter.remove();
}
}
}
private static void register(Selector selector, ServerSocketChannel serverChannel) throws IOException {
SocketChannel client = serverChannel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
System.out.println("Connection Accepted: " + client.getLocalAddress() + "\n");
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (buffer.position() == 0) {
// 클라이언트가 연결을 종료
client.close();
System.out.println("Connection closed by client");
return;
}
buffer.flip();
client.write(buffer);
buffer.clear();
}
}
다시 본문으로 돌아와 doExecute() 메소드는 아래와 같이 구성되어 있다.
void doExecute() throws IOException {
while(true) {
if (!Thread.currentThread().isInterrupted()) {
int readyCount = this.selector.select(this.selectTimeoutMillis); // (1)
if (this.getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
if (this.shutdownInitiated.compareAndSet(false, true)) {
this.initiateSessionShutdown();
}
this.closePendingChannels();
}
if (this.getStatus() != IOReactorStatus.SHUT_DOWN) {
if (readyCount > 0) {
this.processEvents(this.selector.selectedKeys()); // (2)
}
this.validateActiveChannels(); // (3)
this.processClosedSessions(); // (4)
if (this.getStatus() == IOReactorStatus.ACTIVE) {
this.processPendingChannels(); // (5)
this.processPendingConnectionRequests(); // (6)
}
if ((this.getStatus() != IOReactorStatus.SHUTTING_DOWN || !this.selector.keys().isEmpty()) && this.getStatus() != IOReactorStatus.SHUT_DOWN) {
continue;
}
}
}
return;
}
}
보는 바와 같이 이벤트 루프가 돌아가고 있으며 주석에 숫자를 매긴 메소드를 간단히 설명하자면 다음과 같다. (참고로 Selector는 SingleCoreIOReactor 객체가 초기화 되는 시점에 open 상태가 된다.)
(1) : 소켓 이벤트 중 준비가 된 모든 이벤트의 개수를 가져온다.
(2) : select() 메소드에 의해 감지된 준비된 이벤트들을 처리한다. selectedKeys()는 준비된 채널의 키를 포함하는 집합을 반환하고, 이 집합에 있는 각 키를 사용하여 적절한 I/O 작업(읽기, 쓰기, 연결 수립 등)을 수행한다.
(3) : 활성화된 채널들이 여전히 유효하고, 올바르게 작동하고 있는지 검증한다. 이는 채널이 여전히 연결 상태인지, 오류가 발생하지 않았는지 등을 확인할 수 있다.
(4) : 이미 종료된 세션들을 처리한다. 세션이 종료된 후 필요한 정리 작업을 수행하며, 관련 리소스의 해제를 포함한다.
(5) : 처리를 대기 중인 채널들을 처리한다. 이는 새로운 연결 요청이나 데이터 전송 요청 등, 아직 처리되지 않은 작업들을 완료하는 과정이다.
(6) : 연결을 대기 중인 요청들을 처리한다. 새로운 연결 요청이 들어왔을 때, 이를 수락하고 필요한 초기화 작업을 수행하는 등의 과정을 포함한다.
정리하자면 3번 작업에서 이벤트가 감지된 소켓들에 대해 작업을 처리하고 5, 6번 단계에서는 이벤트를 감지하는 역할을 한다. 내부적으로 selector에 register 메소드를 호출하여 이벤트에 넣어주는 코드를 확인할 수 있다.
이를 그림으로 나타내면 다음의 계층 구조를 갖는다. 보는 바와 같이 적은 스레드로 많은 요청을 처리할 수 있는 구조이다.
이렇게 IOReactor가 어떻게 하나의 스레드로 여러 소켓과 연결을 맺고 처리할 수 있는지 코드로 확인했다. 이 이벤트 루프라는 코드를 보니 어떻게 작은 스레드로 여러 요청을 처리할 수 있는지 이해할 수 있게 됐다. 레디스, node.js 또한 이벤트 루프를 사용하는데 이러한 추상적인 개념인 이벤트 루프를 좀 더 구체적인 개념으로 상상할 수 있게 되는 효과도 있는 것 같다.