Flux가 Mono를 포함하는 개념 아닌가?
Mono<User>
를 리턴하면 "단일 User만 반환한다"고 명확함.Flux<User>
라면, 여러 개일 수도, 없을 수도, 무한 개일 수도 있음.Flux<User> getUser()
함수가 있을 때, 과연 이 함수에게 어떤 기대를 해야하는지 명확하지 않음Flux
는 여러 이벤트를 처리하므로 더 복잡한 backpressure, buffer, drain 등을 고려해야 함큐처럼 파이프라인으로 구성이 되어 있기는 함. 하지만 큐는 push만 해주는 반면, Flux는 pull 방식이 가능 <- 이것 덕분에 backpressure도 가능한 것.
Flux를 컨베이어 벨트라고 생각하면 된다.
subscribe()할 때 비로소 흐름이 시작된다고하면,
Flux.just(1, 2, 3)
을 선언해두고 변수로 놔두면, 메모리에 일단 객체는 생기는 거 아닌가?
그럼 옵저버 패턴처럼 구독자들을 등록을 다 해두고, 이벤트(흐름 시작 트리거)가 발생하면 구독자들에게 알려주는 건가?
기존 멀티 스레드에서 ThreadPool 사용해서 비동기 실행은 할 수 있기는 하지만, 관리가 힘듦.
node에서나 보던 콜백 지옥을 여기서도 맛볼 수 있음.
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> {
String result1 = blockingHttpCall();
executor.submit(() -> {
String result2 = saveToDb(result1);
executor.submit(() -> {
log(result2);
});
});
});
게다가 생각해보면, 스레드로 작업을 수행하는 건 "blocking"임. 비동기라고 말은 해도, 사실은 다른 스레드에 작업을 넘겨서 백그라운드에서 실행하는 것일 뿐. 그 백그라운드에서 작업을 맡은 스레드 안에서는 blocking&async로 동작함.
netty와 reactor의 조합은 하드웨어 레벨에서 non-blocking&async를 지원하기 때문에, 꼭 코드 레벨에서의 콜백 지옥이 아니더라도 성능에서 이점을 볼 수 있음.
Cold Stream : 하나의 구독자가 하나의 flux를 소비 Hot Stream : 여러 구독자가 동시에 flux를 소비
main reactor thread는 이벤트 루프임. 즉, netty를 사용하면 이벤트 루프가 코어수만큼 생성된다는 것
아래처럼 로그찍히는 게 메인 스레드.
reactor-http-nio-2
참고로 nio는 non-blocking i/o
flatMap은 javascript의 then()
과 똑같음.
js에서 비동기를 실행하면 Promise를 반환받음. Promise는 값이 아직 "없음" 그래서 비동기로 나중에 값이 채워지면 받아야 하는 게 then()인데.
const filenames = ["a.jpg", "b.jpg", "c.jpg"];
const urls = filenames.map(name => uploadImage(name));
// uploadImage: (name) => Promise<string>
console.log(urls); // [Promise, Promise, Promise]
→ 결과는 [Promise, Promise, Promise]
→ 이거 실제 값을 쓰려면 Promise.all()
해야 함.
여기서 흐름을 연결하려면?
uploadImage("a.jpg")
.then(url => saveToDB(url))
.then(() => console.log("done"));
→ 여기서 then()
은 비동기 작업(Promise)을 “벗겨서” 다음으로 연결하는 역할
이게 java의 flatMap()
then()
이 없으면 Promise가 중첩됨 (Promise<Promise<T>>
)flatMap()
이 없으면 Mono<Mono<T>>
, Flux<Mono<T>>
이런 게 생김subscribe()
두번 째 인자로 에러 처리 함수 받음retry()
: 내부에서 쓰면 단일 스트림 재시도이고, 바깥에 쓰면 전체 스트림 재시도라는데 이게.. 무슨 말인지 모르게음(flatMap이 기준인가?)onErrorContinue()
: 에러 발생했을 때 해당 요소 skip한 후, 스트림을 계속 이어감onErrorResume()
: 에러 발생했을 때 대체값 제공 가능