Skip to content

Commit

Permalink
WIP fixing FlinkPscProducerMigrationOperatorTest
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 16, 2024
1 parent d781fec commit 88f402f
Showing 1 changed file with 70 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,13 @@ private void flush(FlinkPscProducer.PscTransactionState transaction) throws Flin
public void snapshotState(FunctionSnapshotContext context) throws Exception {
super.snapshotState(context);

PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_PENDING_TRANSACTIONS, pendingCommitTransactions.size(), pscConfigurationInternal
);
PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_STATE_SIZE, getSize(state), pscConfigurationInternal
);

nextTransactionalIdHintState.clear();
// To avoid duplication only first subtask keeps track of next transactional id hint.
// Otherwise all of the
Expand Down Expand Up @@ -1252,13 +1259,6 @@ private void supersSnapshotState(FunctionSnapshotContext context) throws Excepti
} catch (InvocationTargetException exception) {
throw (Exception) exception.getTargetException();
}

PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_PENDING_TRANSACTIONS, pendingCommitTransactions.size(), pscConfigurationInternal
);
PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_STATE_SIZE, getSize(state), pscConfigurationInternal
);
}

@Override
Expand All @@ -1272,28 +1272,45 @@ public void initializeState(FunctionInitializationContext context) throws Except
semantic = FlinkPscProducer.Semantic.NONE;
}

Set<String> registeredStateNames = context.getOperatorStateStore().getRegisteredStateNames();
if (registeredStateNames == null || registeredStateNames.isEmpty() ||
(registeredStateNames.equals(Collections.singleton("state")))) {
nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
String actualTransactionalIdPrefix;
if (this.transactionalIdPrefix != null) {
actualTransactionalIdPrefix = this.transactionalIdPrefix;
} else {
actualTransactionalIdPrefix = getActualTransactionalIdPrefix();
}
initializeTransactionState(actualTransactionalIdPrefix);
super.initializeState(context);
return;
}
nextTransactionalIdHintState =
context.getOperatorStateStore()
.getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);

// Set<String> registeredStateNames = context.getOperatorStateStore().getRegisteredStateNames();
// if (registeredStateNames == null || registeredStateNames.isEmpty() ||
// (registeredStateNames.equals(Collections.singleton("state")))) {
// nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
// String actualTransactionalIdPrefix;
// if (this.transactionalIdPrefix != null) {
// actualTransactionalIdPrefix = this.transactionalIdPrefix;
// } else {
// actualTransactionalIdPrefix = getActualTransactionalIdPrefix();
// }
// initializeTransactionState(actualTransactionalIdPrefix);
// super.initializeState(context);
// return;
// }

if (pscConfigurationInternal == null) {
pscConfigurationInternal = PscConfigurationUtils.propertiesToPscConfigurationInternal(producerConfig, PscConfiguration.PSC_CLIENT_TYPE_PRODUCER);
}

if (context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.getName())) {
// Check if the state is a flink-psc state or a flink-kafka state
boolean containsKafkaNextTransactionalIdHintDescriptorV2 = context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"))
.getName());
boolean containsKafkaNextTransactionalIdHintDescriptor = context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"))
.getName());
boolean containsPscNextTransactionalIdHintDescriptorV2 = context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.getName());

if (containsPscNextTransactionalIdHintDescriptorV2) {
// psc checkpoint
LOG.info("Detected a flink-psc checkpoint.");
try {
Expand All @@ -1320,42 +1337,24 @@ public void initializeState(FunctionInitializationContext context) throws Except
);
throw e;
}
} else if (context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"))
.getName()) ||
context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"))
.getName())) {
} else if (containsKafkaNextTransactionalIdHintDescriptor || containsKafkaNextTransactionalIdHintDescriptorV2) {
LOG.info("Detected a flink-kafka checkpoint.");
try {
ListState<FlinkKafkaProducer.NextTransactionalIdHint> nextKafkaTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"));
ListState<FlinkKafkaProducer.NextTransactionalIdHint> oldNextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"));

ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> oldTransactionalIdHints =
Lists.newArrayList(oldNextTransactionalIdHintState.get());
if (!oldTransactionalIdHints.isEmpty()) {
nextKafkaTransactionalIdHintState.addAll(oldTransactionalIdHints);
//clear old state
oldNextTransactionalIdHintState.clear();
if (containsKafkaNextTransactionalIdHintDescriptor) {
migrateNextTransactionalIdHintState(context);
}

PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_KAFKA_TRANSACTIONS, getSize(nextKafkaTransactionalIdHintState), pscConfigurationInternal
);

nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
for (FlinkKafkaProducer.NextTransactionalIdHint nextKafkaTransactionalIdHint : nextKafkaTransactionalIdHintState.get()) {
nextTransactionalIdHintState.add(new NextTransactionalIdHint(
nextKafkaTransactionalIdHint.lastParallelism,
nextKafkaTransactionalIdHint.nextFreeTransactionalId
));
}
// PscMetricRegistryManager.getInstance().updateHistogramMetric(
// null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_KAFKA_TRANSACTIONS, getSize(nextKafkaTransactionalIdHintState), pscConfigurationInternal
// );
//
// nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
// for (FlinkKafkaProducer.NextTransactionalIdHint nextKafkaTransactionalIdHint : nextKafkaTransactionalIdHintState.get()) {
// nextTransactionalIdHintState.add(new NextTransactionalIdHint(
// nextKafkaTransactionalIdHint.lastParallelism,
// nextKafkaTransactionalIdHint.nextFreeTransactionalId
// ));
// }
String actualTransactionalIdPrefix;
if (this.transactionalIdPrefix != null) {
actualTransactionalIdPrefix = this.transactionalIdPrefix;
Expand Down Expand Up @@ -1394,6 +1393,21 @@ null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_PSC_TRANSACTI
}
}

private void migrateNextTransactionalIdHintState(FunctionInitializationContext context) throws Exception {
ListState<FlinkKafkaProducer.NextTransactionalIdHint> nextKafkaTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"));
ListState<FlinkKafkaProducer.NextTransactionalIdHint> oldNextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"));

ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> oldTransactionalIdHints =
Lists.newArrayList(oldNextTransactionalIdHintState.get());
if (!oldTransactionalIdHints.isEmpty()) {
nextKafkaTransactionalIdHintState.addAll(oldTransactionalIdHints);
//clear old state
oldNextTransactionalIdHintState.clear();
}
}

private void initializeTransactionState(String actualTransactionalIdPrefix) throws Exception {
transactionalIdsGenerator =
new TransactionalIdsGenerator(
Expand Down

0 comments on commit 88f402f

Please sign in to comment.