Analyzing MongoDB Data with Hadoop MapReduce (1)


Recently I have been looking into using Hadoop MapReduce to analyze data stored in MongoDB. I collected a few demos from around the web, pieced them together, and finally got one running. The whole process is documented below.

Environment

  • ubuntu 14.04 64bit
  • hadoop 2.6.4
  • mongodb 2.4.9
  • Java 1.8
  • mongo-hadoop-core-1.5.2.jar
  • mongo-java-driver-3.0.4.jar

Downloading and configuring mongo-hadoop-core-1.5.2.jar and mongo-java-driver-3.0.4.jar

  • Build mongo-hadoop-core-1.5.2.jar
  • $ git clone https://github.com/mongodb/mongo-hadoop
    $ cd mongo-hadoop
    $ ./gradlew jar
    • The build takes a while; once it succeeds, mongo-hadoop-core-1.5.2.jar can be found under core/build/libs.
  • Download mongo-java-driver-3.0.4.jar
  • From http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.0.4/
    choose
    mongo-java-driver-3.0.4.jar
  • Before compiling and running, make both jars visible to Hadoop, for example by adding them to HADOOP_CLASSPATH or by copying them into $HADOOP_HOME/share/hadoop/common/lib (the exact location depends on your installation).

Data

  • Sample data (a small loader program for this data appears after this list)
  • > db.in.find({})
    { "_id" : ObjectId("5758db95ab12e17a067fbb6f"), "x" : "hello world" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb70"), "x" : "nice to meet you" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb71"), "x" : "good to see you" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb72"), "x" : "world war 2" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb73"), "x" : "see you again" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb74"), "x" : "bye bye" }
  • The expected output
  • > db.out.find({})
    { "_id" : "2", "value" : 1 }
    { "_id" : "again", "value" : 1 }
    { "_id" : "bye", "value" : 2 }
    { "_id" : "good", "value" : 1 }
    { "_id" : "hello", "value" : 1 }
    { "_id" : "meet", "value" : 1 }
    { "_id" : "nice", "value" : 1 }
    { "_id" : "see", "value" : 2 }
    { "_id" : "to", "value" : 2 }
    { "_id" : "war", "value" : 1 }
    { "_id" : "world", "value" : 2 }
    { "_id" : "you", "value" : 3 }
  • The goal is to count how often each word appears across the documents and to store the results in MongoDB, with the word as the key and its frequency as the value.
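The walkthrough assumes the testmr.in collection already exists. The documents can be inserted from the mongo shell, or with a short program against mongo-java-driver; below is a minimal sketch under that assumption (the class name LoadSampleData is my own):

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class LoadSampleData {
        public static void main(String[] args) {
            // Connect to the local mongod on the default port (27017).
            MongoClient client = new MongoClient("localhost");
            MongoCollection<Document> in = client.getDatabase("testmr").getCollection("in");
            String[] lines = {
                "hello world", "nice to meet you", "good to see you",
                "world war 2", "see you again", "bye bye"
            };
            // Store one line of text per document under the key "x",
            // the field the mapper later reads.
            for (String line : lines) {
                in.insertOne(new Document("x", line));
            }
            client.close();
        }
    }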

Hadoop MapReduce Code

  • MapReduce code
    import java.util.*;
    import java.io.*;

    import org.bson.*;

    import com.mongodb.hadoop.MongoInputFormat;
    import com.mongodb.hadoop.MongoOutputFormat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;

    public class WordCount {
        // The mapper receives one BSON document per call; key is the document's _id.
        public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(Object key, BSONObject value, Context context)
                    throws IOException, InterruptedException {
                System.out.println("key: " + key);
                System.out.println("value: " + value);
                // Split the "x" field on whitespace and emit a (word, 1) pair per token.
                StringTokenizer itr = new StringTokenizer(value.get("x").toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
        // The reducer sums the counts for each word; it is reused as the combiner.
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Point the job at the input and output collections in MongoDB.
            conf.set("mongo.input.uri", "mongodb://localhost/testmr.in");
            conf.set("mongo.output.uri", "mongodb://localhost/testmr.out");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Read input splits from MongoDB and write result documents back to MongoDB.
            job.setInputFormatClass(MongoInputFormat.class);
            job.setOutputFormatClass(MongoOutputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    • Note: mongo.input.uri and mongo.output.uri must point at the input and output collections:
      conf.set("mongo.input.uri", "mongodb://localhost/testmr.in");
      conf.set("mongo.output.uri", "mongodb://localhost/testmr.out");
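      Both properties accept a full MongoDB connection string, so a remote or authenticated deployment can be targeted the same way; a sketch with placeholder values (db1.example.com, user, and secret are not real):
      // Placeholders only: substitute your actual host, port, user, and password.
      conf.set("mongo.input.uri", "mongodb://user:secret@db1.example.com:27017/testmr.in");
      conf.set("mongo.output.uri", "mongodb://user:secret@db1.example.com:27017/testmr.out");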
  • Compile
    • Compile the class
      $ hadoop com.sun.tools.javac.Main WordCount.java -Xlint:deprecation
    • Package the jar
      $ jar cf wc.jar WordCount*.class
  • Run
    • Start Hadoop first; a MapReduce job needs a running cluster
      $ start-all.sh
    • Run the program (a ToolRunner variant that can ship the connector jars with the job follows this list)
    • $ hadoop jar wc.jar WordCount
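    • On a real cluster the map and reduce task JVMs also need the two connector jars. If copying them into Hadoop's lib directory is not an option, one common approach is to drive the job through ToolRunner so that generic options such as -libjars are honored. This is a sketch of that idea, not part of the original demo; WordCountTool is my own name, and it reuses the mapper and reducer defined above:
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;

      import com.mongodb.hadoop.MongoInputFormat;
      import com.mongodb.hadoop.MongoOutputFormat;

      public class WordCountTool extends Configured implements Tool {
          @Override
          public int run(String[] args) throws Exception {
              // getConf() already reflects generic options parsed by ToolRunner.
              Configuration conf = getConf();
              conf.set("mongo.input.uri", "mongodb://localhost/testmr.in");
              conf.set("mongo.output.uri", "mongodb://localhost/testmr.out");
              Job job = Job.getInstance(conf, "word count");
              job.setJarByClass(WordCountTool.class);
              job.setMapperClass(WordCount.TokenizerMapper.class);
              job.setCombinerClass(WordCount.IntSumReducer.class);
              job.setReducerClass(WordCount.IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              job.setInputFormatClass(MongoInputFormat.class);
              job.setOutputFormatClass(MongoOutputFormat.class);
              return job.waitForCompletion(true) ? 0 : 1;
          }
          public static void main(String[] args) throws Exception {
              System.exit(ToolRunner.run(new WordCountTool(), args));
          }
      }
      It would then be invoked with the jars listed explicitly, e.g.
      $ hadoop jar wc.jar WordCountTool -libjars mongo-hadoop-core-1.5.2.jar,mongo-java-driver-3.0.4.jar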
  • Check the result, first from the mongo shell (a Java equivalent follows the session)
  • $ mongo
    MongoDB shell version: 2.4.9
    connecting to: test
    > use testmr;
    switched to db testmr
    > db.out.find({})
    { "_id" : "2", "value" : 1 }
    { "_id" : "again", "value" : 1 }
    { "_id" : "bye", "value" : 2 }
    { "_id" : "good", "value" : 1 }
    { "_id" : "hello", "value" : 1 }
    { "_id" : "meet", "value" : 1 }
    { "_id" : "nice", "value" : 1 }
    { "_id" : "see", "value" : 2 }
    { "_id" : "to", "value" : 2 }
    { "_id" : "war", "value" : 1 }
    { "_id" : "world", "value" : 2 }
    { "_id" : "you", "value" : 3 }
    > 
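    The same check can be scripted with the Java driver instead of the shell; a minimal sketch (the class name PrintResults is my own):
    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class PrintResults {
        public static void main(String[] args) {
            MongoClient client = new MongoClient("localhost");
            MongoCollection<Document> out = client.getDatabase("testmr").getCollection("out");
            // Iterate over every { _id: <word>, value: <count> } document
            // written by MongoOutputFormat.
            for (Document doc : out.find()) {
                System.out.println(doc.getString("_id") + " -> " + doc.getInteger("value"));
            }
            client.close();
        }
    }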


The above is a simple example. Next I plan to use Hadoop MapReduce to process more complex data in MongoDB. Stay tuned, and if you have any questions, please leave them in the comments ^_^


References and Documentation

  1. The Elephant in the Room: MongoDB + Hadoop (slide deck)
  2. http://chenhua-1984.iteye.com/blog/2162576
  3. http://api.mongodb.com/java/2.12/com/mongodb/MongoURI.html
  4. http://stackoverflow.com/questions/27020075/mongo-hadoop-connector-issue

If the slide deck The Elephant in the Room: MongoDB + Hadoop will not open, you can download it from my GitHub.

