Fetching Data from an HTTP API and Writing It to HDFS


Data source type: a JSON array of records

[{field:value}, {field:value}, {field:value}, {field:value}]
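
For instance, a concrete payload could look like the following (the field names are illustrative, matching the hypothetical Member bean in step 4, not taken from the actual interface):

[{"memberId":"1001","name":"Alice"}, {"memberId":"1002","name":"Bob"}]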

1. Define the HTTP data-source link

package com.etl.datalink;

import java.util.Map;

public class LinkHttp {

    private String url;
    private Map<String, Object> params;

    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public Map<String, Object> getParams() {
        return params;
    }
    public void setParams(Map<String, Object> params) {
        this.params = params;
    }

}
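
A minimal usage sketch (the endpoint and parameter values below are placeholders, not from the original article):

LinkHttp http = new LinkHttp();
http.setUrl("http://example.com/api/member/list");   // hypothetical endpoint
Map<String, Object> params = new HashMap<String, Object>();
params.put("start_time", "2018-10-01 00:00:00");     // hypothetical time window
params.put("end_time", "2018-10-02 00:00:00");
http.setParams(params);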

 

2. Define the HDFS link configuration

package com.etl.datalink;

import org.apache.hadoop.conf.Configuration;

public class LinkHdfs {

    private Configuration conf = new Configuration();
    private String fsName = "fs.defaultFS";
    private String fsURI;

    public LinkHdfs(String fsName, String fsURI) {
        this.fsName = fsName;
        this.fsURI = fsURI;
        conf.set(this.fsName, this.fsURI);
    }

    public LinkHdfs(String fsURI) {
        this.fsURI = fsURI;
        conf.set(this.fsName, this.fsURI);
    }

    public String getFsName() {
        return fsName;
    }

    public void setFsName(String fsName) {
        this.fsName = fsName;
    }

    public String getFsURI() {
        return fsURI;
    }

    public void setFsURI(String fsURI) {
        this.fsURI = fsURI;
    }

    public Configuration getConf() {
        return conf;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

}
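
The Configuration built here is what later yields the FileSystem handle. A minimal sketch, assuming a NameNode at hdfs://namenode:9000 (a placeholder address):

LinkHdfs hdfs = new LinkHdfs("hdfs://namenode:9000");   // uses the default key fs.defaultFS
FileSystem fs = FileSystem.get(hdfs.getConf());         // connects using that Configuration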

3. Define a generic class that ships the HTTP payload to HDFS

   One subtlety here: since the JSON payload is an array, each record has to be extracted and written to HDFS with a trailing newline (\n). Only then will a Hive query over the file return one row per record; otherwise the entire payload is treated as a single record. A standalone sketch of this point follows, with the full class after it.
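
In isolation, the newline-per-record idea looks like this (using fastjson, as the full class does; Member is the hypothetical bean from step 4):

// `data` is the JSON array string returned by the interface
List<Member> records = JSON.parseArray(data, Member.class);
StringBuilder sb = new StringBuilder();
for (Member m : records) {
    sb.append(JSON.toJSONString(m)).append('\n');   // one record per line, so Hive sees one row each
}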

package com.etl;   // package inferred from the fully-qualified class names in step 6

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;   // or org.apache.commons.lang, depending on the project
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.etl.datalink.LinkHdfs;
import com.etl.datalink.LinkHttp;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

/**
 * Generic extraction of HTTP data into an HDFS file.
 * ResponseInfo and ResultCode are project-specific response wrappers (not shown here).
 * @author KingWang
 * @date   2018-10-15
 */
public class Api2Hdfs {

    private static Logger log = Logger.getLogger(Api2Hdfs.class);

    public static <T> void run(String[] args, Class<T> clazz) {

        // HTTP arguments
        String url = args[0];
        String method = args[1];      // read here; used in the omitted parameter block
        String startTime = args[2];
        String endTime = args[3];

        // HDFS arguments
        String fsName = args[4];
        String fsURI = args[5];
        String targetFilePath = args[6];

        // HTTP request parameters
        Map<String, Object> params = new HashMap<String, Object>();

        // ... some parameters omitted
        params.put("timestamp", System.currentTimeMillis() / 1000L);
        params.put("start_time", startTime);
        params.put("end_time", endTime);

        LinkHttp http = new LinkHttp();
        http.setUrl(url);
        http.setParams(params);

        // HDFS configuration
        LinkHdfs hdfs = new LinkHdfs(fsName, fsURI);
        try {
            Api2Hdfs.process(http, hdfs, targetFilePath, clazz);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static <T> void process(LinkHttp http, LinkHdfs hdfs, String hdfsFile, Class<T> clazz) throws Exception {

        if (null == http) {
            log.error("http link not set");
            throw new Exception("http link not set");
        }
        if (null == hdfs) {
            log.error("hdfs link not set");
            throw new Exception("hdfs link not set");
        }

        // build the HTTP request
        String url = http.getUrl();
        Map<String, Object> params = http.getParams();
        OkHttpClient client = new OkHttpClient();

        // add the form parameters
        FormBody.Builder bodyParams = new FormBody.Builder();
        if (params != null && params.size() > 0) {
            Iterator<Map.Entry<String, Object>> it = params.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Object> entry = it.next();
                bodyParams.add(entry.getKey(), entry.getValue().toString());
            }
        }

        final Request request = new Request.Builder().url(url).post(bodyParams.build()).build();
        Call call = client.newCall(request);
        // the call runs asynchronously; the response is handled in the callback below
        call.enqueue(new Callback() {

            // network failures end up here
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                FileSystem fs = null;
                try {
                    Path dstPath = new Path(hdfsFile);
                    fs = FileSystem.get(hdfs.getConf());
                    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    if (response.isSuccessful()) {

                        // handle the payload returned by the backend
                        System.out.println(df.format(LocalDateTime.now()) + " response.code:" + response.code());
                        if (200 == response.code()) {

                            // note: response.body().string() can only be consumed once
                            ResponseInfo info = JSONObject.parseObject(response.body().string(), ResponseInfo.class);

                            // a non-empty error field signals failure
                            if (StringUtils.isNotBlank(info.getError())) {
                                log.error(info.getError());
                            } else {

                                String rspcode = info.getResult().getRsp();
                                // write to HDFS
                                if (rspcode.equalsIgnoreCase(ResultCode.SUCCESS.getCode())) {
                                    System.out.println(info.getResult().getData());
                                    if (info.getResult().getData().equals("[]")) {
                                        System.out.println(df.format(LocalDateTime.now()) + " " + info.getResult().getMsg());
                                    } else {
                                        List<T> objList = JSON.parseArray(info.getResult().getData(), clazz);
                                        FSDataOutputStream outputStream = fs.create(dstPath);
                                        int size = objList.size();
                                        for (int i = 0; i < size; i++) {
                                            // one JSON record per line so Hive sees one row per record
                                            String orderstr = JSON.toJSONString(objList.get(i)) + '\n';
                                            System.out.println(orderstr);
                                            outputStream.write(orderstr.getBytes());
                                            if (i % 1000 == 0) {
                                                outputStream.flush();
                                            }
                                        }
                                        outputStream.flush();
                                        outputStream.close();
                                        log.info("create file " + hdfsFile + " success!");
                                    }
                                } else {
                                    log.error(info.getResult().getMsg());
                                }
                            }
                        }
                        // successful (2xx) responses other than 200
                        else {
                            log.error(response.message());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error(e.getMessage());
                } finally {
                    if (fs != null) {
                        fs.close();
                    }
                    if (response.body() != null) {
                        response.body().close();
                    }
                }
                log.info("write hdfs file end: " + hdfsFile);
            }
        });

    }

}

 

4. Define a bean per interface for parsing. Because the class is generic, a different bean can be supplied for each interface.

   Something like the sketch below.
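
A hypothetical Member bean (the package and field names are illustrative assumptions; the original article omits the example):

package com.etl.bean;   // hypothetical package

// Hypothetical bean for the member interface; field names are placeholders.
public class Member {

    private String memberId;
    private String name;

    public String getMemberId() {
        return memberId;
    }
    public void setMemberId(String memberId) {
        this.memberId = memberId;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}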

 

5. Define a main class for each interface:

public class MemberApi extends Api2Hdfs {

    public static void main(String[] args) {
        Api2Hdfs.run(args, Member.class);
    }
}

public class OrderApi extends Api2Hdfs {

    public static void main(String[] args) {
        Api2Hdfs.run(args, Order.class);
    }
}

6. Write a shell script for each interface and run it:

java -Djava.ext.dirs=lib com.etl.MemberApi \
${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log 2>&1 &
java -Djava.ext.dirs=lib com.etl.OrderApi \
${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt}  >> ./logs/${table}.log 2>&1 &
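
Note that eight positional arguments are passed (${salt} last), while run() only reads args[0] through args[6]; presumably the salt is consumed in the omitted parameter block, e.g. for request signing. The argument order must match the indices read in run(): url, method, startDate, endDate, fsName, fsURI, targetFilePath.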

 

