Reactor Netty(三)


Reactor Netty Reference Guide

1. About the Documentation

本節簡要概述了Reactor Netty參考文檔。您無需線性閱讀本指南。盡管每個零件經常引用其他零件,但它們各自獨立。

1.1. Latest Version and Copyright Notice

Reactor Netty參考指南可作為HTML文檔獲得。最新的副本位於https://projectreactor.io/docs/netty/release/reference/index.html

本文檔的副本可以供您自己使用,也可以分發給他人,但前提是您不對此類副本收取任何費用,並且還應確保每份副本均包含本版權聲明(無論是印刷版本還是電子版本)。

1.2. Contributing to the Documentation

參考指南是用Asciidoc編寫的,您可以在以下位置找到其參考資料 https://github.com/reactor/reactor-netty/tree/master/docs/asciidoc.

如果您有改進,我們將很高興收到您的拉動請求!

我們建議您簽出存儲庫的本地副本,以便可以通過使用asciidoctor Gradle任務並檢查渲染來生成文檔。有些部分依賴於包含的文件,因此GitHub渲染並不總是完整的。

1.3. Getting Help

有幾種方法可以聯系Reactor Netty尋求幫助。您可以:

  • 與社區保持聯系 Gitter.
  • 在react-netty上的stackoverflow.com上提問。
  • 報告Github問題中的錯誤。存儲庫如下:reactor-netty

2. Getting Started

本節包含的信息應有助於您使用Reactor Netty。它包含以下信息:

2.1. Introducing Reactor Netty

適用於微服務架構,Reactor Netty為HTTP(包括Websockets),TCP和UDP提供了支持背壓的網絡引擎。

2.2. Prerequisites

Reactor Netty在Java 8及更高版本上運行。

它具有以下傳遞依賴項:

  • Reactive Streams v1.0.3
  • Reactor Core v3.x
  • Netty v4.1.x

2.3. 了解BOM和版本控制方案

Reactor Netty是Project Reactor BOM的一部分(因為鋁釋放鏈)。盡管這些工件中可能存在不同的版本控制方案,但該精選列表將旨在良好協作的工件分組,提供了相關版本。

工件遵循MAJOR.MINOR.PATCH-QUALIFIER的版本控制方案,而BOM使用受CalVer啟發的YYYY.MINOR.PATCH-QUALIFIER的方案進行版本控制,其中:

  • MAJOR是當前的Reactor一代,每個新一代的Reactor都可以對項目結構帶來根本性的改變(這可能意味着需要進行更大的移植工作)
  • YYYY是給定發行周期中首次發布GA的年份(如1.0.x的1.0.0)
  • .MINOR是從0開始的數字,每個新發行周期遞增
    • 就項目而言,它通常反映了更廣泛的變化,並且可以表明進行了適度的遷移工作
    • 在BOM表的情況下,如果兩個在同一年首次發布,則可以區分發布周期
  • .PATCH是基於0的數字,隨每個服務版本遞增
  • -QUALIFIER是文本限定符,在GA版本中會省略(請參見下文)

因此,遵循該約定的第一個發行周期是2020.0.x,代號Europium。該方案按順序使用以下限定符(注意使用破折號):

  • -M1..-M9: 里程碑(我們預計每個服務版本不會超過9個)
  • -RC1..-RC9: 候選版本(我們預計每個服務版本不會超過9個)
  • -SNAPSHOT: 快照
  • 對於GA版本沒有限定符

每個發布周期都被賦予一個代號,與以前的基於代號的方案相一致,可用於更非正式地引用它(例如在討論,博客文章等中)。代號代表傳統上的MAJOR.MINOR號。它們(大多數)來自元素周期表,按字母順序遞增。

2.4. Getting Reactor Netty

如前所述,在核心中使用Reactor Netty的最簡單方法是使用BOM,並將相關的依賴項添加到項目中。請注意,添加此類依賴項時,必須省略版本,以便從BOM表中提取該版本。

但是,如果您要強制使用特定工件的版本,則可以像通常那樣在添加依賴項時指定它。您也可以完全放棄BOM表,並通過工件版本指定依賴關系。

2.4.1. Maven Installation

BOM概念由Maven原生支持。首先,您需要通過將以下代碼段添加到pom.xml來導入BOM。如果您的pom中已經存在頂部(dependencyManagement),則僅添加內容。

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR10</version> 
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

接下來,照常將依賴項添加到相關的reactor項目中(不用使用<version>)。以下清單顯示了如何執行此操作:

<dependencies>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty-core</artifactId> 
    </dependency>
</dependencies>
<dependencies>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty-http</artifactId>
    </dependency>
</dependencies>

2.4.2. Gradle Installation

從版本5開始,Gradle支持BOM表概念。以下清單顯示了如何導入BOM表並向Reactor Netty添加依賴項:

dependencies {
    // import a BOM
    implementation platform('io.projectreactor:reactor-bom:Dysprosium-SR10') 

    // define dependencies without versions
    implementation 'io.projectreactor.netty:reactor-netty-core' 
    implementation 'io.projectreactor.netty:reactor-netty-http'
}

2.4.3. Milestones and Snapshots

里程碑和開發人員預覽是通過Spring Milestones存儲庫而不是Maven Central分發的。要將其添加到您的構建配置文件中,請使用以下代碼段:

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones Repository</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>

對於Gradle,請使用以下代碼段:

repositories {
  maven { url 'https://repo.spring.io/milestone' }
  mavenCentral()
}

同樣,快照也可在單獨的專用存儲庫中使用(適用於Maven和Gradle):

-SNAPSHOTs in Maven

<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshot Repository</name>
        <url>https://repo.spring.io/snapshot</url>
    </repository>
</repositories>

-SNAPSHOTs in Gradle

repositories {
  maven { url 'https://repo.spring.io/snapshot' }
  mavenCentral()
}

3. TCP Server

Reactor Netty提供了易於使用和配置的TcpServer。它隱藏了創建TCP服務器所需的大多數Netty功能,並增加了Reactive Streams背壓。

3.1. Starting and Stopping

要啟動TCP服務器,必須創建並配置TcpServer實例。默認情況下,主機配置為使用任何本地地址,並且在調用綁定操作時,系統會選擇一個臨時端口。以下示例顯示如何創建和配置TcpServer實例:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/create/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()   
                         .bindNow(); 

        server.onDispose()
              .block();
    }
}

返回的DisposableServer提供了一個簡單的服務器API,包括disposeNow(),它以阻塞方式關閉服務器。

3.1.1. Host and Port

要在特定的主機和端口上提供服務,可以將以下配置應用於TCP服務器:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/address/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .host("localhost") 
                         .port(8080)        
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.2. Writing Data

為了將數據發送到連接的客戶端,必須附加一個I / O處理程序。 I / O處理程序有權訪問NettyOutbound以便寫入數據。以下示例顯示了如何附加I / O處理程序:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/send/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) 
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.3. Consuming Data

為了從連接的客戶端接收數據,必須附加I / O處理程序。 I / O處理程序可以訪問NettyInbound以便讀取數據。以下示例顯示了如何使用它:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/read/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .handle((inbound, outbound) -> inbound.receive().then()) 
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.4. Lifecycle Callbacks

提供以下生命周期回調,以便您擴展TCP服務器:

  • doOnBind:當服務器通道即將綁定時調用。
  • doOnBound: 綁定服務器通道時調用。
  • doOnConnection: 連接遠程客戶端時調用
  • doOnUnbound: 服務器通道未綁定時調用。

以下示例使用doOnConnection回調:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/lifecycle/Application.java

import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.util.concurrent.TimeUnit;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .doOnConnection(conn ->
                             conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) 
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.5. TCP-level Configurations

本節描述了可以在TCP級別上使用的三種配置:

3.5.1. Setting Channel Options

默認情況下,TCP服務器配置有以下選項:

./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpServerBind.java

TcpServerBind() {
    Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2);
    childOptions.put(ChannelOption.AUTO_READ, false);
    childOptions.put(ChannelOption.TCP_NODELAY, true);
    this.config = new TcpServerConfig(
            Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
            childOptions,
            () -> new InetSocketAddress(DEFAULT_PORT));
}

如果需要其他選項,或者需要更改當前選項,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                         .bindNow();

        server.onDispose()
              .block();
    }
}

您可以在以下鏈接中找到有關Netty頻道選項的更多信息:

3.5.2. Using a Wire Logger

當需要檢查對等點之間的流量時,Reactor Netty提供有線記錄。默認情況下,禁用有線日志記錄。要啟用它,必須將logger的react.netty.tcp.TcpServer級別設置為DEBUG並應用以下配置;

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/wiretap/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .wiretap(true) 
                         .bindNow();

        server.onDispose()
              .block();
    }

3.5.3. Using an Event Loop Group

默認情況下,TCP服務器使用“事件循環組”,其中工作線程數等於初始化時可用於運行時的處理器數(但最小值為4)。需要其他配置時,可以使用LoopResource#create方法之一。

事件循環組的默認配置如下:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default worker thread count, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
 * Default selector thread count, fallback to -1 (no selector thread)
 */
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
 * Default worker thread count for UDP, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
 * Default quiet period that guarantees that the disposal of the underlying LoopResources
 * will not happen, fallback to 2 seconds.
 */
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
 */
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
 * Default value whether the native transport (epoll, kqueue) will be preferred,
 * fallback it will be preferred when available
 */

如果需要更改這些設置,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/eventloop/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        DisposableServer server =
                TcpServer.create()
                         .runOn(loop)
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.6. SSL and TLS

當需要SSL或TLS時,可以應用下清單中顯示的配置。默認情況下,如果OpenSSL可用,則將SslProvider.OPENSSL提供程序用作提供程序。否則,將使用SslProvider.JDK。可以通過SslContextBuilder或通過設置-Dio.netty.handler.ssl.noOpenSsl = true來切換提供程序。

以下示例使用SslContextBuilder:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/security/Application.java

import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.io.File;

public class Application {

    public static void main(String[] args) {
        File cert = new File("certificate.crt");
        File key = new File("private.key");

        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);

        DisposableServer server =
                TcpServer.create()
                         .secure(spec -> spec.sslContext(sslContextBuilder))
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.6.1. Server Name Indication

您可以配置具有映射到特定域的多個SslContext的TCP服務器。配置SNI映射時,可以使用確切的域名或包含通配符的域名。

以下示例使用包含通配符的域名:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/sni/Application.java

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.io.File;

public class Application {

    public static void main(String[] args) throws Exception {
        File defaultCert = new File("default_certificate.crt");
        File defaultKey = new File("default_private.key");

        File testDomainCert = new File("default_certificate.crt");
        File testDomainKey = new File("default_private.key");

        SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
        SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();

        DisposableServer server =
                TcpServer.create()
                         .secure(spec -> spec.sslContext(defaultSslContext)
                                             .addSniMapping("*.test.com",
                                                     testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.7. Metrics

TCP服務器支持與Micrometer的內置集成。它使用前綴Reactor.netty.tcp.server公開所有度量。

下表提供了有關TCP服務器指標的信息:

metric name type description
reactor.netty.tcp.server.data.received DistributionSummary 接收到的數據量,以字節為單位
reactor.netty.tcp.server.data.sent DistributionSummary 發送的數據量(以字節為單位)
reactor.netty.tcp.server.errors Counter 發生的錯誤數
reactor.netty.tcp.server.tls.handshake.time Timer TLS握手花費的時間

這些其他指標也可用:

ByteBufAllocator metrics

metric name type description
reactor.netty.bytebuf.allocator.used.heap.memory Gauge 堆內存的字節數
reactor.netty.bytebuf.allocator.used.direct.memory Gauge 直接存儲器的字節數
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge The number of heap arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge The number of direct arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge The number of thread local caches (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge The size of the tiny cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge The size of the small cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge The size of the normal cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge The chunk size for an arena (when PooledByteBufAllocator)

以下示例啟用了該集成:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/metrics/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .metrics(true) 
                         .bindNow();

        server.onDispose()
              .block();
    }
}

當與非Micrometer的系統集成時需要TCP服務器度量標准,或者要與Micrometer集成時,可以提供自己的度量記錄器,如下所示:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/metrics/custom/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .metrics(true, CustomChannelMetricsRecorder::new) 
                         .bindNow();

        server.onDispose()
              .block();
    }

3.8. Unix Domain Sockets

使用本地傳輸時,TCP服務器支持Unix域套接字(UDS)。

以下示例顯示了如何使用UDS支持:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/uds/Application.java

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) 
                         .bindNow();

        server.onDispose()
              .block();
    }
}

4. TCP Client

Reactor Netty提供了易於使用和易於配置的TcpClient。它隱藏了創建TCP客戶端所需的大多數Netty功能,並增加了Reactive Streams背壓。

4.1. Connect and Disconnect

要將TCP客戶端連接到給定的端點,必須創建並配置TcpClient實例。默認情況下,主機是localhost,端口是12012。以下示例顯示如何創建TcpClient:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/create/Application.java

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()      
                         .connectNow(); 

        connection.onDispose()
                  .block();
    }
}

返回的Connection提供了一個簡單的連接API,包括disposeNow(),該API以阻塞的方式關閉了客戶端。

4.1.1. Host and Port

要連接到特定的主機和端口,可以將以下配置應用於TCP客戶端。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/address/Application.java

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com") 
                         .port(80)            
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.2. Writing Data

要將數據發送到給定的端點,必須附加一個I / O處理程序。 I / O處理程序有權訪問NettyOutbound以便寫入數據。

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/send/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) 
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.3. Consuming Data

要從給定的端點接收數據,必須附加一個I / O處理程序。 I / O處理程序可以訪問NettyInbound以便讀取數據。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/read/Application.java

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .handle((inbound, outbound) -> inbound.receive().then()) 
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.4. Lifecycle Callbacks

提供以下生命周期回調,以便您擴展TCP客戶端。

  • doOnConnect: 在通道即將連接時調用。
  • doOnConnected: 連接通道后調用。
  • doOnDisconnected: 斷開通道后,調用此按鈕。

以下示例使用doOnConnected回調:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/lifecycle/Application.java

import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import java.util.concurrent.TimeUnit;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .doOnConnected(conn ->
                             conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) 
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.5. TCP-level Configurations

本節描述了可以在TCP級別上使用的三種配置:

4.5.1. Channel Options

默認情況下,TCP客戶端配置有以下選項:

./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpClientConnect.java

TcpClientConnect(ConnectionProvider provider) {
    this.config = new TcpClientConfig(
            provider,
            Collections.singletonMap(ChannelOption.AUTO_READ, false),
            () -> AddressUtils.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
}

如果需要其他選項,或者需要更改當前選項,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

您可以在以下鏈接中找到有關Netty頻道選項的更多信息:

4.5.2. Wire Logger

當需要檢查對等點之間的流量時,Reactor Netty提供有線記錄。默認情況下,禁用有線日志記錄。要啟用它,必須將記錄器react.netty.tcp.TcpClient級別設置為DEBUG並應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/wiretap/Application.java

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .wiretap(true) 
                         .host("example.com")
                         .port(80)
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.5.3. Event Loop Group

默認情況下,TCP客戶端使用“事件循環組”,其中工作線程數等於初始化時可用於運行時的處理器數(但最小值為4)。需要其他配置時,可以使用LoopResource#create方法之一。

以下清單顯示了事件循環組的默認配置:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default worker thread count, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
 * Default selector thread count, fallback to -1 (no selector thread)
 */
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
 * Default worker thread count for UDP, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
 * Default quiet period that guarantees that the disposal of the underlying LoopResources
 * will not happen, fallback to 2 seconds.
 */
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
 */
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
 * Default value whether the native transport (epoll, kqueue) will be preferred,
 * fallback it will be preferred when available
 */

如果需要更改這些設置,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/eventloop/Application.java

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .runOn(loop)
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.6. Connection Pool

默認情況下,TCP客戶端使用“固定”連接池,其中最大通道數為500,最大注冊請求數為1000,以保留在掛起隊列中(對於其余配置,請檢查系統屬性下面)。這意味着如果有人嘗試獲取頻道但池中沒有頻道,則實現會創建一個新頻道。當達到池中通道的最大數目時,新的獲取通道的嘗試將延遲,直到再次將通道返回到池中為止。

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default max connections. Fallback to
 * available number of processors (but with a minimum value of 16)
 */
public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
/**
 * Default acquisition timeout (milliseconds) before error. If -1 will never wait to
 * acquire before opening a new
 * connection in an unbounded fashion. Fallback 45 seconds
 */
public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";
/**
 * Default max idle time, fallback - max idle time is not specified.
 */
public static final String POOL_MAX_IDLE_TIME = "reactor.netty.pool.maxIdleTime";
/**
 * Default max life time, fallback - max life time is not specified.
 */
public static final String POOL_MAX_LIFE_TIME = "reactor.netty.pool.maxLifeTime";
/**
 * Default leasing strategy (fifo, lifo), fallback to fifo.
 * <ul>
 *     <li>fifo - The connection selection is first in, first out</li>
 *     <li>lifo - The connection selection is last in, first out</li>
 * </ul>
 */

如果需要禁用連接池,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/pool/Application.java

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.newConnection()
                         .host("example.com")
                         .port(80)
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

如果需要為連接池中的通道指定空閑時間,則可以應用以下配置

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/pool/config/Application.java

import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        ConnectionProvider provider =
                ConnectionProvider.builder("fixed")
                                  .maxConnections(50)
                                  .pendingAcquireTimeout(Duration.ofMillis(30000))
                                  .maxIdleTime(Duration.ofMillis(60))
                                  .build();

        Connection connection =
                TcpClient.create(provider)
                         .host("example.com")
                         .port(80)
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.6.1. Metrics

池中的ConnectionProvider支持與Micrometer的內置集成。它使用前綴react.netty.connection.provider公開所有度量。

Pooled ConnectionProvider metrics

metric name type description
reactor.netty.connection.provider.total.connections Gauge The number of all connections, active or idle
reactor.netty.connection.provider.active.connections Gauge The number of the connections that have been successfully acquired and are in active use
reactor.netty.connection.provider.idle.connections Gauge The number of the idle connections
reactor.netty.connection.provider.pending.connections Gauge The number of requests that are waiting for a connection

以下示例啟用了該集成:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/pool/metrics/Application.java

import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        ConnectionProvider provider =
                ConnectionProvider.builder("fixed")
                                  .maxConnections(50)
                                  .metrics(true) 
                                  .build();

        Connection connection =
                TcpClient.create(provider)
                         .host("example.com")
                         .port(80)
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.7. SSL and TLS

當需要SSL或TLS時,可以應用以下配置。默認情況下,如果OpenSSL可用,則將SslProvider.OPENSSL提供程序用作提供程序。否則,提供程序為SslProvider.JDK。您可以通過使用SslContextBuilder或通過設置-Dio.netty.handler.ssl.noOpenSsl = true來切換提供程序。

以下示例使用SslContextBuilder:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/security/Application.java

import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();

        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(443)
                         .secure(spec -> spec.sslContext(sslContextBuilder))
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.7.1. Server Name Indication

默認情況下,TCP客戶端將遠程主機名作為SNI服務器名發送。當需要更改此默認設置時,可以按以下方式配置TCP客戶端:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/sni/Application.java

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

import javax.net.ssl.SNIHostName;

public class Application {

    public static void main(String[] args) throws Exception {
        SslContext sslContext = SslContextBuilder.forClient().build();

        Connection connection =
                TcpClient.create()
                         .host("127.0.0.1")
                         .port(8080)
                         .secure(spec -> spec.sslContext(sslContext)
                                             .serverNames(new SNIHostName("test.com")))
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.8. Proxy Support

TCP客戶端支持Netty提供的代理功能,並提供了一種通過ProxyProvider構建器指定“非代理主機”的方法。以下示例使用ProxyProvider:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/proxy/Application.java

import reactor.netty.Connection;
import reactor.netty.transport.ProxyProvider;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .proxy(spec -> spec.type(ProxyProvider.Proxy.SOCKS4)
                                            .host("proxy")
                                            .port(8080)
                                            .nonProxyHosts("localhost"))
                        .connectNow();

        connection.onDispose()
                  .block();
    }
}

4.9. Metrics

TCP客戶端支持與Micrometer的內置集成。它使用前綴react.netty.tcp.client公開所有度量。

下表提供了有關TCP客戶端指標的信息:

metric name type description
reactor.netty.tcp.client.data.received DistributionSummary Amount of the data received, in bytes
reactor.netty.tcp.client.data.sent DistributionSummary Amount of the data sent, in bytes
reactor.netty.tcp.client.errors Counter Number of errors that occurred
reactor.netty.tcp.client.tls.handshake.time Timer Time spent for TLS handshake
reactor.netty.tcp.client.connect.time Timer Time spent for connecting to the remote address
reactor.netty.tcp.client.address.resolver Timer Time spent for resolving the address

這些其他指標也可用:

Pooled ConnectionProvider metrics

metric name type description
reactor.netty.connection.provider.total.connections Gauge The number of all connections, active or idle
reactor.netty.connection.provider.active.connections Gauge The number of the connections that have been successfully acquired and are in active use
reactor.netty.connection.provider.idle.connections Gauge The number of the idle connections
reactor.netty.connection.provider.pending.connections Gauge The number of requests that are waiting for a connection

ByteBufAllocator metrics

metric name type description
reactor.netty.bytebuf.allocator.used.heap.memory Gauge The number of the bytes of the heap memory
reactor.netty.bytebuf.allocator.used.direct.memory Gauge The number of the bytes of the direct memory
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge The number of heap arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge The number of direct arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge The number of thread local caches (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge The size of the tiny cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge The size of the small cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge The size of the normal cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge The chunk size for an arena (when PooledByteBufAllocator)

以下示例啟用了該集成:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/metrics/Application.java

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .metrics(true) 
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

當需要TCP客戶端度量標准來與Micrometer以外的系統集成時,或者您想要提供自己與Micrometer的集成時,可以提供自己的度量記錄器,如下所示:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/metrics/custom/Application.java

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpClient;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .host("example.com")
                         .port(80)
                         .metrics(true, CustomChannelMetricsRecorder::new) 
                         .connectNow();

        connection.onDispose()
                  .block();
    }
Enables TCP client metrics and provides ChannelMetricsRecorder implementation.

4.10. Unix Domain Sockets

使用本地傳輸時,TCP客戶端支持Unix域套接字(UDS)。

以下示例顯示了如何使用UDS支持:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/uds/Application.java

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                TcpClient.create()
                         .remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")) 
                         .connectNow();

        connection.onDispose()
                  .block();
    }
}

5. HTTP Server

Reactor Netty提供了易於使用和易於配置的HttpServer類。它隱藏了創建HTTP服務器所需的大多數Netty功能,並增加了Reactive Streams背壓。

5.1. Starting and Stopping

要啟動HTTP服務器,您必須創建並配置HttpServer實例。默認情況下,主機配置為使用任何本地地址,並且在調用綁定操作時,系統會選擇一個臨時端口。以下示例顯示如何創建HttpServer實例:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/create/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()   
                          .bindNow(); 

        server.onDispose()
              .block();
    }
}

返回的DisposableServer提供了一個簡單的服務器API,包括disposeNow(),它以阻塞方式關閉服務器。

5.1.1. Host and Port

要在特定的主機和端口上提供服務,可以將以下配置應用於HTTP服務器:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/address/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .host("localhost") 
                          .port(8080)        
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.2. Routing HTTP

定義HTTP服務器的路由需要配置提供的HttpServerRoutes構建器。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/routing/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes ->
                              routes.get("/hello",        
                                        (request, response) -> response.sendString(Mono.just("Hello World!")))
                                    .post("/echo",        
                                        (request, response) -> response.send(request.receive().retain()))
                                    .get("/path/{param}", 
                                        (request, response) -> response.sendString(Mono.just(request.param("param"))))
                                    .ws("/ws",            
                                        (wsInbound, wsOutbound) -> wsOutbound.send(wsInbound.receive().retain())))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.2.1. SSE

以下代碼顯示了如何配置HTTP服務器以服務於服務器發送的事件:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/sse/Application.java

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes -> routes.get("/sse", serveSse()))
                          .bindNow();

        server.onDispose()
              .block();
    }

    /**
     * Prepares SSE response
     * The "Content-Type" is "text/event-stream"
     * The flushing strategy is "flush after every element" emitted by the provided Publisher
     */
    private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
        return (request, response) ->
                response.sse()
                        .send(flux.map(Application::toByteBuf), b -> true);
    }

    /**
     * Transforms the Object to ByteBuf following the expected SSE format.
     */
    private static ByteBuf toByteBuf(Object any) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            out.write("data: ".getBytes(Charset.defaultCharset()));
            MAPPER.writeValue(out, any);
            out.write("\n\n".getBytes(Charset.defaultCharset()));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return ByteBufAllocator.DEFAULT
                               .buffer()
                               .writeBytes(out.toByteArray());
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();
}

5.2.2. Static Resources

以下代碼顯示了如何配置HTTP服務器以提供靜態資源:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/staticresources/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class Application {

    public static void main(String[] args) throws URISyntaxException {
        Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
        DisposableServer server =
                HttpServer.create()
                          .route(routes -> routes.file("/index.html", file))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.3. Writing Data

要將數據發送到已連接的客戶端,必須使用handle(...)或route(...)附加I / O處理程序。 I / O處理程序有權訪問HttpServerResponse,以便能夠寫入數據。以下示例使用handle(…)方法:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/send/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .handle((request, response) -> response.sendString(Mono.just("hello"))) 
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.3.1. Adding Headers and Other Metadata

將數據發送到連接的客戶端時,可能需要發送其他標頭,cookie,狀態代碼和其他元數據。您可以使用HttpServerResponse提供此附加元數據。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/send/headers/Application.java

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes ->
                              routes.get("/hello",
                                  (request, response) ->
                                      response.status(HttpResponseStatus.OK)
                                              .header(HttpHeaderNames.CONTENT_LENGTH, "12")
                                              .sendString(Mono.just("Hello World!"))))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.3.2. Compression

您可以配置HTTP服務器以發送壓縮的響應,具體取決於請求標頭Accept-Encoding。

Reactor Netty提供了三種用於壓縮傳出數據的策略:

  • compress(boolean):根據提供的布爾值,啟用壓縮(true)還是禁用壓縮(false)。
  • compress(int):一旦響應大小超過給定值(以字節為單位),就執行壓縮。
  • compress(BiPredicate ):如果謂詞返回true,則執行壓縮。

以下示例使用compress方法(設置為true)啟用壓縮:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/compression/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class Application {

    public static void main(String[] args) throws URISyntaxException {
        Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
        DisposableServer server =
                HttpServer.create()
                          .compress(true)
                          .route(routes -> routes.file("/index.html", file))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.4. Consuming Data

要從已連接的客戶端接收數據,必須使用handle(...)或route(...)附加I / O處理程序。 I / O處理程序有權訪問HttpServerRequest,以便能夠讀取數據。

以下示例使用handle(…)方法:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .handle((request, response) -> request.receive().then()) 
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.4.1. Reading Headers, URI Params, and other Metadata

從連接的客戶端接收數據時,可能需要檢查請求標頭,參數和其他元數據。您可以使用HttpServerRequest獲得此其他元數據。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/headers/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes ->
                              routes.get("/{param}",
                                  (request, response) -> {
                                      if (request.requestHeaders().contains("Some-Header")) {
                                          return response.sendString(Mono.just(request.param("param")));
                                      }
                                      return response.sendNotFound();
                                  }))
                          .bindNow();

        server.onDispose()
              .block();
    }
}
Obtaining the Remote (Client) Address

除了可以從請求中獲取的元數據之外,您還可以接收主機(服務器)地址,遠程(客戶端)地址和方案。根據選擇的工廠方法,您可以直接從通道或使用Forwarded或X-Forwarded- * HTTP請求標頭檢索信息。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/clientaddress/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .forwarded(true) 
                          .route(routes ->
                              routes.get("/clientip",
                                  (request, response) ->
                                      response.sendString(Mono.just(request.remoteAddress() 
                                                                           .getHostString()))))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

還可以自定義Forwarded或X-Forwarded- *標頭處理程序的行為。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/clientaddress/CustomForwardedHeaderHandlerApplication.java

import java.net.InetSocketAddress;

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.transport.AddressUtils;

public class CustomForwardedHeaderHandlerApplication {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .forwarded((connectionInfo, request) -> {  
                              String hostHeader = request.headers().get("X-Forwarded-Host");
                              if (hostHeader != null) {
                                  String[] hosts = hostHeader.split(",", 2);
                                  InetSocketAddress hostAddress = AddressUtils.createUnresolved(
                                      hosts[hosts.length - 1].trim(),
                                      connectionInfo.getHostAddress().getPort());
                                  connectionInfo = connectionInfo.withHostAddress(hostAddress);
                              }
                              return connectionInfo;
                          })
                          .route(routes ->
                              routes.get("/clientip",
                                  (request, response) ->
                                      response.sendString(Mono.just(request.remoteAddress() 
                                                                           .getHostString()))))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.4.2. HTTP Request Decoder

默認情況下,Netty為傳入請求配置一些限制,例如:

  • 初始行的最大長度。
  • 請求頭的最大長度。
  • 內容或每個塊的最大長度。

有關更多信息,請參見HttpRequestDecoder和HttpServerUpgradeHandler。

默認情況下,HTTP服務器配置有以下設置:

./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpDecoderSpec.java

public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
public static final int DEFAULT_MAX_HEADER_SIZE         = 8192;
public static final int DEFAULT_MAX_CHUNK_SIZE          = 8192;
public static final boolean DEFAULT_VALIDATE_HEADERS    = true;
public static final int DEFAULT_INITIAL_BUFFER_SIZE     = 128;

./../../reactor-netty-http/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java

/**
 * The maximum length of the content of the HTTP/2.0 clear-text upgrade request.
 * By default the server will reject an upgrade request with non-empty content,
 * because the upgrade request is most likely a GET request.
 */
public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 0;

當需要更改這些默認設置時,可以按以下方式配置HTTP服務器:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/requestdecoder/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .httpRequestDecoder(spec -> spec.maxHeaderSize(16384)) 
                          .handle((request, response) -> response.sendString(Mono.just("hello")))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.5. TCP-level Configuration

當需要在TCP級別上更改配置時,可以使用以下代碼段擴展默認的TCP服務器配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                          .bindNow();

        server.onDispose()
              .block();
    }
}

有關TCP級別配置的更多詳細信息,請參見TCP Server。

5.5.1. Wire Logger

當您需要檢查對等方之間的流量時,Reactor Netty提供了線路日志記錄。默認情況下,禁用有線日志記錄。要啟用它,必須將記錄器react.netty.http.server.HttpServer級別設置為DEBUG並應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/wiretap/Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .wiretap(true) 
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.6. SSL and TLS

當需要SSL或TLS時,可以應用下一個示例中顯示的配置。默認情況下,如果OpenSSL可用,則將SslProvider.OPENSSL提供程序用作提供程序。否則,將使用SslProvider.JDK。您可以通過使用SslContextBuilder或通過設置-Dio.netty.handler.ssl.noOpenSsl = true來切換提供程序。

以下示例使用SslContextBuilder:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/security/Application.java

import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.io.File;

public class Application {

    public static void main(String[] args) {
        File cert = new File("certificate.crt");
        File key = new File("private.key");

        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);

        DisposableServer server =
                HttpServer.create()
                          .secure(spec -> spec.sslContext(sslContextBuilder))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.6.1. Server Name Indication

您可以為HTTP服務器配置多個映射到特定域的SslContext。配置SNI映射時,可以使用確切的域名或包含通配符的域名。

以下示例使用包含通配符的域名:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/sni/Application.java

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.io.File;

public class Application {

    public static void main(String[] args) throws Exception {
        File defaultCert = new File("default_certificate.crt");
        File defaultKey = new File("default_private.key");

        File testDomainCert = new File("default_certificate.crt");
        File testDomainKey = new File("default_private.key");

        SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
        SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();

        DisposableServer server =
                HttpServer.create()
                          .secure(spec -> spec.sslContext(defaultSslContext)
                                              .addSniMapping("*.test.com",
                                                      testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

5.7. HTTP Access Log

當前的日志支持僅提供“通用日志格式”。

您可以使用-Dreactor.netty.http.server.accessLogEnabled = true啟用HTTP訪問日志。默認情況下,它是禁用的。

您可以使用以下配置(用於Logback或類似的日志記錄框架)來擁有單獨的HTTP訪問日志文件:

<appender name="accessLog" class="ch.qos.logback.core.FileAppender">
    <file>access_log.log</file>
    <encoder>
        <pattern>%msg%n</pattern>
    </encoder>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="accessLog" />
</appender>

<logger name="reactor.netty.http.server.AccessLog" level="INFO" additivity="false">
    <appender-ref ref="async"/>
</logger>

5.8. HTTP/2

默認情況下,HTTP服務器支持HTTP / 1.1。如果需要HTTP / 2,則可以通過配置獲取它。除了協議配置之外,如果您需要H2而不是H2C(明文),則還必須配置SSL。

以下清單提供了一個簡單的H2示例:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/http2/H2Application.java

import io.netty.handler.ssl.SslContextBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import java.io.File;

public class H2Application {

    public static void main(String[] args) {
        File cert = new File("certificate.crt");
        File key = new File("private.key");

        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);

        DisposableServer server =
                HttpServer.create()
                          .port(8080)
                          .protocol(HttpProtocol.H2)                          
                          .secure(spec -> spec.sslContext(sslContextBuilder)) 
                          .handle((request, response) -> response.sendString(Mono.just("hello")))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

現在,應用程序的行為應如下所示:

$ curl --http2 https://localhost:8080 -i
HTTP/2 200

hello

以下清單提供了一個簡單的H2C示例:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/http2/H2CApplication.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;

public class H2CApplication {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .port(8080)
                          .protocol(HttpProtocol.H2C)
                          .handle((request, response) -> response.sendString(Mono.just("hello")))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

現在,應用程序的行為應如下所示:

$ curl --http2-prior-knowledge http://localhost:8080 -i
HTTP/2 200

hello

5.8.1. Protocol Selection

./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpProtocol.java

public enum HttpProtocol {

    /**
     * The default supported HTTP protocol by HttpServer and HttpClient
     */
    HTTP11,

    /**
     * HTTP/2.0 support with TLS
     * <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
     * While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
     * <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
     * for communication with no fallback to HTTP/1.1.
     */
    H2,

    /**
     * HTTP/2.0 support with clear-text.
     * <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
     * Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
     * and {@literal Connection: Upgrade}. A server will typically reply a successful
     * 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
     * successful the client will start sending HTTP/2.0 traffic.
     * <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
     * require {@literal Connection: Upgrade} handshake between a client and server but
     * fallback to HTTP/1.1 will not be supported.
     */
    H2C
}

5.9. Metrics

HTTP服務器支持與Micrometer的內置集成。它使用前綴react..netty.http.server公開所有度量。

metric name type description
reactor.netty.http.server.data.received DistributionSummary Amount of the data received, in bytes
reactor.netty.http.server.data.sent DistributionSummary Amount of the data sent, in bytes
reactor.netty.http.server.errors Counter Number of errors that occurred
reactor.netty.http.server.data.received.time Timer Time spent in consuming incoming data
reactor.netty.http.server.data.sent.time Timer Time spent in sending outgoing data
reactor.netty.http.server.response.time Timer Total time for the request/response

These additional metrics are also available:

ByteBufAllocator metrics

metric name type description
reactor.netty.bytebuf.allocator.used.heap.memory Gauge The number of the bytes of the heap memory
reactor.netty.bytebuf.allocator.used.direct.memory Gauge The number of the bytes of the direct memory
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge The number of heap arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge The number of direct arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge The number of thread local caches (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge The size of the tiny cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge The size of the small cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge The size of the normal cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge The chunk size for an arena (when PooledByteBufAllocator)

以下示例啟用了該集成:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/metrics/Application.java

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        Metrics.globalRegistry 
               .config()
               .meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.server", "URI", 100, MeterFilter.deny()));

        DisposableServer server =
                HttpServer.create()
                          .metrics(true, s -> {
                              if (s.startsWith("/stream/")) { 
                                  return "/stream/{n}";
                              }
                              else if (s.startsWith("/bytes/")) {
                                  return "/bytes/{n}";
                              }
                              return s;
                          }) 
                          .route(r ->
                              r.get("/stream/{n}",
                                   (req, res) -> res.sendString(Mono.just(req.param("n"))))
                               .get("/bytes/{n}",
                                   (req, res) -> res.sendString(Mono.just(req.param("n")))))
                          .bindNow();

        server.onDispose()
              .block();
    }
}

當需要與Micrometer以外的系統集成時需要HTTP服務器指標,或者您想與Micrometer提供自己的集成時,可以提供自己的指標記錄器,如下所示:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/metrics/custom/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.server.HttpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .metrics(true, CustomHttpServerMetricsRecorder::new) 
                          .route(r ->
                              r.get("/stream/{n}",
                                   (req, res) -> res.sendString(Mono.just(req.param("n"))))
                               .get("/bytes/{n}",
                                   (req, res) -> res.sendString(Mono.just(req.param("n")))))
                          .bindNow();

        server.onDispose()
              .block();
    }

5.10. Unix Domain Sockets

使用本機傳輸時,HTTP服務器支持Unix域套接字(UDS)。

以下示例顯示了如何使用UDS支持:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/uds/Application.java

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) 
                          .bindNow();

        server.onDispose()
              .block();
    }
}

6. HTTP Client

Reactor Netty提供了易於使用和易於配置的HttpClient。它隱藏了創建HTTP客戶端所需的大多數Netty功能,並增加了Reactive Streams背壓。

6.1. Connect

要將HTTP客戶端連接到給定的HTTP端點,必須創建並配置HttpClient實例。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/connect/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client = HttpClient.create();  

        client.get()                      
              .uri("http://example.com/") 
              .response()                 
              .block();
    }
}

以下示例使用WebSocket:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/websocket/Application.java

import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client = HttpClient.create();

        client.websocket()
              .uri("wss://echo.websocket.org")
              .handle((inbound, outbound) -> {
                  inbound.receive()
                         .asString()
                         .take(1)
                         .subscribe(System.out::println);

                  final byte[] msgBytes = "hello".getBytes(CharsetUtil.ISO_8859_1);
                  return outbound.send(Flux.just(Unpooled.wrappedBuffer(msgBytes), Unpooled.wrappedBuffer(msgBytes)))
                                 .neverComplete();
              })
              .blockLast();
    }
}

6.1.1. Host and Port

為了連接到特定的主機和端口,可以將以下配置應用於HTTP客戶端:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/address/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .host("example.com") 
                          .port(80);           

        client.get()
              .uri("/")
              .response()
              .block();
    }
}

6.2. Writing Data

要將數據發送到給定的HTTP端點,可以使用send(Publisher)方法提供一個Publisher。默認情況下,Transfer-Encoding:chunked適用於預期請求正文的那些HTTP方法。通過請求標頭提供的Content-Length會禁用Transfer-Encoding:分塊(如有必要)。以下示例發送hello:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/send/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client = HttpClient.create();

        client.post()
              .uri("http://example.com/")
              .send(ByteBufFlux.fromString(Mono.just("hello"))) 
              .response()
              .block();
    }
}

6.2.1. Adding Headers and Other Metadata

將數據發送到給定的HTTP端點時,您可能需要發送其他標頭,cookie和其他元數據。您可以使用以下配置來這樣做:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/send/headers/Application.java

import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .headers(h -> h.set(HttpHeaderNames.CONTENT_LENGTH, 5)); 

        client.post()
              .uri("http://example.com/")
              .send(ByteBufFlux.fromString(Mono.just("hello")))
              .response()
              .block();
    }
}
Compression

您可以在HTTP客戶端上啟用壓縮,這意味着將請求標頭Accept-Encoding添加到了請求標頭中。以下示例顯示了如何執行此操作:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/compression/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .compress(true);

        client.get()
              .uri("http://example.com/")
              .response()
              .block();
    }
}
Auto-Redirect Support

您可以配置HTTP客戶端以啟用自動重定向支持。

Reactor Netty提供了兩種不同的自動重定向支持策略:

  • followRedirect(boolean):指定是否為狀態301 | 302 | 307 | 308啟用HTTP自動重定向支持。
  • followRedirect(BiPredicate ):如果提供的謂詞匹配,則啟用自動重定向支持。

以下示例使用followRedirect(true):

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/redirect/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .followRedirect(true);

        client.get()
              .uri("http://example.com/")
              .response()
              .block();
    }
}

6.3. Consuming Data

要從給定的HTTP端點接收數據,可以使用HttpClient.ResponseReceiver中的一種方法。以下示例使用responseContent方法:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/read/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client = HttpClient.create();

        client.get()
              .uri("http://example.com/")
              .responseContent() 
              .aggregate()       
              .asString()        
              .block();
    }
}

6.3.1. Reading Headers and Other Metadata

從給定的HTTP端點接收數據時,您可以檢查響應標頭,狀態代碼和其他元數據。您可以使用HttpClientResponse獲得此其他元數據。以下示例顯示了如何執行此操作。

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/read/status/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client = HttpClient.create();

        client.get()
              .uri("http://example.com/")
              .responseSingle((resp, bytes) -> {
                  System.out.println(resp.status()); 
                  return bytes.asString();
              })
              .block();
    }
}

6.3.2. HTTP Response Decoder

默認情況下,Netty為傳入的響應配置一些限制,例如:

  • 初始行的最大長度。
  • 請求頭的最大長度。
  • 內容或每個塊的最大長度。

有關更多信息,請參見HttpResponseDecoder。

默認情況下,HTTP客戶端配置有以下設置:

./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpDecoderSpec.java

public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
public static final int DEFAULT_MAX_HEADER_SIZE         = 8192;
public static final int DEFAULT_MAX_CHUNK_SIZE          = 8192;
public static final boolean DEFAULT_VALIDATE_HEADERS    = true;
public static final int DEFAULT_INITIAL_BUFFER_SIZE     = 128;

./../../reactor-netty-http/src/main/java/reactor/netty/http/client/HttpResponseDecoderSpec.java

public static final boolean DEFAULT_FAIL_ON_MISSING_RESPONSE         = false;
public static final boolean DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST = false;

/**
 * The maximum length of the content of the HTTP/2.0 clear-text upgrade request.
 * By default the client will allow an upgrade request with up to 65536 as
 * the maximum length of the aggregated content.
 */
public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 65536;

當需要更改這些默認設置時,可以按以下方式配置HTTP客戶端:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/responsedecoder/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .httpResponseDecoder(spec -> spec.maxHeaderSize(16384)); 

        client.get()
              .uri("http://example.com/")
              .responseContent()
              .aggregate()
              .asString()
              .block();
    }
}

6.4. TCP-level Configuration

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.http.client.HttpClient;
import java.net.InetSocketAddress;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .bindAddress(() -> new InetSocketAddress("host", 1234))
                          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

        String response =
                client.get()
                      .uri("http://example.com/")
                      .responseContent()
                      .aggregate()
                      .asString()
                      .block();

        System.out.println("Response " + response);
    }
}

6.4.1. Wire Logger

當需要檢查對等點之間的流量時,Reactor Netty提供有線記錄。默認情況下,禁用有線日志記錄。要啟用它,必須將記錄器react.netty.http.client.HttpClient級別設置為DEBUG並應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/wiretap/Application.java

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .wiretap(true); 

        client.get()
              .uri("http://example.com/")
              .response()
              .block();
    }
}

6.5. SSL and TLS

當需要SSL或TLS時,可以應用下一個示例中顯示的配置。默認情況下,如果OpenSSL可用,則將SslProvider.OPENSSL提供程序用作提供程序。否則,將使用SslProvider.JDK提供程序。您可以通過使用SslContextBuilder或通過設置-Dio.netty.handler.ssl.noOpenSsl = true來切換提供程序。以下示例使用SslContextBuilder:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/security/Application.java

import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();

        HttpClient client =
                HttpClient.create()
                          .secure(spec -> spec.sslContext(sslContextBuilder));

        client.get()
              .uri("https://example.com/")
              .response()
              .block();
    }
}

6.5.1. Server Name Indication

默認情況下,HTTP客戶端將遠程主機名作為SNI服務器名發送。當需要更改此默認設置時,可以按以下方式配置HTTP客戶端:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/sni/Application.java

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.http.client.HttpClient;

import javax.net.ssl.SNIHostName;

public class Application {

    public static void main(String[] args) throws Exception {
        SslContext sslContext = SslContextBuilder.forClient().build();

        HttpClient client =
                HttpClient.create()
                          .secure(spec -> spec.sslContext(sslContext)
                                              .serverNames(new SNIHostName("test.com")));

        client.get()
              .uri("https://127.0.0.1:8080/")
              .response()
              .block();
    }
}

6.6. Retry Strategies

默認情況下,如果HTTP客戶端在TCP級別中止,則HTTP客戶端將重試該請求一次。

6.7. HTTP/2

默認情況下,HTTP客戶端支持HTTP / 1.1。如果需要HTTP / 2,則可以通過配置獲取它。除了協議配置之外,如果您需要H2而不是H2C(明文),則還必須配置SSL。

以下清單提供了一個簡單的“ H2”示例:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/http2/H2Application.java

import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;

public class H2Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .protocol(HttpProtocol.H2) 
                          .secure();                 

        Tuple2<String, HttpHeaders> response =
                client.get()
                      .uri("https://example.com/")
                      .responseSingle((res, bytes) -> bytes.asString()
                                                           .zipWith(Mono.just(res.responseHeaders())))
                      .block();

        System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id"));
        System.out.println("Response: " + response.getT1());
    }
}

以下清單展示了一個簡單的“ H2C”示例:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/http2/H2CApplication.java

import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;

public class H2CApplication {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .protocol(HttpProtocol.H2C);

        Tuple2<String, HttpHeaders> response =
                client.get()
                      .uri("http://localhost:8080/")
                      .responseSingle((res, bytes) -> bytes.asString()
                                                           .zipWith(Mono.just(res.responseHeaders())))
                      .block();

        System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id"));
        System.out.println("Response: " + response.getT1());
    }
}

6.7.1. Protocol Selection

./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpProtocol.java

public enum HttpProtocol {

    /**
     * The default supported HTTP protocol by HttpServer and HttpClient
     */
    HTTP11,

    /**
     * HTTP/2.0 support with TLS
     * <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
     * While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
     * <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
     * for communication with no fallback to HTTP/1.1.
     */
    H2,

    /**
     * HTTP/2.0 support with clear-text.
     * <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
     * Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
     * and {@literal Connection: Upgrade}. A server will typically reply a successful
     * 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
     * successful the client will start sending HTTP/2.0 traffic.
     * <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
     * require {@literal Connection: Upgrade} handshake between a client and server but
     * fallback to HTTP/1.1 will not be supported.
     */
    H2C
}

6.8. Metrics

HTTP客戶端支持與Micrometer的內置集成。它使用前綴react.netty.http.client公開所有度量。

metric name type description
reactor.netty.http.client.data.received DistributionSummary Amount of the data received, in bytes
reactor.netty.http.client.data.sent DistributionSummary Amount of the data sent, in bytes
reactor.netty.http.client.errors Counter Number of errors that occurred
reactor.netty.http.client.tls.handshake.time Timer Time spent for TLS handshake
reactor.netty.http.client.connect.time Timer Time spent for connecting to the remote address
reactor.netty.http.client.address.resolver Timer Time spent for resolving the address
reactor.netty.http.client.data.received.time Timer Time spent in consuming incoming data
reactor.netty.http.client.data.sent.time Timer Time spent in sending outgoing data
reactor.netty.http.client.response.time Timer Total time for the request/response

These additional metrics are also available:

Pooled ConnectionProvider metrics

metric name type description
reactor.netty.connection.provider.total.connections Gauge The number of all connections, active or idle
reactor.netty.connection.provider.active.connections Gauge The number of the connections that have been successfully acquired and are in active use
reactor.netty.connection.provider.idle.connections Gauge The number of the idle connections
reactor.netty.connection.provider.pending.connections Gauge The number of requests that are waiting for a connection

ByteBufAllocator metrics

metric name type description
reactor.netty.bytebuf.allocator.used.heap.memory Gauge The number of the bytes of the heap memory
reactor.netty.bytebuf.allocator.used.direct.memory Gauge The number of the bytes of the direct memory
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge The number of heap arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge The number of direct arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge The number of thread local caches (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge The size of the tiny cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge The size of the small cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge The size of the normal cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge The chunk size for an arena (when PooledByteBufAllocator)

以下示例啟用了該集成:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/metrics/Application.java

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        Metrics.globalRegistry 
               .config()
               .meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.client", "URI", 100, MeterFilter.deny()));

        HttpClient client =
                HttpClient.create()
                          .metrics(true, s -> {
                              if (s.startsWith("/stream/")) { 
                                  return "/stream/{n}";
                              }
                              else if (s.startsWith("/bytes/")) {
                                  return "/bytes/{n}";
                              }
                              return s;
                          }); 

        client.get()
              .uri("http://httpbin.org/stream/2")
              .responseContent()
              .blockLast();

        client.get()
              .uri("http://httpbin.org/bytes/1024")
              .responseContent()
              .blockLast();
    }
}

如果要與Micrometer以外的系統進行集成時需要HTTP客戶端指標,或者您想與自己的Micrometer提供集成,則可以提供自己的指標記錄器,如下所示:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/metrics/custom/Application.java

import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.client.HttpClient;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .metrics(true, CustomHttpClientMetricsRecorder::new); 

        client.get()
              .uri("https://httpbin.org/stream/2")
              .response()
              .block();
    }

6.9. Unix Domain Sockets

使用本地傳輸時,HTTP客戶端支持Unix域套接字(UDS)。

以下示例顯示了如何使用UDS支持:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/uds/Application.java

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        HttpClient client =
                HttpClient.create()
                          .remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")); 

        client.get()
              .uri("/")
              .response()
              .block();
    }
}

7. UDP Server

Reactor Netty提供了易於使用和易於配置的UdpServer。它隱藏了創建UDP服務器所需的大多數Netty功能,並增加了Reactive Streams背壓。

7.1. Starting and Stopping

要啟動UDP服務器,必須創建和配置UdpServer實例。默認情況下,主機配置為localhost,端口配置為12012。以下示例顯示如何創建和啟動UDP服務器:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/create/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()                         
                         .bindNow(Duration.ofSeconds(30)); 

        server.onDispose()
              .block();
    }
}

返回的Connection提供了一個簡單的服務器API,包括disposeNow(),該API以阻塞的方式關閉了服務器。

7.1.1. Host and Port

為了在特定主機和端口上提供服務,可以將以下配置應用於UDP服務器:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/address/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .host("localhost") 
                         .port(8080)        
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

7.2. Writing Data

要將數據發送到遠程對等方,必須附加I / O處理程序。 I / O處理程序有權訪問UdpOutbound,以便能夠寫入數據。以下示例顯示如何發送hello:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/send/Application.java

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .handle((in, out) ->
                             out.sendObject(
                                 in.receiveObject()
                                   .map(o -> {
                                       if (o instanceof DatagramPacket) {
                                           DatagramPacket p = (DatagramPacket) o;
                                           ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
                                           return new DatagramPacket(buf, p.sender()); 
                                       }
                                       else {
                                           return Mono.error(new Exception("Unexpected type of the message: " + o));
                                       }
                                   })))
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

7.3. Consuming Data

要從遠程對等方接收數據,必須附加一個I / O處理程序。 I / O處理程序有權訪問UdpInbound,以便能夠讀取數據。以下示例顯示了如何使用數據:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/read/Application.java

import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .handle((in, out) ->
                             out.sendObject(
                                 in.receiveObject()
                                   .map(o -> {
                                       if (o instanceof DatagramPacket) {
                                           DatagramPacket p = (DatagramPacket) o;
                                           return new DatagramPacket(p.content().retain(), p.sender()); 
                                       }
                                       else {
                                           return Mono.error(new Exception("Unexpected type of the message: " + o));
                                       }
                                   })))
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

7.4. Lifecycle Callbacks

提供以下生命周期回調,以便您擴展UDP服務器:

  • doOnBind:當服務器通道即將綁定時調用。
  • doOnBound:綁定服務器通道時調用。
  • doOnUnbound:當服務器通道未綁定時調用。

以下示例使用doOnBound方法:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/lifecycle/Application.java

import io.netty.handler.codec.LineBasedFrameDecoder;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .doOnBound(conn -> conn.addHandler(new LineBasedFrameDecoder(8192))) 
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

7.5. Connection Configuration

本節描述了可以在UDP級別上使用的三種配置:

7.5.1. Channel Options

默認情況下,UDP服務器配置有以下選項:

./../../reactor-netty-core/src/main/java/reactor/netty/udp/UdpServerBind.java

UdpServerBind() {
  this.config = new UdpServerConfig(
    Collections.singletonMap(ChannelOption.AUTO_READ, false),
    () -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}

如果需要其他選項或需要更改當前選項,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

有關Netty頻道選項的更多信息,請參見以下鏈接:

7.5.2. Wire Logger

當需要檢查對等點之間的流量時,Reactor Netty提供有線記錄。默認情況下,禁用有線日志記錄。要啟用它,您必須將記錄器react.netty.udp.UdpServer級別設置為DEBUG並應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/wiretap/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .wiretap(true) 
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

7.5.3. Event Loop Group

默認情況下,UDP服務器使用“事件循環組”,其中工作線程數等於初始化時可用於運行時的處理器數(但最小值為4)。需要其他配置時,可以使用LoopResource#create方法之一。

“事件循環組”的默認配置如下:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default worker thread count, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
 * Default selector thread count, fallback to -1 (no selector thread)
 */
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
 * Default worker thread count for UDP, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
 * Default quiet period that guarantees that the disposal of the underlying LoopResources
 * will not happen, fallback to 2 seconds.
 */
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
 */
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
 * Default value whether the native transport (epoll, kqueue) will be preferred,
 * fallback it will be preferred when available
 */

如果需要更改這些設置,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/eventloop/Application.java

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        Connection server =
                UdpServer.create()
                         .runOn(loop)
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}

7.6. Metrics

UDP服務器支持與Micrometer的內置集成。它使用前綴react.netty.udp.server公開所有度量。

下表提供了有關UDP服務器指標的信息:

metric name type description
reactor.netty.udp.server.data.received DistributionSummary Amount of the data received, in bytes
reactor.netty.udp.server.data.sent DistributionSummary Amount of the data sent, in bytes
reactor.netty.udp.server.errors Counter Number of errors that occurred

These additional metrics are also available:

ByteBufAllocator metrics

metric name type description
reactor.netty.bytebuf.allocator.used.heap.memory Gauge The number of the bytes of the heap memory
reactor.netty.bytebuf.allocator.used.direct.memory Gauge The number of the bytes of the direct memory
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge The number of heap arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge The number of direct arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge The number of thread local caches (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge The size of the tiny cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge The size of the small cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge The size of the normal cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge The chunk size for an arena (when PooledByteBufAllocator)

The following example enables that integration:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/metrics/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .metrics(true) 
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
}
Enables the built-in integration with Micrometer

如果要與Micrometer以外的系統集成需要UDP服務器度量標准,或者要與Micrometer提供自己的集成,則可以提供自己的度量記錄器,如下所示:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/metrics/custom/Application.java

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                UdpServer.create()
                         .metrics(true, CustomChannelMetricsRecorder::new) 
                         .bindNow(Duration.ofSeconds(30));

        server.onDispose()
              .block();
    }
Enables UDP server metrics and provides ChannelMetricsRecorder implementation.

Suggest Edit to "UDP Server"

8. UDP Client

Reactor Netty提供了易於使用和易於配置的UdpClient。它隱藏了創建UDP客戶端所需的大多數Netty功能,並增加了Reactive Streams背壓。

8.1. Connecting and Disconnecting

要將UDP客戶端連接到給定的端點,必須創建並配置UdpClient實例。默認情況下,主機配置為localhost,端口為12012。以下示例顯示如何創建和連接UDP客戶端:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/create/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()                            
                         .connectNow(Duration.ofSeconds(30)); 

        connection.onDispose()
                  .block();
    }
}

返回的Connection提供了一個簡單的連接API,包括disposeNow(),該API以阻塞的方式關閉了客戶端。

8.1.1. Host and Port

要連接到特定的主機和端口,可以將以下配置應用於UDP客戶端:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/address/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com") 
                         .port(80)            
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

8.2. Writing Data

要將數據發送到給定的對等方,必須附加I / O處理程序。 I / O處理程序有權訪問UdpOutbound,以便能夠寫入數據。

以下示例顯示如何發送hello:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/send/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello"))) 
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}
Sends hello string to the remote peer.

8.3. Consuming Data

要從給定的對等方接收數據,必須附加I / O處理程序。 I / O處理程序有權訪問UdpInbound,以便能夠讀取數據。以下示例顯示了如何使用數據:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/read/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .handle((udpInbound, udpOutbound) -> udpInbound.receive().then()) 
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

8.4. Lifecycle Callbacks

提供以下生命周期回調,以便您擴展UDP客戶端:

  • doOnConnect:通道即將連接時調用。
  • doOnConnected:連接通道后調用。
  • doOnDisconnected:斷開通道后調用。

以下示例使用doOnConnected方法:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/lifecycle/Application.java

import io.netty.handler.codec.LineBasedFrameDecoder;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .doOnConnected(conn -> conn.addHandler(new LineBasedFrameDecoder(8192))) 
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

8.5. Connection Configuration

本節描述了可以在UDP級別上使用的三種配置:

8.5.1. Channel Options

默認情況下,UDP客戶端配置有以下選項:

./../../reactor-netty-core/src/main/java/reactor/netty/udp/UdpClientConnect.java

UdpClientConnect() {
    this.config = new UdpClientConfig(
            ConnectionProvider.newConnection(),
            Collections.singletonMap(ChannelOption.AUTO_READ, false),
            () -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}

如果需要其他選項或需要更改當前選項,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

您可以在以下鏈接中找到有關Netty頻道選項的更多信息:

8.5.2. Wire Logger

當需要檢查對等點之間的流量時,Reactor Netty提供有線記錄。默認情況下,禁用有線日志記錄。要啟用它,必須將logger的反應堆.netty.udp.UdpClient級別設置為DEBUG並應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/wiretap/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .wiretap(true) 
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

8.5.3. Event Loop Group

缺省情況下,UDP客戶端使用“事件循環組”,其中工作線程數等於初始化時可用於運行時的處理器數(但最小值為4)。當需要其他配置時,可以使用LoopResources#create方法之一。

以下清單顯示了“事件循環組”的默認配置:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default worker thread count, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
 * Default selector thread count, fallback to -1 (no selector thread)
 */
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
 * Default worker thread count for UDP, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
 * Default quiet period that guarantees that the disposal of the underlying LoopResources
 * will not happen, fallback to 2 seconds.
 */
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
 */
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
 * Default value whether the native transport (epoll, kqueue) will be preferred,
 * fallback it will be preferred when available
 */

如果需要更改這些設置,則可以應用以下配置:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/eventloop/Application.java

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .runOn(loop)
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

8.6. Metrics

UDP客戶端支持與Micrometer的內置集成。它使用前綴react.netty.udp.client公開所有度量。

The following table provides information for the UDP client metrics:

metric name type description
reactor.netty.udp.client.data.received DistributionSummary Amount of the data received, in bytes
reactor.netty.udp.client.data.sent DistributionSummary Amount of the data sent, in bytes
reactor.netty.udp.client.errors Counter Number of errors that occurred
reactor.netty.udp.client.connect.time Timer Time spent for connecting to the remote address
reactor.netty.udp.client.address.resolver Timer Time spent for resolving the address

These additional metrics are also available:

ByteBufAllocator metrics

metric name type description
reactor.netty.bytebuf.allocator.used.heap.memory Gauge The number of the bytes of the heap memory
reactor.netty.bytebuf.allocator.used.direct.memory Gauge The number of the bytes of the direct memory
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge The number of heap arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge The number of direct arenas (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge The number of thread local caches (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge The size of the tiny cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge The size of the small cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge The size of the normal cache (when PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge The chunk size for an arena (when PooledByteBufAllocator)

The following example enables that integration:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/metrics/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .metrics(true) 
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }
}

如果要與Micrometer以外的系統進行集成時需要UDP客戶端指標,或者您想與Micrometer提供自己的集成,則可以提供自己的指標記錄器,如下所示:

./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/metrics/custom/Application.java

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpClient;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection connection =
                UdpClient.create()
                         .host("example.com")
                         .port(80)
                         .metrics(true, CustomChannelMetricsRecorder::new) 
                         .connectNow(Duration.ofSeconds(30));

        connection.onDispose()
                  .block();
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM