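Hmily framework features
- Seamless integration with Spring and a Spring Boot starter.
- Seamless integration with RPC frameworks such as Dubbo, SpringCloud, and Motan.
- Multiple storage options for transaction logs (Redis, MongoDB, MySQL, and others).
- Multiple log serialization formats (Kryo, Protostuff, Hessian).
- Automatic transaction recovery.
- Support for propagating dependencies across nested transactions.
- Zero code intrusion, with simple and flexible configuration.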
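Why is Hmily so fast?
1. It uses Disruptor to read and write transaction logs asynchronously (Disruptor is a lock-free concurrency framework that preallocates its ring buffer to avoid GC pressure).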
    package com.hmily.tcc.core.disruptor.publisher;

    import com.hmily.tcc.common.bean.entity.TccTransaction;
    import com.hmily.tcc.common.enums.EventTypeEnum;
    import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
    import com.hmily.tcc.core.coordinator.CoordinatorService;
    import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
    import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory;
    import com.hmily.tcc.core.disruptor.handler.HmilyConsumerDataHandler;
    import com.hmily.tcc.core.disruptor.translator.HmilyTransactionEventTranslator;
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.IgnoreExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * event publisher.
     *
     * @author xiaoyu(Myth)
     */
    @Component
    public class HmilyTransactionEventPublisher implements DisposableBean {

        private Disruptor<HmilyTransactionEvent> disruptor;

        private final CoordinatorService coordinatorService;

        @Autowired
        public HmilyTransactionEventPublisher(final CoordinatorService coordinatorService) {
            this.coordinatorService = coordinatorService;
        }

        /**
         * disruptor start.
         *
         * @param bufferSize this is disruptor buffer size.
         * @param threadSize this is disruptor consumer thread size.
         */
        public void start(final int bufferSize, final int threadSize) {
            disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize, r -> {
                AtomicInteger index = new AtomicInteger(1);
                return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
            }, ProducerType.MULTI, new BlockingWaitStrategy());

            final Executor executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    HmilyThreadFactory.create("hmily-log-disruptor", false),
                    new ThreadPoolExecutor.AbortPolicy());

            HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
            for (int i = 0; i < threadSize; i++) {
                consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
            }
            disruptor.handleEventsWithWorkerPool(consumers);
            disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
            disruptor.start();
        }

        /**
         * publish disruptor event.
         *
         * @param tccTransaction {@linkplain com.hmily.tcc.common.bean.entity.TccTransaction }
         * @param type           {@linkplain EventTypeEnum}
         */
        public void publishEvent(final TccTransaction tccTransaction, final int type) {
            final RingBuffer<HmilyTransactionEvent> ringBuffer = disruptor.getRingBuffer();
            ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), tccTransaction);
        }

        @Override
        public void destroy() {
            disruptor.shutdown();
        }
    }
Here the default value of bufferSize is 4096 * 2 * 2 (16384, as defined in TccConfig); users can configure it to suit their own workload.
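Disruptor requires the ring buffer size to be a power of two, and the TccConfig default of 4096 * 2 * 2 satisfies that. If you override bufferSize, a small guard like the one below can catch a bad value early. This is only a sketch: the class and method names are hypothetical and are not part of Hmily.

```java
/** Hypothetical helper, not part of Hmily: checks a ring buffer size before use. */
final class BufferSizeCheck {

    private BufferSizeCheck() {
    }

    /** Returns true when size is a positive power of two. */
    static boolean isPowerOfTwo(final int size) {
        return size > 0 && (size & (size - 1)) == 0;
    }

    /** Rounds a size up to the next power of two, capped at 2^30. */
    static int roundUpToPowerOfTwo(final int size) {
        if (size <= 1) {
            return 1;
        }
        final int highest = Integer.highestOneBit(size - 1);
        return highest >= (1 << 30) ? (1 << 30) : highest << 1;
    }
}
```

For example, a value such as 4094 * 4 would be rejected by isPowerOfTwo and rounded up to 16384 by roundUpToPowerOfTwo.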
    HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
    for (int i = 0; i < threadSize; i++) {
        consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
    }
    disruptor.handleEventsWithWorkerPool(consumers);
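Here multiple consumers are used to process the tasks in the queue; with a worker pool, each event is handled by exactly one of them.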
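To make the worker-pool idea concrete without pulling in Disruptor, here is a minimal sketch that uses a plain JDK BlockingQueue as a stand-in for the ring buffer. It is not how Hmily implements it, and every name in it is hypothetical; it only shows several consumer threads draining one shared queue so that each event is processed once.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the worker pool: N threads drain one shared queue. */
final class LogWorkerPoolSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Set<String> stored = ConcurrentHashMap.newKeySet();
    private final CountDownLatch remaining;
    private final Thread[] workers;

    LogWorkerPoolSketch(final int threadSize, final int expectedEvents) {
        this.remaining = new CountDownLatch(expectedEvents);
        this.workers = new Thread[threadSize];
        for (int i = 0; i < threadSize; i++) {
            workers[i] = new Thread(this::drain, "log-worker-" + i);
            workers[i].setDaemon(true);
        }
    }

    void start() {
        for (Thread worker : workers) {
            worker.start();
        }
    }

    /** Producer side: enqueue a transaction id for whichever worker is free. */
    void publish(final String transId) {
        queue.add(transId);
    }

    /** Waits until the expected number of events has been handled. */
    boolean awaitAll(final long timeoutSeconds) {
        try {
            return remaining.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int storedCount() {
        return stored.size();
    }

    void shutdown() {
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    private void drain() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // take() hands each event to exactly one worker.
                final String transId = queue.take();
                stored.add(transId); // stands in for persisting the log entry
                remaining.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The business thread only pays for the enqueue; the slower log write happens on the worker threads.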
2. Asynchronous execution of the confirm and cancel methods.
    package com.hmily.tcc.core.service.handler;

    import com.hmily.tcc.common.bean.context.TccTransactionContext;
    import com.hmily.tcc.common.bean.entity.TccTransaction;
    import com.hmily.tcc.common.enums.TccActionEnum;
    import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
    import com.hmily.tcc.core.service.HmilyTransactionHandler;
    import com.hmily.tcc.core.service.executor.HmilyTransactionExecutor;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * this is transaction starter.
     *
     * @author xiaoyu
     */
    @Component
    public class StarterHmilyTransactionHandler implements HmilyTransactionHandler {

        private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;

        private final HmilyTransactionExecutor hmilyTransactionExecutor;

        private final Executor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                HmilyThreadFactory.create("hmily-execute", false),
                new ThreadPoolExecutor.AbortPolicy());

        @Autowired
        public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor) {
            this.hmilyTransactionExecutor = hmilyTransactionExecutor;
        }

        @Override
        public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context)
                throws Throwable {
            Object returnValue;
            try {
                TccTransaction tccTransaction = hmilyTransactionExecutor.begin(point);
                try {
                    //execute try
                    returnValue = point.proceed();
                    tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
                    hmilyTransactionExecutor.updateStatus(tccTransaction);
                } catch (Throwable throwable) {
                    //if exception ,execute cancel
                    final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
                    executor.execute(() -> hmilyTransactionExecutor
                            .cancel(currentTransaction));
                    throw throwable;
                }
                //execute confirm
                final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
                executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
            } finally {
                hmilyTransactionExecutor.remove();
            }
            return returnValue;
        }
    }
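When the AOP aspect around the try method throws an exception, cancel is executed asynchronously on a thread pool; when there is no exception, confirm is executed the same way.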
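Stripped of the framework types, the control flow of the handler above looks roughly like the sketch below. It is a simplified illustration with hypothetical names (AsyncTccSketch, run), not Hmily's API: the try action runs on the caller's thread, and confirm or cancel is handed to an Executor.

```java
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical sketch of the try / asynchronous confirm-or-cancel flow. */
final class AsyncTccSketch {

    private final Executor executor;

    AsyncTccSketch(final Executor executor) {
        this.executor = executor;
    }

    /**
     * Runs the try action synchronously. On success, confirm is submitted to the
     * executor; on failure, cancel is submitted and the exception is rethrown.
     */
    <T> T run(final Supplier<T> tryAction, final Runnable confirm, final Runnable cancel) {
        final T result;
        try {
            result = tryAction.get();
        } catch (RuntimeException e) {
            executor.execute(cancel);
            throw e;
        }
        executor.execute(confirm);
        return result;
    }
}
```

With a real thread pool as the executor, the caller gets its result back as soon as try finishes and does not wait for confirm. Passing `Runnable::run` instead runs everything inline, which is handy for tests.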
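Someone may ask: what happens if the cancel method or the confirm method itself throws an exception?
Answer: first, this is very rare, because try has only just finished executing. Second, if it does happen, the log was already saved during the try phase, and Hmily has a built-in scheduled thread pool that performs recovery, so there is no need to worry.
Someone will then ask: what if saving the log fails?
Answer: this is a corner-case question. First, the framework requires you to configure the log storage parameters at startup. Second, even if saving the log fails at runtime, the framework falls back to the cached copy, so correct execution is not affected. Finally, if saving the log fails and the system also goes down under extreme conditions, congratulations, you can go buy a lottery ticket; the best solution is not to solve it.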
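The recovery idea can be pictured as a periodic pass over the saved logs. The sketch below is a guess at the general shape, not Hmily's actual scheduler: the class, its fields, and the way retryMax and recoverDelayTime are applied are assumptions based only on the parameter names and comments in TccConfig.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical sketch of log-driven recovery; not Hmily's actual scheduler. */
final class RecoverySketch {

    /** A logged transaction that has not reached confirm or cancel yet. */
    static final class PendingTx {
        final String transId;
        final long createdAtMillis;
        int retried;

        PendingTx(final String transId, final long createdAtMillis) {
            this.transId = transId;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final int retryMax;
    private final long recoverDelayMillis;

    RecoverySketch(final int retryMax, final long recoverDelayMillis) {
        this.retryMax = retryMax;
        this.recoverDelayMillis = recoverDelayMillis;
    }

    /** One pass: retries eligible transactions and returns the ids that completed. */
    List<String> recoverOnce(final List<PendingTx> pending, final long nowMillis,
                             final Predicate<PendingTx> retryAction) {
        final List<String> completed = new ArrayList<>();
        for (Iterator<PendingTx> it = pending.iterator(); it.hasNext();) {
            final PendingTx tx = it.next();
            if (tx.retried >= retryMax) {
                continue; // retries exhausted, left for manual handling
            }
            if (nowMillis - tx.createdAtMillis < recoverDelayMillis) {
                continue; // too recent, the normal flow may still finish it
            }
            tx.retried++;
            if (retryAction.test(tx)) {
                completed.add(tx.transId);
                it.remove();
            }
        }
        return completed;
    }

    /** Runs recoverOnce repeatedly; the list must only be touched by the scheduler thread. */
    ScheduledExecutorService start(final List<PendingTx> pending, final long scheduledDelaySeconds,
                                   final Predicate<PendingTx> retryAction) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> recoverOnce(pending, System.currentTimeMillis(), retryAction),
                scheduledDelaySeconds, scheduledDelaySeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

In this sketch the retryAction would re-invoke confirm or cancel for the logged transaction and report whether it succeeded.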
3. Use of a ThreadLocal cache.
    /**
     * transaction begin.
     *
     * @param point cut point.
     * @return TccTransaction
     */
    public TccTransaction begin(final ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, () -> "......hmily transaction!start....");
        //build tccTransaction
        final TccTransaction tccTransaction = buildTccTransaction(point, TccRoleEnum.START.getCode(), null);
        //save tccTransaction in threadLocal
        CURRENT.set(tccTransaction);
        //publishEvent
        hmilyTransactionEventPublisher.publishEvent(tccTransaction, EventTypeEnum.SAVE.getCode());
        //set TccTransactionContext this context transfer remote
        TccTransactionContext context = new TccTransactionContext();
        //set action is try
        context.setAction(TccActionEnum.TRYING.getCode());
        context.setTransId(tccTransaction.getTransId());
        context.setRole(TccRoleEnum.START.getCode());
        TransactionContextLocal.getInstance().set(context);
        return tccTransaction;
    }
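The first thing to understand is that the ThreadLocal holds the transaction information for the initiator's method. This is important, so don't let it confuse you. RPC calls form a call chain, and each participant in that chain is recorded on the current transaction.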
    /**
     * add participant.
     *
     * @param participant {@linkplain Participant}
     */
    public void enlistParticipant(final Participant participant) {
        if (Objects.isNull(participant)) {
            return;
        }
        Optional.ofNullable(getCurrentTransaction())
                .ifPresent(c -> {
                    c.registerParticipant(participant);
                    updateParticipant(c);
                });
    }
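The two snippets above can be reduced to a small self-contained sketch. The types and names here are simplified stand-ins, not Hmily's classes: a ThreadLocal holds the initiator's transaction, participants are added to it as the call chain runs, and another thread sees nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of keeping the initiator's transaction in a ThreadLocal. */
final class CurrentTransactionSketch {

    /** Simplified stand-in for a transaction record. */
    static final class Tx {
        final String transId;
        final List<String> participants = new ArrayList<>();

        Tx(final String transId) {
            this.transId = transId;
        }
    }

    private static final ThreadLocal<Tx> CURRENT = new ThreadLocal<>();

    private CurrentTransactionSketch() {
    }

    /** Starts a transaction and binds it to the calling thread. */
    static Tx begin(final String transId) {
        final Tx tx = new Tx(transId);
        CURRENT.set(tx);
        return tx;
    }

    /** Records a participant on the current thread's transaction, if there is one. */
    static void enlistParticipant(final String participant) {
        if (participant == null) {
            return;
        }
        Optional.ofNullable(CURRENT.get()).ifPresent(tx -> tx.participants.add(participant));
    }

    static Tx current() {
        return CURRENT.get();
    }

    /** Must be called when the call finishes, or pooled threads keep stale state. */
    static void remove() {
        CURRENT.remove();
    }

    /** Checks from a fresh thread whether it can see the caller's transaction. */
    static boolean visibleFromOtherThread() {
        final boolean[] seen = new boolean[1];
        final Thread other = new Thread(() -> seen[0] = CURRENT.get() != null);
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

The last method is the reason a ThreadLocal only works on the initiator side: state bound to one thread is invisible to any other thread.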
4. Use of GuavaCache.
    package com.hmily.tcc.core.cache;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.Weigher;
    import com.hmily.tcc.common.bean.entity.TccTransaction;
    import com.hmily.tcc.core.coordinator.CoordinatorService;
    import com.hmily.tcc.core.helper.SpringBeanUtils;
    import org.apache.commons.lang3.StringUtils;

    import java.util.Optional;
    import java.util.concurrent.ExecutionException;

    /**
     * use google guava cache.
     * @author xiaoyu
     */
    public final class TccTransactionCacheManager {

        private static final int MAX_COUNT = 10000;

        private static final LoadingCache<String, TccTransaction> LOADING_CACHE =
                CacheBuilder.newBuilder().maximumWeight(MAX_COUNT)
                        .weigher((Weigher<String, TccTransaction>) (string, tccTransaction) -> getSize())
                        .build(new CacheLoader<String, TccTransaction>() {
                            @Override
                            public TccTransaction load(final String key) {
                                return cacheTccTransaction(key);
                            }
                        });

        private static CoordinatorService coordinatorService =
                SpringBeanUtils.getInstance().getBean(CoordinatorService.class);

        private static final TccTransactionCacheManager TCC_TRANSACTION_CACHE_MANAGER =
                new TccTransactionCacheManager();

        private TccTransactionCacheManager() {
        }

        /**
         * TccTransactionCacheManager.
         *
         * @return TccTransactionCacheManager
         */
        public static TccTransactionCacheManager getInstance() {
            return TCC_TRANSACTION_CACHE_MANAGER;
        }

        private static int getSize() {
            return (int) LOADING_CACHE.size();
        }

        private static TccTransaction cacheTccTransaction(final String key) {
            return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
        }

        /**
         * cache tccTransaction.
         *
         * @param tccTransaction {@linkplain TccTransaction}
         */
        public void cacheTccTransaction(final TccTransaction tccTransaction) {
            LOADING_CACHE.put(tccTransaction.getTransId(), tccTransaction);
        }

        /**
         * acquire TccTransaction.
         *
         * @param key this guava key.
         * @return {@linkplain TccTransaction}
         */
        public TccTransaction getTccTransaction(final String key) {
            try {
                return LOADING_CACHE.get(key);
            } catch (ExecutionException e) {
                return new TccTransaction();
            }
        }

        /**
         * remove guava cache by key.
         * @param key guava cache key.
         */
        public void removeByKey(final String key) {
            if (StringUtils.isNotEmpty(key)) {
                LOADING_CACHE.invalidate(key);
            }
        }
    }
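In the initiator we used ThreadLocal, so why don't we use it in the participant?
There are two reasons. First, try and confirm may not run on the same thread, which would make the ThreadLocal useless. Second, once an RPC cluster is involved, the calls may be load-balanced onto different machines.
There is one detail here: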
    private static TccTransaction cacheTccTransaction(final String key) {
        return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
    }
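When the entry is not in the GuavaCache, it is looked up from the transaction log and returned, which is what makes cluster environments work. These four points are what make Hmily an asynchronous, high-performance distributed-transaction TCC framework.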
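The load-on-miss behaviour can be shown without Guava. The sketch below swaps in a JDK ConcurrentHashMap for the LoadingCache, so it has no size limit or eviction, and all of its names are hypothetical. It only demonstrates the fallback: a miss on this node triggers a lookup in the shared log store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch: a local cache that falls back to the shared log on a miss. */
final class TransactionCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> logLookup;

    /** logLookup stands in for a query against the shared transaction log. */
    TransactionCacheSketch(final Function<String, String> logLookup) {
        this.logLookup = logLookup;
    }

    void put(final String transId, final String transaction) {
        cache.put(transId, transaction);
    }

    /** Returns the cached entry, loading it from the log when it is not cached yet. */
    String get(final String transId) {
        // computeIfAbsent stores nothing when the lookup returns null.
        return cache.computeIfAbsent(transId, logLookup);
    }

    void remove(final String transId) {
        cache.remove(transId);
    }
}
```

If try ran on node A and confirm lands on node B, node B's cache is empty for that transaction id, and the lookup function is what lets it find the record anyway.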
How do you use Hmily?
(https://github.com/yu199195/hmily/tree/master/hmily-tcc-demo)
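Because of an earlier package-naming problem, the framework artifacts have not been uploaded to the Maven central repository, so users need to pull the code themselves, build it, and deploy it to their own private repository.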
1. Dubbo users
- Add this to your API interface project:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-annotation</artifactId>
        <version>{your version}</version>
    </dependency>
- Add this to your service provider project:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-dubbo</artifactId>
        <version>{your version}</version>
    </dependency>
- Configure the bootstrap bean:
    <!-- Aspect configuration: enables the AOP aspect -->
    <aop:aspectj-autoproxy expose-proxy="true"/>
    <!-- Scan the framework's packages -->
    <context:component-scan base-package="com.hmily.tcc.*"/>
    <!-- Bootstrap class property configuration -->
    <bean id="hmilyTransactionBootstrap" class="com.hmily.tcc.core.bootstrap.HmilyTransactionBootstrap">
        <property name="serializer" value="kryo"/>
        <property name="recoverDelayTime" value="120"/>
        <property name="retryMax" value="3"/>
        <property name="scheduledDelay" value="120"/>
        <property name="scheduledThreadMax" value="4"/>
        <property name="repositorySupport" value="db"/>
        <property name="tccDbConfig">
            <bean class="com.hmily.tcc.common.config.TccDbConfig">
                <property name="url"
                          value="jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&amp;characterEncoding=utf8"/>
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                <property name="username" value="root"/>
                <property name="password" value="123456"/>
            </bean>
        </property>
    </bean>
There are many more configuration properties; only a demo is given here. For the details, see this class:
    package com.hmily.tcc.common.config;

    import com.hmily.tcc.common.enums.RepositorySupportEnum;
    import lombok.Data;

    /**
     * hmily config.
     *
     * @author xiaoyu
     */
    @Data
    public class TccConfig {

        /**
         * Resource suffix this parameter please fill in about is the transaction store path.
         * If it's a table store this is a table suffix, it's stored the same way.
         * If this parameter is not filled in, the applicationName of the application is retrieved by default
         */
        private String repositorySuffix;

        /**
         * log serializer.
         * {@linkplain com.hmily.tcc.common.enums.SerializeEnum}
         */
        private String serializer = "kryo";

        /**
         * scheduledPool Thread size.
         */
        private int scheduledThreadMax = Runtime.getRuntime().availableProcessors() << 1;

        /**
         * scheduledPool scheduledDelay unit SECONDS.
         */
        private int scheduledDelay = 60;

        /**
         * retry max.
         */
        private int retryMax = 3;

        /**
         * recoverDelayTime Unit seconds
         * (note that this time represents how many seconds after the local transaction was created before execution).
         */
        private int recoverDelayTime = 60;

        /**
         * Parameters when participants perform their own recovery.
         * 1.such as RPC calls time out
         * 2.such as the starter down machine
         */
        private int loadFactor = 2;

        /**
         * repositorySupport.
         * {@linkplain RepositorySupportEnum}
         */
        private String repositorySupport = "db";

        /**
         * disruptor bufferSize.
         */
        private int bufferSize = 4096 * 2 * 2;

        /**
         * this is disruptor consumerThreads.
         */
        private int consumerThreads = Runtime.getRuntime().availableProcessors() << 1;

        /**
         * db config.
         */
        private TccDbConfig tccDbConfig;

        /**
         * mongo config.
         */
        private TccMongoConfig tccMongoConfig;

        /**
         * redis config.
         */
        private TccRedisConfig tccRedisConfig;

        /**
         * zookeeper config.
         */
        private TccZookeeperConfig tccZookeeperConfig;

        /**
         * file config.
         */
        private TccFileConfig tccFileConfig;
    }
2. SpringCloud users
- Add this dependency:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-springcloud</artifactId>
        <version>{your version}</version>
    </dependency>
- Configure the bootstrap bean as shown above.
3. Motan users
- Add this dependency:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-motan</artifactId>
        <version>{your version}</version>
    </dependency>
- Configure the bootstrap bean as shown above.
hmily-spring-boot-start
- This one is even easier: you only need to add the jar that matches your RPC framework.
- If you are a Dubbo user, add:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-spring-boot-starter-dubbo</artifactId>
        <version>${your version}</version>
    </dependency>
- If you are a SpringCloud user, add:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-spring-boot-starter-springcloud</artifactId>
        <version>${your version}</version>
    </dependency>
- If you are a Motan user, add:
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-spring-boot-starter-motan</artifactId>
        <version>${your version}</version>
    </dependency>
- Then add the following configuration to your yml file:
    hmily:
        tcc :
            serializer : kryo
            recoverDelayTime : 128
            retryMax : 3
            scheduledDelay : 128
            scheduledThreadMax : 10
            repositorySupport : db
            tccDbConfig :
                driverClassName : com.mysql.jdbc.Driver
                url : jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&characterEncoding=utf8
                username : root
                password : 123456

            #repositorySupport : redis
            #tccRedisConfig:
                #masterName: mymaster
                #sentinel : true
                #sentinelUrl : 192.168.1.91:26379;192.168.1.92:26379;192.168.1.93:26379
                #password : foobaredbbexONE123

            # repositorySupport : zookeeper
            #     host : 92.168.1.73:2181
            #     sessionTimeOut : 100000
            #     rootPath : /tcc

            # repositorySupport : mongodb
            #     mongoDbUrl : 192.168.1.68:27017
            #     mongoDbName : happylife
            #     mongoUserName : xiaoyu
            #     mongoUserPwd : 123456

            # repositorySupport : file
            #     path : /account
            #     prefix : account
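That's all there is to it. You can then add the @Tcc annotation to your interface methods and happily start using it. Of course, for reasons of space, many things are described only briefly here, especially the logic.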
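As a rough picture of what an annotated service might look like, here is a self-contained sketch. The @Tcc annotation declared in it is a local stand-in so that the example compiles on its own, and the attribute names confirmMethod and cancelMethod are assumptions; check the real annotation in the hmily-tcc-annotation module and the demo project for the exact form. PaymentService and its methods are made up for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Local stand-in for Hmily's @Tcc; attribute names are assumptions. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Tcc {
    String confirmMethod() default "";

    String cancelMethod() default "";
}

/** Hypothetical API interface shared with callers. */
interface PaymentService {
    @Tcc
    boolean pay(String orderId);
}

/** Hypothetical provider: pay is the try phase, the other two finish or undo it. */
final class PaymentServiceImpl implements PaymentService {

    int frozen;
    int paid;

    @Override
    @Tcc(confirmMethod = "confirmPay", cancelMethod = "cancelPay")
    public boolean pay(final String orderId) {
        frozen++; // try: reserve the resource
        return true;
    }

    public boolean confirmPay(final String orderId) {
        frozen--; // confirm: turn the reservation into a real payment
        paid++;
        return true;
    }

    public boolean cancelPay(final String orderId) {
        frozen--; // cancel: release the reservation
        return true;
    }
}

/** Hypothetical startup check that the named confirm and cancel methods exist. */
final class TccAnnotationCheck {

    private TccAnnotationCheck() {
    }

    static boolean hasConfirmAndCancel(final Class<?> type, final String tryMethod,
                                       final Class<?>... params) {
        try {
            final Tcc tcc = type.getMethod(tryMethod, params).getAnnotation(Tcc.class);
            if (tcc == null) {
                return false;
            }
            type.getMethod(tcc.confirmMethod(), params);
            type.getMethod(tcc.cancelMethod(), params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

In this sketch the confirm and cancel methods take the same parameters as the try method, which is what lets a framework look them up by name and replay the original arguments.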
The GitHub repository is here: https://github.com/yu199195/hmily
