-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0071cb1
commit be02ee7
Showing
6 changed files
with
317 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
Empty file.
127 changes: 127 additions & 0 deletions
127
uniconfig/python/frinx_worker/uniconfig/device_discovery.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
from ipaddress import IPv4Address | ||
from ipaddress import IPv6Address | ||
|
||
import pydantic | ||
import requests | ||
from frinx.common.conductor_enums import TaskResultStatus | ||
from frinx.common.frinx_rest import UNICONFIG_URL_BASE | ||
from frinx.common.type_aliases import ListStr | ||
from frinx.common.worker.service import ServiceWorkersImpl | ||
from frinx.common.worker.service import WorkerImpl | ||
from frinx.common.worker.task_def import TaskDefinition | ||
from frinx.common.worker.task_def import TaskExecutionProperties | ||
from frinx.common.worker.task_def import TaskInput | ||
from frinx.common.worker.task_def import TaskOutput | ||
from frinx.common.worker.task_result import TaskResult | ||
from frinx_api.uniconfig import OperationsDiscoverPostResponse | ||
from frinx_api.uniconfig.device.discovery.discover import Address | ||
from frinx_api.uniconfig.device.discovery.discover import Input | ||
from frinx_api.uniconfig.device.discovery.discover import TcpPortItem | ||
from frinx_api.uniconfig.device.discovery.discover import UdpPortItem | ||
from frinx_api.uniconfig.rest_api import Discover | ||
from pydantic import Field | ||
from pydantic import IPvAnyAddress | ||
from pydantic import IPvAnyNetwork | ||
|
||
from . import class_to_json | ||
from .util import get_list_of_ip_addresses | ||
from .util import parse_ranges | ||
|
||
|
||
class DeviceDiscoveryWorkers(ServiceWorkersImpl): | ||
class DeviceDiscoveryWorker(WorkerImpl): | ||
|
||
class ExecutionProperties(TaskExecutionProperties): | ||
exclude_empty_inputs: bool = False | ||
transform_string_to_json_valid: bool = True | ||
|
||
class WorkerDefinition(TaskDefinition): | ||
|
||
name: str = 'UNICONFIG_device_discovery' | ||
description: str = 'Verification of reachable devices in a network.' | ||
labels: ListStr = ['BASICS', 'UNICONFIG'] | ||
|
||
class WorkerInput(TaskInput): | ||
ip: list[Address] | ||
tcp_port: list[TcpPortItem] | None = None | ||
udp_port: list[UdpPortItem] | None = Field(None, max_length=500) | ||
|
||
@pydantic.field_validator('ip', mode='before') | ||
@classmethod | ||
def validate_ip(cls, ip: str) -> list[Address]: | ||
|
||
ip_list = get_list_of_ip_addresses(ip) | ||
if len(ip_list) == 1: | ||
if '/' in ip_list[0]: | ||
address = Address(network=str(IPvAnyNetwork(ip_list[0]))) | ||
else: | ||
address = Address(ip_address=str(IPvAnyAddress(ip_list[0]))) | ||
else: | ||
try: | ||
address = Address( | ||
start_ipv4_address=str(IPv4Address(ip_list[0])), | ||
end_ipv4_address=str(IPv4Address(ip_list[1])) | ||
) | ||
except ValueError: | ||
address = Address( | ||
start_ipv6_address=str(IPv6Address(ip_list[0])), | ||
end_ipv6_address=str(IPv6Address(ip_list[1])) | ||
) | ||
|
||
return [address] | ||
|
||
@pydantic.field_validator('tcp_port', mode='before') | ||
@classmethod | ||
def validate_tcp(cls, tcp_port: str) -> list[TcpPortItem]: | ||
return [TcpPortItem(port=p) for p in parse_ranges(tcp_port.split(','))] | ||
|
||
@pydantic.field_validator('udp_port', mode='before') | ||
@classmethod | ||
def validate_udp(cls, udp_port: str) -> list[UdpPortItem]: | ||
return [UdpPortItem(port=p) for p in parse_ranges(udp_port.split(','))] | ||
|
||
class WorkerOutput(TaskOutput): | ||
output: OperationsDiscoverPostResponse | ||
|
||
def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: | ||
if Discover.request is None: | ||
raise Exception(f'Failed to create request {Discover.request}') | ||
|
||
if Discover.response is None: | ||
raise Exception(f'Failed to create request {Discover.response}') | ||
|
||
template = Input( | ||
address=worker_input.ip, | ||
tcp_port=worker_input.tcp_port or None, | ||
udp_port=worker_input.udp_port or None, | ||
) | ||
|
||
response = requests.request( | ||
url=UNICONFIG_URL_BASE + Discover.uri, | ||
method=Discover.method, | ||
data=class_to_json( | ||
Discover.request( | ||
input=template | ||
), | ||
), | ||
) | ||
|
||
if response.ok: | ||
status = TaskResultStatus.COMPLETED | ||
|
||
return TaskResult( | ||
status=status, | ||
output=self.WorkerOutput( | ||
output=Discover.response( | ||
output=response.json()['output'] | ||
) | ||
) | ||
) | ||
|
||
status = TaskResultStatus.FAILED | ||
return TaskResult( | ||
status=status, | ||
output=self.WorkerOutput( | ||
output=response.json() | ||
) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
|
||
|
||
def parse_ranges(list_of_strings: list[str]) -> list[int]: | ||
"""Takes list containing integers (str) and ranges (str) | ||
and returns corresponding list of integers. | ||
Example: | ||
input: ["3", "5-6" , "8", "10 - 12"] | ||
output: [3, 5, 6, 8, 10, 11, 12] | ||
""" | ||
output = [] | ||
for x in list_of_strings: | ||
try: | ||
number = int(x) | ||
output.append(number) | ||
except ValueError: | ||
try: | ||
range_start, range_stop = map(int, x.split('-')) | ||
except Exception: | ||
# continue | ||
raise ValueError(f"Value '{x}' does not represent an integer.") | ||
|
||
output += list(range(range_start, range_stop + 1)) | ||
return output | ||
|
||
|
||
def get_list_of_ip_addresses(ip_addresses: str) -> list[str]: | ||
""" | ||
Creates list of IP addresses. | ||
Args: | ||
ip_addresses: | ||
String containing valid IP subnet e.g.: 172.16.1.0/24 | ||
or IP address range e.g.: 172.16.1.10-172.16.1.20 | ||
Returns: | ||
Union[list, str]: | ||
List containing first and last IP address of given range | ||
e.g.: ['172.16.1.10', '172.16.1.20'] | ||
or '172.16.1.100/32' | ||
""" | ||
ip_addresses = ip_addresses.replace(' ', '') | ||
|
||
if '/' in ip_addresses: | ||
return [ip_addresses] | ||
|
||
if '-' in ip_addresses: | ||
ip_addresses_list = ip_addresses.split(sep='-', maxsplit=1) | ||
start_ip = ip_addresses_list[0] | ||
end_ip = ip_addresses_list[1] | ||
|
||
return [start_ip, end_ip] | ||
|
||
else: | ||
return [ip_addresses] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import unittest | ||
|
||
from frinx_api.uniconfig.device.discovery.discover import Address | ||
from frinx_api.uniconfig.device.discovery.discover import TcpPortItem | ||
from frinx_api.uniconfig.device.discovery.discover import UdpPortItem | ||
|
||
from uniconfig.python.frinx_worker.uniconfig.device_discovery import DeviceDiscoveryWorkers # type: ignore | ||
|
||
|
||
class TestDeviceDiscovery(unittest.TestCase): | ||
|
||
def test_tcp_validation_list(self) -> None: | ||
tcp_port = '21,22,23' | ||
expected = [TcpPortItem(port=21), TcpPortItem(port=22), TcpPortItem(port=23)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_tcp(tcp_port=tcp_port)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_tcp_validation_range(self) -> None: | ||
tcp_port = '21-23' | ||
expected = [TcpPortItem(port=21), TcpPortItem(port=22), TcpPortItem(port=23)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_tcp(tcp_port)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_tcp_validation_list_range(self) -> None: | ||
tcp_port = '21-23,25' | ||
expected = [TcpPortItem(port=21), TcpPortItem(port=22), TcpPortItem(port=23), TcpPortItem(port=25)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_tcp(tcp_port)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_udp_validation_list(self) -> None: | ||
udp_port = '21,22,23' | ||
expected = [UdpPortItem(port=21), UdpPortItem(port=22), UdpPortItem(port=23)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_udp(udp_port=udp_port)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_udp_validation_range(self) -> None: | ||
udp_port = '21-23' | ||
expected = [UdpPortItem(port=21), UdpPortItem(port=22), UdpPortItem(port=23)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_udp(udp_port=udp_port)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_udp_validation_list_range(self) -> None: | ||
udp_port = '21-23,25' | ||
expected = [UdpPortItem(port=21), UdpPortItem(port=22), UdpPortItem(port=23), UdpPortItem(port=25)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_udp(udp_port=udp_port)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_validate_ip_single_ip_v4(self) -> None: | ||
ip = '192.168.0.59' | ||
expected = [Address(end_ipv6_address=None, ip_address='192.168.0.59', hostname=None, end_ipv4_address=None, | ||
start_ipv4_address=None, start_ipv6_address=None, network=None)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_validate_ip_range_ip_v4(self) -> None: | ||
ip = '192.168.0.59-192.168.0.90' | ||
expected = [Address( | ||
end_ipv6_address=None, ip_address=None, hostname=None, end_ipv4_address='192.168.0.90', | ||
start_ipv4_address='192.168.0.59', start_ipv6_address=None, network=None)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_validate_ip_network_v4(self) -> None: | ||
ip = '192.168.0.0/24' | ||
expected = [Address( | ||
end_ipv6_address=None, ip_address=None, hostname=None, end_ipv4_address=None, start_ipv4_address=None, | ||
start_ipv6_address=None, network='192.168.0.0/24')] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_validate_ip_single_ip_v6(self) -> None: | ||
ip = '0000:0000:0000:0000:0000:ffff:c0a8:003b' | ||
expected = [Address(end_ipv6_address=None, ip_address='::ffff:c0a8:3b', hostname=None, end_ipv4_address=None, | ||
start_ipv4_address=None, start_ipv6_address=None, network=None)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
ip = '::ffff:c0a8:3b' | ||
expected = [Address(end_ipv6_address=None, ip_address='::ffff:c0a8:3b', hostname=None, end_ipv4_address=None, | ||
start_ipv4_address=None, start_ipv6_address=None, network=None)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_validate_ip_range_ip_v6(self) -> None: | ||
ip = '0000:0000:0000:0000:0000:ffff:c0a8:003b-0000:0000:0000:0000:0000:ffff:c0a8:005a' | ||
expected = [Address( | ||
end_ipv6_address='::ffff:c0a8:5a', ip_address=None, hostname=None, end_ipv4_address=None, | ||
start_ipv4_address=None, start_ipv6_address='::ffff:c0a8:3b', network=None)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
ip = '::ffff:c0a8:3b-::ffff:c0a8:5a' | ||
expected = [Address( | ||
end_ipv6_address='::ffff:c0a8:5a', ip_address=None, hostname=None, end_ipv4_address=None, | ||
start_ipv4_address=None, start_ipv6_address='::ffff:c0a8:3b', network=None)] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
def test_validate_ip_network_v6(self) -> None: | ||
ip = '::ffff:c0a8:0/128' | ||
expected = [Address( | ||
end_ipv6_address=None, ip_address=None, hostname=None, end_ipv4_address=None, start_ipv4_address=None, | ||
start_ipv6_address=None, network='::ffff:c0a8:0/128')] | ||
result = (DeviceDiscoveryWorkers. | ||
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip)) | ||
assert expected == result | ||
assert isinstance(result, list) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |