From 88f402fdc7a7bec937e2c941cd78543892d111c0 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 16 Oct 2024 13:32:15 -0400 Subject: [PATCH] WIP fixing FlinkPscProducerMigrationOperatorTest --- .../connectors/psc/FlinkPscProducer.java | 126 ++++++++++-------- 1 file changed, 70 insertions(+), 56 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java index d4840fd..facab7e 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java @@ -1160,6 +1160,13 @@ private void flush(FlinkPscProducer.PscTransactionState transaction) throws Flin public void snapshotState(FunctionSnapshotContext context) throws Exception { super.snapshotState(context); + PscMetricRegistryManager.getInstance().updateHistogramMetric( + null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_PENDING_TRANSACTIONS, pendingCommitTransactions.size(), pscConfigurationInternal + ); + PscMetricRegistryManager.getInstance().updateHistogramMetric( + null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_STATE_SIZE, getSize(state), pscConfigurationInternal + ); + nextTransactionalIdHintState.clear(); // To avoid duplication only first subtask keeps track of next transactional id hint. // Otherwise all of the @@ -1252,13 +1259,6 @@ private void supersSnapshotState(FunctionSnapshotContext context) throws Excepti } catch (InvocationTargetException exception) { throw (Exception) exception.getTargetException(); } - - PscMetricRegistryManager.getInstance().updateHistogramMetric( - null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_PENDING_TRANSACTIONS, pendingCommitTransactions.size(), pscConfigurationInternal - ); - PscMetricRegistryManager.getInstance().updateHistogramMetric( - null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_SNAPSHOT_PSC_STATE_SIZE, getSize(state), pscConfigurationInternal - ); } @Override @@ -1272,28 +1272,45 @@ public void initializeState(FunctionInitializationContext context) throws Except semantic = FlinkPscProducer.Semantic.NONE; } - Set registeredStateNames = context.getOperatorStateStore().getRegisteredStateNames(); - if (registeredStateNames == null || registeredStateNames.isEmpty() || - (registeredStateNames.equals(Collections.singleton("state")))) { - nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); - String actualTransactionalIdPrefix; - if (this.transactionalIdPrefix != null) { - actualTransactionalIdPrefix = this.transactionalIdPrefix; - } else { - actualTransactionalIdPrefix = getActualTransactionalIdPrefix(); - } - initializeTransactionState(actualTransactionalIdPrefix); - super.initializeState(context); - return; - } + nextTransactionalIdHintState = + context.getOperatorStateStore() + .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); + +// Set registeredStateNames = context.getOperatorStateStore().getRegisteredStateNames(); +// if (registeredStateNames == null || registeredStateNames.isEmpty() || +// (registeredStateNames.equals(Collections.singleton("state")))) { +// nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); +// String actualTransactionalIdPrefix; +// if (this.transactionalIdPrefix != null) { +// actualTransactionalIdPrefix = this.transactionalIdPrefix; +// } else { +// actualTransactionalIdPrefix = getActualTransactionalIdPrefix(); +// } +// initializeTransactionState(actualTransactionalIdPrefix); +// super.initializeState(context); +// return; +// } if (pscConfigurationInternal == null) { pscConfigurationInternal = PscConfigurationUtils.propertiesToPscConfigurationInternal(producerConfig, PscConfiguration.PSC_CLIENT_TYPE_PRODUCER); } - if (context.getOperatorStateStore() - .getRegisteredStateNames() - .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.getName())) { + // Check if the state is a flink-psc state or a flink-kafka state + boolean containsKafkaNextTransactionalIdHintDescriptorV2 = context.getOperatorStateStore() + .getRegisteredStateNames() + .contains(((ListStateDescriptor) + PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2")) + .getName()); + boolean containsKafkaNextTransactionalIdHintDescriptor = context.getOperatorStateStore() + .getRegisteredStateNames() + .contains(((ListStateDescriptor) + PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR")) + .getName()); + boolean containsPscNextTransactionalIdHintDescriptorV2 = context.getOperatorStateStore() + .getRegisteredStateNames() + .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.getName()); + + if (containsPscNextTransactionalIdHintDescriptorV2) { // psc checkpoint LOG.info("Detected a flink-psc checkpoint."); try { @@ -1320,42 +1337,24 @@ public void initializeState(FunctionInitializationContext context) throws Except ); throw e; } - } else if (context.getOperatorStateStore() - .getRegisteredStateNames() - .contains(((ListStateDescriptor) - PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2")) - .getName()) || - context.getOperatorStateStore() - .getRegisteredStateNames() - .contains(((ListStateDescriptor) - PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR")) - .getName())) { + } else if (containsKafkaNextTransactionalIdHintDescriptor || containsKafkaNextTransactionalIdHintDescriptorV2) { LOG.info("Detected a flink-kafka checkpoint."); try { - ListState nextKafkaTransactionalIdHintState = - context.getOperatorStateStore().getUnionListState((ListStateDescriptor) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2")); - ListState oldNextTransactionalIdHintState = - context.getOperatorStateStore().getUnionListState((ListStateDescriptor) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR")); - - ArrayList oldTransactionalIdHints = - Lists.newArrayList(oldNextTransactionalIdHintState.get()); - if (!oldTransactionalIdHints.isEmpty()) { - nextKafkaTransactionalIdHintState.addAll(oldTransactionalIdHints); - //clear old state - oldNextTransactionalIdHintState.clear(); + if (containsKafkaNextTransactionalIdHintDescriptor) { + migrateNextTransactionalIdHintState(context); } - PscMetricRegistryManager.getInstance().updateHistogramMetric( - null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_KAFKA_TRANSACTIONS, getSize(nextKafkaTransactionalIdHintState), pscConfigurationInternal - ); - - nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); - for (FlinkKafkaProducer.NextTransactionalIdHint nextKafkaTransactionalIdHint : nextKafkaTransactionalIdHintState.get()) { - nextTransactionalIdHintState.add(new NextTransactionalIdHint( - nextKafkaTransactionalIdHint.lastParallelism, - nextKafkaTransactionalIdHint.nextFreeTransactionalId - )); - } +// PscMetricRegistryManager.getInstance().updateHistogramMetric( +// null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_KAFKA_TRANSACTIONS, getSize(nextKafkaTransactionalIdHintState), pscConfigurationInternal +// ); +// +// nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); +// for (FlinkKafkaProducer.NextTransactionalIdHint nextKafkaTransactionalIdHint : nextKafkaTransactionalIdHintState.get()) { +// nextTransactionalIdHintState.add(new NextTransactionalIdHint( +// nextKafkaTransactionalIdHint.lastParallelism, +// nextKafkaTransactionalIdHint.nextFreeTransactionalId +// )); +// } String actualTransactionalIdPrefix; if (this.transactionalIdPrefix != null) { actualTransactionalIdPrefix = this.transactionalIdPrefix; @@ -1394,6 +1393,21 @@ null, FlinkPscStateRecoveryMetricConstants.PSC_SINK_STATE_RECOVERY_PSC_TRANSACTI } } + private void migrateNextTransactionalIdHintState(FunctionInitializationContext context) throws Exception { + ListState nextKafkaTransactionalIdHintState = + context.getOperatorStateStore().getUnionListState((ListStateDescriptor) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2")); + ListState oldNextTransactionalIdHintState = + context.getOperatorStateStore().getUnionListState((ListStateDescriptor) PscCommon.getField(FlinkKafkaProducer.class, "NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR")); + + ArrayList oldTransactionalIdHints = + Lists.newArrayList(oldNextTransactionalIdHintState.get()); + if (!oldTransactionalIdHints.isEmpty()) { + nextKafkaTransactionalIdHintState.addAll(oldTransactionalIdHints); + //clear old state + oldNextTransactionalIdHintState.clear(); + } + } + private void initializeTransactionState(String actualTransactionalIdPrefix) throws Exception { transactionalIdsGenerator = new TransactionalIdsGenerator(