Let's Publish Keycloak Events to Kafka using SPI (plugins)
Stream Keycloak events to a Kafka topic using Keycloak SPIs
In this post, we will develop a plugin that streams Keycloak events to a Kafka topic using Keycloak SPIs. We will cover the Keycloak SPI implementation, its integration with Kafka, and the required configuration.
What's Keycloak SPI?
Keycloak provides an extensible API out of the box (using JBoss submodules) intended to cover most use cases. At the same time, it offers many options for customization. Implementing your own provider through the Service Provider Interfaces (SPI) is a major one.
SPI Implementation
Implementing an SPI requires a provider factory, a provider implementing the SPI's interface, and a service configuration file.
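To make the relationship between these pieces concrete, here is a minimal plain-Java sketch of the pattern. All the types in it (MiniEventListener, MiniEventListenerFactory, LoggingListenerFactory, MiniRegistry) are hypothetical stand-ins invented for illustration. They are not Keycloak classes, and the registry only approximates what the server does when it discovers factories and looks them up by id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a provider interface: the object that does the work.
interface MiniEventListener {
    String onEvent(String eventType);
}

// Hypothetical stand-in for a provider factory: it has an id and creates providers.
interface MiniEventListenerFactory {
    String getId();

    MiniEventListener create();
}

// One concrete factory, comparable in role to the factory built later in this post.
class LoggingListenerFactory implements MiniEventListenerFactory {
    @Override
    public String getId() {
        return "logging-event";
    }

    @Override
    public MiniEventListener create() {
        return eventType -> "handled " + eventType;
    }
}

// Hypothetical registry: plays the role of the server, which learns about
// factories (in a real deployment, through the service configuration file)
// and resolves them by id.
class MiniRegistry {
    private final Map<String, MiniEventListenerFactory> factories = new HashMap<>();

    void register(MiniEventListenerFactory factory) {
        factories.put(factory.getId(), factory);
    }

    MiniEventListener provider(String id) {
        MiniEventListenerFactory factory = factories.get(id);
        if (factory == null) {
            throw new IllegalArgumentException("No factory registered for id: " + id);
        }
        return factory.create();
    }
}
```

The point of the split is that the factory is long-lived and holds configuration, while the provider is what actually receives each event.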
First, let's create our Java project and add a few dependencies.
Below is the build.gradle:
plugins {
    id 'java'
}

group 'com.yousseefben'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8

ext {
    keycloakVersion = '15.0.2'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation group: 'org.keycloak', name: 'keycloak-server-spi', version: "${keycloakVersion}"
    implementation group: 'org.keycloak', name: 'keycloak-server-spi-private', version: "${keycloakVersion}"
    implementation group: 'org.keycloak', name: 'keycloak-services', version: "${keycloakVersion}"
}
In this case, we'll implement KeycloakCustomEventListenerFactory, which implements EventListenerProviderFactory, and KeycloakCustomEventListener, which implements EventListenerProvider. We start with empty skeletons.
KeycloakCustomEventListenerFactory:
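import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

public class KeycloakCustomEventListenerFactory implements EventListenerProviderFactory {
    private KeycloakCustomEventListener keycloakCustomEventListener;
    private static final Logger log = Logger.getLogger(KeycloakCustomEventListenerFactory.class);

    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        return null; // placeholder, implemented later in this post
    }

    @Override
    public void init(Config.Scope scope) {
    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
    }

    @Override
    public void close() {
    }

    // Id under which the listener is registered in Keycloak
    @Override
    public String getId() {
        return "kafka-event";
    }
}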
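KeycloakCustomEventListener:
import java.util.Properties;

import org.jboss.logging.Logger;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.admin.AdminEvent;

public class KeycloakCustomEventListener implements EventListenerProvider {
    private static final Logger log = Logger.getLogger(KeycloakCustomEventListener.class);

    public KeycloakCustomEventListener(String topicKafka, Properties props) {
    }

    // Called for user events (login, logout, register, ...)
    @Override
    public void onEvent(Event event) {
    }

    // Called for admin events (changes made through the admin console or admin API)
    @Override
    public void onEvent(AdminEvent adminEvent, boolean includeRepresentation) {
    }

    @Override
    public void close() {
    }
}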
Add the service configuration file:
Create the resource file META-INF/services/org.keycloak.events.EventListenerProviderFactory (under src/main/resources in a Gradle project). It contains the fully qualified name of the factory:
com.yousseefben.KeycloakCustomEventListenerFactory
Add Kafka to your project:
Modify the build.gradle file and add the kafka-clients package:
...
dependencies {
    ...
    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0'
}
Edit KeycloakCustomEventListenerFactory:
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

public class KeycloakCustomEventListenerFactory implements EventListenerProviderFactory {
    private KeycloakCustomEventListener keycloakCustomEventListener;
    private static final Logger log = Logger.getLogger(KeycloakCustomEventListenerFactory.class);
    private String topicKafka;
    private String bootstrapServers;
    private boolean sslEnabled;
    private String keystoreLocation;
    private String keystorePassword;
    private String trustStoreLocation;
    private String trustStorePassword;
    private boolean jaasEnabled;
    private String jaasConfig;
    private String saslMechanism;
    private String securityProtocol;
    private Properties properties;

    // A single listener (and therefore a single Kafka producer) is created lazily and reused.
    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        if (keycloakCustomEventListener == null) {
            keycloakCustomEventListener = new KeycloakCustomEventListener(topicKafka, properties);
        }
        return keycloakCustomEventListener;
    }

    // Reads the Kafka settings from environment variables and validates the required ones.
    @Override
    public void init(Config.Scope scope) {
        log.info("Init kafka");
        bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        keystoreLocation = System.getenv("KAFKA_SSL_KEYSTORE_LOCATION");
        keystorePassword = System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD");
        trustStoreLocation = System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION");
        trustStorePassword = System.getenv("KAFKA_SSL_TRUSTSTORE_PASSWORD");
        jaasConfig = System.getenv("KAFKA_SASL_JAAS_CONFIG");
        saslMechanism = System.getenv("KAFKA_DEFAULT_SASL_MECHANISM");
        securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
        sslEnabled = "true".equalsIgnoreCase(System.getenv("KAFKA_SSL_ENABLED"));
        jaasEnabled = "true".equalsIgnoreCase(System.getenv("KAFKA_JAAS_ENABLED"));
        topicKafka = System.getenv("KAFKA_TOPIC");
        log.info("Kafka ssl enabled: " + sslEnabled);

        if (topicKafka == null || topicKafka.isEmpty()) {
            throw new IllegalStateException("topic is required.");
        }
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalStateException("bootstrapServers are required");
        }
        if (sslEnabled && (keystoreLocation == null || keystorePassword == null
                || trustStoreLocation == null || trustStorePassword == null)) {
            throw new IllegalStateException("ssl params required");
        }
        properties = getProperties();
    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
    }

    @Override
    public void close() {
    }

    @Override
    public String getId() {
        return "kafka-event";
    }

    // Builds the Kafka producer configuration from the values read in init().
    private Properties getProperties() {
        Properties propsKafka = new Properties();
        propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        if (sslEnabled) {
            propsKafka.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
            propsKafka.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
            propsKafka.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
            propsKafka.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        }
        if (jaasEnabled) {
            propsKafka.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
            propsKafka.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
            propsKafka.put("security.protocol", securityProtocol);
        }
        return propsKafka;
    }
}
The init method checks the required properties and handles specifics such as enabling SSL and JAAS.
If you need further details about the kafka-clients configuration, please refer to the official documentation.
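The same validation and mapping can be sketched as a small function that has no Kafka or Keycloak dependency, which makes it easy to unit test. The class name KafkaEnvConfig and the helper require are hypothetical names introduced here. The string keys are the standard Kafka client property names that the ProducerConfig, SslConfigs and SaslConfigs constants resolve to. Unlike the factory above, this sketch also insists on the SASL values when JAAS is enabled, which is an assumption about what you want rather than something the original code enforces.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: builds Kafka producer properties from an environment map.
class KafkaEnvConfig {

    static Properties fromEnv(Map<String, String> env) {
        // Both values are mandatory, as in the factory's init method.
        require(env, "KAFKA_TOPIC");
        String servers = require(env, "KAFKA_BOOTSTRAP_SERVERS");

        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        if ("true".equalsIgnoreCase(env.get("KAFKA_SSL_ENABLED"))) {
            props.put("ssl.keystore.location", require(env, "KAFKA_SSL_KEYSTORE_LOCATION"));
            props.put("ssl.keystore.password", require(env, "KAFKA_SSL_KEYSTORE_PASSWORD"));
            props.put("ssl.truststore.location", require(env, "KAFKA_SSL_TRUSTSTORE_LOCATION"));
            props.put("ssl.truststore.password", require(env, "KAFKA_SSL_TRUSTSTORE_PASSWORD"));
        }

        if ("true".equalsIgnoreCase(env.get("KAFKA_JAAS_ENABLED"))) {
            // Assumption: fail fast here, because Properties.put rejects null values.
            props.put("sasl.jaas.config", require(env, "KAFKA_SASL_JAAS_CONFIG"));
            props.put("sasl.mechanism", require(env, "KAFKA_DEFAULT_SASL_MECHANISM"));
            props.put("security.protocol", require(env, "KAFKA_SECURITY_PROTOCOL"));
        }
        return props;
    }

    // Returns the value for name, or throws if it is missing or empty.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " is required");
        }
        return value;
    }
}
```

In the factory it could be called as KafkaEnvConfig.fromEnv(System.getenv()), while a test can pass any Map with the variables it wants to exercise.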
Edit KeycloakCustomEventListener:
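import java.util.Properties;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jboss.logging.Logger;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.admin.AdminEvent;

public class KeycloakCustomEventListener implements EventListenerProvider {
    private static final Logger log = Logger.getLogger(KeycloakCustomEventListener.class);
    private final CustomKafkaProducer customKafkaProducer;
    private final ObjectMapper mapper;

    public KeycloakCustomEventListener(String topicKafka, Properties props) {
        log.info("init custom event listener");
        mapper = new ObjectMapper();
        customKafkaProducer = new CustomKafkaProducer(topicKafka, props);
    }

    // Serializes the user event to JSON and publishes it to Kafka.
    @Override
    public void onEvent(Event event) {
        log.info("Event: " + event.getType() + ", userId: " + event.getUserId());
        try {
            customKafkaProducer.publishEvent(mapper.writeValueAsString(event));
        } catch (JsonProcessingException e) {
            log.error("error: " + e.getMessage());
        }
    }

    // Serializes the admin event to JSON and publishes it to Kafka.
    @Override
    public void onEvent(AdminEvent adminEvent, boolean includeRepresentation) {
        log.info("Admin Event: " + adminEvent.getResourceType().name());
        try {
            customKafkaProducer.publishEvent(mapper.writeValueAsString(adminEvent));
        } catch (JsonProcessingException e) {
            log.error("error: " + e.getMessage());
        }
    }

    // Nothing to release here: the factory reuses this listener instance.
    @Override
    public void close() {
    }
}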
Now, we create our custom Kafka producer, CustomKafkaProducer, which publishes a message through publishEvent.
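import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jboss.logging.Logger;

public class CustomKafkaProducer {
    private static final Logger log = Logger.getLogger(CustomKafkaProducer.class);
    private final String topic;
    private final KafkaProducer<String, String> producer;

    public CustomKafkaProducer(String topic, Properties props) {
        log.info("init producer");
        // Use this deployment's class loader so that kafka-clients can resolve
        // classes such as the configured serializers.
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        this.topic = topic;
        producer = new KafkaProducer<>(props);
    }

    // Sends the value to the topic without a key; send() is asynchronous.
    public void publishEvent(String value) {
        log.info("publish event");
        ProducerRecord<String, String> eventRecord = new ProducerRecord<>(topic, value);
        producer.send(eventRecord);
    }
}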
SPI Deployment
💡 WARNING: Deploying an SPI jar that has an issue or throws an exception can prevent your Keycloak server from starting. To avoid problems with your Keycloak instance, it is recommended to try the jar on an experimental server first, rather than registering the SPI under Events in the Keycloak UI of an instance you depend on.
To register provider implementations, we can simply use the Keycloak deployer, which handles several dependencies automatically for you. Hot deployment and redeployment are also supported.
Server
If you copy your provider jar to the Keycloak standalone/deployments/ directory, your provider will automatically be deployed. Hot deployment works too.
Docker
The simplest way to deploy the SPI in a Docker container is to create a custom Docker image from the Keycloak base image. The keycloak-kafka.jar must be added to the /standalone/deployments folder.
Example Dockerfile:
FROM quay.io/keycloak/keycloak:15.0.2
ADD ./keycloak-spi-1.0-SNAPSHOT.jar /opt/jboss/keycloak/standalone/deployments/
EXPOSE 8080
EXPOSE 8443
ENTRYPOINT [ "/opt/jboss/tools/docker-entrypoint.sh" ]
CMD ["-b", "0.0.0.0"]
Build the image:
docker build -t keycloak-kafka .
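Example to run the image. The factory's init method requires both KAFKA_TOPIC and KAFKA_BOOTSTRAP_SERVERS, so pass both (BROKER_HOST:9092 is a placeholder, replace it with your own broker address):
docker run -p 8080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KAFKA_TOPIC=MY_TOPIC -e KAFKA_BOOTSTRAP_SERVERS=BROKER_HOST:9092 keycloak-kafka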
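Enable the configuration in Keycloak: in the admin console, open Events, go to the Config tab, and add kafka-event (the id returned by getId) to the Event Listeners field.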
Conclusion
The entire code is available here: keycloak-kafka-spi
This is a basic example of capturing events with a Keycloak SPI. Publishing these events to Kafka is one way to build an event-driven architecture around Keycloak. There is a lot more we can do: Keycloak exposes several other SPIs that are worth exploring.
---
If you like this blog, please like & share 🙂
Bye!