keycloak~擴展事件機制,集成kafka中間件


對於KC的后台或者接口的操作,當用戶,組,角色這些實體狀態發生改變時,KC會對外發布事件,而這些事件處理程序我們是可以在后台配置的,默認繼承了jboss-logging日志事件,而我們可以在事件管理中去配置自己的事件處理程序。

事件處理程序SPI

實現EventListenerProviderFactoryEventListenerProvider這兩個SPI即可,我們在這里可以訂閱由KC發出現的事件,針對我們感興趣的事件,去添加處理代碼。

/**
     * 后端管理平台事件
     *
     * @param adminEvent
     * @param b
     */
    @Override
    public void onEvent(AdminEvent adminEvent, boolean b) {
      logger.info(adminEvent.getResourceTypeAsString()))
    }

總結常用的消息類型

  • 操作類型:operationType
    • 后端類型
      • CREATE 新建
      • UPDATE 編輯
      • DELETE 刪除
    • 后台資源類型:resourceType
      • REALM_ROLE 域的角色
      • REALM_ROLE_MAPPING 域的角色綁定
      • USER 用戶
      • GROUP 組
      • GROUP_MEMBERSHIP 組綁定
      • CLIENT_ROLE 客戶端角色
      • CLIENT_ROLE_MAPPING 客戶端角色綁定
    • 前端類型
      • LOGIN 登錄
      • LOGIN_ERROR 登錄失敗
      • REGISTER 注冊
      • REGISTER_ERROR 注冊失敗
      • LOGOUT 登出
      • LOGOUT_ERROR 登出失敗

擴展一個KAFKA

  • 需要擴展相關組件
<dependencies>
     <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
  • 需要配置你的kafka的生產者
    kc這邊是一個kafka的生產者,當它收到由KC發出來的事件之后,把消息發到kafka,其它服務方可以消費這個kafka消息
/**
 * kafka生產者.
 */
public class Producer {

    private final static String BOOTSTRAP_SERVER = ConfigFactory.getInstance().getStrPropertyValue("kafka.host");
    private final static Logger logger = Logger.getLogger(Producer.class);
    private static KafkaProducer<String, String> producer;

    private static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            //reset thread context
            resetThreadContext();
            // create the producer
            producer = new KafkaProducer<String, String>(getProperties());
        }
        return producer;
    }

    public static void publishEvent(String topic, String value) {

        // create a producer record
        ProducerRecord<String, String> eventRecord =
                new ProducerRecord<String, String>(topic, value);

        // send data - asynchronous
        getProducer().send(eventRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    logger.info(String.format("The offset of the record we just sent is:%s", recordMetadata.offset()));
                }
            }
        });

    }

    private static void resetThreadContext() {
        Thread.currentThread().setContextClassLoader(null);
    }

    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        return properties;
    }


}

最后,在你的kc事件處理SPI里,調用咱們的KAFKA生產者去生產消息就行了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM