diff --git a/cmd/tsbs_load/config.go b/cmd/tsbs_load/config.go index 6edf6eea5..7cec82ba5 100644 --- a/cmd/tsbs_load/config.go +++ b/cmd/tsbs_load/config.go @@ -16,19 +16,20 @@ type LoaderConfig struct { } type RunnerConfig struct { - DBName string `yaml:"db-name" mapstructure:"db-name"` - BatchSize uint `yaml:"batch-size" mapstructure:"batch-size"` - Workers uint - Limit uint64 - DoLoad bool `yaml:"do-load" mapstructure:"do-load"` - DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db"` - DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist"` - ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period"` - Seed int64 - HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers"` - InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals"` - FlowControl bool `yaml:"flow-control" mapstructure:"flow-control"` - ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity"` + DBName string `yaml:"db-name" mapstructure:"db-name"` + BatchSize uint `yaml:"batch-size" mapstructure:"batch-size"` + Workers uint + Limit uint64 + DoLoad bool `yaml:"do-load" mapstructure:"do-load"` + DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db"` + DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist"` + ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period"` + Seed int64 + HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers"` + InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals"` + InsertIntervalsUnit string `yaml:"insert-intervals-unit" mapstructure:"insert-intervals-unit"` + FlowControl bool `yaml:"flow-control" mapstructure:"flow-control"` + ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity"` } type DataSourceConfig struct { diff --git a/load/insertstrategy/sleep_regulator.go b/load/insertstrategy/sleep_regulator.go index f03892819..4286c12be 100644 --- a/load/insertstrategy/sleep_regulator.go +++ b/load/insertstrategy/sleep_regulator.go @@ -43,12 +43,12 @@ type sleepRegulator struct { // numWorkers=3, string='1,2' => worker '0' at least 1 second, workers '1' and '2' at least 2 seconds between inserts // numWorkers=1, string='0-1' => worker '0' needs to have [0,1) seconds between inserts // numWorkers=3, string='1,2-4'=> worker '0' have 1 second between inserts, workers '1' and '2' have [2,4) seconds between inserts -func NewSleepRegulator(insertIntervalString string, numWorkers int, initialRand *rand.Rand) (SleepRegulator, error) { +func NewSleepRegulator(insertIntervalString string, unitString string, numWorkers int, initialRand *rand.Rand) (SleepRegulator, error) { if numWorkers <= 0 { return nil, fmt.Errorf("number of workers must be positive, can't be %d", numWorkers) } - sleepTimes, err := parseInsertIntervalString(insertIntervalString, numWorkers, initialRand) + sleepTimes, err := parseInsertIntervalString(insertIntervalString, unitString, numWorkers, initialRand) if err != nil { return nil, err } diff --git a/load/insertstrategy/sleep_regulator_config.go b/load/insertstrategy/sleep_regulator_config.go index 2dbd89882..c6e9ebd44 100644 --- a/load/insertstrategy/sleep_regulator_config.go +++ b/load/insertstrategy/sleep_regulator_config.go @@ -2,6 +2,7 @@ package insertstrategy import ( "errors" + "fmt" "math/rand" "strconv" "strings" @@ -9,12 +10,15 @@ import ( ) const ( - workerSleepUnit = time.Second intervalSeparator = "," rangeSeparator = "-" intervalFormatError = "worker insert interval could not be parsed as integer constant or range. Required: 'x' or 'x-y' | x,y are uint x worker '0' needs to have [0,1) seconds between inserts // numWorkers=3, string='1,2-4'=> worker '0' have 1 second between inserts, workers '1' and '2' have [2,4) seconds between inserts // Error returned if numbers can't be parsed -func parseInsertIntervalString(insertIntervalString string, numWorkers int, initialRand *rand.Rand) (map[int]generateSleepTimeFn, error) { +func parseInsertIntervalString(insertIntervalString string, unitString string, numWorkers int, initialRand *rand.Rand) (map[int]generateSleepTimeFn, error) { randsPerWorker := makeRandsForWorkers(numWorkers, initialRand) splitIntervals := splitIntervalString(insertIntervalString) numIntervals := len(splitIntervals) @@ -32,6 +36,8 @@ func parseInsertIntervalString(insertIntervalString string, numWorkers int, init currentInterval := 0 var err error + setWorkerSleepUnit(unitString) + for i := 0; i < numWorkers; i++ { intervalToParse := splitIntervals[currentInterval] sleepGenerators[i], err = parseSingleIntervalString(intervalToParse, randsPerWorker[i]) @@ -47,6 +53,19 @@ func parseInsertIntervalString(insertIntervalString string, numWorkers int, init return sleepGenerators, nil } +func setWorkerSleepUnit(unitString string) { + switch unitString { + case "millisecond": + workerSleepUnit = time.Millisecond + case "microsecond": + workerSleepUnit = time.Microsecond + case "second": + workerSleepUnit = time.Second + default: + panic(fmt.Sprintf(`%s is not a valid unit for insert intervals!`, unitString)) + } +} + // parses an insert interval string for a single worker, // first it attempts to parse it as a constant, then as a range func parseSingleIntervalString(rangeStr string, randForWorker *rand.Rand) (generateSleepTimeFn, error) { diff --git a/load/insertstrategy/sleep_regulator_test.go b/load/insertstrategy/sleep_regulator_test.go index e8c18004b..7aa80d85b 100644 --- a/load/insertstrategy/sleep_regulator_test.go +++ b/load/insertstrategy/sleep_regulator_test.go @@ -34,7 +34,7 @@ func TestNewSleepRegulator(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - res, err := NewSleepRegulator(tc.intervalString, tc.workers, tc.rand) + res, err := NewSleepRegulator(tc.intervalString, "second", tc.workers, tc.rand) if err != nil && !tc.expectErr { t.Errorf("unexpected error: %v", err) return @@ -65,7 +65,7 @@ func TestNewSleepRegulator(t *testing.T) { } func TestTimeToSleepPanicOnWrongWorkerNumber(t *testing.T) { - sr, _ := NewSleepRegulator("1", 1, rand.New(rand.NewSource(0))) + sr, _ := NewSleepRegulator("1", "second", 1, rand.New(rand.NewSource(0))) defer func() { if r := recover(); r != "invalid worker number: 2" { t.Errorf("wrong panic.\nexpected: invalid worker number: 1\ngot: %v", r) diff --git a/load/loader.go b/load/loader.go index 8e1e37afd..9ac7cea1f 100644 --- a/load/loader.go +++ b/load/loader.go @@ -31,19 +31,20 @@ var ( // BenchmarkRunnerConfig contains all the configuration information required for running BenchmarkRunner. type BenchmarkRunnerConfig struct { - DBName string `yaml:"db-name" mapstructure:"db-name" json:"db-name"` - BatchSize uint `yaml:"batch-size" mapstructure:"batch-size" json:"batch-size"` - Workers uint `yaml:"workers" mapstructure:"workers" json:"workers"` - Limit uint64 `yaml:"limit" mapstructure:"limit" json:"limit"` - DoLoad bool `yaml:"do-load" mapstructure:"do-load" json:"do-load"` - DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db" json:"do-create-db"` - DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist" json:"do-abort-on-exist"` - ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period" json:"reporting-period"` - HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers" json:"hash-workers"` - NoFlowControl bool `yaml:"no-flow-control" mapstructure:"no-flow-control" json:"no-flow-control"` - ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity" json:"channel-capacity"` - InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals" json:"insert-intervals"` - ResultsFile string `yaml:"results-file" mapstructure:"results-file" json:"results-file"` + DBName string `yaml:"db-name" mapstructure:"db-name" json:"db-name"` + BatchSize uint `yaml:"batch-size" mapstructure:"batch-size" json:"batch-size"` + Workers uint `yaml:"workers" mapstructure:"workers" json:"workers"` + Limit uint64 `yaml:"limit" mapstructure:"limit" json:"limit"` + DoLoad bool `yaml:"do-load" mapstructure:"do-load" json:"do-load"` + DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db" json:"do-create-db"` + DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist" json:"do-abort-on-exist"` + ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period" json:"reporting-period"` + HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers" json:"hash-workers"` + NoFlowControl bool `yaml:"no-flow-control" mapstructure:"no-flow-control" json:"no-flow-control"` + ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity" json:"channel-capacity"` + InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals" json:"insert-intervals"` + InsertIntervalsUnit string `yaml:"insert-intervals-unit" mapstructure:"insert-intervals-unit" json:"insert-intervals-unit"` + ResultsFile string `yaml:"results-file" mapstructure:"results-file" json:"results-file"` // deprecated, should not be used in other places other than tsbs_load_xx commands FileName string `yaml:"file" mapstructure:"file" json:"file"` Seed int64 `yaml:"seed" mapstructure:"seed" json:"seed"` @@ -61,7 +62,8 @@ func (c BenchmarkRunnerConfig) AddToFlagSet(fs *pflag.FlagSet) { fs.Duration("reporting-period", 10*time.Second, "Period to report write stats") fs.String("file", "", "File name to read data from") fs.Int64("seed", 0, "PRNG seed (default: 0, which uses the current timestamp)") - fs.String("insert-intervals", "", "Time to wait between each insert, default '' => all workers insert ASAP. '1,2' = worker 1 waits 1s between inserts, worker 2 and others wait 2s") + fs.String("insert-intervals", "", "Time to wait between each insert, default '' => all workers insert ASAP. '1,2' = worker 1 waits 1s between inserts, worker 2 and others wait 2s. (Unit adjustable with insert-intervals-unit)") + fs.String("insert-intervals-unit", "second", "Unit for insert intervals. Options: second, millisecond, microsecond.") fs.Bool("hash-workers", false, "Whether to consistently hash insert data to the same workers (i.e., the data for a particular host always goes to the same worker)") fs.String("results-file", "", "Write the test results summary json to this file") } @@ -97,7 +99,7 @@ func GetBenchmarkRunner(c BenchmarkRunnerConfig) BenchmarkRunner { if c.InsertIntervals == "" { loader.sleepRegulator = insertstrategy.NoWait() } else { - loader.sleepRegulator, err = insertstrategy.NewSleepRegulator(c.InsertIntervals, int(loader.Workers), loader.initialRand) + loader.sleepRegulator, err = insertstrategy.NewSleepRegulator(c.InsertIntervals, c.InsertIntervalsUnit, int(loader.Workers), loader.initialRand) if err != nil { panic(fmt.Sprintf("could not initialize BenchmarkRunner: %v", err)) }