# scapy_ex.py
"""
A set of additions and modifications to scapy to assist in parsing dot11
"""
import scapy

from scapy.fields import BitField
from scapy.fields import ByteField
from scapy.fields import ConditionalField
from scapy.fields import EnumField
from scapy.fields import Field
from scapy.fields import FieldLenField
from scapy.fields import FieldListField
from scapy.fields import FlagsField
from scapy.fields import LEFieldLenField
from scapy.fields import LELongField
from scapy.fields import LEShortField
from scapy.fields import StrFixedLenField
from scapy.layers.dot11 import Dot11Elt
from scapy.layers.dot11 import Dot11ProbeReq
from scapy.packet import Packet

from printer import Printer


class SignedByteField(Field):
    """Field for a signed byte (struct format '<b')"""
    def __init__(self, name, default):
        Field.__init__(self, name, default, '<b')


class LESignedShortField(Field):
    """Field for a little-endian signed short (struct format '<h')"""
    def __init__(self, name, default):
        Field.__init__(self, name, default, '<h')


def scapy_packet_Packet_hasflag(self, field_name, value):
    """Is the specified flag value set in the named field

    For an EnumField the current value is mapped through ``field.i2s`` and
    compared with ``value``; an unmapped value yields False.  For any other
    field the position of ``value`` in ``field.names`` is used as a bit index
    into the field's current value.
    """
    field, val = self.getfield_and_val(field_name)
    if isinstance(field, EnumField):
        if val not in field.i2s:
            return False
        return field.i2s[val] == value
    else:
        # The name is looked up wrapped in a list -- presumably because scapy
        # stores list-style flag names as one-element lists; verify against
        # the installed scapy version.
        return (1 << field.names.index([value])) & self.__getattr__(field_name) != 0
scapy.packet.Packet.hasflag = scapy_packet_Packet_hasflag
del scapy_packet_Packet_hasflag


def scapy_fields_FieldListField_i2repr(self, pkt, x):
    """Return a list with the representation of contained fields"""
    return repr([self.field.i2repr(pkt, v) for v in x])
FieldListField.i2repr = scapy_fields_FieldListField_i2repr
del scapy_fields_FieldListField_i2repr


class ChannelFromMhzField(LEShortField):
    """A little-endian short field that converts from mhz to channel"""
    def m2i(self, pkt, x):
        # Floor division keeps the channel an integer regardless of whether
        # the interpreter uses true division; the result is clamped to 1..14.
        return min(14, max(1, (x - 2407) // 5))


class PresentFlagField(ConditionalField):
    """Utility field for use by RadioTap

    Wraps ``field`` so that it is only present when ``flag_name`` is set in
    the packet's 'present' flags field.
    """
    def __init__(self, field, flag_name):
        ConditionalField.__init__(self, field, lambda pkt: pkt.hasflag('present', flag_name))


# TODO(ivanlei): This fields_desc does not cover chained present flags; decode will fail in those cases
scapy.layers.dot11.RadioTap.name = '802.11 RadioTap'

# Greatly improved fields_desc for RadioTap which parses known present flags
scapy.layers.dot11.RadioTap.fields_desc = [
    ByteField('version', 0),
    ByteField('pad', 0),
    LEShortField('RadioTap_len', 0),
    FlagsField('present', None, -32, ['TSFT','Flags','Rate','Channel','FHSS','dBm_AntSignal',
                                      'dBm_AntNoise','Lock_Quality','TX_Attenuation','dB_TX_Attenuation',
                                      'dBm_TX_Power', 'Antenna', 'dB_AntSignal', 'dB_AntNoise',
                                      'b14', 'b15','b16','b17','b18','b19','b20','b21','b22','b23',
                                      'b24','b25','b26','b27','b28','b29','b30','Ext']),
    PresentFlagField(LELongField('TSFT', 0), 'TSFT'),
    PresentFlagField(ByteField('Flags', 0), 'Flags'),
    PresentFlagField(ByteField('Rate', 0), 'Rate'),
    PresentFlagField(ChannelFromMhzField('Channel', 0), 'Channel'),
    PresentFlagField(LEShortField('Channel_flags', 0), 'Channel'),
    PresentFlagField(ByteField('FHSS_hop_set', 0), 'FHSS'),
    PresentFlagField(ByteField('FHSS_hop_pattern', 0), 'FHSS'),
    PresentFlagField(SignedByteField('dBm_AntSignal', 0), 'dBm_AntSignal'),
    PresentFlagField(SignedByteField('dBm_AntNoise', 0), 'dBm_AntNoise'),
    PresentFlagField(LEShortField('Lock_Quality', 0), 'Lock_Quality'),
    PresentFlagField(LEShortField('TX_Attenuation', 0), 'TX_Attenuation'),
    PresentFlagField(LEShortField('db_TX_Attenuation', 0), 'dB_TX_Attenuation'),
    PresentFlagField(SignedByteField('dBm_TX_Power', 0), 'dBm_TX_Power'),
    PresentFlagField(ByteField('Antenna', 0), 'Antenna'),
    PresentFlagField(ByteField('dB_AntSignal', 0), 'dB_AntSignal'),
    PresentFlagField(ByteField('dB_AntNoise', 0), 'dB_AntNoise'),
    PresentFlagField(LEShortField('RX_Flags', 0), 'b14')
]


def scapy_layers_dot11_RadioTap_extract_padding(self, s):
    """Ignore any unparsed conditionally present fields

    If all fields have been parsed, the payload length should have decreased RadioTap_len bytes
    If it has not, there are unparsed fields which should be treated as padding
    """
    # pre_dissect_len is recorded by the pre_dissect hook installed below.
    padding = len(s) - (self.pre_dissect_len - self.RadioTap_len)
    if padding:
        return s[padding:], s[:padding]
    else:
        return s, None
scapy.layers.dot11.RadioTap.extract_padding = scapy_layers_dot11_RadioTap_extract_padding
del scapy_layers_dot11_RadioTap_extract_padding


def scapy_layers_dot11_RadioTap_pre_dissect(self, s):
    """Cache the total payload length prior to dissection for use in finding padding later"""
    self.pre_dissect_len = len(s)
    return s
scapy.layers.dot11.RadioTap.pre_dissect = scapy_layers_dot11_RadioTap_pre_dissect
del scapy_layers_dot11_RadioTap_pre_dissect


class Dot11EltRates(Packet):
    """The rates member contains an array of supported rates"""

    name = '802.11 Rates Information Element'

    # Known rates come from table in 6.5.5.2 of the 802.11 spec
    known_rates = {
        2   : 1,
        3   : 1.5,
        4   : 2,
        5   : 2.5,
        6   : 3,
        9   : 4.5,
        11  : 5.5,
        12  : 6,
        18  : 9,
        22  : 11,
        24  : 12,
        27  : 13.5,
        36  : 18,
        44  : 22,
        48  : 24,
        54  : 27,
        66  : 33,
        72  : 36,
        96  : 48,
        108 : 54
    }

    fields_desc = [
        ByteField('ID', 0),
        FieldLenField("len", None, "info", "B"),
        FieldListField('supported_rates', None, ByteField('', 0), count_from=lambda pkt: pkt.len),
    ]

    def post_dissection(self, pkt):
        """Populate self.rates from the raw supported_rates bytes"""
        self.rates = []
        for supported_rate in self.supported_rates:
            # check the msb for each rate
            rate_msb = supported_rate & 0x80
            rate_value = supported_rate & 0x7F
            if rate_msb:
                # a value of 127 means HT PHY feature is required to join the BSS
                if 127 != rate_value:
                    # NOTE(review): with integer division this truncates odd
                    # values (e.g. 11 -> 5, not 5.5 as in known_rates) --
                    # confirm whether that is intended before changing it.
                    self.rates.append(rate_value/2)
            elif rate_value in Dot11EltRates.known_rates:
                self.rates.append(Dot11EltRates.known_rates[rate_value])


class Dot11EltExtendedRates(Dot11EltRates):
    """The rates member contains an additional array of supported rates"""

    name = '802.11 Extended Rates Information Element'


class Dot11EltRSN(Packet):
    """The enc, cipher, and auth members contain the decoded 'security' details"""

    name = '802.11 RSN Information Element'

    cipher_suites = { '\x00\x0f\xac\x00': 'GROUP',
                      '\x00\x0f\xac\x01': 'WEP',
                      '\x00\x0f\xac\x02': 'TKIP',
                      '\x00\x0f\xac\x04': 'CCMP',
                      '\x00\x0f\xac\x05': 'WEP' }

    auth_suites = { '\x00\x0f\xac\x01': 'MGT',
                    '\x00\x0f\xac\x02': 'PSK' }

    fields_desc = [
        ByteField('ID', 0),
        FieldLenField("len", None, "info", "B"),
        LEShortField('version', 1),
        StrFixedLenField('group_cipher_suite', '', length=4),
        LEFieldLenField('pairwise_cipher_suite_count', 1, count_of='pairwise_cipher_suite'),
        FieldListField('pairwise_cipher_suite', None, StrFixedLenField('','', length=4), count_from=lambda pkt: pkt.pairwise_cipher_suite_count),
        LEFieldLenField('auth_cipher_suite_count', 1, count_of='auth_cipher_suite'),
        FieldListField('auth_cipher_suite', None, StrFixedLenField('','',length=4), count_from=lambda pkt: pkt.auth_cipher_suite_count),
        BitField('rsn_cap_pre_auth', 0, 1),
        BitField('rsn_cap_no_pairwise', 0, 1),
        BitField('rsn_cap_ptksa_replay_counter', 0, 2),
        BitField('rsn_cap_gtksa_replay_counter', 0, 2),
        BitField('rsn_cap_mgmt_frame_protect_required', 0, 1),
        BitField('rsn_cap_mgmt_frame_protect_capable', 0, 1),
        BitField('rsn_cap_reserved_1', 0, 1),
        BitField('rsn_cap_peer_key_enabled', 0, 1),
        BitField('rsn_cap_reserved_2', 0, 6),
    ]

    def post_dissection(self, pkt):
        """Parse cipher suites to determine encryption, cipher, and authentication methods"""

        self.enc = 'WPA2' # Everything is assumed to be WPA
        self.cipher = ''
        self.auth = ''

        ciphers = [self.cipher_suites.get(pairwise_cipher) for pairwise_cipher in self.getfieldval('pairwise_cipher_suite')]
        if 'GROUP' in ciphers:
            # group_cipher_suite is a single fixed-length 4-byte selector, so
            # it must be looked up whole; iterating it would yield one
            # character at a time and never match a cipher_suites key.
            ciphers = [self.cipher_suites.get(self.getfieldval('group_cipher_suite'), '')]
        # Pick the first match in this fixed preference order.
        for cipher in ['CCMP', 'TKIP', 'WEP']:
            if cipher in ciphers:
                self.cipher = cipher
                break

        if 'WEP' == self.cipher:
            self.enc = 'WEP'

        # Only the first listed auth suite is considered.
        for auth_cipher in self.getfieldval('auth_cipher_suite'):
            self.auth = self.auth_suites.get(auth_cipher, '')
            break


def scapy_layers_dot11_Dot11_elts(self):
    """An iterator of Dot11Elt"""
    dot11elt = self.getlayer(Dot11Elt)
    # Walk the payload chain for as long as a Dot11Elt layer is still present.
    while dot11elt and dot11elt.haslayer(Dot11Elt):
        yield dot11elt
        dot11elt = dot11elt.payload
scapy.layers.dot11.Dot11.elts = scapy_layers_dot11_Dot11_elts
del scapy_layers_dot11_Dot11_elts


def scapy_layers_dot11_Dot11_find_elt_by_id(self, id):
    """Iterate over elts and return the first with a specific ID, or None"""
    for elt in self.elts():
        if elt.ID == id:
            return elt
    return None
scapy.layers.dot11.Dot11.find_elt_by_id = scapy_layers_dot11_Dot11_find_elt_by_id
del scapy_layers_dot11_Dot11_find_elt_by_id


def scapy_layers_dot11_Dot11_essid(self):
    """Return the payload of the SSID Dot11Elt if it exists"""
    elt = self.find_elt_by_id(0)
    return elt.info if elt else None
scapy.layers.dot11.Dot11.essid = scapy_layers_dot11_Dot11_essid
del scapy_layers_dot11_Dot11_essid


def scapy_layers_dot11_Dot11_rates(self, id=1):
    """Return the payload of the rates Dot11Elt if it exists

    Returns an empty list when the element is missing or cannot be parsed;
    parse failures are reported through Printer.
    """
    elt = self.find_elt_by_id(id)
    if elt:
        try:
            return Dot11EltRates(str(elt)).rates
        except Exception as e:
            Printer.error('Bad Dot11EltRates got[{0:s}]'.format(elt.info))
            Printer.exception(e)
    return []
scapy.layers.dot11.Dot11.rates = scapy_layers_dot11_Dot11_rates
del scapy_layers_dot11_Dot11_rates


def scapy_layers_dot11_Dot11_extended_rates(self):
    """Return the payload of the extended rates Dot11Elt if it exists"""
    return scapy.layers.dot11.Dot11.rates(self, 50)
scapy.layers.dot11.Dot11.extended_rates = scapy_layers_dot11_Dot11_extended_rates
del scapy_layers_dot11_Dot11_extended_rates


def scapy_layers_dot11_Dot11_sta_bssid(self):
    """Return the bssid for a station associated with the packet"""
    if self.haslayer(Dot11ProbeReq) or self.hasflag('FCfield', 'to-DS'):
        return self.addr2
    else:
        return self.addr1
scapy.layers.dot11.Dot11.sta_bssid = scapy_layers_dot11_Dot11_sta_bssid
del scapy_layers_dot11_Dot11_sta_bssid


def scapy_layers_dot11_Dot11_ap_bssid(self):
    """Return the bssid for an access point associated with the packet"""
    if self.haslayer(Dot11ProbeReq) or self.hasflag('FCfield', 'to-DS'):
        return self.addr1
    else:
        return self.addr2
scapy.layers.dot11.Dot11.ap_bssid = scapy_layers_dot11_Dot11_ap_bssid
del scapy_layers_dot11_Dot11_ap_bssid


def scapy_layers_dot11_Dot11_channel(self):
    """Return the payload of the channel Dot11Elt if it exists

    Returns None when the element is missing or its info cannot be converted;
    conversion failures are reported through Printer.
    """
    elt = self.find_elt_by_id(3)
    if elt:
        try:
            return int(ord(elt.info))
        except Exception as e:
            Printer.error('Bad Dot11Elt channel got[{0:s}]'.format(elt.info))
            Printer.exception(e)
    return None
scapy.layers.dot11.Dot11.channel = scapy_layers_dot11_Dot11_channel
del scapy_layers_dot11_Dot11_channel


def scapy_layers_dot11_Dot11_rsn(self):
    """Return the payload of the RSN Dot11Elt as a Dot11EltRSN

    Returns None when the element is missing or cannot be parsed; parse
    failures are reported through Printer.
    """
    elt = self.find_elt_by_id(48)
    if elt:
        try:
            return Dot11EltRSN(str(elt))
        except Exception as e:
            Printer.error('Bad Dot11EltRSN got[{0:s}]'.format(elt.info))
            Printer.exception(e)
    return None
scapy.layers.dot11.Dot11.rsn = scapy_layers_dot11_Dot11_rsn
del scapy_layers_dot11_Dot11_rsn