zenn.skin 무료버전 배포중!
자세히보기

Web/Spring

Spring은 비동기 처리를 한 후에 어떻게 응답할까? - 1 : 요청 쓰레드의 반환

koosco! 2024. 10. 25. 04:14

Spring의 동기와 비동기

Spring에서는 @Async 애노테이션을 사용하면 내부에서 ThreadExecutorTaskPool을 사용해 비동기 처리를 위한 쓰레드 풀을 관리한다. 비동기 처리를 사용하지 않으면 서블릿 컨테이너가 요청부터 응답까지의 처리를 수행하게 된다.

 

DispatcherServlet에 Request가 들어와서 DAO를 통해 데이터를 받아 응답할 때까지 서블릿의 쓰레드는 결과를 기다리고 있다가 응답값을 반환하게 된다.

 

@Async 애노테이션을 사용하면 조금 다르게 동작한다. 요청은 비동기 처리를 수행하는 쓰레드 풀에게 위임하고 서블릿의 쓰레드는 다시 서블릿 쓰레드 풀로 반환되게 된다.

 

Tomcat은 Request를 받는 쓰레드 풀을, Spring은 비동기 처리를 위한 쓰레드 풀을 관리하게 된다

이때, 서블릿의 쓰레드는 비동기 처리를 하는 쓰레드 풀로 요청에 대한 처리를 위임하고 CompletableFuture 객체를 받게 된다. CompletableFuture는 비동기 처리를 한 후에 결과를 받기 위해 사용하는 객체이다. CompletableFuture은 Non-Blocking 비동기 처리를 위해 사용되는 객체로, Future 객체가 get을 사용하면 Blocking되는 것과 다르게 Non-Blocking 방식으로 비동기 처리 결과를 받을 수 있다.

 

(좌) Blocking (우) Non-Blocking

Blocking 방식으로 결과를 받게 되면 main이 되는 쓰레드가 멈춰버려 동기적으로 움직이는 것과 별반 차이 나지 않게 된다. 그렇기 때문에 비동기 처리의 결과를 받을 때는 Non-Blocking으로 결과를 받는 것이 중요하다. CompletableFuture를 사용하면 Callback을 통해 결과를 Non-Blocking 방식으로 받을 수 있다. @Async 애노테이션을 추가한 메서드는 CompletableFuture를 먼저 반환하고 나중에 Callback 방식으로 CompletableFuture의 결괏값을 받을 수 있다.

 

Controller

@RestController
public class ApplyController {

    private final TestService testService;

    public ApplyController(TestService testService) {
        this.testService = testService;
    }

    @GetMapping("/test")
    public CompletableFuture<String> hi() {
        return testService.hello();
    }
}

 

Service

@Service
public class TestService {

    private final TestRepository testRepository;

    public TestService(TestRepository testRepository) {
        this.testRepository = testRepository;
    }

    @Async
    public CompletableFuture<String> hello() {
        String result = testRepository.findById(1L);
        return CompletableFuture.completedFuture(result);
    }
}

 

Repository

@Repository
public class TestRepository {

    public String findById(long l) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "this is result";
    }
}

 

이렇게 되면 서블릿의 쓰레드는 @Async 애노테이션이 걸려있는 Service의 hello 메서드를 수행하게 되고, ThreadPoolTaskExecutor에게 처리를 맡기게 된다. 여기서는 Repository에서 5초의 대기를 걸도록 코드를 작성했다. 비동기 쓰레드 풀에게 처리를 맡긴 후 서블릿의 쓰레드는 서블릿의 쓰레드 풀로 반환되고, 이후에 비동기 쓰레드 풀의 동작이 끝나면 사용자에게 반환된다.

 

이 부분을 공부하면서 궁금중이 생겼다. 서블릿의 쓰레드는 이미 쓰레드 풀에 반환되었는데 결괏값은 도대체 어떻게 클라이언트에게 다시 반환이 될까? 하지만 이와 관련한 내용을 찾아보질 못했다. DispatcherServlet 코드도 최근에 뜯어봤고 이것도 뭐 뜯어보지라는 생각에 디버깅을 찍고 코드를 까보기 시작했는데... 엄청난 심연을 보게 되었다....

끊임없는 추상화와 끊임없는 코드의 늪..

 

처음 요청이 들어왔을 때 서블릿의 쓰레드 동작

처음 요청이 들어왔을 때 쓰레드는 HandlerMethod, 즉 Controller로부터 CompletableFuture 객체를 받게 된다. 난 HandlerAdapter가 Handler를 호출하는 줄 알았는데 실제로 호출하는 것은 HandlerAdapter가 아닌 InvocableHandlerMethod라는 클래스이다.

 

RequestMappingHandlerAdapter

invokeHandlerMethod

HandlerAdapter는 invocableMethod.invokeAndHandle을 호출하게 되는데, 이 invocableMethod가 실질적으로 HandlerMethod를 호출하게 된다.

@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
    ...
    
    invocableMethod.invokeAndHandle(webRequest, mavContainer, new Object[0]);
    return asyncManager.isConcurrentHandlingStarted() ? null : this.getModelAndView(mavContainer, modelFactory, webRequest);
}

 

ServletInvocableHandlerMethod

invokdeAndHandle

invokeAndHandler 메서드는 invokeForRequest 메서드를 호출하여 MethodHandler로부터 결과를 returnValue로 받는다. 그 후에 returnValueHandlers를 호출하여 반환받은 값에 대한 처리를 하도록 동작한다.

public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer, Object... providedArgs) throws Exception {
    Object returnValue = this.invokeForRequest(webRequest, mavContainer, providedArgs);
    ...
    
    try {
        this.returnValueHandlers.handleReturnValue(returnValue, this.getReturnValueType(returnValue), mavContainer, webRequest);
    } catch (Exception var6) {
        if (logger.isTraceEnabled()) {
            logger.trace(this.formatErrorForReturnValue(returnValue), var6);
        }
        throw var6;
    }
}

 

invokeForRequest & doInvoke

호출 로직만을 살펴보기 위해 예외 처리 부분이나 Kotlin 관련 처리 로직은 생략했다. 여기서 doInvoke를 호출하면 MethodHandler(Controller)의 메서드가 호출되고 결과를 받을 수 있다. 위 Service 코드에서 반환했던 것과 같이 여기서 결과인 returnValue는 CompletableFuture가 된다.

@Nullable
public Object invokeForRequest(NativeWebRequest request, @Nullable ModelAndViewContainer mavContainer, Object... providedArgs) throws Exception {
    ...

    Object returnValue = this.doInvoke(args);
    ...

    return returnValue;
}

@Nullable
protected Object doInvoke(Object... args) throws Exception {
    Method method = this.getBridgedMethod();

    try {
        ...

        return method.invoke(this.getBean(), args);
    } catch (IllegalArgumentException var8) {
        ...
    } catch (InvocationTargetException var9) {
        ...
    }
}

 

HandlerMethodReturnValueHandler - HandlerMethodReturnValueHandlerComposite

invokeForRequest 메서드가 returnValue를 반환하면 invokeAndHandle 메서드는 returnValueHandlers.handleReturnValue 메서드를 호출하게 된다. returnValueHandlers는 HandlerMethodReturnValueHandlerComposite라는 엄청나게 긴 이름의 객체를 나타낸다. Composite라는 이름이 디자인 패턴을 나타내는 건가 찾아보니 Composite 패턴이라는 것이 있었고, 파일 시스템에서 재귀되는 디렉터리 구조와 같이 부모와 자식이 동일한 인터페이스를 사용하는 디자인 패턴이라고 한다.

Composite 패턴의 특징으로는 구현체들을 내부 리스트로 관리한다는 점인데, 실제로 HandlerMethodReturnvalueHandlerComposite 코드를 보면 returnValueHandlers라는 반환값 처리 핸들러 리스트를 가지고 있다.

다양한 결과를 처리하는 Handler를 가지고 있다

CompletableFuture는 바로 값을 반환 받는 것이 아닌, 비동기 쓰레드가 완료되면 결과가 채워지는 식으로 동작한다. 즉, DeferredResultMethodReturnValueHandler에 의해 처리된다. 

 

DeferredResultMethodReturnValueHandler

handleReturnValue

