結合Scala+Spring,我們將采取一個很簡單的場景:下訂單,然后發送一封電子郵件。
編制一個服務:
@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
@Transactional
def placeOrder(order: Order) {
orderDao save order //保存訂單
mailNotifier sendMail order //發送郵件
}
}
上面代碼是在保存訂單和發送郵件兩個同步執行,發送郵件需要連接郵件服務器,比較耗時,拖延了整個性能,我們采取異步發送電子郵件,利用Spring內置的自定義事件,與JMS或其他生產者 - 消費者類似。
case class OrderPlacedEvent(order: Order) extends ApplicationEvent
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
區別是繼承了ApplicationEvent 之前是直接用 OrderMailNotifier 直接發送,而現在我們使用
ApplicationEventPublisher 發送發郵件事件了。
事件監聽者代碼如下:
@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
def onApplicationEvent(event: OrderPlacedEvent) {
//sending e-mail...
}
}
在監聽者方法中真正實現郵件發送。
但是Spring的ApplicationEvents是同步事件,意味着我們並沒有真正實現異步,程序還會在這里堵塞,如果希望異步,我們需要重新定義一個ApplicationEventMulticaster,實現類型SimpleApplicationEventMulticaster和TaskExecutor:
@Bean
def applicationEventMulticaster() = {
val multicaster = new SimpleApplicationEventMulticaster()
multicaster.setTaskExecutor(taskExecutor())
multicaster
}
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
Spring通過使用TaskExecutor已經支持廣播事件了,對onApplicationEvent() 標注
@Async
@Async
def onApplicationEvent(event: OrderPlacedEvent) { //...
如果你希望使用@Async,可以編制自己的異步執行器:
@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
def getAsyncExecutor = taskExecutor()
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
}
@ EnableAsync是足夠了。,默認情況下,Spring使用SimpleAsyncTaskExecutor類創建新的線程。
以上所有設置暴露一個真正的問題。現在,我們雖然使用其他線程發送一個異步消息處理。不幸的是,我們引入競爭條件。
- 開始事務
- 存儲order到數據庫
- 發送一個包裝order的消息
- 確認
異步線程獲得OrderPlacedEvent並開始處理。現在的問題是,它發生(3)之后,還是(4)之前或者(4)之后?這有一個很大的區別!在前者的情況下,交易也尚未提交訂單所以不存在於數據庫中。另一方面,延遲加載可能已經在工作,致使訂單對象仍然然綁定在 PersistenceContext(缺省我們使用JPA)。
解決辦法是使用
TransactionSynchronizationManager.,可以注冊很多監聽者
TransactionSynchronization,它對於事務的提交或回滾都有事件發送。
@Transactional
def placeOrder(order: Order) {
orderDao save order
afterCommit {
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
private def afterCommit[T](fun: => T) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
override def afterCommit() {
fun
}
})
}
當前事務提交后 afterCommit()接受調用,可以安全地調用registerSynchronization()多次 - 監聽器存儲在Set並且本地保存到當前事務中,事務提交后消失。
我們將afterCommit方法單獨抽象成一個類,分離關注。
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
extends ApplicationEventPublisher {
override def publishEvent(event: ApplicationEvent) {
if (TransactionSynchronizationManager.isActualTransactionActive) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter {
override def afterCommit() {
delegate publishEvent event
}
})
}
else
delegate publishEvent event
}
}
TransactionAwareApplicationEventPublisher是實現Spring的ApplicationEventPublisher。
我們要將這個新的實現告訴Spring替換掉舊的,用@Primary:
@Resource
val applicationContext: ApplicationContext = null
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
new TransactionAwareApplicationEventPublisher(applicationContext)
再看看原來的訂單服務:
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
注意這里ApplicationEventPublisher已經是我們自己實現的TransactionAwareApplicationEventPublisher,將被自動注入這個服務。
最后,要在真正訂單保存的業務代碼上放置事務:
def placeOrder(order: Order) {
storeOrder(order)
eventPublisher publishEvent OrderPlacedEvent(order)
}
@Transactional
def storeOrder(order: Order) = orderDao save order
當然這沒有根本解決問題,如果placeOrder有一個更大的事務怎么辦?