Source code for whad.ble.connector.central

"""
Bluetooth Low Energy Central connector
======================================
"""
import logging
from time import time, sleep

from whad.ble.connector.base import BLE
from whad.hub.ble import Direction
from whad.hub.ble.bdaddr import BDAddress
from whad.ble.stack import BleStack
from whad.ble.stack.constants import BT_MANUFACTURERS, BT_VERSIONS
from whad.ble.stack.gatt import GattClient
from whad.ble.stack.att import ATTLayer
from whad.ble.stack.smp import CryptographicDatabase
from whad.ble.exceptions import ConnectionLostException, PeripheralNotFound
from whad.ble.profile.device import PeripheralDevice
from whad.common.stack import Layer
from whad.exceptions import UnsupportedCapability

logger = logging.getLogger(__name__)

[docs] class Central(BLE): """This connector provides a BLE Central role. To initiate a connection to a device, just call :meth:`Central.connect` with the target BD address and it should return an instance of :class:`whad.ble.profile.device.PeripheralDevice` in return. """ def __init__(self, device, existing_connection = None, from_json=None, stack=BleStack, client=GattClient, security_database=None): """Attach a GATT client if specified in parameter If `client` is set to None, the default GATT layer is used, which does not provide any client feature at all. """ super().__init__(device) if client is not None: if issubclass(client, Layer) and client.alias == 'gatt': ATTLayer.add(client) self.__gatt_client = None self.__stack = stack(self) self.__connected = False self.__peripheral = None self.__profile_json = from_json self.__target = None self.__local = None self.__conn_handle = None self.connection = None # If no connection, check if if not self.can_be_central(): raise UnsupportedCapability('Central') # Initialize security database if security_database is None: logger.info(( "No security database provided to this Peripheral instance, " "use a default one.")) self.__security_database = CryptographicDatabase() else: logger.info('Peripheral will use the provided security database.') self.__security_database = security_database # If a connection already exists, just feed the stack with the parameters if existing_connection is not None: self.on_connected(existing_connection) else: # self.stop() # ButteRFly doesn't support calling stop when spawning central self.enable_central_mode() @property def security_database(self): """Central role security database. """ return self.__security_database @property def local_peer(self) -> BDAddress: """Local peer BD address. """ return self.__local @property def target_peer(self) -> BDAddress: """Remote peer BD address. """ return self.__target @property def conn_handle(self) -> int: """Connection handle. """ return self.__conn_handle @property def stack(self): '''Return the current stack instance ''' return self.__stack
[docs] def connect(self, bd_address, random=False, timeout=30, access_address=None,channel_map=None, crc_init=None, hop_interval=None, hop_increment=None) -> PeripheralDevice: """Connect to a target device :param bd_address: Bluetooth device address (in format 'xx:xx:xx:xx:xx:xx') :type bd_address: str :param timeout: Connection timeout :type timeout: float :param access_address: Access address to use (optional) :type access_address: int :param channel_map: Channel map to use (optional) :type channel_map: int :param crc_init: CRC Initialization value to use (optional) :type crc_init: int :param hop_interval: Hop interval to use (optional) :type hop_interval: int :param hop_increment: Hop increment to use (optional) :type hop_increment: int :return: An instance of `PeripheralDevice` on success, `None` on failure. :rtype: :class:`whad.ble.profile.device.PeripheralDevice` """ if self.can_connect(): self.connect_to( bd_address, random=random, access_address=access_address, channel_map=channel_map, crc_init=crc_init, hop_interval=hop_interval, hop_increment=hop_increment ) self.start() start_time=time() while not self.is_connected(): if time()-start_time >= timeout: raise PeripheralNotFound sleep(0.1) return self.peripheral() # Raise error raise PeripheralNotFound()
[docs] def peripheral(self) -> PeripheralDevice: """Connected BLE peripheral. """ return self.__peripheral
[docs] def send_pdu(self, pdu, conn_handle=0, direction=Direction.MASTER_TO_SLAVE, access_address=0x8e89bed6, encrypt=None) -> bool: """Send a PDU to the connected peripheral device or to the central device. :param pdu: BLE PDU to send. :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE` :param conn_handle: Connection handle :type conn_handle: int :param direction: Direction (central to peripheral, peripheral to central) :type direction: :class:`whad.protocol.ble.ble_pb2.BleDirection` :param access_address: Access address to use while sending PDU. :type access_address: int :param encrypt: Enable PDU encryption if set to ``True``. :type encrypt: bool :return: PDU transmission result. :rtype: bool """ if self.__connected: return super().send_pdu(pdu, conn_handle, direction=direction, access_address=access_address, encrypt=encrypt) # Connection lost raise ConnectionLostException(None)
############################## # Incoming events ##############################
[docs] def is_connected(self) -> bool: """Determine if the central device is connected to a peripheral. :return: ``True`` if central is connected to a peripheral device, `False` otherwise. :rtype: bool """ return self.__connected
[docs] def on_connected(self, connection_data): """Callback method to handle connection event. :param connection_data: Connection data :type connection_data: dict """ # Save local and target peer info self.__local = BDAddress.from_bytes( connection_data.initiator, addr_type=connection_data.init_addr_type ) self.__target = BDAddress.from_bytes( connection_data.advertiser, connection_data.adv_addr_type ) self.__stack.on_connection( connection_data.conn_handle, self.__local, self.__target ) self.__conn_handle = connection_data.conn_handle
[docs] def on_disconnected(self, disconnection_data): """Callback method to handle disconnection event. :param disconnection_data: Disconnection data :type disconnection_data: :class:`whad.protocol.ble_pb2.Disconnected` """ logger.debug("[central] Device disconnected, forward to stack") self.__stack.on_disconnection( disconnection_data.conn_handle, disconnection_data.reason ) self.__connected = False self.__local = None self.__target = None # Notify peripheral device about this disconnection logger.debug("[central] Notify peripheral object that a disconnection occurred") if self.__peripheral is not None: self.__peripheral.on_disconnect(disconnection_data.conn_handle)
[docs] def on_ctl_pdu(self, pdu): """This callback method is called whenever a control PDU is received. This PDU is then forwarded to the BLE stack to handle it. Central devices act as master, so we only forward slave to master messages to the stack. :param pdu: BLE Control PDU :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE` """ logger.info('received control PDU') if pdu.metadata.direction == Direction.SLAVE_TO_MASTER: self.__stack.on_ctl_pdu(pdu.metadata.connection_handle, pdu)
[docs] def on_data_pdu(self, pdu): """This callback method is called whenever a data PDU is received. This PDU is then forwarded to the BLE stack to handle it. Central devices act as master, so we only forward slave to master messages to the stack. :param pdu: BLE Data PDU :type pdu: :class:`scapy.layers.bluetooth4LE.BTLE_DATA` """ logger.info('received data PDU') if pdu.metadata.direction == Direction.SLAVE_TO_MASTER: self.__stack.on_data_pdu(pdu.metadata.connection_handle, pdu)
[docs] def on_new_connection(self, connection): """On new connection, discover primary services. :param connection: New connection Protobuf message :type connection: :class:`whad.protocol.ble_pb2.Connected` """ logger.info('new connection established') # Use GATT client self.connection = connection self.__peripheral = PeripheralDevice( self, connection.gatt, connection.conn_handle, from_json=self.__profile_json ) # Retrieve GATT client self.__gatt_client = connection.gatt self.__gatt_client.set_client_model(self.__peripheral) self.__connected = True # Configure SMP layer # we set the security database self.connection.smp.set_security_database(self.__security_database) # Check if we got a matching LTK crypto_material = self.security_database.get(address=self.__target) if crypto_material is not None and crypto_material.has_ltk(): self.__stack.get_layer('ll').state.register_encryption_key( connection.conn_handle, crypto_material.ltk.value ) if crypto_material.is_authenticated(): #print("Marked as authenticated") self.__stack.get_layer('ll').state.mark_as_authenticated(connection.conn_handle) else: pass#print("Marked as unauthenticated") # Notify peripheral about this connection self.__peripheral.on_connect(self.connection.conn_handle)
[docs] def version(self, synchronous=True): """Query BLE version of remote peer. """ if self.connection is not None: # Send an LL_VERSION_IND PDU self.connection.send_version() # Wait for an answer (mandatory) if synchronous: while not self.connection.remote_version: sleep(0.01) result = self.connection.remote_version # Identify BT version if result.version in BT_VERSIONS: version = BT_VERSIONS[result.version] else: version = result.version # Identify BT company if result.company in BT_MANUFACTURERS: company = BT_MANUFACTURERS[result.company] else: company = result.company # Return information return (version, result.subversion, company) # Error return None
[docs] def export_profile(self): """Export GATT profile of the existing connection. :return: Profile as a JSON string :rtype: str """ return self.connection.gatt.model.export_json()