Source code for whad.ble.scanning

"""
BLE Scanning device database
============================

This module provides a database that keeps track of discovered devices,
:class:`whad.ble.scanning.AdvertisingDevicesDB`. Discovered devices information
are handled in :class:`whad.ble.scanning.AdvertisingDevice`.
"""
from time import time

from scapy.layers.bluetooth4LE import BTLE_ADV_IND, BTLE_ADV_NONCONN_IND, \
    BTLE_SCAN_RSP, BTLE_ADV

from whad.hub.ble import BDAddress
from whad.ble.profile.advdata import AdvDataFieldList, AdvCompleteLocalName, \
    AdvDataError, AdvDataFieldListOverflow, AdvShortenedLocalName

[docs] class AdvertisingDevice: """Store information about a device: * Received Signal Strength Indicator (RSSI) * Address type (public or random) * Advertising data * Scan response data * Type of advertising PDU * Connectable information """ SCAN_RSP_TIMEOUT = 0.5 def __init__(self, rssi, address_type, bd_address, adv_data, rsp_data=None, undirected=True, connectable=True): """Instantiate an AdvertisingDevice. :param rssi: Received Signal Strength Indicator :type rssi: float :param address_type: Address type :type address_type: int :param bd_address: Bluetooth device address :type bd_address: str :param adv_data: Advertising data :type adv_data: bytes :param rsp_data: Scan response data :type rsp_data: bytes, optional :param undirected: ``True`` if advertising PDUs are undirected, `False` otherwise :type undirected: bool, optional :param connectable: ``True`` if device accepts connection, `False` otherwise :type connectable: bool, optional """ self.__address_type = address_type self.__bd_address = bd_address self.__adv_data = adv_data self.__rsp_data = rsp_data self.__rssi = rssi self.__got_scan_rsp = False self.__undirected = undirected self.__connectable = connectable self.__scanned = False self.__reported = False self.__timestamp = time() self.__last_seen = self.__timestamp @property def address(self) -> str: """Device BD address. """ return str(self.__bd_address) @property def address_type(self) -> int: """Device address type. """ return self.__address_type @property def rssi(self) -> float: """Device RSSI. """ return self.__rssi @property def adv_records(self) -> bytes: """Advertising records. """ return self.__adv_data @property def scan_rsp_records(self) -> bytes: """Scan response records. """ return self.__rsp_data @property def ad_records(self) -> AdvDataFieldList: """Combined advertising and scan response records. """ out = AdvDataFieldList() for record in self.__adv_data: out.add(record) if self.__rsp_data is not None: for record in self.__rsp_data: out.add(record) return out @property def got_scan_rsp(self) -> bool: """Received a scan response from device. """ return self.__got_scan_rsp @property def name(self) -> str: """Device complete or short name. """ # Do we have a name ? complete_name = None short_name = None for record in self.ad_records: if isinstance(record, AdvShortenedLocalName): short_name = record.name.decode('utf-8') elif isinstance(record, AdvCompleteLocalName): complete_name = record.name.decode('utf-8') # Return discovered name (if any) if complete_name is not None: return complete_name if short_name is not None: return short_name return None @property def scanned(self) -> bool: """Device scanned status """ return self.__scanned @property def timestamp(self) -> float: """Device discovery timestamp. """ return self.__timestamp @property def last_seen(self) -> float: """Device last seen timestamp. """ return self.__last_seen @property def reported(self) -> bool: """Reported status """ return self.__reported @property def connectable(self) -> bool: """Connectable status. """ return self.__connectable def __repr__(self): """Show device information. """ # Do we have a name ? complete_name = None short_name = None for record in self.ad_records: if isinstance(record, AdvShortenedLocalName): try: short_name = record.name.decode('utf-8') except UnicodeDecodeError: short_name = record.name.decode('latin1') elif isinstance(record, AdvCompleteLocalName): try: complete_name = record.name.decode('utf-8') except UnicodeDecodeError: complete_name = record.name.decode('latin1') # Pick the best name if complete_name: name = f'name:"{complete_name}"' elif short_name: name = f'name:"{short_name}"' else: name = '' # Display address type if self.__address_type == 0: addrtype = '[PUB]' else: addrtype = '[RND]' # Generate device summary return f'[{self.__rssi:04d} dBm] {addrtype} {self.__bd_address} {name}'
[docs] def seen(self): """Mark device as seen (update last_seen value with current time). """ self.__last_seen = time()
[docs] def mark_reported(self): """Mark device as reported (avoid duplicates) """ self.__reported = True
[docs] def update(self, rssi : float = None, adv_data : bytes = None): """Update device RSSI and advertising data and check for scan response timeout. :param rssi: New RSSI value. :type rssi: float :param adv_data: New advertising data :type adv_data: bytes """ if rssi is not None: self.__rssi = rssi if adv_data is not None: self.__adv_data = adv_data # Update scanned status if required if not self.__scanned: if (time() - self.__timestamp) > self.SCAN_RSP_TIMEOUT: self.__scanned = True
[docs] def set_scan_rsp(self, scan_rsp): """Update device advertisement data. :param scan_rsp: Raw scan response. :type scan_rsp: bytes """ if not self.__got_scan_rsp: self.__rsp_data = scan_rsp self.__got_scan_rsp = True self.__scanned = True
[docs] class AdvertisingDevicesDB: """Bluetooth Low Energy devices database. This class stores information about discovered devices. """ def __init__(self): self.reset()
[docs] def reset(self): """Remove database content. """ self.__db = {}
[docs] def find_device(self, address: str) -> AdvertisingDevice: """Find a device based on its BD address. :param address: Device BD address :type address: str :return: Device if found, `None` otherwise. :rtype: :class:`whad.ble.scanning.AdvertisingDevice` """ device = None address = address.lower() if address in self.__db: device = self.__db[address] return device
[docs] def register_device(self, device): """Register or update a device. :param device: Device to register. :type device: :class:`whad.ble.scanning.AdvertisingDevice` """ if device.address not in self.__db: self.__db[device.address] = device else: self.__db[device.address].seen()
def __apply_scan_rsp_timeout(self): """Check every device to determine if our scan request has timed out. """ for _, device in self.__db.items(): if not device.scanned: device.update() if device.scanned and not device.reported: device.mark_reported() yield device
[docs] def on_device_found(self, rssi, adv_packet, filter_addr=None): """Device advertising packet or scan response received. Parse the incoming packet and handle device appropriately. :param rssi: Received Signal Strength Indicator :type rssi: float :param adv_packet: Advertising packet :type adv_packet: :class:`scapy.packet.Packet` :param filter_addr: BD address to filter :type filter_addr: str """ devices = [] addr_type = adv_packet.getlayer(BTLE_ADV).TxAdd if adv_packet.haslayer(BTLE_ADV_IND): bd_address = BDAddress(adv_packet[BTLE_ADV_IND].AdvA) try: adv_data = b''.join([ bytes(record) for record in adv_packet[BTLE_ADV_IND].data]) adv_list = AdvDataFieldList.from_bytes(adv_data) device = AdvertisingDevice( rssi, addr_type, bd_address, adv_list ) # Register device if it matches our criterias if filter_addr is not None and filter_addr.lower() == str(bd_address).lower(): self.register_device(device) elif filter_addr is None: self.register_device(device) except AdvDataError: pass except AdvDataFieldListOverflow: pass elif adv_packet.haslayer(BTLE_ADV_NONCONN_IND): try: bd_address = BDAddress(adv_packet[BTLE_ADV_NONCONN_IND].AdvA) adv_data = b''.join([ bytes(record) for record in adv_packet[BTLE_ADV_NONCONN_IND].data]) adv_list = AdvDataFieldList.from_bytes(adv_data) device = AdvertisingDevice( rssi, addr_type, bd_address, adv_list, connectable=False ) # Register device if it matches our criterias if filter_addr is not None and filter_addr.lower() == str(bd_address).lower(): self.register_device(device) elif filter_addr is None: self.register_device(device) except AdvDataError: pass except AdvDataFieldListOverflow: pass elif adv_packet.haslayer(BTLE_SCAN_RSP): try: bd_address = BDAddress(adv_packet[BTLE_SCAN_RSP].AdvA) adv_data = b''.join([ bytes(record) for record in adv_packet[BTLE_SCAN_RSP].data]) adv_list = AdvDataFieldList.from_bytes(adv_data) if str(bd_address) in self.__db: device = self.__db[str(bd_address)] if not device.got_scan_rsp: device.set_scan_rsp(adv_list) except AdvDataError: pass except AdvDataFieldListOverflow: pass # Check if some devices scan response timeout is reached for device in self.__apply_scan_rsp_timeout(): devices.append(device) return devices