SparkStreaming對接rabbitMQ


/**
* SparkStreaming對接rabbitmq java代碼
*/
public class SparkConsumerRabbit {
public static void main(String[] args) throws InterruptedException, AnalysisException {
SparkConf sparkConf = new SparkConf()
.setAppName("SparkConsumerRabbit")
.setMaster("local[2]");
//毫秒 Duration參數
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> params = new HashMap<>();
//map中參數設置
params.put("hosts", "192.168.45.10");
params.put("port", "5672");
params.put("userName", "admin");
params.put("password", "admin");
params.put("queueName", "cj_ack");

//如報錯請添加下面的參數,原因是代碼運行報錯底層已經把durable置為true了;
//params.put("durable", "false");
Function<QueueingConsumer.Delivery, String> handler = message -> new String(message.getBody());
JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jsc,String.class,params,handler);
messages.print();
jsc.start();
jsc.awaitTermination();
}
}

本代碼在1.5.0中運行無誤,如使用2.3.0以上代碼編寫需要添加logging類,后續將補充上,如有不足之處請諒解。相互學習。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM