diff --git a/.codegen.json b/.codegen.json index 842d6d6b..919e8cf4 100644 --- a/.codegen.json +++ b/.codegen.json @@ -1 +1 @@ -{ "engineHash": "ce205f7", "specHash": "1698c95", "version": "0.6.4" } +{ "engineHash": "81071a6", "specHash": "1698c95", "version": "0.6.4" } diff --git a/box_sdk_gen/box/developer_token_auth.py b/box_sdk_gen/box/developer_token_auth.py index a6be9acc..d5b8ebb5 100644 --- a/box_sdk_gen/box/developer_token_auth.py +++ b/box_sdk_gen/box/developer_token_auth.py @@ -1,5 +1,11 @@ from typing import Optional +from typing import List + +from box_sdk_gen.schemas import PostOAuth2TokenGrantTypeField + +from box_sdk_gen.schemas import PostOAuth2TokenSubjectTokenTypeField + from box_sdk_gen.schemas import AccessToken from box_sdk_gen.networking.auth import Authentication @@ -8,11 +14,37 @@ from box_sdk_gen.box.errors import BoxSDKError +from box_sdk_gen.box.token_storage import TokenStorage + +from box_sdk_gen.box.token_storage import InMemoryTokenStorage + +from box_sdk_gen.managers.authorization import AuthorizationManager + +from box_sdk_gen.schemas import PostOAuth2Token + +from box_sdk_gen.schemas import PostOAuth2Revoke + + +class DeveloperTokenConfig: + def __init__( + self, *, client_id: Optional[str] = None, client_secret: Optional[str] = None + ): + self.client_id = client_id + self.client_secret = client_secret + class BoxDeveloperTokenAuth(Authentication): - def __init__(self, token: str, **kwargs): + def __init__(self, token: str, *, config: DeveloperTokenConfig = None, **kwargs): + """ + :param config: Configuration object of DeveloperTokenAuth., defaults to None + :type config: DeveloperTokenConfig, optional + """ super().__init__(**kwargs) self.token = token + self.config = config + self.token_storage = InMemoryTokenStorage( + token=AccessToken(access_token=self.token) + ) def retrieve_token( self, *, network_session: Optional[NetworkSession] = None @@ -22,7 +54,10 @@ def retrieve_token( :param network_session: An object to keep network session state, defaults to None :type network_session: Optional[NetworkSession], optional """ - return AccessToken(access_token=self.token) + token: Optional[AccessToken] = self.token_storage.get() + if token == None: + raise BoxSDKError(message='No access token is available.') + return token def refresh_token( self, *, network_session: Optional[NetworkSession] = None @@ -41,3 +76,62 @@ def retrieve_authorization_header( ) -> str: token: AccessToken = self.retrieve_token(network_session=network_session) return ''.join(['Bearer ', token.access_token]) + + def revoke_token(self, *, network_session: Optional[NetworkSession] = None) -> None: + """ + Revoke an active Access Token, effectively logging a user out that has been previously authenticated. + :param network_session: An object to keep network session state, defaults to None + :type network_session: Optional[NetworkSession], optional + """ + token: Optional[AccessToken] = self.token_storage.get() + if token == None: + return None + auth_manager: AuthorizationManager = AuthorizationManager( + network_session=( + network_session if not network_session == None else NetworkSession() + ) + ) + auth_manager.revoke_access_token( + client_id=self.config.client_id, + client_secret=self.config.client_secret, + token=token.access_token, + ) + self.token_storage.clear() + return None + + def downscope_token( + self, + scopes: List[str], + *, + resource: Optional[str] = None, + shared_link: Optional[str] = None, + network_session: Optional[NetworkSession] = None + ) -> AccessToken: + """ + Downscope access token to the provided scopes. Returning a new access token with the provided scopes, with the original access token unchanged. + :param scopes: The scope(s) to apply to the resulting token. + :type scopes: List[str] + :param resource: The file or folder to get a downscoped token for. If None and shared_link None, the resulting token will not be scoped down to just a single item. The resource should be a full URL to an item, e.g. https://api.box.com/2.0/files/123456., defaults to None + :type resource: Optional[str], optional + :param shared_link: The shared link to get a downscoped token for. If None and item None, the resulting token will not be scoped down to just a single item., defaults to None + :type shared_link: Optional[str], optional + :param network_session: An object to keep network session state, defaults to None + :type network_session: Optional[NetworkSession], optional + """ + token: Optional[AccessToken] = self.token_storage.get() + if token == None or token.access_token == None: + raise BoxSDKError(message='No access token is available.') + auth_manager: AuthorizationManager = AuthorizationManager( + network_session=( + network_session if not network_session == None else NetworkSession() + ) + ) + downscoped_token: AccessToken = auth_manager.request_access_token( + PostOAuth2TokenGrantTypeField.URN_IETF_PARAMS_OAUTH_GRANT_TYPE_TOKEN_EXCHANGE.value, + subject_token=token.access_token, + subject_token_type=PostOAuth2TokenSubjectTokenTypeField.URN_IETF_PARAMS_OAUTH_TOKEN_TYPE_ACCESS_TOKEN.value, + resource=resource, + scope=' '.join(scopes), + box_shared_link=shared_link, + ) + return downscoped_token diff --git a/box_sdk_gen/box/token_storage.py b/box_sdk_gen/box/token_storage.py index 27ccdc5a..7a964b59 100644 --- a/box_sdk_gen/box/token_storage.py +++ b/box_sdk_gen/box/token_storage.py @@ -20,17 +20,17 @@ def clear(self) -> None: class InMemoryTokenStorage(TokenStorage): - def __init__(self): - self.token: Optional[AccessToken] = None + def __init__(self, token: Optional[AccessToken] = None): + self._token = token def store(self, token: AccessToken) -> None: - self.token = token + self._token = token def get(self) -> Optional[AccessToken]: - return self.token + return self._token def clear(self) -> None: - self.token = None + self._token = None class FileTokenStorage(TokenStorage): diff --git a/box_sdk_gen/internal/base_object.py b/box_sdk_gen/internal/base_object.py index f9034655..3e266b42 100644 --- a/box_sdk_gen/internal/base_object.py +++ b/box_sdk_gen/internal/base_object.py @@ -48,7 +48,7 @@ def to_dict(self) -> dict: @classmethod def _deserialize(cls, key, value, annotation=None): - if annotation is None: + if annotation is None or value is None: return value if get_origin(annotation) == Optional: return cls._deserialize(key, value, get_args(annotation)) @@ -69,13 +69,15 @@ def _deserialize(cls, key, value, annotation=None): return cls._deserialize_datetime(key, value, annotation) elif annotation == date: return cls._deserialize_date(key, value, annotation) - else: + elif isinstance(annotation, type) and issubclass(annotation, BaseObject): return cls._deserialize_nested_type(key, value, annotation) + else: + return value @classmethod def _deserialize_list(cls, key, value, annotation: list): - list_type = get_args(annotation)[0] try: + list_type = get_args(annotation)[0] return [ cls._deserialize(key, list_entry, list_type) for list_entry in value ] @@ -84,27 +86,32 @@ def _deserialize_list(cls, key, value, annotation: list): @classmethod def _deserialize_union(cls, key, value, annotation): - possible_types = get_args(annotation) - if value is None: - if type(None) not in possible_types: - print('Value: ', value, 'should not be allowed in Union:', annotation) + try: + possible_types = get_args(annotation) + if value is None: + if type(None) not in possible_types: + print( + 'Value: ', value, 'should not be allowed in Union:', annotation + ) + return value + + for possible_type in possible_types: + if ( + isinstance(possible_type, type) + and issubclass(possible_type, BaseObject) + and value.get(possible_type._discriminator[0], None) + in possible_type._discriminator[1] + ): + return cls._deserialize(key, value, possible_type) + + for possible_type in possible_types: + try: + return cls._deserialize(key, value, possible_type) + except Exception: + continue + return value + except Exception: return value - - for possible_type in possible_types: - if ( - issubclass(possible_type, BaseObject) - and value.get(possible_type._discriminator[0], None) - in possible_type._discriminator[1] - ): - return cls._deserialize(key, value, possible_type) - - for possible_type in possible_types: - try: - return cls._deserialize(key, value, possible_type) - except Exception: - continue - - return value @classmethod def _deserialize_enum(cls, key, value, annotation): diff --git a/box_sdk_gen/networking/auth.py b/box_sdk_gen/networking/auth.py index c5be20cc..f976406b 100644 --- a/box_sdk_gen/networking/auth.py +++ b/box_sdk_gen/networking/auth.py @@ -2,6 +2,8 @@ from abc import abstractmethod +from typing import List + from box_sdk_gen.schemas import AccessToken from box_sdk_gen.networking.network import NetworkSession @@ -28,3 +30,18 @@ def retrieve_authorization_header( self, *, network_session: Optional[NetworkSession] = None ) -> str: pass + + @abstractmethod + def revoke_token(self, *, network_session: Optional[NetworkSession] = None) -> None: + pass + + @abstractmethod + def downscope_token( + self, + scopes: List[str], + *, + resource: Optional[str] = None, + shared_link: Optional[str] = None, + network_session: Optional[NetworkSession] = None + ) -> AccessToken: + pass diff --git a/docs/authentication.md b/docs/authentication.md index 6d2007fe..0d7b6ee0 100644 --- a/docs/authentication.md +++ b/docs/authentication.md @@ -315,9 +315,11 @@ if __name__ == '__main__': # Revoke token Access tokens for a client can be revoked when needed. This call invalidates old token. -For CCGAuth and JWTAuth you can still reuse the `auth` object to retrieve a new token. If you make any new call after revoking the token, -a new token will be automatically retrieved. -For OAuth it would be necessary to manually go through the authentication process again. +For BoxCCGAuth and BoxJWTAuth you can still reuse the `auth` object to retrieve a new token. +If you make any new call after revoking the token, a new token will be automatically retrieved. +For BoxOAuth it would be necessary to manually go through the authentication process again. +For BoxDeveloperTokenAuth, it is necessary to provide a DeveloperTokenConfig during initialization, +containing the client ID and client secret. To revoke current client's tokens in the storage use the following code: @@ -342,7 +344,7 @@ If you want to learn more about available scopes please go [here](https://develo For example to get a new token with only `item_preview` scope, restricted to a single file, suitable for the [Content Preview UI Element](https://developer.box.com/en/guides/embed/ui-elements/preview/) you can use the following code. -You can also initialize `DeveloperTokenAuth` with the retrieved access token and use it to create a new Client. +You can also initialize `BoxDeveloperTokenAuth` with the retrieved access token and use it to create a new Client. @@ -354,7 +356,7 @@ downscoped_token: AccessToken = auth.downscope_token( scopes=['item_preview'], resource=resource, ) -downscoped_auth = BoxDeveloperTokenAuth(downscoped_token.access_token) +downscoped_auth = BoxDeveloperTokenAuth(token=downscoped_token.access_token) client = BoxClient(auth=downscoped_auth) ``` diff --git a/docs/tasks.md b/docs/tasks.md index 11d7a5db..34499f65 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -51,7 +51,7 @@ See the endpoint docs at ```python -client.tasks.create_task(CreateTaskItem(type=CreateTaskItemTypeField.FILE.value, id=file.id), action=CreateTaskAction.REVIEW.value, message='test message', due_at=date, completion_rule=CreateTaskCompletionRule.ALL_ASSIGNEES.value) +client.tasks.create_task(CreateTaskItem(type=CreateTaskItemTypeField.FILE.value, id=file.id), action=CreateTaskAction.REVIEW.value, message='test message', due_at=date_time, completion_rule=CreateTaskCompletionRule.ALL_ASSIGNEES.value) ``` ### Arguments diff --git a/test/auth.py b/test/auth.py index d29c7370..4c40a7e3 100644 --- a/test/auth.py +++ b/test/auth.py @@ -40,6 +40,8 @@ from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.developer_token_auth import DeveloperTokenConfig + from box_sdk_gen.box.oauth import BoxOAuth from box_sdk_gen.box.oauth import OAuthConfig @@ -200,6 +202,48 @@ def get_access_token() -> AccessToken: return auth_user.retrieve_token() +def test_developer_token_auth_revoke(): + developer_token_config: DeveloperTokenConfig = DeveloperTokenConfig( + client_id=get_env_var('CLIENT_ID'), client_secret=get_env_var('CLIENT_SECRET') + ) + token: AccessToken = get_access_token() + auth: BoxDeveloperTokenAuth = BoxDeveloperTokenAuth( + token=token.access_token, config=developer_token_config + ) + auth.retrieve_token() + token_from_storage_before_revoke: Optional[AccessToken] = auth.token_storage.get() + auth.revoke_token() + token_from_storage_after_revoke: Optional[AccessToken] = auth.token_storage.get() + assert not token_from_storage_before_revoke == None + assert token_from_storage_after_revoke == None + + +def test_developer_token_auth_downscope(): + developer_token_config: DeveloperTokenConfig = DeveloperTokenConfig( + client_id=get_env_var('CLIENT_ID'), client_secret=get_env_var('CLIENT_SECRET') + ) + token: AccessToken = get_access_token() + auth: BoxDeveloperTokenAuth = BoxDeveloperTokenAuth( + token=token.access_token, config=developer_token_config + ) + parent_client: BoxClient = BoxClient(auth=auth) + folder: FolderFull = parent_client.folders.create_folder( + get_uuid(), CreateFolderParent(id='0') + ) + resource_path: str = ''.join(['https://api.box.com/2.0/folders/', folder.id]) + downscoped_token: AccessToken = auth.downscope_token( + ['item_rename', 'item_preview'], resource=resource_path + ) + assert not downscoped_token.access_token == None + downscoped_client: BoxClient = BoxClient( + auth=BoxDeveloperTokenAuth(token=downscoped_token.access_token) + ) + downscoped_client.folders.update_folder_by_id(folder.id, name=get_uuid()) + with pytest.raises(Exception): + downscoped_client.folders.delete_folder_by_id(folder.id) + parent_client.folders.delete_folder_by_id(folder.id) + + def test_developer_token_auth(): user_id: str = get_env_var('USER_ID') token: AccessToken = get_access_token() diff --git a/test/legal_hold_policies.py b/test/legal_hold_policies.py index c2219d51..cfb62ff7 100644 --- a/test/legal_hold_policies.py +++ b/test/legal_hold_policies.py @@ -2,6 +2,8 @@ from box_sdk_gen.schemas import LegalHoldPolicies +from box_sdk_gen.internal.utils import DateTime + from box_sdk_gen.internal.utils import get_uuid from box_sdk_gen.internal.utils import date_time_from_string @@ -47,23 +49,23 @@ def testCreateUpdateGetDeleteLegalHoldPolicy(): def testCreateNotOngoingLegalHoldPolicy(): legal_hold_policy_name: str = get_uuid() legal_hold_description: str = 'test description' + filter_started_at: DateTime = date_time_from_string('2021-01-01T00:00:00-08:00') + filter_ended_at: DateTime = date_time_from_string('2022-01-01T00:00:00-08:00') legal_hold_policy: LegalHoldPolicy = ( client.legal_hold_policies.create_legal_hold_policy( legal_hold_policy_name, description=legal_hold_description, - filter_started_at=date_time_from_string('2021-01-01T00:00:00-08:00'), - filter_ended_at=date_time_from_string('2022-01-01T00:00:00-08:00'), + filter_started_at=filter_started_at, + filter_ended_at=filter_ended_at, is_ongoing=False, ) ) assert legal_hold_policy.policy_name == legal_hold_policy_name assert legal_hold_policy.description == legal_hold_description - assert ( - date_time_to_string(legal_hold_policy.filter_started_at) - == '2021-01-01T00:00:00-08:00' - ) - assert ( - date_time_to_string(legal_hold_policy.filter_ended_at) - == '2022-01-01T00:00:00-08:00' - ) + assert date_time_to_string( + legal_hold_policy.filter_started_at + ) == date_time_to_string(filter_started_at) + assert date_time_to_string( + legal_hold_policy.filter_ended_at + ) == date_time_to_string(filter_ended_at) client.legal_hold_policies.delete_legal_hold_policy_by_id(legal_hold_policy.id) diff --git a/test/tasks.py b/test/tasks.py index d9461899..f3e53fd0 100644 --- a/test/tasks.py +++ b/test/tasks.py @@ -43,17 +43,17 @@ def testCreateUpdateGetDeleteTask(): generate_byte_stream(10), ) file: FileFull = files.entries[0] - date: DateTime = date_time_from_string('2035-01-01T00:00:00Z') + date_time: DateTime = date_time_from_string('2035-01-01T00:00:00Z') task: Task = client.tasks.create_task( CreateTaskItem(type=CreateTaskItemTypeField.FILE.value, id=file.id), action=CreateTaskAction.REVIEW.value, message='test message', - due_at=date, + due_at=date_time, completion_rule=CreateTaskCompletionRule.ALL_ASSIGNEES.value, ) assert task.message == 'test message' assert task.item.id == file.id - assert date_time_to_string(task.due_at) == '2035-01-01T00:00:00Z' + assert date_time_to_string(task.due_at) == date_time_to_string(date_time) task_by_id: Task = client.tasks.get_task_by_id(task.id) assert task_by_id.id == task.id task_on_file: Tasks = client.tasks.get_file_tasks(file.id)