-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
1893 lines (1637 loc) · 122 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# pyPCAP
# Copyright (C) 2024 Hallabalooza
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
####################################################################################################
import enum
import itertools
import io
import os.path
import socket
import struct
import sys
import typing
import warnings
####################################################################################################
class IPv4(object):
    """
    @brief Class representing an IPv4 Ethernet frame

    The constructor assembles three nested byte strings: the UDP datagram ('prot'), the IPv4 packet
    that wraps it ('ipv4') and the Ethernet frame that wraps the packet ('eth'). The UDP checksum,
    the IPv4 header checksum and the four trailing Ethernet checksum bytes are all written as zero.
    """
    PROTOCOL_TCP = 6
    PROTOCOL_UDP = 17

    @staticmethod
    def _macToBytes(pMac:str) -> bytes:
        """
        @brief Converts a textual MAC address into its 6 byte big-endian representation
        @param pMac Hexadecimal MAC address whose octets are separated by '-' or ':'
        """
        return int(pMac.replace("-", "").replace(":", ""), 16).to_bytes(6, 'big')

    def __init__(self, pProtocol:int=PROTOCOL_UDP, pMacSrc:str="B4-45-06-2D-4A-99", pMacDst:str="CC-15-31-0A-C0-C2", pAddrSrc:str="192.168.0.1", pAddrDst:str="192.168.0.2", pPortSrc:int=0, pPortDst:int=0, pData:bytes=bytes([])):
        """
        @brief Creates an IPv4 instance
        @param pProtocol Value written to the IPv4 'Protocol' field. Only PROTOCOL_UDP can be built.
        @param pMacSrc   Source MAC address as hexadecimal text, octets separated by '-' or ':'
        @param pMacDst   Destination MAC address as hexadecimal text, octets separated by '-' or ':'
        @param pAddrSrc  Source IPv4 address in dotted notation (parsed by socket.inet_aton)
        @param pAddrDst  Destination IPv4 address in dotted notation (parsed by socket.inet_aton)
        @param pPortSrc  Source port written to the UDP header
        @param pPortDst  Destination port written to the UDP header
        @param pData     Payload bytes that follow the UDP header
        @exception ValueError if pProtocol names a protocol this class cannot build
        """
        self._frmEthr = None
        self._frmIPv4 = None
        self._frmProt = None
        # Only a UDP payload builder exists. Without this guard any other protocol left the payload
        # at None and failed further down with an unrelated TypeError.
        if pProtocol != self.PROTOCOL_UDP:
            raise ValueError("Failed to create an IPv4 object. Protocol (== {f_val}) is not supported, only PROTOCOL_UDP (== {f_udp}) can be built.".format(f_val=pProtocol, f_udp=self.PROTOCOL_UDP))
        vIPv4HdrDscp   = 0 # Differentiated Services Code Point
        vIPv4HdrEcn    = 0 # Explicit Congestion Notification
        vIPv4HdrFlgRsv = 0 # Flag; Reserved
        vIPv4HdrFlgDf  = 0 # Flag; Don't Fragment
        vIPv4HdrFlgMf  = 0 # Flag; More Fragments
        vIPv4HdrFo     = 0 # Fragment Offset
        # UDP header: source port, destination port, length (header + data), checksum (left 0)
        self._frmProt = struct.pack('!HHHH', pPortSrc, pPortDst, (len(pData) + 8), 0) + pData
        # The three flags occupy the top three bits of the 16 bit flags/fragment-offset field,
        # the fragment offset the lower 13 bits.
        vIPv4HdrFlgFo = (vIPv4HdrFlgRsv << 15) + (vIPv4HdrFlgDf << 14) + (vIPv4HdrFlgMf << 13) + vIPv4HdrFo
        self._frmIPv4 = ( struct.pack('!BBHHHBBH4s4s', ((4 << 4) + 5),                       # Version (4) + Internet Header Length (5)
                                                       ((vIPv4HdrDscp << 2) + vIPv4HdrEcn),
                                                       (len(self._frmProt) + 20),            # Total Length
                                                       0xAFFE,                               # Identification
                                                       vIPv4HdrFlgFo,                        # Flags + Fragment Offset
                                                       255,                                  # Time to live
                                                       pProtocol,                            # Protocol
                                                       0,                                    # Header Checksum
                                                       socket.inet_aton(pAddrSrc),           # Source address
                                                       socket.inet_aton(pAddrDst)            # Destination address
                                     )
                        + self._frmProt
                        )
        self._frmEthr = ( self._macToBytes(pMacDst) + self._macToBytes(pMacSrc) + (0x0800).to_bytes(2, 'big') # Header (EtherType 0x0800)
                        + self._frmIPv4                                                                       # Data
                        + bytes([0, 0, 0, 0])                                                                 # Checksum
                        )

    @property
    def eth(self):
        """
        @brief Returns the Ethernet frames binary representation
        """
        return self._frmEthr

    @property
    def ipv4(self):
        """
        @brief Returns the IPv4 packets binary representation
        """
        return self._frmIPv4

    @property
    def prot(self):
        """
        @brief Returns the IPv4 data portions binary representation
        """
        return self._frmProt
####################################################################################################
class __EnumStr(enum.Enum):
    """
    @brief pyPCAPNG enumeration class

    Member-less base class for the enumerations of this module that are not integer based. It adds
    a value membership test and a printable list of all member names.
    """
    @classmethod
    def has_value(cls, value):
        """
        @brief Tells whether 'value' is the value of one of the enumeration members
        @param value The raw value (not the member) to look up
        """
        lKnownValues = cls._value2member_map_
        return value in lKnownValues

    @classmethod
    def range(cls):
        """
        @brief Returns all member names, aliases included, as 'Class.NAME' joined by commas
        """
        lPrefix = cls.__name__
        return ",".join("{}.{}".format(lPrefix, lName) for lName in cls.__members__)
class __EnumInt(enum.IntEnum):
    """
    @brief pyPCAPNG enumeration class

    Member-less base class for the integer enumerations of this module. It adds a value membership
    test and a printable list of all member names.
    """
    @classmethod
    def has_value(cls, value):
        """
        @brief Tells whether 'value' is the value of one of the enumeration members
        @param value The raw value to look up
        """
        lKnownValues = cls._value2member_map_
        return value in lKnownValues

    @classmethod
    def range(cls):
        """
        @brief Returns all member names, aliases included, as 'Class.NAME' joined by commas
        """
        lPrefix = cls.__name__
        return ",".join("{}.{}".format(lPrefix, lName) for lName in cls.__members__)
class PCAPNGException(Exception):
    """
    @brief pyPCAPNG exception class
    """
    def __init__(self, pMssg=None):
        """
        @brief Creates a pyPCAPNG exception instance
        @param pMssg The Exception message
        """
        # Hand the message to the base class as well, so that 'args' carries it even when it was
        # passed by keyword.
        if (pMssg is None): super().__init__()
        else              : super().__init__(pMssg)
        self._mssg = pMssg

    def __str__(self):
        """
        @brief Prints a nicely string representation
        @return The repr() of the message, or an empty string if no message was given
        """
        # __str__ must return a string; returning None made str(exception) raise a TypeError.
        if (self._mssg is None): return ""
        else                   : return repr(self._mssg)
class PCAPNGWarning(Warning):
    """
    @brief pyPCAPNG warning class

    Creating an instance is what emits the warning: the constructor hands the message straight to
    warnings.showwarning().
    """
    def __init__(self, pMssg=None, pFile=None, pLiNo=None, pTrgt=None):
        """
        @brief Creates a pyPCAPNG warning instance and prints the message
        @param pMssg The Warning message
        @param pFile The File the Warning relates to
        @param pLiNo The number of the line in pFile the Warning relates to
        @param pTrgt The stream or file the Warning shall be printed to. Default is sys.stderr.
        """
        # The instance itself is passed as the 'category' argument below; presumably the
        # '__name__' attribute is set so the formatter can read a category name from it.
        self.__name__ = "PCAPNGWarning"
        self._mssg = "UnknownMessage" if pMssg is None else pMssg
        self._file = "UnknownFile" if pFile is None else pFile
        self._lino = 0 if pLiNo is None else pLiNo
        warnings.showwarning(self._mssg, self, self._file, self._lino, pTrgt)

    def __str__(self):
        """
        @brief Prints a nicely string representation
        """
        lText = "{fFile}:{fLiNo}: PCAPNGWarning: {fMssg}".format(fFile=self._file, fLiNo=self._lino, fMssg=self._mssg)
        return repr(lText)
class BLKType(__EnumInt):
    """
    @brief Enumeration of the block type codes known to pyPCAPNG

    The values are the 32 bit codes that _GB reads from / writes to the first four bytes of a
    block. The short names presumably abbreviate the pcapng block names (e.g. SHB for Section
    Header Block) -- confirm against the pcapng specification.
    """
    SHB = 0x0A0D0D0A
    IDB = 0x00000001
    EPB = 0x00000006
    SPB = 0x00000003
    NRB = 0x00000004
    ISB = 0x00000005
    DSB = 0x0000000A
    CB0 = 0x00000BAD
    CB1 = 0x40000BAD
    # Default of the 'pBlockType' parameter of the _GB initialisers; _GB maps it to "no option and
    # no record definitions".
    UNKNOWN = 0xFFFFFFFF
class BLKByteOrderType(__EnumStr):
    """
    @brief Enumeration of the byte orders a block can be encoded in

    The values are struct module byte order prefixes, so a member's value can be put directly in
    front of a struct format string.
    """
    BIG = ">"
    LITTLE = "<"
    # Byte order of the running interpreter. The value repeats BIG or LITTLE, so the enum machinery
    # makes this name an alias of that member.
    NATIVE = {"little":"<", "big":">"}[sys.byteorder]
    # The opposite of NATIVE (the name is 'NATIVE' spelled backwards); likewise an alias.
    EVITAN = {"little":">", "big":"<"}[sys.byteorder]
class BLKOptionType(__EnumInt):
    """
    @brief Enumeration of the option codes shared by the block types

    These members are merged into the block specific option enumerations further down in this
    module; DSB, CB0 and CB1 use this enumeration unchanged.
    """
    # _GB._optionListCheck expects this tag at the end of every non-empty option list.
    ENDOFOPT = 0x0000
    COMMENT = 0x0001
    CUSTOM0 = 0x0BAC
    CUSTOM1 = 0x0BAD
    CUSTOM2 = 0x4BAC
    CUSTOM3 = 0x4BAD
class SHBOptionType(__EnumInt):
    """
    @brief Option codes specific to SHB blocks

    Only the block specific codes are listed here. The name is rebound further down in this module
    to an enumeration that also contains the BLKOptionType members.
    """
    HARDWARE = 0x0002
    OS = 0x0003
    USERAPPL = 0x0004
class IDBOptionType(__EnumInt):
    """
    @brief Option codes specific to IDB blocks

    Only the block specific codes are listed here. The name is rebound further down in this module
    to an enumeration that also contains the BLKOptionType members. The permitted value lengths
    per code are kept in IDBOptionSpec.
    """
    NAME = 0x0002
    DESCRIPTION = 0x0003
    IPV4ADDR = 0x0004
    IPV6ADDR = 0x0005
    MACADDR = 0x0006
    EUIADDR = 0x0007
    SPEED = 0x0008
    TSRESOL = 0x0009
    TZONE = 0x000A
    FILTER = 0x000B
    OS = 0x000C
    FCSLEN = 0x000D
    TSOFFSET = 0x000E
    HARDWARE = 0x000F
    TXSPEED = 0x0010
    RXSPEED = 0x0011
class EPBOptionType(__EnumInt):
    """
    @brief Option codes specific to EPB blocks

    Only the block specific codes are listed here. The name is rebound further down in this module
    to an enumeration that also contains the BLKOptionType members.
    """
    FLAGS = 0x0002
    HASH = 0x0003
    DROPCOUNT = 0x0004
    PACKETID = 0x0005
    QUEUE = 0x0006
    VERDICT = 0x0007
class SPBOptionType(__EnumInt):
    """
    @brief Option codes of SPB blocks

    Deliberately empty, and unlike the other option enumerations it is not merged with
    BLKOptionType further down in this module.
    """
    pass
class NRBOptionType(__EnumInt):
    """
    @brief Option codes specific to NRB blocks

    Only the block specific codes are listed here. The name is rebound further down in this module
    to an enumeration that also contains the BLKOptionType members.
    """
    DNSNAME = 0x0002
    DNSIP4ADDR = 0x0003
    DNSIP6ADDR = 0x0004
class ISBOptionType(__EnumInt):
    """
    @brief Option codes specific to ISB blocks

    Only the block specific codes are listed here. The name is rebound further down in this module
    to an enumeration that also contains the BLKOptionType members.
    """
    STARTTIME = 0x0002
    ENDTIME = 0x0003
    IFRECV = 0x0004
    IFDROP = 0x0005
    FILTERACCEPT = 0x0006
    OSDROP = 0x0007
    USRDELIV = 0x0008
class DSBOptionType(__EnumInt):
    """
    @brief Placeholder for option codes specific to DSB blocks

    Has no members. The name is rebound to BLKOptionType further down in this module.
    """
    pass
class CB0BOptionType(__EnumInt):
    """
    @brief Placeholder for option codes specific to CB0 blocks

    Has no members.
    NOTE(review): the rest of this module refers to 'CB0OptionType' (without the second 'B'),
    which is bound to BLKOptionType further down; this class name looks like a typo -- confirm
    before renaming.
    """
    pass
class CB1OptionType(__EnumInt):
    """
    @brief Placeholder for option codes specific to CB1 blocks

    Has no members. The name is rebound to BLKOptionType further down in this module.
    """
    pass
# Rebind every block specific option enumeration to a new enumeration of the same name that
# lists the common BLKOptionType members first, followed by the block specific members.
SHBOptionType = __EnumInt('SHBOptionType', [(lMember.name, lMember.value) for lMember in (*BLKOptionType, *SHBOptionType)])
IDBOptionType = __EnumInt('IDBOptionType', [(lMember.name, lMember.value) for lMember in (*BLKOptionType, *IDBOptionType)])
EPBOptionType = __EnumInt('EPBOptionType', [(lMember.name, lMember.value) for lMember in (*BLKOptionType, *EPBOptionType)])
# SPB keeps its empty enumeration (self-assignment, no effect).
SPBOptionType = SPBOptionType
NRBOptionType = __EnumInt('NRBOptionType', [(lMember.name, lMember.value) for lMember in (*BLKOptionType, *NRBOptionType)])
ISBOptionType = __EnumInt('ISBOptionType', [(lMember.name, lMember.value) for lMember in (*BLKOptionType, *ISBOptionType)])
# Blocks without own option codes use the common enumeration directly.
DSBOptionType = BLKOptionType
CB0OptionType = BLKOptionType
CB1OptionType = BLKOptionType
class NRBRecordType(__EnumInt):
    """
    @brief Enumeration of the record codes of NRB blocks

    The permitted value lengths per code are kept in NRBRecordSpec.
    """
    # _GB._recordListCheck expects this tag at the end of every non-empty record list.
    END = 0x0000
    IPV4 = 0x0001
    IPV6 = 0x0002
# Option / record specification tables.
#
# Every table maps a tag code to a dict with these keys (as consumed by the _GB TLV helpers):
#   lenMin / lenMax : smallest / largest permitted value length in bytes
#   type            : struct format of the value without byte order prefix, or None if the value
#                     is handled as raw bytes ("{f_len}s" is filled in with the actual length)
#   unique          : True if the tag may appear only once in a list
BLKOptionSpec = {BLKOptionType.ENDOFOPT     : dict(lenMin =  0, lenMax =     0, type = None,        unique = True ),
                 BLKOptionType.COMMENT      : dict(lenMin =  0, lenMax = 65535, type = "{f_len}s",  unique = False),
                 BLKOptionType.CUSTOM0      : dict(lenMin =  4, lenMax = 65535, type = None,        unique = False),
                 BLKOptionType.CUSTOM1      : dict(lenMin =  4, lenMax = 65535, type = None,        unique = False),
                 BLKOptionType.CUSTOM2      : dict(lenMin =  4, lenMax = 65535, type = None,        unique = False),
                 BLKOptionType.CUSTOM3      : dict(lenMin =  4, lenMax = 65535, type = None,        unique = False)
                }
SHBOptionSpec = {SHBOptionType.HARDWARE     : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True ),
                 SHBOptionType.OS           : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True ),
                 SHBOptionType.USERAPPL     : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True )
                }
IDBOptionSpec = {IDBOptionType.NAME         : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True ),
                 IDBOptionType.DESCRIPTION  : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True ),
                 IDBOptionType.IPV4ADDR     : dict(lenMin =  8, lenMax =     8, type = None,        unique = False),
                 IDBOptionType.IPV6ADDR     : dict(lenMin = 17, lenMax =    17, type = None,        unique = False),
                 IDBOptionType.MACADDR      : dict(lenMin =  6, lenMax =     6, type = None,        unique = True ),
                 IDBOptionType.EUIADDR      : dict(lenMin =  8, lenMax =     8, type = None,        unique = True ),
                 IDBOptionType.SPEED        : dict(lenMin =  8, lenMax =     8, type = "Q",         unique = True ),
                 IDBOptionType.TSRESOL      : dict(lenMin =  1, lenMax =     1, type = None,        unique = True ),
                 IDBOptionType.TZONE        : dict(lenMin =  4, lenMax =     4, type = None,        unique = True ),
                 IDBOptionType.FILTER       : dict(lenMin =  1, lenMax = 65535, type = None,        unique = True ),
                 IDBOptionType.OS           : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True ),
                 IDBOptionType.FCSLEN       : dict(lenMin =  1, lenMax =     1, type = None,        unique = True ),
                 # The pcapng specification defines if_tsoffset as a signed 64 bit integer, hence
                 # "q" and not "Q": negative offsets must be representable.
                 IDBOptionType.TSOFFSET     : dict(lenMin =  8, lenMax =     8, type = "q",         unique = True ),
                 IDBOptionType.HARDWARE     : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True ),
                 IDBOptionType.TXSPEED      : dict(lenMin =  8, lenMax =     8, type = "Q",         unique = True ),
                 IDBOptionType.RXSPEED      : dict(lenMin =  8, lenMax =     8, type = "Q",         unique = True ),
                }
EPBOptionSpec = {EPBOptionType.FLAGS        : dict(lenMin =  4, lenMax =     4, type = None,        unique = True ),
                 EPBOptionType.HASH         : dict(lenMin =  0, lenMax = 65535, type = None,        unique = False),
                 EPBOptionType.DROPCOUNT    : dict(lenMin =  8, lenMax =     8, type = None,        unique = True ),
                 EPBOptionType.PACKETID     : dict(lenMin =  8, lenMax =     8, type = None,        unique = True ),
                 EPBOptionType.QUEUE        : dict(lenMin =  4, lenMax =     4, type = None,        unique = True ),
                 EPBOptionType.VERDICT      : dict(lenMin =  0, lenMax = 65535, type = None,        unique = False),
                }
SPBOptionSpec = {}
NRBOptionSpec = {NRBOptionType.DNSNAME      : dict(lenMin =  0, lenMax = 65535, type = None,        unique = True),
                 NRBOptionType.DNSIP4ADDR   : dict(lenMin =  4, lenMax =     4, type = None,        unique = True),
                 NRBOptionType.DNSIP6ADDR   : dict(lenMin = 16, lenMax =    16, type = None,        unique = True),
                }
ISBOptionSpec = {ISBOptionType.STARTTIME    : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                 ISBOptionType.ENDTIME      : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                 ISBOptionType.IFRECV       : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                 ISBOptionType.IFDROP       : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                 ISBOptionType.FILTERACCEPT : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                 ISBOptionType.OSDROP       : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                 ISBOptionType.USRDELIV     : dict(lenMin =  8, lenMax =     8, type = None,        unique = True),
                }
DSBOptionSpec = {}
CB0OptionSpec = {}
CB1OptionSpec = {}
# Add the common option definitions to every block specific table. SPB is the exception: its
# update() call has no argument and therefore leaves the table empty.
SHBOptionSpec.update(BLKOptionSpec)
IDBOptionSpec.update(BLKOptionSpec)
EPBOptionSpec.update(BLKOptionSpec)
SPBOptionSpec.update()
NRBOptionSpec.update(BLKOptionSpec)
ISBOptionSpec.update(BLKOptionSpec)
DSBOptionSpec.update(BLKOptionSpec)
CB0OptionSpec.update(BLKOptionSpec)
CB1OptionSpec.update(BLKOptionSpec)
NRBRecordSpec = {NRBRecordType.END          : dict(lenMin =  0, lenMax =     0, type = None,        unique = True),
                 NRBRecordType.IPV4         : dict(lenMin =  0, lenMax = 65535, type = None,        unique = False),
                 NRBRecordType.IPV6         : dict(lenMin =  0, lenMax = 65535, type = None,        unique = False),
                }
class LINKType(__EnumInt):
    """
    @brief Enumeration of link-layer header type codes

    Names and numbers follow the link type registry referenced below. Member names are part of
    the public interface and are kept as they were, including the differently capitalised
    'Reserved_17'.
    """
    # https://datatracker.ietf.org/doc/html/draft-richardson-opsawg-pcaplinktype-01
    NULL = 0
    ETHERNET = 1
    EXP_ETHERNET = 2
    AX25 = 3
    PRONET = 4
    CHAOS = 5
    IEEE802_5 = 6
    ARCNET_BSD = 7
    SLIP = 8
    PPP = 9
    FDDI = 10
    PPP_HDLC = 50
    PPP_ETHER = 51
    SYMANTEC_FIREWALL = 99
    ATM_RFC1483 = 100
    RAW = 101
    SLIP_BSDOS = 102
    PPP_BSDOS = 103
    C_HDLC = 104
    IEEE802_11 = 105
    ATM_CLIP = 106
    FRELAY = 107
    LOOP = 108
    ENC = 109
    LANE8023 = 110
    HIPPI = 111
    HDLC = 112
    LINUX_SLL = 113
    LTALK = 114
    ECONET = 115
    IPFILTER = 116
    PFLOG = 117
    CISCO_IOS = 118
    IEEE802_11_PRISM = 119
    IEEE802_11_AIRONET = 120
    HHDLC = 121
    IP_OVER_FC = 122
    SUNATM = 123
    RIO = 124
    PCI_EXP = 125
    AURORA = 126
    IEEE802_11_RADIOTAP = 127
    TZSP = 128
    ARCNET_LINUX = 129
    JUNIPER_MLPPP = 130
    JUNIPER_MLFR = 131
    JUNIPER_ES = 132
    JUNIPER_GGSN = 133
    JUNIPER_MFR = 134
    JUNIPER_ATM2 = 135
    JUNIPER_SERVICES = 136
    JUNIPER_ATM1 = 137
    APPLE_IP_OVER_IEEE1394 = 138
    MTP2_WITH_PHDR = 139
    MTP2 = 140
    MTP3 = 141
    SCCP = 142
    DOCSIS = 143
    LINUX_IRDA = 144
    IBM_SP = 145
    IBM_SN = 146
    RESERVED_01 = 147
    RESERVED_02 = 148
    RESERVED_03 = 149
    RESERVED_04 = 150
    RESERVED_05 = 151
    RESERVED_06 = 152
    RESERVED_07 = 153
    RESERVED_08 = 154
    RESERVED_09 = 155
    RESERVED_10 = 156
    RESERVED_11 = 157
    RESERVED_12 = 158
    RESERVED_13 = 159
    RESERVED_14 = 160
    RESERVED_15 = 161
    RESERVED_16 = 162
    IEEE802_11_AVS = 163
    JUNIPER_MONITOR = 164
    BACNET_MS_TP = 165
    PPP_PPPD = 166
    JUNIPER_PPPOE = 167
    JUNIPER_PPPOE_ATM = 168
    GPRS_LLC = 169
    GPF_T = 170
    GPF_F = 171
    GCOM_T1E1 = 172
    GCOM_SERIAL = 173
    JUNIPER_PIC_PEER = 174
    ERF_ETH = 175
    ERF_POS = 176
    LINUX_LAPD = 177
    JUNIPER_ETHER = 178
    JUNIPER_PPP = 179
    JUNIPER_FRELAY = 180
    JUNIPER_CHDLC = 181
    MFR = 182
    # Was 182, which silently turned JUNIPER_VP into an alias of MFR and left code 183 unknown.
    JUNIPER_VP = 183
    # Code 184 was missing from the list.
    A429 = 184
    A653_ICM = 185
    USB_FREEBSD = 186
    BLUETOOTH_HCI_H4 = 187
    IEEE802_16_MAC_CPS = 188
    USB_LINUX = 189
    CAN20B = 190
    IEEE802_15_4_LINUX = 191
    PPI = 192
    IEEE802_16_MAC_CPS_RADIO = 193
    JUNIPER_ISM = 194
    IEEE802_15_4_WITHFCS = 195
    SITA = 196
    ERF = 197
    RAIF1 = 198
    IPMB_KONTRON = 199
    JUNIPER_ST = 200
    BLUETOOTH_HCI_H4_WITH_PHDR = 201
    AX25_KISS = 202
    LAPD = 203
    PPP_WITH_DIR = 204
    C_HDLC_WITH_DIR = 205
    FRELAY_WITH_DIR = 206
    LAPB_WITH_DIR = 207
    Reserved_17 = 208
    IPMB_LINUX = 209
    FLEXRAY = 210
    MOST = 211
    LIN = 212
    X2E_SERIAL = 213
    X2E_XORAYA = 214
    IEEE802_15_4_NONASK_PHY = 215
    LINUX_EVDEV = 216
    GSMTAP_UM = 217
    GSMTAP_ABIS = 218
    MPLS = 219
    USB_LINUX_MMAPPED = 220
    DECT = 221
    AOS = 222
    WIHART = 223
    FC_2 = 224
    FC_2_WITH_FRAME_DELIMS = 225
    IPNET = 226
    CAN_SOCKETCAN = 227
    IPV4 = 228
    IPV6 = 229
    IEEE802_15_4_NOFCS = 230
    DBUS = 231
    JUNIPER_VS = 232
    JUNIPER_SRX_E2E = 233
    JUNIPER_FIBRECHANNEL = 234
    DVB_CI = 235
    MUX27010 = 236
    STANAG_5066_D_PDU = 237
    JUNIPER_ATM_CEMIC = 238
    NFLOG = 239
    NETANALYZER = 240
    NETANALYZER_TRANSPARENT = 241
    IPOIB = 242
    MPEG_2_TS = 243
    NG40 = 244
    NFC_LLCP = 245
    PFSYNC = 246
    INFINIBAND = 247
    SCTP = 248
    USBPCAP = 249
    RTAC_SERIAL = 250
    BLUETOOTH_LE_LL = 251
    WIRESHARK_UPPER_PDU = 252
    NETLINK = 253
    BLUETOOTH_LINUX_MONITOR = 254
    BLUETOOTH_BREDR_BB = 255
    BLUETOOTH_LE_LL_WITH_PHDR = 256
    PROFIBUS_DL = 257
    PKTAP = 258
    EPON = 259
    IPMI_HPM_2 = 260
    ZWAVE_R1_R2 = 261
    ZWAVE_R3 = 262
    WATTSTOPPER_DLM = 263
    ISO_14443 = 264
    RDS = 265
    USB_DARWIN = 266
    OPENFLOW = 267
    SDLC = 268
    TI_LLN_SNIFFER = 269
    LORATAP = 270
    VSOCK = 271
    NORDIC_BLE = 272
    DOCSIS31_XRA31 = 273
    ETHERNET_MPACKET = 274
    DISPLAYPORT_AUX = 275
    LINUX_SLL2 = 276
    SERCOS_MONITOR = 277
    OPENVIZSLA = 278
    EBHSCR = 279
    VPP_DISPATCH = 280
    DSA_TAG_BRCM = 281
    DSA_TAG_BRCM_PREPEND = 282
    IEEE802_15_4_TAP = 283
    DSA_TAG_DSA = 284
    DSA_TAG_EDSA = 285
    ELEE = 286
    Z_WAVE_SERIAL = 287
    USB_2_0 = 288
    ATSC_ALP = 289
class _GB(object):
"""
@brief Class representing a PCAPNG general block structure
"""
def __init__(self, pFile=None, **kwargs) -> None:
    """
    @brief Creates a general PCAPNG block
    @param pFile   Name of the file this block is located in. This is only used for informational outputs.
    @param kwargs  Either exactly 'pBlockType' and 'pBlockByteOrder' (block built from a
                   specification) or exactly 'pBlockType', 'pBlockByteOrder' and 'pBlockBinData'
                   (block built from binary data). Any other combination is rejected.
    """
    lGiven = set(kwargs)
    if lGiven == {"pBlockType", "pBlockByteOrder"}:
        self.__init_gb_spc__(pFile, **kwargs)
        return
    if lGiven == {"pBlockType", "pBlockByteOrder", "pBlockBinData"}:
        self.__init_gb_bin__(pFile, **kwargs)
        return
    raise PCAPNGException("Failed to instaniate an object of class '_GB'.")
def __init_gb_spc__(self, pFile=None, pBlockType=BLKType.UNKNOWN, pBlockByteOrder:BLKByteOrderType=BLKByteOrderType.NATIVE) -> None:
    """
    @brief Creates a general PCAPNG block from specific data
    @param pFile           Name of the file this block is located in. This is only used for informational outputs.
    @param pBlockType      Value of type 'BLKType' that specifies the desired PCAPNG format block type.
    @param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
    """
    if not BLKType.has_value(pBlockType):
        raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockType' (== {f_val}) is not in range '[{f_rng}]'.".format(f_bt=str(pBlockType), f_val=pBlockType, f_rng=BLKType.range()))
    if not BLKByteOrderType.has_value(pBlockByteOrder.value):
        raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockByteOrder' (== {f_val}) is not in range '[{f_rng}]'.".format(f_bt=str(pBlockType), f_val=pBlockByteOrder, f_rng=BLKByteOrderType.range()))
    # One row per block type: (record enumeration, record spec, option enumeration, option spec)
    lDefinitions = {
        BLKType.SHB     : (None,          None,          SHBOptionType, SHBOptionSpec),
        BLKType.IDB     : (None,          None,          IDBOptionType, IDBOptionSpec),
        BLKType.EPB     : (None,          None,          EPBOptionType, EPBOptionSpec),
        BLKType.SPB     : (None,          None,          SPBOptionType, SPBOptionSpec),
        BLKType.NRB     : (NRBRecordType, NRBRecordSpec, NRBOptionType, NRBOptionSpec),
        BLKType.ISB     : (None,          None,          ISBOptionType, ISBOptionSpec),
        BLKType.DSB     : (None,          None,          DSBOptionType, DSBOptionSpec),
        BLKType.CB0     : (None,          None,          CB0OptionType, CB0OptionSpec),
        BLKType.CB1     : (None,          None,          CB1OptionType, CB1OptionSpec),
        BLKType.UNKNOWN : (None,          None,          None,          None         ),
    }
    lRecordType, lRecordSpec, lOptionType, lOptionSpec = lDefinitions[pBlockType]
    self._file            = pFile
    self._blockType       = pBlockType
    self._blockLength     = None                      # only known once the block was serialised
    self._blockByteOrder  = None
    self._blockRecords    = dict(raw=None, lst=None)
    self._blockRecordType = lRecordType
    self._blockRecordSpec = lRecordSpec
    self._blockOptions    = dict(raw=None, lst=None)
    self._blockOptionType = lOptionType
    self._blockOptionSpec = lOptionSpec
    # Goes through the property setter, which validates the value once more.
    self.blockByteOrder   = pBlockByteOrder
def __init_gb_bin__(self, pFile=None, pBlockType:BLKType=BLKType.UNKNOWN, pBlockByteOrder:BLKByteOrderType=BLKByteOrderType.NATIVE, pBlockBinData:typing.Union[bytes, bytearray]=None) -> None:
    """
    @brief Creates a general PCAPNG block from binary data
    @param pFile           Name of the file this block is located in. This is only used for informational outputs.
    @param pBlockType      Value of type 'BLKType' that specifies the desired PCAPNG format block type.
    @param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
    @param pBlockBinData   Value of type 'bytes' or 'bytearray' that specifies the exact binary data of the block.
    @exception PCAPNGException if a parameter is invalid, the data is too short, the block type found in
                               the data is unknown or unexpected, or the two block length fields differ.
    """
    if (not BLKType.has_value(pBlockType)                     ): raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockType' (== {f_val}) is not in range '[{f_rng}]'."                      .format(f_bt=str(pBlockType), f_val=pBlockType,          f_rng=BLKType.range())         )
    if (not BLKByteOrderType.has_value(pBlockByteOrder.value) ): raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockByteOrder' (== {f_val}) is not in range '[{f_rng}]'."                 .format(f_bt=str(pBlockType), f_val=pBlockByteOrder,     f_rng=BLKByteOrderType.range()))
    if (not isinstance(pBlockBinData, (bytes, bytearray))     ): raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockBinData' (== {f_val}) is not of type 'bytes' or 'bytearray'."         .format(f_bt=str(pBlockType), f_val=type(pBlockBinData))                                )
    if ((len(pBlockBinData) % 4) != 0                         ): raise PCAPNGException("Failed to create a {f_bt} object. Length of parameter 'pBlockBinData' (== {f_val}) is not a multiple of 4."              .format(f_bt=str(pBlockType), f_val=len(pBlockBinData))                                 )
    # A block consists at least of block type, block length and the repeated block length
    # (3 * 4 bytes). Shorter input would otherwise fail below with a bare struct.error.
    if (len(pBlockBinData) < 12                               ): raise PCAPNGException("Failed to create a {f_bt} object. Length of parameter 'pBlockBinData' (== {f_val}) is smaller than the minimum block length of 12.".format(f_bt=str(pBlockType), f_val=len(pBlockBinData))                      )
    lFmtU32  = "{f_bo}I".format(f_bo=pBlockByteOrder.value)
    lRawType = struct.unpack(lFmtU32, pBlockBinData[0:4])[0]
    # Check the raw code before converting it: BLKType(...) raises ValueError for unknown codes,
    # which used to pre-empt this check and its PCAPNGException.
    if (not BLKType.has_value(lRawType)                       ): raise PCAPNGException("Failed to create a {f_bt} object. Block type (== {f_val}) extracted from parameter 'pBlockBinData' is not in range '[{f_rng}]'.".format(f_bt=str(pBlockType), f_val=lRawType, f_rng=BLKType.range()))
    self._file            = pFile
    self._blockType       = BLKType(lRawType)
    self._blockLength     = struct.unpack(lFmtU32, pBlockBinData[4:8])[0]
    self._blockByteOrder  = None
    self._blockRecords    = dict(raw=None, lst=None)
    # NOTE(review): unlike __init_gb_spc__, the record tables below map IDB, EPB and SPB to their
    # *option* definitions instead of None. This looks like a copy and paste leftover, but is kept
    # unchanged because code outside this view may rely on it -- confirm.
    # All four tables are indexed by the expected type 'pBlockType', not by the type found in the data.
    self._blockRecordType = {BLKType.SHB: None,          BLKType.IDB: IDBOptionType, BLKType.EPB: EPBOptionType, BLKType.SPB: SPBOptionType, BLKType.NRB: NRBRecordType, BLKType.ISB: None,          BLKType.DSB: None,          BLKType.CB0: None,          BLKType.CB1: None,          BLKType.UNKNOWN: None}[pBlockType]
    self._blockRecordSpec = {BLKType.SHB: None,          BLKType.IDB: IDBOptionSpec, BLKType.EPB: EPBOptionSpec, BLKType.SPB: SPBOptionSpec, BLKType.NRB: NRBRecordSpec, BLKType.ISB: None,          BLKType.DSB: None,          BLKType.CB0: None,          BLKType.CB1: None,          BLKType.UNKNOWN: None}[pBlockType]
    self._blockOptions    = dict(raw=None, lst=None)
    self._blockOptionType = {BLKType.SHB: SHBOptionType, BLKType.IDB: IDBOptionType, BLKType.EPB: EPBOptionType, BLKType.SPB: SPBOptionType, BLKType.NRB: NRBOptionType, BLKType.ISB: ISBOptionType, BLKType.DSB: DSBOptionType, BLKType.CB0: CB0OptionType, BLKType.CB1: CB1OptionType, BLKType.UNKNOWN: None}[pBlockType]
    self._blockOptionSpec = {BLKType.SHB: SHBOptionSpec, BLKType.IDB: IDBOptionSpec, BLKType.EPB: EPBOptionSpec, BLKType.SPB: SPBOptionSpec, BLKType.NRB: NRBOptionSpec, BLKType.ISB: ISBOptionSpec, BLKType.DSB: DSBOptionSpec, BLKType.CB0: CB0OptionSpec, BLKType.CB1: CB1OptionSpec, BLKType.UNKNOWN: None}[pBlockType]
    self.blockByteOrder   = pBlockByteOrder
    # BLKType.UNKNOWN as expected type means "accept whatever type the data carries".
    if ((pBlockType != BLKType.UNKNOWN) and (pBlockType != self._blockType)): raise PCAPNGException("Failed to create a {f_bt} object. Block type (== {f_val0}) extracted from parameter 'pBlockBinData' does not match the the expected block type (== {f_val1}) specified via parameter 'pBlockType'.".format(f_bt=str(pBlockType), f_val0=self._blockType, f_val1=pBlockType))
    # The block length is stored twice, right after the type and in the last four bytes.
    if (self.blockLength != struct.unpack(lFmtU32, pBlockBinData[-4:])[0]  ): raise PCAPNGException("Failed to create a {f_bt} object. Block length elements extracted from parameter 'pBlockBinData' are not equal.".format(f_bt=str(pBlockType)))
def __tlvListCheck(self, pTlvSpec:dict=None, pValue:list=None) -> list:
    """
    @brief Checks a tag/value list against a specification table
    @param pTlvSpec Specification table (tag -> dict with 'lenMin', 'lenMax', 'type', 'unique')
    @param pValue   List to check, or None (nothing to check)
    @return List of (message template, tag) tuples, one per finding. The templates still contain
            the placeholders '{f_typ}' and '{f_tag}', which the caller has to fill in.

    tlv list = [ (tag1.1, valX), (tag1.2, valY), (tag2.1, valZ), ... ]
    """
    lRslt = list()
    if (pValue is None):
        return lRslt
    lBlockName = str(self.blockType)
    lTlvCntr   = dict.fromkeys(pTlvSpec, 0)
    for lTag, lVal in pValue:
        if (lTag not in pTlvSpec):
            lRslt.append(("Failed to check {f_bt} object {{f_typ}}s, because of illegal tag '{{f_tag}}'.".format(f_bt=lBlockName), lTag))
            # There is no specification for an illegal tag; the checks below used to look it up
            # nevertheless and died with a KeyError.
            continue
        lSpec = pTlvSpec[lTag]
        if ((lSpec["unique"] is True) and (lTlvCntr[lTag] != 0)):
            lRslt.append(("Failed to check {f_bt} object {{f_typ}}s, because unique tag '{{f_tag}}' exists multiple times.".format(f_bt=lBlockName), lTag))
        if (isinstance(lVal, (bytes, bytearray))):
            lActLen = len(lVal)
            if (lActLen < lSpec["lenMin"]):
                lRslt.append(("Failed to check {f_bt} object {{f_typ}}s, because the actual length '{f_actLen}' of the value for tag '{{f_tag}}' is smaller then minimum specified length '{f_nomLen}'.".format(f_bt=lBlockName, f_actLen=lActLen, f_nomLen=lSpec["lenMin"]), lTag))
            if (lActLen > lSpec["lenMax"]):
                lRslt.append(("Failed to check {f_bt} object {{f_typ}}s, because tag actual length '{f_actLen}' of the value for tag '{{f_tag}}' is greater then maximum specified length '{f_nomLen}'.".format(f_bt=lBlockName, f_actLen=lActLen, f_nomLen=lSpec["lenMax"]), lTag))
        elif (isinstance(lVal, int)):
            # For integers only the upper limit is checked, using the number of bytes needed to
            # hold the value.
            lActLen = (lVal.bit_length() + 7) // 8
            if (lActLen > lSpec["lenMax"]):
                lRslt.append(("Failed to check {f_bt} object {{f_typ}}s, because tag actual length '{f_actLen}' of the value for tag '{{f_tag}}' is greater then maximum specified length '{f_nomLen}'.".format(f_bt=lBlockName, f_actLen=lActLen, f_nomLen=lSpec["lenMax"]), lTag))
        lTlvCntr[lTag] += 1
    return lRslt
def __tlvListToRaw(self, pTlvSpecs:dict=None, pValue:list=None) -> bytearray:
    """
    @brief Serialises a tag/value list
    @param pTlvSpecs Specification table (tag -> dict with 'lenMin', 'lenMax', 'type', 'unique')
    @param pValue    List of (tag, value) tuples
    @return The concatenated entries; each entry consists of tag (16 bit), value length (16 bit),
            the value and zero padding up to the next multiple of 4 bytes.
    """
    lByteOrder = self.blockByteOrder.value
    lFmtHeader = "{f_bo}HH".format(f_bo=lByteOrder)
    lRslt      = bytearray()
    for lTag, lVal in pValue:
        # The specification has to be looked up per tag. The lookup of 'type' directly on the
        # table raised a KeyError for every typed value.
        lType = pTlvSpecs[lTag]["type"]
        if ((lType is not None) and (not lType.startswith("{f_len}"))):
            # Fixed size value (e.g. "Q"): pack it in the byte order of the block. No length is
            # needed for these formats, and len() does not exist for integers anyway.
            lRaw = bytearray(struct.pack(lByteOrder + lType, lVal))
        else:
            # Untyped or variable length ("{f_len}s") values are taken over as they are.
            lRaw = bytearray(lVal)
        lLen = len(lRaw)
        lPad = ((4 - (lLen % 4)) & 0x3)
        lRslt += struct.pack(lFmtHeader, lTag, lLen) + lRaw + bytes(lPad)
    return lRslt
def __tlvRawToList(self, pTlvSpecs:dict=None, pValue:typing.Union[bytes, bytearray]=None, pType="unknown") -> list:
    """
    @brief Parses serialised tag/value entries
    @param pTlvSpecs Specification table (tag -> dict with 'lenMin', 'lenMax', 'type', 'unique')
    @param pValue    The binary data to parse
    @param pType     Word used for the kind of entry ("option", "record") in warning messages
    @return List of (tag, value) tuples. Entries whose tag is not part of pTlvSpecs are skipped
            and reported via PCAPNGWarning.
    """
    lByteOrder = self.blockByteOrder.value
    lFmtHeader = "{f_bo}HH".format(f_bo=lByteOrder)
    lRslt      = list()
    lPos       = 0
    while (lPos < len(pValue)):
        lTag, lLen = struct.unpack(lFmtHeader, pValue[lPos : (lPos + 4)])
        lRaw       = pValue[(lPos + 4) : (lPos + 4 + lLen)]
        if (lTag in pTlvSpecs):
            lType = pTlvSpecs[lTag]["type"]
            if (lType is not None):
                # Decode in the byte order of the block. Without the prefix struct uses the
                # byte order of the machine, which is wrong for foreign-endian files.
                lVal = struct.unpack(lByteOrder + lType.format(f_len=lLen), lRaw)[0]
            else:
                lVal = lRaw
            lRslt.append(tuple([lTag, lVal]))
        else:
            # ignore not specified option
            PCAPNGWarning(pMssg="[{f_blk}] unspecified {f_typ} tag code '{f_tag:>6d}/0x{f_tag:>04X}' ignored ('{f_val}')".format(f_blk=str(self.blockType), f_typ=pType, f_tag=lTag, f_val=lRaw), pFile=self.file)
        # Values are padded to a multiple of 4 bytes; skip header, value and padding.
        lPos += (4 + lLen) + ((4 - (lLen % 4)) & 0x3)
    return lRslt
def _rawData(self, pBlockBody) -> bytes:
    """
    @brief Returns the blocks binary representation
    @param pBlockBody The already serialised content between the leading and the trailing
                      block length field
    @return Block type and block length, followed by the body and the block length once more.
            As a side effect the computed length is stored in the instance.
    """
    # 12 == block type (4) + leading block length (4) + trailing block length (4)
    self._blockLength = 12 + len(pBlockBody)
    lOrder  = self._blockByteOrder.value
    lHeader = struct.pack("{f_bo}II".format(f_bo=lOrder), self._blockType, self._blockLength)
    lFooter = struct.pack("{f_bo}I".format(f_bo=lOrder), self._blockLength)
    return lHeader + pBlockBody + lFooter
def _recordListCheck(self, pRecordSpec:dict=None, pRecordList:list=None) -> list:
    """
    @brief Checks a record list against a record specification
    @param pRecordSpec Record specification table, or None if the block type has no records
    @param pRecordList List of (tag, value) tuples to check, or None
    @return List of readable error messages; empty if nothing was found
    """
    lRslt = list()
    if (pRecordSpec is None):
        lRslt.append("Failed to check {f_bt} object records, because no record definiton is available.".format(f_bt=str(self.blockType)))
        return lRslt
    # Only the (template, tag) tuples of the TLV check need formatting. The messages created in
    # this method are complete strings already; sending them through the tuple formatting
    # (m[0], m[1]) picked single characters out of them and crashed.
    for lMssg, lTag in self.__tlvListCheck(pRecordSpec, pRecordList):
        try:
            lName = self._blockRecordType(lTag).name
        except (TypeError, ValueError):
            # Tag is not a member of the record enumeration (or there is none): show it as is.
            lName = str(lTag)
        lRslt.append(lMssg.format(f_typ="record", f_tag=lName))
    if ((pRecordList is not None) and (len(pRecordList) >= 1) and (pRecordList[-1][0] != NRBRecordType.END)):
        lRslt.append("Failed to check {f_bt} object records, because tag '{f_tag}' is not located at the end.".format(f_bt=str(self.blockType), f_tag=NRBRecordType.END))
    return lRslt
def _recordListToRaw(self, pRecordSpecs:dict=None, pValue:list=None) -> bytearray:
    """
    @brief Serialises a record list
    @param pRecordSpecs Record specification table
    @param pValue       List of (tag, value) tuples
    @return Whatever the private TLV serialiser returns for these arguments (a bytearray)
    """
    return self.__tlvListToRaw(pRecordSpecs, pValue)
def _recordRawToList(self, pRecordSpecs:dict=None, pValue:typing.Union[bytes, bytearray]=None) -> list:
    """
    @brief Parses serialised records
    @param pRecordSpecs Record specification table
    @param pValue       The binary data to parse
    @return List of (tag, value) tuples in which every tag has been converted to a member of the
            record enumeration of this block
    """
    lParsed = self.__tlvRawToList(pRecordSpecs, pValue, "record")
    return [(self.blockRecordType(lCode), lContent) for lCode, lContent in lParsed]
def _optionListCheck(self, pOptionSpec:dict=None, pOptionList:list=None) -> list:
    """
    @brief Checks an option list against an option specification
    @param pOptionSpec Option specification table, or None if the block type has no options
    @param pOptionList List of (tag, value) tuples to check, or None
    @return List of readable error messages; empty if nothing was found
    """
    lRslt = list()
    if (pOptionSpec is None):
        lRslt.append("Failed to check {f_bt} object options, because no option definiton is available.".format(f_bt=str(self.blockType)))
        return lRslt
    # Only the (template, tag) tuples of the TLV check need formatting. The messages created in
    # this method are complete strings already; sending them through the tuple formatting
    # (m[0], m[1]) picked single characters out of them and crashed.
    for lMssg, lTag in self.__tlvListCheck(pOptionSpec, pOptionList):
        try:
            lName = self._blockOptionType(lTag).name
        except (TypeError, ValueError):
            # Tag is not a member of the option enumeration (or there is none): show it as is.
            lName = str(lTag)
        lRslt.append(lMssg.format(f_typ="option", f_tag=lName))
    if ((pOptionList is not None) and (len(pOptionList) >= 1) and (pOptionList[-1][0] != BLKOptionType.ENDOFOPT)):
        lRslt.append("Failed to check {f_bt} object options, because tag '{f_tag}' is not located at the end.".format(f_bt=str(self.blockType), f_tag=BLKOptionType.ENDOFOPT))
    return lRslt
def _optionListToRaw(self, pOptionSpecs:dict=None, pValue:list=None) -> bytearray:
    """
    @brief Serialises an option list
    @param pOptionSpecs Option specification table
    @param pValue       List of (tag, value) tuples
    @return Whatever the private TLV serialiser returns for these arguments (a bytearray)
    """
    return self.__tlvListToRaw(pOptionSpecs, pValue)
def _optionRawToList(self, pOptionSpecs:dict=None, pValue:typing.Union[bytes, bytearray]=None) -> list:
    """
    @brief Parses serialised options
    @param pOptionSpecs Option specification table
    @param pValue       The binary data to parse
    @return List of (tag, value) tuples in which every tag has been converted to a member of the
            option enumeration of this block
    """
    lParsed = self.__tlvRawToList(pOptionSpecs, pValue, "option")
    return [(self.blockOptionType(lCode), lContent) for lCode, lContent in lParsed]
@property
def file(self) -> str:
    """
    @brief Returns the file name that was passed to the constructor (may be None)
    """
    return self._file
@property
def blockRecordType(self) -> typing.Optional[type]:
    """
    @brief Returns the record enumeration selected for this block during initialisation, or None
    """
    return self._blockRecordType
@property
def blockRecordSpec(self) -> dict:
    """
    @brief Returns the record specification table selected for this block during initialisation, or None
    """
    return self._blockRecordSpec
@property
def blockOptionType(self) -> typing.Optional[type]:
    """
    @brief Returns the option enumeration selected for this block during initialisation, or None
    """
    return self._blockOptionType
@property
def blockOptionSpec(self) -> dict:
    """
    @brief Read-only access to the option specification of this block
    @return Content of the attribute '_blockOptionSpec'.
    """
    return self._blockOptionSpec
@property
def blockType(self) -> int:
    """
    @brief Read-only access to the type of this block
    @return Content of the attribute '_blockType'. NOTE(review): other code reads '.name' from this value, so it is presumably a 'BLKType' member rather than a plain 'int' - confirm.
    """
    return self._blockType
@property
def blockLength(self) -> int:
    """
    @brief Read-only access to the length of this block
    @return Content of the attribute '_blockLength'.
    """
    return self._blockLength
@property
def blockByteOrder(self) -> BLKByteOrderType:
    """
    @brief Byte order of this block
    @return Content of the attribute '_blockByteOrder'.
    """
    return self._blockByteOrder
@blockByteOrder.setter
def blockByteOrder(self, pValue:BLKByteOrderType) -> None:
    """
    @brief Sets the byte order of this block
    @param pValue Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
    @exception PCAPNGException Raised if 'pValue' is not a valid 'BLKByteOrderType' value.
    """
    # The type is checked first: reading '.value' from anything else (e.g. 'None') would end in an 'AttributeError' instead of the documented exception.
    if ((not isinstance(pValue, BLKByteOrderType)) or (not BLKByteOrderType.has_value(pValue.value))): raise PCAPNGException("Failed to set property 'blockByteOrder' of a {f_bt} object. The desired value (== {f_val}) of the property is not in range '[{f_rng}]'.".format(f_bt=self.blockType.name, f_val=pValue, f_rng=BLKByteOrderType.range()))
    self._blockByteOrder = pValue
@property
def options(self) -> list:
    """
    @brief List representation of the options of this block
    @return List of option tuples or 'None' if no options are set.
    """
    # options = [ (optioncode1.1, [optionvalueX, ...]), (optioncode1.2, [optionvalueY, ...]), (optioncode2.1, [optionvalueZ, ...]), ..., (pyPCAPNG.BLKOptionType.ENDOFOPT, []) ]
    return self._blockOptions["lst"]
@options.setter
def options(self, pValue:typing.Union[list, type(None)]) -> None:
    """
    @brief Sets the options of this block from a list
    @param pValue List of option tuples or 'None' to remove all options.
    @exception PCAPNGException Raised if 'pValue' has the wrong type or if the block byte order is unknown.

    Findings of the option check are handed to 'PCAPNGWarning'; by themselves they do not prevent the assignment.
    """
    if (not isinstance(pValue, (list, type(None)))): raise PCAPNGException("Failed to set property 'options' of a {f_bt} object. The desired value of the property is not of type 'list' or 'NoneType'.".format(f_bt=self.blockType.name))
    if (self.blockByteOrder is None               ): raise PCAPNGException("Failed to set property 'options' of a {f_bt} object. The attribute 'blockByteOrder' is unknown.".format(f_bt=self.blockType.name))
    if (pValue is None):
        self._blockOptions = dict(raw=None, lst=None)
        return
    # 'dict.fromkeys' removes duplicated messages and keeps them in the order they were found.
    lErrors = list(dict.fromkeys(self._optionListCheck(self.blockOptionSpec, pValue)))
    # The template contains '{f_bt}', so it has to be supplied here as well; without it 'str.format' raises a 'KeyError'.
    if (lErrors): PCAPNGWarning(pMssg="Failed to set property 'options' of a {f_bt} object. The desired value of the property is incorrect. ({f_error})".format(f_bt=self.blockType.name, f_error="\n".join(lErrors)), pFile=self.file)
    self._blockOptions = dict(raw=self._optionListToRaw(self.blockOptionSpec, pValue), lst=pValue)
@property
def optionsRaw(self) -> bytearray:
    """
    @brief Raw (binary) representation of the options of this block
    @return Raw option data or 'None' if no options are set.
    """
    return self._blockOptions["raw"]
@optionsRaw.setter
def optionsRaw(self, pValue:typing.Union[bytes, bytearray, type(None)]) -> None:
    """
    @brief Sets the options of this block from raw (binary) data
    @param pValue Value of type 'bytes' or 'bytearray' whose length is a multiple of 4, or 'None' to remove all options.
    @exception PCAPNGException Raised if 'pValue' has the wrong type or length or if the block byte order is unknown.

    Findings of the option check are handed to 'PCAPNGWarning'; by themselves they do not prevent the assignment.
    """
    if (not isinstance(pValue, (bytes, bytearray, type(None)))): raise PCAPNGException("Failed to set property 'optionsRaw' of a {f_bt} object. The desired value of the property is not of type 'bytes', 'bytearray' or 'NoneType'.".format(f_bt=self.blockType.name))
    # 'None' is an accepted value and has no length, so the length is only checked for real data.
    if ((pValue is not None) and ((len(pValue) % 4) != 0)     ): raise PCAPNGException("Failed to set property 'optionsRaw' of a {f_bt} object. The length of the provided parameter 'pValue' is not a multiple of 4.".format(f_bt=self.blockType.name))
    if (self.blockByteOrder is None                           ): raise PCAPNGException("Failed to set property 'optionsRaw' of a {f_bt} object. The attribute 'blockByteOrder' is unknown.".format(f_bt=self.blockType.name))
    if (pValue is None):
        self._blockOptions = dict(raw=None, lst=None)
        return
    lBlockOptions = self._optionRawToList(self.blockOptionSpec, pValue)
    # 'dict.fromkeys' removes duplicated messages and keeps them in the order they were found.
    lErrors = list(dict.fromkeys(self._optionListCheck(self.blockOptionSpec, lBlockOptions)))
    if (lErrors): PCAPNGWarning(pMssg="Failed to set property 'optionsRaw' of a {f_bt} object. The desired value of the property is incorrect. ({f_error})".format(f_bt=self.blockType.name, f_error="\n".join(lErrors)), pFile=self.file)
    self._blockOptions = dict(raw=pValue, lst=lBlockOptions)
class SHB(_GB):
    """
    @brief PCAPNG Section Header Block
    """
    def __init__(self, pFile=None, **kwargs):
        """
        @brief Creates a PCAPNG SHB block
        @param pFile  Name of the file this block is located in. This is only used for informational outputs.
        @param kwargs Exactly the keywords 'pBlockByteOrder' and 'pBlockBinData' to create the block from binary data, otherwise the keywords of '__init_shb_spc__'.
        """
        self._versionMajor  = None
        self._versionMinor  = None
        self._sectionLength = None
        if (set(kwargs.keys()) == set(["pBlockByteOrder", "pBlockBinData"])): self.__init_shb_bin__(pFile, **kwargs)
        else                                                                 : self.__init_shb_spc__(pFile, **kwargs)
    def __init_shb_spc__(self, pFile=None, pBlockByteOrder:BLKByteOrderType=BLKByteOrderType.NATIVE, pMajorVersion:int=int(1), pMinorVersion:int=int(0), pSectionLength:int=int(0xFFFFFFFFFFFFFFFF), pOptions:list=None) -> None:
        """
        @brief Creates a PCAPNG SHB block from specific data
        @param pFile           Name of the file this block is located in. This is only used for informational outputs.
        @param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
        @param pMajorVersion   Value of type 'int' that specifies the number of the current major version of the format.
        @param pMinorVersion   Value of type 'int' that specifies the number of the current minor version of the format.
        @param pSectionLength  Value of type 'int' that specifies the number of the length in octets of the section initiated by this SHB.
        @param pOptions        List of tuples of option key/value pairs. E.g. [(pyPCAPNG.SHBOptionType.HARDWARE, bytes("Z80", encoding="utf-8")), (pyPCAPNG.SHBOptionType.COMMENT, bytes("Comment 1", encoding="utf-8")), (pyPCAPNG.SHBOptionType.COMMENT, bytes("Comment 2", encoding="utf-8")), (pyPCAPNG.SHBOptionType.ENDOFOPT, [])]
        """
        super().__init__(pFile=pFile, pBlockType=BLKType.SHB, pBlockByteOrder=pBlockByteOrder)
        # The property setters validate type and range of every value.
        self.versionMajor  = pMajorVersion
        self.versionMinor  = pMinorVersion
        self.sectionLength = pSectionLength
        self.options       = pOptions
    def __init_shb_bin__(self, pFile=None, pBlockByteOrder:BLKByteOrderType=None, pBlockBinData:typing.Union[bytes, bytearray]=bytes([])) -> None:
        """
        @brief Creates a PCAPNG SHB block from binary data
        @param pFile           Name of the file this block is located in. This is only used for informational outputs.
        @param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the expected PCAPNG block byte order, or 'None' to accept the byte order found in the binary data.
        @param pBlockBinData   Value of type 'bytes' or 'bytearray' that specifies the exact binary data of the block.
        @exception PCAPNGException Raised if the data is too short, if the 'Byte-Order Magic' is unknown or if the detected byte order differs from 'pBlockByteOrder'.
        """
        # The base class initializer runs further below, so 'self.blockType' may not be set yet. The block type is therefore named explicitly.
        if (len(pBlockBinData) < 28): raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockBinData' does not have the required minimum length.".format(f_bt=str(BLKType.SHB)))
        # The magic is decoded with a fixed (little endian) byte order. Decoding it with the native byte order would swap the
        # meaning of both constants on a big endian host.
        lByteOrderMagic = struct.unpack("<I", pBlockBinData[8:12])[0]
        if   (lByteOrderMagic == 0x1A2B3C4D): self.blockByteOrder = BLKByteOrderType.LITTLE
        elif (lByteOrderMagic == 0x4D3C2B1A): self.blockByteOrder = BLKByteOrderType.BIG
        else                                : raise PCAPNGException("Failed to create a SHB object from binary data that 'Byte-Order Magic' element is wrong.")
        super().__init__(pFile=pFile, pBlockType=BLKType.SHB, pBlockByteOrder=self.blockByteOrder, pBlockBinData=pBlockBinData)
        if ((pBlockByteOrder is not None) and (pBlockByteOrder != self.blockByteOrder)): raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockByteOrder' (== {f_req}) does not match the byte order (== {f_val}) extracted from parameter 'pBlockBinData'.".format(f_bt=str(self.blockType), f_req=pBlockByteOrder, f_val=self.blockByteOrder))
        self.versionMajor, self.versionMinor, self.sectionLength = struct.unpack("{f_bo}HHQ".format(f_bo=self.blockByteOrder.value), pBlockBinData[12:24])
        # Everything between the fixed part and the trailing 4 octets of the block is option data.
        if (24 < (len(pBlockBinData) - 4)):
            self.optionsRaw = pBlockBinData[24:-4]
    @property
    def rawData(self):
        """
        @brief Binary representation of this block
        @return Result of '_rawData' for the packed 'Byte-Order Magic', version and section length followed by the raw options (if there are any).
        """
        lOptionsRaw = self.optionsRaw if (self.optionsRaw is not None) else bytes([])
        return self._rawData(struct.pack("{f_bo}IHHQ".format(f_bo=self._blockByteOrder.value), int(0x1A2B3C4D), self._versionMajor, self._versionMinor, self._sectionLength) + lOptionsRaw)
    @property
    def versionMajor(self) -> int:
        """
        @brief Major version of the format
        """
        return self._versionMajor
    @versionMajor.setter
    def versionMajor(self, pValue:int) -> None:
        """
        @brief Sets the major version of the format
        @param pValue Value of type 'int' in range '[0,(2^16)-1]'.
        @exception PCAPNGException Raised if 'pValue' has the wrong type or is out of range.
        """
        if (not isinstance(pValue, int)): raise PCAPNGException("Failed to set property 'versionMajor' of a {f_bt} object. The desired value (== {f_val}) is not of type 'int'.".format(f_bt=self.blockType.name, f_val=pValue))
        if (not 0 <= pValue <= 0xFFFF  ): raise PCAPNGException("Failed to set property 'versionMajor' of a {f_bt} object. The desired value (== {f_val}) of the property is not in range '[0,(2^16)-1]'.".format(f_bt=self.blockType.name, f_val=pValue))
        self._versionMajor = pValue
    @property
    def versionMinor(self) -> int:
        """
        @brief Minor version of the format
        """
        return self._versionMinor
    @versionMinor.setter
    def versionMinor(self, pValue:int) -> None:
        """
        @brief Sets the minor version of the format
        @param pValue Value of type 'int' in range '[0,(2^16)-1]'.
        @exception PCAPNGException Raised if 'pValue' has the wrong type or is out of range.
        """
        if (not isinstance(pValue, int)): raise PCAPNGException("Failed to set property 'versionMinor' of a {f_bt} object. The desired value (== {f_val}) is not of type 'int'.".format(f_bt=self.blockType.name, f_val=pValue))
        if (not 0 <= pValue <= 0xFFFF  ): raise PCAPNGException("Failed to set property 'versionMinor' of a {f_bt} object. The desired value (== {f_val}) of the property is not in range '[0,(2^16)-1]'.".format(f_bt=self.blockType.name, f_val=pValue))
        self._versionMinor = pValue
    @property
    def sectionLength(self) -> int:
        """
        @brief Length in octets of the section initiated by this SHB
        """
        return self._sectionLength
    @sectionLength.setter
    def sectionLength(self, pValue:int) -> None:
        """
        @brief Sets the length in octets of the section initiated by this SHB
        @param pValue Value of type 'int' in range '[0,(2^64)-1]'.
        @exception PCAPNGException Raised if 'pValue' has the wrong type or is out of range.
        """
        if (not isinstance(pValue, int)          ): raise PCAPNGException("Failed to set property 'sectionLength' of a {f_bt} object. The desired value (== {f_val}) is not of type 'int'.".format(f_bt=self.blockType.name, f_val=pValue))
        if (not 0 <= pValue <= 0xFFFFFFFFFFFFFFFF): raise PCAPNGException("Failed to set property 'sectionLength' of a {f_bt} object. The desired value (== {f_val}) of the property is not in range '[0,(2^64)-1]'.".format(f_bt=self.blockType.name, f_val=pValue))
        self._sectionLength = pValue
class IDB(_GB):
    """
    @brief PCAPNG Interface Description Block
    """
    def __init__(self, pFile=None, **kwargs):
        """
        @brief Creates a PCAPNG IDB block
        @param pFile  Name of the file this block is located in. This is only used for informational outputs.
        @param kwargs Exactly the keywords 'pBlockByteOrder' and 'pBlockBinData' to create the block from binary data, otherwise the keywords of '__init_idb_spc__'.
        """
        self._linkType = None
        self._snapLen  = None
        if (set(kwargs.keys()) == set(["pBlockByteOrder", "pBlockBinData"])): self.__init_idb_bin__(pFile, **kwargs)
        else                                                                 : self.__init_idb_spc__(pFile, **kwargs)
    def __init_idb_spc__(self, pFile=None, pBlockByteOrder:BLKByteOrderType=BLKByteOrderType.NATIVE, pLinkType:int=int(1), pSnapLen:int=int(0), pOptions:list=None) -> None:
        """
        @brief Creates a PCAPNG IDB block from specific data
        @param pFile           Name of the file this block is located in. This is only used for informational outputs.
        @param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
        @param pLinkType       Value of type '(unsigned) int' that specifies the link layer type of this interface.
        @param pSnapLen        Value of type '(unsigned) int' that specifies the maximum number of octets captured from each packet. The portion of each packet that exceeds this value will not be stored in the file. A value of zero indicates no limit.
        @param pOptions        List of tuples of option key/value pairs. E.g. [(pyPCAPNG.IDBOptionType.NAME, bytes("LAN1", encoding="utf-8")), (pyPCAPNG.IDBOptionType.IPV4ADDR, [0xC0, 0xA8, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0x00]), (pyPCAPNG.IDBOptionType.COMMENT, bytes("Comment 2", encoding="utf-8")), (pyPCAPNG.IDBOptionType.ENDOFOPT, [])]
        """
        super().__init__(pFile=pFile, pBlockType=BLKType.IDB, pBlockByteOrder=pBlockByteOrder)
        # The property setters validate type and range of every value.
        self.linkType = pLinkType
        self.snapLen  = pSnapLen
        self.options  = pOptions
    def __init_idb_bin__(self, pFile=None, pBlockByteOrder:BLKByteOrderType=None, pBlockBinData:typing.Union[bytes, bytearray]=bytes([])) -> None:
        """
        @brief Creates a PCAPNG IDB block from binary data
        @param pFile           Name of the file this block is located in. This is only used for informational outputs.
        @param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
        @param pBlockBinData   Value of type 'bytes' or 'bytearray' that specifies the exact binary data of the block.
        @exception PCAPNGException Raised if the data is too short.
        """
        super().__init__(pFile=pFile, pBlockType=BLKType.IDB, pBlockByteOrder=pBlockByteOrder, pBlockBinData=pBlockBinData)
        if (len(pBlockBinData) < 20): raise PCAPNGException("Failed to create a {f_bt} object. Parameter 'pBlockBinData' does not have the required minimum length.".format(f_bt=str(self.blockType)))
        # The 16 bit value between link type and snap length is unpacked but not used.
        self.linkType, _, self.snapLen = struct.unpack("{f_bo}HHI".format(f_bo=pBlockByteOrder.value), pBlockBinData[8:16])
        # Everything between the fixed part and the trailing 4 octets of the block is option data.
        if (16 < (len(pBlockBinData) - 4)):
            self.optionsRaw = pBlockBinData[16:-4]
    @property
    def rawData(self):
        """
        @brief Binary representation of this block
        @return Result of '_rawData' for the packed link type, a zero filled 16 bit value and the snap length followed by the raw options (if there are any).
        """
        lOptionsRaw = self.optionsRaw if (self.optionsRaw is not None) else bytes([])
        return self._rawData(struct.pack("{f_bo}HHI".format(f_bo=self.blockByteOrder.value), self.linkType, 0, self.snapLen) + lOptionsRaw)
    @property
    def linkType(self) -> int:
        """
        @brief Link layer type of this interface
        """
        return self._linkType
    @linkType.setter
    def linkType(self, pValue:int) -> None:
        """
        @brief Sets the link layer type of this interface
        @param pValue Value of type 'int' in range '[0,(2^16)-1]'.
        @exception PCAPNGException Raised if 'pValue' has the wrong type or is out of range.
        """
        if (not isinstance(pValue, int)): raise PCAPNGException("Failed to set property 'linkType' of a {f_bt} object. The desired value (== {f_val}) is not of type 'int'.".format(f_bt=self.blockType.name, f_val=pValue))
        if (not 0 <= pValue <= 0xFFFF  ): raise PCAPNGException("Failed to set property 'linkType' of a {f_bt} object. The desired value (== {f_val}) of the property is not in range '[0,(2^16)-1]'.".format(f_bt=self.blockType.name, f_val=pValue))
        self._linkType = pValue
    @property
    def snapLen(self) -> int:
        """
        @brief Maximum number of octets captured from each packet
        """
        return self._snapLen
    @snapLen.setter
    def snapLen(self, pValue:int) -> None:
        """
        @brief Sets the maximum number of octets captured from each packet
        @param pValue Value of type 'int' in range '[0,(2^32)-1]'.
        @exception PCAPNGException Raised if 'pValue' has the wrong type or is out of range.
        """
        # The messages name 'snapLen'; they used to name 'linkType', which pointed to the wrong property.
        if (not isinstance(pValue, int)  ): raise PCAPNGException("Failed to set property 'snapLen' of a {f_bt} object. The desired value (== {f_val}) is not of type 'int'.".format(f_bt=self.blockType.name, f_val=pValue))
        if (not 0 <= pValue <= 0xFFFFFFFF): raise PCAPNGException("Failed to set property 'snapLen' of a {f_bt} object. The desired value (== {f_val}) of the property is not in range '[0,(2^32)-1]'.".format(f_bt=self.blockType.name, f_val=pValue))
        self._snapLen = pValue
class EPB(_GB):
"""
@brief PCAPNG Enhanced Packet Block
"""
def __init__(self, pFile=None, **kwargs):
self._interfaceId = None
self._timestamp = None
self._capturedPacketLength = None
self._originalPacketLength = None
self._packetData = None
if (set(kwargs.keys()) == set(["pBlockByteOrder", "pBlockBinData"])): self.__init_epb_bin__(pFile, **kwargs)
else : self.__init_epb_spc__(pFile, **kwargs)
def __init_epb_spc__(self, pFile=None, pBlockByteOrder:BLKByteOrderType=BLKByteOrderType.NATIVE, pInterfaceId:int=int(0), pTimestamp:int=int(0), pCapturedPacketLength:int=int(0), pOriginalPacketLength:int=int(0), pPacketData:typing.Union[bytes, bytearray]=bytes([]), pOptions:list=None) -> None:
"""
@brief Creates a PCAPNG EPB block from specific data
@param pFile Name of the file this block is located in. This is only used for informational outputs.
@param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
@param pInterfaceId Value of type '(unsigned) int' that specifies the interface on which this packet was received or transmitted.
@param pTimestamp Value of type '(unsigned) int' that specifies the number of units of time that have elapsed since 1970-01-01 00:00:00 UTC.
@param pCapturedPacketLength Value of type '(unsigned) int' that specifies the number of octets captured from the packet (i.e. the length of the Packet Data field). It will be the minimum value among the Original Packet Length and the snapshot length for the interface (SnapLen, defined in Figure 10). The value of this field does not include the padding octets added at the end of the Packet Data field to align the Packet Data field to a 32-bit boundary.
@param pOriginalPacketLength Value of type '(unsigned) int' that specifies the actual length of the packet when it was transmitted on the network. It can be different from the Captured Packet Length if the packet has been truncated by the capture process.
@param pPacketData Value of type 'bytes' or 'bytearray' that specifies payload of the block.
@param pOptions List of tuples of option key/value pairs. E.g. [(pyPCAPNG.EPBOptionType.PACKETID, [0xC0, 0xA8, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0x00]), (pyPCAPNG.EPBOptionType.ENDOFOPT, [])]
"""
super().__init__(pFile=pFile, pBlockType=BLKType.EPB, pBlockByteOrder=pBlockByteOrder)
self.interfaceId = pInterfaceId
self.timestamp = pTimestamp
self.capturedPacketLength = pCapturedPacketLength
self.originalPacketLength = pOriginalPacketLength
self.packetData = pPacketData
self.options = pOptions
def __init_epb_bin__(self, pFile=None, pBlockByteOrder:BLKByteOrderType=None, pBlockBinData:typing.Union[bytes, bytearray]=bytes([])) -> None:
"""
@brief Creates a PCAPNG EPB block from binary data
@param pFile Name of the file this block is located in. This is only used for informational outputs.
@param pBlockByteOrder Value of type 'BLKByteOrderType' that specifies the desired PCAPNG block byte order.
@param pBlockBinData Value of type 'bytes' or 'bytearray' that specifies the exact binary data of the block.