diff --git a/container/example-zeroconf.yaml b/container/example-zeroconf.yaml new file mode 100644 index 0000000..ed2a323 --- /dev/null +++ b/container/example-zeroconf.yaml @@ -0,0 +1,34 @@ +nets: + dtnA: + subnet4: "192.168.100.0/24" + dtnB: + subnet4: "192.168.110.0/24" +# subnet6: "fda1:1cec:f450:c055::/64" + +nodes: + node000: + nets: [dtnA] + keys: + sign: + keytype: SECP256R1 + config: + apps: + zeroconf: + enumerate: true + tls_enable: false + + node001: + nets: [dtnA, dtnB] + config: + apps: + zeroconf: + offer: true + tls_enable: false + + node002: + nets: [dtnB] + config: + apps: + zeroconf: + enumerate: true + tls_enable: false diff --git a/container/example.yaml b/container/example.yaml index 78e2d58..30698d7 100644 --- a/container/example.yaml +++ b/container/example.yaml @@ -9,26 +9,30 @@ nodes: node000: nets: [dtnA] keys: - sign: - keytype: SECP256R1 + sign: + keytype: SECP256R1 config: - apps: - nmp: - enable: true - tls_enable: true + apps: + nmp: + enable: true + zeroconf: + enumerate: true + tls_enable: true node001: nets: [dtnA, dtnB] config: - apps: - nmp: - enable: true - tls_enable: true + apps: + nmp: + enable: true + zeroconf: + offer: true + tls_enable: true node002: nets: [dtnB] config: - apps: - nmp: - enable: true - tls_enable: true + apps: + nmp: + enable: true + tls_enable: true diff --git a/container/run.py b/container/run.py index fb273c3..13268f6 100755 --- a/container/run.py +++ b/container/run.py @@ -263,11 +263,11 @@ def action(self, act): bp_rx_routes = extconfig.get('bp_rx_routes', []) bp_rx_routes += [ { - 'eid_pattern': r'dtn://{{node_name}}/.*', + 'eid_pattern': f'dtn://{node_name}/.*', 'action': 'deliver', }, { - 'eid_pattern': '".*"', + 'eid_pattern': '.*', 'action': 'forward', }, ] diff --git a/pyproject.toml b/pyproject.toml index e7c1f90..dcc6282 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ version = "0.0.0" authors = [ { name="Brian Sipos", email="brian.sipos+ietf@gmail.com" }, ] -description = "A demonstration agent for the DTN BPv7/TCPCLv4/UDPCL" +description = "A demonstration agent for the DTN BPv7/TCPCLv4/UDPCLv2" readme = "README.md" license = { text="LGPL-3" } requires-python = ">=3.7" @@ -29,7 +29,6 @@ dependencies = [ "cryptography >=0.9", "certvalidator", "dbus-python", - "lakers-python", "portion >=2.1", "psutil", "PyGObject >=3.34", # glib integration @@ -37,6 +36,7 @@ dependencies = [ "python3-dtls", "scapy >=2.4,<2.4.5", "six", + "zeroconf", ] [project.optional-dependencies] @@ -84,4 +84,4 @@ bp-agent = "bp.cmd:main" [project.urls] "Homepage" = "https://github.com/BrianSipos/dtn-demo-agent" -"Bug Tracker" = "https://github.com/BrianSipos/dtn-demo-agent/issues" \ No newline at end of file +"Bug Tracker" = "https://github.com/BrianSipos/dtn-demo-agent/issues" diff --git a/src/bp/agent.py b/src/bp/agent.py index 2013b17..58b2791 100644 --- a/src/bp/agent.py +++ b/src/bp/agent.py @@ -173,6 +173,12 @@ def exec_loop(self): # wait for graceful shutdown eloop.run() + def add_tx_route(self, item:TxRouteItem): + self._config.tx_route_table.append(item) + + def get_cla(self, name:str) -> bp.cla.AbstractAdaptor: + return self._cl_agent[name] + def _bus_name_changed(self, servname, old_owner, new_owner): for cl_agent in self._cl_agent.values(): if cl_agent.serv_name == servname: @@ -350,6 +356,7 @@ def _do_fwd(self): ctr.record_action('forward') except Exception as err: self._logger.error('Failed to forward bundle %s: %s', ctr.log_name(), err) + self._logger.debug('%s', traceback.format_exc()) ctr.record_action('delete', StatusReport.ReasonCode.NO_ROUTE) self._finish_bundle(ctr) @@ -435,7 +442,7 @@ def cl_attach(self, cltype, servname): except KeyError: raise ValueError('Invalid cltype: {}'.format(cltype)) - agent = cls() + agent = cls(agent=self) agent.serv_name = servname agent.peer_node_seen = self._cl_peer_node_seen(cltype) agent.recv_bundle_finish = self._cl_recv_bundle_finish(cltype) diff --git a/src/bp/app/__init__.py b/src/bp/app/__init__.py index c8626b3..6b90add 100644 --- a/src/bp/app/__init__.py +++ b/src/bp/app/__init__.py @@ -4,3 +4,4 @@ from . import fragment from . import bpsec from . import nmp +from . import zeroconf \ No newline at end of file diff --git a/src/bp/app/nmp.py b/src/bp/app/nmp.py index 62f5c57..8d55fad 100644 --- a/src/bp/app/nmp.py +++ b/src/bp/app/nmp.py @@ -1,4 +1,4 @@ -''' Application layer adaptors. +''' Prototype of Secure Advertisement and Neighborhood Discovery (SAND). ''' import cbor2 from dataclasses import dataclass, field, fields diff --git a/src/bp/app/zeroconf.py b/src/bp/app/zeroconf.py new file mode 100644 index 0000000..f3793f4 --- /dev/null +++ b/src/bp/app/zeroconf.py @@ -0,0 +1,168 @@ +''' Prototype of Zero-Configuration BP router discovery. +''' +from gi.repository import GLib as glib +import ipaddress +import logging +import random +import re +import socket +from typing import List +import ifaddr +from zeroconf import ( + Zeroconf, + ServiceInfo, + ServiceBrowser, + ServiceStateChange, +) + +from bp.config import Config, TxRouteItem +from bp.app.base import app, AbstractApplication + +LOGGER = logging.getLogger(__name__) + +SVCLOCAL = '_dtn-bundle._tcp.local.' +''' Global service name to register under ''' + + +@app('zeroconf') +class App(AbstractApplication): + + DBUS_IFACE = 'org.ietf.dtn.bp.zeroconf' + ''' Interface name ''' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._config = None + self._zco = None + self._browser = None + + def load_config(self, config:Config): + super().load_config(config) + self._config = config.apps.get(self._app_name, {}) + + self._zco = Zeroconf() + + if self._config.get('offer', False): + glib.timeout_add(1e3 * random.randint(5, 8), self._offer) + if self._config.get('enumerate', False): + glib.timeout_add(1e3 * random.randint(5, 8), self._enumerate) + + def _iface_addrs(self) -> List[ipaddress._IPAddressBase]: + all_addrs = [] + for adapter in ifaddr.get_adapters(): + for ipobj in adapter.ips: + if isinstance(ipobj.ip, tuple): + # ipv6 + addr = ipobj.ip[0] + else: + # ipv4 + addr = ipobj.ip + + addrobj = ipaddress.ip_address(addr) + if addrobj.is_loopback or addrobj.is_reserved: + continue + + all_addrs.append(addrobj) + + return all_addrs + + def _iface_nets(self) -> List[ipaddress._BaseNetwork]: + all_nets = [] + for adapter in ifaddr.get_adapters(): + for ipobj in adapter.ips: + if isinstance(ipobj.ip, tuple): + # ipv6 + addr = ipobj.ip[0] + else: + # ipv4 + addr = ipobj.ip + + ifaceobj = ipaddress.ip_interface(f'{addr}/{ipobj.network_prefix}') + if ifaceobj.ip.is_loopback or ifaceobj.ip.is_reserved: + continue + + all_nets.append(ifaceobj.network) + + return all_nets + + def _offer(self): + ''' Deferred async offer on mDNS. ''' + + hostname = ( + self._config.get('hostname') + or socket.gethostname().split('.', 1)[0] + ) + fqdn = hostname + '.local.' + + # Offer all reachable unicast addresses + all_addrs = self._iface_addrs() + # ignore when no usable addresses + if not all_addrs: + return False + + instname = self._config.get('instance', hostname) + instlocal = f'{instname}.{SVCLOCAL}' + + servinfo = ServiceInfo( + SVCLOCAL, + instlocal, + weight=1, + server=fqdn, + addresses=list(map(str, all_addrs)), + port=4556, + properties=dict( + txtvers=1, + protovers=4, + ), + ) + self._zco.register_service(servinfo) + LOGGER.info('mDNS registered as %s on %s', + instlocal, all_addrs) + + return False + + def _enumerate(self): + + self._browser = ServiceBrowser( + self._zco, + [SVCLOCAL], + handlers=[self._serv_state_change] + ) + + return False + + def _serv_state_change(self, zeroconf, service_type, name, state_change): + all_nets = self._iface_nets() + LOGGER.info('Iface nets: %s', all_nets) + + if state_change == ServiceStateChange.Added: + info = zeroconf.get_service_info(service_type, name) + LOGGER.info('Service added: %s', info) + + addr_objs = list(map(ipaddress.ip_address, info.addresses)) + LOGGER.info('Possible addresses: %s', addr_objs) + usable_addrs = [ + addr + for addr in addr_objs + if [net for net in all_nets if addr in net] + ] + LOGGER.info('Usable addresses: %s', usable_addrs) + if not usable_addrs: + return + best_addr = str(usable_addrs[0]) + best_port = int(info.port) + + route = TxRouteItem( + eid_pattern=re.compile(r'.*'), + next_nodeid=None, # FIXME needed? + cl_type='tcpcl', + raw_config=dict( + address=best_addr, + port=best_port, + ), + ) + LOGGER.info('Route item %s', route) + self._agent.add_tx_route(route) + + self._agent.get_cla('tcpcl').connect(best_addr, best_port) diff --git a/src/bp/cla.py b/src/bp/cla.py index 9102990..d0f2576 100644 --- a/src/bp/cla.py +++ b/src/bp/cla.py @@ -5,6 +5,8 @@ import dbus from gi.repository import GLib as glib from binascii import unhexlify +import re +from bp.config import TxRouteItem # Dictionary of CL types CL_TYPES = {} @@ -27,8 +29,9 @@ class AbstractAdaptor(ABC): :ivar agent_obj: The bus proxy object when it is valid. ''' - def __init__(self): + def __init__(self, agent): self._logger = logging.getLogger(__name__ + '.' + self.__class__.__name__) + self._agent = agent self.serv_name = None self.obj_path = None @@ -88,8 +91,8 @@ class UdpclAdaptor(AbstractAdaptor): TAG_ENC = unhexlify('d9d9f7') - def __init__(self): - AbstractAdaptor.__init__(self) + def __init__(self, **kwargs): + AbstractAdaptor.__init__(self, **kwargs) self.obj_path = '/org/ietf/dtn/udpcl/Agent' def _handle_polling_received(self, dtntime, interval_ms, node_id, address, port): @@ -138,8 +141,8 @@ class TcpclAdaptor(AbstractAdaptor): # Interface name DBUS_IFACE = 'org.ietf.dtn.tcpcl.Agent' - def __init__(self): - AbstractAdaptor.__init__(self) + def __init__(self, **kwargs): + AbstractAdaptor.__init__(self, **kwargs) self.obj_path = '/org/ietf/dtn/tcpcl/Agent' self.conns = set() @@ -177,7 +180,7 @@ def handle_recv_bundle_finish(bid, _length, result): data = conn_iface.recv_bundle_pop_data(bid) data = dbus.ByteArray(data) try: - self.recv_bundle_finish(data) + self.recv_bundle_finish(data, {}) except NotImplementedError: pass @@ -192,7 +195,9 @@ def handle_state_change(state): cl_conn.sess_params = params self._logger.debug('Session established with %s', cl_conn.nodeid) self._cl_conn_nodeid[cl_conn.nodeid] = cl_conn - self._conn_ready(cl_conn) + self._conn_ready(cl_conn, cl_conn.nodeid) + self._conn_ready(cl_conn, None) + self._reverse_route(cl_conn) conn_iface.connect_to_signal('session_state_changed', handle_state_change) state = conn_iface.get_session_state() @@ -207,37 +212,61 @@ def _conn_detach(self, conn_path): if cl_conn.nodeid: del self._cl_conn_nodeid[cl_conn.nodeid] - def _conn_ready(self, cl_conn): - ''' Handle a new connection being established. + def _conn_ready(self, cl_conn:'TcpclConnection', next_hop:str): + ''' Handle a new connection being established and + send any pending data for a next-hop. :param cl_conn: The connection object. - :type cl_conn: :py:cls:`TcpclConnection` + :param next_hop: The desired next-hop or None ''' - pend_data = self._sess_wait.get(cl_conn.nodeid) - if not pend_data: - return - cl_conn = self._cl_conn_nodeid[cl_conn.nodeid] + pend_data = self._sess_wait.get(next_hop, []) for data in pend_data: cl_conn.send_bundle_data(data) + def _reverse_route(self, cl_conn): + ''' Add reverse route to node on other side of connection. + ''' + route = TxRouteItem( + eid_pattern=re.compile(re.escape(cl_conn.nodeid) + r'.*'), + next_nodeid=cl_conn.nodeid, # FIXME needed? + cl_type='tcpcl', + raw_config=dict( + next_nodeid=cl_conn.nodeid, # allow session lookup + ), + ) + self._logger.info('Route item %s', route) + self._agent.add_tx_route(route) + + def connect(self, address:str, port:int): + ''' Initiate a connection preemptively. + ''' + self._logger.info('Connecting to %s:%d', address, port) + self.agent_obj.connect(address, port) + def send_bundle_func(self, tx_params:dict): - # Get an active session or create one if needed. - next_nodeid = tx_params['next_nodeid'] - address = tx_params['address'] - port = tx_params.get('port', 4556) + ''' Get an active session or create one if needed. + ''' + next_nodeid = tx_params.get('next_nodeid') def sender(data): - # Either send immeidately or put in TX queue - if next_nodeid in self._cl_conn_nodeid: + # Either send immediately or put in TX queue + if next_nodeid is None and len(self._cl_conn_nodeid) == 1: + nodeid, cl_conn = next(iter(self._cl_conn_nodeid.items())) + self._logger.info('Existing default session with %s', nodeid) + cl_conn.send_bundle_data(data) + elif next_nodeid in self._cl_conn_nodeid: self._logger.info('Existing session with %s', next_nodeid) cl_conn = self._cl_conn_nodeid[next_nodeid] cl_conn.send_bundle_data(data) else: + self._logger.info('Need session with %s', next_nodeid) if next_nodeid not in self._sess_wait: self._sess_wait[next_nodeid] = [] self._sess_wait[next_nodeid].append(data) if next_nodeid not in self._cl_conn_nodeid: + address = tx_params['address'] + port = tx_params.get('port', 4556) self._logger.info('Connecting to %s:%d', address, port) self.agent_obj.connect(address, port) diff --git a/zeroconf.md b/zeroconf.md new file mode 100644 index 0000000..e975163 --- /dev/null +++ b/zeroconf.md @@ -0,0 +1,15 @@ +Start the containers with: +``` +./container/run.py --config container/example-zeroconf.yaml delete prep start +``` +which will cause the mDNS offer and enumeration after a few (under 10) seconds. + +The traffic can be monitored with: +``` +wireshark -i br-dtnA -i br-dtnB -f 'port 4556 or port 1113 or port 5353' -Y tcpcl -k +``` + +Send a ping to exercise the routing with: +``` +docker container exec -it node000 dbus-send --system --print-reply --dest=org.ietf.dtn.node.bp /org/ietf/dtn/bp/Agent org.ietf.dtn.bp.Agent.ping string:"dtn://node002/srv" int32:32 +```