From 217e1470b8f5513368e24543321f4c262479ca8d Mon Sep 17 00:00:00 2001 From: SharUpOff Date: Fri, 4 Oct 2024 20:13:58 +0400 Subject: [PATCH] [ftr] Add IPv6 support --- .github/workflows/pytest.yml | 22 +++ LICENSE.md | 21 +++ gwhosts/dns/__init__.py | 13 +- gwhosts/dns/_casts.py | 26 +++ gwhosts/dns/_exceptions.py | 8 + gwhosts/dns/_parsers.py | 4 +- gwhosts/dns/_serializers.py | 2 +- gwhosts/dns/_tools.py | 21 --- gwhosts/dns/_types.py | 14 ++ gwhosts/dns/parser.py | 2 +- gwhosts/main.py | 12 +- gwhosts/network/__init__.py | 46 +---- gwhosts/network/_casts.py | 76 --------- gwhosts/network/_types.py | 5 - gwhosts/network/_utils.py | 6 +- gwhosts/network/ipv4/__init__.py | 32 ++++ gwhosts/network/ipv4/_casts.py | 53 ++++++ gwhosts/network/ipv4/_types.py | 32 ++++ gwhosts/network/ipv4/_utils.py | 9 + gwhosts/network/ipv6/__init__.py | 31 ++++ gwhosts/network/ipv6/_casts.py | 52 ++++++ gwhosts/network/ipv6/_types.py | 32 ++++ gwhosts/network/ipv6/_utils.py | 9 + gwhosts/performance/__init__.py | 2 +- gwhosts/proxy/__init__.py | 4 +- gwhosts/proxy/_proxy.py | 225 +++++++++++++++++-------- gwhosts/proxy/_types.py | 10 +- gwhosts/py.typed | 0 gwhosts/routes/__init__.py | 2 +- gwhosts/routes/_netlink.py | 147 ++++++++++++---- gwhosts/routes/_rtmsg.py | 28 +++ pyproject.toml | 47 +++++- tests/dns/__init__.py | 0 tests/dns/test_casts.py | 52 ++++++ tests/dns/test_parser.py | 131 ++++++++++++++ tests/dns/test_serializer.py | 138 +++++++++++++++ tests/dns/test_types.py | 28 +++ tests/network/ipv4/__init__.py | 0 tests/network/ipv4/test_casts.py | 123 ++++++++++++++ tests/network/ipv4/test_utils.py | 64 +++++++ tests/network/ipv6/__init__.py | 0 tests/network/{ => ipv6}/test_casts.py | 130 ++++++-------- tests/network/ipv6/test_utils.py | 52 ++++++ tests/network/test_utils.py | 59 ------- tests/performance/__init__.py | 0 tests/performance/test_gc.py | 12 ++ tests/proxy/__init__.py | 0 tests/proxy/test_proxy.py | 51 ++++++ tests/routes/__init__.py | 0 tests/routes/test_netlink.py | 189 +++++++++++++++++++++ tests/test_dns.py | 48 ------ tox.ini | 16 ++ 52 files changed, 1620 insertions(+), 466 deletions(-) create mode 100644 .github/workflows/pytest.yml create mode 100644 LICENSE.md create mode 100644 gwhosts/dns/_casts.py delete mode 100644 gwhosts/dns/_tools.py delete mode 100644 gwhosts/network/_casts.py create mode 100644 gwhosts/network/ipv4/__init__.py create mode 100644 gwhosts/network/ipv4/_casts.py create mode 100644 gwhosts/network/ipv4/_types.py create mode 100644 gwhosts/network/ipv4/_utils.py create mode 100644 gwhosts/network/ipv6/__init__.py create mode 100644 gwhosts/network/ipv6/_casts.py create mode 100644 gwhosts/network/ipv6/_types.py create mode 100644 gwhosts/network/ipv6/_utils.py create mode 100644 gwhosts/py.typed create mode 100644 gwhosts/routes/_rtmsg.py create mode 100644 tests/dns/__init__.py create mode 100644 tests/dns/test_casts.py create mode 100644 tests/dns/test_parser.py create mode 100644 tests/dns/test_serializer.py create mode 100644 tests/dns/test_types.py create mode 100644 tests/network/ipv4/__init__.py create mode 100644 tests/network/ipv4/test_casts.py create mode 100644 tests/network/ipv4/test_utils.py create mode 100644 tests/network/ipv6/__init__.py rename tests/network/{ => ipv6}/test_casts.py (51%) create mode 100644 tests/network/ipv6/test_utils.py delete mode 100644 tests/network/test_utils.py create mode 100644 tests/performance/__init__.py create mode 100644 tests/performance/test_gc.py create mode 100644 tests/proxy/__init__.py create mode 100644 tests/proxy/test_proxy.py create mode 100644 tests/routes/__init__.py create mode 100644 tests/routes/test_netlink.py delete mode 100644 tests/test_dns.py create mode 100644 tox.ini diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..c14bf2b --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,22 @@ +name: Pytest + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install .[test] + - name: Running Pytest + run: | + pytest diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..8c72b41 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 SharUpOff + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/gwhosts/dns/__init__.py b/gwhosts/dns/__init__.py index aab9f2d..1a52be7 100644 --- a/gwhosts/dns/__init__.py +++ b/gwhosts/dns/__init__.py @@ -1,8 +1,8 @@ -from gwhosts.dns._parsers import parse, parse_qname -from gwhosts.dns._serializers import serialize -from gwhosts.dns._exceptions import DNSParserError -from gwhosts.dns._tools import remove_ipv6 -from gwhosts.dns._types import Addition, Answer, Authority, DNSData, Header, QName, Question, RRType +from ._casts import qname_to_str, answer_to_str +from ._exceptions import DNSParserError +from ._parsers import parse, parse_qname +from ._serializers import serialize +from ._types import Addition, Answer, Authority, DNSData, Header, QName, Question, RRType __all__ = [ "DNSData", @@ -16,6 +16,7 @@ "RRType", "parse", "serialize", - "remove_ipv6", "parse_qname", + "qname_to_str", + "answer_to_str", ] diff --git a/gwhosts/dns/_casts.py b/gwhosts/dns/_casts.py new file mode 100644 index 0000000..1f7d364 --- /dev/null +++ b/gwhosts/dns/_casts.py @@ -0,0 +1,26 @@ +from ._parsers import parse_qname +from ._types import QName, Answer, RRType +from ..network.ipv4 import ipv4_bytes_to_str +from ..network.ipv6 import ipv6_bytes_to_str + + +def qname_to_str(qname: QName) -> str: + return b".".join(qname).decode("utf8") + + +def qname_bytes_to_str(data: bytes) -> str: + return qname_to_str(parse_qname(data)) + + +_RR_TO_STR = { + RRType.A.value: ipv4_bytes_to_str, + RRType.AAAA.value: ipv6_bytes_to_str, + RRType.CNAME.value: qname_bytes_to_str, +} + + +def answer_to_str(answer: Answer) -> str: + if answer.rr_type in _RR_TO_STR: + return f"{qname_to_str(answer.name)} -> {_RR_TO_STR[answer.rr_type](answer.rr_data)}" + + return f"{qname_to_str(answer.name)} -> {answer.rr_data}" diff --git a/gwhosts/dns/_exceptions.py b/gwhosts/dns/_exceptions.py index 70c695b..4639f99 100644 --- a/gwhosts/dns/_exceptions.py +++ b/gwhosts/dns/_exceptions.py @@ -4,3 +4,11 @@ class DNSException(Exception): class DNSParserError(DNSException): pass + + +class DNSParserUnpackError(DNSParserError): + pass + + +class DNSParserRecursionError(DNSParserError): + pass diff --git a/gwhosts/dns/_parsers.py b/gwhosts/dns/_parsers.py index be71ba7..1aaf20e 100644 --- a/gwhosts/dns/_parsers.py +++ b/gwhosts/dns/_parsers.py @@ -3,7 +3,7 @@ from ._exceptions import DNSParserRecursionError from ._struct import unpack -from ._types import Addition, Answer, Authority, DNSData, Header, QName, Question +from ._types import Addition, Answer, Authority, DNSData, Header, QName, Question, RRType def _parse_header(buffer: BinaryIO) -> Header: @@ -48,7 +48,7 @@ def _parse_question(buffer: BinaryIO) -> Question: return Question(name, rr_type, rr_class) -def _parse_resource(buffer: BinaryIO) -> Tuple[QName, int, int, int, int, bytes]: +def _parse_resource(buffer: BinaryIO) -> Tuple[QName, RRType, int, int, int, bytes]: name = _parse_qname(buffer) rr_type, rr_class, ttl, rr_data_length = unpack("!HHIH", buffer.read(10)) return name, rr_type, rr_class, ttl, rr_data_length, buffer.read(rr_data_length) diff --git a/gwhosts/dns/_serializers.py b/gwhosts/dns/_serializers.py index 7aa153a..3337e50 100644 --- a/gwhosts/dns/_serializers.py +++ b/gwhosts/dns/_serializers.py @@ -1,7 +1,7 @@ from dataclasses import astuple from struct import pack -from gwhosts.dns._types import Addition, Answer, Authority, DNSData, Header, QName, Question +from ._types import Addition, Answer, Authority, DNSData, Header, QName, Question def _encode_qname(qname: QName): diff --git a/gwhosts/dns/_tools.py b/gwhosts/dns/_tools.py deleted file mode 100644 index ee6612f..0000000 --- a/gwhosts/dns/_tools.py +++ /dev/null @@ -1,21 +0,0 @@ -from gwhosts.dns._types import DNSData, RRType, Header - - -def remove_ipv6(data: DNSData) -> DNSData: - answers = [answer for answer in data.answers if answer.rr_type != RRType.AAAA.value] - header = data.header - - return DNSData( - header=Header( - id=header.id, - flags=header.flags, - questions=header.questions, - answers=len(answers), - authorities=header.authorities, - additions=header.additions, - ), - questions=data.questions, - answers=answers, - authorities=data.authorities, - additions=data.additions, - ) diff --git a/gwhosts/dns/_types.py b/gwhosts/dns/_types.py index f5d09c8..547f745 100644 --- a/gwhosts/dns/_types.py +++ b/gwhosts/dns/_types.py @@ -36,12 +36,14 @@ class RRType(Enum): :param A: IPv4 Address [https://www.iana.org/go/rfc1035] :param AAAA: IPv6 Address [https://www.iana.org/go/rfc3596] :param CNAME: the canonical name for an alias [https://www.iana.org/go/rfc1035] + :param OPT: a pseudo-record type [https://www.iana.org/go/rfc6891] :see: https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4 """ A: int = 1 AAAA: int = 28 CNAME: int = 5 + OPT: int = 41 @dataclass @@ -61,6 +63,18 @@ def qr(self) -> bool: def aa(self) -> bool: return bool(self.flags & Flags.AA.value) + @property + def tc(self) -> bool: + return bool(self.flags & Flags.TC.value) + + @property + def rd(self) -> bool: + return bool(self.flags & Flags.RD.value) + + @property + def ra(self) -> bool: + return bool(self.flags & Flags.RA.value) + class QName(Tuple[bytes]): pass diff --git a/gwhosts/dns/parser.py b/gwhosts/dns/parser.py index 6b48b67..12e696b 100644 --- a/gwhosts/dns/parser.py +++ b/gwhosts/dns/parser.py @@ -1,6 +1,6 @@ from sys import stdin -from gwhosts.dns import parse +from . import parse if __name__ == "__main__": print(parse(stdin.buffer.read())) diff --git a/gwhosts/main.py b/gwhosts/main.py index e1c129f..70533a4 100644 --- a/gwhosts/main.py +++ b/gwhosts/main.py @@ -3,10 +3,10 @@ import sys from argparse import ArgumentParser -from gwhosts.dns import QName -from gwhosts.network import Address -from gwhosts.performance import no_gc -from gwhosts.proxy import DNSProxy +from .dns import QName +from .network import Address +from .performance import no_gc +from .proxy import DNSProxy if __name__ == "__main__": parser = ArgumentParser() @@ -19,9 +19,10 @@ "debug": logging.DEBUG, } - parser.add_argument("gateway", help="Gateway IP") + parser.add_argument("gateway", help="Gateway IPv4") parser.add_argument("hostsfile", help="Host List", nargs="?") parser.add_argument("--host", dest="host", help="Listening address", default="127.0.0.1") + parser.add_argument("--ipv6-gateway", dest="ipv6_gateway", help="Gateway IPv6", default=None) parser.add_argument("--port", dest="port", help="Listening port", default="8053", type=int) parser.add_argument("--dns-host", dest="dns_host", help="Remote DNS address", default="127.0.0.1") parser.add_argument("--dns-port", dest="dns_port", help="Remote DNS port", default="65053", type=int) @@ -55,6 +56,7 @@ proxy = DNSProxy( gateway=args.gateway, + ipv6_gateway=args.ipv6_gateway, to_addr=Address(args.dns_host, args.dns_port), hostnames=_hostnames, logger=logger, diff --git a/gwhosts/network/__init__.py b/gwhosts/network/__init__.py index ace35d9..f746ead 100644 --- a/gwhosts/network/__init__.py +++ b/gwhosts/network/__init__.py @@ -1,63 +1,23 @@ -from gwhosts.network._casts import ( - ipv4_bytes_to_int, - ipv4_int_to_bytes, - ipv4_bytes_to_str, - ipv4_str_to_bytes, - ipv4_str_to_int, - ipv4_int_to_str, - ipv6_bytes_to_int, - ipv6_int_to_bytes, - ipv6_bytes_to_str, - ipv6_str_to_bytes, - ipv6_str_to_int, - ipv6_int_to_str, - network_to_str, - str_to_network, - netmask_to_network_size, - network_size_to_netmask, -) -from gwhosts.network._types import ( +from ._types import ( Address, Datagram, ExpiringAddress, IPAddress, IPBinary, Network, + NetworkSize, UDPSocket, - NETSIZE_MAX, - NETMASK_MAX, - NETMASK_MIN, RT_CLASS_MAIN, ) -from gwhosts.network._utils import reduce_subnets __all__ = [ - "ipv4_bytes_to_int", - "ipv4_int_to_bytes", - "ipv4_bytes_to_str", - "ipv4_str_to_bytes", - "ipv4_str_to_int", - "ipv4_int_to_str", - "ipv6_bytes_to_int", - "ipv6_int_to_bytes", - "ipv6_bytes_to_str", - "ipv6_str_to_bytes", - "ipv6_str_to_int", - "ipv6_int_to_str", - "network_to_str", - "str_to_network", - "netmask_to_network_size", - "network_size_to_netmask", "Address", "Datagram", "ExpiringAddress", "IPAddress", "IPBinary", "Network", + "NetworkSize", "UDPSocket", - "NETSIZE_MAX", - "NETMASK_MAX", - "NETMASK_MIN", "RT_CLASS_MAIN", - "reduce_subnets", ] diff --git a/gwhosts/network/_casts.py b/gwhosts/network/_casts.py deleted file mode 100644 index fa94740..0000000 --- a/gwhosts/network/_casts.py +++ /dev/null @@ -1,76 +0,0 @@ -from socket import AF_INET, AF_INET6, inet_ntop, inet_pton -from struct import pack, unpack - -from gwhosts.network._types import IPAddress, IPBinary, Network, NetworkSize, NETMASK_MAX, NETSIZE_MAX - - -def ipv4_bytes_to_int(address: bytes) -> IPBinary: - return unpack("!L", address)[0] - - -def ipv4_int_to_bytes(number: IPBinary) -> bytes: - return pack("!L", number) - - -def ipv4_bytes_to_str(address: bytes) -> IPAddress: - return inet_ntop(AF_INET, address) - - -def ipv4_str_to_bytes(address: IPAddress) -> bytes: - return inet_pton(AF_INET, address) - - -def ipv4_str_to_int(address: IPAddress) -> IPBinary: - return ipv4_bytes_to_int(ipv4_str_to_bytes(address)) - - -def ipv4_int_to_str(number: IPBinary) -> IPAddress: - return ipv4_bytes_to_str(ipv4_int_to_bytes(number)) - - -def ipv6_bytes_to_int(address: bytes) -> IPBinary: - return int.from_bytes(address, byteorder="big") - - -def ipv6_int_to_bytes(number: IPBinary) -> bytes: - return number.to_bytes(16, byteorder="big") - - -def ipv6_bytes_to_str(address: bytes) -> IPAddress: - return inet_ntop(AF_INET6, address) - - -def ipv6_str_to_bytes(address: IPAddress) -> bytes: - return inet_pton(AF_INET6, address) - - -def ipv6_str_to_int(address: IPAddress) -> IPBinary: - return ipv6_bytes_to_int(ipv6_str_to_bytes(address)) - - -def ipv6_int_to_str(number: IPBinary) -> IPAddress: - return ipv6_bytes_to_str(ipv6_int_to_bytes(number)) - - -def network_to_str(network: Network) -> str: - return f"{ipv4_int_to_str(network.address)}/{netmask_to_network_size(network.mask)}" - - -def str_to_network(address: str) -> Network: - parts = address.split("/") - - if len(parts) == 1: - return Network(ipv4_str_to_int(address), NETMASK_MAX) - - if len(parts) == 2: - return Network(ipv4_str_to_int(parts[0]), network_size_to_netmask(NetworkSize(parts[1]))) - - raise ValueError(f"{address} is not a network address") - - -def netmask_to_network_size(netmask: IPBinary) -> NetworkSize: - return NETSIZE_MAX - (NETMASK_MAX ^ netmask).bit_length() - - -def network_size_to_netmask(size: NetworkSize) -> IPBinary: - return NETMASK_MAX >> size ^ NETMASK_MAX diff --git a/gwhosts/network/_types.py b/gwhosts/network/_types.py index ceefc18..521e4be 100644 --- a/gwhosts/network/_types.py +++ b/gwhosts/network/_types.py @@ -7,11 +7,6 @@ NetworkSize = int IPBinary = int - -NETSIZE_MAX = 32 -NETMASK_MAX: IPBinary = 0xFFFFFFFF -NETMASK_MIN: IPBinary = 0xFF000000 - # all normal routes are put there by default # https://www.kernel.org/doc/Documentation/networking/policy-routing.txt RT_CLASS_MAIN: RouteClass = 254 diff --git a/gwhosts/network/_utils.py b/gwhosts/network/_utils.py index fea457f..a9a3f64 100644 --- a/gwhosts/network/_utils.py +++ b/gwhosts/network/_utils.py @@ -1,9 +1,9 @@ from typing import Iterable, Iterator -from gwhosts.network._types import Network, NETMASK_MIN +from ._types import Network, NetworkSize -def reduce_subnets(addresses: Iterable[Network]) -> Iterator[Network]: +def _reduce_subnets(addresses: Iterable[Network], netmask_min: NetworkSize) -> Iterator[Network]: addresses = sorted(addresses) length = len(addresses) idx = 0 @@ -19,7 +19,7 @@ def reduce_subnets(addresses: Iterable[Network]) -> Iterator[Network]: while idx < length: _address = addresses[idx].address - while _netmask ^ NETMASK_MIN: + while _netmask ^ netmask_min: if _address & _netmask == _netaddr: netaddr, netmask = _netaddr, _netmask break diff --git a/gwhosts/network/ipv4/__init__.py b/gwhosts/network/ipv4/__init__.py new file mode 100644 index 0000000..cb645e3 --- /dev/null +++ b/gwhosts/network/ipv4/__init__.py @@ -0,0 +1,32 @@ +from ._casts import ( + ipv4_bytes_to_int, + ipv4_int_to_bytes, + ipv4_bytes_to_str, + ipv4_str_to_bytes, + ipv4_str_to_int, + ipv4_int_to_str, + ipv4_network_to_str, + ipv4_str_to_network, + ipv4_netmask_to_network_size, + ipv4_network_size_to_netmask, +) + +from ._types import IPV4_NETMASK_MAX, IPV4_NETMASK_MIN, IPV4_NETSIZE_MAX +from ._utils import ipv4_reduce_subnets + +__all__ = [ + "ipv4_bytes_to_int", + "ipv4_int_to_bytes", + "ipv4_bytes_to_str", + "ipv4_str_to_bytes", + "ipv4_str_to_int", + "ipv4_int_to_str", + "ipv4_network_to_str", + "ipv4_str_to_network", + "ipv4_netmask_to_network_size", + "ipv4_network_size_to_netmask", + "IPV4_NETMASK_MAX", + "IPV4_NETMASK_MIN", + "IPV4_NETSIZE_MAX", + "ipv4_reduce_subnets", +] diff --git a/gwhosts/network/ipv4/_casts.py b/gwhosts/network/ipv4/_casts.py new file mode 100644 index 0000000..2e38417 --- /dev/null +++ b/gwhosts/network/ipv4/_casts.py @@ -0,0 +1,53 @@ +from socket import AF_INET, inet_ntop, inet_pton +from struct import pack, unpack + +from ._types import IPV4_NETMASK_MAX, IPV4_NETSIZE_MAX +from .._types import IPAddress, IPBinary, Network, NetworkSize + + +def ipv4_bytes_to_int(address: bytes) -> IPBinary: + return unpack("!L", address)[0] + + +def ipv4_int_to_bytes(number: IPBinary) -> bytes: + return pack("!L", number) + + +def ipv4_bytes_to_str(address: bytes) -> IPAddress: + return inet_ntop(AF_INET, address) + + +def ipv4_str_to_bytes(address: IPAddress) -> bytes: + return inet_pton(AF_INET, address) + + +def ipv4_str_to_int(address: IPAddress) -> IPBinary: + return ipv4_bytes_to_int(ipv4_str_to_bytes(address)) + + +def ipv4_int_to_str(number: IPBinary) -> IPAddress: + return ipv4_bytes_to_str(ipv4_int_to_bytes(number)) + + +def ipv4_network_to_str(network: Network) -> str: + return f"{ipv4_int_to_str(network.address)}/{ipv4_netmask_to_network_size(network.mask)}" + + +def ipv4_str_to_network(address: str) -> Network: + parts = address.split("/") + + if len(parts) == 1: + return Network(ipv4_str_to_int(address), IPV4_NETMASK_MAX) + + if len(parts) == 2: + return Network(ipv4_str_to_int(parts[0]), ipv4_network_size_to_netmask(NetworkSize(parts[1]))) + + raise ValueError(f"{address} is not a network address") + + +def ipv4_netmask_to_network_size(netmask: IPBinary) -> NetworkSize: + return IPV4_NETSIZE_MAX - (IPV4_NETMASK_MAX ^ netmask).bit_length() + + +def ipv4_network_size_to_netmask(size: NetworkSize) -> IPBinary: + return IPV4_NETMASK_MAX >> size ^ IPV4_NETMASK_MAX diff --git a/gwhosts/network/ipv4/_types.py b/gwhosts/network/ipv4/_types.py new file mode 100644 index 0000000..98a2b81 --- /dev/null +++ b/gwhosts/network/ipv4/_types.py @@ -0,0 +1,32 @@ +from typing import NamedTuple +from .._types import Port + + +IPv4String = str +IPv4NetworkSize = int +IPv4Binary = int + + +IPV4_NETSIZE_MAX: IPv4NetworkSize = 32 +IPV4_NETMASK_MAX: IPv4Binary = 0xFFFFFFFF +IPV4_NETMASK_MIN: IPv4Binary = 0xFF000000 + + +class IPv4Address(NamedTuple): + host: IPv4String + port: Port + + +class IPv4ExpiringAddress(NamedTuple): + address: IPv4Address + time: float + + +class IPv4Datagram(NamedTuple): + data: bytes + target: IPv4Address + + +class IPv4Network(NamedTuple): + address: IPv4Binary + mask: IPv4Binary diff --git a/gwhosts/network/ipv4/_utils.py b/gwhosts/network/ipv4/_utils.py new file mode 100644 index 0000000..ea74c82 --- /dev/null +++ b/gwhosts/network/ipv4/_utils.py @@ -0,0 +1,9 @@ +from typing import Iterable, Iterator + +from ._types import IPV4_NETMASK_MIN +from .._types import Network +from .._utils import _reduce_subnets + + +def ipv4_reduce_subnets(addresses: Iterable[Network]) -> Iterator[Network]: + return _reduce_subnets(addresses, IPV4_NETMASK_MIN) diff --git a/gwhosts/network/ipv6/__init__.py b/gwhosts/network/ipv6/__init__.py new file mode 100644 index 0000000..38e5542 --- /dev/null +++ b/gwhosts/network/ipv6/__init__.py @@ -0,0 +1,31 @@ +from ._casts import ( + ipv6_bytes_to_int, + ipv6_int_to_bytes, + ipv6_bytes_to_str, + ipv6_str_to_bytes, + ipv6_str_to_int, + ipv6_int_to_str, + ipv6_network_to_str, + ipv6_str_to_network, + ipv6_netmask_to_network_size, + ipv6_network_size_to_netmask, +) +from ._types import IPV6_NETMASK_MAX, IPV6_NETMASK_MIN, IPV6_NETSIZE_MAX +from ._utils import ipv6_reduce_subnets + +__all__ = [ + "ipv6_bytes_to_int", + "ipv6_int_to_bytes", + "ipv6_bytes_to_str", + "ipv6_str_to_bytes", + "ipv6_str_to_int", + "ipv6_int_to_str", + "ipv6_network_to_str", + "ipv6_str_to_network", + "ipv6_netmask_to_network_size", + "ipv6_network_size_to_netmask", + "IPV6_NETMASK_MAX", + "IPV6_NETMASK_MIN", + "IPV6_NETSIZE_MAX", + "ipv6_reduce_subnets", +] diff --git a/gwhosts/network/ipv6/_casts.py b/gwhosts/network/ipv6/_casts.py new file mode 100644 index 0000000..67819e8 --- /dev/null +++ b/gwhosts/network/ipv6/_casts.py @@ -0,0 +1,52 @@ +from socket import AF_INET6, inet_ntop, inet_pton + +from ._types import IPV6_NETMASK_MAX, IPV6_NETSIZE_MAX +from .._types import IPAddress, IPBinary, Network, NetworkSize + + +def ipv6_bytes_to_int(address: bytes) -> IPBinary: + return int.from_bytes(address, byteorder="big") + + +def ipv6_int_to_bytes(number: IPBinary) -> bytes: + return number.to_bytes(16, byteorder="big") + + +def ipv6_bytes_to_str(address: bytes) -> IPAddress: + return inet_ntop(AF_INET6, address) + + +def ipv6_str_to_bytes(address: IPAddress) -> bytes: + return inet_pton(AF_INET6, address) + + +def ipv6_str_to_int(address: IPAddress) -> IPBinary: + return ipv6_bytes_to_int(ipv6_str_to_bytes(address)) + + +def ipv6_int_to_str(number: IPBinary) -> IPAddress: + return ipv6_bytes_to_str(ipv6_int_to_bytes(number)) + + +def ipv6_network_to_str(network: Network) -> str: + return f"{ipv6_int_to_str(network.address)}/{ipv6_netmask_to_network_size(network.mask)}" + + +def ipv6_str_to_network(address: str) -> Network: + parts = address.split("/") + + if len(parts) == 1: + return Network(ipv6_str_to_int(address), IPV6_NETMASK_MAX) + + if len(parts) == 2: + return Network(ipv6_str_to_int(parts[0]), ipv6_network_size_to_netmask(NetworkSize(parts[1]))) + + raise ValueError(f"{address} is not a network address") + + +def ipv6_netmask_to_network_size(netmask: IPBinary) -> NetworkSize: + return IPV6_NETSIZE_MAX - (IPV6_NETMASK_MAX ^ netmask).bit_length() + + +def ipv6_network_size_to_netmask(size: NetworkSize) -> IPBinary: + return IPV6_NETMASK_MAX >> size ^ IPV6_NETMASK_MAX diff --git a/gwhosts/network/ipv6/_types.py b/gwhosts/network/ipv6/_types.py new file mode 100644 index 0000000..bb1fa7f --- /dev/null +++ b/gwhosts/network/ipv6/_types.py @@ -0,0 +1,32 @@ +from typing import NamedTuple +from .._types import Port + + +IPv6String = str +IPv6NetworkSize = int +IPv6Binary = int + + +IPV6_NETSIZE_MAX: IPv6NetworkSize = 128 +IPV6_NETMASK_MAX: IPv6Binary = 0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF +IPV6_NETMASK_MIN: IPv6Binary = 0xFFFF_FFFF_0000_0000_0000_0000_0000_0000 + + +class IPv6Address(NamedTuple): + host: IPv6String + port: Port + + +class IPv6ExpiringAddress(NamedTuple): + address: IPv6Address + time: float + + +class IPv6Datagram(NamedTuple): + data: bytes + target: IPv6Address + + +class IPv6Network(NamedTuple): + address: IPv6Binary + mask: IPv6Binary diff --git a/gwhosts/network/ipv6/_utils.py b/gwhosts/network/ipv6/_utils.py new file mode 100644 index 0000000..f67959d --- /dev/null +++ b/gwhosts/network/ipv6/_utils.py @@ -0,0 +1,9 @@ +from typing import Iterable, Iterator + +from ._types import IPV6_NETMASK_MIN +from .._types import Network +from .._utils import _reduce_subnets + + +def ipv6_reduce_subnets(addresses: Iterable[Network]) -> Iterator[Network]: + return _reduce_subnets(addresses, IPV6_NETMASK_MIN) diff --git a/gwhosts/performance/__init__.py b/gwhosts/performance/__init__.py index 2ca8916..48707d6 100644 --- a/gwhosts/performance/__init__.py +++ b/gwhosts/performance/__init__.py @@ -1,4 +1,4 @@ -from gwhosts.performance._gc import no_gc +from ._gc import no_gc __all__ = [ "no_gc", diff --git a/gwhosts/proxy/__init__.py b/gwhosts/proxy/__init__.py index 37d3b51..aa458e8 100644 --- a/gwhosts/proxy/__init__.py +++ b/gwhosts/proxy/__init__.py @@ -1,4 +1,4 @@ -from gwhosts.proxy._proxy import DNSProxy -from gwhosts.proxy._types import RTMEvent +from ._proxy import DNSProxy +from ._types import RTMEvent __all__ = ["DNSProxy", "RTMEvent"] diff --git a/gwhosts/proxy/_proxy.py b/gwhosts/proxy/_proxy.py index 42e4073..8f84d5f 100644 --- a/gwhosts/proxy/_proxy.py +++ b/gwhosts/proxy/_proxy.py @@ -5,30 +5,40 @@ from functools import lru_cache from logging import Logger from select import select -from socket import socket +from socket import socket, AF_INET, AF_INET6 from tempfile import mktemp from time import time -from typing import Callable, Dict, Iterator, List, Set, Tuple +from typing import Callable, Dict, Iterator, List, Set, Tuple, Optional -from gwhosts.dns import QName, DNSParserError, RRType, parse, remove_ipv6, serialize -from gwhosts.network import ( - NETMASK_MAX, +from ._types import DNSDataMessage, RTMEvent +from ..dns import QName, DNSParserError, RRType, parse, serialize, qname_to_str, answer_to_str +from ..network import ( Address, Datagram, ExpiringAddress, IPAddress, IPBinary, Network, + NetworkSize, UDPSocket, +) +from ..network.ipv4 import ( + IPV4_NETMASK_MAX, ipv4_bytes_to_int, - ipv4_int_to_str, ipv4_str_to_int, - network_size_to_netmask, - network_to_str, - reduce_subnets, + ipv4_network_size_to_netmask, + ipv4_network_to_str, + ipv4_reduce_subnets, +) +from ..network.ipv6 import ( + IPV6_NETMASK_MAX, + ipv6_bytes_to_int, + ipv6_str_to_int, + ipv6_network_size_to_netmask, + ipv6_network_to_str, + ipv6_reduce_subnets, ) -from gwhosts.proxy._types import DNSDataMessage, RTMEvent -from gwhosts.routes import Netlink +from ..routes import Netlink class DNSProxy: @@ -37,11 +47,13 @@ def __init__( gateway: IPAddress, hostnames: Set[QName], logger: Logger, + ipv6_gateway: Optional[IPAddress] = None, to_addr: Address = Address("127.0.0.1", 8053), buff_size: int = 1024, timeout_in_seconds: int = 5, ) -> None: - self._gateway = gateway + self._ipv4_gateway = gateway + self._ipv6_gateway = ipv6_gateway self._to_addr = to_addr self._buff_size = buff_size self._timeout_in_seconds = timeout_in_seconds @@ -52,26 +64,35 @@ def __init__( self._regular_pool: Dict[UDPSocket, ExpiringAddress] = {} self._routed_pool: Dict[UDPSocket, ExpiringAddress] = {} self._queries_queue: deque = deque() - self._addresses: Set[IPAddress] = set() - self._subnets: Set[Network] = set() - self._netlink_event_handlers: Dict[RTMEvent, Callable] = { - RTMEvent.NEW_ROUTE.value: self._process_rtm_new_route, - RTMEvent.DEL_ROUTE.value: self._process_rtm_del_route, + self._ipv4_addresses: Set[IPAddress] = set() + self._ipv4_subnets: Set[Network] = set() + self._ipv6_addresses: Set[IPAddress] = set() + self._ipv6_subnets: Set[Network] = set() + self._netlink_event_handlers: Dict[RTMEvent, Dict[Tuple[int, str], Callable]] = { + RTMEvent.NEW_ROUTE: { + (AF_INET, self._ipv4_gateway): self._ipv4_process_rtm_new_route, + (AF_INET6, self._ipv6_gateway): self._ipv6_process_rtm_new_route, + }, + RTMEvent.DEL_ROUTE: { + (AF_INET, self._ipv4_gateway): self._ipv4_process_rtm_del_route, + (AF_INET6, self._ipv6_gateway): self._ipv6_process_rtm_del_route, + }, + } + self._netlink_to_network = { + AF_INET: self._ipv4_netlink_to_network, + AF_INET6: self._ipv6_netlink_to_network, } - self._update_routes_queue: List[Tuple[Dict[Network, bool], List[Datagram]]] = [] @property def _open_files_count(self) -> int: - """ :return: The number of open file descriptors - """ + """:return: The number of open file descriptors""" open_files = os.listdir("/proc/self/fd") return len(open_files) @property def _max_open_files_count(self) -> int: - """ :return: Current soft limit on the number of open file descriptors - """ + """:return: Current soft limit on the number of open file descriptors""" soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) return soft @@ -89,10 +110,6 @@ def _get_socket(self) -> UDPSocket: return _socket - @property - def subnets(self) -> Set[Network]: - return self._subnets - def _hostname_exists(self, hostname: QName) -> bool: for level in range(len(hostname)): if hostname[level:] in self._hostnames: @@ -101,43 +118,61 @@ def _hostname_exists(self, hostname: QName) -> bool: return False + @property + def ipv4_subnets(self) -> Set[Network]: + return self._ipv4_subnets + @lru_cache(maxsize=4094) - def _ip_in_subnets(self, address: IPBinary) -> bool: - return any(address & subnet.mask == subnet.address for subnet in self.subnets) + def _ipv4_in_subnets(self, address: IPBinary) -> bool: + return any(address & subnet.mask == subnet.address for subnet in self.ipv4_subnets) - def _update_subnets(self, addresses: Set[Network]) -> dict[Network, bool]: - subnets = set(reduce_subnets(addresses.union(self.subnets))) - updates = self._subnets.symmetric_difference(subnets) + def _ipv4_update_subnets(self, addresses: Set[Network]) -> Dict[Network, bool]: + subnets = set(ipv4_reduce_subnets(addresses.union(self.ipv4_subnets))) + updates = self._ipv4_subnets.symmetric_difference(subnets) return {subnet: subnet in subnets for subnet in updates} - def _update_routes(self, queue: List[DNSDataMessage]) -> dict[Network, bool]: - addresses = set() + @property + def ipv6_subnets(self) -> Set[Network]: + return self._ipv6_subnets + + @lru_cache(maxsize=4094) + def _ipv6_in_subnets(self, address: IPBinary) -> bool: + return any(address & subnet.mask == subnet.address for subnet in self.ipv6_subnets) + + def _ipv6_update_subnets(self, addresses: Set[Network]) -> Dict[Network, bool]: + subnets = set(ipv6_reduce_subnets(addresses.union(self.ipv6_subnets))) + updates = self._ipv6_subnets.symmetric_difference(subnets) + + return {subnet: subnet in subnets for subnet in updates} + + def _update_routes(self, queue: List[DNSDataMessage]) -> Tuple[Dict[Network, bool], Dict[Network, bool]]: + ipv4_addresses = set() + ipv6_addresses = set() for response, addr in queue: for answer in response.answers: if answer.rr_type == RRType.A.value: address = ipv4_bytes_to_int(answer.rr_data) - self._logger.info(f"DNS: {b'.'.join(answer.name).decode('utf8')} -> {ipv4_int_to_str(address)}") - - else: - continue + if not self._ipv4_in_subnets(address): + ipv4_addresses.add(Network(address, IPV4_NETMASK_MAX)) - if self._ip_in_subnets(address): - continue + elif answer.rr_type == RRType.AAAA.value: + address = ipv6_bytes_to_int(answer.rr_data) - addresses.add(Network(address, NETMASK_MAX)) + if not self._ipv6_in_subnets(address): + ipv6_addresses.add(Network(address, IPV6_NETMASK_MAX)) - if addresses: - return self._update_subnets(addresses) - - return {} + return ( + self._ipv4_update_subnets(ipv4_addresses) if ipv4_addresses else {}, + self._ipv6_update_subnets(ipv6_addresses) if ipv6_addresses else {}, + ) def _process_queued_queries(self) -> int: - """ Process queued queries and return the number of remaining ones + """Process queued queries and return the number of remaining ones - :return: Number of remaining queries + :return: Number of remaining queries """ queue_size = len(self._queries_queue) available_file_descriptors_count = self._max_open_files_count - self._open_files_count @@ -157,7 +192,7 @@ def _route_request(self, datagram: Datagram) -> None: self._logger.error("Failed to parse DNS query") try: - with gzip.open(mktemp(prefix="dns.query.", suffix=".gz"), 'w') as dump: + with gzip.open(mktemp(prefix="dns.query.", suffix=".gz"), "w") as dump: dump.write(data) except Exception as dump_error: @@ -178,11 +213,14 @@ def _route_request(self, datagram: Datagram) -> None: self._routed_pool[remote] = ExpiringAddress(addr, time()) for hostname in domains: - self._logger.info(f"DNS: [{query.header.id}] -> {b'.'.join(hostname).decode('utf8')}") + self._logger.info(f"DNS: Q[{query.header.id}] -> {qname_to_str(hostname)} (P)") else: self._regular_pool[remote] = ExpiringAddress(addr, time()) + for hostname in domains: + self._logger.info(f"DNS: Q[{query.header.id}] -> {qname_to_str(hostname)}") + @staticmethod def _sanitize_free_pool(pool: List[socket]) -> None: while pool: @@ -221,7 +259,7 @@ def _parse_responses(self, responses: List[Datagram]) -> Iterator[DNSDataMessage self._logger.error("Failed to parse DNS response") try: - with gzip.open(mktemp(prefix="dns.response.", suffix=".gz"), 'w') as dump: + with gzip.open(mktemp(prefix="dns.response.", suffix=".gz"), "w") as dump: dump.write(data) except Exception as dump_error: @@ -232,33 +270,67 @@ def _parse_responses(self, responses: List[Datagram]) -> Iterator[DNSDataMessage self._logger.error(f"To reproduce, run: zcat {dump.name} | python -m gwhosts.dns.parser") else: + for answer in response.answers: + self._logger.info(f"DNS: R[{response.header.id}] {answer_to_str(answer)}") + yield DNSDataMessage(response, addr) @staticmethod def _prepare_routed_responses(data_messages: List[DNSDataMessage]) -> Iterator[Datagram]: - return (Datagram(serialize(remove_ipv6(data)), addr) for data, addr in data_messages) + return (Datagram(serialize(data), addr) for data, addr in data_messages) @staticmethod def _send_responses(queue: List[Datagram], udp: UDPSocket) -> None: for data, addr in queue: udp.sendto(data, addr) - def _process_rtm_new_route(self, network: Network) -> None: - """New route is added""" - self._subnets.add(network) - self._logger.info(f"DNS: network added {network_to_str(network)}") + @staticmethod + def _ipv4_netlink_to_network(address: IPAddress, length: NetworkSize) -> Network: + return Network( + address=ipv4_str_to_int(address), + mask=ipv4_network_size_to_netmask(length), + ) + + def _ipv4_process_rtm_new_route(self, network: Network) -> None: + """New IPv4 route is added""" + self._ipv4_subnets.add(network) + self._logger.info(f"DNS: network added {ipv4_network_to_str(network)}") + + def _ipv4_process_rtm_del_route(self, network: Network) -> None: + """An existing IPv4 route is deleted""" + try: + self._ipv4_subnets.remove(network) + + except KeyError as e: + self._logger.exception(e) + self._logger.info(f"DNS: network does not exists {ipv4_network_to_str(network)}") - def _process_rtm_del_route(self, network: Network) -> None: - """An existing route is deleted""" + else: + self._logger.info(f"DNS: network deleted {ipv4_network_to_str(network)}") + + @staticmethod + def _ipv6_netlink_to_network(address: IPAddress, length: NetworkSize) -> Network: + return Network( + address=ipv6_str_to_int(address), + mask=ipv6_network_size_to_netmask(length), + ) + + def _ipv6_process_rtm_new_route(self, network: Network) -> None: + """New IPv6 route is added""" + self._ipv6_subnets.add(network) + self._logger.info(f"DNS: network added {ipv6_network_to_str(network)}") + + def _ipv6_process_rtm_del_route(self, network: Network) -> None: + """An IPv6 existing route is deleted""" try: - self._subnets.remove(network) + self._ipv6_subnets.remove(network) except KeyError as e: self._logger.exception(e) - self._logger.info(f"DNS: network does not exists {network_to_str(network)}") + self._logger.info(f"DNS: network does not exists {ipv6_network_to_str(network)}") else: - self._logger.info(f"DNS: network deleted {network_to_str(network)}") + self._logger.info(f"DNS: network deleted {ipv6_network_to_str(network)}") def _process_netlink_message(self, message: dict) -> None: event = message["event"] @@ -267,13 +339,18 @@ def _process_netlink_message(self, message: dict) -> None: attrs = dict(message["attrs"]) if "RTA_GATEWAY" in attrs: - if attrs["RTA_GATEWAY"] == self._gateway: - network = Network( - address=ipv4_str_to_int(attrs["RTA_DST"]), - mask=network_size_to_netmask(message["dst_len"]), - ) + event_handlers = self._netlink_event_handlers[event] + family = message["family"] + + if family in self._netlink_to_network: + gateway = attrs["RTA_GATEWAY"] - self._netlink_event_handlers[event](network) + if (family, gateway) in event_handlers: + network = self._netlink_to_network[family]( + address=attrs["RTA_DST"], + length=message["dst_len"], + ) + event_handlers[(family, gateway)](network) def listen(self, addr: Address) -> None: with Netlink() as netlink: @@ -281,7 +358,9 @@ def listen(self, addr: Address) -> None: self._input_pool.append(netlink) self._logger.info("DNS: Getting routed subnets...") - netlink.get_ipv4_routes() + + netlink.ipv4_get_routes() + netlink.ipv6_get_routes() with UDPSocket() as udp: udp.bind(addr) @@ -329,15 +408,21 @@ def listen(self, addr: Address) -> None: if routed_responses: dns_data_messages = list(self._parse_responses(routed_responses)) - updates = self._update_routes(dns_data_messages) + ipv4_updates, ipv6_updates = self._update_routes(dns_data_messages) ready_responses.extend(self._prepare_routed_responses(dns_data_messages)) - for network, exist in updates.items(): + for network, exist in ipv4_updates.items(): + if exist: + netlink.ipv4_add_route(network, self._ipv4_gateway) + else: + netlink.ipv4_del_route(network, self._ipv4_gateway) + + for network, exist in ipv6_updates.items(): if exist: - netlink.add_ipv4_route(network, self._gateway) + netlink.ipv6_add_route(network, self._ipv6_gateway) else: - netlink.del_ipv4_route(network, self._gateway) + netlink.ipv6_del_route(network, self._ipv6_gateway) if ready_responses: self._send_responses(ready_responses, udp) diff --git a/gwhosts/proxy/_types.py b/gwhosts/proxy/_types.py index a98255a..ae7c9b0 100644 --- a/gwhosts/proxy/_types.py +++ b/gwhosts/proxy/_types.py @@ -1,8 +1,9 @@ from enum import Enum +from socket import AF_INET, AF_INET6 from typing import NamedTuple -from gwhosts.dns import DNSData -from gwhosts.network import Address +from ..dns import DNSData +from ..network import Address class RTMEvent(Enum): @@ -11,6 +12,11 @@ class RTMEvent(Enum): GET_ROUTE: str = "RTM_GETROUTE" +class AddressFamily(Enum): + AF_INET: int = AF_INET + AF_INET6: int = AF_INET6 + + class DNSDataMessage(NamedTuple): data: DNSData address: Address diff --git a/gwhosts/py.typed b/gwhosts/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/gwhosts/routes/__init__.py b/gwhosts/routes/__init__.py index 375484d..2291453 100644 --- a/gwhosts/routes/__init__.py +++ b/gwhosts/routes/__init__.py @@ -1,3 +1,3 @@ -from gwhosts.routes._netlink import Netlink +from ._netlink import Netlink __all__ = ["Netlink"] diff --git a/gwhosts/routes/_netlink.py b/gwhosts/routes/_netlink.py index 525f1f9..6274eaf 100644 --- a/gwhosts/routes/_netlink.py +++ b/gwhosts/routes/_netlink.py @@ -1,10 +1,6 @@ +from socket import AF_INET, AF_INET6 + from pyroute2 import IPRoute -from socket import AF_INET -from gwhosts.network import ( - ipv4_int_to_str, - netmask_to_network_size, -) -from gwhosts.network import IPAddress, Network from pyroute2.netlink import ( NLM_F_ACK, NLM_F_APPEND, @@ -23,13 +19,19 @@ RTM_DELROUTE, RTM_GETROUTE, RTM_NEWROUTE, - rt_proto, - rt_type, ) - from pyroute2.netlink.rtnl.rtmsg import rtmsg -from pyroute2.iproute.linux import DEFAULT_TABLE +from ._rtmsg import _msg_get_routes, _msg_route +from ..network import IPAddress, Network +from ..network.ipv4 import ( + ipv4_int_to_str, + ipv4_netmask_to_network_size, +) +from ..network.ipv6 import ( + ipv6_int_to_str, + ipv6_netmask_to_network_size, +) __all__ = [ "NLM_F_ACK", @@ -46,36 +48,31 @@ "NETLINK_ROUTE", ] +def _ipv4_msg_get_routes() -> rtmsg: + return _msg_get_routes(AF_INET) + + +def _ipv4_msg_route(netaddr: str, netsize: int, gateway: str) -> rtmsg: + return _msg_route(netaddr, netsize, gateway, AF_INET) -def _msg_get_ipv4_routes() -> rtmsg: - msg = rtmsg() - msg["family"] = AF_INET - msg["table"] = DEFAULT_TABLE - return msg +def _ipv6_msg_get_routes() -> rtmsg: + return _msg_get_routes(AF_INET6) -def _msg_ipv4_route(netaddr: str, netsize: int, gateway: str) -> rtmsg: - msg = rtmsg() - msg["family"] = AF_INET - msg["table"] = DEFAULT_TABLE - msg["proto"] = rt_proto["static"] - msg["type"] = rt_type["unicast"] - msg["dst_len"] = netsize - msg["attrs"] = [ - ("RTA_TABLE", DEFAULT_TABLE), - ("RTA_DST", netaddr), - ("RTA_GATEWAY", gateway), - ] - return msg +def _ipv6_msg_route(netaddr: str, netsize: int, gateway: str) -> rtmsg: + return _msg_route(netaddr, netsize, gateway, AF_INET6) class Netlink(IPRoute): def __init__(self, *args, family=NETLINK_ROUTE, **kwargs): super().__init__(*args, family=family, **kwargs) - def get_ipv4_routes(self) -> None: + def ipv4_get_routes(self) -> None: """ Get all ipv4 routes + Shell command example: + ip r + Event message example: {'family': 2, 'dst_len': 32, 'src_len': 0, 'tos': 0, 'table': 254, 'proto': 4, 'scope': 0, 'type': 1, \ 'flags': 0, 'attrs': [('RTA_TABLE', 254), ('RTA_DST', '192.168.2.123'), ('RTA_GATEWAY', '192.168.2.1'), \ @@ -83,16 +80,19 @@ def get_ipv4_routes(self) -> None: 'error': None, 'stats': Stats(qsize=0, delta=0, delay=0)}, 'event': 'RTM_NEWROUTE'} """ self.put( - msg=_msg_get_ipv4_routes(), + msg=_ipv4_msg_get_routes(), msg_type=RTM_GETROUTE, msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_DUMP, ) - def add_ipv4_route(self, network: Network, gateway: IPAddress) -> None: + def ipv4_add_route(self, network: Network, gateway: IPAddress) -> None: """ Add an ipv4 route with the specified parameters :param network: Subnet :param gateway: Gateway + Shell command example: + ip r add 192.168.2.123/32 via 192.168.2.1 + Event message example: {'family': 2, 'dst_len': 32, 'src_len': 0, 'tos': 0, 'table': 254, 'proto': 4, 'scope': 0, 'type': 1, \ 'flags': 0, 'attrs': [('RTA_TABLE', 254), ('RTA_DST', '192.168.2.123'), ('RTA_GATEWAY', '192.168.2.1'), \ @@ -100,20 +100,23 @@ def add_ipv4_route(self, network: Network, gateway: IPAddress) -> None: 'error': None, 'stats': Stats(qsize=0, delta=0, delay=0)}, 'event': 'RTM_NEWROUTE'} """ self.put( - msg=_msg_ipv4_route( + msg=_ipv4_msg_route( netaddr=ipv4_int_to_str(network.address), - netsize=netmask_to_network_size(network.mask), + netsize=ipv4_netmask_to_network_size(network.mask), gateway=gateway, ), msg_type=RTM_NEWROUTE, msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_CREATE | NLM_F_REPLACE, ) - def del_ipv4_route(self, network: Network, gateway: IPAddress) -> None: + def ipv4_del_route(self, network: Network, gateway: IPAddress) -> None: """ Delete an ipv4 route that matches the specified parameters :param network: Subnet :param gateway: Gateway + Shell command example: + ip r del 192.168.2.123/32 via 192.168.2.1 + Event message example: {'family': 2, 'dst_len': 32, 'src_len': 0, 'tos': 0, 'table': 254, 'proto': 4, 'scope': 0, 'type': 1, \ 'flags': 0, 'attrs': [('RTA_TABLE', 254), ('RTA_DST', '192.168.2.123'), ('RTA_GATEWAY', '192.168.2.1'), \ @@ -121,9 +124,81 @@ def del_ipv4_route(self, network: Network, gateway: IPAddress) -> None: 'error': None, 'stats': Stats(qsize=0, delta=0, delay=0)}, 'event': 'RTM_DELROUTE'} """ self.put( - msg=_msg_ipv4_route( + msg=_ipv4_msg_route( netaddr=ipv4_int_to_str(network.address), - netsize=netmask_to_network_size(network.mask), + netsize=ipv4_netmask_to_network_size(network.mask), + gateway=gateway, + ), + msg_type=RTM_DELROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK, + ) + + def ipv6_get_routes(self) -> None: + """ Get all ipv6 routes + + Shell command example: + ip -6 r + + Event message example: + {'family': 10, 'dst_len': 56, 'src_len': 0, 'tos': 0, 'table': 254, 'proto': 3, 'scope': 0, 'type': 1, \ + 'flags': 0, 'attrs': [('RTA_TABLE', 254), ('RTA_DST', '2a00:1450:4005:800::'), ('RTA_PRIORITY', 1024), \ + ('RTA_GATEWAY', 'fced:9999::1'), ('RTA_OIF', 91), ('RTA_CACHEINFO', {'rta_clntref': 0, 'rta_lastuse': 0, \ + 'rta_expires': 0, 'rta_error': 0, 'rta_used': 0, 'rta_id': 0, 'rta_ts': 0, 'rta_tsage': 0}), ('RTA_PREF', 0)], \ + 'header': {'length': 136, 'type': 24, 'flags': 1536, 'sequence_number': 1726729418, 'pid': 10120, \ + 'error': None, 'target': 'localhost', 'stats': Stats(qsize=0, delta=0, delay=0)}, 'event': 'RTM_NEWROUTE'} + """ + self.put( + msg=_ipv6_msg_get_routes(), + msg_type=RTM_GETROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_DUMP, + ) + + def ipv6_add_route(self, network: Network, gateway: IPAddress) -> None: + """ Add an ipv6 route with the specified parameters + :param network: Subnet + :param gateway: Gateway + + Shell command example: + ip -6 r add 2a00:1450:4005:800::/56 via fced:9999::1 + + Event message example: + {'family': 10, 'dst_len': 56, 'src_len': 0, 'tos': 0, 'table': 254, 'proto': 3, 'scope': 0, 'type': 1, \ + 'flags': 0, 'attrs': [('RTA_TABLE', 254), ('RTA_DST', '2a00:1450:4005:800::'), ('RTA_PRIORITY', 1024), \ + ('RTA_GATEWAY', 'fced:9999::1'), ('RTA_OIF', 91), ('RTA_CACHEINFO', {'rta_clntref': 0, 'rta_lastuse': 0, \ + 'rta_expires': 0, 'rta_error': 0, 'rta_used': 0, 'rta_id': 0, 'rta_ts': 0, 'rta_tsage': 0}), ('RTA_PREF', 0)], \ + 'header': {'length': 136, 'type': 24, 'flags': 1536, 'sequence_number': 1726729418, 'pid': 10120, \ + 'error': None, 'target': 'localhost', 'stats': Stats(qsize=0, delta=0, delay=0)}, 'event': 'RTM_NEWROUTE'} + """ + self.put( + msg=_ipv6_msg_route( + netaddr=ipv6_int_to_str(network.address), + netsize=ipv6_netmask_to_network_size(network.mask), + gateway=gateway, + ), + msg_type=RTM_NEWROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_CREATE | NLM_F_REPLACE, + ) + + def ipv6_del_route(self, network: Network, gateway: IPAddress) -> None: + """ Delete an ipv6 route that matches the specified parameters + :param network: Subnet + :param gateway: Gateway + + Shell command example: + ip -6 r del 2a00:1450:4005:800::/56 via fced:9999::1 + + Event message example: + {'family': 10, 'dst_len': 56, 'src_len': 0, 'tos': 0, 'table': 254, 'proto': 3, 'scope': 0, 'type': 1, \ + 'flags': 0, 'attrs': [('RTA_TABLE', 254), ('RTA_DST', '2a00:1450:4005:800::'), ('RTA_PRIORITY', 1024), \ + ('RTA_GATEWAY', 'fced:9999::1'), ('RTA_OIF', 91), ('RTA_CACHEINFO', {'rta_clntref': 0, 'rta_lastuse': 0, \ + 'rta_expires': 0, 'rta_error': 0, 'rta_used': 0, 'rta_id': 0, 'rta_ts': 0, 'rta_tsage': 0}), ('RTA_PREF', 0)], \ + 'header': {'length': 136, 'type': 25, 'flags': 0, 'sequence_number': 1726729519, 'pid': 15041, 'error': None, \ + 'target': 'localhost', 'stats': Stats(qsize=0, delta=0, delay=0)}, 'event': 'RTM_DELROUTE'} + """ + self.put( + msg=_ipv6_msg_route( + netaddr=ipv6_int_to_str(network.address), + netsize=ipv6_netmask_to_network_size(network.mask), gateway=gateway, ), msg_type=RTM_DELROUTE, diff --git a/gwhosts/routes/_rtmsg.py b/gwhosts/routes/_rtmsg.py new file mode 100644 index 0000000..efec5be --- /dev/null +++ b/gwhosts/routes/_rtmsg.py @@ -0,0 +1,28 @@ +from pyroute2.netlink.rtnl.rtmsg import rtmsg +from pyroute2.iproute.linux import DEFAULT_TABLE +from pyroute2.netlink.rtnl import ( + rt_proto, + rt_type, +) + + +def _msg_get_routes(family: int) -> rtmsg: + msg = rtmsg() + msg["family"] = family + msg["table"] = DEFAULT_TABLE + return msg + + +def _msg_route(netaddr: str, netsize: int, gateway: str, family: int) -> rtmsg: + msg = rtmsg() + msg["family"] = family + msg["table"] = DEFAULT_TABLE + msg["proto"] = rt_proto["static"] + msg["type"] = rt_type["unicast"] + msg["dst_len"] = netsize + msg["attrs"] = [ + ("RTA_TABLE", DEFAULT_TABLE), + ("RTA_DST", netaddr), + ("RTA_GATEWAY", gateway), + ] + return msg diff --git a/pyproject.toml b/pyproject.toml index 8cbdf46..9f2db84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools~=68.0"] +requires = ["setuptools~=75.0"] build-backend = "setuptools.build_meta" [project] @@ -7,26 +7,61 @@ name = "gwhosts-proxy" authors = [{name="SharUpOff", email = "sharupoff@efstudios.org"}] description = "DNS proxy/router for a specified list of hostnames" readme = "README.md" +license = {file = "LICENSE.md"} requires-python = ">=3.8, <3.13" dependencies = ["pyroute2~=0.7.0"] version = "1.0.0" +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Information Technology", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + "Topic :: System :: Networking", + "Topic :: System :: Networking :: Firewalls", + "Topic :: Internet :: Name Service (DNS)", + "Topic :: Internet :: Proxy Servers", + "Framework :: tox", +] [project.optional-dependencies] -test = ["pytest~=7.0", "pytest-cov~=3.0"] +test = ["pytest~=8.0", "pytest-mock~=3.0", "pytest-cov~=5.0"] [project.urls] homepage = "https://github.com/sharupoff/gwhosts-proxy" [tool.pytest.ini_options] -addopts = "-ra -q --cov-report term-missing --cov gwhosts" +pythonpath = [ + "." +] +addopts = "-ra -q --cov-report term-missing --cov-append --cov gwhosts" [tool.ruff] line-length = 119 +indent-width = 4 target-version = "py38" -[tool.black] -line-length = 119 -target-version = ["py38"] +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +skip-magic-trailing-comma = false +line-ending = "auto" + +[tool.mypy] +explicit_package_bases = true +install_types = true +non_interactive = true +exclude = ["env", "venv", "build"] [tool.setuptools.packages.find] include = ["gwhosts.*"] diff --git a/tests/dns/__init__.py b/tests/dns/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/dns/test_casts.py b/tests/dns/test_casts.py new file mode 100644 index 0000000..e902228 --- /dev/null +++ b/tests/dns/test_casts.py @@ -0,0 +1,52 @@ +import pytest +from gwhosts.dns import Answer, QName, RRType, answer_to_str + + +@pytest.mark.parametrize(("answer", "string"), ( + ( + Answer( + name=QName((b"www", b"youtube", b"com")), + rr_type=RRType.CNAME.value, + rr_class=1, + ttl=810, + rr_data_length=22, + rr_data=b"\x0ayoutube-ui\x01l\x06google\xc0\x18", + ), + "www.youtube.com -> youtube-ui.l.google", + ), + ( + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\xac\xd9\x13\x4e', + ), + "youtube-ui.l.google.com -> 172.217.19.78", + ), + ( + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x0b\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + "youtube-ui.l.google.com -> 2a00:1450:4005:80b::200e", + ), + ( + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.OPT.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x75\x6e\x6b\x6e\x6f\x77\x6e", + ), + "youtube-ui.l.google.com -> b'unknown'", + ), +)) +def test_answer_to_str(answer: Answer, string: str) -> None: + assert answer_to_str(answer) == string diff --git a/tests/dns/test_parser.py b/tests/dns/test_parser.py new file mode 100644 index 0000000..b6988d1 --- /dev/null +++ b/tests/dns/test_parser.py @@ -0,0 +1,131 @@ +import pytest + +from gwhosts.dns import DNSData, Header, Question, Addition, QName, Answer, RRType, parse + + +@pytest.mark.parametrize(("raw", "dto"), ( + ( + b"\xad\xaa\x81\x80\x00\x01\x00\x05\x00\x00\x00\x01\x03\x77\x77\x77\x07\x79\x6f\x75\x74\x75\x62\x65\x03\x63\x6f" + b"\x6d\x00\x00\x1c\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x03\x2a\x00\x16\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d" + b"\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\xc0\x18\xc0\x2d\x00\x1c\x00\x01\x00\x00\x03\x2a\x00\x10\x2a\x00" + b"\x14\x50\x40\x05\x08\x0b\x00\x00\x00\x00\x00\x00\x20\x0e\xc0\x2d\x00\x1c\x00\x01\x00\x00\x03\x2a\x00\x10\x2a" + b"\x00\x14\x50\x40\x05\x08\x02\x00\x00\x00\x00\x00\x00\x20\x0e\xc0\x2d\x00\x1c\x00\x01\x00\x00\x03\x2a\x00\x10" + b"\x2a\x00\x14\x50\x40\x05\x08\x00\x00\x00\x00\x00\x00\x00\x20\x0e\xc0\x2d\x00\x1c\x00\x01\x00\x00\x03\x2a\x00" + b"\x10\x2a\x00\x14\x50\x40\x05\x08\x01\x00\x00\x00\x00\x00\x00\x20\x0e\x00\x00\x29\xff\xd6\x00\x00\x00\x00\x00" + b"\x00", + DNSData( + header=Header(id=44458, flags=0b10000001_10000000, questions=1, answers=5, authorities=0, additions=1), + questions=[ + Question(name=QName((b"www", b"youtube", b"com")), rr_type=RRType.AAAA.value, rr_class=1), + ], + answers=[ + Answer( + name=QName((b"www", b"youtube", b"com")), + rr_type=RRType.CNAME.value, + rr_class=1, + ttl=810, + rr_data_length=22, + rr_data=b"\x0ayoutube-ui\x01l\x06google\xc0\x18", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x0b\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x02\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x00\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x01\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + ], + authorities=[], + additions=[ + Addition(name=QName(), rr_type=RRType.OPT.value, rr_class=65494, ttl=0, rr_data_length=0, rr_data=b""), + ], + ), + ), + ( + b"\x68\x52\x81\x80\x00\x01\x00\x05\x00\x00\x00\x01\x03\x77\x77\x77\x07\x79\x6f\x75\x74\x75\x62\x65\x03\x63\x6f" + b"\x6d\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x07\x8d\x00\x16\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d" + b"\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\xc0\x18\xc0\x2d\x00\x01\x00\x01\x00\x00\x07\x8d\x00\x04\xac\xd9" + b"\x13\x4e\xc0\x2d\x00\x01\x00\x01\x00\x00\x07\x8d\x00\x04\xac\xd9\x10\x4e\xc0\x2d\x00\x01\x00\x01\x00\x00\x07" + b"\x8d\x00\x04\x8e\xfa\xb5\xce\xc0\x2d\x00\x01\x00\x01\x00\x00\x07\x8d\x00\x04\x8e\xfb\xd1\x8e\x00\x00\x29\xff" + b"\xd6\x00\x00\x00\x00\x00\x00", + DNSData( + header=Header(id=26706, flags=0b10000001_10000000, questions=1, answers=5, authorities=0, additions=1), + questions=[ + Question(name=QName((b'www', b'youtube', b'com')), rr_type=RRType.A.value, rr_class=1), + ], + answers=[ + Answer( + name=QName((b"www", b"youtube", b"com")), + rr_type=RRType.CNAME.value, + rr_class=1, + ttl=1933, + rr_data_length=22, + rr_data=b"\x0ayoutube-ui\x01l\x06google\xc0\x18", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\xac\xd9\x13\x4e', + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\xac\xd9\x10\x4e', + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\x8e\xfa\xb5\xce', + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\x8e\xfb\xd1\x8e', + ), + ], + authorities=[], + additions=[ + Addition(name=QName(), rr_type=RRType.OPT.value, rr_class=65494, ttl=0, rr_data_length=0, rr_data=b''), + ] + ), + ), +)) +def test_parse(raw: bytes, dto: DNSData) -> None: + assert parse(raw) == dto diff --git a/tests/dns/test_serializer.py b/tests/dns/test_serializer.py new file mode 100644 index 0000000..fb2d01d --- /dev/null +++ b/tests/dns/test_serializer.py @@ -0,0 +1,138 @@ +import pytest + +from gwhosts.dns import DNSData, Header, Question, Addition, QName, Answer, RRType, serialize + + +@pytest.mark.parametrize(("dto", "raw"), ( + ( + DNSData( + header=Header(id=44458, flags=0b10000001_10000000, questions=1, answers=5, authorities=0, additions=1), + questions=[ + Question(name=QName((b"www", b"youtube", b"com")), rr_type=RRType.AAAA.value, rr_class=1), + ], + answers=[ + Answer( + name=QName((b"www", b"youtube", b"com")), + rr_type=RRType.CNAME.value, + rr_class=1, + ttl=810, + rr_data_length=22, + rr_data=b"\x0ayoutube-ui\x01l\x06google\xc0\x18", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x0b\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x02\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x00\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.AAAA.value, + rr_class=1, + ttl=810, + rr_data_length=16, + rr_data=b"\x2a\x00\x14\x50\x40\x05\x08\x01\x00\x00\x00\x00\x00\x00\x20\x0e", + ), + ], + authorities=[], + additions=[ + Addition(name=QName(), rr_type=RRType.OPT.value, rr_class=65494, ttl=0, rr_data_length=0, rr_data=b""), + ], + ), + b"\xad\xaa\x81\x80\x00\x01\x00\x05\x00\x00\x00\x01\x03\x77\x77\x77\x07\x79\x6f\x75\x74\x75\x62\x65\x03\x63\x6f" + b"\x6d\x00\x00\x1c\x00\x01\x03\x77\x77\x77\x07\x79\x6f\x75\x74\x75\x62\x65\x03\x63\x6f\x6d\x00\x00\x05\x00\x01" + b"\x00\x00\x03\x2a\x00\x16\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\xc0" + b"\x18\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00" + b"\x1c\x00\x01\x00\x00\x03\x2a\x00\x10\x2a\x00\x14\x50\x40\x05\x08\x0b\x00\x00\x00\x00\x00\x00\x20\x0e\x0a\x79" + b"\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x1c\x00\x01" + b"\x00\x00\x03\x2a\x00\x10\x2a\x00\x14\x50\x40\x05\x08\x02\x00\x00\x00\x00\x00\x00\x20\x0e\x0a\x79\x6f\x75\x74" + b"\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x1c\x00\x01\x00\x00\x03" + b"\x2a\x00\x10\x2a\x00\x14\x50\x40\x05\x08\x00\x00\x00\x00\x00\x00\x00\x20\x0e\x0a\x79\x6f\x75\x74\x75\x62\x65" + b"\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x1c\x00\x01\x00\x00\x03\x2a\x00\x10" + b"\x2a\x00\x14\x50\x40\x05\x08\x01\x00\x00\x00\x00\x00\x00\x20\x0e\x00\x00\x29\xff\xd6\x00\x00\x00\x00\x00\x00", + ), + ( + DNSData( + header=Header(id=26706, flags=0b10000001_10000000, questions=1, answers=5, authorities=0, additions=1), + questions=[ + Question(name=QName((b'www', b'youtube', b'com')), rr_type=RRType.A.value, rr_class=1), + ], + answers=[ + Answer( + name=QName((b"www", b"youtube", b"com")), + rr_type=RRType.CNAME.value, + rr_class=1, + ttl=1933, + rr_data_length=22, + rr_data=b"\x0ayoutube-ui\x01l\x06google\xc0\x18", + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\xac\xd9\x13\x4e', + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\xac\xd9\x10\x4e', + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\x8e\xfa\xb5\xce', + ), + Answer( + name=QName((b"youtube-ui", b"l", b"google", b"com")), + rr_type=RRType.A.value, + rr_class=1, + ttl=1933, + rr_data_length=4, + rr_data=b'\x8e\xfb\xd1\x8e', + ), + ], + authorities=[], + additions=[ + Addition(name=QName(), rr_type=RRType.OPT.value, rr_class=65494, ttl=0, rr_data_length=0, rr_data=b''), + ], + ), + b"\x68\x52\x81\x80\x00\x01\x00\x05\x00\x00\x00\x01\x03\x77\x77\x77\x07\x79\x6f\x75\x74\x75\x62\x65\x03\x63\x6f" + b"\x6d\x00\x00\x01\x00\x01\x03\x77\x77\x77\x07\x79\x6f\x75\x74\x75\x62\x65\x03\x63\x6f\x6d\x00\x00\x05\x00\x01" + b"\x00\x00\x07\x8d\x00\x16\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\xc0" + b"\x18\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00" + b"\x01\x00\x01\x00\x00\x07\x8d\x00\x04\xac\xd9\x13\x4e\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06" + b"\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00\x01\x00\x00\x07\x8d\x00\x04\xac\xd9\x10\x4e\x0a\x79" + b"\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00\x01" + b"\x00\x00\x07\x8d\x00\x04\x8e\xfa\xb5\xce\x0a\x79\x6f\x75\x74\x75\x62\x65\x2d\x75\x69\x01\x6c\x06\x67\x6f\x6f" + b"\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00\x01\x00\x00\x07\x8d\x00\x04\x8e\xfb\xd1\x8e\x00\x00\x29\xff\xd6" + b"\x00\x00\x00\x00\x00\x00", + ), +)) +def test_serialize(raw: bytes, dto: DNSData) -> None: + assert serialize(dto) == raw diff --git a/tests/dns/test_types.py b/tests/dns/test_types.py new file mode 100644 index 0000000..865305a --- /dev/null +++ b/tests/dns/test_types.py @@ -0,0 +1,28 @@ +import pytest + +from gwhosts.dns import Header + + +@pytest.mark.parametrize(("flags", "qr", "aa", "tc", "rd", "ra"), ( + (0b00000000_00000000, False, False, False, False, False), + (0b10000000_00000000, True, False, False, False, False), + (0b00000100_00000000, False, True, False, False, False), + (0b00000010_00000000, False, False, True, False, False), + (0b00000001_00000000, False, False, False, True, False), + (0b00000000_10000000, False, False, False, False, True), + (0b10000111_10000000, True, True, True, True, True), +)) +def test_header_flags(flags: int, qr: bool, aa: bool, tc: bool, rd: bool, ra: bool) -> None: + header = Header( + id=0, + flags=flags, + questions=0, + answers=0, + authorities=0, + additions=0, + ) + assert header.qr == qr + assert header.aa == aa + assert header.tc == tc + assert header.rd == rd + assert header.ra == ra diff --git a/tests/network/ipv4/__init__.py b/tests/network/ipv4/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/network/ipv4/test_casts.py b/tests/network/ipv4/test_casts.py new file mode 100644 index 0000000..61a5ee0 --- /dev/null +++ b/tests/network/ipv4/test_casts.py @@ -0,0 +1,123 @@ +from typing import NamedTuple + +import pytest + +from gwhosts.network import ( + IPAddress, + IPBinary, + Network, +) +from gwhosts.network.ipv4 import ( + ipv4_bytes_to_int, + ipv4_int_to_bytes, + ipv4_bytes_to_str, + ipv4_str_to_bytes, + ipv4_str_to_int, + ipv4_int_to_str, + ipv4_network_to_str, + ipv4_str_to_network, + ipv4_netmask_to_network_size, + ipv4_network_size_to_netmask, +) + + +class IPRepr(NamedTuple): + number: IPBinary + bytes: bytes + string: IPAddress + + +_IPV4_DATA = ( + # randint(0, 2**32-1) + IPRepr(0xA4_BE_42_2F, b"\xa4\xbe\x42\x2f", "164.190.66.47"), + IPRepr(0xD3_16_23_F8, b"\xd3\x16\x23\xf8", "211.22.35.248"), + IPRepr(0x8B_0D_BE_B7, b"\x8b\x0d\xbe\xb7", "139.13.190.183"), + IPRepr(0x43_BC_F1_A8, b"\x43\xbc\xf1\xa8", "67.188.241.168"), + IPRepr(0xDD_8C_04_E5, b"\xdd\x8c\x04\xe5", "221.140.4.229"), +) + + +@pytest.mark.parametrize(("src", "dst"), ((ip.bytes, ip.number) for ip in _IPV4_DATA)) +def test_ipv4_bytes_to_int(src: bytes, dst: IPBinary) -> None: + assert ipv4_bytes_to_int(src) == dst + + +@pytest.mark.parametrize(("src", "dst"), ((ip.number, ip.bytes) for ip in _IPV4_DATA)) +def test_ipv4_int_to_bytes(src: IPBinary, dst: bytes) -> None: + assert ipv4_int_to_bytes(src) == dst + + +@pytest.mark.parametrize(("src", "dst"), ((ip.bytes, ip.string) for ip in _IPV4_DATA)) +def test_ipv4_bytes_to_str(src: bytes, dst: IPAddress) -> None: + assert ipv4_bytes_to_str(src) == dst + + +@pytest.mark.parametrize(("src", "dst"), ((ip.string, ip.bytes) for ip in _IPV4_DATA)) +def test_ipv4_str_to_bytes(src: IPAddress, dst: bytes) -> None: + assert ipv4_str_to_bytes(src) == dst + + +@pytest.mark.parametrize(("src", "dst"), ((ip.string, ip.number) for ip in _IPV4_DATA)) +def test_ipv4_str_to_int(src: IPAddress, dst: IPBinary) -> None: + assert ipv4_str_to_int(src) == dst + + +@pytest.mark.parametrize(("src", "dst"), ((ip.number, ip.string) for ip in _IPV4_DATA)) +def test_ipv4_int_to_str(src: IPBinary, dst: IPAddress) -> None: + assert ipv4_int_to_str(src) == dst + + +_NETWORK_DATA = ( + (Network(0x68_B9_1D_DC, 0xFFFFFFFF), "104.185.29.220/32"), + (Network(0x14_54_27_00, 0xFFFFFF00), "20.84.39.0/24"), + (Network(0xC7_17_00_00, 0xFFFF0000), "199.23.0.0/16"), + (Network(0x77_00_00_00, 0xFF000000), "119.0.0.0/8"), +) + + +@pytest.mark.parametrize(("network", "address"), _NETWORK_DATA) +def test_ipv4_network_to_str(network: Network, address: str) -> None: + assert ipv4_network_to_str(network) == address + + +@pytest.mark.parametrize( + ("network", "address"), + ( + (Network(0x9B_65_04_EF, 0xFFFFFFFF), "155.101.4.239"), + *_NETWORK_DATA, + ), +) +def test_ipv4_str_to_network(network: Network, address: str) -> None: + assert ipv4_str_to_network(address) == network + + +@pytest.mark.parametrize( + "address", + ( + "19/09/2024", + "01/57/AM", + ), +) +def test_ipv4_not_a_network(address: str) -> None: + with pytest.raises(ValueError) as cast_error: + ipv4_str_to_network(address) + + assert str(cast_error.value) == f"{address} is not a network address" + + +_NETMASK_DATA = ( + (32, 0xFFFFFFFF), + (24, 0xFFFFFF00), + (16, 0xFFFF0000), + (8, 0xFF000000), +) + + +@pytest.mark.parametrize(("size", "mask"), _NETMASK_DATA) +def test_ipv4_netmask_to_network_size(size: int, mask: IPBinary) -> None: + assert ipv4_netmask_to_network_size(mask) == size + + +@pytest.mark.parametrize(("size", "mask"), _NETMASK_DATA) +def test_ipv4_network_size_to_netmask(size: int, mask: IPBinary) -> None: + assert ipv4_network_size_to_netmask(size) == mask diff --git a/tests/network/ipv4/test_utils.py b/tests/network/ipv4/test_utils.py new file mode 100644 index 0000000..22d2c85 --- /dev/null +++ b/tests/network/ipv4/test_utils.py @@ -0,0 +1,64 @@ +from typing import Set, List + +import pytest + +from gwhosts.network.ipv4 import ( + ipv4_reduce_subnets, + ipv4_network_to_str, + ipv4_str_to_network, +) + + +@pytest.mark.parametrize( + ("source", "result"), + ( + ( + {"192.168.1.1"}, + {"192.168.1.1/32"}, + ), + ( + {"192.168.1.1", "192.168.1.2"}, + {"192.168.1.0/24"}, + ), + ( + {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2"}, + {"192.168.0.0/16"}, + ), + ( + {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2", "192.1.1.1"}, + {"192.168.0.0/16", "192.1.1.1/32"}, + ), + ( + {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2", "192.1.1.1", "1.1.1.1"}, + {"192.168.0.0/16", "192.1.1.1/32", "1.1.1.1/32"}, + ), + ( + {"192.168.0.0/16", "192.168.1.0/24"}, + {"192.168.0.0/16"}, + ), + ), +) +def test_ipv4_reduce_subnets(source: Set[str], result: Set[str]) -> None: + assert { + ipv4_network_to_str(subnet) + for subnet in ipv4_reduce_subnets(ipv4_str_to_network(address) for address in source) + } == result + + +@pytest.mark.parametrize( + ("source", "result"), + ( + ( + {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2", "192.1.1.1", "1.1.1.1"}, + ["1.1.1.1/32", "192.1.1.1/32", "192.168.1.1/32", "192.168.1.2/32", "192.168.2.1/32", "192.168.2.2/32"], + ), + ( + {"192.168.1.0/24", "192.168.0.0/16"}, + ["192.168.0.0/16", "192.168.1.0/24"], + ), + ), +) +def test_ipv4_sort_addresses(source: Set[str], result: List[str]) -> None: + assert [ + ipv4_network_to_str(network) for network in sorted(ipv4_str_to_network(address) for address in source) + ] == result diff --git a/tests/network/ipv6/__init__.py b/tests/network/ipv6/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/network/test_casts.py b/tests/network/ipv6/test_casts.py similarity index 51% rename from tests/network/test_casts.py rename to tests/network/ipv6/test_casts.py index d9f5b5a..239dfd0 100644 --- a/tests/network/test_casts.py +++ b/tests/network/ipv6/test_casts.py @@ -6,22 +6,18 @@ IPAddress, IPBinary, Network, - ipv4_bytes_to_int, - ipv4_int_to_bytes, - ipv4_bytes_to_str, - ipv4_str_to_bytes, - ipv4_str_to_int, - ipv4_int_to_str, +) +from gwhosts.network.ipv6 import ( ipv6_bytes_to_int, ipv6_int_to_bytes, ipv6_bytes_to_str, ipv6_str_to_bytes, ipv6_str_to_int, ipv6_int_to_str, - network_to_str, - str_to_network, - netmask_to_network_size, - network_size_to_netmask, + ipv6_network_to_str, + ipv6_str_to_network, + ipv6_netmask_to_network_size, + ipv6_network_size_to_netmask, ) @@ -31,46 +27,6 @@ class IPRepr(NamedTuple): string: IPAddress -_IPV4_DATA = ( - # randint(0, 2**32-1) - IPRepr(0xA4_BE_42_2F, b"\xa4\xbe\x42\x2f", "164.190.66.47"), - IPRepr(0xD3_16_23_F8, b"\xd3\x16\x23\xf8", "211.22.35.248"), - IPRepr(0x8B_0D_BE_B7, b"\x8b\x0d\xbe\xb7", "139.13.190.183"), - IPRepr(0x43_BC_F1_A8, b"\x43\xbc\xf1\xa8", "67.188.241.168"), - IPRepr(0xDD_8C_04_E5, b"\xdd\x8c\x04\xe5", "221.140.4.229"), -) - - -@pytest.mark.parametrize(("src", "dst"), ((ip.bytes, ip.number) for ip in _IPV4_DATA)) -def test_ipv4_bytes_to_int(src: bytes, dst: IPBinary) -> None: - assert ipv4_bytes_to_int(src) == dst - - -@pytest.mark.parametrize(("src", "dst"), ((ip.number, ip.bytes) for ip in _IPV4_DATA)) -def test_ipv4_int_to_bytes(src: IPBinary, dst: bytes) -> None: - assert ipv4_int_to_bytes(src) == dst - - -@pytest.mark.parametrize(("src", "dst"), ((ip.bytes, ip.string) for ip in _IPV4_DATA)) -def test_ipv4_bytes_to_str(src: bytes, dst: IPAddress) -> None: - assert ipv4_bytes_to_str(src) == dst - - -@pytest.mark.parametrize(("src", "dst"), ((ip.string, ip.bytes) for ip in _IPV4_DATA)) -def test_ipv4_str_to_bytes(src: IPAddress, dst: bytes) -> None: - assert ipv4_str_to_bytes(src) == dst - - -@pytest.mark.parametrize(("src", "dst"), ((ip.string, ip.number) for ip in _IPV4_DATA)) -def test_ipv4_str_to_int(src: IPAddress, dst: IPBinary) -> None: - assert ipv4_str_to_int(src) == dst - - -@pytest.mark.parametrize(("src", "dst"), ((ip.number, ip.string) for ip in _IPV4_DATA)) -def test_ipv4_int_to_str(src: IPBinary, dst: IPAddress) -> None: - assert ipv4_int_to_str(src) == dst - - _IPV6_DATA = ( # randint(0, 2**128-1) IPRepr( @@ -100,6 +56,7 @@ def test_ipv4_int_to_str(src: IPBinary, dst: IPAddress) -> None: ), ) + @pytest.mark.parametrize(("src", "dst"), ((ip.bytes, ip.number) for ip in _IPV6_DATA)) def test_ipv6_bytes_to_int(src: bytes, dst: IPBinary) -> None: assert ipv6_bytes_to_int(src) == dst @@ -131,49 +88,66 @@ def test_ipv6_int_to_str(src: IPBinary, dst: IPAddress) -> None: _NETWORK_DATA = ( - (Network(0x68_B9_1D_DC, 0xFFFFFFFF), "104.185.29.220/32"), - (Network(0x14_54_27_00, 0xFFFFFF00), "20.84.39.0/24"), - (Network(0xC7_17_00_00, 0xFFFF0000), "199.23.0.0/16"), - (Network(0x77_00_00_00, 0xFF000000), "119.0.0.0/8"), + ( + Network(0x2A03_2880_F145_0082_FACE_B00C_0000_25DE, 0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF), + "2a03:2880:f145:82:face:b00c:0:25de/128", + ), + ( + Network(0x2A00_1450_4005_0800_0000_0000_0000_0000, 0xFFFF_FFFF_FFFF_FF00_0000_0000_0000_0000), + "2a00:1450:4005:800::/56", + ), + ( + Network(0x2603_1030_0000_0000_0000_0000_0000_0000, 0xFFFF_FFFF_0000_0000_0000_0000_0000_0000), + "2603:1030::/32", + ), ) @pytest.mark.parametrize(("network", "address"), _NETWORK_DATA) -def test_network_to_str(network: Network, address: str) -> None: - assert network_to_str(network) == address - - -@pytest.mark.parametrize(("network", "address"), ( - (Network(0x9B_65_04_EF, 0xFFFFFFFF), "155.101.4.239"), - *_NETWORK_DATA, -)) -def test_str_to_network(network: Network, address: str) -> None: - assert str_to_network(address) == network +def test_ipv6_network_to_str(network: Network, address: str) -> None: + assert ipv6_network_to_str(network) == address + + +@pytest.mark.parametrize( + ("network", "address"), + ( + ( + Network(0x99D0_E578_266F_3196_4BD7_F557_BAF7_4A6A, 0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF), + "99d0:e578:266f:3196:4bd7:f557:baf7:4a6a", + ), + *_NETWORK_DATA, + ), +) +def test_ipv6_str_to_network(network: Network, address: str) -> None: + assert ipv6_str_to_network(address) == network -@pytest.mark.parametrize("address", ( - "19/09/2024", - "01/57/AM", -)) -def test_not_a_network(address: str) -> None: +@pytest.mark.parametrize( + "address", + ( + "19/09/2024", + "01/57/AM", + ), +) +def test_ipv6_not_a_network(address: str) -> None: with pytest.raises(ValueError) as cast_error: - str_to_network(address) + ipv6_str_to_network(address) assert str(cast_error.value) == f"{address} is not a network address" _NETMASK_DATA = ( - (32, 0xFFFFFFFF), - (24, 0xFFFFFF00), - (16, 0xFFFF0000), - (8, 0xFF000000), + (128, 0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF), + (56, 0xFFFF_FFFF_FFFF_FF00_0000_0000_0000_0000), + (32, 0xFFFF_FFFF_0000_0000_0000_0000_0000_0000), ) + @pytest.mark.parametrize(("size", "mask"), _NETMASK_DATA) -def test_netmask_to_network_size(size: int, mask: IPBinary) -> None: - assert netmask_to_network_size(mask) == size +def test_ipv6_netmask_to_network_size(size: int, mask: IPBinary) -> None: + assert ipv6_netmask_to_network_size(mask) == size @pytest.mark.parametrize(("size", "mask"), _NETMASK_DATA) -def test_network_size_to_netmask(size: int, mask: IPBinary) -> None: - assert network_size_to_netmask(size) == mask +def test_ipv6_network_size_to_netmask(size: int, mask: IPBinary) -> None: + assert ipv6_network_size_to_netmask(size) == mask diff --git a/tests/network/ipv6/test_utils.py b/tests/network/ipv6/test_utils.py new file mode 100644 index 0000000..edd733b --- /dev/null +++ b/tests/network/ipv6/test_utils.py @@ -0,0 +1,52 @@ +from typing import Set, List + +import pytest + +from gwhosts.network.ipv6 import ( + ipv6_reduce_subnets, + ipv6_network_to_str, + ipv6_str_to_network, +) + + +@pytest.mark.parametrize( + ("source", "result"), + ( + ( + { + "2a00:1450:4005:801::200e", + "2a00:1450:4005:80b::200e", + "2a00:1450:4005:802::200e", + "2a00:1450:4005:800::200e", + "2a00:1450:4005:801::200e", + "2a00:1450:4005:800::2004", + "2a00:1450:4005:802::2003", + }, + {"2a00:1450:4005:800::/56"}, + ), + ), +) +def test_ipv6_reduce_subnets(source: Set[str], result: Set[str]) -> None: + assert { + ipv6_network_to_str(subnet) + for subnet in ipv6_reduce_subnets(ipv6_str_to_network(address) for address in source) + } == result + + +@pytest.mark.parametrize( + ("source", "result"), + ( + ( + {"2a00:1450:4005:801::200e", "2a00:1450:4005:80b::200e", "2a00:1450:4005:800::2004"}, + ["2a00:1450:4005:800::2004/128", "2a00:1450:4005:801::200e/128", "2a00:1450:4005:80b::200e/128"], + ), + ( + {"2a00:1450:4005:800::/56", "2603:1030::/32"}, + ["2603:1030::/32", "2a00:1450:4005:800::/56"], + ), + ), +) +def test_ipv6_sort_addresses(source: Set[str], result: List[str]) -> None: + assert [ + ipv6_network_to_str(network) for network in sorted(ipv6_str_to_network(address) for address in source) + ] == result diff --git a/tests/network/test_utils.py b/tests/network/test_utils.py deleted file mode 100644 index 8c92cc0..0000000 --- a/tests/network/test_utils.py +++ /dev/null @@ -1,59 +0,0 @@ -from typing import Set, List - -import pytest - -from gwhosts.network import ( - reduce_subnets, - network_to_str, - str_to_network, -) - - -@pytest.mark.parametrize(("source", "result"), ( - ( - {"192.168.1.1"}, - {"192.168.1.1/32"}, - ), - ( - {"192.168.1.1", "192.168.1.2"}, - {"192.168.1.0/24"}, - ), - ( - {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2"}, - {"192.168.0.0/16"}, - ), - ( - {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2", "192.1.1.1"}, - {"192.168.0.0/16", "192.1.1.1/32"}, - ), - ( - {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2", "192.1.1.1", "1.1.1.1"}, - {"192.168.0.0/16", "192.1.1.1/32", "1.1.1.1/32"}, - ), - ( - {"192.168.0.0/16", "192.168.1.0/24"}, - {"192.168.0.0/16"}, - ) -)) -def test_reduce_subnets(source: Set[str], result: Set[str]) -> None: - assert { - network_to_str(subnet) - for subnet in reduce_subnets( - str_to_network(address) - for address in source - ) - } == result - - -@pytest.mark.parametrize(("source", "result"), ( - ( - {"192.168.1.1", "192.168.1.2", "192.168.2.1", "192.168.2.2", "192.1.1.1", "1.1.1.1"}, - ['1.1.1.1/32', '192.1.1.1/32', '192.168.1.1/32', '192.168.1.2/32', '192.168.2.1/32', '192.168.2.2/32'], - ), - ( - {"192.168.1.0/24", "192.168.0.0/16"}, - ["192.168.0.0/16", "192.168.1.0/24"], - ), -)) -def test_sort_addresses(source: Set[str], result: List[str]) -> None: - assert [network_to_str(network) for network in sorted(str_to_network(address) for address in source)] == result diff --git a/tests/performance/__init__.py b/tests/performance/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/performance/test_gc.py b/tests/performance/test_gc.py new file mode 100644 index 0000000..8740cb1 --- /dev/null +++ b/tests/performance/test_gc.py @@ -0,0 +1,12 @@ +import gc + +from gwhosts.performance import no_gc + + +def test_no_gc() -> None: + is_gc_enabled = gc.isenabled() + + with no_gc(): + assert gc.isenabled() is False + + assert gc.isenabled() == is_gc_enabled diff --git a/tests/proxy/__init__.py b/tests/proxy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/proxy/test_proxy.py b/tests/proxy/test_proxy.py new file mode 100644 index 0000000..3b974f8 --- /dev/null +++ b/tests/proxy/test_proxy.py @@ -0,0 +1,51 @@ +import pytest +from gwhosts.proxy import DNSProxy +from gwhosts.dns import QName +from gwhosts.network import UDPSocket +from logging import getLogger +from pytest_mock import MockerFixture +from typing import List + + +_logger = getLogger("pytest") + + +@pytest.fixture() +def proxy() -> DNSProxy: + return DNSProxy( + gateway="192.168.2.1", + hostnames={QName((b"example", b"com"))}, + logger=_logger, + ipv6_gateway="fced:9999::1", + ) + + +@pytest.mark.parametrize("listdir", ([], ["0"], ["0", "1"], ["0", "1", "2"], ["0", "1", "2", "3"])) +def test_open_files_count(mocker: MockerFixture, proxy: DNSProxy, listdir: List[str]) -> None: + mock_os_listdir = mocker.patch("os.listdir", return_value=listdir) + + assert proxy._open_files_count == len(listdir) + mock_os_listdir.assert_called_once_with("/proc/self/fd") + + +def test_active_pool(proxy: DNSProxy) -> None: + pass + + +def test_get_socket(proxy: DNSProxy) -> None: + free_pool_socket = UDPSocket() + proxy._free_pool = [free_pool_socket] + + assert proxy._get_socket() is free_pool_socket + assert len(proxy._free_pool) == 0 + assert proxy._get_socket() is not free_pool_socket + + +@pytest.mark.parametrize(("hostname", "exists"), ( + (QName((b"something", b"example", b"com")), True), + (QName((b"example", b"com")), True), + (QName((b"something", b"com")), False), + (QName((b"com",)), False), +)) +def test_hostname_exists(proxy: DNSProxy, hostname: QName, exists: bool) -> None: + assert proxy._hostname_exists(hostname) is exists diff --git a/tests/routes/__init__.py b/tests/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/routes/test_netlink.py b/tests/routes/test_netlink.py new file mode 100644 index 0000000..c87a7fc --- /dev/null +++ b/tests/routes/test_netlink.py @@ -0,0 +1,189 @@ +from socket import AF_INET, AF_INET6 + +import pytest +from pyroute2.iproute.linux import DEFAULT_TABLE +from pyroute2.netlink import ( + NLM_F_ACK, + NLM_F_CREATE, + NLM_F_DUMP, + NLM_F_REPLACE, + NLM_F_REQUEST, +) +from pyroute2.netlink.rtnl import ( + RTM_DELROUTE, + RTM_GETROUTE, + RTM_NEWROUTE, +) +from pyroute2.netlink.rtnl import ( + rt_proto, + rt_type, +) +from pyroute2.netlink.rtnl.rtmsg import rtmsg +from pytest_mock import MockerFixture + +from gwhosts.network import Network, IPAddress +from gwhosts.network.ipv4 import ipv4_int_to_str, ipv4_netmask_to_network_size +from gwhosts.network.ipv6 import ipv6_int_to_str, ipv6_netmask_to_network_size +from gwhosts.routes import Netlink + + +@pytest.fixture +def netlink(mocker: MockerFixture) -> Netlink: + mocker.patch("gwhosts.routes.Netlink.__init__", return_value=None) + mocker.patch("gwhosts.routes.Netlink.put", return_value=None) + return Netlink() + +def test_ipv4_get_routes(netlink: Netlink) -> None: + msg = rtmsg() + msg["family"] = AF_INET + msg["table"] = DEFAULT_TABLE + + netlink.ipv4_get_routes() + + netlink.put.assert_called_once_with( + msg=msg, + msg_type=RTM_GETROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_DUMP, + ) + + +def test_ipv6_get_routes(netlink: Netlink) -> None: + msg = rtmsg() + msg["family"] = AF_INET6 + msg["table"] = DEFAULT_TABLE + + netlink.ipv6_get_routes() + + netlink.put.assert_called_once_with( + msg=msg, + msg_type=RTM_GETROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_DUMP, + ) + + +@pytest.mark.parametrize(("network", "gateway"), ( + (Network(0x68_B9_1D_DC, 0xFFFFFFFF), "104.185.29.1"), + (Network(0x14_54_27_00, 0xFFFFFF00), "20.84.39.1"), + (Network(0xC7_17_00_00, 0xFFFF0000), "199.23.0.1"), + (Network(0x77_00_00_00, 0xFF000000), "119.0.0.1"), +)) +def test_ipv4_add_route(netlink: Netlink, network: Network, gateway: IPAddress) -> None: + msg = rtmsg() + msg["family"] = AF_INET + msg["table"] = DEFAULT_TABLE + msg["proto"] = rt_proto["static"] + msg["type"] = rt_type["unicast"] + msg["dst_len"] = ipv4_netmask_to_network_size(network.mask) + msg["attrs"] = [ + ("RTA_TABLE", DEFAULT_TABLE), + ("RTA_DST", ipv4_int_to_str(network.address)), + ("RTA_GATEWAY", gateway), + ] + + netlink.ipv4_add_route(network, gateway) + + netlink.put.assert_called_once_with( + msg=msg, + msg_type=RTM_NEWROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_CREATE | NLM_F_REPLACE, + ) + + +@pytest.mark.parametrize(("network", "gateway"), ( + ( + Network(0x2A03_2880_F145_0082_FACE_B00C_0000_25DE, 0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF), + "2a03:2880:f145:82:face:b00c:0:1", + ), + ( + Network(0x2A00_1450_4005_0800_0000_0000_0000_0000, 0xFFFF_FFFF_FFFF_FF00_0000_0000_0000_0000), + "2a00:1450:4005:800::1", + ), + ( + Network(0x2603_1030_0000_0000_0000_0000_0000_0000, 0xFFFF_FFFF_0000_0000_0000_0000_0000_0000), + "2603:1030::1", + ), +)) +def test_ipv6_add_route(netlink: Netlink, network: Network, gateway: IPAddress) -> None: + msg = rtmsg() + msg["family"] = AF_INET6 + msg["table"] = DEFAULT_TABLE + msg["proto"] = rt_proto["static"] + msg["type"] = rt_type["unicast"] + msg["dst_len"] = ipv6_netmask_to_network_size(network.mask) + msg["attrs"] = [ + ("RTA_TABLE", DEFAULT_TABLE), + ("RTA_DST", ipv6_int_to_str(network.address)), + ("RTA_GATEWAY", gateway), + ] + + netlink.ipv6_add_route(network, gateway) + + netlink.put.assert_called_once_with( + msg=msg, + msg_type=RTM_NEWROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_CREATE | NLM_F_REPLACE, + ) + + +@pytest.mark.parametrize(("network", "gateway"), ( + (Network(0x68_B9_1D_DC, 0xFFFFFFFF), "104.185.29.1"), + (Network(0x14_54_27_00, 0xFFFFFF00), "20.84.39.1"), + (Network(0xC7_17_00_00, 0xFFFF0000), "199.23.0.1"), + (Network(0x77_00_00_00, 0xFF000000), "119.0.0.1"), +)) +def test_ipv4_del_route(netlink: Netlink, network: Network, gateway: IPAddress) -> None: + msg = rtmsg() + msg["family"] = AF_INET + msg["table"] = DEFAULT_TABLE + msg["proto"] = rt_proto["static"] + msg["type"] = rt_type["unicast"] + msg["dst_len"] = ipv4_netmask_to_network_size(network.mask) + msg["attrs"] = [ + ("RTA_TABLE", DEFAULT_TABLE), + ("RTA_DST", ipv4_int_to_str(network.address)), + ("RTA_GATEWAY", gateway), + ] + + netlink.ipv4_del_route(network, gateway) + + netlink.put.assert_called_once_with( + msg=msg, + msg_type=RTM_DELROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK, + ) + + +@pytest.mark.parametrize(("network", "gateway"), ( + ( + Network(0x2A03_2880_F145_0082_FACE_B00C_0000_25DE, 0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF), + "2a03:2880:f145:82:face:b00c:0:1", + ), + ( + Network(0x2A00_1450_4005_0800_0000_0000_0000_0000, 0xFFFF_FFFF_FFFF_FF00_0000_0000_0000_0000), + "2a00:1450:4005:800::1", + ), + ( + Network(0x2603_1030_0000_0000_0000_0000_0000_0000, 0xFFFF_FFFF_0000_0000_0000_0000_0000_0000), + "2603:1030::1", + ), +)) +def test_ipv6_del_route(netlink: Netlink, network: Network, gateway: IPAddress) -> None: + msg = rtmsg() + msg["family"] = AF_INET6 + msg["table"] = DEFAULT_TABLE + msg["proto"] = rt_proto["static"] + msg["type"] = rt_type["unicast"] + msg["dst_len"] = ipv6_netmask_to_network_size(network.mask) + msg["attrs"] = [ + ("RTA_TABLE", DEFAULT_TABLE), + ("RTA_DST", ipv6_int_to_str(network.address)), + ("RTA_GATEWAY", gateway), + ] + + netlink.ipv6_del_route(network, gateway) + + netlink.put.assert_called_once_with( + msg=msg, + msg_type=RTM_DELROUTE, + msg_flags=NLM_F_REQUEST | NLM_F_ACK, + ) diff --git a/tests/test_dns.py b/tests/test_dns.py deleted file mode 100644 index 79ab68d..0000000 --- a/tests/test_dns.py +++ /dev/null @@ -1,48 +0,0 @@ -import pytest - -from gwhosts.dns import ( - DNSData, - Header, - Question, - Answer, - parse, - serialize, -) - -parameters = ( - ( - b"e\x03\x85\x80\x00\x01\x00\x03\x00\x00\x00\x00\x06google\x03com\x00\x00\x01\x00\x01\x06google\x03com\x00\x00" - b"\x01\x00\x01\x00\x00\x00\x00\x00\x04@\xe9\xa4d\x06google\x03com\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x04@" - b"\xe9\xa4\x8b\x06google\x03com\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x04@\xe9\xa4\x8a", - DNSData( - header=Header( - id=25859, - flags=0b10000101_10000000, - questions=1, - answers=3, - authorities=0, - additions=0, - ), - questions=[ - Question(name=(b'google', b'com'), rr_type=1, rr_class=1), - ], - answers=[ - Answer(name=(b'google', b'com'), rr_type=1, rr_class=1, ttl=0, rr_data_length=4, rr_data=b'@\xe9\xa4d'), - Answer(name=(b'google', b'com'), rr_type=1, rr_class=1, ttl=0, rr_data_length=4, rr_data=b'@\xe9\xa4\x8b'), - Answer(name=(b'google', b'com'), rr_type=1, rr_class=1, ttl=0, rr_data_length=4, rr_data=b'@\xe9\xa4\x8a'), - ], - authorities=[], - additions=[], - ), - ), -) - - -@pytest.mark.parametrize(("raw", "dto"), parameters) -def test_parse(raw: bytes, dto: DNSData) -> None: - assert parse(raw) == dto - - -@pytest.mark.parametrize(("raw", "dto"), parameters) -def test_serialize(raw: bytes, dto: DNSData) -> None: - assert serialize(dto) == raw diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..af374cb --- /dev/null +++ b/tox.ini @@ -0,0 +1,16 @@ +[tox] +envlist = py3{8,9,10,11,12,13},pypy3{9,10},format,check + +[testenv] +extras = test +commands = pytest {posargs} + +[testenv:lint] +skip_install = true +deps = ruff~=0.6.5 +commands = ruff check + +[testenv:format] +skip_install = true +deps = ruff~=0.6.5 +commands = ruff format --check