How should Kafka offsets be handled when integrating Spark Streaming with Kafka? The checkpoint mechanism that ships with Spark Streaming has well-known drawbacks, so for projects with strict data-consistency requirements it is not advisable to rely on the built-in checkpoint for failure recovery.
Since Spark Streaming 1.3 a direct Kafka stream has been supported. This strategy is more robust: it abandons the Kafka high-level API that saved offsets automatically and switches to the Simple API, a lower-level API. With it we can still use checkpoints for fault tolerance, or we can fetch the offsets through the low-level API and manage them ourselves, so that whether the application is upgraded or restarted after a failure, exactly-once semantics can be achieved on the framework side.
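The core of that pattern is small enough to show up front: the offsets you manage yourself are handed back to the direct stream as a dict keyed by TopicAndPartition. The sketch below is only a preview of the call used in the full program later in this article; the topic name, broker address, and starting offset are placeholders.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName='DirectStreamPreview')
ssc = StreamingContext(sc, 4)

# Offsets recovered from external storage (ZooKeeper in this article); keys are
# TopicAndPartition(topic, partition), values are the offsets to resume from.
from_offsets = {TopicAndPartition('my-topic', 0): 12345}

stream = KafkaUtils.createDirectStream(
    ssc,
    topics=['my-topic'],
    kafkaParams={"metadata.broker.list": "broker1:9092"},
    fromOffsets=from_offsets)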
This article shows how to consume Kafka messages from Spark Streaming through the Spark Python API while manually managing the Kafka offsets in ZooKeeper.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import json

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

ZOOKEEPER_SERVERS = "172.16.1.225:2181"


def get_zookeeper_instance():
    """Return a process-wide KazooClient, creating and starting it on first use."""
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']


def save_offsets(rdd):
    """Persist each partition's untilOffset of the current batch to ZooKeeper."""
    zk = get_zookeeper_instance()
    for offset in offsetRanges:
        path = f"/test/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        print(offset.untilOffset)
        zk.set(path, str(offset.untilOffset).encode())
    for i in rdd.collect():
        print(i)


def read_offsets(zk, topics):
    """Rebuild the fromOffsets dict from ZooKeeper so the stream resumes where it stopped."""
    from_offsets = {}
    for topic in topics:
        if not zk.exists(f'/test/{topic}'):
            # First run: create the parent node and start from the default offsets.
            zk.ensure_path(f'/test/{topic}')
        else:
            for partition in zk.get_children(f'/test/{topic}'):
                topic_partition = TopicAndPartition(topic, int(partition))
                offset = int(zk.get(f'/test/{topic}/{partition}')[0])
                from_offsets[topic_partition] = offset
    print(from_offsets)
    return from_offsets


def trans_info(rdd):
    """Parse the raw messages and record this batch's offset ranges on the driver."""
    result = rdd.map(lambda row: json.loads(row[1])['request_body']
                     if json.loads(row[1])['@fields']['request'] == 'POST /api-wechat/alarm/alarmServer HTTP/1.0'
                     else None) \
        .filter(lambda row: row is not None) \
        .map(lambda row: json.loads(row)[0]) \
        .filter(lambda row: row['cmd'] == '021003') \
        .map(lambda row: (row['device_code'],
                          row['data']['temperature1'],
                          row['data']['battery'],
                          row['data']['run_time'],
                          row['time']))

    # transform() is evaluated on the driver for every batch, so the offset ranges
    # can be stashed in a global and written to ZooKeeper later in save_offsets().
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    for o in offsetRanges:
        print("%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset,
                                  o.untilOffset, o.untilOffset - o.fromOffset))
    return result


def start():
    # os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    sconf = SparkConf()
    sconf.set('spark.cores.max', 4)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 4)

    brokers = "172.16.1.225:9092,172.16.1.226:9092,172.16.1.227:9092"
    topics = ['logfile-memory-kafka']

    # Restore the last committed offsets from ZooKeeper before creating the stream.
    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    kafkaStreams = KafkaUtils.createDirectStream(ssc, topics=topics,
                                                 kafkaParams={"metadata.broker.list": brokers},
                                                 fromOffsets=from_offsets)
    kafkaStreams.transform(trans_info).foreachRDD(save_offsets)

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
if __name__ == '__main__':
    start()
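To check that the offsets really end up in ZooKeeper, they can be read back with kazoo outside of Spark. The snippet below is a small verification sketch; it assumes the same ZooKeeper address and the /test/<topic>/<partition> paths written by save_offsets() above.

from kazoo.client import KazooClient

zk = KazooClient(hosts="172.16.1.225:2181")
zk.start()

topic = "logfile-memory-kafka"
base = f"/test/{topic}"
if zk.exists(base):
    for partition in zk.get_children(base):
        # save_offsets() stored untilOffset as a UTF-8 encoded string.
        value, _stat = zk.get(f"{base}/{partition}")
        print(f"partition {partition}: next offset {int(value)}")
zk.stop()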