Skip to content

Commit

Permalink
2614 kafka email and doc changes (#2615)
Browse files Browse the repository at this point in the history
* Additional warning text

* Reference alternate storage in failure email

* Update CHANGELOG
  • Loading branch information
rorymckinley authored Oct 30, 2024
1 parent 383c692 commit b05bc65
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ and this project adheres to

### Added

- Additional documentation and notification text relating to the importance of
alternate storage for Kafka triggers.
[#2614](https://github.com/OpenFn/lightning/issues/2614)

### Changed

- Enforcing MFA for a project can be enforced by the usage limiter
Expand Down
4 changes: 4 additions & 0 deletions DEPLOYMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ variable.

#### Persisting Failed Messages

**PLEASE NOTE: If alternate file storage is not enabled, messages that fail
to be persisted will not be retained by Lightning ans this can result in data
loss, if the Kafka cluster can not make these messages available again.**

If a Kafka message files to be persisted as a WorkOrder, Run and Dataclip, the
option exists to write the failed message to a location on the local file system.
If this option is enabled by setting `KAFKA_ALTERNATE_STORAGE_ENABLED`, then the
Expand Down
10 changes: 10 additions & 0 deletions lib/lightning/accounts/user_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,22 @@ defmodule Lightning.Accounts.UserNotifier do
deliver(user, "Kafka trigger failure on #{workflow.name}", """
As of #{display_timestamp}, the Kafka trigger associated with the workflow `#{workflow.name}` (#{url(~p"/projects/#{workflow.project_id}/w/#{workflow.id}")}) has failed to persist at least one message.
#{alternate_storage_message(Lightning.Config.kafka_alternate_storage_enabled?())}
If you have access to the system logs, please look for entries containing 'Kafka Pipeline Error'.
OpenFn
""")
end

defp alternate_storage_message(true = _alternate_storage_enabled) do
"This Lightning instance has alternate storage enabled. This means that any messages that failed to persist will be stored in the location referenced by the KAFKA_ALTERNATE_STORAGE_FILE_PATH environment variable. These messages can be recovered by reprocessing them."
end

defp alternate_storage_message(false = _alternate_storage_enabled) do
"THIS LIGHTNING INSTANCE DOES NOT HAVE ALTERNATE STORAGE ENABLED, SO THESE FAILED MESSAGES CANNOT BE RECOVERED WITHOUT MAKING THEM AVAILABLE ON THE KAFKA CLUSTER AGAIN."
end

defp pluralize_with_s(1, string), do: string
defp pluralize_with_s(_integer, string), do: "#{string}s"
end
45 changes: 44 additions & 1 deletion test/lightning/accounts/user_notifier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Lightning.Accounts.UserNotifierTest do
use Lightning.DataCase, async: true
use LightningWeb, :html

import Mox
import Swoosh.TestAssertions

alias Lightning.Accounts.{UserNotifier, User}
Expand Down Expand Up @@ -435,7 +436,11 @@ defmodule Lightning.Accounts.UserNotifierTest do
)
end

test "Kafka trigger failure" do
test "Kafka trigger failure - alternate storage disabled" do
stub(Lightning.MockConfig, :kafka_alternate_storage_enabled?, fn ->
false
end)

timestamp = DateTime.utc_now()

displayed_timestamp =
Expand All @@ -458,6 +463,44 @@ defmodule Lightning.Accounts.UserNotifierTest do
text_body: """
As of #{displayed_timestamp}, the Kafka trigger associated with the workflow `#{workflow.name}` (#{workflow_url}) has failed to persist at least one message.
THIS LIGHTNING INSTANCE DOES NOT HAVE ALTERNATE STORAGE ENABLED, SO THESE FAILED MESSAGES CANNOT BE RECOVERED WITHOUT MAKING THEM AVAILABLE ON THE KAFKA CLUSTER AGAIN.
If you have access to the system logs, please look for entries containing 'Kafka Pipeline Error'.
OpenFn
"""
)
end

test "Kafka trigger failure - alternate storage enabled" do
stub(Lightning.MockConfig, :kafka_alternate_storage_enabled?, fn ->
true
end)

timestamp = DateTime.utc_now()

displayed_timestamp =
timestamp
|> DateTime.truncate(:second)
|> DateTime.to_iso8601()

user = Lightning.AccountsFixtures.user_fixture()
workflow = insert(:workflow)

workflow_url =
LightningWeb.Endpoint
|> url(~p"/projects/#{workflow.project_id}/w/#{workflow.id}")

UserNotifier.send_trigger_failure_mail(user, workflow, timestamp)

assert_email_sent(
subject: "Kafka trigger failure on #{workflow.name}",
to: Swoosh.Email.Recipient.format(user),
text_body: """
As of #{displayed_timestamp}, the Kafka trigger associated with the workflow `#{workflow.name}` (#{workflow_url}) has failed to persist at least one message.
This Lightning instance has alternate storage enabled. This means that any messages that failed to persist will be stored in the location referenced by the KAFKA_ALTERNATE_STORAGE_FILE_PATH environment variable. These messages can be recovered by reprocessing them.
If you have access to the system logs, please look for entries containing 'Kafka Pipeline Error'.
OpenFn
Expand Down

0 comments on commit b05bc65

Please sign in to comment.