TCP/UDP長連接的誤區:
TCP是面向連接的通信協議。服務端需要先實例化ServerSocket對象,通過bind方法綁定服務端的IP和端口,再調用serverSocket.accept()阻塞等待客戶端連接。代碼如下:
服務端:
package com.example.springbootkafka2; import com.example.serverconfig.ServerConfig; import lombok.SneakyThrows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import java.net.*; import java.io.*; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @Component public class ServerSocket1 implements ApplicationRunner, Ordered { @Autowired private Config config; @Autowired private ServerConfig serverConfig; @Autowired private Producer producer; //監聽的端口號和ip private int PORT; private String IP; int i; String ip; String message; //定義list集合 List<TopicsInfo> list; @Override public void run(ApplicationArguments args){//實現ApplicationRunner, Ordered接口中的run方法 PORT = serverConfig.getPort(); IP=serverConfig.getIp(); list= config.getTopicList(); init();//開啟服務端線程 } @Override public int getOrder() { return 0; } private void init(){ new Thread(new test1()).start(); } //開啟一個test1線程 private class test1 implements Runnable{ private String topic; @Override public void run() { try { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(IP,PORT)); while(true){ Socket client = serverSocket.accept(); //獲取客戶端的IP InetAddress ia= client.getInetAddress(); ip = ia.getHostAddress(); System.out.println("客戶端ip:"+ip); //.......................................................................... //判斷當前客戶端是否與服務端有相匹配的IP port topic int size = list.size(); for( i=0 ;i<size;i++){ if(list.get(i).getIp().equals(ip)){ System.out.println("匹配成功,進行鏈接"+list.get(i).getIp()); topic=list.get(i).getTopic(); //一個客戶端連接就開戶一個線程處理 new Thread(new HandlerThread(client,topic)).start(); break; }else{ System.out.println("對不起,沒有匹配的" + list.get(i).getIp()); } } } //........................................................................... 
} catch (IOException e) { System.out.println("服務器異常:" + e.getMessage()); } } } //服務端線程操作過程 private class HandlerThread implements Runnable{ private Socket client; private String topic; public HandlerThread(Socket client ,String topic) { this.client = client; this.topic = topic; } @Override public void run() { try { //取得輸入流 InputStream is = client.getInputStream(); BufferedReader bw = new BufferedReader(new InputStreamReader(is)); while (true) { String receive = bw.readLine(); //queue.add(receive); message=receive; producer.send(topic,message);//這個是指定主題發送 System.out.println("發送消息:"+message); System.out.println("發送成功...");//生產者發送消息, //讀取客戶端發送來的數據 System.out.println(receive); } } catch (IOException e) { System.out.println("服務器異常:" + e.getMessage()); }finally{ if(client != null){ try { client.close(); } catch (IOException e) { client = null; System.out.println("服務器異常:" + e.getMessage()); } } } } } /////生產者定義的線程 //private class SendThread implements Runnable{ // @SneakyThrows // @Override // public void run() { // while (true) { // //String message=queue.take(); // //System.out.println("對不起我現在是空的:"+queue); // //通過這個方法,給指定主題和消費者發消息 // producer.send(topic,message);//這個是指定主題發送 // System.out.println("發送消息:"+message); // System.out.println("發送成功...");//生產者發送消息 // } // } // } // //為生產者發送消息開啟一個線程 //private void send(){ // new Thread(new SendThread()).start(); // } }
客戶端:
package com.example.springbootkafka2; import com.example.serverconfig.ServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import java.io.*; import java.net.Socket; import java.util.Scanner; @Component public class ClientSocket implements ApplicationRunner,Ordered { @Autowired private ServerConfig serverConfig; private String IP ;//服務器地址 private int PORT;//服務器端口號 // public void main(String[] args) throws IOException { // ClientSocket clientSocket = new ClientSocket(); // clientSocket.handler(); // } private void handler() throws IOException { //創建一個流套接字,連接到指定主機上的指定端口號 Socket client=null; try { client = new Socket(IP, PORT); //取得輸出流 OutputStream os = client.getOutputStream(); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os)); //輸入要發送到服務端的消息 Scanner sc = new Scanner(System.in); while (true) { String ss = sc.nextLine(); System.out.println(ss); bw.write(ss+"\n"); bw.flush(); System.out.println("數據已輸出"); System.out.println(client); } // bw.close(); // os.close(); } catch (Exception e) { System.out.println("客戶端異常:" + e.getMessage()); } finally { // if (client != null) { // try { // client.close(); // } catch (IOException e) { // System.out.println("客戶端異常:" + e.getMessage()); // } // } } } @Override public void run(ApplicationArguments args) throws Exception {//實現ApplicationRunner, Ordered接口中的run方法 IP=serverConfig.getIp(); System.out.println("服務端IP:"+IP); PORT=serverConfig.getPort(); System.out.println("端口號:"+PORT); handler(); } @Override public int getOrder() { return 1; }//實現ApplicationRunner, Ordered接口中的getOrder方法 }
同理UDP:
UDP是無連接的協議,以數據報(datagram)的形式進行通信;不管服務端有沒有接收,客戶端都可以照常發送。具體代碼如下:
服務端:
package com.example.udp; import com.example.springbootkafka2.Config; import com.example.springbootkafka2.Producer; import com.example.springbootkafka2.TopicsInfo; import lombok.SneakyThrows; import org.springframework.stereotype.Component; import com.example.serverconfig.ServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import java.net.*; import java.util.List; @Component public class UdpServer implements ApplicationRunner,Ordered{ @Autowired private Config config; @Autowired private ServerConfig serverConfig; @Autowired private Producer producer; //監聽的端口號和ip private int PORT; private String IP; //初始化全局變量 int i; String message; String ip; //定義list集合 List<TopicsInfo> list; @Override public void run(ApplicationArguments args){//實現ApplicationRunner, Ordered接口中的run方法 PORT = serverConfig.getPort(); IP=serverConfig.getIp(); list= config.getTopicList(); init();//開啟服務端線程 } @Override public int getOrder() { return 0; } private void init(){ new Thread(new test1()).start(); } //開啟一個test1線程 private class test1 implements Runnable{ private String topic; @SneakyThrows @Override public void run() { try { DatagramSocket socket = new DatagramSocket(PORT,InetAddress.getByName(IP)); while (true) { byte[] buf = new byte[1024]; DatagramPacket packet = new DatagramPacket(buf, buf.length); socket.receive(packet); message = new String(packet.getData());//轉換為String類型 //System.out.println(message); //獲取客戶端的IP InetAddress ia = packet.getAddress(); ip= ia.getHostAddress(); System.out.println("客戶端ip:"+ip); System.out.println(socket); //.......................................................................... 
//判斷當前客戶端是否與服務端有相匹配的IP port topic int size = list.size(); for (i = 0; i < size; i++) { if (list.get(i).getIp().equals(ip)) { System.out.println("匹配成功" + list.get(i).getIp()); topic = list.get(i).getTopic(); //一個客戶端連接就開戶一個線程處理 new Thread(new HandlerThread(topic)).start(); break; } else { System.out.println("對不起,沒有匹配的" + list.get(i).getIp()); } } } //........................................................................... } catch(SocketException e){ System.out.println("服務器異常:" + e.getMessage()); } } } private class HandlerThread implements Runnable{ private String topic; public HandlerThread(String topic) { this.topic = topic; } @Override public void run() { producer.send(topic,message);//這個是指定主題發送 System.out.println("發送消息:"+message); System.out.println("發送成功...");//生產者發送消息, } } }
客戶端:
package com.example.udp; import com.example.serverconfig.ServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import java.io.*; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.Socket; import java.rmi.UnknownHostException; import java.util.Scanner; @Component public class UdpClient implements ApplicationRunner,Ordered{ @Autowired private ServerConfig serverConfig; private String IP ;//服務器地址 private int PORT;//服務器端口號 private void handler() throws IOException { //創建一個流套接字,連接到指定主機上的指定端口號 try { //輸入要發送到服務端的消息 Scanner sc = new Scanner(System.in); while (true) { String ss = sc.nextLine(); byte[] bytes =ss.getBytes(); DatagramPacket datagramPacket = new DatagramPacket(bytes,bytes.length, InetAddress.getByName(IP),PORT); DatagramSocket socket = new DatagramSocket();//創建套接字程序 socket.send(datagramPacket);//通過套接字發送數據 } } catch (UnknownHostException e) { System.out.println("客戶端異常:" + e.getMessage()); } } @Override public void run(ApplicationArguments args) throws Exception {//實現ApplicationRunner, Ordered接口中的run方法 IP=serverConfig.getIp(); System.out.println("服務端IP:"+IP); PORT=serverConfig.getPort(); System.out.println("端口號:"+PORT); handler(); } @Override public int getOrder() { return 1; }//實現ApplicationRunner, Ordered接口中的getOrder方法 }
配置文件
# Nesting restored: YAML is indentation-sensitive, and the flattened form declared
# duplicate top-level keys (port, ip, topic) instead of the intended hierarchy.
server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Comma-separated topic names; read in the Java code as ${spring.kafka.topic}.
    topic: test,topic1
# Entries matching a client IP to the topic its messages are sent to
# (bound to the Config class through the "topics" prefix).
####################
topics:
  topicList[0]:
    ip: 127.0.0.2
    port: 8084
    topic: test
  topicList[1]:
    ip: 127.0.0.1
    port: 8085
    topic: topic1
################
# Presumably bound to ServerConfig (getIp / getPort), which is not shown here - verify the prefix.
servers:
  ip: 127.0.0.1
  port: 8086
Kafka的生產者與消費者代碼如下:
生產者:
package com.example.springbootkafka2; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; @Slf4j @Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate;//沒有變 @Value("${spring.kafka.topic}") //引入application.properties中的數據 private String topics; public void send(String topics,String message) {//多個topic屬性 log.info("待發送的消息:{}", message); ListenableFuture<SendResult<String,String>> future =kafkaTemplate.send(topics,message); future.addCallback(success ->log.info("Producer發送消息成功"),fail->log.error("Producer發送消息失敗")); } }
消費者:
package com.example.springbootkafka2; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.concurrent.LinkedBlockingQueue; @Slf4j @Component public class Consumer { String message = null; // LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //private static KafkaConsumer<String,String> consumer; @KafkaListener(topics = "#{'${spring.kafka.topic}'.split(',')}") public void receiver(ConsumerRecord record) throws InterruptedException { log.info("接收到的數據:主題:"+record.topic()+".....key:"+record.key()+"....值:"+record.value()); //System.out.println("主題:"+record.topic()+",值:"+record.value());//我在這輸出打印了 //。。。。。。。。。下面的先不看。。。。。。。。。。。 // queue.add((String) record.value()); // System.out.println("我已經進入了隊列:"+queue); // message = queue.take(); // System.out.println(message); // System.out.println("現在隊列為空:"+queue); } }
配置類
// ---- TopicsInfo.java ----
package com.example.springbootkafka2;

import lombok.Data;

/** One configured client entry: the client's IP and port, and the topic assigned to it. */
@Data
public class TopicsInfo {
    private String ip;
    private int port;
    private String topic;
}

// ---- Config.java ----
package com.example.springbootkafka2;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Holds the entries configured under the {@code topics} prefix.
 *
 * <p>The class is registered once, as a scanned {@code @Component}. The original also
 * listed itself in {@code @EnableConfigurationProperties}, which registers a second bean
 * of the same type, and carried a redundant {@code @Configuration}; both were removed.
 */
@Component
@ConfigurationProperties("topics")
@Data
public class Config {
    private List<TopicsInfo> topicList;
}
以上是一個消息發送模塊的實現,主要負責接收並轉發消息數據。如有不足之處,歡迎共同探討,還望多多留言指教。