Foreign keys on plan files are not working #1

Closed
pabloromanelli opened this issue Mar 1, 2024 · 10 comments
@pabloromanelli

Hi, I love the project!

I'm trying to use it mainly with plan and task files, but when I configure sinkOptions.foreignKeys in the plan YAML file it fails with an ArrayIndexOutOfBoundsException.
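For context, the relevant part of the plan YAML looks roughly like this (simplified from the example repo's plan; the exact data source, step, and column names here are only illustrative and the structure may differ slightly between versions):

sinkOptions:
  foreignKeys:
    "postgresCustomer.balances.account_number":
      - "postgresCustomer.transactions.account_number"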

To replicate the issue I'm using https://github.com/data-catering/data-caterer-example

DATA_SOURCE=postgres-multiple-tables docker-compose up -d datacaterer

Logs:

docker logs -f docker-datacaterer-1
01/03/2024 13:19:15 [INFO ] com.github.pflooky.datagen.App$: Starting Data Caterer
01/03/2024 13:19:17 [INFO ] com.github.pflooky.datagen.core.parser.PlanParser$: Found plan file and parsed successfully, plan-file-path=/opt/app/custom/plan/postgres-multiple-tables.yaml, plan-name=postgres_multiple_tables_plan, plan-description=Create balance and transaction data in Postgres
01/03/2024 13:19:18 [INFO ] com.github.pflooky.datagen.core.generator.DataGeneratorProcessor: Following tasks are enabled and will be executed: num-tasks=1, tasks=task=postgres_balance_and_transaction, num-steps=2, steps=balances,transactions
01/03/2024 13:19:19 [INFO ] com.github.pflooky.datagen.core.util.RecordCountUtil$: Number of batches for data generation, num-batches=3, num-records-per-batch=1000000, total-records=3000000
01/03/2024 13:19:19 [INFO ] com.github.pflooky.datagen.core.generator.BatchDataProcessor: Starting batch, batch=1, num-batches=3
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
	at com.github.pflooky.datagen.core.model.ForeignKeyRelationHelper$.fromString(PlanModels.scala:22)
...

Setting the foreign keys on a plan class (via .addForeignKeyRelationship()) does not fail.

Thank you

@pabloromanelli pabloromanelli changed the title from "Foreign keys on plan files are now working" to "Foreign keys on plan files are not working" on Mar 1, 2024
@pflooky

pflooky commented Mar 8, 2024

Hi @pabloromanelli!

Thanks for raising the issue. I appreciate the logs and explanation.
I suspect something is missing within the Plan creation. Let me investigate and get back to you.

@pflooky pflooky self-assigned this Mar 8, 2024
@pflooky

pflooky commented Mar 8, 2024

Found what the issue was. To handle scenarios where there are "." characters in the foreign key names, "||" is also allowed as a delimiter, but supporting this broke the previous delimiter parsing logic.
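To illustrate, here is a minimal, self-contained sketch of the kind of parsing involved (illustrative only, not the actual ForeignKeyRelationHelper code):

// Illustrative sketch: a foreign key reference is "dataSource<delim>step<delim>column",
// where "||" is used when the column name itself contains "." and "." is used otherwise.
object ForeignKeyParseSketch {
  def parse(fk: String): (String, String, String) = {
    val parts =
      if (fk.contains("||")) fk.split("\\|\\|") // newer delimiter for dotted column names
      else fk.split("\\.", 3)                   // original "." delimiter, limited to 3 parts
    require(parts.length == 3, s"Expected 3 parts in foreign key reference: $fk")
    (parts(0), parts(1), parts(2))
  }

  def main(args: Array[String]): Unit = {
    println(parse("postgresCustomer.balances.account_number"))
    println(parse("postgresCustomer||balances||nested.account_number"))
  }
}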

This should be fixed as part of release 0.6.1, which https://github.com/data-catering/data-caterer-example has also been updated to use.

Let me know if this fix works for you and then I can close off the issue.
Thanks! 👍

@pabloromanelli
Author

Hi Peter!

It is still failing with the latest version of https://github.com/data-catering/data-caterer-example (https://github.com/data-catering/data-caterer-example/tree/fe014bd1999e9e77233765c5903ba75de5c79b27)

I updated the Dockerfile to use 0.6.1 (https://github.com/data-catering/data-caterer-example/blob/main/Dockerfile):

FROM datacatering/data-caterer-basic:0.6.1

And changed the environment variable in docker-compose.yaml to be able to use a different plan:

version: "3.9"
services:
  datacaterer:
    build: ../
    environment:
...
      - "PLAN_FILE_PATH=/opt/app/custom/${PLAN:-plan/${DATA_SOURCE:-postgres}}.yaml"
...
    depends_on:
      - postgres

After starting Docker Compose, it fails with the same error:

DATA_SOURCE=postgres-multiple-tables docker-compose up -d datacaterer
docker logs -f docker-datacaterer-1

13/03/2024 13:42:45 [INFO ] com.github.pflooky.datagen.App$: Starting Data Caterer
13/03/2024 13:42:47 [INFO ] com.github.pflooky.datagen.core.parser.PlanParser$: Found plan file and parsed successfully, plan-file-path=/opt/app/custom/plan/postgres-multiple-tables.yaml, plan-name=postgres_multiple_tables_plan, plan-description=Create balance and transaction data in Postgres
13/03/2024 13:42:47 [INFO ] com.github.pflooky.datagen.core.generator.DataGeneratorProcessor: Following tasks are enabled and will be executed: num-tasks=1, tasks=task=postgres_balance_and_transaction, num-steps=2, steps=balances,transactions
13/03/2024 13:42:48 [INFO ] com.github.pflooky.datagen.core.util.RecordCountUtil$: Number of batches for data generation, num-batches=3, num-records-per-batch=1000000, total-records=3000000
13/03/2024 13:42:48 [INFO ] com.github.pflooky.datagen.core.generator.BatchDataProcessor: Starting batch, batch=1, num-batches=3
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
	at com.github.pflooky.datagen.core.model.ForeignKeyRelationHelper$.fromString(PlanModels.scala:22)
	at com.github.pflooky.datagen.core.model.PlanImplicits$SinkOptionsOps.gatherForeignKeyRelations(PlanModels.scala:217)
...

Thanks!

@pflooky

pflooky commented Mar 14, 2024

Thanks again @pabloromanelli for the details.

I see. So you are using the postgres-multiple-tables.yaml plan from the example repo.
I was able to have it run successfully via:

peter@Flooks-MacBook-Pro docker % PLAN=plan/postgres-multiple-tables DATA_SOURCE=postgres docker-compose up -d datacaterer
peter@Flooks-MacBook-Pro docker % docker logs -f docker-datacaterer-1
14/03/2024 08:27:34 [INFO ] io.github.datacatering.datacaterer.App$: Starting Data Caterer
14/03/2024 08:27:36 [INFO ] io.github.datacatering.datacaterer.core.parser.PlanParser$: Found plan file and parsed successfully, plan-file-path=/opt/app/custom/plan/postgres-multiple-tables.yaml, plan-name=postgres_multiple_tables_plan, plan-description=Create balance and transaction data in Postgres
14/03/2024 08:27:36 [INFO ] io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor: Following tasks are enabled and will be executed: num-tasks=1, tasks=task=postgres_balance_and_transaction, num-steps=2, steps=balances,transactions
14/03/2024 08:27:37 [INFO ] io.github.datacatering.datacaterer.core.util.RecordCountUtil$: Number of batches for data generation, num-batches=1, num-records-per-batch=1000000, total-records=21000
14/03/2024 08:27:37 [INFO ] io.github.datacatering.datacaterer.core.generator.BatchDataProcessor: Starting batch, batch=1, num-batches=1
14/03/2024 08:27:40 [WARN ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Count is disabled. It will help with performance. Defaulting to -1
14/03/2024 08:27:40 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Pushing data to sink, data-source-name=postgresCustomer, step-name=balances, save-mode=Append, num-records=-1, status=started
14/03/2024 08:27:42 [INFO ] io.github.datacatering.datacaterer.core.util.MetadataUtil$: Computed metadata statistics for data source, name=postgresCustomer, format=jdbc, details=Map(format -> jdbc, stringtype -> unspecified, url -> jdbc:postgresql://postgresserver:5432/customer, driver -> org.postgresql.Driver, dbtable -> account.balances, user -> postgres), rows-analysed=1000, size-in-bytes=39804, num-columns-analysed=4
14/03/2024 08:27:42 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Successfully saved data to sink, data-source-name=postgresCustomer, step-name=balances, save-mode=Append, num-records=-1, status=finished
14/03/2024 08:27:42 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Pushing data to sink, data-source-name=postgresCustomer, step-name=transactions, save-mode=Append, num-records=-1, status=started
14/03/2024 08:27:44 [INFO ] io.github.datacatering.datacaterer.core.util.MetadataUtil$: Computed metadata statistics for data source, name=postgresCustomer, format=jdbc, details=Map(format -> jdbc, stringtype -> unspecified, url -> jdbc:postgresql://postgresserver:5432/customer, driver -> org.postgresql.Driver, dbtable -> account.transactions, user -> postgres), rows-analysed=5035, size-in-bytes=245815, num-columns-analysed=4
14/03/2024 08:27:44 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Successfully saved data to sink, data-source-name=postgresCustomer, step-name=transactions, save-mode=Append, num-records=-1, status=finished
14/03/2024 08:27:44 [INFO ] io.github.datacatering.datacaterer.core.generator.BatchDataProcessor: Finished batch, batch=1, num-batches=1
14/03/2024 08:27:44 [INFO ] io.github.datacatering.datacaterer.core.generator.result.DataGenerationResultWriter: Writing data generation summary to HTML files, folder-path=/opt/app/custom/report
14/03/2024 08:27:44 [INFO ] io.github.datacatering.datacaterer.App$: Completed in 10s

I had to add some missing fields to the application.conf, but this doesn't explain the error you are getting, which is about the foreign keys. I have pushed these changes to data-caterer-example now. In case there is a caching issue, can you try clearing out your Docker images and starting clean?
i.e.

docker ps -a | grep datacaterer | awk -F " " '{print $1}' | xargs docker rm
docker images | grep datacaterer | awk -F " " '{print $3}' | xargs docker rmi

On another note, you can run a separate plan without altering the Dockerfile via the following command:

PLAN=plan/postgres-multiple-tables DATA_SOURCE=postgres docker-compose up -d datacaterer

I have made a note of this in the README to make it clearer.

Please let me know if it is still a problem for you. Thanks for your patience 👍

@pabloromanelli
Author

Now I can get it to start, but it fails with unique key constraint violations on the database:

I removed the old Docker images:

docker ps -a | grep datacaterer | awk -F " " '{print $1}' | xargs docker rm
docker images | grep datacaterer | awk -F " " '{print $3}' | xargs docker rmi

and added the following to application.conf:

validation {}
alert {}

Started Docker Compose using:

PLAN=plan/postgres-multiple-tables DATA_SOURCE=postgres docker-compose up -d datacaterer

Logs: docker logs -f docker-datacaterer-1

14/03/2024 14:14:54 [INFO ] io.github.datacatering.datacaterer.App$: Starting Data Caterer
14/03/2024 14:14:56 [INFO ] io.github.datacatering.datacaterer.core.parser.PlanParser$: Found plan file and parsed successfully, plan-file-path=/opt/app/custom/plan/postgres-multiple-tables.yaml, plan-name=postgres_multiple_tables_plan, plan-description=Create balance and transaction data in Postgres
14/03/2024 14:14:56 [INFO ] io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor: Following tasks are enabled and will be executed: num-tasks=1, tasks=task=postgres_balance_and_transaction, num-steps=2, steps=balances,transactions
14/03/2024 14:14:57 [INFO ] io.github.datacatering.datacaterer.core.util.RecordCountUtil$: Number of batches for data generation, num-batches=3, num-records-per-batch=1000000, total-records=3000000
14/03/2024 14:14:57 [INFO ] io.github.datacatering.datacaterer.core.generator.BatchDataProcessor: Starting batch, batch=1, num-batches=3
14/03/2024 14:15:12 [WARN ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Count is disabled. It will help with performance. Defaulting to -1
14/03/2024 14:15:12 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Pushing data to sink, data-source-name=postgresCustomer, step-name=balances, save-mode=Append, num-records=-1, status=started
14/03/2024 14:15:14 [ERROR] org.apache.spark.executor.Executor: Exception in task 1.0 in stage 13.0 (TID 19)
java.sql.BatchUpdateException: Batch entry 912 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC148559','2023-05-24 20:20:08.734+00','open',17441.251547534743) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:574)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 21 more
14/03/2024 14:15:14 [ERROR] org.apache.spark.executor.Executor: Exception in task 0.0 in stage 13.0 (TID 18)
java.sql.BatchUpdateException: Batch entry 221 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC132436','2024-01-12 01:31:19.309+00','closed',88357.3012032108) was aborted: ERROR: deadlock detected
  Detail: Process 61 waits for ShareLock on transaction 744; blocked by process 60.
Process 60 waits for ShareLock on transaction 743; blocked by process 61.
  Hint: See server log for query details.
  Where: while inserting index tuple (3,94) in relation "balances_account_number_key"  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2134)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1491)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1516)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:560)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: deadlock detected
  Detail: Process 61 waits for ShareLock on transaction 744; blocked by process 60.
Process 60 waits for ShareLock on transaction 743; blocked by process 61.
  Hint: See server log for query details.
  Where: while inserting index tuple (3,94) in relation "balances_account_number_key"
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 24 more
14/03/2024 14:15:14 [ERROR] org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 13.0 failed 1 times; aborting job
14/03/2024 14:15:16 [INFO ] io.github.datacatering.datacaterer.core.util.MetadataUtil$: Computed metadata statistics for data source, name=postgresCustomer, format=jdbc, details=Map(format -> jdbc, stringtype -> unspecified, url -> jdbc:postgresql://postgresserver:5432/customer, driver -> org.postgresql.Driver, dbtable -> account.balances, user -> postgres, password -> postgres), rows-analysed=333333, size-in-bytes=13213873, num-columns-analysed=4
14/03/2024 14:15:16 [ERROR] io.github.datacatering.datacaterer.core.sink.SinkFactory: Failed to save data for sink, data-source-name=postgresCustomer, step-name=balances, save-mode=Append, num-records=-1, status=failed, exception=Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 19) (caa356230363 executor driver): java.sql.BatchUpdateException: Batch entry 912 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC148559','2023-05-24 20:20:08.734+00','open',17441.251547534743) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)
Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 19) (caa356230363 executor driver): java.sql.BatchUpdateException: Batch entry 912 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC148559','2023-05-24 20:20:08.734+00','open',17441.251547534743) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:574)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 21 more

Driver stacktrace:
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.saveData(SinkFactory.scala:52)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.pushToSink(SinkFactory.scala:34)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.$anonfun$pushDataToSinks$3(BatchDataProcessor.scala:85)
	at scala.collection.immutable.List.map(List.scala:293)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.pushDataToSinks(BatchDataProcessor.scala:80)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.$anonfun$splitAndProcess$2(BatchDataProcessor.scala:65)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.$anonfun$splitAndProcess$2$adapted(BatchDataProcessor.scala:34)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.splitAndProcess(BatchDataProcessor.scala:34)
	at io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor.generateDataWithResult(DataGeneratorProcessor.scala:57)
	at io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor.generateData(DataGeneratorProcessor.scala:39)
	at io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor.generateData(DataGeneratorProcessor.scala:33)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.executePlanWithConfig(PlanProcessor.scala:55)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.executePlan(PlanProcessor.scala:46)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.$anonfun$determineAndExecutePlan$8(PlanProcessor.scala:32)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.$anonfun$determineAndExecutePlan$6(PlanProcessor.scala:32)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.determineAndExecutePlan(PlanProcessor.scala:32)
	at io.github.datacatering.datacaterer.App$.main(App.scala:18)
	at io.github.datacatering.datacaterer.App.main(App.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 19) (caa356230363 executor driver): java.sql.BatchUpdateException: Batch entry 912 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC148559','2023-05-24 20:20:08.734+00','open',17441.251547534743) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:574)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1036)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1034)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:901)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.$anonfun$saveBatchData$1(SinkFactory.scala:72)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.saveBatchData(SinkFactory.scala:73)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.saveData(SinkFactory.scala:45)
	... 38 more
Caused by: java.sql.BatchUpdateException: Batch entry 912 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC148559','2023-05-24 20:20:08.734+00','open',17441.251547534743) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:574)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC148559) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 21 more

(I'm still using the parent image FROM datacatering/data-caterer-basic:0.6.1 in the Dockerfile.)

@pflooky

pflooky commented Mar 14, 2024

Ah great! So the error you are getting now relates to the generated data not being unique (in this case, account_number in the balances table). Given it is generating 1,000,000 account_number values according to a particular regex (ACC1[0-9]{5,10} from here), it has generated the same value by chance, and the push into Postgres fails because the value already exists.

This can be resolved by enabling the uniqueness flag in application.conf:

flags {
    enableUniqueCheck = true
}

I will update the example's application.conf to include all the latest configurations.

I have thought about enabling the flag by default, but there is a performance penalty when it is enabled. I will introduce some logic that checks whether any fields have been set to isUnique=true and, if so, enables this flag automatically.
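Roughly the kind of check I have in mind (an illustrative sketch only, not the actual Data Caterer code; the model classes here are simplified stand-ins):

// Simplified sketch: enable the unique check when the config flag is set
// or when any field in any step is marked isUnique=true.
object UniqueCheckSketch {
  case class FieldSketch(name: String, isUnique: Boolean = false)
  case class StepSketch(name: String, fields: List[FieldSketch])

  def shouldEnableUniqueCheck(enableUniqueCheckFlag: Boolean, steps: List[StepSketch]): Boolean =
    enableUniqueCheckFlag || steps.exists(_.fields.exists(_.isUnique))
}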

@pflooky

pflooky commented Mar 15, 2024

I have released version 0.6.2, which includes logic to always generate unique values if isUnique or isPrimaryKey is set to true. I have updated the data-caterer-example repo as well and it now runs without error. Let me know if you run into any other issues. 👍
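For reference, marking the field unique in the task YAML looks roughly like this (simplified; option names may differ slightly between versions):

fields:
  - name: "account_number"
    type: "string"
    generator:
      type: "regex"
      options:
        regex: "ACC1[0-9]{5,10}"
        isUnique: true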

@pabloromanelli
Author

Thank you for taking the time, Peter! But I'm sorry to say it is still failing with the latest changes.
Just in case, I cloned the repo again and cleaned up all Docker images (I didn't change any files in the repo):

Clean up:

❯ docker images | grep datacaterer | awk -F " " '{print $3}' | xargs docker rmi
Untagged: docker-datacaterer:latest
Deleted: sha256:bac118d8b3eca6a6d1a0f72c3b2c2cbd4a51b1845fce7a653b01afa48b38fb33

❯ docker images | grep caterer | awk -F " " '{print $3}' | xargs docker rmi
Untagged: datacatering/data-caterer-basic:0.6.1
Untagged: datacatering/data-caterer-basic@sha256:e252c28008e29042679368ccf0476da5dc3f9cff0cd3de4f5341fd1f378f893f
Deleted: sha256:71a9b97cf772d1b9c93867095934c7dddb91a1fb3469285d02ae401b13c045d1
Untagged: datacatering/data-caterer-basic:0.6.0
Untagged: datacatering/data-caterer-basic@sha256:499424369a54cb05785cd830fbaa2fa0fdd32876aa034367fc45cae3d24b0a8a
Deleted: sha256:2a0c6aa08dcae9560858155e91614362c424603baa564fadf724c68f07c56d39
Deleted: sha256:e0241513a7bd17154466ddea69fca4957ce43b42033044a2a0c5a2631bb4bd58
Deleted: sha256:a99efb36fb62c8cac529d95a68fff714bc445a5beed5be59d6179f3131824a49
Deleted: sha256:57659c417a10d0a50591abc7047dd7bc81c02a4374723d4727ffa572e902e50e
Deleted: sha256:24db7e351d9018ff9c0adcd8551a3bea5fca271eda3eddda19bfa1959c9e2924
Deleted: sha256:ae7b8dee58f69f31c78a0a3943a5cc21e1644fcc1fbb5bc8ab84b679a79bbcd8
Deleted: sha256:3bf6f68e635d8d0bf75d87773587ba8226f55a1d3f146e3691de0b7185c9bd22
Deleted: sha256:78d084d743f56b3d52325b7d76cd962a0b7dbd14c1d8b83f3b722f90340152d6
Deleted: sha256:0499a6177d527721beab495704728b83e56b627838480536047e1ddebd186e62
Deleted: sha256:04e7a88e97d8e17a37c00fefaf70ffd8c1e9d72f3023273e838ccf6585020f84
Deleted: sha256:efd185bed709bbbcfdcbce529057936c5486cdf4ca11aac768bca34b5d3aadf0
Deleted: sha256:23e15c9ea17e4541fb2c98419b963794b13d8921a42cf9d97100d02352488683
Deleted: sha256:ab3ae7038b5a23b7aaeccefcbc49ac50607b445fc6a6b7ed51678d79db666a8a
Untagged: datacatering/data-caterer-basic:0.4.1
Untagged: datacatering/data-caterer-basic@sha256:4ae29436f21996710984c908aea7afd679945cf1dbddbc0bc421ca407d107ab6
Deleted: sha256:731a8d91788a998224255e78efc7b143a90a9f86f3e6cc85442c733d74e78d93
Error response from daemon: conflict: unable to delete e2a9822c86c3 (must be forced) - image is being used by stopped container e4cc0a81c572

❯ docker ps -a | grep caterer | awk -F " " '{print $1}' | xargs docker rm

❯ git clone [email protected]:data-catering/data-caterer-example.git
Cloning into 'data-caterer-example'...
remote: Enumerating objects: 649, done.
remote: Counting objects: 100% (649/649), done.
remote: Compressing objects: 100% (296/296), done.
remote: Total 649 (delta 278), reused 529 (delta 161), pack-reused 0
Receiving objects: 100% (649/649), 1.85 MiB | 2.71 MiB/s, done.
Resolving deltas: 100% (278/278), done.
❯ git log
commit 3a7be9edec4776ec882d03cbadab2f71fccd6dd4 (HEAD -> main, origin/main, origin/HEAD)
Author: Flook Peter <[email protected]>
Date:   Fri Mar 15 10:25:36 2024 +0800

    Update to 0.6.2

Build jar

❯ ./run.sh
Class name of plan to run [DocumentationPlanRun]:
Building jar with plan run

BUILD SUCCESSFUL in 567ms
5 actionable tasks: 2 executed, 2 from cache, 1 up-to-date
210d01bdbd6cbc086b847a49e39ff2807fafa36b35d1d697adf4a4d987d0f726
Running Data Caterer via docker, version: 0.6.2
15/03/2024 15:46:42 [INFO ] io.github.datacatering.datacaterer.App$: Starting Data Caterer
15/03/2024 15:46:43 [INFO ] io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor: Following tasks are enabled and will be executed: num-tasks=2, tasks=task=df0c2bcc-1019-482b-acd8-7551b5e145b6, num-steps=1, steps=2e961411-ae80-460d-8ec3-e6bb5fff6b6e||task=e3f6d2d9-c333-4523-9f8c-f42ea84d0997, num-steps=1, steps=6415f322-7f95-44dc-aa9a-c963f18e483c
15/03/2024 15:46:43 [INFO ] io.github.datacatering.datacaterer.core.util.RecordCountUtil$: Number of batches for data generation, num-batches=1, num-records-per-batch=100000, total-records=300
15/03/2024 15:46:43 [INFO ] io.github.datacatering.datacaterer.core.generator.BatchDataProcessor: Starting batch, batch=1, num-batches=1
15/03/2024 15:46:47 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Pushing data to sink, data-source-name=account_info, step-name=2e961411-ae80-460d-8ec3-e6bb5fff6b6e, save-mode=Overwrite, num-records=100, status=started
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.util.MetadataUtil$: Computed metadata statistics for data source, name=account_info, format=json, details=Map(saveMode -> overwrite, format -> json, path -> /opt/app/data/json), rows-analysed=100, size-in-bytes=23041, num-columns-analysed=5
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Successfully saved data to sink, data-source-name=account_info, step-name=2e961411-ae80-460d-8ec3-e6bb5fff6b6e, save-mode=Overwrite, num-records=100, status=finished
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Pushing data to sink, data-source-name=transactions, step-name=6415f322-7f95-44dc-aa9a-c963f18e483c, save-mode=Overwrite, num-records=151, status=started
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.util.MetadataUtil$: Computed metadata statistics for data source, name=transactions, format=csv, details=Map(saveMode -> overwrite, header -> true, format -> csv, path -> /opt/app/data/csv), rows-analysed=151, size-in-bytes=11524, num-columns-analysed=5
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Successfully saved data to sink, data-source-name=transactions, step-name=6415f322-7f95-44dc-aa9a-c963f18e483c, save-mode=Overwrite, num-records=151, status=finished
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.generator.BatchDataProcessor: Finished batch, batch=1, num-batches=1
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.validator.ValidationProcessor: Executing data validations
15/03/2024 15:46:48 [INFO ] io.github.datacatering.datacaterer.core.validator.ValidationProcessor: Executing data validations for data source, name=default_validation, data-source-name=account_info, num-validations=1
15/03/2024 15:46:49 [INFO ] io.github.datacatering.datacaterer.core.validator.ValidationProcessor: Executing data validations for data source, name=default_validation, data-source-name=transactions, num-validations=2
15/03/2024 15:46:49 [INFO ] io.github.datacatering.datacaterer.core.validator.ValidationProcessor: Data validations successful for validation, name=default_validation, description=Validation of data sources after generating data, data-source-name=account_info, data-source-options=Map(saveMode -> overwrite, format -> json, path -> /opt/app/data/json), is-success=true
15/03/2024 15:46:49 [ERROR] io.github.datacatering.datacaterer.core.validator.ValidationProcessor: Failed validation: validation-name=default_validation, description=Validation of data sources after generating data, data-source-name=transactions, data-source-options=Map(saveMode -> overwrite, header -> true, format -> csv, path -> /opt/app/data/csv), is-success=false, validation-type=expression, check=amount > 15, sample-errors={"account_id":"ACC17535014","name":"Santiago Gleichner","txn_id":"k4re6k","amount":"14.64614839669984","merchant":"Doyle and Sons"},{"account_id":"ACC82581513","name":"Erika Crona","txn_id":"HGMDPG","amount":"15.0466598398658","merchant":"Haag, Mayer and White"},{"account_id":"ACC18392642","name":"Jonna Conn II","txn_id":"gWam","amount":"11.645604189767013","merchant":"Torphy Inc"},{"account_id":"ACC85887942","name":"Elenora Stark","txn_id":"E2F","amount":"10.47739324666357","merchant":"Torphy, Heathcote and Gutkowski"},{"account_id":"ACC78703928","name":"Rogelio Abshire","txn_id":"V3","amount":"10.956636791438548","merchant":"Smith-Kovacek"}
15/03/2024 15:46:49 [INFO ] io.github.datacatering.datacaterer.core.generator.result.DataGenerationResultWriter: Writing data generation summary to HTML files, folder-path=/opt/app/data/report
15/03/2024 15:46:49 [INFO ] io.github.datacatering.datacaterer.App$: Completed in 7s
Finished!

❯ docker ps -a | grep caterer | awk -F " " '{print $1}' | xargs docker rm

6f52d4ab0e12
e4cc0a81c572

Docker Compose up

❯ PLAN=plan/postgres-multiple-tables DATA_SOURCE=postgres docker-compose up -d datacaterer
[+] Building 0.1s (7/7) FINISHED                                                                                                                                                                                                docker:desktop-linux
 => [datacaterer internal] load build definition from Dockerfile                                                                                                                                                                                0.0s
 => => transferring dockerfile: 216B                                                                                                                                                                                                            0.0s
 => [datacaterer internal] load metadata for docker.io/datacatering/data-caterer-basic:0.6.2                                                                                                                                                    0.0s
 => [datacaterer internal] load .dockerignore                                                                                                                                                                                                   0.0s
 => => transferring context: 2B                                                                                                                                                                                                                 0.0s
 => [datacaterer internal] load build context                                                                                                                                                                                                   0.0s
 => => transferring context: 222.04kB                                                                                                                                                                                                           0.0s
 => CACHED [datacaterer 1/2] FROM docker.io/datacatering/data-caterer-basic:0.6.2                                                                                                                                                               0.0s
 => [datacaterer 2/2] COPY --chown=app:app build/libs/data-caterer-example-0.1.0.jar /opt/spark/jars/data-caterer.jar                                                                                                                           0.0s
 => [datacaterer] exporting to image                                                                                                                                                                                                            0.0s
 => => exporting layers                                                                                                                                                                                                                         0.0s
 => => writing image sha256:fb88bc53c49b8edcfe2715f48ba6c8f309f969c7f4ac71fbe6d0354f336b6bcb                                                                                                                                                    0.0s
 => => naming to docker.io/library/docker-datacaterer                                                                                                                                                                                           0.0s
[+] Running 3/4
 ⠹ Network docker_default             Created                                                                                                                                                                                                  11.2s
 ✔ Container docker-postgresserver-1  Healthy                                                                                                                                                                                                  10.7s
 ✔ Container docker-postgres-1        Started                                                                                                                                                                                                  10.8s
 ✔ Container docker-datacaterer-1     Started                                                                                                                                                                                                  11.2s

Logs

❯ docker logs -f docker-datacaterer-1
15/03/2024 15:48:10 [INFO ] io.github.datacatering.datacaterer.App$: Starting Data Caterer
15/03/2024 15:48:12 [INFO ] io.github.datacatering.datacaterer.core.parser.PlanParser$: Found plan file and parsed successfully, plan-file-path=/opt/app/custom/plan/postgres-multiple-tables.yaml, plan-name=postgres_multiple_tables_plan, plan-description=Create balance and transaction data in Postgres
15/03/2024 15:48:12 [INFO ] io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor: Following tasks are enabled and will be executed: num-tasks=1, tasks=task=postgres_balance_and_transaction, num-steps=2, steps=balances,transactions
15/03/2024 15:48:14 [INFO ] io.github.datacatering.datacaterer.core.util.RecordCountUtil$: Number of batches for data generation, num-batches=2, num-records-per-batch=1000000, total-records=1005000
15/03/2024 15:48:14 [INFO ] io.github.datacatering.datacaterer.core.generator.BatchDataProcessor: Starting batch, batch=1, num-batches=2
15/03/2024 15:48:31 [WARN ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Count is disabled. It will help with performance. Defaulting to -1
15/03/2024 15:48:31 [INFO ] io.github.datacatering.datacaterer.core.sink.SinkFactory: Pushing data to sink, data-source-name=postgresCustomer, step-name=balances, save-mode=Append, num-records=-1, status=started
15/03/2024 15:48:32 [ERROR] org.apache.spark.executor.Executor: Exception in task 0.0 in stage 39.0 (TID 36)
java.sql.BatchUpdateException: Batch entry 11 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC100004','2023-09-20 18:08:46.552+00','closed',53318.697515061074) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2134)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1491)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1516)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:560)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 24 more
15/03/2024 15:48:32 [ERROR] org.apache.spark.executor.Executor: Exception in task 1.0 in stage 39.0 (TID 37)
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC100000','2024-02-05 10:46:12.488+00'::timestamp,'closed',71454.47226148563) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100000) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2134)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1491)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1516)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:560)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100000) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 24 more
15/03/2024 15:48:32 [ERROR] org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 39.0 failed 1 times; aborting job
15/03/2024 15:48:35 [INFO ] io.github.datacatering.datacaterer.core.util.MetadataUtil$: Computed metadata statistics for data source, name=postgresCustomer, format=jdbc, details=Map(format -> jdbc, stringtype -> unspecified, url -> jdbc:postgresql://postgresserver:5432/customer, driver -> org.postgresql.Driver, dbtable -> account.balances, user -> postgres), rows-analysed=400988, size-in-bytes=16049443, num-columns-analysed=4
15/03/2024 15:48:35 [ERROR] io.github.datacatering.datacaterer.core.sink.SinkFactory: Failed to save data for sink, data-source-name=postgresCustomer, step-name=balances, save-mode=Append, num-records=-1, status=failed, exception=Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 36) (10a1c1e59037 executor driver): java.sql.BatchUpdateException: Batch entry 11 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC100004','2023-09-20 18:08:46.552+00','closed',53318.697515061074) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number
Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 36) (10a1c1e59037 executor driver): java.sql.BatchUpdateException: Batch entry 11 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC100004','2023-09-20 18:08:46.552+00','closed',53318.697515061074) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2134)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1491)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1516)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:560)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 24 more

Driver stacktrace:
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.saveData(SinkFactory.scala:52)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.pushToSink(SinkFactory.scala:34)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.$anonfun$pushDataToSinks$4(BatchDataProcessor.scala:85)
	at scala.collection.immutable.List.map(List.scala:293)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.pushDataToSinks(BatchDataProcessor.scala:80)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.$anonfun$splitAndProcess$2(BatchDataProcessor.scala:65)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.$anonfun$splitAndProcess$2$adapted(BatchDataProcessor.scala:34)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at io.github.datacatering.datacaterer.core.generator.BatchDataProcessor.splitAndProcess(BatchDataProcessor.scala:34)
	at io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor.generateDataWithResult(DataGeneratorProcessor.scala:57)
	at io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor.generateData(DataGeneratorProcessor.scala:39)
	at io.github.datacatering.datacaterer.core.generator.DataGeneratorProcessor.generateData(DataGeneratorProcessor.scala:33)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.executePlanWithConfig(PlanProcessor.scala:55)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.executePlan(PlanProcessor.scala:46)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.$anonfun$determineAndExecutePlan$8(PlanProcessor.scala:32)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.$anonfun$determineAndExecutePlan$6(PlanProcessor.scala:32)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at io.github.datacatering.datacaterer.core.plan.PlanProcessor$.determineAndExecutePlan(PlanProcessor.scala:32)
	at io.github.datacatering.datacaterer.App$.main(App.scala:18)
	at io.github.datacatering.datacaterer.App.main(App.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 36) (10a1c1e59037 executor driver): java.sql.BatchUpdateException: Batch entry 11 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC100004','2023-09-20 18:08:46.552+00','closed',53318.697515061074) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2134)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1491)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1516)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:560)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 24 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1036)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1034)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:901)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.$anonfun$saveBatchData$1(SinkFactory.scala:72)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.saveBatchData(SinkFactory.scala:73)
	at io.github.datacatering.datacaterer.core.sink.SinkFactory.saveData(SinkFactory.scala:45)
	... 38 more
Caused by: java.sql.BatchUpdateException: Batch entry 11 INSERT INTO account.balances ("account_number","create_time","account_status","balance") VALUES ('ACC100004','2023-09-20 18:08:46.552+00','closed',53318.697515061074) was aborted: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2402)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2134)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1491)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1516)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:560)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "balances_account_number_key"
  Detail: Key (account_number)=(ACC100004) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	... 24 more

@pflooky

pflooky commented Mar 16, 2024

That's okay, thanks for sharing 👍. I believe this is caused by existing data in your Postgres account.balances table. Currently, Data Caterer does not take existing data into account when calculating unique values; it only generates values that are unique within that particular data generation run. You can try truncating the tables first and then running again.
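For example, something along these lines would clear the tables before a re-run (the container name here is only an assumption based on the docker-compose example, so substitute whatever docker ps shows for your Postgres service):

# Container name and credentials are examples; adjust to your setup
docker exec -it docker-postgresserver-1 psql -U postgres -d customer \
  -c "TRUNCATE account.balances, account.transactions RESTART IDENTITY CASCADE;"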

I may include this feature in a later version, but it can be difficult to achieve (it would be an expensive operation to read all existing data from the data source before generating unique values). I have this working for auto-increment fields, since you can simply continue from the current maximum value, but I have not found an easy way to generalise it to all data types; a rough sketch of the auto-increment idea is shown below.
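As an illustration only (not Data Caterer's actual internals; the id column and container name are placeholders):

# Read the current maximum id, then start generating strictly above it so new
# rows cannot collide with existing ones. Column/container names are examples.
MAX_ID=$(docker exec docker-postgresserver-1 psql -U postgres -d customer -t -A \
  -c "SELECT COALESCE(MAX(id), 0) FROM account.balances;")
NEXT_ID=$((MAX_ID + 1))
echo "Next generated id would start from ${NEXT_ID}"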

@pflooky

pflooky commented May 3, 2024

For reference, I have created the following issue to ensure that globally unique values are generated by default:
#13

This should resolve the problem you ran into, since you would expect it to work without any further intervention.

@pflooky pflooky closed this as completed May 3, 2024