TCP/UDP长连接的误区:
对于TCP来说,属于连接式通信协议。需要实例化ServerSocket对象,通过serverSocket.accept()来建立阻塞,同时还可以通过bind方法来绑定服务端IP和PORT代码如下:
服务端:
package com.example.springbootkafka2; import com.example.serverconfig.ServerConfig; import lombok.SneakyThrows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import java.net.*; import java.io.*; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @Component public class ServerSocket1 implements ApplicationRunner, Ordered { @Autowired private Config config; @Autowired private ServerConfig serverConfig; @Autowired private Producer producer; //监听的端口号和ip private int PORT; private String IP; int i; String ip; String message; //定义list集合 List<TopicsInfo> list; @Override public void run(ApplicationArguments args){//实现ApplicationRunner, Ordered接口中的run方法 PORT = serverConfig.getPort(); IP=serverConfig.getIp(); list= config.getTopicList(); init();//开启服务端线程 } @Override public int getOrder() { return 0; } private void init(){ new Thread(new test1()).start(); } //开启一个test1线程 private class test1 implements Runnable{ private String topic; @Override public void run() { try { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(IP,PORT)); while(true){ Socket client = serverSocket.accept(); //获取客户端的IP InetAddress ia= client.getInetAddress(); ip = ia.getHostAddress(); System.out.println("客户端ip:"+ip); //.......................................................................... //判断当前客户端是否与服务端有相匹配的IP port topic int size = list.size(); for( i=0 ;i<size;i++){ if(list.get(i).getIp().equals(ip)){ System.out.println("匹配成功,进行链接"+list.get(i).getIp()); topic=list.get(i).getTopic(); //一个客户端连接就开户一个线程处理 new Thread(new HandlerThread(client,topic)).start(); break; }else{ System.out.println("对不起,没有匹配的" + list.get(i).getIp()); } } } //........................................................................... } catch (IOException e) { System.out.println("服务器异常:" + e.getMessage()); } } } //服务端线程操作过程 private class HandlerThread implements Runnable{ private Socket client; private String topic; public HandlerThread(Socket client ,String topic) { this.client = client; this.topic = topic; } @Override public void run() { try { //取得输入流 InputStream is = client.getInputStream(); BufferedReader bw = new BufferedReader(new InputStreamReader(is)); while (true) { String receive = bw.readLine(); //queue.add(receive); message=receive; producer.send(topic,message);//这个是指定主题发送 System.out.println("发送消息:"+message); System.out.println("发送成功...");//生产者发送消息, //读取客户端发送来的数据 System.out.println(receive); } } catch (IOException e) { System.out.println("服务器异常:" + e.getMessage()); }finally{ if(client != null){ try { client.close(); } catch (IOException e) { client = null; System.out.println("服务器异常:" + e.getMessage()); } } } } } /////生产者定义的线程 //private class SendThread implements Runnable{ // @SneakyThrows // @Override // public void run() { // while (true) { // //String message=queue.take(); // //System.out.println("对不起我现在是空的:"+queue); // //通过这个方法,给指定主题和消费者发消息 // producer.send(topic,message);//这个是指定主题发送 // System.out.println("发送消息:"+message); // System.out.println("发送成功...");//生产者发送消息 // } // } // } // //为生产者发送消息开启一个线程 //private void send(){ // new Thread(new SendThread()).start(); // } }
客户端:
package com.example.springbootkafka2; import com.example.serverconfig.ServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import java.io.*; import java.net.Socket; import java.util.Scanner; @Component public class ClientSocket implements ApplicationRunner,Ordered { @Autowired private ServerConfig serverConfig; private String IP ;//服务器地址 private int PORT;//服务器端口号 // public void main(String[] args) throws IOException { // ClientSocket clientSocket = new ClientSocket(); // clientSocket.handler(); // } private void handler() throws IOException { //创建一个流套接字,连接到指定主机上的指定端口号 Socket client=null; try { client = new Socket(IP, PORT); //取得输出流 OutputStream os = client.getOutputStream(); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os)); //输入要发送到服务端的消息 Scanner sc = new Scanner(System.in); while (true) { String ss = sc.nextLine(); System.out.println(ss); bw.write(ss+"\n"); bw.flush(); System.out.println("数据已输出"); System.out.println(client); } // bw.close(); // os.close(); } catch (Exception e) { System.out.println("客户端异常:" + e.getMessage()); } finally { // if (client != null) { // try { // client.close(); // } catch (IOException e) { // System.out.println("客户端异常:" + e.getMessage()); // } // } } } @Override public void run(ApplicationArguments args) throws Exception {//实现ApplicationRunner, Ordered接口中的run方法 IP=serverConfig.getIp(); System.out.println("服务端IP:"+IP); PORT=serverConfig.getPort(); System.out.println("端口号:"+PORT); handler(); } @Override public int getOrder() { return 1; }//实现ApplicationRunner, Ordered接口中的getOrder方法 }
同理UDP:
UDP是以数据包的形式进行通信,不管服务端有没有接收,客户端会一直发,具体代码如下:
服务端:
package com.example.udp; import com.example.springbootkafka2.Config; import com.example.springbootkafka2.Producer; import com.example.springbootkafka2.TopicsInfo; import lombok.SneakyThrows; import org.springframework.stereotype.Component; import com.example.serverconfig.ServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import java.net.*; import java.util.List; @Component public class UdpServer implements ApplicationRunner,Ordered{ @Autowired private Config config; @Autowired private ServerConfig serverConfig; @Autowired private Producer producer; //监听的端口号和ip private int PORT; private String IP; //初始化全局变量 int i; String message; String ip; //定义list集合 List<TopicsInfo> list; @Override public void run(ApplicationArguments args){//实现ApplicationRunner, Ordered接口中的run方法 PORT = serverConfig.getPort(); IP=serverConfig.getIp(); list= config.getTopicList(); init();//开启服务端线程 } @Override public int getOrder() { return 0; } private void init(){ new Thread(new test1()).start(); } //开启一个test1线程 private class test1 implements Runnable{ private String topic; @SneakyThrows @Override public void run() { try { DatagramSocket socket = new DatagramSocket(PORT,InetAddress.getByName(IP)); while (true) { byte[] buf = new byte[1024]; DatagramPacket packet = new DatagramPacket(buf, buf.length); socket.receive(packet); message = new String(packet.getData());//转换为String类型 //System.out.println(message); //获取客户端的IP InetAddress ia = packet.getAddress(); ip= ia.getHostAddress(); System.out.println("客户端ip:"+ip); System.out.println(socket); //.......................................................................... //判断当前客户端是否与服务端有相匹配的IP port topic int size = list.size(); for (i = 0; i < size; i++) { if (list.get(i).getIp().equals(ip)) { System.out.println("匹配成功" + list.get(i).getIp()); topic = list.get(i).getTopic(); //一个客户端连接就开户一个线程处理 new Thread(new HandlerThread(topic)).start(); break; } else { System.out.println("对不起,没有匹配的" + list.get(i).getIp()); } } } //........................................................................... } catch(SocketException e){ System.out.println("服务器异常:" + e.getMessage()); } } } private class HandlerThread implements Runnable{ private String topic; public HandlerThread(String topic) { this.topic = topic; } @Override public void run() { producer.send(topic,message);//这个是指定主题发送 System.out.println("发送消息:"+message); System.out.println("发送成功...");//生产者发送消息, } } }
客户端:
package com.example.udp; import com.example.serverconfig.ServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import java.io.*; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.Socket; import java.rmi.UnknownHostException; import java.util.Scanner; @Component public class UdpClient implements ApplicationRunner,Ordered{ @Autowired private ServerConfig serverConfig; private String IP ;//服务器地址 private int PORT;//服务器端口号 private void handler() throws IOException { //创建一个流套接字,连接到指定主机上的指定端口号 try { //输入要发送到服务端的消息 Scanner sc = new Scanner(System.in); while (true) { String ss = sc.nextLine(); byte[] bytes =ss.getBytes(); DatagramPacket datagramPacket = new DatagramPacket(bytes,bytes.length, InetAddress.getByName(IP),PORT); DatagramSocket socket = new DatagramSocket();//创建套接字程序 socket.send(datagramPacket);//通过套接字发送数据 } } catch (UnknownHostException e) { System.out.println("客户端异常:" + e.getMessage()); } } @Override public void run(ApplicationArguments args) throws Exception {//实现ApplicationRunner, Ordered接口中的run方法 IP=serverConfig.getIp(); System.out.println("服务端IP:"+IP); PORT=serverConfig.getPort(); System.out.println("端口号:"+PORT); handler(); } @Override public int getOrder() { return 1; }//实现ApplicationRunner, Ordered接口中的getOrder方法 }
配置文件
server:
port: 8088
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
topic: test,topic1
#生产者#消费者.....去匹配他们
####################
topics:
topicList[0]:
ip: 127.0.0.2
port: 8084
topic: test
topicList[1]:
ip: 127.0.0.1
port: 8085
topic: topic1
################
servers:
ip: 127.0.0.1
port: 8086
关于kafka的生产者与消费者代码如下:
生产者:
package com.example.springbootkafka2; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; @Slf4j @Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate;//没有变 @Value("${spring.kafka.topic}") //引入application.properties中的数据 private String topics; public void send(String topics,String message) {//多个topic属性 log.info("待发送的消息:{}", message); ListenableFuture<SendResult<String,String>> future =kafkaTemplate.send(topics,message); future.addCallback(success ->log.info("Producer发送消息成功"),fail->log.error("Producer发送消息失败")); } }
消费者:
package com.example.springbootkafka2; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.concurrent.LinkedBlockingQueue; @Slf4j @Component public class Consumer { String message = null; // LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //private static KafkaConsumer<String,String> consumer; @KafkaListener(topics = "#{'${spring.kafka.topic}'.split(',')}") public void receiver(ConsumerRecord record) throws InterruptedException { log.info("接收到的数据:主题:"+record.topic()+".....key:"+record.key()+"....值:"+record.value()); //System.out.println("主题:"+record.topic()+",值:"+record.value());//我在这输出打印了 //。。。。。。。。。下面的先不看。。。。。。。。。。。 // queue.add((String) record.value()); // System.out.println("我已经进入了队列:"+queue); // message = queue.take(); // System.out.println(message); // System.out.println("现在队列为空:"+queue); } }
配置类
package com.example.springbootkafka2; import lombok.Data; @Data public class TopicsInfo { private String ip; private int port; private String topic; } package com.example.springbootkafka2; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.List; @Component @Configuration @ConfigurationProperties("topics") @EnableConfigurationProperties(Config.class) @Data public class Config { private List<TopicsInfo> topicList; }
以上是一个消息发送模块,主要处理消息数据实现!如有不足之处,共同探讨,还望多多留言指教。。。。。。。。。。。。。。