diff --git a/README.md b/README.md index 3ff75e9..f3ee5f3 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Build status Coverage Code style: black -Number of tests +Number of tests Number of downloads

diff --git a/tests/auth_helper.py b/tests/auth_helper.py deleted file mode 100644 index 40b8567..0000000 --- a/tests/auth_helper.py +++ /dev/null @@ -1,31 +0,0 @@ -import requests -import requests.auth -import responses - - -def get_header(responses: responses.RequestsMock, auth: requests.auth.AuthBase) -> dict: - # Mock a dummy response - responses.add(responses.GET, "http://authorized_only") - # Send a request to this dummy URL with authentication - response = requests.get("http://authorized_only", auth=auth) - # Return headers received on this dummy URL - return response.request.headers - - -def get_query_args( - responses: responses.RequestsMock, auth: requests.auth.AuthBase -) -> str: - # Mock a dummy response - responses.add(responses.GET, "http://authorized_only") - # Send a request to this dummy URL with authentication - response = requests.get("http://authorized_only", auth=auth) - # Return headers received on this dummy URL - return response.request.path_url - - -def get_request(responses: responses.RequestsMock, url: str) -> responses.Response: - for call in responses.calls: - if call.request.url == url: - # Pop out verified request (to be able to check multiple requests) - responses.calls._calls.remove(call) - return call.request diff --git a/tests/features/multi_auth/test_add_operator.py b/tests/features/multi_auth/test_add_operator.py index 6ff87f1..9605b8f 100644 --- a/tests/features/multi_auth/test_add_operator.py +++ b/tests/features/multi_auth/test_add_operator.py @@ -2,19 +2,36 @@ from responses import RequestsMock import requests +from responses.matchers import header_matcher, query_string_matcher import requests_auth -from requests_auth.testing import BrowserMock, create_token, token_cache, browser_mock +from requests_auth.testing import ( + BrowserMock, + create_token, + token_cache, + browser_mock, +) # noqa: F401 import requests_auth._oauth2.authorization_code_pkce -from tests.auth_helper import get_header def test_basic_and_api_key_authentication_can_be_combined(responses: RequestsMock): basic_auth = requests_auth.Basic("test_user", "test_pwd") api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, basic_auth + api_key_auth) - assert header.get("Authorization") == "Basic dGVzdF91c2VyOnRlc3RfcHdk" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = basic_auth + api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Basic dGVzdF91c2VyOnRlc3RfcHdk", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_header_api_key_and_multiple_authentication_can_be_combined( @@ -27,10 +44,22 @@ def test_header_api_key_and_multiple_authentication_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) - header = get_header(responses, api_key_auth + (api_key_auth2 + api_key_auth3)) - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" + auth = api_key_auth + (api_key_auth2 + api_key_auth3) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_multiple_auth_and_header_api_key_can_be_combined( @@ -43,10 +72,22 @@ def test_multiple_auth_and_header_api_key_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) - header = get_header(responses, (api_key_auth + api_key_auth2) + api_key_auth3) - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" + auth = (api_key_auth + api_key_auth2) + api_key_auth3 + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_multiple_auth_and_multiple_auth_can_be_combined( @@ -62,13 +103,23 @@ def test_multiple_auth_and_multiple_auth_can_be_combined( api_key_auth4 = requests_auth.HeaderApiKey( "my_provided_api_key4", header_name="X-Api-Key4" ) - header = get_header( - responses, (api_key_auth + api_key_auth2) + (api_key_auth3 + api_key_auth4) + auth = (api_key_auth + api_key_auth2) + (api_key_auth3 + api_key_auth4) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + "X-Api-Key4": "my_provided_api_key4", + } + ) + ], ) - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" - assert header.get("X-Api-Key4") == "my_provided_api_key4" + + requests.get("http://authorized_only", auth=auth) def test_basic_and_multiple_authentication_can_be_combined( @@ -81,10 +132,22 @@ def test_basic_and_multiple_authentication_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) - header = get_header(responses, basic_auth + (api_key_auth2 + api_key_auth3)) - assert header.get("Authorization") == "Basic dGVzdF91c2VyOnRlc3RfcHdk" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" + auth = basic_auth + (api_key_auth2 + api_key_auth3) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Basic dGVzdF91c2VyOnRlc3RfcHdk", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_query_api_key_and_multiple_authentication_can_be_combined( @@ -97,19 +160,23 @@ def test_query_api_key_and_multiple_authentication_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) + auth = api_key_auth + (api_key_auth2 + api_key_auth3) - # Mock a dummy response - responses.add(responses.GET, "http://authorized_only") - # Send a request to this dummy URL with authentication - response = requests.get( - "http://authorized_only", auth=api_key_auth + (api_key_auth2 + api_key_auth3) - ) - # Return headers received on this dummy URL - assert ( - response.request.path_url - == "/?api_key=my_provided_api_key&api_key2=my_provided_api_key2" + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-Api-Key3": "my_provided_api_key3", + } + ), + query_string_matcher( + "api_key=my_provided_api_key&api_key2=my_provided_api_key2" + ), + ], ) - assert response.request.headers.get("X-Api-Key3") == "my_provided_api_key3" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_resource_owner_password_and_api_key_authentication_can_be_combined( @@ -118,8 +185,7 @@ def test_oauth2_resource_owner_password_and_api_key_authentication_can_be_combin resource_owner_password_auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -130,9 +196,21 @@ def test_oauth2_resource_owner_password_and_api_key_authentication_can_be_combin }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, resource_owner_password_auth + api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = resource_owner_password_auth + api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_oauth2_resource_owner_password_and_multiple_authentication_can_be_combined( @@ -141,8 +219,7 @@ def test_oauth2_resource_owner_password_and_multiple_authentication_can_be_combi resource_owner_password_auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -156,12 +233,22 @@ def test_oauth2_resource_owner_password_and_multiple_authentication_can_be_combi api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header( - responses, resource_owner_password_auth + (api_key_auth + api_key_auth2) + auth = resource_owner_password_auth + (api_key_auth + api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ) + ], ) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_client_credential_and_api_key_authentication_can_be_combined( @@ -170,8 +257,7 @@ def test_oauth2_client_credential_and_api_key_authentication_can_be_combined( resource_owner_password_auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -182,9 +268,21 @@ def test_oauth2_client_credential_and_api_key_authentication_can_be_combined( }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, resource_owner_password_auth + api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = resource_owner_password_auth + api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_oauth2_client_credential_and_multiple_authentication_can_be_combined( @@ -193,8 +291,7 @@ def test_oauth2_client_credential_and_multiple_authentication_can_be_combined( resource_owner_password_auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -208,12 +305,22 @@ def test_oauth2_client_credential_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header( - responses, resource_owner_password_auth + (api_key_auth + api_key_auth2) + auth = resource_owner_password_auth + (api_key_auth + api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ) + ], ) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_authorization_code_and_api_key_authentication_can_be_combined( @@ -226,8 +333,7 @@ def test_oauth2_authorization_code_and_api_key_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -238,9 +344,22 @@ def test_oauth2_authorization_code_and_api_key_authentication_can_be_combined( }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, authorization_code_auth + api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = authorization_code_auth + api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -256,8 +375,7 @@ def test_oauth2_authorization_code_and_multiple_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -271,12 +389,23 @@ def test_oauth2_authorization_code_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header( - responses, authorization_code_auth + (api_key_auth + api_key_auth2) + auth = authorization_code_auth + (api_key_auth + api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ) + ], ) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -295,8 +424,7 @@ def test_oauth2_pkce_and_api_key_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F&code_challenge=5C_ph_KZ3DstYUc965SiqmKAA-ShvKF4Ut7daKd3fjc&code_challenge_method=S256", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -307,9 +435,22 @@ def test_oauth2_pkce_and_api_key_authentication_can_be_combined( }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, pkce_auth + api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = pkce_auth + api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -328,8 +469,7 @@ def test_oauth2_pkce_and_multiple_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F&code_challenge=5C_ph_KZ3DstYUc965SiqmKAA-ShvKF4Ut7daKd3fjc&code_challenge_method=S256", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -343,10 +483,23 @@ def test_oauth2_pkce_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header(responses, pkce_auth + (api_key_auth + api_key_auth2)) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + auth = pkce_auth + (api_key_auth + api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -366,9 +519,22 @@ def test_oauth2_implicit_and_api_key_authentication_can_be_combined( data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, implicit_auth + api_key_auth) - assert header.get("Authorization") == f"Bearer {token}" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = implicit_auth + api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": f"Bearer {token}", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -391,10 +557,23 @@ def test_oauth2_implicit_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header(responses, implicit_auth + (api_key_auth + api_key_auth2)) - assert header.get("Authorization") == f"Bearer {token}" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + auth = implicit_auth + (api_key_auth + api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": f"Bearer {token}", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) diff --git a/tests/features/multi_auth/test_and_operator.py b/tests/features/multi_auth/test_and_operator.py index f6f920a..b08a78d 100644 --- a/tests/features/multi_auth/test_and_operator.py +++ b/tests/features/multi_auth/test_and_operator.py @@ -2,19 +2,36 @@ from responses import RequestsMock import requests +from responses.matchers import header_matcher, query_string_matcher import requests_auth -from requests_auth.testing import BrowserMock, create_token, token_cache, browser_mock +from requests_auth.testing import ( + BrowserMock, + create_token, + token_cache, + browser_mock, +) # noqa: F401 import requests_auth._oauth2.authorization_code_pkce -from tests.auth_helper import get_header def test_basic_and_api_key_authentication_can_be_combined(responses: RequestsMock): basic_auth = requests_auth.Basic("test_user", "test_pwd") api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, basic_auth & api_key_auth) - assert header.get("Authorization") == "Basic dGVzdF91c2VyOnRlc3RfcHdk" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = basic_auth & api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Basic dGVzdF91c2VyOnRlc3RfcHdk", + "X-API-Key": "my_provided_api_key", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_header_api_key_and_multiple_authentication_can_be_combined( @@ -27,10 +44,22 @@ def test_header_api_key_and_multiple_authentication_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) - header = get_header(responses, api_key_auth & (api_key_auth2 & api_key_auth3)) - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" + auth = api_key_auth & (api_key_auth2 & api_key_auth3) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_multiple_auth_and_header_api_key_can_be_combined( @@ -43,10 +72,22 @@ def test_multiple_auth_and_header_api_key_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) - header = get_header(responses, (api_key_auth & api_key_auth2) & api_key_auth3) - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" + auth = (api_key_auth & api_key_auth2) & api_key_auth3 + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_multiple_auth_and_multiple_auth_can_be_combined( @@ -62,13 +103,23 @@ def test_multiple_auth_and_multiple_auth_can_be_combined( api_key_auth4 = requests_auth.HeaderApiKey( "my_provided_api_key4", header_name="X-Api-Key4" ) - header = get_header( - responses, (api_key_auth & api_key_auth2) & (api_key_auth3 & api_key_auth4) + auth = (api_key_auth & api_key_auth2) & (api_key_auth3 & api_key_auth4) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + "X-Api-Key4": "my_provided_api_key4", + } + ) + ], ) - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" - assert header.get("X-Api-Key4") == "my_provided_api_key4" + + requests.get("http://authorized_only", auth=auth) def test_basic_and_multiple_authentication_can_be_combined( @@ -81,10 +132,22 @@ def test_basic_and_multiple_authentication_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) - header = get_header(responses, basic_auth & (api_key_auth2 & api_key_auth3)) - assert header.get("Authorization") == "Basic dGVzdF91c2VyOnRlc3RfcHdk" - assert header.get("X-Api-Key2") == "my_provided_api_key2" - assert header.get("X-Api-Key3") == "my_provided_api_key3" + auth = basic_auth & (api_key_auth2 & api_key_auth3) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Basic dGVzdF91c2VyOnRlc3RfcHdk", + "X-Api-Key2": "my_provided_api_key2", + "X-Api-Key3": "my_provided_api_key3", + } + ) + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_query_api_key_and_multiple_authentication_can_be_combined( @@ -97,19 +160,23 @@ def test_query_api_key_and_multiple_authentication_can_be_combined( api_key_auth3 = requests_auth.HeaderApiKey( "my_provided_api_key3", header_name="X-Api-Key3" ) + auth = api_key_auth & (api_key_auth2 & api_key_auth3) - # Mock a dummy response - responses.add(responses.GET, "http://authorized_only") - # Send a request to this dummy URL with authentication - response = requests.get( - "http://authorized_only", auth=api_key_auth & (api_key_auth2 & api_key_auth3) - ) - # Return headers received on this dummy URL - assert ( - response.request.path_url - == "/?api_key=my_provided_api_key&api_key2=my_provided_api_key2" + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "X-Api-Key3": "my_provided_api_key3", + } + ), + query_string_matcher( + "api_key=my_provided_api_key&api_key2=my_provided_api_key2" + ), + ], ) - assert response.request.headers.get("X-Api-Key3") == "my_provided_api_key3" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_resource_owner_password_and_api_key_authentication_can_be_combined( @@ -118,8 +185,9 @@ def test_oauth2_resource_owner_password_and_api_key_authentication_can_be_combin resource_owner_password_auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") + auth = resource_owner_password_auth & api_key_auth + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -129,10 +197,20 @@ def test_oauth2_resource_owner_password_and_api_key_authentication_can_be_combin "example_parameter": "example_value", }, ) - api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, resource_owner_password_auth & api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_oauth2_resource_owner_password_and_multiple_authentication_can_be_combined( @@ -141,8 +219,7 @@ def test_oauth2_resource_owner_password_and_multiple_authentication_can_be_combi resource_owner_password_auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -156,12 +233,22 @@ def test_oauth2_resource_owner_password_and_multiple_authentication_can_be_combi api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header( - responses, resource_owner_password_auth & (api_key_auth & api_key_auth2) + auth = resource_owner_password_auth & (api_key_auth & api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ), + ], ) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_client_credential_and_api_key_authentication_can_be_combined( @@ -170,8 +257,7 @@ def test_oauth2_client_credential_and_api_key_authentication_can_be_combined( resource_owner_password_auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -182,9 +268,21 @@ def test_oauth2_client_credential_and_api_key_authentication_can_be_combined( }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, resource_owner_password_auth & api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = resource_owner_password_auth & api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) def test_oauth2_client_credential_and_multiple_authentication_can_be_combined( @@ -193,8 +291,7 @@ def test_oauth2_client_credential_and_multiple_authentication_can_be_combined( resource_owner_password_auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -208,12 +305,22 @@ def test_oauth2_client_credential_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header( - responses, resource_owner_password_auth & (api_key_auth & api_key_auth2) + auth = resource_owner_password_auth & (api_key_auth & api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ), + ], ) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_authorization_code_and_api_key_authentication_can_be_combined( @@ -226,8 +333,7 @@ def test_oauth2_authorization_code_and_api_key_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -238,9 +344,22 @@ def test_oauth2_authorization_code_and_api_key_authentication_can_be_combined( }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, authorization_code_auth & api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = authorization_code_auth & api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -256,8 +375,7 @@ def test_oauth2_authorization_code_and_multiple_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -271,12 +389,23 @@ def test_oauth2_authorization_code_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header( - responses, authorization_code_auth & (api_key_auth & api_key_auth2) + auth = authorization_code_auth & (api_key_auth & api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ), + ], ) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -295,8 +424,7 @@ def test_oauth2_pkce_and_api_key_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F&code_challenge=5C_ph_KZ3DstYUc965SiqmKAA-ShvKF4Ut7daKd3fjc&code_challenge_method=S256", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -307,9 +435,22 @@ def test_oauth2_pkce_and_api_key_authentication_can_be_combined( }, ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, pkce_auth & api_key_auth) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = pkce_auth & api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -328,8 +469,7 @@ def test_oauth2_pkce_and_multiple_authentication_can_be_combined( opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F&code_challenge=5C_ph_KZ3DstYUc965SiqmKAA-ShvKF4Ut7daKd3fjc&code_challenge_method=S256", reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -343,10 +483,23 @@ def test_oauth2_pkce_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header(responses, pkce_auth & (api_key_auth & api_key_auth2)) - assert header.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + auth = pkce_auth & (api_key_auth & api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de. You may close this tab." ) @@ -366,9 +519,22 @@ def test_oauth2_implicit_and_api_key_authentication_can_be_combined( data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) api_key_auth = requests_auth.HeaderApiKey("my_provided_api_key") - header = get_header(responses, implicit_auth & api_key_auth) - assert header.get("Authorization") == f"Bearer {token}" - assert header.get("X-Api-Key") == "my_provided_api_key" + auth = implicit_auth & api_key_auth + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": f"Bearer {token}", + "X-API-Key": "my_provided_api_key", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -391,10 +557,23 @@ def test_oauth2_implicit_and_multiple_authentication_can_be_combined( api_key_auth2 = requests_auth.HeaderApiKey( "my_provided_api_key2", header_name="X-Api-Key2" ) - header = get_header(responses, implicit_auth & (api_key_auth & api_key_auth2)) - assert header.get("Authorization") == f"Bearer {token}" - assert header.get("X-Api-Key") == "my_provided_api_key" - assert header.get("X-Api-Key2") == "my_provided_api_key2" + auth = implicit_auth & (api_key_auth & api_key_auth2) + + responses.get( + "http://authorized_only", + match=[ + header_matcher( + { + "Authorization": f"Bearer {token}", + "X-API-Key": "my_provided_api_key", + "X-Api-Key2": "my_provided_api_key2", + } + ), + ], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) diff --git a/tests/features/pytest_fixture/test_testing_token_mock.py b/tests/features/pytest_fixture/test_testing_token_mock.py index d44f0cb..9cfc04e 100644 --- a/tests/features/pytest_fixture/test_testing_token_mock.py +++ b/tests/features/pytest_fixture/test_testing_token_mock.py @@ -1,13 +1,18 @@ +import requests from responses import RequestsMock +from responses.matchers import header_matcher import requests_auth -from requests_auth.testing import token_cache_mock, token_mock -from tests.auth_helper import get_header +from requests_auth.testing import token_cache_mock, token_mock # noqa: F401 def test_token_mock(token_cache_mock, responses: RequestsMock): auth = requests_auth.OAuth2Implicit("http://provide_token") expected_token = requests_auth.OAuth2.token_cache.get_token("") - assert ( - get_header(responses, auth).get("Authorization") == f"Bearer {expected_token}" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {expected_token}"})], ) + + requests.get("http://authorized_only", auth=auth) diff --git a/tests/features/token_cache/test_testing_oauth2_authorization_code.py b/tests/features/token_cache/test_testing_oauth2_authorization_code.py index 5081ba6..06c50e5 100644 --- a/tests/features/token_cache/test_testing_oauth2_authorization_code.py +++ b/tests/features/token_cache/test_testing_oauth2_authorization_code.py @@ -1,9 +1,10 @@ import pytest +import requests from responses import RequestsMock +from responses.matchers import header_matcher import requests_auth -from requests_auth.testing import token_cache_mock -from tests.auth_helper import get_header +from requests_auth.testing import token_cache_mock # noqa: F401 @pytest.fixture @@ -15,37 +16,49 @@ def test_oauth2_authorization_code_flow(token_cache_mock, responses: RequestsMoc auth = requests_auth.OAuth2AuthorizationCode( "http://provide_code", "http://provide_access_token" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_okta_authorization_code_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.OktaAuthorizationCode( "testserver.okta-emea.com", "54239d18-c68c-4c47-8bdd-ce71ea1d50cd" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_authorization_code_pkce_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.OAuth2AuthorizationCodePKCE( "http://provide_code", "http://provide_access_token" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_okta_authorization_code_pkce_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.OktaAuthorizationCodePKCE( "testserver.okta-emea.com", "54239d18-c68c-4c47-8bdd-ce71ea1d50cd" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + + requests.get("http://authorized_only", auth=auth) diff --git a/tests/features/token_cache/test_testing_oauth2_implicit.py b/tests/features/token_cache/test_testing_oauth2_implicit.py index 4c435f7..427936b 100644 --- a/tests/features/token_cache/test_testing_oauth2_implicit.py +++ b/tests/features/token_cache/test_testing_oauth2_implicit.py @@ -1,9 +1,10 @@ import pytest +import requests from responses import RequestsMock +from responses.matchers import header_matcher import requests_auth -from requests_auth.testing import token_cache_mock -from tests.auth_helper import get_header +from requests_auth.testing import token_cache_mock # noqa: F401 @pytest.fixture @@ -13,47 +14,62 @@ def token_mock() -> str: def test_oauth2_implicit_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.OAuth2Implicit("http://provide_token") - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_okta_implicit_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.OktaImplicit( "testserver.okta-emea.com", "54239d18-c68c-4c47-8bdd-ce71ea1d50cd" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_aad_implicit_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.AzureActiveDirectoryImplicit( "45239d18-c68c-4c47-8bdd-ce71ea1d50cd", "54239d18-c68c-4c47-8bdd-ce71ea1d50cd" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_okta_implicit_id_token_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.OktaImplicitIdToken( "testserver.okta-emea.com", "54239d18-c68c-4c47-8bdd-ce71ea1d50cd" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_aad_implicit_id_token_flow(token_cache_mock, responses: RequestsMock): auth = requests_auth.AzureActiveDirectoryImplicitIdToken( "45239d18-c68c-4c47-8bdd-ce71ea1d50cd", "54239d18-c68c-4c47-8bdd-ce71ea1d50cd" ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + + requests.get("http://authorized_only", auth=auth) diff --git a/tests/oauth2/client_credential/test_oauth2_client_credential.py b/tests/oauth2/client_credential/test_oauth2_client_credential.py index 981829c..1464a8f 100644 --- a/tests/oauth2/client_credential/test_oauth2_client_credential.py +++ b/tests/oauth2/client_credential/test_oauth2_client_credential.py @@ -1,10 +1,10 @@ from responses import RequestsMock +from responses.matchers import header_matcher import pytest import requests import requests_auth from requests_auth.testing import token_cache # noqa: F401 -from tests.auth_helper import get_header, get_request def test_oauth2_client_credentials_flow_uses_provided_session( @@ -18,8 +18,7 @@ def test_oauth2_client_credentials_flow_uses_provided_session( client_secret="test_pwd", session=session, ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -28,13 +27,16 @@ def test_oauth2_client_credentials_flow_uses_provided_session( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + header_matcher({"x-test": "Test value"}), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) - request = get_request(responses, "http://provide_access_token/") - assert request.headers["x-test"] == "Test value" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_client_credentials_flow_token_is_sent_in_authorization_header_by_default( @@ -43,8 +45,7 @@ def test_oauth2_client_credentials_flow_token_is_sent_in_authorization_header_by auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -54,11 +55,13 @@ def test_oauth2_client_credentials_flow_token_is_sent_in_authorization_header_by "example_parameter": "example_value", }, ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_client_credentials_flow_token_is_expired_after_30_seconds_by_default( token_cache, responses: RequestsMock @@ -73,8 +76,7 @@ def test_oauth2_client_credentials_flow_token_is_expired_after_30_seconds_by_def expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) # Meaning a new one will be requested - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -84,11 +86,13 @@ def test_oauth2_client_credentials_flow_token_is_expired_after_30_seconds_by_def "example_parameter": "example_value", }, ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_client_credentials_flow_token_custom_expiry( token_cache, responses: RequestsMock @@ -105,18 +109,19 @@ def test_oauth2_client_credentials_flow_token_custom_expiry( token="2YotnFZFEjr1zCsicMWpAA", expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -126,19 +131,19 @@ def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): "example_parameter": "example_value", }, ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_with_invalid_grant_request_no_json(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, "http://provide_access_token", body="failure", status=400 - ) + responses.post("http://provide_access_token", body="failure", status=400) with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: requests.get("http://authorized_only", auth=auth) assert str(exception_info.value) == "failure" @@ -150,8 +155,7 @@ def test_with_invalid_grant_request_invalid_request_error( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_request"}, status=400, @@ -173,8 +177,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_request", "error_description": "desc of the error"}, status=400, @@ -190,8 +193,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description_ auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "error": "invalid_request", @@ -214,8 +216,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description_ auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "error": "invalid_request", @@ -237,8 +238,7 @@ def test_with_invalid_grant_request_without_error(token_cache, responses: Reques auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"other": "other info"}, status=400, @@ -254,8 +254,7 @@ def test_with_invalid_grant_request_invalid_client_error( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_client"}, status=400, @@ -281,8 +280,7 @@ def test_with_invalid_grant_request_invalid_grant_error( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_grant"}, status=400, @@ -304,8 +302,7 @@ def test_with_invalid_grant_request_unauthorized_client_error( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "unauthorized_client"}, status=400, @@ -325,8 +322,7 @@ def test_with_invalid_grant_request_unsupported_grant_type_error( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "unsupported_grant_type"}, status=400, @@ -346,8 +342,7 @@ def test_with_invalid_grant_request_invalid_scope_error( auth = requests_auth.OAuth2ClientCredentials( "http://provide_access_token", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_scope"}, status=400, diff --git a/tests/oauth2/client_credential/test_oauth2_client_credential_okta.py b/tests/oauth2/client_credential/test_oauth2_client_credential_okta.py index b6fc653..658674d 100644 --- a/tests/oauth2/client_credential/test_oauth2_client_credential_okta.py +++ b/tests/oauth2/client_credential/test_oauth2_client_credential_okta.py @@ -1,9 +1,9 @@ import requests from responses import RequestsMock +from responses.matchers import header_matcher import requests_auth from requests_auth.testing import token_cache # noqa: F401 -from tests.auth_helper import get_header, get_request def test_okta_client_credentials_flow_uses_provided_session( @@ -14,8 +14,7 @@ def test_okta_client_credentials_flow_uses_provided_session( auth = requests_auth.OktaClientCredentials( "test_okta", client_id="test_user", client_secret="test_pwd", session=session ) - responses.add( - responses.POST, + responses.post( "https://test_okta/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -24,13 +23,16 @@ def test_okta_client_credentials_flow_uses_provided_session( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + header_matcher({"x-test": "Test value"}), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) - request = get_request(responses, "https://test_okta/oauth2/default/v1/token") - assert request.headers["x-test"] == "Test value" + + requests.get("http://authorized_only", auth=auth) def test_okta_client_credentials_flow_token_is_sent_in_authorization_header_by_default( @@ -39,8 +41,7 @@ def test_okta_client_credentials_flow_token_is_sent_in_authorization_header_by_d auth = requests_auth.OktaClientCredentials( "test_okta", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "https://test_okta/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -50,11 +51,13 @@ def test_okta_client_credentials_flow_token_is_sent_in_authorization_header_by_d "example_parameter": "example_value", }, ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_okta_client_credentials_flow_token_is_expired_after_30_seconds_by_default( token_cache, responses: RequestsMock @@ -69,8 +72,7 @@ def test_okta_client_credentials_flow_token_is_expired_after_30_seconds_by_defau expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) # Meaning a new one will be requested - responses.add( - responses.POST, + responses.post( "https://test_okta/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -80,11 +82,13 @@ def test_okta_client_credentials_flow_token_is_expired_after_30_seconds_by_defau "example_parameter": "example_value", }, ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_okta_client_credentials_flow_token_custom_expiry( token_cache, responses: RequestsMock @@ -101,18 +105,19 @@ def test_okta_client_credentials_flow_token_custom_expiry( token="2YotnFZFEjr1zCsicMWpAA", expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): auth = requests_auth.OktaClientCredentials( "test_okta", client_id="test_user", client_secret="test_pwd" ) - responses.add( - responses.POST, + responses.post( "https://test_okta/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -122,7 +127,9 @@ def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): "example_parameter": "example_value", }, ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + + requests.get("http://authorized_only", auth=auth) diff --git a/tests/oauth2/implicit/test_oauth2_implicit.py b/tests/oauth2/implicit/test_oauth2_implicit.py index 0aa1cb5..a19e619 100644 --- a/tests/oauth2/implicit/test_oauth2_implicit.py +++ b/tests/oauth2/implicit/test_oauth2_implicit.py @@ -4,14 +4,14 @@ import requests import pytest from responses import RequestsMock +from responses.matchers import header_matcher from requests_auth.testing import ( BrowserMock, create_token, - browser_mock, - token_cache, -) # noqa: F401 -from tests.auth_helper import get_header + browser_mock, # noqa: F401 + token_cache, # noqa: F401 +) import requests_auth @@ -43,8 +43,12 @@ def test_oauth2_implicit_flow_token_is_not_reused_if_a_url_parameter_is_changing reply_url="http://localhost:5000", data=f"custom_token={first_token}&state=5652a8138e3a99dab7b94532c73ed5b10f19405316035d1efdc8bf7e0713690485254c2eaff912040eac44031889ef0a5ed5730c8a111541120d64a898c31afe", ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {first_token}"})], + ) - assert get_header(responses, auth1).get("Authorization") == f"Bearer {first_token}" + requests.get("http://authorized_only", auth=auth1) # Ensure that the new token is different than previous one expiry_in_1_hour = datetime.datetime.now( @@ -61,9 +65,13 @@ def test_oauth2_implicit_flow_token_is_not_reused_if_a_url_parameter_is_changing reply_url="http://localhost:5000", data=f"custom_token={second_token}&state=5c3940ccf78ac6e7d6d8d06782d9fd95a533aa5425b616eaa38dc3ec9508fbd55152c58a0d8dd8a087e76b77902559285819a41cb78ce8713e5a3b974bf07ce9", ) - response = requests.get("http://authorized_only", auth=auth2) - # Return headers received on this dummy URL - assert response.request.headers.get("Authorization") == f"Bearer {second_token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {second_token}"})], + ) + + requests.get("http://authorized_only", auth=auth2) + tab1.assert_success( "You are now authenticated on 5652a8138e3a99dab7b94532c73ed5b10f19405316035d1efdc8bf7e0713690485254c2eaff912040eac44031889ef0a5ed5730c8a111541120d64a898c31afe. You may close this tab." ) @@ -88,15 +96,23 @@ def test_oauth2_implicit_flow_token_is_reused_if_only_nonce_differs( reply_url="http://localhost:5000", data=f"custom_token={token}&state=67b95d2c7555751d1d72c97c7cd9ad6630c8395e0eaa51ee86ac7e451211ded9cd98a7190848789fe93632d8960425710e93f1f5549c6c6bc328bf3865a85ff2", ) - assert get_header(responses, auth1).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth1) auth2 = requests_auth.OAuth2Implicit( "http://provide_token?response_type=custom_token&nonce=2", token_field_name="custom_token", ) - response = requests.get("http://authorized_only", auth=auth2) - # Return headers received on this dummy URL - assert response.request.headers.get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth2) tab.assert_success( "You are now authenticated on 67b95d2c7555751d1d72c97c7cd9ad6630c8395e0eaa51ee86ac7e451211ded9cd98a7190848789fe93632d8960425710e93f1f5549c6c6bc328bf3865a85ff2. You may close this tab." ) @@ -119,7 +135,38 @@ def test_oauth2_implicit_flow_token_can_be_requested_on_a_custom_server_port( reply_url="http://localhost:5002", data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) + tab.assert_success( + "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." + ) + + +def test_oauth2_implicit_flow_uses_redirect_uri_domain( + token_cache, responses: RequestsMock, browser_mock: BrowserMock +): + auth = requests_auth.OAuth2Implicit( + "http://provide_token", redirect_uri_domain="localhost.mycompany.com" + ) + expiry_in_1_hour = datetime.datetime.now( + datetime.timezone.utc + ) + datetime.timedelta(hours=1) + token = create_token(expiry_in_1_hour) + tab = browser_mock.add_response( + opened_url="http://provide_token?response_type=token&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521&redirect_uri=http%3A%2F%2Flocalhost.mycompany.com%3A5000%2F", + reply_url="http://localhost:5000", + data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -138,7 +185,12 @@ def test_oauth2_implicit_flow_post_token_is_sent_in_authorization_header_by_defa reply_url="http://localhost:5000", data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -167,7 +219,12 @@ def test_oauth2_implicit_flow_token_is_expired_after_30_seconds_by_default( reply_url="http://localhost:5000", data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -187,7 +244,12 @@ def test_oauth2_implicit_flow_token_custom_expiry( expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) token = create_token(expiry_in_29_seconds) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) def test_browser_opening_failure(token_cache, responses: RequestsMock, monkeypatch): @@ -257,7 +319,12 @@ def test_state_change(token_cache, responses: RequestsMock, browser_mock: Browse reply_url="http://localhost:5000", data=f"access_token={token}&state=123456", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success("You are now authenticated on 123456. You may close this tab.") @@ -307,7 +374,12 @@ def test_oauth2_implicit_flow_get_token_is_sent_in_authorization_header_by_defau opened_url="http://provide_token?response_type=token&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", reply_url=f"http://localhost:5000#access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -328,7 +400,12 @@ def test_oauth2_implicit_flow_token_is_sent_in_requested_field( reply_url="http://localhost:5000", data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Bearer") == token + responses.get( + "http://authorized_only", + match=[header_matcher({"Bearer": token})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -351,7 +428,12 @@ def test_oauth2_implicit_flow_can_send_a_custom_response_type_and_expects_token_ reply_url="http://localhost:5000", data=f"custom_token={token}&state=67b95d2c7555751d1d72c97c7cd9ad6630c8395e0eaa51ee86ac7e451211ded9cd98a7190848789fe93632d8960425710e93f1f5549c6c6bc328bf3865a85ff2", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 67b95d2c7555751d1d72c97c7cd9ad6630c8395e0eaa51ee86ac7e451211ded9cd98a7190848789fe93632d8960425710e93f1f5549c6c6bc328bf3865a85ff2. You may close this tab." ) @@ -372,7 +454,12 @@ def test_oauth2_implicit_flow_expects_token_in_id_token_if_response_type_is_id_t reply_url="http://localhost:5000", data=f"id_token={token}&state=87c4108ec0eb03599335333a40434a36674269690b6957fef684bfb6c5a849ce660ef7031aa874c44d67cd3eada8febdfce41efb1ed3bc53a0a7e716cbba025a", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 87c4108ec0eb03599335333a40434a36674269690b6957fef684bfb6c5a849ce660ef7031aa874c44d67cd3eada8febdfce41efb1ed3bc53a0a7e716cbba025a. You may close this tab." ) @@ -391,7 +478,12 @@ def test_oauth2_implicit_flow_expects_token_in_id_token_if_response_type_in_url_ reply_url="http://localhost:5000", data=f"id_token={token}&state=87c4108ec0eb03599335333a40434a36674269690b6957fef684bfb6c5a849ce660ef7031aa874c44d67cd3eada8febdfce41efb1ed3bc53a0a7e716cbba025a", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 87c4108ec0eb03599335333a40434a36674269690b6957fef684bfb6c5a849ce660ef7031aa874c44d67cd3eada8febdfce41efb1ed3bc53a0a7e716cbba025a. You may close this tab." ) @@ -410,7 +502,12 @@ def test_oauth2_implicit_flow_expects_token_to_be_stored_in_access_token_by_defa reply_url="http://localhost:5000", data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -429,12 +526,20 @@ def test_oauth2_implicit_flow_token_is_reused_if_not_expired( reply_url="http://localhost:5000", data=f"access_token={token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth1).get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=auth1) oauth2 = requests_auth.OAuth2Implicit("http://provide_token") - response = requests.get("http://authorized_only", auth=oauth2) - # Return headers received on this dummy URL - assert response.request.headers.get("Authorization") == f"Bearer {token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {token}"})], + ) + + requests.get("http://authorized_only", auth=oauth2) tab.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) @@ -768,7 +873,12 @@ def test_oauth2_implicit_flow_token_is_requested_again_if_expired( reply_url="http://localhost:5000", data=f"access_token={first_token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - assert get_header(responses, auth).get("Authorization") == f"Bearer {first_token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {first_token}"})], + ) + + requests.get("http://authorized_only", auth=auth) # Wait to ensure that the token will be considered as expired time.sleep(0.2) @@ -783,9 +893,12 @@ def test_oauth2_implicit_flow_token_is_requested_again_if_expired( reply_url="http://localhost:5000", data=f"access_token={second_token}&state=42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521", ) - response = requests.get("http://authorized_only", auth=auth) - # Return headers received on this dummy URL - assert response.request.headers.get("Authorization") == f"Bearer {second_token}" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {second_token}"})], + ) + + requests.get("http://authorized_only", auth=auth) tab1.assert_success( "You are now authenticated on 42a85b271b7a652ca3cc4c398cfd3f01b9ad36bf9c945ba823b023e8f8b95c4638576a0e3dcc96838b838bec33ec6c0ee2609d62ed82480b3b8114ca494c0521. You may close this tab." ) diff --git a/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password.py b/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password.py index 768b486..06d88d1 100644 --- a/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password.py +++ b/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password.py @@ -1,11 +1,18 @@ -from responses import RequestsMock -from responses.matchers import urlencoded_params_matcher +from responses import RequestsMock, Response +from responses.matchers import header_matcher, urlencoded_params_matcher import pytest import requests import requests_auth from requests_auth.testing import token_cache # noqa: F401 -from tests.auth_helper import get_header, get_request + + +def get_request(responses: RequestsMock, url: str) -> Response: + for call in responses.calls: + if call.request.url == url: + # Pop out verified request (to be able to check multiple requests) + responses.calls._calls.remove(call) + return call.request def test_oauth2_password_credentials_flow_uses_provided_session( @@ -19,8 +26,7 @@ def test_oauth2_password_credentials_flow_uses_provided_session( password="test_pwd", session=session, ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -29,14 +35,23 @@ def test_oauth2_password_credentials_flow_uses_provided_session( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + } + ), + header_matcher({"x-test": "Test value"}), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) - request = get_request(responses, "http://provide_access_token/") - assert request.body == "grant_type=password&username=test_user&password=test_pwd" - assert request.headers["x-test"] == "Test value" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_password_credentials_flow_token_is_sent_in_authorization_header_by_default( @@ -45,8 +60,7 @@ def test_oauth2_password_credentials_flow_token_is_sent_in_authorization_header_ auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -55,16 +69,23 @@ def test_oauth2_password_credentials_flow_token_is_sent_in_authorization_header_ "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_password_credentials_flow_does_not_authenticate_by_default( token_cache, responses: RequestsMock @@ -72,8 +93,7 @@ def test_oauth2_password_credentials_flow_does_not_authenticate_by_default( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -82,15 +102,24 @@ def test_oauth2_password_credentials_flow_does_not_authenticate_by_default( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + + requests.get("http://authorized_only", auth=auth) + token_request = get_request(responses, "http://provide_access_token/") - assert ( - token_request.body == "grant_type=password&username=test_user&password=test_pwd" - ) assert "Authorization" not in token_request.headers @@ -103,8 +132,7 @@ def test_oauth2_password_credentials_flow_authentication( password="test_pwd", session_auth=("test_user2", "test_pwd2"), ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -113,19 +141,24 @@ def test_oauth2_password_credentials_flow_authentication( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + } + ), + header_matcher({"Authorization": "Basic dGVzdF91c2VyMjp0ZXN0X3B3ZDI="}), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - token_request = get_request(responses, "http://provide_access_token/") - assert ( - token_request.body == "grant_type=password&username=test_user&password=test_pwd" - ) - assert ( - "Basic dGVzdF91c2VyMjp0ZXN0X3B3ZDI=" == token_request.headers["Authorization"] + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_password_credentials_flow_token_is_expired_after_30_seconds_by_default( token_cache, responses: RequestsMock @@ -140,8 +173,7 @@ def test_oauth2_password_credentials_flow_token_is_expired_after_30_seconds_by_d expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) # Meaning a new one will be requested - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -150,16 +182,23 @@ def test_oauth2_password_credentials_flow_token_is_expired_after_30_seconds_by_d "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_password_credentials_flow_token_custom_expiry( token_cache, responses: RequestsMock @@ -176,18 +215,19 @@ def test_oauth2_password_credentials_flow_token_custom_expiry( token="2YotnFZFEjr1zCsicMWpAA", expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -196,24 +236,30 @@ def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_refresh_token(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) # response for password grant - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -232,19 +278,15 @@ def test_refresh_token(token_cache, responses: RequestsMock): ) ], ) - - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + # response for refresh token grant - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "rVR7Syg5bjZtZYjbZIW", @@ -262,22 +304,20 @@ def test_refresh_token(token_cache, responses: RequestsMock): ) ], ) - - response = requests.get("http://authorized_only", auth=auth) - assert response.request.headers.get("Authorization") == "Bearer rVR7Syg5bjZtZYjbZIW" - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=refresh_token&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], ) + requests.get("http://authorized_only", auth=auth) + def test_refresh_token_invalid(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) # response for password grant - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -296,19 +336,15 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): ) ], ) - - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + # response for refresh token grant - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_request"}, status=400, @@ -323,19 +359,20 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): ) # if refreshing the token fails, fallback to requesting a new token - response = requests.get("http://authorized_only", auth=auth) - assert ( - response.request.headers.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) # response for password grant - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -354,22 +391,21 @@ def test_refresh_token_access_token_not_expired(token_cache, responses: Requests ) ], ) - - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + # expect Bearer token to remain the same - response = requests.get("http://authorized_only", auth=auth) - assert ( - response.request.headers.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_scope_is_sent_as_is_when_provided_as_str(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( @@ -378,8 +414,7 @@ def test_scope_is_sent_as_is_when_provided_as_str(token_cache, responses: Reques password="test_pwd", scope="my_scope+my_other_scope", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -388,16 +423,24 @@ def test_scope_is_sent_as_is_when_provided_as_str(token_cache, responses: Reques "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "my_scope+my_other_scope", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd&scope=my_scope%2Bmy_other_scope" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_scope_is_sent_as_str_when_provided_as_list( token_cache, responses: RequestsMock @@ -408,8 +451,7 @@ def test_scope_is_sent_as_str_when_provided_as_list( password="test_pwd", scope=["my_scope", "my_other_scope"], ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -418,24 +460,30 @@ def test_scope_is_sent_as_str_when_provided_as_list( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "my_scope my_other_scope", + } + ) + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request(responses, "http://provide_access_token/").body - == "grant_type=password&username=test_user&password=test_pwd&scope=my_scope+my_other_scope" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_with_invalid_grant_request_no_json(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, "http://provide_access_token", body="failure", status=400 - ) + responses.post("http://provide_access_token", body="failure", status=400) with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: requests.get("http://authorized_only", auth=auth) assert str(exception_info.value) == "failure" @@ -447,8 +495,7 @@ def test_with_invalid_grant_request_invalid_request_error( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_request"}, status=400, @@ -470,8 +517,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_request", "error_description": "desc of the error"}, status=400, @@ -487,8 +533,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description_ auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "error": "invalid_request", @@ -511,8 +556,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description_ auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "error": "invalid_request", @@ -534,8 +578,7 @@ def test_with_invalid_grant_request_without_error(token_cache, responses: Reques auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"other": "other info"}, status=400, @@ -551,8 +594,7 @@ def test_with_invalid_grant_request_invalid_client_error( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_client"}, status=400, @@ -578,8 +620,7 @@ def test_with_invalid_grant_request_invalid_grant_error( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_grant"}, status=400, @@ -601,8 +642,7 @@ def test_with_invalid_grant_request_unauthorized_client_error( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "unauthorized_client"}, status=400, @@ -622,8 +662,7 @@ def test_with_invalid_grant_request_unsupported_grant_type_error( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "unsupported_grant_type"}, status=400, @@ -643,8 +682,7 @@ def test_with_invalid_grant_request_invalid_scope_error( auth = requests_auth.OAuth2ResourceOwnerPasswordCredentials( "http://provide_access_token", username="test_user", password="test_pwd" ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={"error": "invalid_scope"}, status=400, @@ -665,8 +703,7 @@ def test_without_expected_token(token_cache, responses: RequestsMock): password="test_pwd", token_field_name="not_provided", ) - responses.add( - responses.POST, + responses.post( "http://provide_access_token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", diff --git a/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password_okta.py b/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password_okta.py index 5b9c88d..e757a35 100644 --- a/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password_okta.py +++ b/tests/oauth2/resource_owner_password/test_oauth2_resource_owner_password_okta.py @@ -1,11 +1,10 @@ from responses import RequestsMock -from responses.matchers import urlencoded_params_matcher +from responses.matchers import header_matcher, urlencoded_params_matcher import pytest import requests import requests_auth from requests_auth.testing import token_cache # noqa: F401 -from tests.auth_helper import get_header, get_request def test_oauth2_password_credentials_flow_uses_provided_session( @@ -21,8 +20,7 @@ def test_oauth2_password_credentials_flow_uses_provided_session( client_secret="test_pwd2", session=session, ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -31,19 +29,24 @@ def test_oauth2_password_credentials_flow_uses_provided_session( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "openid", + } + ), + header_matcher({"x-test": "Test value"}), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - request = get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ) - assert ( - request.body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) - assert request.headers["x-test"] == "Test value" + + requests.get("http://authorized_only", auth=auth) def test_oauth2_password_credentials_flow_token_is_sent_in_authorization_header_by_default( @@ -56,8 +59,7 @@ def test_oauth2_password_credentials_flow_token_is_sent_in_authorization_header_ client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -66,22 +68,25 @@ def test_oauth2_password_credentials_flow_token_is_sent_in_authorization_header_ "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "openid", + } + ), + header_matcher({"Authorization": "Basic dGVzdF91c2VyMjp0ZXN0X3B3ZDI="}), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - token_request = get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ) - assert ( - token_request.body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" - ) - assert ( - "Basic dGVzdF91c2VyMjp0ZXN0X3B3ZDI=" == token_request.headers["Authorization"] + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_password_credentials_flow_token_is_expired_after_30_seconds_by_default( token_cache, responses: RequestsMock @@ -100,8 +105,7 @@ def test_oauth2_password_credentials_flow_token_is_expired_after_30_seconds_by_d expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) # Meaning a new one will be requested - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -110,18 +114,24 @@ def test_oauth2_password_credentials_flow_token_is_expired_after_30_seconds_by_d "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "openid", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_oauth2_password_credentials_flow_token_custom_expiry( token_cache, responses: RequestsMock @@ -140,11 +150,13 @@ def test_oauth2_password_credentials_flow_token_custom_expiry( token="2YotnFZFEjr1zCsicMWpAA", expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): auth = requests_auth.OktaResourceOwnerPasswordCredentials( @@ -154,8 +166,7 @@ def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -164,18 +175,24 @@ def test_expires_in_sent_as_str(token_cache, responses: RequestsMock): "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "openid", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_refresh_token(token_cache, responses: RequestsMock): auth = requests_auth.OktaResourceOwnerPasswordCredentials( @@ -186,8 +203,7 @@ def test_refresh_token(token_cache, responses: RequestsMock): client_secret="test_pwd2", ) # response for password grant - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -207,21 +223,15 @@ def test_refresh_token(token_cache, responses: RequestsMock): ) ], ) - - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + # response for refresh token grant - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "rVR7Syg5bjZtZYjbZIW", @@ -240,16 +250,13 @@ def test_refresh_token(token_cache, responses: RequestsMock): ) ], ) - - response = requests.get("http://authorized_only", auth=auth) - assert response.request.headers.get("Authorization") == "Bearer rVR7Syg5bjZtZYjbZIW" - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=refresh_token&scope=openid&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], ) + requests.get("http://authorized_only", auth=auth) + def test_refresh_token_invalid(token_cache, responses: RequestsMock): auth = requests_auth.OktaResourceOwnerPasswordCredentials( @@ -260,8 +267,7 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): client_secret="test_pwd2", ) # response for password grant - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -281,21 +287,15 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): ) ], ) - - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + # response for refresh token grant - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "invalid_request"}, status=400, @@ -311,11 +311,13 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): ) # if refreshing the token fails, fallback to requesting a new token - response = requests.get("http://authorized_only", auth=auth) - assert ( - response.request.headers.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): auth = requests_auth.OktaResourceOwnerPasswordCredentials( @@ -326,8 +328,7 @@ def test_refresh_token_access_token_not_expired(token_cache, responses: Requests client_secret="test_pwd2", ) # response for password grant - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -347,24 +348,21 @@ def test_refresh_token_access_token_not_expired(token_cache, responses: Requests ) ], ) - - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=openid" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + # expect Bearer token to remain the same - response = requests.get("http://authorized_only", auth=auth) - assert ( - response.request.headers.get("Authorization") == "Bearer 2YotnFZFEjr1zCsicMWpAA" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_scope_is_sent_as_is_when_provided_as_str(token_cache, responses: RequestsMock): auth = requests_auth.OktaResourceOwnerPasswordCredentials( @@ -375,8 +373,7 @@ def test_scope_is_sent_as_is_when_provided_as_str(token_cache, responses: Reques client_secret="test_pwd2", scope="my_scope+my_other_scope", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -385,18 +382,24 @@ def test_scope_is_sent_as_is_when_provided_as_str(token_cache, responses: Reques "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "my_scope+my_other_scope", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=my_scope%2Bmy_other_scope" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_scope_is_sent_as_str_when_provided_as_list( token_cache, responses: RequestsMock @@ -409,8 +412,7 @@ def test_scope_is_sent_as_str_when_provided_as_list( client_secret="test_pwd2", scope=["my_scope", "my_other_scope"], ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", @@ -419,18 +421,24 @@ def test_scope_is_sent_as_str_when_provided_as_list( "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value", }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "password", + "username": "test_user", + "password": "test_pwd", + "scope": "my_scope my_other_scope", + } + ), + ], ) - assert ( - get_header(responses, auth).get("Authorization") - == "Bearer 2YotnFZFEjr1zCsicMWpAA" - ) - assert ( - get_request( - responses, "https://testserver.okta-emea.com/oauth2/default/v1/token" - ).body - == "grant_type=password&username=test_user&password=test_pwd&scope=my_scope+my_other_scope" + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], ) + requests.get("http://authorized_only", auth=auth) + def test_with_invalid_grant_request_no_json(token_cache, responses: RequestsMock): auth = requests_auth.OktaResourceOwnerPasswordCredentials( @@ -440,8 +448,7 @@ def test_with_invalid_grant_request_no_json(token_cache, responses: RequestsMock client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", body="failure", status=400, @@ -461,8 +468,7 @@ def test_with_invalid_grant_request_invalid_request_error( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "invalid_request"}, status=400, @@ -488,8 +494,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "invalid_request", "error_description": "desc of the error"}, status=400, @@ -509,8 +514,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description_ client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "error": "invalid_request", @@ -537,8 +541,7 @@ def test_with_invalid_grant_request_invalid_request_error_and_error_description_ client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "error": "invalid_request", @@ -564,8 +567,7 @@ def test_with_invalid_grant_request_without_error(token_cache, responses: Reques client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"other": "other info"}, status=400, @@ -585,8 +587,7 @@ def test_with_invalid_grant_request_invalid_client_error( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "invalid_client"}, status=400, @@ -616,8 +617,7 @@ def test_with_invalid_grant_request_invalid_grant_error( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "invalid_grant"}, status=400, @@ -643,8 +643,7 @@ def test_with_invalid_grant_request_unauthorized_client_error( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "unauthorized_client"}, status=400, @@ -668,8 +667,7 @@ def test_with_invalid_grant_request_unsupported_grant_type_error( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "unsupported_grant_type"}, status=400, @@ -693,8 +691,7 @@ def test_with_invalid_grant_request_invalid_scope_error( client_id="test_user2", client_secret="test_pwd2", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={"error": "invalid_scope"}, status=400, @@ -717,8 +714,7 @@ def test_without_expected_token(token_cache, responses: RequestsMock): client_secret="test_pwd2", token_field_name="not_provided", ) - responses.add( - responses.POST, + responses.post( "https://testserver.okta-emea.com/oauth2/default/v1/token", json={ "access_token": "2YotnFZFEjr1zCsicMWpAA", diff --git a/tests/test_api_key.py b/tests/test_api_key.py index e0ee5ed..935fa44 100644 --- a/tests/test_api_key.py +++ b/tests/test_api_key.py @@ -1,9 +1,10 @@ import pytest +import requests from responses import RequestsMock +from responses.matchers import header_matcher, query_string_matcher import requests_auth -from tests.auth_helper import get_header, get_query_args def test_header_api_key_requires_an_api_key(): @@ -20,19 +21,43 @@ def test_query_api_key_requires_an_api_key(): def test_header_api_key_is_sent_in_x_api_key_by_default(responses: RequestsMock): auth = requests_auth.HeaderApiKey("my_provided_api_key") - assert get_header(responses, auth).get("X-Api-Key") == "my_provided_api_key" + + responses.get( + "http://authorized_only", + match=[header_matcher({"X-API-Key": "my_provided_api_key"})], + ) + + requests.get("http://authorized_only", auth=auth) def test_query_api_key_is_sent_in_api_key_by_default(responses: RequestsMock): auth = requests_auth.QueryApiKey("my_provided_api_key") - assert get_query_args(responses, auth) == "/?api_key=my_provided_api_key" + + responses.get( + "http://authorized_only", + match=[query_string_matcher("api_key=my_provided_api_key")], + ) + + requests.get("http://authorized_only", auth=auth) def test_header_api_key_can_be_sent_in_a_custom_field_name(responses: RequestsMock): auth = requests_auth.HeaderApiKey("my_provided_api_key", "X-API-HEADER-KEY") - assert get_header(responses, auth).get("X-Api-Header-Key") == "my_provided_api_key" + + responses.get( + "http://authorized_only", + match=[header_matcher({"X-API-HEADER-KEY": "my_provided_api_key"})], + ) + + requests.get("http://authorized_only", auth=auth) def test_query_api_key_can_be_sent_in_a_custom_field_name(responses: RequestsMock): auth = requests_auth.QueryApiKey("my_provided_api_key", "X-API-QUERY-KEY") - assert get_query_args(responses, auth) == "/?X-API-QUERY-KEY=my_provided_api_key" + + responses.get( + "http://authorized_only", + match=[query_string_matcher("X-API-QUERY-KEY=my_provided_api_key")], + ) + + requests.get("http://authorized_only", auth=auth) diff --git a/tests/test_basic.py b/tests/test_basic.py index 8724950..d9d66d2 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -1,12 +1,16 @@ +import requests from responses import RequestsMock +from responses.matchers import header_matcher import requests_auth -from tests.auth_helper import get_header def test_basic_authentication_send_authorization_header(responses: RequestsMock): auth = requests_auth.Basic("test_user", "test_pwd") - assert ( - get_header(responses, auth).get("Authorization") - == "Basic dGVzdF91c2VyOnRlc3RfcHdk" + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Basic dGVzdF91c2VyOnRlc3RfcHdk"})], ) + + requests.get("http://authorized_only", auth=auth) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index c710316..aa66b0b 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -1,9 +1,10 @@ import os import pytest +import requests +from responses.matchers import header_matcher import requests_auth -from tests.auth_helper import get_header def test_requests_negotiate_sspi_is_used_when_nothing_is_provided_but_without_installed( @@ -28,11 +29,15 @@ def test_requests_negotiate_sspi_is_used_when_nothing_is_provided( monkeypatch.syspath_prepend( os.path.join(os.path.abspath(os.path.dirname(__file__)), "success_ntlm") ) - assert ( - get_header(responses, requests_auth.NTLM()).get("Authorization") - == "HttpNegotiateAuth fake" + auth = requests_auth.NTLM() + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "HttpNegotiateAuth fake"})], ) + requests.get("http://authorized_only", auth=auth) + def test_requests_ntlm_is_used_when_user_and_pass_provided_but_without_installed( monkeypatch, @@ -54,13 +59,17 @@ def test_requests_ntlm_is_used_when_user_and_pass_provided(monkeypatch, response monkeypatch.syspath_prepend( os.path.join(os.path.abspath(os.path.dirname(__file__)), "success_ntlm") ) - assert ( - get_header(responses, requests_auth.NTLM("fake_user", "fake_pwd")).get( - "Authorization" - ) - == "HttpNtlmAuth fake fake_user / fake_pwd" + auth = requests_auth.NTLM("fake_user", "fake_pwd") + + responses.get( + "http://authorized_only", + match=[ + header_matcher({"Authorization": "HttpNtlmAuth fake fake_user / fake_pwd"}) + ], ) + requests.get("http://authorized_only", auth=auth) + def test_user_without_password_is_invalid(): with pytest.raises(Exception) as exception_info: