Reactor Netty Reference Guide
1. About the Documentation
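This section provides a brief overview of the Reactor Netty reference documentation. You do not need to read this guide in a linear fashion. Each piece stands on its own, though the pieces often refer to one another.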
1.1. Latest Version and Copyright Notice
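The Reactor Netty reference guide is available as HTML documents. The latest copy is available at https://projectreactor.io/docs/netty/release/reference/index.html
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.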
1.2. Contributing to the Documentation
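The reference guide is written in Asciidoc, and you can find its sources at https://github.com/reactor/reactor-netty/tree/master/docs/asciidoc.
If you have an improvement, we will be happy to get a pull request from you!
We recommend that you check out a local copy of the repository so that you can generate the documentation by using the asciidoctor Gradle task and checking the rendering. Some of the sections rely on included files, so GitHub rendering is not always complete.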
1.3. Getting Help
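There are several ways to reach out for help with Reactor Netty. You can:
- Get in touch with the community on Gitter.
- Ask a question on stackoverflow.com at reactor-netty.
- Report bugs in Github issues. The repository is the following: reactor-netty.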
2. Getting Started
This section contains information that should help you get going with Reactor Netty. It includes the following information:
- Introducing Reactor Netty
- Prerequisites
- Understanding the BOM and versioning scheme
- Getting Reactor Netty
2.1. Introducing Reactor Netty
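Suited for microservices architectures, Reactor Netty offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP.
2.2. Prerequisites
Reactor Netty runs on Java 8 and above.
It has transitive dependencies on: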
- Reactive Streams v1.0.3
- Reactor Core v3.x
- Netty v4.1.x
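2.3. Understanding the BOM and versioning scheme
Reactor Netty is part of the Project Reactor BOM (since the Aluminium release train). This curated list groups artifacts that are meant to work well together, providing the relevant versions despite potentially divergent versioning schemes in these artifacts.
The artifacts follow a versioning scheme of MAJOR.MINOR.PATCH-QUALIFIER, while the BOM is versioned using a CalVer-inspired scheme of YYYY.MINOR.PATCH-QUALIFIER, where:
- MAJOR is the current generation of Reactor, where each new generation can bring fundamental changes to the structure of the project (which might imply a more significant migration effort)
- YYYY is the year of the first GA release in a given release cycle (like 1.0.0 for 1.0.x)
- .MINOR is a 0-based number incrementing with each new release cycle
  - in the case of projects, it generally reflects wider changes and can indicate a moderate migration effort
  - in the case of the BOM, it allows discerning between release cycles in case two get first released the same year
- .PATCH is a 0-based number incrementing with each service release
- -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)
The first release cycle to follow that convention is thus 2020.0.x, codename Europium. The scheme uses the following qualifiers (note the use of the dash separator), in order:
- -M1..-M9: milestones (we don't expect more than 9 per service release)
- -RC1..-RC9: release candidates (we don't expect more than 9 per service release)
- -SNAPSHOT: snapshots
- no qualifier for GA releases
Each release cycle is also given a codename, in continuity with the previous codename-based scheme, which can be used to reference it more informally (for example in discussions and blog posts). The codenames represent what would traditionally be the MAJOR.MINOR number. They (mostly) come from the Periodic Table of Elements, in increasing alphabetical order.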
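As a rough illustration of the BOM scheme described above, the following sketch parses a YYYY.MINOR.PATCH-QUALIFIER string. BomVersion is a hypothetical helper written for this example. It is not part of Reactor Netty or the BOM, and it deliberately rejects the older codename-based versions such as Dysprosium-SR10.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Reactor Netty or the BOM. */
final class BomVersion {

    // YYYY.MINOR.PATCH, optionally followed by -M1..-M9, -RC1..-RC9 or -SNAPSHOT
    private static final Pattern FORMAT =
            Pattern.compile("(\\d{4})\\.(\\d+)\\.(\\d+)(?:-(M[1-9]|RC[1-9]|SNAPSHOT))?");

    final int year;
    final int minor;
    final int patch;
    final String qualifier; // null for a GA release

    private BomVersion(int year, int minor, int patch, String qualifier) {
        this.year = year;
        this.minor = minor;
        this.patch = patch;
        this.qualifier = qualifier;
    }

    /** Parses a CalVer-style BOM version or throws IllegalArgumentException. */
    static BomVersion parse(String version) {
        Matcher m = FORMAT.matcher(version);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Not a YYYY.MINOR.PATCH-QUALIFIER version: " + version);
        }
        return new BomVersion(
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)),
                m.group(4)); // group 4 is null when no qualifier is present
    }

    /** GA releases carry no qualifier. */
    boolean isGa() {
        return qualifier == null;
    }

    public static void main(String[] args) {
        for (String candidate : new String[] {"2020.0.0", "2020.0.1-RC1", "2020.0.2-SNAPSHOT"}) {
            BomVersion v = parse(candidate);
            System.out.println(candidate + " -> year " + v.year + ", GA: " + v.isGa());
        }
    }
}
```

The regular expression limits milestones and release candidates to a single digit, matching the "no more than 9 per service release" expectation stated above.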
2.4. Getting Reactor Netty
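As mentioned earlier, the easiest way to use Reactor Netty in your project is to use the BOM and add the relevant dependencies to your project. Note that, when adding such a dependency, you must omit the version so that the version gets picked up from the BOM.
However, if you want to force the use of a specific artifact's version, you can specify it when adding your dependency, as you usually would. You can also forego the BOM entirely and specify dependencies by their artifact versions.
2.4.1. Maven Installation
The BOM concept is natively supported by Maven. First, you need to import the BOM by adding the following snippet to your pom.xml. If the top section (dependencyManagement) already exists in your pom, add only the contents.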
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Next, add your dependencies to the relevant reactor projects, as usual (except without a <version>). The following listing shows how to do so:
<dependencies>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty-http</artifactId>
    </dependency>
</dependencies>
2.4.2. Gradle Installation
The BOM concept is supported in Gradle since version 5. The following listing shows how to import the BOM and add a dependency on Reactor Netty:
dependencies {
    // import a BOM
    implementation platform('io.projectreactor:reactor-bom:Dysprosium-SR10')
    // define dependencies without versions
    implementation 'io.projectreactor.netty:reactor-netty-core'
    implementation 'io.projectreactor.netty:reactor-netty-http'
}
2.4.3. Milestones and Snapshots
Milestones and developer previews are distributed through the Spring Milestones repository rather than Maven Central. To add it to your build configuration file, use the following snippet:
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones Repository</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>
For Gradle, use the following snippet:
repositories {
    maven { url 'https://repo.spring.io/milestone' }
    mavenCentral()
}
Similarly, snapshots are also available in a separate dedicated repository (for both Maven and Gradle):
-SNAPSHOTs in Maven
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshot Repository</name>
        <url>https://repo.spring.io/snapshot</url>
    </repository>
</repositories>
-SNAPSHOTs in Gradle
repositories {
    maven { url 'https://repo.spring.io/snapshot' }
    mavenCentral()
}
3. TCP Server
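Reactor Netty provides an easy-to-use and easy-to-configure TcpServer. It hides most of the Netty functionality that is needed to create a TCP server and adds Reactive Streams backpressure.
3.1. Starting and Stopping
To start a TCP server, you must create and configure a TcpServer instance. By default, the host is configured for any local address, and the system picks an ephemeral port when the bind operation is invoked. The following example shows how to create and configure a TcpServer instance: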
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/create/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.bindNow();
server.onDispose()
.block();
}
}
The returned DisposableServer offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.
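To see what "ephemeral port" means independently of Reactor Netty, the following plain-JDK sketch binds a java.net.ServerSocket to port 0 and reads back the port chosen by the operating system. EphemeralPortDemo is a hypothetical name for this example, and it binds to the loopback address only, which is narrower than the TcpServer default of any local address.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical demo class; uses only the JDK, not Reactor Netty. */
final class EphemeralPortDemo {

    /** Binds to port 0 so that the operating system assigns a free port, then returns it. */
    static int bindToEphemeralPort() {
        try (ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The printed value typically differs from run to run.
        System.out.println("OS-assigned port: " + bindToEphemeralPort());
    }
}
```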
3.1.1. Host and Port
To serve on a specific host and port, you can apply the following configuration to the TCP server:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/address/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.host("localhost")
.port(8080)
.bindNow();
server.onDispose()
.block();
}
}
3.2. Writing Data
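To send data to a connected client, you must attach an I/O handler. The I/O handler has access to NettyOutbound so that it can write data. The following example shows how to attach an I/O handler: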
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/send/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
}
3.3. Consuming Data
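To receive data from a connected client, you must attach an I/O handler. The I/O handler has access to NettyInbound so that it can read data. The following example shows how to use it: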
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/read/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.handle((inbound, outbound) -> inbound.receive().then())
.bindNow();
server.onDispose()
.block();
}
}
3.4. Lifecycle Callbacks
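The following lifecycle callbacks are provided to let you extend the TCP server:
- doOnBind: Invoked when the server channel is about to bind.
- doOnBound: Invoked when the server channel is bound.
- doOnConnection: Invoked when a remote client is connected.
- doOnUnbound: Invoked when the server channel is unbound.
The following example uses the doOnConnection callback: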
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/lifecycle/Application.java
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.util.concurrent.TimeUnit;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.doOnConnection(conn ->
conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS)))
.bindNow();
server.onDispose()
.block();
}
}
3.5. TCP-level Configurations
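This section describes three kinds of configuration that you can use at the TCP level:
3.5.1. Setting Channel Options
By default, the TCP server is configured with the following options: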
./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpServerBind.java
TcpServerBind() {
Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2);
childOptions.put(ChannelOption.AUTO_READ, false);
childOptions.put(ChannelOption.TCP_NODELAY, true);
this.config = new TcpServerConfig(
Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
childOptions,
() -> new InetSocketAddress(DEFAULT_PORT));
}
If additional options are necessary or the current options need to change, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/channeloptions/Application.java
import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.bindNow();
server.onDispose()
.block();
}
}
You can find more information about Netty channel options in the Netty documentation for ChannelOption.
3.5.2. Using a Wire Logger
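Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the level of the reactor.netty.tcp.TcpServer logger to DEBUG and apply the following configuration: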
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/wiretap/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.wiretap(true)
.bindNow();
server.onDispose()
.block();
}
}
3.5.3. Using an Event Loop Group
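By default, the TCP server uses an "Event Loop Group", where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.
The default configuration for the Event Loop Group is the following: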
./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
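The fallback rules stated in the comments above can be sketched in plain Java as follows. EventLoopDefaults and its methods are hypothetical names used only for illustration. This is not the library's own code, and it covers only the worker and selector counts.

```java
import java.util.Properties;

/** Hypothetical helper mirroring the documented fallbacks; not Reactor Netty code. */
final class EventLoopDefaults {

    /** Uses reactor.netty.ioWorkerCount if set, otherwise max(available processors, 4). */
    static int workerCount(Properties props, int availableProcessors) {
        String configured = props.getProperty("reactor.netty.ioWorkerCount");
        if (configured != null) {
            return Integer.parseInt(configured);
        }
        return Math.max(availableProcessors, 4);
    }

    /** Uses reactor.netty.ioSelectCount if set, otherwise -1 (no selector thread). */
    static int selectCount(Properties props) {
        return Integer.parseInt(props.getProperty("reactor.netty.ioSelectCount", "-1"));
    }

    public static void main(String[] args) {
        Properties props = System.getProperties();
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("workers = " + workerCount(props, cores));
        System.out.println("selectors = " + selectCount(props));
    }
}
```

Because these are JVM system properties, a value such as -Dreactor.netty.ioWorkerCount=8 on the command line would take precedence over the processor-based fallback in this sketch.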
If these settings need to change, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/eventloop/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
DisposableServer server =
TcpServer.create()
.runOn(loop)
.bindNow();
server.onDispose()
.block();
}
}
3.6. SSL and TLS
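When you need SSL or TLS, you can apply the configuration shown in the next listing. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, SslProvider.JDK is used. You can switch the provider through SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.
The following example uses SslContextBuilder: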
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/security/Application.java
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.io.File;
public class Application {
public static void main(String[] args) {
File cert = new File("certificate.crt");
File key = new File("private.key");
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);
DisposableServer server =
TcpServer.create()
.secure(spec -> spec.sslContext(sslContextBuilder))
.bindNow();
server.onDispose()
.block();
}
}
3.6.1. Server Name Indication
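You can configure the TCP server with multiple SslContext instances, each mapped to a specific domain. When configuring the SNI mapping, you can use an exact domain name or a domain name containing a wildcard.
The following example uses a domain name containing a wildcard: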
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/sni/Application.java
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.io.File;
public class Application {
public static void main(String[] args) throws Exception {
File defaultCert = new File("default_certificate.crt");
File defaultKey = new File("default_private.key");
File testDomainCert = new File("default_certificate.crt");
File testDomainKey = new File("default_private.key");
SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();
DisposableServer server =
TcpServer.create()
.secure(spec -> spec.sslContext(defaultSslContext)
.addSniMapping("*.test.com",
testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
.bindNow();
server.onDispose()
.block();
}
}
3.7. Metrics
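The TCP server supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.tcp.server.
The following table provides information for the TCP server metrics: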
metric name | type | description |
---|---|---|
reactor.netty.tcp.server.data.received | DistributionSummary | Amount of the data received, in bytes |
reactor.netty.tcp.server.data.sent | DistributionSummary | Amount of the data sent, in bytes |
reactor.netty.tcp.server.errors | Counter | Number of errors that occurred |
reactor.netty.tcp.server.tls.handshake.time | Timer | Time spent for TLS handshake |
These additional metrics are also available:
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of the bytes of the heap memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of the bytes of the direct memory |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | The size of the tiny cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator ) |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/metrics/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.metrics(true)
.bindNow();
server.onDispose()
.block();
}
}
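When TCP server metrics are needed for an integration with a system other than Micrometer, or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows: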
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/metrics/custom/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpServer;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.metrics(true, CustomChannelMetricsRecorder::new)
.bindNow();
server.onDispose()
.block();
}
}
3.8. Unix Domain Sockets
The TCP server supports Unix Domain Sockets (UDS) when native transport is in use.
The following example shows how to use UDS support:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/server/uds/Application.java
import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.bindNow();
server.onDispose()
.block();
}
}
4. TCP Client
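Reactor Netty provides an easy-to-use and easy-to-configure TcpClient. It hides most of the Netty functionality that is needed to create a TCP client and adds Reactive Streams backpressure.
4.1. Connect and Disconnect
To connect the TCP client to a given endpoint, you must create and configure a TcpClient instance. By default, the host is localhost and the port is 12012. The following example shows how to create a TcpClient: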
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/create/Application.java
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.connectNow();
connection.onDispose()
.block();
}
}
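The returned Connection offers a simple connection API, including disposeNow(), which shuts the client down in a blocking fashion.
4.1.1. Host and Port
To connect to a specific host and port, you can apply the following configuration to the TCP client: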
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/address/Application.java
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
4.2. Writing Data
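To send data to a given endpoint, you must attach an I/O handler. The I/O handler has access to NettyOutbound so that it can write data. The following example shows how to do so: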
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/send/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")))
.connectNow();
connection.onDispose()
.block();
}
}
4.3. Consuming Data
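To receive data from a given endpoint, you must attach an I/O handler. The I/O handler has access to NettyInbound so that it can read data. The following example shows how to do so: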
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/read/Application.java
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.handle((inbound, outbound) -> inbound.receive().then())
.connectNow();
connection.onDispose()
.block();
}
}
4.4. Lifecycle Callbacks
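The following lifecycle callbacks are provided to let you extend the TCP client:
- doOnConnect: Invoked when the channel is about to connect.
- doOnConnected: Invoked after the channel has been connected.
- doOnDisconnected: Invoked after the channel has been disconnected.
The following example uses the doOnConnected callback: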
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/lifecycle/Application.java
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import java.util.concurrent.TimeUnit;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.doOnConnected(conn ->
conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS)))
.connectNow();
connection.onDispose()
.block();
}
}
4.5. TCP-level Configurations
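This section describes three kinds of configuration that you can use at the TCP level:
4.5.1. Channel Options
By default, the TCP client is configured with the following options: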
./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpClientConnect.java
TcpClientConnect(ConnectionProvider provider) {
this.config = new TcpClientConfig(
provider,
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> AddressUtils.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
}
If additional options are necessary or the current options need to change, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/channeloptions/Application.java
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.connectNow();
connection.onDispose()
.block();
}
}
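You can find more information about Netty channel options in the Netty documentation for ChannelOption.
4.5.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the level of the reactor.netty.tcp.TcpClient logger to DEBUG and apply the following configuration: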
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/wiretap/Application.java
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.wiretap(true)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
4.5.3. Event Loop Group
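By default, the TCP client uses an "Event Loop Group", where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.
The following listing shows the default configuration for the Event Loop Group: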
./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
If these settings need to change, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/eventloop/Application.java
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.runOn(loop)
.connectNow();
connection.onDispose()
.block();
}
}
4.6. Connection Pool
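By default, the TCP client uses a "fixed" connection pool with 500 as the maximum number of channels and 1000 as the maximum number of registered acquire requests that may be kept in the pending queue (for the rest of the configuration, check the system properties below). This means that the implementation creates a new channel if someone tries to acquire a channel but none is available in the pool. When the maximum number of channels in the pool is reached, new attempts to acquire a channel are delayed until a channel is returned to the pool again.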
./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java
/**
* Default max connections. Fallback to
* available number of processors (but with a minimum value of 16)
*/
public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
/**
* Default acquisition timeout (milliseconds) before error. If -1 will never wait to
* acquire before opening a new
* connection in an unbounded fashion. Fallback 45 seconds
*/
public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";
/**
* Default max idle time, fallback - max idle time is not specified.
*/
public static final String POOL_MAX_IDLE_TIME = "reactor.netty.pool.maxIdleTime";
/**
* Default max life time, fallback - max life time is not specified.
*/
public static final String POOL_MAX_LIFE_TIME = "reactor.netty.pool.maxLifeTime";
/**
* Default leasing strategy (fifo, lifo), fallback to fifo.
* <ul>
* <li>fifo - The connection selection is first in, first out</li>
* <li>lifo - The connection selection is last in, first out</li>
* </ul>
*/
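The acquisition behaviour described at the start of this section can be modelled with a small, single-threaded toy class. This is only a sketch under simplifying assumptions: FixedPoolModel is a hypothetical name, connections are represented by integer ids, and acquire timeouts, idle time, and thread safety are ignored. It is not how Reactor Netty implements its pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Toy model of a "fixed" pool; NOT Reactor Netty's implementation and not thread-safe. */
final class FixedPoolModel {

    private final int maxConnections;
    private final int maxPending;
    private final Deque<Integer> idle = new ArrayDeque<>();
    private final Deque<CompletableFuture<Integer>> pending = new ArrayDeque<>();
    private int created;

    FixedPoolModel(int maxConnections, int maxPending) {
        this.maxConnections = maxConnections;
        this.maxPending = maxPending;
    }

    /** Returns a future that completes with a connection id once one is available. */
    CompletableFuture<Integer> acquire() {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        if (!idle.isEmpty()) {
            result.complete(idle.pop());        // reuse an idle connection
        } else if (created < maxConnections) {
            result.complete(++created);         // pool not full: open a new connection
        } else if (pending.size() < maxPending) {
            pending.addLast(result);            // pool full: wait for a release
        } else {
            result.completeExceptionally(
                    new IllegalStateException("Pending acquire queue is full"));
        }
        return result;
    }

    /** Hands the connection to the oldest waiter, or parks it as idle. */
    void release(int connectionId) {
        CompletableFuture<Integer> waiter = pending.pollFirst();
        if (waiter != null) {
            waiter.complete(connectionId);
        } else {
            idle.push(connectionId);
        }
    }

    public static void main(String[] args) {
        FixedPoolModel pool = new FixedPoolModel(2, 1);
        CompletableFuture<Integer> first = pool.acquire();
        pool.acquire();
        CompletableFuture<Integer> third = pool.acquire();
        System.out.println("third acquired immediately? " + third.isDone());
        pool.release(first.join());
        System.out.println("third acquired after release? " + third.isDone());
    }
}
```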
If you need to disable the connection pool, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/pool/Application.java
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.newConnection()
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
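If you need to set a specific maximum idle time for the channels in the connection pool, you can apply the following configuration: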
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/pool/config/Application.java
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
ConnectionProvider provider =
ConnectionProvider.builder("fixed")
.maxConnections(50)
.pendingAcquireTimeout(Duration.ofMillis(30000))
.maxIdleTime(Duration.ofMillis(60))
.build();
Connection connection =
TcpClient.create(provider)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
4.6.1. Metrics
The pooled ConnectionProvider supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.connection.provider.
Pooled ConnectionProvider metrics
metric name | type | description |
---|---|---|
reactor.netty.connection.provider.total.connections | Gauge | The number of all connections, active or idle |
reactor.netty.connection.provider.active.connections | Gauge | The number of the connections that have been successfully acquired and are in active use |
reactor.netty.connection.provider.idle.connections | Gauge | The number of the idle connections |
reactor.netty.connection.provider.pending.connections | Gauge | The number of requests that are waiting for a connection |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/pool/metrics/Application.java
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
ConnectionProvider provider =
ConnectionProvider.builder("fixed")
.maxConnections(50)
.metrics(true)
.build();
Connection connection =
TcpClient.create(provider)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
4.7. SSL and TLS
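When you need SSL or TLS, you can apply the following configuration. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, the provider is SslProvider.JDK. You can switch the provider by using SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.
The following example uses SslContextBuilder: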
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/security/Application.java
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
Connection connection =
TcpClient.create()
.host("example.com")
.port(443)
.secure(spec -> spec.sslContext(sslContextBuilder))
.connectNow();
connection.onDispose()
.block();
}
}
4.7.1. Server Name Indication
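By default, the TCP client sends the remote host name as the SNI server name. When you need to change this default setting, you can configure the TCP client as follows: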
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/sni/Application.java
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import javax.net.ssl.SNIHostName;
public class Application {
public static void main(String[] args) throws Exception {
SslContext sslContext = SslContextBuilder.forClient().build();
Connection connection =
TcpClient.create()
.host("127.0.0.1")
.port(8080)
.secure(spec -> spec.sslContext(sslContext)
.serverNames(new SNIHostName("test.com")))
.connectNow();
connection.onDispose()
.block();
}
}
4.8. Proxy Support
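The TCP client supports the proxy functionality provided by Netty and provides a way to specify "non proxy hosts" through the ProxyProvider builder. The following example uses ProxyProvider: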
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/proxy/Application.java
import reactor.netty.Connection;
import reactor.netty.transport.ProxyProvider;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.proxy(spec -> spec.type(ProxyProvider.Proxy.SOCKS4)
.host("proxy")
.port(8080)
.nonProxyHosts("localhost"))
.connectNow();
connection.onDispose()
.block();
}
}
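To make the "non proxy hosts" idea concrete, the sketch below assumes that the value passed to nonProxyHosts is treated as a java.util.regex pattern matched against the whole host name. Verify that assumption against the ProxyProvider Javadoc for your version. NonProxyHosts and bypassProxy are hypothetical names, not Reactor Netty API.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical illustration of regex-based "non proxy hosts" matching. */
final class NonProxyHosts {

    /** Returns a predicate that is true for hosts that should be reached directly. */
    static Predicate<String> bypassProxy(String regex) {
        Pattern pattern = Pattern.compile(regex);
        // Assumption: the pattern must match the entire host name
        return host -> pattern.matcher(host).matches();
    }

    public static void main(String[] args) {
        Predicate<String> direct = bypassProxy("localhost|.*\\.internal\\.example");
        for (String host : new String[] {"localhost", "db.internal.example", "example.com"}) {
            System.out.println(host + " bypasses proxy: " + direct.test(host));
        }
    }
}
```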
4.9. Metrics
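The TCP client supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.tcp.client.
The following table provides information for the TCP client metrics: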
metric name | type | description |
---|---|---|
reactor.netty.tcp.client.data.received | DistributionSummary | Amount of the data received, in bytes |
reactor.netty.tcp.client.data.sent | DistributionSummary | Amount of the data sent, in bytes |
reactor.netty.tcp.client.errors | Counter | Number of errors that occurred |
reactor.netty.tcp.client.tls.handshake.time | Timer | Time spent for TLS handshake |
reactor.netty.tcp.client.connect.time | Timer | Time spent for connecting to the remote address |
reactor.netty.tcp.client.address.resolver | Timer | Time spent for resolving the address |
These additional metrics are also available:
Pooled ConnectionProvider metrics
metric name | type | description |
---|---|---|
reactor.netty.connection.provider.total.connections | Gauge | The number of all connections, active or idle |
reactor.netty.connection.provider.active.connections | Gauge | The number of the connections that have been successfully acquired and are in active use |
reactor.netty.connection.provider.idle.connections | Gauge | The number of the idle connections |
reactor.netty.connection.provider.pending.connections | Gauge | The number of requests that are waiting for a connection |
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of the bytes of the heap memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of the bytes of the direct memory |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | The size of the tiny cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator ) |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/metrics/Application.java
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.metrics(true)
.connectNow();
connection.onDispose()
.block();
}
}
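When TCP client metrics are needed for an integration with a system other than Micrometer, or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows: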
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/metrics/custom/Application.java
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpClient;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.metrics(true, CustomChannelMetricsRecorder::new)
.connectNow();
connection.onDispose()
.block();
}
}
The metrics(true, CustomChannelMetricsRecorder::new) call enables TCP client metrics and provides a ChannelMetricsRecorder implementation.
4.10. Unix Domain Sockets
The TCP client supports Unix Domain Sockets (UDS) when native transport is in use.
The following example shows how to use UDS support:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/tcp/client/uds/Application.java
import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.connectNow();
connection.onDispose()
.block();
}
}
5. HTTP Server
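Reactor Netty provides the easy-to-use and easy-to-configure HttpServer class. It hides most of the Netty functionality that is needed to create an HTTP server and adds Reactive Streams backpressure.
5.1. Starting and Stopping
To start an HTTP server, you must create and configure an HttpServer instance. By default, the host is configured for any local address, and the system picks an ephemeral port when the bind operation is invoked. The following example shows how to create an HttpServer instance: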
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/create/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.bindNow();
server.onDispose()
.block();
}
}
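The returned DisposableServer offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.
5.1.1. Host and Port
To serve on a specific host and port, you can apply the following configuration to the HTTP server: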
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/address/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.host("localhost")
.port(8080)
.bindNow();
server.onDispose()
.block();
}
}
5.2. Routing HTTP
Defining routes for the HTTP server requires configuring the provided HttpServerRoutes builder. The following example shows how to do so:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/routing/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes ->
routes.get("/hello",
(request, response) -> response.sendString(Mono.just("Hello World!")))
.post("/echo",
(request, response) -> response.send(request.receive().retain()))
.get("/path/{param}",
(request, response) -> response.sendString(Mono.just(request.param("param"))))
.ws("/ws",
(wsInbound, wsOutbound) -> wsOutbound.send(wsInbound.receive().retain())))
.bindNow();
server.onDispose()
.block();
}
}
5.2.1. SSE
The following code shows how you can configure the HTTP server to serve Server-Sent Events (SSE):
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/sse/Application.java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes -> routes.get("/sse", serveSse()))
.bindNow();
server.onDispose()
.block();
}
/**
* Prepares SSE response
* The "Content-Type" is "text/event-stream"
* The flushing strategy is "flush after every element" emitted by the provided Publisher
*/
private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
return (request, response) ->
response.sse()
.send(flux.map(Application::toByteBuf), b -> true);
}
/**
* Transforms the Object to ByteBuf following the expected SSE format.
*/
private static ByteBuf toByteBuf(Object any) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
out.write("data: ".getBytes(Charset.defaultCharset()));
MAPPER.writeValue(out, any);
out.write("\n\n".getBytes(Charset.defaultCharset()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
return ByteBufAllocator.DEFAULT
.buffer()
.writeBytes(out.toByteArray());
}
private static final ObjectMapper MAPPER = new ObjectMapper();
}
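The toByteBuf helper above writes each element as "data: ", the serialized payload, and a blank line. As a rough, library-free sketch of that framing, the following encodes a payload with an explicit UTF-8 charset (the encoding that the text/event-stream format requires) and emits one "data:" line per payload line. The SseFrames class and its method names are hypothetical and are not part of Reactor Netty.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Reactor Netty.
final class SseFrames {

    private SseFrames() {
    }

    /** Frames a payload as one SSE event: a "data: " line per payload line, then a blank line. */
    static String frame(String payload) {
        StringBuilder event = new StringBuilder();
        // The -1 limit keeps trailing empty lines instead of silently dropping them.
        for (String line : payload.split("\n", -1)) {
            event.append("data: ").append(line).append('\n');
        }
        // An empty line terminates the event.
        return event.append('\n').toString();
    }

    /** Encodes the framed event as UTF-8 bytes, independent of the platform default charset. */
    static byte[] frameBytes(String payload) {
        return frame(payload).getBytes(StandardCharsets.UTF_8);
    }
}
```

The resulting byte array could be written into a ByteBuf in the same way as out.toByteArray() in the example above.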
5.2.2. Static Resources
The following code shows how you can configure the HTTP server to serve static resources:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/staticresources/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
public class Application {
public static void main(String[] args) throws URISyntaxException {
Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
DisposableServer server =
HttpServer.create()
.route(routes -> routes.file("/index.html", file))
.bindNow();
server.onDispose()
.block();
}
}
5.3. Writing Data
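To send data to a connected client, you must attach an I/O handler by using either handle(...) or route(...). The I/O handler has access to HttpServerResponse to be able to write data. The following example uses the handle(...) method: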
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/send/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.handle((request, response) -> response.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
}
5.3.1. Adding Headers and Other Metadata
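When you send data to the connected clients, you may need to send additional headers, cookies, a status code, and other metadata. You can provide this additional metadata by using HttpServerResponse. The following example shows how to do so: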
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/send/headers/Application.java
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes ->
routes.get("/hello",
(request, response) ->
response.status(HttpResponseStatus.OK)
.header(HttpHeaderNames.CONTENT_LENGTH, "12")
.sendString(Mono.just("Hello World!"))))
.bindNow();
server.onDispose()
.block();
}
}
5.3.2. Compression
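You can configure the HTTP server to send a compressed response, depending on the request header Accept-Encoding.
Reactor Netty provides three different strategies for compressing the outgoing data:
- compress(boolean): Depending on the boolean that is provided, compression is enabled (true) or disabled (false).
- compress(int): Compression is performed once the response size exceeds the given value (in bytes).
- compress(BiPredicate<HttpServerRequest, HttpServerResponse>): Compression is performed if the predicate returns true.
The following example uses the compress method (set to true) to enable compression: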
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/compression/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
public class Application {
public static void main(String[] args) throws URISyntaxException {
Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
DisposableServer server =
HttpServer.create()
.compress(true)
.route(routes -> routes.file("/index.html", file))
.bindNow();
server.onDispose()
.block();
}
}
5.4. Consuming Data
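To receive data from a connected client, you must attach an I/O handler by using either handle(...) or route(...). The I/O handler has access to HttpServerRequest to be able to read data.
The following example uses the handle(...) method: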
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.handle((request, response) -> request.receive().then())
.bindNow();
server.onDispose()
.block();
}
}
5.4.1. Reading Headers, URI Params, and other Metadata
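When you receive data from the connected clients, you might need to check request headers, parameters, and other metadata. You can obtain this additional metadata by using HttpServerRequest. The following example shows how to do so: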
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/headers/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.route(routes ->
routes.get("/{param}",
(request, response) -> {
if (request.requestHeaders().contains("Some-Header")) {
return response.sendString(Mono.just(request.param("param")));
}
return response.sendNotFound();
}))
.bindNow();
server.onDispose()
.block();
}
}
Obtaining the Remote (Client) Address
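In addition to the metadata that you can obtain from the request, you can also receive the host (server) address, the remote (client) address, and the scheme. Depending on the chosen factory method, you can retrieve the information directly from the channel or by using the Forwarded or X-Forwarded-* HTTP request headers. The following example shows how to do so: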
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/clientaddress/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.forwarded(true)
.route(routes ->
routes.get("/clientip",
(request, response) ->
response.sendString(Mono.just(request.remoteAddress()
.getHostString()))))
.bindNow();
server.onDispose()
.block();
}
}
It is also possible to customize the behavior of the Forwarded or X-Forwarded-* header handler. The following example shows how to do so:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/clientaddress/CustomForwardedHeaderHandlerApplication.java
import java.net.InetSocketAddress;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.transport.AddressUtils;
public class CustomForwardedHeaderHandlerApplication {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.forwarded((connectionInfo, request) -> {
String hostHeader = request.headers().get("X-Forwarded-Host");
if (hostHeader != null) {
String[] hosts = hostHeader.split(",", 2);
InetSocketAddress hostAddress = AddressUtils.createUnresolved(
hosts[hosts.length - 1].trim(),
connectionInfo.getHostAddress().getPort());
connectionInfo = connectionInfo.withHostAddress(hostAddress);
}
return connectionInfo;
})
.route(routes ->
routes.get("/clientip",
(request, response) ->
response.sendString(Mono.just(request.remoteAddress()
.getHostString()))))
.bindNow();
server.onDispose()
.block();
}
}
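To make the header handling in the custom handler above easier to follow, the sketch below isolates the split(",", 2) step in a small, library-free helper. The ForwardedHost class is hypothetical and exists only for illustration; it mirrors the string logic of the example and does not touch connectionInfo.

```java
// Hypothetical helper for illustration; it mirrors the header handling in the example above.
final class ForwardedHost {

    private ForwardedHost() {
    }

    /** Returns the host chosen by the split(",", 2) logic, or null when the header is absent. */
    static String select(String hostHeader) {
        if (hostHeader == null) {
            return null;
        }
        // A limit of 2 yields at most two elements: the first entry and the untouched remainder.
        String[] hosts = hostHeader.split(",", 2);
        // The last element is either the only entry or everything after the first comma.
        return hosts[hosts.length - 1].trim();
    }
}
```

With one or two entries the helper returns the last host. With more than two entries the remainder still contains commas, so a handler that expects longer proxy chains may prefer to split without a limit.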
5.4.2. HTTP Request Decoder
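By default, Netty configures some restrictions for the incoming requests, such as:
- The maximum length of the initial line.
- The maximum length of all headers.
- The maximum length of the content or of each chunk.
For more information, see HttpRequestDecoder and HttpServerUpgradeHandler.
By default, the HTTP server is configured with the following settings: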
./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpDecoderSpec.java
public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
public static final int DEFAULT_MAX_HEADER_SIZE = 8192;
public static final int DEFAULT_MAX_CHUNK_SIZE = 8192;
public static final boolean DEFAULT_VALIDATE_HEADERS = true;
public static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
./../../reactor-netty-http/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java
/**
* The maximum length of the content of the HTTP/2.0 clear-text upgrade request.
* By default the server will reject an upgrade request with non-empty content,
* because the upgrade request is most likely a GET request.
*/
public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 0;
When you need to change these default settings, you can configure the HTTP server as follows:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/requestdecoder/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.httpRequestDecoder(spec -> spec.maxHeaderSize(16384))
.handle((request, response) -> response.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
}
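As a rough illustration of what the initial line limit refers to, the sketch below builds an HTTP/1.1 request line and compares its size with the default of 4096 listed above. The InitialLineCheck class is hypothetical, and the comparison is only an approximation: Netty enforces the real limit while decoding, and its exact boundary handling is not modeled here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical pre-check for illustration; Netty enforces the real limit while decoding.
final class InitialLineCheck {

    /** Mirrors DEFAULT_MAX_INITIAL_LINE_LENGTH from the settings listed above. */
    static final int MAX_INITIAL_LINE_LENGTH = 4096;

    private InitialLineCheck() {
    }

    /** Builds the request line, for example "GET /hello HTTP/1.1". */
    static String initialLine(String method, String uri) {
        return method + " " + uri + " HTTP/1.1";
    }

    /** Approximates whether the request line stays within the default limit. */
    static boolean likelyFits(String method, String uri) {
        int length = initialLine(method, uri).getBytes(StandardCharsets.US_ASCII).length;
        return length <= MAX_INITIAL_LINE_LENGTH;
    }
}
```

A request whose URI carries a very long query string is the typical case in which this limit is reached.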
5.5. TCP-level Configuration
When you need to change configuration on the TCP level, you can use the following snippet to extend the default TCP server configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/channeloptions/Application.java
import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.bindNow();
server.onDispose()
.block();
}
}
See TCP Server for more details about TCP-level configuration.
5.5.1. Wire Logger
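Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.http.server.HttpServer level to DEBUG and apply the following configuration: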
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/wiretap/Application.java
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.wiretap(true)
.bindNow();
server.onDispose()
.block();
}
}
5.6. SSL and TLS
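When you need SSL or TLS, you can apply the configuration shown in the next example. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, SslProvider.JDK is used. You can switch the provider by using SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.
The following example uses SslContextBuilder: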
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/security/Application.java
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.io.File;
public class Application {
public static void main(String[] args) {
File cert = new File("certificate.crt");
File key = new File("private.key");
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);
DisposableServer server =
HttpServer.create()
.secure(spec -> spec.sslContext(sslContextBuilder))
.bindNow();
server.onDispose()
.block();
}
}
5.6.1. Server Name Indication
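You can configure the HTTP server with multiple SslContext instances, each mapped to a specific domain. When configuring the SNI mapping, you can use an exact domain name or a domain name that contains a wildcard.
The following example uses a domain name that contains a wildcard: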
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/sni/Application.java
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.io.File;
public class Application {
public static void main(String[] args) throws Exception {
File defaultCert = new File("default_certificate.crt");
File defaultKey = new File("default_private.key");
File testDomainCert = new File("default_certificate.crt");
File testDomainKey = new File("default_private.key");
SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();
DisposableServer server =
HttpServer.create()
.secure(spec -> spec.sslContext(defaultSslContext)
.addSniMapping("*.test.com",
testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
.bindNow();
server.onDispose()
.block();
}
}
5.7. HTTP Access Log
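The current logging support provides only the Common Log Format.
You can enable the HTTP access log by using -Dreactor.netty.http.server.accessLogEnabled=true. By default, it is disabled.
You can use the following configuration (for Logback or similar logging frameworks) to have a separate HTTP access log file: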
<appender name="accessLog" class="ch.qos.logback.core.FileAppender">
<file>access_log.log</file>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="accessLog" />
</appender>
<logger name="reactor.netty.http.server.AccessLog" level="INFO" additivity="false">
<appender-ref ref="async"/>
</logger>
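For orientation, a Common Log Format entry has the general shape host ident authuser [date] "request" status bytes. The sketch below formats one such line with the JDK only. The CommonLogLine class is hypothetical, and the exact fields that Reactor Netty writes to its access log may differ.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical formatter for illustration; not the Reactor Netty access log implementation.
final class CommonLogLine {

    // Common Log Format timestamp, for example 10/Oct/2020:13:55:36 -0700.
    private static final DateTimeFormatter CLF_DATE =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);

    private CommonLogLine() {
    }

    /** Formats one entry; a missing user is written as "-", and the ident field is always "-". */
    static String format(String host, String user, ZonedDateTime time,
            String method, String uri, String protocol, int status, long bytes) {
        return String.format(Locale.US, "%s - %s [%s] \"%s %s %s\" %d %d",
                host,
                user == null ? "-" : user,
                CLF_DATE.format(time),
                method, uri, protocol,
                status, bytes);
    }
}
```

Because the Logback pattern above is %msg%n, each line in access_log.log consists only of the message produced by the access logger, without a level or timestamp prefix.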
5.8. HTTP/2
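By default, the HTTP server supports HTTP/1.1. If you need HTTP/2, you can get it through configuration. In addition to the protocol configuration, if you need H2 rather than H2C (cleartext), you must also configure SSL.
The following listing presents a simple H2 example: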
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/http2/H2Application.java
import io.netty.handler.ssl.SslContextBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import java.io.File;
public class H2Application {
public static void main(String[] args) {
File cert = new File("certificate.crt");
File key = new File("private.key");
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);
DisposableServer server =
HttpServer.create()
.port(8080)
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(sslContextBuilder))
.handle((request, response) -> response.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
}
The application should now behave as follows:
$ curl --http2 https://localhost:8080 -i
HTTP/2 200
hello
The following listing presents a simple H2C example:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/http2/H2CApplication.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
public class H2CApplication {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.port(8080)
.protocol(HttpProtocol.H2C)
.handle((request, response) -> response.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
}
The application should now behave as follows:
$ curl --http2-prior-knowledge http://localhost:8080 -i
HTTP/2 200
hello
5.8.1. Protocol Selection
./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpProtocol.java
public enum HttpProtocol {
/**
* The default supported HTTP protocol by HttpServer and HttpClient
*/
HTTP11,
/**
* HTTP/2.0 support with TLS
* <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
* While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
* <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
* for communication with no fallback to HTTP/1.1.
*/
H2,
/**
* HTTP/2.0 support with clear-text.
* <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
* Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
* and {@literal Connection: Upgrade}. A server will typically reply a successful
* 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
* successful the client will start sending HTTP/2.0 traffic.
* <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
* require {@literal Connection: Upgrade} handshake between a client and server but
* fallback to HTTP/1.1 will not be supported.
*/
H2C
}
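The Javadoc above distinguishes two cleartext modes, depending on whether H2C is configured together with HTTP/1.1. As a minimal sketch of that rule only, the following model restates it in code. The ClearTextMode class and its nested Protocol enum are hypothetical stand-ins and do not reference the Reactor Netty types.

```java
import java.util.Set;

// Hypothetical model for illustration; it restates the Javadoc above and is not Reactor Netty code.
final class ClearTextMode {

    /** Stand-in for the HttpProtocol constants shown above. */
    enum Protocol { HTTP11, H2, H2C }

    private ClearTextMode() {
    }

    /** Describes how cleartext HTTP/2 is reached for a given set of configured protocols. */
    static String describe(Set<Protocol> configured) {
        boolean h2c = configured.contains(Protocol.H2C);
        boolean http11 = configured.contains(Protocol.HTTP11);
        if (h2c && http11) {
            // HTTP/1.1 first, then an upgrade to HTTP/2 with a fallback to HTTP/1.1.
            return "upgrade";
        }
        if (h2c) {
            // HTTP/2 from the first byte, with no fallback to HTTP/1.1.
            return "prior-knowledge";
        }
        return "no-h2c";
    }
}
```

The curl command with --http2-prior-knowledge shown earlier corresponds to the "prior-knowledge" branch of this model.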
5.9. Metrics
The HTTP server supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.http.server.
metric name | type | description |
---|---|---|
reactor.netty.http.server.data.received | DistributionSummary | Amount of the data received, in bytes |
reactor.netty.http.server.data.sent | DistributionSummary | Amount of the data sent, in bytes |
reactor.netty.http.server.errors | Counter | Number of errors that occurred |
reactor.netty.http.server.data.received.time | Timer | Time spent in consuming incoming data |
reactor.netty.http.server.data.sent.time | Timer | Time spent in sending outgoing data |
reactor.netty.http.server.response.time | Timer | Total time for the request/response |
These additional metrics are also available:
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of the bytes of the heap memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of the bytes of the direct memory |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | The size of the tiny cache (when PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator) |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/metrics/Application.java
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
Metrics.globalRegistry
.config()
.meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.server", "URI", 100, MeterFilter.deny()));
DisposableServer server =
HttpServer.create()
.metrics(true, s -> {
if (s.startsWith("/stream/")) {
return "/stream/{n}";
}
else if (s.startsWith("/bytes/")) {
return "/bytes/{n}";
}
return s;
})
.route(r ->
r.get("/stream/{n}",
(req, res) -> res.sendString(Mono.just(req.param("n"))))
.get("/bytes/{n}",
(req, res) -> res.sendString(Mono.just(req.param("n")))))
.bindNow();
server.onDispose()
.block();
}
}
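The function passed to metrics(true, ...) in the example above maps each request URI to the value used for the URI tag. Without such a mapping, every distinct path such as /stream/1 or /stream/2 would become its own tag value, and the meter filter above would start denying new ones after 100. The sketch below extracts the same mapping into a standalone function so that it can be exercised in isolation. The UriTags class is hypothetical; the prefixes are taken from the example.

```java
import java.util.function.Function;

// Hypothetical helper for illustration; the prefixes come from the example above.
final class UriTags {

    /** Collapses parameterized paths into templates so that the URI tag stays low-cardinality. */
    static final Function<String, String> TEMPLATE = uri -> {
        if (uri.startsWith("/stream/")) {
            return "/stream/{n}";
        }
        if (uri.startsWith("/bytes/")) {
            return "/bytes/{n}";
        }
        // Paths without parameters are used as they are.
        return uri;
    };

    private UriTags() {
    }
}
```

Applied to a thousand different /stream/ paths, this function yields a single tag value, which keeps the number of meters bounded.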
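When HTTP server metrics are needed for an integration with a system other than Micrometer, or when you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows: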
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/metrics/custom/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.server.HttpServer;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.metrics(true, CustomHttpServerMetricsRecorder::new)
.route(r ->
r.get("/stream/{n}",
(req, res) -> res.sendString(Mono.just(req.param("n"))))
.get("/bytes/{n}",
(req, res) -> res.sendString(Mono.just(req.param("n")))))
.bindNow();
server.onDispose()
.block();
}
5.10. Unix Domain Sockets
The HTTP server supports Unix Domain Sockets (UDS) when native transport is in use.
The following example shows how to use UDS support:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/uds/Application.java
import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.bindNow();
server.onDispose()
.block();
}
}
6. HTTP Client
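Reactor Netty provides the easy-to-use and easy-to-configure HttpClient. It hides most of the Netty functionality that is needed to create an HTTP client and adds Reactive Streams backpressure.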
6.1. Connect
To connect the HTTP client to a given HTTP endpoint, you must create and configure an HttpClient instance. The following example shows how to do so:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/connect/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client = HttpClient.create();
client.get()
.uri("http://example.com/")
.response()
.block();
}
}
The following example uses WebSocket:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/websocket/Application.java
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client = HttpClient.create();
client.websocket()
.uri("wss://echo.websocket.org")
.handle((inbound, outbound) -> {
inbound.receive()
.asString()
.take(1)
.subscribe(System.out::println);
final byte[] msgBytes = "hello".getBytes(CharsetUtil.ISO_8859_1);
return outbound.send(Flux.just(Unpooled.wrappedBuffer(msgBytes), Unpooled.wrappedBuffer(msgBytes)))
.neverComplete();
})
.blockLast();
}
}
6.1.1. Host and Port
To connect to a specific host and port, you can apply the following configuration to the HTTP client:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/address/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.host("example.com")
.port(80);
client.get()
.uri("/")
.response()
.block();
}
}
6.2. Writing Data
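To send data to a given HTTP endpoint, you can provide a Publisher by using the send(Publisher) method. By default, Transfer-Encoding: chunked is applied for those HTTP methods for which a request body is expected. A Content-Length provided through the request headers disables Transfer-Encoding: chunked, if necessary. The following example sends hello: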
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/send/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client = HttpClient.create();
client.post()
.uri("http://example.com/")
.send(ByteBufFlux.fromString(Mono.just("hello")))
.response()
.block();
}
}
6.2.1. Adding Headers and Other Metadata
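When sending data to a given HTTP endpoint, you may need to send additional headers, cookies, and other metadata. You can use the following configuration to do so: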
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/send/headers/Application.java
import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.headers(h -> h.set(HttpHeaderNames.CONTENT_LENGTH, 5));
client.post()
.uri("http://example.com/")
.send(ByteBufFlux.fromString(Mono.just("hello")))
.response()
.block();
}
}
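The example above hard-codes a Content-Length of 5, which matches the five bytes of "hello". Content-Length counts bytes rather than characters, so for bodies with non-ASCII characters the value depends on the encoding. The sketch below computes it from the encoded body, assuming the body is sent as UTF-8. The ContentLengths class is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; assumes the request body is encoded as UTF-8.
final class ContentLengths {

    private ContentLengths() {
    }

    /** Returns the number of bytes the body occupies once it is encoded as UTF-8. */
    static int utf8Length(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

For "hello" the result is 5. For "h\u00e9llo" it is 6, although the string has only five characters, because the accented letter takes two bytes in UTF-8.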
Compression
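You can enable compression on the HTTP client, which means that the request header Accept-Encoding is added to the request headers. The following example shows how to do so: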
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/compression/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.compress(true);
client.get()
.uri("http://example.com/")
.response()
.block();
}
}
Auto-Redirect Support
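You can configure the HTTP client to enable auto-redirect support.
Reactor Netty provides two different strategies for auto-redirect support:
- followRedirect(boolean): Specifies whether HTTP auto-redirect support is enabled for statuses 301|302|307|308.
- followRedirect(BiPredicate<HttpClientRequest, HttpClientResponse>): Enables auto-redirect support if the supplied predicate matches.
The following example uses followRedirect(true):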
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/redirect/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.followRedirect(true);
client.get()
.uri("http://example.com/")
.response()
.block();
}
}
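The status codes listed for followRedirect(boolean) can be captured in a small check. The sketch below is a library-free model of that list only, not the Reactor Netty implementation; the RedirectStatus class is hypothetical. A custom predicate passed to the BiPredicate variant could base its decision on a similar check against the response status.

```java
// Hypothetical model for illustration; it encodes only the status list given above.
final class RedirectStatus {

    private RedirectStatus() {
    }

    /** Returns true for the statuses covered by followRedirect(true): 301, 302, 307, and 308. */
    static boolean isAutoRedirect(int status) {
        switch (status) {
            case 301:
            case 302:
            case 307:
            case 308:
                return true;
            default:
                return false;
        }
    }
}
```

Note that 303 is not in the list, so a predicate that should also follow 303 responses would have to handle that status explicitly.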
6.3. Consuming Data
To receive data from a given HTTP endpoint, you can use one of the methods from HttpClient.ResponseReceiver. The following example uses the responseContent method:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/read/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client = HttpClient.create();
client.get()
.uri("http://example.com/")
.responseContent()
.aggregate()
.asString()
.block();
}
}
6.3.1. Reading Headers and Other Metadata
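When receiving data from a given HTTP endpoint, you can check response headers, the status code, and other metadata. You can obtain this additional metadata by using HttpClientResponse. The following example shows how to do so: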
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/read/status/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client = HttpClient.create();
client.get()
.uri("http://example.com/")
.responseSingle((resp, bytes) -> {
System.out.println(resp.status());
return bytes.asString();
})
.block();
}
}
6.3.2. HTTP Response Decoder
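By default, Netty configures some restrictions for the incoming responses, such as:
- The maximum length of the initial line.
- The maximum length of all headers.
- The maximum length of the content or of each chunk.
For more information, see HttpResponseDecoder.
By default, the HTTP client is configured with the following settings: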
./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpDecoderSpec.java
public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
public static final int DEFAULT_MAX_HEADER_SIZE = 8192;
public static final int DEFAULT_MAX_CHUNK_SIZE = 8192;
public static final boolean DEFAULT_VALIDATE_HEADERS = true;
public static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
./../../reactor-netty-http/src/main/java/reactor/netty/http/client/HttpResponseDecoderSpec.java
public static final boolean DEFAULT_FAIL_ON_MISSING_RESPONSE = false;
public static final boolean DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST = false;
/**
* The maximum length of the content of the HTTP/2.0 clear-text upgrade request.
* By default the client will allow an upgrade request with up to 65536 as
* the maximum length of the aggregated content.
*/
public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 65536;
When you need to change these default settings, you can configure the HTTP client as follows:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/responsedecoder/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.httpResponseDecoder(spec -> spec.maxHeaderSize(16384));
client.get()
.uri("http://example.com/")
.responseContent()
.aggregate()
.asString()
.block();
}
}
6.4. TCP-level Configuration
When you need to change configuration on the TCP level, you can use the following snippet to extend the default TCP client configuration (for example, to add a channel option or set a bind address):
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/channeloptions/Application.java
import io.netty.channel.ChannelOption;
import reactor.netty.http.client.HttpClient;
import java.net.InetSocketAddress;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.bindAddress(() -> new InetSocketAddress("host", 1234))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
String response =
client.get()
.uri("http://example.com/")
.responseContent()
.aggregate()
.asString()
.block();
System.out.println("Response " + response);
}
}
6.4.1. Wire Logger
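Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.http.client.HttpClient level to DEBUG and apply the following configuration: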
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/wiretap/Application.java
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.wiretap(true);
client.get()
.uri("http://example.com/")
.response()
.block();
}
}
6.5. SSL and TLS
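When you need SSL or TLS, you can apply the configuration shown in the next example. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, the SslProvider.JDK provider is used. You can switch the provider by using SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true. The following example uses SslContextBuilder: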
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/security/Application.java
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
HttpClient client =
HttpClient.create()
.secure(spec -> spec.sslContext(sslContextBuilder));
client.get()
.uri("https://example.com/")
.response()
.block();
}
}
6.5.1. Server Name Indication
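By default, the HTTP client sends the remote host name as the SNI server name. When you need to change this default setting, you can configure the HTTP client as follows: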
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/sni/Application.java
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.http.client.HttpClient;
import javax.net.ssl.SNIHostName;
public class Application {
public static void main(String[] args) throws Exception {
SslContext sslContext = SslContextBuilder.forClient().build();
HttpClient client =
HttpClient.create()
.secure(spec -> spec.sslContext(sslContext)
.serverNames(new SNIHostName("test.com")));
client.get()
.uri("https://127.0.0.1:8080/")
.response()
.block();
}
}
6.6. Retry Strategies
By default, the HTTP client retries the request once if it was aborted on the TCP level.
6.7. HTTP/2
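By default, the HTTP client supports HTTP/1.1. If you need HTTP/2, you can get it through configuration. In addition to the protocol configuration, if you need H2 rather than H2C (cleartext), you must also configure SSL.
The following listing presents a simple H2 example: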
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/http2/H2Application.java
import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;
public class H2Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.protocol(HttpProtocol.H2)
.secure();
Tuple2<String, HttpHeaders> response =
client.get()
.uri("https://example.com/")
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.responseHeaders())))
.block();
System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id"));
System.out.println("Response: " + response.getT1());
}
}
The following listing presents a simple H2C example:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/http2/H2CApplication.java
import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;
public class H2CApplication {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.protocol(HttpProtocol.H2C);
Tuple2<String, HttpHeaders> response =
client.get()
.uri("http://localhost:8080/")
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.responseHeaders())))
.block();
System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id"));
System.out.println("Response: " + response.getT1());
}
}
6.7.1. Protocol Selection
./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpProtocol.java
public enum HttpProtocol {
/**
* The default supported HTTP protocol by HttpServer and HttpClient
*/
HTTP11,
/**
* HTTP/2.0 support with TLS
* <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
* While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
* <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
* for communication with no fallback to HTTP/1.1.
*/
H2,
/**
* HTTP/2.0 support with clear-text.
* <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
* Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
* and {@literal Connection: Upgrade}. A server will typically reply a successful
* 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
* successful the client will start sending HTTP/2.0 traffic.
* <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
* require {@literal Connection: Upgrade} handshake between a client and server but
* fallback to HTTP/1.1 will not be supported.
*/
H2C
}
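The Javadoc above describes four combinations of protocols. The sketch below restates them as a small lookup in plain Java. `ProtocolSelection` and its local `Protocol` enum are hypothetical names that mirror the HttpProtocol constants so that the example runs without Reactor Netty on the classpath. Mixing H2 and H2C in one configuration is not modeled.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustration only: summarizes the HttpProtocol Javadoc, not library code.
final class ProtocolSelection {

    // Local mirror of the HttpProtocol constants.
    enum Protocol { HTTP11, H2, H2C }

    // Describes the behavior documented for each combination of protocols.
    static String describe(Set<Protocol> configured) {
        boolean http11 = configured.contains(Protocol.HTTP11);
        if (configured.contains(Protocol.H2)) {
            return http11
                    ? "TLS: negotiate HTTP/2 (preferred) or HTTP/1.1"
                    : "TLS: HTTP/2 only, no fallback to HTTP/1.1";
        }
        if (configured.contains(Protocol.H2C)) {
            return http11
                    ? "clear-text: HTTP/1.1 first, then H2C upgrade"
                    : "clear-text: H2C prior knowledge, no fallback to HTTP/1.1";
        }
        return "HTTP/1.1 only";
    }

    public static void main(String[] args) {
        System.out.println(describe(EnumSet.of(Protocol.H2, Protocol.HTTP11)));
        System.out.println(describe(EnumSet.of(Protocol.H2)));
        System.out.println(describe(EnumSet.of(Protocol.H2C, Protocol.HTTP11)));
        System.out.println(describe(EnumSet.of(Protocol.H2C)));
    }
}
```

In the H2C listing earlier in this section, only `HttpProtocol.H2C` is configured, which corresponds to the prior-knowledge case.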
6.8. Metrics
The HTTP client supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.http.client.
metric name | type | description |
---|---|---|
reactor.netty.http.client.data.received | DistributionSummary | Amount of the data received, in bytes |
reactor.netty.http.client.data.sent | DistributionSummary | Amount of the data sent, in bytes |
reactor.netty.http.client.errors | Counter | Number of errors that occurred |
reactor.netty.http.client.tls.handshake.time | Timer | Time spent for TLS handshake |
reactor.netty.http.client.connect.time | Timer | Time spent for connecting to the remote address |
reactor.netty.http.client.address.resolver | Timer | Time spent for resolving the address |
reactor.netty.http.client.data.received.time | Timer | Time spent in consuming incoming data |
reactor.netty.http.client.data.sent.time | Timer | Time spent in sending outgoing data |
reactor.netty.http.client.response.time | Timer | Total time for the request/response |
These additional metrics are also available:
Pooled ConnectionProvider metrics
metric name | type | description |
---|---|---|
reactor.netty.connection.provider.total.connections | Gauge | The number of all connections, active or idle |
reactor.netty.connection.provider.active.connections | Gauge | The number of the connections that have been successfully acquired and are in active use |
reactor.netty.connection.provider.idle.connections | Gauge | The number of the idle connections |
reactor.netty.connection.provider.pending.connections | Gauge | The number of requests that are waiting for a connection |
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of the bytes of the heap memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of the bytes of the direct memory |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | The size of the tiny cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator ) |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/metrics/Application.java
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
Metrics.globalRegistry
.config()
.meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.client", "URI", 100, MeterFilter.deny()));
HttpClient client =
HttpClient.create()
.metrics(true, s -> {
if (s.startsWith("/stream/")) {
return "/stream/{n}";
}
else if (s.startsWith("/bytes/")) {
return "/bytes/{n}";
}
return s;
});
client.get()
.uri("http://httpbin.org/stream/2")
.responseContent()
.blockLast();
client.get()
.uri("http://httpbin.org/bytes/1024")
.responseContent()
.blockLast();
}
}
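The function passed to `metrics(true, ...)` in the listing above maps concrete request paths to templated tag values, which keeps the number of distinct URI tags well below the limit of 100 set by the meter filter. The following sketch isolates that mapping in plain Java so that its effect can be checked on its own. `UriTagTemplates` and `distinctTags` are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Illustration only: the same path-to-template mapping as in the listing above.
final class UriTagTemplates {

    static final Function<String, String> URI_TAG = path -> {
        if (path.startsWith("/stream/")) {
            return "/stream/{n}";
        }
        if (path.startsWith("/bytes/")) {
            return "/bytes/{n}";
        }
        return path;
    };

    // Collects the distinct tag values that a set of request paths produces.
    static Set<String> distinctTags(List<String> paths) {
        Set<String> tags = new TreeSet<>();
        for (String path : paths) {
            tags.add(URI_TAG.apply(path));
        }
        return tags;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("/stream/1", "/stream/2", "/bytes/16", "/bytes/1024", "/get");
        System.out.println(distinctTags(paths)); // prints [/bytes/{n}, /get, /stream/{n}]
    }
}
```

Five request paths collapse into three tag values. Without the templates, every new number in the path would create another set of meters.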
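When HTTP client metrics are needed for an integration with a system other than Micrometer, or when you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows: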
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/metrics/custom/Application.java
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.client.HttpClient;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
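// CustomHttpClientMetricsRecorder is an application-provided metrics recorder; its source is not part of this listing.
HttpClient client =
HttpClient.create()
.metrics(true, CustomHttpClientMetricsRecorder::new);
client.get()
.uri("https://httpbin.org/stream/2")
.response()
.block();
}
}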
6.9. Unix Domain Sockets
The HTTP client supports Unix Domain Sockets (UDS) when native transport is in use.
The following example shows how to use UDS support:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/uds/Application.java
import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.http.client.HttpClient;
public class Application {
public static void main(String[] args) {
HttpClient client =
HttpClient.create()
.remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock"));
client.get()
.uri("/")
.response()
.block();
}
}
7. UDP Server
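Reactor Netty provides the easy-to-use and easy-to-configure UdpServer. It hides most of the Netty functionality that is required to create a UDP server and adds Reactive Streams backpressure.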
7.1. Starting and Stopping
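To start a UDP server, you must create and configure a UdpServer instance. By default, the host is configured to be localhost and the port is 12012. The following example shows how to create and start a UDP server: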
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/create/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
The returned Connection offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.
7.1.1. Host and Port
To serve on a specific host and port, you can apply the following configuration to the UDP server:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/address/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.host("localhost")
.port(8080)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
7.2. Writing Data
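To send data to the remote peer, you must attach an I/O handler. The I/O handler has access to UdpOutbound, to be able to write data. The following example shows how to send hello: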
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/send/Application.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.handle((in, out) ->
out.sendObject(
in.receiveObject()
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
return new DatagramPacket(buf, p.sender());
}
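else {
// Throwing inside map() signals an error; returning Mono.error(...) here would emit the Mono itself as a message.
throw new IllegalStateException("Unexpected type of the message: " + o);
}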
})))
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
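To try the server above from the outside, a small client built only on the JDK is enough. The sketch below sends one datagram and waits for a single reply. `UdpProbe` and `request` are hypothetical names, and the address `localhost:12012` assumes the server runs with the default host and port mentioned in this guide.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Illustration only: a plain-JDK client for exercising a UDP server by hand.
final class UdpProbe {

    // Sends one datagram and returns the first reply decoded as UTF-8.
    // Throws SocketTimeoutException if no reply arrives within timeoutMillis.
    static String request(InetSocketAddress server, String payload, int timeoutMillis) throws Exception {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setSoTimeout(timeoutMillis);
            byte[] out = payload.getBytes(StandardCharsets.UTF_8);
            socket.send(new DatagramPacket(out, out.length, server));

            byte[] in = new byte[2048];
            DatagramPacket reply = new DatagramPacket(in, in.length);
            socket.receive(reply);
            return new String(reply.getData(), reply.getOffset(), reply.getLength(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            // Assumes a UDP server is listening on the default host and port.
            System.out.println(request(new InetSocketAddress("localhost", 12012), "ping", 2000));
        }
        catch (SocketTimeoutException e) {
            System.out.println("no reply within 2 seconds; is the server running?");
        }
    }
}
```

Against the server in this section, the reply should be the string that the handler writes back to the sender.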
7.3. Consuming Data
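To receive data from a remote peer, you must attach an I/O handler. The I/O handler has access to UdpInbound, to be able to read data. The following example shows how to consume data: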
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/read/Application.java
import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.handle((in, out) ->
out.sendObject(
in.receiveObject()
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
return new DatagramPacket(p.content().retain(), p.sender());
}
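else {
// Throwing inside map() signals an error; returning Mono.error(...) here would emit the Mono itself as a message.
throw new IllegalStateException("Unexpected type of the message: " + o);
}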
})))
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
7.4. Lifecycle Callbacks
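The following lifecycle callbacks are provided to let you extend the UDP server:
- doOnBind: Invoked when the server channel is about to bind.
- doOnBound: Invoked when the server channel is bound.
- doOnUnbound: Invoked when the server channel is unbound.
The following example uses the doOnBound method: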
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/lifecycle/Application.java
import io.netty.handler.codec.LineBasedFrameDecoder;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.doOnBound(conn -> conn.addHandler(new LineBasedFrameDecoder(8192)))
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
7.5. Connection Configuration
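This section describes three kinds of configuration that you can use at the UDP level:
- Channel Options
- Wire Logger
- Event Loop Group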
7.5.1. Channel Options
By default, the UDP server is configured with the following options:
./../../reactor-netty-core/src/main/java/reactor/netty/udp/UdpServerBind.java
UdpServerBind() {
this.config = new UdpServerConfig(
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}
If you need additional options or need to change the current options, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/channeloptions/Application.java
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
For more information about Netty channel options, see the Netty ChannelOption Javadoc.
7.5.2. Wire Logger
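Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.udp.UdpServer level to DEBUG and apply the following configuration: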
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/wiretap/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.wiretap(true)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
7.5.3. Event Loop Group
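By default, the UDP server uses an "Event Loop Group", where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.
The default configuration for the "Event Loop Group" is the following: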
./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
If you need to change these settings, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/eventloop/Application.java
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection server =
UdpServer.create()
.runOn(loop)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
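The thread-count defaults listed in this section follow one pattern: read a system property and, if it is absent, fall back to the number of available processors with a minimum of 4. The sketch below reproduces that rule in plain Java. `EventLoopDefaults` and `threadCount` are hypothetical names; the property names are copied from the ReactorNetty listing, and reading the value as a plain integer is an assumption.

```java
// Illustration only: resolves a thread count the way the Javadoc above describes.
final class EventLoopDefaults {

    // Property names copied from the ReactorNetty listing.
    static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
    static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";

    // Returns the value of the property when it is set (assumed to be an integer),
    // otherwise max(available processors, 4).
    static int threadCount(String property) {
        int fallback = Math.max(Runtime.getRuntime().availableProcessors(), 4);
        return Integer.parseInt(System.getProperty(property, String.valueOf(fallback)));
    }

    public static void main(String[] args) {
        System.out.println("worker threads: " + threadCount(IO_WORKER_COUNT));
        System.out.println("UDP threads: " + threadCount(UDP_IO_THREAD_COUNT));
    }
}
```

Starting the JVM with, for example, `-Dreactor.netty.udp.ioThreadCount=2` changes the second value. Note that the minimum of 4 applies only to the fallback, not to an explicitly configured value.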
7.6. Metrics
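The UDP server supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.udp.server.
The following table provides information for the UDP server metrics: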
metric name | type | description |
---|---|---|
reactor.netty.udp.server.data.received | DistributionSummary | Amount of the data received, in bytes |
reactor.netty.udp.server.data.sent | DistributionSummary | Amount of the data sent, in bytes |
reactor.netty.udp.server.errors | Counter | Number of errors that occurred |
These additional metrics are also available:
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of the bytes of the heap memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of the bytes of the direct memory |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | The size of the tiny cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator ) |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/metrics/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.metrics(true)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
The metrics(true) call enables the built-in integration with Micrometer.
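When UDP server metrics are needed for an integration with a system other than Micrometer, or when you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows: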
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/metrics/custom/Application.java
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpServer;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.metrics(true, CustomChannelMetricsRecorder::new)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
The metrics(true, CustomChannelMetricsRecorder::new) call enables UDP server metrics and provides a ChannelMetricsRecorder implementation.
8. UDP Client
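Reactor Netty provides the easy-to-use and easy-to-configure UdpClient. It hides most of the Netty functionality that is required to create a UDP client and adds Reactive Streams backpressure.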
8.1. Connecting and Disconnecting
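To connect the UDP client to a given endpoint, you must create and configure a UdpClient instance. By default, the host is configured to be localhost and the port is 12012. The following example shows how to create and connect a UDP client: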
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/create/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
The returned Connection offers a simple connection API, including disposeNow(), which shuts the client down in a blocking fashion.
8.1.1. Host and Port
To connect to a specific host and port, you can apply the following configuration to the UDP client:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/address/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
8.2. Writing Data
To send data to a given peer, you must attach an I/O handler. The I/O handler has access to UdpOutbound, to be able to write data.
The following example shows how to send hello:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/send/Application.java
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello")))
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
The sendString(Mono.just("hello")) call sends the hello string to the remote peer.
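The client examples in this chapter target example.com, which is unlikely to answer UDP traffic. For local experiments, a small responder built only on the JDK can stand in for the remote peer. The sketch below waits for one datagram, echoes it back to the sender, and returns its text. `UdpEchoOnce` and `echoOnce` are hypothetical names, and port 12012 on the loopback address is an assumption chosen to match the default port mentioned in this guide.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Illustration only: a plain-JDK peer for trying a UDP client locally.
final class UdpEchoOnce {

    // Blocks until one datagram arrives, sends the same bytes back to the
    // sender, and returns the payload decoded as UTF-8.
    static String echoOnce(DatagramSocket socket) throws IOException {
        byte[] buf = new byte[2048];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        socket.receive(packet);
        socket.send(new DatagramPacket(
                packet.getData(), packet.getOffset(), packet.getLength(), packet.getSocketAddress()));
        return new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Assumed address: loopback, port 12012.
        try (DatagramSocket socket = new DatagramSocket(12012, InetAddress.getLoopbackAddress())) {
            System.out.println("received: " + echoOnce(socket));
        }
    }
}
```

With this responder running, pointing a client at the loopback address and port 12012 instead of example.com and port 80 should make it print the string that the client sends.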
8.3. Consuming Data
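To receive data from a given peer, you must attach an I/O handler. The I/O handler has access to UdpInbound, to be able to read data. The following example shows how to consume data: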
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/read/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.handle((udpInbound, udpOutbound) -> udpInbound.receive().then())
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
8.4. Lifecycle Callbacks
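The following lifecycle callbacks are provided to let you extend the UDP client:
- doOnConnect: Invoked when the channel is about to connect.
- doOnConnected: Invoked after the channel has been connected.
- doOnDisconnected: Invoked after the channel has been disconnected.
The following example uses the doOnConnected method: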
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/lifecycle/Application.java
import io.netty.handler.codec.LineBasedFrameDecoder;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.doOnConnected(conn -> conn.addHandler(new LineBasedFrameDecoder(8192)))
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
8.5. Connection Configuration
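This section describes three kinds of configuration that you can use at the UDP level:
- Channel Options
- Wire Logger
- Event Loop Group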
8.5.1. Channel Options
By default, the UDP client is configured with the following options:
./../../reactor-netty-core/src/main/java/reactor/netty/udp/UdpClientConnect.java
UdpClientConnect() {
this.config = new UdpClientConfig(
ConnectionProvider.newConnection(),
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}
If you need additional options or need to change the current options, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/channeloptions/Application.java
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
For more information about Netty channel options, see the Netty ChannelOption Javadoc.
8.5.2. Wire Logger
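Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.udp.UdpClient level to DEBUG and apply the following configuration: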
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/wiretap/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.wiretap(true)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
8.5.3. Event Loop Group
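By default, the UDP client uses an "Event Loop Group", where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.
The following listing shows the default configuration for the "Event Loop Group":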
./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
If you need to change these settings, you can apply the following configuration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/eventloop/Application.java
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.runOn(loop)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
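The two shutdown settings in the defaults listing are durations: a quiet period that falls back to 2 seconds and a timeout that falls back to 15 seconds. The sketch below shows one way to resolve them in plain Java. `ShutdownDefaults` is a hypothetical name, and interpreting the property values as whole seconds is an assumption based on the fallback values stated in the Javadoc.

```java
import java.time.Duration;

// Illustration only: resolves the shutdown durations described in the Javadoc above.
final class ShutdownDefaults {

    // Property names copied from the ReactorNetty listing.
    static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
    static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

    // Assumption: the property value is a whole number of seconds.
    static Duration seconds(String property, long fallbackSeconds) {
        String value = System.getProperty(property, Long.toString(fallbackSeconds));
        return Duration.ofSeconds(Long.parseLong(value));
    }

    static Duration quietPeriod() {
        return seconds(SHUTDOWN_QUIET_PERIOD, 2);
    }

    static Duration timeout() {
        return seconds(SHUTDOWN_TIMEOUT, 15);
    }

    public static void main(String[] args) {
        System.out.println("quiet period: " + quietPeriod() + ", timeout: " + timeout());
    }
}
```

The quiet period is the window during which disposal is held back, and the timeout is the upper bound on how long disposal may wait, so a sensible configuration keeps the timeout at least as long as the quiet period.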
8.6. Metrics
The UDP client supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.udp.client.
The following table provides information for the UDP client metrics:
metric name | type | description |
---|---|---|
reactor.netty.udp.client.data.received | DistributionSummary | Amount of the data received, in bytes |
reactor.netty.udp.client.data.sent | DistributionSummary | Amount of the data sent, in bytes |
reactor.netty.udp.client.errors | Counter | Number of errors that occurred |
reactor.netty.udp.client.connect.time | Timer | Time spent for connecting to the remote address |
reactor.netty.udp.client.address.resolver | Timer | Time spent for resolving the address |
These additional metrics are also available:
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of the bytes of the heap memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of the bytes of the direct memory |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | The size of the tiny cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator ) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator ) |
The following example enables that integration:
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/metrics/Application.java
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.metrics(true)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
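When UDP client metrics are needed for an integration with a system other than Micrometer, or when you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows: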
./../../reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/metrics/custom/Application.java
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpClient;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.metrics(true, CustomChannelMetricsRecorder::new)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}