关于TCP/UDP长连接通信 -> 消息队列Kafka + Spring Boot


TCP/UDP长连接的误区:

TCP属于面向连接的通信协议。服务端需要实例化ServerSocket对象,先通过bind方法绑定服务端的IP和端口(PORT),再调用serverSocket.accept()阻塞等待客户端连接。代码如下:

服务端:

package com.example.springbootkafka2;
import com.example.serverconfig.ServerConfig;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.net.*;
import java.io.*;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * TCP ingress for the Kafka bridge.
 *
 * <p>On application start-up a background thread binds a {@link ServerSocket} to the
 * address supplied by {@code ServerConfig} and accepts connections forever. A client is
 * served only when its IP address matches an entry of {@code Config.getTopicList()};
 * every text line such a client sends is forwarded to the Kafka topic of the matching
 * entry through {@code Producer.send(topic, line)}.
 */
@Component
public class ServerSocket1 implements ApplicationRunner, Ordered {

    @Autowired
    private Config config;

    @Autowired
    private ServerConfig serverConfig;

    @Autowired
    private Producer producer;

    // Listening port and IP; assigned in run() before the accept thread is started.
    private int PORT;
    private String IP;

    // IP-to-topic routing entries; assigned in run() before the accept thread is started.
    List<TopicsInfo> list;

    /**
     * Reads the listening address and the routing table, then starts the accept thread.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        PORT = serverConfig.getPort();
        IP = serverConfig.getIp();
        list = config.getTopicList();
        init();
    }

    @Override
    public int getOrder() {
        return 0;
    }

    /** Starts the background thread that accepts client connections. */
    private void init() {
        new Thread(new AcceptTask()).start();
    }

    /**
     * Looks up the routing entry whose IP equals the given client address.
     *
     * @param clientIp textual IP address of the connected client
     * @return the first matching entry, or {@code null} when no entry matches or no
     *         routing table is configured
     */
    private TopicsInfo findRoute(String clientIp) {
        if (list == null) {
            return null;
        }
        for (TopicsInfo info : list) {
            if (clientIp.equals(info.getIp())) {
                System.out.println("匹配成功,进行链接" + info.getIp());
                return info;
            }
            System.out.println("对不起,没有匹配的" + info.getIp());
        }
        return null;
    }

    /** Closes a socket, logging (not propagating) a failure so the accept loop keeps running. */
    private void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            System.out.println("服务器异常:" + e.getMessage());
        }
    }

    /** Accept loop: binds the server socket and hands each permitted client to its own thread. */
    private class AcceptTask implements Runnable {
        @Override
        public void run() {
            // try-with-resources releases the listening port when the loop ends on an error.
            try (ServerSocket serverSocket = new ServerSocket()) {
                serverSocket.bind(new InetSocketAddress(IP, PORT));
                while (true) {
                    Socket client = serverSocket.accept();
                    // Per-connection values are locals: worker threads must not share them.
                    String clientIp = client.getInetAddress().getHostAddress();
                    System.out.println("客户端ip:" + clientIp);

                    TopicsInfo route = findRoute(clientIp);
                    if (route == null) {
                        // Unknown client: release its socket instead of leaking it.
                        closeQuietly(client);
                        continue;
                    }
                    // One thread per accepted client connection.
                    new Thread(new HandlerThread(client, route.getTopic())).start();
                }
            } catch (IOException e) {
                System.out.println("服务器异常:" + e.getMessage());
            }
        }
    }

    /** Reads lines from one client and publishes each of them to that client's topic. */
    private class HandlerThread implements Runnable {
        private final Socket client;
        private final String topic;

        public HandlerThread(Socket client, String topic) {
            this.client = client;
            this.topic = topic;
        }

        @Override
        public void run() {
            try (Socket socket = client;
                 BufferedReader reader =
                         new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String line;
                // readLine() returns null once the client closes the connection; stop then
                // instead of publishing null messages in an endless loop.
                while ((line = reader.readLine()) != null) {
                    producer.send(topic, line);
                    System.out.println("发送消息:" + line);
                    System.out.println("发送成功...");
                    System.out.println(line);
                }
            } catch (IOException e) {
                System.out.println("服务器异常:" + e.getMessage());
            }
        }
    }
}

 




客户端:

package com.example.springbootkafka2;

import com.example.serverconfig.ServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.io.*;
import java.net.Socket;
import java.util.Scanner;

/**
 * Interactive TCP client: connects to the address supplied by {@code ServerConfig} and
 * sends every line typed on standard input to the server, one line per message.
 */
@Component
public class ClientSocket implements ApplicationRunner, Ordered {

    @Autowired
    private ServerConfig serverConfig;

    // Server address and port; assigned in run() before handler() is called.
    private String IP;
    private int PORT;

    /**
     * Opens the connection and relays standard input to it until input ends.
     *
     * <p>I/O failures (including a refused connection) are reported on standard output;
     * the socket and writer are always closed on exit.
     *
     * @throws IOException declared for compatibility; I/O errors are handled internally
     */
    private void handler() throws IOException {
        // Deliberately not closed: closing the Scanner would close System.in.
        Scanner sc = new Scanner(System.in);
        try (Socket client = new Socket(IP, PORT);
             BufferedWriter writer =
                     new BufferedWriter(new OutputStreamWriter(client.getOutputStream()))) {
            // hasNextLine() ends the loop cleanly at end of input instead of relying on
            // NoSuchElementException.
            while (sc.hasNextLine()) {
                String line = sc.nextLine();
                System.out.println(line);
                writer.write(line + "\n");
                // Flush per line so the server's readLine() sees the message immediately.
                writer.flush();
                System.out.println("数据已输出");
                System.out.println(client);
            }
        } catch (IOException e) {
            System.out.println("客户端异常:" + e.getMessage());
        }
    }

    /**
     * Reads the server address from configuration, prints it and starts the relay loop.
     * This call blocks the calling thread while standard input is being read.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        IP = serverConfig.getIp();
        System.out.println("服务端IP:" + IP);
        PORT = serverConfig.getPort();
        System.out.println("端口号:" + PORT);
        handler();
    }

    @Override
    public int getOrder() {
        return 1;
    }
}

同理UDP:

UDP是无连接的协议,以数据报(数据包)的形式进行通信:客户端只管发送,并不确认服务端是否已经接收。具体代码如下:

服务端:

package com.example.udp;

import com.example.springbootkafka2.Config;
import com.example.springbootkafka2.Producer;
import com.example.springbootkafka2.TopicsInfo;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;
import com.example.serverconfig.ServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import java.net.*;
import java.util.List;
/**
 * UDP ingress for the Kafka bridge.
 *
 * <p>On application start-up a background thread binds a {@link DatagramSocket} to the
 * address supplied by {@code ServerConfig} and receives datagrams forever. A datagram is
 * forwarded only when its sender IP matches an entry of {@code Config.getTopicList()};
 * its payload is then published to the topic of the matching entry.
 */
@Component
public class UdpServer implements ApplicationRunner, Ordered {
    @Autowired
    private Config config;
    @Autowired
    private ServerConfig serverConfig;
    @Autowired
    private Producer producer;

    // Listening port and IP; assigned in run() before the receive thread is started.
    private int PORT;
    private String IP;

    // IP-to-topic routing entries; assigned in run() before the receive thread is started.
    List<TopicsInfo> list;

    /**
     * Reads the listening address and the routing table, then starts the receive thread.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        PORT = serverConfig.getPort();
        IP = serverConfig.getIp();
        list = config.getTopicList();
        init();
    }

    @Override
    public int getOrder() {
        return 0;
    }

    /** Starts the background thread that receives datagrams. */
    private void init() {
        new Thread(new ReceiveTask()).start();
    }

    /**
     * Looks up the routing entry whose IP equals the given sender address.
     *
     * @param senderIp textual IP address of the datagram's sender
     * @return the first matching entry, or {@code null} when no entry matches or no
     *         routing table is configured
     */
    private TopicsInfo findRoute(String senderIp) {
        if (list == null) {
            return null;
        }
        for (TopicsInfo info : list) {
            if (senderIp.equals(info.getIp())) {
                System.out.println("匹配成功" + info.getIp());
                return info;
            }
            System.out.println("对不起,没有匹配的" + info.getIp());
        }
        return null;
    }

    /** Receive loop: reads datagrams and dispatches the permitted ones to a sender thread. */
    private class ReceiveTask implements Runnable {
        @Override
        public void run() {
            // try-with-resources releases the port when the loop ends on an error.
            try (DatagramSocket socket = new DatagramSocket(PORT, InetAddress.getByName(IP))) {
                while (true) {
                    byte[] buf = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    socket.receive(packet);
                    // Decode only the bytes actually received; decoding the whole buffer
                    // would append NUL padding to every message.
                    String message =
                            new String(packet.getData(), packet.getOffset(), packet.getLength());

                    String senderIp = packet.getAddress().getHostAddress();
                    System.out.println("客户端ip:" + senderIp);
                    System.out.println(socket);

                    TopicsInfo route = findRoute(senderIp);
                    if (route != null) {
                        // The payload travels with the task, so a later datagram cannot
                        // overwrite it before it is published.
                        new Thread(new HandlerThread(route.getTopic(), message)).start();
                    }
                }
            } catch (java.io.IOException e) {
                // SocketException and UnknownHostException are both IOExceptions.
                System.out.println("服务器异常:" + e.getMessage());
            }
        }
    }

    /** Publishes one received payload to its topic. */
    private class HandlerThread implements Runnable {
        private final String topic;
        private final String message;

        public HandlerThread(String topic, String message) {
            this.topic = topic;
            this.message = message;
        }

        @Override
        public void run() {
            producer.send(topic, message);
            System.out.println("发送消息:" + message);
            System.out.println("发送成功...");
        }
    }
}

客户端:

package com.example.udp;

import com.example.serverconfig.ServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.io.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.rmi.UnknownHostException;
import java.util.Scanner;

/**
 * Interactive UDP client: sends every line typed on standard input as one datagram to the
 * address supplied by {@code ServerConfig}.
 */
@Component
public class UdpClient implements ApplicationRunner, Ordered {

    @Autowired
    private ServerConfig serverConfig;

    // Server address and port; assigned in run() before handler() is called.
    private String IP;
    private int PORT;

    /**
     * Relays standard input to the server until input ends.
     *
     * <p>A single socket is used for all datagrams and is closed on exit. An unresolvable
     * server host is reported on standard output.
     *
     * @throws IOException if creating the socket or sending a datagram fails
     */
    private void handler() throws IOException {
        // Deliberately not closed: closing the Scanner would close System.in.
        Scanner sc = new Scanner(System.in);
        try (DatagramSocket socket = new DatagramSocket()) {
            // Resolve once; the target does not change between lines.
            InetAddress server = InetAddress.getByName(IP);
            while (sc.hasNextLine()) {
                byte[] bytes = sc.nextLine().getBytes();
                socket.send(new DatagramPacket(bytes, bytes.length, server, PORT));
            }
        } catch (java.net.UnknownHostException e) {
            // Fully qualified on purpose: the file-level import of the same simple name
            // refers to java.rmi.UnknownHostException, which getByName never throws.
            System.out.println("客户端异常:" + e.getMessage());
        }
    }

    /**
     * Reads the server address from configuration, prints it and starts the relay loop.
     * This call blocks the calling thread while standard input is being read.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        IP = serverConfig.getIp();
        System.out.println("服务端IP:" + IP);
        PORT = serverConfig.getPort();
        System.out.println("端口号:" + PORT);
        handler();
    }

    @Override
    public int getOrder() {
        return 1;
    }
}

配置文件

# Application configuration for the TCP/UDP -> Kafka bridge.
server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Comma-separated topic names: injected into Producer via @Value and split on ','
    # by the Consumer's @KafkaListener expression.
    topic: test,topic1
# Producers and consumers are matched against the topics listed above.

####################
# Routing table bound to Config.topicList (prefix "topics"): each entry maps a client
# IP to the Kafka topic its messages are published to.
topics:
  topicList[0]:
    ip: 127.0.0.2
    port: 8084
    topic: test
  topicList[1]:
    ip: 127.0.0.1
    port: 8085
    topic: topic1

################
# Address used by the socket servers and clients; presumably bound by ServerConfig
# (class not shown here) - TODO confirm the prefix.
servers:
  ip: 127.0.0.1
  port: 8086

关于kafka的生产者与消费者代码如下:

生产者:

package com.example.springbootkafka2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Slf4j
@Component
public  class Producer {
    @Autowired
    private KafkaTemplate kafkaTemplate;//没有变
    @Value("${spring.kafka.topic}")     //引入application.properties中的数据
    private String topics;

    public void send(String topics,String message) {//多个topic属性
        log.info("待发送的消息:{}", message);
       ListenableFuture<SendResult<String,String>> future =kafkaTemplate.send(topics,message);
      future.addCallback(success ->log.info("Producer发送消息成功"),fail->log.error("Producer发送消息失败"));
    }
}

消费者:

package com.example.springbootkafka2;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Kafka listener that logs every record received on the topics listed in the
 * {@code spring.kafka.topic} property.
 */
@Slf4j
@Component
public class Consumer {
    // Not read or written by receiver(); kept because it is package-visible.
    String message = null;

    /**
     * Logs the topic, key and value of a received record.
     *
     * @param record the record delivered by the listener container
     * @throws InterruptedException never thrown by this implementation; kept in the
     *         signature for compatibility
     */
    @KafkaListener(topics = "#{'${spring.kafka.topic}'.split(',')}")
    public void receiver(ConsumerRecord<?, ?> record) throws InterruptedException {
        // Parameterized logging: the message is only built when INFO is enabled.
        log.info("接收到的数据:主题:{}.....key:{}....值:{}",
                record.topic(), record.key(), record.value());
    }
}

配置类

package com.example.springbootkafka2;

import lombok.Data;


/**
 * One routing entry: element type of {@code Config.topicList}.
 * Accessors, equals/hashCode and toString are generated by Lombok's {@code @Data}.
 */
@Data
public class TopicsInfo {
    // IP address the entry applies to; the socket servers compare it with the sender's address.
    private String ip;
    // Port configured for the entry. NOTE(review): no class shown here reads it - confirm usage.
    private int port;
    // Kafka topic that messages from the matching IP are published to.
    private String topic;
}


package com.example.springbootkafka2;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;



/**
 * Binds the {@code topics} configuration section; {@code topics.topicList} becomes the
 * list of IP-to-topic routing entries.
 *
 * <p>The class is registered exactly once, as a scanned {@code @Component}. Listing it in
 * {@code @EnableConfigurationProperties} as well would register a second bean of the same
 * type, which makes injection by type ambiguous; {@code @Configuration} is not needed
 * because the class declares no {@code @Bean} methods.
 */
@Component
@ConfigurationProperties("topics")
@Data
public class Config {
    // Routing entries; null when the topics.topicList property is absent.
    private List<TopicsInfo> topicList;
}

以上是一个消息发送模块,主要实现消息数据的接收与转发处理。如有不足之处,欢迎留言指教,共同探讨。

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM