리액티브 프로그래밍이란?
리액티브 시스템은 비동기 메시지 통신을 기반으로 한다. 비동기 메시지 통신은 Blocking I/O 방식이 아닌 Non-Blocking I/O 방식의 통신
특징
- 선언형 프로그래밍
- 선언형 프로그래밍 방식은 실행할 동작을 구체적으로 명시하지 않고 이러이러한 동작을 하겠다는 목표만 선언한다.
- C언어나 Java는 명령형 프로그래밍 방식이다. 실행할 동작을 구체적으로 명시하는 프로그래밍 코드 형태.
- 데이터 스트림
- 데이터 스트림이란 데이터가 지속적으로 발생한다는 의미
public class Examplel_1 {
public static void main (String[] args) {
List<Integer> numbers = Arrays asList(1, 3, 21, 10, 8, 11);
int sum = 0 ;
for(int number : numbers){
if number > 6 && (number % 2 ! = 0)) {
sum += number;
}
}
System.out.println(7|: " + sum);
}
}
public class Examplel_2 {
public static void main (String[] args) {
List<Integer> numbers = Arrays asList (1, 3, 21, 10, 8, 11);
int sum = numbers.stream()
.filter (number -> number > 6 && (number % 2 ! = 0))
.mapToInt (number -> number)
.sum();
System.out. println("sum: " + sum) ;
}
}
예제 1은 명령형, 예제 2는 선언형프로그래밍 방식이다.
예제 2에서는 마치 'for문을 돌면서 numbers List에 포함된 숫자들에 하나씩 접근하겠어'라고 내가 할 동작을 직접 설명하기보다는 'numbers List에 포함된 숫자들에 접근을 좀 해줘'라고 내가 아닌 다른 누군가에게 부탁하는 것과 비슷하다.
그리고 코드 1-1에서는 if문을 사용한 반면에, 코드 1-2에서는 if문 대신에 filter 메서드를 사용해서 조건에 맞는 데이터를 필터링하며(5번 라인), Sum 메서드를 사용하여 filter 메서드에서 필터링된 숫자들의 합계를 한다.
코드 1-1에서는 조건에 맞는 숫자들을 sum 변수에 하나씩 더하는 동작을 직접 했지만, 코드 1-2에서는 Sum이라는 메서드를 선언만 했지 구체적인 숫자를 더 하는 동작은 스트림 내부에서 대신 처리해 주는 모습을 볼 수 있다.
리액티브 프로그래밍 코드 구성
Publisher : 데이터를 제공하는 역할
Subscriber : Publisher가 제공한 데이터를 전달받아서 사용하는 주체
Data Source : Publisher의 입력으로 들어오는 데이터를 대표하는 용어
Operator : Publisher Subscriber 사이에서 가공 처리를 담당
리액티브 스트림즈
개발자가 리액티브한 코드를 작성하기 위해서는 코드 구성을 용이하게 해주는 라이브러리가 있어야한다. 라이브러리를 어떻게 구현할지 정의해 놓은 별도의 표준 사양이 리액티브 스트림즈이다.
리액티브 스트림즈를 구현한 구현체로 RxJava, Reactor, Akka Streams, Java 9 Flow API 등이 있다.
Spring framework는 Reactor와 궁합이 가장 잘 맞는다.
리액티브 스트림즈 구성 요소
| 컴포넌트 | 설명 |
|---|---|
| Publisher | 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다. |
| Subscriber | 구독한 Publisher로부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역 할을 한다. |
| Subscription | Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다. |
| Processor | Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다. |
Publisher와 Subscriber간의 데이터 전달 과정은 다음과 같다.
- 먼저 Subscriber는 전달받을 데이터를 구독한다(subscribe).
- 다음으로 Publisher는 데이터를 발행할 준비가 되었음을 Subscriber에 알린다(onSubscribe).
- Publisher가 데이터를 통지할 준비가 되었다는 알림을 받은 Subscriber 는 전달받기를 원하는 데이터의 개수를 Publisher에게 요청합니다.(Subscription.request)
- 다음으로 Publisher는 Subscriber로부터 요청받은 만큼의 데이터를 발행한다(onNext)
- 이렇게 Publisher와 Subscriber 간에 데이터 통지, 데이터 수신, 데이 터 요청의 과정을 반복하다가 Publisher가 모든 데이터를 통지하게 되 면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알린다 (onComplete). 만약에 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다(onError).
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s) ;
}
kafka의 경우 Publisher와 Subscriber 사이에 브로커가 존재하여 토픽을 바라보면 되지만, 리액티브 스트림즈에서는 브로커가 없기때문에 Publisher가 Subscriber를 등록하는 형태로 구독이 이루어진다.
Subscriber
public interface Subscriber<T> {
public void onSubscribe (Subscription s);
public void onNext(T t);
public void onError (Throwable t) ;
public void onComplete();
}
- onSubscribe 메서드는 구독 시작 시점에 어떤 처리를 하는 역할을 한다. 여기서의 처리는 Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지하는 것을 의미하는데, 이것은 onSubscribe 메서드의 파라미터로 전달 되는 Subscription 객체를 통해서 이루어진다.
- onNext 메서드는 Publisher가 통지한 데이터를 처리하는 역할을 한다.
- onError 메서드는 Publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할을 한다.
- onComplete 메서드는 Publisher가 데이터 통지를 완료했음을 알릴 때 호출 되는 메서드이다.
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription가 구독한 데이터의 개수를 요청하거나, 구독을 해지하는 역할을 한다.
용어 설명
signal : Publisher와 Subscriber가 주고받는 상호작용
demand : Subscriber가 Publisher에게 요청하는 데이터를 의미
emit : Publisher의 데이터 발행
Upstream/Downstream
public class Example2_5 {
public static void main (Stringll args) {
Flux
.just (1, 2, 3, 4, 5, 6)
.filter (n -> n % 2 == 0)
.map (n →> n * 2)
.subscribe(System.out::println) ;
}
}
.just().filter().map.subcribe() 와같이 하나로 연결된 것 처럼 보이는것을 메서드 체인이라고 한다.
map을 기준으로 예를 들면 map 위에 있는 filter, just는 upstream이고 subscribe는 downstream이라고 할 수 있다.
sequence : publisher가 emit하는 데이터의 연속적인 흐름을 정의, flux를 통해 데이터를 생성, emit하고 filter메서드를 통해 필터링 후 map메서드를 통해 변환하는 과정 자체를 sequence라 부름
source : '최초의' 라는 의미로 사용. original이라는 용어로도 사용함
publisher 구현을 위한 주요 기본 규칙
| 번호 | 규칙 |
|---|---|
| 1 | Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다. |
| 2 | Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다. |
| 3 | Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다. |
| 4 | Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다. |
| 5 | Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다. |
| 6 | 일단 종료 상태 signal을 받으면(onError, onComplete) 더 이상 signal이 발생되지 않아야 한다. |
| 7 | 구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야 한다. |
Subscriber 구현을 위한 기본 규칙
| 번호 | 규칙 |
|---|---|
| 1 | Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n) 를 통해 Demand signa을 Publisher에게 보내야 한다. |
| 2 | Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher의 메서드를 호출해서는 안 된다. |
| 3 | Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 signal을 수신한 후 구 독이 취소된 것으로 간주해야 한다. |
| 4 | 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel()을 호출해야 한다. |
| 5 | Subscriber.onSubscribe( )는 지정된 Subscriber에 대해 최대 한 번만 호출되어야 한다. |
Subscription 구현을 위한 주요 기본 규칙
| 번호 | 규칙 |
|---|---|
| 1 | 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription request 를 호출하도록 허용해야 한다. |
| 2 | 구독이 취소된 후 추가적으로 호출되는 Subscription.request(ong n)는 효력이 없어야 한다. |
| 3 | 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 한다. |
| 4 | 구독이 취소되지 않은 동안 Subscription.requestong n)의 매개변수가 0보다 작거나 같 으면 javalang.legalArgumentException과 함께 onError signal을 보내야 한다. |
| 5 | 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다. |
| 6 | 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다. |
| 7 | Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것 을 허용하지 않는다.. |
| 8 | 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63-1개의 Demand를 지원해야 한다. |
Reactor
Reactor는 Srping Framework 팀의 주도하에 개발된 리액티브 스트림즈의 구현체이다. Spring WebFlux 프레임워크에 라이브러리로 포함되어있다.
Reactor의 Publisher타입은 크게 2가지이다.
- Flux[N] : n은 n개, 즉 무한대의 데이터를 emit할 수 있다.
- Mono[0|1] : 0개 또는 1개 의 데이터만 emit하는 단발성 데이터에 특화된 퍼블리셔이다.
Reactor는 Backpressure-Ready network라는 publicsher로부터 전달받은 데이터를 처리하는 데 있어 과부하가 걸리지 않도록 제어하는 기능을 제공
public class Example5_1 {
public static void main (Stringll args) {
Flux<String> sequence = Flux.just("Hello", "Reactor");
sequence.map(data -> data. toLowerCase ( ))
.subscribe(data -> System.out.println(data));
}
}
"Hello", "Reactor" -> 데이터 소스
Flux -> Publisherdata -> System.out.println(data) -> subscriber.just("Hello", "Reactor"), data -> data. toLowerCase ( ) -> Operator
'Web' 카테고리의 다른 글
| ElasticSearch를 이용한 검색 기능 만들어보기 - 1 (1) | 2025.06.20 |
|---|---|
| ElasticSearch를 이용한 검색 기능 만들어보기 - 0 (0) | 2025.06.18 |
| 헥사고날 아키텍처에 관한 생각 (1) | 2025.06.09 |
| 유튜브 설계 (가상 면접 사례로 배우는 대규모 시스템 설계) (1) | 2025.06.04 |
| 검색어 자동완성 시스템 설계 (가상 면접 사례로 배우는 대규모 시스템 설계) (0) | 2025.06.02 |