Most people are familiar with the Kafka message queue, and it is fairly convenient to use. The most common Kafka operations are the following:
- Start the Kafka cluster
- Create a topic named xxx
- List the topics that have been created
- Produce some data into the topic xxx
- Consume some data from the topic xxx
These operations map onto the following scripts shipped in Kafka's bin directory:
- bin/kafka-server-start.sh //starts the Kafka server (broker) process
- bin/kafka-topics.sh //creates and lists topics
- bin/kafka-console-producer.sh //command-line producer
- bin/kafka-console-consumer.sh //command-line consumer
Below are examples of these common operations (the corresponding commands are sketched right after this list):
- Start the Kafka cluster
- Create a topic named test
- List the created topics
- Produce some data into the topic test with kafka-console-producer
- Consume some data from the topic test with kafka-console-consumer, reading from the very beginning
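The commands below are a minimal sketch of those five operations, assuming a single broker listening on localhost:9092 and a Kafka 2.x distribution (on older brokers, replace --bootstrap-server with --zookeeper localhost:2181 for the topic commands):
# start a broker in the background
bin/kafka-server-start.sh -daemon config/server.properties
# create a topic named test
bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
# list existing topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# produce messages typed on stdin into test
bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
# consume test from the very beginning
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092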
Main text
Execution flow analysis of kafka-server-start.sh
First, let's analyze what the Kafka source does when you run bin/kafka-server-start.sh config/server.properties.
Step 1: look at the kafka-server-start.sh script itself, shown below.
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Check the number of arguments passed to kafka-server-start.sh.
# If it is less than 1, the path to server.properties was not supplied,
# so print the usage message and exit without running the rest of the script.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
# base_dir is the directory this script lives in, e.g. /data/kafka/bin/
base_dir=$(dirname $0)
# Check whether KAFKA_LOG4J_OPTS has already been set.
# If not, set it to
# -Dlog4j.configuration=file:$base_dir/../config/log4j.properties
# where $base_dir is replaced by the real path.
# Assuming base_dir is /data/kafka/bin/, the result is
# KAFKA_LOG4J_OPTS=
# "-Dlog4j.configuration=file:/data/kafka/bin/../config/log4j.properties"
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
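# Aside: [ "x$VAR" = "x" ] is an old portability idiom equivalent to [ -z "$VAR" ];
# the x prefix protects against values that ancient test implementations could
# misparse (e.g. values starting with "-"). It is true exactly when VAR is empty or unset.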
# Check whether KAFKA_HEAP_OPTS has already been set.
# If not, set it to
# -Xmx1G -Xms1G
# i.e. KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
# Default EXTRA_ARGS to "-name kafkaServer -loggc" unless it is already set
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
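# Aside: ${EXTRA_ARGS-'...'} is Bash's "use default if unset" expansion. A quick
# sketch of how it differs from ${var:-default} (runnable in any Bash shell):
#   unset x; echo "${x-default}"    # -> default   (x unset)
#   x="";    echo "${x-default}"    # -> (empty)   (x set but empty; "-" keeps it)
#   x="";    echo "${x:-default}"   # -> default   (":-" also replaces an empty value)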
# $1 is the first argument passed to the script; for example, with
# sh 1.sh a b c
# we get $1='a' $2='b' $3='c'.
# So this checks whether the first argument to kafka-server-start.sh is -daemon.
# If I run ./kafka-server-start.sh -daemon ../conf/server.properties
# the -daemon) branch below is taken.
# If I run ./kafka-server-start.sh ../conf/server.properties
# the *) branch below is taken.
# The -daemon branch prepends one more flag to EXTRA_ARGS:
# EXTRA_ARGS was "-name kafkaServer -loggc",
# and it becomes "-daemon -name kafkaServer -loggc"
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
# Finally, launch the Kafka server.
# You can change exec to echo below to print what the line expands to.
# On my machine it printed bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka config/server.properties
# In other words, what actually runs in the end is bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka config/server.properties
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
After all this back and forth, we find that the command I executed,
bin/kafka-server-start.sh -daemon config/server.properties
was transformed inside kafka-server-start.sh into
bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka config/server.properties
So next we analyze the bin/kafka-run-class.sh script. Again, here is the complete script, with explanations inline.
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# If the number of arguments is less than 1, enter the if branch
if [ $# -lt 1 ];
then
# print this script's usage message for the user
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1
fi
# Run the Linux command uname -a and check whether its output contains the string "CYGWIN"
if [[ $(uname -a) =~ "CYGWIN" ]]; then
# If it does, set CYGWIN to 1
CYGWIN=1
else
# If "CYGWIN" was not matched, set CYGWIN to 0
CYGWIN=0
fi
# Check whether $INCLUDE_TEST_JARS is empty; if so, enter the if branch
if [ -z "$INCLUDE_TEST_JARS" ]; then
# and default INCLUDE_TEST_JARS to false
INCLUDE_TEST_JARS=false
fi
# A regular expression matching test and documentation jars
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
# A function that decides whether a file should go on the classpath.
# It returns 0 (success, include the file) when the file is not a test/doc jar,
# or when INCLUDE_TEST_JARS=true; it returns 1 (skip the file) when the file
# matches the test/doc regex above.
should_include_file() {
if [ "$INCLUDE_TEST_JARS" = true ]; then
return 0
fi
file=$1
if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
return 0
else
return 1
fi
}
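# Aside: "if should_include_file f" takes the if-branch when the function returns
# 0 (success). A quick sketch against the regex above (hypothetical jar names):
#   echo "kafka-clients-2.5.1.jar"      | egrep "$regex"   # no match -> included
#   echo "kafka-clients-2.5.1-test.jar" | egrep "$regex"   # match    -> skipped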
# Set base_dir to the parent of this script's bin directory.
# E.g. if kafka-run-class.sh lives in /data/kafka2.2/bin/, base_dir is /data/kafka2.2/
base_dir=$(dirname $0)/..
# Check whether $SCALA_VERSION is empty; if so, enter the if branch
if [ -z "$SCALA_VERSION" ]; then
# and default SCALA_VERSION to 2.13.2
SCALA_VERSION=2.13.2
# If base_dir contains a gradle.properties file, enter the inner if block
if [[ -f "$base_dir/gradle.properties" ]]; then
# and set SCALA_VERSION to the value found in gradle.properties; e.g. gradle.properties
# may contain a line scalaVersion=2.11.1, and cut drops everything left of the "=",
# leaving 2.11.1, which is assigned to SCALA_VERSION
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
fi
# Check whether $SCALA_BINARY_VERSION is empty; if so, enter the if branch
if [ -z "$SCALA_BINARY_VERSION" ]; then
# and set SCALA_BINARY_VERSION to the first two dot-separated fields of SCALA_VERSION.
# E.g. if SCALA_VERSION=2.11.1 then SCALA_BINARY_VERSION=2.11
SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi
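# Aside: what the two cut calls above do, on sample inputs:
#   echo "scalaVersion=2.13.2" | cut -d= -f 2     # -> 2.13.2
#   echo "2.13.2" | cut -f 1-2 -d '.'             # -> 2.13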
# Enable nullglob here: a glob pattern that matches nothing expands to nothing
# instead of being kept literally, so the for-loops below silently skip
# directories that do not exist
shopt -s nullglob
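# Aside: a minimal demonstration of the nullglob difference (assuming /no/such/dir does not exist):
#   shopt -u nullglob; for f in /no/such/dir/*.jar; do echo "$f"; done   # prints the literal pattern
#   shopt -s nullglob; for f in /no/such/dir/*.jar; do echo "$f"; done   # prints nothing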
# Check whether $UPGRADE_KAFKA_STREAMS_TEST_VERSION is empty; if so, enter the if branch
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
# Assign each directory under base_dir/core/build/ whose name starts with
# dependant-libs-${SCALA_VERSION} to the dir variable in turn
for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
do
# and append the contents of dir to $CLASSPATH
CLASSPATH="$CLASSPATH:$dir/*"
done
fi
# Assign each file under base_dir/examples/build/libs/ whose name starts with
# kafka-examples and ends with .jar (full path including the file name) to the file variable
for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
# If should_include_file returns 0 (success), i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
# Check whether $UPGRADE_KAFKA_STREAMS_TEST_VERSION is empty; if so, enter the if branch
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
# Set clients_lib_dir to <this script's directory>/../clients/build/libs
clients_lib_dir=$(dirname $0)/../clients/build/libs
# Set streams_lib_dir to <this script's directory>/../streams/build/libs
streams_lib_dir=$(dirname $0)/../streams/build/libs
# Set streams_dependant_clients_lib_dir to <this script's directory>/../streams/build/dependant-libs-${SCALA_VERSION}
streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
# Set clients_lib_dir to /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
# Set streams_lib_dir to the same value as clients_lib_dir
streams_lib_dir=$clients_lib_dir
# Set streams_dependant_clients_lib_dir to the same value as streams_lib_dir
streams_dependant_clients_lib_dir=$streams_lib_dir
fi
# Assign each file under $clients_lib_dir whose name starts with kafka-clients
# and ends with .jar (full path including the file name) to the file variable
for file in "$clients_lib_dir"/kafka-clients*.jar;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
# Assign each file under $streams_lib_dir whose name starts with kafka-streams
# and ends with .jar (full path including the file name) to the file variable
for file in "$streams_lib_dir"/kafka-streams*.jar;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
# Check whether $UPGRADE_KAFKA_STREAMS_TEST_VERSION is empty; if so, enter the if branch
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
# Assign each file under base_dir/streams/examples/build/libs/ whose name starts with
# kafka-streams-examples and ends with .jar to the file variable
for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
else
# Strip the dots out of $UPGRADE_KAFKA_STREAMS_TEST_VERSION; e.g. if
# $UPGRADE_KAFKA_STREAMS_TEST_VERSION=2.12.1 the result is 2121
VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
# SHORT_VERSION_NO_DOTS is VERSION_NO_DOTS with its last character dropped;
# e.g. if VERSION_NO_DOTS is 2121 then SHORT_VERSION_NO_DOTS is 212
SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
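# Aside: ${VERSION_NO_DOTS:0:...} is Bash substring expansion ${var:offset:length};
# a minimal check of the "drop the last character" trick:
#   v=2121; echo "${v:0:${#v}-1}"   # -> 212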
# Assign each file under base_dir/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/
# whose name starts with kafka-streams-upgrade-system-tests and ends with .jar to the file variable
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and prepend $file to $CLASSPATH (note the reversed order here)
CLASSPATH="$file":"$CLASSPATH"
fi
done
# If $SHORT_VERSION_NO_DOTS equals 0100,
if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
# prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar to the classpath
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
# and prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar to the classpath
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
fi
# If $SHORT_VERSION_NO_DOTS equals 0101,
if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
# prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar to the classpath
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
# and prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar to the classpath
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
fi
fi
# Iterate over the files under $streams_dependant_clients_lib_dir whose names
# start with rocksdb and end with .jar
for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
do
# appending each file to the classpath (no should_include_file check here)
CLASSPATH="$CLASSPATH":"$file"
done
# Iterate over the files under $streams_dependant_clients_lib_dir whose names
# contain hamcrest and end with .jar
for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
# Iterate over the files under $base_dir/tools/build/libs/ whose names start
# with kafka-tools and end with .jar
for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
# Assign each directory under base_dir/tools/build/ whose name starts with
# dependant-libs-${SCALA_VERSION} to the dir variable in turn
for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
do
# and append $dir/* to $CLASSPATH
CLASSPATH="$CLASSPATH:$dir/*"
done
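# Aside: entries like "$dir/*" are appended literally; the trailing * is not a
# shell glob here (it is inside double quotes) but a JVM classpath wildcard
# (supported since Java 6), which the JVM itself expands to every jar in that directory.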
#依次把 "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension" 逐個賦值給cc_pkg變量
for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
#遍歷$base_dir/connect/${cc_pkg}/build/libs/文件夾下的connect-${cc_pkg}開頭,.jar結尾的文件
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
#判斷一下,如果should_include_file返回1,也就是說是重要的jar包,就進入到if邏輯
if should_include_file "$file"; then
#添加$file變量的內容到$CLASSPATH中
CLASSPATH="$CLASSPATH":"$file"
fi
done
#判斷$base_dir/connect/${cc_pkg}/build/dependant-libs文件夾是否存在,是的話進入if
if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
#把base_dir/connect/${cc_pkg}/build/dependant-libs/下的所有文件都加入到classpath中去
CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
fi
done
# Iterate over everything under $base_dir/libs
for file in "$base_dir"/libs/*;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
# Iterate over the files under base_dir/core/build/libs/ whose names start with kafka_${SCALA_BINARY_VERSION}
for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
# If should_include_file returns 0, i.e. this is a jar we need, enter the if block
if should_include_file "$file"; then
# and append $file to $CLASSPATH
CLASSPATH="$CLASSPATH":"$file"
fi
done
# Turn nullglob back off here; no more wildcard file matching is needed below
shopt -u nullglob
# If $CLASSPATH is still empty, enter the if block
if [ -z "$CLASSPATH" ] ; then
# print an error message
echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"
# and exit the script without going any further
exit 1
fi
# If $KAFKA_JMX_OPTS is empty, enter the if block
if [ -z "$KAFKA_JMX_OPTS" ]; then
# and set KAFKA_JMX_OPTS to -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# If $JMX_PORT is set (non-empty), enter the if block;
# [ string ] is true exactly when the string is non-empty
if [ $JMX_PORT ]; then
# and append -Dcom.sun.management.jmxremote.port=$JMX_PORT to KAFKA_JMX_OPTS
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# If $LOG_DIR is empty, enter the if block
if [ "x$LOG_DIR" = "x" ]; then
# and default it to LOG_DIR=$base_dir/logs
LOG_DIR="$base_dir/logs"
fi
# If $KAFKA_LOG4J_OPTS is empty, enter the if block
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
# Set LOG4J_DIR to $base_dir/config/tools-log4j.properties
LOG4J_DIR="$base_dir/config/tools-log4j.properties"
# If CYGWIN was detected, the path in LOG4J_DIR has to be converted
(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
# Set KAFKA_LOG4J_OPTS to -Dlog4j.configuration=file:${LOG4J_DIR}
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else
# Otherwise, create the $LOG_DIR directory if it does not exist yet
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
fi
# If CYGWIN was detected, the path in LOG_DIR has to be converted
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
# If $KAFKA_OPTS is empty, enter the if block
if [ -z "$KAFKA_OPTS" ]; then
# and set it to ""
KAFKA_OPTS=""
fi
# If $KAFKA_DEBUG is non-empty (i.e. debugging was requested), enter the if block
if [ "x$KAFKA_DEBUG" != "x" ]; then
# Set DEFAULT_JAVA_DEBUG_PORT to 5005
DEFAULT_JAVA_DEBUG_PORT="5005"
# If $JAVA_DEBUG_PORT is empty, enter the if block
if [ -z "$JAVA_DEBUG_PORT" ]; then
# and set JAVA_DEBUG_PORT to $DEFAULT_JAVA_DEBUG_PORT
JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
fi
# Set DEFAULT_JAVA_DEBUG_OPTS to -agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT
DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
# If $JAVA_DEBUG_OPTS is empty, enter the if block
if [ -z "$JAVA_DEBUG_OPTS" ]; then
# and set JAVA_DEBUG_OPTS to $DEFAULT_JAVA_DEBUG_OPTS
JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
fi
# Print Enabling Java debug options: $JAVA_DEBUG_OPTS to the screen
echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
# and prepend $JAVA_DEBUG_OPTS to $KAFKA_OPTS
KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi
# Decide which java binary to use.
# If JAVA_HOME is empty, enter the if branch
if [ -z "$JAVA_HOME" ]; then
# and set JAVA to "java" (resolved via PATH)
JAVA="java"
else
# otherwise set JAVA to $JAVA_HOME/bin/java
JAVA="$JAVA_HOME/bin/java"
fi
# Memory options.
# If $KAFKA_HEAP_OPTS is empty, enter the if block
if [ -z "$KAFKA_HEAP_OPTS" ]; then
# and set KAFKA_HEAP_OPTS to -Xmx256M
KAFKA_HEAP_OPTS="-Xmx256M"
fi
# JVM performance options.
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported.
# If $KAFKA_JVM_PERFORMANCE_OPTS is empty, enter the if block
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
# and set it to -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi
# $# is the number of remaining arguments and -gt means "greater than",
# so this loops for as long as at least one argument is left
while [ $# -gt 0 ]; do
# Set COMMAND to the first remaining argument
COMMAND=$1
# and dispatch on its value
case $COMMAND in
# If COMMAND is -name
-name)
# set DAEMON_NAME to the argument right after it
DAEMON_NAME=$2
# set CONSOLE_OUTPUT_FILE to $LOG_DIR/$DAEMON_NAME.out
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
# and drop the first two arguments, so the old third argument becomes $1, the old fourth becomes $2, and so on
shift 2
;;
# If COMMAND is -loggc
-loggc)
# and $KAFKA_GC_LOG_OPTS is empty, enter the if block
if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
# and set GC_LOG_ENABLED to true
GC_LOG_ENABLED="true"
fi
# Drop one argument, so the old second argument becomes $1, the old third becomes $2, and so on
shift
;;
# If COMMAND is -daemon
-daemon)
# set DAEMON_MODE to true
DAEMON_MODE="true"
# and drop one argument
shift
;;
*)
break
;;
esac
done
# GC options.
# Set GC_FILE_SUFFIX to -gc.log
# and initialize GC_LOG_FILE_NAME to ''
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
# If GC_LOG_ENABLED equals true,
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
# set GC_LOG_FILE_NAME to $DAEMON_NAME followed by $GC_FILE_SUFFIX
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
# The first segment of the version number is "1" for releases before Java 9;
# from Java 9 on it becomes '9', '10', ...
# Some examples of the first line of `java --version`:
# 8 -> java version "1.8.0_152"
# 9.0.4 -> java version "9.0.4"
# 10 -> java version "10" 2018-03-20
# 10.0.1 -> java version "10.0.1" 2018-04-17
# We need to match to the end of the line to prevent sed from printing the characters that do not match.
# The line below extracts the Java major version: JAVA_MAJOR_VERSION is "1" for
# anything before JDK 9, and 9, 10, 11 and so on for newer JDKs
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
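# Aside: what the sed extraction yields on a couple of sample first lines
# (hypothetical `java -version` outputs):
#   java version "1.8.0_152"   -> 1
#   openjdk version "11.0.2"   -> 11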
# If JAVA_MAJOR_VERSION is greater than or equal to 9,
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
# use the unified JDK 9+ GC logging flags: -Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
else
# otherwise use the pre-JDK 9 flags: -Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi
fi
# Remove a possible colon prefix from the classpath (this happens at lines like
# CLASSPATH="$CLASSPATH:$file" when CLASSPATH was still blank).
# The right-hand side uses native Bash string manipulation; see
# http://tldp.org/LDP/abs/html/string-manipulation.html, in particular the section titled "Substring Removal".
CLASSPATH=${CLASSPATH#:}
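# Aside: ${CLASSPATH#:} strips a single leading ":" if one is present, e.g.:
#   cp=":a.jar:b.jar"; echo "${cp#:}"   # -> a.jar:b.jar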
# If CYGWIN was detected, CLASSPATH has to be converted to Windows-style paths
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
# Launch mode.
# If DAEMON_MODE is true,
if [ "x$DAEMON_MODE" = "xtrue" ]; then
# run the program in the background and redirect its output to "$CONSOLE_OUTPUT_FILE"
nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
# otherwise run in the foreground, with the logs printed to the console
exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi
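The difference between the two branches is worth a second look. Here is a minimal sketch of the two launch modes, where run.sh is a hypothetical stand-in for any long-running command:
# daemon mode: detach from the terminal, ignore hangups, log to a file
nohup ./run.sh > run.out 2>&1 < /dev/null &
# foreground mode: exec replaces the current shell process, logs go to the console
exec ./run.sh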
The script was analyzed line by line above, but what matters most are the last few lines.
Taking bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka ../conf/server.properties
as the example, after passing through kafka-run-class.sh, the command that really ends up running is:
nohup java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/home/hadoop/software/kafka_2.13-2.5.1/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/home/hadoop/software/kafka_2.13-2.5.1/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/activation-1.1.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/argparse4j-0.7.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/audience-annotations-0.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/commons-cli-1.4.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/commons-lang3-3.8.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-api-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-basic-auth-extension-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-file-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-json-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-mirror-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-mirror-client-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-runtime-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-transforms-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/hk2-api-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/hk2-locator-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/hk2-utils-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-annotations-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-core-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-databind-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-module-paranamer-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-module-scala_2.13-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.annotation-api-1.3.4.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.inject-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javassist-3.22.0-CR2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javassist-3.26.0-GA.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javax.servlet-api-3.1.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jaxb-api-2.3.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-client-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-common-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-container-servlet-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-container-servlet-core-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-hk2-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-media-jaxb-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-server-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-client-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-http-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-io-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-security-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-server-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-servlet-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-util-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jopt-simple-5.0.4.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka_2.13-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka_2.13-2.5.1-sources.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-clients-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-log4j-appender-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-examples-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-scala_2.13-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-test-utils-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-tools-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/log4j-1.2.17.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/lz4-java-1.7.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/maven-artifact-3.6.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-buffer-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-codec-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-common-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-handler-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-resolver-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-transport-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-transport-native-epoll-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-transport-native-unix-common-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/paranamer-2.8.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/plexus-utils-3.2.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/reflections-0.9.12.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/rocksdbjni-5.18.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-collection-compat_2.13-2.1.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-java8-compat_2.13-0.9.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-library-2.13.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-logging_2.13-3.9.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-reflect-2.13.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/slf4j-api-1.7.30.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/snappy-java-1.1.7.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/validation-api-2.0.1.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/zookeeper-3.5.8.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/zookeeper-jute-3.5.8.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/zstd-jni-1.4.4-7.jar kafka.Kafka ../conf/server.properties > "/home/hadoop/software/kafka_2.13-2.5.1/bin/../logs/kafkaServer.out" 2>&1 < /dev/null &
In the end we find that the class being executed is kafka.Kafka, the argument passed to it is ../conf/server.properties, and the console output goes to the /home/hadoop/software/kafka_2.13-2.5.1/bin/../logs/kafkaServer.out file.
In a follow-up article we will continue with the execution flow of the kafka.Kafka class; this article stops at the shell script part.
