"""
Bluetooth Low Energy Peripheral connector
=========================================
WHAD provides a specific connector to create a BLE device, :class:`Peripheral`.
This connector implements a GATT server and hosts a GATT profile, allowing remote
BLE devices to connect to it and query its services, characteristics, and descriptors.
"""
import logging
from time import sleep
from queue import Queue, Empty
from threading import Thread
from whad.ble.connector.base import BLE
from whad.hub.ble.bdaddr import BDAddress
from whad.ble.stack import BleStack, Layer
from whad.ble.stack.gatt import GattServer, GattClientServer
from whad.ble.stack.att import ATTLayer
from whad.ble.stack.smp import CryptographicDatabase, Pairing
from whad.ble.profile import GenericProfile
from whad.ble.profile.device import PeripheralDevice
from whad.ble.profile.advdata import AdvDataFieldList, AdvFlagsField
from whad.hub.ble import Direction as BleDirection
from whad.exceptions import UnsupportedCapability
# Logging
logger = logging.getLogger(__name__)
[docs]
class PeripheralEventDisconnected:
"""Peripheral disconnected event.
"""
def __init__(self, conn_handle: int):
"""Initialize this event
:param conn_handle: Connection handle
:type conn_handle: int
"""
self.__conn_handle = conn_handle
@property
def conn_handle(self) -> int:
"""Connection handle
:rtype: int
"""
return self.__conn_handle
[docs]
class PeripheralEventConnected:
"""Connected event
"""
def __init__(self, conn_handle: int, local: BDAddress, remote: BDAddress):
"""Initialize event
:param conn_handle: Connection handle
:type conn_handle: int
:param local: Peripheral BD address
:type local: :py:class:`whad.ble.BDAddress`
:param remote: Central BD address
:type remote: :py:class:`whad.ble.BDAddress`
"""
self.__conn_handle = conn_handle
self.__local = local
self.__remote = remote
@property
def conn_handle(self) -> int:
"""Connection handle
:rtype: int
"""
return self.__conn_handle
@property
def local(self) -> BDAddress:
"""Peripheral BD address
:rtype: :py:class:`whad.ble.BDAddress`
"""
return self.__local
@property
def remote(self) -> BDAddress:
"""Central BD address
:rtype: :py:class:`whad.ble.BDAddress`
"""
return self.__remote
class PeripheralEventListener(Thread):
"""Peripheral event listener.
"""
def __init__(self, callback=None):
"""Initialize event listener
"""
super().__init__()
self.__queue = Queue()
self.__running = True
self.__callback = callback
# This thread is a daemon (must be terminated when main thread ends).
self.daemon = True
@property
def queue(self):
return self.__queue
def notify(self, event):
"""Add event to notify
"""
self.__queue.put(event)
def stop(self):
"""Stop listener
"""
self.__running = False
def run(self):
while self.__running:
try:
event = self.__queue.get(block=True, timeout=1.0)
if event is not None:
if self.__callback is not None:
self.__callback(event)
except Empty:
pass
[docs]
class Peripheral(BLE):
"""This BLE connector provides a way to create a peripheral device.
A peripheral device exposes some services and characteristics that may
be accessed by a central device. These services and characteristics are
defined by a specific profile.
"""
def __init__(self, device, existing_connection = None, profile=None, adv_data=None,
scan_data=None, bd_address=None, public=True, stack=BleStack, gatt=GattServer,
pairing=Pairing(), security_database=None):
"""Create a peripheral device.
:param device: WHAD device to use as a peripheral
:type device: :class:`whad.device.WhadDevice`
:param profile: Device profile to use
:type profile: :class:`whad.ble.profile.GenericProfile`
:param adv_data: Advertisement data
:type adv_data: :class:`whad.ble.profile.advdata.AdvDataFieldList`
:param scan_data: Advertisement data sent in Scan Response
:type scan_data: :class:`whad.ble.profile.advdata.AdvDataFieldList`
:param bd_address: Bluetooth Device address to use
:type bd_address: str
:param public: Set to True to use a public Bluetooth Device address,
False to use a random one
:type public: bool
:param stack: Bluetooth Low Energy stack to use,
:class:`whad.ble.stack.BleStack` by default
:param gatt: Bluetooth Low Energy GATT
"""
super().__init__(device)
# Initialize local peer and remote per info
self.__local_peer = None
self.__remote_peer = None
self.__gatt_server = None
self.__peripheral = None
self.__central = None
# Initialize stack
self.__configure_stack(stack, gatt)
self.connection = None
self.__connected = False
self.__conn_handle = None
# Initialize event listener
self.__evt_listener = None
# Initialize profile
if profile is None:
logger.info("No profile provided to this Peripheral instance, use a default one.")
self.__profile = GenericProfile()
else:
logger.info("Peripheral will use the provided profile.")
self.__profile = profile
# Initialize security database
if security_database is None:
logger.info((
"No security database provided to this Peripheral instance, "
"use a default one."))
self.__security_database = CryptographicDatabase()
else:
logger.info("Peripheral will use the provided security database.")
self.__security_database = security_database
# Initiate pairing parameters
self.__pairing_parameters = pairing
# Check if device accepts peripheral mode
if not self.can_be_peripheral():
logger.info("Capability MasterRole not supported by this WHAD device")
raise UnsupportedCapability("Peripheral")
# Set bd address if provided
if bd_address is not None:
logger.info("Set BD address to %s", bd_address)
self.set_bd_address(bd_address, public=public)
# If an existing connection is hijacked, simulate a connection
if existing_connection is not None:
self.on_connected(existing_connection)
else:
# If no advertising data has been set, initialize this peripheral
# with default flags.
if adv_data is None:
adv_data = AdvDataFieldList(AdvFlagsField())
# Enable peripheral mode
logger.info("Enable peripheral mode with advertising data: %s", adv_data)
self.enable_peripheral_mode(adv_data, scan_data)
@property
def listener(self):
"""Attached event listener
"""
return self.__evt_listener
@property
def local_peer(self):
"""Local peer object
"""
return self.__local_peer
@property
def remote_peer(self):
"""Remote peer object
"""
return self.__remote_peer
@property
def conn_handle(self) -> int:
"""Connection handle
"""
return self.__conn_handle
def __configure_stack(self, phy_layer=None, gatt_layer=None):
"""
"""
# Save GATT and PHY layers
if gatt_layer is not None:
self.__gatt_layer = gatt_layer
if phy_layer is not None:
self.__phy_layer = phy_layer
# Configure BLE stack to use our PHY class
self.__stack = phy_layer(self)
# Configure ATT layer to use our GATT class
if self.__gatt_layer is not None:
if issubclass(self.__gatt_layer, Layer) and self.__gatt_layer.alias == 'gatt':
ATTLayer.add(self.__gatt_layer)
[docs]
def attach_event_listener(self, listener: PeripheralEventListener):
"""Attach an event queue to receive asynchronous notifications.
:param listener: Event listener
:type listener: PeripheralEventListener
"""
# Save event queue
self.__evt_listener = listener
[docs]
def notify_event(self, event):
"""Notify event
"""
if self.__evt_listener is not None:
self.__evt_listener.notify(event)
[docs]
def get_pairing_parameters(self):
"""Returns the provided pairing parameters, if any.
"""
return self.__pairing_parameters
[docs]
def send_data_pdu(self, data, conn_handle=1, direction=BleDirection.SLAVE_TO_MASTER,
access_address=0x8e89bed6, encrypt=None) -> bool:
"""Send a PDU to the central device this peripheral device is connected to.
Sending direction is set to ̀ BleDirection.SLAVE_TO_MASTER` as we need
to send PDUs to a central device.
:param pdu: PDU to send
:type pdu: :class:`scapy.layers.bluetooth4LE.BTLE`
:param conn_handle: Connection handle
:type conn_handle: int
:param direction: Sending direction (to master or slave)
:type direction: :class:`whad.protocol.ble_pb2.BleDirection`, optional
:param access_address: Target access address
:type access_address: int, optional
:return: PDU transmission result.
:rtype: bool
"""
return super().send_data_pdu(data, conn_handle=conn_handle, direction=direction,
access_address=access_address, encrypt=encrypt)
[docs]
def send_ctrl_pdu(self, pdu, conn_handle=1, direction=BleDirection.SLAVE_TO_MASTER,
access_address=0x8e89bed6, encrypt=None) -> bool:
"""Send a PDU to the central device this peripheral device is connected to.
Sending direction is set to ̀ BleDirection.SLAVE_TO_MASTER` as we need
to send PDUs to a central device.
:param pdu: PDU to send
:type pdu: :class:`scapy.layers.bluetooth4LE.BTLE`
:param conn_handle: Connection handle
:type conn_handle: int
:param direction: Sending direction (to master or slave)
:type direction: :class:`whad.protocol.ble_pb2.BleDirection`, optional
:param access_address: Target access address
:type access_address: int, optional
:return: PDU transmission result.
:rtype: bool
"""
return super().send_ctrl_pdu(pdu, conn_handle=conn_handle, direction=direction,
access_address=access_address, encrypt=encrypt)
[docs]
def set_profile(self, profile: GenericProfile):
"""Set peripheral profile.
"""
self.__profile = profile
[docs]
def use_stack(self, clazz=BleStack):
"""Specify a stack class to use for BLE. By default, our own stack
(BleStack) is used.
:param clazz: BLE stack to use.
:type clazz: :class:`whad.ble.stack.BleStack`
"""
self.__stack = clazz(self)
@property
def smp(self):
"""Security Manager Protocol
"""
if self.connection is not None:
return self.connection.smp
return None
[docs]
def pairing(self, pairing=None):
"""Trigger a pairing request with the provided parameters, if any.
"""
if self.smp is None:
return False
if pairing is not None:
self.__pairing_parameters = pairing
if not self.smp.request_pairing(self.__pairing_parameters):
return False
while not self.smp.is_pairing_done():
sleep(0.1)
if self.smp.is_pairing_failed():
return False
self.smp.reset_state()
return True
@property
def gatt(self):
"""Generic Attribute layer
"""
if self.connection is not None:
return self.connection.gatt
return None
@property
def security_database(self):
"""Security Database
"""
return self.__security_database
[docs]
def is_connected(self) -> bool:
"""Determine if the peripheral has an active connection from a
GATT client.
"""
return self.__connected
[docs]
def wait_connection(self):
"""Wait for a GATT client to connect to the peripheral. If a connection
is already active, returns immediately.
"""
while not self.is_connected():
sleep(.5)
[docs]
def wait_disconnection(self):
"""Wait for a GATT client to connect to the peripheral. If a connection
is already active, returns immediately.
"""
while self.is_connected():
sleep(.5)
##############################
# Incoming events
##############################
[docs]
def on_connected(self, connection_data):
"""A device has just connected to this peripheral.
:param connection_data: Connection data
:type connection_data: :class:`whad.protocol.ble_pb2.Connected`
"""
# Make sure stack is correctly configured
self.__configure_stack()
# Retrieve the GATT server instance and set its profile
logger.info("a device is now connected (connection handle: %d)",
connection_data.conn_handle)
self.__local_peer = BDAddress.from_bytes(
connection_data.advertiser,
connection_data.adv_addr_type
)
self.__remote_peer = BDAddress.from_bytes(
connection_data.initiator,
connection_data.init_addr_type
)
self.__stack.on_connection(
connection_data.conn_handle,
self.__local_peer,
self.__remote_peer
)
# GATT server is now connected
self.__connected = True
self.__conn_handle = connection_data.conn_handle
# Notify event listener, if any
self.notify_event(PeripheralEventConnected(
connection_data.conn_handle,
self.__local_peer,
self.__remote_peer
))
[docs]
def on_disconnected(self, disconnection_data):
"""A device has just disconnected from this peripheral.
:param connection_data: Connection data
:type connection_data: :class:`whad.protocol.ble_pb2.Disconnected`
"""
logger.info("a device has just connected (connection handle: %d)",
disconnection_data.conn_handle)
self.__stack.on_disconnection(
disconnection_data.conn_handle,
disconnection_data.reason
)
# Notify peripheral device about this disconnection
if self.__profile is not None:
self.__profile.on_disconnect(disconnection_data.conn_handle)
# We are now disconnected
self.__connected = False
# Notify event listener, if any
self.notify_event(PeripheralEventDisconnected(
disconnection_data.conn_handle
))
[docs]
def on_ctl_pdu(self, pdu):
"""This method is called whenever a control PDU is received.
This PDU is then forwarded to the BLE stack to handle it.
Peripheral devices act as a slave, so we only forward master to slave
messages to the stack.
:param pdu: BLE PDU
:type pdu: :class:`scapy.layers.bluetooth4LE.BTLE`
"""
if pdu.metadata.direction == BleDirection.MASTER_TO_SLAVE:
logger.info("Control PDU comes from master, forward to peripheral")
self.__stack.on_ctl_pdu(pdu.metadata.connection_handle, pdu)
[docs]
def on_data_pdu(self, pdu):
"""This method is called whenever a data PDU is received.
This PDU is then forwarded to the BLE stack to handle it.
:param pdu: BLE PDU
:type pdu: :class:`scapy.layers.bluetooth4LE.BTLE_DATA`
"""
if pdu.metadata.direction == BleDirection.MASTER_TO_SLAVE:
logger.info("Data PDU comes from master, forward to peripheral")
self.__stack.on_data_pdu(pdu.metadata.connection_handle, pdu)
[docs]
def on_new_connection(self, connection):
"""On new connection, discover primary services
:param connection: Connection data
:type connection: :class:`whad.protocol.ble_pb2.Connected`
"""
# Use GATT server
self.connection = connection
self.__connected = True
# Retrieve GATT server
self.__gatt_server = connection.gatt
self.__gatt_server.set_server_model(self.__profile)
self.__connected = True
# Configure SMP layer
# we set the security database
self.connection.smp.set_security_database(self.__security_database)
self.connection.smp.pairing_parameters = self.__pairing_parameters
# Check if we got a matching LTK
crypto_material = self.security_database.get(address=self.__local_peer)
if crypto_material is not None and crypto_material.has_ltk():
conn_handle = connection.conn_handle
self.__stack.get_layer("ll").state.register_encryption_key(
conn_handle,
crypto_material.ltk.value
)
if crypto_material.is_authenticated():
#print("Marked as authenticated")
self.__stack.get_layer("ll").state.mark_as_authenticated(connection.conn_handle)
else:
pass#print("Marked as unauthenticated")
# we indicate that we are a responder
self.connection.smp.set_responder_role()
# Notify our profile about this connection
self.__profile.on_connect(self.connection.conn_handle)
[docs]
def set_mtu(self, mtu: int):
"""Set connection MTU.
"""
if self.connection is not None:
# Start a MTU exchange procedure
self.connection.gatt.set_mtu(mtu)
[docs]
def get_mtu(self) -> int:
"""Retrieve the connection MTU.
"""
if self.connection is not None:
return self.connection.l2cap.get_local_mtu()
class PeripheralClient(Peripheral):
'''This BLE connector provides a way to create a peripheral device with
both GATT server and client roles.
'''
def __init__(self, device, existing_connection = None, profile=None, adv_data=None,
scan_data=None, bd_address=None, public=True, stack=BleStack):
super().__init__(
device,
existing_connection=existing_connection,
profile=profile,
adv_data=adv_data,
scan_data=scan_data,
bd_address=bd_address,
public=public,
stack=stack
)
# Change ATTLayer to use GattClientServer and reinstantiate our stack
ATTLayer.add(GattClientServer)
self.use_stack(BleStack)
def on_new_connection(self, connection):
super().on_new_connection(connection)
# Create a new peripheral device to represent the central device
# that has just connected
self.__peripheral = PeripheralDevice(
self,
connection.gatt,
connection.conn_handle
)
# Retrieve GATT client
self.__central = connection.gatt
self.__central.set_client_model(self.__peripheral)
@property
def central_device(self):
"""Central device
"""
return self.__peripheral