
Create a Delta table doctor that analyzes the health and wellness of a Delta table #7

Open
MrPowers opened this issue Mar 8, 2023 · 9 comments

Comments

@MrPowers
Collaborator

MrPowers commented Mar 8, 2023

A levi.delta_doctor(delta_table) command could be a nice way to help users identify issues in their Delta table that cause slow query performance.

There are several known problems that can cause poor performance of Delta tables:

  • too many small files
  • large files
  • file stats not being collected on the right columns/file stats missing for certain files
  • tables that are over-partitioned
  • tables that are not Z ORDERed
  • tables that should have constraints, but do not

levi.delta_doctor(delta_table) could return a string with warnings like the following:

  • SmallFileWarning: Your table contains 456 files with less than 1MB of data and you could consider optimizing to compact the small files
  • LargeFileWarning: Your table contains 32 files with more than 1.5GB of data. You should split up these files.
  • FileStatsWarning: You are only collecting stats for col1 and col2 in some files.

We should make it really easy for users to see if there are any obvious problems in their Delta table. Ideally, we will also give them easy ways to fix these problems!
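A hypothetical usage sketch of the proposed function, assuming it accepts a deltalake.DeltaTable object like the other levi helpers; the table path and the printed warnings are illustrative:

import levi
from deltalake import DeltaTable

# Hypothetical usage of the proposed delta_doctor function
delta_table = DeltaTable("path/to/some_delta_table")
print(levi.delta_doctor(delta_table))
# SmallFileWarning: Your table contains 456 files with less than 1MB of data ...
# LargeFileWarning: Your table contains 32 files with more than 1.5GB of data ...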

@puneetsharma04

Hello @MrPowers: As per my understanding, we can create code along the lines of the following:
The DeltaDoctor class takes a delta_table argument and has three methods (check_small_files, check_large_files, and check_file_stats) that each check for a specific problem and return a warning message if the problem is found. The diagnose method runs all three checks and returns a string with all the warning messages, or "No issues found" if no issues are found.
Let me know if that matches your expectation. The code would look something like this:

from pyspark.sql import SparkSession

class DeltaDoctor:
    def __init__(self, delta_table):
        self.spark = SparkSession.builder.getOrCreate()
        self.delta_table = delta_table

    def check_small_files(self, threshold=1):
        # Flag files smaller than the threshold (in MB)
        small_files = self.spark.sql(f"DESCRIBE DETAIL {self.delta_table}").filter(f"size < {threshold * 1000000}")
        count = small_files.count()
        if count > 0:
            return f"SmallFileWarning: Your table contains {count} files with less than {threshold}MB of data and you could consider optimizing to compact the small files"
        else:
            return None

    def check_large_files(self, threshold=1500):
        # Flag files larger than the threshold (in MB)
        large_files = self.spark.sql(f"DESCRIBE DETAIL {self.delta_table}").filter(f"size > {threshold * 1000000}")
        count = large_files.count()
        if count > 0:
            return f"LargeFileWarning: Your table contains {count} files with more than {threshold}MB of data. You should split up these files."
        else:
            return None

    def check_file_stats(self, columns=["col1", "col2"]):
        # Simplified check: flag files whose stats entry is missing entirely
        file_stats = self.spark.sql(f"DESCRIBE DETAIL {self.delta_table}").select("stats")
        num_files = file_stats.count()
        bad_files = file_stats.filter(file_stats.stats.isNull())
        num_bad_files = bad_files.count()
        if num_bad_files > 0:
            return f"FileStatsWarning: You are only collecting stats for {columns} in {num_bad_files} out of {num_files} files."
        else:
            return None

    def diagnose(self):
        results = []
        small_files_result = self.check_small_files()
        if small_files_result:
            results.append(small_files_result)
        large_files_result = self.check_large_files()
        if large_files_result:
            results.append(large_files_result)
        file_stats_result = self.check_file_stats()
        if file_stats_result:
            results.append(file_stats_result)
        if len(results) == 0:
            return "No issues found"
        else:
            return "\n".join(results)
delta_table = "my_delta_table"
doctor = DeltaDoctor(delta_table)
warnings = doctor.diagnose()
print(warnings)

@MrPowers
Collaborator Author

@puneetsharma04 - thanks for the suggestion, but this repo does not depend on Spark, so we'll need a solution that does not use PySpark. We'll need to use these APIs.
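For context, a minimal sketch of the kind of per-file metadata the deltalake (delta-rs) Python package exposes; the table path is illustrative and get_add_actions assumes a recent deltalake version:

from deltalake import DeltaTable

dt = DeltaTable("path/to/some_delta_table")
dt.files()  # paths of the active data files
# One row per active data file, including size_bytes and per-column stats
dt.get_add_actions(flatten=True).to_pandas()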

@puneetsharma04

@MrPowers: So you mean we have to use the delta-rs Python API to access the Delta Lake table and perform the necessary checks? Then maybe code like the below can work.

from deltalake import DeltaTable

class DeltaDoctor:
    def __init__(self, delta_table_path):
        self.delta_table = DeltaTable(delta_table_path)

    def _add_actions(self):
        # One row per active data file; flatten=True expands nested fields
        # (partition values and per-column stats) into top-level columns
        return self.delta_table.get_add_actions(flatten=True).to_pandas()

    def check_small_files(self, threshold=1):
        sizes = self._add_actions()["size_bytes"]
        count = int((sizes < threshold * 1000000).sum())
        if count > 0:
            return f"SmallFileWarning: Your table contains {count} files with less than {threshold}MB of data and you could consider optimizing to compact the small files"
        else:
            return None

    def check_large_files(self, threshold=1500):
        sizes = self._add_actions()["size_bytes"]
        count = int((sizes > threshold * 1000000).sum())
        if count > 0:
            return f"LargeFileWarning: Your table contains {count} files with more than {threshold}MB of data. You should split up these files."
        else:
            return None

    def check_file_stats(self, columns=["col1", "col2"]):
        # Flattened add actions expose per-column stats as null_count.<col> columns;
        # a file is flagged if stats are missing for any of the requested columns
        add_actions = self._add_actions()
        num_files = len(add_actions)
        stat_columns = [f"null_count.{col}" for col in columns]
        missing_columns = [c for c in stat_columns if c not in add_actions.columns]
        if missing_columns:
            num_bad_files = num_files
        else:
            num_bad_files = int(add_actions[stat_columns].isnull().any(axis=1).sum())
        if num_bad_files > 0:
            return f"FileStatsWarning: You are only collecting stats for {columns} in {num_bad_files} out of {num_files} files."
        else:
            return None

    def diagnose(self):
        results = []
        for check in (self.check_small_files, self.check_large_files, self.check_file_stats):
            result = check()
            if result:
                results.append(result)
        if len(results) == 0:
            return "No issues found"
        else:
            return "\n".join(results)

delta_table = "my_delta_table"
doctor = DeltaDoctor(delta_table)
warnings = doctor.diagnose()
print(warnings)

@MrPowers
Collaborator Author

@puneetsharma04 - this looks great. Really good work. Can you submit a PR?

@jeremyjordan
Contributor

One note on the reference implementation above: would it be possible to return the information in a little more structured format? Even just a list of warnings would be better than a single string IMO. For example, I might be interested in periodically running diagnose() on a set of Delta Tables and would want to have some ability to do something like:

doctor = DeltaDoctor(delta_table)
warnings = doctor.diagnose()
if len(warnings) > 0:
  send_alert(warnings)  # pseudocode example

And if we made classes for these warnings (SmallFileWarning, LargeFileWarning, etc.), we could also make it easy to do something like:

warnings = doctor.diagnose(ignore=[SmallFileWarning,])

to filter out warnings that we may want to explicitly ignore as the list of health checks grows over time.
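A minimal sketch of what those warning classes and the ignore parameter could look like; the class and method names here are hypothetical, following the suggestion above:

from dataclasses import dataclass

# Hypothetical structured warning types for the health checks discussed above
@dataclass
class TableWarning:
    message: str

class SmallFileWarning(TableWarning): pass
class LargeFileWarning(TableWarning): pass
class FileStatsWarning(TableWarning): pass

class DeltaDoctor:
    def __init__(self, checks):
        # checks: callables that each return a TableWarning or None
        self.checks = checks

    def diagnose(self, ignore=()):
        warnings = [w for check in self.checks if (w := check()) is not None]
        return [w for w in warnings if not isinstance(w, tuple(ignore))]

# Usage: alert only on warnings we have not chosen to ignore
doctor = DeltaDoctor(checks=[lambda: SmallFileWarning("456 files under 1MB")])
warnings = doctor.diagnose(ignore=[LargeFileWarning])
if warnings:
    print(warnings)  # stand-in for send_alert(warnings)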

@puneetsharma04

@jeremyjordan: Thanks for the suggestion.
@MrPowers: I was trying to test the existing code in PyCharm.
However, I encountered the error below. Is it an existing issue with the deltalake API, or is there some other dependency I need to install in the module?

deltalake.PyDeltaTableError: Not a Delta table: No snapshot or version 0 found, perhaps /Users/puneet_sharma1/Documents/GitHub/levi/tests/tests/reader_tests/generated/basic_append/delta/ is an empty dir?

@MrPowers
Collaborator Author

MrPowers commented Apr 4, 2023

@jeremyjordan - yes, I like the idea of returning results in a structured format. That's pretty much always my preference. Thanks for calling that out.

@MrPowers
Collaborator Author

MrPowers commented Apr 4, 2023

@puneetsharma04 - looks like you're using this path: /Users/puneet_sharma1/Documents/GitHub/levi/tests/tests/reader_tests/generated/basic_append/delta/

You should be using this path: /Users/puneet_sharma1/Documents/GitHub/levi/tests/reader_tests/generated/basic_append/delta/

Let me know if that fixes your issue!

@puneetsharma04

puneetsharma04 commented Apr 4, 2023

@MrPowers: You are right. However, I haven't made any changes to the code. Should I provide the full path and change the code, or is there another workaround? It looks like it creates a tests folder (/Users/puneet_sharma1/Documents/GitHub/levi/tests/tests/reader_tests/generated/basic_append/delta/) on its own at this path, and I am not sure why it behaves that way.
It fails on
delta_table = DeltaTable("./tests/reader_tests/generated/basic_append/delta")
However, if I use
delta_table = DeltaTable("../tests/reader_tests/generated/basic_append/delta")
it passes.
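The mismatch comes from the relative path being resolved against the current working directory. One way to make the test independent of where it is launched from (a sketch, not the repo's actual test code) is to build the path from the test file's location:

from pathlib import Path
from deltalake import DeltaTable

# Resolve the table path relative to this test file rather than the current
# working directory, so it works whether tests run from the repo root or tests/.
TABLE_PATH = Path(__file__).parent / "reader_tests" / "generated" / "basic_append" / "delta"
delta_table = DeltaTable(str(TABLE_PATH))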
