Storm處理流程, 基本參數配置


配置選項名稱

配置選項作用

topology.max.task.parallelism

每個Topology運行時最大的executor數目

topology.workers

每個Topology運行時的worker的默認數目,若在代碼中設置,則此選項值被覆蓋

storm.zookeeper.servers

zookeeper集群的節點列表

storm.local.dir

Storm用於存儲jar包和臨時文件的本地存儲目錄

storm.zookeeper.root

Storm在zookeeper集群中的根目錄,默認是“/”

ui.port

Storm集群的UI地址端口號,默認是8080

nimbus.host:

Nimbus節點的host

supervisor.slots.ports

Supervisor 節點的worker占位槽,集群中的所有Topology公用這些槽位數,即使提交時設置了較大數值的槽位數,系統也會按照當前集群中實際剩余的槽位數來 進行分配,當所有的槽位數都分配完時,新提交的Topology只能等待,系統會一直監測是否有空余的槽位空出來,如果有,就再次給新提交的 Topology分配

supervisor.worker.timeout.secs

Worker的超時時間,單位為秒,超時后,Storm認為當前worker進程死掉,會重新分配其運行着的task任務

drpc.servers

在使用drpc服務時,drpc server的服務器列表

drpc.port

在使用drpc服務時,drpc server的服務端口

 

本地模式下, 基本並發度控制

conf.setMaxTaskParallelism(5);   本地模式下一個組件能夠運行的最大線程數

builder.setSpout("spout", new RandomSentenceSpout(), 10);  最后的參數parallelism_hint 表示executor的數目,每個作為一個thread在work下工作,  但是如果超過setMaxTaskParallelism定義的上限,則使用setMaxTaskParallelism設置的TOPOLOGY_MAX_TASK_PARALLELISM

builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);  ,task的數目,默認和executor是1:1 的關系,就是每個task運行在一個物理線程上,

在這里設置的是taskNum為2,executor 是5,表示RandomSentenceSpout創建2次,實際只有兩個2個executor,  executor不能超過NumTask

builder.setSpout("spout", new RandomSentenceSpout(), 2).setNumTasks(5);   

在這里設置的是taskNum為5,executor 是2, 表示RandomSentenceSpout創建5次,2個executor在兩個物理線程上執行,  每個executor執行1/2的任務

這么寫感覺意義都不大, 只是個人為了理解storm executor task概念, 在0.8以后,幾個executor有可能是共用一個物理線程,由上面測試能看出。

突然想起這個其實還是有好處的,因為在storm中 TaskNum是靜態的, executor是動態的, 比如tasknum是5,exector是2,這時候是在兩個物理線程執行, 如果我們將executor改成3,  這時會變成在3個物理線程上執行,提高了並發性. 物理線程公式應該Min(executor, tasknum),  這個未在任何文檔上見過,個人的一個推斷.

動態調整參數

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout"); 這里和上面一樣,會負載均衡地放入一個線程中運行

conf.setDebug(true);                         //
conf.setMaxSpoutPending(2);          //  這個設置一個spout task上面最多有多少個沒有處理(ack/fail)的tuple,防止tuple隊列過大, 只對可靠任務起作用
conf.setMessageTimeoutSecs(1);    //  消息處理延時, 就是消息超過延時后, emit發射源會認為是fail , storm默認是30秒,如果實現的為Irichbolt接口,沒有ack和ack延時都會觸發,這個時間過短的話,如果自定義重發,bolt可能會多處理,tuple在發射過程中, 但是還沒有到達bolt, 但是已經延時了,emit發射源會認為已經失敗了,但是bolt還是收到這個tuple, 所以storm引入了事務拓撲,0.8以后叫trident. 如果實現的為IBaseBolt,則只會在延時情況下觸發, 默認會調用ack,但是這個ack如果有再次發射, 這個ack就會自動錨定了.

根據具體業務需求選擇合適的Bolt
conf.setNumAckers(2);                     //  消息處理的acker數量.默認1,可以根據實際處理情況調大

真實環境

conf.setNumWorkers(5); // 設置工作進程 ,  如果不添加端口, 默認會是4個worker進程

需要在storm.yaml下添加端口

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704

每個worker使用一個端口. 

 在uI窗口是spout bolt acker幾個的累加.

storm.yaml參數參考  

配置項 配置說明
storm.zookeeper.servers ZooKeeper服務器列表
storm.zookeeper.port ZooKeeper連接端口
storm.local.dir storm使用的本地文件系統目錄(必須存在並且storm進程可讀寫)
storm.cluster.mode Storm集群運行模式([distributed|local])
storm.local.mode.zmq Local模式下是否使用ZeroMQ作消息系統,如果設置為false則使用java消息系統。默認為false
storm.zookeeper.root ZooKeeper中Storm的根目錄位置
storm.zookeeper.session.timeout 客戶端連接ZooKeeper超時時間
storm.id 運行中拓撲的id,由storm name和一個唯一隨機數組成。
nimbus.host nimbus服務器地址
nimbus.thrift.port nimbus的thrift監聽端口
nimbus.childopts 通過storm-deploy項目部署時指定給nimbus進程的jvm選項
nimbus.task.timeout.secs 心跳超時時間,超時后nimbus會認為task死掉並重分配給另一個地址。
nimbus.monitor.freq.secs nimbus檢查心跳和重分配任務的時間間隔.注意如果是機器宕掉nimbus會立即接管並處理。
nimbus.supervisor.timeout.secs supervisor的心跳超時時間,一旦超過nimbus會認為該supervisor已死並停止為它分發新任務.
nimbus.task.launch.secs task啟動時的一個特殊超時設置.在啟動后第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs.
nimbus.reassign 當發現task失敗時nimbus是否重新分配執行。默認為真,不建議修改。
nimbus.file.copy.expiration.secs nimbus判斷上傳/下載鏈接的超時時間,當空閑時間超過該設定時nimbus會認為鏈接死掉並主動斷開
ui.port Storm UI的服務端口
drpc.servers DRPC服務器列表,以便DRPCSpout知道和誰通訊
drpc.port Storm DRPC的服務端口
supervisor.slots.ports supervisor上能夠運行workers的端口列表.每個worker占用一個端口,且每個端口只運行一個worker.通過這項配置可以調整每台機器上運行的worker數.(調整slot數/每機)
supervisor.childopts 在storm-deploy項目中使用,用來配置supervisor守護進程的jvm選項
supervisor.worker.timeout.secs supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啟worker進程.
supervisor.worker.start.timeout.secs supervisor初始啟動時,worker的心跳超時時間,當超過該時間supervisor會嘗試重啟worker。因為JVM初始啟動和配置會帶來的額外消耗,從而使得第一次心跳會超過supervisor.worker.timeout.secs的設定
supervisor.enable supervisor是否應當運行分配給他的workers.默認為true,該選項用來進行Storm的單元測試,一般不應修改.
supervisor.heartbeat.frequency.secs supervisor心跳發送頻率(多久發送一次)
supervisor.monitor.frequency.secs supervisor檢查worker心跳的頻率
worker.childopts supervisor啟動worker時使用的jvm選項.所有的”%ID%”字串會被替換為對應worker的標識符
worker.heartbeat.frequency.secs worker的心跳發送時間間隔
task.heartbeat.frequency.secs task匯報狀態心跳時間間隔
task.refresh.poll.secs task與其他tasks之間鏈接同步的頻率.(如果task被重分配,其他tasks向它發送消息需要刷新連接).一般來講,重分配發生時其他tasks會理解得到通知。該配置僅僅為了防止未通知的情況。
topology.debug 如果設置成true,Storm將記錄發射的每條信息。
topology.optimize master是否在合適時機通過在單個線程內運行多個task以達到優化topologies的目的.
topology.workers 執行該topology集群中應當啟動的進程數量.每個進程內部將以線程方式執行一定數目的tasks.topology的組件結合該參數和並行度提示來優化性能
topology.ackers topology中啟動的acker任務數.Acker保存由spout發送的tuples的記錄,並探測tuple何時被完全處理.當Acker探測到tuple被處理完畢時會向spout發送確認信息.通常應當根據topology的吞吐量來確定acker的數目,但一般不需要太多.當設置為0時,相當於禁用了消息可靠性,storm會在spout發送tuples后立即進行確認.
topology.message.timeout.secs topology中spout發送消息的最大處理超時時間.如果一條消息在該時間窗口內未被成功ack,Storm會告知spout這條消息失敗。而部分spout實現了失敗消息重播功能。
topology.kryo.register 注冊到Kryo(Storm底層的序列化框架)的序列化方案列表.序列化方案可以是一個類名,或者是com.esotericsoftware.kryo.Serializer的實現.
topology.skip.missing.kryo.registrations Storm是否應該跳過它不能識別的kryo序列化方案.如果設置為否task可能會裝載失敗或者在運行時拋出錯誤.
topology.max.task.parallelism 在一個topology中能夠允許的最大組件並行度.該項配置主要用在本地模式中測試線程數限制.
topology.max.spout.pending 一個spout task中處於pending狀態的最大的tuples數量.該配置應用於單個task,而不是整個spouts或topology.
topology.state.synchronization.timeout.secs 組件同步狀態源的最大超時時間(保留選項,暫未使用)
topology.stats.sample.rate 用來產生task統計信息的tuples抽樣百分比
topology.fall.back.on.java.serialization topology中是否使用java的序列化方案
zmq.threads 每個worker進程內zeromq通訊用到的線程數
zmq.linger.millis 當連接關閉時,鏈接嘗試重新發送消息到目標主機的持續時長.這是一個不常用的高級選項,基本上可以忽略.
java.library.path JVM啟動(如Nimbus,Supervisor和workers)時的java.library.path設置.該選項告訴JVM在哪些路徑下定位本地庫.

storm內默認參數
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
   
  ### storm.* configs are general configurations
  # the local dir is where jars are kept
  storm.local.dir: "storm-local"
  storm.zookeeper.servers:
  - "localhost"
  storm.zookeeper.port: 2181
  storm.zookeeper.root: "/storm"
  storm.zookeeper.session.timeout: 20000
  storm.zookeeper.connection.timeout: 15000
  storm.zookeeper.retry.times: 5
  storm.zookeeper.retry.interval: 1000
  storm.zookeeper.retry.intervalceiling.millis: 30000
  storm.cluster.mode: "distributed" # can be distributed or local
  storm.local.mode.zmq: false
  storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
  storm.messaging.transport: "backtype.storm.messaging.netty.Context"
  storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
   
  ### nimbus.* configs are for the master
  nimbus.host: "localhost"
  nimbus.thrift.port: 6627
  nimbus.thrift.max_buffer_size: 1048576
  nimbus.childopts: "-Xmx1024m"
  nimbus.task.timeout.secs: 30
  nimbus.supervisor.timeout.secs: 60
  nimbus.monitor.freq.secs: 10
  nimbus.cleanup.inbox.freq.secs: 600
  nimbus.inbox.jar.expiration.secs: 3600
  nimbus.task.launch.secs: 120
  nimbus.reassign: true
  nimbus.file.copy.expiration.secs: 600
  nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
   
  ### ui.* configs are for the master
  ui.port: 8080
  ui.childopts: "-Xmx768m"
   
  logviewer.port: 8000
  logviewer.childopts: "-Xmx128m"
  logviewer.appender.name: "A1"
   
   
  drpc.port: 3772
  drpc.worker.threads: 64
  drpc.queue.size: 128
  drpc.invocations.port: 3773
  drpc.request.timeout.secs: 600
  drpc.childopts: "-Xmx768m"
   
  transactional.zookeeper.root: "/transactional"
  transactional.zookeeper.servers: null
  transactional.zookeeper.port: null
   
  ### supervisor.* configs are for node supervisors
  # Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
  supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
  supervisor.childopts: "-Xmx256m"
  #how long supervisor will wait to ensure that a worker process is started
  supervisor.worker.start.timeout.secs: 120
  #how long between heartbeats until supervisor considers that worker dead and tries to restart it
  supervisor.worker.timeout.secs: 30
  #how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
  supervisor.monitor.frequency.secs: 3
  #how frequently the supervisor heartbeats to the cluster state (for nimbus)
  supervisor.heartbeat.frequency.secs: 5
  supervisor.enable: true
   
  ### worker.* configs are for task workers
  worker.childopts: "-Xmx768m"
  worker.heartbeat.frequency.secs: 1
   
  # control how many worker receiver threads we need per worker
  topology.worker.receiver.thread.count: 1
   
  task.heartbeat.frequency.secs: 3
  task.refresh.poll.secs: 10
   
  zmq.threads: 1
  zmq.linger.millis: 5000
  zmq.hwm: 0
   
   
  storm.messaging.netty.server_worker_threads: 1
  storm.messaging.netty.client_worker_threads: 1
  storm.messaging.netty.buffer_size: 5242880 #5MB buffer
  # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
  storm.messaging.netty.max_retries: 300
  storm.messaging.netty.max_wait_ms: 1000
  storm.messaging.netty.min_wait_ms: 100
   
  # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
  storm.messaging.netty.transfer.batch.size: 262144
   
  # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
  storm.messaging.netty.flush.check.interval.ms: 10
   
  ### topology.* configs are for specific executing storms
  topology.enable.message.timeouts: true
  topology.debug: false
  topology.workers: 1
  topology.acker.executors: null
  topology.tasks: null
  # maximum amount of time a message has to complete before it's considered failed
  topology.message.timeout.secs: 30
  topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
  topology.skip.missing.kryo.registrations: false
  topology.max.task.parallelism: null
  topology.max.spout.pending: null
  topology.state.synchronization.timeout.secs: 60
  topology.stats.sample.rate: 0.05
  topology.builtin.metrics.bucket.size.secs: 60
  topology.fall.back.on.java.serialization: true
  topology.worker.childopts: null
  topology.executor.receive.buffer.size: 1024 #batched
  topology.executor.send.buffer.size: 1024 #individual messages
  topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
  topology.transfer.buffer.size: 1024 # batched
  topology.tick.tuple.freq.secs: null
  topology.worker.shared.thread.pool.size: 4
  topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
  topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
  topology.sleep.spout.wait.strategy.time.ms: 1
  topology.error.throttle.interval.secs: 10
  topology.max.error.report.per.interval: 5
  topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
  topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
  topology.trident.batch.emit.interval.millis: 500
  topology.classpath: null
  topology.environment: null
   
  dev.zookeeper.path: "/tmp/dev-storm-zookeeper"


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM