關於TCP/UDP通信長連接->消息隊列kafka+springboot


TCP/UDP長連接的誤區:

對於TCP來說,它屬於面向連接的通信協議。服務端需要實例化ServerSocket對象,通過serverSocket.accept()阻塞等待客戶端連接,同時還可以通過bind方法來綁定服務端的IP和PORT。代碼如下:

服務端:

package com.example.springbootkafka2;
import com.example.serverconfig.ServerConfig;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.net.*;
import java.io.*;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * TCP-to-Kafka bridge that runs as a Spring Boot {@code ApplicationRunner}.
 *
 * <p>Binds a {@link ServerSocket} to the IP/port read from {@code ServerConfig}, accepts clients on a
 * background thread and, for every client whose IP matches an entry of {@code Config#getTopicList()},
 * starts a handler thread that forwards each received text line to that entry's Kafka topic.
 *
 * <p>All per-connection state (client IP, topic, current line) is kept in locals or in the handler
 * instance, so concurrent clients cannot overwrite each other's data.
 */
@Component
public class ServerSocket1 implements ApplicationRunner, Ordered {

    @Autowired
    private Config config;
    @Autowired
    private ServerConfig serverConfig;
    @Autowired
    private Producer producer;

    // Listening port and IP; assigned in run() before the accept thread is started.
    private int PORT;
    private String IP;

    // Routing entries (client IP -> topic); assigned in run() before the accept thread is started.
    List<TopicsInfo> list;

    /**
     * Loads the listening address and the routing table, then starts the accept thread.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        PORT = serverConfig.getPort();
        IP = serverConfig.getIp();
        list = config.getTopicList();
        init();
    }

    /** @return the order value of this runner (0) */
    @Override
    public int getOrder() {
        return 0;
    }

    /** Starts the accept loop on its own thread so that run() returns immediately. */
    private void init() {
        new Thread(new test1()).start();
    }

    /**
     * Looks up the topic configured for a client IP, printing one line per inspected entry.
     *
     * @param clientIp textual IP address of the connected client
     * @return the topic of the first entry whose IP equals {@code clientIp}, or {@code null} if none matches
     */
    private String findTopic(String clientIp) {
        if (list == null) {
            return null;
        }
        for (TopicsInfo entry : list) {
            // clientIp is never null here, so this comparison is safe even if an entry has no IP.
            if (clientIp.equals(entry.getIp())) {
                System.out.println("匹配成功,進行鏈接" + entry.getIp());
                return entry.getTopic();
            }
            System.out.println("對不起,沒有匹配的" + entry.getIp());
        }
        return null;
    }

    /** Closes a socket, reporting (not propagating) any failure. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            System.out.println("服務器異常:" + e.getMessage());
        }
    }

    /** Accept loop: binds the server socket and hands every matched client to a HandlerThread. */
    private class test1 implements Runnable {
        @Override
        public void run() {
            // try-with-resources releases the listening port when the loop ends with an exception.
            try (ServerSocket serverSocket = new ServerSocket()) {
                serverSocket.bind(new InetSocketAddress(IP, PORT));
                while (true) {
                    Socket client = serverSocket.accept();
                    String clientIp = client.getInetAddress().getHostAddress();
                    System.out.println("客戶端ip:" + clientIp);

                    String topic = findTopic(clientIp);
                    if (topic == null) {
                        // No routing entry for this client: close the socket instead of leaking it.
                        closeQuietly(client);
                        continue;
                    }
                    // One thread per accepted client.
                    new Thread(new HandlerThread(client, topic)).start();
                }
            } catch (IOException e) {
                System.out.println("服務器異常:" + e.getMessage());
            }
        }
    }

    /** Reads lines from one client socket and forwards each of them to the client's Kafka topic. */
    private class HandlerThread implements Runnable {
        private final Socket client;
        private final String topic;

        /**
         * @param client accepted client socket; closed by this handler when it finishes
         * @param topic  Kafka topic that every line from this client is sent to
         */
        public HandlerThread(Socket client, String topic) {
            this.client = client;
            this.topic = topic;
        }

        @Override
        public void run() {
            try (Socket socket = client;
                 BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String receive;
                // readLine() returns null once the client closes the connection; stop instead of
                // looping forever and sending null messages.
                while ((receive = reader.readLine()) != null) {
                    producer.send(topic, receive);
                    System.out.println("發送消息:" + receive);
                    System.out.println("發送成功...");
                    System.out.println(receive);
                }
            } catch (IOException e) {
                System.out.println("服務器異常:" + e.getMessage());
            }
        }
    }
}

 




客戶端:

package com.example.springbootkafka2;

import com.example.serverconfig.ServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.io.*;
import java.net.Socket;
import java.util.Scanner;

/**
 * Console TCP client that runs as a Spring Boot {@code ApplicationRunner}.
 *
 * <p>Connects to the IP/port read from {@code ServerConfig} and sends every line typed on standard
 * input to the server, terminated by {@code '\n'}. The call blocks the runner thread until standard
 * input ends or an error occurs.
 */
@Component
public class ClientSocket implements ApplicationRunner, Ordered {

    @Autowired
    private ServerConfig serverConfig;

    // Server address, assigned in run().
    private String IP;

    // Server port, assigned in run().
    private int PORT;

    /**
     * Opens the connection and relays console input line by line.
     *
     * <p>The socket and its writer are closed when the loop ends, whether normally (end of input)
     * or through an exception. Failures are reported on standard output rather than propagated.
     *
     * @throws IOException declared for compatibility; I/O failures are caught and printed here
     */
    private void handler() throws IOException {
        // The Scanner is intentionally not closed: closing it would close System.in.
        Scanner sc = new Scanner(System.in);
        try (Socket client = new Socket(IP, PORT);
             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()))) {
            // hasNextLine() ends the loop cleanly when standard input is exhausted.
            while (sc.hasNextLine()) {
                String ss = sc.nextLine();
                System.out.println(ss);
                bw.write(ss + "\n");
                // Flush per line so the server's readLine() sees the message immediately.
                bw.flush();
                System.out.println("數據已輸出");
                System.out.println(client);
            }
        } catch (Exception e) {
            // Best-effort client: report the failure and return without failing application startup.
            System.out.println("客戶端異常:" + e.getMessage());
        }
    }

    /**
     * Reads the server address from configuration, prints it, and starts the send loop.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        IP = serverConfig.getIp();
        System.out.println("服務端IP:" + IP);
        PORT = serverConfig.getPort();
        System.out.println("端口號:" + PORT);
        handler();
    }

    /** @return the order value of this runner (1) */
    @Override
    public int getOrder() {
        return 1;
    }
}

同理UDP:

UDP是無連接的協議,以數據包(數據報)的形式進行通信:不管服務端有沒有接收到,客戶端都會照常發送。具體代碼如下:

服務端:

package com.example.udp;

import com.example.springbootkafka2.Config;
import com.example.springbootkafka2.Producer;
import com.example.springbootkafka2.TopicsInfo;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;
import com.example.serverconfig.ServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import java.net.*;
import java.util.List;
/**
 * UDP-to-Kafka bridge that runs as a Spring Boot {@code ApplicationRunner}.
 *
 * <p>Binds a {@link DatagramSocket} to the IP/port read from {@code ServerConfig}, receives datagrams
 * on a background thread and, for every datagram whose sender IP matches an entry of
 * {@code Config#getTopicList()}, forwards the payload to that entry's Kafka topic on a new thread.
 */
@Component
public class UdpServer implements ApplicationRunner, Ordered {
    @Autowired
    private Config config;
    @Autowired
    private ServerConfig serverConfig;
    @Autowired
    private Producer producer;

    // Listening port and IP; assigned in run() before the receive thread is started.
    private int PORT;
    private String IP;

    // Routing entries (sender IP -> topic); assigned in run() before the receive thread is started.
    List<TopicsInfo> list;

    /**
     * Loads the listening address and the routing table, then starts the receive thread.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        PORT = serverConfig.getPort();
        IP = serverConfig.getIp();
        list = config.getTopicList();
        init();
    }

    /** @return the order value of this runner (0) */
    @Override
    public int getOrder() {
        return 0;
    }

    /** Starts the receive loop on its own thread so that run() returns immediately. */
    private void init() {
        new Thread(new test1()).start();
    }

    /**
     * Looks up the topic configured for a sender IP, printing one line per inspected entry.
     *
     * @param senderIp textual IP address the datagram came from
     * @return the topic of the first entry whose IP equals {@code senderIp}, or {@code null} if none matches
     */
    private String findTopic(String senderIp) {
        if (list == null) {
            return null;
        }
        for (TopicsInfo entry : list) {
            if (senderIp.equals(entry.getIp())) {
                System.out.println("匹配成功" + entry.getIp());
                return entry.getTopic();
            }
            System.out.println("對不起,沒有匹配的" + entry.getIp());
        }
        return null;
    }

    /** Receive loop: reads datagrams and dispatches each matched payload to a HandlerThread. */
    private class test1 implements Runnable {
        @Override
        public void run() {
            // try-with-resources releases the port when the loop ends with an exception.
            try (DatagramSocket socket = new DatagramSocket(PORT, InetAddress.getByName(IP))) {
                while (true) {
                    byte[] buf = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    socket.receive(packet);
                    // Decode only the bytes actually received; decoding the whole buffer would
                    // append the unused zero bytes to the message.
                    String message = new String(packet.getData(), packet.getOffset(), packet.getLength());

                    String senderIp = packet.getAddress().getHostAddress();
                    System.out.println("客戶端ip:" + senderIp);
                    System.out.println(socket);

                    String topic = findTopic(senderIp);
                    if (topic != null) {
                        // The payload travels with the handler, so a later datagram cannot replace it.
                        new Thread(new HandlerThread(topic, message)).start();
                    }
                }
            } catch (IOException e) {
                // Covers SocketException, UnknownHostException and receive failures.
                System.out.println("服務器異常:" + e.getMessage());
            }
        }
    }

    /** Sends one received payload to its Kafka topic. */
    private class HandlerThread implements Runnable {
        private final String topic;
        private final String message;

        /**
         * @param topic   Kafka topic to send to
         * @param message payload of the datagram being forwarded
         */
        public HandlerThread(String topic, String message) {
            this.topic = topic;
            this.message = message;
        }

        @Override
        public void run() {
            producer.send(topic, message);
            System.out.println("發送消息:" + message);
            System.out.println("發送成功...");
        }
    }
}

客戶端:

package com.example.udp;

import com.example.serverconfig.ServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.io.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.rmi.UnknownHostException;
import java.util.Scanner;

/**
 * Console UDP client that runs as a Spring Boot {@code ApplicationRunner}.
 *
 * <p>Sends every line typed on standard input as one datagram to the IP/port read from
 * {@code ServerConfig}. The call blocks the runner thread until standard input ends.
 */
@Component
public class UdpClient implements ApplicationRunner, Ordered {

    @Autowired
    private ServerConfig serverConfig;

    // Server address, assigned in run().
    private String IP;
    // Server port, assigned in run().
    private int PORT;

    /**
     * Relays console input to the server, one datagram per line.
     *
     * <p>A single {@link DatagramSocket} is used for all datagrams and is closed when the loop ends.
     * The server host name is resolved once, before the first line is read.
     *
     * @throws IOException if the socket cannot be created or a datagram cannot be sent
     */
    private void handler() throws IOException {
        // The Scanner is intentionally not closed: closing it would close System.in.
        Scanner sc = new Scanner(System.in);
        try (DatagramSocket socket = new DatagramSocket()) {
            InetAddress serverAddress = InetAddress.getByName(IP);
            // hasNextLine() ends the loop cleanly when standard input is exhausted.
            while (sc.hasNextLine()) {
                String ss = sc.nextLine();
                byte[] bytes = ss.getBytes();
                socket.send(new DatagramPacket(bytes, bytes.length, serverAddress, PORT));
            }
        } catch (java.net.UnknownHostException e) {
            // Fully qualified on purpose: InetAddress.getByName throws the java.net exception,
            // not the java.rmi one of the same simple name.
            System.out.println("客戶端異常:" + e.getMessage());
        }
    }

    /**
     * Reads the server address from configuration, prints it, and starts the send loop.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        IP = serverConfig.getIp();
        System.out.println("服務端IP:" + IP);
        PORT = serverConfig.getPort();
        System.out.println("端口號:" + PORT);
        handler();
    }

    /** @return the order value of this runner (1) */
    @Override
    public int getOrder() {
        return 1;
    }
}

配置文件

# HTTP port of the Spring Boot application (separate from the socket port under `servers`).
server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    topic: test,topic1
# Comma-separated topic names: Producer injects this value via @Value, and Consumer splits it on ','
# to subscribe to every listed topic.

####################
# Routing table bound to Config (prefix `topics`): a client whose IP equals `ip` has its messages
# sent to `topic`. `port` is bound into TopicsInfo but is not read by the code shown in this post.
topics:
  topicList[0]:
    ip: 127.0.0.2
    port: 8084
    topic: test
  topicList[1]:
    ip: 127.0.0.1
    port: 8085
    topic: topic1

################
# Address the TCP/UDP server binds to and the clients connect to.
# NOTE(review): presumably bound by ServerConfig (not shown in this post) — confirm the prefix.
servers:
  ip: 127.0.0.1
  port: 8086

關於kafka的生產者與消費者代碼如下:

生產者:

package com.example.springbootkafka2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Slf4j
@Component
public  class Producer {
    @Autowired
    private KafkaTemplate kafkaTemplate;//沒有變
    @Value("${spring.kafka.topic}")     //引入application.properties中的數據
    private String topics;

    public void send(String topics,String message) {//多個topic屬性
        log.info("待發送的消息:{}", message);
       ListenableFuture<SendResult<String,String>> future =kafkaTemplate.send(topics,message);
      future.addCallback(success ->log.info("Producer發送消息成功"),fail->log.error("Producer發送消息失敗"));
    }
}

消費者:

package com.example.springbootkafka2;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Kafka listener that logs every record received on the topics listed in {@code spring.kafka.topic}.
 */
@Slf4j
@Component
public class Consumer {
    // Not used by receiver(); kept because it is visible to other classes in this package.
    String message = null;

    /**
     * Logs the topic, key and value of one received record.
     *
     * <p>The subscribed topics come from splitting the {@code spring.kafka.topic} property on {@code ','}.
     *
     * @param record the record delivered by the listener container
     * @throws InterruptedException never thrown by this implementation; kept in the signature for compatibility
     */
    @KafkaListener(topics = "#{'${spring.kafka.topic}'.split(',')}")
    public void receiver(ConsumerRecord<?, ?> record) throws InterruptedException {
        // Parameterized logging: the message is only assembled when INFO is enabled.
        log.info("接收到的數據:主題:{}.....key:{}....值:{}", record.topic(), record.key(), record.value());
    }
}

配置類

package com.example.springbootkafka2;

import lombok.Data;


/**
 * One routing entry of the {@code topics.topicList} configuration.
 *
 * <p>Lombok's {@code @Data} generates the getters and setters used elsewhere (for example
 * {@code getIp()} and {@code getTopic()}).
 */
@Data
public class TopicsInfo {
    // Client IP address; the TCP and UDP servers compare it with the sender's IP.
    private String ip;
    // Port configured for the entry; no code shown in this post reads it.
    private int port;
    // Kafka topic that messages from the matching client are sent to.
    private String topic;
}


package com.example.springbootkafka2;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;



/**
 * Configuration properties bound from the {@code topics} prefix.
 *
 * <p>The class is registered once, through {@code @Component}. The previous combination with
 * {@code @Configuration} and a self-referencing {@code @EnableConfigurationProperties(Config.class)}
 * was redundant and can register a second bean of this type.
 */
@Component
@ConfigurationProperties("topics")
@Data
public class Config {
    // Entries of topics.topicList: one client-IP-to-topic mapping per element.
    private List<TopicsInfo> topicList;
}

以上是一個消息發送模塊,主要實現消息數據的接收與轉發處理。如有不足之處,歡迎共同探討,還望多多留言指教。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM