Cannot send all messages successfully before Spark session terminates #167

Open

mdobrin opened this issue Apr 10, 2021 · 3 comments
mdobrin commented Apr 10, 2021

I'm running into an issue where my Spark session terminates before all of my Kafka messages have been produced.
The documentation mentions a callback mechanism that can be used, and I see this callback is invoked once per record produced. The best solution I can come up with is to compare the number of callback invocations with the count of my DataFrame (sketched after the snippet below), but is there a more elegant approach?

import org.apache.spark.sql.SparkSession
import org.apache.kafka.clients.producer.ProducerRecord
import com.github.benfradet.spark.kafka010.writer._

// myConfigMap, producerConfig, sqlFileNamesArg, sqlParamsMap, render,
// convertRowToJSON and topic are defined elsewhere in the application.

val spark = SparkSession
  .builder()
  .config("spark.debug.maxToStringFields", 100000)
  .enableHiveSupport()
  .getOrCreate()

// copy the externally supplied Kafka settings into the producer config
for ((key, value) <- myConfigMap) {
  producerConfig.setProperty(key, value)
}

for (sqlFileName <- sqlFileNamesArg.split(",")) {
  sqlParamsMap("TMPID") = sqlParamsMap("TMPID").toString
    .replaceAll("\\$\\{sqlFileName\\}", sqlFileName.replaceAll(".sql", ""))

  val theSql = render(spark, sqlFileName, sqlParamsMap)
  val df = spark.sql(theSql)

  if (!df.rdd.isEmpty) {
    df.rdd.writeToKafka(
      producerConfig,
      s => new ProducerRecord[String, String](topic, convertRowToJSON(s))
    )
  }
}

// Edit: commenting out the stop() call below makes no difference to whether
// the early-termination issue occurs.
spark.stop() // called explicitly to avoid https://issues.apache.org/jira/browse/SPARK-24981
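For concreteness, here is a rough sketch of the count-and-compare idea, assuming the writeToKafka overload that also accepts an optional org.apache.kafka.clients.producer.Callback. The AckCounter holder is something I made up for illustration; being a Scala object it is one instance per JVM, so each executor keeps its own tallies, and in cluster mode they would still need to be surfaced (logs, metrics, an external store) before they can be compared against df.count() on the driver.

import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

// Hypothetical per-JVM counters for broker acknowledgements and failures.
object AckCounter extends Serializable {
  val acked  = new AtomicLong(0L)
  val failed = new AtomicLong(0L)
}

val expected = df.count() // records we intend to produce

df.rdd.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, convertRowToJSON(s)),
  Some(new Callback with Serializable {
    // invoked once per record when the broker acks it (or the send fails)
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
      if (e == null) AckCounter.acked.incrementAndGet()
      else AckCounter.failed.incrementAndGet()
  })
)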
BenFradet (Owner)
mdobrin (Author) commented Apr 13, 2021

Sorry, I forgot to mention this: I am using version 0.3.0 because my Spark version is 2.1.1.
The code you link to above looks slightly different in 0.3.0 - could that be related?

https://github.com/BenFradet/spark-kafka-writer/blob/0.3.0/spark-kafka-0-10-writer/src/test/scala/com/github/benfradet/spark/kafka010/writer/SKRSpec.scala#L63-L68
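For reference, the dependency is pinned along these lines (module name inferred from the path above, so treat the exact coordinates as an assumption on my part):

// build.sbt - assumed coordinates for the Kafka 0.10 module at 0.3.0
libraryDependencies += "com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.3.0"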

BenFradet (Owner) commented
ah, it might well be 🤔
