Source code for whad.ble.connector.scanner

"""
BLE Scanner connector
=====================

This module provides a scanner connector :class:`whad.ble.connector.scanner.Scanner`
for WHAD BLE devices, allowing to discover available BLE devices.

It implements two different scanning approaches:

* The first one is based on a *normal device scanning* based on WHAD BLE protocol:
  it puts the hardware adapter in device discovery mode and grab each advertisement
  received.

* The second one puts the hardware adapter in *sniffing mode* and sniffs advertisements
  sent on default channels. This mode is only available with WHAD devices that support
  sniffing.

This connector must be instantiated with a compatible WHAD device, as shown below:

.. code-block:: python

    device = WhadDevice.create('uart0')
    scanner = Scanner(device)

If the underlying device does not support scanning, this connector will raise
an :class:`UnsupportedCapability` exception.

"""
from time import time
from typing import Iterator, List, Optional

from scapy.packet import Packet
from scapy.layers.bluetooth4LE import BTLE_ADV

from whad.hub.ble import BleAdvPduReceived, BleRawPduReceived
from whad.exceptions import UnsupportedCapability, WhadDeviceNotReady

from .base import BLE
from ..scanning import AdvertisingDevicesDB, AdvertisingDevice

[docs] class Scanner(BLE): """ BLE Observer interface for compatible WHAD device. """ domain = 'ble' def __init__(self, device): """Instantiate scanner connector over `device`. :param device: BLE WHAD device instance :type device: :class:`whad.device.WhadDevice` """ super().__init__(device) self.__db = AdvertisingDevicesDB() # Check device accept scanning mode if not self.can_scan(): # Does our device accept sniffing mode and is able of sniffing advertising reports ? if not self.can_sniff_advertisements(): raise UnsupportedCapability('Scan') # Stop current mode self.stop() # Set advertisement sniffing self.sniff_advertisements() else: # Stop current mode self.stop() # Enable active scanning self.enable_scan_mode(True)
[docs] def start(self) -> bool: """Start the BLE scanner. Calling this method resets the discovered devices database and put the WHAD device into BLE scanning mode. """ self.__db.reset() return super().start()
[docs] def stop(self) -> bool: """Stop scanning. Stop scanning for devices, disable scan mode if used. """ # Stop scanning result = super().stop() # Disable active scan mode (passive mode by default) if self.can_scan(): self.enable_scan_mode(False) # Send result return result
def __enter__(self) -> 'Scanner': """Special method used to use this connector as a context manager.""" # Make sure we are started if not self.started: if not self.start(): raise WhadDeviceNotReady() return self def __exit__(self, exc_type, exc_value, exc_traceback): """Special method used to close the connector when a context manager is done with it.""" if self.started: if not self.stop(): raise WhadDeviceNotReady()
[docs] def discover_devices(self, minimal_rssi = None, filter_address = None, timeout: Optional[float] = None, updates: bool = False) -> Iterator[AdvertisingDevice]: """ Parse incoming advertisements and yield discovered devices. If `updates` is set to `True`, all discovered devices will be reported with updated information. :param minimal_rssi: Minimal RSSI level :type minimal_rssi: float, optional :param filter_address: BD address of a device to discover :type filter_address: :class:`whad.ble.bdaddr.BDAddress`, optional :param timeout: Timeout in seconds :type timeout: float, optional :param updates: Enable/disable reporting updated information about already discovered devices :type updates: bool """ # Start sniffing start_time = time() for advertisement in self.sniff(timeout=timeout): if minimal_rssi is None or advertisement.metadata.rssi > minimal_rssi: devices = self.__db.on_device_found( advertisement.metadata.rssi, advertisement, filter_addr=filter_address, updates=updates ) for device in devices: yield device if (timeout is not None) and (time() - start_time > timeout): break
[docs] def sniff(self, messages: Optional[List] = None, timeout: Optional[float] = None) -> Iterator[Packet]: """ Listen and yield incoming advertising PDUs. """ # Make sure the hardware interface is started if not self.started: if not self.start(): raise WhadDeviceNotReady() stop_on_exit = True else: stop_on_exit = False # Determine the type of message we are expecting to receive if self.support_raw_pdu(): message_type = BleRawPduReceived else: message_type = BleAdvPduReceived # Loop until timeout reached or stopped start_anchor = time() while True: # Switch to sniffing mode for message in super().sniff(messages=(message_type), timeout=timeout): # Convert message from rebuilt PDU packet = message.to_packet() if packet is not None: self.monitor_packet_rx(packet) # Force TxAdd value to propagate the address type if isinstance(message, BleAdvPduReceived): if message.addr_type > 0: packet.getlayer(BTLE_ADV).TxAdd = 1 yield packet # Make sure we did not run over a configured timeout, if so # exit the main while loop if timeout is not None: if (time() - start_anchor) >= timeout: break # Stop current mode if we enabled it if stop_on_exit: if not self.stop(): raise WhadDeviceNotReady()
[docs] def clear(self): """ Clear device database. """ self.__db.reset()