ecsimsw

Future 를 활용한 비동기 이미지 비동기 업로드 흐름과 시연 본문

Future 를 활용한 비동기 이미지 비동기 업로드 흐름과 시연

JinHwan Kim 2023. 11. 28. 14:37

0. AS IS

기존에는 이 메인 / 백업 스토리지 업로드를 동기로 처리했다. 400KB의 이미지를 업로드할 때 main 33ms, backup 1680ms 정도가 필요했고, 사용자 응답은 이 둘을 더한 값 +  ⍺ 가 될 것이다.

 

이미지 업로드 시에 각 스토리지 업로드를 비동기로 처리하되 모든 업로드가 정상일 경우에만 사용자에게 정상으로 응답하고자 한다. 그리고 동시에 비동기식 업로드 과정에서 생길 수 있는 더미 파일을 사용자 흐름에 포함하지 않고 처리하고자 한다. 이 글에선 위 요구 사항을 만족하기 위한 작업 과정을 소개한다.

 

목차는 다음과 같다.

 

1. 단순 비동기 처리 

2. Future로 비동기 / 블록킹 방식으로 처리하는 경우, 그 문제점

3. CompletableFuture으로 쉽게 구현하는 다양한 처리 흐름들, 비동기 / 논 블록킹 방식의 동작 흐름

4. 예외 처리 / 이미 업로드된 더미 파일을 처리하기 위한 요구사항들

5. 메시지 큐를 활용한 더미 파일 제거 흐름과 시연

 

1. 단순 비동기 처리

메인 스토리지와 백업 스토리지의 업로드를 모두 비동기로 처리한다. 이 경우 각 스토리지 업로드 처리 결과를 응답 받지 않고 응답하기에 속도는 빠르다. 

 

다만 처리 결과를 확인하지 않기에 스토리지 중 하나에서 업로드 실패가 발생하는 경우에도 사용자에겐 정상으로 응답하게 된다.

비동기 업로드를 하되, 처리 결과를 확인하고자 한다.

 

public ImageUploadResponse upload() {
    for(var storage : Storages) {
        storage.create(resourceKey, imageFile);
    }
    return new ImageUploadResponse();
}

 

2. Future : 비동기 처리 후 블록킹

각 비동기 처리의 응답을 Future로 하고 응답 결과를 조합하는 과정에서 Future.get() 메서드로 호출하면 응답까지를 blocking 한다. 스토리지 별 업로드는 각각 비동기로 처리하되 응답을 하기 전에 모든 비동기 처리들을 기다려 모두 정상 처리 되는 경우에만 응답을 하겠다는 것이 그 아이디어다. 

 

아래는 Picup 에서의 future 사용 예시이다. 간단하게 설명하면 비동기 처리 메서드의 응답을 List으로 모으고, 각 비동기 처리의 결과를 future.get으로 확인한다. 업로드 응답으로 업로드를 마친 스토리지 정보를 확인하여 리소스에 표시하고 이를 DB에 기록한다.

 

public ImageUploadResponse upload() {
    var responseFutures = storages.stream()
        .map(storage -> storage.create(resource.getResourceKey(), imageFile))
        .collect(Collectors.toList());
    for (var future : responseFutures) {
        var uploadResponse = future.get();
        resource.storedTo(uploadResponse.getStorageKey());
        resourceRepository.save(resource);
    }
    return new ImageUploadResponse();
}

 

각 비동기 처리들의 결과를 블록, 확인하기에 처리 결과 중 하나라도 문제가 생긴 경우에는 사용자 요청에 에러를 응답할 수 있다.

 

 

만약 예상보다 오랜 시간을 점유하고 있으면 어떨까. Future의 blocking에 타임 아웃을 지정하는 것으로 예상 시간보다 처리 시간이 오래 걸리는 경우 예외로 처리할 수 있다.

 

이때 주의해야하는 점이 대기 타임 아웃을 넘겨 예외로 처리, 응답하더라도 오랜 시간을 점유하고 있던 그 비동기 처리 스레드는 종료되지 않는다는 것이다. 아래는 1초 이상 걸리는 작업이 존재할 경우 이를 Timeout 에러로 확인하고 처리 완료 전의 모든 처리들을 취소하는 예제이다.

 

public ImageUploadResponse upload() {
    try {
        for (var future : responseFutures) {
            var uploadResponse = future.get(1, TimeUnit.SECONDS);
        }
        // 응답
    } catch (TimeoutException e) {
        responseFutures.forEach(it -> {
            if (!it.isDone()) {
                it.cancel(true);
            }
        });
        // 예외 처리
    }
}

 

3. CompletableFuture : 비동기 처리 후, 논블록킹 후처리

future.get()은 블록킹으로 비동기로 처리했던 작업을 대기한다. 이 말은 비동기 처리를 대기하기 위해 위 코드에서 반복문을 처리하는 주 스레드의 흐름이 빼앗긴다는 것이다.

 

만약 3개의 작업이 비동기로 처리되고 있고, 각 비동기 작업과 후처리에 3초씩 걸린다면 위와 같은 흐름이 갖게 된다. 각 Future는 서로 다른 세 개의 스레드로 동작하지만, future.get()으로 응답을 대기하는 것은 결국 주 스레드 하나로 한다.

 

스레드 MAIN - A 응답 대기 | 스레드 A - 1초 경과 | 스레드 B - 1초 경과 | 스레드 C - 1초 경과
스레드 MAIN - A 응답 대기 | 스레드 A - 2초 경과 | 스레드 B - 2초 경과 | 스레드 C - 2초 경과
스레드 MAIN - A 응답 대기 | 스레드 A - 3초 경과 | 스레드 B - 3초 경과 | 스레드 C - 3초 경과
스레드 MAIN - A 후처리 시작
스레드 MAIN - A 후처리 1초 경과
스레드 MAIN - A 후처리 2초 경과
스레드 MAIN - A 후처리 3초 경과
스레드 MAIN - B 후처리 시작
스레드 MAIN - B 후처리 1초 경과
스레드 MAIN - B 후처리 2초 경과
스레드 MAIN - B 후처리 3초 경과
스레드 MAIN - C 후처리 시작
스레드 MAIN - C 후처리 1초 경과
스레드 MAIN - C 후처리 2초 경과
스레드 MAIN - C 후처리 3초 경과
스레드 MAIN - 처리 결과 조합 후 응답

 

이런 비동기 후 블록킹 방식의 시간 흐름을 그림으로 그리면 아래와 같을 것이다. 특히 A,B,C의 처리에 걸리는 시간과 블록킹 순서에 영향을 받게 된다. (각 A, B, C 비동기 작업 시간이 다 다르다고 가정한다.)

 

 

만약 각 비동기 처리가 종료되었을 때 메인 스레드가 이를 기다리는 것이 아닌 각 스레드가 메인 스레드에 처리 완료 이벤트를 알리고 후처리를 받아가는 꼴이면 어떨까. 논블록킹으로 비동기 처리 스레드에 후처리 방법을 알리고, 메인 스레드는 제어권을 잃지 않은 채로 또 다른 처리 완료된 비동기 처리 이벤트를 맞이하는 것이다.

 

스레드 MAIN - 이벤트 처리기 | 스레드 A - 1초 경과 | 스레드 B - 1초 경과 | 스레드 C - 1초 경과
스레드 MAIN - 이벤트 처리기 | 스레드 A - 2초 경과 | 스레드 B - 2초 경과 | 스레드 C - 2초 경과
스레드 MAIN - 이벤트 처리기 | 스레드 A - 3초 경과 | 스레드 B - 3초 경과 | 스레드 C - 3초 경과
스레드 MAIN - 이벤트 처리기 | 스레드 A - 후처리 1 | 스레드 B - 후처리 1 | 스레드 C - 후처리 1
스레드 MAIN - 이벤트 처리기 | 스레드 A - 후처리 2 | 스레드 B - 후처리 2 | 스레드 C - 후처리 2
스레드 MAIN - 이벤트 처리기 | 스레드 A - 후처리 3 | 스레드 B - 후처리 3 | 스레드 C - 후처리 3
스레드 MAIN - 처리 결과 조합 후 응답

 

위 예시와 같이 각 비동기 처리에 3초, 후처리에 3초가 걸리는 작업을 가정할 때 비동기 처리 만료시 후처리를 논블록킹으로 처리하면 이런 시간 흐름을 갖는다. 앞선 블록킹 방식보다 확연히 처리 시간이 빨라졌다.

 

자바 8에서 등장한 CompletableFuture는 기존 Future에 여러 기능이 추가되었다. 에러를 처리하는 방식에 더 많은 옵션이 생겨 코드가 깔끔해지고, 서로 다른 비동기 연산을 조합할 수 있는 등 Future에서 하기 까다로웠던 비동기 연산들이 가능해졌다. 그 외에도 다양한 기능들이 가능해지고, 개발자에게 더 많은 코드 옵션이 생겼다.

 

나는 다른 것보다도 '비동기 작업이 종료되었을 때 논 블록킹으로 후처리 할 다음 작업을 지정할 수 있음', 더 정확히는 '비동기 작업 이후 다음 작업이나 대기 조건들을 Future보다 더 다양한 방식으로 커스텀 할 수 있음'을 이유로 Future에서 CompletableFuture으로 넘어가게 되었다. 

 

 

 

예를 들어 위 그림은 CompletableFuture 로 구현해 본 비동기 작업과 후처리의 시나리오들이다. 다양한 시나리오들이 간단히 가능했고 그중 세 가지 선택지로 추려 서비스에 가장 필요한 방식을 선택할 수 있었다. Picup에선 1번 방식을 채택하였다. 

 

 

아래는 이를 적용한 Picup에서의 코드 예시이다. thenAccept로 각 비동기 처리에 대한 논 블록킹 후처리가 가능해져 전체 응답 속도를 기존보다 개선하였고, 마지막 모든 future를 join 하는 것으로 모든 비동기 작업 - 후처리 로직을 마친 후에 다음 로직을 진행하는 것을 보장할 수 있었다.

 

public ImageUploadResponse upload() {
    var futures = List.of(
        mainStorage.create(resourceKey, imageFile).thenAccept(result -> {
            // 후처리 로직
        }),
        backUpStorage.create(resourceKey, imageFile).thenAccept(result -> {
            // 후처리 로직
        })
    );
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return new ImageUploadResponse(resourceKey, imageFile.getSize());
}

 

이로써 '비동기 작업으로 파일 업로드를 마치고, 논 블록킹으로 각각의 결과를 대기 시간 없이 DB에 기록하며 모든 {업로드 - 후처리}를 마침이 보장된 이후에 사용자에게 응답한다'는 요구 사항을 만족할 수 있게 된 것이다.

 

4.  예외 시 더미 파일을 처리하기 위한 요구사항들

업로드 중 하나라도 예외가 발생하는 경우 서버 에러를 응답해야 한다. 이때 이미 스토리지 업로드 처리가 완료된 더미 파일에 대한 처리가 필요하다. 이때 에러 처리에 고민을 많이 했다. 원했던 요구 사항은 다음과 같다.

 

1. 각 비동기 업로드가 모두 정상으로 종료되었을 때만 사용자에게 정상 응답한다.

2. 업로드 시간이 하나라도 N초 이상으로 걸리면 그 즉시 서버 에러를 응답한다.

3. 업로드 도중 예외가 하나라도 발생한다면 그 즉시 서버 에러를 응답한다.

4. 업로드 도중 예외가 발생하는 경우 이미 업로드된 더미 파일을 제거한다.

 

* 이때 업로드 시간은 스토리지별로 상이하다. 예외가 발생하는 즉시 더미 파일을 제거한다면 특정 스토리지는 더미 삭제 시도 후  업로드 되는 것을 조심한다.

* 같은 이유로 타임 아웃을 바로 예외 처리해선 안된다. 타임 아웃 즉시 더미 파일을 제거한다면 특정 스토리지는 더미 삭제 시도 후 업로드 될 것이다.

 

public ImageUploadResponse upload() {
    var futures = List.of(
        upload(mainStorage, imageFile, resource),
        upload(backUpStorage, imageFile, resource)
    );
    try {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .orTimeout(N, TimeUnit.SECONDS)
            .join();
    } catch (CompletionException e) {
        futures.forEach(it-> it.thenAccept( result -> {
           storageMessageQueue.deleteByStorage(result.resourceKey(), result.storageKey());
        }));
        throw new StorageException("exception while uploading");
    }
    return new ImageUploadResponse(resource.getResourceKey(), imageFile.getSize());
}

 

각 업로드 작업에 타임 아웃을 줘 타임 아웃이 발생한다면 이는 예외로 처리된다. 이때 타임 아웃으로 만료된 스레드는 타임 아웃으로 종료하지 않는다. 타임 아웃으로 아직 종료되지 않은 스레드를 종료해 버리는 꼴이 되려면 Thread를 직접 다뤄야 하는데 위험할뿐더러 예외 처리에도 더 많은 고민들이 필요할 것이라고 생각했다. (ex, 업로드 성공 후, 후처리 도중 타임 아웃이 발생하는 경우)

 

대신 에러 또는 타임 아웃에도 각 비동기 작업들은 논 블록킹으로 다음 작업을 갖게 된다. 논 블록킹으로 작업 완료 후 다음 작업을 부여받기에 예외가 발생한 그 시점에서 서버 에러를 응답 할 수 있다.

 

그리고 각 비동기 작업들은 다음 작업으로 메시지 큐에 본인이 업로드하려고 했던 리소스 키와 스토리지 정보를 정보를 넘기게 된다.

 

이렇게 각 업로드를 비동기 처리하되 예외 또는 타임 아웃이 발생하는 경우 서버 에러를 그 즉시 응답하면서도, 남은 비동기 작업들을 대기했다가 더미 파일을 안전하게 제거할 수 있었다.

 

5. 메시지 큐를 활용한 보상 요청 흐름과 시연

삭제 처리를 사용자 요청 흐름에서 하지 않고 큐에 보관 후 처리하는 것으로, 사용자 요청 흐름에 파일 삭제를 포함하지 않을 수 있고 Handler가 준비될 때까지 대기 후 처리할 수 있게 된다. 또 메시지 처리 중에 실패가 일어나거나 Time out 이 발생하는 등의 처리를 직접 하지 않고, 메시지 큐의 기능을 사용하여 재시도 / 복구 처리 할 수 있게 된다.

 

 

아래는 Picup - storage 서버에서 25개의 사진 업로드 비동기 요청 처리 중 더미 파일이 생긴 상황에 대한 테스트 영상이다. 우측 상단에는 저장되고 삭제될 파일 목록, 좌측에는 메시지 큐, 우측 하단에는 애플리케이션 로그를 볼 수 있다.

 

Main storage에는 정상 업로드, Backup storage 로의 요청은 2초 후 타임 아웃처리된다. Queue의 동시 처리 consumer 는 5개로 한다. storage server was가 5개씩 메시지를 읽어와 끝에는 모든 더미 파일이 제거된 것을 볼 수 있다.

 

 

명확한 처리 로그를 위해 제거 작업에 딜레이를 주고 시연에 불필요한 로그는 제거 후 녹화했다.

 

Comments