Rocketmq日志收集與logback集成Demo


官方文檔有簡潔的例子,這里就做一個簡單補充和實踐

  1. 直接上logback-boot.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- 文件輸出格式 -->
    <property name="PATTERN" value="%-12(%d{yyyy-MM-dd HH:mm:ss.SSS}) |-%-5level [%thread] %c [%L] -| %msg%n"/>
    <!-- 生產日志文件路徑 -->
    <property name="LOG_FILE_PATH" value="/home/tomapp/manager/logs"/>
    <!-- 開發環境 -->
    <springProfile name="dev">
        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>${PATTERN}</pattern>
            </encoder>
        </appender>
        <logger name="com.tzxylao.manager" level="debug"/>
        <root level="info">
            <appender-ref ref="CONSOLE"/>
        </root>
    </springProfile>
    <!-- 生產環境 -->
    <springProfile name="prod">
        <!-- 每天產生一個文件 -->
        <appender name="PROD_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">

            <!-- 文件路徑 -->
            <file>${LOG_FILE_PATH}/manager.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!-- 文件名稱 -->
                <fileNamePattern>${LOG_FILE_PATH}/manager.%d{yyyy-MM-dd}.log</fileNamePattern>
                <!-- 文件最大保存歷史數量 -->
                <MaxHistory>100</MaxHistory>
            </rollingPolicy>
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>${PATTERN}</pattern>
            </layout>
        </appender>
        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>${PATTERN}</pattern>
            </encoder>
        </appender>
        <appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
            <tag>TagA</tag>
            <topic>TopicTest</topic>
            <producerGroup>please_rename_unique_group_name</producerGroup>
            <nameServerAddress>47.101.45.25:9876</nameServerAddress>
            <layout>
                <pattern>%date %p %t - %m%n</pattern>
            </layout>
        </appender>

        <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
            <queueSize>1024</queueSize>
            <discardingThreshold>80</discardingThreshold>
            <maxFlushTime>2000</maxFlushTime>
            <neverBlock>true</neverBlock>
            <appender-ref ref="mqAppender1"/>
        </appender>

        <logger name="com.tzxylao.manager" level="debug"/>
        <root level="info">
            <appender-ref ref="PROD_FILE"/>
            <appender-ref ref="CONSOLE"/>
            <appender-ref ref="mqAppender1"/>
        </root>

    </springProfile>

</configuration>
  • 這里有我本來的日志輸出,rocketmq的輸出主要是appender為mqAppender1、mqAsyncAppender1的這兩個,最后放入root節點就可以了
  • 官方文檔路徑:https://rocketmq.apache.org/docs/logappender-example/
  • 這里要起作用,必須把nameServerAddress,rocketmq服務地址配上
  • 可能出錯的地方就是找不到對應的類了,這里給出我的引用pom
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-logappender</artifactId>
    <version>4.3.0</version>
</dependency>

測試

  1. 編寫個簡單的消費端服務,就拿官方的例子加上服務地址就行了
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tzxylao.quick;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setNamesrvAddr("47.101.45.25:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
//                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                try {
                    String s = new String(msgs.get(0).getBody(), "utf-8");
                    System.out.println(s);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

  1. 好了,現在啟動消費端,再啟動配置了rocketmq日志的服務端
  • 這是生產端打印

  • 這是消費端打印

好了,日志收集后怎么用,自己發揮想象吧~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM