spring中使用異步事件實現同步事務


結合Scala+Spring,我們將采取一個很簡單的場景:下訂單,然后發送一封電子郵件。
編制一個服務:
@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order //保存訂單
        mailNotifier sendMail order //發送郵件
    }
}
上面代碼是在保存訂單和發送郵件兩個同步執行,發送郵件需要連接郵件服務器,比較耗時,拖延了整個性能,我們采取異步發送電子郵件,利用Spring內置的自定義事件,與JMS或其他生產者 - 消費者類似。
case class OrderPlacedEvent(order: Order) extends ApplicationEvent
 
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
 
}
區別是繼承了ApplicationEvent 之前是直接用 OrderMailNotifier 直接發送,而現在我們使用 ApplicationEventPublisher 發送發郵件事件了。
事件監聽者代碼如下:
@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
 
    def onApplicationEvent(event: OrderPlacedEvent) {
        //sending e-mail...
    }
 
}
在監聽者方法中真正實現郵件發送。
但是Spring的ApplicationEvents是同步事件,意味着我們並沒有真正實現異步,程序還會在這里堵塞,如果希望異步,我們需要重新定義一個ApplicationEventMulticaster,實現類型SimpleApplicationEventMulticaster和TaskExecutor:
@Bean
def applicationEventMulticaster() = {
    val multicaster = new SimpleApplicationEventMulticaster()
    multicaster.setTaskExecutor(taskExecutor())
    multicaster
}
 
@Bean
def taskExecutor() = {
    val pool = new ThreadPoolTaskExecutor()
    pool.setMaxPoolSize(10)
    pool.setCorePoolSize(10)
    pool.setThreadNamePrefix("Spring-Async-")
    pool
}
Spring通過使用TaskExecutor已經支持廣播事件了,對onApplicationEvent() 標注  @Async
@Async
def onApplicationEvent(event: OrderPlacedEvent) { //...
如果你希望使用@Async,可以編制自己的異步執行器:
@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
    def getAsyncExecutor = taskExecutor()
 
    @Bean
    def taskExecutor() = {
        val pool = new ThreadPoolTaskExecutor()
        pool.setMaxPoolSize(10)
        pool.setCorePoolSize(10)
        pool.setThreadNamePrefix("Spring-Async-")
        pool
    }
 
}
@ EnableAsync是足夠了。,默認情況下,Spring使用SimpleAsyncTaskExecutor類創建新的線程。
以上所有設置暴露一個真正的問題。現在,我們雖然使用其他線程發送一個異步消息處理。不幸的是,我們引入競爭條件。
  1. 開始事務
  2. 存儲order到數據庫
  3. 發送一個包裝order的消息
  4. 確認
異步線程獲得OrderPlacedEvent並開始處理。現在的問題是,它發生(3)之后,還是(4)之前或者(4)之后?這有一個很大的區別!在前者的情況下,交易也尚未提交訂單所以不存在於數據庫中。另一方面,延遲加載可能已經在工作,致使訂單對象仍然然綁定在 PersistenceContext(缺省我們使用JPA)。
解決辦法是使用  TransactionSynchronizationManager.,可以注冊很多監聽者  TransactionSynchronization,它對於事務的提交或回滾都有事件發送。
@Transactional
def placeOrder(order: Order) {
    orderDao save order
    afterCommit {
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
}
 
private def afterCommit[T](fun: => T) {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
        override def afterCommit() {
            fun
        }
    })
}
當前事務提交后 afterCommit()接受調用,可以安全地調用registerSynchronization()多次 - 監聽器存儲在Set並且本地保存到當前事務中,事務提交后消失。
我們將afterCommit方法單獨抽象成一個類,分離關注。
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
    extends ApplicationEventPublisher {
 
    override def publishEvent(event: ApplicationEvent) {
        if (TransactionSynchronizationManager.isActualTransactionActive) {
            TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronizationAdapter {
                    override def afterCommit() {
                        delegate publishEvent event
                    }
                })
        }
        else
            delegate publishEvent event
    }
 
}
TransactionAwareApplicationEventPublisher是實現Spring的ApplicationEventPublisher。
我們要將這個新的實現告訴Spring替換掉舊的,用@Primary:
@Resource
val applicationContext: ApplicationContext = null
 
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
    new TransactionAwareApplicationEventPublisher(applicationContext)
再看看原來的訂單服務:
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
注意這里ApplicationEventPublisher已經是我們自己實現的TransactionAwareApplicationEventPublisher,將被自動注入這個服務。
最后,要在真正訂單保存的業務代碼上放置事務:
def placeOrder(order: Order) {
    storeOrder(order)
    eventPublisher publishEvent OrderPlacedEvent(order)
}
 
@Transactional
def storeOrder(order: Order) = orderDao save order
當然這沒有根本解決問題,如果placeOrder有一個更大的事務怎么辦?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM