Background: we originally consumed Kafka without SSL authentication, so data was ingested over an unauthenticated connection. We then realized that when our AWS servers accessed the Kafka brokers hosted on Alibaba Cloud, we had to open our gateway completely to those Kafka hosts, which made the data path insecure. So we purchased Alibaba Cloud's managed Kafka service, which can be reached over the public internet as an authenticated Kafka cluster.
Below is the Flink code for connecting to Kafka, first the version without authentication.
Reading data with Flink from Kafka without authentication:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "IP:9093");
    properties.setProperty("group.id", "tests-consumer-ssgroup20200922");
    properties.setProperty("auto.offset.reset", "latest");

    System.out.println("11111111111");
    FlinkKafkaConsumer010<String> myConsumer =
            new FlinkKafkaConsumer010<>("binlog", new SimpleStringSchema(), properties);
    DataStream<String> keyedStream = env.addSource(myConsumer).setParallelism(1);
    System.out.println("2222222222222");
    System.out.println("3333333333333");

    keyedStream.addSink(new MysqlSink()).setParallelism(1).name("數據插入mysql").disableChaining();
    env.execute("Flink Streaming Java binlog data");
}
Reading data with Flink from Kafka with authentication:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties props = new Properties();
    // Bootstrap servers: get the endpoint for your topic from the console
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
    // Path to the SSL root-certificate truststore; replace the path with your own
    // Like the SASL config file, this file must not be packaged into the jar
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/hadoop/kafka.client.truststore.jks");
    // Truststore password; keep unchanged
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
    // Access protocol; currently only SASL_SSL is supported
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    // SASL mechanism; keep unchanged
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    // Consumer session timeout; tune it according to the actual fetch volume and client version, default 30s
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    // Limit the size of a single fetch; this has a large impact when going over the public internet
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
    // Maximum number of records returned per poll
    // Do not set this too high: if a poll returns more records than can be consumed before the next poll,
    // a rebalance is triggered and the consumer stalls
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
    // Message deserializers
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // Consumer group this instance belongs to; apply for it in the console first
    // Instances in the same group share (load-balance) the messages
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "bi-binlog");
    // Disable hostname verification by setting the endpoint identification algorithm to an empty string
    props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    // props.put("auto.offset.reset", "earliest");
    props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';"); // the trailing semicolon is required

    FlinkKafkaConsumer010<String> myConsumer =
            new FlinkKafkaConsumer010<>("binlog", new SimpleStringSchema(), props);
    DataStream<String> keyedStream = env.addSource(myConsumer).setParallelism(1);
    System.out.println("2222222222222");
    System.out.println("3333333333333");
    // System.out.println(keyedStream.print());
    keyedStream.addSink(new MysqlSink()).setParallelism(1).name("數據插入mysql").disableChaining();
    // keyedStream.addSink(new S3Sink()).setParallelism(2).name("數據插入s3");
    env.execute("Flink Streaming Java binlog data");
}
Because this Kafka cluster requires SSL authentication, and we are using Alibaba Cloud's Kafka, there is a verification file, kafka.client.truststore.jks, that the client needs. This file must sit on the local filesystem of the server; it cannot be read from HDFS.
So make sure you place a copy of this file on every node of your Flink cluster; otherwise the job will fail with an error.
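As a small sketch (not from the original post), you can fail fast with a clear message if the truststore is missing from the expected local path, instead of discovering it later through an SSL handshake failure. Note this only checks the node where the code runs, so the file still has to be copied to every Flink node:

import java.io.File;

public class TruststoreCheck {
    // Hypothetical helper: verify the truststore exists on this node's local disk
    // before building the Kafka consumer properties.
    public static void requireLocalTruststore(String path) {
        File truststore = new File(path);
        if (!truststore.isFile()) {
            throw new IllegalStateException(
                    "kafka.client.truststore.jks not found at " + path
                            + " - copy it to the same local path on every Flink node");
        }
    }

    public static void main(String[] args) {
        requireLocalTruststore("/home/hadoop/kafka.client.truststore.jks");
        System.out.println("Truststore found, safe to build the Kafka consumer.");
    }
}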
Once the Flink environment is deployed, package the code and submit the jar to the server. The submit command is as follows:
flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c WordCount /mnt/flinkjar/mysqlflink.jar
The job is submitted to YARN; here the JobManager and the TaskManager are each given 1 GB of memory.
After submitting the job, it started failing with the following error:
2020-09-27 11:35:40,927 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint
  - Shutting YarnJobClusterEntrypoint down with application status FAILED.
Diagnostics java.lang.NoSuchMethodError:
    org.apache.flink.configuration.ConfigOptions$OptionBuilder.stringType()Lorg/apache/flink/configuration/ConfigOptions$TypedConfigOptionBuilder;
    at org.apache.flink.yarn.configuration.YarnConfigOptions.<clinit>(YarnConfigOptions.java:214)
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.getRPCPortRange(YarnJobClusterEntrypoint.java:63)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:246)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
2020-09-27 11:35:40,937 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint
  - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:182)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(Y
I searched for this error for a long time without finding an answer. Eventually I noticed that the Flink version on our new EMR cluster was 1.10.0, while the old EMR cluster was running 1.9.0. I went back to the pom file, corrected the Flink version to match the cluster, ran the job again, and it worked; the problem was solved.
The lesson from this error: a NoSuchMethodError usually means the version of a component on the cluster does not match the version the code was compiled against.
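As an illustration (the original post does not show the pom, and the artifact names below are the usual ones for this code, not confirmed by the post), the fix amounts to keeping all Flink dependency versions in one property that matches the Flink version installed on the cluster:

<properties>
    <!-- Keep this in sync with the Flink version installed on the EMR cluster -->
    <flink.version>1.10.0</flink.version>
</properties>

<dependencies>
    <!-- Marked provided so the fat jar does not bundle Flink core classes
         that could clash with the cluster's own Flink distribution -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Kafka 0.10 connector used by FlinkKafkaConsumer010 in the code above -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Using a single flink.version property makes it hard for one module to drift to a different Flink release, which is exactly the kind of mismatch that produced the NoSuchMethodError here.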