Source code for whad.ble.connector.peripheral

"""
Bluetooth Low Energy Peripheral connector
=========================================

WHAD provides a specific connector to create a BLE device, :class:`Peripheral`.
This connector implements a GATT server and hosts a GATT profile, allowing remote
BLE devices to connect to it and query its services, characteristics, and descriptors.

The connector provides some callbacks such as :meth:`Peripheral.on_connected` to
react on specific events.
"""
import logging
from time import sleep

from whad.ble.connector.base import BLE
from whad.hub.ble.bdaddr import BDAddress
from whad.ble.stack import BleStack
from whad.ble.stack.gatt import GattServer, GattClientServer
from whad.ble.stack.att import ATTLayer
from whad.ble.stack.smp import CryptographicDatabase, Pairing
from whad.ble.profile import GenericProfile
from whad.ble.profile.device import PeripheralDevice
from whad.ble.profile.advdata import AdvDataFieldList, AdvFlagsField
from whad.hub.ble import Direction as BleDirection
from whad.exceptions import UnsupportedCapability

# Logging
logger = logging.getLogger(__name__)

[docs] class Peripheral(BLE): """This BLE connector provides a way to create a peripheral device. A peripheral device exposes some services and characteristics that may be accessed by a central device. These services and characteristics are defined by a specific profile. """ def __init__(self, device, existing_connection = None, profile=None, adv_data=None, scan_data=None, bd_address=None, public=True, stack=BleStack, gatt=GattServer, pairing=Pairing(), security_database=None): """Create a peripheral device. :param device: WHAD device to use as a peripheral :type device: :class:`whad.device.WhadDevice` :param profile: Device profile to use :type profile: :class:`whad.ble.profile.GenericProfile` :param adv_data: Advertisement data :type adv_data: :class:`whad.ble.profile.advdata.AdvDataFieldList` :param scan_data: Advertisement data sent in Scan Response :type scan_data: :class:`whad.ble.profile.advdata.AdvDataFieldList` :param bd_address: Bluetooth Device address to use :type bd_address: str :param public: Set to True to use a public Bluetooth Device address, False to use a random one :type public: bool :param stack: Bluetooth Low Energy stack to use, :class:`whad.ble.stack.BleStack` by default :param gatt: Bluetooth Low Energy GATT """ super().__init__(device) # Attach a GATT server to our stack ATT layer att_layer = stack.find("att") if att_layer is not None: att_layer.add(gatt) # Initialize local peer and remote per info self.__local_peer = None self.__remote_peer = None self.__gatt_server = None self.__peripheral = None self.__central = None # Initialize stack self.__stack = stack(self) self.connection = None self.__connected = False self.__conn_handle = None # Initialize profile if profile is None: logger.info("No profile provided to this Peripheral instance, use a default one.") self.__profile = GenericProfile() else: logger.info("Peripheral will use the provided profile.") self.__profile = profile # Initialize security database if security_database is None: logger.info(( "No security database provided to this Peripheral instance, " "use a default one.")) self.__security_database = CryptographicDatabase() else: logger.info("Peripheral will use the provided security database.") self.__security_database = security_database # Initiate pairing parameters self.__pairing_parameters = pairing # Check if device accepts peripheral mode if not self.can_be_peripheral(): logger.info("Capability MasterRole not supported by this WHAD device") raise UnsupportedCapability("Peripheral") # Set bd address if provided if bd_address is not None: logger.info("Set BD address to %s", bd_address) self.set_bd_address(bd_address, public=public) # If an existing connection is hijacked, simulate a connection if existing_connection is not None: self.on_connected(existing_connection) else: # If no advertising data has been set, initialize this peripheral # with default flags. if adv_data is None: adv_data = AdvDataFieldList(AdvFlagsField()) # Enable peripheral mode logger.info("Enable peripheral mode with advertising data: %s", adv_data) self.enable_peripheral_mode(adv_data, scan_data) @property def local_peer(self): """Local peer object """ return self.__local_peer @property def remote_peer(self): """Remote peer object """ return self.__remote_peer @property def conn_handle(self) -> int: """Connection handle """ return self.__conn_handle
[docs] def get_pairing_parameters(self): """Returns the provided pairing parameters, if any. """ return self.__pairing_parameters
[docs] def send_data_pdu(self, data, conn_handle=1, direction=BleDirection.SLAVE_TO_MASTER, access_address=0x8e89bed6, encrypt=None) -> bool: """Send a PDU to the central device this peripheral device is connected to. Sending direction is set to ̀ BleDirection.SLAVE_TO_MASTER` as we need to send PDUs to a central device. :param pdu: PDU to send :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE` :param conn_handle: Connection handle :type conn_handle: int :param direction: Sending direction (to master or slave) :type direction: :class:`whad.protocol.ble_pb2.BleDirection`, optional :param access_address: Target access address :type access_address: int, optional :return: PDU transmission result. :rtype: bool """ return super().send_data_pdu(data, conn_handle=conn_handle, direction=direction, access_address=access_address, encrypt=encrypt)
[docs] def send_ctrl_pdu(self, pdu, conn_handle=1, direction=BleDirection.SLAVE_TO_MASTER, access_address=0x8e89bed6, encrypt=None) -> bool: """Send a PDU to the central device this peripheral device is connected to. Sending direction is set to ̀ BleDirection.SLAVE_TO_MASTER` as we need to send PDUs to a central device. :param pdu: PDU to send :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE` :param conn_handle: Connection handle :type conn_handle: int :param direction: Sending direction (to master or slave) :type direction: :class:`whad.protocol.ble_pb2.BleDirection`, optional :param access_address: Target access address :type access_address: int, optional :return: PDU transmission result. :rtype: bool """ return super().send_ctrl_pdu(pdu, conn_handle=conn_handle, direction=direction, access_address=access_address, encrypt=encrypt)
[docs] def use_stack(self, clazz=BleStack): """Specify a stack class to use for BLE. By default, our own stack (BleStack) is used. :param clazz: BLE stack to use. :type clazz: :class:`whad.ble.stack.BleStack` """ self.__stack = clazz(self)
@property def smp(self): """Security Manager Protocol """ if self.connection is not None: return self.connection.smp return None
[docs] def pairing(self, pairing=None): """Trigger a pairing request with the provided parameters, if any. """ if self.smp is None: return False if pairing is not None: self.__pairing_parameters = pairing if not self.smp.request_pairing(self.__pairing_parameters): return False while not self.smp.is_pairing_done(): sleep(0.1) if self.smp.is_pairing_failed(): return False self.smp.reset_state() return True
@property def gatt(self): """Generic Attribute layer """ if self.connection is not None: return self.connection.gatt return None @property def security_database(self): """Security Database """ return self.__security_database
[docs] def is_connected(self) -> bool: """Determine if the peripheral has an active connection from a GATT client. """ return self.__connected
[docs] def wait_connection(self): """Wait for a GATT client to connect to the peripheral. If a connection is already active, returns immediately. """ while not self.is_connected(): sleep(.5)
############################## # Incoming events ##############################
[docs] def on_connected(self, connection_data): """A device has just connected to this peripheral. :param connection_data: Connection data :type connection_data: :class:`whad.protocol.ble_pb2.Connected` """ # Retrieve the GATT server instance and set its profile logger.info("a device is now connected (connection handle: %d)", connection_data.conn_handle) self.__local_peer = BDAddress.from_bytes( connection_data.advertiser, connection_data.adv_addr_type ) self.__remote_peer = BDAddress.from_bytes( connection_data.initiator, connection_data.init_addr_type ) self.__stack.on_connection( connection_data.conn_handle, self.__local_peer, self.__remote_peer ) # GATT server is now connected self.__connected = True self.__conn_handle = connection_data.conn_handle
[docs] def on_disconnected(self, disconnection_data): """A device has just disconnected from this peripheral. :param connection_data: Connection data :type connection_data: :class:`whad.protocol.ble_pb2.Disconnected` """ logger.info("a device has just connected (connection handle: %d)", disconnection_data.conn_handle) self.__stack.on_disconnection( disconnection_data.conn_handle, disconnection_data.reason ) # Notify peripheral device about this disconnection if self.__profile is not None: self.__profile.on_disconnect(disconnection_data.conn_handle) # We are now disconnected self.__connected = False
[docs] def on_ctl_pdu(self, pdu): """This method is called whenever a control PDU is received. This PDU is then forwarded to the BLE stack to handle it. Peripheral devices act as a slave, so we only forward master to slave messages to the stack. :param pdu: BLE PDU :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE` """ if pdu.metadata.direction == BleDirection.MASTER_TO_SLAVE: logger.info("Control PDU comes from master, forward to peripheral") self.__stack.on_ctl_pdu(pdu.metadata.connection_handle, pdu)
[docs] def on_data_pdu(self, pdu): """This method is called whenever a data PDU is received. This PDU is then forwarded to the BLE stack to handle it. :param pdu: BLE PDU :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE_DATA` """ if pdu.metadata.direction == BleDirection.MASTER_TO_SLAVE: logger.info("Data PDU comes from master, forward to peripheral") self.__stack.on_data_pdu(pdu.metadata.connection_handle, pdu)
[docs] def on_new_connection(self, connection): """On new connection, discover primary services :param connection: Connection data :type connection: :class:`whad.protocol.ble_pb2.Connected` """ # Use GATT server self.connection = connection self.__connected = True # Retrieve GATT server self.__gatt_server = connection.gatt self.__gatt_server.set_server_model(self.__profile) self.__connected = True # Configure SMP layer # we set the security database self.connection.smp.set_security_database(self.__security_database) self.connection.smp.pairing_parameters = self.__pairing_parameters # Check if we got a matching LTK crypto_material = self.security_database.get(address=self.__local_peer) if crypto_material is not None and crypto_material.has_ltk(): conn_handle = connection.conn_handle self.__stack.get_layer("ll").state.register_encryption_key( conn_handle, crypto_material.ltk.value ) if crypto_material.is_authenticated(): #print("Marked as authenticated") self.__stack.get_layer("ll").state.mark_as_authenticated(connection.conn_handle) else: pass#print("Marked as unauthenticated") # we indicate that we are a responder self.connection.smp.set_responder_role() # Notify our profile about this connection self.__profile.on_connect(self.connection.conn_handle)
class PeripheralClient(Peripheral): '''This BLE connector provides a way to create a peripheral device with both GATT server and client roles. ''' def __init__(self, device, existing_connection = None, profile=None, adv_data=None, scan_data=None, bd_address=None, public=True, stack=BleStack): super().__init__( device, existing_connection=existing_connection, profile=profile, adv_data=adv_data, scan_data=scan_data, bd_address=bd_address, public=public, stack=stack ) # Change ATTLayer to use GattClientServer and reinstantiate our stack ATTLayer.add(GattClientServer) self.use_stack(BleStack) def on_new_connection(self, connection): super().on_new_connection(connection) # Create a new peripheral device to represent the central device # that has just connected self.__peripheral = PeripheralDevice( self, connection.gatt, connection.conn_handle ) # Retrieve GATT client self.__central = connection.gatt self.__central.set_client_model(self.__peripheral) @property def central_device(self): """Central device """ return self.__peripheral