[Hadoop] - Custom MapReduce InputFormat & OutputFormat


  During MR program development you will often run into cases where the input data does not live on HDFS, or where the output destination is not HDFS. MapReduce's design anticipates this: it exposes two extension points, and by supplying a suitable custom InputFormat and OutputFormat we can meet the requirement. This post walks through one such case, reading data from MongoDB and writing results back to MongoDB. It is only a demo, so the sample data is arbitrary.


I. Custom InputFormat

  The data input of the Map phase in MapReduce is determined by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat, shown below, we can see that besides implementing the InputFormat abstract class we also need a custom InputSplit and a custom RecordReader. Their responsibilities are: the split describes the size of a data shard and its location information, while the record reader does the actual reading of the data.

public abstract class InputFormat<K, V> {
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; // get the list of input splits for the Map phase
  public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // create the object that actually reads the data
}

  1. Custom InputSplit

    A custom InputSplit mainly needs to implement the following methods:

public abstract class InputSplit {  
  public abstract long getLength() throws IOException, InterruptedException; // length of this split  
  public abstract String[] getLocations() throws IOException, InterruptedException; // location information of this split  
}

  2. Custom RecordReader

    A custom RecordReader mainly needs to implement the following methods:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
  public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // initialization; may be left empty if the work was done in the constructor
  public abstract boolean nextKeyValue() throws IOException, InterruptedException; // returns true if another key/value pair exists, false otherwise
  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;  // get the current key
  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;  // get the current value
  public abstract float getProgress() throws IOException, InterruptedException;  // get progress information
  public abstract void close() throws IOException; // release resources
}
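
  For orientation, the framework drives these methods in a fixed pattern. The loop below is a schematic of that contract, not literal Hadoop source; the reader, split, context, and mapper variables stand in for objects the framework supplies:

reader.initialize(split, context);            // called once before reading starts
while (reader.nextKeyValue()) {               // advance to the next record
    KEYIN key = reader.getCurrentKey();       // key handed to map()
    VALUEIN value = reader.getCurrentValue(); // value handed to map()
    mapper.map(key, value, context);          // each pair is passed to the Mapper
}
reader.close();                               // release resources when the split is exhausted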

II. Custom OutputFormat

  The data output of the Reducer phase in MapReduce is determined by the OutputFormat, which decides both the output destination and the job's commit object. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat, shown below, we can see that besides implementing the OutputFormat abstract class we also need a custom RecordWriter and an OutputCommitter. Since the OutputCommitter does not deal with the actual output destination, it usually does not need to be rewritten and the stock FileOutputCommitter can be used directly; the RecordWriter is what defines how the data is written to the destination.

public abstract class OutputFormat<K, V> { 
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException; // get the object that actually writes the data out
  public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException; // check that the output configuration is valid
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException; // get the committer object for the job's output
}
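
  The reduce side consumes an OutputFormat in a similarly fixed pattern; the snippet below is a schematic of that contract, not literal Hadoop source:

outputFormat.checkOutputSpecs(jobContext);                             // once, at job submission
RecordWriter<K, V> writer = outputFormat.getRecordWriter(taskContext); // once per task
writer.write(key, value);                                              // once per reduce output pair
writer.close(taskContext);                                             // after the task finishes writing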

  1. Custom RecordWriter

    Looking at the RecordWriter source, the two methods that need to be implemented are:

public abstract class RecordWriter<K, V> {  
  public abstract void write(K key, V value) throws IOException, InterruptedException;  // the actual data-writing method
  public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException; // release resources
}

III. Full Code

  Custom InputFormat & InputSplit

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * An empty implementation that performs no operations, similar to NullWritable.
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
            // no-op
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            // no-op
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // no-op
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // no-op
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }

    }

    /**
     * The input split class for MongoDB.
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor, required for Writable deserialization.
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor.
         * 
         * @param start
         *            index of the first document of this split within the query result
         * @param end
         *            index one past the last document of this split
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // size of this split
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // Return an empty array: no data-locality optimization is attempted,
            // so map tasks run on whatever nodes the scheduler picks.
            return new String[] {};
        }

    }

    protected MongoDBConfiguration mongoConfiguration; // Mongo-related configuration
    protected Mongo mongo; // Mongo connection
    protected String databaseName; // name of the database to connect to
    protected String collectionName; // name of the collection to connect to
    protected DBObject conditionQuery; // selection criteria
    protected DBObject fieldQuery; // field projection

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // total number of matching documents
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // number of documents per split

            // Build the splits, simply assigning each one an equal share of the
            // documents; the last split absorbs the remainder.
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // release the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // The value class that documents read from MongoDB are converted into;
        // defaults to NullMongoDBWritable.
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName(); // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo(); // initialize the connection
        getConditionQuery(); // initialize the condition query
        getFieldQuery(); // initialize the field projection
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.append(field, 1); // include this field in the projection
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}
MongoDBInputFormat.java
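
  As a concrete example of the split arithmetic in getSplits(): with count = 10 matching documents and chunks = 3, chunkSize is 3 and the splits cover the ranges [0, 3), [3, 6), and [6, 10); the final split absorbs the remainder.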

  Custom RecordReader

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
            DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // do nothing
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }

            key.set(pos + split.getStart()); // set the key
            value.readFields(cursor.next()); // populate the value from the next document
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress is expected as a fraction between 0 and 1, so normalize the
        // record position by the split length.
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }

}
MongoDBRecordReader.java

  Custom OutputFormat & RecordWriter

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection.
     * 
     * @param <K>
     * @param <V>
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // establish the connection eagerly
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // Write the merged document. For large data volumes, the documents
                // could be buffered here and inserted together in batches.
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // Call the write methods directly.
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Create the record writer occur exception.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // no output validation is performed
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The OutputCommitter handles job/task commit bookkeeping rather than the
        // actual output destination, so the stock FileOutputCommitter is reused here.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Set the output properties on the job.
     * 
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disable the fetch method, so that write(DBCollection) is called directly instead.
     * 
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}
MongoDBOutputFormat.java
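
  The comment inside write() above notes that, for large data volumes, documents could be buffered and committed together rather than inserted one at a time. The helper below is a minimal sketch of that idea; it is not part of the original code, and the class name and batch size are illustrative assumptions.

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.util.ArrayList;
import java.util.List;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;

/**
 * Hypothetical helper that buffers documents and inserts them in batches.
 * A MongoDBRecordWriter variant would call add() from write(K, V) and
 * flush() from close().
 */
public class BatchingInserter {
    private final DBCollection collection;
    private final List<DBObject> buffer = new ArrayList<DBObject>();
    private final int batchSize;

    public BatchingInserter(DBCollection collection, int batchSize) {
        this.collection = collection;
        this.batchSize = batchSize; // e.g. 1000; tune to document size and memory
    }

    /** Buffer one document and flush once the buffer is full. */
    public void add(DBObject document) {
        buffer.add(document);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Insert all buffered documents in one call; the 2.x driver accepts a List of DBObject. */
    public void flush() {
        if (!buffer.isEmpty()) {
            collection.insert(buffer);
            buffer.clear();
        }
    }
}
BatchingInserter.java (hypothetical sketch)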

  Other Java code involved

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";

    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";

    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // Whether the RecordWriter calls the fetch method; enabled by default.
    // If disabled, the write method is invoked directly instead.
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Get the underlying Configuration object.
     * 
     * @return
     */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Set the connection information.
     * 
     * @param host
     * @param port
     * @return
     */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /**
     * Set the connection information, including optional authentication.
     * 
     * @param host
     * @param port
     * @param enableAuth
     * @param username
     * @param password
     * @return
     */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /**
     * Get the MongoDB connection object.
     * 
     * @return
     * @throws UnknownHostException
     */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    /**
     * Get the configured host.
     * 
     * @return
     */
    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    /**
     * Get the configured port.
     * 
     * @return
     */
    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /**
     * Whether authentication is enabled; a default MongoDB installation has it disabled.
     * 
     * @return
     */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    /**
     * Get the username required for authentication.
     * 
     * @return
     */
    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    /**
     * Get the password required for authentication.
     * 
     * @return
     */
    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            String partition = this.getPartition();
            String[] values = null;
            for (String condition : conditions) {
                values = condition.split(partition);
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    public void setInputClass(Class<? extends DBWritable> inputClass) {
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}
MongoDBConfiguration.java
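
  As a hypothetical usage sketch, a job driver could wire up MongoDBConfiguration as follows; the host, port, database, and collection names are illustrative values only.

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import org.apache.hadoop.conf.Configuration;

/**
 * Hypothetical driver-side usage of MongoDBConfiguration.
 */
public class MongoDBConfigurationExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        MongoDBConfiguration mdc = new MongoDBConfiguration(conf);
        mdc.configureDB("localhost", 27017);          // connection settings (no auth)
        mdc.setInputDatabaseName("db_java");          // input side
        mdc.setInputCollectionName("users");
        mdc.setInputFieldNames("name", "age", "sex"); // optional projection
        mdc.setOutputDatabaseName("foobar2");         // output side
        mdc.setOutputCollectionName("users");
        System.out.println("mongo at " + mdc.getBindHost() + ":" + mdc.getBindPort());
    }
}
MongoDBConfigurationExample.java (hypothetical sketch)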
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Write data into the MongoDB collection.
     * 
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Build the DBObject to be written, based on the existing {@code old} object.
     * 
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populate this object from a document read out of MongoDB.
     * 
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}
MongoDBWritable.java
package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class Demo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Set the input MongoDB database and collection, and the value class the input
        // documents map to. The database and collection must already exist, otherwise
        // there is simply no data (an empty input itself causes no errors).
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);
        Job job = Job.getInstance(conf, "mongodb-demo");

        job.setJarByClass(Demo.class);
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setOutputValueClass(DemoOutputValue.class);
        job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setInputFormatClass(MongoDBInputFormat.class);
        MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // the output database/collection need not exist beforehand

        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if the same key already exists, a sequence number is appended to the key.
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if the same key already exists, a sequence number is appended to the key.
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }

            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }

            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }

    }

    /**
     * Passes each input record straight through to the output.
     * 
     * @author jsliuming
     * 
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Writes out the data, performing only a count per key.
     * 
     * @author jsliuming
     * 
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused")
            NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * Adjusts the key: while the key already exists in the keys set, a sequence number is appended to it.
     * 
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}
Demo
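
  For the demo to produce any output, the input collection db_java.users needs some documents. Below is a hypothetical seeding snippet using the same (2.x-era) MongoDB Java driver; the class name and sample values are made up for illustration.

package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;

/**
 * Hypothetical seeding class: inserts a few sample users so the demo job
 * has input to read.
 */
public class SeedDemoData {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost", 27017);
        try {
            DB db = mongo.getDB("db_java");
            DBCollection users = db.getCollection("users");
            users.insert(new BasicDBObject("name", "alice").append("age", 30).append("sex", "f"));
            users.insert(new BasicDBObject("name", "bob").append("age", 25).append("sex", "m"));
            // a duplicate document, so the reducer's count for this key becomes 2
            users.insert(new BasicDBObject("name", "bob").append("age", 25).append("sex", "m"));
        } finally {
            mongo.close();
        }
    }
}
SeedDemoData.java (hypothetical sketch)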

IV. Result Screenshot

(Screenshot of the job results omitted.)
