Managing Kafka offsets with ZooKeeper in PySpark


  When integrating Kafka with Spark Streaming, the question is how to handle offsets. The drawbacks of Spark Streaming's built-in checkpoint mechanism are well known, so for projects with strict data-consistency requirements it is not advisable to rely on the built-in checkpoint for failure recovery.

Since Spark Streaming 1.3, the direct Kafka stream has been supported. This approach is more robust: it abandons Kafka's high-level API, which saved offsets automatically, in favor of the lower-level Simple API. With it we can still use checkpoints for fault tolerance, or we can fetch the offsets through the low-level API and manage them ourselves, so that whether the application is upgraded or restarted after a failure, the framework side can guarantee exactly-once semantics. A minimal sketch of this pattern is shown right below.
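The core of the self-managed approach is small: every RDD produced by the direct stream carries its offset ranges, which the driver can read and commit to an external store after the batch succeeds. In the sketch below, load_offsets and store_offsets are placeholders for whatever store you choose (the ZooKeeper-backed versions appear in the full program later in this article), and ssc, topics and brokers are assumed to be defined as in that program.

# Minimal sketch of the self-managed offset pattern (placeholders: load_offsets,
# store_offsets; ssc/topics/brokers as defined in the full program below).
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

def process_batch(rdd):
    offset_ranges = rdd.offsetRanges()   # available on the direct stream's RDDs
    # ... process the records in this batch ...
    store_offsets(offset_ranges)         # commit each range's untilOffset

# {TopicAndPartition(topic, partition): offset} read back from the external store
from_offsets = load_offsets()
stream = KafkaUtils.createDirectStream(ssc, topics,
                                       {"metadata.broker.list": brokers},
                                       fromOffsets=from_offsets)
stream.foreachRDD(process_batch)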

  This article shows how to consume Kafka messages from Spark Streaming through the Spark Python API, with the Kafka offsets managed manually in ZooKeeper.

 
  
#! /usr/bin/env python3
# -*- coding: utf-8 -*-

import json

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

ZOOKEEPER_SERVERS = "172.16.1.225:2181"


def get_zookeeper_instance():
    """Lazily create a single KazooClient on the driver and reuse it across batches."""
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']


def save_offsets(rdd):
    """Write each partition's untilOffset to ZooKeeper once the batch has been processed."""
    zk = get_zookeeper_instance()
    for offset in offsetRanges:
        path = f"/test/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        print(offset.untilOffset)
        zk.set(path, str(offset.untilOffset).encode())
    for i in rdd.collect():
        print(i)


def read_offsets(zk, topics):
    """Rebuild the {TopicAndPartition: offset} dict from ZooKeeper for createDirectStream."""
    from_offsets = {}
    for topic in topics:
        if not zk.exists(f'/test/{topic}'):
            # First run: create the parent node; returning an empty dict lets the
            # stream start from the default offsets.
            # zk.create(f'/test/{topic}')
            zk.ensure_path(f'/test/{topic}')
        else:
            for partition in zk.get_children(f'/test/{topic}'):
                topic_partion = TopicAndPartition(topic, int(partition))
                offset = int(zk.get(f'/test/{topic}/{partition}')[0])
                from_offsets[topic_partion] = offset
    print(from_offsets)
    return from_offsets


def trans_info(rdd):
    """Parse the alarm payload out of the raw log messages and remember this batch's offset ranges."""
    result = rdd.map(lambda row: json.loads(row[1])['request_body']
                     if json.loads(row[1])['@fields']['request'] == 'POST /api-wechat/alarm/alarmServer HTTP/1.0'
                     else None). \
        filter(lambda row: row is not None).map(lambda row: json.loads(row)[0]). \
        filter(lambda row: row['cmd'] == '021003'). \
        map(lambda row: (row['device_code'], row['data']['temperature1'], row['data']['battery'],
                         row['data']['run_time'], row['time']))

    # transform() is evaluated on the driver for every batch, so the offset ranges can be
    # stashed in a module-level global and committed later by save_offsets().
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    for o in offsetRanges:
        print("%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset,
                                  o.untilOffset - o.fromOffset))
    return result


def start():
    # os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    sconf = SparkConf()
    sconf.set('spark.cores.max', 4)
    sc = SparkContext(appName='KafkaDirectWordCount', conf=sconf)
    ssc = StreamingContext(sc, 4)

    brokers = "172.16.1.225:9092,172.16.1.226:9092,172.16.1.227:9092"
    topic = ['logfile-memory-kafka']

    # Restore the offsets committed by the previous run before creating the direct stream.
    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topic)

    kafkaStreams = KafkaUtils.createDirectStream(ssc, topics=topic,
                                                 kafkaParams={"metadata.broker.list": brokers},
                                                 fromOffsets=from_offsets)
    kafkaStreams.transform(trans_info).foreachRDD(save_offsets)

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate


if __name__ == '__main__':
    start()
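A note on running the job: the Kafka 0.8 direct-stream classes are not bundled with PySpark itself, so the script is normally submitted with the matching spark-streaming-kafka package on the classpath, e.g. spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:<your Spark version> (the exact coordinates depend on your Spark and Scala versions), and the kazoo package must be installed in the driver's Python environment, since the ZooKeeper reads and writes all happen on the driver.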