Loading...
Spring Framework Reference Documentation 7.0.2의 RSocket의 한국어 번역본입니다.
아래의 경우에 피드백에서 신고해주신다면 반영하겠습니다.
감사합니다 :)
This section describes Spring Framework’s support for the RSocket protocol.
RSocket은 TCP, WebSocket, 그리고 기타 바이트 스트림 전송 위에서 multiplexed, duplex 통신을 하기 위한 애플리케이션 프로토콜이며, 다음과 같은 상호 작용 모델 중 하나를 사용합니다:
Request-Response — 하나의 메시지를 보내고 하나의 메시지를 응답으로 받습니다.Request-Stream — 하나의 메시지를 보내고 메시지 스트림을 응답으로 받습니다.Channel — 양 방향으로 메시지 스트림을 주고받습니다.Fire-and-Forget — one-way 메시지를 전송합니다.초기 연결이 생성된 후에는 양쪽이 대칭적이 되어 각자 위의 상호 작용 중 하나를 시작할 수 있으므로 "client" vs "server" 구분은 사라집니다. 이러한 이유로 프로토콜에서는 참여하는 양쪽을 "requester"와 "responder"라고 부르며, 위의 상호 작용을 "request streams" 또는 간단히 "requests"라고 부릅니다.
다음은 RSocket 프로토콜의 핵심 기능과 이점입니다:
Request-Stream과 Channel과 같은 스트리밍 request의 경우, back pressure 신호가 requester와 responder 사이를 이동하여 requester가 source에서 responder의 속도를 늦출 수 있게 해 줍니다. 이를 통해 네트워크 계층 혼잡 제어에 대한 의존도와 네트워크 레벨 또는 다른 어떤 레벨에서의 버퍼링 필요성을 줄일 수 있습니다.LEASE 프레임에서 이름을 따 "Leasing"이라 부릅니다. Lease는 주기적으로 갱신됩니다.RSocket은 여러 언어로 된 구현체을 가지고 있습니다. Java 라이브러리는 전송을 위해 Project Reactor와 Reactor Netty 위에 구축됩니다. 이는 애플리케이션 내 Reactive Streams Publisher에서 나오는 signal이 네트워크를 가로질러 RSocket을 통해 투명하게 전파된다는 것을 의미합니다.
RSocket의 장점 중 하나는 wire 상에서 잘 정의된 동작을 가지며 읽기 쉬운 명세와 일부 프로토콜 확장을 함께 제공한다는 점입니다. 따라서 언어 구현 및 상위 레벨 프레임워크 API와는 독립적으로 spec을 읽어 보는 것이 좋습니다.
이 섹션은 일부 컨텍스트를 설정하기 위한 간결한 개요를 제공합니다.
Connecting
초기에 client는 TCP 또는 WebSocket과 같은 로우 레벨 스트리밍 전송을 통해 server에 연결하고 연결의 매개변수를 설정하기 위해 server로 SETUP 프레임을 보냅니다.
server는 SETUP 프레임을 거부할 수 있지만, 일반적으로 (client 입장에서) 전송되고 (server 입장에서) 수신된 후에는 둘 다 request를 시작할 수 있습니다. 단, SETUP이 request 수를 제한하기 위해 leasing 의미론 사용을 나타내는 경우에는 양쪽 모두 다른 쪽에서 request를 허용하는 LEASE 프레임을 기다려야 합니다.
Making Requests
연결이 설정되면 양쪽 모두 REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL, REQUEST_FNF 프레임 중 하나를 통해 request를 시작할 수 있습니다. 각 프레임은 requester에서 responder로 하나의 메시지를 전달합니다.
그 다음 responder는 response 메시지를 담은 PAYLOAD 프레임을 반환할 수 있으며, REQUEST_CHANNEL의 경우 requester도 추가 request 메시지를 담은 PAYLOAD 프레임을 보낼 수 있습니다.
Request-Stream과 Channel처럼 메시지 스트림이 관련된 request의 경우, responder는 requester로부터의 demand 신호를 존중해야 합니다. Demand는 메시지의 개수로 표현됩니다. 초기 demand는 REQUEST_STREAM 및 REQUEST_CHANNEL 프레임에서 지정됩니다. 이후 demand는 REQUEST_N 프레임을 통해 signal 됩니다.
각 측은 또한 개별 request가 아닌 전체 연결과 관련된 메타데이터 알림을 METADATA_PUSH 프레임을 통해 전송할 수 있습니다.
Message Format
RSocket 메시지는 data와 metadata를 포함합니다. Metadata는 route, security token 등을 보내는 데 사용할 수 있습니다. Data와 metadata는 서로 다르게 포맷될 수 있습니다. 각 mime type은 SETUP 프레임에서 선언되며, 주어진 연결의 모든 request에 적용됩니다.
모든 메시지는 metadata를 가질 수 있지만, 일반적으로 route와 같은 metadata는 request별이며, 따라서 request의 첫 번째 메시지, 즉 REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL, REQUEST_FNF 프레임 중 하나에만 포함됩니다.
프로토콜 확장은 애플리케이션에서 사용할 공통 메타데이터 포맷을 정의합니다:
RSocket을 위한 Java 구현체는 Project Reactor 위에 구축되어 있습니다. TCP와 WebSocket을 위한 전송은 Reactor Netty 위에 구축됩니다. Reactive Streams 라이브러리로서 Reactor는 프로토콜 구현 작업을 단순화합니다.
애플리케이션 입장에서는 선언적인 operator와 투명한 back pressure 지원을 가진 Flux와 Mono를 사용하는 것이 자연스럽습니다.
RSocket Java의 API는 의도적으로 최소한이고 기본적입니다. 이는 프로토콜 기능에 초점을 맞추고 애플리케이션 프로그래밍 모델(예: RPC codegen vs 기타)을 상위 레벨의 독립적인 관심사로 남겨 둡니다.
주요 계약인
io.rsocket.RSocket은
Mono를 단일 메시지에 대한 promise로, Flux를 메시지 스트림으로, io.rsocket.Payload를 data와 metadata에 바이트 버퍼 형태로 접근할 수 있는 실제 메시지로 사용하여 네 가지 request 상호 작용 타입을 모델링합니다. RSocket 계약은 대칭적으로 사용됩니다. Requesting의 경우 애플리케이션은 request를 수행할 수 있는 RSocket을 받습니다. Responding의 경우 애플리케이션은 request를 처리하기 위해 RSocket을 구현합니다.
이는 철저한 소개를 의도한 것은 아닙니다. 대부분의 경우 Spring 애플리케이션은 해당 API를 직접 사용할 필요가 없습니다. 그러나 Spring과 독립적으로 RSocket을 보거나 실험해 보는 것이 중요할 수 있습니다.
RSocket Java 저장소에는 그 API와 프로토콜 기능을 보여 주는 여러 샘플 앱이 포함되어 있습니다.
spring-messaging 모듈에는 다음이 포함됩니다:
io.rsocket.RSocket을 통해 request를 수행하기 위한 fluent API.@MessageMapping 및 @RSocketExchange가 선언된 handler 메서드.@RSocketExchange 메서드를 가진 Java 인터페이스로서의 RSocket 서비스 선언.spring-web 모듈에는 Jackson CBOR/JSON, Protobuf와 같이 RSocket 애플리케이션에서 필요할 가능성이 높은 Encoder 및 Decoder 구현이 포함됩니다. 또한 효율적인 route matching을 위해 플러그인 할 수 있는 PathPatternParser도 포함합니다.
Spring Boot 2.2는 TCP 또는 WebSocket 위에서 RSocket server를 구동하는 것을 지원하며, WebFlux server에서 WebSocket을 통해 RSocket을 노출하는 옵션도 포함합니다. 또한 client 지원과 RSocketRequester.Builder 및 RSocketStrategies에 대한 자동 설정도 제공합니다.
자세한 내용은 Spring Boot reference의 RSocket section을 참조하십시오.
Spring Security 5.2는 RSocket 지원을 제공합니다.
Spring Integration 5.2는 RSocket client 및 server와 상호 작용하기 위한 inbound 및 outbound 게이트웨이를 제공합니다. 자세한 내용은 Spring Integration Reference Manual을 참조하십시오.
Spring Cloud Gateway는 RSocket 연결을 지원합니다.
RSocketRequester는 RSocket request를 수행하기 위한 fluent API를 제공하며, 로우 레벨 data 버퍼 대신 data와 metadata에 대해 객체를 받아들이고 반환합니다. 이는 대칭적으로 사용될 수 있으며, client에서 request를 만들거나 server에서 request를 만드는 데 사용할 수 있습니다.
Client 측에서 RSocketRequester를 얻는 것은 server에 연결하는 것이며, 이는 연결 설정을 가진 RSocket SETUP 프레임을 보내는 것을 포함합니다. RSocketRequester는 SETUP 프레임에 대한 연결 설정을 포함해 io.rsocket.core.RSocketConnector를 준비하는 데 도움을 주는 builder를 제공합니다.
다음은 기본 설정으로 연결하는 가장 기본적인 방법입니다:
1RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000); 2 3URI url = URI.create("https://example.org:8080/rsocket"); 4RSocketRequester requester = RSocketRequester.builder().webSocket(url);
1val requester = RSocketRequester.builder().tcp("localhost", 7000) 2 3val url = URI.create("https://example.org:8080/rsocket") 4val requester = RSocketRequester.builder().webSocket(url)
위의 코드는 즉시 연결하지 않습니다. Request가 만들어질 때 공유 연결이 투명하게 설정되고 사용됩니다.
RSocketRequester.Builder는 초기 SETUP 프레임을 커스터마이징 하기 위해 다음을 제공합니다:
dataMimeType(MimeType) — 연결의 data에 대한 mime type을 설정합니다.metadataMimeType(MimeType) — 연결의 metadata에 대한 mime type을 설정합니다.setupData(Object) — SETUP에 포함할 data입니다.setupRoute(String, Object…) — SETUP에 포함할 metadata의 route입니다.setupMetadata(Object, MimeType) — SETUP에 포함할 기타 metadata입니다.Data의 경우 기본 mime type은 처음으로 설정된 Decoder에서 파생됩니다. Metadata의 경우 기본 mime type은 request마다 여러 metadata 값과 mime type 쌍을 허용하는
composite metadata입니다. 일반적으로 둘 다 변경할 필요는 없습니다.
SETUP 프레임의 data와 metadata는 optional입니다. Server 측에서는
@ConnectMapping 메서드를 사용하여 연결 시작과 SETUP 프레임의 내용을 처리할 수 있습니다. Metadata는 연결 레벨 보안에 사용할 수 있습니다.
RSocketRequester.Builder는 requester를 구성하기 위해 RSocketStrategies를 받습니다.
Data 및 metadata 값의 (de)-serialization을 위한 encoder와 decoder를 제공하려면 이를 사용해야 합니다. 기본적으로 spring-core의 String, byte[], ByteBuffer에 대한 기본 코덱만 등록됩니다.
spring-web을 추가하면 다음과 같이 등록할 수 있는 더 많은 코덱에 액세스할 수 있습니다:
1RSocketStrategies strategies = RSocketStrategies.builder() 2 .encoders(encoders -> encoders.add(new JacksonCborEncoder())) 3 .decoders(decoders -> decoders.add(new JacksonCborDecoder())) 4 .build(); 5 6RSocketRequester requester = RSocketRequester.builder() 7 .rsocketStrategies(strategies) 8 .tcp("localhost", 7000);
1val strategies = RSocketStrategies.builder() 2 .encoders { it.add(JacksonCborEncoder()) } 3 .decoders { it.add(JacksonCborDecoder()) } 4 .build() 5 6val requester = RSocketRequester.builder() 7 .rsocketStrategies(strategies) 8 .tcp("localhost", 7000)
RSocketStrategies는 재사용을 위해 설계되었습니다. 예를 들어 client와 server가 같은 애플리케이션에 있는 일부 시나리오에서는 이를 Spring 설정에 선언하는 것이 더 나을 수 있습니다.
RSocketRequester.Builder는 server로부터의 request에 대한 responder를 구성하는 데 사용할 수 있습니다.
Client 측 responding을 위해 server에서 사용되는 것과 동일한 인프라를 기반으로 annotated handler를 사용할 수 있지만, 다음과 같이 프로그래밍 방식으로 등록합니다:
1RSocketStrategies strategies = RSocketStrategies.builder() 2 .routeMatcher(new PathPatternRouteMatcher()) 3 .build(); 4 5SocketAcceptor responder = 6 RSocketMessageHandler.responder(strategies, new ClientHandler()); 7 8RSocketRequester requester = RSocketRequester.builder() 9 .rsocketConnector(connector -> connector.acceptor(responder)) 10 .tcp("localhost", 7000);
| 1 | 효율적인 route matching을 위해, spring-web이 존재하는 경우 PathPatternRouteMatcher를 사용합니다. |
| 2 | @MessageMapping 및/또는 @ConnectMapping 메서드를 가진 class에서 responder를 생성합니다. |
| 3 | responder를 등록합니다. |
1val strategies = RSocketStrategies.builder() 2 .routeMatcher(PathPatternRouteMatcher()) 3 .build() 4 5val responder = 6 RSocketMessageHandler.responder(strategies, ClientHandler()) 7 8val requester = RSocketRequester.builder() 9 .rsocketConnector { it.acceptor(responder) } 10 .tcp("localhost", 7000)
| 1 | 효율적인 route matching을 위해, spring-web이 존재하는 경우 PathPatternRouteMatcher를 사용합니다. |
| 2 | @MessageMapping 및/또는 @ConnectMapping 메서드를 가진 class에서 responder를 생성합니다. |
| 3 | responder를 등록합니다. |
위의 내용은 client responder의 프로그래밍 방식 등록을 위해 설계된 shortcut일 뿐입니다. Client responder가 Spring 설정에 있는 다른 시나리오에서는 여전히 RSocketMessageHandler를 Spring bean으로 선언한 다음 다음과 같이 적용할 수 있습니다:
1ApplicationContext context = ... ; 2RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); 3 4RSocketRequester requester = RSocketRequester.builder() 5 .rsocketConnector(connector -> connector.acceptor(handler.responder())) 6 .tcp("localhost", 7000);
1import org.springframework.beans.factory.getBean 2 3val context: ApplicationContext = ... 4val handler = context.getBean<RSocketMessageHandler>() 5 6val requester = RSocketRequester.builder() 7 .rsocketConnector { it.acceptor(handler.responder()) } 8 .tcp("localhost", 7000)
위의 경우 client responder를 감지하기 위해, 예를 들어 기본 @Controller 대신 @RSocketClientResponder와 같은 custom annotation을 기반으로 하는 다른 전략으로 전환하기 위해 RSocketMessageHandler의 setHandlerPredicate를 사용해야 할 수도 있습니다. 이는 동일한 애플리케이션에 client와 server 또는 여러 client가 있는 시나리오에서 필요합니다.
프로그래밍 모델에 대한 자세한 내용은 Annotated Responders를 참조하십시오.
RSocketRequesterBuilder는 keepalive interval, session resumption, interceptor 등 추가 설정 옵션을 위해 underlying io.rsocket.core.RSocketConnector를 노출하는 callback을 제공합니다.
다음과 같이 해당 레벨에서 옵션을 구성할 수 있습니다:
1RSocketRequester requester = RSocketRequester.builder() 2 .rsocketConnector(connector -> { 3 // ... 4 }) 5 .tcp("localhost", 7000);
1val requester = RSocketRequester.builder() 2 .rsocketConnector { 3 //... 4 } 5 .tcp("localhost", 7000)
Server에서 연결된 client로 request를 만드는 것은 server에서 연결된 client에 대한 requester를 얻는 문제입니다.
Annotated Responders에서 @ConnectMapping 및 @MessageMapping 메서드는 RSocketRequester 인자를 지원합니다. 이를 사용하여 연결에 대한 requester에 접근하십시오.
@ConnectMapping 메서드는 본질적으로 request가 시작되기 전에 처리해야 하는 SETUP 프레임의 handler라는 점을 기억하십시오. 따라서 아주 처음의 request는 handling에서 분리되어야 합니다. 예를 들면 다음과 같습니다:
1@ConnectMapping 2Mono<Void> handle(RSocketRequester requester) { 3 requester.route("status").data("5") 4 .retrieveFlux(StatusReport.class) 5 .subscribe(bar -> { 6 // ... 7 }); 8 return ... ; 9}
| 1 | Handling과 독립적으로 비동기적으로 request를 시작합니다. |
| 2 | Handling을 수행하고 완료 Mono<Void>를 반환합니다. |
1@ConnectMapping 2suspend fun handle(requester: RSocketRequester) { 3 GlobalScope.launch { 4 requester.route("status").data("5").retrieveFlow<StatusReport>().collect { 5 // ... 6 } 7 } 8 /// ... 9}
| 1 | Handling과 독립적으로 비동기적으로 request를 시작합니다. |
| 2 | suspending function에서 handling을 수행합니다. |
client 또는 server requester가 있으면 다음과 같이 request를 만들 수 있습니다:
1ViewBox viewBox = ... ; 2 3Flux<AirportLocation> locations = requester.route("locate.radars.within") 4 .data(viewBox) 5 .retrieveFlux(AirportLocation.class);
| 1 | Request 메시지의 metadata에 포함할 route를 지정합니다. |
| 2 | Request 메시지에 대한 data를 제공합니다. |
| 3 | 예상되는 response를 선언합니다. |
1val viewBox: ViewBox = ... 2 3val locations = requester.route("locate.radars.within") 4 .data(viewBox) 5 .retrieveFlow<AirportLocation>()
| 1 | Request 메시지의 metadata에 포함할 route를 지정합니다. |
| 2 | Request 메시지에 대한 data를 제공합니다. |
| 3 | 예상되는 response를 선언합니다. |
Interaction 타입은 input과 output의 cardinality에서 암묵적으로 결정됩니다. 위 예제는 하나의 값을 보내고 값의 스트림을 받기 때문에 Request-Stream입니다.
대부분의 경우 input과 output의 선택이 RSocket interaction 타입 및 responder에서 예상하는 input과 output 타입과 일치하는 한 이에 대해 신경 쓸 필요가 없습니다. 유일한 잘못된 조합 예시는 many-to-one입니다.
data(Object) 메서드는 Reactive Streams Publisher인 Flux와 Mono를 포함해 ReactiveAdapterRegistry에 등록된 value producer를 모두 허용합니다. 동일한 타입의 값을 생성하는 multi-value Publisher인 Flux의 경우, 모든 element에서 타입 체크와 Encoder lookup을 피하기 위해 overloaded data 메서드 중 하나를 사용하는 것을 고려하십시오:
1data(Object producer, Class<?> elementClass); 2data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object) 단계는 optional입니다. Data를 보내지 않는 request의 경우 이를 생략하십시오:
1Mono<AirportLocation> location = requester.route("find.radar.EWR")) 2 .retrieveMono(AirportLocation.class);
1import org.springframework.messaging.rsocket.retrieveAndAwait 2 3val location = requester.route("find.radar.EWR") 4 .retrieveAndAwait<AirportLocation>()
추가 metadata 값은
composite metadata(기본값)를 사용하고, 값이 등록된 Encoder에서 지원되는 경우에 추가할 수 있습니다. 예를 들면 다음과 같습니다:
1String securityToken = ... ; 2ViewBox viewBox = ... ; 3MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0"); 4 5Flux<AirportLocation> locations = requester.route("locate.radars.within") 6 .metadata(securityToken, mimeType) 7 .data(viewBox) 8 .retrieveFlux(AirportLocation.class);
1import org.springframework.messaging.rsocket.retrieveFlow 2 3val requester: RSocketRequester = ... 4 5val securityToken: String = ... 6val viewBox: ViewBox = ... 7val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0") 8 9val locations = requester.route("locate.radars.within") 10 .metadata(securityToken, mimeType) 11 .data(viewBox) 12 .retrieveFlow<AirportLocation>()
Fire-and-Forget의 경우 Mono<Void>를 반환하는 send() 메서드를 사용하십시오. Mono는 메시지가 성공적으로 전송되었음을 나타낼 뿐, 처리되었음을 나타내지는 않는다는 점에 유의하십시오.
Metadata-Push의 경우 Mono<Void> return value를 가진 sendMetadata() 메서드를 사용하십시오.
RSocket responder는 @MessageMapping 및 @ConnectMapping 메서드로 구현할 수 있습니다.
@MessageMapping 메서드는 개별 request를 처리하고 @ConnectMapping 메서드는 연결 레벨 이벤트(setup 및 metadata push)를 처리합니다.
Annotated responder는 server 측에서의 responding과 client 측에서의 responding 모두에 대해 대칭적으로 지원됩니다.
Server 측에서 annotated responder를 사용하려면 Spring 설정에 RSocketMessageHandler를 추가하여 @MessageMapping 및 @ConnectMapping 메서드를 가진 @Controller bean을 감지하도록 합니다:
1@Configuration 2static class ServerConfig { 3 4 @Bean 5 public RSocketMessageHandler rsocketMessageHandler() { 6 RSocketMessageHandler handler = new RSocketMessageHandler(); 7 handler.routeMatcher(new PathPatternRouteMatcher()); 8 return handler; 9 } 10}
1@Configuration 2class ServerConfig { 3 4 @Bean 5 fun rsocketMessageHandler() = RSocketMessageHandler().apply { 6 routeMatcher = PathPatternRouteMatcher() 7 } 8}
그런 다음 Java RSocket API를 통해 RSocket server를 시작하고 responder로 RSocketMessageHandler를 다음과 같이 플러그인 합니다:
1ApplicationContext context = ... ; 2RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); 3 4CloseableChannel server = 5 RSocketServer.create(handler.responder()) 6 .bind(TcpServerTransport.create("localhost", 7000)) 7 .block();
1import org.springframework.beans.factory.getBean 2 3val context: ApplicationContext = ... 4val handler = context.getBean<RSocketMessageHandler>() 5 6val server = RSocketServer.create(handler.responder()) 7 .bind(TcpServerTransport.create("localhost", 7000)) 8 .awaitSingle()
RSocketMessageHandler는 기본적으로
composite 및
routing metadata를 지원합니다. 다른 mime type으로 전환하거나 추가 메타데이터 mime type을 등록해야 하는 경우에는
MetadataExtractor를 설정할 수 있습니다.
지원할 metadata 및 data 포맷을 위해 필요한 Encoder 및 Decoder 인스턴스를 설정해야 합니다. 코덱 구현을 위해서는 spring-web 모듈이 필요할 가능성이 큽니다.
기본적으로 SimpleRouteMatcher가 AntPathMatcher를 통해 route matching에 사용됩니다.
효율적인 route matching을 위해 spring-web의 PathPatternRouteMatcher를 플러그인 하는 것을 권장합니다. RSocket route는 계층적일 수 있지만 URL path는 아닙니다. 두 route matcher 모두 기본적으로 "."을 구분자로 사용하도록 구성되어 있으며, HTTP URL에서와 같은 URL 디코딩은 없습니다.
RSocketMessageHandler는 같은 프로세스 내에서 client와 server 간에 설정을 공유해야 하는 경우에 유용할 수 있는 RSocketStrategies를 통해 구성할 수 있습니다:
1@Configuration 2static class ServerConfig { 3 4 @Bean 5 public RSocketMessageHandler rsocketMessageHandler() { 6 RSocketMessageHandler handler = new RSocketMessageHandler(); 7 handler.setRSocketStrategies(rsocketStrategies()); 8 return handler; 9 } 10 11 @Bean 12 public RSocketStrategies rsocketStrategies() { 13 return RSocketStrategies.builder() 14 .encoders(encoders -> encoders.add(new JacksonCborEncoder())) 15 .decoders(decoders -> decoders.add(new JacksonCborDecoder())) 16 .routeMatcher(new PathPatternRouteMatcher()) 17 .build(); 18 } 19}
1@Configuration 2class ServerConfig { 3 4 @Bean 5 fun rsocketMessageHandler() = RSocketMessageHandler().apply { 6 rSocketStrategies = rsocketStrategies() 7 } 8 9 @Bean 10 fun rsocketStrategies() = RSocketStrategies.builder() 11 .encoders { it.add(JacksonCborEncoder()) } 12 .decoders { it.add(JacksonCborDecoder()) } 13 .routeMatcher(PathPatternRouteMatcher()) 14 .build() 15}
Client 측의 annotated responder는 RSocketRequester.Builder에서 구성해야 합니다. 자세한 내용은
Client Responders를 참조하십시오.
server 또는
client responder 설정이 구성되면,
@MessageMapping 메서드를 다음과 같이 사용할 수 있습니다:
1@Controller 2public class RadarsController { 3 4 @MessageMapping("locate.radars.within") 5 public Flux<AirportLocation> radars(MapRequest request) { 6 // ... 7 } 8}
1@Controller 2class RadarsController { 3 4 @MessageMapping("locate.radars.within") 5 fun radars(request: MapRequest): Flow<AirportLocation> { 6 // ... 7 } 8}
위 @MessageMapping 메서드는 "locate.radars.within" route를 가진 Request-Stream 상호 작용에 응답합니다. 이는 다음 메서드 인자를 사용할 수 있는 유연한 메서드 시그니처를 지원합니다:
| Method Argument | Description |
|---|---|
@Payload | Request의 payload입니다. 이는 Mono 또는 Flux와 같은 비동기 타입의 concrete value일 수 있습니다.<br>Note: Annotation 사용은 optional입니다. Simple 타입이 아니고 다른 지원되는 인자에도 해당하지 않는 메서드 인자는 예상되는 payload로 간주됩니다. |
RSocketRequester | 원격 끝에 request를 만들기 위한 requester입니다. |
@DestinationVariable | 예를 들어 @MessageMapping("find.radar.{id}")와 같이 매핑 패턴의 variable을 기반으로 route에서 추출된 값입니다. |
@Header | MetadataExtractor에 설명된 대로 추출을 위해 등록된 metadata 값입니다. |
@Headers Map<String, Object> | MetadataExtractor에 설명된 대로 추출을 위해 등록된 모든 metadata 값입니다. |
Return value는 response payload로 직렬화할 하나 이상의 객체여야 합니다. 이는 Mono 또는 Flux와 같은 비동기 타입, concrete value, 또는 void나 Mono<Void>와 같은 no-value 비동기 타입일 수 있습니다.
@MessageMapping 메서드가 지원하는 RSocket 상호 작용 타입은 input(즉, @Payload 인자)과 output의 cardinality에서 결정되며, 여기서 cardinality는 다음을 의미합니다:
| Cardinality | Description |
|---|---|
| 1 | 명시적인 값이거나 Mono<T>와 같은 single-value 비동기 타입입니다. |
| Many | Flux<T>와 같은 multi-value 비동기 타입입니다. |
| 0 | Input의 경우 메서드에 @Payload 인자가 없음을 의미합니다.<br>Output의 경우 void 또는 Mono<Void>와 같은 no-value 비동기 타입입니다. |
아래 표는 모든 input 및 output cardinality 조합과 해당 상호 작용 타입을 보여 줍니다:
| Input Cardinality | Output Cardinality | Interaction Types |
|---|---|---|
| 0, 1 | 0 | Fire-and-Forget, Request-Response |
| 0, 1 | 1 | Request-Response |
| 0, 1 | Many | Request-Stream |
| Many | 0, 1, Many | Request-Channel |
@MessageMapping의 대안으로, @RSocketExchange 메서드로 request를 처리할 수도 있습니다. 이러한 메서드는
RSocket Interface에 선언되며, requester로서 RSocketServiceProxyFactory를 통해 사용하거나 responder에 의해 구현될 수 있습니다.
예를 들어 responder로서 request를 처리하려면 다음과 같습니다:
1public interface RadarsService { 2 3 @RSocketExchange("locate.radars.within") 4 Flux<AirportLocation> radars(MapRequest request); 5} 6 7@Controller 8public class RadarsController implements RadarsService { 9 10 public Flux<AirportLocation> radars(MapRequest request) { 11 // ... 12 } 13}
1interface RadarsService { 2 3 @RSocketExchange("locate.radars.within") 4 fun radars(request: MapRequest): Flow<AirportLocation> 5} 6 7@Controller 8class RadarsController : RadarsService { 9 10 override fun radars(request: MapRequest): Flow<AirportLocation> { 11 // ... 12 } 13}
@RSocketExhange와 @MessageMapping 사이에는 전자가 requester와 responder 모두에 적합하게 유지되어야 하기 때문에 몇 가지 차이가 있습니다. 예를 들어 @MessageMapping은 여러 route를 처리하도록 선언될 수 있고 각 route는 패턴이 될 수 있지만, @RSocketExchange는 단일, 구체적인 route로 선언되어야 합니다.
Metadata와 관련된 지원 메서드 파라미터에도 작은 차이가 있습니다. 지원 파라미터 목록은 @MessageMapping 및 RSocket Interface를 참조하십시오.
@RSocketExchange는 특정 RSocket 서비스 인터페이스에 대한 모든 route에 공통 prefix를 지정하기 위해 타입 레벨에서 사용할 수 있습니다.
@ConnectMapping은 RSocket 연결 시작 시 SETUP 프레임과, 이후의 METADATA_PUSH 프레임(즉,
io.rsocket.RSocket의 metadataPush(Payload))을 통한 메타데이터 push 알림을 처리합니다.
@ConnectMapping 메서드는
@MessageMapping과 동일한 인자를 지원하지만, SETUP 및 METADATA_PUSH 프레임의 metadata와 data를 기반으로 합니다. @ConnectMapping은 metadata에 route가 있는 특정 연결만 처리하도록 패턴을 가질 수 있으며, 패턴이 선언되지 않은 경우 모든 연결이 일치합니다.
@ConnectMapping 메서드는 data를 반환할 수 없으며 void 또는 Mono<Void>를 return value로 선언해야 합니다. 새 연결에 대한 handling이 error를 반환하면 연결은 거부됩니다.
Handling은 연결에 대한 RSocketRequester에 request를 하기 위해 지연되어서는 안 됩니다. 자세한 내용은
Server Requester를 참조하십시오.
Responder는 metadata를 해석해야 합니다. Composite metadata는 각자 mime type을 가진(예: routing, security, tracing을 위한) 독립적으로 포맷된 메타데이터 값을 허용합니다. 애플리케이션은 지원할 메타데이터 mime type을 구성하고 추출된 값에 접근할 수 있는 방법이 필요합니다.
MetadataExtractor는 직렬화된 metadata를 받아 디코딩된 name-value 쌍을 반환하는 계약이며, 그런 다음 annotated handler 메서드의 @Header를 통해 이름으로 header처럼 접근할 수 있습니다.
DefaultMetadataExtractor는 metadata를 디코딩하기 위한 Decoder 인스턴스를 받을 수 있습니다. 기본적으로
"message/x.rsocket.routing.v0"에 대한 내장 지원을 가지며, 이를 String으로 디코딩하고 "route" 키 아래에 저장합니다. 다른 mime type의 경우에는 Decoder를 제공하고 mime type을 다음과 같이 등록해야 합니다:
1DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); 2extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
1import org.springframework.messaging.rsocket.metadataToExtract 2 3val extractor = DefaultMetadataExtractor(metadataDecoders) 4extractor.metadataToExtract<Foo>(fooMimeType, "foo")
Composite metadata는 독립적인 메타데이터 값을 결합하는 데 적합합니다. 그러나 requester가 composite metadata를 지원하지 않을 수도 있고, 사용하지 않기로 선택할 수도 있습니다.
이를 위해 DefaultMetadataExtractor는 디코딩된 값을 output map에 매핑하기 위한 custom 로직이 필요할 수 있습니다. 다음은 metadata에 JSON을 사용하는 예입니다:
1DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); 2extractor.metadataToExtract( 3 MimeType.valueOf("application/vnd.myapp.metadata+json"), 4 new ParameterizedTypeReference<Map<String,String>>() {}, 5 (jsonMap, outputMap) -> { 6 outputMap.putAll(jsonMap); 7 });
1import org.springframework.messaging.rsocket.metadataToExtract 2 3val extractor = DefaultMetadataExtractor(metadataDecoders) 4extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap -> 5 outputMap.putAll(jsonMap) 6}
RSocketStrategies를 통해 MetadataExtractor를 구성하는 경우, RSocketStrategies.Builder가 구성된 decoder로 extractor를 생성하게 하고, 다음과 같이 callback만 사용해 registration을 커스터마이징 할 수 있습니다:
1RSocketStrategies strategies = RSocketStrategies.builder() 2 .metadataExtractorRegistry(registry -> { 3 registry.metadataToExtract(fooMimeType, Foo.class, "foo"); 4 // ... 5 }) 6 .build();
1import org.springframework.messaging.rsocket.metadataToExtract 2 3val strategies = RSocketStrategies.builder() 4 .metadataExtractorRegistry { registry: MetadataExtractorRegistry -> 5 registry.metadataToExtract<Foo>(fooMimeType, "foo") 6 // ... 7 } 8 .build()
Spring Framework는 @RSocketExchange 메서드를 가진 Java 인터페이스로 RSocket 서비스를 정의할 수 있게 해 줍니다. 이러한 인터페이스를 RSocketServiceProxyFactory에 전달하여
RSocketRequester를 통해 request를 수행하는 프록시를 생성할 수 있습니다. 또한 인터페이스를 구현하여 request를 처리하는 responder로 사용할 수도 있습니다.
먼저 @RSocketExchange 메서드를 가진 인터페이스를 생성합니다:
1interface RadarService { 2 3 @RSocketExchange("radars") 4 Flux<AirportLocation> getRadars(@Payload MapRequest request); 5 6 // more RSocket exchange methods... 7 8}
이제 메서드가 호출될 때 request를 수행하는 프록시를 생성할 수 있습니다:
1RSocketRequester requester = ... ; 2RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build(); 3 4RadarService service = factory.createClient(RadarService.class);
또한 인터페이스를 구현하여 responder로서 request를 처리할 수 있습니다. Annotated Responders를 참조하십시오.
Annotated, RSocket exchange 메서드는 다음 메서드 파라미터를 가진 유연한 메서드 시그니처를 지원합니다:
| Method argument | Description |
|---|---|
@DestinationVariable | @RSocketExchange annotation의 route와 함께 RSocketRequester에 전달되어 route의 템플릿 placeholder를 확장하기 위한 route variable을 추가합니다.<br>이 variable은 String이거나, 이후 toString()을 통해 포맷되는 임의의 객체일 수 있습니다. |
@Payload | Request의 input payload(s)를 설정합니다. 이는 concrete value이거나 ReactiveAdapterRegistry를 통해 Reactive Streams Publisher로 어댑트될 수 있는 value producer일 수 있습니다. required 속성이 false로 설정되었거나 파라미터가<br>MethodParameter#isOptional에 의해 optional로 표시되지 않는 한 payload는 제공되어야 합니다. |
Object, if followed by MimeType | Input payload의 메타데이터 엔트리에 대한 값입니다. 이는 다음 인자가 메타데이터 엔트리 MimeType인 한 임의의 Object일 수 있습니다. 이 값은 concrete value이거나 ReactiveAdapterRegistry를 통해 Reactive Streams Publisher로 어댑트될 수 있는 single value producer일 수 있습니다. |
MimeType | 메타데이터 엔트리에 대한 MimeType입니다. 바로 앞의 메서드 인자는 메타데이터 값이어야 합니다. |
Annotated, RSocket exchange 메서드는 concrete value 또는
ReactiveAdapterRegistry를 통해 Reactive Streams Publisher로 어댑트될 수 있는 value producer return value를 지원합니다.
기본적으로 동기(Blocking) 메서드 시그니처를 가진 RSocket 서비스 메서드의 동작은 underlying RSocket ClientTransport의 response timeout 설정과 RSocket keep-alive 설정에 따라 달라집니다.
RSocketServiceProxyFactory.Builder는 response를 기다리기 위한 최대 시간을 구성할 수 있는 blockTimeout 옵션도 제공하지만, 더 많은 제어를 위해 timeout 값을 RSocket 레벨에서 구성하는 것을 권장합니다.
Testing
Reactive Libraries