Skip to content

Commit

Permalink
Revert "WIP fixing FlinkPscProducerMigrationOperatorTest"
Browse files Browse the repository at this point in the history
This reverts commit 88f402f.
  • Loading branch information
jeffxiang committed Oct 16, 2024
1 parent 88f402f commit fb57a25
Showing 1 changed file with 56 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1160,13 +1160,6 @@ private void flush(FlinkPscProducer.PscTransactionState transaction) throws Flin
public void snapshotState(FunctionSnapshotContext context) throws Exception {
super.snapshotState(context);

PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_PENDING_TRANSACTIONS, pendingCommitTransactions.size(), pscConfigurationInternal
);
PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_STATE_SIZE, getSize(state), pscConfigurationInternal
);

nextTransactionalIdHintState.clear();
// To avoid duplication only first subtask keeps track of next transactional id hint.
// Otherwise all of the
Expand Down Expand Up @@ -1259,6 +1252,13 @@ private void supersSnapshotState(FunctionSnapshotContext context) throws Excepti
} catch (InvocationTargetException exception) {
throw (Exception) exception.getTargetException();
}

PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_PENDING_TRANSACTIONS, pendingCommitTransactions.size(), pscConfigurationInternal
);
PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_STATE_SIZE, getSize(state), pscConfigurationInternal
);
}

@Override
Expand All @@ -1272,45 +1272,28 @@ public void initializeState(FunctionInitializationContext context) throws Except
semantic = FlinkPscProducer.Semantic.NONE;
}

nextTransactionalIdHintState =
context.getOperatorStateStore()
.getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);

// Set<String> registeredStateNames = context.getOperatorStateStore().getRegisteredStateNames();
// if (registeredStateNames == null || registeredStateNames.isEmpty() ||
// (registeredStateNames.equals(Collections.singleton("state")))) {
// nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
// String actualTransactionalIdPrefix;
// if (this.transactionalIdPrefix != null) {
// actualTransactionalIdPrefix = this.transactionalIdPrefix;
// } else {
// actualTransactionalIdPrefix = getActualTransactionalIdPrefix();
// }
// initializeTransactionState(actualTransactionalIdPrefix);
// super.initializeState(context);
// return;
// }
Set<String> registeredStateNames = context.getOperatorStateStore().getRegisteredStateNames();
if (registeredStateNames == null || registeredStateNames.isEmpty() ||
(registeredStateNames.equals(Collections.singleton("state")))) {
nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
String actualTransactionalIdPrefix;
if (this.transactionalIdPrefix != null) {
actualTransactionalIdPrefix = this.transactionalIdPrefix;
} else {
actualTransactionalIdPrefix = getActualTransactionalIdPrefix();
}
initializeTransactionState(actualTransactionalIdPrefix);
super.initializeState(context);
return;
}

if (pscConfigurationInternal == null) {
pscConfigurationInternal = PscConfigurationUtils.propertiesToPscConfigurationInternal(producerConfig, PscConfiguration.PSC_CLIENT_TYPE_PRODUCER);
}

// Check if the state is a flink-psc state or a flink-kafka state
boolean containsKafkaNextTransactionalIdHintDescriptorV2 = context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"))
.getName());
boolean containsKafkaNextTransactionalIdHintDescriptor = context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"))
.getName());
boolean containsPscNextTransactionalIdHintDescriptorV2 = context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.getName());

if (containsPscNextTransactionalIdHintDescriptorV2) {
if (context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.getName())) {
// psc checkpoint
LOG.info("Detected a flink-psc checkpoint.");
try {
Expand All @@ -1337,24 +1320,42 @@ public void initializeState(FunctionInitializationContext context) throws Except
);
throw e;
}
} else if (containsKafkaNextTransactionalIdHintDescriptor || containsKafkaNextTransactionalIdHintDescriptorV2) {
} else if (context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"))
.getName()) ||
context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>)
PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"))
.getName())) {
LOG.info("Detected a flink-kafka checkpoint.");
try {
if (containsKafkaNextTransactionalIdHintDescriptor) {
migrateNextTransactionalIdHintState(context);
ListState<FlinkKafkaProducer.NextTransactionalIdHint> nextKafkaTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"));
ListState<FlinkKafkaProducer.NextTransactionalIdHint> oldNextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"));

ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> oldTransactionalIdHints =
Lists.newArrayList(oldNextTransactionalIdHintState.get());
if (!oldTransactionalIdHints.isEmpty()) {
nextKafkaTransactionalIdHintState.addAll(oldTransactionalIdHints);
//clear old state
oldNextTransactionalIdHintState.clear();
}

// PscMetricRegistryManager.getInstance().updateHistogramMetric(
// null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_KAFKA_TRANSACTIONS, getSize(nextKafkaTransactionalIdHintState), pscConfigurationInternal
// );
//
// nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
// for (FlinkKafkaProducer.NextTransactionalIdHint nextKafkaTransactionalIdHint : nextKafkaTransactionalIdHintState.get()) {
// nextTransactionalIdHintState.add(new NextTransactionalIdHint(
// nextKafkaTransactionalIdHint.lastParallelism,
// nextKafkaTransactionalIdHint.nextFreeTransactionalId
// ));
// }
PscMetricRegistryManager.getInstance().updateHistogramMetric(
null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_KAFKA_TRANSACTIONS, getSize(nextKafkaTransactionalIdHintState), pscConfigurationInternal
);

nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
for (FlinkKafkaProducer.NextTransactionalIdHint nextKafkaTransactionalIdHint : nextKafkaTransactionalIdHintState.get()) {
nextTransactionalIdHintState.add(new NextTransactionalIdHint(
nextKafkaTransactionalIdHint.lastParallelism,
nextKafkaTransactionalIdHint.nextFreeTransactionalId
));
}
String actualTransactionalIdPrefix;
if (this.transactionalIdPrefix != null) {
actualTransactionalIdPrefix = this.transactionalIdPrefix;
Expand Down Expand Up @@ -1393,21 +1394,6 @@ null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_PSC_TRANSACTI
}
}

private void migrateNextTransactionalIdHintState(FunctionInitializationContext context) throws Exception {
ListState<FlinkKafkaProducer.NextTransactionalIdHint> nextKafkaTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2"));
ListState<FlinkKafkaProducer.NextTransactionalIdHint> oldNextTransactionalIdHintState =
context.getOperatorStateStore().getUnionListState((ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR"));

ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> oldTransactionalIdHints =
Lists.newArrayList(oldNextTransactionalIdHintState.get());
if (!oldTransactionalIdHints.isEmpty()) {
nextKafkaTransactionalIdHintState.addAll(oldTransactionalIdHints);
//clear old state
oldNextTransactionalIdHintState.clear();
}
}

private void initializeTransactionState(String actualTransactionalIdPrefix) throws Exception {
transactionalIdsGenerator =
new TransactionalIdsGenerator(
Expand Down

0 comments on commit fb57a25

Please sign in to comment.