티스토리 뷰
Reactive Streams
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
출처: reactive-stream-jvm github
Reactive Streams는 비동기 데이터 스트림을 Non-blocking 방식으로 처리하면서, 백프레셔(Backpressure)를 지원하는 표준으로 Publisher는 Subscriber에게 비동기적으로 이벤트를 전달한다. Reactive Streams를 구현한 구현체로 RxJava, Reactor, Akka Streams, Java 9 Flow API 등이 있다.
Reactive Streams 구성 요소
표준 사양에는 구현체들이 다음과 같은 공통적으로 구현해야 하는 구성 요소들이 있다.
- Publisher
- 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
- 단, 구독자(Subscriber)가 요청해야만 데이터를 발행(onNext)한다.
- Subscriber
- 구독한 Publisher로부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할을 한다.
- 구독 시 요청량을 지정(request(n))할 수 있다.
- Subscription
- Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다.
- Processor
- Publisher와 Subscriber의 기능을 모두 가지고 있다.
- 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다.
Publisher
package org.reactivestreams;
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Publisher<T>는 T 타입의 데이터를 스트리밍하는 생산자 역할을 하며, Publisher는 다음과 같은 특징이 있다.
- subscribe(Subscriber<? super T> s) 메서드를 사용하여 구독을 시작할 수 있다.
- 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.
- 해당 메서드는 여러 번 호출될 수 있으며, 호출할 때마다 새로운 Subscription 인스턴스를 생성한다.
- 각각의 Subscription은 하나의 Subscriber만을 위해 동작한다.
- Subscriber는 하나의 Publisher에 대해서 한 번만 구독해야 한다. 여러 번 구독하면 명세 위반이 된다.
- 구독이 거부되거나 실패하면 Publisher는 Subscriber.onError(Throwable) 메서드를 통해 오류를 전달한다
Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscriber<T>는 Publisher로부터 T 타입의 데이터를 수신하는 소비자 역할을 하며, 각 메서드는 다음과 같은 역할을 한다.
- onSubscribe(Subscription s)
- Publisher.subscribe() 호출 후 가장 먼저 실행되는 메서드이다.
- Subscriber와 Publisher 사이에 연결이 시작될 때 호출되며, Subscription 객체를 전달한다.
- 전달받은 Subscription을 이용해 데이터를 요청해야 데이터가 흐르기 시작한다.
- Subscription.request(n)를 호출하지 않으면 아무런 데이터도 수신받지 못한다.
- Publisher.subscribe() 호출 후 가장 먼저 실행되는 메서드이다.
- onNext(T t)
- Publisher가 request(n)에 응답하기 위해 데이터를 생성하고 데이터를 전달할 때 호출한다.
- 데이터를 받을 때마다 호출되며, Subscriber가 해당 메서드에서 데이터를 처리한다.
- onError(Throwable t)
- 데이터 스트림 처리 중 오류가 발생하여 종료될 때 호출된다.
- 오류 정보를 전달하고 Publisher와 Subscriber의 연결이 종료된다.
- onComplete()
- 정상적으로 모든 데이터가 전달되어 더 이상 전달할 데이터가 없을 때 호출된다.
- onError()와 마찬가지로 호출 이후에는 더 이상 이벤트가 발생하지 않는다.
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription은 Subscriber가 Publisher에게 데이터를 요청(request)하거나 중단(cancel)할 수 있게 해주는 인터페이스로 데이터 흐름을 제어하는 역할을 한다. 각 메서드는 다음과 같은 역할을 한다.
- request(long n)
- Publisher에게 n개의 데이터를 요청한다.
- Subscriber가 직접 호출해야 데이터가 전달되기 시작하며, 해당 메서드를 사용하기 전까지는 데이터가 전달되지 않는다.
- 요청한 수보다 적게 도착할 수 있으며, 이 경우 데이터 스트림은 onError()나 onComplete()로 종료된다.
- cancel()
- Subscriber가 데이터 요청을 취소하고 연결을 종료하여 스트림을 중단한다.
- 단, cancel() 호출 전 이미 request()된 데이터는 계속 도착할 수 있다.
- Subscriber가 더 이상 데이터를 받지 않거나 에러가 발생한 경우 호출한다.
Reactive Streams 동작 흐름

위 그림은 Reactive Streams의 구성 요소인 Publisher와 Subscriber 간에 데이터가 전달되는 동작 과정이다.
- Subscriber가 subscribe()를 호출해 구독을 요청한다.
- Publisher는 내부적으로 Subscription 객체를 생성하고, 데이터를 통지(발행, 게시, 방출)할 준비가 되었음을 Subscriber에게 알린다.
- Subscriber는 Subscription을 사용해 request(n)을 호출하여 Publisher에게 n개의 데이터 수를 요청한다.
- Publisher는 요청받은 만큼의 데이터를 생성하고, 각각에 대해 onNext(T)를 호출하여 데이터를 통지한다.
- 모든 데이터가 정상적으로 전달되면 onComplete()를 호출하여 스트림을 종료한다.
- 처리 중 문제가 발생하면 onError(Throwable)를 호출하여 오류 종료한다.
- Subscriber는 필요 시 cancel()을 호출하여 구독을 중단하고 Publisher는 더 이상 데이터를 전송하지 않고 내부 자원을 정리한다.
Reactor
Reactor는 리액티브 프로그래밍을 위한 라이브러리로 Spring Framework 팀 주도하에 개발된 리액티브 스트림즈(Reactive Streams)의 구현체로서 비동기 데이터 스트림 처리를 지원한다. Reactor의 특징은 다음과 같다.
- Reactor Streams 사양을 구현하여 비동기, 논 블로킹 데이터 스트림 처리를 지원한다.
- Publisher와 Subscriber 간의 상호작용은 Java의 함수형 프로그래밍 API를 통해 이루어진다.
- Publisher 타입으로 Mono, Flux를 지원하여 데이터를 emit할 수 있다.
- Mono: 데이터를 한 건을 emit하거나 한 건도 emit하지 않을 수 있다.
- Flux: N개의 데이터를 emit할 수 있다.
- Publisher로부터 전달되는 대량의 데이터를 Subscriber가 적절히 처리하는데 있어 과부하가 걸리지 않도록 Backpressure를 지원하여 논블로킹 환경에서도 안정적으로 데이터 스트림을 관리할 수 있다.

Mono와 Flux
Reactor에서 Publisher 구현체로 Mono와 Flux를 제공한다.
Flux

Flux는 Subscriber에게 0개부터 n개의 데이터를 전달하고, onComplete, onError 신호를 전달하면 연결을 종료하는 Reactive Streams의 Publisher 구현체이다.
Flux<String> flux = Flux.just("A", "B", "C")
.map(value -> {
int count = counter.incrementAndGet();
return count + ": " + value;
});
flux.subscribe(System.out::println); // 1: A, 2: B, 3: C
flux.subscribe(System.out::println); // 4: A, 5: B, 6: C
Flux 연산자 내부에서 사용하는 람다 표현식에 상태(state)를 사용하는 것을 지양해야 한다. Flux의 map, filter, flatMap 같은 연산자에서 사용하는 람다 함수는 내부적으로 재사용되거나 공유될 수 있기 때문에 변수(상태)를 보관하거나 변경하는 로직을 넣는 것은 위험하다. 위 예시처럼 여러 Subscriber가 동시에 동일한 연산자와 람다를 공유하면서 실행되면, 상태 충돌이나 예상치 못한 동작이 발생할 수 있다.

위 코드처럼 작성한다면 doOnNext에서 로그가 출력이 될까? "Value is never used as Publisher"라는 메세지에서 확인할 수 있듯이 Flux 스트림만 생성되었을 뿐, 아무데도 사용(실행)되지 않고 끝났다는 것을 알 수 있다. 이 코드를 실행하기 위해서는 Publisher에 subscribe하여 구독하여 데이터 스트림을 시작하는 것이 중요하다.
Mono

Mono는 Subscriber에게 최대 1개의 데이터만 방출하고, onComplete, onError 신호를 전달하면서 연결을 종료하는 Reactive Streams의 Publisher 구현체이다. 즉, onNext(T)를 바로 호출한 이후 곧바로 onComplete()를 호출하도록 설계되어 있다. 일반적으로 최대 1개의 데이터가 예상되면 Mono를, 다수일 경우에는 Flux를 사용한다.
Reactor에서 자주 사용되는 오퍼레이터로 flatMap은 단일 값을 보장하여 Mono를 반환하고, 여러 개의 값을 방출할 수 있는 경우엔 flatMapMany를 사용해서 Flux를 반환하도록 해야 한다.
public Mono<Void> saveProduct(Product product) {
return repository.save(product).then(); // Mono<Void> 반환
}
Mono는 하나의 값 혹은 객체를 필요로 하는 경우에 사용될 수도 있지만 위 코드처럼 Mono<Void>를 통해서 Publisher가 값을 반환하는 값 없이 완료된 시점을 전달해야 하는 경우에도 사용될 수 있다.
Flux 동작 흐름
@Slf4j
public class Example {
public static void main(String[] args) {
Flux<String> sequence = Flux.just("Hello", "Reactor");
sequence.map(data -> data.toLowerCase())
.subscribe(data -> log.info(data));
}
}
Flux를 통해 "Hello"와 "Reactor" 문자열 스트림을 생성하고, 각 문자열을 소문자로 변환한 후 로그를 남기는 예시 코드를 통해 Reactive Streams 동작 흐름으로 살펴봤던대로 코드를 확인해보려고 한다.


1. Flux 데이터 스트림 객체의 subscribe() 메서드를 디버깅하다보면 위 메서드에서 CorePublisher와 CoreSubscriber로 변환한 후 publisher.subscribe(Subscriber)를 실행하여 Subscriber가 Publisher에게 구독을 요청하는 것을 확인할 수 있다.
그리고 중간에 LambdaSubscriber 객체를 생성 시 onNext 실행 시 사용할 비즈니스 로직, onComplete 실행 시 사용할 비즈니스 로직, onError 실행 시 비즈니스 로직 등을 Consumer 객체로 받고 있다.


2. Publisher(FluxArray)는 Subscription(ArraySubscription)을 생성하고, s.onSubscribe(Subscription)을 실행하여 Subscriber에게 데이터를 방출할 준비가 되었음을 알린다.

3. 1번 subscribe 실행 시 만들어졌던 LambdaSubscriber 내 onSubscribe 메서드가 실행되는데 Subscription을 통해 s.request(n)을 호출하여 Publisher에게 Long.MAX_VALUE 만큼의 데이터 수를 요청하고 있다.

4. Subscription 내 request -> fastPath 메서드를 살펴보면 현재 array의 수만큼 루프를 돌면서 각각의 데이터에 대해 s.onNext(T)를 실행하여 Subscriber에게 데이터를 방출하고 있다.

5. 해당 메서드는 LambdaSubscriber 내에 있는 메서드로 onNext 메서드 내부를 살펴보면 LambdaSubscriber 생성 시 넘겨줬던 Consumer(data -> log.info(data))를 accept 메서드에 통지받은 데이터를 넘겨주면서 호출하여 실행하는 것을 확인할 수 있다.

디버깅을 통해 해당 로직까지 실행하고 있는 것을 확인했다.

그 다음 Publisher에서 방출할 Reactor 데이터도 동일한 로직을 실행하고 있다.

6. 중간에 구독이 취소되는 일 없이(cancelled: false) 모든 데이터가 정상적으로 전달되어 onComplete()을 실행하는 것을 확인할 수 있다. onComplete()에 대한 Consumer 객체는 넘겨주지 않았기 때문에 onComplete에 대한 비즈니스 로직은 실행되지 않는다.
출처
An Intro to Spring WebFlux Threading Model
Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
'Java > Java' 카테고리의 다른 글
HTTP Request 파싱하기 (0) | 2023.09.21 |
---|---|
Socket을 이용해 HTTP Server 만들기 with 병렬 처리 (0) | 2023.09.20 |
Spring MVC와 Spring WebFlux로 알아보는 Blocking I/O vs Non-blocking I/O (0) | 2023.09.05 |
Deadlock 상황 만들어 VisualVM으로 분석하고, 해결하기 (0) | 2023.09.03 |
소켓을 활용한 Echo Server 만들기 (0) | 2023.09.01 |
- Total
- Today
- Yesterday
- redis session
- pessimistic lock
- spring session
- Synchronized
- 분산 락
- 비관적 락
- Kafka
- TDD
- 구름톤 챌린지
- 웹플럭스 리액터
- sql
- nginx
- spring webflux
- 넥스트스탭
- annotation
- 카프카
- 트랜잭션
- mdcfilter
- jvm 메모리 구조
- mono flux
- nginx configuration
- 구름톤챌린지
- transaction
- Java
- mysql
- postgresql
- 리액티브 스트림즈
- 람다
- NeXTSTEP
- socket
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |