diff --git a/README.md b/README.md index 31fdf53..292d8b0 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,11 @@ The dataMigrate directly reads data from the TSM file of InfluxDB and writes the ### requirements -Go version >1.16 +Go version > 1.16 Setting Environment Variables -``` +```bash > export GOPATH=/path/to/dir > export GO111MODULE=on > export GONOSUMDB=* @@ -22,39 +22,110 @@ Setting Environment Variables ### compile -``` +```bash > bash build.sh ``` -### data migration +## data migration +Before migrating, you need to create the corresponding database and RP in openGemini. (This behavior may be fixed in a future release.) + +```bash +> dataMigrate run --from dir/to/influxdb/data --to ip:port --database dbname ``` -> dataMigrate --from path/to/tsm-file --to ip:port --database dbname + +**WARNING**: When using this tool, please do not migrate data without shutting down InfluxDB if possible; otherwise, some +unknown problems may occur. To ensure that data is as complete as possible after migration, keep the empty write load +running before shutting down InfluxDB and wait for data in the cache to complete disk dumping (10 minutes by default). + + +### example 1: Migrate all databases + +example influxdb data dir: `/var/lib/influxdb/data` + +```bash +> ls -l /var/lib/influxdb/data +total 0 +drwx------ 4 root root 128B 12 6 14:58 _internal +drwx------ 4 root root 128B 12 6 14:59 db0 +drwx------ 4 root root 128B 12 8 09:01 db1 ``` +We migrate the `_internal` db + +```bash +> ./dataMigrate run --from /var/lib/influxdb/data --to ip:port --database _internal + +2023/12/08 14:17:48 Data migrate tool starting +2023/12/08 14:17:48 Debug mode is enabled +2023/12/08 14:17:48 Searching for tsm files to migrate +2023/12/08 14:17:48 Writing out data from shard _internal/monitor/1, [2/4]... +2023/12/08 14:17:48 Writing out data from shard db0/autogen/2, [4/4]... +2023/12/08 14:17:48 Writing out data from shard _internal/monitor/3, [3/4]... 
+2023/12/08 14:17:48 Writing out data from shard db1/autogen/5, [1/4]... +2023/12/08 14:17:48 Dealing file: /Users/shilinlee/.influxdb/data/_internal/monitor/1/000000001-000000001.tsm +2023/12/08 14:17:48 Dealing file: /Users/shilinlee/.influxdb/data/_internal/monitor/3/000000001-000000001.tsm +2023/12/08 14:17:48 Dealing file: /Users/shilinlee/.influxdb/data/db1/autogen/5/000000001-000000001.tsm +2023/12/08 14:17:48 Dealing file: /Users/shilinlee/.influxdb/data/db0/autogen/2/000000001-000000001.tsm +2023/12/08 14:17:48 Shard db0/autogen/2 takes 1.703084ms to migrate, with 1 tags, 2 fields, 2 rows read +2023/12/08 14:17:48 Shard db1/autogen/5 takes 2.076959ms to migrate, with 5 tags, 1 fields, 3 rows read +2023/12/08 14:17:48 Shard _internal/monitor/1 takes 467.09275ms to migrate, with 49 tags, 115 fields, 34098 rows read +2023/12/08 14:17:48 Shard _internal/monitor/3 takes 475.290791ms to migrate, with 49 tags, 115 fields, 22443 rows read +2023/12/08 14:17:48 Total: takes 477.482791ms to migrate, with 54 tags, 118 fields, 56546 rows read. 
``` -Usage: dataMigrate [flags] - -database string - Optional: the database to read - -end string - Optional: the end time to read (RFC3339 format) - -from string - Data storage path (default "/var/lib/Influxdb/data") - -retention string - Optional: the retention policy to read (requires -database) - -start string - Optional: the start time to read (RFC3339 format) - -batch int - Optional: specify batch size for inserting lines (default 1000) - -mode string - Optional: whether to enable debug log or not (set as "Debug" to enable it) - -to string - Destination host to write data to (default "127.0.0.1:8086",which is the openGemini service default address) +### example 2: Migrate the specified database + +```bash +> ./dataMigrate run --from /var/lib/influxdb/data --to ip:port --database db0 + +2023/12/08 14:31:47 Data migrate tool starting +2023/12/08 14:31:47 Debug mode is enabled +2023/12/08 14:31:47 Searching for tsm files to migrate +2023/12/08 14:31:47 Writing out data from shard db0/autogen/2, [1/1]... +2023/12/08 14:31:47 Dealing file: /Users/shilinlee/.influxdb/data/db0/autogen/2/000000001-000000001.tsm +2023/12/08 14:31:47 Shard db0/autogen/2 takes 45.883209ms to migrate, with 1 tags, 2 fields, 2 rows read +2023/12/08 14:31:47 Total: takes 48.502792ms to migrate, with 1 tags, 2 fields, 2 rows read. ``` -**Notice**: When using this tool, please do not migrate data without shutting down InfluxDB if possible; otherwise, some -unknown problems may occur. To ensure that data is as complete as possible after migration, keep the empty write load -running before shutting down InfluxDB and wait for data in the cache to complete disk dumping (10 minutes by default). 
+### example 3: Migrate the specified database with auth and https + +```bash +> ./dataMigrate run --from /var/lib/influxdb/data --to ip:port --database db0 \ + --ssl --unsafeSsl --username rwusr --password This@123 + +2023/12/08 14:31:47 Data migrate tool starting +2023/12/08 14:31:47 Debug mode is enabled +2023/12/08 14:31:47 Searching for tsm files to migrate +2023/12/08 14:31:47 Writing out data from shard db0/autogen/2, [1/1]... +2023/12/08 14:31:47 Dealing file: /Users/shilinlee/.influxdb/data/db0/autogen/2/000000001-000000001.tsm +2023/12/08 14:31:47 Shard db0/autogen/2 takes 45.883209ms to migrate, with 1 tags, 2 fields, 2 rows read +2023/12/08 14:31:47 Total: takes 48.502792ms to migrate, with 1 tags, 2 fields, 2 rows read. +``` + + +## For more help + +```bash +Reads TSM files into InfluxDB line protocol format and write into openGemini + +Usage: + run [flags] + +Flags: + --batch int Optional: specify batch size for inserting lines (default 1000) + --database string Optional: the database to read + --debug Optional: whether to enable debug log or not + --end string Optional: the end time to read (RFC3339 format) + -f, --from string Influxdb Data storage path. See your influxdb config item: data.dir (default "/var/lib/influxdb/data") + -h, --help help for run + -p, --password string Optional: The password to connect to the openGemini cluster. + --retention string Optional: the retention policy to read (required -database) + --ssl Optional: Use https for requests. + --start string Optional: the start time to read (RFC3339 format) + -t, --to string Destination host to write data to (default "127.0.0.1:8086") + --unsafeSsl Optional: Set this when connecting to the cluster using https and not use SSL verification. + -u, --username string Optional: The username to connect to the openGemini cluster. 
+``` **Welcome to add more features.** diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..cbfc8bc --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,44 @@ +package cmd + +import ( + "github.com/openGemini/dataMigrate/src" + "github.com/spf13/cobra" +) + +var ( + RootCmd *cobra.Command // represents the cluster command + opt src.DataMigrateOptions +) + +func init() { + RootCmd = &cobra.Command{ + Use: "run", + Short: "Reads TSM files into InfluxDB line protocol format and write into openGemini", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(cmd *cobra.Command, args []string) error { + migrateCmd := src.NewDataMigrateCommand(&opt) + if err := migrateCmd.Run(); err != nil { + return err + } + return nil + }, + } + + RootCmd.Flags().StringVarP(&opt.Username, "username", "u", "", "Optional: The username to connect to the openGemini cluster.") + RootCmd.Flags().StringVarP(&opt.Password, "password", "p", "", "Optional: The password to connect to the openGemini cluster.") + RootCmd.Flags().StringVarP(&opt.DataDir, "from", "f", "/var/lib/influxdb/data", "Influxdb Data storage path. 
See your influxdb config item: data.dir") + RootCmd.Flags().StringVarP(&opt.Out, "to", "t", "127.0.0.1:8086", "Destination host to write data to") + RootCmd.Flags().StringVarP(&opt.Database, "database", "", "", "Optional: the database to read") + RootCmd.Flags().StringVarP(&opt.RetentionPolicy, "retention", "", "", "Optional: the retention policy to read (required -database)") + RootCmd.Flags().StringVarP(&opt.Start, "start", "", "", "Optional: the start time to read (RFC3339 format)") + RootCmd.Flags().StringVarP(&opt.End, "end", "", "", "Optional: the end time to read (RFC3339 format)") + RootCmd.Flags().IntVarP(&opt.BatchSize, "batch", "", 1000, "Optional: specify batch size for inserting lines") + RootCmd.Flags().BoolVarP(&opt.Debug, "debug", "", false, "Optional: whether to enable debug log or not") + RootCmd.Flags().BoolVarP(&opt.Ssl, "ssl", "", false, "Optional: Use https for requests.") + RootCmd.Flags().BoolVarP(&opt.UnsafeSsl, "unsafeSsl", "", false, "Optional: Set this when connecting to the cluster using https and not use SSL verification.") +} + +func Execute() error { + return RootCmd.Execute() +} diff --git a/dataMigrate b/dataMigrate new file mode 100755 index 0000000..b31d6a5 Binary files /dev/null and b/dataMigrate differ diff --git a/go.mod b/go.mod index 9cf1c5c..90baa82 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/influxdata/influxdb v1.8.0 github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/pkg/errors v0.8.1 + github.com/spf13/cobra v0.0.3 go.uber.org/atomic v1.3.2 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.8.0 // indirect diff --git a/go.sum b/go.sum index 7a82053..5c73097 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/flux v0.65.0 h1:57tk1Oo4gpGIDbV12vUAPCMtLtThhaXzub1XRIuqv6A= github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY= @@ -216,7 +217,9 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go index f116c8e..947a72b 100644 --- a/main.go +++ b/main.go @@ -15,26 +15,16 @@ limitations under the License. 
package main import ( - "os" - + "github.com/openGemini/dataMigrate/cmd" "github.com/openGemini/dataMigrate/src" + "os" ) func main() { defer src.Logger.Close() - src.Logger.LogString("Data migrate tool starting", src.TOCONSOLE, src.LEVEL_INFO) - if err := Run(os.Args[1:]...); err != nil { + err := cmd.Execute() + if err != nil { src.Logger.LogError(err) os.Exit(1) } } - -func Run(args ...string) error { - if len(args) > 0 { - cmd := src.NewDataMigrateCommand() - if err := cmd.Run(args...); err != nil { - return err - } - } - return nil -} diff --git a/src/cursor.go b/src/cursor.go index 81efe75..2f82fa3 100644 --- a/src/cursor.go +++ b/src/cursor.go @@ -341,8 +341,9 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error { for { if flag { bp, _ = client.NewBatchPoints(client.BatchPointsConfig{ - Database: cmd.getDatabase(), - Precision: "ns", + Database: cmd.getDatabase(), + RetentionPolicy: cmd.getRetentionPolicy(), + Precision: "ns", }) flag = false } diff --git a/src/dataMigrate.go b/src/dataMigrate.go index ba530e0..a781703 100644 --- a/src/dataMigrate.go +++ b/src/dataMigrate.go @@ -16,14 +16,11 @@ package src import ( "bytes" "context" - "flag" "fmt" - "github.com/influxdata/influxdb/models" - "github.com/pkg/errors" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" "io" "math" + "net/http" + _ "net/http/pprof" "os" "path/filepath" "runtime" @@ -33,10 +30,11 @@ import ( "sync" "time" - "net/http" - _ "net/http/pprof" - + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/pkg/errors" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" ) // escape set for tags @@ -82,13 +80,7 @@ type DataMigrateCommand struct { Stderr io.Writer Stdout io.Writer - dataDir string - out string - database string - retentionPolicy string - startTime int64 - endTime int64 - batchSize int + opt *DataMigrateOptions manifest []fileGroupInfo tsmFiles map[string][]string @@ -99,11 +91,13 @@ type DataMigrateCommand 
struct { } // NewDataMigrateCommand returns a new instance of DataMigrateCommand. -func NewDataMigrateCommand() *DataMigrateCommand { +func NewDataMigrateCommand(opt *DataMigrateOptions) *DataMigrateCommand { return &DataMigrateCommand{ Stderr: os.Stderr, Stdout: os.Stdout, + opt: opt, + manifest: make([]fileGroupInfo, 0), tsmFiles: make(map[string][]string), shardGroups: make([]shardGroupInfo, 0), @@ -112,68 +106,51 @@ func NewDataMigrateCommand() *DataMigrateCommand { } // Run executes the command. -func (cmd *DataMigrateCommand) Run(args ...string) error { - var start, end string - var debug string - flag.StringVar(&cmd.dataDir, "from", "/var/lib/influxdb/data", "Data storage path") - flag.StringVar(&cmd.out, "to", "127.0.0.1:8086", "Destination host to write data to") - flag.StringVar(&cmd.database, "database", "", "Optional: the database to read") - flag.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to read (requires -database)") - flag.StringVar(&start, "start", "", "Optional: the start time to read (RFC3339 format)") - flag.StringVar(&end, "end", "", "Optional: the end time to read (RFC3339 format)") - flag.StringVar(&debug, "mode", "", "Optional: whether to enable debug log or not") - flag.IntVar(&cmd.batchSize, "batch", 1000, "Optional: specify batch size for inserting lines") - - flag.Usage = func() { - fmt.Fprintf(cmd.Stdout, "Reads TSM files into InfluxDB line protocol format and insert into openGemini\n\n") - fmt.Fprintf(cmd.Stdout, "Usage: %s [flags]\n\n", filepath.Base(os.Args[0])) - flag.PrintDefaults() - } - - flag.Parse() - - // write params to log - logger.LogString("Got param \"from\": "+cmd.dataDir, TOLOGFILE, LEVEL_INFO) - logger.LogString("Got param \"to\": "+cmd.out, TOLOGFILE, LEVEL_INFO) - logger.LogString("Got param \"database\": "+cmd.database, TOLOGFILE, LEVEL_INFO) - logger.LogString("Got param \"retention\": "+cmd.retentionPolicy, TOLOGFILE, LEVEL_INFO) - logger.LogString("Got param \"start\": 
"+start, TOLOGFILE, LEVEL_INFO) - logger.LogString("Got param \"end\": "+end, TOLOGFILE, LEVEL_INFO) - logger.LogString("Got param \"batch\": "+strconv.Itoa(cmd.batchSize), TOLOGFILE, LEVEL_INFO) - +func (cmd *DataMigrateCommand) Run() error { // set defaults - if start != "" { - s, err := time.Parse(time.RFC3339, start) + if cmd.opt.Start != "" { + s, err := time.Parse(time.RFC3339, cmd.opt.Start) if err != nil { return err } - cmd.startTime = s.UnixNano() + cmd.opt.StartTime = s.UnixNano() } else { - cmd.startTime = math.MinInt64 + cmd.opt.StartTime = math.MinInt64 } - if end != "" { - e, err := time.Parse(time.RFC3339, end) + if cmd.opt.End != "" { + e, err := time.Parse(time.RFC3339, cmd.opt.End) if err != nil { return err } - cmd.endTime = e.UnixNano() + cmd.opt.EndTime = e.UnixNano() } else { // set end time to max if it is not set. - cmd.endTime = math.MaxInt64 + cmd.opt.EndTime = math.MaxInt64 } if err := cmd.validate(); err != nil { return err } + logger.LogString("Data migrate tool starting", TOCONSOLE, LEVEL_INFO) + + // write params to log + logger.LogString("Got param \"from\": "+cmd.opt.DataDir, TOLOGFILE, LEVEL_INFO) + logger.LogString("Got param \"to\": "+cmd.opt.Out, TOLOGFILE, LEVEL_INFO) + logger.LogString("Got param \"database\": "+cmd.opt.Database, TOLOGFILE, LEVEL_INFO) + logger.LogString("Got param \"retention\": "+cmd.opt.RetentionPolicy, TOLOGFILE, LEVEL_INFO) + logger.LogString("Got param \"start\": "+cmd.opt.Start, TOLOGFILE, LEVEL_INFO) + logger.LogString("Got param \"end\": "+cmd.opt.End, TOLOGFILE, LEVEL_INFO) + logger.LogString("Got param \"batch\": "+strconv.Itoa(cmd.opt.BatchSize), TOLOGFILE, LEVEL_INFO) + gs := NewGeminiService(cmd) - shardGroupDuration, err := gs.GetShardGroupDuration(cmd.database, "autogen") + shardGroupDuration, err := gs.GetShardGroupDuration(cmd.opt.Database, "autogen") if err != nil { return err } cmd.shardGroupDuration = shardGroupDuration - if debug == "debug" || debug == "Debug" || debug == "DEBUG" { + if 
cmd.opt.Debug { logger.SetDebug() logger.LogString("Debug mode is enabled", TOCONSOLE|TOLOGFILE, LEVEL_DEBUG) } @@ -190,15 +167,15 @@ func (cmd *DataMigrateCommand) Run(args ...string) error { } func (cmd *DataMigrateCommand) setOutput(url string) { - cmd.out = strings.TrimPrefix(url, "http://") + cmd.opt.Out = strings.TrimPrefix(url, "http://") } // Check whether the parameters are valid or not. func (cmd *DataMigrateCommand) validate() error { - if cmd.retentionPolicy != "" && cmd.database == "" { + if cmd.opt.RetentionPolicy != "" && cmd.opt.Database == "" { return fmt.Errorf("dataMigrate: must specify a db") } - if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime { + if cmd.opt.StartTime != 0 && cmd.opt.EndTime != 0 && cmd.opt.EndTime < cmd.opt.StartTime { return fmt.Errorf("dataMigrate: end time before start time") } return nil @@ -232,7 +209,7 @@ func (cmd *DataMigrateCommand) runMigrate() error { func (cmd *DataMigrateCommand) walkTSMFiles() error { logger.LogString("Searching for tsm files to migrate", TOCONSOLE|TOLOGFILE, LEVEL_INFO) - err := filepath.Walk(cmd.dataDir, func(path string, f os.FileInfo, err error) error { + err := filepath.Walk(cmd.opt.DataDir, func(path string, f os.FileInfo, err error) error { if err != nil { return err } @@ -241,7 +218,7 @@ func (cmd *DataMigrateCommand) walkTSMFiles() error { return nil } - relPath, err := filepath.Rel(cmd.dataDir, path) + relPath, err := filepath.Rel(cmd.opt.DataDir, path) if err != nil { return err } @@ -250,17 +227,16 @@ func (cmd *DataMigrateCommand) walkTSMFiles() error { return fmt.Errorf("invalid directory structure for %s", path) } - if dirs[0] == cmd.database || cmd.database == "" { - if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { - key := filepath.Join(dirs[0], dirs[1], dirs[2]) - cmd.tsmFiles[key] = append(cmd.tsmFiles[key], path) - if len(cmd.tsmFiles[key]) == 1 { - cmd.manifest = append(cmd.manifest, fileGroupInfo{ - db: dirs[0], - rp: dirs[1], - sid: 
dirs[2], - }) - } + if (dirs[0] == cmd.opt.Database || cmd.opt.Database == "") && + (dirs[1] == cmd.opt.RetentionPolicy || cmd.opt.RetentionPolicy == "") { + key := filepath.Join(dirs[0], dirs[1], dirs[2]) + cmd.tsmFiles[key] = append(cmd.tsmFiles[key], path) + if len(cmd.tsmFiles[key]) == 1 { + cmd.manifest = append(cmd.manifest, fileGroupInfo{ + db: dirs[0], + rp: dirs[1], + sid: dirs[2], + }) } } return nil @@ -369,11 +345,11 @@ func (cmd *DataMigrateCommand) populateShardGroups() error { } func (cmd *DataMigrateCommand) doMigrate(ctx context.Context, info shardGroupInfo) error { - migrateShard := func(key string, files []string) error { + migrateShard := func(info *shardGroupInfo, key string, files []string) error { logger.LogString(fmt.Sprintf("Writing out data from shard %v, [%d/%d]...", key, cmd.gstat.progress.Inc(), len(cmd.manifest)), TOCONSOLE|TOLOGFILE, LEVEL_INFO) st := time.Now() - mig := NewMigrator(cmd) + mig := NewMigrator(cmd, info) defer mig.release() if err := mig.migrateTsmFiles(files); err != nil { return err @@ -394,7 +370,7 @@ func (cmd *DataMigrateCommand) doMigrate(ctx context.Context, info shardGroupInf for _, sid := range info.sids { key := filepath.Join(info.db, info.rp, sid) if files, ok := cmd.tsmFiles[key]; ok { - if err := migrateShard(key, files); err != nil { + if err := migrateShard(&info, key, files); err != nil { return errors.WithStack(err) } } else { diff --git a/src/dataMigrate_test.go b/src/dataMigrate_test.go index 87361e6..a74accb 100644 --- a/src/dataMigrate_test.go +++ b/src/dataMigrate_test.go @@ -11,8 +11,7 @@ package src import ( "fmt" - "io/ioutil" - "math" + "io" "math/rand" "os" "sort" @@ -101,7 +100,11 @@ func Test_ReadTSMFile(t *testing.T) { defer server.Close() cmd := newCommand() - cmd.setOutput(server.URL) + cmd.opt.Out = strings.TrimPrefix(server.URL, "http://") + info := &shardGroupInfo{ + db: "db0", + rp: "rp0", + } for _, c := range []corpus{ basicCorpus, @@ -112,7 +115,7 @@ func Test_ReadTSMFile(t 
*testing.T) { filelist := []string{tsmFile.Name()} - mig := NewMigrator(cmd) + mig := NewMigrator(cmd, info) if err := mig.migrateTsmFiles(filelist); err != nil { t.Fatal(err) } @@ -120,7 +123,7 @@ func Test_ReadTSMFile(t *testing.T) { // Missing .tsm file should not cause a failure. filelist := []string{"file-that-does-not-exist.tsm"} - if err := NewMigrator(newCommand()).migrateTsmFiles(filelist); err != nil { + if err := NewMigrator(newCommand(), info).migrateTsmFiles(filelist); err != nil { t.Fatal(err) } } @@ -138,15 +141,17 @@ func TestEmptyMigrate(t *testing.T) { defer server.Close() cmd := newCommand() - cmd.startTime = 0 - cmd.endTime = 0 - cmd.setOutput(server.URL) + cmd.opt.Out = strings.TrimPrefix(server.URL, "http://") + info := &shardGroupInfo{ + db: "db0", + rp: "rp0", + } f := writeCorpusToTSMFile(makeFloatsCorpus(100, 250)) defer os.Remove(f.Name()) filelist := []string{f.Name()} - if err := NewMigrator(cmd).migrateTsmFiles(filelist); err != nil { + if err := NewMigrator(cmd, info).migrateTsmFiles(filelist); err != nil { t.Fatal(err) } } @@ -164,18 +169,23 @@ func benchmarkReadTSM(c corpus, b *testing.B) { defer server.Close() cmd := newCommand() - cmd.setOutput(server.URL) + cmd.opt.Out = strings.TrimPrefix(server.URL, "http://") // Garbage collection is relatively likely to happen during export, so track allocations. 
b.ReportAllocs() f := writeCorpusToTSMFile(c) defer os.Remove(f.Name()) + info := &shardGroupInfo{ + db: "db0", + rp: "rp0", + } + b.ResetTimer() b.StartTimer() for i := 0; i < b.N; i++ { filelist := []string{f.Name()} - if err := NewMigrator(cmd).migrateTsmFiles(filelist); err != nil { + if err := NewMigrator(cmd, info).migrateTsmFiles(filelist); err != nil { b.Fatal(err) } } @@ -199,13 +209,14 @@ func BenchmarkReadTSMStrings_100s_250vps(b *testing.B) { func newCommand() *DataMigrateCommand { return &DataMigrateCommand{ - Stderr: ioutil.Discard, - Stdout: ioutil.Discard, - manifest: make([]fileGroupInfo, 0), - tsmFiles: make(map[string][]string), - startTime: math.MinInt64, - endTime: math.MaxInt64, - gstat: &globalStatInfo{}, + Stderr: io.Discard, + Stdout: io.Discard, + + opt: &DataMigrateOptions{}, + + manifest: make([]fileGroupInfo, 0), + tsmFiles: make(map[string][]string), + gstat: &globalStatInfo{}, } } @@ -263,7 +274,7 @@ func makeStringsCorpus(numSeries, numStringsPerSeries int) corpus { }) } func writeCorpusToTSMFile(c corpus) *os.File { - tsmFile, err := ioutil.TempFile("", "export_test_corpus_tsm") + tsmFile, err := os.CreateTemp("", "export_test_corpus_tsm") if err != nil { panic(err) } diff --git a/src/geminiservice.go b/src/geminiservice.go index 9202c97..83c5946 100644 --- a/src/geminiservice.go +++ b/src/geminiservice.go @@ -18,7 +18,7 @@ type geminiService struct { func NewGeminiService(cmd *DataMigrateCommand) *geminiService { return &geminiService{ - out: cmd.out, + out: cmd.opt.Out, } } diff --git a/src/migrator.go b/src/migrator.go index af2ab18..60f8378 100644 --- a/src/migrator.go +++ b/src/migrator.go @@ -15,6 +15,7 @@ type Migrator interface { writeCurrentFiles() error releaseTSMReaders() getDatabase() string + getRetentionPolicy() string getStat() *statInfo getGStat() *globalStatInfo getBatchSize() int @@ -58,11 +59,12 @@ type statInfo struct { } type migrator struct { - out string - database string - startTime int64 - endTime int64 - 
batchSize int + out string + database string + retentionPolicy string + startTime int64 + endTime int64 + batchSize int files *[]tsm1.TSMFile // series to fields @@ -94,23 +96,28 @@ func (m *migrator) getDatabase() string { return m.database } +func (m *migrator) getRetentionPolicy() string { + return m.retentionPolicy +} + func (m *migrator) getStat() *statInfo { return m.stat } -func NewMigrator(cmd *DataMigrateCommand) *migrator { +func NewMigrator(cmd *DataMigrateCommand, info *shardGroupInfo) *migrator { mig := &migrator{ - out: cmd.out, - database: cmd.database, - startTime: cmd.startTime, - endTime: cmd.endTime, - files: filesPool.Get().(*[]tsm1.TSMFile), - serieskeys: make(map[string]map[string]struct{}, 100), - stat: statPool.Get().(*statInfo), - gstat: cmd.gstat, - batchSize: cmd.batchSize, - mstCache: mstCachePool.Get().(*lru.Cache), - tagsCache: tagsCachePool.Get().(*lru.Cache), + out: cmd.opt.Out, + database: info.db, + retentionPolicy: info.rp, + startTime: cmd.opt.StartTime, + endTime: cmd.opt.EndTime, + files: filesPool.Get().(*[]tsm1.TSMFile), + serieskeys: make(map[string]map[string]struct{}, 100), + stat: statPool.Get().(*statInfo), + gstat: cmd.gstat, + batchSize: cmd.opt.BatchSize, + mstCache: mstCachePool.Get().(*lru.Cache), + tagsCache: tagsCachePool.Get().(*lru.Cache), } mig.stat.rowsRead = 0 mig.stat.tagsRead = make(map[string]struct{}) diff --git a/src/options.go b/src/options.go new file mode 100644 index 0000000..920ed25 --- /dev/null +++ b/src/options.go @@ -0,0 +1,19 @@ +package src + +type DataMigrateOptions struct { + DataDir string + Out string + Username string + Password string + Database string + RetentionPolicy string + Start string // rfc3339 format + End string // rfc3339 format + StartTime int64 // timestamp + EndTime int64 // timestamp + BatchSize int + Ssl bool + UnsafeSsl bool + + Debug bool +}