Source code for whad.ble.profile.device

Bluetooth Low Energy Peripheral abstraction

This module provides the PeripheralDevice class used to wrap all GATT operations
for a given connected device. This class wraps all the following operations:

* service and characteristics discovery
* ATT MTU exchange
* characteristic and descriptor read
* characteristic and descriptor write

For instance, a `Central` object can be used to initiate a BLE connection to a
target, and will return a `PeripheralDevice` object, as shown below::

    central = Central(...)
    target = central.connect('00:11:22:33:44:55')

One can then use this object to discover all the services and characteristics::

    target.discover()

And look for a specific characteristic and read it::

    device_name = target.get_characteristic(UUID('1800'), UUID('2A00'))
    if device_name is not None:
        print('Device name is {}'.format(device_name.value))

It is also possible to write to a characteristic (if writeable)::

    device_name.value = b'MyNewDeviceName'

import logging
from struct import unpack
from time import sleep

from whad.ble.profile.service import Service
from whad.ble.profile.characteristic import CharacteristicProperties, Characteristic, \
    CharacteristicValue
from whad.ble.profile import GenericProfile
from whad.ble.profile.attribute import UUID

logger = logging.getLogger(__name__)

class PeripheralCharacteristicDescriptor:
    """Wrapper for a peripheral characteristic descriptor.

    Exposes the wrapped descriptor's handle and type UUID, and forwards
    read/write operations on that handle to the GATT client supplied at
    construction time.
    """

    def __init__(self, descriptor, gatt):
        """Initialize a PeripheralCharacteristicDescriptor.

        :param CharacteristicDescriptor descriptor: Descriptor to wrap.
        :param GattClient gatt: GATT client to use for GATT operations (read/write).
        """
        self.__descriptor = descriptor
        self.__gatt = gatt

    @property
    def handle(self):
        """Return this characteristic descriptor handle.

        :return int: Descriptor handle
        """
        return self.__descriptor.handle

    @property
    def type_uuid(self):
        """Return this attribute type UUID.

        :return UUID: Attribute type UUID
        """
        return self.__descriptor.type_uuid

    def read(self):
        """Read descriptor value.

        :return bytes: Descriptor value
        """
        # NOTE(review): the read statement was absent from this method; it is
        # restored as a plain GATT read on the descriptor handle -- confirm the
        # GATT client exposes `read(handle)`.
        return self.__gatt.read(self.__descriptor.handle)

    def write(self, value, without_response=False):
        """Write descriptor value.

        :param bytes value: Value to write to this descriptor.
        :param bool without_response: If set, use a GATT write command request
                                      to write to this descriptor.
        """
        # Exactly one of the two procedures must run: a write command when no
        # response is wanted, a write request otherwise.
        if without_response:
            self.__gatt.write_command(self.__descriptor.handle, value)
        else:
            self.__gatt.write(self.__descriptor.handle, value)

class PeripheralCharacteristicValue:
    """CharacteristicValue wrapper for peripheral devices

    Forward all read/write operations to PeripheralCharacteristic wrapper
    because initially access to characteristic has been implemented there.

    Could be interesting in the future to implement all these operations here
    and to forward from characteristic wrapper to characteristic value wrapper
    because it makes more sense.

    Anyway, this code just works but from an architectural point-of-view is
    a bit crappy.
    """

    def __init__(self, char_value, gatt):
        """Wrap a characteristic value attribute.

        :param char_value: Characteristic value object to wrap.
        :param gatt: GATT client handed to the underlying
                     PeripheralCharacteristic wrapper.
        """
        self.__char_value = char_value
        # NOTE(review): the constructor arguments were truncated; assumes the
        # value object exposes its parent through `.characteristic` -- confirm.
        self.__characteristic = PeripheralCharacteristic(
            char_value.characteristic,
            gatt
        )

    @property
    def handle(self) -> int:
        """Characteristic handle value
        """
        return self.__char_value.handle

    @property
    def characteristic(self):
        """Underlying GATT characteristic
        """
        return self.__characteristic

    @property
    def value(self):
        """Transparent characteristic read.

        :return bytes: Characteristic value
        """
        return self.__characteristic.read()

    @value.setter
    def value(self, val):
        """Transparent characteristic write.

        :param bytes val: Value to write into this characteristic
        """
        self.__characteristic.write(val)

    def read(self, offset=0):
        """Read characteristic value

        :param int offset: Offset to start reading from (default: 0).
        :return: Whatever the characteristic wrapper's `read` returns.
        """
        return self.__characteristic.read(offset=offset)

    def read_long(self):
        """Perform a read long GATT operation on characteristic
        """
        return self.__characteristic.read_long()

    def write(self, value: bytes, without_response=False):
        """Write data to characteristic

        :param bytes value: Value to write.
        :param bool without_response: Forwarded to the characteristic wrapper.
        :return: Whatever the characteristic wrapper's `write` returns.
        """
        return self.__characteristic.write(value, without_response=without_response)

class PeripheralCharacteristic:
    """Characteristic wrapper for peripheral devices

    Instruments gatt to read/write a remote characteristic.
    """

    def __init__(self, characteristic, gatt):
        """Wrap a characteristic.

        :param characteristic: Characteristic object to wrap.
        :param gatt: GATT client used to perform the remote operations.
        """
        self.__characteristic = characteristic
        self.__gatt = gatt

    @property
    def value(self) -> bytes:
        """Transparent characteristic read.

        :return bytes: Characteristic value
        """
        return self.read()

    @value.setter
    def value(self, val):
        """Transparent characteristic write.

        :param bytes val: Value to write into this characteristic
        """
        self.write(val)

    @property
    def uuid(self) -> UUID:
        """Characteristic UUID
        """
        return self.__characteristic.uuid

    @property
    def type_uuid(self) -> UUID:
        """Type UUID
        """
        return self.__characteristic.type_uuid

    @property
    def properties(self):
        """Characteristic properties
        """
        return self.__characteristic.properties

    @property
    def handle(self) -> int:
        """Handle value
        """
        return self.__characteristic.handle

    @property
    def end_handle(self) -> int:
        """End handle for this characteristic
        """
        return self.__characteristic.end_handle

    @property
    def value_handle(self) -> int:
        """Handle for this characteristic value attribute
        """
        return self.__characteristic.value_handle

    def can_notify(self) -> bool:
        """Check if characteristic accepts notifications
        """
        return self.__characteristic.can_notify()

    def must_notify(self) -> bool:
        """Determine if notifications must be sent for this characteristic
        """
        return self.__characteristic.must_notify()

    def can_indicate(self) -> bool:
        """Check if characteristic accepts indications
        """
        return self.__characteristic.can_indicate()

    def must_indicate(self) -> bool:
        """Determine if indications must be sent for this characteristic
        """
        return self.__characteristic.must_indicate()

    def read(self, offset: int = 0) -> bytes:
        """Read characteristic value

        :param offset: Offset to start reading from (default: 0).
        :type offset: int
        :return: Value returned by the GATT client.
        :rtype: bytes
        """
        if offset == 0:
            # NOTE(review): statement restored; assumes the GATT client exposes
            # a plain `read(handle)` -- confirm.
            return self.__gatt.read(self.__characteristic.value_handle)

        # Read from given offset if specified
        return self.__gatt.read_blob(self.__characteristic.value_handle, offset)

    def read_long(self) -> bytes:
        """Read long characteristic value
        """
        return self.__gatt.read_long(self.__characteristic.value_handle)

    def write(self, value: bytes, without_response: bool = False) -> bool:
        """Set characteristic value

        If characteristic is only writeable without response, use a write command
        rather than a write request. Otherwise, use a write request. If a characteristic
        has both write and write without response properties, `without_response` must be
        set to True to use a write command.

        :param value: Value to write into the characteristic
        :type value: bytes
        :param without_response: Send a GATT write command instead of a GATT write
                                 if set to `True`
        :return: `True` on successful write, `False` otherwise
        :rtype: bool
        :raises ValueError: if `value` is not a `bytes` object
        """
        # If characteristic is only writeable without response, force without_response to True.
        access_mask = CharacteristicProperties.WRITE_WITHOUT_RESPONSE | CharacteristicProperties.WRITE
        props = self.__characteristic.properties
        if (props & access_mask) == CharacteristicProperties.WRITE_WITHOUT_RESPONSE:
            without_response = True

        if not isinstance(value, bytes):
            raise ValueError("characteristic value must be bytes")

        if without_response:
            return self.__gatt.write_command(
                self.__characteristic.value_handle,
                value
            )

        # Response required
        return self.__gatt.write(
            self.__characteristic.value_handle,
            value
        )

    def descriptors(self):
        """Return all the descriptors associated with this characteristic.

        This method is a generator and will yield all the descriptors registered
        with this characteristic.
        """
        for desc in self.__characteristic.descriptors():
            yield PeripheralCharacteristicDescriptor(
                desc,
                self.__gatt
            )

    def get_descriptor(self, type_uuid):
        """Get descriptor of a given type.

        :param UUID type_uuid: Descriptor type UUID
        :return PeripheralCharacteristicDescriptor: Return descriptor if found, `None` otherwise
        """
        for desc in self.__characteristic.descriptors():
            if desc.type_uuid == type_uuid:
                return PeripheralCharacteristicDescriptor(
                    desc,
                    self.__gatt
                )

        # Not found
        return None

    def readable(self):
        """Check if this characteristic is readable.

        :return bool: True if readable, False otherwise.
        """
        return (self.__characteristic.properties & CharacteristicProperties.READ) != 0

    def writeable(self):
        """Check if this characteristic is writeable.

        :return bool: True if writeable, False otherwise.
        """
        props = self.__characteristic.properties
        return (
            ((props & CharacteristicProperties.WRITE) != 0) or
            ((props & CharacteristicProperties.WRITE_WITHOUT_RESPONSE) != 0)
        )

    def subscribe(self, notification=True, indication=False, callback=None):
        """Subscribe for notification/indication.

        :param bool notification: If set, subscribe for notification
        :param bool indication: If set, subscribe for indication (cannot be used
                                when notification is set)
        :param callable callback: Callback function to be called on
                                  indication/notification event
        :return bool: True if subscription has successfully been performed, False otherwise.
        """
        if notification:
            # Look for CCCD
            desc = self.get_descriptor(UUID(0x2902))
            if desc is not None:
                # wrap our callback to provide more details about the concerned
                # characteristic
                def wrapped_cb(handle, value, indication=False):
                    # NOTE(review): wrapper body restored; assumes the user
                    # callback takes (characteristic, value, indication=...).
                    callback(self, value, indication=indication)

                # Register our callback
                if callback is not None:
                    # NOTE(review): registration call restored; confirm the
                    # GATT client method name and arguments.
                    self.__gatt.register_notification_callback(
                        self.__characteristic.value_handle,
                        wrapped_cb
                    )

                # Enable notification
                desc.write(bytes([0x01, 0x00]))

                return True

            # No CCCD, cannot subscribe
            logger.debug("No CCC descriptor, cannot subscribe to charac. %s", self.uuid)
            return False

        if indication:
            # Look for CCCD
            desc = self.get_descriptor(UUID(0x2902))
            if desc is not None:
                # Register our callback
                if callback is not None:
                    # NOTE(review): registration call restored; this branch
                    # showed no wrapper, so the callback is passed as is --
                    # confirm against the notification branch.
                    self.__gatt.register_notification_callback(
                        self.__characteristic.value_handle,
                        callback
                    )

                # Enable indication
                desc.write(bytes([0x02, 0x00]))

                return True

            # No CCCD, cannot subscribe for indications
            logger.debug("No CCC descriptor, cannot subscribe to charac. %s", self.uuid)
            return False

        # No indication or notification subscription required
        return False

    def unsubscribe(self):
        """Unsubscribe from this characteristic.

        :return bool: True if the CCC descriptor was found and cleared, False otherwise.
        """
        # Look for CCCD
        desc = self.get_descriptor(UUID(0x2902))

        if desc is not None:
            # Disable notification/indication
            desc.write(bytes([0x00, 0x00]))

            # Unregister our callback
            # NOTE(review): unregistration call restored; confirm the GATT
            # client method name.
            self.__gatt.unregister_notification_callback(
                self.__characteristic.value_handle
            )
            return True

        # No descriptor found, cannot unsubscribe
        logger.debug("No CCC descriptor, cannot unsubscribe from charac. %s", self.uuid)
        return False

class PeripheralService:
    """Service wrapper for peripheral devices

    Exposes the wrapped service's handles and UUIDs and wraps its
    characteristics into PeripheralCharacteristic objects bound to the GATT
    client given at construction time.
    """

    def __init__(self, service, gatt):
        """Wrap a service.

        :param service: Service object to wrap.
        :param gatt: GATT client used for remote operations.
        """
        self.__service = service
        self.__gatt = gatt

    @property
    def handle(self):
        """Return this service handle.

        :return int: service handle
        """
        return self.__service.handle

    @property
    def end_handle(self):
        """Return this service end handle.

        :return int: end handle
        """
        return self.__service.end_handle

    @property
    def uuid(self):
        """Return this service UUID.

        :return UUID: Service UUID
        """
        return self.__service.uuid

    @property
    def type_uuid(self):
        """Return this service type UUID

        :return UUID: Service type UUID
        """
        return self.__service.type_uuid

    def read_characteristic_by_uuid(self, uuid):
        """Read a characteristic belonging to this service identified by its UUID.

        :param UUID uuid: Characteristic UUID
        :return bytes: Characteristic value
        """
        # NOTE(review): the call arguments were truncated; restored as the UUID
        # followed by this service's handle range -- confirm the GATT client
        # signature.
        return self.__gatt.read_characteristic_by_uuid(
            uuid,
            self.__service.handle,
            self.__service.end_handle
        )

    def get_characteristic(self, uuid):
        """Look for a specific characteristic belonging to this service, identified by its UUID.

        :param UUID uuid: Characteristic UUID
        :return PeripheralCharacteristic: Found characteristic if any, `None` otherwise.
        """
        for charac in self.__service.characteristics():
            if charac.uuid == uuid:
                return PeripheralCharacteristic(
                    charac,
                    self.__gatt
                )
        return None

    def characteristics(self):
        """Enumerate this service's characteristics (generator).
        """
        for characteristic in self.__service.characteristics():
            yield PeripheralCharacteristic(
                characteristic,
                self.__gatt
            )

class PeripheralDevice(GenericProfile):
    """GATT client wrapper representing a remote device.

    This class is used to wrap a device model used in a gatt client
    in order to provide easy-to-use methods to access its services,
    characteristics and descriptors.
    """

    def __init__(self, central, gatt_client, conn_handle, from_json=None):
        """Create a peripheral device from a Central and a GATT client.

        :param  central:        Central instance used to connect to a target device.
        :type   central:        :class:`whad.ble.connector.central.Central`
        :param  gatt_client:    GATT client connected to a target device.
        :type   gatt_client:    :class:`whad.ble.stack.gatt.GattClient`
        :param  conn_handle:    Current connection handle.
        :type   conn_handle:    int
        :param  from_json:      GATT profile (JSON) to be used when instantiating
                                the underlying GattProfile.
        :type   from_json:      str, optional
        """
        self.__gatt = gatt_client
        self.__smp = gatt_client.smp
        self.__ll = gatt_client.get_layer('ll')
        self.__conn_handle = conn_handle
        self.__central = central
        self.__disconnect_cb = None
        super().__init__(from_json=from_json)

    @property
    def conn_handle(self) -> int:
        """Current connection handle.
        """
        return self.__conn_handle

    def start_encryption(self):
        """Start encryption procedure for BLE peripheral

        Looks up the crypto material stored for the central's target peer and,
        if an LTK is available, asks the link layer to start encryption with
        it. Nothing happens when no LTK is known.
        """
        security_database = self.__smp.security_database
        crypto_material = security_database.get(address=self.__central.target_peer)
        conn_handle = self.__smp.get_layer('l2cap').state.conn_handle

        if crypto_material is not None and crypto_material.has_ltk():
            self.__ll.start_encryption(
                conn_handle,
                # `rand` is unpacked as a big-endian unsigned 64-bit integer.
                unpack('>Q', crypto_material.ltk.rand)[0],
                crypto_material.ltk.ediv
            )

    def pairing(self, pairing=None):
        """Trigger a pairing according to provided parameters.
        Default parameters will be used if pairing parameter is None.

        :param pairing: Pairing parameters forwarded to the SMP layer.
        :return: `True` if pairing completed, `False` if it could not be
                 initiated or failed.
        :rtype: bool
        """
        if not self.__smp.initiate_pairing(parameters=pairing):
            return False

        # Poll the SMP layer until pairing either completes or fails.
        while not self.__smp.is_pairing_done():
            sleep(0.1)
            if self.__smp.is_pairing_failed():
                return False

        self.__smp.reset_state()
        return True

    def set_disconnect_cb(self, callback):
        """Set disconnection callback.

        :param  callback:   Callback function to call on disconnection.
        :type   callback:   callable
        """
        self.__disconnect_cb = callback

    def set_mtu(self, mtu: int):
        """Update connection MTU.

        :param  mtu:    ATT MTU to use for this connection.
        :type   mtu:    int
        :return:        Remote device MTU.
        :rtype:         int
        """
        return self.__gatt.set_mtu(mtu)

    def disconnect(self):
        """Terminate the connection to this device
        """
        # Ask associated central to disconnect this peripheral device.
        self.__central.disconnect(self.__conn_handle)

    def discover(self):
        """Discovers services, characteristics and descriptors.

        This method must be called before accessing any service or characteristic,
        as it is required to retrieve the corresponding GATT handles.
        """
        # Discover
        # NOTE(review): the statement following this comment was missing;
        # restored as a discovery call on the GATT client -- confirm the
        # method name.
        self.__gatt.discover()

    def find_service_by_uuid(self, uuid: UUID) -> PeripheralService:
        """Find service by its UUID

        :param  uuid:   Service UUID
        :type   uuid:   :class:`whad.ble.profile.attribute.UUID`
        :return:        An instance of PeripheralService if service has been
                        found, None otherwise.
        :rtype:         :class:`whad.ble.profile.device.PeripheralService`
        """
        service = self.__gatt.discover_primary_service_by_uuid(uuid)
        if service is not None:
            return PeripheralService(
                service,
                self.__gatt
            )

        # Not found
        return None

    def find_characteristic_by_uuid(self, uuid: UUID):
        """Find characteristic by its UUID

        :param  uuid:   Characteristic UUID
        :type   uuid:   :class:`whad.ble.profile.attribute.UUID`
        :return:        An instance of PeripheralCharacteristic if characteristic
                        has been found, None otherwise.
        :rtype:         :class:`whad.ble.profile.device.PeripheralCharacteristic`
        """
        # NOTE(review): the iterable of the outer loop was truncated; assumes
        # the base profile exposes `services()` -- confirm.
        for service in self.services():
            for charac in service.characteristics():
                if charac.uuid == uuid:
                    return PeripheralCharacteristic(
                        charac,
                        self.__gatt
                    )

        # Not found in any service: always report it as None, as documented.
        return None

    def find_object_by_handle(self, handle):
        """Find an existing object (service, attribute, descriptor) based on its handle,
        if known from the underlying GenericProfile.

        :param  handle: Object handle
        :type   handle: int
        :return: Characteristic, characteristic value or service
        :rtype: :class:`whad.ble.profile.device.PeripheralCharacteristic`,
                :class:`whad.ble.profile.device.PeripheralCharacteristicValue`,
                :class:`whad.ble.profile.device.PeripheralService`
        """
        obj = super().find_object_by_handle(handle)

        # Wrap the model object into its peripheral counterpart.
        if isinstance(obj, Characteristic):
            return PeripheralCharacteristic(
                obj,
                self.__gatt
            )
        if isinstance(obj, Service):
            return PeripheralService(
                obj,
                self.__gatt
            )
        if isinstance(obj, CharacteristicValue):
            return PeripheralCharacteristicValue(
                obj,
                self.__gatt
            )

        # Not found
        return None

    def get_characteristic(self, service_uuid: UUID, charac_uuid: UUID):
        """Get a PeripheralCharacteristic object representing a characteristic
        defined by the given service UUID and characteristic UUID.

        :param  service_uuid:   Service UUID
        :type   service_uuid:   :class:`whad.ble.profile.attribute.UUID`
        :param  charac_uuid:    Characteristic UUID
        :type   charac_uuid:    :class:`whad.ble.profile.attribute.UUID`
        :return:                PeripheralCharacteristic object on success, None if not found.
        :rtype:                 :class:`whad.ble.profile.device.PeripheralCharacteristic`
        """
        service = self.get_service(service_uuid)
        if service is not None:
            return service.get_characteristic(charac_uuid)
        return None

    def get_service(self, uuid):
        """Retrieve a PeripheralService object given its UUID.

        :param  uuid:   Service UUID
        :type   uuid:   :class:`whad.ble.profile.attribute.UUID`
        :return:        Corresponding PeripheralService object if found, None otherwise.
        :rtype:         :class:`whad.ble.profile.device.PeripheralService`
        """
        # NOTE(review): the iterable was truncated; assumes the base profile
        # exposes `services()` -- confirm.
        for service in self.services():
            if service.uuid == uuid:
                return PeripheralService(service, self.__gatt)
        return None

    def write(self, handle, value):
        """Perform a write operation on an attribute based on its handle.

        This method allows to interact with characteristics and descriptors without
        having to perform a GATT services and characteristics discovery. One just
        needs to specify the handle corresponding to a characteristic value or
        descriptor and the value to write to it, and our GATT stack will handle it.

        Note that there is absolutely no check on corresponding characteristic
        permissions (meaning you can try to write on a read-only characteristic
        value) and that this method may raise exceptions due to potential GATT
        errors the remote device may return.

        :param  handle: Characteristic or descriptor handle to write.
        :type   handle: int
        :param  value:  Bytes to write into this characteristic.
        :type   value:  bytes
        """
        return self.__gatt.write(handle, value)

    def write_command(self, handle, value):
        """Perform a write command operation (no write response will be sent)
        on an attribute based on its handle.

        This method allows to interact with characteristics and descriptors without
        having to perform a GATT services and characteristics discovery. One just
        needs to specify the handle corresponding to a characteristic value or
        descriptor and the value to write to it, and our GATT stack will handle it.

        Note that there is absolutely no check on corresponding characteristic
        permissions (meaning you can try to write on a read-only characteristic
        value) and that this method may raise exceptions due to potential GATT
        errors the remote device may return.

        :param  handle: Characteristic or descriptor handle to write.
        :type   handle: int
        :param  value:  Bytes to write into this characteristic.
        :type   value:  bytes
        """
        return self.__gatt.write_command(handle, value)

    def read(self, handle, offset=None, long=False):
        """Perform a read operation on an attribute based on its handle.

        This method allows to interact with characteristics and descriptors without
        having to perform a GATT services and characteristics discovery. One just
        needs to specify the handle corresponding to a characteristic value or
        descriptor and our GATT stack will handle it.

        Note that there is absolutely no check on corresponding characteristic
        permissions (meaning you can try to read from a write-only characteristic
        value) and that this method may raise exceptions due to potential GATT
        errors the remote device may return.

        :param  handle: Characteristic or descriptor handle.
        :type   handle: int
        :param  offset: Offset applied when reading data from characteristic
                        or descriptor; a plain read is used when omitted.
        :type   offset: int, optional
        :param  long:   use GATT long read procedure if set to True (default: False)
        :type   long:   bool, optional
        :return:        Content of the characteristic or descriptor.
        :rtype:         bytes
        """
        if not long:
            if offset is None:
                # NOTE(review): the returned expression was truncated; restored
                # as a plain GATT read -- confirm `GattClient.read`.
                return self.__gatt.read(handle)

            # Use provided offset
            return self.__gatt.read_blob(handle, offset=offset)

        # Read long
        return self.__gatt.read_long(handle)

    def on_disconnect(self, conn_handle):
        """Disconnection callback

        Calls the callback registered with `set_disconnect_cb`, if any, with
        no arguments.

        :param  conn_handle:    Connection handle
        :type   conn_handle:    int
        """
        logger.debug('PeripheralDevice has disconnected')
        if self.__disconnect_cb is not None:
            self.__disconnect_cb()