Source code for whad.esb.connector.sniffer

"""
This module provides a :class:`Sniffer` class that derives from :class:`ESB`
to allow *Enhanced ShockBurst* frame sniffing. Sniffed frames are wrapped in
a dedicated *Scapy* layer and therefore are represented as *Scapy* packets.

This :class:`Sniffer` class is used by the generic `wsniff` utility when
*Enhanced ShockBurst* protocol is selected, and the associated
:class:`whad.esb.sniffing.SnifferConfiguration` class holds all the sniffing
parameters that can be set.

By default, the *Enhanced ShockBurst* sniffer sniffs packets on all channels by
looping from channel 0 to 100 over and over and capturing frames that match
the expected format.
"""
from time import time
from typing import Generator

from scapy.packet import Packet

from whad.device import WhadDevice
from whad.exceptions import WhadDeviceDisconnected
from whad.esb.connector.base import ESB
from whad.esb.sniffing import SnifferConfiguration
from whad.exceptions import UnsupportedCapability
from whad.helpers import message_filter
from whad.common.sniffing import EventsManager
from whad.hub.esb import PduReceived, RawPduReceived
from whad.hub.message import AbstractPacket

class Sniffer(ESB, EventsManager):
    """Enhanced ShockBurst Sniffer interface for compatible WHAD device.

    The sniffer keeps its parameters in a
    :class:`whad.esb.sniffing.SnifferConfiguration` object and pushes them to
    the underlying ESB connector each time they are modified.
    """

    def __init__(self, device: WhadDevice):
        """Initialize the sniffer on top of the given device.

        :param device: WHAD device to use for sniffing
        :type device: :class:`whad.device.WhadDevice`
        :raises UnsupportedCapability: if the device cannot sniff
        """
        ESB.__init__(self, device)
        EventsManager.__init__(self)

        self.__configuration = SnifferConfiguration()

        # Check if device can perform sniffing
        if not self.can_sniff():
            raise UnsupportedCapability("Sniff")

    def _enable_sniffing(self):
        """Configure the underlying ESB connector.

        This method parses the current configuration and sets the underlying
        ESB connector properties accordingly before starting sniffing packets.
        """
        # When scanning is enabled no channel is passed to the ESB connector.
        if self.__configuration.scanning:
            channel = None
        else:
            channel = self.__configuration.channel

        ack = self.__configuration.acknowledgements
        address = self.__configuration.address

        # Explicitly call the parent implementation: `sniff` is overridden
        # below by the packet generator.
        super().sniff(channel=channel, show_acknowledgements=ack, address=address)
        self.start()

    @property
    def configuration(self) -> SnifferConfiguration:
        """Current configuration getter.
        """
        return self.__configuration

    @configuration.setter
    def configuration(self, new_configuration):
        """Configuration setter.

        Stops the sniffer, replaces the current configuration and restarts
        sniffing with the new parameters.

        :param new_configuration: New configuration to apply.
        :type new_configuration: :class:`whad.esb.sniffing.SnifferConfiguration`
        """
        self.stop()
        self.__configuration = new_configuration
        self._enable_sniffing()

    @property
    def channel(self) -> int:
        """Channel getter.
        """
        return self.__configuration.channel

    @channel.setter
    def channel(self, channel=11):
        """Channel setter.

        This method stops sniffing, updates the channel stored in the
        configuration and then resumes sniffing with the updated
        configuration.
        """
        self.stop()
        self.__configuration.channel = channel
        self._enable_sniffing()

    def sniff(self, channel: int = None, address: str = None,
              show_acknowledgements: bool = False,
              timeout: float = None) -> Generator[Packet, None, None]:
        """Update current sniffing configuration if required and sniff packets.

        This generator stops after `timeout` seconds, if specified, or when
        the device gets disconnected.

        :param channel: Channel to listen on, None to leave the configured
                        channel unchanged
        :type channel: int
        :param address: Device address to target, None to leave the configured
                        address unchanged
        :type address: str
        :param show_acknowledgements: Sniff packets acknowledgements if set to
                                      True (default: False), None to leave the
                                      configured value unchanged
        :type show_acknowledgements: bool
        :param timeout: Number of seconds after which sniffing will stop,
                        uninterrupted if set to None
        :type timeout: float
        :return: Generator yielding the sniffed packets
        """
        # Set channel, address and show ack if provided
        update_config_required = False
        if show_acknowledgements is not None:
            self.__configuration.acknowledgements = show_acknowledgements
            update_config_required = True
        if address is not None:
            self.__configuration.address = address
            update_config_required = True
        if channel is not None:
            self.__configuration.channel = channel
            # A channel change must be pushed to the device as well.
            update_config_required = True

        # Update configuration if at least one parameter has been changed
        if update_config_required:
            self.stop()
            self._enable_sniffing()

        # Determine message type once, it is reused for every wait below
        if self.support_raw_pdu():
            message_type = RawPduReceived
        else:
            message_type = PduReceived

        # Sniff packets
        start = time()
        try:
            while True:
                # Exit if timeout is set and reached
                if timeout is not None and (time() - start >= timeout):
                    break

                # Short wait so the timeout above is checked regularly
                message = self.wait_for_message(
                    filter=message_filter(message_type), timeout=.1
                )

                # `message` is an instance: use isinstance(), issubclass()
                # only accepts classes as its first argument.
                if message is not None and isinstance(message, AbstractPacket):
                    packet = message.to_packet()
                    self.monitor_packet_rx(packet)
                    yield packet

        # Handle device disconnection: simply end the generator
        except WhadDeviceDisconnected:
            return