반응형

spring-webfluxreactor-core에 의존하고, 그것을 내부적으로 사용하여 비동기 로직을 구성하고 Reactive Streams 지원을 제공한다. 일반적으로 WebFlux API는 Flux, Mono 반환하고 (이들은 내부에서 사용되기 때문에) 입력으로 Reactive Streams Publisher 구현을 관용적으로 받을 수 있다. FluxMono의 사용은 중요하다. 카디널리티 를 표현하는데 도움이 때문이다 - 예를 들어, 하나 또는 여러 개의 비동기 값이 예상되는지 여부, 그것은 결정을 위해 필수적이다(예를 들어, HTTP 메시지를 인코딩 또는 디코딩하는 경우).

어노테이션이 선언된 컨트롤러의 경우 WebFlux는 응용 프로그램에 의해 선택된 리액티브 라이브러리에 투명하게 적응한다. 이는 ReactiveAdapterRegistry의 도움으로 이루어진다. ReactiveAdapterRegistry은 리액티브 라이브러리와 다른 비동기 플러그 가능한 지원을 제공한다. 레지스트리에는 RxJava 2/3, RxJava 1(RxJava Reactive Streams 브리지 공유) 및 CompletableFuture의 지원이 포함되어 있지만, 다른 사용자도 등록할 수 있다.

Spring Framework 5.3 이상, RxJava 1 지원은 폐지되었다.

함수 API ( 함수 엔드 포인트, WebClient 등)의 경우, WebFlux API의 일반 규칙이 적용된다. 반환 값으로서 FluxMono입력, Reactive Streams Publisher. Publisher가 사용자 정의 또는 다른 리액티브 라이브러리에서 제공되는 경우, 알 수 없는 시멘틱스 (0..N)를 가진 스트림으로으로만 다른 수 있다. 다만 시멘틱스를 알고 있는 경우는 Publisher를 그대로 전달하는 대신에 FluxMono.from(Publisher)로 래핑 할 수 있다.

예를 들어, Mono가 아닌 Publisher를 지정하면, Jackson JSON 메시지 라이터는 여러 값을 예상한다. 미디어 유형이 무한 스트림(application/json+stream등)을 의미하는 경우, 값은 개별적으로 기록되고 플래시된다. 그렇지 않으면 값은 목록에 버퍼링된 JSON 배열로 렌더링된다.

반응형
반응형

응답자는 메타 데이터를 해석해야 한다. 복합 메타 데이터는 고유의 MIME 타입을 사용하여 개별적으로 포맷 된 메타 데이터 값(라우팅, 보안, 추적 등)을 사용할 수 있다. 응용 프로그램은 지원하는 메타 데이터 MIME 유형을 구성하는 방법과 추출 된 값에 액세스하는 방법이 필요한다.

MetadataExtractor는 직렬화 된 메타 데이터를 검색하고, 디코딩된 이름 및 값 쌍으로 반환하도록 되어 있다. 명명된 헤더처럼, 예를 들어 어노테이션이 선언된 핸들러 메소드의 @Header를 통해 액세스 할 수 있다.

DefaultMetadataExtractor에 Decoder 인스턴스를 제공하여, 메타 데이터를 디코딩 할 수 있다. 즉시 사용 가능한 "message/x.rsocket.routing.v0" 내장 지원이 있고, String 디코딩하여 "라우트" 키에 저장한다. 그외에 MIME 타입의 경우 Decoder를 제공하여 MIME 유형을 다음과 같이 등록해야 한다.

Java

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");

Kotlin

import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

복합 메타 데이터는 별도의 메타 데이터 값을 결합하는데 적합하다. 다만, 요청자 복합 메타 데이터를 지원하지 않거나, 사용하지 않는 것을 선택하는 경우가 있다. 따라서 DefaultMetadataExtractor는 디코딩된 값을 출력 맵에 매핑하는사용자 지정 논리를 필요로 하는 경우가 있다. JSON이 메타 데이터로 사용되는 예를 보여준다.

