• <center id="ye0c0"><u id="ye0c0"></u></center>
  • <optgroup id="ye0c0"><acronym id="ye0c0"></acronym></optgroup>
  • Spring Cloud Stream實戰

      Spring Cloud Stream是一個用于構建消息驅動的微服務應用程序的框架。Spring Cloud Stream構建于Spring Boot之上,用于創建獨立的生產級Spring應用程序,并使用Spring Integration提供與消息代理的連接。也就是說,Spring Cloud Stream是構建于Spring Boot和Spring Integration之上的框架,幫助創建事件驅動或消息驅動的微服務。

    主要模型如圖:

    這里我們使用Kafka作為消息底層設施,原因見:為什么我們從RabbitMQ切換到apache kafka?

    引入Kafka的Stream啟動器:

    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>2.0.1.RELEASE</version>
    </dependency>

    微服務架構遵循“?智能端點和啞管?”原則,端點之間的通信由RabbitMQ或Apache Kafka等消息傳遞中間件方驅動,服務通過這些端點或通道發布領域事件進行通信。

    首先我們定義一個接口,定義輸入和輸出隊列管道:


    public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
    }

    @Input注釋用來表示輸入的消息隊列,通過該通道接收消息并輸入當前應用;@Output注釋表示一個輸出通道,通過它發布消息出去。@Input和@Output注解可以采取指定的通道名稱(比如這里greetings-in greetings-out")作為參數,如果未提供名稱,則使用注釋的方法名稱。

    在application.yaml或property中具體配置該消息通道到Kafka:

    spring:
    cloud:
    stream:
    kafka:
    binder:
    brokers: localhost:9092
    bindings:
    greetings-in:
    destination: greetings
    contentType: application/json
    greetings-out:
    destination: greetings
    contentType: application/json

    其中greetings-in和greetings-out配置到Kafka具體的主題topic名稱為greetings,序列化類型是json,kafka默認端口在本地9092。

    好了,底層基礎設施準備完成,現在需要將這個設施安裝到我們的應用中。

     

    @EnableBinding(GreetingsStreams.class)
    public class StreamsConfig {
    }

    @EnableBinding將應用配置綁定接口GreetingsStreams中定義的通道INPUT和OUTPUT。

    現在我們的應用和消息基礎設施已經綁定了,可使用@StreamListener到具體方法以接收具體的流處理事件了。


    @Component
    @Slf4j
    public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
    log.info("Received greetings: {}", greetings);
    }
    }

    StreamListeners?是消息監聽者處理方法,接收類型的傳入消息Greetings,可以看到框架的核心功能之一:它嘗試自動將傳入的消息有效負載轉換為類型Person。

    上面方法是一個沒有返回結果的void方法,如果有返回結果,必須使用@SendTo注釋指定方法返回的數據的輸出綁定隊列目標output,如以下示例所示;通過


    @Component
    @Slf4j
    public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    @SendTo(GreetingsStreams.OUTPUT)
    public String handleGreetings(Greetings greetings) {
    log.info("Received greetings: {}", greetings);
    return "Received greetings: {}" + greetings;
    }
    }

     

    Spring cloud stream實現了一個默認的Processor類,類似我們的GreetingsStreams接口,也就是說,可以不用自己做這個接口

    public interface Processor extends Source, Sink {
    }

    public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
    }
    public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
    }

    如果使用默認的Processor通道名稱,注意配置文件里也要配置成相應的通道名。

     

    測試運行

    有了接收方,下面我們實現一個發送方,我們通過調用rest接口發送消息,先看看發送方代碼:


    @Service
    @Slf4j
    public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    public GreetingsService(GreetingsStreams greetingsStreams) {
    this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {
    log.info("Sending greetings {}", greetings);
    MessageChannel messageChannel = greetingsStreams.outboundGreetings();
    messageChannel.send(MessageBuilder
    .withPayload(greetings)
    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
    .build());
    }
    }

    我們暴露一個端口來調用這個發送方:


    @RestController
    public class GreetingsController {
    private final GreetingsService greetingsService;
    public GreetingsController(GreetingsService greetingsService) {
    this.greetingsService = greetingsService;
    }
    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
    Greetings greetings = Greetings.builder()
    .message(message)
    .timestamp(System.currentTimeMillis())
    .build();
    greetingsService.sendGreeting(greetings);
    }

    }

    也就是說:這個發送方REST和發送服務 與我們的GreetingsListener是通過消息系統通訊的,不是直接在發送服務里調用GreetingsListener的方法,這樣這兩者之間就解耦了。

    下面我們用postman調用:

    http://localhost:8080/greetings?message=hello

    控制臺結果輸出:

    c.e.c.GreetingsService : Sending greetings Greetings(timestamp=1535614400754, message=hello)

    c.e.c.GreetingsListener : Received greetings: Greetings(timestamp=1535614400754, message=hello)

    一個發送和一個接受完成了一個請求調用,如果GreetingsListener還有返回結果,是放在greetings-out之中的,那么GreetingsListener就變成發送方了,我們也可以參考這套做法再做個監聽器。

     

    源碼下載

    使用Spring Request-Reply實現基于Kafka的同步請求響應

    Spring Cloud專題

    Kafka專題

    美女漫画大全