ccl/changefeedccl: add compression options for webhook sink #138872
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? Thank you for contributing to CockroachDB. Please ensure you have followed the guidelines for creating a PR. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
Thank you for updating your pull request. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
90f7858
to
9ab5e87
Compare
Thank you for updating your pull request. My owl senses detect your PR is good for review. Please keep an eye out for any test failures in CI. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
dc150e3
to
3c59821
Compare
Thanks for the contribution @massimo-ua! I have a few nits and a few questions, but this looks very good overall.
pkg/ccl/changefeedccl/compression.go
Outdated
	// since we are using decompression only for reading error response body, we can use default reader
	return pgzip.NewReader(src)
case sinkCompressionZstd:
	// zstd reader does not implement io.Closer interface, so we need to wrap it
The zstd reader does implement Close(), which we should call.
It may also be worth caching the readers instead of making a new one every time (see my next comment on doing the same for writers)... but that might be unnecessary for this case.
The zstd reader implements func (d *Decoder) Close(), which returns nothing, but io.ReadCloser requires Close() error, which returns an error. That's why I had to wrap it.
Yes, but your wrapper does not call the void Close method, where it should.
@asg0451 Thanks for the review. I'll work through the comments and update you once I'm done. |
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. Thank you for updating your pull request. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
dace026
to
4866761
Compare
622e518
to
e8616aa
Compare
I see that in the webhook sink
|
That's a good idea, but let's leave it out of here just to keep the scope down. |
Let me know when it's ready for review again (just commenting so this gets removed from my assigned reviews list for now)
cb4fa0f
to
505533e
Compare
@asg0451 Please review recent changes |
@@ -887,6 +887,7 @@ func (f *cloudStorageSinkFile) flushToStorage(
	if err := f.codec.Close(); err != nil {
		return err
	}
	f.codec = nil // Set to nil to prevent double close
what's the reasoning behind this? did this issue come up in the tests? maybe it's worthwhile to make the wrappers more robust to double closing if this is an issue
Yes, this came up during the tests. TestCloudStorageSinkFastGzip was panicking with "invalid memory address or nil pointer dereference" while attempting to invoke (*encWrapper).Close. This helped to fix the issue. As far as I understand, the issue is with cloudStorageSinkFile and its codec reference management. It closes the writer but doesn't reset the reference.
Need some guidance here. Not sure we can address that on an encWrapper level
Maybe you could have a closed bool flag in the wrappers, set it on Close, and panic/err if it's used while closed. Reset could reset it. This could also prevent incorrect usage (like using it after closing it, which would be bad).
How do you feel about returning nil when somebody tries to close an already-closed wrapper, but returning an error when Write or Read is called on it?
Returning nil on repeated attempts to Close the same reader is the default behaviour for the gzip Writer and Reader.
Added bool flag to indicate the underlying encoder/decoder is closed
that sounds good
2648d6e
to
3889c4f
Compare
pkg/ccl/changefeedccl/compression.go
Outdated
// is recycled after closing rather than being discarded.
func (e *encWrapper) Close() error {
	var err error
	if fErr := e.encoder.Flush(); fErr != nil {
Why do we need to call Flush here? Did we do it before? Is Close not enough?
You're right. There is no need to call Flush: according to the docs, Close flushes any unwritten data to the underlying writer. I'll remove it.
Fixed
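As a small check of the point about Close, the sketch below compresses a payload and calls only Close (no explicit Flush), then decompresses the result. It uses the standard library's compress/gzip as a stand-in, on the assumption that the encoder used in the PR behaves the same way in this respect; the helper names compress and decompress are hypothetical.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips payload, relying on Close alone to write any buffered
// data and the gzip footer to the underlying buffer.
func compress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // no Flush call before this
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reads a complete gzip stream back into memory.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	packed, err := compress([]byte("changefeed payload"))
	if err != nil {
		panic(err)
	}
	plain, err := decompress(packed)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain))
}
```

If Close did not flush, the round trip would fail with a truncated-stream error rather than returning the original bytes.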
230a0d8
to
ed1168d
Compare
Release note (sql change): Added compression support for changefeed webhook sinks. This reduces network bandwidth and storage usage, improving performance and lowering costs. Users can enable compression by setting the compression=<algorithm> option. Supported algorithms are gzip and zstd.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-39392
Resolved: https://cockroachlabs.atlassian.net/browse/CRDB-42915
Issue: cockroachdb#132279
ed1168d
to
5bbc73a
Compare
I noticed race failures during Bazel extended CI. I don't believe they are connected to the changes I've made. The race appears when spanInner.isNoop() and Span.reset() in pkg/util/tracing read and write a shared resource. Do you think I should ignore this?
Feel free to ignore bazel extended ci. |
Then that's it. There are no more changes left in this PR. Thank you for the review. Let me know if there is anything |
Added a gzip compression option to the changefeed webhook sink to reduce both network bytes and storage bytes for the customer, leading to cost savings and potentially better performance. This is a minimal configuration that supports gzip.
Jira issue: CRDB-42915
Epic CRDB-39392
Issue #132279