storm 中的Python bolt的注意事項


Storm可支持多種語言,其中就有python .

首先需要創建一個類,

    public static class BasieCalculateBolt extends ShellBolt implements
            IRichBolt {

        public BasieCalculateBolt() {
            super("python", "bolt_base_calculate.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
引用的bolt_base_calculate.py放置的目錄必須為本項目的resources目錄,本項目的py文件放置於mutilang/resources目錄下,則要在maven的pom.xml文件中將其設置為resource目錄。
    <build>
        <sourceDirectory>src/jvm</sourceDirectory>
        <testSourceDirectory>test/jvm</testSourceDirectory>
        <resources>
            <resource>
                <directory>${basedir}/multilang</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

一個最簡單的Python bolt如下所示:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

在resources目錄下還需放置在官網上下載的最新storm.py文件,https://github.com/apache/storm/blob/master/bin/storm.py。

python的bolt中不可有print語句,因為storm中Python bolt和其他bolt之間數據的傳遞的便是通過監控console輸出的數據。但是我們在Python中需要打印一些log來查看程序的運行,此時可使用log,即創建一個log.py

    import logging
    import logging.config
    import os
    
    logging.config.fileConfig('logging.conf')
    # create logger 下面是你工程的名稱
    logger = logging.getLogger('calculateEngine') 

logging.conf的配置可設置為

    [loggers]
    keys=root,calculateEngine
    
    [handlers]
    keys=fileHandler,consoleHandler
    
    [formatters]
    keys=simpleFormatter
    
    [logger_root]
    level=DEBUG
    handlers=consoleHandler
    
    [logger_calculateEngine]
    level=INFO
    handlers=fileHandler
    qualname=calculateEngine
    propagate=0
    
    [handler_consoleHandler]
    class=StreamHandler
    level=WARN
    formatter=simpleFormatter
    args=(sys.stdout,)
    
    [handler_fileHandler]
    class=FileHandler
    level=DEBUG
    maxBytes=10485760
    backupCount=20
    encoding=utf8
    formatter=simpleFormatter
    args=(os.path.join(os.path.dirname(__file__),'asien_calculate.log'),'a')
    
    [formatter_simpleFormatter]
    format=%(asctime)s - %(name)s - %(levelname)s - %(message)s

在python中的使用,只需from log import logger ,log.info("")即可


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM