Source code for whad.common.stack.tests

"""Generic stack testing features
"""

from whad.common.stack import Layer, alias

class contextual(object):

    def __init__(self, value):
        self.__value = value

    def __call__(self, func):
        func.is_contextual = self.__value
        return func

[docs] class LayerMessage(object): '''This class is used by `Sandbox` to represent a message between two layers, keeping track of the source, destination, data, tag and named arguments. ''' def __init__(self, source, destination, data, tag='default', **kwargs): self.__source = source self.__destination = destination self.__data = data self.__tag = tag self.__args = kwargs @property def source(self): return self.__source @property def destination(self): return self.__destination @property def data(self): return self.__data @property def tag(self): return self.__tag @property def args(self): return self.__args def __repr__(self): return 'LayerMessage(source="%s", destination="%s", data="%s", tag="%s")' % ( self.source, self.destination, bytes(self.data), self.tag ) def __eq__(self, other): # check main properties if (self.source != other.source) or (self.destination != other.destination) \ or (self.data != other.data) or (self.tag != other.tag): return False # check arguments for arg in self.args: if arg not in other.args: print('LayerMessage - comparison failed: missing arg %s' % arg) return False if self.args[arg] != other.args[arg]: print('LayerMessage - comparison failed: arg %s values are different (%s, %s)' % ( arg, self.args[arg], other.args[arg]) ) return False # Same. print('same') return True
[docs] @alias('sandbox') class Sandbox(Layer): """This layer is used as a container for one or more layers that need to be tested. """ def __init__(self, parent=None, layer_name=None, options={}, target=None): super().__init__(parent=parent, layer_name=layer_name, options=options) self.messages = [] #self.target = target.alias #self.target_class = target
[docs] def populate(self, options={}): '''Populate static layers and install a custom message monitor callback ''' super().populate(options=options) # Install a monitor callback on all sub-layers for layer in self.layers: self.layers[layer].register_monitor_callback(self.log_message)
[docs] def instantiate(self, clazz): '''Instantiate a layer and install a custom message monitor callback ''' layer = super().instantiate(clazz) layer.register_monitor_callback(self.log_message) return layer
[docs] def get_layer(self, name: str): '''Retrieve a specific layer if one of ours, or this instance if layer cannot be found. ''' layer = super().get_layer(name) if layer is None: self.destination = self return self else: return layer
[docs] def get_handler(self, source, tag): '''This method is called by our stack message handler search code, and is required here to return our dummy message handler. ''' return self.dummy_message_handler
[docs] def log_message(self, source, destination, data, tag='default', **kwargs): '''Layer message monitoring. ''' self.messages.append(LayerMessage( source, destination, data, tag=tag, **kwargs ))
[docs] @contextual(True) def dummy_message_handler(self, source, data, **kwargs): """Dummy message handler, does nothing. """ pass
[docs] def expect(self, messages, strict=False): '''Checks if one or more messages have been captured. ''' if isinstance(messages, list): if strict and len(self.messages) != len(messages): return False if len(self.messages) >= len(messages): for i in range(len(self.messages)): if messages[i] != self.messages[i]: return False return True else: return False elif isinstance(messages, LayerMessage): if len(self.messages) == 0: print('no message received') return False # Check if this message has been seen in our monitored messages for msg in self.messages: if msg == messages: return True return False