流式計算(三)-Flink Stream 篇一


原創文章,謝絕任何形式轉載,否則追究法律責任!

​流的世界,有點亂,群雄逐鹿,流實在太多,看完這個馬上又冒出一個,也不知哪個才是真正的牛,據說Flink是位重量級選手,能流計算,還能批處理,

和其他伙伴關系也融洽的很,與HDFS/File/SQL/MQ往來都不在話下,今天我們就來實戰一把。

 

環境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Mysql8.0.11/Kafka2.3.1/ShadowJar5.0/Flink1.9.1/Zookeeper3.5.5

難度:新手--戰士--老兵--大師

目標:

  1. Zookeeper集群部署
  2. Kafka集群部署及AdminClient使用
  3. Flink Standalone集群部署
  4. Flink流計算程序設計

說明:

為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。同時代碼中大量使用注釋,並盡量使用非鏈式寫法,層次清晰。

代碼地址:其中的day24,https://github.com/xiexiaobiao/dubbo-project.git

 作者原創文章,禁止任何轉載,否則追究法律責任。

第一部分 部署圖

1.1 整體部署圖,注意本次我沒使用Flink的HA模式,故只有一個Master節點:

本次實現的整體邏輯:Java應用產生流數據,用kafka進行傳輸,在Flink中進行流計算,最后存儲到文件和Mysql中:

 

第二部分 zk集群

2.1 Zookeeper(簡寫ZK)集群安裝:我分別部署ZK到192.168.1.221,192.168.1.222,192.168.1.223 linux虛機上,這里只說一台,其他類似,下載apache-zookeeper-3.5.5-bin.tar.gz,解壓到目錄/usr/zookeeper3.5/apache-zookeeper-3.5.5-bin/修改ZK配置文件:

[root@localhost ~]# vim /usr/zookeeper3.5/apache-zookeeper-3.5.5-bin/conf/zoo.cfg
dataDir=/usr/zookeeper3.5/data
dataLogDir=/usr/zookeeper3.5/logs
#added for ZK cluster
server.1=192.168.1.221:2888:3888
server.2=192.168.1.222:2888:3888
server.3=192.168.1.223:2888:3888

zk的配置說明:(LF指Leader/Follower)

打開firewall端口:

[root@localhost zookeeper3.5]# firewall-cmd --permanent --add-port=2181/tcp
[root@localhost zookeeper3.5]# firewall-cmd --permanent --add-port=2888/tcp
[root@localhost zookeeper3.5]# firewall-cmd --permanent --add-port=3888/tcp
[root@localhost zookeeper3.5]# firewall-cmd –reload

創建myid文件,也可使用touch或vim創建:

[root@localhost ~]# echo "2" > /usr/zookeeper3.5/data/myid

啟動:

[root@localhost ~]# sh /usr/zookeeper3.5/apache-zookeeper-3.5.5-bin/bin/zkServer.sh start

查看節點角色:

[root@localhost ~]# sh $ZOOKEEPER_HOME/bin/zkServer.sh status

說明:

使用zkCli測試節點間連接,我在192.168.1.221上訪問192.168.1.222:

 

第三部分 Kafka集群

3.1 Kafka集群,參考上一篇,這里以192.168.1.223為例,其他類似,修改/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties:

broker.id=2 #集群內此編號必須唯一
listeners=PLAINTEXT://192.168.1.223:9092 #Socket監聽地址,沒寫hostname/IP即為listen所有IP
log.dirs=/usr/kafka2.3/kafka-logs  #kafka保存log目錄
zookeeper.connect=192.168.1.221:2181,192.168.1.222:2181,192.168.1.223:2181 #ZK注冊地址

注意開啟kafka的訪問端口,否則APP無法訪問:

[root@localhost ~]# firewall-cmd --permanent --add-port=9092/tcp
[root@localhost ~]# firewall-cmd –reload

再分別啟動kafka:
[root@server223 kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/server.properties

第四部分 Flink集群

4.1 Flink基礎知識,因篇幅原因,略!

4.2 Flink Standalone集群(一個master多個worker)部署:必須先安裝SSH,使得各集群節點機器間無密碼進行SSH訪問,並必須使用相同的flink安裝目錄結構,因master節點通過SSH使用腳本控制worker節點。Flink集群還有HA模式,即多個Master節點和多個worker節點,Master節點中一個為leader,其他Master節點為standby,故障時standby就轉換為leader角色。

4.3 SSH開通:

我這里192.168.1.222為master,192.168.1.221/223為worker, 222到223的SSH,222到221的SSH類似,如linux上沒有SSH,需先安裝該service。以下為222到221的SSH開通,在222上:

[root@localhost ~]# ssh-keygen
[root@localhost ~]# ssh-copy-id 192.168.1.221
[root@localhost ~]# vim /etc/ssh/sshd_config
PasswordAuthentication no
[root@localhost ~]# systemctl restart sshd
[root@localhost ~]# ssh 192.168.1.221
[root@localhost ~]# exit
logout
Connection to 192.168.1.221 closed.

 

4.4 Flink安裝:我下載的是編譯后版本:flink-1.9.1-bin-scala_2.12.tgz,解壓后在/usr/flink1.9/flink-1.9.1目錄, 集群配置:

[root@localhost flink-1.9.1]# vim conf/flink-conf.yaml
#jobmanager.rpc.address key to point to your master node
jobmanager.rpc.address: 192.168.1.222
#The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
conf/flink-conf.yaml  taskmanager.numberOfTaskSlots: 2
#java_home
env.java.home: /usr/jdk11/jdk-11.0.4/

 

worker節點配置:

[root@localhost flink-1.9.1]# vim conf/slaves
#all nodes in your cluster which shall be used as worker nodes
192.168.1.221
192.168.1.223
 

復制flink文件目錄到各worker集群節點,這樣flink目錄結構確定一致:

[root@localhost flink-1.9.1]# scp -r /usr/flink1.9  root@192.168.1.223:/usr/flink1.9
 

4.5 啟動集群,這里只需在master節點上操作,flink會通過SSH自動啟動worker上的taskmanager:

[root@localhost flink-1.9.1]# ./bin/start-cluster.sh

顯示如上因我設置了hostname分別為192.168.1.221 -> server221/ 192.168.1.222 -> server222/ 192.168.1.223 -> server223,可以看到221/223的worker節點被動啟動了。

4.6 關閉集群:

[root@localhost flink-1.9.1]# ./bin/stop-cluster.sh

4.7 啟動或關閉另一個jobmanager實例:

[root@server222 flink-1.9.1]# ./bin/jobmanager.sh stop-all

[root@server222 flink-1.9.1]# ./bin/jobmanager.sh start 192.168.1.222:8081

4.8 Web訪問,先在222上開通端口號:

[root@localhost ~]# firewall-cmd --permanent --add-port=8081/tcp
[root@localhost ~]# firewall-cmd –reload

Windows上訪問web UI:http://192.168.1.222:8081/

4.9 寫flink處理邏輯:

建立Gradle項目,模擬車流處理,結構如下:

 
        

依賴,再次建議按需逐步加入,了解各依賴的作用:

    flinkShadowJar group: 'org.apache.flink', name: 'flink-core', version: '1.9.1'
    flinkShadowJar group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.1'
    flinkShadowJar group: 'org.apache.flink', name: 'flink-clients_2.12', version: '1.9.1'
    flinkShadowJar group: 'org.apache.flink', name: 'flink-java', version: '1.9.1'
    flinkShadowJar group: 'org.projectlombok', name: 'lombok', version: '1.18.10'
    flinkShadowJar group: 'org.apache.flink', name: 'flink-connector-kafka_2.12', version: '1.9.1'
    flinkShadowJar group: 'org.apache.kafka', name: 'kafka-streams', version: '2.3.1'
    flinkShadowJar group: 'com.alibaba', name: 'fastjson', version: '1.2.62'
    flinkShadowJar group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.3.3.RELEASE'
    flinkShadowJar group: 'org.springframework.boot', name: 'spring-boot-starter', version: '2.2.1.RELEASE'
    flinkShadowJar group: 'com.alibaba', name: 'druid', version: '1.1.21'
    flinkShadowJar group: 'mysql', name: 'mysql-connector-java', version: '8.0.18'
 

車流當然得有車,先建個POJO類com.biao.flink.Vehicle,

 
        
@Data
@Getter
@Setter
publicclass Vehicle {
    // 類型:car/truck/suv/pickup/other
    private String type;
    // 車牌號:隨機6位數字
    private Integer plate;
    // yellow/red/white/black
    private String color;
    // 車重: 1.5-2.5
    private Float weight;
    ...
}
 
        

com.biao.flink.Producer用於產生車流:

@Component
//@Slf4j
publicclass Producer {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    private Logger log = LoggerFactory.getLogger(Producer.class);
    private String time = LocalDateTime.now().toString();

    public void send() {
        log.info("send elements to kafka started at :{}",time);
        Random random = new Random();
        // kafkaTemplate.sendDefault() 為異步方法,返回 ListenerFuture<T>,
        // 如果運行到此前沒有顯示創建topic:vehicle,send方法會缺省創建復制因子為1的topic
        this.getVehicles()
                .forEach(item -> kafkaTemplate.send("vehicle",String.valueOf(random.nextInt(10)), JSON.toJSONString(item)));
    }

    // 使用隨機方法產生車輛流
    public Stream<Vehicle> getVehicles(){
        List<Vehicle> vehicleList = new ArrayList<>(75);
        Random random = new Random();
        List<String> colors = Arrays.asList("red","yellow","black","white");
        List<String> types = Arrays.asList("car","truck","suv","pickup","other");
        List<Float> weights = Arrays.asList(1.0f,1.5f,2.0f,2.5f);
        // 使用Random生成IntStream流
        IntStream intStream1 = random.ints(25,100000,999999);
        intStream1.limit(25)
                .forEach(num -> {vehicleList.add(new Vehicle(types.get(random.nextInt(5)),num,
                        colors.get(random.nextInt(4)),weights.get(random.nextInt(4))));});
        // 使用 IntStream靜態方法生成流
        IntStream intStream2 = IntStream.rangeClosed(100000,999999);
        intStream2.limit(25)
                .forEach(num -> {vehicleList.add(new Vehicle(types.get(random.nextInt(5)),num,
                        colors.get(random.nextInt(4)),weights.get(random.nextInt(4))));});
        // 使用Stream靜態迭代器方法生成流
        Stream<Integer> intStream3 = Stream.iterate(100000, n->n+3);
        intStream3.limit(25)
                .forEach( t -> {vehicleList.add(new Vehicle(types.get(random.nextInt(5)),t,
                        colors.get(random.nextInt(4)),weights.get(random.nextInt(4))));});

        // 用於輸出測試
        // vehicleList.stream().forEach(System.out::println);
        return vehicleList.stream();
    }
}

以上代碼核心點:1.getVehicles生產車流,里面使用了3種模式生成java stream:Random類生成的無限IntStream;IntStream.rangeClosed靜態方法生成范圍流;Stream.iterate靜態迭代器方法生成無限流。再結合隨機Vehicle屬性,最終生成Stream(Vehicle)流。2.Send方法中使用KafkaTemplate向Kafka集群發送流數據,但注意轉為了String流。這里我也出個考題:String.valueof(Object) 和 Object.toString(),有何不同呢?檢驗下功力幾層。

com.biao.flink.KafkaConfig配置類:

@Configuration
publicclass KafkaConfig {

    // 通過bean注入可以直接創建一個topic並指定topic的屬性,也可使用AdminClient來創建
    @Bean
    NewTopic topic(){
        // NewTopic(String name, int numPartitions, short replicationFactor)
        returnnew NewTopic("vehicle",3,(short) 3);
    }

    // 使用AdminClient的靜態create方法創建一個管理端,用於管理topic
    @Bean(name = "adminClient")
    AdminClient adminClient(){
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");
        return AdminClient.create(properties);
    }
}

以上代碼通過Springboot DI生成kafka的topic和AdminClient,為什么要這么生成topic?因為這種方式可以很好的指定replicationFactor,即partition的復制因子,雖然也可使用AdminClient來創建topic,但不可修改復制因子屬性。另請注意,com.biao.flink.Producer中的send方法也能缺省創建numPartitions和replicationFactor為1的topic,這里就說了3中創建topic的方法了!

com.biao.flink.KafkaMain是Kafka應用入口類,也是記錄發送者:

@SpringBootApplication
publicclass KafkaMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("Flink Application starting .........");
        ConfigurableApplicationContext context = SpringApplication.run(KafkaMain.class,args);

        // 使用kafka AdminClient示例
        AdminClient adminClient = (AdminClient) context.getBean("adminClient");
        // 批量創建kafka topic
        // adminClient.createTopics(...);
        // 打印輸出所有的topic
        adminClient.listTopics().names().get().forEach(System.out::println);
        // 打印vehicle的topic信息
        System.out.println("info>>>" + adminClient.describeTopics(Collections.singletonList("vehicle")).values());
        // 刪除vehicle2的topic,注意刪除前要關閉所有consumer
        System.out.println("delete>>>" +adminClient.deleteTopics(Collections.singletonList("vehicle2")).values());
        // 修改partitions屬性,僅對新Partitions起作用,原有Partitions狀態不變,且replicationFactor不能修改
        // 以下例子設定原vehicle已有3個Partitions,replicationFactor為3,現增加到4個Partitions
        List<List<Integer>> lists = new ArrayList<>(Collections.emptyList());
        lists.add(Arrays.asList(0,1,2));
        NewPartitions newPartitions = NewPartitions.increaseTo(4,lists);
        Map<String, NewPartitions> map = new HashMap<>(Collections.emptyMap());
        map.put("vehicle",newPartitions);
        adminClient.createPartitions(map);

        // kafka生產者進行發送記錄
        Producer producer = context.getBean(Producer.class);
        producer.send();
    }
}

以上代碼重點是學習AdminClient,它來管理kafka的topic,可以批量創建createTopics、打印輸出listTopics、刪除deleteTopics和修改createPartitions。但修改numPartitions屬性,僅對新Partitions起作用,原有Partitions狀態不變,且replicationFactor不能修改。而且我使用發現createPartitions方法使用晦澀難懂,不推薦使用!

最后是com.biao.flink.FlinkMain ,flink的入口類,也是Flink的流計算邏輯實現:

publicclass FlinkMain {
    public static void main(String[] args) throws Exception {
        // 流式執行上下文,還有非流式的ExecutionEnvironment,缺省為Local模式,
        // 遠程的使用RemoteStreamEnvironment可在遠程Flink集群上運行jar
        // RemoteStreamEnvironment remoteStreamEnv = new RemoteStreamEnvironment(String host, int port, String... jarFiles);
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"Flink Application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");
        // kafka流都是byte[],必須有序列化,不同的對象使用不同的序列化器
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
        // 構造一個KafkaConsumer,使用kafka topic做源頭
        // FlinkKafkaConsumer(topic,DeserializationSchema,properties),其中的DeserializationSchema可以自定義反序列化模式,但
        // 強烈建議使用通用序列化,自定義序列化遷移和維護困難
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("vehicle",new SimpleStringSchema(),props);
        // flink連接Kafka數據源,並生成流
        // 使用map計算將每個string元素轉換為Vehicle對象,JSON -> Vehicle
        DataStream<Vehicle> stream = environment.addSource(flinkKafkaConsumer)
                .map(str -> JSONObject.parseObject(str,Vehicle.class))
                .setParallelism(2);
        // 保存到文件測試,寫入mysql前可以用來測試數據,本地windows運行和上傳到linux上運行,注意文件路徑寫法不用
        // stream.writeAsText("D:/flinkOutput01", FileSystem.WriteMode.OVERWRITE);
        stream.writeAsText("/usr/flink1.9/fileOutPut01", FileSystem.WriteMode.OVERWRITE);
        // 保存到mysql測試
        stream.addSink(new Sink2Mysql());
        /**計數式滑動窗口測試:countWindowAll(windowSize,slideSize),以下窗口大小10個記錄,窗口一次滑動5個記錄位置
         * ,特別注意 -> countWindowAll模式無法並行,因所有記錄均需通過一個窗口*/
        SingleOutputStreamOperator<Vehicle> operator = stream
                .keyBy("type")
                .countWindowAll(5L,10L)
                .sum("weight");
        // operator.writeAsText("D:/flinkOutput02", FileSystem.WriteMode.OVERWRITE);
        operator.writeAsText("/usr/flink1.9/fileOutPut02", FileSystem.WriteMode.OVERWRITE);
        // Triggers the program execution.
        environment.execute();
    }
}

以上代碼中,重點關注StreamExecutionEnvironment,它是Flink運行的流式執行上下文,可以指定全局時間處理特性,還有並行度(下篇再述)。還有非流式的ExecutionEnvironment,缺省為Local模式,遠程的使用RemoteStreamEnvironment可在指定遠程Flink集群上運行jar文件,每個flink pipeline代碼最后都要environment.execute()來觸發執行!

FlinkKafkaConsumer則構造一個KafkaConsumer,使用kafka topic做源頭,注意序列化器的指定。然后就是對DataStream(Vehicle)流的系列操作了,其中涉及到了窗口(window)對象,因為流處理一般要處理無窮多個對象,不可能一次全處理完,如何每次選取一些想要的?那就是一個window框,要么框住一定數量的,要么框住一定時間段內的,然后再處理。Window的概念還是非常重要的!我后續再講,或者看官君查查資料,此篇不展開。

com.biao.flink.Sink2Mysql存數據到mysql邏輯:

// 也可使用頂級接口 SinkFunction,RichSinkFunction也實現了SinkFunction
publicclass Sink2Mysql extends RichSinkFunction<Vehicle> {
    private PreparedStatement preparedStatement;
    private DruidDataSource dataSource;
    private Connection connection;

    // Initialization method for the function.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://192.168.1.107:3306/data_center?characterEncoding=UTF-8&serverTimezone=UTC");
        connection = dataSource.getConnection();
        preparedStatement = connection.prepareStatement("insert into vehicle(type, plate, color, weight) values(?, ?, ?, ?);");

    }

    // Tear-down method for the user code. It is called after the last call to the main working methods
    @Override
    public void close() throws Exception {
        super.close();
        // 需try-catch,我直接簡寫了
        preparedStatement.close();
        connection.close();
        dataSource.close();
    }

    // Writes the given value to the sink. This function is called for every record.
    @Override
    public void invoke(Vehicle value, Context context) throws Exception {
        preparedStatement.setString(1,value.getType());
        preparedStatement.setInt(2,value.getPlate());
        preparedStatement.setString(3,value.getColor());
        preparedStatement.setFloat(4,value.getWeight());
        boolean result = preparedStatement.execute();
        System.out.println("DB saved record success >>>>> " );
        /*preparedStatement.addBatch();
        int[] updateNums = preparedStatement.executeBatch();
        System.out.println("DB saved items >>>>> " + updateNums.length);*/
    }
}

以上就是保存到Mysql的方法單獨實現:使用DruidDataSource來了一套JDBC標准寫法。

 

第五部分 運行部署

5.1 本地運行:

  • 啟動各linux上的zookeeper和Kafka,
  • 先運行com.biao.flink.FlinkMain,注意本地要修改下文件保存路徑的String,
  • 再運行com.biao.flink.KafkaMain,即可查看文件和Mysql的寫入情況:

貼圖略!略!略!可參考5.2的圖。

5.2 部署到Flink上運行:

啟動各linux上的zookeeper和Kafka,

使用shadowJar插件打包jar文件,這里需要注意下build.gradle文件的編寫,特別是shadowJar如何打包依賴。注意指定jar內main函數入口。

Gradle命令:gradle clean shadowJar

 然后通過web UI可以上傳jar包運行,或者在flink server中使用命令行,或者還可以使用Rest API方式,我這里使用web UI上傳,最后submit一下:

點擊plan,可以看到我們的前面代碼的Flink計算邏輯數據流向圖:

然后運行com.biao.flink.KafkaMain產生流,再去文件和Mysql查看下數據寫入情況。可以驗證KafkaMain每運行一次,車流有75輛,多次則為75*N輛:

文件寫入情況,這里以sum sink的結果output02為例:

Mysql寫入情況:

 

