Java 混沌實驗執行器 chaosblade-exec-jvm


關於混沌實驗的意義和作用,這裡就不說了,這裡只說實現。
Java 項目混沌實驗的整體實現思路,就是通過 Java Agent 和字節碼增強技術注入異常。
針對 Java 注入的異常有:
  1. 線程池被占用
  2. 連接池被占用
  3. 方法延遲
  4. 拋異常
  5. 指定返回值
  6. CPU 滿載
  7. 內存溢出
  8. CodeCacheFilling
下面依次展開
 /**
  * Starts the "thread pool full" experiment against the given pool.
  *
  * <p>Does nothing when the pool is {@code null} or when {@code futureCache}
  * already holds futures from an earlier start. Otherwise a background task is
  * scheduled (initial delay 0, then a fixed delay of 10 seconds) that submits
  * up to {@code maximumPoolSize} occupying tasks to the target pool on every pass.
  *
  * @param threadPoolExecutor the pool to saturate; may be {@code null}
  */
 @Override
 public void full(final ThreadPoolExecutor threadPoolExecutor) {
     if (threadPoolExecutor == null) {
         LOGGER.warn("threadPoolExecutor is null");
         return;
     }
     if (futureCache.size() > 0) {
         LOGGER.info("thread pool has started");
         return;
     }
     isRunning = true;

     final int active = threadPoolExecutor.getActiveCount();
     final int core = threadPoolExecutor.getCorePoolSize();
     final int poolLimit = threadPoolExecutor.getMaximumPoolSize();
     LOGGER.info("start execute thread pool full, activeCount: {}, corePoolSize: {}, maximumPoolSize: {}",
         active, core, poolLimit);

     final Runnable occupyPool = new Runnable() {
         @Override
         public void run() {
             try {
                 int submitted = 0;
                 while (submitted < poolLimit) {
                     // Keep submitting tasks; per the original author's note each task
                     // only sleeps, so it holds on to its worker thread.
                     Future<?> pending = threadPoolExecutor.submit(new InterruptableRunnable());
                     if (pending != null) {
                         // Remember the future so the tasks can be shut down on recovery.
                         futureCache.add(pending);
                     }
                     submitted++;
                 }
             } catch (RejectedExecutionException e) {
                 // The pool refused further work, i.e. it is saturated.
                 LOGGER.info("has triggered thread pool full");
             } catch (Exception e) {
                 LOGGER.error("execute thread pool full exception", e);
             }
         }
     };

     synchronized (lock) {
         // Create the scheduler lazily, or replace one that has been shut down.
         boolean schedulerUnusable = executorService == null
             || executorService.isShutdown()
             || executorService.isTerminated();
         if (schedulerUnusable) {
             executorService = ThreadUtil.createScheduledExecutorService();
         }
     }
     executorService.scheduleWithFixedDelay(occupyPool, 0, 10, TimeUnit.SECONDS);
 }
/**
 * Starts the "connection pool full" experiment against the given data source.
 *
 * <p>Does nothing when the data source is {@code null} or when
 * {@code connectionHolder} already holds connections from an earlier start.
 * Otherwise a background task is scheduled (initial delay 0, then a fixed delay
 * of 10 seconds) that requests up to {@code poolSize} connections on every pass
 * and keeps them without closing.
 *
 * @param dataSource the data source whose pool should be exhausted; may be {@code null}
 */
@Override
public void full(final DataSource dataSource) {
    if (dataSource == null) {
        LOGGER.warn("dataSource is null");
        return;
    }
    if (connectionHolder.size() > 0) {
        LOGGER.info("connection pool full has started");
        return;
    }
    isRunning = true;

    // Fall back to the default size when no positive maximum is reported.
    final int poolSize;
    int reportedMax = getMaxPoolSize();
    if (reportedMax <= 0) {
        poolSize = DEFAULT_MAX_POOL_SIZE;
    } else {
        poolSize = reportedMax;
    }
    LOGGER.info("Start execute connection pool full, poolSize: {}", poolSize);

    final Runnable drainPool = new Runnable() {
        @Override
        public void run() {
            try {
                int requested = 0;
                while (requested < poolSize) {
                    // Keep acquiring connections and never release them here.
                    Connection held = dataSource.getConnection();
                    if (held != null) {
                        // Remember the connection so it can be closed on recovery.
                        connectionHolder.add(held);
                    }
                    requested++;
                }
                LOGGER.info("execute connection pool full success");
            } catch (SQLException e) {
                LOGGER.info("connection pool full, {}", e.getMessage());
            } catch (Exception e) {
                LOGGER.warn("get database connection exception", e);
            }
        }
    };

    synchronized (lock) {
        // Create the scheduler lazily, or replace one that has been shut down.
        boolean schedulerUnusable = executorService == null
            || executorService.isShutdown()
            || executorService.isTerminated();
        if (schedulerUnusable) {
            executorService = ThreadUtil.createScheduledExecutorService();
        }
    }
    executorService.scheduleWithFixedDelay(drainPool, 0, 10, TimeUnit.SECONDS);
}
/**
 * Executes the delay action.
 *
 * <p>Reads the delay (and optional jitter offset) in milliseconds from the
 * action flags. When a timeout executor is present and its positive timeout is
 * shorter than the delay, only the timeout is slept (without jitter) and the
 * timeout executor is run; otherwise the full delay is slept.
 *
 * @param enhancerModel carries the action flags and the optional timeout executor
 * @throws Exception if a flag is not a valid integer or the timeout executor fails
 */
@Override
public void run(EnhancerModel enhancerModel) throws Exception {
    // Requested delay.
    String time = enhancerModel.getActionFlag(timeFlagSpec.getName());
    int delayInMillis = Integer.parseInt(time);

    String offsetTime = enhancerModel.getActionFlag(timeOffsetFlagSpec.getName());
    int jitter = StringUtil.isBlank(offsetTime) ? 0 : Integer.parseInt(offsetTime);

    TimeoutExecutor timeoutExecutor = enhancerModel.getTimeoutExecutor();
    if (timeoutExecutor == null) {
        sleep(delayInMillis, jitter);
        return;
    }

    long timeoutInMillis = timeoutExecutor.getTimeoutInMillis();
    boolean timeoutExpiresFirst = timeoutInMillis > 0 && timeoutInMillis < delayInMillis;
    if (!timeoutExpiresFirst) {
        sleep(delayInMillis, jitter);
        return;
    }

    // The timeout is shorter than the delay: wait only that long, then hand over.
    sleep(timeoutInMillis, 0);
    timeoutExecutor.run(enhancerModel);
}

/**
 * Blocks the calling thread for roughly {@code timeInMillis} milliseconds.
 *
 * <p>When {@code offsetInMillis} is positive, a random jitter in
 * {@code [0, offsetInMillis)} is drawn; an even jitter is added to the delay,
 * an odd one is subtracted. If the resulting delay is not positive,
 * {@code offsetInMillis} is used as the delay instead.
 *
 * <p>If the thread is interrupted while sleeping, the interruption is logged
 * and the thread's interrupt status is restored so callers can still observe it.
 *
 * @param timeInMillis   base delay in milliseconds
 * @param offsetInMillis exclusive upper bound of the random jitter; no jitter when {@code <= 0}
 */
@Override
public void sleep(long timeInMillis, int offsetInMillis) {
    int offset = 0;
    if (offsetInMillis > 0) {
        // Only pay for a Random when jitter was actually requested.
        offset = new Random().nextInt(offsetInMillis);
    }
    // The parity of the jitter decides its sign.
    if (offset % 2 == 0) {
        timeInMillis = timeInMillis + offset;
    } else {
        timeInMillis = timeInMillis - offset;
    }
    if (timeInMillis <= 0) {
        timeInMillis = offsetInMillis;
    }
    try {
        TimeUnit.MILLISECONDS.sleep(timeInMillis);
    } catch (InterruptedException e) {
        LOGGER.error("running delay action interrupted", e);
        // Catching InterruptedException clears the flag; set it again so the
        // interruption is not silently lost for the code that runs afterwards.
        Thread.currentThread().interrupt();
    }
}
 

 

    /**
     * Executes the throw-exception action.
     *
     * <p>Resolves the exception message from the action flags (falling back to
     * the default message when blank), builds either the custom or the declared
     * exception depending on the action name, and throws it through
     * {@code InterruptProcessException.throwThrowsImmediately}. Nothing is
     * thrown for any other action or when no exception could be built.
     *
     * @param enhancerModel carries the action name, flags, class loader and target method
     * @throws Exception propagated from building or throwing the exception
     */
    @Override
    public void run(EnhancerModel enhancerModel) throws Exception {
        String message = exceptionMessageFlag == null
            ? null
            : enhancerModel.getActionFlag(exceptionMessageFlag.getName());
        if (StringUtil.isBlank(message)) {
            message = DEFAULT_EXCEPTION_MESSAGE;
        }

        Exception toThrow;
        if (enhancerModel.getAction().equals(THROW_CUSTOM_EXCEPTION)) {
            toThrow = throwCustomException(
                enhancerModel.getClassLoader(),
                enhancerModel.getActionFlag(exceptionFlag.getName()),
                message);
        } else if (enhancerModel.getAction().equals(THROW_DECLARED_EXCEPTION)) {
            toThrow = throwDeclaredException(
                enhancerModel.getClassLoader(),
                enhancerModel.getMethod(),
                message);
        } else {
            return;
        }

        if (toThrow == null) {
            return;
        }
        // Simulate the target method throwing this exception.
        InterruptProcessException.throwThrowsImmediately(toThrow);
    }
   /**
    * Executes the return-value action.
    *
    * <p>Evaluates the expression from the value flag and makes the target method
    * return the result immediately. Inside the expression, {@code p0}, {@code p1},
    * ... refer to the method arguments and {@code r} to the original return value
    * (when it is not {@code null}). Does nothing when the model has no method.
    *
    * @param enhancerModel carries the value flag, target method, arguments and return value
    * @throws Exception if the expression cannot be evaluated or the return value cannot be generated
    */
   @Override
   public void run(EnhancerModel enhancerModel) throws Exception {
       // Get the return-value expression from the model action.
       String value = enhancerModel.getActionFlag(valueFlagSpec.getName());
       Method method = enhancerModel.getMethod();

       if (method == null) {
           return;
       }

       final Map<String, Object> variates = new HashMap<String, Object>();
       Object[] arguments = enhancerModel.getMethodArguments();
       if (arguments != null) {
           for (int i = 0; i < arguments.length; i++) {
               // Plain concatenation instead of String.format("p%d", i): %d localizes
               // digits for the default locale, which could yield names that never
               // match the ASCII "p0", "p1", ... written in the expression.
               variates.put("p" + i, arguments[i]);
           }
       }
       Object originalReturn = enhancerModel.getReturnValue();
       if (originalReturn != null) {
           variates.put("r", originalReturn);
       }

       // Maps names used in the expression to typed constants.
       final Calculator calculator = new Calculator() {
           @Override
           public Constant getValue(String name) throws CompilerException {
               if (name == null || "null".equals(name)) {
                   return Constant.build(NULL, null);
               }
               if (!variates.containsKey(name)) {
                   // Not a known variable: treat the name itself as a string literal.
                   return Constant.build(STRING, name);
               }
               Object bound = variates.get(name);
               if (bound instanceof Number) {
                   return Constant.build(NUMERIC, ((Number) bound).doubleValue());
               } else if (bound instanceof String) {
                   return Constant.build(STRING, bound.toString());
               } else if (bound instanceof Boolean) {
                   return Constant.build(BOOLEAN, Boolean.parseBoolean(bound.toString()));
               }
               // Null or any other type is exposed as NULL.
               return Constant.build(NULL, null);
           }

           @Override
           public boolean isVariate(String name) {
               return variates.containsKey(name);
           }
       };

       final Syntactic syntactic = new Syntactic(calculator);
       final Constant constant = syntactic.getFormulaValue(value);

       // Convert the evaluated text into a value for the method's return type.
       Object generated = generateReturnValue(enhancerModel.getClassLoader(), method, constant.getAsString());

       InterruptProcessException.throwReturnImmediately(generated);
   }
    /**
     * Starts the JVM CPU full-load experiment.
     *
     * <p>The number of busy threads defaults to the number of available
     * processors; the CPU-count flag lowers it when it is a positive number
     * smaller than that. Each thread spins in an empty loop while {@code flag}
     * is true.
     *
     * @param enhancerModel carries the optional CPU-count flag
     * @throws IllegalStateException if an experiment executor exists and is not shut down
     * @throws Exception if the CPU-count flag is not a valid integer
     */
    @Override
    public void run(EnhancerModel enhancerModel) throws Exception {
        if (executorService != null && !executorService.isShutdown()) {
            throw new IllegalStateException("The jvm cpu full load experiment is running");
        }

        // Number of cores the caller wants fully loaded.
        String requestedCores = enhancerModel.getActionFlag(JvmConstant.FLAG_NAME_CPU_COUNT);
        // Number of cores available to the JVM.
        final int availableCores = Runtime.getRuntime().availableProcessors();
        int workers = availableCores;
        if (!StringUtil.isBlank(requestedCores)) {
            int parsed = Integer.parseInt(requestedCores.trim());
            boolean withinRange = parsed > 0 && parsed < availableCores;
            if (withinRange) {
                workers = parsed;
            }
        }

        synchronized (lock) {
            // Re-check under the lock before creating the pool.
            if (executorService != null && !executorService.isShutdown()) {
                throw new IllegalStateException("The jvm cpu full load experiment is running...");
            }
            // One thread per core that should be fully loaded.
            executorService = Executors.newFixedThreadPool(workers);
        }
        flag = true;

        final Runnable spin = new Runnable() {
            @Override
            public void run() {
                // Busy loop on purpose: the body must stay empty.
                while (flag) {
                }
            }
        };
        for (int worker = 0; worker < workers; worker++) {
            executorService.submit(spin);
            LOGGER.info("start jvm cpu full load thread, {}", worker);
        }
    }
內存溢出這塊有三種異常:堆、非堆、堆外。
--堆
/**
 * Heap variant: allocates one more {@code OomObject} of the configured block
 * size and keeps it referenced in {@code oomObjects}.
 *
 * @param enhancerModel       not used by this variant
 * @param jvmOomConfiguration supplies the block size
 */
@Override
protected void innerRun(EnhancerModel enhancerModel, JvmOomConfiguration jvmOomConfiguration) {
    OomObject block = new OomObject(jvmOomConfiguration.getBlock());
    oomObjects.add(block);
}

--非堆
/**
 * Non-heap variant: creates a proxy subclass instance of {@code OomObject}
 * through an {@code Enhancer} and discards it.
 *
 * @param enhancerModel       not used by this variant
 * @param jvmOomConfiguration not used by this variant
 */
@Override
protected void innerRun(EnhancerModel enhancerModel, JvmOomConfiguration jvmOomConfiguration) {
    // Interceptor that simply forwards every call to the superclass implementation.
    MethodInterceptor passThrough = new MethodInterceptor() {
        @Override
        public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy)
            throws Throwable {
            return proxy.invokeSuper(obj, args);
        }
    };

    Enhancer proxyFactory = new Enhancer();
    proxyFactory.setSuperclass(OomObject.class);
    // NOTE(review): caching is switched off, presumably so every call produces a
    // brand-new generated class — confirm against the Enhancer documentation.
    proxyFactory.setUseCache(false);
    proxyFactory.setCallback(passThrough);
    proxyFactory.create();
}
--堆外
/**
 * Off-heap variant: allocates a direct {@code ByteBuffer} of
 * {@code block * JvmConstant.ONE_MB} bytes and keeps it referenced in
 * {@code oomObjects}.
 *
 * @param enhancerModel       not used by this variant
 * @param jvmOomConfiguration supplies the block size
 */
@Override
protected void innerRun(EnhancerModel enhancerModel, JvmOomConfiguration jvmOomConfiguration) {
    int capacityInBytes = jvmOomConfiguration.getBlock() * JvmConstant.ONE_MB;
    oomObjects.add(ByteBuffer.allocateDirect(capacityInBytes));
}
 

 

CodeCacheFilling 的原理就是動態編譯生成大量的 class,對編譯後的 class 進行實例化並調用,讓 JIT 誤以為它們是熱方法而進行編譯。
@Override
public void run() {
            LOGGER.info("Generating all objects for preheating. Bucket size: " + bucketSize + ".");

            List<Object> objects = new ArrayList<Object>();
// 動態類 DynamicJavaClassGenerator generator
= new DynamicJavaClassGenerator(); for (int i = 0; i < bucketSize; i++) { if (!flag.get()) { LOGGER.info("Experiment stopped. Stop code cache object generating."); } Object object = null; // 生成對象 try { object = generator.generateObject(); } catch (Exception e) { LOGGER.error("Generate CodeCacheObject failed.", e); } if (null != object) { objects.add(object); } } LOGGER.info("Generated all objects for code cache filling."); try { lock.await(); } catch (Exception e) { LOGGER.error("Worker thread has been interrupted.", e); return; } LOGGER.info("Preheating all objects. Compile threshold: " + compileThreshold + "."); while (true) { for (Object object : objects) { try { // 不斷加熱方法,促使進行 JIT編譯 Method method = object.getClass().getMethod("method"); for (int i = 0; i <= compileThreshold; i++) { if (!flag.get()) { LOGGER.info("Experiment stopped. Stop code cache object preheating."); return; } method.invoke(object); } } catch (Exception e) { LOGGER.error("Preheating CodeCacheObject failed.", e); } } } } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM