I'm running into an issue where my Spark session is getting killed before all of my Kafka messages can be produced.
In the documentation I saw there was a callback mechanism that can be used. I see that this callback method is called for each record that is produced. The best solution I can come up with is just comparing the number of times the callback is called with the count in my data frame, but is there any more elegant solution that comes to mind?
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.debug.maxToStringFields", 100000)
  .enableHiveSupport()
  .getOrCreate()

// Copy the user-supplied settings into the Kafka producer configuration.
for ((key, value) <- myConfigMap) {
  producerConfig.setProperty(key, value)
}

for (sqlFileName <- sqlFileNamesArg.split(",")) {
  // stripSuffix removes only a trailing ".sql"; replaceAll(".sql", "") would treat "." as a regex wildcard.
  sqlParamsMap("TMPID") = sqlParamsMap("TMPID").toString
    .replaceAll("\\$\\{sqlFileName\\}", sqlFileName.stripSuffix(".sql"))
  val theSql = render(spark, sqlFileName, sqlParamsMap)
  val df = spark.sql(theSql)
  if (!df.rdd.isEmpty) {
    // Convert each row to JSON and send it to the topic as a keyless record.
    df.rdd.writeToKafka(
      producerConfig,
      s => new ProducerRecord[String, String](topic, convertRowToJSON(s))
    )
  }
}

// Edit: Commenting out the below call to stop() seems to make no difference on whether the
// Spark session early-terminate issue occurs or not.
spark.stop() // call explicitly to avoid issue here -> https://issues.apache.org/jira/browse/SPARK-24981
Sorry, I forgot to mention this - I am using version 0.3.0 due to my Spark version being 2.1.1.
The code you link to above looks slightly different for 0.3.0 - could this be related?
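For comparison, here is a minimal sketch of one alternative to counting callbacks. It is not the spark-kafka-writer API: it swaps in a plain `KafkaProducer` per partition and calls `flush()`, which blocks until every record sent so far has completed. The helper name `writeAndFlush` and its `toJson` parameter are hypothetical, and the sketch assumes `producerConfig` already contains `bootstrap.servers` plus String key and value serializers.

```scala
import java.util.Properties
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper (not part of spark-kafka-writer): writes every row of
// `df` to `topic` and returns only after each partition's producer has flushed.
def writeAndFlush(
    df: DataFrame,
    topic: String,
    producerConfig: Properties, // assumed to hold bootstrap.servers and String serializers
    toJson: Row => String       // must be serializable, since it runs on the executors
): Unit =
  df.rdd.foreachPartition { rows =>
    // One producer per partition, created on the executor.
    val producer = new KafkaProducer[String, String](producerConfig)
    // Remembers the first send failure reported by the per-record callback.
    val firstError = new AtomicReference[Exception](null)
    val onAck = new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) firstError.compareAndSet(null, exception)
    }
    try {
      rows.foreach { row =>
        producer.send(new ProducerRecord[String, String](topic, toJson(row)), onAck)
      }
      // Blocks until all records sent above have completed (successfully or not).
      producer.flush()
      // flush() itself does not throw on send failures, so rethrow the first one
      // here to fail the Spark task.
      Option(firstError.get).foreach(e => throw e)
    } finally {
      producer.close()
    }
  }
```

With a helper like this, the loop body would call `writeAndFlush(df, topic, producerConfig, convertRowToJSON)` in place of `df.rdd.writeToKafka(...)`. Because `foreachPartition` is an action, the driver does not reach `spark.stop()` until every partition has flushed. The trade-off is that a new producer is created per partition, which costs more than reusing a cached producer.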