對於KC的后台或者接口的操作,當用戶,組,角色這些實體狀態發生改變時,KC會對外發布事件,而這些事件處理程序我們是可以在后台配置的,默認繼承了jboss-logging日志事件,而我們可以在事件管理中去配置自己的事件處理程序。
事件處理程序SPI
實現EventListenerProviderFactory
和EventListenerProvider
這兩個SPI即可,我們在這里可以訂閱由KC發出現的事件,針對我們感興趣的事件,去添加處理代碼。
/**
* 后端管理平台事件
*
* @param adminEvent
* @param b
*/
@Override
public void onEvent(AdminEvent adminEvent, boolean b) {
logger.info(adminEvent.getResourceTypeAsString()))
}
總結常用的消息類型
- 操作類型:operationType
- 后端類型
- CREATE 新建
- UPDATE 編輯
- DELETE 刪除
- 后台資源類型:resourceType
- REALM_ROLE 域的角色
- REALM_ROLE_MAPPING 域的角色綁定
- USER 用戶
- GROUP 組
- GROUP_MEMBERSHIP 組綁定
- CLIENT_ROLE 客戶端角色
- CLIENT_ROLE_MAPPING 客戶端角色綁定
- 前端類型
- LOGIN 登錄
- LOGIN_ERROR 登錄失敗
- REGISTER 注冊
- REGISTER_ERROR 注冊失敗
- LOGOUT 登出
- LOGOUT_ERROR 登出失敗
- 后端類型
擴展一個KAFKA
- 需要擴展相關組件
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
- 需要配置你的kafka的生產者
kc這邊是一個kafka的生產者,當它收到由KC發出來的事件之后,把消息發到kafka,其它服務方可以消費這個kafka消息
/**
* kafka生產者.
*/
public class Producer {
private final static String BOOTSTRAP_SERVER = ConfigFactory.getInstance().getStrPropertyValue("kafka.host");
private final static Logger logger = Logger.getLogger(Producer.class);
private static KafkaProducer<String, String> producer;
private static KafkaProducer<String, String> getProducer() {
if (producer == null) {
//reset thread context
resetThreadContext();
// create the producer
producer = new KafkaProducer<String, String>(getProperties());
}
return producer;
}
public static void publishEvent(String topic, String value) {
// create a producer record
ProducerRecord<String, String> eventRecord =
new ProducerRecord<String, String>(topic, value);
// send data - asynchronous
getProducer().send(eventRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
logger.info(String.format("The offset of the record we just sent is:%s", recordMetadata.offset()));
}
}
});
}
private static void resetThreadContext() {
Thread.currentThread().setContextClassLoader(null);
}
public static Properties getProperties() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
return properties;
}
}
最后,在你的kc事件處理SPI里,調用咱們的KAFKA生產者去生產消息就行了