Consuming RabbitMQ and Kafka with Flink


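When consuming from RabbitMQ, one question we care about is whether to ack manually or automatically. With automatic ack, a message is considered delivered as soon as it is handed to the consumer, so a failure before processing finishes can lose it.
When Flink uses RabbitMQ as a source, how does it guarantee that each message is processed only once, and how does it handle acks?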

 

First, add the connector dependency:
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-rabbitmq_${scala.version}</artifactId>
     <version>${flink.version}</version>
</dependency>               

 

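In the RMQSource class, we can see that when checkpointing is enabled, autoAck is set to false and acknowledgement is controlled manually.
So when are the acks sent? Flink checkpointing runs at a configured interval, and messages are acked only as part of a checkpoint. In other words, messages are not acked one at a time; they are acked periodically, in batches.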
This mirrors how the Kafka connector updates offsets: both are handled at checkpoint time. RMQSource.open():

@Override
	public void open(Configuration config) throws Exception {
		super.open(config);
		ConnectionFactory factory = setupConnectionFactory();
		try {
			connection = factory.newConnection();
			channel = connection.createChannel();
			if (channel == null) {
				throw new RuntimeException("None of RabbitMQ channels are available");
			}
			setupQueue();
			consumer = new QueueingConsumer(channel);

			RuntimeContext runtimeContext = getRuntimeContext();
			if (runtimeContext instanceof StreamingRuntimeContext
					&& ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
				autoAck = false;
				// enables transaction mode
				channel.txSelect();
			} else {
				autoAck = true;
			}

			LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
			channel.basicConsume(queueName, autoAck, consumer);
		} catch (IOException e) {
			throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
				+ rmqConnectionConfig.getHost(), e);
		}
		running = true;
	}
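
To make the "ack per checkpoint, not per message" idea concrete, here is a minimal sketch in plain Java. It is a simplified model, not Flink's actual classes: the class name CheckpointAckModel and the acked list (a stand-in for calls to channel.basicAck) are hypothetical, and the method names snapshotState and notifyCheckpointComplete are borrowed only to suggest where such logic would plausibly live in a checkpointed source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified model of checkpoint-aligned acknowledgement (hypothetical, not Flink code). */
class CheckpointAckModel {
    /** Delivery tags received since the last snapshot. */
    private List<Long> currentTags = new ArrayList<>();
    /** Tags frozen by each snapshot, waiting for that checkpoint to complete. */
    private final Deque<PendingCheckpoint> pending = new ArrayDeque<>();
    /** Tags that were actually acked; stands in for channel.basicAck. */
    final List<Long> acked = new ArrayList<>();

    private static final class PendingCheckpoint {
        final long checkpointId;
        final List<Long> tags;

        PendingCheckpoint(long checkpointId, List<Long> tags) {
            this.checkpointId = checkpointId;
            this.tags = tags;
        }
    }

    /** Called for every consumed message: remember the tag, do not ack yet. */
    void onMessage(long deliveryTag) {
        currentTags.add(deliveryTag);
    }

    /** Called when a checkpoint is taken: freeze the tags seen so far. */
    void snapshotState(long checkpointId) {
        pending.addLast(new PendingCheckpoint(checkpointId, currentTags));
        currentTags = new ArrayList<>();
    }

    /** Called when a checkpoint completes: ack everything it (and older ones) covered. */
    void notifyCheckpointComplete(long checkpointId) {
        while (!pending.isEmpty() && pending.peekFirst().checkpointId <= checkpointId) {
            acked.addAll(pending.pollFirst().tags);
        }
    }

    public static void main(String[] args) {
        CheckpointAckModel source = new CheckpointAckModel();
        source.onMessage(1);
        source.onMessage(2);
        source.snapshotState(1);
        source.onMessage(3);                 // arrives after the snapshot
        System.out.println(source.acked);    // prints []
        source.notifyCheckpointComplete(1);
        System.out.println(source.acked);    // prints [1, 2]
    }
}
```

In this model, message 3 stays unacked until a later checkpoint covers it. If the job fails before that, the broker still holds the message and can redeliver it.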

 

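In RMQSource.run(), when autoAck is off, each message's delivery tag is recorded in sessionIds instead of being acked immediately: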

@Override
	public void run(SourceContext<OUT> ctx) throws Exception {
		while (running) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();

			synchronized (ctx.getCheckpointLock()) {

				OUT result = schema.deserialize(delivery.getBody());

				if (schema.isEndOfStream(result)) {
					break;
				}

				if (!autoAck) {
					final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
					if (usesCorrelationId) {
						final String correlationId = delivery.getProperties().getCorrelationId();
						Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
							"with usesCorrelationId set to true but a message was received with " +
							"correlation id set to null!");
						if (!addId(correlationId)) {
							// we have already processed this message
							continue;
						}
					}
					sessionIds.add(deliveryTag);
				}

				ctx.collect(result);
			}
		}
	}
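
The correlation-id check in the listing above is what filters out redelivered messages. A minimal sketch of that idea follows, assuming the producer sets a unique correlation id on every message. The class CorrelationIdDedup and its emitted list are hypothetical; addId here only imitates the role of the method with the same name in the listing, and the id set lives purely in memory.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: drop messages whose correlation id was already processed. */
class CorrelationIdDedup {
    /** Ids of messages that were already emitted downstream. */
    private final Set<String> seenIds = new HashSet<>();
    /** Bodies emitted downstream; stands in for ctx.collect. */
    final List<String> emitted = new ArrayList<>();

    /** Returns true only the first time an id is seen. */
    boolean addId(String correlationId) {
        return seenIds.add(correlationId);
    }

    /** Handles one delivery, skipping it if its id was seen before. */
    void onDelivery(String correlationId, String body) {
        if (correlationId == null) {
            throw new NullPointerException("correlation id must not be null");
        }
        if (!addId(correlationId)) {
            return; // already processed, e.g. a redelivery after a failure
        }
        emitted.add(body);
    }

    public static void main(String[] args) {
        CorrelationIdDedup source = new CorrelationIdDedup();
        source.onDelivery("id-1", "a");
        source.onDelivery("id-2", "b");
        source.onDelivery("id-1", "a");       // redelivered duplicate
        System.out.println(source.emitted);   // prints [a, b]
    }
}
```

Without unique correlation ids this filter has nothing to key on, so a message redelivered after a failure would be emitted again.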

 

acknowledgeSessionIDs then acks every recorded delivery tag and commits the transaction:

@Override
	protected void acknowledgeSessionIDs(List<Long> sessionIds) {
		try {
			for (long id : sessionIds) {
				channel.basicAck(id, false);
			}
			channel.txCommit();
		} catch (IOException e) {
			throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
		}
	}
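
Why the transaction matters: on a channel in transaction mode, acks take effect only when the transaction commits, so a batch is acknowledged as a whole or not at all. The sketch below models that behaviour with a fake channel. FakeTxChannel and TxAckDemo are hypothetical stand-ins, not the RabbitMQ client API, and the real basicAck takes an additional boolean argument, as the listing above shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a channel in transaction mode. */
class FakeTxChannel {
    /** Acks issued inside the open transaction, not yet effective. */
    private final List<Long> stagedAcks = new ArrayList<>();
    /** Acks that became effective through a commit. */
    final List<Long> committedAcks = new ArrayList<>();

    void basicAck(long deliveryTag) {
        stagedAcks.add(deliveryTag);
    }

    /** Makes all staged acks effective at once. */
    void txCommit() {
        committedAcks.addAll(stagedAcks);
        stagedAcks.clear();
    }

    /** Discards staged acks; those messages remain unacknowledged. */
    void txRollback() {
        stagedAcks.clear();
    }
}

class TxAckDemo {
    /** Same shape as the listing: ack each tag, then commit once. */
    static void acknowledgeSessionIds(FakeTxChannel channel, List<Long> sessionIds) {
        for (long id : sessionIds) {
            channel.basicAck(id);
        }
        channel.txCommit();
    }

    public static void main(String[] args) {
        FakeTxChannel channel = new FakeTxChannel();
        acknowledgeSessionIds(channel, Arrays.asList(1L, 2L, 3L));
        System.out.println(channel.committedAcks); // prints [1, 2, 3]

        channel.basicAck(4);
        channel.txRollback();                      // ack for 4 is discarded
        System.out.println(channel.committedAcks); // prints [1, 2, 3]
    }
}
```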

 

