Accessing Kafka from Java over SASL_SSL with username/password authentication


Producer Java sample:

Properties props = new Properties();
props.put("bootstrap.servers", "*******:9092,*******:9092");
props.put("acks", "all");//
props.put("retries", 3);
props.put("batch.size", 106384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_SSL");
props.put("ssl.truststore.location", "D:/client_truststore.jks");
props.put("ssl.truststore.password", "WSO2_sp440");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_crm' password='xxxxxxx';"); //注意passwod結尾的分號一定不要漏

props.put("ssl.endpoint.identification.algorithm", "");

long sys = System.currentTimeMillis();

String contractId = "CRM_ContractID"; // placeholder contract ID, used as the message key
String payload = "payload";
Producer<String, String> producer = new KafkaProducer<>(props);

// Synchronous mode: the producer blocks until the Kafka server returns a response

try{

Future<RecordMetadata> future = producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload)); // (topic, key, value); the second parameter is the key
future.get(); // blocks until the broker responds; omit this call if you do not care whether the send succeeded

producer.close();

} catch(Exception e) {
e.printStackTrace(); // connection and "no leader" errors are retried automatically; a record that is too large is not retried and fails immediately with an exception
}
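
To act on the distinction in the comment above (retriable connection / no-leader errors versus a non-retriable oversized record), the cause of the ExecutionException thrown by future.get() can be inspected. The following is only a minimal sketch of that idea, not part of the original sample:

try {
    producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload)).get();
} catch (java.util.concurrent.ExecutionException e) {
    if (e.getCause() instanceof org.apache.kafka.common.errors.RecordTooLargeException) {
        // non-retriable: the record exceeds max.request.size (client) or message.max.bytes (broker)
        e.printStackTrace();
    } else {
        // retriable causes (network, no leader, ...) have already been retried "retries" times
        e.printStackTrace();
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}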

// Asynchronous mode: the producer does not wait for a response; a background thread sends messages to the Kafka server in batches (batch.size). A callback is needed to learn whether the message actually reached the server; if an error occurs, log the exception.

try{

producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});

}catch(Exception e) {

e.printStackTrace();

}
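
The security settings above (truststore path, truststore password, SASL credentials) are hardcoded for illustration. In practice they are usually kept out of source code; the sketch below shows one minimal way to do that, assuming a hypothetical client.properties file that contains the same keys used above.

import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SecureProducerFromFile {
    public static void main(String[] args) throws Exception {
        // "client.properties" is a hypothetical file holding the same keys shown above:
        // bootstrap.servers, security.protocol, sasl.mechanism, sasl.jaas.config,
        // ssl.truststore.location/password, key.serializer, value.serializer, ...
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("client.properties")) {
            props.load(in);
        }

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("CRM_Contract", "some-contract-id", "payload")).get();
        }
    }
}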

 

Consumer Java sample:

Properties props = new Properties();

props.put("bootstrap.servers", "*******:9092");

props.put("group.id", "wso2_sp");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "G:\\client_truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "WSO2_sp440");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_xxx' password='xxxxx';");//注意passwod結尾的分號一定不要漏

props.put("ssl.endpoint.identification.algorithm", "");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "file_poc";
consumer.subscribe(Arrays.asList(topic));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100); // block for up to 100 ms waiting for records
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitSync(); // manual commit after processing, since enable.auto.commit is false
}
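
The loop above never exits and never closes the consumer. One common way to stop it cleanly (a sketch built on the sample above, not part of the original post) is to call consumer.wakeup() from a JVM shutdown hook and catch the resulting WakeupException around the poll loop:

final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();        // makes a blocked poll() throw WakeupException
    try {
        mainThread.join();    // wait for the poll loop to commit and close the consumer
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // process the record ...
        }
        consumer.commitSync();
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown; nothing to do
} finally {
    consumer.close();         // release sockets and leave the consumer group cleanly
}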

