Implementing the Use Case
The previous article, Kafka Message-Driven Eventually Consistent Transactions (Part 1), introduced the theory behind BASE. Now we turn to a concrete example and implement BASE using the algorithm shown in Figure 7.
The technology stack:
JDK: 1.8
Spring: spring-boot, spring-data-jpa
Database: MySQL
Message broker: Kafka
Tables
In the user database user, create the user table user and the applied-updates table updates_applied:
CREATE TABLE `user` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) NOT NULL,
  `amt_sold` INT(11) NOT NULL DEFAULT '0',
  `amt_bought` INT(11) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`)
);

CREATE TABLE `updates_applied` (
  `trans_id` INT(11) NOT NULL,
  `balance` VARCHAR(50) NOT NULL,
  `user_id` INT(11) NOT NULL
);
In the transaction database transaction, create the transaction table transaction:
CREATE TABLE `transaction` (
  `xid` INT(11) NOT NULL AUTO_INCREMENT,
  `seller_id` INT(11) NOT NULL,
  `buyer_id` INT(11) NOT NULL,
  `amount` INT(11) NOT NULL,
  PRIMARY KEY (`xid`)
);
Configuring the two data sources
The data sources are configured in JavaConfig style. For the remaining domain, repository, and service classes, see the source code on GitHub: https://github.com/birdstudiocn/spring-sample/tree/master/Message-Driven-Sample
package cn.birdstudio.user.domain;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableJpaRepositories(basePackageClasses = User.class,
        entityManagerFactoryRef = "userEntityManagerFactory",
        transactionManagerRef = "userTransactionManager")
class UserDataSourceConfiguration {

    @Bean
    @ConfigurationProperties("app.datasource.user")
    DataSourceProperties userDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    @ConfigurationProperties("app.datasource.user")
    DataSource userDataSource() {
        return userDataSourceProperties().initializeDataSourceBuilder().build();
    }

    @Bean
    LocalContainerEntityManagerFactoryBean userEntityManagerFactory() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        vendorAdapter.setGenerateDdl(false);
        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        factory.setPackagesToScan(User.class.getPackage().getName());
        factory.setDataSource(userDataSource());
        factory.setPersistenceUnitName("user");
        return factory;
    }

    @Bean
    PlatformTransactionManager userTransactionManager() {
        JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(userEntityManagerFactory().getObject());
        return txManager;
    }
}
The transaction data source is configured the same way in TransactionDataSourceConfiguration:
package cn.birdstudio.transaction.domain;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableJpaRepositories(basePackageClasses = Transaction.class,
        entityManagerFactoryRef = "transactionEntityManagerFactory",
        transactionManagerRef = "transactionManager")
class TransactionDataSourceConfiguration {

    @Bean
    @ConfigurationProperties("app.datasource.transaction")
    DataSourceProperties transactionDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    @ConfigurationProperties("app.datasource.transaction")
    DataSource transactionDataSource() {
        return transactionDataSourceProperties().initializeDataSourceBuilder().build();
    }

    @Bean
    LocalContainerEntityManagerFactoryBean transactionEntityManagerFactory() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        vendorAdapter.setGenerateDdl(false);
        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        factory.setPackagesToScan(Transaction.class.getPackage().getName());
        factory.setDataSource(transactionDataSource());
        factory.setPersistenceUnitName("transaction");
        return factory;
    }

    @Bean
    PlatformTransactionManager transactionManager() {
        JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(transactionEntityManagerFactory().getObject());
        return txManager;
    }
}
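Both configuration classes bind their connection settings from the property prefixes app.datasource.user and app.datasource.transaction. A minimal application.properties sketch (the hosts and credentials below are placeholders, not values from the repository):

```properties
# hypothetical values; point these at your own MySQL instances
app.datasource.user.url=jdbc:mysql://localhost:3306/user
app.datasource.user.username=root
app.datasource.user.password=secret

app.datasource.transaction.url=jdbc:mysql://localhost:3306/transaction
app.datasource.transaction.username=root
app.datasource.transaction.password=secret
```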
Configuring the Kafka messaging service
The producer is configured in KafkaProducerConfig.java. For the KafkaTransactionManager to work, the producer factory must be made transactional by calling producerFactory.setTransactionIdPrefix("trans"):
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Map<String, Object>> producerFactory() {
        DefaultKafkaProducerFactory<String, Map<String, Object>> producerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        // a transaction id prefix is required for transactional sends
        producerFactory.setTransactionIdPrefix("trans");
        return producerFactory;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.168:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 2);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, Map<String, Object>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
The consumer is configured in KafkaConsumerConfig.java, which plugs a KafkaTransactionManager into the listener container factory:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
            ProducerFactory<String, Map<String, Object>> producerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, TransactionMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //factory.setMessageConverter(new StringJsonMessageConverter());
        //factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setTransactionManager(new KafkaTransactionManager<>(producerFactory));
        return factory;
    }

    @Bean
    public ConsumerFactory<String, TransactionMessage> consumerFactory() {
        JsonDeserializer<TransactionMessage> jd = new JsonDeserializer<>(TransactionMessage.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jd);
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.168:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }
}
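The consumer factory above deserializes the JSON payload into a TransactionMessage. The real class lives in the linked repository; judging only from the getters called in UserServiceImpl, it is roughly the following sketch:

```java
// Sketch of the Kafka payload, inferred from the getters UserServiceImpl calls;
// the actual class is in the linked repository.
class TransactionMessage {
    public enum Type { SELLER, BUYER }

    private int xid;     // transaction id (transaction.xid)
    private int id;      // the user this update applies to
    private int amount;  // trade amount
    private Type type;   // which side of the trade the user is on

    public int getXid() { return xid; }
    public void setXid(int xid) { this.xid = xid; }
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public Type getType() { return type; }
    public void setType(Type type) { this.type = type; }
}
```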
The Kafka listener is implemented in UserServiceImpl. @KafkaListener(groupId = "group1", topics = "transaction") marks the method as a message listener, and @Transactional("userTransactionManager") wraps it in a database transaction. When the listener is invoked, the KafkaTransactionManager transaction begins first, then the JpaTransactionManager transaction. If the database transaction commits, producer.sendOffsetsToTransaction() is called, and finally the KafkaTransactionManager transaction commits. If the JpaTransactionManager transaction fails, sendOffsetsToTransaction() is never called; likewise, if the KafkaTransactionManager transaction fails after the JPA commit, sendOffsetsToTransaction() is not called either. The statement int processed = updatesAppliedRepository.find(trans_id, id, type.toString()) checks whether the User row has already been updated for this transaction, so a redelivered message is harmless. Calling producer.sendOffsetsToTransaction() plays the same role as deleting the message from a queue.
@Component("userService")
public class UserServiceImpl implements UserService {

    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

    private final UserRepository userRepository;

    @Resource
    private UpdatesAppliedRepository updatesAppliedRepository;

    public UserServiceImpl(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    private void sold(TransactionMessage msg) {
        Type type = msg.getType();
        int id = msg.getId();
        int amount = msg.getAmount();
        int trans_id = msg.getXid();
        // apply the update only if this (transaction, user, side) has not been applied yet
        int processed = updatesAppliedRepository.find(trans_id, id, type.toString());
        if (processed == 0) {
            switch (type) {
            case SELLER:
                userRepository.updateAmtSold(id, amount);
                break;
            case BUYER:
                userRepository.updateAmtBought(id, amount);
                break;
            }
            //throwException();
            UpdatesApplied updatesApplied = new UpdatesApplied();
            updatesApplied.setTrans_id(trans_id);
            updatesApplied.setUser_id(id);
            updatesApplied.setBalance(type.toString());
            updatesAppliedRepository.save(updatesApplied);
        }
    }

    @Override
    @Transactional("userTransactionManager")
    @KafkaListener(groupId = "group1", topics = "transaction")
    //@KafkaListener(groupId = "group1", topicPartitions = @TopicPartition(topic = "", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5")))
    public void receivekafka(TransactionMessage msg) {
        logger.info("receive kafka message {}", msg);
        sold(msg);
    }

    private void throwException() {
        throw new RuntimeException("throw exception in test");
    }
}
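The essence of the idempotency guard in sold() can be shown in isolation. In the sketch below (hypothetical; in-memory collections stand in for UserRepository and UpdatesAppliedRepository), each (trans_id, user_id, side) combination is applied at most once, so a redelivered Kafka message leaves the balance unchanged:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory sketch of the idempotency guard in sold(): updatesApplied plays
// the role of the updates_applied table, amtSold the user.amt_sold column.
class IdempotentApplier {
    final Map<Integer, Integer> amtSold = new HashMap<>();
    final Set<String> updatesApplied = new HashSet<>();

    // returns true if the update was applied, false if it was a duplicate
    boolean applySellerUpdate(int transId, int userId, int amount) {
        String key = transId + ":" + userId + ":SELLER";
        if (!updatesApplied.add(key)) {
            return false; // already applied once; skip the redelivered message
        }
        amtSold.merge(userId, amount, Integer::sum);
        return true;
    }
}
```

In the real listener the existence check and the insert into updates_applied run inside the same JPA transaction, so the guard also holds if the process crashes between the database commit and the Kafka offset commit.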
References
1. http://queue.acm.org/detail.cfm?id=1394128
2. Spring Data JPA - Multiple datasources exam
3. JMS
4. http://www.kennybastani.com/2016/04/event-sourcing-microservices-spring-cloud.html
5. Event Sourcing in Microservices Using Spring Cloud and Reactor
6. Spring Kafka Tutorial – Getting Started with the Spring for Apache Kafka
7. happylifeplat-tcc, an open-source TCC framework for distributed transactions: https://github.com/yu199195/happylifeplat-tcc