Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uniconfig] add device discovery task #23

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions uniconfig/python/frinx_worker/uniconfig/device_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from ipaddress import IPv4Address
from ipaddress import IPv6Address

import pydantic
import requests
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.frinx_rest import UNICONFIG_URL_BASE
from frinx.common.type_aliases import ListStr
from frinx.common.worker.service import ServiceWorkersImpl
from frinx.common.worker.service import WorkerImpl
from frinx.common.worker.task_def import TaskDefinition
from frinx.common.worker.task_def import TaskExecutionProperties
from frinx.common.worker.task_def import TaskInput
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from frinx_api.uniconfig import OperationsDiscoverPostResponse
from frinx_api.uniconfig.device.discovery.discover import Address
from frinx_api.uniconfig.device.discovery.discover import Input
from frinx_api.uniconfig.device.discovery.discover import TcpPortItem
from frinx_api.uniconfig.device.discovery.discover import UdpPortItem
from frinx_api.uniconfig.rest_api import Discover
from pydantic import Field
from pydantic import IPvAnyAddress
from pydantic import IPvAnyNetwork

from . import class_to_json
from .util import get_list_of_ip_addresses
from .util import parse_ranges


class DeviceDiscoveryWorkers(ServiceWorkersImpl):
class DeviceDiscoveryWorker(WorkerImpl):

class ExecutionProperties(TaskExecutionProperties):
exclude_empty_inputs: bool = False
transform_string_to_json_valid: bool = True

class WorkerDefinition(TaskDefinition):

name: str = 'UNICONFIG_device_discovery'
description: str = 'Verification of reachable devices in a network.'
labels: ListStr = ['BASICS', 'UNICONFIG']

class WorkerInput(TaskInput):
ip: list[Address]
tcp_port: list[TcpPortItem] | None = None
udp_port: list[UdpPortItem] | None = Field(None, max_length=500)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason why udp list has max_length but tcp list doesn't have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is because UDP port scanning is very slow. If I didn't have it restricted, the task crashed after 120 seconds on timeout error. Maros gave me a tip that I can limit the maximum number of ports - number 500 I set "randomly" as the maximum value when the task is not yet crashing...


@pydantic.field_validator('ip', mode='before')
def validate_ip(cls, ip: str) -> list[Address]:

ip_list = get_list_of_ip_addresses(ip)
if len(ip_list) == 1:
if '/' in ip_list[0]:
address = Address(network=str(IPvAnyNetwork(ip_list[0])))
else:
address = Address(ip_address=str(IPvAnyAddress(ip_list[0])))
else:
try:
address = Address(
start_ipv4_address=str(IPv4Address(ip_list[0])),
end_ipv4_address=str(IPv4Address(ip_list[1]))
)
except ValueError:
address = Address(
start_ipv6_address=str(IPv6Address(ip_list[0])),
end_ipv6_address=str(IPv6Address(ip_list[1]))
)

return [address]

@pydantic.field_validator('tcp_port', mode='before')
def validate_tcp(cls, tcp_port: str) -> list[TcpPortItem]:
return [TcpPortItem(port=p) for p in parse_ranges(tcp_port.split(','))]

@pydantic.field_validator('udp_port', mode='before')
def validate_udp(cls, udp_port: str) -> list[UdpPortItem]:
return [UdpPortItem(port=p) for p in parse_ranges(udp_port.split(','))]

class WorkerOutput(TaskOutput):
output: OperationsDiscoverPostResponse

def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
if Discover.request is None:
raise Exception(f'Failed to create request {Discover.request}')

if Discover.response is None:
raise Exception(f'Failed to create request {Discover.response}')

template = Input(
address=worker_input.ip,
tcp_port=worker_input.tcp_port or None,
udp_port=worker_input.udp_port or None,
)

response = requests.request(
url=UNICONFIG_URL_BASE + Discover.uri,
method=Discover.method,
data=class_to_json(
Discover.request(
input=template
),
),
)

if response.ok:
status = TaskResultStatus.COMPLETED

return TaskResult(
status=status,
output=self.WorkerOutput(
output=Discover.response(
output=response.json()['output']
)
)
)

status = TaskResultStatus.FAILED
return TaskResult(
status=status,
output=self.WorkerOutput(
output=response.json()
)
)
53 changes: 53 additions & 0 deletions uniconfig/python/frinx_worker/uniconfig/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@


def parse_ranges(list_of_strings: list[str]) -> list[int]:
"""Takes list containing integers (str) and ranges (str)
and returns corresponding list of integers.
Example:
input: ["3", "5-6" , "8", "10 - 12"]
output: [3, 5, 6, 8, 10, 11, 12]
"""
output = []
for x in list_of_strings:
try:
number = int(x)
output.append(number)
except ValueError:
try:
range_start, range_stop = map(int, x.split('-'))
except Exception:
# continue
raise ValueError(f"Value '{x}' does not represent an integer.")

output += list(range(range_start, range_stop + 1))
return output


def get_list_of_ip_addresses(ip_addresses: str) -> list[str]:
"""
Creates list of IP addresses.
Args:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is better if you use standard python tags in doc

ip_addresses:
String containing valid IP subnet e.g.: 172.16.1.0/24
or IP address range e.g.: 172.16.1.10-172.16.1.20
Returns:
Union[list, str]:
List containing first and last IP address of given range
e.g.: ['172.16.1.10', '172.16.1.20']
or '172.16.1.100/32'
"""
ip_addresses = ip_addresses.replace(' ', '')

if '/' in ip_addresses:
return [ip_addresses]

if '-' in ip_addresses:
ip_addresses_list = ip_addresses.split(sep='-', maxsplit=1)
start_ip = ip_addresses_list[0]
end_ip = ip_addresses_list[1]

return [start_ip, end_ip]

else:
return [ip_addresses]
Empty file.
139 changes: 139 additions & 0 deletions uniconfig/python/tests/test_device_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import unittest

from frinx_api.uniconfig.device.discovery.discover import Address
from frinx_api.uniconfig.device.discovery.discover import TcpPortItem
from frinx_api.uniconfig.device.discovery.discover import UdpPortItem

from frinx_worker.uniconfig.device_discovery import DeviceDiscoveryWorkers

# from uniconfig.python.frinx_worker.uniconfig.device_discovery import DeviceDiscoveryWorkers # type: ignore


class TestDeviceDiscovery(unittest.TestCase):

def test_tcp_validation_list(self) -> None:
tcp_port = '21,22,23'
expected = [TcpPortItem(port=21), TcpPortItem(port=22), TcpPortItem(port=23)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_tcp(tcp_port=tcp_port)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_tcp_validation_range(self) -> None:
tcp_port = '21-23'
expected = [TcpPortItem(port=21), TcpPortItem(port=22), TcpPortItem(port=23)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_tcp(tcp_port=tcp_port)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_tcp_validation_list_range(self) -> None:
tcp_port = '21-23,25'
expected = [TcpPortItem(port=21), TcpPortItem(port=22), TcpPortItem(port=23), TcpPortItem(port=25)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_tcp(tcp_port=tcp_port)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_udp_validation_list(self) -> None:
udp_port = '21,22,23'
expected = [UdpPortItem(port=21), UdpPortItem(port=22), UdpPortItem(port=23)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_udp(udp_port=udp_port)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_udp_validation_range(self) -> None:
udp_port = '21-23'
expected = [UdpPortItem(port=21), UdpPortItem(port=22), UdpPortItem(port=23)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_udp(udp_port=udp_port)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_udp_validation_list_range(self) -> None:
udp_port = '21-23,25'
expected = [UdpPortItem(port=21), UdpPortItem(port=22), UdpPortItem(port=23), UdpPortItem(port=25)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_udp(udp_port=udp_port)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_validate_ip_single_ip_v4(self) -> None:
ip = '192.168.0.59'
expected = [Address(end_ipv6_address=None, ip_address='192.168.0.59', hostname=None, end_ipv4_address=None,
start_ipv4_address=None, start_ipv6_address=None, network=None)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_validate_ip_range_ip_v4(self) -> None:
ip = '192.168.0.59-192.168.0.90'
expected = [Address(
end_ipv6_address=None, ip_address=None, hostname=None, end_ipv4_address='192.168.0.90',
start_ipv4_address='192.168.0.59', start_ipv6_address=None, network=None)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_validate_ip_network_v4(self) -> None:
ip = '192.168.0.0/24'
expected = [Address(
end_ipv6_address=None, ip_address=None, hostname=None, end_ipv4_address=None, start_ipv4_address=None,
start_ipv6_address=None, network='192.168.0.0/24')]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_validate_ip_single_ip_v6(self) -> None:
ip = '0000:0000:0000:0000:0000:ffff:c0a8:003b'
expected = [Address(end_ipv6_address=None, ip_address='::ffff:c0a8:3b', hostname=None, end_ipv4_address=None,
start_ipv4_address=None, start_ipv6_address=None, network=None)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

ip = '::ffff:c0a8:3b'
expected = [Address(end_ipv6_address=None, ip_address='::ffff:c0a8:3b', hostname=None, end_ipv4_address=None,
start_ipv4_address=None, start_ipv6_address=None, network=None)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_validate_ip_range_ip_v6(self) -> None:
ip = '0000:0000:0000:0000:0000:ffff:c0a8:003b-0000:0000:0000:0000:0000:ffff:c0a8:005a'
expected = [Address(
end_ipv6_address='::ffff:c0a8:5a', ip_address=None, hostname=None, end_ipv4_address=None,
start_ipv4_address=None, start_ipv6_address='::ffff:c0a8:3b', network=None)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

ip = '::ffff:c0a8:3b-::ffff:c0a8:5a'
expected = [Address(
end_ipv6_address='::ffff:c0a8:5a', ip_address=None, hostname=None, end_ipv4_address=None,
start_ipv4_address=None, start_ipv6_address='::ffff:c0a8:3b', network=None)]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip=ip)) # type: ignore
assert expected == result
assert isinstance(result, list)

def test_validate_ip_network_v6(self) -> None:
ip = '::ffff:c0a8:0/128'
expected = [Address(
end_ipv6_address=None, ip_address=None, hostname=None, end_ipv4_address=None, start_ipv4_address=None,
start_ipv6_address=None, network='::ffff:c0a8:0/128')]
result = (DeviceDiscoveryWorkers.
DeviceDiscoveryWorker.WorkerInput.validate_ip(ip)) # type: ignore
assert expected == result
assert isinstance(result, list)


if __name__ == '__main__':
unittest.main()