Running the Spark Examples in Eclipse (Part 2)

 

JavaCustomReceiver

package org.apache.spark.examples.streaming;

Write the following helper test class:

package tw;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Simple test server: accepts connections on localhost:9999 and writes
// a fixed batch of lines to each connected client every 500 ms.
public class SocketServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        @SuppressWarnings("resource")
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress("localhost", 9999));
        while (true) {
            Socket socket = ss.accept();
            // One writer thread per client, so several examples can connect at once.
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        PrintWriter pw = new PrintWriter(socket.getOutputStream());
                        while (true) {
                            pw.println("hello world");
                            pw.println("hello ls");
                            pw.println("hello ww");
                            pw.println("who lili");
                            pw.println("are zz");
                            pw.println("you");
                            pw.println("bye");
                            pw.println("hello world");
                            pw.println();
                            pw.flush();
                            Thread.sleep(500);
                        }
                    } catch (IOException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

Right-click SocketServer.java and choose Run As / Java Application.

Right-click JavaCustomReceiver.java and choose Run As / Java Application.

Right-click JavaCustomReceiver.java again, choose Run As / Run Configurations, and add the program arguments: localhost 9999

Then set spark.master=local in the code:

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
sparkConf.set("spark.master", "local");

Right-click JavaCustomReceiver.java and choose Run As / Java Application.

Result:

 

 

JavaDirectKafkaWordCount

Add the Maven dependencies for ZooKeeper and Kafka:

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
</dependency>

 

Download the Kafka and ZooKeeper configuration files (for example from an existing installation):

/etc/kafka1/conf/server.properties
/etc/zookeeper/conf/zoo.cfg

Modify server.properties:

listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:/hadoop/kmq
zookeeper.connect=localhost:2181
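zoo.cfg can stay close to the stock standalone template; what matters is that dataDir points at a real local directory, because KfkSvr below wipes it on startup. A minimal sketch (the values here are illustrative):

# minimal standalone zoo.cfg (illustrative values)
tickTime=2000
dataDir=D:/hadoop/zkdata
clientPort=2181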

Run tw.kfksvr.KfkSvr:

package tw.kfksvr;

import java.io.File;
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

// Starts an embedded ZooKeeper and a local Kafka broker, then produces
// test messages to the "page_visits" topic every 500 ms.
public class KfkSvr {

    public static void main(String[] args) throws Exception {

        // Start a local ZooKeeper from zoo.cfg on the classpath.
        System.out.println("starting local zookeeper...");
        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
        Properties zkProperties = new Properties();
        zkProperties.load(ClassLoader.getSystemResourceAsStream("zoo.cfg"));
        // Wipe the data directory so every run starts clean.
        FileUtils.cleanDirectory(new File(zkProperties.getProperty("dataDir")));
        ZooKeeperServerMain zooKeeperServer = new ZooKeeperServerMain();
        final ServerConfig configuration = new ServerConfig();
        quorumConfiguration.parseProperties(zkProperties);
        configuration.readFrom(quorumConfiguration);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    zooKeeperServer.runFromConfig(configuration);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("done");
        Thread.sleep(1000);

        // Start a local Kafka broker from server.properties on the classpath.
        Properties kafkaProperties = new Properties();
        kafkaProperties.load(ClassLoader.getSystemResourceAsStream("server.properties"));
        FileUtils.cleanDirectory(new File(kafkaProperties.getProperty("log.dirs")));
        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
        KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig);
        System.out.println("starting local kafka broker...");
        kafka.startup();
        System.out.println("done");
        Thread.sleep(1000);

        // Produce one test record every 500 ms.
        System.out.println("starting send kafka message...");
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10000; i++) {
            ProducerRecord<String, String> data =
                new ProducerRecord<String, String>("page_visits", "name" + i, "zs ls ww lhl");
            producer.send(data);
            Thread.sleep(500);
        }
        producer.close();
        System.out.println("done");
    }
}

 

 

 

Modify the run configuration of JavaDirectKafkaWordCount (program arguments: localhost:9092 page_visits).

Add one line to JavaDirectKafkaWordCount.java: sparkConf.set("spark.master", "local[3]");
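In context, the added line sits right after the SparkConf is created; roughly (a sketch of the Spark 2.1 example source):

SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
sparkConf.set("spark.master", "local[3]"); // the added line
// The example uses a 2-second batch interval, which matters for the result below.
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));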

Because this example is older, the kafka_2.10 dependency must be commented out here. (Re-enable it the next time you run KfkSvr; only KfkSvr depends on it.)

<!--
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
</dependency>
-->

 

Run JavaDirectKafkaWordCount.

 

Result:

Interpretation: the streaming batch window is 2 seconds while Kafka sends one message every 500 ms, so each batch counts 2000/500 = 4 messages.

JavaKafkaWordCount

Start KfkSvr.

Start JavaKafkaWordCount.

Note: this example is quite old, and three classes it depends on transitively (org.apache.spark.Logging among them) have since been removed from Spark. To run it successfully you must locate these classes and add them to the project (none of the other examples need this).

Download the Spark 1.5.2 binaries from http://spark.apache.org/downloads.html, extract spark-assembly-1.5.2-hadoop2.6.0.jar, and pull org.apache.spark.Logging.class (and the related classes) out of that jar.

Create the matching package directory structure in the project and copy the class files into it, as shown in the figure. Then add this patch directory to the classpath.

Configure the run arguments.

Add one line to JavaKafkaWordCount.java: sparkConf.set("spark.master", "local[3]");

Note: if you set only spark.master=local, you will see: WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

Result:

 

 

JavaStructuredKafkaWordCount

package org.apache.spark.examples.sql.streaming;

This example uses spark-sql-kafka, so add the following dependency:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.1.1</version>
</dependency>

 

Run KfkSvr (as above).

To run JavaStructuredKafkaWordCount, modify the code:

 

SparkSession spark = SparkSession
        .builder()
        .config("spark.master", "local[3]")
        .appName("JavaStructuredKafkaWordCount")
        .getOrCreate();

// Create DataSet representing the stream of input lines from kafka

Set the program arguments: localhost:9092 subscribe page_visits (i.e. <bootstrap-servers> <subscribe-type> <topics>).

Result: the output contains a lot of log noise. After changing the log level to ERROR:
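One way to change the level (an assumption, using the standard Spark API) is to set it on the SparkContext right after the session is created:

// Suppress INFO/WARN output so only errors and the query results are printed.
spark.sparkContext().setLogLevel("ERROR");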

 

 

JavaStructuredNetworkWordCount

package org.apache.spark.examples.sql.streaming;

Run tw.SocketServer.

Configure the startup arguments for JavaStructuredNetworkWordCount (the example takes <host> <port>; with the SocketServer above, that is localhost 9999).

Modify the JavaStructuredNetworkWordCount code:

SparkSession spark = SparkSession
        .builder()
        .config("spark.master", "local[2]")
        .appName("JavaStructuredNetworkWordCount")
        .getOrCreate();

 

 

 

JavaStructuredNetworkWordCountWindowed

Start SocketServer.

Modify the JavaStructuredNetworkWordCountWindowed run configuration: add the main-function arguments and the environment variables, as sketched below.
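For reference, the example's usage is <hostname> <port> <window duration in seconds> [<slide duration in seconds>], so an illustrative configuration (the concrete values are assumptions, with host and port matching the SocketServer) would be:

Program arguments: localhost 9999 10 5
Environment: HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1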

 

Start JavaStructuredNetworkWordCountWindowed.

JavaNetworkWordCount

 

package org.apache.spark.examples.streaming;

  1. Program arguments: localhost 9999 (the example only uses <hostname> <port>)
  2. Replace the native environment with the specified environment.

Environment: HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1 (winutils provides the Hadoop binaries Spark needs on Windows)

Modify JavaNetworkWordCount.java:

SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount")
        .set("spark.master", "local[3]");

Start tw.SocketServer, then start JavaNetworkWordCount.

Result:

 

JavaQueueStream

SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream").set("spark.master", "local[2]");

The result is shown in the figure:

 

JavaRecoverableNetworkWordCount

 

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaRecoverableNetworkWordCount").set("spark.master", "local[3]");

  1. Program arguments: localhost 9999 D:/tmp/checkpoint/ D:/tmp/out (the example takes <hostname> <port> <checkpoint-directory> <output-file>)
  2. Replace the native environment with the specified environment.

Environment: HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1
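The checkpoint directory and output file matter because the example restarts from checkpoint data when it exists, using the getOrCreate pattern, roughly (a sketch following the example's structure; createContext is the example's own factory method):

// Rebuild the streaming context from the checkpoint if present,
// otherwise call the factory to create a fresh one.
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
        checkpointDirectory,
        () -> createContext(ip, port, checkpointDirectory, outputPath));
ssc.start();
ssc.awaitTermination();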


Start tw.SocketServer, then start JavaRecoverableNetworkWordCount.

Result:

 

JavaSqlNetworkWordCount

package org.apache.spark.examples.streaming;

  1. Program arguments: localhost 9999
  2. Replace the native environment with the specified environment.

Environment: HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1

Modify JavaSqlNetworkWordCount.java:

SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount")
        .set("spark.master", "local[3]");

Start tw.SocketServer, then start JavaSqlNetworkWordCount.
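The interesting part of this example is that each micro-batch is converted into a DataFrame and queried with SQL. The structure is roughly (a sketch following the Spark 2.1 example; JavaRecord is the example's simple bean with a single "word" property):

words.foreachRDD((rdd, time) -> {
    // One shared SparkSession built from the streaming context's configuration.
    SparkSession spark = SparkSession.builder()
            .config(rdd.context().getConf()).getOrCreate();
    JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
        JavaRecord record = new JavaRecord();
        record.setWord(word);
        return record;
    });
    Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
    wordsDataFrame.createOrReplaceTempView("words");
    spark.sql("select word, count(*) as total from words group by word").show();
});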

 

JavaStatefulNetworkWordCount

package org.apache.spark.examples.streaming;

  1. Program arguments: localhost 9999
  2. Replace the native environment with the specified environment.

Environment: HADOOP_HOME=F:/TDDOWNLOAD/winutils-master/winutils-master/hadoop-2.7.1

Modify JavaStatefulNetworkWordCount.java:

SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount")
        .set("spark.master", "local[3]");

 

Modify SocketServer's write loop so it sends a single line per tick:

        while (true) {
            pw.println("hello world");
            pw.println();
            pw.flush();
            Thread.sleep(500);
        }

Start tw.SocketServer, then start JavaStatefulNetworkWordCount.

Result:

 

 

JavaFlumeEventCount

Prepare the helper class:

package tw.flume;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.agent.embedded.EmbeddedAgent;
import org.apache.flume.event.SimpleEvent;

import com.google.common.collect.ImmutableMap;

// Embedded Flume agent that sends one Avro event every 100 ms to
// localhost:44444, where JavaFlumeEventCount listens.
public class FlumeSvr {

    public static void main(String[] args) throws Exception {
        final Map<String, String> properties = new HashMap<String, String>();
        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "100000");
        properties.put("channel.transactionCapacity", "1000");
        properties.put("sinks", "sink1");
        properties.put("sink1.type", "avro");
        properties.put("sink1.hostname", "localhost");
        properties.put("sink1.port", "44444");
        properties.put("processor.type", "default");

        EmbeddedAgent agent = new EmbeddedAgent("myagent");
        agent.configure(properties);
        agent.start();

        for (int i = 0; ; i++) {
            SimpleEvent evt = new SimpleEvent();
            evt.setBody("this is body".getBytes());
            evt.setHeaders(ImmutableMap.of("index", i + ""));
            agent.put(evt);
            Thread.sleep(100);
        }
    }
}

 

Add the Maven dependencies:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-embedded-agent -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-embedded-agent</artifactId>
    <version>1.6.0</version>
</dependency>

 

 

Prepare JavaFlumeEventCount:

 

 

 

package org.apache.spark.examples.streaming;

  1. Program arguments: localhost 44444

Modify JavaFlumeEventCount.java:

SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount")
        .set("spark.master", "local[3]");

  1. Run JavaFlumeEventCount.
  2. Run tw.flume.FlumeSvr.

The result is shown below: "Received 100" appears every 10 seconds, which matches FlumeSvr sending one event every 100 ms (100 events per 10-second interval).

JavaHdfsLR

package org.apache.spark.examples;

Program arguments: examples/src/main/resources/JavaHdfsLR.txt 100

Modify JavaHdfsLR.java:

SparkSession spark = SparkSession
        .builder()
        .appName("JavaHdfsLR").config("spark.master", "local[3]")
        .getOrCreate();

 

Write the data-generation code:

package tw;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class DataGen {

    public static void main(String[] args) throws IOException {
        JavaHdfsLR();
    }

    // Generates 1000 lines of 11 random doubles each: JavaHdfsLR reads the
    // first value as the label and the remaining 10 as features (D = 10).
    private static void JavaHdfsLR() throws IOException {
        FileWriter out = new FileWriter("examples/src/main/resources/JavaHdfsLR.txt");
        Random r = new Random();
        for (int line = 0; line < 1000; line++) {
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < 11; i++) {
                sb.append(r.nextDouble());
                sb.append(" ");
            }
            out.write(sb.toString());
            out.write('\n');
        }
        out.close();
    }
}

 

 

 

Run DataGen.java, then run JavaHdfsLR.java.

The output will look similar to the following (the input data is random, so exact values differ):

JavaLogQuery
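JavaLogQuery ships with built-in sample log lines and needs no program arguments, so presumably the only change required is the usual spark.master setting in the example's configuration, e.g. (an assumption, following the pattern used throughout this article):

sparkConf.set("spark.master", "local[3]"); // or .config("spark.master", "local[3]") on a SparkSession builder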

 

JavaPageRank

package org.apache.spark.examples;

Program arguments: examples/src/main/resources/JavaPageRank.txt 100

Modify JavaPageRank.java:

SparkSession spark = SparkSession
        .builder()
        .appName("JavaPageRank").config("spark.master", "local[3]")
        .getOrCreate();

 

Modify the tw.DataGen code:

    public static void main(String[] args) throws IOException {
        // JavaHdfsLR();
        JavaPageRank();
    }

    // Generates 1000 random "source target" link pairs for PageRank.
    private static void JavaPageRank() throws IOException {
        FileWriter out = new FileWriter("examples/src/main/resources/JavaPageRank.txt");
        Random r = new Random();
        char[] links = {'a','b','c','d','e','f','g','h','i','j','k','l','m',
                        'n','o','p','q','r','s','t','u','v','w','x','y','z'};
        for (int line = 0; line < 1000; line++) {
            int i = r.nextInt(links.length); // was nextInt(24), which never picked 'y' or 'z'
            out.write(links[i]);
            out.write(' ');
            i = r.nextInt(links.length);
            out.write(links[i]);
            out.write('\n');
        }
        out.close();
    }

Run tw.DataGen, then run JavaPageRank.

Result:

 

 

JavaSparkPi

 

JavaStatusTrackerDemo

 

JavaTC
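Presumably JavaSparkPi, JavaStatusTrackerDemo, and JavaTC (all in package org.apache.spark.examples, like JavaHdfsLR) run the same way as the other batch examples: no helper server, no required program arguments, and just the spark.master setting added to each example's configuration, e.g. (an assumption, following the pattern above):

SparkSession spark = SparkSession
        .builder()
        .appName("JavaSparkPi").config("spark.master", "local[3]")
        .getOrCreate();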

