Source code for whad.device.device

"""
WHAD provides various classes to interact with WHAD-enabled hardware:

- :py:class:`whad.device.device.Device`
- :py:class:`whad.device.device.VirtualDevice`

Class :class:`whad.device.device.Device` is the default class that allows WHAD devices
enumeration and access. It is the main class to use to open any device, through
its :method:`whad.device.Device.create` method as shown below:

.. code-block:: python

    from whad.device import Device

    dev = Device.create("uart0")

The :py:class:`whad.device.device.VirtualDevice` shall not be directly used. This class
is used to add support for incompatible WHAD devices like the *Ubertooth*
or the *ApiMote* and acts as an adaptation layer between the underlying WHAD
protocol and the specific protocol used by the target hardware.

.. important::

    The :py:class:`whad.device.device.WhadDevice` class that is still defined in WHAD (and used
    in some old example scripts or documentation) is an alias for the new
    :py:class:`whad.device.Device` class, and is meant to be deprecated in the future.
    This old class has been renamed to ``Device`` for clarity, and the same happened
    with the old default connector class :py:class:`whad.device.connector.WhadConnector`
    that has been renamed to :py:class:`whad.device.connector.Connector`.

    These old classes will be marked as *deprecated* in a future release, with a
    specific EOL date announced. A warning message will be issued in case one of
    these classes is used in a script or a tool to give time to users to migrate
    to the new ones (renaming classes is enough to switch to the new implementation,
    APIs stay the same).

Default device classes
----------------------

.. autoclass:: whad.device.device.Device
    :members:

.. autoclass:: whad.device.device.VirtualDevice
    :show-inheritance:
    :members:

Old device classes to be deprecated in the future
-------------------------------------------------

.. autoclass:: whad.device.device.WhadDevice
    :show-inheritance:

.. autoclass:: whad.device.device.WhadVirtualDevice
    :show-inheritance:

"""
import re
import logging
import contextlib
from time import time
from typing import Generator, Callable, Union, Type, Optional
from threading import Thread, Lock
from queue import Queue, Empty

from whad.helpers import message_filter
from whad.exceptions import (
    WhadDeviceNotReady, WhadDeviceError, WhadDeviceTimeout, WhadDeviceDisconnected,
    WhadDeviceNotFound,
)
from whad.hub import ProtocolHub
from whad.hub.message import HubMessage
from whad.hub.generic.cmdresult import CommandResult
from whad.hub.discovery import InfoQueryResp, DomainInfoQueryResp, DeviceReady
from whad.hub.discovery import DeviceType

from .info import DeviceInfo

logger = logging.getLogger(__name__)

class DeviceEvt:
    """Device event.
    """
    def __init__(self, device: Optional['Device'] = None):
        self.__device = device

    @property
    def device(self) -> Optional['Device']:
        """Related device"""
        return self.__device

    def __repr__(self) -> str:
        """Printable representation of this event
        """
        iface = self.device.interface if self.device is not None else 'undefined'
        return f"DeviceEvt(iface='{iface}')"

class Disconnected(DeviceEvt):
    """Interface has disconnected.
    """
    def __repr__(self) -> str:
        """Printable representation of this event.
        """
        iface = self.device.interface if self.device is not None else 'undefined'
        return f"Disconnected(device='{iface}')"

class MessageReceived(DeviceEvt):
    """Message has been received.
    """
    def __init__(self, device = None, message = None):
        super().__init__(device)
        self.__message = message

    @property
    def message(self) -> Optional[HubMessage]:
        """Received message."""
        return self.__message

    def __repr__(self) -> str:
        """Printable representation of this event.
        """
        iface = self.device.interface if self.device is not None else 'undefined'
        return f"MessageReceived(device='{iface}')"

class DevInThread(Thread):
    """Internal thread processing data sent by the connector to the
    hardware interface.
    """

    def __init__(self, device = None):
        super().__init__(daemon=True)
        self.__iface = device
        self.__canceled = False

    def cancel(self):
        """Cancel thread
        """
        self.__canceled = True

    def serialize(self, message) -> bytes:
        """Serialize a WHAD message.
        """
        # Serialize protobuf message
        raw_msg = message.serialize()

        # Define header
        header = [
            0xAC, 0xBE,
            len(raw_msg) & 0xff,
            (len(raw_msg) >> 8) & 0xff
        ]

        # Build the final payload
        return bytes(header) + raw_msg

    def run(self):
        """Out thread main task.
        """
        while not self.__canceled:
            # Read data from device (may block)
            try:
                # Wait for a message to send to interface (blocking)
                logger.debug("[%s][in_thread] waiting for message to send", self.__iface.interface)
                with self.__iface.get_pending_message(timeout=1.0) as message:
                    logger.debug("[%s][in_thread] sending message %s",
                                 self.__iface.interface, message)

                    # Serialize message and send it.
                    payload = self.serialize(message)

                    # Send serialized message to interface
                    logger.debug("[%s][in_thread] acquiring lock on interface ...",
                                 self.__iface.interface)

                    self.__iface.lock()

                    logger.debug("[%s][in_thread] sending payload %s ...",
                                 self.__iface.interface, payload)
                    self.__iface.write(payload)
                    logger.debug("[%s][in_thread] releasing lock ...", self.__iface.interface)
                    self.__iface.unlock()

                    # Notify message has correctly been sent, from a dedicated
                    # thread.
                    if message.has_callback():
                        Thread(target=message.sent).start()
            except Empty:
                pass
            except WhadDeviceNotReady:
                if message.has_callback():
                    Thread(target=message.error, args=[1]).start()
                break
            except WhadDeviceDisconnected:
                if message.has_callback():
                    Thread(target=message.error, args=[2]).start()
                break

class DevOutThread(Thread):
    """Internal thread processing data sent by the hardware interface
    to the device object.
    """
    def __init__(self, device = None):
        super().__init__(daemon=True)
        self.__iface = device
        self.__canceled = False


        # Data processing
        self.__data = bytearray()

    def cancel(self):
        """Cancel thread
        """
        self.__canceled = True

    def ingest(self, data: bytes):
        """Ingest incoming bytes.
        """
        self.__data.extend(data)
        while len(self.__data) > 2:
            # Is the magic correct ?
            if self.__data[0] == 0xAC and self.__data[1] == 0xBE:
                # Have we received a complete message ?
                if len(self.__data) > 4:
                    msg_size = self.__data[2] | (self.__data[3] << 8)
                    if len(self.__data) >= (msg_size+4):
                        raw_message = self.__data[4:4+msg_size]

                        # Parse received message with our Protocol Hub
                        msg = self.__iface.hub.parse(bytes(raw_message))

                        # Forward message if successfully parsed
                        if msg is not None:
                            self.__iface.put_message(msg)

                        # Chomp
                        self.__data = self.__data[msg_size + 4:]
                    else:
                        break
                else:
                    break
            else:
                # Nope, that's not a header
                while len(self.__data) >= 2:
                    if (self.__data[0] != 0xAC) or (self.__data[1] != 0xBE):
                        self.__data = self.__data[1:]
                    else:
                        break

    def run(self):
        """Out thread main task.
        """
        while not self.__canceled:
            # Read data from device (may block)
            try:
                data = self.__iface.read()
                if data is not None:
                    self.ingest(data)
            except WhadDeviceNotReady:
                break
            except WhadDeviceDisconnected:
                # Device has disconnected, notify interface by injecting a
                # `Disconnected` event message in its output message queue.
                # This event message will be filtered out by the Interface class
                # when called from our connector thread, and it will exit
                # gracefully
                logger.debug("[iface][%s] Device disconnected, sending event to connector.",
                             self.__iface.interface)
                self.__iface.put_message(Disconnected(self.__iface))
                return



[docs] class Device: """WHAD hardware interface """ # Should be lowercase. INTERFACE_NAME = None
[docs] @staticmethod def get_all_ifaces(): """Load all known interfaces.""" # Base transport from .uart import Uart from .hci import Hci from .tcp import TcpSocket from .unix import UnixSocket # Virtual transport from .apimote import Apimote from .pcap import Pcap from .rfstorm import RfStorm from .rzusbstick import RzUsbStick from .ubertooth import Ubertooth from .yard import YardStickOne return [ Uart, Hci, TcpSocket, UnixSocket, Pcap, Apimote, RfStorm, RzUsbStick, Ubertooth, YardStickOne ]
@classmethod def _get_sub_classes(cls): """ Helper allowing to get every subclass of Device. """ # List every available device class device_classes = set() for device_class in cls.__subclasses__(): if device_class.__name__ in ("WhadVirtualDevice", "VirtualDevice"): for virtual_device_class in device_class.__subclasses__(): device_classes.add(virtual_device_class) else: device_classes.add(device_class) return device_classes
[docs] @classmethod def create_inst(cls, interface_string) -> Type['Device']: """ Helper allowing to get a device according to the interface string provided. To make it work, every device class must implement: - a class attribute INTERFACE_NAME, matching the interface name - a class method list, returning the available devices - a property identifier, allowing to identify the device in a unique way This method should NOT be used outside of this class. Use Device.create instead. """ if cls.INTERFACE_NAME is None: raise WhadDeviceNotFound() logger.debug("Creating an instance of %s", interface_string) # Parses interface string, raise execption if format is incorrect pattern = r'^' + cls.INTERFACE_NAME + r'(:(?P<identifier>.*))?(?P<index>\d+)?$' result = re.match(pattern, interface_string) if result is None: raise WhadDeviceNotFound() # Extract interface identifier, if specified iface = result.groupdict() if iface['index'] is not None: iface_id = int(iface['index']) elif iface['identifier'] is not None: iface_id = iface['identifier'] else: iface_id = None # Retrieve the list of available devices # and build a lookup dict interfaces = {} ifaces = cls.list() if isinstance(ifaces, list): for index, dev in enumerate(ifaces): interfaces[index] = dev interfaces[dev.identifier] = dev elif isinstance(ifaces, dict): for dev_id, dev in ifaces.items(): interfaces[dev_id] = dev interfaces[dev.identifier] = dev else: interfaces = None # Some child classes may return None, in this case we need to use the # `check_interface()` to find the specified interface. if interfaces is None and iface_id is not None and cls.check_interface(iface_id): # Found, return an instance of this interface return cls(iface_id) if interfaces is not None and iface_id in interfaces: return interfaces[iface_id] if interfaces is not None and iface_id is None and 0 in interfaces and interfaces[0] is not None: return interfaces[0] # Return interface or raise an exception raise WhadDeviceNotFound()
[docs] @classmethod def create(cls, interface_string): ''' Create a specific device according to the provided interface string, formed as follows: <device_type>[device_index][:device_identifier] Examples: - `uart` or `uart0`: defines the first compatible UART device available - `uart1`: defines the second compatible UART device available - `uart:/dev/ttyACMO`: defines a compatible UART device identified by `/dev/tty/ACMO` - `ubertooth` or `ubertooth0`: defines the first available Ubertooth device - `ubertooth:11223344556677881122334455667788`: defines a Ubertooth device with serial number *11223344556677881122334455667788* ''' # First try to find the corresponding device by calling find() device = Device.find(interface_string) if device is not None: return device # If not found, loop over all known interfaces device_classes = cls.get_all_ifaces() device = None for device_class in device_classes: logger.debug("trying class %s", device_class) try: device = device_class.create_inst(interface_string) return device except WhadDeviceNotFound: continue raise WhadDeviceNotFound
[docs] @staticmethod def find(interface: str) -> Type['Device']: """Find a device based on its interface string with lazy loading of available interface implementations. :param interface: Interface name following WHAD's standard interface pattern :type interface: str :return: Found interface object :rtype: Device :raise: WhadDeviceNotFound """ # Based on selected transport, load the corresponding interface class. dev_info = re.match('^([^0-9:]+)([0-9]|:).*$', interface) if dev_info is not None: transport = dev_info.group(1).lower() # Basic interfaces using native transport if transport == 'uart': from .uart import Uart return Uart.create_inst(interface) elif transport == 'hci': from .hci import Hci return Hci.create_inst(interface) elif transport == 'tcp': from .tcp import TcpSocket return TcpSocket.create_inst(interface) elif transport == 'unix': from .unix import UnixSocket return UnixSocket.create_inst(interface) # Virtual interfaces using emulated transport elif transport == 'apimote': from .apimote import Apimote return Apimote.create_inst(interface) elif transport == 'pcap': from .pcap import Pcap return Pcap.create_inst(interface) elif transport == 'rfstorm': from .rfstorm import RfStorm return RfStorm.create_inst(interface) elif transport == 'rzusbstick': from .rzusbstick import RzUsbStick return RzUsbStick.create_inst(interface) elif transport == 'ubertooth': from .ubertooth import Ubertooth return Ubertooth.create_inst(interface) elif transport == 'yardstickone': from .yard import YardStickOne return YardStickOne.create_inst(interface) # No other known transport raise WhadDeviceNotFound
[docs] @classmethod def list(cls) -> Union[list,dict]: ''' Returns every available compatible devices. ''' device_classes = cls.get_all_ifaces() available_devices = [] for device_class in device_classes: # Reset device index before enumerating. device_class.reset_dev_index() # Enumerating device_class_list = device_class.list() if device_class_list is not None: if isinstance(device_class_list, list): for device in device_class_list: available_devices.append(device) elif isinstance(device_class_list, dict): for _, device in device_class_list.items(): available_devices.append(device) return available_devices
[docs] @classmethod def check_interface(cls, interface): ''' Checks dynamically if the device can be instantiated. ''' logger.debug("default: checking interface %s fails.", interface) return False
@property def interface(self): ''' Returns the current interface of the device. ''' # If class has interface name, return the interface alias if hasattr(self.__class__,"INTERFACE_NAME"): if self.__class__.INTERFACE_NAME is not None: return self.__class__.INTERFACE_NAME + str(self.index) # Interface is unknown return "unknown" @property def type(self): ''' Returns the name of the class linked to the current device. ''' return self.__class__.__name__ def __init__(self, index: Optional[int] = None): """Initialize an interface """ # Interface state self.__info = None self.__opened = False self.__discovered = False # Generate device index if not provided if index is None: self.inc_dev_index() self.__index = self.__class__.CURRENT_DEVICE_INDEX elif isinstance(index, int): # Used by HCI devices to force index to match system names self.__index = index else: raise WhadDeviceNotFound() # IO Threads self.__iface_in = None self.__iface_out = None # Queue holding messages from connector, waiting to be sent to # the interface. self.__in_messages = Queue() # Queue holding messages from interface, waiting to be sent to # an attached connector self.__out_messages = Queue() # Connector bound to this device self.__connector = None # Interface lock self.__lock = Lock() # Connector lock self.__msg_filter: Callable[..., bool] = None # Protocol hub self.__hub = ProtocolHub() # Communication timeout self.__timeout = 5.0
[docs] @contextlib.contextmanager def get_pending_message(self, timeout: float = None) -> Generator[HubMessage, None, None]: """Get message waiting to be sent to the interface. """ try: yield self.__in_messages.get(timeout=timeout) except Empty as err: raise err # Mark task done self.__in_messages.task_done()
@property def connector(self): """Connector bound to the interface """ return self.__connector @property def hub(self): """Retrieve the device protocol hub (parser/factory) """ return self.__hub @property def index(self) -> int: """Get the interface index :return: Interface index :rtype: int """ return self.__index @property def device_id(self) -> Optional[str]: """Return device ID """ if self.__info is not None: return self.__info.device_id # No info yet. return None @property def info(self) -> Optional[DeviceInfo]: """Get device info object :return: Device information object :rtype: DeviceInfo """ return self.__info @property def opened(self) -> bool: """Device is open ? """ return self.is_open()
[docs] def is_open(self) -> bool: """Determine if interface is opened. """ return self.__opened
[docs] @classmethod def reset_dev_index(cls) -> None: """Reset the device index of the specified class. :param cls: Device class :type cls: Device """ # Remove 'CURRENT_DEVICE_INDEX' attribute if set. if hasattr(cls, 'CURRENT_DEVICE_INDEX'): delattr(cls, 'CURRENT_DEVICE_INDEX')
[docs] @classmethod def inc_dev_index(cls): """Inject and maintain device index. """ if hasattr(cls, 'CURRENT_DEVICE_INDEX'): cls.CURRENT_DEVICE_INDEX += 1 else: cls.CURRENT_DEVICE_INDEX = 0
[docs] def set_connector(self, connector): """Set interface connector. """ self.__connector = connector
[docs] def lock(self): """Lock interface for read/write operation. """ self.__lock.acquire() logger.debug("Lock acquired !")
[docs] def unlock(self): """Unlock interface for read/write operation. """ logger.debug("Releasing lock ...") self.__lock.release()
def __start_io_threads(self): """Start background IO threads """ self.__iface_in = DevInThread(self) self.__iface_in.start() self.__iface_out= DevOutThread(self) self.__iface_out.start() def __stop_io_threads(self): """Stop background IO threads """ if self.__iface_in is not None: self.__iface_in.cancel() if self.__iface_out is not None: self.__iface_out.cancel() ## # Device specific methods ##
[docs] def open(self): """Handle device open """ # Create interface I/O threads and start them. self.__start_io_threads() # Ask interface for a reset try: logger.info("resetting interface (if possible)") self.__opened = True self.reset() except Empty as err: # Device is unresponsive, shutdown IO threads self.__stop_io_threads() raise WhadDeviceNotReady() from err
[docs] def read(self) -> bytes: """Read bytes from interface (blocking). """ return b''
[docs] def write(self, payload: bytes) -> int: """Write payload to interface. """ return len(payload)
[docs] def close(self): """Close device """ logger.info("closing WHAD interface") # Cancel I/O thread if required self.__stop_io_threads() self.__opened = False # Notify connector that device has closed if self.__connector is not None: logger.debug("Send disconnection event to connector %s", self.__connector) self.__connector.send_event(Disconnected(self)) logger.debug("Disconnection event sent !")
[docs] def change_transport_speed(self, speed): """Set device transport speed. Optional. """
## # Message processing ##
[docs] def busy(self) -> bool: """Check if the interface is busy. We consider an interface as busy if there is at least one message in its output messages or in its input messages. """ return not (self.__out_messages.empty() and self.__in_messages.empty())
[docs] def set_queue_filter(self, keep: Callable[..., bool] = None): """Set message queue filter. """ self.__msg_filter = keep
[docs] def put_message(self, message: Union[HubMessage, DeviceEvt]): """Process incoming message. """ # If no connector is attached to the interface, redirect to a dedicated # message queue. Same if the message is an interface event (this type of # messages MUST be handled by the interface itself). logger.debug("[%s] putting message %s", self.interface, message) if self.__connector is None: logger.debug("[%s] connector is None, sending to pending messages", self.interface) self.__out_messages.put(message) # If a connector is attached to the interface but a message filter # is set, redirect matching messages to a dedicated message queue # and notify the connector about the other messages, except if we # receive critical events from interface. elif isinstance(message, DeviceEvt): logger.debug("Sending event %s to connector %s", message, self.connector) self.connector.send_event(message) elif isinstance(message, HubMessage): # If a filter is set and message does not match, save it in our # pending messages queue. if self.__msg_filter is not None and self.__msg_filter(message): self.__out_messages.put(message) else: logger.debug("[%s] forwarding message to connector %s", self.interface, self.connector) # Otherwise, wrap hub message into a `MessageReceived` event self.connector.send_event(MessageReceived(self, message)) else: # Unknown message type, log it. logger.debug("[%s] put_message() called with an invalid parameter of type %s", self.interface, type(message))
[docs] def wait_for_single_message(self, timeout: float = None , keep: Callable[..., bool] = None): """Configures the device message queue filter to automatically move messages that matches the filter into the queue, and then waits for the first message that matches this filter and returns it. """ unexpected_messages = [] if keep is not None: self.set_queue_filter(keep) # Wait for a matching message to be caught (blocking) msg = self.__out_messages.get(block=True, timeout=timeout) # If message filter is set and message does not match, wait until an # expected message matches if keep is not None: # Wait for a matching message while not keep(msg): unexpected_messages.append(msg) msg = self.__out_messages.get(block=True, timeout=timeout) # Re-enqueue non-matching messages for m in unexpected_messages: self.__out_messages.put(m) return msg
[docs] def wait_for_message(self, timeout: float = None, keep: Callable[..., bool] = None, command: bool = False): """ Configures the device message queue filter to automatically move messages that matches the filter into the queue, and then waits for the first message that matches this filter and process it. This method is blocking until a matching message is received. :param int timeout: Timeout :param filter: Message queue filtering function (optional) """ # Raise a WhadDeviceDisconnected exception when the interface is still # considered opened but no messages to read in its output message queue. # # (This specific condition is met when a Unix client socket has closed # following a connection to a server, after the server sent a set of # messages that are still to process by the client). if not self.opened and self.__out_messages.empty(): logger.debug("[%s] wait_for_message() cannot succeed because device is closed.") raise WhadDeviceDisconnected() if keep is not None: self.set_queue_filter(keep) start_time = time() while True: try: # Wait for a matching message to be caught (blocking) msg = self.__out_messages.get(block=True, timeout=timeout) # If we receive a disconnection event, raise an exception if isinstance(msg, Disconnected): raise WhadDeviceDisconnected() # If message does not match, re-enqueue for late processing if not self.__msg_filter(msg): self.put_message(msg) else: # If it does match, return it return msg except Empty as err: # Queue is empty, wait for a message to show up. if timeout is not None and (time() - start_time > timeout): if command: raise WhadDeviceTimeout("WHAD device did not answer to a command") from err logger.debug("exiting wait_for_message (timeout: %s)...", timeout) return None
[docs] def send_message(self, message: HubMessage, keep: Callable[..., bool] = None): """ Serializes a message and sends it to the interface, without waiting for an answer. Optionally, you can update the message queue filter if you need to wait for specific messages after the message is sent. :param Message message: Message to send :param keep: Message queue filter function """ if not self.opened and self.__out_messages.empty(): logger.debug("[%s] Cannot send message: device closed.", self.interface) raise WhadDeviceDisconnected() # Set message queue filter if keep is not None: self.set_queue_filter(keep) # Enqueue message to transmit to the interface self.__in_messages.put(message)
[docs] def send_command(self, command: HubMessage, keep: Callable[..., bool] = None): """ Sends a command and awaits a specific response from the device. WHAD commands usualy expect a CmdResult message, if `keep` is not provided then this method will by default wait for a CmdResult. :param Message command: Command message to send to the device :param keep: Message queue filter function (optional) :returns: Response message from the device :rtype: Message """ # If a queue filter is not provided, expect a default CmdResult try: if keep is None: self.send_message(command, message_filter(CommandResult)) else: self.send_message(command, keep) except WhadDeviceError as error: # Device error has been triggered, it looks like our device is in # an unspecified state, notify user. logger.debug("WHAD device in error while sending message: %s", error) raise error try: # Retrieve the first message that matches our filter result = self.wait_for_message(self.__timeout, command=True) except WhadDeviceTimeout as timedout: # Forward exception raise timedout # Log message logger.debug("Command result: %s", result) # Return command result return result
## # Interface management ## ###################################### # Generic discovery ######################################
[docs] def on_discovery_msg(self, message): """ Method called when a discovery message is received. If a connector has been associated with the device, forward this message to this connector. """ # Forward everything to the connector, if any if self.__connector is not None: self.__connector.on_discovery_msg(message)
[docs] def has_domain(self, domain) -> bool: """Checks if device supports a specific domain. :param Domain domain: Domain :returns: True if domain is supported, False otherwise. :rtype: bool """ if self.__info is not None: return self.__info.has_domain(domain) # No info available on device, domain is not supported by default return False
[docs] def get_domains(self): """Get device' supported domains. :returns: list of supported domains :rtype: list """ if self.__info is not None: return self.__info.domains # No domain discovered yet return []
[docs] def get_domain_capability(self, domain) -> int: """Get a device domain capabilities. :param Domain domain: Target domain :returns: Domain capabilities :rtype: DeviceDomainInfoResp """ if self.__info is not None: cap = self.__info.get_domain_capabilities(domain) if cap is not None: return cap # No capability if not discovered return 0
[docs] def get_domain_commands(self, domain): """Get a device supported domain commands. :param Domain domain: Target domain :returns: Bitmask of supported commands :rtype: int """ if self.__info is not None: return self.__info.get_domain_commands(domain) # No supported commands by default return 0
[docs] def send_discover_info_query(self, proto_version=0x0100): """ Sends a DeviceInfoQuery message and awaits for a DeviceInfoResp answer. """ logger.info("preparing a DeviceInfoQuery message") msg = self.__hub.discovery.create_info_query(proto_version) return self.send_command( msg, message_filter(InfoQueryResp) )
[docs] def send_discover_domain_query(self, domain): """ Sends a DeviceDomainQuery message and awaits for a DeviceDomainResp answer. """ logger.info("preparing a DeviceDomainInfoQuery message") msg = self.__hub.discovery.create_domain_query(domain) return self.send_command( msg, message_filter(DomainInfoQueryResp) )
[docs] def discover(self): """ Performs device discovery (synchronously). Discovery process asks the device to provide its description, including its supported domains and associated capabilities. For each domain we then query the device and get the list of supported commands. """ if not self.__discovered: # We send a DeviceInfoQuery message to the device and expect a # DeviceInfoResponse in return. resp = self.send_discover_info_query() # If we have an answer, process it. if resp is not None: # Ensure response is the one we expect assert isinstance(resp, InfoQueryResp) # Save device information self.__info = DeviceInfo( resp ) # Parse DeviceInfoResponse #device_info = self.hub.parse(resp) # Update our ProtocolHub version to the device version self.__hub = ProtocolHub(resp.proto_min_ver) # Query device domains logger.info("query supported commands per domain") for domain in self.__info.domains: resp = self.send_discover_domain_query(domain) self.__info.add_supported_commands( resp.domain, resp.supported_commands ) # Mark device as discovered logger.info("device discovery done") self.__discovered = True # Switch to max transport speed logger.info("set transport speed to %d", self.info.max_speed) self.change_transport_speed( self.info.max_speed ) else: logger.error("device is not ready !") raise WhadDeviceNotReady()
[docs] def reset(self): """Reset device """ logger.info("preparing a DeviceResetQuery message") msg = self.__hub.discovery.create_reset_query() return self.send_command( msg, message_filter(DeviceReady) )
[docs] class VirtualDevice(Device): """ Virtual interface implementation. This variant of the base Interface class provides a way to emulate an interface compatible with WHAD. This emulated compatible interface is used as an adaptation layer between WHAD's core and third-party hardware that does not run a WHAD-enabled firmware. """ def __init__(self, index: Optional[int] = None): self._dev_type = None self._dev_id = None self._fw_author = None self._fw_url = None self._fw_version = (0, 0, 0) self._dev_capabilities = {} self.__lock = Lock() super().__init__(index)
[docs] def send_message(self, message, keep=None): """Send message to host. """ logger.debug("[virtual_iface][%s] send_message(%s)", self.interface, message) with self.__lock: super().set_queue_filter(keep) self._on_whad_message(message)
def _on_whad_message(self, message): """TODO: associate callbacks with classes ? """ logger.debug("on_whad_message: %s", message) category = message.message_type message_type = message.message_name callback_name = f"_on_whad_{category}_{message_type}" if hasattr(self, callback_name) and callable(getattr(self, callback_name)): getattr(self, callback_name)(message) else: logger.info("unhandled message: %s", message) self._send_whad_command_result(CommandResult.ERROR) def _on_whad_discovery_info_query(self, _): major, minor, revision = self._fw_version msg = self.hub.discovery.create_info_resp( DeviceType.VirtualDevice, self._dev_id, 0x0100, 0, self._fw_author, self._fw_url, major, minor, revision, [dom | (cap[0] & 0xFFFFFF) for dom, cap in self._dev_capabilities.items()] ) self._send_whad_message(msg) def _on_whad_discovery_domain_query(self, message): # Compute supported commands for domain commands = 0 supported_commands = self._dev_capabilities[message.domain][1] for command in supported_commands: commands |= (1 << command) # Create a DomainResp message and send it msg = self.hub.discovery.create_domain_resp( message.domain, commands ) self._send_whad_message(msg) def _send_whad_message(self, message): self.put_message(message) def _send_whad_command_result(self, code): msg = self.hub.generic.create_command_result(code) self._send_whad_message(msg)
[docs] class WhadDevice(Device): """ This class is an alias for :py:class:`whad.device.device.Device`, and will be deprecated in a near future. This class has been introduced in a previous version of WHAD and has been renamed for clarity purpose. """ @classmethod def create_inst(cls, interface_string): """Create an instance of interface from its name, using Device.""" return Device.create_inst(interface_string) @classmethod def create(cls, interface_string): """Create an instance of interface from its name, using Device.""" return Device.create(interface_string) @classmethod def check_interface(cls, interface): """Check if Device supports the requested interface.""" return Device.check_interface(interface)
[docs] class WhadVirtualDevice(VirtualDevice): """ This class is an alias for :py:class:`whad.device.device.VirtualDevice`, and will be deprecated in a near future. This class has been introduced in a previous version of WHAD and has been renamed for clarity purpose. """ @classmethod def create_inst(cls, interface_string): """Create an instance of interface from its name, using VirtualDevice.""" return VirtualDevice.create_inst(interface_string) @classmethod def create(cls, interface_string): """Create an instance of interface from its name, using VirtualDevice.""" return VirtualDevice.create(interface_string) @classmethod def check_interface(cls, interface): """Check if VirtualDevice supports the requested interface.""" return VirtualDevice.check_interface(interface)