Let's Publish Keycloak Events to Kafka using SPI (plugins)

Stream Keycloak events to a Kafka topic using the Keycloak SPIs


In this post, we will develop a plugin to stream Keycloak events to a Kafka topic using the Keycloak SPIs. We will walk through the Keycloak SPI implementation, its integration with Kafka, and the configuration required.

What is a Keycloak SPI?

Keycloak provides an extendable API out of the box (using JBoss submodules) intended to cover most use cases. At the same time, it offers extensive customization possibilities; implementing your own provider using Service Provider Interfaces (SPI) is a major one.

SPI Implementation

Implementing an SPI requires a provider factory, a provider implementation, and a service configuration file.

First, let's create our Java project and add a few dependencies.

Below is the build.gradle:

groovy
plugins {
    id 'java'
}

group 'com.yousseefben'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8
ext {
    keycloakVersion = '15.0.2'
}
repositories {
    mavenCentral()
}

dependencies {
    implementation group: 'org.keycloak', name: 'keycloak-server-spi', version: "${keycloakVersion}"
    implementation group: 'org.keycloak', name: 'keycloak-server-spi-private', version: "${keycloakVersion}"
    implementation group: 'org.keycloak', name: 'keycloak-services', version: "${keycloakVersion}"
}

In this case, we'll implement KeycloakCustomEventListenerFactory, which implements EventListenerProviderFactory, and KeycloakCustomEventListener, which implements EventListenerProvider.

KeycloakCustomEventListenerFactory:

java
public class KeycloakCustomEventListenerFactory implements EventListenerProviderFactory {
    private KeycloakCustomEventListener keycloakCustomEventListener;

    private static final Logger log = Logger.getLogger(KeycloakCustomEventListenerFactory.class);

    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        return keycloakCustomEventListener; // implemented below once Kafka is wired in
    }

    @Override
    public void init(Config.Scope scope) {
    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {

    }

    @Override
    public void close() {

    }

    @Override
    public String getId() {
        return "kafka-event";
    }
}

KeycloakCustomEventListener:

java
public class KeycloakCustomEventListener implements EventListenerProvider {

    private static final Logger log = Logger.getLogger(KeycloakCustomEventListener.class);

    public KeycloakCustomEventListener(String topicKafka, Properties props) {

    }

    @Override
    public void onEvent(Event event) {

    }

    @Override
    public void onEvent(AdminEvent adminEvent, boolean b) {

    }

    @Override
    public void close() {

    }
}

Add the service configuration file:

Create the resource META-INF/services/org.keycloak.events.EventListenerProviderFactory with the following content:

com.yousseefben.KeycloakCustomEventListenerFactory
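
Keycloak discovers provider factories through the standard Java ServiceLoader mechanism, which reads exactly this file. As an optional local sanity check (not something Keycloak requires you to run), a minimal sketch that verifies the factory is discoverable from the classpath:

java
import java.util.ServiceLoader;

import org.keycloak.events.EventListenerProviderFactory;

public class ServiceLoaderCheck {
    public static void main(String[] args) {
        // Loads every factory declared in META-INF/services/org.keycloak.events.EventListenerProviderFactory
        ServiceLoader<EventListenerProviderFactory> factories =
                ServiceLoader.load(EventListenerProviderFactory.class);
        // Should print "kafka-event" once our factory and its service file are on the classpath
        factories.forEach(factory -> System.out.println(factory.getId()));
    }
}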

Add Kafka to your project:

  • Modify the build.gradle file and add the kafka-clients package

groovy
...
dependencies {
    ...
    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0'
}
  • Edit KeycloakCustomEventListenerFactory:

java
public class KeycloakCustomEventListenerFactory implements EventListenerProviderFactory {
    private KeycloakCustomEventListener keycloakCustomEventListener;

    private static final Logger log = Logger.getLogger(KeycloakCustomEventListenerFactory.class);

    private String topicKafka;
    private String bootstrapServers;
    private boolean sslEnabled;
    private String keystoreLocation;
    private String keystorePassword;
    private String trustStoreLocation;
    private String trustStorePassword;
    private boolean jaasEnabled;
    private String jaasConfig;
    private String saslMechanism;
    private String securityProtocol;
    private Properties properties;

    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        if (keycloakCustomEventListener == null) {
            keycloakCustomEventListener = new KeycloakCustomEventListener(topicKafka, properties);
        }
        return keycloakCustomEventListener;
    }

    @Override
    public void init(Config.Scope scope) {
				log.info("Init kafka");

        bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        keystoreLocation = System.getenv("KAFKA_SSL_KEYSTORE_LOCATION");
        keystorePassword = System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD");
        trustStoreLocation = System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION");
        trustStorePassword = System.getenv("KAFKA_SSL_TRUSTSTORE_PASSWORD");
        jaasConfig = System.getenv("KAFKA_SASL_JAAS_CONFIG");
        saslMechanism = System.getenv("KAFKA_DEFAULT_SASL_MECHANISM");
        securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
        sslEnabled = "true".equalsIgnoreCase(System.getenv("KAFKA_SSL_ENABLED");
        jaasEnabled = "true".equalsIgnoreCase(System.getenv("KAFKA_JAAS_ENABLED"));

        topicKafka = System.getenv("KAFKA_TOPIC");

				log.info("Kafka ssl enabled: " + sslEnabled);

        if (topicKafka == null || topicKafka.isEmpty()) {
            throw new NullPointerException("topic is required.");
        }
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new NullPointerException("bootstrapServers are required");
        }
        if (sslEnabled && (keystoreLocation == null || keystorePassword == null || trustStoreLocation == null || trustStorePassword == null)) {
            throw new NullPointerException("ssl params required");
        }
        properties = getProperties();

    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {

    }

    @Override
    public void close() {

    }

    @Override
    public String getId() {
        return "kafka-event";
    }

    private Properties getProperties() {
        Properties propsKafka = new Properties();
        propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        if (sslEnabled) {
            propsKafka.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
            propsKafka.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
            propsKafka.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
            propsKafka.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        }
        if (jaasEnabled) {
            propsKafka.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
            propsKafka.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
            propsKafka.put("security.protocol", securityProtocol);

        }
        return propsKafka;
    }
}

In init, we read the required properties from environment variables, check that they are set, and handle specifics such as enabling SSL and JAAS.

If you need further details about kafka-client configuration, please refer to the official documentation.
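
Beyond the settings above, you may also want to tune the producer itself. A small sketch of an optional helper that could be called from getProperties(), assuming hypothetical KAFKA_ACKS and KAFKA_LINGER_MS environment variables that are not part of the configuration shown here:

java
    // Hypothetical optional tuning knobs, read from environment variables not used above
    private void addOptionalTuning(Properties propsKafka) {
        String acks = System.getenv("KAFKA_ACKS");          // e.g. "all" for the strongest delivery guarantee
        String lingerMs = System.getenv("KAFKA_LINGER_MS"); // e.g. "5" to batch records for up to 5 ms
        if (acks != null && !acks.isEmpty()) {
            propsKafka.put(ProducerConfig.ACKS_CONFIG, acks);
        }
        if (lingerMs != null && !lingerMs.isEmpty()) {
            propsKafka.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        }
    }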

  • Edit KeycloakCustomEventListener:

java
public class KeycloakCustomEventListener implements EventListenerProvider {

    private static final Logger log = Logger.getLogger(KeycloakCustomEventListener.class);

    private final CustomKafkaProducer customKafkaProducer;
    private ObjectMapper mapper;

    public KeycloakCustomEventListener(String topicKafka, Properties props) {
log.info("init custom event listener");
        mapper = new ObjectMapper();
        customKafkaProducer = new CustomKafkaProducer(topicKafka, props);
    }

    @Override
    public void onEvent(Event event) {
log.info("Event: " + event.getType() + "userId: {}" + event.getUserId());
        try {
            customKafkaProducer.publishEvent(mapper.writeValueAsString(event));
        } catch (JsonProcessingException e) {
log.error("error: " + e.getMessage());
        }

    }

    @Override
    public void onEvent(AdminEvent adminEvent, boolean b) {
log.info("Admin Event: " + adminEvent.getResourceType().name());
        try {
            customKafkaProducer.publishEvent(mapper.writeValueAsString(adminEvent));
        } catch (JsonProcessingException e) {
log.error("error: " + e.getMessage());
        }
    }

    @Override
    public void close() {

    }
}
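
If you only care about a subset of events (logins, for example), you could filter by type before publishing. A small sketch of a possible variant of onEvent inside KeycloakCustomEventListener (requires java.util.Set, java.util.EnumSet, and org.keycloak.events.EventType imports); the chosen event types are just an assumption for this example:

java
    // Example: only publish login-related events (an arbitrary choice for this sketch)
    private static final Set<EventType> PUBLISHED_TYPES =
            EnumSet.of(EventType.LOGIN, EventType.LOGIN_ERROR, EventType.LOGOUT);

    @Override
    public void onEvent(Event event) {
        if (!PUBLISHED_TYPES.contains(event.getType())) {
            return; // ignore everything else
        }
        try {
            customKafkaProducer.publishEvent(mapper.writeValueAsString(event));
        } catch (JsonProcessingException e) {
            log.error("error: " + e.getMessage());
        }
    }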

Now we implement our custom Kafka producer, CustomKafkaProducer, which publishes messages via its publishEvent method.

java
public class CustomKafkaProducer {

    private static final Logger log = Logger.getLogger(CustomKafkaProducer.class);

    private final String topic;
    private final KafkaProducer<String, String> producer;

    public CustomKafkaProducer(String topic, Properties props) {
				log.info("init producer");
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        this.topic = topic;
        producer = new KafkaProducer<>(props);
    }

    public void publishEvent(String value) {
				log.info("publish event");
        ProducerRecord<String, String> eventRecord =
                new ProducerRecord<>(topic, value);
        producer.send(eventRecord);
    }
}
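
Note that producer.send(...) above is asynchronous: delivery failures are silently dropped, and the producer is never closed. A possible refinement (a sketch, assuming close() is also called from the listener's close()):

java
    public void publishEvent(String value) {
        ProducerRecord<String, String> eventRecord = new ProducerRecord<>(topic, value);
        // Log delivery failures instead of dropping them silently
        producer.send(eventRecord, (metadata, exception) -> {
            if (exception != null) {
                log.error("failed to publish event: " + exception.getMessage());
            }
        });
    }

    public void close() {
        // Flush pending records and release the producer's resources
        producer.flush();
        producer.close();
    }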

SPI Deployment

💡 WARNING: Deploying an SPI jar that throws an exception can prevent your Keycloak server from starting. To avoid issues with your Keycloak instance, it is recommended to test on an experimental server before registering the SPI under Events in the Keycloak UI.

To register provider implementations, we can simply use the Keycloak deployer approach, which handles several dependencies automatically for you. Hot deployment and re-deployment are also supported.

Server

If you copy your provider jar to the Keycloak standalone/deployments/ directory, your provider will automatically be deployed. Hot deployment works too.

Docker

The simplest way to deploy the SPI in a Docker container is to create a custom Docker image from the Keycloak base image. The provider jar (keycloak-spi-1.0-SNAPSHOT.jar in our build) must be added to the /opt/jboss/keycloak/standalone/deployments folder.

Example Dockerfile:

docker
FROM quay.io/keycloak/keycloak:15.0.2
ADD ./keycloak-spi-1.0-SNAPSHOT.jar /opt/jboss/keycloak/standalone/deployments/

EXPOSE 8080
EXPOSE 8443

ENTRYPOINT [ "/opt/jboss/tools/docker-entrypoint.sh" ]

CMD ["-b", "0.0.0.0"]

Build the image:

docker build -t keycloak-kafka .

Example of running the image (KAFKA_BOOTSTRAP_SERVERS must also be set, pointing at your Kafka broker):

docker run -p 8080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KAFKA_BOOTSTRAP_SERVERS=<kafka-host>:9092 -e KAFKA_TOPIC=MY_TOPIC keycloak-kafka

Enable the configuration in Keycloak

In the admin console, go to Events > Config and add kafka-event (the provider id returned by getId()) to the Event Listeners, then save.

[Image: Config keycloak kafka]

Conclusion


The entire code is available here: keycloak-kafka-spi

This is a basic example of capturing events with a Keycloak SPI. By publishing events to Kafka, we can move toward an event-driven architecture. There is much more we can do: Keycloak offers several other SPIs worth exploring.

---

If you like this blog, please like & share 🙂

Bye!