DEV.EUN
<
2025 April
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2025-04-24

    Flux와 Mono는 왜 나눠져 있을까?

    Flux가 Mono를 포함하는 개념 아닌가?

    1. 의도 표현
      • 어떤 API가 Mono<User>를 리턴하면 "단일 User만 반환한다"고 명확함.
      • Flux<User>라면, 여러 개일 수도, 없을 수도, 무한 개일 수도 있음.
      • Flux<User> getUser() 함수가 있을 때, 과연 이 함수에게 어떤 기대를 해야하는지 명확하지 않음
    2. 최적화
      • Flux는 여러 이벤트를 처리하므로 더 복잡한 backpressure, buffer, drain 등을 고려해야 함

    Flux는 그럼 큐랑 비슷한 건가?

    큐처럼 파이프라인으로 구성이 되어 있기는 함. 하지만 큐는 push만 해주는 반면, Flux는 pull 방식이 가능 <- 이것 덕분에 backpressure도 가능한 것.

    Flux를 컨베이어 벨트라고 생각하면 된다.

    그럼 Flux 객체는 어디에 저장되고 있는거지?

    subscribe()할 때 비로소 흐름이 시작된다고하면, Flux.just(1, 2, 3)을 선언해두고 변수로 놔두면, 메모리에 일단 객체는 생기는 거 아닌가? 그럼 옵저버 패턴처럼 구독자들을 등록을 다 해두고, 이벤트(흐름 시작 트리거)가 발생하면 구독자들에게 알려주는 건가?

    새로운 스레드풀 생성하고 거기서 스레드한테 작업 시키면되는데(@Async 처럼) 굳이 Reactor를 써야 할 이유가?

    기존 멀티 스레드에서 ThreadPool 사용해서 비동기 실행은 할 수 있기는 하지만, 관리가 힘듦.

    node에서나 보던 콜백 지옥을 여기서도 맛볼 수 있음.

    ExecutorService executor = Executors.newFixedThreadPool(10);
    
    executor.submit(() -> {
        String result1 = blockingHttpCall();
        executor.submit(() -> {
            String result2 = saveToDb(result1);
            executor.submit(() -> {
                log(result2);
            });
        });
    });
    

    게다가 생각해보면, 스레드로 작업을 수행하는 건 "blocking"임. 비동기라고 말은 해도, 사실은 다른 스레드에 작업을 넘겨서 백그라운드에서 실행하는 것일 뿐. 그 백그라운드에서 작업을 맡은 스레드 안에서는 blocking&async로 동작함.

    netty와 reactor의 조합은 하드웨어 레벨에서 non-blocking&async를 지원하기 때문에, 꼭 코드 레벨에서의 콜백 지옥이 아니더라도 성능에서 이점을 볼 수 있음.


    Cold Stream vs Hot Stream

    Cold Stream : 하나의 구독자가 하나의 flux를 소비 Hot Stream : 여러 구독자가 동시에 flux를 소비

    netty의 main reactor thread는 코어 수만큼 존재

    main reactor thread는 이벤트 루프임. 즉, netty를 사용하면 이벤트 루프가 코어수만큼 생성된다는 것

    아래처럼 로그찍히는 게 메인 스레드.

    reactor-http-nio-2
    

    참고로 nio는 non-blocking i/o

    flatMap vs map

    flatMap은 javascript의 then()과 똑같음.

    js에서 비동기를 실행하면 Promise를 반환받음. Promise는 값이 아직 "없음" 그래서 비동기로 나중에 값이 채워지면 받아야 하는 게 then()인데.

    const filenames = ["a.jpg", "b.jpg", "c.jpg"];
    
    const urls = filenames.map(name => uploadImage(name)); 
    // uploadImage: (name) => Promise<string>
    
    console.log(urls); // [Promise, Promise, Promise]
    

    → 결과는 [Promise, Promise, Promise]
    → 이거 실제 값을 쓰려면 Promise.all() 해야 함.

    여기서 흐름을 연결하려면?

    uploadImage("a.jpg")
      .then(url => saveToDB(url))
      .then(() => console.log("done"));
    

    → 여기서 then()비동기 작업(Promise)을 “벗겨서” 다음으로 연결하는 역할

    이게 java의 flatMap()

    • then()이 없으면 Promise가 중첩됨 (Promise<Promise<T>>)
    • flatMap()이 없으면 Mono<Mono<T>>, Flux<Mono<T>> 이런 게 생김

    실험해볼 것 리스트

    • 100개 발행했을 때, 10개씩 잘라서 실행하는 방법?
      • 이걸 병렬로 처리하는 방법?
      • 10개씩 배치식으로 잘라서 발행한다고 하면, 소비자가 10개 다 소비하면 그 때 또 다음 10개를 보내주는 방식인건지? 이게 하나씩 동기처리하는거랑 뭐가 다른지...?
    • 10개 요청했는데 1개 실패했을 때 처리 방법
      • 전체 스트림을 실패 처리
        • subscribe() 두번 째 인자로 에러 처리 함수 받음
        • retry() : 내부에서 쓰면 단일 스트림 재시도이고, 바깥에 쓰면 전체 스트림 재시도라는데 이게.. 무슨 말인지 모르게음(flatMap이 기준인가?)
      • 1개만 예외 처리
        • onErrorContinue() : 에러 발생했을 때 해당 요소 skip한 후, 스트림을 계속 이어감
        • onErrorResume() : 에러 발생했을 때 대체값 제공 가능