Netty
Netty
Netty는 비동기 이벤트 기반 네트워크 애플리케이션 프레임워크로 고성능의 네트워크 프로그래밍을 지원하기 위해 이벤트를 활용해 비동기적으로 최소한의 리소스로 많은 연결을 처리할 수 있다. Spring 5.0에서 도입된 리액티브 프로그래밍 모델을 기반으로 동작하는 Spring WebFlux도 서버 구현을 위한 다양한 옵션을 제공하지만, Netty를 기본적으로 사용하는 서버 엔진이다.
Netty가 어떻게 이벤트 루프를 통해 각 I/O 작업(예: HTTP 요청, 응답)을 비동기적으로 처리하는지 알아보자.
Blocking I/O vs Non-Blocking I/O
네티는 Java NIO 기반의 고성능 네트워크 프레임워크이다. Java NIO의 Channel과 Selector를 사용하여 비동기, 논블로킹을 지원하기 때문에 Java IO의 블로킹과 Java NIO의 논블로킹에 대해
Java IO와 Blocking I/O
@Slf4j
public class JavaIOServer {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
try (ServerSocket serverSocket = new ServerSocket()) {
serverSocket.bind(new InetSocketAddress("localhost", 7777));
while (true) {
Socket clientSocket = serverSocket.accept();
byte[] requestBytes = new byte[1024];
InputStream in = clientSocket.getInputStream();
in.read(requestBytes);
log.info("request: {}", new String(requestBytes).trim());
OutputStream out = clientSocket.getOutputStream();
String response = "This is server";
out.write(response.getBytes());
out.flush();
}
}
}
}
Java 1.0에서 처음 도입된 Java IO는 byte 단위로 읽고 쓸 수 있는 Stream(InputStream과 OutputStream)에 대한 API를 제공한다. 이러한 최초의 자바 API는 블로킹 함수만 지원했다. 위 코드에 대한 특징은 다음과 같다.
- accept()는 ServerSocket에서 연결될 때까지 진행을 블로킹하며, 연결되면 클라이언트와 서버 간 통신을 위한 새로운 Socket 하나를 반환한다. 이후 들어오는 연결을 수신하는 역할은 ServerSocket이 재개한다.
- Socket의 입력과 출력 스트림으로부터 클라이언트가 소켓을 통해 보낸 데이터를 읽은 후, 클라이언트에게 다시 응답을 보낸다.
위 코드는 한 번에 한 연결만 처리한다. 어플리케이션이 read system call을 호출하면 kernel이 응답을 돌려줄 때까지 아무것도 할 수 없다. 즉, Blocking으로 동작하기 때문에 다수의 동시 클라이언트를 관리하려면 각 새로운 클라이언트 Socket마다 새로운 Thread를 할당해야 한다. 하지만 I/O 요청이 발생할 때마다 스레드를 새로 할당하면 스레드를 생성 및 관리하는 비용과 컨텍스트 스위칭으로 인한 CPU 자원이 소모된다.
Java NIO와 Non-Blocking I/O
위에서 살펴본 블로컹 모드의 소켓은 read, write, accept 메서드 등과 같은 입출력 메서드가 호출되면 완료될 때까지 스레드가 멈추게 되어 다른 처리를 할 수 없다. 이러한 단점을 해결하는 방식이 논블로킹 소켓이다.
Java 1.4에 도입된 Java NIO는 Non-blocking을 지원하고, Selector, Channel 도입으로 높은 성능을 보장한다. 또한, 위에서 살펴봤듯이 Java IO가 InputStream과 OutputStream으로 데이터를 단방향으로 처리할 수 있었다면 Java NIO는 Channel 하나로 데이터를 Input/Output 양방향으로 처리할 수 있다.
Channel
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { ... }
}
Channel은 I/O 작업을 수행하기 위한 핵심 개념으로 하나 이상의 입출력 작업(예: 읽기 또는 쓰기)을 수행할 수 있는 하드웨어 장치, 파일, 네트워크 소켓, 프로그램 컴포넌트와 같은 엔티티에 대한 열린 연결을 의미한다. 위에서 설명했듯이 양방향 통신(읽기와 쓰기를 동시에 지원)을 지원하고, 아래 Selector와 연동하여 단일 스레드에서 여러 채널을 관리할 수 있다.
위 그림을 확인하면 서버 측에서 클라이언트의 연결 요청을 처리하는 데 사용되는 ServerSocketChannel과 클라이언트와 서버 간의 데이터 송수신을 처리하는데 사용되는 SocketChannel이 SelectableChannel을 상속하고 있다. SelectableChannel은 다음 주요한 두 메서드를 지원하고 있다.
- configureBlocking
- 채널을 논블로킹 모드에서 동작하게 만들어, I/O 작업 중 블로킹되지 않고 애플리케이션의 성능을 향상시킬 수 있다.
- register
- Selector를 첫 번째 인자로 받고, 두 번째 인자로 관심있는 이벤트(accept, write, read 등)를 등록하여 Selector가 해당 채널을 관리할 수 있도록 할 수 있다.
Selector
Selector는 자바의 논블로킹 입출력 구현의 핵심으로서, 논블로킹 Socket의 집합에서 입출력이 가능한 항목을 지정하기 위해 이벤트 통지 API를 이용한다. 언제든지 읽기나 쓰기 작업의 완료 상태를 확인할 수 있고, 한 스레드로 여러 동시 연결을 처리할 수 있다. Selector는 자신에게 등록된 채널에 변경 사항이 발생했는지 검사하고 변경 사항이 발생한 채널에 대해 조회하여 접근을 가능하게 해준다.
이러한 모델은 블로킹 입출력 모델에 비해 전체적으로 훨씬 개선된 리소스 관리 효율을 보여준다.
- 적은 수의 스레드로 더 많은 연결을 처리할 수 있으므로 메모리 관리와 컨텍스트 전환에 따르는 오버헤드가 감소한다.
- 입출력을 처리하지 않을 때는 스레드를 다른 작업에 활용할 수 있다.
Java NIO를 활용한 Echo Server
@Slf4j
public class ServerSocketChannelSimpleExample {
@SneakyThrows
public static void main(String[] args) throws IOException {
// Stream 기반이 아닌 Channel 기반의 ServerSocket open(생성)
try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
) {
// serverSocketChannel을 생성하고, localhost:8080으로 바인딩
SocketAddress address = new InetSocketAddress("localhost", 8080);
serverChannel.bind(address);
// serverChannel을 non-blocking으로 설정
serverChannel.configureBlocking(false);
// serverChannel의 Accept 이벤트(작업)을 selector에 등록
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 무한 루프를 통해서 지속적으로 채널의 작업들을 처리
while (true) {
selector.select(); // 준비될 때까지 blocking, busy-wait은 발생하지 X
// selector에서 준비가 완료된 작업 목록을 가져옴
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
// 준비가 완료된 작업들을 하나씩 처리
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
// 준비가 완료된 작업 목록에서 제외한다.
// iterator가 마지막으로 반환된 값을 제거
selectedKeys.remove();
// accept 이벤트 처리
if (key.isAcceptable()) { // 작업이 ACCEPT 라면
// accept 통해서 clientSocket에 접근
SocketChannel clientSocket = ((ServerSocketChannel) key.channel()).accept();
// clientSocket을 non-blocking으로 설정
clientSocket.configureBlocking(false);
// clientSocket의 read 작업을 selector에 등록
clientSocket.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 작업이 READ 라면
// clientSocket에 접근
SocketChannel clientSocket = (SocketChannel) key.channel();
// clientSocket으로부터 데이터 읽음
String requestBody = getRequestBody(clientSocket);
// clientSocket에 데이터 씀
sendResponse(clientSocket, requestBody);
}
}
}
}
}
private static String getRequestBody(SocketChannel clientSocket) throws IOException {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
clientSocket.read(requestBuffer);
requestBuffer.flip();
return new String(requestBuffer.array()).trim();
}
@SneakyThrows
private static void sendResponse(SocketChannel clientSocket, String requestBody) throws IOException {
log.info("request: {}", requestBody);
Thread.sleep(10);
String response = "received: " + requestBody;
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
clientSocket.write(responseBuffer);
responseBuffer.clear();
clientSocket.close();
}
}
처음에 ServerSocketChannel을 생성하여 논블로킹 모드로 설정하고, Selector를 생성하여 ServerSocketChannel에 대해 OP_ACCEPT(ServerSocketChannel에서 accept할 준비가 완료)를 관심있는 이벤트로 채널을 등록한다.
Selector 객체의 select 메서드에서 등록한 채널들 중 준비된 이벤트가 없다면 계속 스레드를 blocking하거나 준비가 완료된 작업이 있다면 다음 line으로 이동한다.
selectedKeys() 메서드를 통해 준비가 완료된 이벤트 목록을 Set으로 제공하여 iterator로 변경하여 하나씩 순회하며, 관심 있는 이벤트에 따라 원하는 작업을 실행한다.
OP_ACCEPT인 경우에는 SelectionKey 객체의 isAcceptable() 조회 시 true로 반환되어 연결된 SocketChannel을 생성하고 이를 다시 Selector에 OP_READ(socketChannel의 읽기 준비가 완료)를 관심있는 이벤트로 등록하여 준비가 완료되면 Selector를 통해 조회하여 처리할 수 있다.
소켓의 동작 방식이 블로킹 방식에서 논블로킹 방식으로 변경되면서 더 많은 클라이언트 연결을 수용할 수 있게 되었다. I/O 작업을 처리해야할 때 단일 스레드가 I/O 작업이 완료되기 전까지 대기하지 않고, 여러 채널의 I/O 이벤트를 처리하는 것을 Selector 기반 I/O 멀티플렉싱(Multiplexing)이라고 한다. 또한, 각 이벤트를 먼저 정의해두고 발생한 Selector를 사용하여 관심있는 이벤트가 발생하면 조회하여 발생한 이벤트에 따라 코드가 실행되도록 프로그램을 작성하는 것을 이벤트 프로그래밍의 한 종류라고 할 수 있다.
Netty의 핵심 컴포넌트
네티는 위에서 살펴본 Java NIO 위에 구축된 추상화 레이어라고 할 수 있다. 네티의 Channel은 Java NIO의 SocketChannel 및 ServerSocketChannel을 추상화하거나 EventLoop를 통해 NIO의 Selector를 관리하며, 이벤트 멀티플렉싱을 처리한다.
간단한 Echo 서버를 만들면서 Netty의 핵심 컴포넌트들에 대해 살펴보자.
Channel
public static void main(String[] args) {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup(4);
NioServerSocketChannel serverSocketChannel = new NioServerSocketChannel();
parentGroup.register(serverSocketChannel);
serverSocketChannel.pipeline().addLast(acceptor(childGroup));
ChannelFuture channelFuture = serverSocketChannel.bind(new InetSocketAddress(8080));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("Server bound to port 8080");
} else {
log.info("Failed to bind to port 8080");
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
});
}
네티에서 Channel은 Socket으로 기본 입출력 작업(bind(), connect(), read(), write())할 때의 복잡성을 크게 완화하는 API를 제공하면서 pipeline 같은 추가적인 기능이 있다.
public interface ChannelFuture extends Future<Void> {
/**
* Returns a channel where the I/O operation assiociated with this
* future taskes place
*/
Channel channel();
// Channel I/O 작업이 완료되면 수행할 futureListener 등록
@Override
ChannelFuture addListener(
GenericFutureListener<? extends Future<? super Void>> listener);
// 등록된 futureListener 제거
@Override
ChannelFutuere removeListener(
GenericFutureListener<? extends Future<? super Void>> listener);
// 작업이 완료될 때까지 blocking
@Override
ChannelFuture sync() throws InterrupedException;
}
네티는 비동기 작업이 실행됐을 때 이용할 수 있는 자체 구현 ChannelFuture를 제공한다. ChannelFuture에는 ChannelFutureListener 인스턴스를 하나 이상 등록할 수 있는 추가 메서드가 있다. IO 작업이 완료되면 ChannelFuture를 통해서 리스너의 콜백 메서드인 operationComplete()가 호출되며, 이 시점에 리스너는 작업이 정상적으로 완료됐는지, 아니면 오류가 발생했는지 확인할 수 있다. 오류가 발생한 경우 생성된 Throwable을 가져올 수 있다. 즉, ChannelFutureListener가 제공하는 알림 메커니즘을 이용하면 작업 완료를 수동으로 검사할 필요가 없다.
네티의 모든 아웃바운드 입출력 작업은 ChannelFuture를 반환하며 진행을 블로킹하는 작업은 없다. ChannelFuture를 통해 Channel에서 I/O 연산이 완료되었을 때, 결과를 쉽게 조회하고 리스너를 추가할 수 있는 방법을 제공한다.
NioServerSocketChannel
EventLoopGroup parentGroup = new NioEventLoopGroup();
NioServerSocketChannel serverSocketChannel = new NioServerSocketChannel();
parentGroup.register(serverSocketChannel);
register 메서드를 사용하여 채널을 등록하면 내부적으로 NioServerSocketChannel 내 doRegister가 호출되면서 채널을 EventGroup에 존재하는 Selector에 등록이 된다.
이벤트 루프(Event Loop)
네티의 이벤트 루프 그룹(EventLoopGroup)과 이벤트 루프(EventLoop)는 이벤트 기반 비동기 아키텍처의 핵심 구성 요소이다. 이벤트 루프 그룹은 여러 이벤트 루프를 관리하는 컨테이너이며, 내부적으로 스레드 풀을 통해 각 이벤트 루프를 실행한다. 이를 통해 네트워크 I/O 작업 및 태스크 실행을 관리하는 데 중요한 역할을 한다.
다음 만들 에코 네티 서버 애플리케이션에서는 두 가지 이벤트 루프 그룹이 사용된다.
- BossGroup
- 클라이언트의 연결 요청을 수락하고, 새로운 연결을 생성한 뒤 WorkerGroup으로 위임한다.
- 보통 단일 스레드로 구성되어 ServerSocketChannel을 감시하여 연길 수락 이벤트(OP_ACCEPT)를 처리한다.
- WorkerGroup
- 실제 I/O 작업(읽기, 쓰기) 및 채널과 관련된 이벤트를 처리한다.
- 여러 스레드(NioEventLoop)로 구성되며, 각 스레드는 다수의 채널을 처리할 수 있다.
- 하나의 스레드에서 다수의 채널을 처리하기 때문에 Blocking I/O 작업이 들어오는 경우 해당 작업이 완료될 때까지 대기하기 때문에 다른 채널의 작업 처리 지연이 발생하여 성능에 부정적인 영향을 미칠 수 있다.
- 따라서 블로킹 작업은 별도의 스레드풀을 활용하거나 비동기 처리 방식으로 구현하는 것이 좋다.
- 각 클라이언트 연결은 WorkerGroup의 특정 이벤트 루프에 할당되며, 해당 이벤트 루프에서 모든 작업을 처리한다.
NIOEventLoop
네티의 단일 스레드 이벤트 루프를 사용하는 경우에는 이벤트 큐에 이벤트를 등록하고 단일 스레드 이벤트 루프가 이벤트 큐에 접근하여 처리하는 방법으로 구현이 단순하고 예측 가능한 동작을 보장해준다. NioEventLoopGroup 객체를 생성할 때에도 기본적으로 CPU 코어 * 2개의 싱글 스레드 이벤트 루프를 생성하고 관리한다.
// 주요 필드들(EventExecutor[], TaskQueue, Selector)이 있는 클래스 확인
public final class NioEventLoop extends SingleThreadEventLoop {
// ...
private Selector selector;
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// ...
private final Queue<Runnable> taskQueue;
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
}
네티에서 가장 큰 역할을 하고 있는 이벤트 루프는 EventExecutor, TaskQueue, Selector를 포함하고 있다.
- EventExecutor
- task를 실행하는 스레드풀로 위에서 확인할 수 있는 것처럼 이벤트 루프들이 들어 있다.
- TaskQueue
- task를 저장하는 큐로 EventExecutor가 즉시 task를 수행하지 않고 taskQueue에 넣은 후, 나중에 꺼내서 처리한다.
- Selector
- I/O Multiplexing을 지원한다.
- 관심있는 작업에 대한 IO 작업이 준비 완료되었을 떄, 이벤트 루프의 스레드가 채널 목록을 조회하여 처리한다.
NioEventLoopGroup 생성
NioEventLoopGroup 기본 생성자로 객체를 생성하면 MultithreadEventExecutorGroup 부모 클래스까지 올라가 해당 클래스에서 newChild 메서드를 통해 nThreads 개수만큼 NioEventLoop를 생성한다.
즉, 하나의 NioEventLoopGroup에 1개 이상의 NioEventLoop가 존재한다.
NioEventLoop 실행
@Slf4j
public class EchoNettyServer {
public static void main(String[] args) {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup(4);
NioServerSocketChannel serverSocketChannel = new NioServerSocketChannel();
parentGroup.register(serverSocketChannel);
serverSocketChannel.pipeline().addLast(acceptor(childGroup));
ChannelFuture channelFuture = serverSocketChannel.bind(new InetSocketAddress(8080));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("Server bound to port 8080");
} else {
log.info("Failed to bind to port 8080");
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
});
}
}
Nio를 수행하는 ServerSocketChanneldmf todtjdgkrh ACCEPT network I/O 이벤트를 EventLoop에 등록한다.
NioEventLoop는 SingleThreadEventLoop를 상속하기 때문에 EventLoopGroup 객체에 register 메서드를 사용하여 ServerSocketChannel을 등록하면 SingleThreadEventLoop의 execute()가 실행되어 Single Thread로 run()이 실행된다. 또한 하나의 EventLoop에는 여러 Channel을 등록할 수 있고, I/O 이벤트 완료 시 Channel의 Pipeline이 실행된다.
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
run()이 실행되면 shutdown 시그널을 받기 전까지 무한 루프를 돌게 된다.
if (ioRatio == 100) { // ioRatio
try {
if (strategy > 0) {
processSelectedKeys(); // I/O 관련된 작업들을 먼저 처리
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks(); // 그 다음 Non I/O 작업 실행
}
}
ioRatio는 EventLoop Thread가 I/O 처리에 소비할 시간과 non I/O 처리에 소비할 시간의 비율을 정한다. 기본값은 50이며I/O task와 Non I/O task에 가능한한 동일한 시간을 소모한다. ioRatio가 100인 경우에는 이 기능이 disabled된다.
EventLoop 관계
EventLoop는 연결의 수명주기 중 발생하는 이벤트를 처리하는 네티의 핵심 추상화를 정의한다. 위 그림에서 확인할 수 있는 Channel, Event, Thread, EventLoopGroup 간의 관계는 다음과 같다. 아래 설계로 한 Channel의 입출력이 하나의 Event Loopd 동일한 Thread에서 처리되므로 동기화가 필요 없다.
- 한 EventLoopGroup은 하나 이상의 EventLoop를 포함한다.
- 한 EventLoop는 수명주기 동안 한 Thread로 바인딩된다.
- 한 EventLoop에서 처리되는 모든 입출력 이벤트는 해당 전용 Thread에서 처리된다.
- 한 Channel은 수명주기 동안 한 EventLoop에 등록할 수 있다.
- 한 EventLoop를 하나 이상의 Channel로 할당할 수 있다.
Channel은 EventLoop가 할당되면 할당된 EventLoop (및 연결된 Thread)를 수명주기 동안 이용한다. 덕분에 ChannelHandler 구현에서 동기화와 스레드 안정성에 대해 걱정할 필요가 없다.
하지만 ThreadLocal 이용을 하는 경우에는 EventLoop 하나가 둘 이상의 Channel에 이용되므로 ThreadLocal은 연결된 모든 Channel이 동일하기 때문에, 멀티 스레딩 모델에서 사용하던 방식과는 적합하지 않다. 이는 Context를 이용하여 해결할 수 있다.
ChannelPipeline
@Slf4j
public class EchoNettyServer {
public static void main(String[] args) {
// ...
serverSocketChannel.pipeline().addLast(acceptor(childGroup));
ChannelFuture channelFuture = serverSocketChannel.bind(new InetSocketAddress(8080));
// ...
}
private static ChannelInboundHandler acceptor(EventLoopGroup childGroup) {
EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(4);
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("Acceptor.channelRead");
if (msg instanceof SocketChannel) {
SocketChannel socketChannel = (SocketChannel) msg;
socketChannel.pipeline().addLast(executorGroup, new LoggingHandler(LogLevel.INFO));
socketChannel.pipeline().addLast(
requestHandler(),
responseHandler(),
echoHandler());
// 소켓 채널을 childGroup에 등록(Selector가 READ 작업 감시하고 알림)
childGroup.register(socketChannel);
}
}
};
}
}
위 코드에서 생성한 ServerSockerChannel 객체에 채널 파이프라인을 등록한 것을 확인할 수 있다. 그러면 EventLoop에 Channel이 등록(register)된 후에 Channel의 관심있는 IO 이벤트가 발생하면 Selector를 통해 계속 감시하다 이벤트가 준비되면 Channel이 가지고 있는 ChannelPipeline을 사용하여 pipeline을 실행한다.
ChannelHandler
네티는 비동기 호출을 지원하기 위해 리액터 패턴의 구현체인 이벤트 핸들러를 제공하고, Channel의 I/O 이벤트를 처리하거나 I/O 작업을 수행하는 ChannelHandler가 있다. Reactor 패턴은 하나 이상의 클라이언트로부터의 요청을 동시 처리하기 위해서 사용하는 패턴을 말한다. reactor는 이벤트가 발생하기를 기다리고, 이벤트가 발생하면 eventHandler에게 이벤트를 보낸다.
네티에서 이벤트 핸들러는 네티의 소켓 채널에서 발생한 이벤트를 처리하는 인터페이스이다. 이 인터페이스를 상속받은 이벤트 핸들러를 작성하여 Channel 파이프라인에 등록하면 Channel 파이프라인으로 입력되는 이벤트를 EventLoop가 가로채어 이벤트에 해당하는 메서드를 수행하는 구조이다. 예를 들어, Echo 서버는 들어오는 메시지에 반응해야 하므로 인바운드 이벤트에 반응하는 메서드가 정의된 ChannelInboundHandler 인터페이스를 구현해야 한다.
인바운드 이벤트(Inbound Event)
네티는 소켓 채널에서 발생하는 이벤트를 인바운드 이벤트(Inbound Event)와 아웃바운드 이벤트(Outbound Event)로 추상화한다. 인바운드 이벤트는 소켓 채널에서 발생한 이벤트 중에서 연결 상대방이 어떤 동작을 취했을 때 발생한다. 예를 들면, 채널 활성화, 데이터 수신 등의 이벤트가 이에 해당한다.
위 그림은 클라이언트가 서버에 접속한 상태에서 서버로 데이터를 전송하는 상태를 보여준다. 서버의 관점에서는 데이터가 수싱되는데, 이때 네티는 소켓 채널에서 읽을 데이터가 있다는 이벤트를 채널 파이프라인으로 흘려보내고 채널 파이프라인에 등록된 이벤트 핸들러 중에서 인바운드 이벤트 핸들러가 해당 이벤트에 해당하는 메서드를 수행한다.
private static ChannelInboundHandler acceptor(EventLoopGroup childGroup) {
EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(4);
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("Acceptor.channelRead");
if (msg instanceof SocketChannel) {
SocketChannel socketChannel = (SocketChannel) msg;
socketChannel.pipeline().addLast(executorGroup, new LoggingHandler(LogLevel.INFO));
socketChannel.pipeline().addLast(
requestHandler(),
responseHandler(),
echoHandler());
// 소켓 채널을 childGroup에 등록(Selector가 READ 작업 감시하고 알림)
childGroup.register(socketChannel);
}
}
};
}
private static ChannelInboundHandler requestHandler() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
try {
ByteBuf buf = (ByteBuf) msg;
int len = buf.readableBytes();
Charset charset = StandardCharsets.UTF_8;
CharSequence body = buf.readCharSequence(len, charset);
log.info("RequestHandler.channelRead: " + body);
ctx.fireChannelRead(body);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
};
}
네티는 인바운드 이벤트를 ChannelInboundHandler 인터페이스로 제공하는데 위 ChannelPipeline 코드에서 ChannelInboundHandler의 기본 구현을 제공하는 ChannelInboundHandlerAdapter의 하위 클래스를 생성했다. 관심을 가질 메서드는 다음과 같다.
- channelRead()
- 메시지가 들어올 때마다 호출된다.
- channelReadComplete()
- channelRead()의 마지막 호출에서 현재 일괄 처리의 마지막 메시지를 처리했음을 핸들러에 통보한다.
- exceptionCaught()
- 읽기 작업 중 예외가 발생하면 호출된다.
channelRead 메서드는 데이터가 수신되었음을 알려준다. 수신된 데이터는 네티의 ByteBuf 객체에 저장되어 있으며 이벤트 메서드의 . 두번쨰 인자인 msg를 통해서 접근할 수 있따. acceptor 메서드를 사용하여 NioServerSocketChannel 객체에 파이프라인으로 등록하고, 이벤트 수신이 발생했을 때 msg는 clientSocket이고, read와 write 작업을 위해 여러 이벤트 핸들러를 파이프라인에 추가하고, childGroup 객체에 존재하는 Selector에 등록했다.
ChannelInboundHandler 기반의 RequestHandler는 들어오는 ByteBuf로부터 CharSequence를 읽어서 ctx.fireChannelRead 메서드를 사용하여 다음 InboundHandler에게 전달한다.
private static ChannelInboundHandler echoHandler() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof String) {
String request = (String) msg;
log.info("EchoHandler.channelRead: " + request);
ctx.writeAndFlush(request)
.addListener(ChannelFutureListener.CLOSE);
}
}
};
}
ChannelHandlerContext는 네티 객체에 대한 상호작용을 도와주는 인터페이스다. 첫 번쨰는 채널에 대한 입출력 처리이다. 위 예와 같이 ChannelHandlerContext의 writeAndFlush 메서드로 채널에 데이터를 기록하고, 현재 위치한 ChannelInboundHandler에서 Outbound 방향으로 전환하겠다는 의미이다. 또는 ChannelHandlerContext의 close 메서드로 채널의 연결을 종료할 수 있다.
위 EchoHandler에서는 Inbound된 msg를 출력하고 Outbound 방향으로 전환하였다.
아웃바운드 이벤트(Outbound Event)
private static ChannelOutboundHandler responseHandler() {
return new ChannelOutboundHandlerAdapter() {
@Override
public void write(
ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof String) {
log.info("ResponseHandler.write: " + msg);
String body = (String) msg;
Charset charset = StandardCharsets.UTF_8;
ByteBuf buf = ctx.alloc().buffer();
buf.writeCharSequence(body, charset);
ctx.write(buf, promise)
.addListener(ChannelFutureListener.CLOSE);
}
}
};
}
아웃바운드 이벤트는 소켓 채널에서 발생한 이벤트 중에서 네티 사용자(프로그래머)가 요청한 동작에 해당하는 이벤트를 말하며, 연결 요청, 데이터 전송, 소켓 닫기 등이 이에 해당된다. 네티는 아웃바운드 이벤트를 ChannelOutboundHandler 인터페이스로 제공한다. 또한 모든 ChannelOutboundHandler 이벤트는 ChannelHandlerContext 객체를 인수로 받는다.
위 ChannelOutboundHandler 기반의 ResponseHandler는 나가는 String을 ByteBuf로 바꿔서 값을 집어넣고 ctx.write 메서드를 사용하여 데이터를 다음 Handler에게 전달한다.
위 이벤트 흐름을 그림을 사용하면 위와 같다.
전체 코드
@Slf4j
public class EchoNettyServer {
public static void main(String[] args) {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup(4);
NioServerSocketChannel serverSocketChannel = new NioServerSocketChannel();
parentGroup.register(serverSocketChannel);
serverSocketChannel.pipeline().addLast(acceptor(childGroup));
ChannelFuture channelFuture = serverSocketChannel.bind(new InetSocketAddress(8080));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("Server bound to port 8080");
} else {
log.info("Failed to bind to port 8080");
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
});
}
private static ChannelInboundHandler acceptor(EventLoopGroup childGroup) {
EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(4);
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("Acceptor.channelRead");
if (msg instanceof SocketChannel) {
SocketChannel socketChannel = (SocketChannel) msg;
socketChannel.pipeline().addLast(executorGroup, new LoggingHandler(LogLevel.INFO));
socketChannel.pipeline().addLast(
requestHandler(),
responseHandler(),
echoHandler());
// 소켓 채널을 childGroup에 등록(Selector가 READ 작업 감시하고 알림)
childGroup.register(socketChannel);
}
}
};
}
private static ChannelInboundHandler requestHandler() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
try {
ByteBuf buf = (ByteBuf) msg;
int len = buf.readableBytes();
Charset charset = StandardCharsets.UTF_8;
CharSequence body = buf.readCharSequence(len, charset);
log.info("RequestHandler.channelRead: " + body);
ctx.fireChannelRead(body);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
};
}
private static ChannelOutboundHandler responseHandler() {
return new ChannelOutboundHandlerAdapter() {
@Override
public void write(
ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof String) {
log.info("ResponseHandler.write: " + msg);
String body = (String) msg;
Charset charset = StandardCharsets.UTF_8;
ByteBuf buf = ctx.alloc().buffer();
buf.writeCharSequence(body, charset);
ctx.write(buf, promise)
.addListener(ChannelFutureListener.CLOSE);
}
}
};
}
private static ChannelInboundHandler echoHandler() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof String) {
String request = (String) msg;
log.info("EchoHandler.channelRead: " + request);
ctx.writeAndFlush(request)
.addListener(ChannelFutureListener.CLOSE);
}
}
};
}
}
출처
네티 인 액션
자바 네트워크 소녀 네티
Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지