When developing MapReduce programs, you often run into cases where the input data does not live in HDFS, or where the output destination is not HDFS. MapReduce's design anticipates this: it exposes two pluggable components, and by supplying a suitable custom InputFormat and OutputFormat you can satisfy such requirements. This post walks through one concrete case, reading data from MongoDB and writing results back to MongoDB. It is only a demo, so the data set was picked arbitrarily.
I. Custom InputFormat
In MapReduce, the data input of the Map phase is decided by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat (shown below), you can see that besides subclassing the InputFormat abstract class, we also need a custom InputSplit class and a custom RecordReader class. Their respective roles: the split determines the size of each data shard and its location information, while the record reader does the actual reading of the data.
public abstract class InputFormat<K, V> {
    // Compute the list of input splits for the map phase
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // Create the record reader that actually reads the data
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
1. Custom InputSplit
A custom InputSplit mainly needs to implement the following methods:
public abstract class InputSplit {
    // Length of this split (used for sizing and ordering splits)
    public abstract long getLength() throws IOException, InterruptedException;

    // Host locations of this split's data (used for data locality)
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
2. Custom RecordReader
A custom RecordReader mainly needs to implement the following methods:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    // Called once before reading; may be left empty if the constructor already did the setup
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
    // Advance to the next key/value pair; return true if one exists, false otherwise
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    // Return the current key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
    // Return the current value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
    // Report progress through the split
    public abstract float getProgress() throws IOException, InterruptedException;
    // Release resources
    public abstract void close() throws IOException;
}
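To make the calling order concrete, here is a minimal sketch of how the framework drives these methods for each split. This is simplified illustrative code, not actual Hadoop source (the real loop lives inside Hadoop's map task):

// one reader per split, driven roughly like this:
reader.initialize(split, context);   // called once per split
while (reader.nextKeyValue()) {      // advance until the split is exhausted
    process(reader.getCurrentKey(), reader.getCurrentValue()); // fed to the mapper
}
reader.close();                      // release resources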
II. Custom OutputFormat
In MapReduce, the data output of the Reducer phase is decided by the OutputFormat, which determines both the output destination and the job's committer object. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat (shown below), besides subclassing the OutputFormat abstract class we also need a custom RecordWriter and, in principle, a custom OutputCommitter. Since the OutputCommitter does not deal with the concrete output destination, it usually does not need to be overridden and a FileOutputCommitter instance can be used directly; the RecordWriter is what actually defines how data is written to the destination.
public abstract class OutputFormat<K, V> {
    // Get the record writer that actually writes the data out
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
    // Check that the output configuration is valid
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
    // Get the committer object for the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}
1. Custom RecordWriter
Looking at the RecordWriter source, the two methods that mainly need implementing are:
public abstract class RecordWriter<K, V> {
    // Write one key/value pair to the output destination
    public abstract void write(K key, V value) throws IOException, InterruptedException;
    // Release resources
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
III. Complete Code
Custom InputFormat & InputSplit

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * An empty writable that performs no operations, similar to NullWritable.
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
            // no-op
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            // no-op
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // no-op
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // no-op
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }
    }

    /**
     * The input split class for MongoDB.
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor (required for deserialization).
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor.
         *
         * @param start
         *            row number of the first document of this split within the query result
         * @param end
         *            row number one past the last document of this split
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // size of the split
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // Return an empty array, meaning no data-locality optimization;
            // map tasks are scheduled on arbitrary nodes.
            return new String[] {};
        }
    }

    protected MongoDBConfiguration mongoConfiguration; // MongoDB-related configuration
    protected Mongo mongo; // MongoDB connection
    protected String databaseName; // name of the database to connect to
    protected String collectionName; // name of the collection to connect to
    protected DBObject conditionQuery; // selection conditions
    protected DBObject fieldQuery; // projection of the required fields

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // total number of matching documents
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // documents per split

            // Build the splits; this simply assigns an equal number of documents to each split.
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // release the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // The value class that documents read from MongoDB are converted into;
        // defaults to NullMongoDBWritable.
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName(); // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo(); // initialize the connection
        getConditionQuery(); // initialize the selection conditions
        getFieldQuery(); // initialize the field projection
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.push(field);
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}
Custom RecordReader

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery, DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Nothing to do: everything was initialized in the constructor.
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }
            key.set(pos + split.getStart()); // set the key
            value.readFields(cursor.next()); // populate the value from the document
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            // Page into the collection with skip/limit according to the split boundaries.
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Fraction of the split consumed so far, clamped to [0, 1].
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }
}
Custom OutputFormat & RecordWriter

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection.
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // establish the connection
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // Insert the merged document. For large data volumes, documents
                // could be buffered here and inserted in batches instead.
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // Let the key and value write themselves directly.
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Create the record writer occur exception.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // No validation performed.
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The OutputCommitter only handles commit/cleanup bookkeeping for the job
        // and its tasks, not the actual output destination, so FileOutputCommitter
        // is reused directly.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Set the output properties on the job.
     *
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disable the fetch method, so that write(DBCollection) is used instead.
     *
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}
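For reference, this is how a driver would typically wire the class in; a fragment only, assuming a Job named job has already been set up (the database and collection names here match the demo further below):

// inside the job driver, after the usual job setup:
MongoDBOutputFormat.setOutput(job, "foobar2", "users");         // select the output database/collection
MongoDBOutputFormat.disableFetchMethod(job.getConfiguration()); // optional: write via write(DBCollection) instead of fetchWriteDBObject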
Other supporting Java code

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // Whether the record writer calls the fetch method (the default). If set to
    // false, the write(DBCollection) method is used directly instead.
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Get the underlying Configuration object.
     */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Set the connection information (no authentication).
     */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /**
     * Set the connection information, optionally with authentication.
     */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /**
     * Get a MongoDB connection object.
     */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    /**
     * Get the configured host.
     */
    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    /**
     * Get the configured port.
     */
    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /**
     * Whether authentication is enabled; MongoDB has it disabled by default.
     */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    /**
     * Get the username required for authentication.
     */
    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    /**
     * Get the password required for authentication.
     */
    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            String partition = this.getPartition();
            String[] values = null;
            for (String condition : conditions) {
                values = condition.split(partition);
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    // Note: the input class must implement MongoDBWritable, matching getValueClass
    // above (the original version declared DBWritable here, which would fail at
    // read time for classes that do not also implement MongoDBWritable).
    public void setInputClass(Class<? extends MongoDBWritable> inputClass) {
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, MongoDBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}
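As a usage sketch built only from the setters above (a hypothetical driver snippet; the host, port, credentials, and field names are placeholders), a job reading from an authenticated MongoDB instance could be configured like this:

Configuration conf = new Configuration();
MongoDBConfiguration mdc = new MongoDBConfiguration(conf);
mdc.configureDB("mongo.example.com", 27017, true, "hadoop", "secret"); // connection + auth
mdc.setInputDatabaseName("db_java");
mdc.setInputCollectionName("users");
mdc.setInputFieldNames("name", "age", "sex"); // optional projection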

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Write this object into a MongoDB collection.
     *
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Build the DBObject to write, merging into the given one if it is non-null.
     *
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populate this object from a document read from a MongoDB collection.
     *
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}
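A minimal input-only implementation (a hypothetical example, not part of the library) only needs to map document fields to Java fields in readFields and can leave the write-side methods unsupported. Note that to be shuffled between map and reduce, a real value class must also implement Hadoop's Writable, as the demo classes below do:

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class AgeOnlyWritable implements MongoDBWritable {
    private Integer age;

    @Override
    public void readFields(DBObject object) throws MongoException {
        this.age = (Integer) object.get("age"); // pull a single field from the document
    }

    @Override
    public void write(DBCollection collection) throws MongoException {
        throw new UnsupportedOperationException(); // input-only type
    }

    @Override
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
        throw new UnsupportedOperationException(); // input-only type
    }
}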

package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class Demo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Configure the input database and collection and the input value class.
        // The input database and collection must already exist, otherwise there is
        // simply no data to read (which is not an error).
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);
        Job job = Job.getInstance(conf, "mongodb-demo");
        job.setJarByClass(Demo.class);
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setOutputValueClass(DemoOutputValue.class);
        job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setInputFormatClass(MongoDBInputFormat.class);
        MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // the output database/collection need not already exist
        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if a key already exists, a numeric suffix is added.
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if a key already exists, a numeric suffix is added.
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }
    }

    /**
     * Pass-through: emits each input value directly as the output key.
     *
     * @author jsliuming
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Writes out the data, performing only a simple count per key.
     *
     * @author jsliuming
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused") NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * If the key already exists in the keys set, append an increasing numeric
     * suffix to it until it is unique.
     *
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}
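To have something to read, the input collection can be seeded with a few documents first. The snippet below is an assumed helper (not part of the original post) that uses the same legacy driver API as the code above to insert test users into db_java.users; the host and port are placeholders:

package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.net.UnknownHostException;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class SeedUsers {
    public static void main(String[] args) throws UnknownHostException {
        Mongo mongo = new Mongo(new ServerAddress("localhost", 27017)); // placeholder host/port
        try {
            DBCollection users = mongo.getDB("db_java").getCollection("users");
            for (int i = 0; i < 100; i++) {
                DBObject doc = new BasicDBObject();
                doc.put("name", "user" + (i % 10)); // deliberate duplicates so the reducer has something to count
                doc.put("age", 20 + (i % 5));
                doc.put("sex", (i % 2 == 0) ? "male" : "female");
                users.insert(doc);
            }
        } finally {
            mongo.close();
        }
    }
}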
IV. Result Screenshots