Deploying Kafka on Windows: notes (reposted)


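I. Download
  Download the latest binary release archive from the Apache website (http://kafka.apache.org/downloads.html). At the time of writing, the latest version is kafka_2.11-0.8.2.1.tgz.
II. Extract
  Extract the archive directly to the root of drive D.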
III. Edit the configuration files
  Note that the configuration files may differ between versions; adjust the steps below to match your installation.
  1. In log4j.properties, change "kafka.logs.dir=logs" to "kafka.logs.dir=/tmp/logs".
  2. In test-log4j.properties, change all 4 occurrences of "File=logs/server.log" to "File=/tmp/logs/server.log".
  3. Copy config/server.properties to server-9093.properties for the first node, and change the following settings:
    broker.id=9093
    port=9093
    host.name=localhost
    log.dirs=/tmp/kafka-logs-9093
  4. Copy config/server.properties to server-9094.properties for the second node, and change the following settings:
    broker.id=9094
    port=9094
    host.name=localhost
    log.dirs=/tmp/kafka-logs-9094
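
Steps 3 and 4 differ only in the port number, so the two files can also be derived from one template. The following is a minimal Python sketch of that idea, not part of the original procedure. The helper names `derive_broker_config` and `broker_overrides` are made up for illustration, and the `base` text is a shortened stand-in for the real config/server.properties.

```python
def derive_broker_config(base_text, overrides):
    """Return a copy of a server.properties text with the given keys overridden."""
    remaining = dict(overrides)
    out = []
    for line in base_text.splitlines():
        key = line.split("=", 1)[0].strip()
        is_setting = "=" in line and not line.lstrip().startswith("#")
        if is_setting and key in remaining:
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)
    # Keys that were absent (or present only as comments) are appended at the end.
    out.extend(f"{k}={v}" for k, v in remaining.items())
    return "\n".join(out) + "\n"


def broker_overrides(port):
    """The four settings changed per node in this post, keyed by broker port."""
    return {
        "broker.id": port,
        "port": port,
        "host.name": "localhost",
        "log.dirs": f"/tmp/kafka-logs-{port}",
    }


if __name__ == "__main__":
    # Shortened stand-in for config/server.properties (assumption, not the real file).
    base = "broker.id=0\nport=9092\n#host.name=localhost\nlog.dirs=/tmp/kafka-logs\n"
    for port in (9093, 9094):
        print(f"# server-{port}.properties")
        print(derive_broker_config(base, broker_overrides(port)))
```

To use it against a real installation, read config/server.properties into `base` and write each result to the matching server-<port>.properties file.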
IV. Edit the startup scripts
  Delete all the *.sh files, which cannot run on Windows, then copy kafka_2.11-0.8.2.1/bin/windows/*.bat into the kafka_2.11-0.8.2.1/bin directory.
  1. In zookeeper-server-start.bat, change "%~dp0../../" to "%~dp0/../".
  2. In kafka-server-start.bat, change "%~dp0../../" to "%~dp0/../".
  3. In kafka-run-class.bat, change "pushd %~dp0..\.." to "pushd %~dp0", and change "%BASE_DIR%/config" to "%BASE_DIR%/../config".
  Then, still in kafka-run-class.bat, replace the following block:
    rem Classpath addition for kafka-core dependencies
    for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for kafka-perf dependencies
    for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for kafka-clients
    for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for kafka-examples
    for %%i in (%BASE_DIR%\examples\build\libs\kafka-examples-*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for contrib/hadoop-consumer
    for %%i in (%BASE_DIR%\contrib\hadoop-consumer\build\libs\kafka-hadoop-consumer-*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for contrib/hadoop-producer
    for %%i in (%BASE_DIR%\contrib\hadoop-producer\build\libs\kafka-hadoop-producer-*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for release
    for %%i in (%BASE_DIR%\libs\*.jar) do (
      call :concat %%i
    )
    rem Classpath addition for core
    for %%i in (%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar) do (
      call :concat %%i
    )
  with:
    rem Classpath addition for release
    for %%i in (%BASE_DIR%\..\libs\*.jar) do (
      call :concat %%i
    )
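
The one-line substitutions in steps 1 to 3 are easy to mistype by hand. As a rough sketch (not part of the original procedure), they can be written as literal search-and-replace pairs in Python. `apply_edits` is a hypothetical helper, and `SAMPLE_RUN_CLASS` is a made-up excerpt used only to demonstrate it, not the actual content of kafka-run-class.bat. The multi-line classpath block is not covered and still has to be replaced by hand.

```python
def apply_edits(text, edits):
    """Apply literal (old, new) substitutions; fail loudly if `old` is missing."""
    for old, new in edits:
        if old not in text:
            raise ValueError(f"pattern not found: {old!r}")
        text = text.replace(old, new)
    return text


# Steps 1 and 2: zookeeper-server-start.bat and kafka-server-start.bat.
START_SCRIPT_EDITS = [("%~dp0../../", "%~dp0/../")]

# Step 3: kafka-run-class.bat.
RUN_CLASS_EDITS = [
    ("pushd %~dp0..\\..", "pushd %~dp0"),
    ("%BASE_DIR%/config", "%BASE_DIR%/../config"),
]

# Hypothetical excerpt for demonstration only.
SAMPLE_RUN_CLASS = (
    "pushd %~dp0..\\..\n"
    "set BASE_DIR=%CD%\n"
    "popd\n"
    "set CONF=%BASE_DIR%/config\n"
)

if __name__ == "__main__":
    print(apply_edits(SAMPLE_RUN_CLASS, RUN_CLASS_EDITS))
```

Because a missing pattern raises `ValueError`, running the sketch a second time on an already patched file stops instead of silently doing nothing, which makes it obvious when the script text differs from what this post expects.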
V. Start ZooKeeper
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory:
  zookeeper-server-start ../config/zookeeper.properties
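VI. Start Kafka
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory to start the first node:
  kafka-server-start.bat ..\config\server-9093.properties
  Open another Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory to start the second node:
  kafka-server-start.bat ..\config\server-9094.properties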
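
Before moving on, it can help to confirm that ZooKeeper and both brokers are actually listening. The following is a small Python sketch for that check, offered as an optional aid rather than part of the original steps. `port_is_open` is a hypothetical helper; the ports are the ones used in this post (2181 for ZooKeeper, 9093 and 9094 for the two brokers).

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    services = (("zookeeper", 2181), ("broker 9093", 9093), ("broker 9094", 9094))
    for name, port in services:
        state = "listening" if port_is_open("127.0.0.1", port) else "not reachable"
        print(f"{name:12s} 127.0.0.1:{port} {state}")
```

A successful TCP connect only shows that something accepts connections on the port; it does not prove the broker has finished registering with ZooKeeper.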
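VII. Create the topic batch script (running it reports an error, which does no harm)
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory:
  kafka-topics.bat kafka.admin.TopicCommand %*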
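VIII. Create a topic
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory (replication-factor is set to the number of Kafka nodes here; it cannot exceed the number of running brokers):
  kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic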
IX. Describe the topic
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory:
  kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
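
The describe command prints one summary line followed by one line per partition. As a rough illustration, the Python sketch below parses such output and reports partitions whose in-sync replica set (Isr) does not match the assigned replicas. `parse_describe` and `under_replicated` are hypothetical helper names, and `SAMPLE` reuses the describe output quoted in this post.

```python
import re

# One partition line: "... Partition: 0 Leader: 9093 Replicas: 9094,9093 Isr: 9093,9094"
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]*)\s+Isr:\s*([\d,]*)"
)

SAMPLE = """\
Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 9093 Replicas: 9094,9093 Isr: 9093,9094
Topic: my-replicated-topic Partition: 1 Leader: 9093 Replicas: 9093,9094 Isr: 9093,9094
Topic: my-replicated-topic Partition: 2 Leader: 9093 Replicas: 9094,9093 Isr: 9093,9094
"""


def parse_describe(output):
    """Extract partition id, leader, replicas and ISR from --describe output."""
    rows = []
    for m in PARTITION_LINE.finditer(output):
        rows.append({
            "partition": int(m.group(1)),
            "leader": int(m.group(2)),
            "replicas": [int(x) for x in m.group(3).split(",") if x],
            "isr": [int(x) for x in m.group(4).split(",") if x],
        })
    return rows


def under_replicated(rows):
    """Return the ids of partitions whose ISR differs from the assigned replicas."""
    return [r["partition"] for r in rows if set(r["isr"]) != set(r["replicas"])]


if __name__ == "__main__":
    rows = parse_describe(SAMPLE)
    for row in rows:
        print(row)
    print("under-replicated partitions:", under_replicated(rows))
```

Lines that do not match the pattern, such as the summary line or any log messages mixed into the output, are simply skipped.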
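X. Produce messages to the topic
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory:
  kafka-console-producer.bat --broker-list localhost:9093 --topic my-replicated-topic
  Once the producer has started, type test content into this window; pressing Enter sends each line as a message.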
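XI. Consume messages from the topic
  Open a Command Prompt window and run the following command in the kafka_2.11-0.8.2.1/bin directory:
  kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
  Once the consumer has started, the produced messages appear in this window in real time.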
XII. Unresolved exception messages
  The Command Prompt windows also print some exception messages that have not been resolved yet. So far they do not seem to affect usage.
  1. When starting Kafka:
    [2015-05-14 17:36:50,027] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e716cb7 (org.apache.zookeeper.ZooKeeper)
    [2015-05-14 17:36:50,058] INFO Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (java.lang.SecurityException: 無法定位登錄配置) (org.apache.zookeeper.ClientCnxn)
    [2015-05-14 17:36:50,063] INFO Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
    [2015-05-14 17:36:50,110] INFO Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x14d51c7fc560000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
    [2015-05-14 17:36:50,112] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
    [2015-05-14 17:36:50,273] INFO Loading logs. (kafka.log.LogManager)
    [2015-05-14 17:36:50,323] INFO Recovering unflushed segment 0 in log my-replicated-topic-0. (kafka.log.Log)
    [2015-05-14 17:36:50,335] WARN Error when freeing index buffer (kafka.log.OffsetIndex)
    java.lang.NullPointerException
    at kafka.log.OffsetIndex.kafka$log$OffsetIndex$$forceUnmap(OffsetIndex.scala:301)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
    at kafka.log.Log.loadSegments(Log.scala:179)
    at kafka.log.Log.<init>(Log.scala:67)
    at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
    at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
    [2015-05-14 17:36:50,344] INFO Completed load of log my-replicated-topic-0 with log end offset 1 (kafka.log.Log)
    [2015-05-14 17:36:50,351] INFO Recovering unflushed segment 0 in log my-replicated-topic-1. (kafka.log.Log)
    [2015-05-14 17:36:50,355] WARN Error when freeing index buffer (kafka.log.OffsetIndex)
    java.lang.NullPointerException
    at kafka.log.OffsetIndex.kafka$log$OffsetIndex$$forceUnmap(OffsetIndex.scala:301)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
    at kafka.log.Log.loadSegments(Log.scala:179)
    at kafka.log.Log.<init>(Log.scala:67)
    at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
    at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
    [2015-05-14 17:36:50,358] INFO Completed load of log my-replicated-topic-1 with log end offset 13 (kafka.log.Log)
    [2015-05-14 17:36:50,363] INFO Recovering unflushed segment 0 in log my-replicated-topic-2. (kafka.log.Log)
    [2015-05-14 17:36:50,367] WARN Error when freeing index buffer (kafka.log.OffsetIndex)
    java.lang.NullPointerException
    at kafka.log.OffsetIndex.kafka$log$OffsetIndex$$forceUnmap(OffsetIndex.scala:301)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
    at kafka.log.Log.loadSegments(Log.scala:179)
    at kafka.log.Log.<init>(Log.scala:67)
    at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
    at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
    [2015-05-14 17:36:50,371] INFO Completed load of log my-replicated-topic-2 with log end offset 14 (kafka.log.Log)
  2. When describing the topic:
    D:\kafka_2.11-0.8.2.1\bin>kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
    [2015-05-14 17:40:25,209] ERROR Unable to open socket to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxnSocketNIO)
    [2015-05-14 17:40:25,210] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
    java.net.SocketException: Address family not supported by protocol family: connect
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532)
    at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277)
    at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287)
    at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003)
    Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 9093 Replicas: 9094,9093 Isr: 9093,9094
    Topic: my-replicated-topic Partition: 1 Leader: 9093 Replicas: 9093,9094 Isr: 9093,9094
    Topic: my-replicated-topic Partition: 2 Leader: 9093 Replicas: 9094,9093 Isr: 9093,9094
  3. When producing messages to the topic:
    D:\kafka_2.11-0.8.2.1\bin>kafka-console-producer.bat --broker-list localhost:9093 --topic my-replicated-topic
    [2015-05-14 17:42:27,943] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
  4. When consuming messages from the topic:
    D:\kafka_2.11-0.8.2.1\bin>kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    [2015-05-14 17:47:11,588] ERROR Unable to open socket to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxnSocketNIO)
    [2015-05-14 17:47:11,589] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
    java.net.SocketException: Address family not supported by protocol family: connect
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532)
    at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277)
    at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287)
    at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003)
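
In items 2 and 4 the failed connection attempt targets 0:0:0:0:0:0:0:1, the IPv6 loopback address, even though the commands were given localhost:2181. One way to investigate is to check which addresses `localhost` resolves to on the machine. The Python sketch below does only that; it is a diagnostic aid and not a fix, and `resolved_addresses` is a hypothetical helper name. Whether passing 127.0.0.1:2181 instead of localhost:2181 makes the messages disappear is an assumption that was not tested in this post.

```python
import socket

FAMILY_NAMES = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6"}


def resolved_addresses(host="localhost", port=2181):
    """Return (family, address) pairs for `host`, in the order the resolver gives them."""
    infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    return [(FAMILY_NAMES.get(family, str(family)), sockaddr[0])
            for family, _type, _proto, _canon, sockaddr in infos]


if __name__ == "__main__":
    for family, address in resolved_addresses():
        print(f"localhost -> {address} ({family})")
```

If an IPv6 entry such as ::1 appears in the output, that would be consistent with the client trying the IPv6 loopback address seen in the log.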

 

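Problems encountered during installation:

  1. After starting Kafka, the following message appears:
    Unrecognized VM option '+UseCompressedOops' Could not create the Java virtual machine.
    At first I suspected the memory size, but that turned out not to be the cause; the problem lies with the JDK. I was using 32-bit CentOS with jdk1.6_24, and switching to JDK 1.7 still produced the error. (-XX:+UseCompressedOops is only available on 64-bit JVMs, so a 32-bit JVM rejects it.)
    Open bin/kafka-run-class.sh and find:
    if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
      KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
    fi
    Remove -XX:+UseCompressedOops, then start the broker again:
    JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
    The broker now starts successfully.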

