[LI-CHERRY-PICK] [012880d] KAFKA-8052; Ensure fetch session epoch is updated before new request (apache#6582) (#31)

TICKET = KAFKA-8052
LI_DESCRIPTION =
This will remove intermittent INVALID_FETCH_SESSION_EPOCH errors on fetch requests

EXIT_CRITERIA = HASH [012880d]
ORIGINAL_DESCRIPTION =

Reviewers: Jason Gustafson <[email protected]>, Colin Patrick McCabe <[email protected]>, Andrew Olson <[email protected]>, José Armando García Sancio <[email protected]>
(cherry picked from commit 012880d)
jonlee2 authored Jul 22, 2019
1 parent c3b03ea commit 03532f2
Showing 2 changed files with 121 additions and 30 deletions.
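For readers skimming the diff below, the core of the change is an ordering/visibility guarantee rather than new fetch logic. Previously, `prepareFetchRequests()` only asked the network client whether a request was still in flight (`client.hasPendingRequests(node)`); a response that had already arrived but had not yet been processed would pass that check, so the next fetch request could be built with the session epoch from before the response handler incremented it, producing the intermittent INVALID_FETCH_SESSION_EPOCH errors. The patch instead has the fetcher itself track which nodes have an unprocessed fetch. A minimal sketch of that bookkeeping pattern, with hypothetical names (`PendingFetchTracker`, `tryBeginFetch`, `completeFetch` are illustration only, not the patch's API):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: a node is eligible for a new fetch only after the previous fetch
// response from that node has been fully processed (and the session epoch
// updated), not merely no longer in flight on the network.
class PendingFetchTracker {
    // Node ids with an unprocessed fetch request; guarded by "this",
    // mirroring how Fetcher synchronizes its response handling.
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();

    // Returns true and marks the node pending if no fetch is outstanding.
    synchronized boolean tryBeginFetch(int nodeId) {
        return nodesWithPendingFetchRequests.add(nodeId);
    }

    // Invoked from a finally block on both the success and failure paths,
    // after the session handler has observed the response.
    synchronized void completeFetch(int nodeId) {
        nodesWithPendingFetchRequests.remove(nodeId);
    }
}
```

Because the response callback and the request builder synchronize on the same fetcher instance, clearing the pending flag after `handler.handleResponse(...)` also makes the updated epoch visible to whichever thread builds the next request.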
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -71,6 +71,7 @@
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
@@ -111,6 +112,10 @@
  *     the caller.</li>
  *     <li>Responses that collate partial responses from multiple brokers (e.g. to list offsets) are
  *     synchronized on the response future.</li>
+ *     <li>At most one request is pending for each node at any time. Nodes with pending requests are
+ *     tracked and updated after processing the response. This ensures that any state (e.g. epoch)
+ *     updated while processing responses on one thread are visible while creating the subsequent request
+ *     on a different thread.</li>
  * </ul>
  */
 public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
@@ -137,6 +142,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final IsolationLevel isolationLevel;
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
     private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final Set<Integer> nodesWithPendingFetchRequests;
 
     private PartitionRecords nextInLineRecords = null;
 
@@ -185,6 +192,7 @@ public Fetcher(LogContext logContext,
         this.requestTimeoutMs = requestTimeoutMs;
         this.isolationLevel = isolationLevel;
         this.sessionHandlers = new HashMap<>();
+        this.nodesWithPendingFetchRequests = new HashSet<>();
 
         // HOTFIX
         this.pausedNextInLineRecordsPerTopicPartition = new HashMap<>();
@@ -266,45 +274,73 @@ public synchronized int sendFetches() {
                 @Override
                 public void onSuccess(ClientResponse resp) {
                     synchronized (Fetcher.this) {
-                        FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
-                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                        if (handler == null) {
-                            log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
-                                fetchTarget.id());
-                            return;
-                        }
+                        try {
+                            @SuppressWarnings("unchecked")
+                            FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
+                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                            if (handler == null) {
+                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
+                                    fetchTarget.id());
+                                return;
+                            }
+                            if (!handler.handleResponse(response)) {
+                                return;
+                            }
+
+                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
+                                TopicPartition partition = entry.getKey();
+                                FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
+                                if (requestData == null) {
+                                    String message;
+                                    if (data.metadata().isFull()) {
+                                        message = MessageFormatter.arrayFormat(
+                                                "Response for missing full request partition: partition={}; metadata={}",
+                                                new Object[]{partition, data.metadata()}).getMessage();
+                                    } else {
+                                        message = MessageFormatter.arrayFormat(
+                                                "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
+                                                new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
+                                    }
+
+                                    // Received fetch response for missing session partition
+                                    throw new IllegalStateException(message);
+                                } else {
+                                    long fetchOffset = requestData.fetchOffset;
+                                    FetchResponse.PartitionData<Records> fetchData = entry.getValue();
+
+                                    log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+                                            isolationLevel, fetchOffset, partition, fetchData);
+                                    completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
+                                            resp.requestHeader().apiVersion()));
+                                }
+                            }
+
+                            sensors.fetchLatency.record(resp.requestLatencyMs());
+                        } finally {
+                            nodesWithPendingFetchRequests.remove(fetchTarget.id());
+                        }
-                        if (!handler.handleResponse(response)) {
-                            return;
-                        }
-
-                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
-                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
-
-                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
-                            TopicPartition partition = entry.getKey();
-                            long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
-                            FetchResponse.PartitionData fetchData = entry.getValue();
-
-                            log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
-                                    isolationLevel, fetchOffset, partition, fetchData);
-                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
-                                    resp.requestHeader().apiVersion()));
-                        }
-
-                        sensors.fetchLatency.record(resp.requestLatencyMs());
                     }
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
                     synchronized (Fetcher.this) {
-                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                        if (handler != null) {
-                            handler.handleError(e);
+                        try {
+                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                            if (handler != null) {
+                                handler.handleError(e);
+                            }
+                        } finally {
+                            nodesWithPendingFetchRequests.remove(fetchTarget.id());
                         }
                     }
                 }
             });
+
+            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
         }
         return fetchRequestMap.size();
     }
@@ -947,8 +983,9 @@ private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
                 // If we try to send during the reconnect blackout window, then the request is just
                 // going to be failed anyway before being sent, so skip the send for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
-            } else if (client.hasPendingRequests(node)) {
-                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
+
+            } else if (this.nodesWithPendingFetchRequests.contains(node.id())) {
+                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.get(node);
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2587,6 +2587,60 @@ private void verifySessionPartitions() {
         assertEquals(0, future.get());
     }
 
+    @Test
+    public void testFetcherSessionEpochUpdate() throws Exception {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
+
+        subscriptions.assignFromUser(Collections.singleton(tp0));
+        subscriptions.seek(tp0, 0L);
+
+        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
+        executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(() -> {
+            long nextOffset = 0;
+            long nextEpoch = 0;
+            while (fetchesRemaining.get() > 0) {
+                synchronized (consumerClient) {
+                    if (!client.requests().isEmpty()) {
+                        ClientRequest request = client.requests().peek();
+                        FetchRequest fetchRequest = (FetchRequest) request.requestBuilder().build();
+                        int epoch = fetchRequest.metadata().epoch();
+                        assertTrue(String.format("Unexpected epoch expected %d got %d", nextEpoch, epoch), epoch == 0 || epoch == nextEpoch);
+                        nextEpoch++;
+                        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseMap = new LinkedHashMap<>();
+                        responseMap.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, nextOffset + 2L, nextOffset + 2,
+                                0L, null, buildRecords(nextOffset, 2, nextOffset)));
+                        nextOffset += 2;
+                        client.respondToRequest(request, new FetchResponse<>(Errors.NONE, responseMap, 0, 123));
+                        consumerClient.poll(time.timer(0));
+                    }
+                }
+            }
+            return fetchesRemaining.get();
+        });
+        long nextFetchOffset = 0;
+        while (fetchesRemaining.get() > 0 && !future.isDone()) {
+            if (fetcher.sendFetches() == 1) {
+                synchronized (consumerClient) {
+                    consumerClient.poll(time.timer(0));
+                }
+            }
+            if (fetcher.hasCompletedFetches()) {
+                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+                if (!fetchedRecords.isEmpty()) {
+                    fetchesRemaining.decrementAndGet();
+                    List<ConsumerRecord<byte[], byte[]>> records = fetchedRecords.get(tp0);
+                    assertEquals(2, records.size());
+                    assertEquals(nextFetchOffset, records.get(0).offset());
+                    assertEquals(nextFetchOffset + 1, records.get(1).offset());
+                    nextFetchOffset += 2;
+                }
+                assertTrue(fetcher.fetchedRecords().isEmpty());
+            }
+        }
+        assertEquals(0, future.get());
+    }
+
     private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
         for (int i = 0; i < count; i++)
