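Goal:
Write an interface for accessing Spark: entering a URL in a web browser submits a job, and the information we need comes back as JSON.
Result:
A request to the URL returns the word count result as JSON (shown here with the Postman extension for Google Chrome; entering the URL directly in the browser has the same effect).
Key technologies:
Java, the Spring MVC framework, the Tomcat container, the Spark framework, and the Scala dependencies Spark needs
Overall architecture:
I built a web project with Maven. The pom file is as follows: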
<dependencies>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-compiler -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.11</version>
</dependency>
<!-- Spring framework jars -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-webmvc -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<!-- Persistence framework (MyBatis) -->
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.1</version>
</dependency>
<!-- MyBatis-Spring integration -->
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis-spring -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.0</version>
</dependency>
<!-- Apache Commons DBCP connection pool (Maven coordinates are lowercase) -->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.9</version>
</dependency>
<!-- Connection pool (Druid) -->
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.18</version>
</dependency>
<!-- Database driver -->
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<!-- Logging -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- JSON library (json-lib); do not add the Jackson dependencies -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
web.xml configuration (only the Spring MVC container is configured here):
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
<display-name>Archetype Created Web Application</display-name>
<!-- Spring MVC front controller -->
<servlet>
<servlet-name>manager</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<!-- contextConfigLocation is optional; if it is not set, Spring MVC loads its configuration from WEB-INF/[servlet-name]-servlet.xml -->
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:springmvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>manager</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- Prevent garbled characters in POST request bodies -->
<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>utf-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Logging configuration -->
<context-param>
<param-name>log4jConfigLocation</param-name>
<param-value>classpath:log4j.properties</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener>
</web-app>
Next comes the Spring MVC configuration file (springmvc.xml on the classpath):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Scan the controller package for components -->
<context:component-scan base-package="com.zzrenfeng.zhsx.controller" />
<!-- Enable annotation-driven Spring MVC -->
<mvc:annotation-driven />
<context:component-scan base-package="com.zzrenfeng.zhsx.service"></context:component-scan>
<context:component-scan base-package="com.zzrenfeng.zhsx.spark.service"></context:component-scan>
<context:component-scan base-package="com.zzrenfeng.zhsx.spark.conf"></context:component-scan>
</beans>
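That is all the configuration; other components can be integrated as needed. Now on to the code.
Utility class for converting between objects and JSON:
(Why convert by hand instead of letting Jackson do it automatically? When I tried Jackson, it interfered with the Spark job, which terminated abnormally.)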
package com.zzrenfeng.zhsx.util;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.JsonConfig;

/**
 * Utility class for converting between JSON and JavaBeans.
 *
 * @author
 * @version
 *
 * {@code Currently implemented with the json-lib component, which requires
 * json-lib-2.4-jdk15.jar
 * ezmorph-1.0.6.jar
 * commons-collections-3.1.jar
 * commons-lang-2.0.jar
 * }
 */
public class JsonUtil {

    /**
     * Builds a Java object from a JSON object string.
     *
     * @param jsonString
     * @param beanClass
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T jsonToBean(String jsonString, Class<T> beanClass) {
        JSONObject jsonObject = JSONObject.fromObject(jsonString);
        T bean = (T) JSONObject.toBean(jsonObject, beanClass);
        return bean;
    }

    /**
     * Converts a Java object to a JSON string.
     *
     * @param bean
     * @return
     */
    public static String beanToJson(Object bean) {
        JSONObject json = JSONObject.fromObject(bean);
        return json.toString();
    }

    /**
     * Converts a Java object to a JSON string, filtering its properties.
     *
     * @param bean
     * @param _nory_changes property names
     * @param nory true: output only these properties; false: output all properties except these
     * @return
     */
    public static String beanToJson(Object bean, String[] _nory_changes, boolean nory) {
        JSONObject json = null;
        if (nory) { // output only the properties listed in _nory_changes
            // collect the field names of the class and of its direct superclass
            Field[] fields = bean.getClass().getDeclaredFields();
            String str = "";
            for (Field field : fields) {
                str += (":" + field.getName());
            }
            fields = bean.getClass().getSuperclass().getDeclaredFields();
            for (Field field : fields) {
                str += (":" + field.getName());
            }
            str += ":";
            // remove the wanted properties; every name left over is excluded
            for (String s : _nory_changes) {
                str = str.replace(":" + s + ":", ":");
            }
            json = JSONObject.fromObject(bean, configJson(str.split(":")));
        } else { // output every property except those listed in _nory_changes
            json = JSONObject.fromObject(bean, configJson(_nory_changes));
        }
        return json.toString();
    }

    private static JsonConfig configJson(String[] excludes) {
        JsonConfig jsonConfig = new JsonConfig();
        jsonConfig.setExcludes(excludes);
        jsonConfig.setIgnoreDefaultExcludes(false);
        // jsonConfig.setCycleDetectionStrategy(CycleDetectionStrategy.LENIENT);
        // jsonConfig.registerJsonValueProcessor(Date.class,
        //         new DateJsonValueProcessor(datePattern));
        return jsonConfig;
    }

    /**
     * Converts a List of Java objects to a JSON array string.
     *
     * @param beans
     * @return
     */
    public static String beanListToJson(List<?> beans) {
        StringBuilder rest = new StringBuilder();
        rest.append("[");
        int size = beans.size();
        for (int i = 0; i < size; i++) {
            rest.append(beanToJson(beans.get(i)));
            if (i < size - 1) {
                rest.append(",");
            }
        }
        rest.append("]");
        return rest.toString();
    }

    /**
     * Converts a List of Java objects to a JSON array string, filtering properties.
     *
     * @param beans
     * @param _nory_changes
     * @param nory
     * @return
     */
    public static String beanListToJson(List<?> beans, String[] _nory_changes, boolean nory) {
        StringBuilder rest = new StringBuilder();
        rest.append("[");
        for (Object bean : beans) {
            try {
                String item = beanToJson(bean, _nory_changes, nory);
                // add the separator only once an element has been written,
                // so a skipped bean never leaves a dangling comma
                if (rest.length() > 1) {
                    rest.append(",");
                }
                rest.append(item);
            } catch (Exception e) { // beans that cannot be serialized are skipped
                e.printStackTrace();
            }
        }
        rest.append("]");
        return rest.toString();
    }

    /**
     * Builds a Map from a JSON object string. Values are stored as strings;
     * nested objects and arrays keep their JSON text, so they can be passed
     * to jsonToMap again.
     *
     * @param jsonString
     * @return
     */
    @SuppressWarnings("rawtypes")
    public static Map jsonToMap(String jsonString) {
        JSONObject jsonObject = JSONObject.fromObject(jsonString);
        Iterator<?> keyIter = jsonObject.keys();
        Map<String, Object> valueMap = new HashMap<String, Object>();
        while (keyIter.hasNext()) {
            String key = (String) keyIter.next();
            valueMap.put(key, jsonObject.get(key).toString());
        }
        return valueMap;
    }

    /**
     * Converts a Map to a JSON object string.
     *
     * @param map
     * @return
     */
    public static String mapToJson(Map<String, ?> map, String[] _nory_changes, boolean nory) {
        StringBuilder s_json = new StringBuilder("{");
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            Object value = entry.getValue();
            if (value == null) {
                continue; // null values are skipped
            }
            if (s_json.length() > 1) {
                s_json.append(",");
            }
            // JSON keys must be quoted (assumes keys contain no quotes or backslashes)
            s_json.append("\"").append(entry.getKey()).append("\":");
            if (value instanceof List<?>) {
                s_json.append(beanListToJson((List<?>) value, _nory_changes, nory));
            } else {
                // serialize this entry's value (assumed to be a bean or Map), not the whole map
                s_json.append(JSONObject.fromObject(value).toString());
            }
        }
        s_json.append("}");
        return s_json.toString();
    }

    /**
     * Converts a JSON array string to a Java array.
     *
     * @param jsonString
     * @return
     */
    public static Object[] jsonToObjectArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        return jsonArray.toArray();
    }

    /**
     * Converts a List (of beans, maps or simple values) to a JSON array string.
     */
    public static String listToJson(List<?> list) {
        JSONArray jsonArray = JSONArray.fromObject(list);
        return jsonArray.toString();
    }

    /**
     * Builds a list of Java objects from a JSON array of objects.
     *
     * @param jsonString
     * @param beanClass
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> List<T> jsonToBeanList(String jsonString, Class<T> beanClass) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        List<T> list = new ArrayList<T>(size);
        for (int i = 0; i < size; i++) {
            JSONObject jsonObject = jsonArray.getJSONObject(i);
            T bean = (T) JSONObject.toBean(jsonObject, beanClass);
            list.add(bean);
        }
        return list;
    }

    /**
     * Parses a JSON array into a Java String array.
     *
     * @param jsonString
     * @return
     */
    public static String[] jsonToStringArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        String[] stringArray = new String[size];
        for (int i = 0; i < size; i++) {
            stringArray[i] = jsonArray.getString(i);
        }
        return stringArray;
    }

    /**
     * Parses a JSON array into a Java Long array.
     *
     * @param jsonString
     * @return
     */
    public static Long[] jsonToLongArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Long[] longArray = new Long[size];
        for (int i = 0; i < size; i++) {
            longArray[i] = jsonArray.getLong(i);
        }
        return longArray;
    }

    /**
     * Parses a JSON array into a Java Integer array.
     *
     * @param jsonString
     * @return
     */
    public static Integer[] jsonToIntegerArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Integer[] integerArray = new Integer[size];
        for (int i = 0; i < size; i++) {
            integerArray[i] = jsonArray.getInt(i);
        }
        return integerArray;
    }

    /**
     * Parses a JSON array into a Java Double array.
     *
     * @param jsonString
     * @return
     */
    public static Double[] jsonToDoubleArray(String jsonString) {
        JSONArray jsonArray = JSONArray.fromObject(jsonString);
        int size = jsonArray.size();
        Double[] doubleArray = new Double[size];
        for (int i = 0; i < size; i++) {
            doubleArray[i] = jsonArray.getDouble(i);
        }
        return doubleArray;
    }
}
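For a response as simple as this word count list, a JSON library is not strictly required. The sketch below is a dependency-free alternative to JsonUtil.listToJson, in case json-lib or Jackson causes classpath trouble next to Spark. (One likely cause of the Jackson failure described above is a clash with the Jackson version that Spark itself depends on, but that is an assumption, not something verified here.) WordCountItem and WordCountJson are hypothetical names, and the key order produced here need not match json-lib's output.

```java
import java.util.List;

// Hypothetical stand-in for the article's WordCount bean (a word and its count).
class WordCountItem {
    final String word;
    final int count;

    WordCountItem(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

class WordCountJson {

    // Wraps s in quotes and escapes the characters JSON forbids inside a string literal.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // other control characters use the \\uXXXX form
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Serializes the list as a JSON array of {"word":...,"count":...} objects.
    static String toJson(List<WordCountItem> items) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < items.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            WordCountItem item = items.get(i);
            sb.append("{\"word\":").append(quote(item.word))
              .append(",\"count\":").append(item.count)
              .append('}');
        }
        return sb.append(']').toString();
    }
}
```

Escaping is the part that is easy to get wrong by hand: words taken from a log file can contain quotes or backslashes, and without escaping they would produce invalid JSON.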
Spark configuration class (mainly responsible for initializing the SparkContext):
package com.zzrenfeng.zhsx.spark.conf;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;

@Component
public class ApplicationConfiguration implements Serializable {

    private static final long serialVersionUID = 1L;

    // run Spark in local mode, using all available cores
    public SparkConf sparkconf() {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("wc");
        return conf;
    }

    // Note: every call creates a new context, and Spark allows only one
    // active SparkContext per JVM, so overlapping requests will fail.
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkconf());
    }

    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    // input file for the word count job (a local Windows path)
    public String filePath() {
        return "E:\\測試文件\\nlog.txt";
    }
}
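Because javaSparkContext() builds a new context on every call, and Spark permits only one active SparkContext per JVM, two overlapping requests can make the second one fail. A common remedy is to create the context once and share it. The sketch below shows that create-once pattern in plain Java; SharedResource is a hypothetical helper, and in this project T would be JavaSparkContext with a supplier such as () -> new JavaSparkContext(sparkconf()) (Java 8 lambda syntax assumed).

```java
import java.util.function.Supplier;

// Hypothetical helper: creates an expensive resource lazily, exactly once,
// and hands the same instance to every caller.
final class SharedResource<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    // Double-checked locking on a volatile field: the factory runs at most once,
    // even when several request threads call get() at the same time.
    T get() {
        T result = instance;
        if (result == null) {
            synchronized (this) {
                result = instance;
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }
}
```

With a shared context, the close() call at the end of doWordCount would have to go, and the context would instead be stopped once when the application shuts down (for example from a method annotated with @PreDestroy). Since Spring beans are singletons by default, exposing the context through a single bean is another way to get the same effect.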
WordCount model class (wraps a word and its count):
package com.zzrenfeng.zhsx.spark.domain;

import java.io.Serializable;

// Bean holding one word and its number of occurrences. Spark serializes these
// objects (for example when collect() sends results to the driver).
public class WordCount implements Serializable {

    private static final long serialVersionUID = 1L;

    private String word;
    // primitive int, so getCount() cannot throw a NullPointerException on an empty bean
    private int count;

    public WordCount() {}

    public WordCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "WordCount [word=" + word + ", count=" + count + "]";
    }
}
Spark service class, which contains the logic of the word count job:
package com.zzrenfeng.zhsx.spark.service;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import scala.Tuple2;

import com.zzrenfeng.zhsx.spark.conf.ApplicationConfiguration;
import com.zzrenfeng.zhsx.spark.domain.WordCount;

@Component
public class SparkServiceTest implements java.io.Serializable {

    @Autowired
    ApplicationConfiguration applicationConfiguration;

    public List<WordCount> doWordCount() {
        JavaSparkContext javaSparkContext = applicationConfiguration.javaSparkContext();
        try {
            System.out.println(javaSparkContext);
            JavaRDD<String> file = javaSparkContext.textFile(applicationConfiguration.filePath());

            // split each line into words (in Spark 1.x, FlatMapFunction returns an Iterable)
            JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String t) throws Exception {
                    return Arrays.asList(t.split(" "));
                }
            });

            // wrap every word in a WordCount with count 1
            JavaRDD<WordCount> wordcount = words.map(new Function<String, WordCount>() {
                @Override
                public WordCount call(String v1) throws Exception {
                    return new WordCount(v1, 1);
                }
            });

            // turn each WordCount into a (word, count) pair
            JavaPairRDD<String, Integer> pairwordCount = wordcount.mapToPair(
                    new PairFunction<WordCount, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(WordCount t) throws Exception {
                    return new Tuple2<>(t.getWord(), t.getCount());
                }
            });

            // sum the counts of each word
            JavaPairRDD<String, Integer> wordCounts = pairwordCount.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // convert the pairs back into WordCount beans for the JSON response
            JavaRDD<WordCount> result = wordCounts.map(new Function<Tuple2<String, Integer>, WordCount>() {
                @Override
                public WordCount call(Tuple2<String, Integer> v1) throws Exception {
                    return new WordCount(v1._1(), v1._2());
                }
            });

            List<WordCount> list = result.collect();
            System.out.println(list.toString());
            return list;
        } finally {
            // always release the context, even if the job fails; otherwise the
            // next request cannot create a new one
            javaSparkContext.close();
        }
    }
}
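The job is a chain of flatMap, map, mapToPair and reduceByKey. The same counting logic can be written with Java 8 streams, which is handy for working out the expected counts in a unit test without starting Spark. This is a sketch: LocalWordCount is a hypothetical class, and it keeps the original split(" "), so repeated spaces yield empty-string tokens exactly as they do in the Spark job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local equivalent of doWordCount, for checking results without a cluster.
class LocalWordCount {

    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // flatMap step: split every line on single spaces, like t.split(" ")
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // map + mapToPair + reduceByKey: pair each word with 1 and sum per word;
                // a TreeMap keeps the output sorted by word
                .collect(Collectors.toMap(Function.identity(), w -> 1, Integer::sum, TreeMap::new));
    }
}
```

For example, LocalWordCount.count(Arrays.asList("hello spark", "hello web")) yields hello=2, spark=1 and web=1.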
Controller layer, which maps incoming requests:
package com.zzrenfeng.zhsx.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.zzrenfeng.zhsx.spark.domain.WordCount;
import com.zzrenfeng.zhsx.spark.service.SparkServiceTest;
import com.zzrenfeng.zhsx.util.JsonUtil;

@Controller
@RequestMapping("hello")
public class ControllerTest {

    @Autowired
    private SparkServiceTest sparkServiceTest;

    // A request to /hello/wc runs the word count job and returns a JSON array.
    // Declaring the UTF-8 charset keeps non-ASCII words from being garbled, since
    // Spring's default String converter otherwise writes ISO-8859-1.
    @RequestMapping(value = "wc", produces = "application/json;charset=UTF-8")
    @ResponseBody
    public String wordCount() {
        List<WordCount> list = sparkServiceTest.doWordCount();
        return JsonUtil.listToJson(list);
    }
}
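Start the application, then enter the URL mapped above (/hello/wc under the application's context path) in the browser, and the result shown at the beginning appears.
Because this is a web interface, it can be called from any client, even from programs written in other languages.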
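As an example of calling the endpoint from another program, here is a minimal Java client that uses only the JDK's HttpURLConnection. The URL http://localhost:8080/hello/wc in the usage line is an assumption: the real host, port and context path depend on how the WAR is deployed in Tomcat. The read timeout is deliberately generous because the Spark job runs inside the request.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Minimal client for the word count endpoint; the URL is supplied by the caller.
class WordCountClient {

    // Sends a GET request and returns the response body decoded as UTF-8.
    static String fetch(String url) {
        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(5_000);
            // the Spark job runs synchronously inside the request, so allow a long read
            conn.setReadTimeout(120_000);
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("HTTP " + status + " from " + url);
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }
}
```

Usage: String json = WordCountClient.fetch("http://localhost:8080/hello/wc"); the returned string can then be parsed with any JSON library on the client side.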
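Now you can happily write Spark code. Some may ask: isn't Spark better developed in Scala?
In my opinion, Scala is great for pure data processing and a real pleasure to write, but when integrating with other systems it is better to use Java; after all, when problems come up you can discuss them with experienced Java developers.
Anyone interested is welcome to join the discussion.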