수년 전까지 대규모 애플리케이션은 수십 대의 서버, 기가바이트의 데이터, 수초의 응답 시간, 당연히 여겨졌던 몇 시간의 유지보수 시간 등의 특징을 가졌지만 오늘날에는 다음과 같은 세 가지 이유로 상황이 변하고 있다.
- 빅데이터 : 엄청난 양의 빅 데이터를 처리할 일이 많아졌다.
- 다양한 환경 : 모바일 디바이스부터 수천 개의 멀티 코어 프로세서로 실행되는 클라우드 환경까지 다양하게 분포되어있다.
- 사용 패턴 : 언제 어디서나 접근 가능한 밀리초 단위의 응답 시간을 기대하는 서비스가 많아졌다.
위와 같은 이유로 패러다임에 맞게 설계된 애플리케이션은 발생한 데이터 항목을 바로 처리함으로 사용자에게 높은 응답성을 제공한다.
리액티브 애플리케이션과 시스템 개발의 핵심 원칙
- 반응성 (responsive) : 리액티브 시스템은 빠를 뿐 아니라 일정하고 예상할 수 있는 반응 시간을 제공하여 사용자가 기대치를 가질 수 있도록 해야한다.
- 회복성 (resilient) : 장애가 발생하더라도 시스템은 반응해야 한다. 컴포넌트 실행 복제, 여러 컴포넌트의 시간과 공간분리, 각 컴포넌트가 비동기적으로 작업을 다른 컴포넌트에 위임하는 등 다양한 방법을 통해 시스템이 반응 할 수 있도록 해야 한다.
- 탄력성 (elastic) : 리액티브 시스템에서는 무거운 작업 부하가 발생한다면 자동으로 관련 컴포넌트에 할당된 자원 수를 늘릴 수 있어야 한다.
- 메시지 주도 (message-driven) : 회복성과 탄력성을 지원하기 위해 약한 결합도와 강한 응집력을 지원할 수 있도록 시스템을 구성하는 컴포넌트의 경계를 명확하게 정의 해야 한다.
이벤트 스트림을 블록하지 않고 비동기로 처리하는 것이 최신 멀티코어 CPU의 사용률을 극대화 할 수 있는 방법인데 이 목표를 달성할 수 있도록 리액티브 프레임워크와 라이브러리는 스레드를 퓨처, 액터, 일련의 콜백을 발생시킬 수 있는 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.
이러한 기술들은 스레드보다 가벼울 뿐 아니라 개발자에게 큰 이득을 제공한다. 개발자 입장에서는 이러한 기술을 이용함으로 동기, 비동기 애플리케이션 구현의 추상 수준을 높일 수 있으며 동기 블록, 경쟁 조건, 데드락 과 같은 저 수준의 멀티스레드 API 호출 문제를 직접 처리할 필요가 없어지면서 비즈니스 요구사항을 구현하는데 더 집중할 수 있다.
리액티브 애플리케이션 과 리액티브 시스템
리액티브 시스템은 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처이다. 리액티브 애플리케이션은 비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류된다면 리액티브 시스템은 애플리케이션을 조립하고 상호소통을 조절한다. 따라서 리액티브 시스템의 주요 속성으로 메세지 주도를 뽑을 수 있다.
이것은 모든 컴포넌트가 수신자의 위치에 상관없이 다른 모든 서비스와 통신 할 수 있음을 의미한다.
리액티브 프로그래밍의 역압력
리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그램이기 때문에 리액티브 스트림의 무한 비동기 데이터를 순서대로 브리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술이다. 역압력은 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자의 이벤트 제공 속도보다 느린 속도로 이벤트를 소비하더라도 문제가 발생하지 않도록 보장하는 것을 의미한다. 따라서 스트림 처리의 비동기적인 특성상 역압력 기능의 내장은 필수라는 사실을 알 수 있다.
리액티브 프로그래밍 구현을 위한 Flow API
자바에서는 리액티브 프로그래밍 구현을 위해 인스턴스화 할 수 없는 정적 컴포넌트를 제공하고 있으며 발행-구독 프로토콜 모델을 지원할 수 있도록 Flow 클래스의 중첩된 인터페이스 네 개를 제공하고 있다.
- Publisher
- Subscriber
- Subscription
- Processor
Publisher 가 항목을 발행하면 Subscriber 가 한 개씩 또는 한 번에 여러 항목을 소비하는데 Subscription이 이 과정을 관리 할 수 있도록 하고 있다. Publisher는 수많은 일련의 이벤트를 제공할 수 있지만 Subscriber의 요구사항에 따라 역압력 기법에 의해 이벤트 제공 속도를 제한 할 수도 있다. Publisher는 자바의 함수형 인트페이스로, Subscriber는 Publisher가 발행한 이벤트의 리스너로 자신을 등록할 수 있으며 Subscription은 Publisher 와 Subcriber 사이의 제어 흐름, 역압력을 관리한다.
아래는 자바에서 정의된 네 개의 인터페이스 정의이다.
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
함수형 인터페이스 Publisher의 추상 메서드 subscribe는 Subscriber 를 등록할 수 있도록 한다.
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
인터페이스 Subscriber 의 onSubscribe 메서드는 Publisher 가 Subcriber를 등록 할 때 Subscription 객체를 전달인자로 등록 할 수 있도록 한다. onNext 메서드는 이벤트 전달, onError 메서드는 이벤트 에러, onComplete 메서드는 이벤트 완료를 의미한다.
public static interface Subscription {
public void request(long n);
public void cancel();
}
Subcripton 은 request 메서드를 통해 Publisher 에게 이벤트 처리 준비가 완료되었음을 알리고 cancel 메서드를 통해 더 이상의 이벤트를 받지 않음을 알린다.
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
Processor 인터페이스는 Subscriber 와 Publisher 인터페이스를 상속 받는다.
실전 예제
현재 온도를 전달하는 예제를 만들어 본다.
public class TempInfo {
public static final Random random = new Random();
private final String town;
private final int temp;
public TempInfo(String town, int temp) {
this.town = town;
this.temp = temp;
}
public static TempInfo fetch(String town){
if(random.nextInt(10)==0)
throw new RuntimeException("Error");
return new TempInfo(town, random.nextInt(100));
}
@Override
public String toString() {
return town + " : " + temp;
}
public String getTown() {
return town;
}
public int getTemp() {
return temp;
}
}
TempInfo는 도시 이름을 입력하면 랜덤하게 온도를 설정하여 10% 확률로 에러를 발생시키고 90%확률로 TempInfo 를 반환한다.
public class TempSubscription implements Subscription {
private final Subscriber<? super TempInfo> subscriber;
private final String town;
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
public TempSubscription(Subscriber<? super TempInfo> subscriber, String town) {
this.subscriber = subscriber;
this.town = town;
}
@Override
public void request(long n) {
executor.submit(() -> {
for(long i=0; i<n; i++){
try {
subscriber.onNext(TempInfo.fetch(town));
}catch (Exception e){
subscriber.onError(e);
break;
}
}
});
}
@Override
public void cancel() {
subscriber.onComplete();
}
}
Subscription을 선언하여 request 메서드를 통해 몇번의 TempInfo 정보를 받아 올 것인지 정의 한다.
public class TempSubscriber implements Subscriber<TempInfo> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(TempInfo item) {
System.out.println(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
Subscriber 에 onSubscribe 메서드를 통해 Subscription 을 등록하고 첫 번째 요청을 전달 한다. onNext 메서드를 통해 TempInfo 정보를 출력하고 한 번의 요청을 더 전달한다.
public class Main{
public static void main(String[] args){
getTemperatures("New York").subscribe(new TempSubscriber());
}
private static Publisher<TempInfo> getTemperaturesV1(String town){
return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
}
}
무한 루프를 돌며 onNext 와 request 가 양방향 호출을 하기 때문에 계속적으로 TempInfo 값을 출력하게 된다. 이때 10%의 확률로 에러를 터뜨리기 때문에 에러가 발생했을 경우 종료된다.
public class TempProcessor implements Processor<TempInfo, TempInfo> {
private Subscriber<? super TempInfo> subscriber;
@Override
public void subscribe(Subscriber<? super TempInfo> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(TempInfo item) {
subscriber.onNext(new TempInfo(item.getTown(), (item.getTemp()-32)*5/9));
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}
TempProcessor 는 TempInfo 정보를 받아서 TempInfo 정보를 반환하는 인터페이스를 의미한다. 이때 TempInfo 의 정보를 받아서 데이터를 가공후 원하는 TempInfo 형태로 변환하여 반환 할 수 있다. TempProcessor은 Subscriber의 onNext 메서드를 오버라이딩 하여 화씨 온도를 섭씨 온도로 변환하여 재 정의 한다.
public class Main{
public static void main(String[] args){
System.out.println("getTemperaturesV2 : ");
getTemperaturesV2("New York").subscribe(new TempSubscriber());
}
private static Publisher<TempInfo> getTemperaturesV2(String town){
return subscriber -> {
TempProcessor tempProcessor = new TempProcessor();
tempProcessor.subscribe(subscriber);
tempProcessor.onSubscribe(new TempSubscription(tempProcessor, town));
};
}
}
TempProcessor 를 통해 화씨 온도로 재 정의하여 확인해 보면 위와 같이 화씨 온도로 변화된 값을 확인 할 수있다.
RxJava 를 통한 Observable 구현
Flow API 를 통해서 자바에서는 구독-발행 패턴을 구현할 수 있었다. 하지만 자바는 Flow API 를 이용하는 직접적인 구현체를 제공하고 있지는 않다. 그 이유는 이미 다른 라이브러리에서 이러한 발행-구독 패턴을 사용할 수 있는 구현체들이 존재했기 때문이다. 대표적으로 Observable 과 같은 클래스가 있는데 이 클래스를 사용하기 위해서는 RxJava 라는 라이브러리를 이용해야 한다.
하지만 Observable 추상 클래스는 단순 프로그램의 이벤트 스트림에 적합할 뿐 역압력을 적용하기 어렵기 때문에 이벤트 스트림을 다른 클래스인 Flowable 로 사용해야 한다.
- 역압력이 필요하지 않은 경우 : Observable 로 처리
- 역압력이 필요한 경우 : Flowable 로 처리
Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
Observable 로 구현한 onePerSec는 Interval 메서드를 이용하여 0에서 시작해 1초 간격으로 long 형식의 값을 무한으로 증가시키며 값을 방출한다.
Observable 추상 클래스의 경우 함수형인터페이스 ObservableSource 를 상속받고 subscribe 메서드를 오버라이딩 하게 된다. 이때 subscribe 메서드의 전달인자는 Observer 인터페이스 형태를 띄고 있고 Observer 인터페이스는 자바 Flow API 의 Subscriber 메서드와 동일하게 구성되어 있다. 이때 Observer 인터페이스를 onNext 메서드의 시그니처에 해당하는 람다 표현식을 전달해 Observer를 만들어 Observable 에 아래와 같은 형태로 적용할 수 있다.
onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("New York")));
위와 같은 연산은 main 쓰레드에서 진행되지 않으며 RxJava의 연산 쓰레드 풀에서 진행되기 때문에 실제로 결과를 확인하고 싶다면 아래와 같은 blockingSubscribe 메서드를 통해 Observable 을 적용해야 한다.
onePerSec.blockingSubscribe(i -> System.out.println(TempInfo.fetch("New York")));
Observable 을 활용하여 1초마다 온도 값 받기
public class TempObserver implements Observer<TempInfo> {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull TempInfo tempInfo) {
System.out.println(tempInfo);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Got problem : "+ e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
Subcriber 와 동일한 작업을 하는 TempObserver 를 선언한다.
public class Main{
public static void main(String[] args){
System.out.println("getTemperaturesV3 : ");
Observable<TempInfo> observable = getTemperaturesV3("New York");
observable.blockingSubscribe(new TempObserver());
}
private static Observable<TempInfo> getTemperaturesV3(String town){
return Observable.create(emitter -> {
Observable.interval(1, TimeUnit.SECONDS).subscribe(i -> {
if(!emitter.isDisposed()){
if(i>=5){
emitter.onComplete();
}else{
try{
emitter.onNext(TempInfo.fetch(town));
}catch (Exception e){
emitter.onError(e);
}
}
}
});
});
}
private static Observable<TempInfo> getTemperatureV4(String town){
return getTemperaturesV3(town)
.map(tempInfo -> new TempInfo(tempInfo.getTown(),(tempInfo.getTemp()-32)*5/9));
//.filter(tempInfo -> tempInfo.getTemp()>0);
}
}
1초 마다 값을 갱신하고 5번째 값이 변화되는 순간 종료되는 Observable 을 만들고 TempObserver 를 등록하여 온도를 출력해서 확인해보면 아래와 같이 New York 의 온도는 1초 간격으로 5번 출력된후 종료된다.
추가적으로 map 연산을 통하여 받아온 온도 값을 변환하거나 filter를 통하여 원하는 값을 sort 하는 과정도 진행 할 수 있다.
참고 : 모던 자바 인 액션 Chapter 17