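1. Background
A recent project required extracting keywords for films and TV shows from crawled review data. Because the volume of review data is large, we used Spark. Keyword extraction has two parts: word segmentation and an extraction algorithm. Mainstream segmentation toolkits include HIT's LTP and HanLP, and there are many keyword extraction algorithms, including TF-IDF, TextRank and mutual information. In this task we segment text with LTP, HanLP and an Aho-Corasick (AC) double-array trie, and extract keywords by combining TextRank, mutual information and TF-IDF.
Note: I am new to this project, so the quality of the results still needs iterative tuning.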
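2. Technology choices
(1) Dictionaries
1) The dictionary data shipped with the HanLP project; see HanLP on GitHub for details.
2) Because film and TV form a vertical domain with their own vocabulary, we also added the Chinese words from Tencent's word-embedding corpus; refer to that address.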
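To my knowledge the Tencent embedding release is a word2vec-style text file: a header line with the vocabulary size and the vector dimension, then one word followed by its vector on each line. Turning it into a word list for a custom dictionary only needs the first field of each line. Below is a minimal sketch assuming that layout; `EmbeddingVocab` is a hypothetical name, and writing the list in the exact format HanLP's custom dictionary expects is not covered.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class EmbeddingVocab {
    /**
     * Reads a word2vec-style text file (assumed layout: optional header "<count> <dim>",
     * then "<word> <v1> ... <vdim>" per line) and returns the words in file order.
     */
    static List<String> words(BufferedReader in) {
        Set<String> vocab = new LinkedHashSet<>(); // keeps file order, drops duplicates
        boolean firstLine = true;
        try {
            String line;
            while ((line = in.readLine()) != null) {
                String trimmed = line.trim();
                if (trimmed.isEmpty()) continue;
                String[] fields = trimmed.split("\\s+");
                if (firstLine) {
                    firstLine = false;
                    // Skip the "<count> <dim>" header if the first line looks like one
                    if (fields.length == 2 && fields[0].matches("\\d+") && fields[1].matches("\\d+")) continue;
                }
                vocab.add(fields[0]); // the word is the first field; the vector is ignored
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ArrayList<>(vocab);
    }
}
```

For the full 8.2M-line file the de-duplicating set keeps every word in memory; if duplicates are not a concern, writing each first field straight to the output file is lighter.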
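(2) Segmentation
1) LTP segmentation service: deployed as a multi-replica service on Docker Swarm and queried over HTTP for segmentation results (deployment guides are easy to find online). The model can also be loaded locally into memory and called directly, which should be more efficient (not tried).
2) AC double array: HanLP's AC double-array segmenter, using longest-match segmentation.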
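To illustrate the longest-match rule that the AC double-array segmenter applies, here is forward maximum matching over a plain map-based trie. It is a simplified stand-in, not HanLP's implementation: there is no double-array layout and no Aho-Corasick failure links, and text outside the dictionary falls back to single characters. `LongestMatchSegmenter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LongestMatchSegmenter {
    /** Map-based trie node; a double-array trie packs the same transitions into int arrays. */
    private static final class Node {
        final Map<Character, Node> children = new HashMap<>();
        boolean isWord;
    }

    private final Node root = new Node();

    LongestMatchSegmenter(Iterable<String> dictionary) {
        for (String word : dictionary) {
            Node node = root;
            for (char c : word.toCharArray()) {
                node = node.children.computeIfAbsent(c, k -> new Node());
            }
            node.isWord = true;
        }
    }

    /** Forward maximum matching: at each position take the longest dictionary word, else one char. */
    List<String> seg(String text) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < text.length()) {
            Node node = root;
            int longestEnd = -1;
            for (int j = i; j < text.length(); j++) {
                node = node.children.get(text.charAt(j));
                if (node == null) break;              // no dictionary word continues with this char
                if (node.isWord) longestEnd = j + 1;  // remember the longest match so far
            }
            int end = longestEnd > 0 ? longestEnd : i + 1; // single-character fallback
            out.add(text.substring(i, end));
            i = end;
        }
        return out;
    }
}
```

With a dictionary containing both 戰狼 and 戰狼2, the longer entry wins, which is the behavior the custom Tencent words rely on.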
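(3) Extraction
1) Classic TF-IDF: implemented with term-frequency statistics
2) TextRank: derived from the PageRank algorithm; uses the interface provided by HanLP
3) Mutual information: uses the interface provided by HanLP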
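HanLP.extractKeyword wraps HanLP's own TextRank implementation. To show what TextRank computes, here is a minimal standalone version over an already-segmented word list: words co-occurring within a sliding window are linked, and scores are iterated as in PageRank. The window size, damping factor and iteration limit are illustrative parameters, not HanLP's settings, and `TextRankSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TextRankSketch {
    /**
     * Links words that co-occur within a window of `window` consecutive words (undirected,
     * no self-loops), then iterates S(v) = (1 - d) + d * sum_{u in N(v)} S(u) / deg(u)
     * until the largest change is below 1e-6 or maxIter is reached.
     */
    static Map<String, Double> rank(List<String> words, int window, double d, int maxIter) {
        Map<String, Set<String>> neighbors = new HashMap<>();
        for (int i = 0; i < words.size(); i++) {
            String w = words.get(i);
            neighbors.computeIfAbsent(w, k -> new HashSet<>());
            for (int j = i + 1; j < Math.min(words.size(), i + window); j++) {
                String u = words.get(j);
                if (u.equals(w)) continue;
                neighbors.get(w).add(u);
                neighbors.computeIfAbsent(u, k -> new HashSet<>()).add(w);
            }
        }
        Map<String, Double> score = new HashMap<>();
        for (String w : neighbors.keySet()) score.put(w, 1.0);
        for (int iter = 0; iter < maxIter; iter++) {
            Map<String, Double> next = new HashMap<>();
            double maxDiff = 0;
            for (Map.Entry<String, Set<String>> e : neighbors.entrySet()) {
                double sum = 0;
                for (String u : e.getValue()) sum += score.get(u) / neighbors.get(u).size();
                double s = (1 - d) + d * sum;
                next.put(e.getKey(), s);
                maxDiff = Math.max(maxDiff, Math.abs(s - score.get(e.getKey())));
            }
            score = next;
            if (maxDiff < 1e-6) break;
        }
        return score;
    }

    /** The k highest-scoring words, best first. */
    static List<String> topK(Map<String, Double> scores, int k) {
        List<String> keys = new ArrayList<>(scores.keySet());
        keys.sort((a, b) -> Double.compare(scores.get(b), scores.get(a)));
        return keys.subList(0, Math.min(k, keys.size()));
    }
}
```

Words that co-occur with many different words end up with the highest scores, which is why TextRank favors central topic words over incidental ones.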
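3. Implementation
(1) Code structure
1) Each segmentation method is wrapped in its own function, and the method whose name is passed in is the one executed.
2) The words or phrases produced by TextRank, mutual information, LTP and the AC double array are all scored with TF-IDF at the end.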
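The TF-IDF step at the end of the main code can be reproduced on a single machine, for example to sanity-check scores on a small sample. This sketch mirrors the formula as I read it in that code: idf(w) = ln(N / (df(w) + 1)) kept only for words with df(w) > 1 (other words score 0), tf = count / document length, and the top k words longer than one character. `TfIdfSketch` and its methods are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

final class TfIdfSketch {
    /** idf(w) = ln(N / (df(w) + 1)), only for words whose document frequency is above 1. */
    static Map<String, Double> idf(List<List<String>> docs) {
        Map<String, Integer> df = new HashMap<>();
        for (List<String> doc : docs) {
            for (String w : new HashSet<>(doc)) df.merge(w, 1, Integer::sum); // once per document
        }
        int n = docs.size();
        Map<String, Double> idf = new HashMap<>();
        for (Map.Entry<String, Integer> e : df.entrySet()) {
            if (e.getValue() > 1) idf.put(e.getKey(), Math.log(n * 1.0 / (e.getValue() + 1)));
        }
        return idf;
    }

    /** Top-k words of one document by tf * idf (missing idf counts as 0), skipping 1-char words. */
    static List<Map.Entry<String, Double>> topK(List<String> doc, Map<String, Double> idf, int k) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : doc) counts.merge(w, 1, Integer::sum);
        List<Map.Entry<String, Double>> scored = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            double tf = e.getValue() * 1.0 / doc.size();
            scored.add(new AbstractMap.SimpleEntry<>(e.getKey(), tf * idf.getOrDefault(e.getKey(), 0.0)));
        }
        scored.removeIf(e -> e.getKey().trim().length() <= 1);
        scored.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
        return scored.subList(0, Math.min(k, scored.size()));
    }
}
```

One consequence of the +1 smoothing: a word that appears in every document gets ln(N / (N + 1)) < 0, so it ranks below words that were dropped from the IDF table and default to 0.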
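(2) Full code
1) Main code: the details depend on the structure of the downloaded raw review data, so there is no need to dwell on them; focus on the main flow.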

```scala
import com.hankcs.hanlp.HanLP
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
// JSONObjectEx, JSONObject/JSONArray, CleanUtils, IdBuilder, SourceType, HttpPostUtil,
// SplitQueryHelper and HDFSFileUtil are project-specific helpers that are not shown.

def extractFilmKeyWords(algorithm: String): Unit = {
  // Fail fast on the driver instead of with a MatchError inside every task
  require(Set("ltp", "acServer", "ac", "textRank", "mutualInfo").contains(algorithm),
    s"Unknown algorithm: $algorithm")

  // Smoke test of the HanLP wrapper
  println(HanLPSpliter.getInstance.seg("如何看待《戰狼2》中的愛國情懷?"))

  val sc = new SparkContext(new SparkConf()
    .setAppName("extractFileKeyWords")
    .set("spark.driver.maxResultSize", "3g"))

  val baseDir = "/work/ws/video/parse/key_word"

  // Implicit Java <-> Scala collection conversions (deprecated in newer Scala; JavaConverters replaces it)
  import scala.collection.JavaConversions._

  // Read one review source: (sourceKey, non-empty comment/review/title texts)
  def extractComments(sc: SparkContext, inputInfo: (String, String)): RDD[(String, List[String])] = {
    sc.textFile(s"$baseDir/data/${inputInfo._2}")
      .map(data => {
        val json = JSONObjectEx.fromObject(data.trim)
        if (null == json) ("", List())
        else {
          val id = json.getStringByKeys("_id")
          val comments: List[String] = json.getArrayInfo("comments", "review").toList
          val reviews: List[String] = json.getArrayInfo("reviews", "review").toList
          val titles: List[String] = json.getArrayInfo("reviews", "title").toList
          val texts = (comments ::: reviews ::: titles).filter(f => !CleanUtils.isEmpty(f))
          (IdBuilder.getSourceKey(inputInfo._1, id), texts)
        }
      })
  }

  // Broadcast the stop words as a Set so contains() is a hash lookup, not a List scan
  val filterWordRdd = sc.broadcast(
    sc.textFile(s"$baseDir/data/stopwords.txt").map(_.trim).distinct().collect().toSet)

  // Serialize (status, res) pairs as whitespace-free JSON joined by " | "
  def formatOutput(infos: List[(Int, String)]): String = {
    infos.map(info => {
      val json = new JSONObject()
      json.put("status", info._1)
      try {
        json.put("res", info._2)
      } catch {
        case _: Exception => json.put("res", "[]")
      }
      json.toString.replaceAll("[\\s]+", "")
    }).mkString(" | ")
  }

  // Wrap words as [{"cont": word}, ...], the same shape as LTP's token objects
  def genContArray(words: List[String]): JSONArray = {
    val arr = new JSONArray()
    words.foreach(f => {
      val json = new JSONObject()
      json.put("cont", f)
      arr.put(json)
    })
    arr
  }

  // Segmentation through the LTP HTTP service
  def splitWordByLTP(texts: List[String]): List[(Int, String)] = {
    val url = "http://dev.content_ltp.research.com/ltp"
    texts.map(f => {
      val params = new java.util.HashMap[String, String]()
      params.put("s", f)
      params.put("f", "json")
      params.put("t", "ner")
      // Call the LTP service (with retries) and strip all whitespace from the response
      val result = HttpPostUtil.httpPostRetry(url, params).replaceAll("[\\s]+", "")
      if (CleanUtils.isEmpty(result)) (0, f) else {
        val resultArr = new JSONArray()
        val jsonArr = try { JSONArray.fromString(result) } catch { case _: Exception => null }
        if (null != jsonArr && 0 < jsonArr.length()) {
          // Walk the sentences of the first element and their token objects
          for (i <- 0 until jsonArr.getJSONArray(0).length()) {
            val subJsonArr = jsonArr.getJSONArray(0).getJSONArray(i)
            for (j <- 0 until subJsonArr.length()) {
              val subJson = subJsonArr.getJSONObject(j)
              // Keep tokens whose text ("cont") is not a stop word
              if (!filterWordRdd.value.contains(subJson.getString("cont"))) {
                resultArr.put(subJson)
              }
            }
          }
        }
        if (resultArr.length() > 0) (1, resultArr.toString) else (0, f)
      }
    })
  }

  // Segmentation through the service built on the AC double array
  def splitWordByAcDoubleTreeServer(texts: List[String]): List[(Int, String)] = {
    texts.map(f => {
      val splitResults = SplitQueryHelper.splitQueryText(f)
        .filter(w => !CleanUtils.isEmpty(w) && !filterWordRdd.value.contains(w.toLowerCase)).toList
      if (splitResults.isEmpty) (0, f) else (1, genContArray(splitResults).toString)
    })
  }

  // AC double array loaded in memory (HanLPSpliter)
  def splitWordByAcDoubleTree(texts: List[String]): List[(Int, String)] = {
    texts.map(f => {
      val splitResults = HanLPSpliter.getInstance().seg(f)
        .filter(w => !CleanUtils.isEmpty(w) && !filterWordRdd.value.contains(w.toLowerCase)).toList
      if (splitResults.isEmpty) (0, f) else (1, genContArray(splitResults).toString)
    })
  }

  // TextRank: up to 100 keywords per text
  def splitWordByTextRank(texts: List[String]): List[(Int, String)] = {
    texts.map(f => {
      val splitResults = HanLP.extractKeyword(f, 100)
        .filter(w => !CleanUtils.isEmpty(w) && !filterWordRdd.value.contains(w.toLowerCase)).toList
      if (splitResults.isEmpty) (0, f) else (1, genContArray(splitResults).toString)
    })
  }

  // Mutual information: up to 50 phrases per text
  def splitWordByMutualInfo(texts: List[String]): List[(Int, String)] = {
    texts.map(f => {
      val splitResults = HanLP.extractPhrase(f, 50)
        .filter(w => !CleanUtils.isEmpty(w) && !filterWordRdd.value.contains(w.toLowerCase)).toList
      if (splitResults.isEmpty) (0, f) else (1, genContArray(splitResults).toString)
    })
  }

  // Collect the texts of all three sources
  val unionInputRdd = sc.union(
      extractComments(sc, SourceType.DB -> "db_review.json"),
      extractComments(sc, SourceType.MY -> "my_review.json"),
      extractComments(sc, SourceType.MT -> "mt_review.json"))
    .filter(_._2.nonEmpty)

  unionInputRdd.cache()

  // Step 1: segment/extract with the chosen algorithm, save "sourceKey \t results"
  unionInputRdd.map(data => {
    val splitResults = algorithm match {
      case "ltp"        => splitWordByLTP(data._2)
      case "acServer"   => splitWordByAcDoubleTreeServer(data._2)
      case "ac"         => splitWordByAcDoubleTree(data._2)
      case "textRank"   => splitWordByTextRank(data._2)
      case "mutualInfo" => splitWordByMutualInfo(data._2)
    }
    s"${data._1}\t${formatOutput(splitResults)}"
  }).saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/wordSplit/$algorithm"))

  // Step 2: read the output back and keep the words of every "status":1 entry
  val splitRDD = sc.textFile(s"$baseDir/result/wordSplit/$algorithm/part*", 30)
    .flatMap(data => {
      val fields = data.split("\\t")
      if (fields.length < 2) None
      else {
        val sourceKey = fields(0)
        val words = fields(1).split(" \\| ").flatMap(f => {
          val json = JSONObjectEx.fromObject(f.trim)
          if (null != json && "1".equals(json.getStringByKeys("status"))) {
            val jsonArr = try { JSONArray.fromString(json.getStringByKeys("res")) } catch { case _: Exception => null }
            var result: List[(String, String)] = List()
            if (jsonArr != null) {
              for (j <- 0 until jsonArr.length()) {
                val cont = jsonArr.getJSONObject(j).getString("cont")
                result ::= (cont -> cont)
              }
            }
            result.reverse
          } else Nil
        }).toList
        Some((sourceKey, words))
      }
    }).filter(_._2.nonEmpty)

  splitRDD.cache()

  // N: number of documents (one per source key)
  val totalFilms = splitRDD.count()

  // Step 3: idf(w) = ln(N / (df(w) + 1)); words found in only one document are dropped
  // here and therefore get an IDF of 0 in step 4
  val idfRdd = splitRDD.flatMap(result => result._2.map(_._1).distinct.map((_, 1)))
    .reduceByKey(_ + _)
    .filter(_._2 > 1)
    .map(f => (f._1, Math.log(totalFilms * 1.0 / (f._2 + 1))))

  idfRdd.cache()
  idfRdd.map(f => s"${f._1}\t${f._2}").saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/idf/$algorithm"))

  val idfMap = sc.broadcast(idfRdd.collectAsMap())

  // Step 4: tf = count / words in document; keep the top 50 words (longer than 1 char) by tf * idf
  val tfRdd = splitRDD.map(result => {
    val totalWords = result._2.size
    val keyWords = result._2.groupBy(_._1)
      .map(f => {
        val word = f._1
        val tf = f._2.size * 1.0 / totalWords
        (tf * idfMap.value.getOrElse(word, 0D), word)
      }).toList.sortBy(_._1).reverse.filter(_._2.trim.length > 1).take(50)
    (result._1, keyWords)
  })

  tfRdd.cache()
  tfRdd.map(f => {
    val json = new JSONObject()
    json.put("_id", f._1)

    val arr = new JSONArray()
    for (keyWord <- f._2) {
      val subJson = new JSONObject()
      subJson.put("score", keyWord._1)
      subJson.put("word", keyWord._2)
      arr.put(subJson)
    }
    json.put("keyWords", arr)
    json.toString
  }).saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/keyword/$algorithm/withScore"))

  tfRdd.map(f => s"${f._1}\t${f._2.map(_._2).mkString(",")}")
    .saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/keyword/$algorithm/noScore"))

  tfRdd.unpersist()

  splitRDD.unpersist()
  idfMap.unpersist()
  idfRdd.unpersist()

  unionInputRdd.unpersist()
  filterWordRdd.unpersist()
  sc.stop()
}
```
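splitWordByMutualInfo above hands phrase extraction to HanLP.extractPhrase, whose extractor, as far as I know, combines mutual information with left/right entropy. The sketch below only illustrates the mutual-information half: pointwise mutual information (PMI) of adjacent word pairs, so that pairs co-occurring more often than chance rank first as phrase candidates. It is not HanLP's code; `PmiSketch` and the `minCount` threshold are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PmiSketch {
    /**
     * PMI(x, y) = ln( p(x y) / (p(x) * p(y)) ) for adjacent pairs, with unigram probabilities
     * over the n tokens and bigram probabilities over the n - 1 adjacent pairs.
     * Pairs seen fewer than minCount times are skipped, since PMI overrates rare pairs.
     */
    static Map<List<String>, Double> adjacentPmi(List<String> tokens, int minCount) {
        Map<String, Integer> uni = new HashMap<>();
        Map<List<String>, Integer> bi = new HashMap<>();
        for (int i = 0; i < tokens.size(); i++) {
            uni.merge(tokens.get(i), 1, Integer::sum);
            if (i + 1 < tokens.size()) bi.merge(List.of(tokens.get(i), tokens.get(i + 1)), 1, Integer::sum);
        }
        double n = tokens.size();
        Map<List<String>, Double> pmi = new HashMap<>();
        for (Map.Entry<List<String>, Integer> e : bi.entrySet()) {
            if (e.getValue() < minCount) continue;
            double pxy = e.getValue() / (n - 1);
            double px = uni.get(e.getKey().get(0)) / n;
            double py = uni.get(e.getKey().get(1)) / n;
            pmi.put(e.getKey(), Math.log(pxy / (px * py)));
        }
        return pmi;
    }
}
```

Function words such as 的 occur everywhere, so p(y) is large and their pairs score lower than content-word pairs like 愛國 + 情懷, even when both pairs occur equally often.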
2) Wrapper around the AC double-array segmenter provided by HanLP

```java
import com.google.common.collect.Lists;
import com.hankcs.hanlp.HanLP;
import com.hankcs.hanlp.seg.Segment;
import com.hankcs.hanlp.seg.common.Term;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;

public class HanLPSpliter implements Serializable {
    // The original passed Act.class, which is not part of this code; log under this class
    private static final Logger logger = LoggerFactory.getLogger(HanLPSpliter.class);

    private static HanLPSpliter instance = null;

    // volatile: seg() reads it without taking the lock
    private static volatile Segment segment = null;

    // Tencent-based custom dictionary path (not referenced in the code shown)
    private static final String PATH = "conf/tencent_word_act.txt";

    // synchronized: several Spark tasks in the same executor JVM may call this concurrently
    public static synchronized HanLPSpliter getInstance() {
        if (null == instance) {
            instance = new HanLPSpliter();
        }
        return instance;
    }

    public HanLPSpliter() {
        this.init();
    }

    public void init() {
        initSegment();
    }

    public void initSegment() {
        // segment is static, so guard it with a class-level lock rather than the instance
        synchronized (HanLPSpliter.class) {
            if (null == segment) {
                // Install the HDFS adapter before anything triggers HanLP dictionary loading
                HanLP.Config.IOAdapter = new HadoopFileIOAdapter();
                addDict(); // defined elsewhere in the original project (not shown)
                Segment seg = HanLP.newSegment("dat"); // "dat": double-array trie segmenter
                seg.enablePartOfSpeechTagging(true);
                seg.enableCustomDictionaryForcing(true);
                segment = seg; // publish only after it is fully configured
            }
        }
    }

    public List<String> seg(String text) {
        if (null == segment) {
            initSegment();
        }

        List<Term> terms = segment.seg(text);
        List<String> results = Lists.newArrayList();
        for (Term term : terms) {
            results.add(term.word);
        }
        return results;
    }
}
```
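getInstance() has to be safe when several Spark tasks in the same executor JVM call it at once, because the first call loads large dictionaries. Besides a synchronized getter, a common way to get lazy, thread-safe, one-time initialization without locking on every call is the initialization-on-demand holder idiom. A minimal sketch; `LazySegmenterHolder` and its counter are hypothetical, and in a real wrapper the private constructor would do the segmenter setup.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazySegmenterHolder {
    // Counts constructor calls so the one-time initialization can be observed
    static final AtomicInteger CREATED = new AtomicInteger();

    private LazySegmenterHolder() {
        CREATED.incrementAndGet(); // stands in for expensive work such as loading dictionaries
    }

    // The JVM initializes Holder (and so INSTANCE) on first access, exactly once, thread-safely
    private static final class Holder {
        static final LazySegmenterHolder INSTANCE = new LazySegmenterHolder();
    }

    static LazySegmenterHolder getInstance() {
        return Holder.INSTANCE;
    }
}
```

The class-initialization guarantee of the JVM replaces explicit locking, and nothing is built until getInstance() is first called on an executor.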
3) Loading HanLP custom dictionaries from HDFS

```java
import com.hankcs.hanlp.corpus.io.IIOAdapter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

public class HadoopFileIOAdapter implements IIOAdapter {
    @Override
    public InputStream open(String path) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        return fs.open(new Path(path));
    }

    @Override
    public OutputStream create(String path) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        OutputStream out = fs.create(new Path(path));
        return out;
    }
}
```
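4. Pitfalls
(1) Loading HanLP custom dictionaries in Spark
Because we added the Tencent embedding words, we used HanLP's custom dictionary feature, following these approaches:
a. 《基於hanLP的中文分詞詳解-MapReduce實現&自定義詞典文件》 ("HanLP Chinese word segmentation in detail: MapReduce implementation & custom dictionary files"). This approach suits small custom dictionaries; with a large one, such as Tencent's 8.2M+ embedding words, the jar would in theory become bloated.
b. 《Spark中使用HanLP分詞》 ("Using HanLP segmentation in Spark"). The advantage of this approach is that the dictionary .bin file does not have to be built by hand, so it is simple to use.
Important: for a custom dictionary to take effect, first delete the .bin file in data/dictionary/custom. The HanLP source shows that if a .bin file exists it is loaded directly; otherwise the user's custom dictionaries in custom are reloaded and a .bin file is generated automatically in the configured environment (e.g. locally or on HDFS).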
For Tencent's 8.2M-word dictionary, generating the .bin file with HanLP took about 30 minutes.
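The load-the-cache-else-rebuild rule described above is why a stale .bin file silently hides changes to the text dictionaries. A minimal sketch of that pattern, not HanLP's code: the one-word-per-line text format, Java object serialization as the cache format, and the name `CachedDictionary` are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

final class CachedDictionary {
    /**
     * Returns the dictionary words, preferring the binary cache at binPath. The cache is never
     * compared with txtPath, so edits to the text file are ignored until the cache is deleted.
     */
    @SuppressWarnings("unchecked")
    static Set<String> load(Path txtPath, Path binPath) {
        try {
            if (Files.exists(binPath)) {
                // Cache hit: the text dictionary is not read at all
                try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(binPath))) {
                    return (Set<String>) in.readObject();
                }
            }
            // Cache miss: rebuild from the text file (the slow path), then write the cache
            Set<String> words = new HashSet<>();
            for (String line : Files.readAllLines(txtPath, StandardCharsets.UTF_8)) {
                String w = line.trim();
                if (!w.isEmpty()) words.add(w);
            }
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(binPath))) {
                out.writeObject(words);
            }
            return words;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Corrupt cache: " + binPath, e);
        }
    }
}
```

The rebuild is the expensive step (about 30 minutes for the Tencent words above), which is the reason for caching, and also why deleting the .bin file is the only way to pick up dictionary edits.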
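(2) Spark exceptions
Exceptions encountered while running the Spark job:
1) Exception 1
a. Error message:
Job aborted due to stage failure: Total size of serialized results of 3979 tasks (1024.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
b. Fix: raise spark.driver.maxResultSize, e.g. to 4G; see 《Spark排錯與優化》 ("Spark troubleshooting and optimization"). This setting caps the total size of the serialized results that one action, such as collect(), returns to the driver (default 1g). In this job the actions that pull data to the driver are the stop-word collect() and idfRdd.collectAsMap(); note that the main code sets the limit to "3g" through SparkConf.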
2) Exception 2
a. Error message: java.lang.OutOfMemoryError: Java heap space
b. Fix: see https://blog.csdn.net/guohecang/article/details/52088117
If you have any questions, please leave a comment!