最近一直在負責業務監控告警相關的開發;由於組織架構調整,從原來的服務端架構組分離出來成立工程效率組,很多原來不是我們組負責的項目也開始陸續交接到了我們手里;
以前一直由業務部門負責開發的sensoragent項目就是由我來交接,交接的時候才發現有很多問題,該應用是業務部門用來從MQ消費消息往神策發送數據的,由於數據量很大,該應用在生產已經擴大到了12點,而我們普通應用在生產也就四個點;雖然已經擴大到了12點,生產仍然有很大的消息堆積來不及處理,所以每天都能收到大量的消息堆積告警;
部門領導也跟我強調說,生產12個點仍然積壓,服務肯定是有問題的,因為我最近單獨負責神策指標收集系統的開發,所以和這個系統相關的指標發送系統的排查任務也就落到了我的頭上。
下圖是我的告警(目前公司只有開發運維角色才會接收到告警)

因為這兩天迭代結束,有時間來處理下自己交接項目的一個分析和優化:
我查看了應用在openshift里面的一些情況,同時也分析了一下granfa上面的應用運行狀況:在看到openshift里面的時候發現了一些端倪:如圖


項目里用到了十個線程去消費 MQ的數據,但是openshift上面可以看出,每次只有一個線程在處理,而且會出現Blocked的情況:
點開以后發現blocked的線程阻塞在send方法:
@Override public void send(Map<String, Object> message) {
synchronized (messageList) {
messageList.add(message);
if (messageList.size() >= bulkSize) {
flush();
}
}
}
這是神策官方sdk提供的的方法,可以看到加了鎖,每次consumer消費完信息,都會在內存里面存一份數據然后統一flush到神策。
我們再來看交接過來的項目是怎么使用的;

SensorsAnalytics這個對象是個單例對象,這樣存在的問題在於十個線程共用一個對象,存在現在安全,所以有了synchronized關鍵字,但也會導致串行化,同時但list集合滿了,會執行flush方法,而該方法:如下
@Override public void flush() {
synchronized (messageList) {
String sendingData = null;
try {
sendingData = jsonMapper.writeValueAsString(messageList);
} catch (JsonProcessingException e) {
messageList.clear();
if (throwException) {
throw new RuntimeException("Failed to serialize data.", e);
}
}
try {
this.httpConsumer.consume(sendingData);
messageList.clear();
} catch (IOException e) {
if (throwException) {
throw new RuntimeException("Failed to dump message with BatchConsumer.", e);
}
} catch (HttpConsumer.HttpConsumerException e) {
if (throwException) {
throw new RuntimeException("Failed to dump message with BatchConsumer.", e);
}
}
}
}
該方法會往神策發送數據,並且每次發送的時間很長,這也會嚴重阻礙到線程繼續消費運行;
目前我的解決方案是,為每個線程分配一個SensorsAnalytics對象:
代碼改進:
@Slf4j
public class SensorsAnalyticsThreadLocal {
private static final String PROPERTIES_PATH = "/application.properties";
private static final String URL_SENSORS_DATA = "url.sensorsanalytics";
private static final String SENSORS_DATA_BATCH_CONSUMER_BULK_SIZE = "sensorsdata.batchconsumer.bulksize";
private static Properties props;
static {
Resource resource = new ClassPathResource(PROPERTIES_PATH);
try {
props = PropertiesLoaderUtils.loadProperties(resource);
} catch (IOException e) {
log.error("初始化配置失敗", e);
}
}
private static ThreadLocal<SensorsAnalytics> sensorsAnalyticsThreadLocal = ThreadLocal.withInitial(() -> {
String sensorsDataUrl = props.getProperty(URL_SENSORS_DATA);
Integer bulkSize = Integer.valueOf(props.getProperty(SENSORS_DATA_BATCH_CONSUMER_BULK_SIZE));
return new SensorsAnalytics(new SensorsAnalytics.BatchConsumer(sensorsDataUrl, bulkSize, true));
}
);
public static SensorsAnalytics get() {
return sensorsAnalyticsThreadLocal.get();
}
public static void set(SensorsAnalytics sensorsAnalytics) {
sensorsAnalyticsThreadLocal.set(sensorsAnalytics);
}
public static void remove() {
sensorsAnalyticsThreadLocal.remove();
}
}
今天中午上到了生產,並且下來效果卓有成效,以前下午積壓最嚴重的時候,也沒有告警了,在運行平穩后,我們將原來生產12個點降低到了4個節點.
簡單總結下:生產環境,應用從業務開發部門交接過來的時候,生產部署了12個節點。每個節點有10個線程在消費mq的消息,但是每天仍然有大量mq消息積壓的告警,我通過線程的堆棧的信息,發現每個節點同一時刻,僅有一個線程在消費,其它線程都處於阻塞狀態,原因在於使用神策的sdk發送數據時,神策官方為了線程安全使用synchronized。但是這也是線程阻塞的原因,我用ThreadLocal解決了因為這個原因導致線程阻塞的問題,並在服務逐漸穩定的情況下,逐漸降低節點數,並最終降到了公司生產服務的最低節點數4.
