Pausing Kafka at run time with Quarkus
Pausing the consumption of Kafka messages can help perform maintenance, debug issues without processing new messages or prevent overwhelming downstream systems.
In a Quarkus app, that can be done by disabling a Reactive Messaging channel or by directly interacting with the Kafka client but both approaches have their limitations. In this post, I’ll show you a newer and better option which is available starting Quarkus 3.13: Pausable Channels from SmallRye Reactive Messaging.
Enabling pausable channels
Channels from SmallRye Reactive Messaging are not pausable by default. This can be changed from the Quarkus configuration:
mp.messaging.incoming.my-channel.pausable=true (1)
mp.messaging.incoming.my-channel.initially-paused=true (2)
1 | The channel named my-channel is pausable. |
2 | Pausable channels are not paused by default when the application starts.
To modify this behavior, set this configuration property to true . |
Creating a channel flow controller
This post will demonstrate two different ways to interact with the PausableChannel
API.
First, let’s create a CDI bean that will be injected in the code examples in the following sections.
import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class ChannelFlowController {
@Inject
ChannelRegistry channelRegistry;
public void pause(String channel) {
PausableChannel pausableChannel = getPausableChannel(channel);
if (!pausableChannel.isPaused()) {
pausableChannel.pause();
Log.infof("Paused channel: %s", channel);
}
}
public void resume(String channel) {
PausableChannel pausableChannel = getPausableChannel(channel);
if (pausableChannel.isPaused()) {
pausableChannel.resume();
Log.infof("Resumed channel: %s", channel);
}
}
private PausableChannel getPausableChannel(String channel) {
PausableChannel pausableChannel = channelRegistry.getPausable(channel);
if (pausableChannel == null) {
throw new IllegalArgumentException("Channel not found or not marked as pausable from the Quarkus configuration");
} else {
return pausableChannel;
}
}
}
Pausing a channel from a REST API
This approach is available as code in the gwenneg/blog-pausing-kafka-at-run-time repository. |
The PausableChannel
API can easily be exposed through a REST API which would typically be restricted to administrators of the application.
Make sure that any REST endpoints you introduce to expose the |
import jakarta.inject.Inject;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/channels")
public class ChannelResource { (1)
@Inject
ChannelFlowController channelFlowController;
@PUT
@Path("/pause")
public Response pause(String channel) {
try {
channelFlowController.pause(channel);
return Response.ok().build();
} catch (IllegalArgumentException e) {
return Response.status(Response.Status.NOT_FOUND)
.entity(e.getMessage())
.build(); (2)
}
}
@PUT
@Path("/resume")
public Response resume(String channel) {
try {
channelFlowController.resume(channel);
return Response.ok().build();
} catch (IllegalArgumentException e) {
return Response.status(Response.Status.NOT_FOUND)
.entity(e.getMessage())
.build(); (2)
}
}
}
1 | Make sure that the endpoints in this class are secured and accessible only to authorized users. |
2 | If the channel argument does not identify any existing channel, or if that channel exists but is not marked as pausable from the Quarkus configuration, an HTTP 404 error will be returned. |
Pausing a channel from Unleash
Sometimes, exposing a REST endpoint to interact with the PausableChannel
API is not an option.
Here’s an alternative based on Unleash and the quarkus-unleash extension.
It’s very similar to my other post Changing the Quarkus loggers level from Unleash which contains a lot more details than this post.
Passing the channel configuration from Unleash to Quarkus
In Unleash, each feature toggle can be associated with variants, either directly (deprecated) or through an activation strategy (recommended). We’ll use a variant with a JSON payload to pass data from Unleash to Quarkus and pause or resume a channel:

Deserializing the channel configuration
The variant payload needs to be deserialized before it can be used to pause or resume a channel. Here’s the data structure we’ll use for that:
public class KafkaChannelConfig {
public String hostName;
public String channel;
public Boolean paused;
}
Applying the channel configuration automatically
Now that the channel configuration can be modified from Unleash and passed Quarkus, how do we pause or resume the channel whenever its configuration is changed?
We’ll do that with the Subscriber API from Unleash and subscribe to the FeatureToggleResponse
event, which is emitted when the Unleash client fetches toggles from the server.
Using the Subscriber API with the quarkus-unleash extension is extremely simple.
UnleashSubscriber
needs to be implemented in a CDI bean and that’s it!
The extension will pass the bean to the Unleash client builder automatically.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.getunleash.Unleash;
import io.getunleash.Variant;
import io.getunleash.event.UnleashSubscriber;
import io.getunleash.repository.FeatureToggleResponse;
import io.getunleash.variant.Payload;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Optional;
import static io.getunleash.repository.FeatureToggleResponse.Status.CHANGED;
import static java.lang.Boolean.TRUE;
@ApplicationScoped
public class KafkaChannelManager implements UnleashSubscriber {
private static final String UNLEASH_TOGGLE_NAME = "my-app.kafka-channels";
@ConfigProperty(name = "host-name", defaultValue = "localhost") (1)
String hostName;
@Inject
Unleash unleash;
@Inject
ObjectMapper objectMapper;
@Inject
ChannelFlowController channelFlowController;
@Override
public void togglesFetched(FeatureToggleResponse toggleResponse) { (2)
if (toggleResponse.getStatus() == CHANGED) { (3)
KafkaChannelConfig[] kafkaChannelConfigs = getKafkaChannelConfigs();
for (KafkaChannelConfig kafkaChannelConfig : kafkaChannelConfigs) {
try {
if (shouldThisHostBeUpdated(kafkaChannelConfig)) {
if (TRUE.equals(kafkaChannelConfig.paused)) {
channelFlowController.pause(kafkaChannelConfig.channel);
} else {
channelFlowController.resume(kafkaChannelConfig.channel);
}
}
} catch (Exception e) {
Log.error("Could not pause or resume a channel", e);
}
}
}
}
private KafkaChannelConfig[] getKafkaChannelConfigs() {
Variant variant = unleash.getVariant(UNLEASH_TOGGLE_NAME); (4)
if (variant.isEnabled()) { (5)
Optional<Payload> payload = variant.getPayload();
if (payload.isPresent() && payload.get().getType().equals("json") && payload.get().getValue() != null) {
try {
return objectMapper.readValue(payload.get().getValue(), KafkaChannelConfig[].class);
} catch (JsonProcessingException e) {
Log.error("Variant payload deserialization failed", e);
}
}
}
return new KafkaChannelConfig[0]; (6)
}
private boolean shouldThisHostBeUpdated(KafkaChannelConfig kafkaChannelConfig) {
if (kafkaChannelConfig.hostName == null) {
return true;
}
if (kafkaChannelConfig.hostName.endsWith("*")) { (7)
return hostName.startsWith(kafkaChannelConfig.hostName.substring(0, kafkaChannelConfig.hostName.length() - 1));
} else {
return hostName.equals(kafkaChannelConfig.hostName);
}
}
}
1 | If this code is run from Kubernetes or OpenShift, the generated pod name can be passed through the HOST_NAME environment variable and used here. |
2 | This method is invoked every time the Unleash client fetches toggles from the server. |
3 | We’ll try to pause or resume channels only if the toggles changed server-side. |
4 | Be careful about the argument passed to Unleash#getVariant : it has to be the toggle name, not the variant name. |
5 | variant.isEnabled() will return false if the toggle is disabled in Unleash or if no variants are associated with the toggle directly or through its activation strategy. |
6 | If the method is unable to find a variant payload or if it fails to deserialize that payload for any reasons, an empty KafkaChannelConfig array will be returned. |
7 | This block is used to filter hosts based on a host name prefix. If you need finer filtering, replacing the current wild card approach with a regular expression could be a good option. |
Here’s an example of variant payload that could be consumed by KafkaChannelManager
:
[
{
"hostName": "amazing-service-7dbbcb4cc-9d9hl",
"channel": "orders",
"paused": true
},
{
"hostName": "awesome-app*",
"channel": "deliveries",
"paused": false
},
{
"channel": "events",
"paused": true
}
]
In that payload:
-
the first entry will pause the
orders
channel of a specific host:amazing-service-7dbbcb4cc-9d9hl
-
the second entry will resume the
deliveries
channel of all hosts whose name starts withawesome-app
-
the third entry will pause the
events
channel of all hosts regardless of their names
A temporary limitation of the PausableChannel API
The current version of the PausableChannel
API doesn’t handle messages that were already requested before a channel is paused.
As a result, your app might still process a few messages after initiating a pause, before the channel fully stops.
The SmallRye Reactive Messaging team is actively working on an enhancement to address this issue in the near future.
Thanks for reading this post. Happy pausing!
Leave a comment