From 56e40864bce33d02b3bf95f7325b181174d26b9b Mon Sep 17 00:00:00 2001 From: Vladyslav Bilous Date: Sat, 22 Apr 2023 01:14:50 +0300 Subject: [PATCH 1/3] Optimized imports --- steampy/login.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/steampy/login.py b/steampy/login.py index 5f878a9..e528a98 100644 --- a/steampy/login.py +++ b/steampy/login.py @@ -1,22 +1,24 @@ -import base64 -import time -import requests +from time import time +from base64 import b64encode + +from rsa import encrypt, PublicKey +from requests import Session, Response + from steampy import guard -import rsa from steampy.models import SteamUrl -from steampy.exceptions import InvalidCredentials, CaptchaRequired +from steampy.exceptions import InvalidCredentials, CaptchaRequired, ApiException class LoginExecutor: - def __init__(self, username: str, password: str, shared_secret: str, session: requests.Session) -> None: + def __init__(self, username: str, password: str, shared_secret: str, session: Session) -> None: self.username = username self.password = password self.one_time_code = '' self.shared_secret = shared_secret self.session = session - def login(self) -> requests.Session: + def login(self) -> Session: login_response = self._send_login_request() self._check_for_captcha(login_response) login_response = self._enter_steam_guard_if_necessary(login_response) @@ -25,7 +27,7 @@ def login(self) -> requests.Session: self.set_sessionid_cookies() return self.session - def _send_login_request(self) -> requests.Response: + def _send_login_request(self) -> Response: rsa_params = self._fetch_rsa_params() encrypted_password = self._encrypt_password(rsa_params) rsa_timestamp = rsa_params['rsa_timestamp'] @@ -55,7 +57,7 @@ def _fetch_rsa_params(self, current_number_of_repetitions: int = 0) -> dict: rsa_mod = int(key_response['publickey_mod'], 16) rsa_exp = int(key_response['publickey_exp'], 16) rsa_timestamp = key_response['timestamp'] - return {'rsa_key': rsa.PublicKey(rsa_mod, rsa_exp), + return {'rsa_key': PublicKey(rsa_mod, rsa_exp), 'rsa_timestamp': rsa_timestamp} except KeyError: if current_number_of_repetitions < maximal_number_of_repetitions: @@ -63,10 +65,10 @@ def _fetch_rsa_params(self, current_number_of_repetitions: int = 0) -> dict: else: raise ValueError('Could not obtain rsa-key') - def _encrypt_password(self, rsa_params: dict) -> str: - return base64.b64encode(rsa.encrypt(self.password.encode('utf-8'), rsa_params['rsa_key'])) + def _encrypt_password(self, rsa_params: dict) -> bytes: + return b64encode(encrypt(self.password.encode('utf-8'), rsa_params['rsa_key'])) - def _prepare_login_request_data(self, encrypted_password: str, rsa_timestamp: str) -> dict: + def _prepare_login_request_data(self, encrypted_password: bytes, rsa_timestamp: str) -> dict: return { 'password': encrypted_password, 'username': self.username, @@ -78,22 +80,22 @@ def _prepare_login_request_data(self, encrypted_password: str, rsa_timestamp: st 'emailsteamid': '', 'rsatimestamp': rsa_timestamp, 'remember_login': 'true', - 'donotcache': str(int(time.time() * 1000)) + 'donotcache': str(int(time() * 1000)) } @staticmethod - def _check_for_captcha(login_response: requests.Response) -> None: + def _check_for_captcha(login_response: Response) -> None: if login_response.json().get('captcha_needed', False): raise CaptchaRequired('Captcha required') - def _enter_steam_guard_if_necessary(self, login_response: requests.Response) -> requests.Response: + def _enter_steam_guard_if_necessary(self, login_response: Response) -> Response: if login_response.json()['requires_twofactor']: self.one_time_code = guard.generate_one_time_code(self.shared_secret) return self._send_login_request() return login_response @staticmethod - def _assert_valid_credentials(login_response: requests.Response) -> None: + def _assert_valid_credentials(login_response: Response) -> None: if not login_response.json()['success']: raise InvalidCredentials(login_response.json()['message']) @@ -104,5 +106,5 @@ def _perform_redirects(self, response_dict: dict) -> None: for url in response_dict['transfer_urls']: self.session.post(url, parameters) - def _fetch_home_page(self, session: requests.Session) -> requests.Response: + def _fetch_home_page(self, session: Session) -> Response: return session.post(SteamUrl.COMMUNITY_URL + '/my/home/') From def455c0e582c40e527c1330cb75cf74146909ca Mon Sep 17 00:00:00 2001 From: Vladyslav Bilous Date: Sat, 22 Apr 2023 01:24:52 +0300 Subject: [PATCH 2/3] Created an API call to get an RSA key --- steampy/login.py | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/steampy/login.py b/steampy/login.py index e528a98..427124c 100644 --- a/steampy/login.py +++ b/steampy/login.py @@ -18,6 +18,18 @@ def __init__(self, username: str, password: str, shared_secret: str, session: Se self.shared_secret = shared_secret self.session = session + def _api_call(self, method: str, service: str, endpoint: str, version: str = 'v1', params: dict = None) -> Response: + url = '/'.join([SteamUrl.API_URL, service, endpoint, version]) + # all requests from the login page use the same "Referer" and "Origin" values + headers = { + "Referer": SteamUrl.COMMUNITY_URL + '/', + "Origin": SteamUrl.COMMUNITY_URL + } + if method.upper() == 'GET': + return self.session.get(url, params=params, headers=headers) + else: + return self.session.post(url, data=params, headers=headers) + def login(self) -> Session: login_response = self._send_login_request() self._check_for_captcha(login_response) @@ -50,20 +62,22 @@ def _create_session_id_cookie(sessionid: str, domain: str) -> dict: "domain": domain} def _fetch_rsa_params(self, current_number_of_repetitions: int = 0) -> dict: + request_data = {'account_name': self.username} + response = self._api_call('GET', 'IAuthenticationService', 'GetPasswordRSAPublicKey', params=request_data) + + if response.status_code == 200 and 'response' in response.json(): + key_data = response.json()['response'] + # Steam may return an empty "response" value even if the status is 200 + if 'publickey_mod' in key_data and 'publickey_exp' in key_data and 'timestamp' in key_data: + rsa_mod = int(key_data['publickey_mod'], 16) + rsa_exp = int(key_data['publickey_exp'], 16) + return {'rsa_key': PublicKey(rsa_mod, rsa_exp), 'rsa_timestamp': key_data['timestamp']} + maximal_number_of_repetitions = 5 - key_response = self.session.post(SteamUrl.COMMUNITY_URL + '/login/getrsakey/', - data={'username': self.username}).json() - try: - rsa_mod = int(key_response['publickey_mod'], 16) - rsa_exp = int(key_response['publickey_exp'], 16) - rsa_timestamp = key_response['timestamp'] - return {'rsa_key': PublicKey(rsa_mod, rsa_exp), - 'rsa_timestamp': rsa_timestamp} - except KeyError: - if current_number_of_repetitions < maximal_number_of_repetitions: - return self._fetch_rsa_params(current_number_of_repetitions + 1) - else: - raise ValueError('Could not obtain rsa-key') + if current_number_of_repetitions < maximal_number_of_repetitions: + return self._fetch_rsa_params(current_number_of_repetitions + 1) + + raise ApiException('Could not obtain rsa-key. Status code: %s' % response.status_code) def _encrypt_password(self, rsa_params: dict) -> bytes: return b64encode(encrypt(self.password.encode('utf-8'), rsa_params['rsa_key'])) From d8ce169105cafb73b6343efba2818a3160a3c48a Mon Sep 17 00:00:00 2001 From: Vladyslav Bilous Date: Sun, 23 Apr 2023 14:11:18 +0300 Subject: [PATCH 3/3] Replaced raw status code with HTTPStatus enum --- steampy/login.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/steampy/login.py b/steampy/login.py index 427124c..51b1e41 100644 --- a/steampy/login.py +++ b/steampy/login.py @@ -1,4 +1,5 @@ from time import time +from http import HTTPStatus from base64 import b64encode from rsa import encrypt, PublicKey @@ -65,7 +66,7 @@ def _fetch_rsa_params(self, current_number_of_repetitions: int = 0) -> dict: request_data = {'account_name': self.username} response = self._api_call('GET', 'IAuthenticationService', 'GetPasswordRSAPublicKey', params=request_data) - if response.status_code == 200 and 'response' in response.json(): + if response.status_code == HTTPStatus.OK and 'response' in response.json(): key_data = response.json()['response'] # Steam may return an empty "response" value even if the status is 200 if 'publickey_mod' in key_data and 'publickey_exp' in key_data and 'timestamp' in key_data: