From b03b2632e18c90cd152094c2334ae6c8be533f74 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:35:15 -0700 Subject: [PATCH 1/8] adding rssi to zep reserved field --- ArgusProbe_Beamlogic.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index b4d1729..68df53a 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -161,6 +161,7 @@ def _transformFrame(self,frame): zep = self._formatZep( channel = beamlogic['Channel'], timestamp = beamlogic['TimeStamp'], + rssi = beamlogic['RSSI'], length = len(ieee154), ) @@ -190,7 +191,7 @@ def _parseBeamlogicHeader(self,header): return returnVal - def _formatZep(self,channel,timestamp,length): + def _formatZep(self,channel,timestamp,rssi,length): return [ 0x45,0x58, 0x02, @@ -203,7 +204,8 @@ def _formatZep(self,channel,timestamp,length): [ord(b) for b in struct.pack('>Q',timestamp)]+ \ [ 0x02,0x02,0x02,0x02, - 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, + rssi, + 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, length, ] From 77a31542a7ee5fec513d4c4d2cac678a0cfd59d0 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:40:04 -0700 Subject: [PATCH 2/8] fix PEP8 coding style --- ArgusProbe_Beamlogic.py | 67 ++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 68df53a..bcad75f 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -17,10 +17,12 @@ #============================ helpers ========================================= + def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) -def logCrash(threadName,err): + +def logCrash(threadName, err): output = [] output += ["============================================================="] output += [currentUtcTime()] @@ -38,17 +40,18 @@ def logCrash(threadName,err): #============================ classes ========================================= + class RxSnifferThread(threading.Thread): ''' Thread which attaches to the sniffer and parses incoming frames. ''' - PCAP_GLOBALHEADER_LEN = 24 # 4+2+2+4+4+4+4 - PCAP_PACKETHEADER_LEN = 16 # 4+4+4+4 - BEAMLOGIC_HEADER_LEN = 18 # 8+1+1+4+4 + PCAP_GLOBALHEADER_LEN = 24 # 4+2+2+4+4+4+4 + PCAP_PACKETHEADER_LEN = 16 # 4+4+4+4 + BEAMLOGIC_HEADER_LEN = 18 # 8+1+1+4+4 PIPE_SNIFFER = r'\\.\pipe\analyzer' - def __init__(self,txMqttThread): + def __init__(self, txMqttThread): # store params self.txMqttThread = txMqttThread @@ -66,7 +69,7 @@ def __init__(self,txMqttThread): def run(self): try: - time.sleep(1) # let the banners print + time.sleep(1) # let the banners print while True: try: with open(self.PIPE_SNIFFER, 'rb') as sniffer: @@ -80,13 +83,13 @@ def run(self): print "Is SiteAnalyzerAdapter running?" time.sleep(1) except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== #======================== private ========================================= - def _newByte(self,b): + def _newByte(self, b): ''' Just received a byte from the sniffer ''' @@ -95,26 +98,26 @@ def _newByte(self,b): # PCAP global header if not self.doneReceivingGlobalHeader: - if len(self.rxBuffer)==self.PCAP_GLOBALHEADER_LEN: + if len(self.rxBuffer) == self.PCAP_GLOBALHEADER_LEN: self.doneReceivingGlobalHeader = True self.rxBuffer = [] # PCAP packet header elif not self.doneReceivingPacketHeader: - if len(self.rxBuffer)==self.PCAP_PACKETHEADER_LEN: + if len(self.rxBuffer) == self.PCAP_PACKETHEADER_LEN: self.doneReceivingPacketHeader = True self.packetHeader = self._parsePcapPacketHeader(self.rxBuffer) - assert self.packetHeader['incl_len']==self.packetHeader['orig_len'] + assert self.packetHeader['incl_len'] == self.packetHeader['orig_len'] self.rxBuffer = [] # PCAP packet bytes else: - if len(self.rxBuffer)==self.packetHeader['incl_len']: + if len(self.rxBuffer) == self.packetHeader['incl_len']: self.doneReceivingPacketHeader = False self._newFrame(self.rxBuffer) self.rxBuffer = [] - def _parsePcapPacketHeader(self,header): + def _parsePcapPacketHeader(self, header): ''' Parse a PCAP packet header @@ -128,7 +131,7 @@ def _parsePcapPacketHeader(self,header): } pcaprec_hdr_t; ''' - assert len(header)==self.PCAP_PACKETHEADER_LEN + assert len(header) == self.PCAP_PACKETHEADER_LEN returnVal = {} ( @@ -140,7 +143,7 @@ def _parsePcapPacketHeader(self,header): return returnVal - def _newFrame(self,frame): + def _newFrame(self, frame): ''' Just received a full frame from the sniffer ''' @@ -151,7 +154,7 @@ def _newFrame(self,frame): # publish frame self.txMqttThread.publishFrame(frame) - def _transformFrame(self,frame): + def _transformFrame(self, frame): ''' Replace BeamLogic header by ZEP header. ''' @@ -167,7 +170,7 @@ def _transformFrame(self,frame): return zep+ieee154 - def _parseBeamlogicHeader(self,header): + def _parseBeamlogicHeader(self, header): ''' Parse a Beamlogic header @@ -178,7 +181,7 @@ def _parseBeamlogicHeader(self,header): uint32 GpsLong ''' - assert len(header)==self.BEAMLOGIC_HEADER_LEN + assert len(header) == self.BEAMLOGIC_HEADER_LEN returnVal = {} ( @@ -191,24 +194,25 @@ def _parseBeamlogicHeader(self,header): return returnVal - def _formatZep(self,channel,timestamp,rssi,length): + def _formatZep(self, channel, timestamp, rssi, length): return [ - 0x45,0x58, + 0x45, 0x58, 0x02, 0x01, channel, - 0x00,0x01, + 0x00, 0x01, 0x01, 0xff, - ]+ \ - [ord(b) for b in struct.pack('>Q',timestamp)]+ \ + ] + \ + [ord(b) for b in struct.pack('>Q', timestamp)] + \ [ - 0x02,0x02,0x02,0x02, + 0x02, 0x02, 0x02, 0x02, rssi, - 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, length, ] + class TxMqttThread(threading.Thread): ''' Thread which publishes sniffed frames to the MQTT broker. @@ -232,7 +236,7 @@ def run(self): try: while True: # wait for first frame - msgs = [self.txQueue.get(),] + msgs = [self.txQueue.get(), ] # get other frames (if any) try: @@ -264,18 +268,18 @@ def run(self): ) except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== - def publishFrame(self,frame): + def publishFrame(self, frame): msg = { 'description': 'zep', 'device': 'Beamlogic', 'bytes': ''.join(['{0:02x}'.format(b) for b in frame]), } try: - self.txQueue.put(json.dumps(msg),block=False) + self.txQueue.put(json.dumps(msg), block=False) except Queue.Full: print "WARNING transmit queue to MQTT broker full. Dropping frame." @@ -296,10 +300,11 @@ def __init__(self): input = raw_input('>') print input, except Exception as err: - logCrash('CliThread',err) + logCrash('CliThread', err) #============================ main ============================================ + def main(): # parse parameters @@ -308,5 +313,5 @@ def main(): rxSnifferThread = RxSnifferThread(txMqttThread) cliThread = CliThread() -if __name__=="__main__": +if __name__ == "__main__": main() From 90c57d62304d6cba0fe6edc1d44fc9b8dc16ab67 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:40:55 -0700 Subject: [PATCH 3/8] PEP8 redundant parenthesis --- ArgusProbe_Beamlogic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index bcad75f..e574012 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -76,7 +76,7 @@ def run(self): while True: b = ord(sniffer.read(1)) self._newByte(b) - except (IOError): + except IOError: print "WARNING: Could not read from pipe at \"{0}\".".format( self.PIPE_SNIFFER ) From ec6f8598617979dda887ec9828328b1b09e10266 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:42:43 -0700 Subject: [PATCH 4/8] using triple quote for docstring --- ArgusProbe_Beamlogic.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index e574012..ef173f4 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -1,7 +1,7 @@ -''' +""" Argus probe for the Beamlogic Site Analyzer Lite http://www.beamlogic.com/products/802154-site-analyzer.aspx -''' +""" import time import struct @@ -42,9 +42,9 @@ def logCrash(threadName, err): class RxSnifferThread(threading.Thread): - ''' + """ Thread which attaches to the sniffer and parses incoming frames. - ''' + """ PCAP_GLOBALHEADER_LEN = 24 # 4+2+2+4+4+4+4 PCAP_PACKETHEADER_LEN = 16 # 4+4+4+4 @@ -90,9 +90,9 @@ def run(self): #======================== private ========================================= def _newByte(self, b): - ''' + """ Just received a byte from the sniffer - ''' + """ with self.dataLock: self.rxBuffer += [b] @@ -118,7 +118,7 @@ def _newByte(self, b): self.rxBuffer = [] def _parsePcapPacketHeader(self, header): - ''' + """ Parse a PCAP packet header Per https://wiki.wireshark.org/Development/LibpcapFileFormat: @@ -129,7 +129,7 @@ def _parsePcapPacketHeader(self, header): guint32 incl_len; /* number of octets of packet saved in file */ guint32 orig_len; /* actual length of packet */ } pcaprec_hdr_t; - ''' + """ assert len(header) == self.PCAP_PACKETHEADER_LEN @@ -144,9 +144,9 @@ def _parsePcapPacketHeader(self, header): return returnVal def _newFrame(self, frame): - ''' + """ Just received a full frame from the sniffer - ''' + """ # transform frame frame = self._transformFrame(frame) @@ -155,9 +155,9 @@ def _newFrame(self, frame): self.txMqttThread.publishFrame(frame) def _transformFrame(self, frame): - ''' + """ Replace BeamLogic header by ZEP header. - ''' + """ beamlogic = self._parseBeamlogicHeader(frame[1:1+self.BEAMLOGIC_HEADER_LEN]) ieee154 = frame[self.BEAMLOGIC_HEADER_LEN+2:-1] @@ -171,7 +171,7 @@ def _transformFrame(self, frame): return zep+ieee154 def _parseBeamlogicHeader(self, header): - ''' + """ Parse a Beamlogic header uint64 TimeStamp @@ -179,7 +179,7 @@ def _parseBeamlogicHeader(self, header): uint8 RSSI uint32 GpsLat uint32 GpsLong - ''' + """ assert len(header) == self.BEAMLOGIC_HEADER_LEN @@ -214,9 +214,9 @@ def _formatZep(self, channel, timestamp, rssi, length): class TxMqttThread(threading.Thread): - ''' + """ Thread which publishes sniffed frames to the MQTT broker. - ''' + """ MQTT_BROKER_HOST = 'argus.paris.inria.fr' MQTT_BROKER_PORT = 1883 From ff0e596729c5ee7c7f32995f33859640b6b6f0c2 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:44:12 -0700 Subject: [PATCH 5/8] renamed shadowing builtin variable --- ArgusProbe_Beamlogic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index ef173f4..ee4ea89 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -297,8 +297,8 @@ def __init__(self): ) while True: - input = raw_input('>') - print input, + user_input = raw_input('>') + print user_input, except Exception as err: logCrash('CliThread', err) From a770a00ce10034bdb6cc3c4a413974c4694ff782 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:51:20 -0700 Subject: [PATCH 6/8] PEP8 style --- ArgusClient.py | 97 ++++++++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/ArgusClient.py b/ArgusClient.py index 59e63c4..bd315fc 100755 --- a/ArgusClient.py +++ b/ArgusClient.py @@ -17,9 +17,11 @@ import ArgusVersion + def isLinux(): return platform.system() == "Linux" + def isWindows(): return platform.system() == "Windows" @@ -35,10 +37,12 @@ def isWindows(): #============================ helpers ========================================= + def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) -def logCrash(threadName,err): + +def logCrash(threadName, err): output = [] output += ["============================================================="] output += [currentUtcTime()] @@ -56,6 +60,7 @@ def logCrash(threadName,err): #============================ classes ========================================= + class RxMqttThread(threading.Thread): ''' Thread which subscribes to the MQTT broker and pushes @@ -66,7 +71,7 @@ class RxMqttThread(threading.Thread): MQTT_BROKER_PORT = 1883 MQTT_BROKER_TOPIC = 'inria-paris/beamlogic' - def __init__(self,txWiresharkThread): + def __init__(self, txWiresharkThread): # store params self.txWiresharkThread = txWiresharkThread @@ -84,22 +89,23 @@ def __init__(self,txWiresharkThread): def run(self): try: self.mqtt.connect(host=self.MQTT_BROKER_HOST, port=1883, keepalive=60) - self.mqtt.loop_forever() # handles reconnects + self.mqtt.loop_forever() # handles reconnects except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== #======================== private ========================================= - def _mqtt_on_connect(self,client,userdata,flags,rc): - assert rc==0 + def _mqtt_on_connect(self, client, userdata, flags, rc): + assert rc == 0 print("INFO: Connected to {0} MQTT broker".format(self.MQTT_BROKER_HOST)) self.mqtt.subscribe('argus/{0}'.format(self.MQTT_BROKER_TOPIC)) - def _mqtt_on_message(self,client,userdata,msg): + def _mqtt_on_message(self, client, userdata, msg): self.txWiresharkThread.publish(msg.payload) + class TxWiresharkThread(threading.Thread): ''' Thread which publishes sniffed frames to Wireshark broker. @@ -144,14 +150,14 @@ def run(self): try: # connect to pipe (blocks until Wireshark appears) if isWindows(): - win32pipe.ConnectNamedPipe(self.pipe,None) + win32pipe.ConnectNamedPipe(self.pipe, None) elif isLinux(): open(self.PIPE_NAME_WIRESHARK, 'wb') # send PCAP global header to Wireshark ghdr = self._createPcapGlobalHeader() if isWindows(): - win32file.WriteFile(self.pipe,ghdr) + win32file.WriteFile(self.pipe, ghdr) elif isLinux(): self.pipe.write(ghdr) self.pipe.flush() @@ -176,11 +182,11 @@ def run(self): elif isLinux(): self.pipe.close() except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== - def publish(self,msg): + def publish(self, msg): with self.dataLock: if not self.wiresharkConnected: # no Wireshark listening, dropping. @@ -190,40 +196,42 @@ def publish(self,msg): udp = ''.join( [ chr(b) for b in [ - 0x00,0x00, # source port - 0x45,0x5a, # destination port - 0x00,8+len(zep), # length - 0xbc,0x04, # checksum + 0x00, 0x00, # source port + 0x45, 0x5a, # destination port + 0x00, 8+len(zep), # length + 0xbc, 0x04, # checksum ] ] ) ipv6 = ''.join( [ chr(b) for b in [ - 0x60, # version - 0x00,0x00,0x00, # traffic class - 0x00,len(udp)+len(zep), # payload length - 0x11, # next header (17==UDP) - 0x08, # HLIM - 0xbb,0xbb,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x01, # src - 0xbb,0xbb,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x01, # dest + 0x60, # version + 0x00, 0x00, 0x00, # traffic class + 0x00, len(udp) + len(zep), # payload length + 0x11, # next header (17==UDP) + 0x08, # HLIM + 0xbb, 0xbb, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, # src + 0xbb, 0xbb, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, # dest ] ] ) ethernet = ''.join([chr(b) for b in [ - 0x00,0x00,0x00,0x00,0x00,0x00, # source - 0x00,0x00,0x00,0x00,0x00,0x00, # destination - 0x86,0xdd, # type (IPv6) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # source + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # destination + 0x86, 0xdd, # type (IPv6) ] ] ) - frame = ''.join([ethernet,ipv6,udp,zep]) + frame = ''.join([ethernet, ipv6, udp, zep]) pcap = self._createPcapPacketHeader(len(frame)) try: if isWindows(): - win32file.WriteFile(self.pipe,pcap+frame) + win32file.WriteFile(self.pipe, pcap+frame) elif isLinux(): self.pipe.write(pcap+frame) self.pipe.flush() @@ -252,16 +260,16 @@ def _createPcapGlobalHeader(self): return struct.pack( ' ') print input, except Exception as err: - logCrash('CliThread',err) + logCrash('CliThread', err) #============================ main ============================================ + def main(): try: # parse parameters # start Wireshark if isWindows(): - wireshark_cmd = ['C:\Program Files\Wireshark\Wireshark.exe', r'-i\\.\pipe\argus','-k'] + wireshark_cmd = ['C:\Program Files\Wireshark\Wireshark.exe', + r'-i\\.\pipe\argus', '-k'] elif isLinux(): fifo_name = "/tmp/argus" if not os.path.exists(fifo_name): @@ -325,7 +336,7 @@ def main(): rxMqttThread = RxMqttThread(txWiresharkThread) cliThread = CliThread() except Exception as err: - logCrash('main',err) + logCrash('main', err) -if __name__=="__main__": +if __name__ == "__main__": main() From 899c8cf6b27692174d7009d5e99318078bac6189 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:51:36 -0700 Subject: [PATCH 7/8] renamed shadowing builtin variable --- ArgusClient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ArgusClient.py b/ArgusClient.py index bd315fc..2dce4ea 100755 --- a/ArgusClient.py +++ b/ArgusClient.py @@ -303,8 +303,8 @@ def __init__(self): ) while True: - input = raw_input('> ') - print input, + user_input = raw_input('> ') + print user_input, except Exception as err: logCrash('CliThread', err) From 28565d999a98d0a8652ff98650bcba3bbec6e764 Mon Sep 17 00:00:00 2001 From: Keoma Brun Date: Fri, 21 Jul 2017 18:55:44 -0700 Subject: [PATCH 8/8] using triple quote for docstring --- ArgusClient.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ArgusClient.py b/ArgusClient.py index 2dce4ea..624cf32 100755 --- a/ArgusClient.py +++ b/ArgusClient.py @@ -1,7 +1,7 @@ -''' +""" Argus client script which attaches to the broker and sends sniffed packets through a pipe to Wireshark. -''' +""" import threading import time @@ -62,10 +62,10 @@ def logCrash(threadName, err): class RxMqttThread(threading.Thread): - ''' + """ Thread which subscribes to the MQTT broker and pushes received frames to he - ''' + """ MQTT_BROKER_HOST = 'argus.paris.inria.fr' MQTT_BROKER_PORT = 1883 @@ -107,9 +107,9 @@ def _mqtt_on_message(self, client, userdata, msg): class TxWiresharkThread(threading.Thread): - ''' + """ Thread which publishes sniffed frames to Wireshark broker. - ''' + """ if isWindows(): PIPE_NAME_WIRESHARK = r'\\.\pipe\argus' @@ -242,7 +242,7 @@ def publish(self, msg): #======================== private ========================================= def _createPcapGlobalHeader(self): - ''' + """ Create a PCAP global header. Per https://wiki.wireshark.org/Development/LibpcapFileFormat: @@ -256,7 +256,7 @@ def _createPcapGlobalHeader(self): guint32 snaplen; /* max length of captured packets, in octets */ guint32 network; /* data link type */ } pcap_hdr_t; - ''' + """ return struct.pack( '