diff --git a/WHATSNEW.md b/WHATSNEW.md index dc0bacb..601a9e5 100644 --- a/WHATSNEW.md +++ b/WHATSNEW.md @@ -1,5 +1,7 @@ ### Added +- FortiManager Methods for handling addresses and services (get and delete) + ### Changed - Upgrade requests to >=2.32 due to dependabot warning diff --git a/fotoobo/fortinet/fortimanager.py b/fotoobo/fortinet/fortimanager.py index 38c2714..1b06990 100644 --- a/fotoobo/fortinet/fortimanager.py +++ b/fotoobo/fortinet/fortimanager.py @@ -15,11 +15,16 @@ log = logging.getLogger("fotoobo") -class FortiManager(Fortinet): +class FortiManager(Fortinet): # pylint: disable=too-many-public-methods """ Represents one FortiManager (digital twin) """ + def __del__(self) -> None: + """The destructor""" + if self.session_key and not self.session_path: + self.logout() + def __init__(self, hostname: str, username: str, password: str, **kwargs: Any) -> None: """ Set some initial parameters. @@ -63,10 +68,43 @@ def __init__(self, hostname: str, username: str, password: str, **kwargs: Any) - "rootp", ] - def __del__(self) -> None: - """The destructor""" - if self.session_key and not self.session_path: - self.logout() + def api_delete(self, url: str) -> requests.models.Response: + """DELETE method for API requests + + Args: + url: API endpoint to access + + Result: + FortiManager result item + """ + payload = { + "method": "delete", + "params": [{"url": f"{url}"}], + } + return self.api("post", payload=payload) + + def api_get( + self, url: str, params: Optional[dict[str, Any]] = None, timeout: Optional[float] = None + ) -> requests.models.Response: + """GET method for API requests + + Args: + url: API endpoint to access + params: Additional query parameters if needed + timeout: The requests read timeout in seconds + + Result: + FortiManager result item + """ + _params = {"url": f"{url}"} + if params: + _params = {**_params, **params} + + payload = { + "method": "get", + "params": [_params], + } + return self.api("post", payload=payload, timeout=timeout) def api( # pylint: disable=too-many-arguments self, @@ -155,7 +193,377 @@ def assign_all_objects(self, adoms: str, policy: str) -> int: return task_id - def get_adoms(self, ignored_adoms: Optional[List[str]] = None) -> List[Any]: + def delete_adom_address(self, adom: str, address: str, dry: bool = False) -> dict[str, Any]: + """ + Delete an address from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address from + address: The address to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/address/{address}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove address '%s' in ADOM '%s'", address, adom) + + return result + + def delete_adom_address_group(self, adom: str, group: str, dry: bool = False) -> dict[str, Any]: + """ + Delete an address group from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address group from + group: The address group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove address group '%s' in ADOM '%s'", group, adom) + + return result + + def delete_adom_service(self, adom: str, service: str, dry: bool = False) -> dict[str, Any]: + """ + Delete a service from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address from + service: The service to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove service '%s' in ADOM '%s'", service, adom) + + return result + + def delete_adom_service_group(self, adom: str, group: str, dry: bool = False) -> dict[str, Any]: + """ + Delete a service group from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address group from + group: The address group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove service group '%s' in ADOM '%s'", group, adom) + + return result + + def delete_global_address(self, address: str, dry: bool = False) -> dict[str, Any]: + """ + Delete a global address from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global address object we have to check if it's in use in any ADOM. Only + after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links:\n + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + address: The global address to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + + # Get the address object with 'scope member' information + address_object = self.get_global_address(address, scope_member=True) + if address_object["status"]["code"] == 0: + # Generate a list of ADOMs where the object is used. Therefore we get the address object + # from FortiManager with the 'scope member' option. If the object is used in any ADOM it + # is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in address_object["data"]: + used_adoms = [_["name"] for _ in address_object["data"]["scope member"]] + log.debug("'%s' is used in ADOM '%s'", address, ",".join(used_adoms)) + + else: + used_adoms = [] + + if not dry: + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if self.delete_adom_address(adom, address)["status"]["code"] not in [ + -3, + 0, + ]: + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) + result = address_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } + + else: + # Try to delete the global address object + url: str = f"/pm/config/global/obj/firewall/address/{address}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global address '%s'", address) + + else: + result = address_object + + return result + + def delete_global_address_group(self, group: str, dry: bool = False) -> dict[str, Any]: + """ + Delete a global address group from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global address group object we have to check if it's in use in any ADOM. + Only after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links:\n + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + group: The global address group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + + # Get the address group object with 'scope member' information + address_group_object = self.get_global_address_group(group, scope_member=True) + if address_group_object["status"]["code"] == 0: + + # Generate a list of ADOMs where the object is used. Therefore we get the address group + # object from FortiManager with the 'scope member' option. If the object is used in any + # ADOM it is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in address_group_object["data"]: + used_adoms = [_["name"] for _ in address_group_object["data"]["scope member"]] + log.debug("'%s' is used in ADOM '%s'", group, ",".join(used_adoms)) + + else: + used_adoms = [] + + if not dry: + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_address_group(adom, group)["status"]["code"] in [-3, 0]: + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + result = address_group_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } + + else: + # Try to delete the global address group object + url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global address group '%s'", group) + + else: + result = address_group_object + + return result + + def delete_global_service(self, service: str, dry: bool = False) -> dict[str, Any]: + """ + Delete a global service from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global service object we have to check if it's in use in any ADOM. Only + after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links:\n + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + service: The global service to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + + # Get the service object with 'scope member' information + service_object = self.get_global_service(service, scope_member=True) + if service_object["status"]["code"] == 0: + + # Generate a list of ADOMs where the object is used. Therefore we get the service object + # from FortiManager with the 'scope member' option. If the object is used in any ADOM it + # is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in service_object["data"]: + used_adoms = [_["name"] for _ in service_object["data"]["scope member"]] + log.debug("'%s' is used in ADOM '%s'", service, ",".join(used_adoms)) + + else: + used_adoms = [] + + if not dry: + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_service(adom, service)["status"]["code"] in [-3, 0]: + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) + result = service_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } + + else: + # Try to delete the global service object + url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global service '%s'", service) + + else: + result = service_object + + return result + + def delete_global_service_group(self, group: str, dry: bool = False) -> dict[str, Any]: + """ + Delete a global service group from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global service group object we have to check if it's in use in any ADOM. + Only after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links:\n + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + group: The global service group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + + # Get the service group object with 'scope member' information + service_group_object = self.get_global_service_group(group, scope_member=True) + if service_group_object["status"]["code"] == 0: + + # Generate a list of ADOMs where the object is used. Therefore we get the service group + # object from FortiManager with the 'scope member' option. If the object is used in any + # ADOM it is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in service_group_object["data"]: + used_adoms = [_["name"] for _ in service_group_object["data"]["scope member"]] + log.debug("'%s' is used in ADOM '%s'", group, ",".join(used_adoms)) + + else: + used_adoms = [] + + if not dry: + # Try to delete the service group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_service_group(adom, group)["status"]["code"] in [-3, 0]: + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + result = service_group_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } + + else: + # Try to delete the global service group object + url: str = f"/pm/config/global/obj/firewall/service/group/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global service group '%s'", group) + + else: + result = service_group_object + + return result + + def get_adoms(self, ignored_adoms: Optional[List[str]] = None) -> list[Any]: """ Get FortiManager ADOM list @@ -177,6 +585,162 @@ def get_adoms(self, ignored_adoms: Optional[List[str]] = None) -> List[Any]: return fmg_adoms + def get_global_address(self, address: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get an address object from global ADOM + + Args: + address: The address to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + url: str = f"/pm/config/global/obj/firewall/address/{address}" + + if scope_member: + params = {"option": ["scope member"]} + response = self.api_get(url, params) + + else: + response = self.api_get(url) + + result = response.json()["result"][0] + + return result + + def get_global_addresses(self) -> dict[str, Any]: + """ + Get the global address database + + Returns: + FortiManager result item + """ + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/address", + timeout=10, + ).json()["result"][0] + + return result + + def get_global_address_group(self, group: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get an address group object from the global ADOM + + Args: + group: The address group to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" + + if scope_member: + params = {"option": ["scope member"]} + response = self.api_get(url, params) + + else: + response = self.api_get(url) + + result = response.json()["result"][0] + + return result + + def get_global_address_groups(self) -> dict[str, Any]: + """ + Get the global address group database + + Returns: + FortiManager result item + """ + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/addrgrp", + timeout=10, + ).json()["result"][0] + + return result + + def get_global_service(self, service: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get a service object from global ADOM + + Args: + service: The service to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" + + if scope_member: + params = {"option": ["scope member"]} + response = self.api_get(url, params) + + else: + response = self.api_get(url) + + result = response.json()["result"][0] + + return result + + def get_global_services(self) -> dict[str, Any]: + """ + Get the global services database + + Returns: + FortiManager result item + """ + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/service/custom", + timeout=10, + ).json()["result"][0] + + return result + + def get_global_service_group(self, group: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get a service group object from the global ADOM + + Args: + group: The address group to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + FortiManager result item + """ + result: dict[str, Any] = {} + url: str = f"/pm/config/global/obj/firewall/service/group/{group}" + + if scope_member: + params = {"option": ["scope member"]} + response = self.api_get(url, params) + + else: + response = self.api_get(url) + + result = response.json()["result"][0] + + return result + + def get_global_service_groups(self) -> dict[str, Any]: + """ + Get the global network service group database + + Returns: + FortiManager result item + """ + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/service/group", + timeout=10, + ).json()["result"][0] + + return result + def get_version(self) -> str: """ Get FortiManager version. diff --git a/tests/fortinet/test_fortimanager.py b/tests/fortinet/test_fortimanager.py index 99d4aa4..812c471 100644 --- a/tests/fortinet/test_fortimanager.py +++ b/tests/fortinet/test_fortimanager.py @@ -2,7 +2,9 @@ Test the FortiManager class """ -# pylint: disable=no-member +# pylint: disable=no-member, too-many-lines +# mypy: disable-error-code=attr-defined +from typing import Any from unittest.mock import MagicMock import pytest @@ -14,9 +16,134 @@ from tests.helper import ResponseMock -class TestFortiManager: +class TestFortiManager: # pylint: disable=too-many-public-methods """Test the FortiManager class""" + @staticmethod + def _response_mock_api_ok() -> MagicMock: + """Fixture to return a mocked response for API ok""" + return MagicMock( + return_value=ResponseMock( + json={"result": [{"data": {}, "status": {"code": 0, "message": "OK"}}]}, + status_code=200, + ) + ) + + @staticmethod + @pytest.fixture + def api_delete_ok(monkeypatch: MonkeyPatch) -> None: + """Fixture to patch the FortiManager.api_delete() method""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.api_delete", + TestFortiManager._response_mock_api_ok(), + ) + + @staticmethod + @pytest.fixture + def api_get_ok(monkeypatch: MonkeyPatch) -> None: + """Fixture to patch the FortiManager.api_get() method""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.api_get", + TestFortiManager._response_mock_api_ok(), + ) + + @staticmethod + def test_api_delete(monkeypatch: MonkeyPatch) -> None: + """Test api_delete""" + url: str = "/pm/config/adom/dummy/obj/firewall/address/dummy" + monkeypatch.setattr( + "fotoobo.fortinet.fortinet.requests.Session.post", + MagicMock( + return_value=ResponseMock( + json={ + "result": [ + { + "status": {"code": 0, "message": "OK"}, + "url": url, + } + ] + }, + status_code=200, + ) + ), + ) + fmg = FortiManager("host", "", "") + assert fmg.api_delete(url).json()["result"][0]["status"]["code"] == 0 + requests.Session.post.assert_called_with( + "https://host:443/jsonrpc", + headers=None, + json={"method": "delete", "params": [{"url": url}], "session": ""}, + params=None, + timeout=3, + verify=True, + ) + + @staticmethod + @pytest.mark.parametrize( + "with_params", + ( + pytest.param(False, id="without params"), + pytest.param(True, id="with params"), + ), + ) + def test_api_get(with_params: bool, monkeypatch: MonkeyPatch) -> None: + """test api_get""" + url: str = "/pm/config/global/obj/firewall/address/dummy" + params = {"option": ["scope member"]} + monkeypatch.setattr( + "fotoobo.fortinet.fortinet.requests.Session.post", + MagicMock( + return_value=ResponseMock( + json={ + "result": [ + { + "data": { + "name": "dummy", + "uuid": "88888888-4444-4444-4444-121212121212", + }, + "status": {"code": 0, "message": "OK"}, + "url": url, + } + ] + }, + status_code=200, + ) + ), + ) + fmg = FortiManager("host", "", "") + expected_call = ( + ["https://host:443/jsonrpc"], + { + "headers": None, + "json": { + "method": "get", + "params": [ + { + "url": "/pm/config/global/obj/firewall/address/dummy", + } + ], + "session": "", + }, + "params": None, + "timeout": 3, + "verify": True, + }, + ) + if with_params: + assert fmg.api_get(url, params).json()["result"][0]["status"]["code"] == 0 + expected_call[1]["json"]["params"][0] = { # type: ignore + **expected_call[1]["json"]["params"][0], # type: ignore + **{"option": ["scope member"]}, + } + requests.Session.post.assert_called_with( + *expected_call[0], + **expected_call[1], + ) + + else: + assert fmg.api_get(url).json()["result"][0]["status"]["code"] == 0 + requests.Session.post.assert_called_with(*expected_call[0], **expected_call[1]) + @staticmethod def test_assign_all_objects(monkeypatch: MonkeyPatch) -> None: """Test assign_all_objects""" @@ -38,7 +165,7 @@ def test_assign_all_objects(monkeypatch: MonkeyPatch) -> None: ), ) assert FortiManager("host", "", "").assign_all_objects("dummy_adom", "dummy_policy") == 111 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -69,8 +196,9 @@ def test_assign_all_objects_http_404(monkeypatch: MonkeyPatch) -> None: ) with pytest.raises(APIError) as err: FortiManager("host", "", "").assign_all_objects("dummy_adom", "dummy_policy") + assert "HTTP/404 Resource Not Found" in str(err.value) - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -113,7 +241,7 @@ def test_assign_all_objects_status_not_ok(monkeypatch: MonkeyPatch) -> None: ), ) assert FortiManager("host", "", "").assign_all_objects("adom1,adom2", "policy1") == 0 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -138,6 +266,294 @@ def test_assign_all_objects_status_not_ok(monkeypatch: MonkeyPatch) -> None: verify=True, ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address() -> None: + """Test fmg delete_adom_address""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/address/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address_dry() -> None: + """Test fmg delete_adom_address with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address_group() -> None: + """Test fmg delete_adom_address_group""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address_group("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/addrgrp/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address_group_dry() -> None: + """Test fmg delete_adom_address_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address_group("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service() -> None: + """Test fmg delete_adom_service""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/service/custom/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service_dry() -> None: + """Test fmg delete_adom_service with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service_group() -> None: + """Test fmg delete_adom_service_group""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service_group("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/service/group/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service_group_dry() -> None: + """Test fmg delete_adom_service_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service_group("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_address_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_address_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_address_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_address( + get_global_address_data: dict[str, Any], + get_global_address_status: dict[str, Any], + delete_adom_address_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_address""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_address", + MagicMock( + return_value={ + "data": get_global_address_data, + "status": get_global_address_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_address", + MagicMock(return_value={"status": delete_adom_address_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address("dummy")["status"]["code"] in [0, 7, 601] + + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_address_dry() -> None: + """Test fmg delete_global_address with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address("dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_address_group_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_address_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_address_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_address_group( + get_global_address_group_data: dict[str, Any], + get_global_address_group_status: dict[str, Any], + delete_adom_address_group_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_address_group""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_address_group", + MagicMock( + return_value={ + "data": get_global_address_group_data, + "status": get_global_address_group_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_address_group", + MagicMock(return_value={"status": delete_adom_address_group_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address_group("dummy")["status"]["code"] in [0, 7, 601] + + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_address_group_dry() -> None: + """Test fmg delete_global_address_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address_group("dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_service_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_service_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_service_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_service( + get_global_service_data: dict[str, Any], + get_global_service_status: dict[str, Any], + delete_adom_service_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_service""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_service", + MagicMock( + return_value={ + "data": get_global_service_data, + "status": get_global_service_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_service", + MagicMock(return_value={"status": delete_adom_service_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service("dummy")["status"]["code"] in [0, 7, 601] + + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_service_dry() -> None: + """Test fmg delete_global_service with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service("dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_service_group_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_service_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_service_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_service_group( + get_global_service_group_data: dict[str, Any], + get_global_service_group_status: dict[str, Any], + delete_adom_service_group_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_service_group""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_service_group", + MagicMock( + return_value={ + "data": get_global_service_group_data, + "status": get_global_service_group_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_service_group", + MagicMock(return_value={"status": delete_adom_service_group_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service_group("dummy")["status"]["code"] in [0, 7, 601] + + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_service_group_dry() -> None: + """Test fmg delete_global_service_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service_group("dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + @staticmethod def test_get_adoms(monkeypatch: MonkeyPatch) -> None: """Test fmg get adoms""" @@ -150,7 +566,7 @@ def test_get_adoms(monkeypatch: MonkeyPatch) -> None: ), ) assert FortiManager("host", "", "").get_adoms() == [{"name": "dummy"}] - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/dvmdb/adom"}], "session": ""}, @@ -170,6 +586,122 @@ def test_get_adoms_http_error(monkeypatch: MonkeyPatch) -> None: FortiManager("", "", "").get_adoms() assert "HTTP/400 Bad Request" in str(err.value) + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_address(scope_member: bool) -> None: + """Test fmg get_global_address""" + fmg = FortiManager("host", "", "") + assert fmg.get_global_address("dummy", scope_member=scope_member)["status"]["code"] == 0 + expected_call: list[Any] = ["/pm/config/global/obj/firewall/address/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + + FortiManager.api_get.assert_called_with(*expected_call) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_addresses() -> None: + """Test fmg get_global_addresses""" + assert FortiManager("host", "", "").get_global_addresses()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/address", timeout=10 + ) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_address_group(scope_member: bool) -> None: + """Test fmg get_global_address_group""" + fmg = FortiManager("host", "", "") + assert ( + fmg.get_global_address_group("dummy", scope_member=scope_member)["status"]["code"] == 0 + ) + expected_call: list[Any] = ["/pm/config/global/obj/firewall/addrgrp/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + FortiManager.api_get.assert_called_with(*expected_call) + + # assert fmg.get_global_address_group("dummy")["status"]["code"] == 0 + # FortiManager.api_get.assert_called_with("/pm/config/global/obj/firewall/addrgrp/dummy") + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_address_groups() -> None: + """Test fmg get_global_address_groups""" + assert FortiManager("host", "", "").get_global_address_groups()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/addrgrp", timeout=10 + ) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_service(scope_member: bool) -> None: + """Test fmg get_global_service""" + fmg = FortiManager("host", "", "") + assert fmg.get_global_service("dummy", scope_member=scope_member)["status"]["code"] == 0 + expected_call: list[Any] = ["/pm/config/global/obj/firewall/service/custom/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + FortiManager.api_get.assert_called_with(*expected_call) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_services() -> None: + """Test fmg get_global_services""" + assert FortiManager("host", "", "").get_global_services()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/service/custom", timeout=10 + ) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_service_group(scope_member: bool) -> None: + """Test fmg get_global_service_group""" + fmg = FortiManager("host", "", "") + assert ( + fmg.get_global_service_group("dummy", scope_member=scope_member)["status"]["code"] == 0 + ) + expected_call: list[Any] = ["/pm/config/global/obj/firewall/service/group/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + FortiManager.api_get.assert_called_with(*expected_call) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_service_groups() -> None: + """Test fmg get_global_service_groups""" + assert FortiManager("host", "", "").get_global_service_groups()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/service/group", timeout=10 + ) + @staticmethod @pytest.mark.parametrize( "response, expected", @@ -189,7 +721,7 @@ def test_get_version(response: MagicMock, expected: str, monkeypatch: MonkeyPatc MagicMock(return_value=ResponseMock(json=response, status_code=200)), ) assert FortiManager("host", "", "").get_version() == expected - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/sys/status"}], "session": ""}, @@ -218,7 +750,7 @@ def test_login(monkeypatch: MonkeyPatch) -> None: ) fmg = FortiManager("host", "user", "pass") assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -255,7 +787,7 @@ def test_login_with_session_path(monkeypatch: MonkeyPatch) -> None: fmg.hostname = "test_fmg" fmg.session_path = "tests/data" assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -295,7 +827,7 @@ def test_login_with_session_path_invalid_key(monkeypatch: MonkeyPatch) -> None: fmg.hostname = "test_fmg" fmg.session_path = "tests/data" assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -334,7 +866,7 @@ def test_login_with_session_path_not_found(temp_dir: str, monkeypatch: MonkeyPat fmg.hostname = "test_fmg_dummy" fmg.session_path = temp_dir assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -364,7 +896,7 @@ def test_logout(monkeypatch: MonkeyPatch) -> None: ) fortimanager = FortiManager("host", "user", "pass") assert fortimanager.logout() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -389,7 +921,7 @@ def test_post_single(monkeypatch: MonkeyPatch) -> None: ), ) assert not FortiManager("host", "", "").post("ADOM", {"params": [{"url": "{adom}"}]}) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -410,7 +942,7 @@ def test_post_multiple(monkeypatch: MonkeyPatch) -> None: ), ) assert not FortiManager("host", "", "").post("ADOM", [{"params": [{"url": "{adom}"}]}]) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -431,7 +963,7 @@ def test_post_single_global(monkeypatch: MonkeyPatch) -> None: ), ) assert not FortiManager("host", "", "").post("global", {"params": [{"url": "{adom}"}]}) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "global"}], "session": ""}, @@ -457,7 +989,7 @@ def test_post_response_error(monkeypatch: MonkeyPatch) -> None: assert FortiManager("host", "", "").post("ADOM", [{"params": [{"url": "{adom}"}]}]) == [ "dummy: dummy (code: 444)" ] - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -476,7 +1008,7 @@ def test_post_http_error(monkeypatch: MonkeyPatch) -> None: with pytest.raises(APIError) as err: FortiManager("host", "", "").post("ADOM", [{"params": [{"url": "{adom}"}]}]) assert "HTTP/444 general API Error" in str(err.value) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -519,7 +1051,7 @@ def test_wait_for_task(monkeypatch: MonkeyPatch) -> None: messages = FortiManager("host", "", "").wait_for_task(222, 0) assert isinstance(messages, list) assert messages[0]["task_id"] == 222 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/task/task/222/line"}], "session": ""},