Integrating Spring Boot with Flink


Software versions: Spring Boot 2.1.6 + Flink 1.6.1 + JDK 1.8

Main program:

@SpringBootApplication
public class HadesTmsApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(HadesTmsApplication.class);
        application.setBannerMode(Banner.Mode.OFF);
        application.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer010<String> kafkaConsumer =
                new FlinkKafkaConsumer010<>("topic-name", new SimpleStringSchema(), getProperties());
        DataStream<String> dataStream = env.addSource(kafkaConsumer);
        // processing logic omitted here
        dataStream.addSink(new MySink());
        // without this call the job graph is built but never submitted
        env.execute("hades-tms");
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrap_servers);
        properties.setProperty("zookeeper.connect", zookeeper_connect);
        properties.setProperty("group.id", group_id);
        // a consumer needs deserializers, not serializers
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
A note: since this is a non-web project, the application implements the CommandLineRunner interface and overrides its run method, which is where the stream-processing logic is written.
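Incidentally, Spring Boot 2.x can also be told explicitly not to start an embedded web server. This standard Spring Boot property achieves the same non-web behavior:

```properties
# application.properties: run as a plain, non-web application
spring.main.web-application-type=none
```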

If MySink needs classes from the Spring container, it cannot reach them, because MySink is a plain class; a NullPointerException results. One might think of the ApplicationContextAware interface, implementing it to obtain the ApplicationContext, i.e.:

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    private static final long serialVersionUID = -6454872090519042646L;
    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (ApplicationContextUtil.applicationContext == null) {
            ApplicationContextUtil.applicationContext = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    // get a bean by name
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    // get a bean by class
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    // get a bean by name and class
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}
In practice this approach does not work in Flink stream processing either: sink functions are serialized and executed on the task managers, where the static context populated on the submitting side is not available. As described in my earlier Flink article "Flink讀寫系列之-讀mysql並寫入mysql", both the read and write sides have an open method dedicated to initialization, so we can initialize the Spring beans there. After modification, MySink becomes:

@EnableAutoConfiguration
@MapperScan(basePackages = {"com.xxx.bigdata.xxx.mapper"})
public class SimpleSink extends RichSinkFunction<String> {

    private transient TeacherInfoMapper teacherInfoMapper;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // boot a Spring context on the task manager and fetch the mapper bean
        SpringApplication application = new SpringApplication(SimpleSink.class);
        application.setBannerMode(Banner.Mode.OFF);
        ApplicationContext context = application.run(new String[]{});
        teacherInfoMapper = context.getBean(TeacherInfoMapper.class);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        List<TeacherInfo> teacherInfoList = teacherInfoMapper.selectByPage(0, 100);
        teacherInfoList.stream().forEach(teacherInfo -> System.out.println("teacherinfo:"
                + teacherInfo.getTeacherId() + "," + teacherInfo.getTimeBit() + "," + teacherInfo.getWeek()));
    }
}
Inside invoke, the Mapper methods from the Spring container can now be used.

The pom is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xxx.bigdata</groupId>
    <artifactId>flink-project</artifactId>
    <version>1.0.0</version>
    <name>flink-project</name>
    <packaging>jar</packaging>
    <description>My project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.6.1</flink.version>
        <skipTests>true</skipTests>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.cloudera</groupId>
            <artifactId>ImpalaJDBC41</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
                <includes>
                    <include>application.properties</include>
                    <include>application-${package.environment}.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                    <mainClass>com.xxx.bigdata.xxx.Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- mybatis plugin to generate mapping files and classes -->
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.5</version>
                <configuration>
                    <configurationFile>${basedir}/src/main/resources/generatorConfig.xml</configurationFile>
                    <overwrite>true</overwrite>
                    <verbose>true</verbose>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>com.cloudera</groupId>
                        <artifactId>ImpalaJDBC41</artifactId>
                        <version>2.6.4</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <!-- dev environment -->
        <profile>
            <id>dev</id>
            <properties>
                <package.environment>dev</package.environment>
            </properties>
            <!-- active by default -->
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <!-- pre-release environment -->
        <profile>
            <id>pre</id>
            <properties>
                <package.environment>pre</package.environment>
            </properties>
        </profile>
        <!-- production environment -->
        <profile>
            <id>pro</id>
            <properties>
                <package.environment>pro</package.environment>
            </properties>
        </profile>
    </profiles>

</project>
The project is packaged with the default Spring Boot plugin, with skip set to true. Without that setting, the packaged jar gets an extra BOOT-INF directory, which at runtime causes ClassNotFoundException and assorted other problems (for example with Kafka streaming), and can even force you to invert Flink's classloading order from child-first to parent-first by editing Flink's configuration file.
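For reference, that classloading switch lives in Flink's conf/flink-conf.yaml (the key below exists in Flink 1.6; only flip it if you actually hit such conflicts):

```yaml
# conf/flink-conf.yaml
# default is child-first; parent-first makes user code resolve Flink's own copies first
classloader.resolve-order: parent-first
```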

Problems encountered:

1. java.lang.NoSuchMethodError: com.google.gson.GsonBuilder.setLenient()Lcom/google/gson/GsonBuilder
The GsonBuilder class comes from a gson-xxx.jar, yet mvn dependency:tree in my own project showed no dependency on it. Suspecting that Flink uses the gson bundled under its own lib directory at runtime, I checked Flink's lib and found that flink-dist_2.11-1.6.1.jar does contain a gson-xxx package, but the class inside has no setLenient method. The fix: create a commlib directory on the server, put gson-2.8.0.jar (which does contain setLenient) into it, and specify it on the classpath when submitting with flink run.

2. Logging conflict

Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/opt/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory

Excluding Logback from Spring Boot resolves it:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
3. When submitting a job to YARN with flink run, each classpath entry must point to a concrete jar file; pointing at a directory does not work. Assuming all dependency jars are already placed in one directory, the classpath string can be assembled in shell like this:


lib_classpath=""

for jar in `ls /home/hadoop/lib`
do
    jar_suffix=${jar##*.}
    if [ "$jar_suffix" = "jar" ]
    then
        jar_path=" --classpath file:///home/hadoop/lib/$jar "
        lib_classpath=${lib_classpath}${jar_path}
    else
        echo "the file $jar is not a legal jar file, skipping"
    fi
done
The assembled lib_classpath then looks like:

--classpath file:///home/hadoop/lib/accessors-smart-1.2.jar --classpath file:///home/hadoop/lib/akka-actor_2.11-2.4.20.jar
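The joining logic above can be sanity-checked locally against a synthetic lib directory; the directory and file names below are made up for illustration:

```shell
# Build a --classpath argument string from a synthetic lib directory,
# skipping anything that is not a .jar file.
tmpdir=$(mktemp -d)
touch "$tmpdir/accessors-smart-1.2.jar" "$tmpdir/akka-actor_2.11-2.4.20.jar"
touch "$tmpdir/README.txt"    # a non-jar file that must be skipped

lib_classpath=""
for jar in $(ls "$tmpdir")
do
    jar_suffix=${jar##*.}
    if [ "$jar_suffix" = "jar" ]
    then
        lib_classpath="$lib_classpath --classpath file://$tmpdir/$jar"
    fi
done
echo "$lib_classpath"
```

Running it prints one `--classpath file://…` pair per jar and nothing for README.txt, matching the shape shown above.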
