When integrating Spark Streaming with Kafka, how should offsets be handled? Spark Streaming's built-in checkpoint mechanism has well-known drawbacks, so for projects with strict data-consistency requirements it is not recommended to rely on the built-in checkpoint for failure recovery.
Since version 1.3, Spark Streaming supports the direct Kafka stream approach. This strategy is more robust: it drops the Kafka high-level API's automatic offset saving in favor of the Simple API, a lower-level API. With it we can either use checkpoints for fault tolerance or fetch the offsets through the low-level API and manage them ourselves, so that whether the application is upgraded or restarted after a failure, the framework can provide exactly-once semantics.
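For comparison, the checkpoint-based recovery path mentioned above looks roughly like the following minimal sketch (not taken from this article's job; the checkpoint directory /tmp/checkpoint, the app name and the batch interval are placeholders). On restart, StreamingContext.getOrCreate rebuilds the streaming context, including the Kafka offsets, from the checkpoint directory. The rest of this article takes the second route and keeps the offsets in ZooKeeper instead.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def create_context():
    # Called only when no checkpoint exists yet.
    sc = SparkContext(appName='CheckpointDemo')
    ssc = StreamingContext(sc, 4)
    ssc.checkpoint('/tmp/checkpoint')   # metadata and DStream lineage go here
    # ... build the Kafka direct stream and transformations here ...
    return ssc

# On a clean start this calls create_context(); after a failure it
# reconstructs the streaming context from '/tmp/checkpoint'.
ssc = StreamingContext.getOrCreate('/tmp/checkpoint', create_context)
ssc.start()
ssc.awaitTermination()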
This article shows how to consume Kafka messages from Spark Streaming through the Spark Python API while managing the Kafka offsets manually in ZooKeeper. The complete job follows.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import json

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition

ZOOKEEPER_SERVERS = "172.16.1.225:2181"


def get_zookeeper_instance():
    """Lazily create a single KazooClient per driver process."""
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']


def save_offsets(rdd):
    """Persist the untilOffset of every partition to /test/<topic>/<partition>."""
    zk = get_zookeeper_instance()
    for offset in offsetRanges:
        path = f"/test/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        print(offset.untilOffset)
        zk.set(path, str(offset.untilOffset).encode())
    # Output step: here we simply print the transformed records.
    for i in rdd.collect():
        print(i)


def read_offsets(zk, topics):
    """Rebuild the fromOffsets dict from ZooKeeper; empty on the first run."""
    from_offsets = {}
    for topic in topics:
        if not zk.exists(f'/test/{topic}'):
            # zk.create(f'/test/{topic}')
            zk.ensure_path(f'/test/{topic}')
        else:
            for partition in zk.get_children(f'/test/{topic}'):
                topic_partition = TopicAndPartition(topic, int(partition))
                offset = int(zk.get(f'/test/{topic}/{partition}')[0])
                from_offsets[topic_partition] = offset
    print(from_offsets)
    return from_offsets


def trans_info(rdd):
    """Parse the raw log messages and remember this batch's offset ranges."""
    result = rdd.map(lambda row: json.loads(row[1])['request_body']
                     if json.loads(row[1])['@fields']['request'] == 'POST /api-wechat/alarm/alarmServer HTTP/1.0'
                     else None). \
        filter(lambda row: row is not None).map(lambda row: json.loads(row)[0]). \
        filter(lambda row: row['cmd'] == '021003'). \
        map(lambda row: (
            row['device_code'], row['data']['temperature1'], row['data']['battery'],
            row['data']['run_time'], row['time']))
    # transform() is evaluated on the driver, so the offset ranges can be
    # stashed in a global and reused by save_offsets() in the foreachRDD().
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    for o in offsetRanges:
        print("%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset,
                                  o.untilOffset - o.fromOffset))
    return result


def start():
    # os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    sconf = SparkConf()
    sconf.set('spark.cores.max', 4)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 4)
    brokers = "172.16.1.225:9092,172.16.1.226:9092,172.16.1.227:9092"
    topic = ['logfile-memory-kafka']
    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topic)
    kafkaStreams = KafkaUtils.createDirectStream(ssc, topics=['logfile-memory-kafka'],
                                                 kafkaParams={"metadata.broker.list": brokers},
                                                 fromOffsets=from_offsets)
    kafkaStreams.transform(trans_info).foreachRDD(save_offsets)
    ssc.start()              # Start the computation
    ssc.awaitTermination()   # Wait for the computation to terminate

if __name__ == '__main__':
start()
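
To run the job, the Kafka 0.8 direct-stream connector has to be on the classpath, for example via spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:<spark-version> your_job.py (the exact coordinates depend on your Spark and Scala versions, and your_job.py is a placeholder for this script's file name). Once a few batches have completed, the committed offsets can be read back from ZooKeeper; the snippet below is a small verification sketch that reuses the same kazoo client, ZOOKEEPER_SERVERS address and /test prefix as the job above.

from kazoo.client import KazooClient

# Read back the offsets that save_offsets() wrote under /test/<topic>/<partition>.
zk = KazooClient("172.16.1.225:2181")
zk.start()
topic = 'logfile-memory-kafka'
for partition in zk.get_children(f'/test/{topic}'):
    value, _stat = zk.get(f'/test/{topic}/{partition}')
    print(topic, partition, int(value))
zk.stop()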