Consumer Lib

A library that simplifies consuming (Kafka) events through a fluent API by abstracting common problems that arise when processing events. This includes, among other things, event deserialization, matching events to schemas, duplicate detection, error handling, transaction and retry management, and much more.

Artifact Links

Repository: https://gitlab.com/microservice-dungeon/services/consumer-lib
Registry: https://gitlab.com/microservice-dungeon/services/consumer-lib/-/packages
Changelog: https://gitlab.com/microservice-dungeon/services/consumer-lib/-/blob/master/CHANGELOG.md

Installation

To install the Consumer Library in your service, add both of the following snippets to your pom.xml file. The first registers the GitLab package registry the library is published in. The second declares the library as a dependency inside your dependencies section.

    <repositories>
        <repository>
            <id>msd-gitlab-consumer-lib</id>
            <url>https://gitlab.com/api/v4/projects/77732140/packages/maven</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>de.microservice-dungeon</groupId>
            <artifactId>consumer-lib</artifactId>
            <version>0.3.0</version>
        </dependency>
    </dependencies>

Keep in mind that the version shown here might be outdated; the package registry linked above lists the latest release.


Getting Started

Register your Events

The first step in using the library is to create a handler for each external event.

To do this, implement the EventSpecification interface in a concrete class. Each class represents exactly one event to be processed. During processing, the mapping between an EventSpecification and an external event is resolved by matching the EventType.

The EventSpecification expects a concrete type as its generic parameter. For the Microservice Dungeon, there are two available presets: KafkaEventContext<KafkaDefaultHeaders, byte[]> for consuming Core-Service events, and KafkaEventContext<KafkaIntentHeader, byte[]> for consuming intents. Alternatively, custom types can be defined. Note, however, that all classes used in the remainder of this documentation are based on these presets.
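
The matching described above can be pictured as a lookup table keyed by event type. The following is a conceptual sketch only, not the library's actual implementation; SimpleEventType is a stand-in for the real EventType, and the handler signature is simplified:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Conceptual sketch (NOT the library's implementation) of how a processor
// chain resolves an incoming event to its registered specification: each
// specification is keyed by its event type, and the type carried by the
// incoming record selects the handler.
class DispatchSketch {

    // schema + version pair, mirroring EventType.build(SCHEMA, SCHEMA_VERSION)
    record SimpleEventType(String schema, int version) { }

    static final Map<SimpleEventType, Consumer<byte[]>> handlers = new HashMap<>();
    static String lastHandled = "";

    public static void main(String[] args) {
        // "registering" a specification for game.state-transfer v1
        handlers.put(new SimpleEventType("game.state-transfer", 1),
                payload -> lastHandled = "game.state-transfer v1");

        // an incoming record declares its type; look up the matching handler
        SimpleEventType incoming = new SimpleEventType("game.state-transfer", 1);
        handlers.getOrDefault(incoming,
                        p -> { throw new IllegalStateException("no handler registered"); })
                .accept(new byte[0]);
        System.out.println(lastHandled); // game.state-transfer v1
    }
}
```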

Example

@Component
@RequiredArgsConstructor
public class GameStateTransferEventV1 implements EventSpecification<KafkaEventContext<KafkaDefaultHeaders, byte[]>> {

    // injected through the @RequiredArgsConstructor-generated constructor
    private final ObjectMapper objectMapper;

    public static final String SCHEMA = "game.state-transfer";
    public static final int SCHEMA_VERSION = 1;

    @Override
    public EventType getEventType() {
        return EventType.build(SCHEMA, SCHEMA_VERSION);
    }

    @Override
    public void process(KafkaEventContext<KafkaDefaultHeaders, byte[]> context) {
        try {
            Payload payload = objectMapper.readValue(context.value(), Payload.class);
            // ...
        } catch (Exception e) {
            // ...
        }
    }

    @Override
    public Optional<BiConsumer<KafkaEventContext<KafkaDefaultHeaders, byte[]>, Throwable>> onError() {
        return Optional.empty();
    }
    
    // Payload ...
}
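
The Payload omitted in the example above is an ordinary DTO that objectMapper.readValue deserializes the event body into. A minimal sketch; the field names below are illustrative assumptions, not the actual game.state-transfer v1 schema:

```java
// Illustrative payload DTO. The field names are assumptions for the sake of
// the example; the real game.state-transfer v1 schema defines the actual
// fields. Jackson maps the raw event body onto this type via
// objectMapper.readValue(context.value(), Payload.class).
record Payload(String gameId, String gameStatus, int roundNumber) { }
```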

Configure your Listener

The next (and final) step is to create a ProcessorChain for each (Kafka) listener. It can either be instantiated directly or via its ProcessorChain.Builder, which applies initial defaults and provides a fluent API.

Alternatively, the KafkaProcessorChainFactory provided for the Microservice Dungeon with Kafka can be used. It preconfigures the ProcessorChain.Builder with the appropriate types for processing Kafka events that conform to the Microservice Dungeon event schema. This is the recommended approach for Core-Services and Players. The KafkaProcessorChainFactory is an autoconfigured bean and can be injected as usual via the dependency injection context.

Example

@Component
@RequiredArgsConstructor
public class GameStateEventListener {

    private final KafkaProcessorChainFactory processorFactory;
    private final GameStateTransferEventV1 ecstEvent;

    private ProcessorChain<KafkaEventContext<KafkaDefaultHeaders, byte[]>, ConsumerRecord<String, byte[]>> chain;

    @PostConstruct
    public void init() {
        chain = processorFactory.defaultBuilder()
                .register(ecstEvent)
                .build();
    }

    @Transactional
    @KafkaListener(topics = "db.game.ecst.v1")
    public void onECST(ConsumerRecord<String, byte[]> record) {
        chain.process(record);
    }
}

Deep Dive

The following section takes a closer look at some implementation details.

Transaction Management

In the current version, the processor chain does not provide its own transaction context, nor does it require one. This will change with future releases.

Error Handling

Likewise, the current version does not offer any advanced error handling. It is therefore the responsibility of the consumer to handle exceptions.
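
Until such support arrives, failures can be handled either inside process() (as the try/catch in the earlier example does) or by returning a handler from onError(). The sketch below shows the BiConsumer shape that onError() returns; String stands in for the Kafka context type purely so the example stays self-contained, and the dead-letter list is an illustrative stand-in for whatever recovery strategy your service uses:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

// Sketch of an onError() callback in the shape EventSpecification expects.
// String replaces KafkaEventContext<KafkaDefaultHeaders, byte[]> here to
// keep the example runnable on its own.
class ErrorHandlerSketch {

    // illustrative stand-in for a dead-letter store
    static final List<String> deadLetters = new ArrayList<>();

    // Mirrors the Optional<BiConsumer<context, Throwable>> return type of
    // EventSpecification.onError(); Optional.empty() would mean "no handler".
    static Optional<BiConsumer<String, Throwable>> onError() {
        return Optional.of((context, error) ->
                deadLetters.add(context + ": " + error.getMessage()));
    }

    public static void main(String[] args) {
        // simulate a failed record reaching the handler
        onError().ifPresent(h -> h.accept("record-7", new IllegalStateException("bad payload")));
        System.out.println(deadLetters);
    }
}
```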

Last modified February 3, 2026: Add consumer-lib readme (5baa576)