Skip to content

Commit

Permalink
Included readme, optimized code & included time range updates
Browse files Browse the repository at this point in the history
Signed-off-by: Chandrakala Subramanyam <[email protected]>
  • Loading branch information
chandrams committed Jan 20, 2025
1 parent b853519 commit a5ade1f
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 111 deletions.
61 changes: 61 additions & 0 deletions tests/scripts/local_monitoring_tests/bulk_stress_test.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# **Kruize Bulk API stress test**

Kruize Bulk API stress test validates the behaviour of [Kruize Bulk APIs](/design/BulkAPI.md) by loading these APIs with multiple requests to generate recommendations

## Tests description
- **Kruize Bulk API stress test**
The test does the following:
- Deploys kruize in non-CRD mode using the [deploy script](https://github.com/kruize/autotune/blob/master/deploy.sh) from the autotune repo
- Creates a resource optimization metric profile using the [createMetricProfile API](/design/MetricProfileAPI.md)
- Runs any of the specified tests below:
- No config test - In this test the Bulk API is invoked in parallel with an empty bulk configuration
- Time range test - In this test the Bulk API is invoked in parallel with the same time range as specified in the configuration
- Split Time range test - In this test the Bulk API is invoked in parallel with different time ranges as specified in the configuration.
- Once the Bulk job is created, the test gets the bulk job status and on completion fetches the recommendations for the processed experiments

## Prerequisites for running the tests:
- Minikube setup or access to Openshift cluster
- Tools like kubectl, oc, curl, jq

- To test with Thanos datasource, Thanos setup with tsdb blocks containing usage metrics is required

## How to run the test?

Use the below command to test :

```
<KRUIZE_REPO>/tests/scripts/local_monitoring_tests/bulk_stress_test/bulk_stress_test.sh -c [minikube|openshift] [-i Kruize image] [-w No. of workers] [-t interval hours (default - 2)] [-s End date of tsdb block] [-a kruize replicas] [-r <resultsdir path>] [--skipsetup skip kruize setup] [ -z to test with prometheus datasource] [--test Specify the test to be run] [--url Thanos Datasource url]
```

Where values for bulk_stress_test.sh are:

```
usage: bulk_stress_test.sh
[ -c ] : cluster type. Supported type - minikube, openshift. Default - minikube
[ -i ] : optional. Kruize docker image to be used for testing
default - quay.io/kruize/autotune:mvp_demo
[ -r ] : Results directory path
[ -w ] : No. of workers (default - 5)
[ -t ] : interval hours (default - 2)
[ -s ] : End date of the tsdb block (default - 2024-12-18T06:20:00.000Z)
[ -a ] : kruize replicas (default - 3)
[ -z ] : To register prometheus datasource with kruize
[ --test ] : Specify the test to be run [time_range/no_config/time_range_split] (default - time_range)
[ --url ] : Datasource url (default - the Thanos datasource url configured in the script)
[ --skipsetup ] : skip kruize setup
```

For example,

```
<KRUIZE_REPO>/tests/scripts/local_monitoring_tests/bulk_stress_test/bulk_stress_test.sh -c openshift -i quay.io/kruize/autotune:mvp_demo -w 5 -t 2 -r /tmp/bulk-stress-test-results --test time_range
```

Test with the following inputs:
100 exps / 100 results
100 exps / 1500 results
1000 exps / 100 results
1000 exps / 1500 results
10000 exps / 100 results

Once the tests are complete, verify that there are no errors or exceptions in the test logs. Kruize resource usage metrics can be found in monitoring_metrics.csv in the results directory.
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,77 @@ def setup_logger(name, log_file, level=logging.INFO):

return logger

def invoke_bulk(worker_number, delay):
try:
if test == "time_range_rampup" or test == "no_config_rampup":
time.sleep(delay)
def fetch_recommendations(job_status_json):
    """Fetch and persist recommendations for every experiment a bulk job processed.

    Walks the "experiments" map in the given bulk-job status JSON, invokes
    list_recommendations for each experiment name, and dumps all fetched
    recommendation JSONs into a per-worker file under
    <results_dir>/recommendation_jsons.

    Parameters:
        job_status_json: dict parsed from the bulk-job status response; must
            contain an "experiments" mapping keyed by experiment name.

    Returns:
        0 if recommendations were fetched for every experiment, -1 if any
        list_recommendations call failed or if the job processed no experiments.

    NOTE(review): this function reads logger, results_dir, worker_number and
    log_file without defining them or accepting them as parameters — they must
    exist as globals when it is called; confirm, or pass them in explicitly.
    """
    logger.info("Fetching processed experiments...")
    exp_list = list(job_status_json["experiments"].keys())

    logger.info("List of processed experiments")
    logger.info("**************************************************")
    logger.info(exp_list)
    logger.info("**************************************************")

    # List recommendations for the experiments for which recommendations are available
    recommendations_json_arr = []

    if exp_list:
        list_reco_failures = 0
        for exp_name in exp_list:
            logger.info(f"Fetching recommendations for {exp_name}...")
            list_reco_response = list_recommendations(exp_name)
            if list_reco_response.status_code != 200:
                # Count the failure but keep going so one bad experiment
                # does not abort fetching the rest.
                list_reco_failures = list_reco_failures + 1
                logger.info(f"List recommendations failed for the experiment - {exp_name}!")
                reco = list_reco_response.json()
                logger.info(reco)
                continue
            else:
                logger.info(f"Fetched recommendations for {exp_name} - Done")

            reco = list_reco_response.json()
            recommendations_json_arr.append(reco)

        # Dump the recommendations into a json file, one file per worker
        reco_dir = results_dir + "/recommendation_jsons"
        os.makedirs(reco_dir, exist_ok=True)
        reco_file = reco_dir + "/recommendations" + str(worker_number) + ".json"
        with open(reco_file, 'w') as f:
            json.dump(recommendations_json_arr, f, indent=4)

        if list_reco_failures != 0:
            logger.info(
                f"List recommendations failed for some of the experiments, check the log {log_file} for details!")
            return -1
        else:
            return 0
    else:
        logger.error("Something went wrong! There are no experiments with recommendations!")
        return -1

def invoke_bulk(worker_number, start_time=None, end_time=None):
try:
stress_log_dir = results_dir + "/stress_logs"
os.makedirs(stress_log_dir, exist_ok=True)

log_file = f"{stress_log_dir}/worker_{worker_number}.log"
logger = setup_logger(f"logger_{worker_number}", log_file)

if test == "time_range" or test == "time_range_rampup":
# Update the bulk json with start & end time
logger.info(f"worker number = {worker_number}")
logger.info(f"start time = {start_time}")
logger.info(f"end time = {end_time}")

if test == "time_range" or test == "time_range_split":
bulk_json_file = "../json_files/bulk_input_timerange.json"
elif test == "no_config" or test == "no_config_rampup":
json_file = open(bulk_json_file, "r")
bulk_json = json.loads(json_file.read())

bulk_json['time_range']['start'] = start_time
bulk_json['time_range']['end'] = end_time
elif test == "no_config":
bulk_json_file = "../json_files/bulk_input.json"

json_file = open(bulk_json_file, "r")
bulk_json = json.loads(json_file.read())
json_file = open(bulk_json_file, "r")
bulk_json = json.loads(json_file.read())

if prometheus == 1:
logger.info("Datasource - prometheus-1")
Expand All @@ -84,7 +137,7 @@ def invoke_bulk(worker_number, delay):
job_status = job_status_json['status']

while job_status != "COMPLETED":
bulk_job_response = get_bulk_job_status(job_id, verbose)
bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
job_status_json = bulk_job_response.json()
job_status = job_status_json['status']
if job_status == "FAILED":
Expand All @@ -105,61 +158,17 @@ def invoke_bulk(worker_number, delay):

# Fetch the list of experiments for which recommendations are available
if job_status != "FAILED":
logger.info("Fetching processed experiments...")
exp_list = list(job_status_json["experiments"].keys())

logger.info("List of processed experiments")
logger.info("**************************************************")
logger.info(exp_list)
logger.info("**************************************************")

# List recommendations for the experiments for which recommendations are available
recommendations_json_arr = []

if exp_list:
list_reco_failures = 0
for exp_name in exp_list:

logger.info(f"Fetching recommendations for {exp_name}...")
list_reco_response = list_recommendations(exp_name)
if list_reco_response.status_code != 200:
list_reco_failures = list_reco_failures + 1
logger.info(f"List recommendations failed for the experiment - {exp_name}!")
reco = list_reco_response.json()
logger.info(reco)
continue
else:
logger.info(f"Fetched recommendations for {exp_name} - Done")

reco = list_reco_response.json()
recommendations_json_arr.append(reco)

# Dump the recommendations into a json file
reco_dir = results_dir + "/recommendation_jsons"
os.makedirs(reco_dir, exist_ok=True)
reco_file = reco_dir + "/recommendations" + str(worker_number) + ".json"
with open(reco_file, 'w') as f:
json.dump(recommendations_json_arr, f, indent=4)

if list_reco_failures != 0:
logger.info(f"List recommendations failed for some of the experiments, check the log {log_file} for details!")
return -1
else:
return 0
else:
logger.error("Something went wrong! There are no experiments with recommendations!")
return -1
status = fetch_recommendations(job_status_json)
return status
else:
logger.info(f"Check {job_file} for job status")
return -1
except Exception as e:
return {'error': str(e)}


def invoke_bulk_with_time_range(worker_number, start_time, end_time, delay):
def invoke_bulk_with_time_range(worker_number, start_time, end_time):
try:
#time.sleep(delay)

stress_log_dir = results_dir + "/stress_logs"
os.makedirs(stress_log_dir, exist_ok=True)

Expand Down Expand Up @@ -202,9 +211,8 @@ def invoke_bulk_with_time_range(worker_number, start_time, end_time, delay):

# Loop until job status is COMPLETED
job_status = job_status_json['status']
print(job_status)
while job_status != "COMPLETED":
bulk_job_response = get_bulk_job_status(job_id, verbose)
bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
job_status_json = bulk_job_response.json()
job_status = job_status_json['status']
if job_status == "FAILED":
Expand All @@ -225,51 +233,8 @@ def invoke_bulk_with_time_range(worker_number, start_time, end_time, delay):

# Fetch the list of experiments for which recommendations are available
if job_status != "FAILED":
logger.info("Fetching processed experiments...")
exp_list = list(job_status_json["experiments"].keys())

logger.info("List of processed experiments")
logger.info("**************************************************")
logger.info(exp_list)
logger.info("**************************************************")

# List recommendations for the experiments for which recommendations are available
recommendations_json_arr = []

if exp_list:
list_reco_failures = 0
for exp_name in exp_list:

logger.info(f"Fetching recommendations for {exp_name}...")
list_reco_response = list_recommendations(exp_name)
if list_reco_response.status_code != 200:
list_reco_failures = list_reco_failures + 1
logger.info(f"List recommendations failed for the experiment - {exp_name}!")
reco = list_reco_response.json()
logger.info(reco)
continue
else:
logger.info(f"Fetched recommendations for {exp_name} - Done")

reco = list_reco_response.json()
recommendations_json_arr.append(reco)

# Dump the recommendations into a json file
reco_dir = results_dir + "/recommendation_jsons"
os.makedirs(reco_dir, exist_ok=True)
reco_file = reco_dir + "/recommendations" + str(worker_number) + ".json"
with open(reco_file, 'w') as f:
json.dump(recommendations_json_arr, f, indent=4)

if list_reco_failures != 0:
logger.info(
f"List recommendations failed for some of the experiments, check the log {log_file} for details!")
return -1
else:
return 0
else:
logger.error("Something went wrong! There are no experiments with recommendations!")
return -1
status = fetch_recommendations(job_status_json)
return status
else:
logger.info(f"Check {job_file} for job status")
return -1
Expand All @@ -278,10 +243,16 @@ def invoke_bulk_with_time_range(worker_number, start_time, end_time, delay):

def parallel_requests_to_bulk():
results = []

current_start_time = datetime.strptime(initial_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta(hours=interval_hours)
current_end_time = datetime.strptime(initial_end_time, '%Y-%m-%dT%H:%M:%S.%fZ')

current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all the tasks to the executor
futures = [
executor.submit(invoke_bulk, worker_number, delay=worker_number * rampup_interval_seconds)
executor.submit(invoke_bulk, worker_number, current_start_time, current_end_time) if test == "time_range" else executor.submit(invoke, worker_number)
for worker_number in range(1, max_workers+1)
]

Expand All @@ -292,7 +263,6 @@ def parallel_requests_to_bulk():
results.append(result)
except Exception as e:
results.append({'error': str(e)})

return results

def parallel_requests_with_time_range_split(max_workers):
Expand Down Expand Up @@ -320,7 +290,7 @@ def parallel_requests_with_time_range_split(max_workers):
current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

executor.submit(invoke_bulk_with_time_range, worker_number, current_start_time, current_end_time, delay=worker_number * rampup_interval_seconds)
executor.submit(invoke_bulk, worker_number, current_start_time, current_end_time)

current_end_time = current_start_time

Expand All @@ -342,7 +312,6 @@ def parallel_requests_with_time_range_split(max_workers):
initial_end_time = ""
interval_hours = 6
test = ""
rampup_interval_seconds = 2
prometheus = 0

parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -403,9 +372,6 @@ def parallel_requests_with_time_range_split(max_workers):
start_time = time.time()

if test == "time_range" or test == "no_config":
rampup_interval_seconds = 0

if test == "time_range" or test == "no_config" or test == "time_range_rampup" or test == "no_config_rampup":
responses = parallel_requests_to_bulk()

if test == "time_range_split":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ KRUIZE_IMAGE="quay.io/kruize/autotune:mvp_demo"

function usage() {
echo
echo "Usage: [-i Kruize image] [-w No. of workers (default - 5)] [-t interval hours (default - 2)] [-s Initial start date (default - 2024-12-18T06:20:00.000Z)]"
echo "Usage: [-i Kruize image] [-w No. of workers (default - 5)] [-t interval hours (default - 2)] [-s End date of tsdb block (default - 2024-12-18T06:20:00.000Z)]"
echo "[-a kruize replicas (default - 3)][-r <resultsdir path>] [--skipsetup skip kruize setup] [ -z to test with prometheus datasource]"
echo "[--test Specify the test to be run (default - time_range)] [--url Datasource url (default - ${ds_url}]"
exit 1
Expand Down

0 comments on commit a5ade1f

Please sign in to comment.