From 2b62277d4d194329f525b2800b311f61df439342 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Sun, 28 Feb 2021 21:29:22 +0100 Subject: [PATCH 01/16] Adding multiple input parameters --- ArgusProbe_Beamlogic.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 18444fd..1bee33e 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -11,7 +11,7 @@ import Queue import traceback import datetime - +import argparse import paho.mqtt.publish import ArgusVersion @@ -42,7 +42,7 @@ def logCrash(threadName, err): #============================ classes ========================================= -class RxSnifferThread(threading.Thread): +class BeamLogic_RxSnifferThread(threading.Thread): """ Thread which attaches to the sniffer and parses incoming frames. """ @@ -65,7 +65,7 @@ def __init__(self, txMqttThread): # start the thread threading.Thread.__init__(self) - self.name = 'RxSnifferThread' + self.name = 'BeamLogic_RxSnifferThread' self.start() def run(self): @@ -320,12 +320,23 @@ def __init__(self): def main(): - # parse parameters + # parse args + parser = argparse.ArgumentParser() #creating an ArgumentParser object + parser.add_argument("--probetype", nargs="?", default="BeamLogic", choices=["BeamLogic", "Serial","OpenTestBed"]) + args = parser.parse_args() # start thread - txMqttThread = TxMqttThread() - rxSnifferThread = RxSnifferThread(txMqttThread) - cliThread = CliThread() + txMqttThread = TxMqttThread() + if args.probetype == "BeamLogic": + beamlogic_rxSnifferThread = BeamLogic_RxSnifferThread(txMqttThread) + elif args.probetype == "Serial": + pass + elif args.probetype == "OpenTestBed": + pass + else: + print('We do not support this probe type!') + + cliThread = CliThread() if __name__ == "__main__": main() From 06e42d1796eb4d4515aae219aee9daf151181b27 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Tue, 2 Mar 2021 22:09:49 +0100 Subject: [PATCH 02/16] new Serial probe type --- ArgusProbe_Beamlogic.py | 115 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 110 insertions(+), 5 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 1bee33e..c6e1a66 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -13,7 +13,7 @@ import datetime import argparse import paho.mqtt.publish - +import serial import ArgusVersion #============================ helpers ========================================= @@ -227,12 +227,113 @@ def _get_ntp_timestamp(): diff = datetime.datetime.utcnow() - datetime.datetime(1900, 1, 1, 0, 0, 0) return diff.days * 24 * 60 * 60 + diff.seconds +#adding new class for Serial probe type +class Serial_RxSnifferThread(threading.Thread): + """ + Thread which attaches to the serial and put frames into queue. + """ + + def __init__(self, txMqttThread, serialport): + + # store params + self.txMqttThread = txMqttThread + self.serialport = serialport + self.baudrate = 115200 #Should i put it in input params? + + # local variables + self.serialHandler = None + self.rxBuffer = [] #keep the frame + self.goOn = True + self.pleaseConnect = True + self.dataLock = threading.RLock() + + + # initialize thread + super(Serial_RxSnifferThread, self).__init__() + self.name = 'Serial_RxSnifferThread@{0}'.format(self.serialport) + self.start() + + def run(self): + + time.sleep(1) # let the banners print + while self.goOn: + try: + with self.dataLock: + pleaseConnect = self.pleaseConnect + + if pleaseConnect: + # open serial port + self.serialHandler = serial.Serial(self.serialport, baudrate=self.baudrate) + + # read byte + while True: + waitingbytes = self.serialHandler.inWaiting() + if waitingbytes != 0: + c = self.serialHandler.read(waitingbytes) + self._newByte(c) + print (c) + time.sleep(0.2) + + except serial.SerialException: + # mote disconnected, or pyserialHandler closed + # destroy pyserial instance + print "WARNING: Could not read from serial at \"{0}\".".format( + self.serialport) + print "Is device connected?" + self.goOn = False + self.serialHandler = None + time.sleep(1) + + + except Exception as err: + logCrash(self.name, err) + + + #======================== public ========================================== + + def connectSerialPort(self): + with self.dataLock: + self.pleaseConnect = True + + def disconnectSerialPort(self): + with self.dataLock: + self.pleaseConnect = False + try: + self.serialHandler.close() + except: + pass + + def close(self): + self.goOn = False + #======================== public ========================================== + + #======================== private ========================================= + + def _newByte(self, b): + """ + Just received a byte from the sniffer + """ + with self.dataLock: + self.rxBuffer += [b] + + self._newFrame(self.rxBuffer) + self.rxBuffer = [] + + def _newFrame(self, frame): + """ + Just received a full frame from the sniffer + """ + # publish frame + self.txMqttThread.publishFrame(frame) + + +######################################################################################### class TxMqttThread(threading.Thread): """ Thread which publishes sniffed frames to the MQTT broker. """ - MQTT_BROKER_HOST = 'argus.paris.inria.fr' + MQTT_BROKER_HOST = 'argus.paris.inria.fr' #'broker.hivemq.com' for testing MQTT_BROKER_PORT = 1883 MQTT_BROKER_TOPIC = 'inria-paris/beamlogic' @@ -274,6 +375,7 @@ def run(self): hostname = self.MQTT_BROKER_HOST, port = self.MQTT_BROKER_PORT, ) + except Exception as err: print "WARNING publication to {0}:{1} over MQTT failed ({2})".format( self.MQTT_BROKER_HOST, @@ -281,6 +383,7 @@ def run(self): str(type(err)), ) + except Exception as err: logCrash(self.name, err) @@ -290,7 +393,7 @@ def publishFrame(self, frame): msg = { 'description': 'zep', 'device': 'Beamlogic', - 'bytes': ''.join(['{0:02x}'.format(b) for b in frame]), + 'bytes': frame #''.join(['{0:02x}'.format(b) for b in frame]), } try: self.txQueue.put(json.dumps(msg), block=False) @@ -308,6 +411,7 @@ def __init__(self): ArgusVersion.VERSION[1], ArgusVersion.VERSION[2], ArgusVersion.VERSION[3], + ) while True: @@ -323,6 +427,7 @@ def main(): # parse args parser = argparse.ArgumentParser() #creating an ArgumentParser object parser.add_argument("--probetype", nargs="?", default="BeamLogic", choices=["BeamLogic", "Serial","OpenTestBed"]) + parser.add_argument("--serialport", help= 'Input the serial port for the Serial probe type') args = parser.parse_args() # start thread @@ -330,11 +435,11 @@ def main(): if args.probetype == "BeamLogic": beamlogic_rxSnifferThread = BeamLogic_RxSnifferThread(txMqttThread) elif args.probetype == "Serial": - pass + serial_rxSnifferThread = Serial_RxSnifferThread(txMqttThread,args.serialport) elif args.probetype == "OpenTestBed": pass else: - print('We do not support this probe type!') + print('This probe type is not supported!') cliThread = CliThread() From da1d715db3a043847698a0831761757838eeb7de Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Tue, 9 Mar 2021 20:34:42 +0100 Subject: [PATCH 03/16] Cleaning new lines/necessary comments --- ArgusProbe_Beamlogic.py | 56 ++++++++++++----------------------------- 1 file changed, 16 insertions(+), 40 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index c6e1a66..38aa7fb 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -15,14 +15,13 @@ import paho.mqtt.publish import serial import ArgusVersion +import sys #============================ helpers ========================================= - def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) - def logCrash(threadName, err): output = [] output += ["============================================================="] @@ -227,22 +226,21 @@ def _get_ntp_timestamp(): diff = datetime.datetime.utcnow() - datetime.datetime(1900, 1, 1, 0, 0, 0) return diff.days * 24 * 60 * 60 + diff.seconds -#adding new class for Serial probe type class Serial_RxSnifferThread(threading.Thread): """ Thread which attaches to the serial and put frames into queue. """ - def __init__(self, txMqttThread, serialport): + def __init__(self, txMqttThread, serialport,baudrate): # store params self.txMqttThread = txMqttThread self.serialport = serialport - self.baudrate = 115200 #Should i put it in input params? + self.baudrate = baudrate # local variables self.serialHandler = None - self.rxBuffer = [] #keep the frame + self.rxBuffer = [] self.goOn = True self.pleaseConnect = True self.dataLock = threading.RLock() @@ -270,8 +268,7 @@ def run(self): waitingbytes = self.serialHandler.inWaiting() if waitingbytes != 0: c = self.serialHandler.read(waitingbytes) - self._newByte(c) - print (c) + self.txMqttThread.publishFrame(c) time.sleep(0.2) except serial.SerialException: @@ -284,7 +281,6 @@ def run(self): self.serialHandler = None time.sleep(1) - except Exception as err: logCrash(self.name, err) @@ -307,33 +303,13 @@ def close(self): self.goOn = False #======================== public ========================================== - #======================== private ========================================= - - def _newByte(self, b): - """ - Just received a byte from the sniffer - """ - with self.dataLock: - self.rxBuffer += [b] - - self._newFrame(self.rxBuffer) - self.rxBuffer = [] - - def _newFrame(self, frame): - """ - Just received a full frame from the sniffer - """ - # publish frame - self.txMqttThread.publishFrame(frame) - - ######################################################################################### class TxMqttThread(threading.Thread): """ Thread which publishes sniffed frames to the MQTT broker. """ - MQTT_BROKER_HOST = 'argus.paris.inria.fr' #'broker.hivemq.com' for testing + MQTT_BROKER_HOST = 'argus.paris.inria.fr' MQTT_BROKER_PORT = 1883 MQTT_BROKER_TOPIC = 'inria-paris/beamlogic' @@ -383,7 +359,6 @@ def run(self): str(type(err)), ) - except Exception as err: logCrash(self.name, err) @@ -411,7 +386,6 @@ def __init__(self): ArgusVersion.VERSION[1], ArgusVersion.VERSION[2], ArgusVersion.VERSION[3], - ) while True: @@ -424,24 +398,26 @@ def __init__(self): def main(): - # parse args + parser = argparse.ArgumentParser() #creating an ArgumentParser object - parser.add_argument("--probetype", nargs="?", default="BeamLogic", choices=["BeamLogic", "Serial","OpenTestBed"]) + parser.add_argument("--probetype", nargs="?", default="beamlogic", choices=["beamlogic", "serial","opentestbed"]) parser.add_argument("--serialport", help= 'Input the serial port for the Serial probe type') + parser.add_argument("--baudrate", default= 115200) args = parser.parse_args() # start thread - txMqttThread = TxMqttThread() - if args.probetype == "BeamLogic": + txMqttThread = TxMqttThread() + if args.probetype == "beamlogic": beamlogic_rxSnifferThread = BeamLogic_RxSnifferThread(txMqttThread) - elif args.probetype == "Serial": - serial_rxSnifferThread = Serial_RxSnifferThread(txMqttThread,args.serialport) - elif args.probetype == "OpenTestBed": + elif args.probetype == "serial": + serial_rxSnifferThread = Serial_RxSnifferThread(txMqttThread,args.serialport,args.baudrate) + elif args.probetype == "opentestbed": pass else: print('This probe type is not supported!') + sys.exit() - cliThread = CliThread() + cliThread = CliThread() if __name__ == "__main__": main() From b8875d13f211b8a474c7f671866c7e83e204764b Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Thu, 11 Mar 2021 21:03:51 +0100 Subject: [PATCH 04/16] Adding the code which handles a HDLC frame --- ArgusProbe_Beamlogic.py | 147 ++++++++++++++++++++++++++++++++++------ openhdlc.py | 139 +++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 22 deletions(-) create mode 100644 openhdlc.py diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 38aa7fb..0aeaeb5 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -3,25 +3,30 @@ http://www.beamlogic.com/products/802154-site-analyzer.aspx """ -import time +import Queue +import argparse +import datetime +import json import struct -import socket +import sys import threading -import json -import Queue +import time import traceback -import datetime -import argparse + import paho.mqtt.publish import serial + import ArgusVersion -import sys +import openhdlc + #============================ helpers ========================================= + def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) + def logCrash(threadName, err): output = [] output += ["============================================================="] @@ -231,24 +236,43 @@ class Serial_RxSnifferThread(threading.Thread): Thread which attaches to the serial and put frames into queue. """ + XOFF = 0x13 + XON = 0x11 + XONXOFF_ESCAPE = 0x12 + XONXOFF_MASK = 0x10 + + # XOFF is transmitted as [XONXOFF_ESCAPE, XOFF^XONXOFF_MASK]==[0x12,0x13^0x10]==[0x12,0x03] + # XON is transmitted as [XONXOFF_ESCAPE, XON^XONXOFF_MASK]==[0x12,0x11^0x10]==[0x12,0x01] + # XONXOFF_ESCAPE is transmitted as [XONXOFF_ESCAPE, XONXOFF_ESCAPE^XONXOFF_MASK]==[0x12,0x12^0x10]==[0x12,0x02] + def __init__(self, txMqttThread, serialport,baudrate): # store params - self.txMqttThread = txMqttThread - self.serialport = serialport - self.baudrate = baudrate + self.txMqttThread = txMqttThread + self.serialport = serialport + self.baudrate = baudrate # local variables self.serialHandler = None - self.rxBuffer = [] self.goOn = True self.pleaseConnect = True self.dataLock = threading.RLock() + # hdlc frame parser object + self.hdlc = openhdlc.OpenHdlc() + #frame parsing variables + self.rxBuffer = '' + self.hdlc_flag = False + self.receiving = False + self.xonxoff_escaping = False + + + # to be assigned, callback + self.send_to_parser = None # initialize thread super(Serial_RxSnifferThread, self).__init__() - self.name = 'Serial_RxSnifferThread@{0}'.format(self.serialport) + self.name = 'Serial_RxSnifferThread@{0}'.format(self.serialport) self.start() def run(self): @@ -267,15 +291,16 @@ def run(self): while True: waitingbytes = self.serialHandler.inWaiting() if waitingbytes != 0: - c = self.serialHandler.read(waitingbytes) - self.txMqttThread.publishFrame(c) - time.sleep(0.2) + c= self.serialHandler.read(waitingbytes) + for byte in c: + self._newByte(byte) + time.sleep(2) except serial.SerialException: # mote disconnected, or pyserialHandler closed # destroy pyserial instance print "WARNING: Could not read from serial at \"{0}\".".format( - self.serialport) + self.serialport) print "Is device connected?" self.goOn = False self.serialHandler = None @@ -284,7 +309,6 @@ def run(self): except Exception as err: logCrash(self.name, err) - #======================== public ========================================== def connectSerialPort(self): @@ -303,6 +327,88 @@ def close(self): self.goOn = False #======================== public ========================================== + #======================== private ========================================= + def _rx_buf_add(self, byte): + """ Adds byte to buffer and escapes the XONXOFF bytes """ + + if byte == chr(self.XONXOFF_ESCAPE): + self.xonxoff_escaping = True + else: + if self.xonxoff_escaping is True: + self.rxBuffer += chr(ord(byte) ^ self.XONXOFF_MASK) + self.xonxoff_escaping = False + elif byte != chr(self.XON) and byte != chr(self.XOFF): + self.rxBuffer += byte + + def _handle_frame(self): + """ Handles a HDLC frame """ + valid_frame = False + #temp_buf = self.rxBuffer in case of an error + try: + self.rxBuffer = self.hdlc.dehdlcify(self.rxBuffer) + ''' + if log.isEnabledFor(logging.DEBUG): + log.debug("{}: {} dehdlcized input: {}".format( + self.name, + format_string_buf(temp_buf), + format_string_buf(self.rxBuffer))) + ''' + if self.send_to_parser: + self.send_to_parser([ord(c) for c in self.rxBuffer]) + + valid_frame = True + except openhdlc.HdlcException as err: + #log.warning('{}: invalid serial frame: {} {}'.format(self.name, format_string_buf(temp_buf), err)) + print 'Err' + return valid_frame + + def _newByte(self, b): + """ + Parses bytes received from serial pipe + """ + if not self.receiving: + if self.hdlc_flag and b != self.hdlc.HDLC_FLAG: + # start of frame + print ('Start of HDLC frame..') + self.receiving = True + # discard received self.hdlc_flag + self.hdlc_flag = False + self.xonxoff_escaping = False + self.rxBuffer = self.hdlc.HDLC_FLAG + self._rx_buf_add(b) + elif b == self.hdlc.HDLC_FLAG: + # received hdlc flag + self.hdlc_flag = True + else: + # drop garbage + pass + else: + if b != self.hdlc.HDLC_FLAG: + # middle of frame + self._rx_buf_add(b) + else: + # end of frame, received self.hdlc_flag + print ("End of HDLC frame ..") + self.hdlc_flag = True + self.receiving = False + self._rx_buf_add(b) + valid_frame = self._handle_frame() + + if valid_frame: + # discard valid frame self.hdlc_flag + self.hdlc_flag = False + self._newFrame(self.rxBuffer) + self.rxBuffer = [] + + def _newFrame(self, frame): + """ + Just received a full frame from the sniffer + """ + # publish frame + #self.txMqttThread.publishFrame(frame) + pass + + ######################################################################################### class TxMqttThread(threading.Thread): """ @@ -351,7 +457,6 @@ def run(self): hostname = self.MQTT_BROKER_HOST, port = self.MQTT_BROKER_PORT, ) - except Exception as err: print "WARNING publication to {0}:{1} over MQTT failed ({2})".format( self.MQTT_BROKER_HOST, @@ -368,7 +473,7 @@ def publishFrame(self, frame): msg = { 'description': 'zep', 'device': 'Beamlogic', - 'bytes': frame #''.join(['{0:02x}'.format(b) for b in frame]), + 'bytes': ''.join(['{0:02x}'.format(b) for b in frame]), } try: self.txQueue.put(json.dumps(msg), block=False) @@ -396,9 +501,7 @@ def __init__(self): #============================ main ============================================ - def main(): - parser = argparse.ArgumentParser() #creating an ArgumentParser object parser.add_argument("--probetype", nargs="?", default="beamlogic", choices=["beamlogic", "serial","opentestbed"]) parser.add_argument("--serialport", help= 'Input the serial port for the Serial probe type') @@ -417,7 +520,7 @@ def main(): print('This probe type is not supported!') sys.exit() - cliThread = CliThread() + cliThread = CliThread() if __name__ == "__main__": main() diff --git a/openhdlc.py b/openhdlc.py new file mode 100644 index 0000000..904c3ba --- /dev/null +++ b/openhdlc.py @@ -0,0 +1,139 @@ +""" +HDLC framing module. + +.. moduleauthor:: Min Ting + October 2012 +""" + +import logging + +#from openvisualizer.utils import format_string_buf + +log = logging.getLogger('OpenHdlc') +log.setLevel(logging.ERROR) +log.addHandler(logging.NullHandler()) + + +class HdlcException(Exception): + pass + + +class OpenHdlc(object): + HDLC_FLAG = '\x7e' + HDLC_FLAG_ESCAPED = '\x5e' + HDLC_ESCAPE = '\x7d' + HDLC_ESCAPE_ESCAPED = '\x5d' + HDLC_CRCINIT = 0xffff + HDLC_CRCGOOD = 0xf0b8 + + FCS16TAB = ( + 0x0000, 0x1189, 0x2312, 0x329b, 0x4624, 0x57ad, 0x6536, 0x74bf, + 0x8c48, 0x9dc1, 0xaf5a, 0xbed3, 0xca6c, 0xdbe5, 0xe97e, 0xf8f7, + 0x1081, 0x0108, 0x3393, 0x221a, 0x56a5, 0x472c, 0x75b7, 0x643e, + 0x9cc9, 0x8d40, 0xbfdb, 0xae52, 0xdaed, 0xcb64, 0xf9ff, 0xe876, + 0x2102, 0x308b, 0x0210, 0x1399, 0x6726, 0x76af, 0x4434, 0x55bd, + 0xad4a, 0xbcc3, 0x8e58, 0x9fd1, 0xeb6e, 0xfae7, 0xc87c, 0xd9f5, + 0x3183, 0x200a, 0x1291, 0x0318, 0x77a7, 0x662e, 0x54b5, 0x453c, + 0xbdcb, 0xac42, 0x9ed9, 0x8f50, 0xfbef, 0xea66, 0xd8fd, 0xc974, + 0x4204, 0x538d, 0x6116, 0x709f, 0x0420, 0x15a9, 0x2732, 0x36bb, + 0xce4c, 0xdfc5, 0xed5e, 0xfcd7, 0x8868, 0x99e1, 0xab7a, 0xbaf3, + 0x5285, 0x430c, 0x7197, 0x601e, 0x14a1, 0x0528, 0x37b3, 0x263a, + 0xdecd, 0xcf44, 0xfddf, 0xec56, 0x98e9, 0x8960, 0xbbfb, 0xaa72, + 0x6306, 0x728f, 0x4014, 0x519d, 0x2522, 0x34ab, 0x0630, 0x17b9, + 0xef4e, 0xfec7, 0xcc5c, 0xddd5, 0xa96a, 0xb8e3, 0x8a78, 0x9bf1, + 0x7387, 0x620e, 0x5095, 0x411c, 0x35a3, 0x242a, 0x16b1, 0x0738, + 0xffcf, 0xee46, 0xdcdd, 0xcd54, 0xb9eb, 0xa862, 0x9af9, 0x8b70, + 0x8408, 0x9581, 0xa71a, 0xb693, 0xc22c, 0xd3a5, 0xe13e, 0xf0b7, + 0x0840, 0x19c9, 0x2b52, 0x3adb, 0x4e64, 0x5fed, 0x6d76, 0x7cff, + 0x9489, 0x8500, 0xb79b, 0xa612, 0xd2ad, 0xc324, 0xf1bf, 0xe036, + 0x18c1, 0x0948, 0x3bd3, 0x2a5a, 0x5ee5, 0x4f6c, 0x7df7, 0x6c7e, + 0xa50a, 0xb483, 0x8618, 0x9791, 0xe32e, 0xf2a7, 0xc03c, 0xd1b5, + 0x2942, 0x38cb, 0x0a50, 0x1bd9, 0x6f66, 0x7eef, 0x4c74, 0x5dfd, + 0xb58b, 0xa402, 0x9699, 0x8710, 0xf3af, 0xe226, 0xd0bd, 0xc134, + 0x39c3, 0x284a, 0x1ad1, 0x0b58, 0x7fe7, 0x6e6e, 0x5cf5, 0x4d7c, + 0xc60c, 0xd785, 0xe51e, 0xf497, 0x8028, 0x91a1, 0xa33a, 0xb2b3, + 0x4a44, 0x5bcd, 0x6956, 0x78df, 0x0c60, 0x1de9, 0x2f72, 0x3efb, + 0xd68d, 0xc704, 0xf59f, 0xe416, 0x90a9, 0x8120, 0xb3bb, 0xa232, + 0x5ac5, 0x4b4c, 0x79d7, 0x685e, 0x1ce1, 0x0d68, 0x3ff3, 0x2e7a, + 0xe70e, 0xf687, 0xc41c, 0xd595, 0xa12a, 0xb0a3, 0x8238, 0x93b1, + 0x6b46, 0x7acf, 0x4854, 0x59dd, 0x2d62, 0x3ceb, 0x0e70, 0x1ff9, + 0xf78f, 0xe606, 0xd49d, 0xc514, 0xb1ab, 0xa022, 0x92b9, 0x8330, + 0x7bc7, 0x6a4e, 0x58d5, 0x495c, 0x3de3, 0x2c6a, 0x1ef1, 0x0f78, + ) + + # ============================ public ====================================== + + def hdlcify(self, in_buf): + """ + Build an hdlc frame. + + Use 0x00 for both addr byte, and control byte. + """ + + # make copy of input + out_buf = in_buf[:] + + # calculate CRC + crc = self.HDLC_CRCINIT + for b in out_buf: + crc = self._crc_iteration(crc, b) + crc = 0xffff - crc + + # append CRC + out_buf = out_buf + chr(crc & 0xff) + chr((crc & 0xff00) >> 8) + + # stuff bytes + out_buf = out_buf.replace(self.HDLC_ESCAPE, self.HDLC_ESCAPE + self.HDLC_ESCAPE_ESCAPED) + out_buf = out_buf.replace(self.HDLC_FLAG, self.HDLC_ESCAPE + self.HDLC_FLAG_ESCAPED) + + # add flags + out_buf = self.HDLC_FLAG + out_buf + self.HDLC_FLAG + + return out_buf + + def dehdlcify(self, in_buf): + """ + Parse an hdlc frame. + + :returns: the extracted frame, or -1 if wrong checksum + """ + assert in_buf[0] == self.HDLC_FLAG + assert in_buf[-1] == self.HDLC_FLAG + + # make copy of input + out_buf = in_buf[:] + if log.isEnabledFor(logging.DEBUG): + log.debug("got {0}".format(format_string_buf(out_buf))) + + # remove flags + out_buf = out_buf[1:-1] + if log.isEnabledFor(logging.DEBUG): + log.debug("after flags: {0}".format(format_string_buf(out_buf))) + + # unstuff + out_buf = out_buf.replace(self.HDLC_ESCAPE + self.HDLC_FLAG_ESCAPED, self.HDLC_FLAG) + out_buf = out_buf.replace(self.HDLC_ESCAPE + self.HDLC_ESCAPE_ESCAPED, self.HDLC_ESCAPE) + if log.isEnabledFor(logging.DEBUG): + log.debug("after unstuff: {0}".format(format_string_buf(out_buf))) + + if len(out_buf) < 2: + raise HdlcException('packet too short') + + # check CRC + crc = self.HDLC_CRCINIT + for b in out_buf: + crc = self._crc_iteration(crc, b) + if crc != self.HDLC_CRCGOOD: + raise HdlcException('wrong CRC') + + # remove CRC + out_buf = out_buf[:-2] # remove CRC + #if log.isEnabledFor(logging.DEBUG): + #log.debug("after CRC: {0}".format(format_string_buf(out_buf))) + + return out_buf + + # ============================ private ===================================== + + def _crc_iteration(self, crc, b): + return (crc >> 8) ^ self.FCS16TAB[((crc ^ (ord(b))) & 0xff)] From dcc33f8b5b807405e670d416487a75846f5311eb Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Thu, 25 Mar 2021 20:38:51 +0100 Subject: [PATCH 05/16] Packets that start with 'P' are our interest scope --- ArgusProbe_Beamlogic.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 0aeaeb5..8abffe0 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -343,20 +343,14 @@ def _rx_buf_add(self, byte): def _handle_frame(self): """ Handles a HDLC frame """ valid_frame = False - #temp_buf = self.rxBuffer in case of an error try: self.rxBuffer = self.hdlc.dehdlcify(self.rxBuffer) - ''' - if log.isEnabledFor(logging.DEBUG): - log.debug("{}: {} dehdlcized input: {}".format( - self.name, - format_string_buf(temp_buf), - format_string_buf(self.rxBuffer))) - ''' + if self.send_to_parser: self.send_to_parser([ord(c) for c in self.rxBuffer]) + if self.rxBuffer[0] == 'P': #packet from sniffer SERFRAME_MOTE2PC_SNIFFED_PACKET 'P' + valid_frame = True - valid_frame = True except openhdlc.HdlcException as err: #log.warning('{}: invalid serial frame: {} {}'.format(self.name, format_string_buf(temp_buf), err)) print 'Err' @@ -369,7 +363,7 @@ def _newByte(self, b): if not self.receiving: if self.hdlc_flag and b != self.hdlc.HDLC_FLAG: # start of frame - print ('Start of HDLC frame..') + #print ('Start of HDLC frame..') self.receiving = True # discard received self.hdlc_flag self.hdlc_flag = False @@ -388,7 +382,7 @@ def _newByte(self, b): self._rx_buf_add(b) else: # end of frame, received self.hdlc_flag - print ("End of HDLC frame ..") + #print ("End of HDLC frame ..") self.hdlc_flag = True self.receiving = False self._rx_buf_add(b) From b43d6b97747171f49085855e42b0dd9f5bc3c823 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Thu, 25 Mar 2021 20:58:36 +0100 Subject: [PATCH 06/16] Parsing incoming frames --- ArgusProbe_Beamlogic.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 8abffe0..4748aa0 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -266,6 +266,8 @@ def __init__(self, txMqttThread, serialport,baudrate): self.receiving = False self.xonxoff_escaping = False + self.status = '' + self.HEADER_LENGTH = 2 # to be assigned, callback self.send_to_parser = None @@ -356,6 +358,23 @@ def _handle_frame(self): print 'Err' return valid_frame + def parse_input(self, data): + + # ensure data not short longer than header + if len(data) < self.HEADER_LENGTH: + raise 'Error packet length' + + _ = data[:2] # header bytes + + # remove mote id at the beginning. + data = data[2:] + + event_type = 'sniffedPacket' + + # notify a tuple including source as one hop away nodes elide SRC address as can be inferred from MAC layer + # header + return event_type,data + def _newByte(self, b): """ Parses bytes received from serial pipe @@ -398,6 +417,9 @@ def _newFrame(self, frame): """ Just received a full frame from the sniffer """ + #Parse incomming frame + frame, self.status = self.parse_input(frame) + # publish frame #self.txMqttThread.publishFrame(frame) pass From 8d9bd6b9a130c9332a71587134a040152a2f6899 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Wed, 31 Mar 2021 22:53:25 +0200 Subject: [PATCH 07/16] Adding the ZEP header --- ArgusProbe_Beamlogic.py | 110 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 99 insertions(+), 11 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 4748aa0..ac18784 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -235,16 +235,47 @@ class Serial_RxSnifferThread(threading.Thread): """ Thread which attaches to the serial and put frames into queue. """ + # XOFF is transmitted as [XONXOFF_ESCAPE, XOFF^XONXOFF_MASK]==[0x12,0x13^0x10]==[0x12,0x03] + # XON is transmitted as [XONXOFF_ESCAPE, XON^XONXOFF_MASK]==[0x12,0x11^0x10]==[0x12,0x01] + # XONXOFF_ESCAPE is transmitted as [XONXOFF_ESCAPE, XONXOFF_ESCAPE^XONXOFF_MASK]==[0x12,0x12^0x10]==[0x12,0x02] XOFF = 0x13 XON = 0x11 XONXOFF_ESCAPE = 0x12 XONXOFF_MASK = 0x10 - - # XOFF is transmitted as [XONXOFF_ESCAPE, XOFF^XONXOFF_MASK]==[0x12,0x13^0x10]==[0x12,0x03] - # XON is transmitted as [XONXOFF_ESCAPE, XON^XONXOFF_MASK]==[0x12,0x11^0x10]==[0x12,0x01] - # XONXOFF_ESCAPE is transmitted as [XONXOFF_ESCAPE, XONXOFF_ESCAPE^XONXOFF_MASK]==[0x12,0x12^0x10]==[0x12,0x02] - + FCS16TAB = (0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, + ) def __init__(self, txMqttThread, serialport,baudrate): # store params @@ -266,7 +297,6 @@ def __init__(self, txMqttThread, serialport,baudrate): self.receiving = False self.xonxoff_escaping = False - self.status = '' self.HEADER_LENGTH = 2 # to be assigned, callback @@ -355,7 +385,7 @@ def _handle_frame(self): except openhdlc.HdlcException as err: #log.warning('{}: invalid serial frame: {} {}'.format(self.name, format_string_buf(temp_buf), err)) - print 'Err' + print 'HDLC Exception' return valid_frame def parse_input(self, data): @@ -373,7 +403,7 @@ def parse_input(self, data): # notify a tuple including source as one hop away nodes elide SRC address as can be inferred from MAC layer # header - return event_type,data + return data def _newByte(self, b): """ @@ -382,7 +412,6 @@ def _newByte(self, b): if not self.receiving: if self.hdlc_flag and b != self.hdlc.HDLC_FLAG: # start of frame - #print ('Start of HDLC frame..') self.receiving = True # discard received self.hdlc_flag self.hdlc_flag = False @@ -401,7 +430,6 @@ def _newByte(self, b): self._rx_buf_add(b) else: # end of frame, received self.hdlc_flag - #print ("End of HDLC frame ..") self.hdlc_flag = True self.receiving = False self._rx_buf_add(b) @@ -411,7 +439,7 @@ def _newByte(self, b): # discard valid frame self.hdlc_flag self.hdlc_flag = False self._newFrame(self.rxBuffer) - self.rxBuffer = [] + #self.rxBuffer = [] def _newFrame(self, frame): """ @@ -420,10 +448,70 @@ def _newFrame(self, frame): #Parse incomming frame frame, self.status = self.parse_input(frame) + # transform frame + frame = self._transformFrame(frame) + # publish frame #self.txMqttThread.publishFrame(frame) pass + def _transformFrame(self, frame): + """ + Add ZEP header + """ + body = frame[0:-3] + _ = frame[-3:-1] # crc + frequency = frame[-1] + + zep = self._formatZep(body,frequency) + #frame = self._dispatch_mesh_debug_packet(zep) + return frame + + def _formatZep (self, body, frequency): + # ZEP header + zep = [ord('E'), ord('X')] # Protocol ID String + zep += [0x02] # Protocol Version + zep += [0x01] # Type + zep += [ord(frequency)] # Channel ID int? + zep += [0x00, 0x01] # Device ID + zep += [0x01] # LQI/CRC mode + zep += [0xff] + zep += [0x01] * 8 # timestamp + zep += [0x02] * 4 # sequence number + zep += [0x00] * 10 # reserved + zep += [len(body) + 2] # length + + # mac frame + mac = body + mac += self.calculate_fcs(mac) + return ''.join(map(str,zep)) + mac + + def calculate_fcs(self,rpayload): + payload = [] + a = '' + for b in rpayload: + payload += [self.byteinverse(ord(b))] + crc = 0x0000 + for b in payload: + crc = ((crc << 8) & 0xffff) ^ self.FCS16TAB[((crc >> 8) ^ b) & 0xff] + return_val = [ + self.byteinverse(crc >> 8), + self.byteinverse(crc & 0xff), + ] + + return a.join(map(str, return_val)) + + def byteinverse(self,b): + # TODO: speed up through lookup table + + rb = 0 + for pos in range(8): + if b & (1 << pos) != 0: #check this out + bitval = 1 + else: + bitval = 0 + rb |= bitval << (7 - pos) + return rb ######################################################################################### class TxMqttThread(threading.Thread): From d28f166ccc3330b5a4c368d20bd0158c84d90241 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Thu, 1 Apr 2021 23:47:44 +0200 Subject: [PATCH 08/16] Correcting the type of the rxBuffer --- ArgusProbe_Beamlogic.py | 22 ++++++++++------------ openhdlc.py | 7 +++++-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index ac18784..50e2898 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -292,7 +292,7 @@ def __init__(self, txMqttThread, serialport,baudrate): # hdlc frame parser object self.hdlc = openhdlc.OpenHdlc() #frame parsing variables - self.rxBuffer = '' + self.rxBuffer = [] self.hdlc_flag = False self.receiving = False self.xonxoff_escaping = False @@ -367,10 +367,10 @@ def _rx_buf_add(self, byte): self.xonxoff_escaping = True else: if self.xonxoff_escaping is True: - self.rxBuffer += chr(ord(byte) ^ self.XONXOFF_MASK) + self.rxBuffer.append(chr(ord(byte) ^ self.XONXOFF_MASK)) self.xonxoff_escaping = False elif byte != chr(self.XON) and byte != chr(self.XOFF): - self.rxBuffer += byte + self.rxBuffer.append(byte) def _handle_frame(self): """ Handles a HDLC frame """ @@ -416,10 +416,11 @@ def _newByte(self, b): # discard received self.hdlc_flag self.hdlc_flag = False self.xonxoff_escaping = False - self.rxBuffer = self.hdlc.HDLC_FLAG + self.rxBuffer.append(self.hdlc.HDLC_FLAG) self._rx_buf_add(b) elif b == self.hdlc.HDLC_FLAG: # received hdlc flag + self.rxBuffer = [] # Start of the frame, reset self.hdlc_flag = True else: # drop garbage @@ -439,18 +440,17 @@ def _newByte(self, b): # discard valid frame self.hdlc_flag self.hdlc_flag = False self._newFrame(self.rxBuffer) - #self.rxBuffer = [] + self.rxBuffer = [] def _newFrame(self, frame): """ Just received a full frame from the sniffer """ #Parse incomming frame - frame, self.status = self.parse_input(frame) + frame = self.parse_input(frame) # transform frame frame = self._transformFrame(frame) - # publish frame #self.txMqttThread.publishFrame(frame) pass @@ -465,7 +465,7 @@ def _transformFrame(self, frame): zep = self._formatZep(body,frequency) #frame = self._dispatch_mesh_debug_packet(zep) - return frame + return zep #at the moment def _formatZep (self, body, frequency): # ZEP header @@ -484,11 +484,10 @@ def _formatZep (self, body, frequency): # mac frame mac = body mac += self.calculate_fcs(mac) - return ''.join(map(str,zep)) + mac + return zep + mac def calculate_fcs(self,rpayload): payload = [] - a = '' for b in rpayload: payload += [self.byteinverse(ord(b))] crc = 0x0000 @@ -498,8 +497,7 @@ def calculate_fcs(self,rpayload): self.byteinverse(crc >> 8), self.byteinverse(crc & 0xff), ] - - return a.join(map(str, return_val)) + return return_val def byteinverse(self,b): # TODO: speed up through lookup table diff --git a/openhdlc.py b/openhdlc.py index 904c3ba..362f764 100644 --- a/openhdlc.py +++ b/openhdlc.py @@ -111,8 +111,11 @@ def dehdlcify(self, in_buf): log.debug("after flags: {0}".format(format_string_buf(out_buf))) # unstuff - out_buf = out_buf.replace(self.HDLC_ESCAPE + self.HDLC_FLAG_ESCAPED, self.HDLC_FLAG) - out_buf = out_buf.replace(self.HDLC_ESCAPE + self.HDLC_ESCAPE_ESCAPED, self.HDLC_ESCAPE) + #out_buf = out_buf.replace(self.HDLC_ESCAPE + self.HDLC_FLAG_ESCAPED, self.HDLC_FLAG) + #out_buf = out_buf.replace(self.HDLC_ESCAPE + self.HDLC_ESCAPE_ESCAPED, self.HDLC_ESCAPE) + out_buf = [self.HDLC_FLAG if x== self.HDLC_ESCAPE + self.HDLC_FLAG_ESCAPED else x for x in out_buf] + out_buf = [self.HDLC_ESCAPE if x==self.HDLC_ESCAPE + self.HDLC_ESCAPE_ESCAPED else x for x in out_buf] + if log.isEnabledFor(logging.DEBUG): log.debug("after unstuff: {0}".format(format_string_buf(out_buf))) From 178d1fd82d9eecd7fbf72b249f096b52a191cd3e Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Fri, 2 Apr 2021 11:35:18 +0200 Subject: [PATCH 09/16] Changing the type of the list values in frame with zep header --- ArgusProbe_Beamlogic.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 50e2898..ffde2fe 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -235,14 +235,15 @@ class Serial_RxSnifferThread(threading.Thread): """ Thread which attaches to the serial and put frames into queue. """ - # XOFF is transmitted as [XONXOFF_ESCAPE, XOFF^XONXOFF_MASK]==[0x12,0x13^0x10]==[0x12,0x03] - # XON is transmitted as [XONXOFF_ESCAPE, XON^XONXOFF_MASK]==[0x12,0x11^0x10]==[0x12,0x01] + # XOFF is transmitted as [XONXOFF_ESCAPE, XOFF^XONXOFF_MASK]==[0x12,0x13^0x10]==[0x12,0x03] + # XON is transmitted as [XONXOFF_ESCAPE, XON^XONXOFF_MASK]==[0x12,0x11^0x10]==[0x12,0x01] # XONXOFF_ESCAPE is transmitted as [XONXOFF_ESCAPE, XONXOFF_ESCAPE^XONXOFF_MASK]==[0x12,0x12^0x10]==[0x12,0x02] XOFF = 0x13 XON = 0x11 XONXOFF_ESCAPE = 0x12 XONXOFF_MASK = 0x10 + FCS16TAB = (0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, @@ -380,6 +381,7 @@ def _handle_frame(self): if self.send_to_parser: self.send_to_parser([ord(c) for c in self.rxBuffer]) + if self.rxBuffer[0] == 'P': #packet from sniffer SERFRAME_MOTE2PC_SNIFFED_PACKET 'P' valid_frame = True @@ -399,10 +401,6 @@ def parse_input(self, data): # remove mote id at the beginning. data = data[2:] - event_type = 'sniffedPacket' - - # notify a tuple including source as one hop away nodes elide SRC address as can be inferred from MAC layer - # header return data def _newByte(self, b): @@ -440,7 +438,7 @@ def _newByte(self, b): # discard valid frame self.hdlc_flag self.hdlc_flag = False self._newFrame(self.rxBuffer) - self.rxBuffer = [] + self.rxBuffer = [] def _newFrame(self, frame): """ @@ -482,14 +480,14 @@ def _formatZep (self, body, frequency): zep += [len(body) + 2] # length # mac frame - mac = body + mac = [ord(i) for i in body] mac += self.calculate_fcs(mac) return zep + mac def calculate_fcs(self,rpayload): payload = [] for b in rpayload: - payload += [self.byteinverse(ord(b))] + payload += [self.byteinverse(b)] crc = 0x0000 for b in payload: crc = ((crc << 8) & 0xffff) ^ self.FCS16TAB[((crc >> 8) ^ b) & 0xff] From beeb7e7c5f497421496f14a1b8d16a932ae5e862 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Sat, 3 Apr 2021 13:24:08 +0200 Subject: [PATCH 10/16] Adding the TUN interface --- ArgusProbe_Beamlogic.py | 54 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index ffde2fe..61d640e 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -19,6 +19,9 @@ import ArgusVersion import openhdlc +from scapy.compat import raw +from scapy.layers.inet import UDP +from scapy.layers.inet6 import IPv6 #============================ helpers ========================================= @@ -277,6 +280,10 @@ class Serial_RxSnifferThread(threading.Thread): 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, ) + + IPV6PREFIX = [0xbb, 0xbb, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + IPV6HOST = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01] + def __init__(self, txMqttThread, serialport,baudrate): # store params @@ -449,9 +456,9 @@ def _newFrame(self, frame): # transform frame frame = self._transformFrame(frame) + # publish frame - #self.txMqttThread.publishFrame(frame) - pass + self.txMqttThread.publishFrame(frame) def _transformFrame(self, frame): """ @@ -462,8 +469,8 @@ def _transformFrame(self, frame): frequency = frame[-1] zep = self._formatZep(body,frequency) - #frame = self._dispatch_mesh_debug_packet(zep) - return zep #at the moment + frame = self._dispatch_mesh_debug_packet(zep) + return frame def _formatZep (self, body, frequency): # ZEP header @@ -509,7 +516,44 @@ def byteinverse(self,b): rb |= bitval << (7 - pos) return rb -######################################################################################### + def _dispatch_mesh_debug_packet(self, zep): + """ + Wraps ZEP-based debug packet, for outgoing mesh 6LoWPAN message, with UDP and IPv6 headers. Then forwards as + an event to the Internet interface. + """ + + udp = UDP(sport=0, dport=17754) + udp.add_payload("".join([chr(i) for i in zep])) + + # Common address for source and destination + addr = [] + addr += self.IPV6PREFIX + addr += self.IPV6HOST + addr = self.format_ipv6_addr(addr) + + # IP + ip = IPv6(version=6, tc=0, src=addr, hlim=64, dst=addr) + ip = ip / udp + + data = [ord(b) for b in raw(ip)] + return data + + def buf2int(self,buf): + """ + Converts some consecutive bytes of a buffer into an integer. + Big-endianness is assumed. + :param buf: Byte array. + """ + return_val = 0 + for i in range(len(buf)): + return_val += buf[i] << (8 * (len(buf) - i - 1)) + return return_val + + def format_ipv6_addr(self,addr): + # group by 2 bytes + addr = [self.buf2int(addr[2 * i:2 * i + 2]) for i in range(len(addr) / 2)] + return ':'.join(["%x" % b for b in addr]) + class TxMqttThread(threading.Thread): """ Thread which publishes sniffed frames to the MQTT broker. From 96727efaac7ac0853307ef0e0814b6f4cb5ba31c Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Thu, 8 Apr 2021 22:41:23 +0200 Subject: [PATCH 11/16] Remove TUN interface because ArgusClient already has it --- ArgusProbe_Beamlogic.py | 43 ++--------------------------------------- 1 file changed, 2 insertions(+), 41 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 61d640e..dcc9476 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -456,7 +456,7 @@ def _newFrame(self, frame): # transform frame frame = self._transformFrame(frame) - + # publish frame self.txMqttThread.publishFrame(frame) @@ -469,8 +469,7 @@ def _transformFrame(self, frame): frequency = frame[-1] zep = self._formatZep(body,frequency) - frame = self._dispatch_mesh_debug_packet(zep) - return frame + return zep def _formatZep (self, body, frequency): # ZEP header @@ -516,44 +515,6 @@ def byteinverse(self,b): rb |= bitval << (7 - pos) return rb - def _dispatch_mesh_debug_packet(self, zep): - """ - Wraps ZEP-based debug packet, for outgoing mesh 6LoWPAN message, with UDP and IPv6 headers. Then forwards as - an event to the Internet interface. - """ - - udp = UDP(sport=0, dport=17754) - udp.add_payload("".join([chr(i) for i in zep])) - - # Common address for source and destination - addr = [] - addr += self.IPV6PREFIX - addr += self.IPV6HOST - addr = self.format_ipv6_addr(addr) - - # IP - ip = IPv6(version=6, tc=0, src=addr, hlim=64, dst=addr) - ip = ip / udp - - data = [ord(b) for b in raw(ip)] - return data - - def buf2int(self,buf): - """ - Converts some consecutive bytes of a buffer into an integer. - Big-endianness is assumed. - :param buf: Byte array. - """ - return_val = 0 - for i in range(len(buf)): - return_val += buf[i] << (8 * (len(buf) - i - 1)) - return return_val - - def format_ipv6_addr(self,addr): - # group by 2 bytes - addr = [self.buf2int(addr[2 * i:2 * i + 2]) for i in range(len(addr) / 2)] - return ':'.join(["%x" % b for b in addr]) - class TxMqttThread(threading.Thread): """ Thread which publishes sniffed frames to the MQTT broker. From bd0935fe78d0f5b18e0dec7578c3352e5d919663 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Fri, 9 Apr 2021 10:18:57 +0200 Subject: [PATCH 12/16] Removing the first byte of the decoded HDLC frame ('P') --- ArgusProbe_Beamlogic.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index dcc9476..92d4b45 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -19,17 +19,12 @@ import ArgusVersion import openhdlc -from scapy.compat import raw -from scapy.layers.inet import UDP -from scapy.layers.inet6 import IPv6 - #============================ helpers ========================================= def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) - def logCrash(threadName, err): output = [] output += ["============================================================="] @@ -48,7 +43,6 @@ def logCrash(threadName, err): #============================ classes ========================================= - class BeamLogic_RxSnifferThread(threading.Thread): """ Thread which attaches to the sniffer and parses incoming frames. @@ -281,9 +275,6 @@ class Serial_RxSnifferThread(threading.Thread): 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, ) - IPV6PREFIX = [0xbb, 0xbb, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] - IPV6HOST = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01] - def __init__(self, txMqttThread, serialport,baudrate): # store params @@ -390,7 +381,8 @@ def _handle_frame(self): self.send_to_parser([ord(c) for c in self.rxBuffer]) if self.rxBuffer[0] == 'P': #packet from sniffer SERFRAME_MOTE2PC_SNIFFED_PACKET 'P' - valid_frame = True + self.rxBuffer = self.rxBuffer[1:] + valid_frame = True except openhdlc.HdlcException as err: #log.warning('{}: invalid serial frame: {} {}'.format(self.name, format_string_buf(temp_buf), err)) @@ -445,7 +437,6 @@ def _newByte(self, b): # discard valid frame self.hdlc_flag self.hdlc_flag = False self._newFrame(self.rxBuffer) - self.rxBuffer = [] def _newFrame(self, frame): """ From 29c684561ec7e01fb94ea33fe8b87bc922845dcb Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Mon, 12 Apr 2021 00:20:05 +0200 Subject: [PATCH 13/16] Code cleaning --- ArgusProbe_Beamlogic.py | 55 ++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 92d4b45..459415b 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -285,8 +285,6 @@ def __init__(self, txMqttThread, serialport,baudrate): # local variables self.serialHandler = None self.goOn = True - self.pleaseConnect = True - self.dataLock = threading.RLock() # hdlc frame parser object self.hdlc = openhdlc.OpenHdlc() @@ -311,51 +309,36 @@ def run(self): time.sleep(1) # let the banners print while self.goOn: try: - with self.dataLock: - pleaseConnect = self.pleaseConnect - - if pleaseConnect: - # open serial port - self.serialHandler = serial.Serial(self.serialport, baudrate=self.baudrate) - - # read byte - while True: - waitingbytes = self.serialHandler.inWaiting() - if waitingbytes != 0: - c= self.serialHandler.read(waitingbytes) - for byte in c: - self._newByte(byte) - time.sleep(2) + # open serial port + self.serialHandler = serial.Serial(self.serialport, baudrate=self.baudrate) + + # read byte + while True: + waitingbytes = self.serialHandler.inWaiting() + if waitingbytes != 0: + c= self.serialHandler.read(waitingbytes) + for byte in c: + self._newByte(byte) + time.sleep(2) except serial.SerialException: # mote disconnected, or pyserialHandler closed # destroy pyserial instance - print "WARNING: Could not read from serial at \"{0}\".".format( - self.serialport) - print "Is device connected?" - self.goOn = False + print "WARNING: Could not read from serial at \"{0}\".".format(self.serialport) + self.close() self.serialHandler = None time.sleep(1) except Exception as err: logCrash(self.name, err) + self.close() #======================== public ========================================== - def connectSerialPort(self): - with self.dataLock: - self.pleaseConnect = True - - def disconnectSerialPort(self): - with self.dataLock: - self.pleaseConnect = False - try: - self.serialHandler.close() - except: - pass - def close(self): + print "Is device connected?" self.goOn = False + #======================== public ========================================== #======================== private ========================================= @@ -381,12 +364,11 @@ def _handle_frame(self): self.send_to_parser([ord(c) for c in self.rxBuffer]) if self.rxBuffer[0] == 'P': #packet from sniffer SERFRAME_MOTE2PC_SNIFFED_PACKET 'P' - self.rxBuffer = self.rxBuffer[1:] + self.rxBuffer = self.rxBuffer[1:] #removing the indicator byte from the packet valid_frame = True except openhdlc.HdlcException as err: - #log.warning('{}: invalid serial frame: {} {}'.format(self.name, format_string_buf(temp_buf), err)) - print 'HDLC Exception' + print '{}: Invalid serial frame: {}'.format(self.name,self.rxBuffer) return valid_frame def parse_input(self, data): @@ -447,7 +429,6 @@ def _newFrame(self, frame): # transform frame frame = self._transformFrame(frame) - # publish frame self.txMqttThread.publishFrame(frame) From 486427a1a66a45ce212c94c308d046aa996bcaed Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Mon, 19 Apr 2021 16:38:25 +0200 Subject: [PATCH 14/16] Adding the Testbed probe --- ArgusProbe_Beamlogic.py | 289 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 288 insertions(+), 1 deletion(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 459415b..6600df5 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -14,6 +14,7 @@ import traceback import paho.mqtt.publish +import paho.mqtt.client as mqtt import serial import ArgusVersion @@ -487,6 +488,290 @@ def byteinverse(self,b): rb |= bitval << (7 - pos) return rb + +class MoteProbeNoData(Exception): + """ No data received from serial pipe """ + pass + +class OpenTestBed_RxSnifferThread(threading.Thread): + """ + Thread which attaches to the OpenTestBed and put frames into queue. + """ + BASE_TOPIC = 'opentestbed/deviceType/mote/deviceId' + + XOFF = 0x13 + XON = 0x11 + XONXOFF_ESCAPE = 0x12 + XONXOFF_MASK = 0x10 + + FCS16TAB = (0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, + ) + def __init__(self, txMqttThread, mqtt_broker, testbedmote_eui64): + # store params + self.mqtt_broker = mqtt_broker + self.testbedmote_eui64 = testbedmote_eui64 + + self.txMqttThread = txMqttThread + # local variables + self.goOn = True + # mqtt client + self.mqtt_client = mqtt.Client() + self.mqtt_client.on_connect = self._on_mqtt_connect + self.mqtt_client.on_message = self._on_mqtt_message + self.mqtt_client.connect(self.mqtt_broker) + + # hdlc frame parser object + self.hdlc = openhdlc.OpenHdlc() + #frame parsing variables + self.rxBuffer = [] + self.hdlc_flag = False + self.receiving = False + self.xonxoff_escaping = False + + self.HEADER_LENGTH = 2 + + # to be assigned, callback + self.send_to_parser = None + + # initialize thread + super(OpenTestBed_RxSnifferThread, self).__init__() + self.name = 'OpenTestBed_RxSnifferThread@{0}'.format(self.testbedmote_eui64) + self.start() + + def run(self): + + time.sleep(1) # let the banners print + try: + self._attach() + while self.goOn: + try: + rx_bytes = self._rcv_data() + #print(rx_bytes) + except MoteProbeNoData: + continue + except Exception as err: + logCrash(self.name, err) + time.sleep(1) + self.close() + else: + #parse incoming bytes + self.parse_bytes(rx_bytes) + time.sleep(4) + except Exception as err: + logCrash(self.name, err) + self.close() + + #======================== public ========================================== + + def close(self): + print "Is device connected?" + self.goOn = False + + def _rcv_data(self): + rx_bytes = self.mqtt_serial_queue.get() + return [chr(i) for i in rx_bytes] + + def _attach(self): + # create queue for receiving serialbytes messages + self.serialbytes_queue = Queue.Queue(maxsize=10) + self.mqtt_client.loop_start() + self.mqtt_serial_queue = self.serialbytes_queue + + # ==== mqtt callback functions ===================================== + def _on_mqtt_connect(self, client, userdata, flags, rc): + client.subscribe('{}/{}/notif/frommoteserialbytes'.format(self.BASE_TOPIC, self.testbedmote_eui64)) + + def _on_mqtt_message(self, client, userdata, message): + try: + serial_bytes = json.loads(message.payload)['serialbytes'] + except json.JSONDecodeError: + print("failed to parse message payload {}".format(message.payload)) + else: + try: + self.serialbytes_queue.put(serial_bytes, block=False) + except Queue.Full: + print ("queue overflow/full") + + #======================== private ========================================= + def _rx_buf_add(self, byte): + """ Adds byte to buffer and escapes the XONXOFF bytes """ + + if byte == chr(self.XONXOFF_ESCAPE): + self.xonxoff_escaping = True + else: + if self.xonxoff_escaping is True: + self.rxBuffer.append(chr(ord(byte) ^ self.XONXOFF_MASK)) + self.xonxoff_escaping = False + elif byte != chr(self.XON) and byte != chr(self.XOFF): + self.rxBuffer.append(byte) + + def _handle_frame(self): + """ Handles a HDLC frame """ + valid_frame = False + try: + #print(self.rxBuffer) + self.rxBuffer = self.hdlc.dehdlcify(self.rxBuffer) + + if self.send_to_parser: + self.send_to_parser([ord(c) for c in self.rxBuffer]) + + if self.rxBuffer[0] == 'P': #packet from sniffer SERFRAME_MOTE2PC_SNIFFED_PACKET 'P' + self.rxBuffer = self.rxBuffer[1:] #removing the indicator byte from the packet + valid_frame = True + + except openhdlc.HdlcException as err: + print '{}: Invalid serial frame: {}'.format(self.name,self.rxBuffer) + return valid_frame + + def parse_input(self, data): + + # ensure data not short longer than header + if len(data) < self.HEADER_LENGTH: + raise 'Error packet length' + + _ = data[:2] # header bytes + + # remove mote id at the beginning. + data = data[2:] + + return data + + def parse_bytes(self, octets): + """ + Parses bytes received from serial pipe + """ + for b in octets: + if not self.receiving: + if self.hdlc_flag and b != self.hdlc.HDLC_FLAG: + # start of frame + self.receiving = True + # discard received self.hdlc_flag + self.hdlc_flag = False + self.xonxoff_escaping = False + self.rxBuffer.append(self.hdlc.HDLC_FLAG) + self._rx_buf_add(b) + elif b == self.hdlc.HDLC_FLAG: + # received hdlc flag + self.rxBuffer = [] # Start of the frame, reset + self.hdlc_flag = True + else: + # drop garbage + pass + else: + if b != self.hdlc.HDLC_FLAG: + # middle of frame + self._rx_buf_add(b) + else: + # end of frame, received self.hdlc_flag + self.hdlc_flag = True + self.receiving = False + self._rx_buf_add(b) + valid_frame = self._handle_frame() + + if valid_frame: + # discard valid frame self.hdlc_flag + self.hdlc_flag = False + self._newFrame(self.rxBuffer) + + def _newFrame(self, frame): + """ + Just received a full frame from the sniffer + """ + #Parse incomming frame + frame = self.parse_input(frame) + # transform frame + frame = self._transformFrame(frame) + print(frame) + # publish frame + self.txMqttThread.publishFrame(frame) + + def _transformFrame(self, frame): + """ + Add ZEP header + """ + body = frame[0:-3] + _ = frame[-3:-1] # crc + frequency = frame[-1] + + zep = self._formatZep(body,frequency) + #frame = self._dispatch_mesh_debug_packet(zep) + return zep + + def _formatZep (self, body, frequency): + # ZEP header + zep = [ord('E'), ord('X')] # Protocol ID String + zep += [0x02] # Protocol Version + zep += [0x01] # Type + zep += [ord(frequency)] # Channel ID #?? Mislim da treba da bude u ovom obliku + zep += [0x00, 0x01] # Device ID + zep += [0x01] # LQI/CRC mode + zep += [0xff] + zep += [0x01] * 8 # timestamp + zep += [0x02] * 4 # sequence number + zep += [0x00] * 10 # reserved + zep += [len(body) + 2] # length + + # mac frame + mac = [ord(i) for i in body] + mac += self.calculate_fcs(mac) + return zep + mac + + def calculate_fcs(self,rpayload): + payload = [] + for b in rpayload: + payload += [self.byteinverse(b)] + crc = 0x0000 + for b in payload: + crc = ((crc << 8) & 0xffff) ^ self.FCS16TAB[((crc >> 8) ^ b) & 0xff] + return_val = [ + self.byteinverse(crc >> 8), + self.byteinverse(crc & 0xff), + ] + return return_val + + def byteinverse(self,b): + # TODO: speed up through lookup table + + rb = 0 + for pos in range(8): + if b & (1 << pos) != 0: #check this out + bitval = 1 + else: + bitval = 0 + rb |= bitval << (7 - pos) + return rb + class TxMqttThread(threading.Thread): """ Thread which publishes sniffed frames to the MQTT broker. @@ -583,6 +868,8 @@ def main(): parser.add_argument("--probetype", nargs="?", default="beamlogic", choices=["beamlogic", "serial","opentestbed"]) parser.add_argument("--serialport", help= 'Input the serial port for the Serial probe type') parser.add_argument("--baudrate", default= 115200) + parser.add_argument("--mqtt_broker", default='argus.paris.inria.fr') + parser.add_argument("--testbedmote") args = parser.parse_args() # start thread @@ -592,7 +879,7 @@ def main(): elif args.probetype == "serial": serial_rxSnifferThread = Serial_RxSnifferThread(txMqttThread,args.serialport,args.baudrate) elif args.probetype == "opentestbed": - pass + testbed_rxSnifferThread = OpenTestBed_RxSnifferThread(txMqttThread,args.mqtt_broker, args.testbedmote) else: print('This probe type is not supported!') sys.exit() From 9c41871c176eaa1cf5b8a9f8653237c8bc13a38f Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Sat, 22 May 2021 12:12:17 +0200 Subject: [PATCH 15/16] Changes in organization --- ArgusProbe_Beamlogic.py | 87 +++++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 6600df5..99d327c 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -497,7 +497,6 @@ class OpenTestBed_RxSnifferThread(threading.Thread): """ Thread which attaches to the OpenTestBed and put frames into queue. """ - BASE_TOPIC = 'opentestbed/deviceType/mote/deviceId' XOFF = 0x13 XON = 0x11 @@ -537,19 +536,12 @@ class OpenTestBed_RxSnifferThread(threading.Thread): 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, ) - def __init__(self, txMqttThread, mqtt_broker, testbedmote_eui64): + def __init__(self, txMqttThread,portname): # store params - self.mqtt_broker = mqtt_broker - self.testbedmote_eui64 = testbedmote_eui64 - self.txMqttThread = txMqttThread + self._portname = portname # local variables self.goOn = True - # mqtt client - self.mqtt_client = mqtt.Client() - self.mqtt_client.on_connect = self._on_mqtt_connect - self.mqtt_client.on_message = self._on_mqtt_message - self.mqtt_client.connect(self.mqtt_broker) # hdlc frame parser object self.hdlc = openhdlc.OpenHdlc() @@ -566,7 +558,7 @@ def __init__(self, txMqttThread, mqtt_broker, testbedmote_eui64): # initialize thread super(OpenTestBed_RxSnifferThread, self).__init__() - self.name = 'OpenTestBed_RxSnifferThread@{0}'.format(self.testbedmote_eui64) + self.name = 'OpenTestBed_RxSnifferThread@{0}'.format(self._portname) self.start() def run(self): @@ -593,36 +585,10 @@ def run(self): self.close() #======================== public ========================================== - def close(self): print "Is device connected?" self.goOn = False - def _rcv_data(self): - rx_bytes = self.mqtt_serial_queue.get() - return [chr(i) for i in rx_bytes] - - def _attach(self): - # create queue for receiving serialbytes messages - self.serialbytes_queue = Queue.Queue(maxsize=10) - self.mqtt_client.loop_start() - self.mqtt_serial_queue = self.serialbytes_queue - - # ==== mqtt callback functions ===================================== - def _on_mqtt_connect(self, client, userdata, flags, rc): - client.subscribe('{}/{}/notif/frommoteserialbytes'.format(self.BASE_TOPIC, self.testbedmote_eui64)) - - def _on_mqtt_message(self, client, userdata, message): - try: - serial_bytes = json.loads(message.payload)['serialbytes'] - except json.JSONDecodeError: - print("failed to parse message payload {}".format(message.payload)) - else: - try: - self.serialbytes_queue.put(serial_bytes, block=False) - except Queue.Full: - print ("queue overflow/full") - #======================== private ========================================= def _rx_buf_add(self, byte): """ Adds byte to buffer and escapes the XONXOFF bytes """ @@ -772,6 +738,50 @@ def byteinverse(self,b): rb |= bitval << (7 - pos) return rb +class OpentestbedMoteProbe (OpenTestBed_RxSnifferThread): + BASE_TOPIC = 'opentestbed/deviceType/mote/deviceId' + + def __init__(self,txMqttThread, mqtt_broker, testbedmote_eui64): + self.mqtt_broker = mqtt_broker + self.testbedmote_eui64 = testbedmote_eui64 + + # mqtt client + self.mqtt_client = mqtt.Client() + self.mqtt_client.on_connect = self._on_mqtt_connect + self.mqtt_client.on_message = self._on_mqtt_message + self.mqtt_client.connect(self.mqtt_broker) + + name = 'opentestbed_{0}'.format(testbedmote_eui64) + # initialize the parent class + OpenTestBed_RxSnifferThread.__init__(self,txMqttThread, portname=name) + + # ======================== private ================================= + def _rcv_data(self): + rx_bytes = self.mqtt_serial_queue.get() + return [chr(i) for i in rx_bytes] + + def _attach(self): + # create queue for receiving serialbytes messages + self.serialbytes_queue = Queue.Queue(maxsize=100) + self.mqtt_client.loop_start() + self.mqtt_serial_queue = self.serialbytes_queue + + # ==== mqtt callback functions ===================================== + + def _on_mqtt_connect(self, client, userdata, flags, rc): + client.subscribe('{}/{}/notif/frommoteserialbytes'.format(self.BASE_TOPIC, self.testbedmote_eui64)) + + def _on_mqtt_message(self, client, userdata, message): + try: + serial_bytes = json.loads(message.payload)['serialbytes'] + except json.JSONDecodeError: + print("failed to parse message payload {}".format(message.payload)) + else: + try: + self.serialbytes_queue.put(serial_bytes, block=False) + except Queue.Full: + print("queue overflow/full") + class TxMqttThread(threading.Thread): """ Thread which publishes sniffed frames to the MQTT broker. @@ -879,7 +889,8 @@ def main(): elif args.probetype == "serial": serial_rxSnifferThread = Serial_RxSnifferThread(txMqttThread,args.serialport,args.baudrate) elif args.probetype == "opentestbed": - testbed_rxSnifferThread = OpenTestBed_RxSnifferThread(txMqttThread,args.mqtt_broker, args.testbedmote) + testbed_rxSnifferThread = OpentestbedMoteProbe(txMqttThread,args.mqtt_broker,args.testbedmote) + else: print('This probe type is not supported!') sys.exit() From 36da0aab3777bf9738b7097dfe9adf8badf38c51 Mon Sep 17 00:00:00 2001 From: JelenaKovacc Date: Sun, 23 May 2021 22:28:32 +0200 Subject: [PATCH 16/16] Adding Abstract Base Class property --- ArgusProbe_Beamlogic.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 99d327c..fc0506a 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -12,14 +12,14 @@ import threading import time import traceback - +import abc import paho.mqtt.publish import paho.mqtt.client as mqtt import serial import ArgusVersion import openhdlc - +from pydispatch import dispatcher #============================ helpers ========================================= @@ -300,6 +300,9 @@ def __init__(self, txMqttThread, serialport,baudrate): # to be assigned, callback self.send_to_parser = None + # connect to dispatcher + dispatcher.connect(self._send_data, signal='fromMoteConnector@' + self._portname) + # initialize thread super(Serial_RxSnifferThread, self).__init__() self.name = 'Serial_RxSnifferThread@{0}'.format(self.serialport) @@ -537,6 +540,7 @@ class OpenTestBed_RxSnifferThread(threading.Thread): 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, ) def __init__(self, txMqttThread,portname): + __metaclass__ = abc.ABCMeta # store params self.txMqttThread = txMqttThread self._portname = portname @@ -569,7 +573,6 @@ def run(self): while self.goOn: try: rx_bytes = self._rcv_data() - #print(rx_bytes) except MoteProbeNoData: continue except Exception as err: @@ -579,7 +582,7 @@ def run(self): else: #parse incoming bytes self.parse_bytes(rx_bytes) - time.sleep(4) + #time.sleep(2) except Exception as err: logCrash(self.name, err) self.close() @@ -590,6 +593,10 @@ def close(self): self.goOn = False #======================== private ========================================= + @abc.abstractmethod + def _rcv_data(self): + raise NotImplementedError("Should be implemented by child class") + def _rx_buf_add(self, byte): """ Adds byte to buffer and escapes the XONXOFF bytes """ @@ -606,7 +613,6 @@ def _handle_frame(self): """ Handles a HDLC frame """ valid_frame = False try: - #print(self.rxBuffer) self.rxBuffer = self.hdlc.dehdlcify(self.rxBuffer) if self.send_to_parser: @@ -678,7 +684,6 @@ def _newFrame(self, frame): frame = self.parse_input(frame) # transform frame frame = self._transformFrame(frame) - print(frame) # publish frame self.txMqttThread.publishFrame(frame) @@ -738,11 +743,13 @@ def byteinverse(self,b): rb |= bitval << (7 - pos) return rb +### child class class OpentestbedMoteProbe (OpenTestBed_RxSnifferThread): BASE_TOPIC = 'opentestbed/deviceType/mote/deviceId' def __init__(self,txMqttThread, mqtt_broker, testbedmote_eui64): - self.mqtt_broker = mqtt_broker + + self.mqtt_broker = mqtt_broker self.testbedmote_eui64 = testbedmote_eui64 # mqtt client