"""
This module provides a :class:`Sniffer` class that derives from :class:`ESB`
to allow *Enhanced ShockBurst* frame sniffing. Sniffed frames are wrapped in
a dedicated *Scapy* layer and therefore are represented as *Scapy* packets.
This :class:`Sniffer` class is used by the generic `wsniff` utility when
*Enhanced ShockBurst* protocol is selected, and the associated
:class:`whad.esb.sniffing.SnifferConfiguration` class holds all the sniffing
parameters that can be set.
By default, the *Enhanced ShockBurst* sniffer sniffs packet on all channels by
looping from channel 0 to 100 over and over and capturing frames that match
the expected format.
"""
from typing import Generator
from scapy.packet import Packet
from whad.device import Device
from whad.exceptions import WhadDeviceDisconnected, UnsupportedCapability
from whad.common.sniffing import EventsManager
from whad.hub.esb import PduReceived, RawPduReceived
from whad.hub.message import AbstractPacket
from .base import ESB
from ..sniffing import SnifferConfiguration
[docs]
class Sniffer(ESB, EventsManager):
"""
Enhanced ShockBurst Sniffer interface for compatible WHAD device.
"""
def __init__(self, device: Device):
ESB.__init__(self, device)
EventsManager.__init__(self)
self.__configuration = SnifferConfiguration()
# Check if device can perform sniffing
if not self.can_sniff():
raise UnsupportedCapability("Sniff")
def _enable_sniffing(self):
"""Configure the underlying ESB connector.
This method parses the current configuration and sets the underlying
ESB connector properties accordingly before starting sniffing packets.
"""
if self.__configuration.scanning:
channel = None
else:
channel = self.__configuration.channel
ack = self.__configuration.acknowledgements
address = self.__configuration.address
super().start_sniff(channel=channel, show_acknowledgements=ack, address=address)
self.start()
@property
def configuration(self) -> SnifferConfiguration:
"""Current configuration getter.
"""
return self.__configuration
@configuration.setter
def configuration(self, new_configuration):
"""Configuration setter.
:param new_configuration: New configuration to apply.
:type new_configuration: :class:`whad.esb.sniffing.SnifferConfiguration`
"""
self.stop()
self.__configuration = new_configuration
self._enable_sniffing()
@property
def channel(self) -> int:
"""Channel getter.
"""
return self.__configuration.channel
@channel.setter
def channel(self, channel=11):
"""Channel setter.
This method stops sniffing, update the current channel to the one
specified into the configuration and then return to sniffing on this
newly set channel.
"""
self.stop()
self.__configuration.channel = channel
self._enable_sniffing()
[docs]
def sniff(self, channel: int = None, address: str = None, show_acknowledgements: bool = False,
timeout: float = None) -> Generator[Packet, None, None]:
"""Update current sniffing configuration if required and sniff packets. This function
returns after `timeout` seconds, if specified.
:param channel: Channel to listen, None to iterate over all possible channels
:type channel: int
:param address: Device address to target
:type address: str
:param show_acknowledgements: Sniff packets acknowledgements if set to True (default: False)
:type show_acknowledgements: bool
:param timeout: Number of seconds after which sniffing will stop,
uninterrupted if set to None
:type timeout: float
"""
# Set channel, address and show ack if provided
update_config_required = False
if show_acknowledgements is not None:
self.__configuration.acknowledgements = show_acknowledgements
update_config_required = True
if address is not None:
self.__configuration.address = address
update_config_required = True
if channel is not None:
self.__configuration.channel = channel
# Update configuration if at least one parameter has been changed
if update_config_required:
self.stop()
self._enable_sniffing()
# Determine message type
if self.support_raw_pdu():
message_type = RawPduReceived
else:
message_type = PduReceived
try:
for message in super().sniff(messages=(message_type), timeout=timeout):
if message is not None and issubclass(message, AbstractPacket):
packet = message.to_packet()
if packet is not None:
self.monitor_packet_rx(packet)
yield packet
# Handle device disconnection
except WhadDeviceDisconnected:
pass