Spring中AsyncRestTemplate的應用
Web應用程序通常需要查詢外部REST服務。 在為滿足這些需求擴展應用程序時,HTTP和同步調用的本質會帶來挑戰:可能會阻塞多個線程,等待遠程HTTP響應。
AsyncRestTemplate類為REST客戶端的開發提供了非阻塞的異步支持。
AsyncRestTemplate是Spring中用於異步客戶端HTTP訪問的核心類。 它公開與RestTemplate相似的方法,但返回ListenableFuture包裝器,而不是具體的結果。AsyncRestTemplate通過getRestOperations()方法公開一個同步RestTemplate,並與該RestTemplate共享其錯誤處理程序和消息轉換器。
注意:默認情況下,AsyncRestTemplate依靠標準JDK工具建立HTTP連接。 您可以通過使用接受AsyncClientHttpRequestFactory的構造函數來切換使用其他HTTP庫,例如Apache HttpComponents,Netty和OkHttp。
默認的AsyncRestTemplate構造函數註冊一個SimpleAsyncTaskExecutor來執行HTTP請求。 當處理大量短期請求時,像ThreadPoolTaskExecutor這樣的線程池TaskExecutor實現可能是一個不錯的選擇。
既支持HTTPS調用,也支持HTTP調用
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import javax.net.ssl.*;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.Socket;
import java.security.cert.X509Certificate;
/**
 * A {@link SimpleClientHttpRequestFactory} that accepts any server certificate and any host name
 * on HTTPS connections, while plain HTTP connections are prepared exactly as the superclass does.
 *
 * <p><strong>Security warning:</strong> certificate-chain validation and host-name verification
 * are deliberately switched off for HTTPS connections, so this factory offers no protection
 * against man-in-the-middle attacks. NOTE(review): this looks intended for test environments or
 * hosts with self-signed certificates; confirm before using it against production endpoints.
 */
public class HttpsClientRequestFactory extends SimpleClientHttpRequestFactory {

    /**
     * Installs the trust-all SSL socket factory and host-name verifier on HTTPS connections, then
     * lets the superclass finish preparing the connection.
     *
     * <p>The superclass is invoked for plain HTTP connections as well, so its configuration is
     * applied to every connection rather than only to HTTPS ones.
     *
     * @param connection the connection to prepare
     * @param httpMethod the HTTP request method, passed through to the superclass
     * @throws IOException if the SSL context cannot be initialised or the superclass fails
     */
    @Override
    protected void prepareConnection(HttpURLConnection connection, String httpMethod) throws IOException {
        if (connection instanceof HttpsURLConnection) {
            HttpsURLConnection httpsConnection = (HttpsURLConnection) connection;
            httpsConnection.setSSLSocketFactory(createTrustAllSocketFactory());
            // Deliberately accepts every host name (see the class-level security warning).
            httpsConnection.setHostnameVerifier(new HostnameVerifier() {
                @Override
                public boolean verify(String s, SSLSession sslSession) {
                    return true;
                }
            });
        }
        // Always delegate: this is what makes the factory usable for both http and https.
        super.prepareConnection(connection, httpMethod);
    }

    /**
     * Builds a socket factory whose trust manager accepts every certificate chain.
     *
     * @return a socket factory wrapping a freshly initialised trust-all {@link SSLContext}
     * @throws IOException if the TLS context cannot be created or initialised
     */
    private SSLSocketFactory createTrustAllSocketFactory() throws IOException {
        TrustManager[] trustAllCerts = new TrustManager[]{
                new X509TrustManager() {
                    @Override
                    public X509Certificate[] getAcceptedIssuers() {
                        // The X509TrustManager contract asks for a non-null, possibly empty, array.
                        return new X509Certificate[0];
                    }

                    @Override
                    public void checkClientTrusted(X509Certificate[] certs, String authType) {
                        // Intentionally empty: every client certificate is accepted.
                    }

                    @Override
                    public void checkServerTrusted(X509Certificate[] certs, String authType) {
                        // Intentionally empty: every server certificate is accepted.
                    }
                }
        };
        try {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
            return new MyCustomSSLSocketFactory(sslContext.getSocketFactory());
        } catch (java.security.GeneralSecurityException e) {
            // Surface the failure instead of continuing with a half-configured connection.
            throw new IOException("Failed to initialise the trust-all SSL context", e);
        }
    }

    /**
     * {@link SSLSocketFactory} decorator that delegates socket creation and then restricts the
     * enabled protocol versions on every socket it returns.
     */
    private static class MyCustomSSLSocketFactory extends SSLSocketFactory {

        /** Protocol versions to enable, when the socket supports them. */
        private static final String[] PREFERRED_PROTOCOLS = {"TLSv1.3", "TLSv1.2"};

        private final SSLSocketFactory delegate;

        public MyCustomSSLSocketFactory(SSLSocketFactory delegate) {
            this.delegate = delegate;
        }

        /** Returns the delegate's default cipher suites. */
        @Override
        public String[] getDefaultCipherSuites() {
            return delegate.getDefaultCipherSuites();
        }

        /** Returns the delegate's supported cipher suites. */
        @Override
        public String[] getSupportedCipherSuites() {
            return delegate.getSupportedCipherSuites();
        }

        @Override
        public Socket createSocket(final Socket socket, final String host, final int port,
                                   final boolean autoClose) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final String host, final int port) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final String host, final int port, final InetAddress localAddress,
                                   final int localPort) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final InetAddress host, final int port) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress,
                                   final int localPort) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
            return overrideProtocol(underlyingSocket);
        }

        /**
         * Restricts the socket to the preferred protocol versions it actually supports.
         *
         * <p>If the socket supports none of them, its enabled protocols are left unchanged rather
         * than being replaced with an empty list.
         *
         * @param socket the socket produced by the delegate factory
         * @return the same socket instance
         * @throws IllegalStateException if the socket is not an {@link SSLSocket}
         */
        private Socket overrideProtocol(final Socket socket) {
            if (!(socket instanceof SSLSocket)) {
                throw new IllegalStateException("An instance of SSLSocket is expected");
            }
            final SSLSocket sslSocket = (SSLSocket) socket;
            final java.util.List<String> supported = java.util.Arrays.asList(sslSocket.getSupportedProtocols());
            final java.util.List<String> enabled = new java.util.ArrayList<String>();
            for (String protocol : PREFERRED_PROTOCOLS) {
                if (supported.contains(protocol)) {
                    enabled.add(protocol);
                }
            }
            if (!enabled.isEmpty()) {
                sslSocket.setEnabledProtocols(enabled.toArray(new String[0]));
            }
            return socket;
        }
    }
}
RestTemplate
// Build a RestTemplate on top of the custom HttpsClientRequestFactory.
RestTemplate restTemplate = new RestTemplate(new HttpsClientRequestFactory());
// To use Apache HttpComponents instead of the native java.net facilities, construct the RestTemplate as follows:
RestTemplate template = new RestTemplate(new HttpComponentsClientHttpRequestFactory());
AsyncRestTemplate
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.AsyncRestTemplate;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Demo of {@link AsyncRestTemplate} backed by {@code HttpsClientRequestFactory}: one request is
 * awaited with a blocking {@code get()}, a second one is handled through a callback.
 *
 * @author Created by niugang on 2020/4/10/16:47
 */
public class AsyncRestTemplateTest {

    /**
     * Issues one HTTPS request and one HTTP request through the same template.
     *
     * @param args unused
     * @throws ExecutionException   if the first (blocking) request fails
     * @throws InterruptedException if the thread is interrupted while waiting for the first response
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // The custom request factory is used for both http and https URLs.
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
        HttpsClientRequestFactory httpsClientRequestFactory = new HttpsClientRequestFactory();
        httpsClientRequestFactory.setTaskExecutor(new SimpleAsyncTaskExecutor());
        asyncRestTemplate.setAsyncRequestFactory(httpsClientRequestFactory);

        // Async call whose result is awaited immediately with get().
        Future<ResponseEntity<String>> futureEntity = asyncRestTemplate.getForEntity(
                "https://11.12.115.104/api/serverStatus/serverInfo", String.class);
        ResponseEntity<String> stringResponseEntity = futureEntity.get();
        System.out.println(stringResponseEntity.getBody());

        ListenableFuture<ResponseEntity<String>> futureEntity1 = asyncRestTemplate.getForEntity(
                "http://localhost:8088/boot/dateTest", String.class);
        // Register a callback instead of blocking on the result.
        futureEntity1.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onSuccess(ResponseEntity<String> entity) {
                System.out.println("success:" + entity.getBody());
            }

            @Override
            public void onFailure(Throwable t) {
                // Report the cause; a bare "error" makes failures impossible to diagnose.
                System.out.println("error: " + t);
            }
        });
        System.out.println("主線程");
    }
}
要使用Apache HttpComponents代替本機java.net功能,需要添加以下依賴:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.10</version>
</dependency>
import org.apache.http.Consts;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.conn.SystemDefaultDnsResolver;
import org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.nio.conn.ManagedNHttpClientConnection;
import org.apache.http.nio.conn.NHttpConnectionFactory;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.AsyncRestTemplate;
import java.nio.charset.CodingErrorAction;
import java.util.concurrent.ExecutionException;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.nio.codecs.DefaultHttpResponseParserFactory;
import org.apache.http.ssl.SSLContextBuilder;
import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.Future;
/**
 * Demo of {@link AsyncRestTemplate} backed by a custom-configured Apache {@link HttpAsyncClient}:
 * one request is awaited with a blocking {@code get()}, a second one is handled through a
 * callback. The HTTP client is closed once both requests have completed.
 *
 * <p><strong>Security warning:</strong> the SSL context trusts every certificate and host-name
 * verification is disabled, so HTTPS connections made here are not authenticated.
 *
 * @author Created by niugang on 2020/4/10/16:47
 */
public class AsyncRestTemplateTest2 {

    /**
     * Builds the async HTTP client, issues one HTTPS and one HTTP request, then closes the client.
     *
     * @param args unused
     * @throws ExecutionException       if the first (blocking) request fails
     * @throws InterruptedException     if the thread is interrupted while waiting for a response
     * @throws IOReactorException       if the I/O reactor cannot be created
     * @throws KeyStoreException        if the trust material cannot be loaded
     * @throws NoSuchAlgorithmException if the SSL context cannot be built
     * @throws KeyManagementException   if the SSL context cannot be built
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
        // Works for both http and https URLs.
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();

        // HTTP connection factory: configures request writing and response parsing.
        NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory =
                new ManagedNHttpClientConnectionFactory(
                        DefaultHttpRequestWriterFactory.INSTANCE,
                        DefaultHttpResponseParserFactory.INSTANCE, HeapByteBufferAllocator.INSTANCE);

        // SSL setup that allows https access without a trusted certificate:
        // loadTrustMaterial() is given a trust strategy that accepts every certificate chain.
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
            // Trust everything.
            @Override
            public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                return true;
            }
        }).build();

        // Registry of session strategies for the supported protocol schemes.
        Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("http", NoopIOSessionStrategy.INSTANCE)
                .register("https", new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))
                .build();

        // DNS resolver.
        DnsResolver dnsResolver = SystemDefaultDnsResolver.INSTANCE;

        // Create I/O reactor configuration.
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                .setIoThreadCount(Runtime.getRuntime().availableProcessors())
                .setConnectTimeout(30000)
                .setSoTimeout(30000)
                .build();

        // Create a customised I/O reactor.
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);

        // Create the connection manager from the custom pieces above.
        PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(
                ioReactor, connFactory, sessionStrategyRegistry, dnsResolver);

        // Create the connection configuration.
        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setMalformedInputAction(CodingErrorAction.IGNORE)
                .setUnmappableInputAction(CodingErrorAction.IGNORE)
                .setCharset(Consts.UTF_8)
                .build();

        // Use that connection configuration as the connection manager's default.
        connManager.setDefaultConnectionConfig(connectionConfig);

        // Pool limits: at most 100 connections in total and, by default, 10 per route.
        connManager.setMaxTotal(100);
        connManager.setDefaultMaxPerRoute(10);

        // Create the global request configuration.
        RequestConfig defaultRequestConfig = RequestConfig.custom()
                .setCookieSpec(CookieSpecs.DEFAULT)
                .setSocketTimeout(5 * 1000)
                .setConnectTimeout(5 * 1000)
                .setExpectContinueEnabled(true)
                .build();

        // Create the async HTTP client with the given custom dependencies and configuration.
        HttpAsyncClient httpclient = HttpAsyncClients.custom()
                .setConnectionManager(connManager)
                .setDefaultRequestConfig(defaultRequestConfig)
                .build();

        try {
            asyncRestTemplate.setAsyncRequestFactory(new HttpComponentsAsyncClientHttpRequestFactory(httpclient));

            // Async call whose result is awaited immediately with get().
            Future<ResponseEntity<String>> futureEntity = asyncRestTemplate.getForEntity(
                    "https://11.12.115.104/api/serverStatus/serverInfo", String.class);
            ResponseEntity<String> stringResponseEntity = futureEntity.get();
            System.out.println(stringResponseEntity.getBody());

            ListenableFuture<ResponseEntity<String>> futureEntity1 = asyncRestTemplate.getForEntity(
                    "http://localhost:8088/boot/dateTest", String.class);

            // Released by the callback so the client is only closed after the second request is done.
            final java.util.concurrent.CountDownLatch callbackDone = new java.util.concurrent.CountDownLatch(1);

            // Register a callback instead of blocking on the result.
            futureEntity1.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
                @Override
                public void onSuccess(ResponseEntity<String> entity) {
                    try {
                        System.out.println("success:" + entity.getBody());
                    } finally {
                        callbackDone.countDown();
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    try {
                        // Report the cause; a bare "error" makes failures impossible to diagnose.
                        System.out.println("error: " + t);
                    } finally {
                        callbackDone.countDown();
                    }
                }
            });
            System.out.println("主線程");

            // Wait for the callback; closing the client earlier would abort the in-flight request.
            callbackDone.await();
        } finally {
            // Close the client so its I/O reactor threads are not left running after main returns.
            if (httpclient instanceof java.io.Closeable) {
                try {
                    ((java.io.Closeable) httpclient).close();
                } catch (java.io.IOException e) {
                    System.out.println("failed to close HTTP client: " + e);
                }
            }
        }
    }
}