| Option | Description |
| --- | --- |
| topology.max.task.parallelism | Upper bound on the number of executors any single component may run with in a topology |
| topology.workers | Default number of worker processes for a topology; a value set in code overrides this option |
| storm.zookeeper.servers | List of nodes in the ZooKeeper ensemble |
| storm.local.dir | Local directory where Storm stores jars and temporary files |
| storm.zookeeper.root | Root directory for Storm inside ZooKeeper; defaults to "/" |
| ui.port | Port of the Storm cluster UI; defaults to 8080 |
| nimbus.host | Host of the Nimbus node |
| supervisor.slots.ports | Worker slots on a supervisor node. All topologies in the cluster share these slots: even if a topology is submitted asking for a larger number of slots, it is only assigned what is actually free in the cluster. When every slot is taken, newly submitted topologies have to wait; Storm keeps watching for freed slots and assigns them to waiting topologies as they become available |
| supervisor.worker.timeout.secs | Worker timeout in seconds. Once exceeded, Storm considers the worker process dead and reassigns the tasks it was running |
| drpc.servers | List of DRPC servers, when the DRPC service is used |
| drpc.port | Port of the DRPC servers, when the DRPC service is used |
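Most of the topology-level options above can also be set per topology in code through backtype.storm.Config, and as the table notes, values set in code take precedence. A minimal sketch (the class name is a placeholder; the setters and the TOPOLOGY_DEBUG key are from backtype.storm.Config as of the 0.9.x API):

```java
import backtype.storm.Config;

// Hypothetical helper: builds a per-topology config that overrides storm.yaml values.
public class TopologyConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        conf.setNumWorkers(4);            // topology.workers
        conf.setMaxTaskParallelism(8);    // topology.max.task.parallelism
        conf.setMessageTimeoutSecs(30);   // topology.message.timeout.secs
        conf.put(Config.TOPOLOGY_DEBUG, false); // options without a setter can be put by key
        return conf;
    }
}
```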
Basic parallelism control in local mode
conf.setMaxTaskParallelism(5); In local mode, the maximum number of threads (executors) a single component may run with.
builder.setSpout("spout", new RandomSentenceSpout(), 10); The last argument, parallelism_hint, is the number of executors; each runs as a thread inside a worker. If it exceeds the limit defined by setMaxTaskParallelism, the TOPOLOGY_MAX_TASK_PARALLELISM value set there is used instead.
builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2); setNumTasks sets the number of tasks; by default tasks and executors are 1:1, i.e. each task runs on its own physical thread.
Here taskNum is 2 and the executor hint is 5, so RandomSentenceSpout is instantiated twice and only 2 executors actually run: the executor count cannot exceed the task count.
builder.setSpout("spout", new RandomSentenceSpout(), 2).setNumTasks(5);
Here taskNum is 5 and the executor count is 2, so RandomSentenceSpout is instantiated 5 times and the 2 executors run on two physical threads, each handling half of the tasks.
Written this way it is not of much practical use; it is just my way of getting the Storm executor/task concepts straight. Since 0.8, several executors may end up sharing one physical thread, as the tests above suggest.
On second thought this is actually useful, because in Storm the task count is static while the executor count is dynamic. For example, with taskNum 5 and 2 executors the work runs on two physical threads; change the executors to 3 and it runs on three physical threads, improving concurrency. The number of physical threads should be min(executor, taskNum); I have not seen this stated in any documentation, it is my own inference.
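To make the executor/task relationship concrete, here is a minimal local-mode sketch assembled from the snippets above. It assumes the RandomSentenceSpout and SplitSentence classes from the standard word-count example are on the classpath (their imports are omitted); substitute your own components as needed.

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // 2 tasks with a hint of 5 executors: only 2 executors actually run
        builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);
        // 5 tasks on 2 executors: each executor runs its share of tasks on one thread
        builder.setBolt("split", new SplitSentence(), 2).setNumTasks(5)
               .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setMaxTaskParallelism(5); // local-mode cap on any component's parallelism
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("parallelism-demo", conf, builder.createTopology());
        Thread.sleep(30000);
        cluster.shutdown();
    }
}
```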
Dynamically adjusting the parameters
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
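For reference, the same rebalance can also be issued from Java through the thrift-generated Nimbus client. This is only a sketch: the NimbusClient/RebalanceOptions calls below follow the backtype.storm 0.9.x API as I understand it and should be verified against the Storm version actually in use.

```java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.generated.Nimbus;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();          // reads storm.yaml plus defaults
        Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();

        RebalanceOptions options = new RebalanceOptions();
        options.set_num_workers(5);                  // same as "storm rebalance -n 5"
        Map<String, Integer> executors = new HashMap<String, Integer>();
        executors.put("blue-spout", 3);              // same as "-e blue-spout=3"
        executors.put("yellow-bolt", 10);            // same as "-e yellow-bolt=10"
        options.set_num_executors(executors);

        client.rebalance("mytopology", options);
    }
}
```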
builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout"); Same as above: with only 1 task, the work is load-balanced onto a single thread.
conf.setDebug(true); // log every tuple Storm emits (topology.debug)
conf.setMaxSpoutPending(2); // maximum number of tuples that may be pending (neither acked nor failed) on a single spout task; keeps the tuple queue from growing without bound. Only takes effect for reliable processing
conf.setMessageTimeoutSecs(1); // message processing timeout: if a tuple is not fully processed within this window, the emitting spout treats it as failed. Storm's default is 30 seconds. With an IRichBolt implementation, both a missing ack and a late ack trigger the failure. If the timeout is too short and you re-emit on failure yourself, a bolt may process the same tuple more than once: the tuple can still be in flight, not yet delivered to the bolt, when the timeout fires, so the spout already counts it as failed even though the bolt will still receive it. This is one reason Storm introduced transactional topologies, known as Trident since 0.8. With IBasicBolt, only the timeout triggers a failure: ack is called automatically, and anything emitted from execute is automatically anchored (see the bolt sketch below).
Choose the bolt interface that fits your actual business requirements.
conf.setNumAckers(2); // number of acker tasks for message processing; defaults to 1 and can be raised according to the actual load
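The ack and anchoring behaviour described above is easiest to see in a bolt. Below is a minimal sketch of the explicit, IRichBolt-style route (extending BaseRichBolt): every emit is anchored to the input tuple and the input is acked by hand, whereas a bolt built on BaseBasicBolt gets both automatically. The class name is illustrative.

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Explicit reliability with BaseRichBolt: anchor every emit to the input tuple,
// then ack it. If the ack does not arrive within topology.message.timeout.secs,
// the spout's fail() is called for that tuple.
public class SplitSentenceReliable extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        for (String word : input.getString(0).split(" ")) {
            collector.emit(input, new Values(word)); // anchored emit
        }
        collector.ack(input); // mark the input as fully handled by this bolt
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```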
Production environment
conf.setNumWorkers(5); // sets the number of worker processes; if no extra port is added, the default is 4 worker processes
An extra port needs to be added in storm.yaml:
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
Each worker uses one port.
The number shown in the UI is the sum of the spouts, bolts, and ackers. A submission sketch follows below.
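Putting the production settings together, here is a hedged sketch of submitting to a real cluster: the spout/bolt wiring is the same as in the local-mode sketch above, and StormSubmitter replaces LocalCluster. The topology name taken from the command line is just an example.

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ProductionSubmit {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout / setBolt wiring as in the local-mode sketch above ...

        Config conf = new Config();
        conf.setNumWorkers(5);      // needs 5 free slots, hence the extra port 6704
        conf.setNumAckers(2);
        conf.setMaxSpoutPending(2);

        // Typically run via: storm jar my-topology.jar ProductionSubmit my-topology
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}
```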
storm.yaml parameter reference
| Option | Description |
| --- | --- |
| storm.zookeeper.servers | List of ZooKeeper servers |
| storm.zookeeper.port | ZooKeeper connection port |
| storm.local.dir | Local filesystem directory used by Storm (must exist and be readable/writable by the Storm processes) |
| storm.cluster.mode | Storm cluster mode (distributed or local) |
| storm.local.mode.zmq | Whether to use ZeroMQ for messaging in local mode; if false, the Java messaging system is used. Defaults to false |
| storm.zookeeper.root | Root directory for Storm in ZooKeeper |
| storm.zookeeper.session.timeout | Client ZooKeeper session timeout |
| storm.id | Id of the running topology, made up of the storm name and a unique random number |
| nimbus.host | Address of the nimbus server |
| nimbus.thrift.port | Thrift port nimbus listens on |
| nimbus.childopts | JVM options passed to the nimbus process when deploying with the storm-deploy project |
| nimbus.task.timeout.secs | Task heartbeat timeout; once exceeded, nimbus considers the task dead and reassigns it to another location |
| nimbus.monitor.freq.secs | Interval at which nimbus checks heartbeats and reassigns tasks. Note that if a machine goes down, nimbus takes over and handles it immediately |
| nimbus.supervisor.timeout.secs | Supervisor heartbeat timeout; once exceeded, nimbus considers the supervisor dead and stops assigning new work to it |
| nimbus.task.launch.secs | Special timeout used while a task is launching; until the first heartbeat after launch, this value temporarily replaces nimbus.task.timeout.secs |
| nimbus.reassign | Whether nimbus reassigns tasks when they are detected as failed. Defaults to true; changing it is not recommended |
| nimbus.file.copy.expiration.secs | Timeout nimbus uses for upload/download connections; when a connection is idle longer than this, nimbus considers it dead and closes it |
| ui.port | Port of the Storm UI |
| drpc.servers | List of DRPC servers, so that DRPCSpout knows whom to talk to |
| drpc.port | Port of the Storm DRPC service |
| supervisor.slots.ports | List of ports on the supervisor that can run workers. Each worker occupies one port, and each port runs at most one worker. Use this option to adjust the number of workers (slots) per machine |
| supervisor.childopts | Used by the storm-deploy project to configure the JVM options of the supervisor daemon |
| supervisor.worker.timeout.secs | Worker heartbeat timeout on the supervisor; once exceeded, the supervisor tries to restart the worker process |
| supervisor.worker.start.timeout.secs | Worker heartbeat timeout during initial startup; when exceeded, the supervisor tries to restart the worker. Because initial JVM startup and configuration add extra overhead, the first heartbeat may exceed the supervisor.worker.timeout.secs setting |
| supervisor.enable | Whether the supervisor should run the workers assigned to it. Defaults to true; this option exists for Storm's unit tests and should normally not be changed |
| supervisor.heartbeat.frequency.secs | How often the supervisor sends heartbeats |
| supervisor.monitor.frequency.secs | How often the supervisor checks the heartbeats of its workers |
| worker.childopts | JVM options used by the supervisor when launching workers. All "%ID%" substrings are replaced with the worker's identifier |
| worker.heartbeat.frequency.secs | Interval between worker heartbeats |
| task.heartbeat.frequency.secs | Interval at which tasks report their status via heartbeats |
| task.refresh.poll.secs | How often a task synchronizes its connections with other tasks (if a task is reassigned, the tasks sending messages to it need to refresh their connections). Normally the other tasks are notified almost immediately when a reassignment happens; this setting only guards against cases where the notification is missed |
| topology.debug | If set to true, Storm logs every message emitted |
| topology.optimize | Whether the master may, when appropriate, optimize topologies by running multiple tasks in a single thread |
| topology.workers | Number of worker processes to launch across the cluster for this topology. Each process runs some number of tasks as threads. Combine this parameter with the parallelism hints on the components to tune performance |
| topology.ackers | Number of acker tasks launched for the topology. Ackers track the tuples emitted by spouts and detect when a tuple has been fully processed; when that happens they send an acknowledgement to the spout. The number of ackers should generally be chosen based on the topology's throughput, but usually few are needed. Setting it to 0 effectively disables message reliability: Storm acks tuples immediately after the spout emits them |
| topology.message.timeout.secs | Maximum time a message emitted by a spout may take to be fully processed. If a message is not acked within this window, Storm reports it to the spout as failed; some spout implementations then replay the failed message |
| topology.kryo.register | List of serializations to register with Kryo (the serialization framework Storm uses under the hood). A serialization can be a class name, or an implementation of com.esotericsoftware.kryo.Serializer |
| topology.skip.missing.kryo.registrations | Whether Storm should skip Kryo registrations it cannot resolve. If set to false, tasks may fail to load or throw errors at runtime |
| topology.max.task.parallelism | Maximum parallelism allowed for any component in a topology. This is mainly used in local mode to cap the number of threads when testing |
| topology.max.spout.pending | Maximum number of tuples that may be pending on a single spout task at once. This applies per task, not to the spout as a whole or to the whole topology |
| topology.state.synchronization.timeout.secs | Maximum timeout for a component to synchronize state from its source (reserved; currently unused) |
| topology.stats.sample.rate | Fraction of tuples sampled to produce task statistics |
| topology.fall.back.on.java.serialization | Whether the topology may fall back to Java serialization |
| zmq.threads | Number of threads used for ZeroMQ communication inside each worker process |
| zmq.linger.millis | How long a connection keeps trying to resend messages to the target host after it is closed. This is a rarely used advanced option and can basically be ignored |
| java.library.path | java.library.path used when launching JVMs (Nimbus, Supervisors, and workers). It tells the JVM where to look for native libraries |
Storm's built-in default parameters
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
### storm.* configs are general configurations
# the local dir is where jars are kept
storm.local.dir: "storm-local"
storm.zookeeper.servers:
- "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"
logviewer.port: 8000
logviewer.childopts: "-Xmx128m"
logviewer.appender.name: "A1"
drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: "-Xmx768m"
transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
transactional.zookeeper.port: null
### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
supervisor.childopts: "-Xmx256m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs: 3
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1
# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size: 262144
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10
### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
topology.workers: 1
topology.acker.executors: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched
topology.executor.send.buffer.size: 1024 #individual messages
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
topology.classpath: null
topology.environment: null
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"