/ scapy_ex.py
scapy_ex.py
  1  """
  2  A set of additions and modifications to scapy to assist in parsing dot11
  3  """
  4  import scapy
  5  
  6  from scapy.fields import BitField
  7  from scapy.fields import ByteField
  8  from scapy.fields import ConditionalField
  9  from scapy.fields import EnumField
 10  from scapy.fields import Field
 11  from scapy.fields import FieldLenField
 12  from scapy.fields import FieldListField
 13  from scapy.fields import FlagsField
 14  from scapy.fields import LEFieldLenField
 15  from scapy.fields import LELongField
 16  from scapy.fields import LEShortField
 17  from scapy.fields import StrFixedLenField
 18  from scapy.layers.dot11 import Dot11Elt
 19  from scapy.layers.dot11 import Dot11ProbeReq
 20  from scapy.packet import Packet
 21  
 22  from printer import Printer
 23  
 24  
 25  class SignedByteField(Field):
 26  	"""Fields for a signed byte"""
 27  	def __init__(self, name, default):
 28  		Field.__init__(self, name, default, '<b')
 29  
 30  
 31  class LESignedShortField(Field):
 32  	"""Field for a little-endian short"""
 33  	def __init__(self, name, default):
 34  		Field.__init__(self, name, default, '<h')
 35  
 36  
 37  def scapy_packet_Packet_hasflag(self, field_name, value):
 38  	"""Is the specified flag value set in the named field"""
 39  	field, val = self.getfield_and_val(field_name)
 40  	if isinstance(field, EnumField):
 41  		if val not in field.i2s:
 42  			return False
 43  		return field.i2s[val] == value
 44  	else:
 45  		return (1 << field.names.index([value])) & self.__getattr__(field_name) != 0
 46  scapy.packet.Packet.hasflag = scapy_packet_Packet_hasflag
 47  del scapy_packet_Packet_hasflag
 48  
 49  
 50  def scapy_fields_FieldListField_i2repr(self, pkt, x):
 51  	"""Return a list with the representation of contained fields"""
 52  	return repr([self.field.i2repr(pkt, v) for v in x])
 53  FieldListField.i2repr = scapy_fields_FieldListField_i2repr
 54  del scapy_fields_FieldListField_i2repr
 55  
 56  
 57  class ChannelFromMhzField(LEShortField):
 58  	"""A little-endian short field that converts from mhz to channel"""
 59  	def m2i(self, pkt, x):
 60  		return min(14, max(1, (x - 2407) / 5))
 61  
 62  
 63  class PresentFlagField(ConditionalField):
 64  	"""Utility field for use by RadioTap"""
 65  	def __init__(self, field, flag_name):
 66  		ConditionalField.__init__(self, field, lambda pkt: pkt.hasflag('present', flag_name))
 67  
 68  
# TODO(ivanlei): This fields_desc does not cover chained present flags; decoding will fail in those cases.
 70  scapy.layers.dot11.RadioTap.name = '802.11 RadioTap'
 71  
 72  # Greatly improved fields_desc for RadioTap which parses known present flags
 73  scapy.layers.dot11.RadioTap.fields_desc = [
 74  	ByteField('version', 0),
 75  	ByteField('pad', 0),
 76  	LEShortField('RadioTap_len', 0),
 77  	FlagsField('present', None, -32, ['TSFT','Flags','Rate','Channel','FHSS','dBm_AntSignal',
 78  									  'dBm_AntNoise','Lock_Quality','TX_Attenuation','dB_TX_Attenuation',
 79  									  'dBm_TX_Power', 'Antenna', 'dB_AntSignal', 'dB_AntNoise',
 80  									  'b14', 'b15','b16','b17','b18','b19','b20','b21','b22','b23',
 81  									  'b24','b25','b26','b27','b28','b29','b30','Ext']),
 82  	PresentFlagField(LELongField('TSFT', 0), 'TSFT'),
 83  	PresentFlagField(ByteField('Flags', 0), 'Flags'),
 84  	PresentFlagField(ByteField('Rate', 0), 'Rate'),
 85  	PresentFlagField(ChannelFromMhzField('Channel', 0), 'Channel'),
 86  	PresentFlagField(LEShortField('Channel_flags', 0), 'Channel'),
 87  	PresentFlagField(ByteField('FHSS_hop_set', 0), 'FHSS'),
 88  	PresentFlagField(ByteField('FHSS_hop_pattern', 0), 'FHSS'),
 89  	PresentFlagField(SignedByteField('dBm_AntSignal', 0), 'dBm_AntSignal'),
 90  	PresentFlagField(SignedByteField('dBm_AntNoise', 0), 'dBm_AntNoise'),
 91  	PresentFlagField(LEShortField('Lock_Quality', 0), 'Lock_Quality'),
 92  	PresentFlagField(LEShortField('TX_Attenuation', 0), 'TX_Attenuation'),
 93  	PresentFlagField(LEShortField('db_TX_Attenuation', 0), 'dB_TX_Attenuation'),
 94  	PresentFlagField(SignedByteField('dBm_TX_Power', 0), 'dBm_TX_Power'),
 95  	PresentFlagField(ByteField('Antenna', 0), 'Antenna'),
 96  	PresentFlagField(ByteField('dB_AntSignal', 0), 'dB_AntSignal'),
 97  	PresentFlagField(ByteField('dB_AntNoise', 0), 'dB_AntNoise'),
 98  	PresentFlagField(LEShortField('RX_Flags', 0), 'b14')
 99  ]
100  
101  
def scapy_layers_dot11_RadioTap_extract_padding(self, s):
	"""Ignore any unparsed conditionally present fields

	If all fields have been parsed, the payload length should have decreased RadioTap_len bytes.
	If it has not, there are unparsed fields which should be treated as padding.

	`s` is the data left over after this layer's fields were dissected.
	Returns a (payload, padding) tuple; padding is None when there is nothing to skip.
	"""
	# Bytes that should remain once RadioTap_len bytes of header are consumed
	expected_payload_len = self.pre_dissect_len - self.RadioTap_len
	unparsed_header_len = len(s) - expected_payload_len
	# Only a positive surplus is unparsed header. A negative value means the
	# fields consumed more than RadioTap_len declares; slicing with a negative
	# index would then mangle the payload, so pass `s` through untouched.
	if unparsed_header_len > 0:
		return s[unparsed_header_len:], s[:unparsed_header_len]
	return s, None
scapy.layers.dot11.RadioTap.extract_padding = scapy_layers_dot11_RadioTap_extract_padding
del scapy_layers_dot11_RadioTap_extract_padding
115  
116  
def scapy_layers_dot11_RadioTap_pre_dissect(self, s):
	"""Cache the total length of the data prior to dissection, for use in finding padding later

	The data itself is returned unchanged.
	"""
	total_len = len(s)
	self.pre_dissect_len = total_len
	return s
scapy.layers.dot11.RadioTap.pre_dissect = scapy_layers_dot11_RadioTap_pre_dissect
del scapy_layers_dot11_RadioTap_pre_dissect
123  
124  
class Dot11EltRates(Packet):
	"""The rates member contains an array of supported rates

	`rates` is filled in by post_dissection, so it only exists on instances
	that were created by dissecting data.
	"""

	name = '802.11 Rates Information Element'

	# Known rates come from table in 6.5.5.2 of the 802.11 spec.
	# Every entry maps an encoded rate value to half of that value.
	known_rates = {
		  2 :  1,
		  3 :  1.5,
		  4 :  2,
		  5 :  2.5,
		  6 :  3,
		  9 :  4.5,
		 11 :  5.5,
		 12 :  6,
		 18 :  9,
		 22 : 11,
		 24 : 12,
		 27 : 13.5,
		 36 : 18,
		 44 : 22,
		 48 : 24,
		 54 : 27,
		 66 : 33,
		 72 : 36,
		 96 : 48,
		108 : 54
	}

	fields_desc = [
		ByteField('ID', 0),
		# Tie the length to the field that actually follows it; this packet
		# has no 'info' field to derive a length from when building.
		FieldLenField('len', None, length_of='supported_rates', fmt='B'),
		FieldListField('supported_rates', None, ByteField('', 0), count_from=lambda pkt: pkt.len),
	]

	def post_dissection(self, pkt):
		"""Decode supported_rates into the `rates` list"""
		self.rates = []
		for supported_rate in self.supported_rates:
			# check the msb for each rate
			rate_msb = supported_rate & 0x80
			rate_value = supported_rate & 0x7F
			if rate_msb:
				# a value of 127 means HT PHY feature is required to join the BSS
				if 127 != rate_value:
					# Use the table where possible so half rates such as 5.5
					# are not truncated by integer division; fall back to an
					# explicit float division for values outside the table.
					self.rates.append(Dot11EltRates.known_rates.get(rate_value, rate_value / 2.0))
			elif rate_value in Dot11EltRates.known_rates:
				self.rates.append(Dot11EltRates.known_rates[rate_value])
170  			elif rate_value in Dot11EltRates.known_rates:
171  				self.rates.append(Dot11EltRates.known_rates[rate_value])
172  
173  
class Dot11EltExtendedRates(Dot11EltRates):
	"""The rates member contains an additional array of supported rates

	Only the display name differs; the fields and the rate decoding are
	inherited unchanged from Dot11EltRates.
	"""

	name = '802.11 Extended Rates Information Element'
178  
179  
class Dot11EltRSN(Packet):
	"""The enc, cipher, and auth members contain the decoded 'security' details

	These members are filled in by post_dissection, so they only exist on
	instances that were created by dissecting data.
	"""

	name = '802.11 RSN Information Element'

	# 4-byte cipher suite selectors mapped to a short cipher name
	cipher_suites = { '\x00\x0f\xac\x00': 'GROUP',
					  '\x00\x0f\xac\x01': 'WEP',
					  '\x00\x0f\xac\x02': 'TKIP',
					  '\x00\x0f\xac\x04': 'CCMP',
					  '\x00\x0f\xac\x05': 'WEP' }

	# 4-byte authentication suite selectors mapped to a short name
	auth_suites = { '\x00\x0f\xac\x01': 'MGT',
					'\x00\x0f\xac\x02': 'PSK' }

	fields_desc = [
		ByteField('ID', 0),
		# NOTE(review): 'info' is not a field of this packet, so the length
		# presumably cannot be auto-computed when building - confirm.
		FieldLenField("len", None, "info", "B"),
		LEShortField('version', 1),
		StrFixedLenField('group_cipher_suite', '', length=4),
		LEFieldLenField('pairwise_cipher_suite_count', 1, count_of='pairwise_cipher_suite'),
		FieldListField('pairwise_cipher_suite', None, StrFixedLenField('','', length=4), count_from=lambda pkt: pkt.pairwise_cipher_suite_count),
		LEFieldLenField('auth_cipher_suite_count', 1, count_of='auth_cipher_suite'),
		FieldListField('auth_cipher_suite', None, StrFixedLenField('','',length=4), count_from=lambda pkt: pkt.auth_cipher_suite_count),
		BitField('rsn_cap_pre_auth', 0, 1),
		BitField('rsn_cap_no_pairwise', 0, 1),
		BitField('rsn_cap_ptksa_replay_counter', 0, 2),
		BitField('rsn_cap_gtksa_replay_counter', 0, 2),
		BitField('rsn_cap_mgmt_frame_protect_required', 0, 1),
		BitField('rsn_cap_mgmt_frame_protect_capable', 0, 1),
		BitField('rsn_cap_reserved_1', 0, 1),
		BitField('rsn_cap_peer_key_enabled', 0, 1),
		BitField('rsn_cap_reserved_2', 0, 6),
	]

	def post_dissection(self, pkt):
		"""Parse cipher suites to determine encryption, cipher, and authentication methods"""

		self.enc = 'WPA2' # Everything is assumed to be WPA
		self.cipher = ''
		self.auth = ''

		ciphers = [self.cipher_suites.get(pairwise_cipher) for pairwise_cipher in self.getfieldval('pairwise_cipher_suite')]
		if 'GROUP' in ciphers:
			# group_cipher_suite is a single 4-byte selector, not a list;
			# iterating over it would look up one character at a time and
			# never match, so look the whole value up once.
			ciphers = [self.cipher_suites.get(self.getfieldval('group_cipher_suite'), '')]
		# Strongest cipher first: the first one found wins
		for cipher in ['CCMP', 'TKIP', 'WEP']:
			if cipher in ciphers:
				self.cipher = cipher
				break

		if 'WEP' == self.cipher:
			self.enc = 'WEP'

		# Only the first listed authentication suite is considered
		for auth_cipher in self.getfieldval('auth_cipher_suite'):
			self.auth = self.auth_suites.get(auth_cipher, '')
			break
234  			break
235  
236  
def scapy_layers_dot11_Dot11_elts(self):
	"""Yield each Dot11Elt layer in turn, starting from the first one in this packet

	Iteration follows the payload chain and stops at the first payload that
	is empty or no longer contains a Dot11Elt.
	"""
	current = self.getlayer(Dot11Elt)
	while True:
		if not current or not current.haslayer(Dot11Elt):
			return
		yield current
		current = current.payload
scapy.layers.dot11.Dot11.elts = scapy_layers_dot11_Dot11_elts
del scapy_layers_dot11_Dot11_elts
245  
246  
def scapy_layers_dot11_Dot11_find_elt_by_id(self, id):
	"""Return the first element from elts() whose ID equals `id`, or None if there is none"""
	matches = (candidate for candidate in self.elts() if candidate.ID == id)
	return next(matches, None)
scapy.layers.dot11.Dot11.find_elt_by_id = scapy_layers_dot11_Dot11_find_elt_by_id
del scapy_layers_dot11_Dot11_find_elt_by_id
255  
256  
def scapy_layers_dot11_Dot11_essid(self):
	"""Return the info of the element with ID 0 (the SSID), or None if it is absent"""
	ssid_elt = self.find_elt_by_id(0)
	if ssid_elt:
		return ssid_elt.info
	return None
scapy.layers.dot11.Dot11.essid = scapy_layers_dot11_Dot11_essid
del scapy_layers_dot11_Dot11_essid
263  
264  
def scapy_layers_dot11_Dot11_rates(self, id=1):
	"""Return the decoded rates of the rates Dot11Elt if it exists

	`id` is the element ID to look for (1 by default). The element is
	re-parsed as a Dot11EltRates and its `rates` list is returned. An empty
	list is returned when the element is missing or cannot be parsed; parse
	failures are reported through Printer.
	"""
	elt = self.find_elt_by_id(id)
	if elt:
		try:
			return Dot11EltRates(str(elt)).rates
		# `except (exception, e)` evaluated two undefined names and raised
		# NameError in place of the original error.
		except Exception as e:
			Printer.error('Bad Dot11EltRates got[{0:s}]'.format(elt.info))
			Printer.exception(e)
	return []
scapy.layers.dot11.Dot11.rates = scapy_layers_dot11_Dot11_rates
del scapy_layers_dot11_Dot11_rates
277  
278  
def scapy_layers_dot11_Dot11_extended_rates(self):
	"""Return the decoded rates of the extended rates Dot11Elt (element ID 50) if it exists"""
	extended_rates_id = 50
	return scapy.layers.dot11.Dot11.rates(self, extended_rates_id)
scapy.layers.dot11.Dot11.extended_rates = scapy_layers_dot11_Dot11_extended_rates
del scapy_layers_dot11_Dot11_extended_rates
284  
285  
def scapy_layers_dot11_Dot11_sta_bssid(self):
	"""Return the address of the station associated with the packet

	For probe requests and frames with the 'to-DS' flag set this is addr2;
	for everything else it is addr1.
	"""
	sent_by_station = self.haslayer(Dot11ProbeReq) or self.hasflag('FCfield', 'to-DS')
	return self.addr2 if sent_by_station else self.addr1
scapy.layers.dot11.Dot11.sta_bssid = scapy_layers_dot11_Dot11_sta_bssid
del scapy_layers_dot11_Dot11_sta_bssid
294  
295  
def scapy_layers_dot11_Dot11_ap_bssid(self):
	"""Return the address of the access point associated with the packet

	For probe requests and frames with the 'to-DS' flag set this is addr1;
	for everything else it is addr2.
	"""
	sent_by_station = self.haslayer(Dot11ProbeReq) or self.hasflag('FCfield', 'to-DS')
	return self.addr1 if sent_by_station else self.addr2
scapy.layers.dot11.Dot11.ap_bssid = scapy_layers_dot11_Dot11_ap_bssid
del scapy_layers_dot11_Dot11_ap_bssid
304  
305  
def scapy_layers_dot11_Dot11_channel(self):
	"""Return the channel from the channel Dot11Elt (element ID 3) if it exists

	The element's info is expected to be a single byte, whose ordinal is
	returned. None is returned when the element is missing or its info
	cannot be converted; conversion failures are reported through Printer.
	"""
	elt = self.find_elt_by_id(3)
	if elt:
		try:
			return int(ord(elt.info))
		# `except (exception, e)` evaluated two undefined names and raised
		# NameError in place of the original error.
		except Exception as e:
			Printer.error('Bad Dot11Elt channel got[{0:s}]'.format(elt.info))
			Printer.exception(e)
	return None
scapy.layers.dot11.Dot11.channel = scapy_layers_dot11_Dot11_channel
del scapy_layers_dot11_Dot11_channel
318  
319  
def scapy_layers_dot11_Dot11_rsn(self):
	"""Return the RSN Dot11Elt (element ID 48) re-parsed as a Dot11EltRSN

	None is returned when the element is missing or cannot be parsed; parse
	failures are reported through Printer.
	"""
	elt = self.find_elt_by_id(48)
	if elt:
		try:
			return Dot11EltRSN(str(elt))
		# `except (exception, e)` evaluated two undefined names and raised
		# NameError in place of the original error.
		except Exception as e:
			Printer.error('Bad Dot11EltRSN got[{0:s}]'.format(elt.info))
			Printer.exception(e)
	return None
scapy.layers.dot11.Dot11.rsn = scapy_layers_dot11_Dot11_rsn
del scapy_layers_dot11_Dot11_rsn