Commit

Sync documentation of main branch

actions-user committed Jan 28, 2025
1 parent 9edc393 commit 13cde8c
Showing 5 changed files with 232 additions and 79 deletions.
2 changes: 1 addition & 1 deletion _generated-doc/main/config/quarkus-all-config.adoc
@@ -10359,7 +10359,7 @@ ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_TEST_ARG_LINE+++`
endif::add-copy-button-to-env-var[]
--
-|list of string
+|string
|

a|icon:lock[title=Fixed at build time] [[quarkus-core_quarkus-test-env-environment-variable-name]] [.property-path]##link:#quarkus-core_quarkus-test-env-environment-variable-name[`quarkus.test.env."environment-variable-name"`]##
2 changes: 1 addition & 1 deletion _generated-doc/main/config/quarkus-core_quarkus.test.adoc
@@ -423,7 +423,7 @@ ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_TEST_ARG_LINE+++`
endif::add-copy-button-to-env-var[]
--
-|list of string
+|string
|

a|icon:lock[title=Fixed at build time] [[quarkus-core_quarkus-test-env-environment-variable-name]] [.property-path]##link:#quarkus-core_quarkus-test-env-environment-variable-name[`quarkus.test.env."environment-variable-name"`]##
124 changes: 103 additions & 21 deletions _versions/main/guides/kafka.adoc
@@ -2956,9 +2956,7 @@ NOTE: If you use Hibernate Reactive, look at <<writing-entities-managed-by-hibernate-reactive-to-kafka>>.

Because we write to a database, we must run this method in a transaction.
Yet, sending the entity to Kafka happens asynchronously.
-The operation returns a `CompletionStage` (or a `Uni` if you use a `MutinyEmitter`) reporting when the operation completes.
-We must be sure that the transaction is still running until the object is written.
-Otherwise, you may access the object outside the transaction, which is not allowed.
+We can achieve this by using `.sendAndAwait()` or `.sendAndForget()` on the `MutinyEmitter`, or `.send().toCompletableFuture().join()` on the `Emitter`.

To implement this process, you need the following approach:

@@ -2973,26 +2971,29 @@ import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import org.eclipse.microprofile.reactive.messaging.Channel;
-import org.eclipse.microprofile.reactive.messaging.Emitter;
+import io.smallrye.reactive.messaging.MutinyEmitter;
@Path("/")
public class ResourceSendingToKafka {
@Channel("kafka") Emitter<Fruit> emitter;
@Channel("kafka") MutinyEmitter<Fruit> emitter;
@POST
@Path("/fruits")
@Transactional // <1>
public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) { // <2>
@Transactional // <1>
public void storeAndSendToKafka(Fruit fruit) { // <2>
fruit.persist();
return emitter.send(new FruitDto(fruit)); // <3>
emitter.sendAndAwait(new FruitDto(fruit)); // <3>
}
}
----
<1> As we are writing to the database, make sure we run inside a transaction
-<2> The method receives the fruit instance to persist. It returns a `CompletionStage` which is used for the transaction demarcation. The transaction is committed when the returned `CompletionStage` completes. In our case, it's when the message is written to Kafka.
+<2> The method receives the fruit instance to persist.
<3> Wrap the managed entity inside a Data transfer object and send it to Kafka.
This makes sure that the managed entity is not impacted by the Kafka serialization.
+Then await the completion of the operation before returning.

NOTE: You should not return a `CompletionStage` or `Uni` when using `@Transactional`, as all transaction commits will happen on a single thread, which impacts performance.
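
If you use the classic `Emitter` instead of the `MutinyEmitter`, the same effect is obtained by blocking on the returned `CompletionStage`, as mentioned above. The following is a minimal sketch of that variant (the class name is illustrative; `Fruit` and `FruitDto` are the types from the example above):

[source, java]
----
import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafkaWithEmitter {

    @Channel("kafka") Emitter<FruitDto> emitter;

    @POST
    @Path("/fruits")
    @Transactional
    public void storeAndSendToKafka(Fruit fruit) {
        fruit.persist();
        // Block until the message is written to Kafka,
        // so the transaction stays open until the send completes
        emitter.send(new FruitDto(fruit)).toCompletableFuture().join();
    }
}
----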

[[writing-entities-managed-by-hibernate-reactive-to-kafka]]
=== Writing entities managed by Hibernate Reactive to Kafka
@@ -3191,23 +3192,104 @@ public class FruitProducer {
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
-        Context context = Vertx.currentContext(); // <2>
-        return sf.withTransaction(session -> // <3>
-            kafkaTx.withTransaction(emitter -> // <4>
-                session.persist(fruit).invoke(() -> emitter.send(fruit)) // <5>
-            ).emitOn(context::runOnContext) // <6>
-        );
+        return sf.withTransaction(session -> // <2>
+            kafkaTx.withTransaction(emitter -> // <3>
+                session.persist(fruit).invoke(() -> emitter.send(fruit)) // <4>
+            ));
    }
}
----

<1> Inject the Hibernate Reactive `SessionFactory`.
-<2> Capture the caller Vert.x context.
-<3> Begin a Hibernate Reactive transaction.
-<4> Begin a Kafka transaction.
-<5> Persist the payload and send the entity to Kafka.
-<6> The Kafka transaction terminates on the Kafka producer sender thread.
-We need to switch to the Vert.x context previously captured in order to terminate the Hibernate Reactive transaction on the same context we started it.
+<2> Begin a Hibernate Reactive transaction.
+<3> Begin a Kafka transaction.
+<4> Persist the payload and send the entity to Kafka.

Alternatively, you can use the `@WithTransaction` annotation to start a transaction and commit it when the method returns:

[source, java]
----
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
@Path("/")
public class FruitProducer {
@Channel("kafka") KafkaTransactions<Fruit> kafkaTx;
@POST
@Path("/fruits")
@Consumes(MediaType.APPLICATION_JSON)
@Bulkhead(1)
@WithTransaction // <1>
public Uni<Void> post(Fruit fruit) {
return kafkaTx.withTransaction(emitter -> // <2>
fruit.persist().invoke(() -> emitter.send(fruit)) // <3>
);
}
}
----

<1> Start a Hibernate Reactive transaction and commit it when the method returns.
<2> Begin a Kafka transaction.
<3> Persist the payload and send the entity to Kafka.

[[chaining-kafka-transactions-with-hibernate-orm-transactions]]
=== Chaining Kafka Transactions with Hibernate ORM transactions

While `KafkaTransactions` provides a reactive API on top of Mutiny to manage Kafka transactions,
you can still chain Kafka transactions with blocking Hibernate ORM transactions.

[source, java]
----
import jakarta.transaction.Transactional;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    @Transactional // <1>
    public void post(Fruit fruit) {
        emitter.withTransaction(e -> { // <2>
            // if the id is attributed by the database, flush to get it:
            // fruit.persistAndFlush();
            fruit.persist(); // <3>
            Log.infov("Persisted fruit {0}", fruit);
            e.send(fruit); // <4>
            return Uni.createFrom().voidItem();
        }).await().indefinitely(); // <5>
    }
}
----

<1> Start a Hibernate ORM transaction. The transaction is committed when the method returns.
<2> Begin a Kafka transaction.
<3> Persist the payload.
<4> Send the entity to Kafka inside the Kafka transaction.
<5> Wait on the returned `Uni` for the Kafka transaction to complete.
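
For the Kafka transaction itself to work, the `kafka` channel must use a transactional producer. The following is a minimal sketch of the channel configuration, assuming the SmallRye Kafka connector and an illustrative transactional id (check the connector reference for the exact attributes of your version):

[source, properties]
----
mp.messaging.outgoing.kafka.connector=smallrye-kafka
# enables Kafka transactions on this channel (illustrative id)
mp.messaging.outgoing.kafka.transactional.id=fruit-producer-1
# transactional producers require full acknowledgments
mp.messaging.outgoing.kafka.acks=all
----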

== Logging

Expand Down
74 changes: 74 additions & 0 deletions _versions/main/guides/messaging.adoc
Original file line number Diff line number Diff line change
@@ -356,6 +356,80 @@ public class MyProfileBean {
}
----

==== Pausable Channels

Injected `@Channel` streams are not subscribed to by default, so the flow of messages is controlled by the application code using Reactive Streams and Mutiny APIs.
However, for `@Incoming` methods, the flow of messages is controlled by the runtime.

Pausable channels provide a mechanism to control the message flow programmatically.
This is useful in scenarios where producers or consumers need to stop temporarily, for example, to manage the application lifecycle or to perform maintenance operations.

To use pausable channels, you need to activate them by setting the `pausable` configuration property to `true`:

[source, properties]
----
mp.messaging.incoming.my-channel.pausable=true
# optional, by default the channel is NOT paused initially
mp.messaging.incoming.my-channel.initially-paused=true
----

If a channel is configured to be pausable, you can retrieve the `PausableChannel` programmatically by channel name from the `ChannelRegistry`, and pause or resume the channel as needed:

[source, java]
----
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;

@ApplicationScoped
public class PausableController {

    @Inject
    ChannelRegistry registry;

    @PostConstruct
    public void resume() {
        // Retrieve the pausable channel
        PausableChannel pausable = registry.getPausable("my-channel");
        // Resume the processing of messages once the bean is ready,
        // e.g. if the channel was initially paused
        pausable.resume();
    }

    public void pause() {
        // Retrieve the pausable channel
        PausableChannel pausable = registry.getPausable("my-channel");
        // Pause the processing of the messages
        pausable.pause();
    }

    @Incoming("my-channel")
    void process(String message) {
        // Process the message
    }
}
----

This feature is independent of connectors and can, in theory, be used with channels backed by any connector.
Note that pausing message consumption applies back-pressure to the underlying consumer, which receives messages from the remote broker.

[NOTE]
====
Kafka consumers provide a similar feature to pause and resume the consumption of messages from topic-partitions.
The Quarkus Kafka connector allows xref:kafka.adoc#kafka-bare-clients[access to the underlying client] to pause/resume the consumption.
However, by default, with the `pause-if-no-requests=true` configuration,
the connector automatically handles back-pressure
by pausing and resuming the Kafka consumer based on downstream requests.
It is therefore recommended to use pausable channels with the default `pause-if-no-requests=true` configuration.
====
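
As an illustration, a Kafka-backed pausable channel that keeps the default back-pressure handling could be configured as follows (a sketch; the topic name is illustrative, and `pause-if-no-requests` defaults to `true`, so it is shown only for clarity):

[source, properties]
----
mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.topic=my-topic
mp.messaging.incoming.my-channel.pausable=true
# default value, shown for clarity
mp.messaging.incoming.my-channel.pause-if-no-requests=true
----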

==== Multiple Outgoings and `@Broadcast`

By default, messages transmitted in a channel are only dispatched to a single consumer.