Source code for whad.ble.profile

"""This module provides different classes that represent a BLE device and
allows to interact with it:

* :class:`whad.ble.profile.GenericProfile` is a base class used to register all
  the ATT attributes, including services, characteristics, characteristic values
  and descriptors. It is able to inspect any derived class and build the
  corresponding profile based on properties declared with
  :class:`whad.ble.profile.service.PrimaryService` and
  :class:`whad.ble.profile.characteristic.Characteristic`.

"""
import json
import logging
from typing import List, Iterator

from whad.ble.profile.attribute import Attribute, UUID
from whad.ble.profile.characteristic import Characteristic as BleCharacteristic,\
    CharacteristicProperties, ClientCharacteristicConfig, \
    ReportReferenceDescriptor as BleReportReferenceDescriptor, \
    CharacteristicUserDescriptionDescriptor as BleCharacteristicUserDescriptionDescriptor

from whad.ble.profile.service import PrimaryService as BlePrimaryService, \
    SecondaryService as BleSecondaryService, IncludeService as BleIncludeService, \
    Service
from whad.ble.exceptions import InvalidHandleValueException
from whad.ble.stack.att.constants import SecurityAccess

logger = logging.getLogger(__name__)

###################################
# Decorators for GenericProfile
###################################

class CharacteristicHook:
    """Characteristic hook decorator.

    This class defines the base hook decorator class and must be inherited.
    """

    def __init__(self, *args):
        if len(args) == 1:
            characteristic = args[0]
            if isinstance(characteristic, Characteristic):
                self.__characteristic = f"{characteristic.service.uuid}:{characteristic.uuid}"
            else:
                raise TypeError
        elif len(args) == 2:
            service = args[0]
            charac = args[1]
            if isinstance(service, str) and isinstance(charac, str):
                self.__characteristic = f"{UUID(service)}:{UUID(charac)}"
            elif isinstance(service, UUID) and isinstance(charac, UUID):
                self.__characteristic = f"{service}:{charac}"

    def __call__(self, method):
        if not hasattr(method, "hooks"):
            method.hooks = []
        if not hasattr(method, "characteristic"):
            method.characteristic = self.__characteristic
        return method

# Decorator name does not start with a capital letter for ease of use
# pylint: disable-next=invalid-name
class read(CharacteristicHook):
    """Read hook decorator

    This decorator is used to declare a callback method for read operations
    on a specific characteristic.
    """

    def __call__(self, method):
        """Add a specific hook to the method
        """
        super().__call__(method)
        if "read" not in method.hooks:
            method.hooks.append('read')
        return method

# Decorator name does not start with a capital letter for ease of use
# pylint: disable-next=invalid-name
class write(CharacteristicHook):
    """Write hook decorator

    This decorator is used to declare a callback method for write operations
    on a specific characteristic. This callback will be called **before** the
    write operation happens.
    """

    def __call__(self, method):
        """Add a specific hook to the method
        """
        super().__call__(method)
        if "write" not in method.hooks:
            method.hooks.append('write')
        return method

# Decorator name does not start with a capital letter for ease of use
# pylint: disable-next=invalid-name
class written(CharacteristicHook):
    """Written hook decorator

    This decorator is used to declare a callback method for write operations
    on a specific characteristic. This callback will be called **after** the
    write operation happens.
    """

    def __call__(self, method):
        """Add a specific hook to the method
        """
        super().__call__(method)
        if "written" not in method.hooks:
            method.hooks.append("written")
        return method

# Decorator name does not start with a capital letter for ease of use
# pylint: disable-next=invalid-name
class subscribed(CharacteristicHook):
    """Subscribe hook decorator

    This decorator is used to declare a callback method for subscribe operations
    on a specific characteristic.
    """

    def __call__(self, method):
        """Add a specific hook to the method
        """
        super().__call__(method)
        if "sub" not in method.hooks:
            method.hooks.append("sub")
        return method

# Decorator name does not start with a capital letter for ease of use
# pylint: disable-next=invalid-name
class unsubscribed(CharacteristicHook):
    """Unsubscribe hook decorator

    This decorator is used to declare a callback method for unsubscribe operations
    on a specific characteristic.
    """

    def __call__(self, method):
        """Add a specific hook to the method
        """
        super().__call__(method)
        if "unsub" not in method.hooks:
            method.hooks.append("unsub")
        return method

def is_method_hook(method):
    """Determine if a method is a characteristic operation hook
    """
    if hasattr(method, "hooks") and hasattr(method, "characteristic"):
        return len(method.hooks) > 0
    return False

################################
# Descriptors model
#
# This section contains all the descriptor models to use while creating
# a profile from Python code. It contains a set of alternative classes
# used by GenericProfile to build the attribute database and populate an
# instance with the corresponding properties and objects.
################################

class CharacteristicDescriptor:
    """Generic CharacteristicDescriptor model
    """

    def __init__(self, bleclass=None):
        """Instanciate a characteristic descriptor model

        :param str name: attribute name to access this descriptor
        :param class bleclass: BLE descriptor class to use when instanciating the model
        :param list permissions: descriptor permissions (read/write/notify/indicate)
        """
        self.__handle = 0
        self.__class = bleclass

    @property
    def handle(self) -> int:
        """Descriptor handle value
        """
        return self.__handle

    @handle.setter
    def handle(self, value: int):
        """Set descriptor handle value
        """
        self.__handle = value

    @property
    def bleclass(self):
        """BLE descriptor class used for instanciation
        """
        return self.__class


class ReportReferenceDescriptor(CharacteristicDescriptor):
    """Report Reference Descriptor model

    TODO: does it require specific permissions to be set ?
    """
    def __init__(self):
        super().__init__(BleReportReferenceDescriptor)

[docs] class Characteristic: """GATT characteristic. """
[docs] def __init__(self, name=None, uuid=None, value=b'', permissions=None, notify=False, indicate=False, description=None, security: list = None, **kwargs): """Declares a GATT characteristic. Other named arguments are used to declare characteristic's descriptors. :param name: Characteristic name used in GATT model :type name: str :param uuid: Characteristic UUID :type uuid: :class:`whad.ble.profile.attribute.UUID` :param permissions: List of permissions for this characteristic (*read*, *write*, *notify*, *indicate*) :type permissions: list :param notify: Enable notifications :type notify: bool :param indicate: Enable indications :type indicate: bool :param description: Textual description for this characteristic :type description: str :param security: Indicate the security property associated to this characteristic :type security: SecurityAccess """ self.__handle = 0 self.__name = name self.__uuid = uuid self.__value = value self.__perms = permissions self.__notify = notify self.__indicate = indicate self.__security = SecurityAccess.generate(security if security is not None else []) self.__service = None self.__description = description self.__descriptors = [] # Loop on kwargs to find descriptors for arg, argval in kwargs.items(): if isinstance(argval, CharacteristicDescriptor): descriptor = argval descriptor.handle = 0 descriptor.name = arg self.add_descriptor(descriptor) # Add descriptor to a property to this ServiceModel instance if not hasattr(self, arg): setattr(self, arg, descriptor)
[docs] def add_descriptor(self, descriptor): """Add descriptor to our descriptor list :param descriptor: Descriptor to add to the characteristic's descriptor list :type descriptor: :class:`whad.ble.profile.characteristic.CharacteristicDescriptor` """ self.__descriptors.append(descriptor)
[docs] def descriptors(self) -> Iterator[CharacteristicDescriptor]: """Enumerate descriptors attached to this characteristic This method will yield every descriptor attached to the characteristic. """ yield from self.__descriptors
[docs] def get_required_handles(self) -> int: """Compute the number of handles this characteristic will consume :return: Number of handles :rtype: int """ handles = 2 # A more handle as we may need a ClientCharacteristicConfiguration descriptor if self.__notify or self.__indicate: handles += 1 return handles
[docs] def attach(self, service): """Attach this characteristic to the corresponding service. :param service: Service :type service: :class:̀ whad.ble.profile.service.Service` """ self.__service = service
@property def handle(self) -> int: """Characteristic handle """ return self.__handle @handle.setter def handle(self, value): """Set characteristic handle. :param value: New handle value :type value: int """ self.__handle = value @property def end_handle(self) -> int: """Characteristic end handle (including characteristic value and descriptors). """ return self.handle + self.get_required_handles() - 1 @property def name(self) -> str: """Name """ return self.__name @name.setter def name(self, value): """Set characteristic name. :param value: New name :type value: str """ self.__name = value @property def uuid(self): """Characteristic UUID """ return self.__uuid @property def value(self) -> UUID: """Characteristic value UUID """ return self.__value @property def permissions(self) -> List[str]: """Characteristics permissions """ return self.__perms @property def must_notify(self) -> bool: """Check if notification has to be sent on value change. """ return self.__notify @property def must_indicate(self) -> bool: """Check if indication has to be sent on value change. """ return self.__indicate @property def description(self) -> str: """Return characteristic textual description, if any """ return self.__description @property def service(self) -> Service: """Related service. """ return self.__service @property def security(self) -> SecurityAccess: """Returns security access property """ return self.__security
class ServiceModel: """Bluetooth Low Energy service model used to describe a GATT service. """ PRIMARY = 1 SECONDARY = 2 def __init__(self, uuid=None, start_handle=None, end_handle=None, name=None, **kwargs): self.__handle = 0 self.__end_handle = 0 self.__uuid = uuid self.__name = name self.__characteristics = [] self.__included_services = [] if start_handle is None: self.__handle = 0 else: self.__handle = start_handle if end_handle is None: self.__end_handle = 0 else: self.__end_handle = end_handle # Loop on kwargs to find characteristics and included services for arg, argval in kwargs.items(): if isinstance(argval, Characteristic): charac = argval charac.handle = 0 charac.name = arg self.add_characteristic(charac) charac.attach(self) # Add characteristic to a property to this ServiceModel instance if not hasattr(self, arg): setattr(self, arg, charac) elif isinstance(argval, SecondaryService): # We must include this secondary service in this service service = argval self.add_included_service(service) # Add included service to a property to this ServiceModel instance if not hasattr(self, arg): setattr(self, arg, service) def add_characteristic(self, characteristic_model): """Add a characteristic to the model """ # Add characteristic to the list of our characteristics self.__characteristics.append(characteristic_model) # Update end handle value (include definition is a single attribute) self.__end_handle = max(self.__end_handle, characteristic_model.end_handle) def add_included_service(self, service_model): """Add an included service to the model """ self.__included_services.append(service_model) # Update end handle value self.__end_handle = max(self.__end_handle, service_model.end) @property def uuid(self) -> UUID: """Service UUID """ return self.__uuid @property def handle(self) -> int: """Handle value """ return self.__handle @property def end(self) -> int: """End handle value """ return self.__end_handle @property def name(self) -> str: """Service name """ return self.__name @name.setter def name(self, value: str): """Set service name """ self.__name = value @handle.setter def handle(self, value: int): """Set service handle value """ self.__handle = value def characteristics(self) -> Iterator[Characteristic]: """Enumerate characteristics """ yield from self.__characteristics def included_services(self): """Enumerate services """ yield from self.__included_services
[docs] class PrimaryService(ServiceModel): """Primary service model. """
class SecondaryService(ServiceModel): """Secondary service model. """
[docs] class GenericProfile: """Generic Profile """
[docs] def __init__(self, start_handle=0, from_json=None): """Parse the device model, instanciate all the services, characteristics and descriptors, compute all handle values and registers everything inside this instance for further use. :param start_handle: Start handle value to use (default: 0) :type start_handle: int :param from_json: JSON data describing a GATT profile :type from_json: str """ self.__attr_db = {} self.__services = [] self.__service_by_characteristic_handle = {} self.__start_handle = start_handle self.__handle = self.__start_handle self.__hooks = {} # Populate attribute database and model from JSON export if provided if from_json is not None: from_json = json.loads(from_json) # Parse JSON services, characteristics and descriptors to create # the corresponding model and attribute database if 'services' in from_json: # Loop on services for service in from_json['services']: # Collect characteristics if UUID(service['type_uuid']) == UUID(0x2800): service_obj = BlePrimaryService( uuid=UUID(service['uuid']), handle=service['start_handle'] ) elif UUID(service['type_uuid']) == UUID(0x2801): service_obj = BleSecondaryService( uuid=UUID(service['uuid']), handle=service['start_handle'] ) else: # This is not a known service type UUID, continue with # next service continue if 'characteristics' in service: for charac in service['characteristics']: charac_obj = BleCharacteristic( uuid=UUID(charac['value']['uuid']), handle=charac['handle'], value=b'', properties=charac['properties'], security=SecurityAccess.int_to_accesses(charac['security']) ) # Loop on descriptors, only support CCC at the moment for desc in charac['descriptors']: if UUID(desc['uuid']) == UUID(0x2902): desc_obj = ClientCharacteristicConfig( charac_obj, handle=desc['handle'], notify=charac_obj.must_notify(), indicate=charac_obj.must_indicate() ) charac_obj.add_descriptor(desc_obj) self.register_attribute(desc_obj) # Register characteristic and its value self.register_attribute(charac_obj) self.register_attribute(charac_obj.value_attr) # Add characteristic to its related service service_obj.add_characteristic(charac_obj) self.register_attribute(service_obj) self.add_service(service_obj) else: # Introspect this class definition and build model services = [] props = dir(self) for prop in props: if not prop.startswith('_'): if isinstance(getattr(self, prop), ServiceModel): service = getattr(self, prop) service.name = prop services.append(service) # Instanciate each service, and for each of them the corresponding # characteristics for service in services: if isinstance(service, PrimaryService): logger.info("creating primary service %s", service.uuid) # Create service service_obj = BlePrimaryService( uuid=service.uuid, handle=self.__alloc_handle() ) elif isinstance(service, SecondaryService): logger.info("creating secondary service %s", service.uuid) # Create service service_obj = BleSecondaryService( uuid=service.uuid, handle=self.__alloc_handle() ) self.__attr_db[service_obj.handle] = service_obj else: continue # Create the corresponding instance property setattr(self, service.name, service_obj) # Loop on included services and create them if required for inc_service in service.included_services(): inc_service_obj = BleIncludeService( uuid=inc_service.uuid, handle=self.__alloc_handle(1), start_handle=inc_service.handle, end_handle=inc_service.end ) self.__handle = inc_service_obj.end_handle # Register this service include definition self.register_attribute(inc_service_obj) service_obj.add_include_service(inc_service_obj) # Loop on underlying characteristics, and create them too. for charac in service.characteristics(): charac_props = 0 if 'read' in charac.permissions: charac_props |= CharacteristicProperties.READ if 'write' in charac.permissions: charac_props |= CharacteristicProperties.WRITE if 'write_without_response' in charac.permissions: charac_props |= CharacteristicProperties.WRITE_WITHOUT_RESPONSE if charac.must_notify: charac_props |= CharacteristicProperties.NOTIFY if charac.must_indicate: charac_props |= CharacteristicProperties.INDICATE charac_obj = BleCharacteristic( uuid=charac.uuid, handle=self.__alloc_handle(1), value=charac.value, properties=charac_props, security=charac.security ) logger.info(" creating characteristic %s (handle:%d)", charac_obj.uuid, charac_obj.handle ) self.__handle = charac_obj.end_handle # Register this characteristic self.register_attribute(charac_obj) self.register_attribute(charac_obj.value_attr) # If notify or indicate is set to true, we must add a new CCC descriptor if charac.must_notify or charac.must_indicate: ccc_desc = ClientCharacteristicConfig( charac_obj, handle=self.__alloc_handle(), notify=charac.must_notify, indicate=charac.must_indicate ) logger.info(" creating cccd (handle:%d)", ccc_desc.handle) charac_obj.add_descriptor(ccc_desc) self.register_attribute(ccc_desc) # If characteristic description has been set, add a descriptor if charac.description is not None: cudd_desc = BleCharacteristicUserDescriptionDescriptor( charac_obj, handle=self.__alloc_handle(), description=charac.description ) logger.info(" creating cudd (handle:%d) with text \"%s\"", cudd_desc.handle, charac.description ) charac_obj.add_descriptor(cudd_desc) self.register_attribute(cudd_desc) # Loop on other characteristic descriptors and add them for descriptor in charac.descriptors(): desc = descriptor.bleclass( charac_obj, handle=self.__alloc_handle() ) logger.info(" creating %s descriptor (handle:%d)", type(desc), desc.handle ) charac_obj.add_descriptor(desc) self.register_attribute(desc) # Add our characteristic object to the corresponding service setattr(service_obj, charac.name, charac_obj) service_obj.add_characteristic(charac_obj) self.add_service(service_obj) # We then need to update included service start and end handles for inc_service in self.included_services(): # Retrieve the included service UUID service_uuid = inc_service.service_uuid # Retrieve the corresponding object by UUID service_obj = self.get_service_by_uuid(service_uuid) # If found, update start and end handles if service_obj is not None: inc_service.service_start_handle = service_obj.handle inc_service.service_end_handle = service_obj.end_handle # Register any hook function props = dir(self) for prop in props: prop_obj = getattr(self, prop) # Is this property a callable hook ? if callable(prop_obj) and is_method_hook(prop_obj): # Associate hook method with each operation if prop_obj.characteristic not in self.__hooks: self.__hooks[prop_obj.characteristic] = {} for operation in prop_obj.hooks: self.__hooks[prop_obj.characteristic][operation] = prop_obj
def __alloc_handle(self, number=1): """Allocate one or more handle values. :param number: Number of handle values to allocate :type number: int :return: Current handle value :rtype: int """ self.__handle += number return self.__handle def __repr__(self): output = '' for service in self.services(): output += ( f"Service {service.uuid} (handles from {service.handle:d} to " f"{service.end_handle:d}):\n" ) for inc_service in service.included_services(): output += ( f" Included service {inc_service.service_uuid} " f"(handle:{inc_service.handle:d}, " f"start_handle:{inc_service.service_start_handle:d}, " f"end_handle:{inc_service.service_end_handle:d})\n" ) for charac in service.characteristics(): properties = charac.properties charac_rights = '' if properties & CharacteristicProperties.READ != 0: charac_rights += 'R' if properties & CharacteristicProperties.WRITE != 0: charac_rights += 'W' if properties & CharacteristicProperties.INDICATE != 0: charac_rights += 'I' if properties & CharacteristicProperties.NOTIFY != 0: charac_rights += 'N' output += ( f" Characteristic {charac.uuid} (handle:{charac.handle:d}, " f"value handle: {charac.value_handle:d}, " f"props: {charac_rights})\n" ) for desc in charac.descriptors(): output += f" Descriptor {desc.type_uuid} (handle: {desc.handle:d})\n" return output
[docs] def register_attribute(self, attribute): """Register a GATT attribute :param attribute: Attribute to register :type attribute: :class:`whad.ble.profile.attribute.Attribute` """ if isinstance(attribute, Attribute): self.__attr_db[attribute.handle] = attribute
[docs] def add_service(self, service, handles_only=False): """Add a service to the current device :param service: Service to add to the device :type service: :class:`whad.ble.profile.service.Service` :param handles_only: Add only service handles if set to ``True`` :type handles_only: bool """ logger.debug("add service %s", service.uuid) if service.handle == 0: # Service has not been fully configured, update its handle # and the handles of its characteristics and descriptors. service.handle = self.__alloc_handle() # Append service to the list of our services if not handles_only: self.__services.append(service) # Register service as an attribute self.register_attribute(service) # Register all its characteristics for charac in service.characteristics(): # Register Characteristic and its CharacteristicValue self.register_attribute(charac) self.register_attribute(charac.value_attr) # Register characteristic's descriptors for desc in charac.descriptors(): self.register_attribute(desc) # Add characteristic in our lookup table self.__service_by_characteristic_handle[charac.handle] = service # Update our last handle based on service's end handle self.__handle = service.end_handle
[docs] def remove_service(self, service, handles_only=False): """Remove service :param service: Service object or UUID :type service: :class:`whad.ble.profile.service.Service` :param handles_only: Remove only handles if set to ``True`` :type handles_only: bool """ if isinstance(service, (BlePrimaryService, BleSecondaryService)): service_obj = self.get_service_by_uuid(service.uuid) elif isinstance(service, UUID): service_obj = self.get_service_by_uuid(service) else: service_obj = None # Process service object if service_obj is not None: # Remove service and all its characteristics from the attribute DB for charac in service_obj.characteristics(): # Remove characteristic handle if charac.handle in self.__attr_db: del self.__attr_db[charac.handle] # Remove characteristic value handle if charac.value_handle in self.__attr_db: del self.__attr_db[charac.value_handle] # Remove all the attached descriptors for desc in charac.descriptors(): if desc.handle in self.__attr_db: del self.__attr_db[desc.handle] # Remove service object from attribute db del self.__attr_db[service_obj.handle] # Remove service from our list of services (if required) if not handles_only: self.__services.remove(service) else: # Not found, raise IndexError raise IndexError()
[docs] def update_service(self, service) -> bool: """Update service in profile. Keep service in place in the service list, but update all the services declared after this one. :param service: Service object to update. :type service: :class:`whad.ble.profile.service.Service` :return: ``True`` if service has been updated, ``False`` otherwise. :rtype: bool """ try: service_index = self.__services.index(service) # Remove all handles used by this service self.remove_service(service, handles_only=True) # Register all the handles back into our attribute DB self.add_service(service, handles_only=True) # Update all other services handle = service.end_handle for remaining_service in self.__services[service_index+1:]: remaining_service.handle = handle + 1 self.update_service(remaining_service) handle = remaining_service.end_handle self.__handle = handle return True except IndexError: return False
[docs] def find_object_by_handle(self, handle) -> Attribute: """Find an object by its handle value :param handle: Object handle :type handle: int :return: Object if handle is valid, or raise an IndexError exception otherwise :rtype: :class:`whad.ble.profile.attribute.Attribute` :raises: IndexError """ if handle in self.__attr_db: return self.__attr_db[handle] # Error. raise IndexError
[docs] def find_objects_by_range(self, start, end) -> List[Attribute]: """Find attributes with handles belonging in the [start, end+1] interval. :param start: Start handle value :type start: int :param end: End handle value :type end: int :return: List of objects with handles between start and end values :rtype: list """ handles = [] for handle in self.__attr_db: if start <= handle <= end: handles.append(handle) handles.sort() return [self.find_object_by_handle(handle) for handle in handles]
[docs] def find_characteristic_by_value_handle(self, value_handle) -> BleCharacteristic: """Find characteristic object by its value handle. :param value_handle: Characteristic value handle :type value_handle: int :return: Corresponding characteristic object or ``None`` if not found. :rtype: :class:`whad.ble.profile.characteristic.Characteristic` """ try: char_value = self.find_object_by_handle(value_handle) if char_value is not None and hasattr(char_value, 'characteristic'): return char_value.characteristic # Not found. return None except InvalidHandleValueException: return None
[docs] def find_characteristic_end_handle(self, handle) -> int: """Find characteristic end handle based on its handle. :param handle: Characteristic handle :type handle: int :rtype: int :return: Characteristic value handle :raises: :class:`whad.ble.exceptions.InvalidHandleValueException` """ try: # Find service owning the characteristic service = self.find_service_by_characteristic_handle(handle) # Build a list of characteristic handles service_char_handles=[] for characteristic in service.characteristics(): service_char_handles.append(characteristic.handle) # Sort handles service_char_handles.sort() idx = service_char_handles.index(handle) if idx == len(service_char_handles) - 1: return service.end_handle return service_char_handles[idx+1] - 1 except InvalidHandleValueException: return None
[docs] def find_service_by_characteristic_handle(self, handle) -> Service: """Find a service object given a characteristic handle that belongs to this service. :param handle: Characteristic handle belonging to the searched service :type handle: int :rtype: :class:`whad.ble.profile.service.Service` :return: Service object containing the specified characteristic :raises: :class:`whad.ble.exceptions.InvalidHandleValueException` """ try: if handle in self.__service_by_characteristic_handle: return self.__service_by_characteristic_handle[handle] # Invalid handle raise InvalidHandleValueException except IndexError as err: raise InvalidHandleValueException from err
[docs] def services(self) -> Iterator[Service]: """Enumerate service objects. This method is a generator and will yield service objects registered into the profile. """ for _, obj in self.__attr_db.items(): if isinstance(obj, (BlePrimaryService, BleSecondaryService)): yield obj
[docs] def included_services(self) -> Iterator[BleIncludeService]: """Enumerate included services. """ for _, obj in self.__attr_db.items(): if isinstance(obj, BleIncludeService): yield obj
[docs] def get_service_by_uuid(self, service_uuid: UUID): """Get a service by its UUID. :param service_uuid: Service UUID to look for :type service_uuid: :class:`whad.ble.profile.attribute.UUID` :return: Service if found, ``None`` otherwise :rtype: :class:`whad.ble.profile.service.Service` """ for _, obj in self.__attr_db.items(): if isinstance(obj, (BlePrimaryService, BleSecondaryService)): if obj.uuid == service_uuid: return obj # Not found return None
[docs] def get_characteristic_by_uuid(self, charac_uuid: UUID): """Get characteristic by its UUID. :param charac_uuid: Characteristic UUID to look for :type charac_uuid: :class:`whad.ble.profile.attribute.UUID` :return: Characteristic if found, ``None`` otherwise :rtype: :class:`whad.ble.profile.characteristic.Characteristic` """ for _, obj in self.__attr_db.items(): if isinstance(obj, BleCharacteristic): if obj.uuid == charac_uuid: return obj # Not found return None
[docs] def attr_by_type_uuid(self, uuid, start=1, end=0xFFFF) -> Iterator[Attribute]: """Enumerate attributes that have a specific type UUID. :param uuid: Type UUID :type uuid: :class:`whad.ble.profile.attribute.UUID` :param start: Start handle :type start: int :param end: End handle :type end: int """ for _, obj in self.__attr_db.items(): if obj.type_uuid == uuid and start <= obj.handle <= end: yield obj
[docs] def export_json(self): """Export profile as JSON data, including services, characteristics and descriptors definition. :return: JSON data corresponding to this profile :rtype: str """ profile_dict = {} profile_dict['services'] = [] for service in self.services(): service_dict = { 'uuid': str(service.uuid), 'type_uuid': str(service.type_uuid), 'start_handle': service.handle, 'end_handle': service.end_handle } service_dict['characteristics'] = [] for charac in service.characteristics(): charac_dict = { 'handle': charac.handle, 'uuid': str(charac.type_uuid), 'properties': charac.properties, 'security': SecurityAccess.accesses_to_int(charac.security), 'value': { 'handle': charac.value_handle, 'uuid': str(charac.uuid), } } charac_dict['descriptors'] = [] for desc in charac.descriptors(): desc_dict = { 'handle': desc.handle, 'uuid': str(desc.type_uuid) } charac_dict['descriptors'].append(desc_dict) service_dict['characteristics'].append(charac_dict) profile_dict['services'].append(service_dict) return json.dumps(profile_dict)
[docs] def find_hook(self, service, characteristic, operation) -> callable: """Find a registered hook for a specific service, characteristic and operation. :param service: Service object :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param operation: GATT operation :type operation: str :return: Hook callback :rtype: callable """ hook_key = str(service.uuid) + ':' + str(characteristic.uuid) if hook_key in self.__hooks: if operation in self.__hooks[hook_key]: return self.__hooks[hook_key][operation] return None
################################################ # Connection/disconnection hooks ################################################
[docs] def on_connect(self, conn_handle): """Connection hook. This hook is only used to notify the connection of a device. :param conn_handle: Connection handle :type conn_handle: int """
[docs] def on_disconnect(self, conn_handle): """Disconnection hook. This hook is only used to notify the disconnection of a device. :param conn_handle: Connection handle :type conn_handle: int """
################################################ # Characteristic Read/Write/Subscribe hooks ################################################
[docs] def on_characteristic_read(self, service, characteristic, offset=0, length=0): """Characteristic read hook. This hook is called whenever a characteristic is about to be read by a GATT client. If this method returns a byte array, this byte array will be sent back to the GATT client. If this method returns None, then the read operation will return an error (not allowed to read characteristic value). :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param offset: Read offset (default: 0) :type offset: int :param length: Max read length :type length: int :return: Value to return to the GATT client :rtype: bytes """ # Check if we have a hook to call hook = self.find_hook(service, characteristic, 'read') if hook is not None: return hook(offset, length) # If no hook registered, then return the characteristic value return characteristic.value[offset:offset + length]
[docs] def on_characteristic_write(self, service, characteristic, offset=0, value=b'', without_response=False): """Characteristic write hook This hook is called whenever a charactertistic is about to be written by a GATT client. :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param offset: Read offset (default: 0) :type offset: int :param value: Value about to be written into the characteristic :type value: bytes :param without_response: Set to ``True`` if no response is required :type without_response: bool """ hook = self.find_hook(service, characteristic, 'write') if hook is not None: return hook( offset, value, without_response=without_response ) # No action return None
[docs] def on_characteristic_written(self, service, characteristic, offset=0, value=b'', without_response=False): """Characteristic written hook This hook is called whenever a charactertistic has been written by a GATT client. :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param offset: Read offset (default: 0) :type offset: int :param value: Value about to be written into the characteristic :type value: bytes :param without_response: Set to ``True`` if no response is required :type without_response: bool """ # Check if we have a hook to call hook = self.find_hook(service, characteristic, 'written') if hook is not None: # Call our hook return hook( offset, value, without_response=without_response ) # No action return None
[docs] def on_characteristic_subscribed(self, service, characteristic, notification=False, indication=False): """Characteristic subscribed hook This hook is called whenever a characteristic has been subscribed to. :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param notification: Set to ``True`` if subscribed to notification :type notification: bool :param indication: Set to ``True`` if subscribed to notification :type indication: bool """ # Check if we have a hook to call hook = self.find_hook(service, characteristic, 'sub') if hook is not None: # Call our hook return hook( notification=notification, indication=indication ) # No action return None
[docs] def on_characteristic_unsubscribed(self, service, characteristic): """Characteristic unsubscribed hook This hook is called whenever a characteristic has been unsubscribed. :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` """ # Check if we have a hook to call hook = self.find_hook(service, characteristic, 'unsub') if hook is not None: # Call our hook return hook() # No action return None
[docs] def on_notification(self, service, characteristic, value): """Characteristic notification hook. This hook is called when a notification is sent to a characteristic. :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param value: Characteristic value :type value: bytes """
[docs] def on_indication(self, service, characteristic, value): """Characteristic indication hook. This hook is called when a indication is sent to a characteristic. :param service: Service owning the characteristic :type service: :class:`whad.ble.profile.service.Service` :param characteristic: Characteristic object :type characteristic: :class:`whad.ble.profile.characteristic.Characteristic` :param value: Characteristic value :type value: bytes """