客戶需求如下,nginx的訪問日志中ip,匹配出對應的國家,省份和城市,然后給我了一個maxmind的連接參考。
查找資料,有做成hive udf的使用方式, 我們項目中一直使用 waterdrop 來做數據處理,所以決定開發一個 waterdrop的插件。
關於這個功能,waterdrop本身提供有兩個商用組件,geopip2(也是使用maxmind) 另一個是國內的 ipipnet。
如果有人不懂 waterdrop,可以參考 https://interestinglab.github.io/waterdrop/#/zh-cn/quick-start
開發使用 scala語言,開發完畢后,使用 mvn clean package 打包即可,生成的包是不含有 依賴的,請注意把依賴放到spark classpath中去使用。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.student</groupId> <artifactId>GeoIP2</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version> <spark.version>2.4.0</spark.version> <waterdrop.version>1.4.0</waterdrop.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>io.github.interestinglab.waterdrop</groupId> <artifactId>waterdrop-apis_2.11</artifactId> <version>${waterdrop.version}</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>com.maxmind.db</groupId> <artifactId>maxmind-db</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.maxmind.geoip2</groupId> <artifactId>geoip2</artifactId> <version>2.6.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.0.2</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
主要的程序文件只有一個Geoip2:
package com.student import io.github.interestinglab.waterdrop.apis.BaseFilter import com.typesafe.config.{Config, ConfigFactory} import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, udf} import scala.collection.JavaConversions._ import com.maxmind.geoip2.DatabaseReader import java.io.{File, InputStream} import java.net.InetAddress import com.maxmind.db.CHMCache import org.apache.spark.SparkFiles object ReaderWrapper extends Serializable { @transient lazy val reader = { val geoIPFile = "GeoLite2-City.mmdb"; val database = new File(SparkFiles.get(geoIPFile)); val reader: DatabaseReader = new DatabaseReader.Builder(database) //.fileMode(com.maxmind.db.Reader.FileMode.MEMORY) .fileMode(com.maxmind.db.Reader.FileMode.MEMORY_MAPPED) .withCache(new CHMCache()).build(); reader } } class GeoIP2 extends BaseFilter { var config: Config = ConfigFactory.empty() /** * Set Config. **/ override def setConfig(config: Config): Unit = { this.config = config } /** * Get Config. **/ override def getConfig(): Config = { this.config } override def checkConfig(): (Boolean, String) = { val requiredOptions = List("source_field") val nonExistsOptions: List[(String, Boolean)] = requiredOptions.map { optionName => (optionName, config.hasPath(optionName)) }.filter { p => !p._2 } if (nonExistsOptions.length == 0) { (true, "") } else { (false, "please specify setting as non-empty string") } } override def prepare(spark: SparkSession): Unit = { val defaultConfig = ConfigFactory.parseMap( Map( "source_field" -> "raw_message", "target_field" -> "__ROOT__" ) ) config = config.withFallback(defaultConfig) } override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = { val srcField = config.getString("source_field") val func = udf { ip: String => ip2Locatation(ip) } val ip2Country=udf{ip:String => ip2Location2(ip,1)} val ip2Province=udf{ip:String => ip2Location2(ip,2)} val ip2City=udf{ip:String => ip2Location2(ip,3)} //df.withColumn(config.getString("target_field"), func(col(srcField))) df.withColumn("__country__", ip2Country(col(srcField))) .withColumn("__province__", ip2Province(col(srcField))) .withColumn("__city__", ip2City(col(srcField))) } def ip2Locatation(ip: String) = { try { val reader = ReaderWrapper.reader val ipAddress = InetAddress.getByName(ip) val response = reader.city(ipAddress) val country = response.getCountry() val subdivision = response.getMostSpecificSubdivision() val city = response.getCity() (country.getNames().get("zh-CN"), subdivision.getNames.get("zh-CN"), city.getNames().get("zh-CN")) } catch { case ex: Exception => ex.printStackTrace() ("", "", "") } } def ip2Location2(ip: String,index: Int) = { try { val reader = ReaderWrapper.reader val ipAddress = InetAddress.getByName(ip) val response = reader.city(ipAddress) index match { case 1 => response.getCountry().getNames().get("zh-CN") case 2 => response.getMostSpecificSubdivision().getNames.get("zh-CN") case 3 => response.getCity().getNames().get("zh-CN") case _ => "" } } catch { case ex: Exception => ex.printStackTrace() "" } } }
測試類的代碼如下:

package com.student import com.typesafe.config._ import org.apache.spark.sql.{DataFrame, SparkSession} object TestIt { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("demo").master("local[1]") .config("spark.files","/Users/student.yao/code/sparkshell/GeoLite2-City.mmdb") //.enableHiveSupport() .getOrCreate() spark.sparkContext.setLogLevel("WARN") //獲取到第一個conf,復制給插件的實例 val firstConf:Config = ConfigFactory.empty() //實例化插件對象 val pluginInstance: GeoIP2 = new GeoIP2 pluginInstance.setConfig(firstConf) pluginInstance.prepare(spark) //虛擬一些數據 import spark.implicits._ val sourceFile: DataFrame = Seq((1, "221.131.74.138"), (2, "112.25.215.84"), (3,"103.231.164.15"), (4,"36.99.136.137"), (5,"223.107.54.102"), (6,"117.136.118.125") ).toDF("id", "raw_message") sourceFile.show(false) val df2 = pluginInstance.process(spark,sourceFile) df2.show(false) } }
遇到的問題
1.一開始的時候,把 GeoLite2-city.mmdb放到了 Resources文件夾,想把它打到jar包里,然后在項目中 使用 getClass().getResourceAsStream("/GeoLite2-city")
運行中發現這種方式特別慢,說這種慢,是和 使用 sparkFieles.get方式比較起來,遂改成sparkFiles獲取的方式,先把文件放到 hdfs,然后在spark作業配置薦中配置:
spark.files="hdfs://nameservicesxxx/path/to/Geolite2-city.mmdb",這樣 SparkFiles.get("GeoLite2-city.mmdb") 就可以獲取文件使用。
2。性能優化的過程中,想把 Reader提出來放在 Prepare方法里面是很自然的一個想法,在本機測試的時侯沒問題,因為是單機的,沒發現,在生產上時發現報 不可序列化的異常。
其實在 ideaj中可以使用 spark.sparkContext.broadcast(reader)的方式,就可以發現這個異常。如何解決這個異常,通常的解決方式是 在 dataframe|rdd的 foreachpartition|mappartitions中
生成對象,這樣就不會報錯了。進一步可以使用 單例 這樣效果更好些 。但這個是 插件,沒法這樣做, 就想到了可以使用一個外殼包起來,讓reader不序列化即可。
所以有了 ReaderWrapper extends Serializable @transient lazy val 這一段。
3。初始化 reader的時候進行緩存
經過這些處理,性能得到了提升,經測試,4G,4core,每5秒一個批次,2萬條數據處理3秒鍾。(kafka->waterdrop-json->es)全過程。