storm集群環境部署


一、storm集群環境部署
1、集群環境下storm包部署:
(1)必須將項目依賴的本地lib目錄下的jar包放入集群子節點apache-storm-2.0.0 安裝目錄的lib-worker 和extlib目錄下,和主節點的extlib目錄下。否則報
ClassNotFoundException: org.springframework.context.ApplicationContext (lib-worker目錄下原來就有環境依賴的jar包,所以比extlib下的jar包多)

執行的 storm jar 包放在apache-storm-2.0.0/bin 目錄下,和兩個子節點的lib-worker下。

2、集群環境啟動

cd /home/apache-storm-2.0.0/bin

在主備節點:
storm nimbus >/dev/null 2>&1 &
storm ui >/dev/null &


在子節點和備節點:
storm supervisor >/dev/null 2>&1 &

查看啟動狀態:
ps -ef | grep daemon.nimbus
使用瀏覽器在頁面上查看拓撲的狀態和集群環境運行狀態http://xxx.xxx.xxx.xxx:8080/

3、拓撲的提交與停止:

拓撲提交:
storm jar stormtest-0.0.1-SNAPSHOT.jar com.stormtest.SpringbootApplication APP1 111

storm jar stormtest-0.0.1-SNAPSHOT.jar com.stormtest.SpringbootApplication APP1 111 >/home/apache-storm-2.0.0/bin/info.log &


殺掉拓撲:

cd /home/apache-storm-2.0.0/bin
storm kill 拓撲名

后台查看運行的拓撲:
storm list

4、日志查看:

在子節點/home/apache-storm-2.0.0/logs/workers-artifacts 下找到拓撲名稱, 去查看里面的worker.log 日志。

 

二、springboot集成storm 集群環境部署的問題與解決方法

1、解決slf4j沖突
java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND bound slf4j-log4j12.jar on t

log4j-slf4j-impl-2.11.2.jar
slf4j-log4j12-1.7.25.jar

目前是在集群上lib-worker目錄下刪除了上面兩個jar,不刪除在工作節點不打印worker.log 日志 。

log4j-slf4j-impl-2.11.1.jar 必須在lib-worker下存在,不存在不打印worker.log 日志 。


lib-worker下如果有虛無境的 kafkastorm的jar包 就會報(LoggerFactory is not a Logback LoggerContext but Logback is on the classpath.)


2、在集群上父pom文件加 <scope>provided</scope> 解決yaml 沖突,在本地運行時要注釋掉。

 

3、將lib包 上傳到lib-worker 目錄下的時候,刪除掉 storm開頭的6個jar包。負責會報文件沖突(和2一樣),要保留lib-worker目錄下 自帶的兩個storm的jar包。

4、 讀取不到依賴工程的 配置文件?
必須把要執行的jar包的properties 配置文件放在config目錄下。


5、不在log4j2.xml指定目錄打印日志
必須在屬性文件中指定日志文件所在位置,在bin目錄提交任務的時候 不能指定日志輸出目錄。

6、libworker 下不能有能運行的提交的jar包其他名稱的版本,會報類的序列號不一致。

 


7、本地啟動的時候不能設置為2個 worker,否則跑一會 沒有任何報錯進程就斷掉了。


8、同時拉起兩個worker,
將兩個bolt 分發到5.2X4上,5.2X4上要啟動springApplication就會打印 consumer.log日志,就報了
javax.management.InstanceAlreadyExistsException: com.alibaba.druid.filter.stat:name=statFilter,type=StatFilter

將兩個spout分發到了 4.2X3上,不需要啟動springApplication,因此沒有打印consumer.log日志。

停了子節點,上面的worker進程依然存在,worker.log一直打印日志,停了worker進程,會立刻在同一節點拉起來。啟動的時候沒啟動成功的時候會重復多次去拉prepare方法。

同一個worker內,spring只被初始化一次。

 

9、

當設置了兩個 worker 進程config.setNumWorkers(2) 時, Bolt emit發射的對象不能繼承父類,當繼承了父類屬性后 ,通過this.collector.emit( "common",new Values(Ball)) 發送給下級bolt數據后,下級bolt通過

Bolt  bolt  =(input.getValueByField( "common") 獲取Ball對象的時候會導致獲取不到 父類的屬性 。

原因是 發射后取對象的時候就取不到父類的屬性。

10、為了能使用spring上下文環境中的bean,spout的open()方法和bolt的prepare()方法中要加上 springboot的啟動語句,ConfigurableApplicationContext mycontext = SpringApplication.run(SpringbootApplication.class);為了防止同一個worker 進程內 同時拉起多個spring環境導致端口暫用沖突,要對全局靜態變量 mycontext 做判斷,為空並且沒有在拉起的時候才啟動,有拉起的時候等待。

11、spout 對接kafka的時候,在open方法里要創建kafka消費對象,每個spout都要new 自己的消費對象,否則會報消費者被占用的錯誤。

private void kafkaInit()

{
 
this.consumer=new KafkaConsumer(msgKafkaConsumerConfig.consumerKafkaConfigs());
String topic = "nihao";
this.consumer.subscribe(Arrays.asList(new String[] { topic }));
 
}

 

三、集群環境下各個節點的分工

nimbus 將客戶端提交的topology代碼保存到本地目錄/nimbus/stormdist/下

nimbus 分配的任務存放在zk節點/storm/assignments下:

進入zk客戶端:
cd /zookeeper/bin
./zkCli.sh
get /storm/assignments
ls /storm/assignments

ls /storm/assignments/wn0827-1-1566868207
quit


子節點從nimbus中獲取到的內容放在:
/home/apache-storm-2.0.0/storm-local/supervisor/stormdist/test1-6-1565944602

stormjar.jar

 

 四、storm並行理解與運行狀態解析

       每個子節點上可以有多個worker進程,每個worker進程只屬於一個拓撲,一個worker進程可以啟動多個excutor線程,每個excutor線程只能運行一個bolt或spout的一個或多個實例(默認一個excutor只運行一個task實例),但可以給一個bolt 設置並行度,可以設置2個excutor去執行,這兩個excutor可能在一個worker上也可能在兩個worker上。可以設置task的並行度,但task的並行度要大於等於excutor,
       拓撲一旦提交后task的數量是不會變得,如果一個bolt的task數量設置為1,后面通過命令修改它的excutor的並行度為2,這個是修改不了的,因為同一個組件的excutor數量一定小於等於task的數量。可以設置task為4,excutor為2,但如果設置了task為1,excutor為2,那個系統也只會給它分配一個excutor線程。

動態改變並行度:
storm rebalance consumer_APP2 -n 2 -e bastionSpout=2

 

Storm UI 運行狀態解析:

Version Supervisors Used slots Free slots Total slots Executors Tasks
2.0.0        2                       2               6                8         14              14


兩個子節點,使用了兩個worker進程, 還有6個進程可以使用, 一共有8個進程, 共啟了14個線程, 共有14個task實例再運行。


spout 發射給bolt的數據,bolt如果不取不處理,過了30秒的超時時間,這條消息會faield,可以在storm ui上看到,這時會重新發送,導致惡行循環失敗。

 

五、storm 參數調優

參數解析:

1、 supervisor.slots.ports:worker進程的接收線程的監聽端口;

2、 topology.receiver.buffer.size:worker接收線程緩存消息的大小,它將該緩存消息發送給executor線程;需要為2的倍數
是worker的 receive thread 一次追加給 executor’s incoming queue 的最大消息數量。設置大了會引起各種問題。

3、 topology.transfer.buffer.size:worker進程中向外發送消息的緩存大小; 32
每個工作進程都有一個發送線程,負責從工作進程的傳輸隊列中讀取消息並通過網絡將它們發送給下游消費者。

4、 topology.executor.receive.buffer.size:executor線程的接收隊列大小;需要為2的倍數 16384 incoming queue

5、 topology.executor.send.buffer.size:executor線程的發送隊列大小;需要為2的倍數 outgoing queue

 

數據在storm里的處理流程如下:

每個worker有一個接受線程和一個發送線程,接受線程負責將接受到的消息receive queue(topology.receiver.buffer.size)發送給合適excutor的incoming queue隊列(topology.executor.receive.buffer.size)。

每個excutor有一個task線程有一個發送線程,task線程負責將 incoming queue 中的數據處理后放入excutor的 outgoing queue; 發送線程負責將excutor的 outgoing queue中( topology.executor.send.buffer.size)的數據發送到worker進程的 transfer queue中(topology.transfer.buffer.size)。

worker 的發送線程負責將transfer queue中的數據通過網絡發送給其他worker ;

只有excutor的outgoing queue中 放的是單個的tuple,其他 queue里每個元素放的都是tuple的list。所以其他queue配置數據偏小。

storm 進程內通訊 參數說明可參考:
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/


拓撲最優參數配置:

conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);

conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);

conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);

conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM