Spring Reactive響應式編程-WebFlux編程實戰


springboot2 webflux 響應式編程學習路徑 : https://zhuanlan.zhihu.com/p/36160025

先學習jdk8的lambda表達式和stream流編程,了解函數式編程的知識點和思想,接着學習jdk9的響應式流flux,理解響應式流概念,理解背壓和實現機制。這2者學好之后,很容易理解webflux的基石reactor,再學習webflux就水到渠成了!

Reactive Stream

jdk9的響應式流

就是Reactive Stream,也就是flow。其實和jdk8的stream沒有一點關系。說白了就一個發布-訂閱模式,一共只有4個接口,3個對象,非常簡單清晰
image

什么是背壓?

背壓是指訂閱者能和發布者交互,可以調節發布者發布數據的速率,解決把訂閱者壓垮的問題

我們重點理解背壓在jdk9里面是如何實現的。關鍵在於發布者Publisher的實現SubmissionPublishersubmit方法是block方法。訂閱者會有一個緩沖池,默認為Flow.defaultBufferSize() = 256。當訂閱者的緩沖池滿了之后,發布者調用submit方法發布數據就會被阻塞,發布者就會停(慢)下來;訂閱者消費了數據之后(調用Subscription.request方法),緩沖池有位置了,submit方法就會繼續執行下去,就是通過這樣的機制,實現了調節發布者發布數據的速率,消費得快,生成就快,消費得慢,發布者就會被阻塞,當然就會慢下來了

package jdk9;


import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    public static void main(String[] args) {
        // 1. 定義發布者,發布的數據類型式Integer
        // 直接使用jdk自帶的SubmissionPublisher,它實現了Publisher接口
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // 2. 定義訂閱者
      Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存訂閱關系,需要用它來給發布者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據,處理
                System.out.println("接收到數據:" + item);
                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者已經達到了目標,調用cancel告訴發布者不再接受數據了
                // this.subscription.cancel()
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發布者,后面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數據處理完了(發布者關閉了)
                System.out.println("處理完了!");
            }
        };

        // 3. 發布者和訂閱者 建立訂閱關系
        publisher.subscribe(subscriber);

        // 4. 生產數據,並發布
        // 這里忽略數據生產過程
        int data = 111;
        publisher.submit(data);
        publisher.submit(222);
        publisher.submit(333);

        // 5. 結束后,關閉發布者
        // 正式環境應該放入finally或者使用 try-resource 確保關閉
        publisher.close();

        // 主線程延遲停止,否則數據沒有消費就退出
        try {
            Thread.currentThread().join(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
}

自定義 Processer(中間處理器,相當於是發布者的同時又是訂閱者)代碼示例

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * Processor, 需要繼承SubmissionPublisher並實現Processor接口
 *
 * 輸入源數據 integer, 過濾掉小於0的, 然后轉換成字符串發布出去
 */
class MyProcessor extends SubmissionPublisher<String>
        implements Flow.Processor<Integer, String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // 保存訂閱關系, 需要用它來給發布者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("處理器接受到數據: " + item);

        // 過濾掉小於0的, 然后發布出去
        if (item > 0) {
            this.submit("轉換后的數據:" + item);
        }

        // 處理完調用request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現了異常(例如處理數據的時候產生了異常)
        throwable.printStackTrace();

        // 我們可以告訴發布者, 后面不接受數據了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部數據處理完了(發布者關閉了)
        System.out.println("處理器處理完了!");
        // 關閉發布者
        this.close();
    }
}

public class FlowDemoWithProcessor {

    public static void main(String[] args) throws Exception {
        // 1. 定義發布者, 發布的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // 2. 定義處理器, 對數據進行過濾, 並轉換為String類型
        MyProcessor processor = new MyProcessor();

        // 3. 發布者 和 處理器 建立訂閱關系
        publisher.subscribe(processor);

        // 4. 定義最終訂閱者, 消費 String 類型數據
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存訂閱關系, 需要用它來給發布者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發布者, 后面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數據處理完了(發布者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 5. 處理器 和 最終訂閱者 建立訂閱關系
        processor.subscribe(subscriber);

        // 6. 生產數據, 並發布
        // 這里忽略數據生產過程
        publisher.submit(-111);
        publisher.submit(111);

        // 7. 結束后 關閉發布者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publisher.close();

        // 主線程延遲停止, 否則數據沒有消費就退出
        Thread.currentThread().join(1000);
    }

}

運行結果:
image

發布者生產的數據會存儲到默認緩沖池的數組中發送給訂閱者,默認緩沖池是256個長度,當緩沖區滿了而訂閱者還沒來的及處理數據時,發布者就會被block(阻塞)而停止生產數據,直到訂閱者消費完緩沖區中的數據而產生空位時發布者才會重新生成新的數據

Spring WebFlux

初識Spring WebFlux

Spring WebFlux 是 Spring Framework 5.0中引入的新的響應式web框架。與Spring MVC不同,它不需要Servlet API,是完全異步且非阻塞的,並且通過Reactor項目實現了Reactive Streams規范。

官方地址: https://spring.io/reactive
image

架構 說明
spring-webmvc + Servlet + Tomcat 命令式的、同步阻塞的
spring-webflux + Reactor + Netty 響應式的、異步非阻塞的

所謂異步非阻塞是針對服務端而言的,是說服務端可以充分利用CPU資源去做更多事情,這與客戶端無關,客戶端該怎么請求還是怎么請求。

架構 說明
Reactive Streams 用於構建高吞吐量、低延遲應用的規范
Reactor 基於Reactive Streams 規范的實現,它是一個完全非阻塞的基礎,且支持背壓
Spring WebFlux 基於Reactor實現了完全異步非阻塞的一套web框架,是一套響應式堆棧

編寫響應式代碼之前,我們還需要了解2個重要的概念,就是異步servletSSE(server-sent events)

異步servlet

學習異步servlet我們最重要的了解同步servlet阻塞了什么?為什么需要異步servlet?異步servlet能支持高吞吐量的原理是什么?

  • 同步servlet
    servlet容器(如tomcat)里面,每處理一個請求會占用一個線程,同步servlet里面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會全部用完,就無法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!

  • 異步servlet
    servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其他請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可以使用少量的線程處理更加高的請求,從而實現高吞吐量!

代碼示例:

  • 同步servlet
@WebServlet(name = "SyncServlet", urlPatterns="/SyncServlet")
public class SyncServlet extends HttpServlet {
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        long t1 = System.currentTimeMillis();
        // 執行業務代碼
        doSomeTing(request, response);

        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeTing(HttpServletRequest request, HttpServletResponse response) throws IOException {

        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.getWriter().append("done");
    }
}
  • 異步servlet
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@WebServlet(name = "AsyncServlet", urlPatterns = "/AsyncServlet", asyncSupported = true)
public class AsyncServlet extends HttpServlet {

    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 1.開啟異步
        AsyncContext asyncContext = request.startAsync();

        // 2.把我們要執行的代碼放到一個獨立的線程中,多線程/線程池
        CompletableFuture.runAsync(() ->
                // 執行業務代碼
        {
            try {
                doSomeTing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeTing(AsyncContext asyncContext, ServletRequest request, ServletResponse response) throws IOException {

        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.getWriter().append("async done");

        // 3.業務代碼處理完畢,通知結束
        asyncContext.complete();
    }
}

運行上面代碼,業務代碼花了5秒,但servlet容器的線程幾乎沒有任何耗時。而如果是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求/。

異步servlet在處理耗時任務時會立馬執行完成並且將任務放到另一個線程中去運行,這樣我們的這個servlet主線程就不會被阻塞從而能夠去執行其他的任務

SSE(Server-Sent Events)

響應式流里面,可以多次返回數據(其實和響應式沒有關系),使用的技術就是H5的SSE。我們學習技術,API的使用只是最初級也是最簡單的,更加重要的是需要知其然並知其所以然,否則你只能死記硬背不用就忘!我們不滿足在spring里面能實現sse效果,更加需要知道spring是如何做到的。其實SSE很簡單,我們花一點點時間就可以掌握,我們在純servlet環境里面實現。我們看代碼,這里一個最簡單的示例。

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

@WebServlet(name = "SSE", urlPatterns = "/SSE")
public class SSE extends HttpServlet {

    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        for (int i = 0; i < 5; i++) {
            // 指定事件標識
            response.getWriter().write("event:me\n");
            // 格式:data: + 數據 + 2個回車
            response.getWriter().write("data:" + i + "\n\n");
            response.getWriter().flush();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

關鍵是ContentType 是 "text/event-stream",然后返回的數據有固定的要求格式即可。
如果我們想要在前端接受和使用事件流,可以使用以下方式

<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<title>Title</title>
</head>
<body>
<script type="text/javascript">
	// 初始化,參數為url
	// 依賴H5
	var sse = new EventSource("SSE")

	// 監聽消息並打印
	sse.onmessage = function (evt) {
	    console.log("message", evt.data, evt)
	}

	// 如果指定了事件標識需要用這種方式來進行監聽事件流
	sse.addEventListener("me", function (evt) {
	    console.log("me event", evt.data)
		// 事件流如果不關閉會自動刷新請求,所以我們需要根據條件手動關閉
		if (evt.data == 3) {
		    sse.close()
		}
	})
</script>
</body>
</html>

使用場景:服務器向客戶端推送數據,例如聊天室

WebFlux完整案例

搭建項目

  1. 添加mongodb-reactive依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
  1. 添加mongodb注解
@SpringBootApplication
@ServletComponentScan("com.javaming.study.webflux.servlet")
// 設置開啟mongodb響應式存儲
@EnableReactiveMongoRepositories
public class SpringWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringWebfluxApplication.class, args);
    }

}
  1. 添加User對象
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection = "user")
@Data
public class User {
    
    @Id
    private String id;
    
    private String name;
    
    private int age;
}
  1. 新建user的數據庫操作對象UserRepository
import com.javaming.study.webflux.domain.mongo.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {

}
  1. 新建Controller
@RestController
@RequestMapping("/user")
public class UserController {

    private final UserRepository userRepository;

    /**
     * 構造函數的方式注入(官方推薦,降低耦合)
     */
    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @GetMapping("/")
    public Flux<User> getAll() {
        return userRepository.findAll();
    }

    /**
     * 推薦新增另一個相同的方法通過流的方式獲取數據
     * @return
     */
    @GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamGetAll() {
        return userRepository.findAll();
    }

}
  1. 安裝和啟動mongodb

RouterFunction模式

webflux的另一種開發模式,和以前的Controller進行對應

  1. HandlerFunction(輸入ServerRequest返回ServerResponse)
@Component
public class UserHandler {

    private final UserRepository userRepository;

    public UserHandler(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * 得到所有用戶
     * @param request
     * @return
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(this.userRepository.findAll(), User.class);
    }

    /**
     * 創建用戶
     * @param request
     * @return
     */
    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<User> user = request.bodyToMono(User.class);
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(this.userRepository.saveAll(user), User.class);
    }


    /**
     * 根據id刪除用戶
     * @param request
     * @return
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request) {

        String id = request.pathVariable("id");
        return this.userRepository.findById(id)
                .flatMap(user -> this.userRepository.delete(user).then(ServerResponse.ok().build()))
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}
  1. 編寫路由類 RouterFunction(請求URL和HandlerFunction對應起來)
@Configuration
public class AllRouters {

    @Bean
    RouterFunction<ServerResponse> userRouter(UserHandler userHandler) {
        return RouterFunctions.nest(
                // 相當於類上面的@RequestMapping("/user")
                RequestPredicates.path("/user"),
                RouterFunctions
                        // 相當於類里面的@GetMapping("/")
                        // 得到所有用戶
                        .route(RequestPredicates.GET("/"),
                                userHandler::getAllUser)
                        // 創建用戶
                        .andRoute(RequestPredicates.POST("/").
                                        and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                                userHandler::createUser)
                        // 刪除用戶
                        .andRoute(RequestPredicates.DELETE("/{id}"),
                                userHandler::deleteUserById)
        );
    }
}

源碼下載地址:https://gitee.com/javaming/springboot-webflux


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM