/**
* SparkStreaming對接rabbitmq java代碼
*/
public class SparkConsumerRabbit {
public static void main(String[] args) throws InterruptedException, AnalysisException {
SparkConf sparkConf = new SparkConf()
.setAppName("SparkConsumerRabbit")
.setMaster("local[2]");
//毫秒 Duration參數
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> params = new HashMap<>();
//map中參數設置
params.put("hosts", "192.168.45.10");
params.put("port", "5672");
params.put("userName", "admin");
params.put("password", "admin");
params.put("queueName", "cj_ack");
//如報錯請添加下面的參數,原因是代碼運行報錯底層已經把durable置為true了;
//params.put("durable", "false");
Function<QueueingConsumer.Delivery, String> handler = message -> new String(message.getBody());
JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jsc,String.class,params,handler);
messages.print();
jsc.start();
jsc.awaitTermination();
}
}
本代碼在1.5.0中運行無誤,如使用2.3.0以上代碼編寫需要添加logging類,后續將補充上,如有不足之處請諒解。相互學習。