實時分析之客戶畫像項目實踐



客戶畫像的背景描寫敘述

原來的互聯網,以解決用戶需求為目的。衍生出眾多的網聯網產品,以及產生呈數量級遞增的海量數據。當用戶需求基本得到滿足的時候,須要分析這些海量的數據。得以達到最高效的需求實現,最智能的功能服務。以及最精准的產品推薦,最后提升產品的競爭力。簡言之,產品由原來的需求驅動轉換成數據驅動。
客戶畫像就是數據驅動的代表作之中的一個。詳細點講,客戶畫像就是用戶的標簽(使用該產品的群體),程序能自己主動調整、組合、生成這些標簽,最后再通過這些標簽。達到精准營銷的目的。


當前流行的實時分析框架

首先一提到大數據,大家腦海中浮現的肯定是Hadoop。可是須要實時分析出結果的話,那Hadoop就力不從心了(先不講數據多少,單單啟動一個M/R就要幾分鍾的時間),假設沒有實時性需求的產品分析則另當別論。


當下最流行的三大實時分析框架各自是Apache SparkApache SamzaApache Storm。以下是網上找到的三大框架的說明和對照:
這里寫圖片描寫敘述
三者的總體框架類似,僅僅是各個節點的名字和術語不一樣罷了
這里寫圖片描寫敘述
Storm和Samza在消息發送處理的機制上是至少一次,而Spark是有且僅此一次,換句話講。Storm和Samza可能存在反復發送數據的情況;在消息處理上,Spark是秒級的,而Storm和Samza是壓秒級的(性能都不錯,壓秒級的也還是能夠接受^_^);在語言支持上,這個Storm貌似多點。

另外,Storm開源的也比較早,社區比較活躍。版本號迭代的比較快,文檔相對來說也比較多。Storm相對Spark也比較輕量級,上手簡單,這就是作者選擇Storm的原因,只是個人還是推薦Spark的。


環境准備、搭建和執行

以下是作者使用的軟件版本號
1. kafka2.11
2. zookeeper3.5.1
3. storm0.9.5

JDK的環境。這個都不明確的人也不用繼續看下去了。


作者在測試環境准備了4台虛擬機,改動每台虛擬機的/etc/hosts

172.16.2.235 master
172.16.2.231 slave1
172.16.2.236 slave2
172.16.2.241 slave3

235是主節點,其余三個是子節點,在主節點做好子節點免登錄權限設置
主機執行

    ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
    mv id_dsa.pub authorized_keys
    chmod 600 authorized_keys
    scp ~/.ssh/authorized_keys root@slave1:/root/.ssh/
    scp ~/.ssh/authorized_keys root@slave2:/root/.ssh/
    scp ~/.ssh/authorized_keys root@slave3:/root/.ssh/

(復制到各個從機上去)
每個從機都ssh進入一次 記錄從機信息


  • zookeeper

zookeeper是大數據必備的框架之中的一個。它是一個分布式的。開放源代碼的分布式應用程序協調服務,你能夠理解成每個子節點的任務控制中心
解壓
tar -zxvf zookeeper-3.5.1-alpha.tar.gz

配置
conf/zoo.cfg

initLimit=10
syncLimit=5
clientPort=2181
tickTime=2000
autopurge.purgeInterval=12
autopurge.snapRetainCount=3
dataDir=/home/zookeeper-3.5.1-alpha/data
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
server.3=slave3:2888:3888

注意:須要在/home/zookeeper-3.5.1-alpha/data文件夾下創建一個myid文件,寫入該機的序列號,虛擬機就1。2累加下去

echo 0 >> /home/zookeeper-3.5.1-alpha/data/myid

啟動

/home/zookeeper-3.5.1-alpha/bin/zkServer.sh start &

jps一下。列表中出現QuorumPeerMain進程則代表啟動OK(各個子節點也啟動起來,以下的服務都依賴zookeeper)。


  • kafka

kafka,中文名叫卡夫卡,是一種高吞吐量的分布式公布訂閱消息系統,它能夠處理消費者規模的站點中的全部動作流數據。

簡言之,就是數據採集、發送器。
解壓

tar -zxvf kafka_2.11-0.8.2.0.tgz

配置。改動
config/server.properties

broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
host.name=master
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka_2.11-0.8.2.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=master:2181,slave1:2181,slave2:2181,slave3:2181
zookeeper.connection.timeout.ms=6000
#真正刪除topic
delete.topic.enable=true

注意:這里的broker.id在各個子節點也不能反復
啟動

/home/kafka_2.11-0.8.2.0/bin/kafka-server-start.sh /home/kafka_2.11-0.8.2.0/config/server.properties &

jps一下,列表中出現Kafka進程則代表啟動OK。
驗證kafka集群執行是否正常:
訂閱日志
在log服務器上安裝kafka,僅僅解壓就好了。不須要配置,然后訂閱log

tail -0f /home/bigdata/logs/analytics.log | /home/kafka_2.11-0.8.2.0/bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092,slave3:9092 --topic bigdata_app_logs &

將最新一行的日志文件傳輸到kafka集群。消息隊列叫做bigdata_app_logs(這個ID在kafka集群中唯一)
再查詢隊列列表

./kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181

將會出現剛剛訂閱的topic:bigdata_app_logs

./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181 --topic topic:bigdata_app_logs --from-beginning

將會實時同步log服務器上面的日志。這樣,kafka集群環境就搭建OK了

以下是作者自己整理的kafka流程圖:
這里寫圖片描寫敘述
這里日志採集有兩種方式。一種是站點程序通過log4j記錄的log文件,然后再客戶端執行,也就是上面介紹的那種。
還有一種就是通過KafkaLog4jAppender之間講日志傳輸到kafka集群,須要引入一個jar包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.0</version>
        </dependency>

在log4j的兩種配置配置

log4j.logger.com.jjshome.bigdata.controller.CommonController=INFO,KAFKA_HIVE_AUDIT
log4j.appender.KAFKA_HIVE_AUDIT=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA_HIVE_AUDIT.BrokerList=master:9092,slave1:9092,slave2:9092,slave3:9092
log4j.appender.KAFKA_HIVE_AUDIT.Topic=bigdata_app_logs
log4j.appender.KAFKA_HIVE_AUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA_HIVE_AUDIT.layout.ConversionPattern=%m%n
log4j.appender.KAFKA_HIVE_AUDIT.ProducerType=async
    <!-- kafka -->
    <appender name="KAFKA_HIVE_AUDIT" class="kafka.producer.KafkaLog4jAppender">
        <param name="DatePattern" value="'.'yyyy-MM-dd"/>
        <param name="BrokerList" value="master:9092,slave1:9092,slave2:9092,slave3:9092"/>
        <param name="Topic" value="jjs-fang-web-bigDatas"/>
        <param name="ProducerType" value="async"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %x - %m%n"/>
        </layout>
    </appender>

個人建議使用另外一種。可是要做好服務器之間的容錯機制,作者前期就吃過虧,在採集日志的時候,直接影響了業務流程。


  • storm

這里就不介紹了
解壓
tar -zxvf apache-storm-0.9.5.tar.gz

配置
conf/storm.yaml

 storm.zookeeper.servers:
     - "master"      - "slave1"      - "slave2"      - "slave3" 
 storm.local.dir: "/home/storm/data"
 nimbus.host: "master"
 supervisor.slots.ports:
    - 6700     - 6701     - 6702     - 6703  ui.port: 80

子節點配置都一樣。直接丟過去就好了
啟動
作者是在主節點啟動nimbus和ui、supervisor,其它的三個節點啟動supervisor
主節點

storm nimbus &
storm ui &
storm supervisor &

jps后出現nimbus和core、supervisor的進程,或者直接訪問http://master就可以(端口配置的是80)
這里寫圖片描寫敘述
:這里作者配置了環境變量。所以能夠直接storm
子節點分別都執行

storm supervisor &

以下是作者畫的storm結構圖
這里寫圖片描寫敘述
后面的數據落地,是結合業務。將數據存儲起來
好了,到此環境以及准備完畢。
若是要關閉各種進程。直接jps后直接kill掉。


Topology開發

topology是storm中job的別名,它的工作流程大概如圖:
這里寫圖片描寫敘述
這里spout消息發送源,bolt是數據處理節點,計算出來的記過能夠多次使用
項目准備:
storm-lib.zip
[big-data-client]
[big-data-storm]
第一個作者開發的Topology須要的lib包,將該lib替換到全部storm集群的storm/lib下
第二個作者開發環境須要的中間件,第三個storm項目。
項目中有兩個案例。一個TopN案例。一個客戶畫像案例(針對自自有業務的客戶畫像)
這里寫圖片描寫敘述
bolt是工作節點,remote是外部調用的數據接口,spout是消息源,topology是job主文件夾。
以下是客戶畫像的
Topology

package com.jjshome.storm.topology;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import com.google.common.collect.ImmutableList;
import com.jjshome.storm.bolt.house.BoltFCWSplit;
import com.jjshome.storm.bolt.house.BoltLogFormat;
import com.jjshome.storm.bolt.house.BoltLogFormat4App;
import com.jjshome.storm.bolt.house.BoltSave;
import com.jjshome.storm.bolt.house.BoltThreshold;
import com.jjshome.storm.utils.CommonConstant;
import com.jjshome.storm.utils.StormRunner;

/** * @功能描寫敘述: 用戶行為分析的Topology * @項目版本號: 1.0.0 * @項目名稱: 大數據 * @相對路徑: com.jjshome.storm.topology.UserLogTopology.java * @創建作者: 歐陽文斌 * @問題反饋: ouyangwenbin2009@live.cn * @創建日期: 2015年12月7日 上午10:20:27 */ 
public class UserLogTopology {
    private static Logger logger = LoggerFactory.getLogger(UserLogTopology.class);
    /** 本地調試執行時間單位(秒) */
    private static final int DEFAULT_RUNTIME_IN_SECONDS = 60*30;
    /** kafka集群 */
    private static final String kafka_zookeeper_local = "master:2181,slave1:2181,slave2:2181,slave3:2181";
    private static final String kafka_zookeeper_online = "bigdata-99-51-master.jjshome.com:2181,bigdata-99-52-slave.jjshome.com:2181,bigdata-99-53-slave.jjshome.com:2181,bigdata-99-54-slave.jjshome.com:2181";
    /** Storm集群列表 */
    private static final List<String> zk_servers_local = ImmutableList.of("master","slave1", "slave2", "slave3");
    private static final List<String> zk_servers_online = ImmutableList.of("bigdata-99-51-master.jjshome.com","bigdata-99-52-slave.jjshome.com", "bigdata-99-53-slave.jjshome.com", "bigdata-99-54-slave.jjshome.com");

    private static Config createTopologyConfiguration() {
        Config conf = new Config();
        //是否是本地模式
        conf.setDebug(CommonConstant.IS_LOCAL?

true:false); //設置工作機數量 conf.setNumWorkers(CommonConstant.IS_LOCAL?4:16); return conf; } /** * @功能描寫敘述: 獲取KafkaConfig * @創建作者: 歐陽文斌 * @創建日期: 2015年12月11日 下午2:08:36 * @return */ private static KafkaSpout getKafkaSpout(){ // 房產網 bigdata日志的消息 String kafkaZookeeper = CommonConstant.IS_LOCAL?kafka_zookeeper_local:kafka_zookeeper_online; BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper); SpoutConfig kafka_config_fang = new SpoutConfig(brokerHosts, "jjs-fang-web-bigDatas", "/jjs-fang-web-bigDatas", "jjs-fang-web-bigDatas"); kafka_config_fang.scheme = new SchemeAsMultiScheme(new StringScheme()); kafka_config_fang.zkServers = CommonConstant.IS_LOCAL?zk_servers_local:zk_servers_online; kafka_config_fang.zkPort = 2181; return new KafkaSpout(kafka_config_fang); } /** * @功能描寫敘述: 獲取KafkaConfig * @創建作者: 歐陽文斌 * @創建日期: 2015年12月11日 下午2:08:36 * @return */ private static KafkaSpout getKafkaSpout_App(){ // 房產網 bigdata日志的消息 String kafkaZookeeper = CommonConstant.IS_LOCAL?kafka_zookeeper_local:kafka_zookeeper_online; BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper); SpoutConfig kafka_config_fang = new SpoutConfig(brokerHosts, "bigdata_app_logs", "/bigdata_app_logs", "bigdata_app_logs"); kafka_config_fang.scheme = new SchemeAsMultiScheme(new StringScheme()); kafka_config_fang.zkServers = CommonConstant.IS_LOCAL?zk_servers_local:zk_servers_online; kafka_config_fang.zkPort = 2181; return new KafkaSpout(kafka_config_fang); } public static void main(String[] args) { //Topology構造器 TopologyBuilder builder = new TopologyBuilder(); String topologyName = "UserLogTopology"; //配置器 Config topologyConfig = createTopologyConfiguration(); int runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS; final String app_index = "s_app"; final String pc_index = "s_pc"; final String fcwsplit_index = "b_fcwsplit"; final String logformat_index = "b_logformat"; final String logformatapp_index = "b_logformatapp"; //final String mongodb_index = "b_mongodb"; final String threshold_index = "b_threshold"; final String save_index = "b_save"; //設置 手機app log日志源 builder.setSpout(app_index, getKafkaSpout_App(), 4).setNumTasks(4); //設置 房產網日志源 builder.setSpout(pc_index, getKafkaSpout(), 8).setNumTasks(8); //房產網日志分割和過濾 builder.setBolt(fcwsplit_index, new BoltFCWSplit(), 8).setNumTasks(8).shuffleGrouping(pc_index); //日志格式化 builder.setBolt(logformat_index, new BoltLogFormat(), 4).setNumTasks(4).shuffleGrouping(fcwsplit_index); //手機日志格式化 builder.setBolt(logformatapp_index, new BoltLogFormat4App(), 4).setNumTasks(4).shuffleGrouping(app_index); //存儲 _USER_INTENTION 到mongoDB /*builder.setBolt(mongodb_index, new BoltMongo(), 2) .shuffleGrouping(logformat_index) .shuffleGrouping(logformatapp_index);*/ //數據 閥 控制 builder.setBolt(threshold_index, new BoltThreshold(2,60), 6).setNumTasks(6) .fieldsGrouping(logformat_index, new Fields("ip")) .fieldsGrouping(logformatapp_index, new Fields("ip")); //數據落地 builder.setBolt(save_index, new BoltSave(), 4).setNumTasks(4).fieldsGrouping(threshold_index, new Fields("ip")); try { StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds); } catch (Exception e) { logger.error("UserLogTopology@main", e); } } }

builder的整個構建過程,實際上也就是數據流的加工過程。

kafka的spout是引用第三方的jar,pom中有配置。
bolt

package com.jjshome.storm.bolt.house;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.jjshome.bigdata.entity.log._JJS_Log;
import com.jjshome.bigdata.util.SystemConstant;

/** * @功能描寫敘述: 房產網日志解析 * @項目版本號: 1.0.0 * @項目名稱: 大數據 * @相對路徑: com.jjshome.storm.bolt.BoltFCWSplit.java * @創建作者: 歐陽文斌 * @問題反饋: ouyangwenbin2009@live.cn * @創建日期: 2015年12月11日 下午2:20:07 */ 
public class BoltFCWSplit implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private Logger logger = LoggerFactory.getLogger(BoltFCWSplit.class);
    private OutputCollector collector;

    /** 用戶行為分析的LOG正則 */
    private static Pattern s = Pattern.compile(""
            //時間
            + "(.*?),.*"
            //類別
            + "(YslHouseController|EsfHouseController|ZfHouseController|AgentInfoController|YywtController).*"
            //ip
            + "ip=(.*?

),.*" //cityCode + "cityCode=(.*?),.*" //userId + "userId=(.*?),.*" //phone + "phone=(.*?

),.*" //refererAddress + "refererAddress=(.*?),.*" //accessAddress + "accessAddress=(.*?),.*" //tags + "tags=(.*?),.*" //keyWord + "keyWord=(.*?),.*" //cookiesId + "cookiesId=(.*?

),.*"); @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("object")); } @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String msg = "NOTHING"; try { //獲取消息流 msg = input.getString(0); //異常日志推斷 if(msg!=null&&msg.length()<1000){ //正則匹配 Matcher sm = s.matcher(msg); if(sm.find()){ //LOG日志格式轉換這對象 _JJS_Log jjsLog = new _JJS_Log(); log2entity(sm, jjsLog); if (jjsLog.getUrl_type() == 5) { if (jjsLog.getNew_url() != null && jjsLog.getNew_url().indexOf("saveReserveOrderInfo") > -1 && !"".equals(jjsLog.getUserId()) && null != jjsLog.getUserId()) { //發送消息到下一個bolt collector.emit(new Values(jjsLog)); } } else { //發送消息到下一個bolt collector.emit(new Values(jjsLog)); } } } } catch (Exception e) { //錯誤記錄做記錄 不須要反復發送 logger.error("BoltFCWSplit@execute "+msg, e); } finally { //消息處理成功 collector.ack(input); } } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } /** * @功能描寫敘述: log日志轉化 * @創建作者: 歐陽文斌 * @創建日期: 2015年12月15日 上午11:34:45 * @param sm * @param jjsLog */ private void log2entity(Matcher sm, _JJS_Log jjsLog){ if(sm!=null&&jjsLog!=null){ int i=0; jjsLog.setS_date(sm.group(++i)); jjsLog.setType(SystemConstant.FCW_INDEX); String type = sm.group(++i); if(StringUtils.isNotEmpty(type)){ if(type.equals("YslHouseController")){ jjsLog.setUrl_type(1); }else if(type.equals("EsfHouseController")){ jjsLog.setUrl_type(2); }else if(type.equals("ZfHouseController")){ jjsLog.setUrl_type(3); }else if(type.equals("AgentInfoController")){ jjsLog.setUrl_type(4); } else if(type.equals("YywtController")){ jjsLog.setUrl_type(5); } } jjsLog.setIp(sm.group(++i)); jjsLog.setCityCode(sm.group(++i)); jjsLog.setUserId(sm.group(++i)); jjsLog.setTel_num(sm.group(++i)); jjsLog.setOld_url(sm.group(++i)); jjsLog.setNew_url(sm.group(++i)); jjsLog.setTags(sm.group(++i)); jjsLog.setKeyWord(sm.group(++i)); jjsLog.setCookies(sm.group(++i)); } } }

bolt中就是數據的邏輯處理,關鍵的方法是input.getString(0);獲取數據,collector.emit(new Values(jjsLog));發送數據,collector.ack(input);告訴前一個發送者,信息處理成功。
在topology的grouping策略就是在Spout與Bolt、Bolt與Bolt之間傳遞Tuple的方式。總共同擁有七種方式:
1)shuffleGrouping(隨機分組)
2)fieldsGrouping(依照字段分組,在這里即是同一個單詞僅僅能發送給一個Bolt)
3)allGrouping(廣播發送。即每個Tuple。每個Bolt都會收到)
4)globalGrouping(全局分組,將Tuple分配到task id值最低的task里面)
5)noneGrouping(隨機分派)
6)directGrouping(直接分組,指定Tuple與Bolt的相應發送關系)
7)Local or shuffle Grouping
8)customGrouping (自己定義的Grouping)
經常使用的也就是隨機分組、按字段分組以及全局分組。
在自己Topology開發完畢后。能夠講執行模型改動成本地,然后執行Topology,方便進行調試。若是要公布到進群環境中。則將Storm項目打包,maven install(作者是maven項目),將打好的jar上傳到nimbus服務器。

storm jar storm-kafka-topology.jar com.jjshome.storm.topology.UserLogTopology

在jar的根文件夾上傳jar到storm集群中。后面的類名是一個帶main的topology,也就是上面的客戶畫像的topology。


公布成功后,能夠在UI界面看到topology的執行情況,各個節點的日志處理數量,延遲時間
這里寫圖片描寫敘述
topology執行起來后,能夠在各個數據存儲的節點中。獲取storm實時分析的結果。通過分析的結構,得到各個用戶實時的各種標簽,最后通過這些標簽。在產品庫中篩選最匹配的產品。


以下是作者的客戶畫像架構圖
這里寫圖片描寫敘述
數據流程
1.用戶操作產生日志
2.kafka收集日志
3.Storm分析處理日志
1)日志詳情存儲到mongoDB
2)半小時外意向模型存儲到mongoDB
3)半小時內意向模型存儲到redis
4)假設用戶登錄后的操作。則喚醒mongodb中全部的半小時意向模型,又一次組裝模型更新到mysql熱表中
5)監控日志,假設發生預警事件操作,則觸發意向模型以及精准推薦的生成
模型構建
在生成各種標簽集合時。要增加權重因子(可變),針對不同產品。構建不同標簽,再對各種操作以及權重因子。來生產用戶標簽。

深度分析能夠考慮增加機器學習在里面。


開發問題和運維問題的分析和解決

Q:在搭建集群的時候,通過UI看到各個節點的主機名一樣。都是localhost,導致topology全然不工作。
A:檢測各個虛擬機的hostname,保持和hosts中配置的一致,再重新啟動zookeeper和storm集群


Q:在公布topology到集群上后。在UI界面中看到各種class找不到的錯誤
A:將storm項目中的lib打包統一都放到storm中lib,這里要注意jar包沖突和版本號問題


Q:在日志累加的時候。fail的日志越來越多,導致延遲越來越大
A:這個問題跟業務處理有關系,檢查出現故障的bolt,通過刪剪法,反復提交測試,找出有問題的代碼


Q:發現設置的works節點不生效,實際的比設置的少非常多
A:檢查topology的配置器,是不是本地模式。


Q:數據實時處理,怎么才干高效的讓數據落地
A:作者這里用了滾筒模式,累積半小時的數據。再統一存儲。半小時以內的。直接存放在redis集群中


Q:在使用kafka的producer命令監控日志的時候,老是出現日志終端的現象
A:看看log4j是否配置了日志時間戳,由於開啟了時間戳,日志將會定時或不定時的將文件重命名,然后新開硬盤地址做存儲。這樣kafka是沒有辦法獲取新的log硬盤地址。

解決的方法:換用KafkaLog4jAppender方式,或者讓log文件不替換,每天定時清理一次就好了


Q:kafka集群服務器硬盤空間滿了
A:在沒有什么設定的操作下,kafka收到的日志會存儲在硬盤中,終究有一天。硬盤會滿掉。解決的方法:在各個節點增加crontab計划

0 6 * * * /home/zookeeper/bin/zkCleanup.sh -n 3


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM