java 分布式實踐


java 分布式實踐

spring boot cloud實踐

開源的全鏈路跟蹤很多,比如 Spring Cloud Sleuth + Zipkin,國內有美團的 CAT 等等。

其目的就是當一個請求經過多個服務時,可以通過一個固定值獲取整條請求鏈路的行為日志,基於此可以再進行耗時分析等,衍生出一些性能診斷的功能。

不過對於我們而言,首要目的就是 Trouble Shooting,出了問題需要快速定位異常出現在什么服務,整個請求的鏈路是怎樣的。

為了讓解決方案輕量,我們在日志中打印 RequestId 以及 TraceId 來標記鏈路。

RequestId 在 Gateway 生成表示唯一一次請求,TraceId 相當於二級路徑,一開始與 RequestId 一樣,但進入線程池或者消息隊列后,TraceId 會增加標記來標識唯一條路徑。

舉個例子,當一次請求向 MQ 發送一個消息,那么這個消息可能會被多個消費者消費,此時每個消費線程都會自己生成一個 TraceId 來標記消費鏈路。加入 TraceId 的目的就是為了避免只用 RequestId 過濾出太多日志。

實現上,通過 ThreadLocal 存放 APIRequestContext 串聯單服務內的所有調用。

當跨服務調用時,將 APIRequestContext 信息轉化為 HTTP Header,被調用方獲取到 HTTP Header 后再次構建 APIRequestContext 放入 ThreadLocal,重復循環保證 RequestId 和 TraceId 不丟失即可。

如果進入 MQ,那么 APIRequestContext 信息轉化為 Message Header 即可(基於 RabbitMQ 實現)。

當日志匯總到日志系統后,如果出現問題,只需要捕獲發生異常的 RequestId 或是 TraceId 即可進行問題定位。

1. 功能介紹

2. 使用方法

  1. 啟動udf-rabbitmq-producer:

  2. 啟動udf-rabbitmq-comsumer:

  3. 查看日志: produce的日志

|2019-04-04 11:53:12.260| INFO|XNIO-1 task-10|udf.udf.rabbitmq.producer.controller.SendMessage:34||127.0.0.1|172.27.9.60|clientSysName|ito-rabbitmq-provider|1904041153122562832859336|BIZ|init rabbitmq2org.springframework.amqp.rabbit.core.RabbitTemplate@1b1f5012|
|2019-04-04 11:53:12.262| INFO|XNIO-1 task-10|udf.udf.rabbitmq.springboot.starter.postprocess.MDCMesagePostProcess:41||127.0.0.1|172.27.9.60|clientSysName|ito-rabbitmq-provider|1904041153122562832859336|BIZ|{"traceId":"1904041153122562832859336","clientSysName":"clientSysName","logType":"BIZ","request":"HttpServletRequestImpl [ POST /v1/sendMsg ]","clientNodeName":"127.0.0.1","serverNodeName":"172.27.9.60","transId":"transId","requestUrl":"http://127.0.0.1:10616/v1/sendMsg","serverSysName":"ito-rabbitmq-provider","startTime":"1554349992256"}|
|2019-04-04 11:53:12.264| INFO|XNIO-1 task-10|udf.udf.log.springboot.starter.filter.NormalAspect:52||127.0.0.1|172.27.9.60|clientSysName|ito-rabbitmq-provider|1904041153122562832859336|RESP|interfaceName=http://127.0.0.1:10616/v1/sendMsg^executeTime=8^desc=request:HttpServletRequestImpl [ POST /v1/sendMsg ],response:{"age":0,"name":"string"}|

comsumer的日志

|2019-04-04 11:53:12.295| INFO|dataDeepDealListenerContainer-6|udf.udf.rabbitmq.springboot.starter.postprocess.MDCReceivePostProcessors:52||127.0.0.1|172.27.9.60|ito-rabbitmq-provider|ito-rabbitmq-comsumer|1904041152498513697567629|BIZ|接收到的MDC{"traceId":"1904041153122562832859336","clientSysName":"clientSysName","logType":"BIZ","request":"HttpServletRequestImpl [ POST /v1/sendMsg ]","clientNodeName":"127.0.0.1","serverNodeName":"172.27.9.60","transId":"transId","requestUrl":"http://127.0.0.1:10616/v1/sendMsg","serverSysName":"ito-rabbitmq-provider","startTime":"1554349992256","__TypeId__":"udf.udf.rabbitmq.producer.model.SampleMessage"}|
|2019-04-04 11:53:12.296| INFO|dataDeepDealListenerContainer-6|udf.udf.rabbitmq.comsumer.mqlistener.SampleMessageListener:26||127.0.0.1|172.27.9.60|ito-rabbitmq-provider|ito-rabbitmq-comsumer|1904041153122562832859336|BIZ|成功處理MQ消息, 消息體:{"name":"string","age":0}|
  1. 實現要點: 對rabbitmqtemplate進行增強,添加前置消息處理器:
@Bean
   RabbitTemplate reforeRabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter)
   {
         RabbitTemplate rabbitTemplate =new RabbitTemplate(connectionFactory);
       rabbitTemplate.setBeforePublishPostProcessors(mdcMesagePostProcess);
       rabbitTemplate.setAfterReceivePostProcessors(mdcReceivePostProcessors);
       rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
       logger.info("init rabbitmq"+rabbitTemplate);
       return rabbitTemplate;
   }
public class MDCMesagePostProcess implements MessagePostProcessor {
    private Logger logger= LoggerFactory.getLogger(MDCMesagePostProcess.class);
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        Map<String, String> mdcContainer =MDC.getCopyOfContextMap();
        logger.info(JSON.toJSONString(mdcContainer));
        for (Map.Entry<String, String> m : mdcContainer.entrySet()) {
            message.getMessageProperties().setHeader(m.getKey(),m.getValue());
        }
        return message;
    }
}
@Service
public class MDCReceivePostProcessors implements MessagePostProcessor {
    @Value("${spring.application.name}")
    private String sysName;
    private Logger logger= LoggerFactory.getLogger(MDCReceivePostProcessors.class);
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        Map<String, Object> mdc= message.getMessageProperties().getHeaders();
        Map<String, String> mdcString= new HashMap<String,String>();
        logger.info("接收到的MDC"+ JSON.toJSONString(mdc));
        for (Map.Entry<String, Object> m : mdc.entrySet()) {

            mdcString.put(m.getKey(),m.getValue().toString());

        }
        mdcString.put(LogKeyConstants.CLIENT_SYS_NAME, String.valueOf(mdc.get(LogKeyConstants.SERVER_SYS_NAME)));
        mdcString.put(LogKeyConstants.SERVER_SYS_NAME, sysName);
        MDC.setContextMap(mdcString);
        return message;
    }
}

消費者還是需要額外指定:

@Bean
    public SimpleMessageListenerContainer dataDeepDealListenerContainer(ConnectionFactory connectionFactory, Queue queue) {

        SimpleMessageListenerContainer container =new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(sampleMessageListener);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setAfterReceivePostProcessors(mdcReceivePostProcessors);
        return container;
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM