一次lettuce导致的内存溢出问题


现象

生产环境,程序出现内存溢出。

分析

heap dump文件分析

发现添加了大量的ShutdownHooks。

通过类名“io/lettuce/core/metrics/DefaultCommandLatencyCollector$DefaultPauseDetectorWrapper$1”,得知是lettuce框架的。找到此类源码,添加ShutdownHooks的代码。

public void retain() {

	if (counter.incrementAndGet() == 1) {

		if (instanceCounter.getAndIncrement() > 0) {
			InternalLogger instance = InternalLoggerFactory.getInstance(getClass());
			instance.info("Initialized PauseDetectorWrapper more than once.");
		}

		pauseDetector = new SimplePauseDetector(TimeUnit.MILLISECONDS.toNanos(10), TimeUnit.MILLISECONDS.toNanos(10), 3);
		Runtime.getRuntime().addShutdownHook(new Thread("ShutdownHook for SimplePauseDetector") {
			@Override
			public void run() {
				if (pauseDetector != null) {
					pauseDetector.shutdown();
				}
			}
		});
	}
}

查找该方法的调用者。发现,在DefaultCommandLatencyCollector的构造函数中,添加了一个Function createLatencies,这个function中,有调用retain()方法。

# DefaultCommandLatencyCollector.class
public DefaultCommandLatencyCollector(CommandLatencyCollectorOptions options) {

	this.options = options;
	this.createLatencies = id -> {

		if (PAUSE_DETECTOR_UPDATER.get(this) == null) {
			if (PAUSE_DETECTOR_UPDATER.compareAndSet(this, null, GLOBAL_PAUSE_DETECTOR)) {
				PAUSE_DETECTOR_UPDATER.get(this).retain();
			}
		}
		PauseDetector pauseDetector = ((DefaultPauseDetectorWrapper) PAUSE_DETECTOR_UPDATER.get(this)).getPauseDetector();

		if (options.resetLatenciesAfterEvent()) {
			return new Latencies(pauseDetector);
		}

		return new CummulativeLatencies(pauseDetector);
	};
}

继续,查找createLatencies的调用者。发现如下方法:

# DefaultCommandLatencyCollector.class
/**
 * Record the command latency per {@code connectionPoint} and {@code commandType}.
 *
 * @param local the local address
 * @param remote the remote address
 * @param commandType the command type
 * @param firstResponseLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the first response
 * @param completionLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the command completion
 */
public void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType,
		long firstResponseLatency, long completionLatency) {

	if (!isEnabled()) {
		return;
	}

	Latencies latencies = latencyMetricsRef.get().computeIfAbsent(createId(local, remote, commandType), createLatencies);

	latencies.firstResponse.recordLatency(rangify(firstResponseLatency));
	latencies.completion.recordLatency(rangify(completionLatency));
}

通过注释,是在记录command latency,网络上查询资料,发现这是lettuce的一个默认打开的功能,统计命令延时。而我们显示不需要这个功能,所以将其禁用。

修改

禁用command latency。

package syb.redisTest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.metrics.CommandLatencyCollectorOptions;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;

@Component
public class RedisTestThread extends Thread {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${host}")
    private String host;

    @Value("${port}")
    private int port;

    @Value("${password}")
    private String password;

    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connect;
    private ClientResources res;

    private String key = "test-syb-key";
    private String value = "test-syb-value";

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                logger.error("", e);
            }
            logger.info("run test, start");
            try {
                res = DefaultClientResources.builder()
                        .commandLatencyCollectorOptions(CommandLatencyCollectorOptions.disabled()).build();
                RedisURI redisUri = RedisURI.Builder.redis(host).withPort(port).withPassword(password).withDatabase(0)
                        .build();
                redisClient = RedisClient.create(res, redisUri);
                connect = redisClient.connect();
                RedisCommands<String, String> commands = connect.sync();

                commands.set(key, value);
                commands.get(key);
                commands.del(key);

                destroy();
            } catch (Exception e) {
                logger.error("", e);
            } finally {
                destroy();
            }
            logger.info("run test, end");
        }
    }

    public void destroy() {
        try {
            connect.close();
            redisClient.shutdown();
            res.shutdown();
        } catch (Exception e) {
            logger.error("", e);
        }
        logger.info("redis client destroy");
    }
}

创建ClientResources时,禁用了Command latency。
再次运行程序,隔一段时间抓一次heap dump,未再发现ShutdownHooks。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM