/** * rabbitmq 工具类 * * @author yangxj * @date 2020-03-25 20:30 * * 基本概念: queue 通过 routeKey 绑定 exchange; producer 发送消息到 exchange, exchange 通过 routeKey 发送与之绑定的 queue * exchange 类型: direct, fanout, topic */ public class RabbitMQUtils { /** * 获取rabbitMq 连接 * * @return * @throws Exception */ public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setUsername("guest"); return factory.newConnection(); }
}
-
事务机制
/** * 1. 通过事务机制( 缺点: 降低了mq 的吞吐量) */ public static void useTx() throws Exception { Connection connection = getConnection(); Channel channel = connection.createChannel(); // 声明一个queue channel.queueDeclare("test-queue", false, false, false, null); // 声明一个topic 类型的 exchange channel.exchangeDeclare("test-topic-exchange", BuiltinExchangeType.TOPIC); // queue 通过 routeKey 绑定 exchange channel.queueBind("test-queue", "test-topic-exchange", "test.*"); String message = "hello rabbit mq!"; channel.txSelect(); // 开启事务 try { channel.basicPublish("test-topic-exchange", "test.message", null, message.getBytes()); // 消息发送 channel.txCommit(); System.out.println("message send success.."); } catch (Exception e) { channel.txRollback(); // 回滚 System.out.println("message send fail.."); } finally { channel.close(); connection.close(); } }
-
生产者confirm模式 (同步)
/** * 2. 生产者 confirm模式(串型) * 原理: 当信道channel 进入confrim模式, 所有在该信道发布的消息都会被指派一个唯一的id * 当消息被投递到所有匹配的队列后, broker就会发送一个确认给生产者(包含消息id); */ public void producterConfirmSync() throws Exception { Connection connection = getConnection(); // 获取rabbitMq 连接 Channel channel = connection.createChannel(); channel.confirmSelect(); // 开启confirm模式 String message = "hello rabbit mq!"; String message2 = "hello rabbit mq2!"; channel.basicPublish("", "", MessageProperties.BASIC, message.getBytes()); channel.basicPublish("", "", MessageProperties.BASIC, message2.getBytes()); if (channel.waitForConfirms()) { // 支持多条发送后再确认 System.out.println("message send success.."); } else { System.out.println("message send fail.."); } channel.close(); connection.close(); }
-
生产者confirm (异步)
/** * 3. 生产者 confirm模式(异步) */ public void producterConfirmAsync() throws Exception { Connection connection = getConnection(); // 获取rabbitMq 连接 Channel channel = connection.createChannel(); channel.confirmSelect(); // 开启confirm模式 // ack ids SortedSet<Long> ackIds = new TreeSet<>(); // not ack ids SortedSet<Long> nackIds = new TreeSet<>(); for (int i = 0; i < 100; i++) { String message = "hello rabbit mq: " + i; channel.basicPublish("", "", null, message.getBytes()); } channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 发送成功(确认) // multiple 单条确认 or多条确认 (为true, 表示到这个序列号之前的所有消息都已经得到了处理) ackIds.add(deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 发送失败(未确认) nackIds.add(deliveryTag); } }); }
-
消费者confirm
/** * 消费者 confirm */ public void consumerConfirm() throws Exception { Connection connection = getConnection(); // 获取rabbitMq 连接 Channel channel = connection.createChannel(); try { channel.basicConsume("test", false, // 关闭消费者自动ack new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的 long deliveryTag = envelope.getDeliveryTag(); //第二个参数是批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认; // 如果值为false,则只对当前收到的消息进行确认 channel.basicAck(deliveryTag, true); // 确认 } }); } catch (IOException e) { e.printStackTrace(); }finally { channel.close(); connection.close(); } }