Skip to content

Commit

Permalink
feat: delete staging and load files from object storage post successf…
Browse files Browse the repository at this point in the history
…ul sync (#5428)
  • Loading branch information
shekhar-rudder authored Jan 23, 2025
1 parent aaae529 commit 134fbc0
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 48 deletions.
73 changes: 73 additions & 0 deletions integration_test/warehouse/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,79 @@ func TestUploads(t *testing.T) {
})
}

func TestCleanupObjectStorageFiles(t *testing.T) {
t.Run("object storage files cleanup", func(t *testing.T) {
testcases := []struct {
name string
cleanupObjectStorageFiles bool
expectedFileCount int
}{
{
name: "should delete files",
cleanupObjectStorageFiles: true,
expectedFileCount: 0,
},
{
name: "should not delete files",
cleanupObjectStorageFiles: false,
expectedFileCount: 2,
},
}
for _, tc := range testcases {
db, minioResource, whClient := setupServer(t, false, func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["cleanupObjectStorageFiles"] = tc.cleanupObjectStorageFiles
}, nil)
ctx := context.Background()
events := 100
eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
uuid.New().String(),
uuid.New().String(),
)
}), "\n")
require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
SourceID: sourceID,
DestinationID: destinationID,
Location: prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
DestinationRevisionID: destinationID,
Schema: map[string]map[string]any{
"tracks": {
"id": "string",
"user_id": "string",
"received_at": "datetime",
},
},
}))
requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
{A: "source_id", B: sourceID},
{A: "destination_id", B: destinationID},
{A: "status", B: succeeded},
}...)
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
{A: "status", B: exportedData},
{A: "wh_uploads.source_id", B: sourceID},
{A: "wh_uploads.destination_id", B: destinationID},
{A: "wh_uploads.namespace", B: namespace},
}...)
requireUploadJobsCount(t, ctx, db, 1, []lo.Tuple2[string, any]{
{A: "source_id", B: sourceID},
{A: "destination_id", B: destinationID},
{A: "namespace", B: namespace},
{A: "status", B: exportedData},
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
files, err := minioResource.Contents(ctx, "")
require.NoError(t, err)
require.Len(t, files, tc.expectedFileCount)
}
})
}

func TestDestinationTransformation(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
Expand Down
97 changes: 49 additions & 48 deletions warehouse/internal/model/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,53 @@ func (destConfSetting) protected() {}
func (s destConfSetting) String() string { return string(s) }

var (
PreferAppendSetting DestinationConfigSetting = destConfSetting("preferAppend")
UseRudderStorageSetting DestinationConfigSetting = destConfSetting("useRudderStorage")
SecureSetting DestinationConfigSetting = destConfSetting("secure")
SkipVerifySetting DestinationConfigSetting = destConfSetting("skipVerify")
EnableExternalLocationSetting DestinationConfigSetting = destConfSetting("enableExternalLocation")
UseSTSTokensSetting DestinationConfigSetting = destConfSetting("useSTSTokens")
UseGlueSetting DestinationConfigSetting = destConfSetting("useGlue")
HostSetting DestinationConfigSetting = destConfSetting("host")
DatabaseSetting DestinationConfigSetting = destConfSetting("database")
UserSetting DestinationConfigSetting = destConfSetting("user")
PasswordSetting DestinationConfigSetting = destConfSetting("password")
PortSetting DestinationConfigSetting = destConfSetting("port")
SSLModeSetting DestinationConfigSetting = destConfSetting("sslMode")
ProjectSetting DestinationConfigSetting = destConfSetting("project")
CredentialsSetting DestinationConfigSetting = destConfSetting("credentials")
LocationSetting DestinationConfigSetting = destConfSetting("location")
CACertificateSetting DestinationConfigSetting = destConfSetting("caCertificate")
ClusterSetting DestinationConfigSetting = destConfSetting("cluster")
AWSAccessKeySetting DestinationConfigSetting = destConfSetting("accessKey")
AWSAccessSecretSetting DestinationConfigSetting = destConfSetting("accessKeyID")
AWSBucketNameSetting DestinationConfigSetting = destConfSetting("bucketName")
AWSPrefixSetting DestinationConfigSetting = destConfSetting("prefix")
MinioAccessKeyIDSetting DestinationConfigSetting = destConfSetting("accessKeyID")
MinioSecretAccessKeySetting DestinationConfigSetting = destConfSetting("secretAccessKey")
TimeWindowLayoutSetting DestinationConfigSetting = destConfSetting("timeWindowLayout")
PathSetting DestinationConfigSetting = destConfSetting("path")
TokenSetting DestinationConfigSetting = destConfSetting("token")
CatalogSetting DestinationConfigSetting = destConfSetting("catalog")
ExternalLocationSetting DestinationConfigSetting = destConfSetting("externalLocation")
UseIAMForAuthSetting DestinationConfigSetting = destConfSetting("useIAMForAuth")
IAMRoleARNForAuthSetting DestinationConfigSetting = destConfSetting("iamRoleARNForAuth")
ClusterIDSetting DestinationConfigSetting = destConfSetting("clusterId")
UseServerlessSetting DestinationConfigSetting = destConfSetting("useServerless")
WorkgroupNameSetting DestinationConfigSetting = destConfSetting("workgroupName")
ClusterRegionSetting DestinationConfigSetting = destConfSetting("clusterRegion")
StorageIntegrationSetting DestinationConfigSetting = destConfSetting("storageIntegration")
AccountSetting DestinationConfigSetting = destConfSetting("account")
WarehouseSetting DestinationConfigSetting = destConfSetting("warehouse")
RoleSetting DestinationConfigSetting = destConfSetting("role")
UseKeyPairAuthSetting DestinationConfigSetting = destConfSetting("useKeyPairAuth")
PrivateKeySetting DestinationConfigSetting = destConfSetting("privateKey")
PrivateKeyPassphraseSetting DestinationConfigSetting = destConfSetting("privateKeyPassphrase")
TableSuffixSetting DestinationConfigSetting = destConfSetting("tableSuffix")
SyncFrequencySetting DestinationConfigSetting = destConfSetting("syncFrequency")
SyncStartAtSetting DestinationConfigSetting = destConfSetting("syncStartAt")
ExcludeWindowSetting DestinationConfigSetting = destConfSetting("excludeWindow")
PartitionColumnSetting DestinationConfigSetting = destConfSetting("partitionColumn")
PartitionTypeSetting DestinationConfigSetting = destConfSetting("partitionType")
PreferAppendSetting DestinationConfigSetting = destConfSetting("preferAppend")
UseRudderStorageSetting DestinationConfigSetting = destConfSetting("useRudderStorage")
SecureSetting DestinationConfigSetting = destConfSetting("secure")
SkipVerifySetting DestinationConfigSetting = destConfSetting("skipVerify")
EnableExternalLocationSetting DestinationConfigSetting = destConfSetting("enableExternalLocation")
UseSTSTokensSetting DestinationConfigSetting = destConfSetting("useSTSTokens")
UseGlueSetting DestinationConfigSetting = destConfSetting("useGlue")
HostSetting DestinationConfigSetting = destConfSetting("host")
DatabaseSetting DestinationConfigSetting = destConfSetting("database")
UserSetting DestinationConfigSetting = destConfSetting("user")
PasswordSetting DestinationConfigSetting = destConfSetting("password")
PortSetting DestinationConfigSetting = destConfSetting("port")
SSLModeSetting DestinationConfigSetting = destConfSetting("sslMode")
ProjectSetting DestinationConfigSetting = destConfSetting("project")
CredentialsSetting DestinationConfigSetting = destConfSetting("credentials")
LocationSetting DestinationConfigSetting = destConfSetting("location")
CACertificateSetting DestinationConfigSetting = destConfSetting("caCertificate")
ClusterSetting DestinationConfigSetting = destConfSetting("cluster")
AWSAccessKeySetting DestinationConfigSetting = destConfSetting("accessKey")
AWSAccessSecretSetting DestinationConfigSetting = destConfSetting("accessKeyID")
AWSBucketNameSetting DestinationConfigSetting = destConfSetting("bucketName")
AWSPrefixSetting DestinationConfigSetting = destConfSetting("prefix")
MinioAccessKeyIDSetting DestinationConfigSetting = destConfSetting("accessKeyID")
MinioSecretAccessKeySetting DestinationConfigSetting = destConfSetting("secretAccessKey")
TimeWindowLayoutSetting DestinationConfigSetting = destConfSetting("timeWindowLayout")
PathSetting DestinationConfigSetting = destConfSetting("path")
TokenSetting DestinationConfigSetting = destConfSetting("token")
CatalogSetting DestinationConfigSetting = destConfSetting("catalog")
ExternalLocationSetting DestinationConfigSetting = destConfSetting("externalLocation")
UseIAMForAuthSetting DestinationConfigSetting = destConfSetting("useIAMForAuth")
IAMRoleARNForAuthSetting DestinationConfigSetting = destConfSetting("iamRoleARNForAuth")
ClusterIDSetting DestinationConfigSetting = destConfSetting("clusterId")
UseServerlessSetting DestinationConfigSetting = destConfSetting("useServerless")
WorkgroupNameSetting DestinationConfigSetting = destConfSetting("workgroupName")
ClusterRegionSetting DestinationConfigSetting = destConfSetting("clusterRegion")
StorageIntegrationSetting DestinationConfigSetting = destConfSetting("storageIntegration")
AccountSetting DestinationConfigSetting = destConfSetting("account")
WarehouseSetting DestinationConfigSetting = destConfSetting("warehouse")
RoleSetting DestinationConfigSetting = destConfSetting("role")
UseKeyPairAuthSetting DestinationConfigSetting = destConfSetting("useKeyPairAuth")
PrivateKeySetting DestinationConfigSetting = destConfSetting("privateKey")
PrivateKeyPassphraseSetting DestinationConfigSetting = destConfSetting("privateKeyPassphrase")
TableSuffixSetting DestinationConfigSetting = destConfSetting("tableSuffix")
SyncFrequencySetting DestinationConfigSetting = destConfSetting("syncFrequency")
SyncStartAtSetting DestinationConfigSetting = destConfSetting("syncStartAt")
ExcludeWindowSetting DestinationConfigSetting = destConfSetting("excludeWindow")
PartitionColumnSetting DestinationConfigSetting = destConfSetting("partitionColumn")
PartitionTypeSetting DestinationConfigSetting = destConfSetting("partitionType")
CleanupObjectStorageFilesSetting DestinationConfigSetting = destConfSetting("cleanupObjectStorageFiles")
)
40 changes: 40 additions & 0 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

Expand Down Expand Up @@ -363,6 +365,9 @@ func (job *UploadJob) run() (err error) {
if err = job.exportData(); err != nil {
break
}
if err = job.cleanupObjectStorageFiles(); err != nil {
break
}
newStatus = nextUploadState.completed

default:
Expand Down Expand Up @@ -427,6 +432,41 @@ func (job *UploadJob) run() (err error) {
return nil
}

func (job *UploadJob) cleanupObjectStorageFiles() error {
cleanupObjectStorageFiles := job.warehouse.GetBoolDestinationConfig(model.CleanupObjectStorageFilesSetting)
if !cleanupObjectStorageFiles {
return nil
}
destination := job.warehouse.Destination
storageProvider := whutils.ObjectStorageType(destination.DestinationDefinition.Name, destination.Config, job.upload.UseRudderStorage)
fm, err := filemanager.New(&filemanager.Settings{
Provider: storageProvider,
Config: misc.GetObjectStorageConfig(misc.ObjectStorageOptsT{
Provider: storageProvider,
Config: destination.Config,
UseRudderStorage: job.upload.UseRudderStorage,
WorkspaceID: job.upload.WorkspaceID,
}),
})
if err != nil {
return fmt.Errorf("creating file manager: %w", err)
}
loadingFiles, err := job.loadFilesRepo.GetByStagingFiles(job.ctx, job.stagingFileIDs)
if err != nil {
return fmt.Errorf("fetching loading files: %w", err)
}
stagingKeysToDel := lo.Map(job.stagingFiles, func(file *model.StagingFile, _ int) string {
return fm.GetDownloadKeyFromFileLocation(file.Location)
})
loadingKeysToDel := lo.Map(loadingFiles, func(file model.LoadFile, _ int) string {
return fm.GetDownloadKeyFromFileLocation(file.Location)
})
if err = fm.Delete(job.ctx, append(stagingKeysToDel, loadingKeysToDel...)); err != nil {
return fmt.Errorf("deleting files from object storage: %w", err)
}
return nil
}

// CanAppend returns true if:
// * the source is not an ETL source
// * the source is not a replay source
Expand Down

0 comments on commit 134fbc0

Please sign in to comment.