diff --git a/ArgusClient.py b/ArgusClient.py index 59e63c4..624cf32 100755 --- a/ArgusClient.py +++ b/ArgusClient.py @@ -1,7 +1,7 @@ -''' +""" Argus client script which attaches to the broker and sends sniffed packets through a pipe to Wireshark. -''' +""" import threading import time @@ -17,9 +17,11 @@ import ArgusVersion + def isLinux(): return platform.system() == "Linux" + def isWindows(): return platform.system() == "Windows" @@ -35,10 +37,12 @@ def isWindows(): #============================ helpers ========================================= + def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) -def logCrash(threadName,err): + +def logCrash(threadName, err): output = [] output += ["============================================================="] output += [currentUtcTime()] @@ -56,17 +60,18 @@ def logCrash(threadName,err): #============================ classes ========================================= + class RxMqttThread(threading.Thread): - ''' + """ Thread which subscribes to the MQTT broker and pushes received frames to he - ''' + """ MQTT_BROKER_HOST = 'argus.paris.inria.fr' MQTT_BROKER_PORT = 1883 MQTT_BROKER_TOPIC = 'inria-paris/beamlogic' - def __init__(self,txWiresharkThread): + def __init__(self, txWiresharkThread): # store params self.txWiresharkThread = txWiresharkThread @@ -84,26 +89,27 @@ def __init__(self,txWiresharkThread): def run(self): try: self.mqtt.connect(host=self.MQTT_BROKER_HOST, port=1883, keepalive=60) - self.mqtt.loop_forever() # handles reconnects + self.mqtt.loop_forever() # handles reconnects except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== #======================== private ========================================= - def _mqtt_on_connect(self,client,userdata,flags,rc): - assert rc==0 + def _mqtt_on_connect(self, client, userdata, flags, rc): + assert rc == 0 print("INFO: Connected to {0} MQTT broker".format(self.MQTT_BROKER_HOST)) self.mqtt.subscribe('argus/{0}'.format(self.MQTT_BROKER_TOPIC)) - def _mqtt_on_message(self,client,userdata,msg): + def _mqtt_on_message(self, client, userdata, msg): self.txWiresharkThread.publish(msg.payload) + class TxWiresharkThread(threading.Thread): - ''' + """ Thread which publishes sniffed frames to Wireshark broker. - ''' + """ if isWindows(): PIPE_NAME_WIRESHARK = r'\\.\pipe\argus' @@ -144,14 +150,14 @@ def run(self): try: # connect to pipe (blocks until Wireshark appears) if isWindows(): - win32pipe.ConnectNamedPipe(self.pipe,None) + win32pipe.ConnectNamedPipe(self.pipe, None) elif isLinux(): open(self.PIPE_NAME_WIRESHARK, 'wb') # send PCAP global header to Wireshark ghdr = self._createPcapGlobalHeader() if isWindows(): - win32file.WriteFile(self.pipe,ghdr) + win32file.WriteFile(self.pipe, ghdr) elif isLinux(): self.pipe.write(ghdr) self.pipe.flush() @@ -176,11 +182,11 @@ def run(self): elif isLinux(): self.pipe.close() except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== - def publish(self,msg): + def publish(self, msg): with self.dataLock: if not self.wiresharkConnected: # no Wireshark listening, dropping. @@ -190,40 +196,42 @@ def publish(self,msg): udp = ''.join( [ chr(b) for b in [ - 0x00,0x00, # source port - 0x45,0x5a, # destination port - 0x00,8+len(zep), # length - 0xbc,0x04, # checksum + 0x00, 0x00, # source port + 0x45, 0x5a, # destination port + 0x00, 8+len(zep), # length + 0xbc, 0x04, # checksum ] ] ) ipv6 = ''.join( [ chr(b) for b in [ - 0x60, # version - 0x00,0x00,0x00, # traffic class - 0x00,len(udp)+len(zep), # payload length - 0x11, # next header (17==UDP) - 0x08, # HLIM - 0xbb,0xbb,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x01, # src - 0xbb,0xbb,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x01, # dest + 0x60, # version + 0x00, 0x00, 0x00, # traffic class + 0x00, len(udp) + len(zep), # payload length + 0x11, # next header (17==UDP) + 0x08, # HLIM + 0xbb, 0xbb, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, # src + 0xbb, 0xbb, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, # dest ] ] ) ethernet = ''.join([chr(b) for b in [ - 0x00,0x00,0x00,0x00,0x00,0x00, # source - 0x00,0x00,0x00,0x00,0x00,0x00, # destination - 0x86,0xdd, # type (IPv6) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # source + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # destination + 0x86, 0xdd, # type (IPv6) ] ] ) - frame = ''.join([ethernet,ipv6,udp,zep]) + frame = ''.join([ethernet, ipv6, udp, zep]) pcap = self._createPcapPacketHeader(len(frame)) try: if isWindows(): - win32file.WriteFile(self.pipe,pcap+frame) + win32file.WriteFile(self.pipe, pcap+frame) elif isLinux(): self.pipe.write(pcap+frame) self.pipe.flush() @@ -234,7 +242,7 @@ def publish(self,msg): #======================== private ========================================= def _createPcapGlobalHeader(self): - ''' + """ Create a PCAP global header. Per https://wiki.wireshark.org/Development/LibpcapFileFormat: @@ -248,21 +256,21 @@ def _createPcapGlobalHeader(self): guint32 snaplen; /* max length of captured packets, in octets */ guint32 network; /* data link type */ } pcap_hdr_t; - ''' + """ return struct.pack( ' ') - print input, + user_input = raw_input('> ') + print user_input, except Exception as err: - logCrash('CliThread',err) + logCrash('CliThread', err) #============================ main ============================================ + def main(): try: # parse parameters # start Wireshark if isWindows(): - wireshark_cmd = ['C:\Program Files\Wireshark\Wireshark.exe', r'-i\\.\pipe\argus','-k'] + wireshark_cmd = ['C:\Program Files\Wireshark\Wireshark.exe', + r'-i\\.\pipe\argus', '-k'] elif isLinux(): fifo_name = "/tmp/argus" if not os.path.exists(fifo_name): @@ -325,7 +336,7 @@ def main(): rxMqttThread = RxMqttThread(txWiresharkThread) cliThread = CliThread() except Exception as err: - logCrash('main',err) + logCrash('main', err) -if __name__=="__main__": +if __name__ == "__main__": main() diff --git a/ArgusProbe_Beamlogic.py b/ArgusProbe_Beamlogic.py index 61c7aa3..18444fd 100755 --- a/ArgusProbe_Beamlogic.py +++ b/ArgusProbe_Beamlogic.py @@ -1,7 +1,7 @@ -''' +""" Argus probe for the Beamlogic Site Analyzer Lite http://www.beamlogic.com/products/802154-site-analyzer.aspx -''' +""" import time import struct @@ -18,10 +18,12 @@ #============================ helpers ========================================= + def currentUtcTime(): return time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) -def logCrash(threadName,err): + +def logCrash(threadName, err): output = [] output += ["============================================================="] output += [currentUtcTime()] @@ -39,17 +41,18 @@ def logCrash(threadName,err): #============================ classes ========================================= + class RxSnifferThread(threading.Thread): - ''' + """ Thread which attaches to the sniffer and parses incoming frames. - ''' + """ PCAP_GLOBALHEADER_LEN = 24 # 4+2+2+4+4+4+4 PCAP_PACKETHEADER_LEN = 16 # 4+4+4+4 BEAMLOGIC_HEADER_LEN = 20 # 1+8+1+1+4+4+1 PIPE_SNIFFER = r'\\.\pipe\analyzer' - def __init__(self,txMqttThread): + def __init__(self, txMqttThread): # store params self.txMqttThread = txMqttThread @@ -67,56 +70,56 @@ def __init__(self,txMqttThread): def run(self): try: - time.sleep(1) # let the banners print + time.sleep(1) # let the banners print while True: try: with open(self.PIPE_SNIFFER, 'rb') as sniffer: while True: b = ord(sniffer.read(1)) self._newByte(b) - except (IOError): + except IOError: print "WARNING: Could not read from pipe at \"{0}\".".format( self.PIPE_SNIFFER ) print "Is SiteAnalyzerAdapter running?" time.sleep(1) except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== #======================== private ========================================= - def _newByte(self,b): - ''' + def _newByte(self, b): + """ Just received a byte from the sniffer - ''' + """ with self.dataLock: self.rxBuffer += [b] # PCAP global header if not self.doneReceivingGlobalHeader: - if len(self.rxBuffer)==self.PCAP_GLOBALHEADER_LEN: + if len(self.rxBuffer) == self.PCAP_GLOBALHEADER_LEN: self.doneReceivingGlobalHeader = True self.rxBuffer = [] # PCAP packet header elif not self.doneReceivingPacketHeader: - if len(self.rxBuffer)==self.PCAP_PACKETHEADER_LEN: + if len(self.rxBuffer) == self.PCAP_PACKETHEADER_LEN: self.doneReceivingPacketHeader = True self.packetHeader = self._parsePcapPacketHeader(self.rxBuffer) - assert self.packetHeader['incl_len']==self.packetHeader['orig_len'] + assert self.packetHeader['incl_len'] == self.packetHeader['orig_len'] self.rxBuffer = [] # PCAP packet bytes else: - if len(self.rxBuffer)==self.packetHeader['incl_len']: + if len(self.rxBuffer) == self.packetHeader['incl_len']: self.doneReceivingPacketHeader = False self._newFrame(self.rxBuffer) self.rxBuffer = [] - def _parsePcapPacketHeader(self,header): - ''' + def _parsePcapPacketHeader(self, header): + """ Parse a PCAP packet header Per https://wiki.wireshark.org/Development/LibpcapFileFormat: @@ -127,9 +130,9 @@ def _parsePcapPacketHeader(self,header): guint32 incl_len; /* number of octets of packet saved in file */ guint32 orig_len; /* actual length of packet */ } pcaprec_hdr_t; - ''' + """ - assert len(header)==self.PCAP_PACKETHEADER_LEN + assert len(header) == self.PCAP_PACKETHEADER_LEN returnVal = {} ( @@ -141,10 +144,10 @@ def _parsePcapPacketHeader(self,header): return returnVal - def _newFrame(self,frame): - ''' + def _newFrame(self, frame): + """ Just received a full frame from the sniffer - ''' + """ # transform frame frame = self._transformFrame(frame) @@ -152,10 +155,10 @@ def _newFrame(self,frame): # publish frame self.txMqttThread.publishFrame(frame) - def _transformFrame(self,frame): - ''' + def _transformFrame(self, frame): + """ Replace BeamLogic header by ZEP header. - ''' + """ beamlogic = self._parseBeamlogicHeader(frame[:self.BEAMLOGIC_HEADER_LEN]) ieee154 = frame[self.BEAMLOGIC_HEADER_LEN:beamlogic['Length']+self.BEAMLOGIC_HEADER_LEN] @@ -168,8 +171,8 @@ def _transformFrame(self,frame): return zep+ieee154 - def _parseBeamlogicHeader(self,header): - ''' + def _parseBeamlogicHeader(self, header): + """ Parse a Beamlogic header uint64 TimeStamp @@ -177,9 +180,9 @@ def _parseBeamlogicHeader(self,header): uint8 RSSI uint32 GpsLat uint32 GpsLong - ''' + """ - assert len(header)==self.BEAMLOGIC_HEADER_LEN + assert len(header) == self.BEAMLOGIC_HEADER_LEN returnVal = {} ( @@ -208,7 +211,7 @@ def _formatZep(self, channel, timestamp, length, rssi): ord(b) for b in struct.pack('>Q', self._get_ntp_timestamp()) ] + \ [ # Sequence number - 0x02,0x02,0x02,0x02] + \ + 0x02, 0x02, 0x02, 0x02] + \ [ # Reserved Beam logic Timestamp (1/3 of us) ord(b) for b in struct.pack('>Q', timestamp)] + \ [ # Reserved @@ -224,11 +227,10 @@ def _get_ntp_timestamp(): diff = datetime.datetime.utcnow() - datetime.datetime(1900, 1, 1, 0, 0, 0) return diff.days * 24 * 60 * 60 + diff.seconds - class TxMqttThread(threading.Thread): - ''' + """ Thread which publishes sniffed frames to the MQTT broker. - ''' + """ MQTT_BROKER_HOST = 'argus.paris.inria.fr' MQTT_BROKER_PORT = 1883 @@ -248,7 +250,7 @@ def run(self): try: while True: # wait for first frame - msgs = [self.txQueue.get(),] + msgs = [self.txQueue.get(), ] # get other frames (if any) try: @@ -280,18 +282,18 @@ def run(self): ) except Exception as err: - logCrash(self.name,err) + logCrash(self.name, err) #======================== public ========================================== - def publishFrame(self,frame): + def publishFrame(self, frame): msg = { 'description': 'zep', 'device': 'Beamlogic', 'bytes': ''.join(['{0:02x}'.format(b) for b in frame]), } try: - self.txQueue.put(json.dumps(msg),block=False) + self.txQueue.put(json.dumps(msg), block=False) except Queue.Full: print "WARNING transmit queue to MQTT broker full. Dropping frame." @@ -309,13 +311,14 @@ def __init__(self): ) while True: - input = raw_input('>') - print input, + user_input = raw_input('>') + print user_input, except Exception as err: - logCrash('CliThread',err) + logCrash('CliThread', err) #============================ main ============================================ + def main(): # parse parameters @@ -324,5 +327,5 @@ def main(): rxSnifferThread = RxSnifferThread(txMqttThread) cliThread = CliThread() -if __name__=="__main__": +if __name__ == "__main__": main()