For distributed compilation, every worker runs a consumer, and the kafka+zookeeper setup uses the default configuration. When message volume is low, every consumer receives an even share of distinct messages, but once the volume grows, consumers start consuming duplicates.
How do you troubleshoot a problem like this?
At first I suspected a system resource bottleneck: the build workers and the Kafka cluster were on the same machines, so message delivery might lag. I found a few extra machines and separated Kafka from the build workers, but the problem persisted.
Next I reasoned that the most important condition for consistency in a cluster is time, so I pointed every machine at the same time server. Still no luck.
Finally I went back over my own code, enlarged the buffer queue, and the duplicates disappeared.
That suggested the problem was in how I was using Kafka. The analysis follows:
1: What kind of duplication is it? Are messages consumed twice on different hosts, or twice on the same host?
Comparison tests showed it is re-consumption on the same host, at intervals of roughly 20 messages (see the diagnostic probe after this list).
2: What does the Kafka messaging model look like here?
- One topic: tizen-unified
- Three partitions: 0, 1, 2
- One producer, three consumers.
- Each consumer keeps a local buffer queue; when the queue is full, the consumer thread blocks while the build processes take packages off the queue and compile them, until the queue is empty.
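Before diving into the code, it is worth confirming that the duplicates really are broker re-deliveries rather than double-sends from the producer. A record is uniquely identified by its (partition, offset) pair, so a small probe consumer can detect re-delivery directly. A minimal diagnostic sketch, assuming the same topic, group, and broker address as the worker code below:

from kafka import KafkaConsumer

seen = set()
consumer = KafkaConsumer('tizen-unified',
                         group_id='tizen-worker',
                         bootstrap_servers='109.123.100.144:9092')
for message in consumer:
    pos = (message.partition, message.offset)
    if pos in seen:
        # the same (partition, offset) arriving twice proves re-delivery
        print "duplicate: partition=%d offset=%d value=%s" % (
            message.partition, message.offset, message.value)
    seen.add(pos)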
The consumer-side code is as follows:
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4
"""gbs-worker - distributed system worker module."""
import os
import re
import time
import multiprocessing
from multiprocessing import Process
from kafka import KafkaConsumer
from kafka import KafkaProducer
class Producer(Process):
    """Feeds the local task queue from Kafka.

    Despite its name, this process wraps a KafkaConsumer: it "produces"
    work items for the local build processes.
    """
    def __init__(self):
        super(Producer, self).__init__()
        self.messageHandler = KafkaConsumer('tizen-unified', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
    def run(self):
        print "start produce, os pid: %d" % (os.getpid())
        for message in self.messageHandler:  # resumes whenever Kafka delivers a record
            while WorkerPool.taskQueue.full():
                # Sleeping here stalls the Kafka poll loop; a long stall is
                # exactly what triggers the re-deliveries analysed below.
                time.sleep(5)
            print "put package %s into queue" % message.value
            WorkerPool.taskQueue.put(message.value)
class GbsBuild(object):
    """Runs one gbs build and copies the resulting repo to the dispatcher."""
    errorRule = re.compile(r"some packages failed to be built")
    def __init__(self, packageName, id):
        super(GbsBuild, self).__init__()
        self.sourcePath = "/home/scm/renjg/workspace"
        self.logPath = "/home/scm/renjg/logs"
        self.gbsPath = "/usr/bin/gbs"
        self.gbsconf = self.sourcePath + "/.gbs.conf"
        self.gbsRoot = "/home/scm/GBS-ROOT"
        self.packageName = packageName
        self.threadId = id
    def build(self):
        os.system("mkdir -p %s" % self.logPath)
        # run gbs from inside the source tree; a bare os.system("cd ...")
        # would only change directory in a throwaway subshell
        result = os.popen("cd %s && gbs -c %s build -A armv7l -B %s-%d --binary-list %s --clean >%s/%s.log 2>&1" % (self.sourcePath, self.gbsconf, self.gbsRoot, self.threadId, self.packageName, self.logPath, self.packageName)).read()
        if GbsBuild.errorRule.findall(result):
            print "%s build error \n log file is: %s/%s.log" % (self.packageName, self.logPath, self.packageName)
            return "Fail"
        os.system("scp -r %s renjg@109.123.123.6:/home/renjg/GBS-ROOT/local/repos/unified_standard/" % (self.gbsRoot + "-" + str(self.threadId) + "/local/repos/unified_standard/armv7l/"))
        print "%s package in process %d, build done, copy done" % (self.packageName, self.threadId)
        return "Success"
class Consumer(Process):
    """Takes packages off the local queue, builds them, and reports the
    result back to Kafka (hence the KafkaProducer it owns)."""
    def __init__(self, threadId, partition=0):
        super(Consumer, self).__init__()
        self.partition = partition
        self.threadId = threadId
        self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.144:9092")
    def run(self):
        print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
        while True:
            while WorkerPool.taskQueue.empty():
                time.sleep(1)
            packageName = WorkerPool.taskQueue.get()
            print "thread %d start %s package" % (self.threadId, packageName)
            gbsbuild = GbsBuild(packageName, self.threadId)
            print "thread %d building %s package" % (self.threadId, packageName)
            status = "succeed" if gbsbuild.build() == "Success" else "failed"
            future = self.messageHandler.send("tizen-unified-status", value=status, key=packageName, partition=0)
            try:
                # future.get() raises on failure rather than returning a
                # falsy value, so success is tested with try/except
                future.get(timeout=60)
                print "send success"
            except Exception:
                print "send fail"
class WorkerPool(object):
    capacity = 4
    curThreadNum = 0
    # If taskQueue is too small, the Producer blocks on a full queue and
    # ends up fetching duplicate messages after a rebalance.
    taskQueue = multiprocessing.Queue(capacity * 100)
    def __init__(self):
        self.producer = Producer()
        self.consumers = [Consumer(i) for i in xrange(WorkerPool.capacity)]
    def start(self):
        print "start Worker pool"
        self.producer.start()
        for i in range(0, WorkerPool.capacity):
            self.consumers[i].start()
        self.producer.join()
        for i in range(0, WorkerPool.capacity):
            self.consumers[i].join()

if __name__ == "__main__":
    wp = WorkerPool()
    wp.start()
    print "Done"
The current improvement is to enlarge WorkerPool.taskQueue so that a single poll claims enough messages and the consumers can then work through them at their own pace.
Another idea is to drop the Producer process entirely and open a dedicated Kafka connection inside each consumer, leaving the mutual exclusion to Kafka itself. Still to be tested; a sketch follows.
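A minimal sketch of that second idea, reusing GbsBuild from the listing above and assuming every build process joins the same consumer group so the broker itself assigns partitions (with three partitions, at most three members get work; extra processes would idle):

from multiprocessing import Process
from kafka import KafkaConsumer

class BuildWorker(Process):
    """One group member per process; Kafka gives each partition to exactly
    one member, so no local queue or dispatcher process is needed."""
    def __init__(self, workerId):
        super(BuildWorker, self).__init__()
        self.workerId = workerId
    def run(self):
        # create the connection inside run(): Kafka clients must not be
        # shared across forked processes
        consumer = KafkaConsumer('tizen-unified',
                                 group_id='tizen-worker',
                                 bootstrap_servers='109.123.100.144:9092')
        for message in consumer:
            GbsBuild(message.value, self.workerId).build()

workers = [BuildWorker(i) for i in xrange(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()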
