Hands-on Project: Learning Flink from 0 to 1 (25): Reading Data from Redis as a Source


Data in Redis:
(screenshot of the Redis hash omitted)
To read from Redis we implement the SourceFunction interface and specify its generic type parameter, i.e. the type of record the source emits after processing the Redis data. Since we need to return key-value pairs, a HashMap is the natural choice.
pom.xml

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.3</version>
</dependency>
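Besides Jedis, the project also needs the Flink streaming API on the classpath. A minimal sketch of that dependency follows; the version and Scala suffix are assumptions and should be matched to your cluster:

```xml
<!-- Flink streaming API; version/Scala suffix are assumptions, align with your cluster -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>
```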

Java code:

package ryx.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;


/**
 * The region-to-country mapping is stored in Redis as a hash:
 * hset areas AREA_US US
 * hset areas AREA_CT TW,HK
 * hset areas AREA_AR PK,KW,SA
 * hset areas AREA_IN IN
 *
 * Since we need to emit key-value pairs, we use a HashMap.
 */
public class MyRedisSource implements SourceFunction<HashMap<String,String>> {
    private Logger logger = LoggerFactory.getLogger(MyRedisSource.class);
    // volatile: cancel() is invoked from a different thread than run()
    private volatile boolean isRunning = true;
    private Jedis jedis = null;
    private static final long SLEEP_MILLIS = 5000;

    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        this.jedis = new Jedis("hadoop01", 6379);
        HashMap<String, String> kVMap = new HashMap<String, String>();
        while (isRunning) {
            try {
                kVMap.clear();
                Map<String, String> areas = jedis.hgetAll("areas");
                for (Map.Entry<String, String> entry : areas.entrySet()) {
                    // key: region, value: comma-separated list of countries
                    String key = entry.getKey();
                    String value = entry.getValue();
                    String[] splits = value.split(",");
                    System.out.println("key:" + key + ",--value:" + value);
                    for (String split : splits) {
                        // invert the mapping: key = country, value = region
                        kVMap.put(split, key);
                    }
                }
                if (kVMap.size() > 0) {
                    ctx.collect(kVMap);
                } else {
                    logger.warn("No data fetched from Redis");
                }
                Thread.sleep(SLEEP_MILLIS);
            } catch (JedisConnectionException e) {
                logger.warn("Lost connection to Redis, reconnecting", e);
                jedis = new Jedis("hadoop01", 6379);
            } catch (Exception e) {
                logger.warn("Source error", e);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        // close the connection exactly once
        if (jedis != null) {
            jedis.close();
        }
    }
}

The output is:
key:AREA_US,--value:US
key:AREA_CT,--value:TW,HK
key:AREA_AR,--value:PK,KW,SA
key:AREA_IN,--value:IN

Each value is then split into individual country codes, each of which is combined with its key into the HashMap, and the map is emitted as the source's output through the SourceContext passed to run().
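The inversion step described above can be sketched in isolation. The class and method names below are illustrative, not part of the original source, and the sample data mirrors the `hset areas ...` entries from the code comment:

```java
import java.util.HashMap;
import java.util.Map;

public class AreaInversionDemo {

    // Invert a region -> "country1,country2,..." hash into
    // country -> region pairs, mirroring the loop in MyRedisSource#run.
    static HashMap<String, String> invert(Map<String, String> areas) {
        HashMap<String, String> countryToArea = new HashMap<>();
        for (Map.Entry<String, String> entry : areas.entrySet()) {
            for (String country : entry.getValue().split(",")) {
                countryToArea.put(country, entry.getKey());
            }
        }
        return countryToArea;
    }

    public static void main(String[] args) {
        Map<String, String> areas = new HashMap<>();
        areas.put("AREA_CT", "TW,HK");
        areas.put("AREA_AR", "PK,KW,SA");

        HashMap<String, String> result = invert(areas);
        System.out.println(result.get("HK")); // AREA_CT
        System.out.println(result.get("SA")); // AREA_AR
    }
}
```

Note that the emitted map is keyed by country, so a downstream operator can look up the region for any country code in O(1).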

