ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • API Latency를 줄이는 방법 (Part. 2)
    카테고리 없음 2023. 9. 3. 18:13
    API의 응답속도 및 지연에 민감한 서비스인 경우, 어떤 방식으로 latency를 줄일 수 있을까?

     

     

    이전 게시글에서는 Network IO, Cpu Io 등 Test를 위해 IO 발생 로직들을 추가한 API를 만들었다. 그리고 Jmeter를 통한 테스트 방법에 대한 안내를 진행했다. 이번에는 이전에 만든 API의 Latency를 낮추기 위해 CompletableFuture와 Async를 사용해보겠다.

     

    TestController에 V2 API를 추가한다.

    /** with CompletableFuture and Async */
    @GetMapping("/api/ral/v2/test")
    fun getTestV2(
        request: TestRequest
    ) = testV2Service.getTestV2(request).wrap()

     

    자, 그럼 서비스 단위로 가자..

    @Service
    class TestV2Service(
        private val testV2AsyncService: TestV2AsyncService
    ) {
        fun getTestV2(request: TestRequest): TestResponse {
            /** Database */
            val test1ModelCf = testV2AsyncService.asyncFindAllByIdTest1(request.test1Id)
            val test2ModelCf = testV2AsyncService.asyncFindAllByIdTest2(request.test2Id)
            val test3ModelCf = testV2AsyncService.asyncFindAllByIdTest3(request.test3Id)
            val test4ModelCf = testV2AsyncService.asyncFindAllByIdTest4(request.test4Id)
    
            /** Redis Cache */
            val test1CacheCf = testV2AsyncService.asyncGetCache("test1:key:${request.test1Id}")
            val test2CacheCf = testV2AsyncService.asyncGetCache("test2:key:${request.test2Id}")
            val test3CacheCf = testV2AsyncService.asyncGetCache("test3:key:${request.test3Id}")
            val test4CacheCf = testV2AsyncService.asyncGetCache("test4:key:${request.test4Id}")
    
            /** Cpu Logic */
            val result1Cf = testV2AsyncService.asyncMathEngine()
            val result2Cf = testV2AsyncService.asyncMathEngine()
            val result3Cf = testV2AsyncService.asyncMathEngine()
            val result4Cf = testV2AsyncService.asyncMathEngine()
    
            /** WebClient Api Call */
            val realTrend1Cf = testV2AsyncService.asyncGetRealTrend()
            val realTrend2Cf = testV2AsyncService.asyncGetRealTrend()
            val realTrend3Cf = testV2AsyncService.asyncGetRealTrend()
            val realTrend4Cf = testV2AsyncService.asyncGetRealTrend()
    
            /** join all completable futures */
            CompletableFuture.allOf(
                test1ModelCf, test2ModelCf, test3ModelCf, test4ModelCf,
                test1CacheCf, test2CacheCf, test3CacheCf, test4CacheCf,
                result1Cf, result2Cf, result3Cf, result4Cf,
                realTrend1Cf, realTrend2Cf, realTrend3Cf, realTrend4Cf
            ).join()
    
            return TestResponse.of(
                cacheModel = TestCacheModel(
                    test1 = test1CacheCf.get(),
                    test2 = test2CacheCf.get(),
                    test3 = test3CacheCf.get(),
                    test4 = test4CacheCf.get()
                ),
                test1s = test1ModelCf.get(),
                test2s = test2ModelCf.get(),
                test3s = test3ModelCf.get(),
                test4s = test4ModelCf.get(),
                result = listOf(result1Cf.get(), result2Cf.get(), result3Cf.get(), result4Cf.get()),
                trendModels = listOf(realTrend1Cf.get(), realTrend2Cf.get(), realTrend3Cf.get(), realTrend4Cf.get())
            )
        }
    }
    

    TestV2Service는 내부적으로 TestV2AsyncService를 호출하고 있다. 여기서 Cf라는 것은 CompletableFutrue의 줄임말이다.

    관례라고 생각하면 될 것이다.

     

    /** join all completable futures */
    CompletableFuture.allOf(
        test1ModelCf, test2ModelCf, test3ModelCf, test4ModelCf,
        test1CacheCf, test2CacheCf, test3CacheCf, test4CacheCf,
        result1Cf, result2Cf, result3Cf, result4Cf,
        realTrend1Cf, realTrend2Cf, realTrend3Cf, realTrend4Cf
    ).join()

    요 코드가 중요하긴 한데, 그전에 asyncService쪽을 먼저 살펴보자.

     

    TestV2AsyncService.kt

    @Service
    class TestV2AsyncService(
        private val test1Repository: Test1Repository,
        private val test2Repository: Test2Repository,
        private val test3Repository: Test3Repository,
        private val test4Repository: Test4Repository,
        private val cacheService: CacheService,
        private val mathEngine: MathEngine,
        private val googleClient: GoogleClient
    ) {
        @Async(value = "taskExecutor")
        fun asyncFindAllByIdTest1(ids: Set<Long>): CompletableFuture<List<Test1Model>> {
            return CompletableFuture.completedFuture(
                test1Repository.findAllById(ids)
                    .map { test1 -> Test1Model.from(test1) }
            )
        }
    
        @Async(value = "taskExecutor")
        fun asyncFindAllByIdTest2(ids: Set<Long>): CompletableFuture<List<Test2Model>> {
            return CompletableFuture.completedFuture(
                test2Repository.findAllById(ids)
                    .map { test2 -> Test2Model.from(test2) }
            )
        }
    
        @Async(value = "taskExecutor")
        fun asyncFindAllByIdTest3(ids: Set<Long>): CompletableFuture<List<Test3Model>> {
            return CompletableFuture.completedFuture(
                test3Repository.findAllById(ids)
                    .map { test3 -> Test3Model.from(test3) }
            )
        }
    
        @Async(value = "taskExecutor")
        fun asyncFindAllByIdTest4(ids: Set<Long>): CompletableFuture<List<Test4Model>> {
            return CompletableFuture.completedFuture(
                test4Repository.findAllById(ids)
                    .map { test4 -> Test4Model.from(test4) }
            )
        }
    
        @Async(value = "taskExecutor")
        fun asyncMathEngine(): CompletableFuture<String> {
            return CompletableFuture.completedFuture(
                mathEngine.execute()
            )
        }
    
        @Async(value = "taskExecutor")
        fun asyncGetCache(key: String): CompletableFuture<String> {
            return CompletableFuture.completedFuture(
                cacheService.get(key)
            )
        }
    
        @Async(value = "taskExecutor")
        fun asyncGetRealTrend(): CompletableFuture<GoogleRealTimeSearchTrendModel> {
            return CompletableFuture.completedFuture(
                runBlocking { googleClient.getRealTimeTrends() }
            )
        }
    }

     

     

    해당 서비스에 있는 모든 메서드들은, 모두 비동기로 실행하도록 만들었다.

     

    @EnableAsync
    @Configuration
    class AsyncConfig : AsyncConfigurerSupport() {
        @Bean("taskExecutor")
        fun taskExecutor(): ThreadPoolTaskExecutor {
            return ExecutorGenerator(
                threadName = "taskExecutor",
                corePoolSize = 10,
                maxPoolSize = 20,
                queueCapacity = 20
            ).generate()
        }
    }
    

    Executor의 설정은 다음과 같이 진행했다. 아! 그런데 해당 메서드의 extension method들은 내가 직접 만든 거라서, GITHUB 코드를 찾고하길 바란다.

     

    @Async(value = "taskExecutor")
    fun asyncFindAllByIdTest1(ids: Set<Long>): CompletableFuture<List<Test1Model>> {
        return CompletableFuture.completedFuture(
            test1Repository.findAllById(ids)
                .map { test1 -> Test1Model.from(test1) }
        )
    }

    모든 메서드들은 다음과 같이 CompletableFutrue<>로 return값이 박싱되어 있다.

    그럼 이것은 무엇을 의미하는 걸까?

     

    쉽게 생각하면, 해당 작업의 결과값들은 미래에는 완료되어 사용될 수 있는 값을 의미한다. 아직은 완료되지 않았고, 이 값들이 미래에는 완료되어 이용될 수 있다.

     

    좀 더 깊게 공부하고 싶으면, java.util.concurrent.CompletableFutrue 를 확인하자. 주석이 기깔난다.

     

    @Async를 통해 모든 메서드들이 별도의 스레드에서 작업이 수행되고, 그것의 응답을 CompletableFuture를 통해 쉽게 받을 수 있다.

     

    다음으로는.......

    /** join all completable futures */
    CompletableFuture.allOf(
        test1ModelCf, test2ModelCf, test3ModelCf, test4ModelCf,
        test1CacheCf, test2CacheCf, test3CacheCf, test4CacheCf,
        result1Cf, result2Cf, result3Cf, result4Cf,
        realTrend1Cf, realTrend2Cf, realTrend3Cf, realTrend4Cf
    ).join()

    위에서 봤던, 이 코드다.

    @Async + CompletableFuture로 만든 로직들에 대해 어떤 것이 먼저 완료되어 종료되었는지.. 그리고 어떤 순서로 반환할 것인지.. 우리는 정해야한다. 그대로 반환한다면, 로직이 수행되기 전에 메서드는 완료되었다고 처리가 되어 NullpointException이 발생할 수 있다.

     

    CompletableFuture.allOf().join()은 모든 function으로 등록된 CompletableFuture들이 모두 완료될때까지 기다리는 작업을 수행한다. (쉽게 생각하면, 모든 호출이 완료될때까지 Blocking한다고 생각하자.)

     

     

    join()을 들어가서 확인해보면,

    public T join() {
            Object r;
            if ((r = result) == null)
                r = waitingGet(false);
            return (T) reportJoin(r);
    }
        
    private Object waitingGet(boolean interruptible) {
            Signaller q = null;
            boolean queued = false;
            Object r;
            while ((r = result) == null) {
                if (q == null) {
                    q = new Signaller(interruptible, 0L, 0L);
                    if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                        ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
                }
                else if (!queued)
                    queued = tryPushStack(q);
                else {
                    try {
                        ForkJoinPool.managedBlock(q);
                    } catch (InterruptedException ie) { // currently cannot happen
                        q.interrupted = true;
                    }
                    if (q.interrupted && interruptible)
                        break;
                }
            }
            if (q != null && queued) {
                q.thread = null;
                if (!interruptible && q.interrupted)
                    Thread.currentThread().interrupt();
                if (r == null)
                    cleanStack();
            }
            if (r != null || (r = result) != null)
                postComplete();
            return r;
    }
    
    private static Object reportJoin(Object r) {
            if (r instanceof AltResult) {
                Throwable x;
                if ((x = ((AltResult)r).ex) == null)
                    return null;
                if (x instanceof CancellationException)
                    throw (CancellationException)x;
                if (x instanceof CompletionException)
                    throw (CompletionException)x;
                throw new CompletionException(x);
            }
            return r;
    }

    작업이 수행완료되었는지, while문을 계속 돌면서 파악한다. (그냥 이런식으로 동작하구나..만 파악하자. 깊게 보면 뇌절온다.)

     

    여기까지 코드 작성을 따라왔다면, Jmeter를 통한 테스트 작업을 진행해보자. 이전에 만들었던, Jmeter 조건과 동일하게 진행하면 될 것

    이다.

    - user : 10

    - call : 10

    - report : 전부 ㅋ

     

    확실히, v2 API에 비해서 Latency가 낮게 형성됨을 확인할 수 있다. 기존에 1초 이상 소요되었던 것이, 현재는 300~450ms으로 Latency가 형성되어 있다. 나름 선방했다.

     

     

    그런데, 동작이 수행이 진행되면서, 어느 순간부터 Latency가 올라가는 것을 확인할 수 있다. 이유가 무엇일까??

    크게 2가지 정도의 문제가 있을 것이다.

     

    1. Tomcat에서 처리하는 Thread가 부족하다

    현재 Test를 진행하면서, 다수의 Thread를 이용하도록 구성하였다. 요청된 작업을 수행할 수 있는 Thread가 없다면, Latency가 올라갈 것이다.

     

    2. @Async 및 CompletableFuture 기반의 Blocking 처리에 따른 Thread 부족

    이번 파트에서 만든 API는 @Async 및 CompletableFuture 기반으로 구성되어 있다.

    속도는 빠를 것이다. 하지만 최대 단점이, Thread가 Blocking이 되는 문제가 발생한다. 만약에... 처리 수행 시간이 긴 로직이라면?, 그 동안 Thread는 Blocking이 되고, 다른 요청들에 대한 처리는 미뤄지게 될 것이다.

     

    Thread에 대한 스케줄링 없이 비동기 처리 작업을 진행할 경우, 낭비되는 리소스들이 많아지게 된다.

    Latency는 줄어들게 되었지만, 자원 비효율적인 코드가 완성되었다.

     

    이번 게시글에서는 자바의 기본적인 비동기 처리 방식을 통해 API의 Latency를 개선해보았다.

    그럼 오늘도 극락..

     

     

     

     

    Reference

    Github

     

극락코딩