From b2548bc50a5daa538036f9a8ed0db1e69033c370 Mon Sep 17 00:00:00 2001 From: patacoing Date: Sat, 17 Aug 2024 17:25:05 +0200 Subject: [PATCH 1/2] feat: async support for request method We added an async support for the request method by creating an async_request method. It provides the same arguments as the other one. In order to support async calls, we used the aiohttp module. We didn't change the way the method works but just changed sync calls by async calls. It concerns api calls and sleep. --- qualysapi/connector.py | 212 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/qualysapi/connector.py b/qualysapi/connector.py index b565934..9d63e28 100644 --- a/qualysapi/connector.py +++ b/qualysapi/connector.py @@ -8,6 +8,8 @@ import logging import time from collections import defaultdict +import asyncio +import aiohttp import requests @@ -581,3 +583,213 @@ def request( return request.content return response + + async def async_request( + self, + api_call, + data=None, + api_version=None, + http_method=None, + concurrent_scans_retries=0, + concurrent_scans_retry_delay=0, + verify=True, + ): + """ Return QualysGuard API response asynchronously.""" + + async with aiohttp.ClientSession() as session: + logger.debug("concurrent_scans_retries =\n%s", str(concurrent_scans_retries)) + logger.debug("concurrent_scans_retry_delay =\n%s", str(concurrent_scans_retry_delay)) + concurrent_scans_retries = int(concurrent_scans_retries) + concurrent_scans_retry_delay = int(concurrent_scans_retry_delay) + + url, data, headers, http_method = self.build_request(api_call, data, api_version, http_method) + + # Make request at least once (more if concurrent_retry is enabled). + retries = 0 + # + # set a warning threshold for the rate limit + rate_warn_threshold = 10 + while retries <= concurrent_scans_retries: + # Make request. + logger.debug("url =\n%s", str(url)) + logger.debug("data =\n%s", str(data)) + logger.debug("headers =\n%s", str(headers)) + + logger.debug(f"{http_method.upper()} request.") + + request = await getattr(session, http_method)( + url, + data=data, + headers=headers, + verify_ssl=verify, + proxy=self.proxies, + ) + request = await request.__aenter__() + + logger.debug("response headers =\n%s", str(request.headers)) + + # Remember how many times left user can make against api_call. + try: + self.rate_limit_remaining[api_call] = int( + request.headers["x-ratelimit-remaining"] + ) + logger.debug( + "rate limit for api_call, %s = %s", + api_call, + self.rate_limit_remaining[api_call], + ) + response = await request.text() + await request.__aexit__(None, None, None) + if ( + "1960" in response + and "This API cannot be run again until" in response + ): + max_retries = 10 + retry_count = 0 + while ( + "1960" in response + and "This API cannot be run again until" in response + ): + retry_count += 1 + time_to_wait = int(30) + logger.info( + "Concurrency Limit Exceeded waiting %d seconds. %d retries remaining" + % (time_to_wait, max_retries - retry_count) + ) + await asyncio.sleep(time_to_wait) + + logger.info(str(url), str(data)) # self.auth, headers, self.proxies) + logger.debug(f"{http_method.upper()} request.") + + request = await getattr(session, http_method)( + url, + data=data, + headers=headers, + verify_ssl=verify, + proxy=self.proxies, + ) + + request = await request.__aenter__() + + logger.debug("response headers =\n%s" % (str(request.headers))) + response = await request.text() + await request.__aexit__(None, None, None) + if retry_count >= max_retries: + break + elif ( + "1965" in response + and "This API cannot be run again for another" in response + ): + max_retries = 10 + retry_count = 0 + while ( + "1965" in response + and "This API cannot be run again for another" in response + ): + retry_count += 1 + time_to_wait = int(request.headers["x-ratelimit-towait-sec"]) + logger.info( + "API Limit Exceeded waiting %d seconds. %d retries remaining" + % (time_to_wait, max_retries - retry_count) + ) + await asyncio.sleep(time_to_wait) + + logger.info(str(url), str(data)) # self.auth, headers, self.proxies) + logger.debug(f"{http_method.upper()} request.") + + request = await getattr(session, http_method)( + url, + data=data, + headers=headers, + verify_ssl=verify, + proxy=self.proxies, + ) + + request = await request.__aenter__() + + logger.debug("response headers =\n%s" % (str(request.headers))) + response = await request.text() + + await request.__aexit__(None, None, None) + if retry_count >= max_retries: + break + + elif self.rate_limit_remaining[api_call] > rate_warn_threshold: + logger.debug( + "rate limit for api_call, %s = %s", + api_call, + self.rate_limit_remaining[api_call], + ) + elif (self.rate_limit_remaining[api_call] <= rate_warn_threshold) and ( + self.rate_limit_remaining[api_call] > 0 + ): + logger.warning( + "Rate limit is about to being reached (remaining api calls = %s)", + self.rate_limit_remaining[api_call], + ) + elif self.rate_limit_remaining[api_call] <= 0: + logger.critical( + "ATTENTION! RATE LIMIT HAS BEEN REACHED (remaining api calls = %s)!", + self.rate_limit_remaining[api_call], + ) + except KeyError as e: + # Likely a bad api_call. + logger.debug(e) + pass + except TypeError as e: + # Likely an asset search api_call. + logger.debug(e) + pass + # Response received. + response = await request.text() + + await request.__aexit__(None, None, None) + logger.debug("response text =\n%s", response) + # Keep track of how many retries. + retries += 1 + # Check for concurrent scans limit. + if not ( + "INVALID_REQUEST" in response + and "You have reached the maximum number of concurrent running scans" + in response + and "Please wait until your previous scans have completed" + in response + ): + # Did not hit concurrent scan limit. + break + else: + # Hit concurrent scan limit. + logger.critical(response) + # If trying again, delay next try by concurrent_scans_retry_delay. + if retries <= concurrent_scans_retries: + logger.warning( + "Waiting %d seconds until next try.", concurrent_scans_retry_delay + ) + await asyncio.sleep(concurrent_scans_retry_delay) + # Inform user of how many retries. + logger.critical("Retry #%d", retries) + else: + # Ran out of retries. Let user know. + logger.critical("Alert! Ran out of concurrent_scans_retries!") + return False + # Check to see if there was an error. + try: + request.raise_for_status() + except aiohttp.ClientResponseError: + # Error + logger.error("Error! Received a 4XX client error or 5XX server error response.") + logger.error("Content = \n%s", response) + logger.error("Headers = \n%s", str(request.headers)) + #request.raise_for_status() + if '' in response: + logger.error("Error! Your IP address is not in the list of secure IPs." \ + + " Manager must include this IP (QualysGuard VM > Users > Security).") + logger.error("Content = \n%s", response) + logger.error("Headers = \n%s", str(request.headers)) + return False + + # return bytes if pdf + if 'application/pdf' in request.headers['content-type']: + return request.content + + return response From 66f47cb2ec9831c2764ff3f2076a81a73a83658a Mon Sep 17 00:00:00 2001 From: patacoing Date: Mon, 26 Aug 2024 22:52:38 +0200 Subject: [PATCH 2/2] fix: fixed async request Added missing ssl verification. Convert data to be FormData. Added auth by using BasicAuth. --- qualysapi/connector.py | 71 +++++++++++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/qualysapi/connector.py b/qualysapi/connector.py index 9d63e28..450de2c 100644 --- a/qualysapi/connector.py +++ b/qualysapi/connector.py @@ -2,6 +2,10 @@ __copyright__ = "Copyright 2013, Parag Baxi" __license__ = "Apache License 2.0" +import ssl + +import certifi + """ Module that contains classes for setting up connections to QualysGuard API and requesting data from it. """ @@ -233,6 +237,49 @@ def format_payload(self, api_version, data): logger.debug("Converted:\n%s", data) return data + def build_async_request(self, api_call, data=None, api_version=None, http_method=None): + logger.debug("api_call =\n%s", api_call) + logger.debug("api_version =\n%s", api_version) + logger.debug("data %s =\n %s", type(data), str(data)) + logger.debug("http_method =\n%s", http_method) + + api_call = self.preformat_call(api_call) + api_version = self.format_api_version(api_version) if api_version else self.which_api_version(api_call) + # Set up base url. + url = self.url_api_version(api_version) + # Set up headers. + headers = { + "X-Requested-With": f"Parag Baxi QualysAPI (python) v{qualysapi.version.__version__}" + } + logger.debug("headers =\n%s", str(headers)) + # Portal API support XML/JSON exchange format (JSON for assets tagging and management). + # The data exchange format must be specified in the headers. + if api_version in ("am", "was", "am2"): + if self.data_exchange_format == 'xml': + headers["Content-type"] = "text/xml" + headers["Accept"] = "text/xml" + if self.data_exchange_format == 'json': + headers["Content-type"] = "application/json" + headers["Accept"] = "application/json" + # Set up http request method, if not specified. + if not http_method: + http_method = self.format_http_method(api_version, api_call, data) + logger.debug("http_method =\n%s", http_method) + # Format API call. + api_call = self.format_call(api_version, api_call) + logger.debug("api_call =\n%s", api_call) + # Append api_call to url. + url += api_call + # Format data, if applicable. + if data is not None: + data = aiohttp.FormData(data) + + logger.debug("url =\n%s", str(url)) + logger.debug("data =\n%s", str(data)) + logger.debug("headers =\n%s", str(headers)) + + return url, data, headers, http_method + def build_request(self, api_call, data=None, api_version=None, http_method=None): logger.debug("api_call =\n%s", api_call) logger.debug("api_version =\n%s", api_version) @@ -595,6 +642,7 @@ async def async_request( verify=True, ): """ Return QualysGuard API response asynchronously.""" + sslcontext = ssl.create_default_context(cafile=certifi.where()) async with aiohttp.ClientSession() as session: logger.debug("concurrent_scans_retries =\n%s", str(concurrent_scans_retries)) @@ -602,11 +650,11 @@ async def async_request( concurrent_scans_retries = int(concurrent_scans_retries) concurrent_scans_retry_delay = int(concurrent_scans_retry_delay) - url, data, headers, http_method = self.build_request(api_call, data, api_version, http_method) + url, data, headers, http_method = self.build_async_request(api_call, data, api_version, http_method) # Make request at least once (more if concurrent_retry is enabled). retries = 0 - # + # set a warning threshold for the rate limit rate_warn_threshold = 10 while retries <= concurrent_scans_retries: @@ -620,8 +668,9 @@ async def async_request( request = await getattr(session, http_method)( url, data=data, + auth=aiohttp.BasicAuth(*self.auth), headers=headers, - verify_ssl=verify, + ssl=sslcontext if verify else False, proxy=self.proxies, ) request = await request.__aenter__() @@ -664,8 +713,9 @@ async def async_request( request = await getattr(session, http_method)( url, data=data, + auth=aiohttp.BasicAuth(*self.auth), headers=headers, - verify_ssl=verify, + ssl=sslcontext if verify else False, proxy=self.proxies, ) @@ -700,8 +750,9 @@ async def async_request( request = await getattr(session, http_method)( url, data=data, + auth=aiohttp.BasicAuth(*self.auth), headers=headers, - verify_ssl=verify, + ssl=sslcontext if verify else False, proxy=self.proxies, ) @@ -732,14 +783,10 @@ async def async_request( "ATTENTION! RATE LIMIT HAS BEEN REACHED (remaining api calls = %s)!", self.rate_limit_remaining[api_call], ) - except KeyError as e: - # Likely a bad api_call. + except KeyError as e: # Likely a bad api_call. logger.debug(e) - pass - except TypeError as e: - # Likely an asset search api_call. + except TypeError as e: # Likely an asset search api_call. logger.debug(e) - pass # Response received. response = await request.text() @@ -780,7 +827,7 @@ async def async_request( logger.error("Error! Received a 4XX client error or 5XX server error response.") logger.error("Content = \n%s", response) logger.error("Headers = \n%s", str(request.headers)) - #request.raise_for_status() + request.raise_for_status() if '' in response: logger.error("Error! Your IP address is not in the list of secure IPs." \ + " Manager must include this IP (QualysGuard VM > Users > Security).")