總結:

Java Stream,Kafka Stream和 Flink Stream各有特點,應注意區分和理解,Java Stream側重在對集合類數據局部處理,Kafka Stream流則可在流傳輸過程中做輕量級預處理,而Flink Stream則是重量級流計算框架,需依照業務需求量體裁衣。

問題解決:

1.RHEL8.0設置靜態和臨時主機名,主機名分為三類:靜態static、個性pretty和臨時transient:

[root@localhost ~]# hostnamectl status
[root@localhost ~]# hostnamectl set-hostname "Your Pretty HostName" --pretty
[root@localhost ~]# hostnamectl set-hostname host.example.com --static
[root@localhost ~]# hostnamectl set-hostname host.example.com --transient
[root@localhost ~]# hostnamectl
 

2.Build后找不到jar文件,確認項目文件顯示選項開啟:

3.Springboot和shadowJar打包模式的區別:compile語句是springboot打包fatJar用的,flinkShadowJar語句是shadow打包進fatJar用的,而且shadowJar打包是將其他依賴jar拆分成類文件放入fatJar,而springboot是直接引入依賴jar。

4.Flink的窗口:基於時間的tumbling time windows(翻滾時間窗口) 和 sliding time windows(滑動時間窗口),還有基於count/session 的窗口,當然還可以自定義窗口。

5.每個流元素都有三個不同的時間概念,即 processing time被處理時刻戳, event time元素自帶事件時間戳 和 ingestion time進入Flink的時間戳,Flink可以通過StreamExecutionEnvironment設置按照哪個timestamp處理。

6.submit任務后,出現異常:org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available 這是因為worker的Task Slots數量不夠,可以通過加大修改conf/flink-conf.yaml的參數taskmanager.numberOfTaskSlots來滿足,一般設置為cpu核心整數倍。

7.mysql寫入失敗,檢查DB:mysql中User表的網絡訪問權限設置,localhost表示只能本地訪問,連接串中是"jdbc:mysql://192.168.1.107:3306/…",flink運行時是從其他linux IP訪問107:3306,所以要改為接受全部的 “ % ”,記得再重啟mysql服務才能生效!

8.文件可以寫入但mysql未寫入,查看日志:Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure 研究得出是windows防火牆阻止3306端口訪問,新建一個入口規則可搞定:

9.Task Managers下面沒有全部的worker節點,且Overview中Available Task Slots一直是0,即standalone集群部署失敗,解決辦法,此問題我研究了整整一天才解決,因為沒報任何錯誤,且每個節點可獨立啟動,只能全部重裝加刪日志試出來的:

上述問題葯方:把222 master節點上/usr/flink1.9/flink-1.9.1/log目錄下文件全部刪掉!再重啟集群即可。

10.運行集群模式時查看Running job的詳細信息,很久后還一直轉圈,flink的 log 日志顯示:java.net.NoRouteToHostException: No route to host ,請關閉linux的防火牆!

[root@server221 ~]# systemctl stop firewalld
[root@server221 ~]# systemctl status firewalld

 

個人原創文章,謝絕任何形式轉載,否則追究法律責任!

 

我的往期文章:

  1. 流式計算(二)-Kafka Stream
  2. 流式計算(一)-Java8Stream
  3. Dubbo學習系列之十六(ELK海量日志分析)
  4. Linux下Redis集群
  5. Dubbo學習系列之十五(Seata分布式事務方案TCC模式)
 
個人微信公眾號,只發原創文章。

此篇完!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM