I.
1. Producer: generating the data
package kafakaTohbase;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", KafkaProperties.zkConnect);
        // Payloads are sent as Strings (see the note below).
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "hdjt01:9092,hdjt02:9092,hdjt03:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Send ten test messages to topic "test5".
        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>("test5", "liu" + i));
        }
        producer.close(); // release the producer's network resources
    }
}
Note: props.put("serializer.class", "kafka.serializer.StringEncoder") sends the payload as a String. It can also be sent as a raw byte array:
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // without this line the key is treated as binary too
Everything a producer sends is a key-value pair.
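
For example, a minimal sketch of a producer sending byte[] values with String keys under that configuration (the class name, key, and payload are illustrative, not from the original post):

package kafakaTohbase;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical demo class, not part of the original project.
public class BinaryProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "hdjt01:9092,hdjt02:9092,hdjt03:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");     // values as raw byte[]
        props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // keys stay Strings

        Producer<String, byte[]> producer =
                new Producer<String, byte[]>(new ProducerConfig(props));
        byte[] payload = "liu0".getBytes(); // any binary payload
        producer.send(new KeyedMessage<String, byte[]>("test5", "key0", payload));
        producer.close();
    }
}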
2. Consumer
package kafakaTohbase;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId1);
        props.put("zookeeper.session.timeout.ms", "40000"); // ZooKeeper session timeout
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // props.put("auto.offset.reset", "smallest"); // start from the earliest offsets; by default only new messages are read
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1)); // one stream (thread) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        HBaseUtils hbase = new HBaseUtils();
        while (it.hasNext()) { // hasNext() blocks until a message arrives, so this loop never exits
            try {
                hbase.put(new String(it.next().message())); // write each message to HBase
            } catch (IOException e) {
                e.printStackTrace();
            }
            // try {
            //     sleep(300); // throttle: delay 300 ms per message
            // } catch (InterruptedException e) {
            //     e.printStackTrace();
            // }
        }
    }
}
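
Both KafkaProducer and KafkaConsumer reference a KafkaProperties constants class that the post never shows. A minimal sketch consistent with the fields used above, assuming the same hosts as the rest of this post (the actual values are whatever your cluster uses):

package kafakaTohbase;

// Hypothetical reconstruction: the original post does not show this class.
public interface KafkaProperties {
    String zkConnect = "hdjt01:2181,hdjt02:2181,hdjt03:2181"; // ZooKeeper quorum (assumed)
    String groupId1 = "group1"; // consumer group id (assumed)
    String topic = "test5";     // topic used by the producer above
}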
Connecting to HBase: configuration
package kafakaTohbase;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtils {

    public void put(String string) throws IOException {
        // HBase connection parameters
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hdjt01:2181,hdjt02:2181,hdjt03:2181"); // ZooKeeper quorum
        // conf.set("hbase.zookeeper.property.clientPort", "42182");
        Random random = new Random();
        long a = random.nextInt(1000000000); // random suffix so each message gets its own row key
        String tableName = "emp";
        String rowkey = "rowkey" + a;
        String columnFamily = "basicinfo";
        String column = "empname";
        HTable table = new HTable(conf, tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(string));
        table.put(put); // write the row to the table
        System.out.println("put succeeded");
        table.close(); // release resources
    }
}
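
This assumes the "emp" table with column family "basicinfo" already exists (e.g. create 'emp','basicinfo' in the hbase shell). Also, opening a fresh HTable for every message is expensive; a sketch that reuses one shared connection, assuming the HBase 0.96/0.98-era client API and a hypothetical class name:

package kafakaTohbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical variant of HBaseUtils, not from the original post.
public class PooledHBaseUtils {

    private final HConnection connection; // shared, thread-safe connection

    public PooledHBaseUtils() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hdjt01:2181,hdjt02:2181,hdjt03:2181");
        connection = HConnectionManager.createConnection(conf);
    }

    public void put(String value) throws IOException {
        HTableInterface table = connection.getTable("emp"); // cheap: borrows from the shared connection
        try {
            Put put = new Put(Bytes.toBytes("rowkey" + System.nanoTime()));
            put.add(Bytes.toBytes("basicinfo"), Bytes.toBytes("empname"), Bytes.toBytes(value));
            table.put(put);
        } finally {
            table.close(); // releases the table; the connection stays open
        }
    }
}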
Testing the consumer:
package kafakaTohbase;

public class Kafkaceshi {

    public static void main(String[] args) {
        // KafkaProducer a = new KafkaProducer();
        // a.producer();
        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.run(); // runs the consume loop on the main thread
    }
}
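
Because KafkaConsumer extends Thread, calling run() directly executes the consume loop on the caller's thread. To consume in the background instead (for example, while the same main method also produces messages), start it as a real thread:

consumerThread.start(); // run() blocks the caller; start() spawns a separate thread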
