基於Kafka消息驅動最終一致事務(二)


實現用例分析

上篇基於Kafka消息驅動最終一致事務(一)介紹BASE的理論,接着我們引入一個實例看如何實現BASE,我們會用圖7顯示的算法實現BASE。

首先介紹使用技術棧

JDK:1.8

Spring:spring-boot,spring-data-jpa

數據庫:Mysql

消息服務器:Kafka

數據表

用戶庫user創建用戶表user,更新應用表updates_applied

CREATE TABLE `user` (
    `id` INT(11) NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(50) NOT NULL,
    `amt_sold` INT(11) NOT NULL DEFAULT '0',
    `amt_bought` INT(11) NOT NULL DEFAULT '0',
    PRIMARY KEY (`id`)
);

CREATE TABLE `updates_applied` (
    `trans_id` INT(11) NOT NULL,
    `balance` VARCHAR(50) NOT NULL,
    `user_id` INT(11) NOT NULL
);

 交易庫transaction創建交易庫表transaction

CREATE TABLE `transaction` (
    `xid` INT(11) NOT NULL AUTO_INCREMENT,
    `seller_id` INT(11) NOT NULL,
    `buyer_id` INT(11) NOT NULL,
    `amount` INT(11) NOT NULL,
    PRIMARY KEY (`xid`)
);

配置兩個數據源

使用JavaConfig方式。其它damain類,repository類,service類請看源碼github地址:https://github.com/birdstudiocn/spring-sample/tree/master/Message-Driven-Sample

package cn.birdstudio.user.domain;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableJpaRepositories(basePackageClasses = User.class, entityManagerFactoryRef = "userEntityManagerFactory", transactionManagerRef = "userTransactionManager")
class UserDataSourceConfiguration {
	@Bean

	@ConfigurationProperties("app.datasource.user")
	DataSourceProperties userDataSourceProperties() {
		return new DataSourceProperties();
	}

	@Bean

	@ConfigurationProperties("app.datasource.user")
	DataSource userDataSource() {
		return userDataSourceProperties().initializeDataSourceBuilder().build();
	}

	@Bean
	LocalContainerEntityManagerFactoryBean userEntityManagerFactory() {
		HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
		vendorAdapter.setGenerateDdl(false);
		LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
		factory.setJpaVendorAdapter(vendorAdapter);
		factory.setPackagesToScan(User.class.getPackage().getName());
		factory.setDataSource(userDataSource());
		factory.setPersistenceUnitName("user");
		return factory;
	}

	@Bean
	PlatformTransactionManager userTransactionManager() {
		JpaTransactionManager txManager = new JpaTransactionManager();
		txManager.setEntityManagerFactory(userEntityManagerFactory().getObject());
		return txManager;
	}
}

 TransactionDataSourceConfiguration

package cn.birdstudio.transaction.domain;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableJpaRepositories(basePackageClasses = Transaction.class, entityManagerFactoryRef = "transactionEntityManagerFactory", transactionManagerRef = "transactionManager")
class TransactionDataSourceConfiguration {
	@Bean
	@ConfigurationProperties("app.datasource.transaction")
	DataSourceProperties transactionDataSourceProperties() {
		return new DataSourceProperties();
	}

	@Bean
	@ConfigurationProperties("app.datasource.transaction")
	DataSource transactionDataSource() {
		return transactionDataSourceProperties().initializeDataSourceBuilder().build();
	}

	@Bean
	LocalContainerEntityManagerFactoryBean transactionEntityManagerFactory() {
		HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
		vendorAdapter.setGenerateDdl(false);
		LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
		factory.setJpaVendorAdapter(vendorAdapter);
		factory.setPackagesToScan(Transaction.class.getPackage().getName());
		factory.setDataSource(transactionDataSource());
		factory.setPersistenceUnitName("transaction");
		return factory;
	}

	@Bean
	PlatformTransactionManager transactionManager() {
		JpaTransactionManager txManager = new JpaTransactionManager();
		txManager.setEntityManagerFactory(transactionEntityManagerFactory().getObject());
		return txManager;
	}
}

配置Kafka消息服務

生產者配置類KafkaProducerConfig.java,配置KafkaTransactionManager必須設置producerFactory.setTransactionIdPrefix("trans");

Configuration
public class KafkaProducerConfig {
	@Bean
	public ProducerFactory<String, Map<String, Object>> producerFactory() {
		DefaultKafkaProducerFactory<String, Map<String, Object>> producerFactory = new DefaultKafkaProducerFactory<>(
				producerConfigs());
		producerFactory.setTransactionIdPrefix("trans");
		return producerFactory;
	}

	@Bean
	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.168:9092");
		props.put(ProducerConfig.RETRIES_CONFIG, 2);
		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

		return props;
	}

	@Bean
	public KafkaTemplate<String, Map<String, Object>> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}

消費者配置類KafkaConsumerConfig.java,配置KafkaTransactionManager

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
	@Bean
	public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
			ProducerFactory<String, Map<String, Object>> producerFactory) {
		ConcurrentKafkaListenerContainerFactory<String, TransactionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		//factory.setMessageConverter(new StringJsonMessageConverter());
		//factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		factory.getContainerProperties().setTransactionManager(new KafkaTransactionManager<>(producerFactory));
		return factory;
	}

	@Bean
	public ConsumerFactory<String, TransactionMessage> consumerFactory() {
		JsonDeserializer<TransactionMessage> jd = new JsonDeserializer<>(TransactionMessage.class);
		return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jd);
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> propsMap = new HashMap<>();
		propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.168:9092");
		propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
		propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
		propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
		propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		return propsMap;
	}
}

Kafka消息監聽接口實現UserServiceImpl。@KafkaListener(groupId = "group1", topics = "transaction")注釋監聽事件接口,@Transactional("userTransactionManager")注釋數據庫事務。事件接口被調用KafkaTransactionManager事務開始,然后JpaTransactionManager事務開始,如果事務提交則調用producer.sendOffsetsToTransaction(),最后KafkaTransactionManager事務提交。如果JpaTransactionManager事務有異常則不調用producer.sendOffsetsToTransaction()。如果JpaTransactionManager事務提交后KafkaTransactionManager事務有異常也不調用producer.sendOffsetsToTransaction()。int processed = updatesAppliedRepository.find(trans_id, id, type.toString())語句來判斷是否已經更新了User。producer.sendOffsetsToTransaction()作用與刪除隊列消息相當。

@Component("userService")
public class UserServiceImpl implements UserService {
	private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
	private final UserRepository userRepository;
	@Resource
	private UpdatesAppliedRepository updatesAppliedRepository;

	public UserServiceImpl(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	private void sold(TransactionMessage msg) {
		Type type = msg.getType();
		int id = msg.getId();
		int amount = msg.getAmount();
		int trans_id = msg.getXid();
		int processed = updatesAppliedRepository.find(trans_id, id, type.toString());
		if (processed == 0) {
			switch (type) {
			case SELLER:
				userRepository.updateAmtSold(id, amount);
				break;
			case BUYER:
				userRepository.updateAmtBought(id, amount);
				break;
			}
			//throwException();
			UpdatesApplied updatesApplied = new UpdatesApplied();
			updatesApplied.setTrans_id(trans_id);
			updatesApplied.setUser_id(id);
			updatesApplied.setBalance(type.toString());
			updatesAppliedRepository.save(updatesApplied);
		}
	}

	@Override
	@Transactional("userTransactionManager")
	@KafkaListener(groupId = "group1", topics = "transaction")
	//@KafkaListener(groupId = "group1", topicPartitions = @TopicPartition(topic = "", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5")))
	public void receivekafka(TransactionMessage msg) {
		logger.info("receive kafka message {}", msg);
		sold(msg);
	}

	private void throwException() {
		throw new RuntimeException("throw exception in test");
	}
}

參考資料

1,http://queue.acm.org/detail.cfm?id=1394128

2,Spring Data JPA - Multiple datasources exam

3,JMS

4,https://stackoverflow.com/questions/42230797/spring-cloud-stream-kafka-eventual-consistency-does-kafka-auto-retry-unackno

5,http://www.kennybastani.com/2016/04/event-sourcing-microservices-spring-cloud.html

6,使用Spring Cloud和Reactor在微服務中實現Event Sourcing

7,Spring Kafka Tutorial – Getting Started with the Spring for Apache Kafka

8,碧桂園旺生活平台解決分布式事務方案之tcc開源框架 https://github.com/yu199195/happylifeplat-tcc


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM