Storm框架:Storm整合springboot


我們知道Storm本身是一個獨立運行的分布式流式數據處理框架,Springboot也是一個獨立運行的web框架。那么如何在Strom框架中集成Springboot使得我們能夠在Storm開發中運用Spring的Ioc容器及其他如Spring Jpa等功能呢?我們先來了解以下概念:

  • Storm主要的三個Component:Topology、Spout、Bolt。Topology作為主進程控制着spout、bolt線程的運行,他們相當於獨立運行的容器分布於storm集群中的各個機器節點。
  • SpringApplication:是配置Spring應用上下文的起點。通過調用SpringApplication.run()方法它將創建ApplicationContext實例,這是我們能夠使用Ioc容器的主要BeanFactory。之后Spring將會加載所有單例模式的beans,並啟動后台運行的CommandLineRunner beans等。
  • ApplicationContextAware:這是我們能夠在普通Java類中調用Spring容器里的beans的關鍵接口。

實現原理

Storm框架中的每個Spout和Bolt都相當於獨立的應用,Strom在啟動spout和bolt時提供了一個open方法(spout)和prepare方法(bolt)。我們可以把初始化Spring應用的操作放在這里,這樣可以保證每個spout/bolt應用在后續執行過程中都能獲取到Spring的ApplicationContext,有了ApplicationContext實例對象,Spring的所有功能就都能用上了。

  • Spout.open方法實現
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    //啟動Springboot應用
    SpringStormApplication.run();

    this.map = map;
    this.topologyContext = topologyContext;
    this.spoutOutputCollector = spoutOutputCollector;
}
  • Bolt.prepare方法實現
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    //啟動Springboot應用
    SpringStormApplication.run();

    this.map = map;
    this.topologyContext = topologyContext;
    this.outputCollector = outputCollector;
}
  • SpringStormApplication啟動類
@SpringBootApplication
@ComponentScan(value = "com.xxx.storm")
public class SpringStormApplication {
    /**
     * 非工程啟動入口,所以不用main方法
     * 加上synchronized的作用是由於storm在啟動多個bolt線程實例時,如果Springboot用到Apollo分布式配置,會報ConcurrentModificationException錯誤
     * 詳見:https://github.com/ctripcorp/apollo/issues/1658
     * @param args
     */
    public synchronized static void run(String ...args) {
        SpringApplication app = new SpringApplication(SpringStormApplication.class);
        //我們並不需要web servlet功能,所以設置為WebApplicationType.NONE
        app.setWebApplicationType(WebApplicationType.NONE);
        //忽略掉banner輸出
        app.setBannerMode(Banner.Mode.OFF);
        //忽略Spring啟動信息日志
        app.setLogStartupInfo(false);
        app.run(args);
    }
}

與我們傳統的Springboot應用啟動入口稍微有點區別,主要禁用了web功能,看下正常的啟動方式:

@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
	public static void main(String[] args) {
		SpringApplication.run(PlatformApplication.class, args);
	}
}
  • 在spout/bolt中調用了SpringStormApplication.run方法后,我們還需要能夠拿到ApplicationContext容器對象,這時候我們還需要實現ApplicationContextAware接口,寫個工具類BeanUtils:
@Component
public class BeanUtils implements ApplicationContextAware {
    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (BeanUtils.applicationContext == null) {
            BeanUtils.applicationContext = applicationContext;
        }
    }
    
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
    
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }
    
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }
    
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}

通過@Component注解使得Spring在啟動時能夠掃描到該bean,因為BeanUtils實現了ApplicationContextAware接口,Spring會在啟動成功時自動調用BeanUtils.setApplicationContext方法,將ApplicationContext對象保存到工具類的靜態變量中,之后我們就可以使用BeanUtils.getBean()去獲取Spring容器中的bean了。

寫個簡單例子

  • 在FilterBolt的execute方法中獲取Spring bean
@Override
public void execute(Tuple tuple) {
    FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
    filterService.deleteAll();
}
  • 定義FilterService類,這時候我們就可以使用Spring的相關注解,自動注入,Spring Jpa等功能了。
@Service("filterService")
public class FilterService {
    @Autowired
    UserRepository userRepository;
    
    public void deleteAll() {
        userRepository.deleteAll();
    }
}

將storm應用作為Springboot工程的一個子模塊

工程主目錄的pom文件還是springboot相關的依賴,在storm子模塊中引入storm依賴,這時候啟動Strom的topology應用會有一個日志包依賴沖突。

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

我們需要在storm子模塊的pom文件中重寫org.springframework.boot:spring-boot-starter包依賴,將Springboot的相關日志包排除掉,如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j2</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic2</artifactId>
        </exclusion>
    </exclusions>
</dependency>

使用maven-shade-plugin打包注意的問題

因為springboot有自己的打包插件,如果使用maven-shade-plugin需要將spring-boot-maven-plugin作為依賴引入,另外spring-boot-starter-parent需要使用dependencyManagement引入。

parent部分替換如下:

<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.1.0.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
</dependencyManagement>

build如下:

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.xxx.storm.pointer.XdPointerTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <dependencies>
                    <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-maven-plugin</artifactId>
                        <version>2.1.0.RELEASE</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <artifactSet>
                        <excludes>
                            <exclude>com.xxx.storm:xxx-storm</exclude>
                            <exclude>org.slf4j:slf4j-api</exclude>
                            <exclude>javax.mail:javax.mail-api</exclude>
                            <exclude>org.apache.storm:storm-core</exclude>
                            <exclude>org.apache.storm:storm-kafka</exclude>
                            <exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
                        </excludes>
                    </artifactSet>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.xxx.storm.pointer.XdPointerTopology</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意:META-INF/spring.* 文件不需要我們創建,這是springboot包內部的文件,這里只是需要顯示的引入進來。

OK,完美整合!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM