Running the Spark Examples in Eclipse (Part 2)
package org.apache.spark.examples.streaming;
JavaCustomReceiver
Write the helper test code as follows.
package tw;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        @SuppressWarnings("resource")
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress("localhost", 9999));
        while (true) {
            final Socket socket = ss.accept();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        PrintWriter pw = new PrintWriter(socket.getOutputStream());
                        while (true) {
                            pw.println("hello world");
                            pw.println("hello ls");
                            pw.println("hello ww");
                            pw.println("who lili");
                            pw.println("are zz");
                            pw.println("you");
                            pw.println("bye");
                            pw.println("hello world");
                            pw.println();
                            pw.flush();
                            Thread.sleep(500);
                        }
                    } catch (IOException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
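The server accepts any number of client connections and writes the same batch of lines to each one every 500 ms, so the streaming examples receive a steady, repeating input.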
Right-click SocketServer.java, then Run As / Java Application.
Right-click JavaCustomReceiver.java, then Run As / Java Application.
Right-click JavaCustomReceiver.java again, then Run As / Run Configurations.
Add the program arguments: localhost 9999
Set spark.master to local in the code:
// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
sparkConf.set("spark.master", "local");
Right-click JavaCustomReceiver.java, then Run As / Java Application.
Run results:
JavaDirectKafkaWordCount
Add the Maven dependencies for Kafka and ZooKeeper:
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
</dependency>
Download the Kafka and ZooKeeper configuration files:
/etc/kafka1/conf/server.properties
/etc/zookeeper/conf/zoo.cfg
Modify server.properties:
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:/hadoop/kmq
zookeeper.connect=localhost:2181
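KfkSvr below also loads zoo.cfg from the classpath and cleans out its dataDir on startup, so dataDir must point at a scratch directory. A minimal single-node zoo.cfg sketch (the dataDir path is only an illustration; clientPort matches the zookeeper.connect setting above):

# zoo.cfg - minimal configuration for the embedded ZooKeeper
tickTime=2000
dataDir=D:/hadoop/zkdata
clientPort=2181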
Run tw.kfksvr.KfkSvr:
package tw.kfksvr;
import java.io.File;
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

public class KfkSvr {
    public static void main(String[] args) throws Exception {
        // start local zookeeper
        System.out.println("starting local zookeeper...");
        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
        Properties zkProperties = new Properties();
        zkProperties.load(ClassLoader.getSystemResourceAsStream("zoo.cfg"));
        FileUtils.cleanDirectory(new File(zkProperties.getProperty("dataDir")));
        final ZooKeeperServerMain zooKeeperServer = new ZooKeeperServerMain();
        final ServerConfig configuration = new ServerConfig();
        quorumConfiguration.parseProperties(zkProperties);
        configuration.readFrom(quorumConfiguration);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    zooKeeperServer.runFromConfig(configuration);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("done");
        Thread.sleep(1000);

        // start local kafka broker
        Properties kafkaProperties = new Properties();
        kafkaProperties.load(ClassLoader.getSystemResourceAsStream("server.properties"));
        FileUtils.cleanDirectory(new File(kafkaProperties.getProperty("log.dirs")));
        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
        KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig);
        System.out.println("starting local kafka broker...");
        kafka.startup();
        System.out.println("done");

        Thread.sleep(1000);
        System.out.println("starting send kafka message...");
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10000; i++) {
            ProducerRecord<String, String> data =
                new ProducerRecord<String, String>("page_visits", "name" + i, "zs ls ww lhl");
            producer.send(data);
            Thread.sleep(500);
        }
        producer.close();
        System.out.println("done");
    }
}
Modify the JavaDirectKafkaWordCount run configuration (program arguments: localhost:9092 page_visits, i.e. the broker list and the topic).
Add one line to JavaDirectKafkaWordCount.java: sparkConf.set("spark.master", "local[3]");
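For reference, the added line sits right after the SparkConf is created; a sketch of the resulting snippet (the first line is the example's existing code, the second is the addition):

SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
sparkConf.set("spark.master", "local[3]"); // added so the example runs locally inside Eclipse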
Because this example is rather old, the kafka dependency has to be commented out here. (It must be added back the next time KfkSvr is run; only KfkSvr depends on it.)
<!--
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
</dependency>
-->
Run JavaDirectKafkaWordCount.
Run results:
Interpretation of the results: the streaming batch interval is 2 seconds while the Kafka producer sends a message every 500 ms, so each word is counted 2000 / 500 = 4 times per batch.
JavaKafkaWordCount
Start KfkSvr.
Start JavaKafkaWordCount.
Note: this example is quite old, and three classes it indirectly depends on (org.apache.spark.Logging.class among them) have since been removed from Spark. To run it successfully, these classes must be located and added to the project. (None of the other examples needs this.)
Download the Spark 1.5.2 binary from http://spark.apache.org/downloads.html.
Extract spark-assembly-1.5.2-hadoop2.6.0.jar from it, then pull org.apache.spark.Logging.class out of that jar.
Create the directory structure below in the project and copy the class files into it as shown in the figure.
Add the patch directory above to the classpath.

Configure the run parameters.
Add one line to JavaKafkaWordCount.java: sparkConf.set("spark.master", "local[3]");
Note: if spark.master is set to just local, this warning appears: WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data. The receiver occupies one thread by itself, so at least one more is needed to process the batches.
Run results:
JavaStructuredKafkaWordCount
package org.apache.spark.examples.sql.streaming;
This example needs spark-sql-kafka, so add the following dependency:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.1.1</version>
</dependency>
Run KfkSvr (as above, omitted here).
To run JavaStructuredKafkaWordCount, modify the code:
SparkSession spark = SparkSession
    .builder()
    .config("spark.master", "local[3]")
    .appName("JavaStructuredKafkaWordCount")
    .getOrCreate();

// Create DataSet representing the stream of input lines from kafka
Set the program arguments: localhost:9092 subscribe page_visits (bootstrap servers, subscribe type, topics).
Run results:
Note: the log output is verbose. After changing the log level to ERROR:
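One way to do this (a sketch, assuming the log4j 1.x API that Spark 2.1 ships with; editing log4j.properties on the classpath works equally well) is to raise the root logger level at the start of main:

// At the top of main(), before the SparkSession is created
org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.ERROR);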
JavaStructuredNetworkWordCount
package org.apache.spark.examples.sql.streaming;
Run tw.SocketServer.
Configure the program arguments for JavaStructuredNetworkWordCount.
Modify the JavaStructuredNetworkWordCount code:
SparkSession spark = SparkSession
    .builder()
    .config("spark.master", "local[2]")
    .appName("JavaStructuredNetworkWordCount")
    .getOrCreate();
JavaStructuredNetworkWordCountWindowed
Start SocketServer.
Modify the JavaStructuredNetworkWordCountWindowed run configuration: add the main-method arguments and the environment variables; the code needs the same spark.master setting (see the sketch below).
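The code change follows the same pattern as the non-windowed example; a sketch, assuming the same local-master tweak:

SparkSession spark = SparkSession
    .builder()
    .config("spark.master", "local[2]")
    .appName("JavaStructuredNetworkWordCountWindowed")
    .getOrCreate();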
Start JavaStructuredNetworkWordCountWindowed.
JavaNetworkWordCount
package org.apache.spark.examples.streaming;
- Program arguments: localhost 9999 D:/tmp/checkpoint/ D:/tmp/out
- Environment: replace native environment with the specified environment; HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1
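Setting HADOOP_HOME points Spark at winutils.exe on Windows. If editing the launch configuration's environment tab is inconvenient, an alternative (a sketch; the path is the same one assumed above) is to set the hadoop.home.dir system property at the very start of main:

// Equivalent to setting the HADOOP_HOME environment variable; must run before any Hadoop class initializes
System.setProperty("hadoop.home.dir", "F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1");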
Modify JavaNetworkWordCount.java:
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount")
.set("spark.master", "local[3]");
Start tw.SocketServer.
Start JavaNetworkWordCount.
Run results:
JavaQueueStream
SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream").set("spark.master", "local[2]");
Results as shown:
JavaRecoverableNetworkWordCount
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount").set("spark.master", "local[3]"); |
1. Program arguments: localhost 9999
2. Environment: replace native environment with the specified environment; HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1
Start tw.SocketServer.
Start JavaRecoverableNetworkWordCount.
Run results:
JavaSqlNetworkWordCount
package org.apache.spark.examples.streaming;
- Program arguments: localhost 9999
- Environment: replace native environment with the specified environment; HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1
Modify JavaSqlNetworkWordCount.java:
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount")
.set("spark.master", "local[3]");
Start tw.SocketServer.
Start JavaSqlNetworkWordCount.
JavaStatefulNetworkWordCount
package org.apache.spark.examples.streaming;
- Program arguments: localhost 9999
- Environment: replace native environment with the specified environment; HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1
Modify JavaStatefulNetworkWordCount.java:
SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount ")
.set("spark.master", "local[3]");
Modify SocketServer so that only "hello world" is sent, which keeps the stateful running counts easy to follow:
while(true){
pw.println("hello world");
pw.println();
pw.flush();
Thread.sleep(500);
}
Start tw.SocketServer.
Start JavaStatefulNetworkWordCount.
Run results:
JavaFlumeEventCount
Prepare the helper class:
package tw.flume;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.agent.embedded.EmbeddedAgent;
import org.apache.flume.event.SimpleEvent;

import com.google.common.collect.ImmutableMap;

public class FlumeSvr {
    public static void main(String[] args) throws Exception {
        final Map<String, String> properties = new HashMap<String, String>();
        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "100000");
        properties.put("channel.transactionCapacity", "1000");
        properties.put("sinks", "sink1");
        properties.put("sink1.type", "avro");
        properties.put("sink1.hostname", "localhost");
        properties.put("sink1.port", "44444");
        properties.put("processor.type", "default");

        EmbeddedAgent agent = new EmbeddedAgent("myagent");
        agent.configure(properties);
        agent.start();

        for (int i = 0; ; i++) {
            SimpleEvent evt = new SimpleEvent();
            evt.setBody("this is body".getBytes());
            evt.setHeaders(ImmutableMap.of("index", i + ""));
            agent.put(evt);
            Thread.sleep(100);
        }
    }
}
Add the Maven dependencies:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-embedded-agent -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-embedded-agent</artifactId>
    <version>1.6.0</version>
</dependency>
Prepare JavaFlumeEventCount:
package org.apache.spark.examples.streaming;
- Program arguments: localhost 44444
Modify JavaFlumeEventCount.java:
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount") .set("spark.master", "local[3]");
1. Run JavaFlumeEventCount.
2. Run tw.flume.FlumeSvr.
The results are shown below: "Received 100" appears every 10 seconds.
JavaHdfsLR
package org.apache.spark.examples;
Program arguments: examples/src/main/resources/JavaHdfsLR.txt 100
Modify JavaHdfsLR.java:
SparkSession spark = SparkSession
    .builder()
    .appName("JavaHdfsLR")
    .config("spark.master", "local[3]")
    .getOrCreate();
Write the data-generation code:
package tw;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class DataGen {
    public static void main(String[] args) throws IOException {
        JavaHdfsLR();
    }

    private static void JavaHdfsLR() throws IOException {
        FileWriter out = new FileWriter("examples/src/main/resources/JavaHdfsLR.txt");
        Random r = new Random();
        for (int line = 0; line < 1000; line++) {
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < 11; i++) {
                sb.append(r.nextDouble());
                sb.append(" ");
            }
            out.write(sb.toString());
            out.write('\n');
            sb.setLength(0);
        }
        out.close();
    }
}
Run DataGen.java.
Run JavaHdfsLR.java.
The results look similar to the following (the input data is randomly generated, so exact values differ):
JavaLogQuery
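JavaLogQuery processes a small set of embedded sample log lines and needs no mandatory program arguments, so presumably the only change required is the usual local-master setting; a sketch following the pattern of the other examples:

SparkSession spark = SparkSession
    .builder()
    .appName("JavaLogQuery")
    .config("spark.master", "local[3]") // added for running inside Eclipse
    .getOrCreate();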
JavaPageRank
package org.apache.spark.examples;
Program arguments: examples/src/main/resources/JavaPageRank.txt 100
Modify JavaPageRank.java:
SparkSession spark = SparkSession
    .builder()
    .appName("JavaPageRank")
    .config("spark.master", "local[3]")
    .getOrCreate();
Modify the tw.DataGen code:
public static void main(String[] args) throws IOException {
    // JavaHdfsLR();
    JavaPageRank();
}

private static void JavaPageRank() throws IOException {
    FileWriter out = new FileWriter("examples/src/main/resources/JavaPageRank.txt");
    Random r = new Random();
    char[] links = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
                    'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
    for (int line = 0; line < 1000; line++) {
        int i = r.nextInt(24);
        out.write(links[i]);
        out.write(' ');
        i = r.nextInt(24);
        out.write(links[i]);
        out.write('\n');
    }
    out.close();
}
Run tw.DataGen.
Run JavaPageRank.
Results:
JavaSparkPi
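JavaSparkPi takes an optional number of slices as its only program argument, so presumably the only change needed is again the local-master setting; a sketch following the same pattern:

SparkSession spark = SparkSession
    .builder()
    .appName("JavaSparkPi")
    .config("spark.master", "local[3]") // added for running inside Eclipse
    .getOrCreate();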
JavaStatusTrackerDemo
JavaTC
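JavaStatusTrackerDemo and JavaTC likewise run without mandatory program arguments; presumably the same spark.master change in their setup code is all that is needed to run them locally.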


