Whad protocol stack model

Whad supports different wireless protocols such as Bluetooth Low Energy or ZigBee and provides a protocol stack for both of them. Protocol stacks are an important component of Whad as one may need to alter it in order to modify the behavior of an emulated device or simply assess the security of a target protocol stack. This is why Whad provides a generic protocol stack model that can be used to implement any protocol stack with ease, and to make it compatible with other stack-related tools provided by Whad.

Our generic protocol stack model considers any wireless protocol as a graph of individual stack layer able to communicate one with another. Each layer of a protocol stack is defined as a Python class inherated from whad.common.stack.Layer and is used to process incoming data (packets or messages) and forward it to upper layers for further processing. Data/messages are then making their way through the protocol stack and this latter can react to them and send data back.

Generic stack layer

Whad provides a layer model class whad.common.stack.Layer that provides some features to ease the implementation of protocol stacks. A protocol is seen as a graph of various stack layers that is able to process incoming packets and to output other packets, as shown below:

digraph foo {
    center=true;

    root [label="Phy", peripheries=2];
    ether [label="Ether"];
    icmp [label="ICMP"];
    ip [label="IP"];
    udp [label="UDP"];
    tcp [label="TCP"];

    root -> ether;
    ether -> ip;
    ether -> icmp;
    ip -> udp;
    ip -> tcp;
    tcp -> ip;
    udp -> ip;
    ip -> ether;
    ether -> root;
    icmp -> ether;
   }

Example of a protocol stack

From an implementation point-of-view, every layer that belongs to an upper layer is declared as a sub-layer of this upper layer, thus describing a hierarchy. The above protocol stack does in fact uses this hierarchy:

digraph foo {
    center=true;
    rankdir="TB";


    root [label="Phy", peripheries=2];

    subgraph cluster_ether {
        label="Ethernet";

        subgraph cluster_ip {
            rankdir="BT";
            label="IP";
            
            subgraph cluster_tcp {
                label="TCP";
                tcp [label="TCP"];
            }

            subgraph cluster_udp {
                label="UDP";
                udp [label="UDP"];            
            }

            ip [label="IP"];

            ip -> tcp;
            ip -> udp;
            tcp -> ip;
            udp -> ip;
        }

        subgraph cluster_icmp {
            label="ICMP";

            icmp [label="ICMP"];
        }

        ether [label="Ether"];
        ether -> ip;
        ip -> ether;
        ether -> icmp;
        icmp -> ether;  
    }

    root -> ether;
    ether -> root;

   }

Protocol stack with hierarchy

Creating a protocol stack layer

A protocol stack layer in Whad is defined by a class that must inherits from whad.common.stack.Layer. Each stack layer must have an alias defined, a textual name that will be used inside the protocol stack to reference a specific class.

Each layer recieves data from a lower layer, processes it and forwards it to an upper layer. Data flow is automatically handled by the whad.common.stack.Layer class.

Each layer has its own state structure to maintain, managed by default by whad.common.stack.Layer, allowing snapshots of a layer to be taken at any time.

Declaring a new protocol stack layer

A new protocol stack layer must inherits from whad.common.stack.Layer and has an alias set thanks to the whad.common.stack.alias decorator:

from whad.common.stack import Layer, alias

@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

The above code declares a new protocol layer class Ethernet with an alias ether and overrides the configure() method to modify its state property something with the value provided in the options dictionary passed in arguments.

This layer cannot process incoming data yet nor send data to another layer, as it is the only one declared. We need one more layer so let’s declare a PHY layer:

from whad.common.stack import Layer, alias

@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

Allright, now we have two different layers declared, it’s time to add some interaction between them.

Data processing

A layer processes incoming data by registering one of its method as a handler for data coming from a specific source layer. This is a core mechanism implemented in whad.common.stack.Layer and the most convenient way to pass data from one layer to another. Data can be a scapy packet, raw bytes or custom structures, but mostly Scapy packets.

To register a handler for a specific source, use the whad.common.stack.source decodator as shown below:

from whad.common.stack import Layer, alias, source

@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        pass

@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

Phy.add(Ethernet)

The Ethernet layer class has now a registered handler that will be called every time the Phy layer sends it some data with the whad.common.stack.Layer.send() method. The Ethernet layer also needs to be added as a sub-layer of the Phy layer for this communication mechanism to work properly (if not, the Phy layer won’t be able to find the registered handler in the Phy layer). This is how upper layers of the protocol stack are attached to their lower layers.

Our Phy layer needs to expose a method to be notified of the reception of a packet. This method will be called by some code outside of this protocol stack, providing every received Scapy packets to the Phy layer. Let’s call this method on_packet_received:

from whad.common.stack import Layer, alias, source
from scapy.all import *

@alias('ether')
class EthernetLayer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        pass

@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

    def on_packet_received(self, packet: Packet):
        pass

Now we need to tell the Phy layer to forward a packet to the Ethernet layer if it contains an ethernet header. This is done by using scapy haslayer() method combined with the whad.common.stack.Layer.send():

from whad.common.stack import Layer, alias, source
from scapy.all import *

@alias('ether')
class EthernetLayer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        pass

@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

    def on_packet_received(self, packet: Packet):
        if packet.haslayer(Ether):
            self.send('ether', packet.getlayer(Ether))

When a packet is received by the Phy layer, it is sent to the Ethernet layer through its registered handler on_packet_received. It is then possible to process it, and send an answer back to the Phy layer using the same mechanism:

from whad.common.stack import Layer, alias, source
from scapy.all import *

@alias('ether')
class EthernetLayer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        # Send back the packet
        self.send('phy', packet)

@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

    def on_packet_received(self, packet: Packet):
        if packet.haslayer(Ether):
            self.send('ether', packet.getlayer(Ether))

    @source('ether')
    def on_ether_packet(self, packet):
        print('Received a packet from Ether layer:')
        packet.show()

Let’s try this small protocol stack in a nutshell:

from whad.common.stack import Layer, alias, source
from scapy.all import *

@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        if 'something' in options:
            self.state.something = options['something']

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        # Send back the packet
        self.send('phy', packet)

@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

    def on_packet_received(self, packet: Packet):
        if packet.haslayer(Ether):
            self.send('ether', packet.getlayer(Ether))

    @source('ether')
    def on_ether_packet(self, packet):
        print('Received a packet from Ether layer:')
        packet.show()

Phy.add(Ethernet)

if __name__ == '__main__':

    # Instantiate our protocol stack
    my_stack = Phy()

    # Pass a packet to our stack
    packet = Ether())/IP(src="192.168.1.1", dest="192.168.2.2")/TCP()
    my_stack.on_packet_received(packet)

It produces the following output:

Received a packet from Ether layer:
###[ Ethernet ]###
dst       = ff:ff:ff:ff:ff:ff
src       = d4:3b:04:2c:ad:12
type      = IPv4
###[ IP ]###
    version   = 4
    ihl       = None
    tos       = 0x0
    len       = None
    id        = 1
    flags     =
    frag      = 0
    ttl       = 64
    proto     = tcp
    chksum    = None
    src       = 192.168.1.1
    dst       = 192.168.1.2
    \options   \
###[ TCP ]###
        sport     = ftp_data
        dport     = http
        seq       = 0
        ack       = 0
        dataofs   = None
        reserved  = 0
        flags     = S
        window    = 8192
        chksum    = None
        urgptr    = 0
        options   = ''

Our Phy layer has correctly sent the received packet from the Ethernet layer.

Note

A tag parameter is also supported by both the source decorator and the whad.common.stack.Layer.send() method to allow filtering on the source layer and a specific tag.

Layer State management

Of course, actual protocol stacks implemented on top of whad.common.stack.Layer have to maintain a state while handling incoming and outgoing packets. The stack state is composed of each sub-layer’ state which are maintained by these layers themselves.

Remember, each layer has its own state exposed in its state property. This state is by default an instance of whad.common.stack.LayerState that behaves like a dictionary with keys mapped as properties. It is possible to create a new state class in order to provide custom methods to make its manipulation easier, and to associate this specific class with a specific layer class thanks to the whad.common.stack.state decorator:

from whad.common.stack import Layer, LayerState, alias, source, state
from scapy.all import *

class EthernetState(LayerState):

    def __init__(self):
        super().__init__()
        self.macs = []

    def clear(self):
        self.macs = []

    def add_mac_address(self, mac):
        if mac not in self.macs:
            self.macs.append(mac)

    def has_mac_address(self, mac):
        return mac in self.macs

    def remove_mac_address(self, mac):
        if mac in self.macs:
            self.macs.remove(mac)

@state(EthernetState)
@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        self.state.clear()

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        # Add source mac to our mac address book
        self.state.add_mac_address(packet.getlayer(Ether).src)

        # Send back the packet
        self.send('phy', packet)

A layer state can be retrieved with the whad.common.stack.Layer.save() method and loaded with the whad.common.stack.Layer.load() method. The example below demonstrates how to save and reload the state of out stack:

# Create an instance of our stack and save its state
my_stack = Phy()
stack_state = my_stack.save()

# Reload the state of our stack
my_stack.load(stack_state)

Protocol stack instantiation

Once a protocol stack implemented using this generic model, it can be easily instantiated using the root layer class (i.e. the PHY layer), as shown below:

from whad.common.stack import Layer, LayerState, alias, source, state
from scapy.all import *

class EthernetState(LayerState):

    def __init__(self):
        super().__init__()
        self.macs = []

    def clear(self):
        self.macs = []

    def add_mac_address(self, mac):
        if mac not in self.macs:
            self.macs.append(mac)

    def has_mac_address(self, mac):
        return mac in self.macs

    def remove_mac_address(self, mac):
        if mac in self.macs:
            self.macs.remove(mac)

@state(EthernetState)
@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        # Clear state
        self.state.clear()
        self.log_macs = False

        # Check if we are asked to log mac addresses
        if 'log_macs' in options:
            if options['log_macs'] == True:
                self.log_macs = True


    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        if self.log_macs:
            # Add source mac to our mac address book
            self.state.add_mac_address(packet.getlayer(Ether).src)

        # Send back the packet
        self.send('phy', packet)


@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

    def on_packet_received(self, packet: Packet):
        if packet.haslayer(Ether):
            self.send('ether', packet.getlayer(Ether))

    @source('ether')
    def on_ether_packet(self, packet):
        print('Received a packet from Ether layer:')
        packet.show()

Phy.add(Ethernet)

my_stack = Phy(options={
    'ether': {
        'log_macs': True
    }
})

When a layer is instantiated, be it a root layer or not, every registered sub-layer is automatically instantiated as well and configured using the main options dictionnary. This options dictionnary may contain a key named the same as a registered sub-layer, and if so the value corresponding to this key is used as an options parameter when this sub-layer is configured.

Note

Each sub-layer is only instantiated once and only once in the whole protocol stack graph, with the provided options (unless it is a contextual layer).

The protocol stack instantiation shown above can then be feed with a custom packet, as shown below:

my_stack.on_packet_received(Ether()/IP()/TCP())

Contextual layers

When it is required to implement a multiplexing/demultiplexing layer, contextual layers are a great help. Multiplexing/demultiplexing could make your life easier when you have to deal with multiple links combined in one physical layer, such as TCP connections for instance.

A contextual layer is not automatically instantiated when the protocol stack is instantiated but when it is required. The layer that instantiates a contextual layer is generally in charge of multiplexing/demultplexing the incoming/outgoing data. When a contextual layer is instantiated, it is automatically registered as a sub-layer but with a generated name. Let’s consider the following contextual layer:

from whad.common.stack import ContextualLayer, alias

@alias('ip')
class IPLayer(ContextualLayer):

    def configure(self, options={}):
        pass

When instantiated, the instance will be in the form ip#0. The next layer instantiated will be named ip#1, and so on.

Let’s get back to our example protocol stack and consider using the above contextual layer. We need to create as many IPLayer instances as destination IP addresses we have to handle. This is done this way:

from whad.common.stack import Layer, LayerState, ContextualLayer, alias, source, state, instance
from scapy.all import *

@alias('ip')
class IPLayer(ContextualLayer):

    def configure(self, options={}):
        pass

    @source('ether')
    def on_ip_packet(self, packet):
        '''Simply echoes the packet to the Ethernet layer.
        '''
        self.send('ether', packet)


class EthernetState(LayerState):
    '''This class implements a custom state for the Ethernet layer.

    This state will keep track of every stream identified by a source
    MAC address and source IP address.
    '''

    def __init__(self):
        super().__init__()
        self.clear()

    def clear(self):
        self.streams = {}

    def has_stream(self, mac, ip):
        '''Check if a stream is already handled by an IPLayer instance.
        '''
        return ((mac,ip) in self.streams.values())

    def register_stream(self, mac, ip, layer):
        '''Register a new IPLayer instance for a specified IP/MAC
        '''
        self.streams[layer] = (mac, ip)

    def get_stream(self, mac, ip):
        '''Retrieve the IPLayer instance name associated with the given
        IP/MAC addresses.
        '''
        for layer in self.streams:
            m,i = self.streams[layer]
            if m==mac and i==ip:
                return layer
        return None

    def get_mac_by_layer(self, layer_name):
        '''Retrieve the MAC address associated with a specific IPLayer
        instance.
        '''
        if layer_name in self.streams:
            return self.streams[layer_name][0]
        else:
            return None


@state(EthernetState)
@alias('ether')
class Ethernet(Layer):

    def configure(self, options={}):
        self.state.clear()

    @source('phy')
    def on_packet_received(self, packet):
        '''Process incoming packets from the PHY layer.
        '''
        if packet.haslayer(IP):
            # get packet source IP and MAC
            packet_ip = packet.getlayer(IP).src
            packet_mac = packet.getlayer(Ether).src

            # If we already know this IP address
            if self.state.has_stream(packet_mac, packet_ip):
                # Retrieve the associated MAC and instantiated layer name
                ip_layer = self.state.get_stream(packet_mac, packet_ip)

                print('IP address already known, forward to %s' % ip_layer)

                # Send this IP packet to the corresponding layer name
                self.send(ip_layer, packet.getlayer(IP))
            else:
                print('New IP address seen: %s' % packet_ip)

                # Create a new IP layer
                ip_layer_obj = self.instantiate(IPLayer)
                print('Instantiated a new layer: %s' % ip_layer_obj.name)

                # Register our source MAC and IP address with our new layer name ('ip#0')
                self.state.register_stream(packet_mac, packet_ip, ip_layer_obj.name)

                # Send packet to this new sub-layer
                self.send(ip_layer_obj.name, packet.getlayer(IP))

    @instance('ip')
    def on_ip_packet_received(self, source, packet):
        '''Handling packets sent by an instantiated IPLayer

        It is important to note the use of @instance rather than @source,
        as @instance will provide the handler the source layer that sent
        a message.
        '''
        # Search mac address belonging to this source layer
        src_mac = self.state.get_mac_by_layer(source)
        if src_mac is not None:
            # Mac is known, encapsulate our packet and send to PHY
            self.send('phy', Ether(src=src_mac)/packet)
@alias('phy')
class Phy(Layer):

    def configure(self, options={}):
        pass

    def on_packet_received(self, packet: Packet):
        if packet.haslayer(Ether):
            self.send('ether', packet.getlayer(Ether))

    @source('ether')
    def on_ether_packet(self, packet):
        print('Received a packet from Ether layer')
        #packet.show()

# Assemble our stack
Ethernet.add(IPLayer)
Phy.add(Ethernet)

if __name__ == '__main__':

    # Instantiate our protocol stack
    my_stack = Phy()

    # Pass some packets to our stack
    packets = [
        Ether()/IP(src="192.168.1.1", dst="192.168.2.2")/TCP(),
        Ether()/IP(src="192.168.1.1", dst="192.168.2.2")/TCP(),
        Ether()/IP(src="192.168.1.2", dst="192.168.2.2")/TCP()
    ]

    for packet in packets:
        my_stack.on_packet_received(packet)

    # Display the current state of the stack
    print(my_stack.save())

Using contextual layers dynamically adds layer nodes to the stack graph starting from a layer that performs multiplexing/demultiplexing operations. Thus, the upper layer don’t have to mess with information related to the lower layers and let the mux/demux layer assembles everything.

Therefore, the number of active layers when the stack is running may vary, and the stack state reflects this fact.

Visualizing a stack

A generic layer can generate a DOT file including all its sub-layers and contextual layers, using the whad.common.stack.Layer.export() method, as shown below:

Phy.export('mystack.dot')
digraph T {
rankdir=LR;
node [label="phy", fontsize=12] phy;
node [label="ether", fontsize=12] ether;
subgraph cluster_ip {
style=filled;
color=lightgrey;
node [label="ip", fontsize=12] ip;
}
ether -> phy;
ip -> ether;
phy -> ether;
ether -> ip;
}

Our example stack

Testing protocol layers

This generic stack model also provides some tools to implement one or more unit tests for a given layer, such as whad.common.stack.tests.Sandbox and whad.common.stack.tests.LayerMessage.

Layer sandboxing

whad.common.stack.tests.Sandbox is a special class that behaves like a protocol layer but captures every message sent between any layers, thus allowing to check if a specific layer is correctly implemented.

This class must be used as a layer container as shown below:

import pytest
from whad.common.stack.tests import Sandbox, LayerMessage

# Import our previously declared protocol layer
from . import Ether

@alias('phy')
class PhyMock(Sandbox):
    pass
PhyMock.add(Ether)

Pytest-based tests

We then can implement one or more unit tests using pytest and this sandbox:

import pytest
from whad.common.stack.tests import Sandbox, LayerMessage

# Import our previously declared protocol layer
from . import Ether

@alias('phy')
class PhyMock(Sandbox):
    pass
PhyMock.add(Ether)

class TestEtherLayer(object):

    @pytest.fixture
    def phy(self):
        return PhyMock()

    def test_packet_processing(self, phy):
        '''This test function relies on the `phy` fixture that will create
        a sandbox containing an instance of the `Ethernet` layer.
        '''
        # We send a test packet to the Ether layer
        packet = Ether(src='00:11:22:33:44:55')/IP(src="192.168.1.1", dst="192.168.2.2")/TCP()
        phy.send('ether', packet)

        # Message has been processed, we should have seen a message sent back to
        # the phy layer
        assert phy.expect(LayerMessage(
            'ether',
            'phy',
            packet
        ))

        # We also checks that a new layer has been created
        assert (phy.get_layer('ip#0') is not None)

The whad.common.stack.tests.LayerMessage class holds all the information sent by a layer to another and is used by the whad.common.stack.tests.Sandbox.expect() method to check if such a message has been observed during the test. The contained layers can also be accessed such as the ip#0 layer in our example to check if some of their properties match expected values.