두 번째 나오는 else 문을 보면 CompletionStage 타입으로 returnValue를 casting하여 adaptCompletionStage 메서드에 넘겨주고 있다. CompletableFuture는 Future 인터페이스 외에도 CompletionStage 인터페이스를 구현한다. 여기서는 adaptCompletionStage 메서드의 반환값을 DeferredResult로 넣어주고 있다.

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    if (returnValue == null) {
        mavContainer.setRequestHandled(true);
    } else {
        DeferredResult result;
        if (returnValue instanceof DeferredResult) {
            DeferredResult<?> deferredResult = (DeferredResult)returnValue;
            result = deferredResult;
        } else if (returnValue instanceof ListenableFuture) {
            ListenableFuture<?> listenableFuture = (ListenableFuture)returnValue;
            result = this.adaptListenableFuture(listenableFuture);
        } else {
            if (!(returnValue instanceof CompletionStage)) {
                throw new IllegalStateException("Unexpected return value type: " + returnValue);
            }

            CompletionStage<?> completionStage = (CompletionStage)returnValue;
            result = this.adaptCompletionStage(completionStage);
        }

        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, new Object[]{mavContainer});
    }
}

 

CompletionStage를 구현하고 있는 CompletableFuture 클래스

 

adaptCompletionStage

드디어 CompletableFuture의 콜백을 처리해주는 함수이다. handlerMethod로부터 받은 future 객체를 콜백으로 받아 result에 할당하도록 한다. 이 콜백 코드 덕분에 non blocking으로 비동기 쓰레드의 결괏값을 받아올 수 있게 된다. javascript의 콜백함수 동작과 비슷하게 생각할 수 있다.

private DeferredResult<Object> adaptCompletionStage(CompletionStage<?> future) {
    DeferredResult<Object> result = new DeferredResult();
    future.whenComplete((value, ex) -> {
        if (ex != null) {
            if (ex instanceof CompletionException && ex.getCause() != null) {
                ex = ex.getCause();
            }

            result.setErrorResult(ex);
        } else {
            result.setResult(value);
        }

    });
    return result;
}

 

handleReturnValue

다시 adaptCompletionStage 메서드를 통해 handleReturnValue 메서드는 DeferredResult<Object>인 result를 받게 되고, AsyncManager에게 DeferredResult를 넘겨주게 된다

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    		DeferredResult result;
		    ...
            
            CompletionStage<?> completionStage = (CompletionStage)returnValue;
            result = this.adaptCompletionStage(completionStage);
        }

        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, new Object[]{mavContainer});
    }
}

 

WebAsyncManager

startDeferredResultProcessing

이 메서드에서는 초기화를 하거나 핸들러를 등록하는 등의 작업을 하여 비동기 처리가 일어나기 위한 설정을 수행하게 된다.

public void startDeferredResultProcessing(final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
    ...
    
    List<DeferredResultProcessingInterceptor> interceptors = new ArrayList();
    interceptors.add(deferredResult.getInterceptor());
    interceptors.addAll(this.deferredResultInterceptors.values());
    interceptors.add(timeoutDeferredResultInterceptor);
    DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
    
    /**
     * TimeoutHandler 추가
     * interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
     */
     
    /**
     * ErrorHandler 추가
     * deferredResult.setErrorResult(ex);
     */

    this.asyncWebRequest.addCompletionHandler(() -> {
        interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult);
    });
    interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
    this.startAsyncProcessing(processingContext);

    try {
        interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
        deferredResult.setResultHandler((result) -> {
            result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
            this.setConcurrentResultAndDispatch(result);
        });
    } catch (Throwable var7) {
        this.setConcurrentResultAndDispatch(var7);
    }
}

코드가 워낙 긴데 가장 아래 부분 코드에 대해서만 살펴보려고 한다.

1. addCompletionHandler

  • asyncWebRequest에 완료 핸들러를 등록
  • 비동기 처리가 완료될 때 호출
  • 핸들러가 호출되면, interceptorChain.triggerAfterCompletion이 실행
  • DeferredResultProcessingInterceptor 인터셉터 체인에 등록된 afterCompletion 메서드를 차례로 실행하며 후속 작업을 처리

 

2. applyBeforeConcurrentHandling

  • 비동기 처리 시작 전에 수행할 작업을 정의
  • 인터셉터 체인 내의 각 인터셉터의 beforeConcurrentHandling 메서드를 실행하며 초기화 작업이나 기타 사전 준비 작업을 수행
  • 비동기 처리가 실제로 시작되기 전에 필요한 설정들이 완료된다

InterceptorChain에 포함되는 Interceptor들

 

 

startAsyncProcessing

비동기 요청 처리를 시작한다.

concurrentResult = RESULT_NONE, concurrentResultContext = processingContext로 초기화하게 되는데 이때, 동기화를 위해 synchronized를 사용한다. concurrentResult와 concurrentResultContext는 volatile로 선언되어 있는데, 이는 가시성을 보장하여 여러 쓰레드에서 동일한 값이 사용되도록 보장되게 해 준다.

 

즉, 여기서 concurrentResult를 통해 아직 비동기 처리 중인 결과가 아직 없음을 나타내고, processingContext를 저장하여 이후 비동기 처리에 참조할 수 있도록 값을 할당해 주게 된다.

 

asyncWebRequest는 비동기 처리와 관련된 요청 객체로, 이 객체가 null인지 확인하는 로직을 포함한다.

private void startAsyncProcessing(Object[] processingContext) {
    synchronized(this) {
        this.concurrentResult = RESULT_NONE;
        this.concurrentResultContext = processingContext;
    }

    Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
    if (logger.isDebugEnabled()) {
        logger.debug("Started async request for " + formatUri(this.asyncWebRequest));
    }

    this.asyncWebRequest.startAsync();
}

 

@Nullable
private volatile Object concurrentResult;

@Nullable
private volatile Object[] concurrentResultContext;

 

StandardServletAsyncWebRequest extends AsyncRequest

여기서 사용하고 있는 this.getRequest는 RequestFacade를 나타낸다. RequestFacade는 "HttpServletRequest 인터페이스를 구현한 클래스를 감싸는 클래스"이다. 실제 요청 객체를 캡슐화하고 접근을 제한함으로써 불필요한 변경을 방지하기 위해 사용된다.

 

startAsync 메서드가 호출되면 StandardServletAsyncWebRequest.State.NEW에서 ASYNC로 상태가 변경되고 RequestFacade의 startAsync 메서드를 호출하게 된다. Facade는 다시 실제 Request의 startAsync 메서드를 호출한다.

 

startAsync

public void startAsync() {
    Assert.state(this.getRequest().isAsyncSupported(), "Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding \"<async-supported>true</async-supported>\" to servlet and filter declarations in web.xml.");
    if (!this.isAsyncStarted()) {
        if (this.state == StandardServletAsyncWebRequest.State.NEW) {
            this.state = StandardServletAsyncWebRequest.State.ASYNC;
        } else {
            Assert.state(this.state == StandardServletAsyncWebRequest.State.ASYNC, "Cannot start async: [" + this.state + "]");
        }

        this.asyncContext = this.getRequest().startAsync(this.getRequest(), this.getResponse());
        this.asyncContext.addListener(this);
        if (this.timeout != null) {
            this.asyncContext.setTimeout(this.timeout);
        }

    }
}

 

Request

startAsync

request는 asyncContext를 갖지 않기 때문에 새로운 AsyncContextImpl 객체를 생성한다. AsyncContextImpl은 생성자로부터 받은 Request를 저장하게 된다.

public AsyncContext startAsync(ServletRequest request, ServletResponse response) {
    if (!this.isAsyncSupported()) {
        IllegalStateException ise = new IllegalStateException(sm.getString("request.asyncNotSupported"));
        log.warn(sm.getString("coyoteRequest.noAsync", new Object[]{StringUtils.join(this.getNonAsyncClassNames())}), ise);
        throw ise;
    } else {
        if (this.asyncContext == null) {
            this.asyncContext = new AsyncContextImpl(this);
        }

        this.asyncContext.setStarted(this.getContext(), request, response, request == this.getRequest() && response == this.getResponse().getResponse());
        this.asyncContext.setTimeout(this.getConnector().getAsyncTimeout());
        return this.asyncContext;
    }
}
class AsyncContextImpl {
    ...
    private volatile Request request;
    ...

    public AsyncContextImpl(Request request) {
        this.request = request;
        if (log.isTraceEnabled()) {
            this.logDebug("Constructor");
        }
    }
    
    ...
}

 

AsyncContextImpl

setStarted

이 메서드에서 ASYNC_START 액션을 설정함으로써 코요테 요청에 비동기 처리를 시작하는 것을 알리게 된다. 코요테 요청의 상태는 비동기로 전환되고, 이를 통해 서블릿의 쓰레드를 쓰레드 풀에 반환할 수 있게 되었다.

이후 context.incrementInProgressAsyncCount()를 통해 비동기 요청 수를 늘리게 된다.

public void setStarted(Context context, ServletRequest request, ServletResponse response, boolean originalRequestResponse) {
    synchronized(this.asyncContextLock) {
        this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this);
        this.context = context;
        context.incrementInProgressAsyncCount();
        this.servletRequest = request;
        this.servletResponse = response;
        this.hasOriginalRequestAndResponse = originalRequestResponse;
        this.event = new AsyncEvent(this, request, response);
        List<AsyncListenerWrapper> listenersCopy = new ArrayList(this.listeners);
        this.listeners.clear();
        if (log.isTraceEnabled()) {
            log.trace(sm.getString("asyncContextImpl.fireOnStartAsync"));
        }

        Iterator var7 = listenersCopy.iterator();

        while(var7.hasNext()) {
            AsyncListenerWrapper listener = (AsyncListenerWrapper)var7.next();

            try {
                listener.fireOnStartAsync(this.event);
            } catch (Throwable var11) {
                ExceptionUtils.handleThrowable(var11);
                log.warn(sm.getString("asyncContextImpl.onStartAsyncError", new Object[]{listener.getClass().getName()}), var11);
            }
        }

    }
}

 

 

이렇게 처리된 AsyncContextImpl은 StandardServletAsyncWebRequest의 startAsync까지 다시 올라가게 된다.

public void startAsync() {
	...
        this.asyncContext = this.getRequest().startAsync(this.getRequest(), this.getResponse());
        this.asyncContext.addListener(this);
        if (this.timeout != null) {
            this.asyncContext.setTimeout(this.timeout);
        }

    }
}

위 과정을 통해 asyncContext를 반환받으면 asyncContext에 StandardServletAsyncWebRequest를 Listener로 등록하게 된다.

AsyncWebRequest는 비동기 요청의 상태 변화에 대한 이벤트를 수신하고 처리할 수 있게 된다.

요청 쓰레드는 이렇게 리스너까지 등록하게 되면 다시 DispatcherServlet을 거쳐 쓰레드 풀로 반환된다.

위에 future 콜백을 설정하는 부분이 있었는데, 비동기 요청이 완료되면 onComplete 콜백이 호출되고 요청 결과를 처리하게 된다.

여기서 생성된 AsyncContextImpl은 콜백이 완료된 이후 응답값을 반환하기 위해 DispatcherServlet에 다시 요청을 보낼 때 사용된다.


생각보다 코드가 너무 길어져서 두 번째 요청에 대한 내용은 나누어서 보려고 한다. HandlerMapping이나 HandlerAdapter까지만 생각하고 라이브러리 코드를 뜯어보기 시작했더니 정말 하루가 사라졌다 (???) 그래도 3, 4시간이면 하겠지 했는데 Spring의 추상화는 대단했다. 디버거를 찍어도 파일이 숑숑 바뀌고 한 줄씩 내리면서 코드를 까는 수준까지 갔다.. 여기서 비동기 처리 쓰레드를 확인하기 위해 Repository 코드에서 일부로 sleep을 주었는데, 디버깅 시간이 길어지면 콜백이 실행되고 다시 요청이 가게 돼서 다시 처음부터 디버그 찍어보고 무한 반복.. 오늘만 12~13시간 정도 이 코드만 죽어라 본 것 같다.. 시간이 그만큼 오래 걸렸단 건 코드 읽는 능력이 많이 부족하다는 뜻이고 코드를 읽으면서도 배운 점이 많다고 생각한다. 중간에 RequestFacade나 Compostie 패턴, 코요테 요청 등 자잘하게 배운 내용도 많고 비동기 처리 로직에 대한 이해가 확실히 는 것 같다. 내일은 마저 남은 부분을 다시 보고 정리를 해보려 한다.

'Web/Spring'의 다른글

  • 현재글 Spring은 비동기 처리를 한 후에 어떻게 응답할까? - 1 : 요청 쓰레드의 반환

관련글