Spark Streaming: consuming Kafka data from a specified offset



Original article: http://blog.csdn.net/high2011/article/details/53706446

First of all, many thanks to the original author; this article saved me a lot of detours. It is reposted here as a copy for my own review. Please support the original author and read it at the link above.


I. Scenario: When a Spark Streaming program exits unexpectedly, data is still being pushed into Kafka. Because consumption starts from the latest offset by default, the records produced during the outage are lost. To avoid this, we need to record the offsets of each consumed batch so that the next run can check them and resume reading from the recorded offsets.
II. Environment: Kafka 0.9.0, Spark 1.6.0, JDK 1.7, Scala 2.10.5, IntelliJ IDEA 16
III. Implementation:
      1. Add the Spark and Kafka dependencies
<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
         xmlns="http://maven.apache.org/POM/4.0.0"  
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    <modelVersion>4.0.0</modelVersion>  
  
    <groupId>com.ngaa</groupId>  
    <artifactId>test-my</artifactId>  
    <version>1.0-SNAPSHOT</version>  
    <inceptionYear>2008</inceptionYear>  
    <properties>  
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  
        <!--add  maven release-->  
        <maven.compiler.source>1.7</maven.compiler.source>  
        <maven.compiler.target>1.7</maven.compiler.target>  
        <encoding>UTF-8</encoding>  
        <!--Scala version-->
        <scala.version>2.10.5</scala.version>  
        <!--Scala version on the test machine-->
        <test.scala.version>2.11.7</test.scala.version>  
  
        <jackson.version>2.3.0</jackson.version>  
        <!--slf4j version-->
        <slf4j-version>1.7.20</slf4j-version>  
        <!--cdh-spark-->  
        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>  
        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>  
        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>  
        <!--cdh-hadoop-->  
        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>  
        <!--httpclient must match the Hadoop version bundled with CDH (see /opt/cloudera/parcels/CDH/lib/hadoop/lib)-->
        <httpclient.version>4.2.5</httpclient.version>  
  
        <!--httpcore-->
        <httpcore.version>4.2.5</httpcore.version>  
        <!--fastjson-->  
        <fastjson.version>1.1.39</fastjson.version>  
  
    </properties>  
  
    <repositories>  
        <repository>  
            <id>scala-tools.org</id>  
            <name>Scala-Tools Maven2 Repository</name>  
            <url>http://scala-tools.org/repo-releases</url>  
        </repository>  
        <!--Repository for resolving the CDH dependency jars-->
        <repository>  
            <id>cloudera</id>  
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
        </repository>  
    </repositories>  
  
    <pluginRepositories>  
        <pluginRepository>  
            <id>scala-tools.org</id>  
            <name>Scala-Tools Maven2 Repository</name>  
            <url>http://scala-tools.org/repo-releases</url>  
        </pluginRepository>  
    </pluginRepositories>  
  
    <dependencies>  
  
        <!--fastjson-->  
        <dependency>  
            <groupId>com.alibaba</groupId>  
            <artifactId>fastjson</artifactId>  
            <version>${fastjson.version}</version>  
        </dependency>  
        <!--httpclient-->  
        <dependency>  
            <groupId>org.apache.httpcomponents</groupId>  
            <artifactId>httpclient</artifactId>  
            <version>${httpclient.version}</version>  
        </dependency>  
  
        <!--http core-->  
        <dependency>  
            <groupId>org.apache.httpcomponents</groupId>  
            <artifactId>httpcore</artifactId>  
            <version>${httpcore.version}</version>  
        </dependency>  
  
        <!--slf4j-->  
        <dependency>  
            <groupId>org.slf4j</groupId>  
            <artifactId>slf4j-log4j12</artifactId>  
            <version>${slf4j-version}</version>  
        </dependency>  
        <!--hadoop-->  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-client</artifactId>  
            <version>${hadoop.cdh.version}</version>  
            <exclusions>  
                <exclusion>  
                    <groupId>javax.servlet</groupId>  
                    <artifactId>*</artifactId>  
                </exclusion>  
            </exclusions>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-common</artifactId>  
            <version>${hadoop.cdh.version}</version>  
            <exclusions>  
                <exclusion>  
                    <groupId>javax.servlet</groupId>  
                    <artifactId>*</artifactId>  
                </exclusion>  
            </exclusions>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-hdfs</artifactId>  
            <version>${hadoop.cdh.version}</version>  
            <exclusions>  
                <exclusion>  
                    <groupId>javax.servlet</groupId>  
                    <artifactId>*</artifactId>  
                </exclusion>  
            </exclusions>  
        </dependency>  
        <!--spark scala-->  
        <dependency>  
            <groupId>org.scala-lang</groupId>  
            <artifactId>scala-library</artifactId>  
            <version>${scala.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>com.fasterxml.jackson.core</groupId>  
            <artifactId>jackson-databind</artifactId>  
            <version>${jackson.version}</version>  
        </dependency>  
  
        <!--Spark Streaming and Kafka integration packages-->
        <dependency>  
            <groupId>org.apache.spark</groupId>  
            <artifactId>spark-streaming_2.10</artifactId>  
            <version>${spark.streaming.cdh.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.spark</groupId>  
            <artifactId>spark-streaming-kafka_2.10</artifactId>  
            <version>${kafka.spark.cdh.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>junit</groupId>  
            <artifactId>junit</artifactId>  
            <version>4.12</version>  
            <scope>test</scope>  
        </dependency>  
  
        <!--Spark assembly jar from a local Windows path-->
        <dependency>  
        <groupId>org.apache.spark</groupId>  
        <artifactId>spark-assembly_2.10</artifactId>  
        <version>${spark.cdh.version}</version>  
        <scope>system</scope>  
        <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>  
        </dependency>  
  
        <!--Spark assembly jar from a local path on the Linux test environment-->
        <!--<dependency>-->  
            <!--<groupId>org.apache.spark</groupId>-->  
            <!--<artifactId>spark-assembly_2.10</artifactId>-->  
            <!--<version>${spark.cdh.version}</version>-->  
            <!--<scope>system</scope>-->  
            <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->  
            <!--</systemPath>-->  
        <!--</dependency>-->  
  
        <!--Spark assembly from the central repository-->
        <!--<dependency>-->  
        <!--<groupId>org.apache.spark</groupId>-->  
        <!--<artifactId>spark-assembly_2.10</artifactId>-->  
        <!--<version>${spark.cdh.version}</version>-->  
        <!--</dependency>-->  
  
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-yarn-server-web-proxy</artifactId>  
            <version>2.6.0-cdh5.8.0</version>  
        </dependency>  
  
    </dependencies>  
  
    <!--Maven packaging-->
    <build>  
        <finalName>test-my</finalName>  
        <sourceDirectory>src/main/scala</sourceDirectory>  
        <testSourceDirectory>src/test/scala</testSourceDirectory>  
        <plugins>  
            <plugin>  
                <groupId>org.scala-tools</groupId>  
                <artifactId>maven-scala-plugin</artifactId>  
                <version>2.15.2</version>  
                <executions>  
                    <execution>  
                        <goals>  
                            <goal>compile</goal>  
                            <goal>testCompile</goal>  
                        </goals>  
                    </execution>  
                </executions>  
                <configuration>  
                    <scalaVersion>${scala.version}</scalaVersion>  
                    <args>  
                        <arg>-target:jvm-1.7</arg>  
                    </args>  
                </configuration>  
            </plugin>  
            <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-eclipse-plugin</artifactId>  
                <configuration>  
                    <downloadSources>true</downloadSources>  
                    <buildcommands>  
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>  
                    </buildcommands>  
                    <additionalProjectnatures>  
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>  
                    </additionalProjectnatures>  
                    <classpathContainers>  
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>  
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>  
                    </classpathContainers>  
                </configuration>  
            </plugin>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                    <archive>  
                        <manifest>  
                            <mainClass></mainClass>  
                        </manifest>  
                    </archive>  
                </configuration>  
                <executions>  
                    <execution>  
                        <id>make-assembly</id>  
                        <phase>package</phase>  
                        <goals>  
                            <goal>single</goal>  
                        </goals>  
                    </execution>  
                </executions>  
            </plugin>  
        </plugins>  
    </build>  
    <reporting>  
        <plugins>  
            <plugin>  
                <groupId>org.scala-tools</groupId>  
                <artifactId>maven-scala-plugin</artifactId>  
                <configuration>  
                    <scalaVersion>${scala.version}</scalaVersion>  
                </configuration>  
            </plugin>  
        </plugins>  
    </reporting>  
  
</project>  

      2. Create the test class
import kafka.common.TopicAndPartition  
import kafka.message.MessageAndMetadata  
import kafka.serializer.StringDecoder  
import org.apache.log4j.{Level, Logger}  
import org.apache.spark.{SparkConf, TaskContext}  
import org.apache.spark.streaming.dstream.InputDStream  
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.slf4j.LoggerFactory  
  
/**
  * Created by yangjf on 2016/12/18
  * Update date:
  * Time: 11:10
  * Description: read Kafka data starting from specified offsets
  * Result of Test:
  * Command:
  * Email: jifei.yang@ngaa.com.cn
  */
object ReadBySureOffsetTest {  
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)  
  
  def main(args: Array[String]) {  
    // Set log levels to reduce noise
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("Main program: consuming Kafka from specified offsets, starting")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      logger.info("Main program exited: missing checkpoint directory argument")
      System.exit(1)
    }
    //hdfs://hadoop1:8020/user/root/spark/checkpoint  
    val Array(checkpointDirectory) = args  
    logger.info("checkpoint檢查:" + checkpointDirectory)  
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,  
      () => {  
        createContext(checkpointDirectory)  
      })  
    logger.info("streaming開始啟動")  
    ssc.start()  
    ssc.awaitTermination()  
  }  
  
  def createContext(checkpointDirectory: String): StreamingContext = {  
    // Configuration
    val brokers = "hadoop3:9092,hadoop4:9092"  
    val topics = "20161218a"  
  
    // Batch interval in seconds
    val split_rdd_time = 8
    // Create the streaming context
    val sparkConf = new SparkConf()  
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")  
      .set("spark.app.id", "streaming_kafka")  
  
    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))  
  
    ssc.checkpoint(checkpointDirectory)  
  
    // Create a direct Kafka stream with the brokers and topics
    val topicsSet: Set[String] = topics.split(",").toSet  
    // Kafka configuration parameters
    val kafkaParams: Map[String, String] = Map[String, String](  
      "metadata.broker.list" -> brokers,  
      "group.id" -> "apple_sample",  
      "serializer.class" -> "kafka.serializer.StringEncoder"  
//      "auto.offset.reset" -> "largest"   //自動將偏移重置為最新偏移(默認)  
//      "auto.offset.reset" -> "earliest"  //自動將偏移重置為最早的偏移  
//      "auto.offset.reset" -> "none"      //如果沒有為消費者組找到以前的偏移,則向消費者拋出異常  
    )  
    /**
      * Read Kafka data starting from the specified offsets.
      * Note: the direct stream gives exactly-once semantics only if the offsets are stored
      *       atomically together with the processing results; otherwise it is at-least-once.
      *       With starting offsets specified, reading resumes from where the previous
      *       Streaming run stopped.
      */
    val offsetList = List((topics, 0, 22753623L), (topics, 1, 327041L))  // (topic, partition, offset)
    val fromOffsets = setFromOffsets(offsetList)  // build the fromOffsets map
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())  // map each MessageAndMetadata to (topic, message)
   // Consume from the specified offsets using the direct (simple consumer) API; for details see
   // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)  
  
    // Process the data
    messages.foreachRDD(mess => {  
      // Get the offset ranges of this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges  
      mess.foreachPartition(lines => {  
        lines.foreach(line => {  
          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)  
          logger.info("++++++++++++++++++++++++++++++此處記錄offset+++++++++++++++++++++++++++++++++++++++")  
          logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")  
          logger.info("+++++++++++++++++++++++++++++++此處消費數據操作++++++++++++++++++++++++++++++++++++++")  
          logger.info("The kafka  line is " + line)  
        })  
      })  
    })  
    ssc  
  }  
  
  // Build the TopicAndPartition -> starting offset map
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {  
    var fromOffsets: Map[TopicAndPartition, Long] = Map()  
    for (offset <- list) {  
      val tp = TopicAndPartition(offset._1, offset._2)  // topic and partition number
      fromOffsets += (tp -> offset._3)                  // starting offset
    }  
    fromOffsets  
  }  
}  
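
For reference, once the assembly jar is built with the Maven configuration above, the job could be submitted roughly as follows. This is only a sketch: the jar name follows from the <finalName> and assembly settings in the POM, the main class assumes the object above sits in the default package, and the checkpoint path is the HDFS directory mentioned in the code comment; adjust all of them to your environment.

spark-submit --class ReadBySureOffsetTest --master local[2] target/test-my-jar-with-dependencies.jar hdfs://hadoop1:8020/user/root/spark/checkpoint

Note that the code calls setMaster("local[2]") explicitly, so the master is effectively fixed unless that call is removed.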

IV. References:
    1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
    2. Kafka configuration documentation: http://kafka.apache.org/documentation.html#configuration
    3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
    4. Spark Streaming + Kafka integration (offset handling): http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
    5. Kafka consumer API docs: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Note: the above has been tested and works; adjust it to your needs. If you have questions, please leave a comment!

 

Notes for reproducing this experiment

1. First, know your topic, its number of partitions, and the checkpoint directory.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test

Then set the starting offset of each partition in the code below:

val offsetList = List((topics, 0, 0L), (topics, 1, 0L), (topics, 2, 0L))

After every run, the consumption progress is recorded in the checkpoint. For example, if the first run consumes from offset 0, then after the program stops, the next run reads the last position from the checkpoint and continues consuming from there.
Note that the checkpoint directory must correspond one-to-one with the topic, otherwise errors occur. Also, the offset list needs one entry per partition (three entries for three partitions), with partition ids starting from 0.
If you switch to a different topic, remember to switch to a different checkpoint directory as well.

The current code guarantees that each restart resumes consuming from where the last run stopped.
To consume from a specified position instead, switch to a new checkpoint directory and specify, in
val offsetList = List((topics, 0, 120L), (topics, 1, 0L), (topics, 2, 0L))
which partition starts from which offset; here partition 0 starts consuming from offset 120.
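
Before filling in offsetList, it helps to check which offsets are actually valid for each partition. One way is Kafka's GetOffsetShell tool; the broker list and topic below are placeholders matching the earlier kafka-topics example, and --time -2 prints the earliest available offset per partition while -1 prints the latest:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

Starting offsets outside this range may fail with an offset-out-of-range error when the stream starts.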

Because the offsets are managed by the application itself here, they are not stored in ZooKeeper and therefore cannot be queried there.

val offsetList = List((topics, 0, 0L), (topics, 1, 0L), (topics, 2, 0L))
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
  () => {
    createContext(checkpointDirectory)
  })

http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Why a breakpoint set inside createContext is sometimes not hit: createContext is only invoked when no checkpoint data exists yet. From the StreamingContext.getOrCreate documentation:
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
recreated from the checkpoint data. If the data does not exist, then the StreamingContext
will be created by called the provided `creatingFunc`.
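
A practical consequence: if you change offsetList (or anything else inside createContext) while checkpoint data still exists, the change is ignored, because the context is rebuilt from the checkpoint. To force createContext to run again, point the program at a fresh checkpoint directory or remove the old one, for example (the path is the one used in this article; adjust to yours):

hdfs dfs -rm -r hdfs://hadoop1:8020/user/root/spark/checkpoint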

Storing Offsets Outside Kafka (from the Kafka consumer documentation)

The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system in a way that both the results and offsets are stored atomically. This is not always possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
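
As a minimal sketch of that idea in the context of the code above: instead of relying only on the Spark checkpoint, the processed offsets could be written to a store of your own after each batch and read back at startup to build fromOffsets. Everything below is an illustration under assumptions, not code from the original article: SimpleOffsetStore and the CSV file name are made up, and a plain local file is not written atomically with the results, so this alone still gives at-least-once rather than exactly-once semantics.

import java.io.{File, PrintWriter}

import scala.io.Source

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.OffsetRange

// Hypothetical helper: persist the end offset of every partition of a batch and
// load the offsets again on the next startup. A local CSV file is used purely for illustration.
object SimpleOffsetStore {
  private val offsetFile = new File("kafka-offsets.csv")

  // Call after a batch has been processed: one "topic,partition,untilOffset" line per partition
  def save(ranges: Array[OffsetRange]): Unit = {
    val writer = new PrintWriter(offsetFile)
    try {
      ranges.foreach(r => writer.println(s"${r.topic},${r.partition},${r.untilOffset}"))
    } finally {
      writer.close()
    }
  }

  // Call at startup: offsets saved by the previous run, or an empty map on the very first run
  def load(): Map[TopicAndPartition, Long] = {
    if (!offsetFile.exists()) {
      Map.empty[TopicAndPartition, Long]
    } else {
      Source.fromFile(offsetFile).getLines().map { line =>
        val Array(topic, partition, offset) = line.split(",")
        TopicAndPartition(topic, partition.toInt) -> offset.toLong
      }.toMap
    }
  }
}

In createContext, the result of SimpleOffsetStore.load() (when non-empty) would replace the hand-written offsetList, and SimpleOffsetStore.save(offsetsList) would be called inside foreachRDD once the partition loop has completed.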

My own implementation, adapted from the code above:

package my.bigdata.studyKafka

/**
  * Created by lq on 2017/8/30.
  */
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}
import org.slf4j.LoggerFactory


object ReadBySureOffsetTest {
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)

  def main(args: Array[String]) {
    // Set the HDFS user and the log levels
    System.setProperty("HADOOP_USER_NAME", "root")
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("測試從指定offset消費kafka的主程序開始")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      System.exit(1)
      logger.info("主程序意外退出")
    }
    //hdfs://hadoop1:8020/user/root/spark/checkpoint
    val Array(checkpointDirectory) = args
    logger.info("checkpoint檢查:" + checkpointDirectory)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    logger.info("streaming開始啟動")
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    // Configuration
    val brokers = "slave1:9092,slave2:9092"
    val topics = "maats1"

    // Batch interval in seconds
    val split_rdd_time = 8
    // Create the streaming context
    val sparkConf = new SparkConf()
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
      .set("spark.app.id", "streaming_kafka")

    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))

    ssc.checkpoint(checkpointDirectory)

    // Create a direct Kafka stream with the brokers and topics
    val topicsSet: Set[String] = topics.split(",").toSet
    // Kafka configuration parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "first3",
      "serializer.class" -> "kafka.serializer.StringEncoder"
      //      "auto.offset.reset" -> "largest"   //自動將偏移重置為最新偏移(默認)
      //      "auto.offset.reset" -> "earliest"  //自動將偏移重置為最早的偏移
      //      "auto.offset.reset" -> "none"      //如果沒有為消費者組找到以前的偏移,則向消費者拋出異常
    )
    /**
      * Read Kafka data starting from the specified offsets.
      * Note: exactly-once semantics require storing the offsets atomically with the
      *       processing results; otherwise this is at-least-once.
      *       With starting offsets specified, reading resumes from where the previous
      *       Streaming run stopped.
      */
    //val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L))  // (topic, partition, offset)
    val offsetList = List((topics, 0, 230L), (topics, 1, 0L), (topics, 2, 0L))
    val fromOffsets = setFromOffsets(offsetList)  // build the fromOffsets map
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())  // map each MessageAndMetadata to (topic, message)
    // Consume from the specified offsets using the direct (simple consumer) API; for details see
    // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

    // Process the data
    messages.foreachRDD(mess => {
      // Get the offset ranges of this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
      mess.foreachPartition(lines => {
        lines.foreach(line => {
          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
          logger.info("++++++++++++++++++++++++++++++此處記錄offset+++++++++++++++++++++++++++++++++++++++")
          logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")
          logger.info("+++++++++++++++++++++++++++++++此處消費數據操作++++++++++++++++++++++++++++++++++++++")
          logger.info("The kafka  line is " + line)
        })
      })
    })
    ssc
  }

  // Build the TopicAndPartition -> starting offset map
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    for (offset <- list) {
      val tp = TopicAndPartition(offset._1, offset._2)  // topic and partition number
      fromOffsets += (tp -> offset._3)                  // starting offset
    }
    fromOffsets
  }
}

 



 

