Loading...
Spring Framework Reference Documentation 7.0.2의 Coroutines의 한국어 번역본입니다.
아래의 경우에 피드백에서 신고해주신다면 반영하겠습니다.
감사합니다 :)
Kotlin Coroutines는 Kotlin
lightweight threads로, 명령형 방식으로 논블로킹 코드를 작성할 수 있게 해줍니다. Language 측면에서
suspending functions는 비동기 작업에 대한 추상을 제공하고, library 측면에서
kotlinx.coroutines는
async { }
와 같은 함수와 Flow와 같은 타입를 제공합니다.
Spring Framework는 다음 범위에서 Coroutines에 대한 지원을 제공합니다:
@Controller에서 Deferred 및 Flow 반환 값 지원@Controller에서 suspending function 지원CoWebFilter@MessageMapping에 annotated된 메서드에서 suspending function 및 Flow 지원RSocketRequester에 대한 extensionsCoroutines 지원은 kotlinx-coroutines-core 및 kotlinx-coroutines-reactor
의존성이 classpath에 있을 때 활성화됩니다:
build.gradle.kts
1dependencies { 2 implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}") 3 implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}") 4}
Version 1.4.0 이상이 지원됩니다.
반환 값의 경우, Reactive에서 Coroutines API로의 변환은 다음과 같습니다:
fun handler(): Mono<Void> 는 suspend fun handler() 가 됩니다fun handler(): Mono<T> 는 suspend fun handler(): T 또는 suspend fun handler(): T? 가 되며, 이는 Mono가 empty일 수 있는지 여부에 따라 달라집니다 (보다 정적 타입이라는 장점이 있음)fun handler(): Flux<T> 는 fun handler(): Flow<T> 가 됩니다입력 매개변수의 경우:
fun handler(mono: Mono<T>) 는 fun handler(value: T) 가 되는데, 이는 suspending functions가 값 매개변수를 얻기 위해 호출될 수 있기 때문입니다.fun handler(mono: Mono<T>) 는 fun handler(supplier: suspend () → T) 또는 fun handler(supplier: suspend () → T?) 가 됩니다Flow는 Coroutines 세계에서의 Flux 동등체로, hot 또는 cold 스트림, 유한 또는 무한 스트림에 적합하며, 주요 차이점은 다음과 같습니다:
Flow는 푸시 기반인 반면 Flux는 푸시-풀 하이브리드입니다Flow는 단일 suspending collect 메서드만 가지고 있으며, 연산자는 extensions로 구현됩니다Flow에 커스텀 연산자를 추가할 수 있게 해줍니다map 연산자는 suspending function 매개변수를 받기 때문에 비동기 작업을 지원합니다 (flatMap이 필요 없음)Coroutines로 코드를 동시에 실행하는 방법을 포함한 더 자세한 내용은 Going Reactive with Spring, Coroutines and Kotlin Flow 에 대한 이 블로그 게시물을 읽어보세요.
다음은 Coroutines @RestController의 예시입니다.
1@RestController 2class CoroutinesRestController(client: WebClient, banner: Banner) { 3 4 @GetMapping("/suspend") 5 suspend fun suspendingEndpoint(): Banner { 6 delay(10) 7 return banner 8 } 9 10 @GetMapping("/flow") 11 fun flowEndpoint() = flow { 12 delay(10) 13 emit(banner) 14 delay(10) 15 emit(banner) 16 } 17 18 @GetMapping("/deferred") 19 fun deferredEndpoint() = GlobalScope.async { 20 delay(10) 21 banner 22 } 23 24 @GetMapping("/sequential") 25 suspend fun sequential(): List<Banner> { 26 val banner1 = client 27 .get() 28 .uri("/suspend") 29 .accept(MediaType.APPLICATION_JSON) 30 .awaitExchange() 31 .awaitBody<Banner>() 32 val banner2 = client 33 .get() 34 .uri("/suspend") 35 .accept(MediaType.APPLICATION_JSON) 36 .awaitExchange() 37 .awaitBody<Banner>() 38 return listOf(banner1, banner2) 39 } 40 41 @GetMapping("/parallel") 42 suspend fun parallel(): List<Banner> = coroutineScope { 43 val deferredBanner1: Deferred<Banner> = async { 44 client 45 .get() 46 .uri("/suspend") 47 .accept(MediaType.APPLICATION_JSON) 48 .awaitExchange() 49 .awaitBody<Banner>() 50 } 51 val deferredBanner2: Deferred<Banner> = async { 52 client 53 .get() 54 .uri("/suspend") 55 .accept(MediaType.APPLICATION_JSON) 56 .awaitExchange() 57 .awaitBody<Banner>() 58 } 59 listOf(deferredBanner1.await(), deferredBanner2.await()) 60 } 61 62 @GetMapping("/error") 63 suspend fun error() { 64 throw IllegalStateException() 65 } 66 67 @GetMapping("/cancel") 68 suspend fun cancel() { 69 throw CancellationException() 70 } 71 72}
@Controller를 사용한 뷰 렌더링도 지원됩니다.
1@Controller 2class CoroutinesViewController(banner: Banner) { 3 4 @GetMapping("/") 5 suspend fun render(model: Model): String { 6 delay(10) 7 model["banner"] = banner 8 return "index" 9 } 10}
다음은 coRouter { } DSL과 관련 핸들러를 통해 정의된 Coroutines 라우터의 예시입니다.
1@Configuration 2class RouterConfiguration { 3 4 @Bean 5 fun mainRouter(userHandler: UserHandler) = coRouter { 6 GET("/", userHandler::listView) 7 GET("/api/user", userHandler::listApi) 8 } 9}
1class UserHandler(builder: WebClient.Builder) { 2 3 private val client = builder.baseUrl("...").build() 4 5 suspend fun listView(request: ServerRequest): ServerResponse = 6 ServerResponse.ok().renderAndAwait( 7 "users", 8 mapOf("users" to client.get().uri("...").awaitExchange().awaitBody<User>()) 9 ) 10 11 suspend fun listApi(request: ServerRequest): ServerResponse = 12 ServerResponse.ok() 13 .contentType(MediaType.APPLICATION_JSON) 14 .bodyAndAwait( 15 client.get().uri("...").awaitExchange().awaitBody<User>() 16 ) 17}
Coroutines에서의 트랜잭션은 Reactive 트랜잭션 관리의 프로그래밍 방식 변형을 통해 지원됩니다.
Suspending functions의 경우, TransactionalOperator.executeAndAwait extension이 제공됩니다.
1import org.springframework.transaction.reactive.executeAndAwait 2 3class PersonRepository(private val operator: TransactionalOperator) { 4 5 suspend fun initDatabase() = operator.executeAndAwait { 6 insertPerson1() 7 insertPerson2() 8 } 9 10 private suspend fun insertPerson1() { 11 // INSERT SQL statement 12 } 13 14 private suspend fun insertPerson2() { 15 // INSERT SQL statement 16 } 17}
Kotlin Flow의 경우, Flow<T>.transactional extension이 제공됩니다.
1import org.springframework.transaction.reactive.transactional 2 3class PersonRepository(private val operator: TransactionalOperator) { 4 5 fun updatePeople() = findPeople().map(::updatePerson).transactional(operator) 6 7 private fun findPeople(): Flow<Person> { 8 // SELECT SQL statement 9 } 10 11 private suspend fun updatePerson(person: Person): Person { 12 // UPDATE SQL statement 13 } 14}
Spring 애플리케이션는 Observability 지원을 위한 Micrometer로 계측되어 있습니다.
Tracing 지원의 경우, 현재 observation은 블로킹 코드에서는 ThreadLocal을 통해,
reactive 파이프라인에서는 Reactor Context를 통해 전파됩니다. 하지만 현재 observation은 suspended function의 실행 컨텍스트에서도
사용 가능해야 합니다.
그렇지 않으면, 현재 "traceId"가 coroutines로부터의 로깅된 문에 자동으로 앞에 붙지 않습니다.
PropagationContextElement 연산자는 일반적으로
Micrometer Context Propagation library가 Kotlin Coroutines와 함께 동작하도록 보장합니다.
이는 io.micrometer:context-propagation 의존성과, 선택적으로
org.jetbrains.kotlinx:kotlinx-coroutines-reactor 의존성을 필요로 합니다. Spring이 Coroutines를 Reactor Flux 또는 Mono에
적응시키기 위해 사용하는 CoroutinesUtils#invokeSuspendingFunction을 통한 automatic context propagation은
Hooks.enableAutomaticContextPropagation()을 호출하여 활성화할 수 있습니다.
애플리케이션는 또한 PropagationContextElement를 명시적으로 사용하여 CoroutineContext를
컨텍스트 전파 메커니즘으로 보강할 수 있습니다:
1fun main() { 2 runBlocking(Dispatchers.IO + PropagationContextElement()) { 3 waitAndLog() 4 } 5} 6 7suspend fun waitAndLog() { 8 delay(10) 9 logger.info("Suspending function with traceId") 10}
여기서, Micrometer Tracing이 구성되어 있다고 가정하면, 결과 로깅 문은 현재 "traceId"를 보여주며 애플리케이션에 대한 더 나은 가시성을 가능하게 합니다.
Web
Spring Projects in Kotlin