Java

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
    MimeType.valueOf("application/vnd.myapp.metadata+json"),
    new ParameterizedTypeReference<Map<String,String>>() {},
    (jsonMap, outputMap) -> {
        outputMap.putAll(jsonMap);
    });

Kotlin

import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
    outputMap.putAll(jsonMap)
}

MetadataExtractor로 부터 RSocketStrategies를 구성하는 경우, RSocketStrategies.Builder에 구성된 디코더를 사용하여 추출 프로그램을 작성하고 콜백을 사용하여 등록을 다음과 같이 정의 할 수 있다.

Java

RSocketStrategies strategies = RSocketStrategies.builder()
    .metadataExtractorRegistry(registry -> {
        registry.metadataToExtract(fooMimeType, Foo.class, "foo");
        // ...
    })
    .build();

Kotlin

import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
        .metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
            registry.metadataToExtract<Foo>(fooMimeType, "foo")
            // ...
        }
        .build()
반응형
반응형

RSocket 응답자는 @MessageMapping@ConnectMapping 메소드로 구현할 수 있다. @MessageMapping 메소드는 개별 요청을 처리하고, @ConnectMapping 메소드가 연결 레벨의 이벤트(설치 및 메타 데이터 푸시)를 처리한다. 어노테이션이 선언된 응답자는 서버 측에서 응답과 클라이언트 측에서 응답을 위해 대칭적으로 지원된다.

5.3.1. 서버 응답자(Server Responders)

서버 측 어노테이션이 선언된 응답자를 사용하려면, RSocketMessageHandler를 Spring 설정에 추가하여 @MessageMapping@ConnectMapping 메소드에서 @Controller Bean을 검색한다.

Java

@Configuration
class ServerConfig {

    @Bean
    fun rsocketMessageHandler() = RSocketMessageHandler().apply {
        routeMatcher = PathPatternRouteMatcher()
    }
}

Kotlin

@Configuration
class ServerConfig {

    @Bean
    fun rsocketMessageHandler() = RSocketMessageHandler().apply {
        routeMatcher = PathPatternRouteMatcher()
    }
}

다음은 Java RSocket API를 사용하여 RSocket 서버를 시작하고 응답자의 RSocketMessageHandler를 다음과 같이 연결한다.

Java

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
        .bind(TcpServerTransport.create("localhost", 7000))
        .awaitSingle()

Kotlin

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
        .bind(TcpServerTransport.create("localhost", 7000))
        .awaitSingle()

RSocketMessageHandler는 기본적으로 복합 메타 데이터와 라우팅 메타 데이터를 지원한다. 다른 MIME 타입으로 전환하거나, 추가 메타 데이터 MIME 유형을 등록해야하는 경우는 MetadataExtractor을 설정할 수 있다.

지원하는 메타 데이터 및 데이터 형식에 필요한 EncoderDecoder 인스턴스를 설정해야 한다. 코덱의 구현은 spring-web 모듈이 필요할 수 있다.

기본적으로 SimpleRouteMatcherAntPathMatcher를 통해 경로 매칭에 사용된다. 효율적인 경로 매칭을 위해 spring-web 부터 PathPatternRouteMatcher를 폐쇄하는 것을 권장한다. RSocket 루트는 계층화 할 수 있지만 URL 경로는 없다. 둘 모두 라우트 매처(route matchers)가 "."을 사용하도록 구성되어 있다. 기본적으로 구분자로 사용되며 HTTP URL 같은 URL 디코딩은 하지 않는다.

RSocketMessageHandler은 이 같은 프로세스에서 클라이언트와 서버 사이에서 구성을 공유 할 필요가 있는 경우에 편리한 RSocketStrategies 을 통해 구성 할 수 있다.

Java

@Configuration
static class ServerConfig {

    @Bean
    public RSocketMessageHandler rsocketMessageHandler() {
        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.setRSocketStrategies(rsocketStrategies());
        return handler;
    }

    @Bean
    public RSocketStrategies rsocketStrategies() {
        return RSocketStrategies.builder()
            .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
            .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
            .routeMatcher(new PathPatternRouteMatcher())
            .build();
    }
}

Kotlin

@Configuration
class ServerConfig {

    @Bean
    fun rsocketMessageHandler() = RSocketMessageHandler().apply {
        rSocketStrategies = rsocketStrategies()
    }

    @Bean
    fun rsocketStrategies() = RSocketStrategies.builder()
            .encoders { it.add(Jackson2CborEncoder()) }
            .decoders { it.add(Jackson2CborDecoder()) }
            .routeMatcher(PathPatternRouteMatcher())
            .build()
}

5.3.2. 클라이언트 응답자

클라이언트 측의 어노테이션이 선언된 응답자는 RSocketRequester.Builder로 구성해야 한다. 자세한 내용은 클라이언트 응답자를 참조해라.

5.3.3. @MessageMapping

서버 또는 클라이언트 응답자의 설정이 완료되면, @MessageMapping 메소드를 다음과 같이 사용할 수 있다.

Java

Kotlin

@Controller
class RadarsController {

    @MessageMapping("locate.radars.within")
    fun radars(request: MapRequest): Flow<AirportLocation> {
        // ...
    }
}

위에 있는 @MessageMapping 메소드는 "locate.radars.within"라는 라우팅 정보를 가진 Request-Stream 상호 작용에 응답한다. 다음의 메소드 인수를 사용하는 옵션을 가진 유연한 메소드 시그너처를 지원하고 있다.

메소드 인수설명
@Payload요청의 페이로드. 이는 MonoFlux 등의 비동기 유형의 구체적인 값이다. 주의 : 어노테이션 사용은 옵션이다. 단순 형식이 아니라 지원되는 다른 인자 중의 하나가 아니면, 메소드 인수는 예상 페이로드로 간주된다.
RSocketRequester원격 종료를 요청하는 요청자.
@DestinationVariable매핑 패턴의 변수에 따라 루트에서 추출된 값. @MessageMapping("find.radar.{id}")
@HeaderMetadataExtractor에서 언급된 추출을 위해 등록된 메타 데이터 값.
@Headers Map<String, Object>MetadataExtractor 따라 추출에 등록된 모든 메타 데이터 값.

반환 값은 응답 페이로드로 직렬화되는 하나 이상의 오브젝트 일 것으로 예상된다. 이는 Mono , Flux와 같은 비동기 타입이거나 구체적인 값 또는 void, Mono<Void>와 같은 값이 없는 비동기 타입 중 하나이다.

@MessageMapping 메소드가 지원하는 RSocket 상호 작용 유형은 입력(즉, @Payload 인수)와 출력 카디널리티에서 결정된다. 카디널리티는 다음을 의미한다.

기수설명
1명시적인 값 또는 Mono<T> 같은 단일 값 비동기 유형 중 하나.
ManyFlux<T> 등의 여러 값 비동기 타입.
0입력의 경우,이 메서드에 @Payload 인수가 없음을 의미한다. 출력의 경우, 이는 void, Mono<Void>와 같은 값이 없는 비동기 유형이다.

다음 표는 모든 입력 및 출력 중요도의 조합과 대응하는 상호 작용 유형을 보여준다.

입력 카디널리티출력 카디널리티상호 작용 유형
0, 10단방향 메시지 요청과 응답
0, 11Request-Response
0, 1ManyRequest-Stream
Many0, 1, ManyRequest-Channel

5.3.4. @ConnectMapping

@ConnectMapping는 RSocket 연결을 시작할 때 SETUP 프레임을 처리하며 이어서 메타 데이터는 METADATA_PUSH 프레임, 즉 io.rsocket.RSocketmetadataPush(Payload)를 통해 알림을 푸시한다.

@ConnectMapping 메소드는 @MessageMapping과 같은 인수를 지원하고 있지만, SETUP, METADATA_PUSH 프레임에서 메타 데이터와 데이터를 기반으로 한다. @ConnectMapping 메타 데이터 경로를 사용하여 지정된 연결에 처리 수정 패턴을 설정할 수 있다. 패턴이 선언되지 않은 경우 모든 연결이 일치한다.

@ConnectMapping 메소드는 데이터를 반환 할 수 없으며 반환으로 void, Mono<Void>를 사용하여 선언해야 한다. 새 연결에 대한 처리가 오류를 반환하면 연결이 거부된다. 연결에 대한 RSocketRequester 요청을 수행하기 위해 처리를 지연되지 않는다. 자세한 내용은 서버 요청자 를 참조해라.

반응형
반응형

RSocketRequester는 RSocket 요청을 실행하기 위한 능숙한 API를 제공하고, 낮은 레벨의 데이터 버퍼가 아닌 데이터와 메타 데이터 객체를 받아들이고 반환한다. 대칭적으로 사용하여 클라이언트에서 요청을 만들거나 서버에서 요청을 만들 수 있다.

5.2.1. 클라이언트 요청자(Client Requester)

클라이언트에서 RSocketRequester을 받으려면 서버에 연결한다. 여기에는 연결 설정을 포함한 RSocket SETUP 프레임의 전송이 포함된다. RSocketRequester는 SETUP 프레임의 연결 설정을 포함한 io.rsocket.core.RSocketConnector의 준비를 위한 빌더를 제공한다.

이는 디폴트로 연결하는 가장 기본적인 방법이다.

Java

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);

Kotlin

val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

위는 즉시 연결되지 않는다. 요청이 이루어지면 공유 연결이 투명하게 확립되어 사용된다.

연결 설정

RSocketRequester.Builder 는 초기 SETUP 프레임을 정의하기 위해 다음을 제공한다.

  • dataMimeType(MimeType) - 연결 상의 데이터의 MIME 타입을 설정한다.
  • metadataMimeType(MimeType) - 연결의 메타 데이터의 MIME 타입을 설정한다.
  • setupData(Object) - SETUP에 포함할 데이터.
  • setupRoute(String, Object…) - SETUP에 포함할 메타 데이터의 루트.
  • setupMetadata(Object, MimeType) - SETUP에 포함하는 다른 메타 데이터.

데이터의 경우는 기본 MIME 타입은 처음 구성된 Decoder에서 파생한다. 메타 데이터의 경우 기본 MIME 유형은 복합 메타 데이터(composite metadata)이며, 요청마다 여러 메타 데이터 값과 MIME 타입의 페어를 허가한다. 일반적으로 모두를 변경할 필요가 없다.

SETUP 프레임의 데이터와 메타 데이터는 옵션이다. 서버 측에서 @ConnectMapping 메소드를 사용하여 연결 시작과 SETUP 프레임의 콘텐츠를 처리 할 수 있다. 메타 데이터는 연결 수준의 보안에 사용할 수 있다.

전략(Strategies)

RSocketRequester.BuilderRSocketStrategies를 수락하여 요청자를 구성한다. 이를 사용하여 데이터와 메타 데이터 값 (de) -serialization 용 인코더 및 디코더를 제공해야 한다. 기본적으로 String, byte[], ByteBufferspring-core의 기본 코덱만 등록된다. spring-web를 추가하면 다음과 같이 등록 할 수있는 기타 기능에 액세스 할 수 있다.

Java

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
    .build();

RSocketRequester requester = RSocketRequester.builder()
    .rsocketStrategies(strategies)
    .tcp("localhost", 7000);

Kotlin

val strategies = RSocketStrategies.builder()
        .encoders { it.add(Jackson2CborEncoder()) }
        .decoders { it.add(Jackson2CborDecoder()) }
        .build()

val requester = RSocketRequester.builder()
        .rsocketStrategies(strategies)
        .tcp("localhost", 7000)

RSocketStrategies는 재사용을 위해 설계되어 있다. 일부 시나리오에서는 예를 들어 동일한 응용 프로그램의 클라이언트와 서버의 경우, Spring 설정으로 선언하는 것이 좋다.

클라이언트 응답자(Client Responders)

RSocketRequester.Builder를 사용하여 서버에서 요청에 대한 응답자를 구성 할 수 있다.

서버에서 사용되는 것과 동일한 인프라를 기반으로 클라이언트의 응답과 어노테이션 처리기를 사용할 수 있지만, 다음과 같이 프로그램에 등록한다.

Java

RSocketStrategies strategies = RSocketStrategies.builder()
    .routeMatcher(new PathPatternRouteMatcher())    // (1)
    .build();

SocketAcceptor responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler());   // (2)

RSocketRequester requester = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(responder))   // (3)
    .tcp("localhost", 7000);

Kotlin

val strategies = RSocketStrategies.builder()
        .routeMatcher(PathPatternRouteMatcher())    // (1)
        .build()

val responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler());   // (2)

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(responder) }   // (3)
        .tcp("localhost", 7000)
  • (1) spring-web 가 존재하는 경우, 효율적인 경로 매칭을 위해 PathPatternRouteMatcher를 사용한다.
  • (2) @MessageMaping 그리고/또는 @ConnectMapping 메소드를 사용하여 클래스에서 응답자을 만든다.
  • (3) 응답자을 등록한다.

상기 클라이언트 응답자의 프로그램에 의한 등록을 위해 설계된 도구에 지나지 않는다는 점에 유의해라. 클라이언트 응답자가 Spring 설정의 대체 시나리오의 경우 RSocketMessageHandler를 Spring Bean으로 선언하고 다음과 같이 적용 할 수 있다.

Java

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(handler.responder()))
    .tcp("localhost", 7000);

Kotlin

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(handler.responder()) }
        .tcp("localhost", 7000)

위의 경우 RSocketMessageHandler에서 setHandlerPredicate를 사용하여 클라이언트 응답자를 검출하기 위한 다른 전략으로 전환해야 하는 경우도 있다. @RSocketClientResponder 등의 커스텀 어노테이션과 기본의 @Controller을 기반으로한다. 이것은 클라이언트와 서버 또는 동일한 응용 프로그램에서 여러 클라이언트를 사용하는 시나리오에 필요한다.

프로그래밍 모델에 대한 자세한 내용은 어노테이션이 선언된 응답자를 참조해라.

확장(Advanced)

RSocketRequesterBuilder는 기본이 되는io.rsocket.core.RSocketConnector를 공개하는 콜백을 제공하고, keep-alive 간격(intervals), 세션 재개, 인터셉터 등의 고급 옵션을 제공한다. 다음과 같이 그 레벨에 옵션을 구성 할 수 있다.

Java

RSocketRequester requester = RSocketRequester.builder()
    .rsocketConnector(connector -> {
        // ...
    })
    .tcp("localhost", 7000);

Kotlin

val requester = RSocketRequester.builder()
        .rsocketConnector {
            //...
        }
        .tcp("localhost", 7000)

5.2.2 서버 요청자

서버에서 연결된 클라이언트에 요청을 할 서버에서 연결된 클라이언트 요청자를 얻을 수 있다.

어노테이션이 선언된 응답자@ConnectMapping@MessageMapping 메소드는 RSocketRequester 인수를 지원한다. 이것을 사용하여 연결 요청자에 액세스한다. @ConnectMapping 메서드는 본질적으로 SETUP 프레임 핸들러이며 요청을 시작하기 전에 처리해야 하는 점에 유의해라. 따라서 첫번째 요청은 처리에서 분리해야 한다. 예를 들면 아래와 같다.

Java

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
    requester.route("status").data("5")
        .retrieveFlux(StatusReport.class)
        .subscribe(bar -> {   // (1)
            // ...
        });
    return ...   // (2)
}
  • (1) 처리와 관계없이 요청을 비동기적으로 시작한다.
  • (2) 처리를 실행하고, 완료되면 Mono<Void>를 반환한다.

Kotlin

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
    GlobalScope.launch {
        requester.route("status").data("5").retrieveFlow<StatusReport>().collect {   // (1)
            // ...
        }
    }
    /// ...   // (2)
}
  • (1) 처리와 관계없이 요청을 비동기적으로 시작한다.
  • (2) 일시 중지 기능으로 처리한다.

5.2.3. Requests

클라이언트 또는 서버의 요청자를 얻으면 다음과 같이 요청을 만들 수 있다.

Java

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within")   // (1)
        .data(viewBox)   // (2)
        .retrieveFlux(AirportLocation.class);   // (3)

Kotlin

val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within")   // (1)
        .data(viewBox)   // (2)
        .retrieveFlow<AirportLocation>()   // (3)
  • (1) 요청 메시지의 메타 데이터에 포함 경로를 지정한다.
  • (2) 요청 메시지의 데이터를 제공한다.
  • (3) 예상되는 응답을 선언한다.

상호 작용 유형은 입력과 출력의 카디널리티에서 암묵적으로 결정된다. 위의 예는 하나의 값이 전송된 값의 스트림이 수신되기 때문에 Request-Stream 이다. 대부분의 경우, 입력과 출력의 선택이 RSocket 상호 작용의 유형과 응답자가 기대하는 입력과 출력의 종류와 일치하는 한 이에 대하여 생각할 필요가 없다. 잘못된 조합의 유일한 예는 "다대일"이다.

data(Object) 메소드는 FluxMono 포함하는 Reactive Streams PublisherReactiveAdapterRegistry에 등록되어 있는 값의 다른 프로듀서도 받아 들이다. 동일한 유형의 값을 생성하는 Flux 등의 여러 값 Publisher의 경우 오버로드된 data 메서드 중 하나를 사용하여 모든 요소 유형 검사와 Encoder 조회를 회피하는 것을 고려해라.

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 단계는 선택 사항이다. 데이터를 전송하지 요청의 경우는 생략한다.

Java

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
    .retrieveMono(AirportLocation.class);

Kotlin

import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
    .retrieveAndAwait<AirportLocation>()

복합 메타 데이터(composite metadata) (디폴트)을 사용하여 값이 등록된 Encoder으로 지원되는 경우, 추가로 메타 데이터 값을 추가 할 수 있다. 예를 들면 다음과 같다.

Java

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlux(AirportLocation.class);

Kotlin

import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlow<AirportLocation>()

Fire-and-Forget의 경우 Mono<Void>를 반환하는 send() 메소드를 사용한다. Mono는 메시지가 성공적으로 전송된 것만을 보여주고 처리 된 것을 나타내는 것은 아니다는 것에 유의해라.

Metadata-Push의 경우 Mono<Void>을 반환 값으로 지정하고 sendMetadata() 메소드를 사용한다.

반응형
반응형

RSocket는 TCP, WebSocket, 그외에 다른 바이트 스트림 전송을 통해 다중화된 이중 통신 응용 프로그램 프로토콜이며, 다음의 상호 작용 모델 중 하나를 사용한다.

  • Request-Response - 하나의 메시지를 보내고 하나를 받는다.
  • Request-Stream - 하나의 메시지를 보내고 메시지 스트림을 수신한다.
  • Channel - 메시지 스트림을 양방향으로 전송한다.
  • Fire-and-Forget - 단방향 메시지를 보낸다.

최초 연결이 설정되면, 양쪽이 대칭이 되고, 각 측면이 상기 상호 작용 중 하나를 시작할 수 있기 때문에 '클라이언트'와 '서버'의 구별이 없어진다. 이 프로토콜에 참가 측을 "요청자"와 "응답자"라고 부르고, 위의 상호 작용을 "요청 스트림(request streams)" 또는 단순히 "요청(request)"이라고 부르는 이유이다.

RSocket 프로토콜의 주요 기능 및 장점은 다음과 같다.

  • 네트워크 경계를 넘어선 Reactive Streams 시멘틱스 - Request-StreamChannel와 같은 스트리밍 요청에 대한 네트워크 경계를 횡단하고 역 배압 신호는 요청자와 응답자 사이를 이동하고, 요청이 소스에서 응답자의 속도를 저하시키는 것을 허용하고, 네트워크 계층 혼잡 제어에 대한 의존도 및 네트워크 레벨 또는 모든 레벨에서 버퍼링의 필요성을 줄일 수 있다.

  • 요청 제한(Request throttling) - 이 기능은 LEASE 프레임을 따서 'Leasing'라는 이라고 한다. 이 프레임은 각 엔드에서 전송하여 특정 시간에 다른 엔드에서 허용하는 요청 수를 제한 할 수 있다. 리즈는 정기적으로 업데이트된다.

  • 세션 재개(Session resumption) - 이는 연결이 끊어져도 일부 상태를 유지할 수 있도록 설계되어 있다. 상태 관리는 응용 프로그램에 투명하며, 가능한 경우에 생산자를 중지하고, 필요한 상태의 양을 줄일 수 있도록 역 배압과 함께 잘 작동한다.

  • 큰 메시지 단편화(fragmentation) 및 재조립(re-assemply).

  • Keepalive (heartbeats).

RSocket는 여러 언어로 구현 되어 있다. Java 라이브러리프로젝트 ReactorReactor Netty 위에 구축되어 전송 용이다. 즉, 응용 프로그램 Reactive Streams 게시자의 신호는 RSocket을 통해 네트워크 전체에 투명하게 전달한다.

5.1.1. 프로토콜

RSocket의 장점 중 하나는 네트워크 상에서 잘 정의된 동작과 어떤 프로토콜 확장에 따른 읽기 쉬운 사양을 가지고 있다는 것이다. 언어의 구현체과 높은 수준의 프레임워크 API에 관계없이 사양을 읽는 것이 좋다. 이 섹션에서는 문맥을 확립하기 위한 간결한 개요를 제공한다.

연결

먼저 클라이언트는 TCP와 WebSocket과 같은 낮은 레벨의 스트리밍 전송을 통해 서버에 연결하여 SETUP 프레임을 서버로 전송하고 연결 매개 변수를 설정한다.

서버는 SETUP 프레임을 거부 할 수 있지만, 일반적으로 전송(클라이언트)과 수신(서버)하고 나서 SETUP 이 leasing 시멘틱스를 사용하여 요청의 수를 제한하지 않는 한 요청을 시작할 수 있다. 어느 쪽도 요청을 허용하기 위해 반대쪽에서 LEASE 프레임을 기다려야 한다.

요청하기(Making Requests)

연결이 되면 양쪽에서 프레임 REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL 또는 REQUEST_FNF 중 하나를 통해 요청을 시작할 수 있다. 이러한 각 프레임은 요청자로부터 응답자에 하나의 메시지를 전하고 있다.

다음은 응답자는 응답 메시지와 같이 PAYLOAD 프레임을 반환한다. REQUEST_CHANNEL의 경우 요청자 더 많은 요청 메시지를 포함하는 PAYLOAD 프레임을 전송한다.

요청에 Request-StreamChannel 등의 메시지 스트림이 포함되는 경우, 응답자는 요청자의 요청 신호를 고려해야 한다. 요청 사항은 메시지의 숫자로 표현된다. 초기 수요는 REQUEST_STREAMREQUEST_CHANNEL 프레임으로 지정된다. 후속 요청은 REQUEST_N 프레임을 통해 통보된다.

양쪽 모두 METADATA_PUSH 프레임을 통해 각각의 요청이 아니라, 전체 연결에 대한 메타 데이터 통지도 보낼 수 있다.

메시지 포맷

RSocket 메시지는 데이터와 메타 데이터가 포함된다. 메타 데이터는 루트, 보안 토큰 등을 보낼 수 있다. 데이터와 메타 데이터는 다른 형식으로 할 수 있다. 각각의 MIME 타입은 SETUP 프레임에 선언된 특정 연결의 모든 요청에 적용된다.

모든 메시지에 메타 데이터를 포함 할 수 있지만 일반적으로 루트 등의 메타 데이터는 각 요청이므로 요청의 첫번째 메시지, 즉 프레임 REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL 또는 REQUEST_FNF중 하나에만 포함된다.

프로토콜 확장 기능은 응용 프로그램에서 사용하는 일반적인 메타 데이터 형식을 정의한다.

5.1.2. Java 구현

RSocket의 Java 구현프로젝트 Reactor 위에 구축되어 있다. TCP 및 WebSocket 전송은 Reactor Netty을 기반으로 한다. Reactive Streams 라이브러리로 Reactor는 프로토콜을 구현하는 작업을 단순화한다. 응용 프로그램은 선언 연산자와 투명한 역압력 지원을 갖춘 FluxMono를 사용하는 것이 자연스럽다.

RSocket Java API는 의도적으로 최소화하고 기본적이다. 프로토콜 기능에 초점을 맞추고 있으며, 응용 프로그램 프로그래밍 모델 (RPC codegen vs other 등)을 높은 레벨의 독립적인 관심사만 처리하면 된다.

주요 계약 io.rsocket.RSocket는 단일 메시지의 약속을 나타내는 Mono 메시지 스트림을 나타내는 Flux 및 바이트 버퍼로 데이터와 메타 데이터에 액세스하는 실제 메시지를 나타내는 io.rsocket.Payload를 사용하여 4개의 요청 대화 유형을 모델링한다. RSocket 계약은 대칭적으로 사용된다. 요청의 경우 응용 프로그램은 요청을 실행하는 RSocket가 주어진다. 응답을 위해 응용 프로그램 RSocket 을 구현하여 요청을 처리한다.

이는 철저한 도입을 의도한 것은 아니다. 대부분의 경우, Spring 애플리케이션은 API를 직접 사용할 필요는 없다. 다만 Spring에 의존하지 않는 RSocket을 확인하거나 실험하는 것이 중요할 수도 있다. RSocket Java 저장소에는 API와 프로토콜의 기능을 보여주는 수많은 샘플 응용 프로그램이 포함되어 있다.

5.1.3. Spring 지원

spring-messaging 모듈에는 다음의 것이 포함된다.

  • RSocketRequester  - 데이터 및 메타 데이터의 인코딩/디코딩을 사용하여 io.rsocket.RSocket를 통해 요청을 할 수 있는 능숙한 API이다.
  • 어노테이션이있는 응답자  -  @MessageMapping 응답의 어노테이션 처리기 메소드이다.

spring-web 모듈에는 Jackson CBOR/JSON 등 Encoder와 Decoder 구현 및 RSocket 응용 프로그램이 필요로 할 가능성이 높은 Protobuf가 포함되어 있다. 또한 효율적인 경로 매칭을 위해 플러그인 할 수 PathPatternParser도 포함되어 있다.

Spring Boot 2.2은 TCP 또는 WebSocket을 통한 RSocket 서버의 출시를 지원하고 있다. 여기에는 WebFlux 서버에서 WebSocket을 통해 RSocket을 게시하는 옵션도 포함되어 있다. RSocketRequester.BuilderRSocketStrategies 클라이언트 지원 및 자동 구성도 있다. 자세한 내용은 Spring Boot 참조 RSocket 섹션을 참조해라.

Spring Security 5.2은 RSocket 지원을 제공한다.

Spring Integration 5.2은 RSocket 클라이언트 및 서버와 상호 작용하는 인바운드 및 아웃 바운드 게이트웨이를 제공한다. 자세한 내용은 Spring Integration 참조 설명서를 참조해라.

Spring Cloud Gateway는 RSocket 연결을 지원한다.

반응형

+ Recent posts