Consumer Lib
A library that simplifies consuming (Kafka) events through a fluent API by abstracting away common problems that arise when processing events. This includes, among other things, event deserialization, matching events to schemas, duplicate detection, error handling, and transaction and retry management.
Installation
To install the Consumer Library in your service, add both of the following snippets to your pom.xml file.
The first one registers the GitLab package registry the library is published to.
The second declares the library as a dependency.
<repositories>
    <repository>
        <id>msd-gitlab-consumer-lib</id>
        <url>https://gitlab.com/api/v4/projects/77732140/packages/maven</url>
    </repository>
</repositories>

<dependency>
    <groupId>de.microservice-dungeon</groupId>
    <artifactId>consumer-lib</artifactId>
    <version>0.3.0</version>
</dependency>
Keep in mind that the version shown here might be outdated.
Getting Started
Register your Events
The first step in using the library is to create a handler for each external event.
To do this, implement EventSpecification in a concrete class.
Each class represents exactly one event to be processed.
During processing, the library maps an incoming external event to its EventSpecification by matching the EventType.
The EventSpecification expects a concrete type as its generic parameter.
For the Microservice Dungeon, there are two available presets:
KafkaEventContext<KafkaDefaultHeader, byte[]> for consuming Core-Service events, and
KafkaEventContext<KafkaIntentHeader, byte[]> for consuming intents.
Alternatively, custom types can be defined.
Note, however, that all classes used in the remainder of this documentation
are based on these presets.
Example
@Component
@RequiredArgsConstructor
public class GameStateTransferEventV1 implements EventSpecification<KafkaEventContext<KafkaDefaultHeaders, byte[]>> {

    public static final String SCHEMA = "game.state-transfer";
    public static final int SCHEMA_VERSION = 1;

    // Injected through the Lombok-generated constructor; used to deserialize the payload.
    private final ObjectMapper objectMapper;

    // Identifies the external event this specification handles.
    @Override
    public EventType getEventType() {
        return EventType.build(SCHEMA, SCHEMA_VERSION);
    }

    @Override
    public void process(KafkaEventContext<KafkaDefaultHeaders, byte[]> context) {
        try {
            Payload payload = objectMapper.readValue(context.value(), Payload.class);
            // ...
        } catch (Exception e) {
            // ...
        }
    }

    // No dedicated error callback is registered for this event.
    @Override
    public Optional<BiConsumer<KafkaEventContext<KafkaDefaultHeaders, byte[]>, Throwable>> onError() {
        return Optional.empty();
    }

    // Payload ...
}
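To picture how an incoming event might be matched to its EventSpecification by EventType, the following is a minimal, self-contained sketch. It does not use the library: EventTypeKey, Handler, and Registry are hypothetical stand-ins invented for illustration, and exact matching on schema name plus version is an assumption about how the lookup behaves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

public class EventTypeMatchingSketch {

    // Hypothetical stand-in for the library's EventType: a schema name plus a version.
    record EventTypeKey(String schema, int version) {}

    // Hypothetical stand-in for an EventSpecification: an event type and its handler.
    record Handler(EventTypeKey type, Consumer<byte[]> process) {}

    // Hypothetical registry that resolves a handler by exact match on schema and version.
    static final class Registry {
        private final Map<EventTypeKey, Handler> handlers = new HashMap<>();

        Registry register(Handler handler) {
            handlers.put(handler.type(), handler);
            return this;
        }

        Optional<Handler> resolve(String schema, int version) {
            return Optional.ofNullable(handlers.get(new EventTypeKey(schema, version)));
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry()
                .register(new Handler(new EventTypeKey("game.state-transfer", 1),
                        payload -> System.out.println("handled " + payload.length + " bytes")));

        // Same schema and version: the registered handler is found and invoked.
        registry.resolve("game.state-transfer", 1)
                .ifPresent(h -> h.process().accept(new byte[] {1, 2, 3}));

        // Same schema, different version: no handler matches.
        System.out.println(registry.resolve("game.state-transfer", 2).isPresent());
    }
}
```

Because each handler is keyed by both schema and version in this sketch, a new schema version would get its own class (for example a hypothetical GameStateTransferEventV2) rather than a branch inside the existing one.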
Configure your Listener
The next (and final) step is to create a ProcessorChain for each (Kafka) listener.
It can either be instantiated directly or via its ProcessorChain.Builder, which applies initial defaults and
provides a fluent API.
Alternatively, the KafkaProcessorChainFactory, which is tailored to the Microservice Dungeon's Kafka setup, can be used.
It preconfigures the ProcessorChain.Builder with the appropriate types for processing Kafka events
that conform to the Microservice Dungeon event schema.
This is the recommended approach for Core-Services and Players.
The KafkaProcessorChainFactory is an autoconfigured bean and can be injected as usual via the dependency
injection context.
Example
@Component
@RequiredArgsConstructor
public class GameStateEventListener {

    private final KafkaProcessorChainFactory processorFactory;
    private final GameStateTransferEventV1 ecstEvent;

    private ProcessorChain<KafkaEventContext<KafkaDefaultHeaders, byte[]>, ConsumerRecord<String, byte[]>> chain;

    @PostConstruct
    public void init() {
        chain = processorFactory.defaultBuilder()
                .register(ecstEvent)
                .build();
    }

    @Transactional
    @KafkaListener(topics = "db.game.ecst.v1")
    public void onECST(ConsumerRecord<String, byte[]> record) {
        chain.process(record);
    }
}
Deep Dive
The following section takes a closer look at some implementation details.
Transaction Management
In the current version, the processor chain does not provide its own transaction context, nor does it require one. This will change in future releases.
Error Handling
Likewise, the current version does not offer any advanced error handling. It is therefore the responsibility of the consumer to handle exceptions.
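Since exception handling is left to the consumer, one possible approach is to wrap the processing call in a small guard. The sketch below is self-contained and does not use the library: the guarded helper and the processing lambda are hypothetical, with the lambda standing in for a call such as chain.process(record).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerErrorHandlingSketch {

    // Wraps a processing step so that a runtime exception is caught and the failed
    // message is handed to a fallback instead of propagating to the caller.
    static <R> Consumer<R> guarded(Consumer<R> processing, Consumer<R> onFailure) {
        return message -> {
            try {
                processing.accept(message);
            } catch (RuntimeException e) {
                onFailure.accept(message);
            }
        };
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();

        // Hypothetical processing step standing in for chain.process(record).
        Consumer<String> processing = message -> {
            if (message.isBlank()) {
                throw new IllegalArgumentException("empty message");
            }
            System.out.println("processed " + message);
        };

        Consumer<String> safe = guarded(processing, failed::add);
        safe.accept("robot-spawned");
        safe.accept(" ");

        System.out.println("failed messages: " + failed.size());
    }
}
```

Swallowing the exception is a deliberate trade-off in this sketch: the listener keeps running, but the failed message is only retained if the fallback stores or forwards it. Rethrowing instead would leave retries and rollback to the surrounding framework.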