"""
This module provides a :class:`whad.esb.connector.prx.PRX` connector that
implements Nordic Semiconductor's *Primary Receiver* role.
The *primary receiver* (*PRX*) role consists in receiving packets from other devices
and sending acknowledgements if required to the sending device. When a device
is in *PRX* mode, it is only capable of receiving ESB packets on a specific
channel, including *ESB pings*.
The :class:`whad.esb.connector.prx.PRX` class relies on a custom protocol stack
to handle acknowledgements and pings automatically.
"""
from typing import Generator
from scapy.packet import Packet
from whad.esb.connector.base import ESB
from whad.esb.stack import ESBStack
from whad.exceptions import UnsupportedCapability
class PRX(ESB):
    """
    Enhanced ShockBurst Primary Receiver Role (PRX) implementation for compatible WHAD device.

    The connector remembers the configured channel and node address and
    re-applies them to the device each time one of them is modified. If the
    connector was started when a setting is changed, it is started again once
    the new setting has been applied.

    TODO: PRX and PTX share a lot of code, it could be a good idea to
    implement this code in a single class that PRX and PTX shall inherit.
    """

    def __init__(self, device):
        """Instantiate a new Primary Receiver connector.

        :param device: ESB-compatible device
        :type device: :class:`whad.device.WhadDevice`
        :raises UnsupportedCapability: if the device cannot set its node
            address or cannot enter the primary receiver mode
        """
        super().__init__(device)

        # Default connector state: role gets configured below, but the
        # connector is not started until start() is called.
        self.__stack = ESBStack(self)
        self.__channel = 8
        self.__address = "11:22:33:44:55"
        self.__started = False

        # Check if device can modify its address and enter the PRX role
        if not self.can_set_node_address():
            raise UnsupportedCapability("SetNodeAddress")
        if not self.can_be_prx():
            raise UnsupportedCapability("PrimaryReceiverMode")

        self._enable_role()

    def _enable_role(self):
        """Enable PRX role.

        Applies the stored node address and channel to the device, then calls
        :meth:`start` if the connector is currently flagged as started.
        """
        self.set_node_address(self.__address)
        self.enable_prx_mode(self.__channel)
        if self.__started:
            self.start()

    def _restart_role(self, resume: bool):
        """Re-apply the role settings and resume the connector if needed.

        :param resume: True if the connector was started before it has been
            stopped to change a setting
        :type resume: bool
        """
        self._enable_role()
        # stop() clears the started flag, so _enable_role() cannot restart the
        # connector by itself after a stop; do it here. The flag is checked
        # again to avoid a second start if _enable_role() already started it.
        if resume and not self.__started:
            self.start()

    @property
    def stack(self) -> ESBStack:
        """Return current ESB stack instance.

        :return: Current ESB stack instance
        :rtype: :class:`whad.esb.stack.ESBStack`
        """
        return self.__stack

    @property
    def channel(self) -> int:
        """Return current channel.

        :return: current channel number
        :rtype: int
        """
        return self.__channel

    @channel.setter
    def channel(self, channel: int):
        """Set channel.

        The connector is stopped, the role is re-enabled with the new channel
        and the connector is started again if it was started beforehand.

        :param channel: Channel to set
        :type channel: int
        """
        # Remember the running state before stop() resets it.
        resume = self.__started
        self.stop()
        self.__channel = channel
        self._restart_role(resume)

    @property
    def address(self) -> str:
        """Return current address.

        :return: ESB address
        :rtype: str
        """
        return self.__address

    @address.setter
    def address(self, address: str):
        """Set ESB address.

        The connector is stopped, the role is re-enabled with the new address
        and the connector is started again if it was started beforehand.

        :param address: ESB address to set
        :type address: str
        """
        # Remember the running state before stop() resets it.
        resume = self.__started
        self.stop()
        self.__address = address
        self._restart_role(resume)

    def start(self):
        """Start PRX mode.

        Calls the parent connector's ``start()`` and flags this connector as
        started.
        """
        super().start()
        self.__started = True

    def stop(self):
        """Stop PRX mode.

        Calls the parent connector's ``stop()`` and clears the started flag.
        """
        super().stop()
        self.__started = False

    def prepare_acknowledgment(self, ack: bytes):
        """Prepare an acknowledgement packet.

        The acknowledgement is handed over to the link layer of the ESB stack
        (``stack.ll.prepare_acknowledgment``).

        :param ack: Acknowledgement packet to prepare
        :type ack: bytes
        """
        self.__stack.ll.prepare_acknowledgment(ack)

    def on_pdu(self, packet: Packet):
        """ESB packet reception callback.

        The received packet is forwarded to the ESB stack.

        :param packet: ESB packet received
        :type packet: :class:`scapy.packet.Packet`
        """
        self.__stack.on_pdu(packet)

    def stream(self) -> Generator[Packet, None, None]:
        """Stream received ESB packets.

        Yields every item produced by the link layer's ``data_stream()``.

        :return: generator of received packets
        :rtype: Generator[:class:`scapy.packet.Packet`, None, None]
        """
        yield from self.__stack.ll.data_stream()