Skip to content

Commit

Permalink
♻️ Move the fortigate api_get method to a low level method
Browse files Browse the repository at this point in the history
  • Loading branch information
patrikspiess committed Oct 28, 2024
1 parent bae463e commit ff66437
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 188 deletions.
26 changes: 24 additions & 2 deletions fotoobo/fortinet/fortigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def api( # pylint: disable=too-many-arguments
payload: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
) -> requests.models.Response:
"""
API request to a FortiGate device.
"""Native API request to a FortiGate.
It uses the super.api method but it has to enrich the payload in post requests with the
needed session key.
Expand All @@ -70,6 +70,28 @@ def api( # pylint: disable=too-many-arguments
method, url, payload=payload, params=params, timeout=timeout, headers=headers
)

def api_get(self, url: str, vdom: str = "*", timeout: Optional[float] = None) -> list[Any]:
"""Low level GET request to a FortiGate.
This gets the response from a single API request to a FortiGate and returns it as a fotoobo
Results object.
Args:
url: The API endpoint to access
vdom: The VDOM to access ("vdom1" or "vdom1,vdom2" or "*")
timeout: The time to wait for a response from the FortiGate
Returns:
The Result object with all the results as list (even if only one result is returned)
"""
params = {"vdom": vdom}
response = self.api(method="get", url=url, params=params, timeout=timeout)
data: list[Any] = (
[response.json()] if isinstance(response.json(), dict) else response.json()
) # this is to listify the data from the response

return data

def backup(self, timeout: int = 10) -> str:
"""
Get the configuration backup from a FortiGate.
Expand Down
19 changes: 13 additions & 6 deletions fotoobo/tools/fgt/cmdb/firewall/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from pathlib import Path
from typing import Any

from fotoobo.fortinet.fortigate import FortiGate
from fotoobo.helpers.config import config
from fotoobo.helpers.result import Result
from fotoobo.tools.fgt.get import api_get
from fotoobo.inventory import Inventory


def get_cmdb_firewall_address(
Expand All @@ -14,15 +16,20 @@ def get_cmdb_firewall_address(
The FortiGate api endpoint is: /cmdb/firewall/address
"""
result_raw = api_get(host=host, vdom=vdom, url=f"/cmdb/firewall/address/{name}")
inventory = Inventory(config.inventory_file)
fgt: FortiGate = inventory.get_item(host, "fortigate")
result = Result[list[Any]]()

address_list = fgt.api_get(url=f"/cmdb/firewall/address/{name}", vdom=vdom)
result.push_result(key=host, data=address_list)

if output_file:
result_raw.save_raw(file=Path(output_file), key=host)
result.push_result(key=host, data=address_list)
result.save_raw(file=Path(output_file), key=host)

result = Result[list[Any]]()
assets = []
if result_raw.get_result(host):
for vd in result_raw.get_result(host):
if address_list:
for vd in address_list:
for asset in vd["results"]:

data: dict[str, str] = {
Expand Down
18 changes: 12 additions & 6 deletions fotoobo/tools/fgt/cmdb/firewall/addrgrp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from pathlib import Path
from typing import Any

from fotoobo.fortinet.fortigate import FortiGate
from fotoobo.helpers.config import config
from fotoobo.helpers.result import Result
from fotoobo.tools.fgt.get import api_get
from fotoobo.inventory import Inventory


def get_cmdb_firewall_addrgrp(
Expand All @@ -14,15 +16,19 @@ def get_cmdb_firewall_addrgrp(
The FortiGate api endpoint is: /cmdb/firewall/addrgrp
"""
result_raw = api_get(host=host, vdom=vdom, url=f"/cmdb/firewall/addrgrp/{name}")
inventory = Inventory(config.inventory_file)
fgt: FortiGate = inventory.get_item(host, "fortigate")
result = Result[list[Any]]()

addrgrp_list = fgt.api_get(url=f"/cmdb/firewall/addrgrp/{name}", vdom=vdom)

if output_file:
result_raw.save_raw(file=Path(output_file), key=host)
result.push_result(key=host, data=addrgrp_list)
result.save_raw(file=Path(output_file), key=host)

result = Result[list[Any]]()
assets = []
if result_raw.get_result(host):
for vd in result_raw.get_result(host):
if addrgrp_list:
for vd in addrgrp_list:
for asset in vd["results"]:
# print(asset)
data: dict[str, str] = {
Expand Down
18 changes: 12 additions & 6 deletions fotoobo/tools/fgt/cmdb/firewall/service_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from pathlib import Path
from typing import Any

from fotoobo.fortinet.fortigate import FortiGate
from fotoobo.helpers.config import config
from fotoobo.helpers.result import Result
from fotoobo.tools.fgt.get import api_get
from fotoobo.inventory import Inventory


def get_cmdb_firewall_service_custom(
Expand All @@ -14,15 +16,19 @@ def get_cmdb_firewall_service_custom(
The FortiGate api endpoint is: /cmdb/firewall.service/custom
"""
result_raw = api_get(host=host, vdom=vdom, url=f"/cmdb/firewall.service/custom/{name}")
inventory = Inventory(config.inventory_file)
fgt: FortiGate = inventory.get_item(host, "fortigate")
result = Result[list[Any]]()

service_custom_list = fgt.api_get(url=f"/cmdb/firewall.service/custom/{name}", vdom=vdom)

if output_file:
result_raw.save_raw(file=Path(output_file), key=host)
result.push_result(key=host, data=service_custom_list)
result.save_raw(file=Path(output_file), key=host)

result = Result[list[Any]]()
assets = []
if result_raw.get_result(host):
for vd in result_raw.get_result(host):
if service_custom_list:
for vd in service_custom_list:
for asset in vd["results"]:

data: dict[str, str] = {
Expand Down
18 changes: 12 additions & 6 deletions fotoobo/tools/fgt/cmdb/firewall/service_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from pathlib import Path
from typing import Any

from fotoobo.fortinet.fortigate import FortiGate
from fotoobo.helpers.config import config
from fotoobo.helpers.result import Result
from fotoobo.tools.fgt.get import api_get
from fotoobo.inventory import Inventory


def get_cmdb_firewall_service_group(
Expand All @@ -14,15 +16,19 @@ def get_cmdb_firewall_service_group(
The FortiGate api endpoint is: /cmdb/firewall.service/group
"""
result_raw = api_get(host=host, vdom=vdom, url=f"/cmdb/firewall/addrgrp/{name}")
inventory = Inventory(config.inventory_file)
fgt: FortiGate = inventory.get_item(host, "fortigate")
result = Result[list[Any]]()

service_group_list = fgt.api_get(url=f"/cmdb/firewall/addrgrp/{name}", vdom=vdom)

if output_file:
result_raw.save_raw(file=Path(output_file), key=host)
result.push_result(key=host, data=service_group_list)
result.save_raw(file=Path(output_file), key=host)

result = Result[list[Any]]()
assets = []
if result_raw.get_result(host):
for vd in result_raw.get_result(host):
if service_group_list:
for vd in service_group_list:
for asset in vd["results"]:
data: dict[str, str] = {
"name": asset["name"],
Expand Down
45 changes: 23 additions & 22 deletions fotoobo/tools/fgt/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import concurrent.futures
import logging
from typing import Any, Optional, Tuple
from typing import Optional, Tuple

from rich.progress import Progress

Expand All @@ -17,31 +17,32 @@
log = logging.getLogger("fotoobo")


def api_get(
host: str, url: str = "", vdom: str = "*", timeout: Optional[float] = None
) -> Result[list[Any]]:
"""Native GET request to a FortiGate.
# def api_get(
# host: str, url: str = "", vdom: str = "*", timeout: Optional[float] = None
# ) -> Result[list[Any]]:
# """Native GET request to a FortiGate.

This gets the response from a single API request to a FortiGate and returns it as a fotoobo
Results object.
# This gets the response from a single API request to a FortiGate and returns it as a fotoobo
# Results object.

Args:
host: The host from the inventory to send the GET requests to
url: The API endpoint to access
vdom: The VDOM to access ("vdom1" or "vdom1,vdom2" or "*")
# Args:
# host: The host from the inventory to send the GET requests to
# url: The API endpoint to access
# vdom: The VDOM to access ("vdom1" or "vdom1,vdom2" or "*")

Returns:
The Result object with all the results as list (even if only one result is returned)
"""
inventory = Inventory(config.inventory_file)
fgt: FortiGate = inventory.get_item(host, "fortigate")
result = Result[list[Any]]()
params = {"vdom": vdom}
response = fgt.api(method="get", url=url, params=params, timeout=timeout)
data = [response.json()] if isinstance(response.json(), dict) else response.json() # listify
result.push_result(host, data=data)
# Returns:
# The Result object with all the results as list (even if only one result is returned)
# """
# inventory = Inventory(config.inventory_file)
# fgt: FortiGate = inventory.get_item(host, "fortigate")
# result = Result[list[Any]]()

return result
# params = {"vdom": vdom}
# response = fgt.api(method="get", url=url, params=params, timeout=timeout)
# data = [response.json()] if isinstance(response.json(), dict) else response.json() # listify
# result.push_result(host, data=data)

# return result


def version(host: Optional[str] = None) -> Result[str]:
Expand Down
13 changes: 13 additions & 0 deletions tests/fortinet/test_fortigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ def test_api(monkeypatch: MonkeyPatch) -> None:
"get", "dummy", payload=None, params=None, timeout=None, headers=None
)

def test_api_get(self, monkeypatch: MonkeyPatch) -> None:
"""Test the FortiGate api_get method"""
response_mock = ResponseMock(json=[{"http_method": "GET", "results": []}], status_code=200)
monkeypatch.setattr(
"fotoobo.fortinet.fortigate.FortiGate.api", MagicMock(return_value=response_mock)
)
fortigate = FortiGate("dummy_hostname", "token")
result = fortigate.api_get("/test/dummy/fake")
assert result == [{"http_method": "GET", "results": []}]
FortiGate.api.assert_called_with(
method="get", url="/test/dummy/fake", params={"vdom": "*"}, timeout=None
)

@staticmethod
def test_backup(monkeypatch: MonkeyPatch) -> None:
"""Test the FortiGate backup method"""
Expand Down
76 changes: 35 additions & 41 deletions tests/tools/fgt/cmdb/firewall/test_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
# pylint: disable=no-member
# mypy: disable-error-code=attr-defined
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock

import pytest
from _pytest.monkeypatch import MonkeyPatch

import fotoobo
from fotoobo.helpers.result import Result
from fotoobo.tools.fgt.cmdb.firewall import get_cmdb_firewall_address


Expand All @@ -26,44 +24,40 @@ def inventory_file(monkeypatch: MonkeyPatch) -> None:

def test_get_cmdb_firewall_address(monkeypatch: MonkeyPatch) -> None:
"""Test the get cmdb firewall address method"""
result_mock = Result[list[Any]]()
result_mock.push_result(
"test_fgt_1",
[
{
"results": [
{
"name": "dummy_1",
"type": "fqdn",
"fqdn": "dummy.local",
},
{
"name": "dummy_2",
"type": "geography",
"country": "dummy-country",
},
{
"name": "dummy_3",
"type": "ipmask",
"subnet": "1.1.1.1 2.2.2.2",
},
{
"name": "dummy_4",
"type": "iprange",
"start-ip": "1.1.1.1",
"end-ip": "2.2.2.2",
},
{
"name": "dummy_5",
"type": "dummy-type",
},
],
"vdom": "vdom_1",
}
],
)
result_mock = [
{
"results": [
{
"name": "dummy_1",
"type": "fqdn",
"fqdn": "dummy.local",
},
{
"name": "dummy_2",
"type": "geography",
"country": "dummy-country",
},
{
"name": "dummy_3",
"type": "ipmask",
"subnet": "1.1.1.1 2.2.2.2",
},
{
"name": "dummy_4",
"type": "iprange",
"start-ip": "1.1.1.1",
"end-ip": "2.2.2.2",
},
{
"name": "dummy_5",
"type": "dummy-type",
},
],
"vdom": "vdom_1",
}
]
monkeypatch.setattr(
"fotoobo.tools.fgt.cmdb.firewall.address.api_get", MagicMock(return_value=result_mock)
"fotoobo.fortinet.fortigate.FortiGate.api_get", MagicMock(return_value=result_mock)
)
monkeypatch.setattr("fotoobo.helpers.result.Result.save_raw", MagicMock(return_value=True))
result = get_cmdb_firewall_address("test_fgt_1", "", "", "test.json")
Expand All @@ -74,8 +68,8 @@ def test_get_cmdb_firewall_address(monkeypatch: MonkeyPatch) -> None:
assert data[2]["content"] == "1.1.1.1/2.2.2.2"
assert data[3]["content"] == "1.1.1.1 - 2.2.2.2"
assert data[4]["content"] == ""
fotoobo.tools.fgt.cmdb.firewall.address.api_get.assert_called_with(
host="test_fgt_1", vdom="", url="/cmdb/firewall/address/"
fotoobo.fortinet.fortigate.FortiGate.api_get.assert_called_with(
url="/cmdb/firewall/address/", vdom=""
)
fotoobo.helpers.result.Result.save_raw.assert_called_with(
file=Path("test.json"), key="test_fgt_1"
Expand Down
Loading

0 comments on commit ff66437

Please sign in to comment.