Source code for suitcase.fields

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Copyright (c) 2015 Digi International Inc. All Rights Reserved.
import struct

import six
from suitcase.exceptions import SuitcaseChecksumException, SuitcaseProgrammingError, \
    SuitcaseParseError, SuitcaseException, SuitcasePackStructException
from six import BytesIO, StringIO


[docs]class FieldPlaceholder(object): """Internally used object that holds information about a field schema A FieldPlaceholder is what is actually instantiated and stored with the message schema class when a message schema is declared. The placeholder stores all instantiation information needed to create the fields when the message object is instantiated """ # record the number of instantiations of fields. This is how # we can track the order of fields within a message _global_seqno = 0 def __init__(self, cls, args, kwargs): self._field_seqno = FieldPlaceholder._global_seqno FieldPlaceholder._global_seqno += 1 self.cls = cls self.args = args self.kwargs = kwargs
[docs] def create_instance(self, parent): """Create an instance based off this placeholder with some parent""" self.kwargs['instantiate'] = True self.kwargs['parent'] = parent instance = self.cls(*self.args, **self.kwargs) instance._field_seqno = self._field_seqno return instance
[docs]class BaseField(object): """Base class for all Field instances Some magic is used whenever a field is created normally in order to allow for a declarative syntax without having to create explicit factory methods/classes everywhere. By default (for a normal call) when a field is instantiated, it actually ends up returning a FieldPlaceholder object containing information about the field that has been declared. To get an actual instance of the field, the call to the construction must include a keyword argument ``instantiate`` that is set to some truthy value. Typically, this instantiation logic is done by calling ``FieldPlaceholder.create_instance(parent)``. This is the recommended way to construct a field as it ensure all invariants are in place (having a parent). :param instantiate: Create an actual instance instead of a placeholder :param parent: Specify the parent of this field (typically a Structure instance). """ def __new__(cls, *args, **kwargs): instantiate = kwargs.pop('instantiate', False) if instantiate: return super(BaseField, cls).__new__(cls) else: return FieldPlaceholder(cls, args, kwargs) def __init__(self, *args, **kwargs): self._value = None self._parent = kwargs.get('parent') def _ph2f(self, placeholder): """Lookup a field given a field placeholder""" return self._parent.lookup_field_by_placeholder(placeholder) def __repr__(self): return repr(self.getval()) def getval(self): return self._value def setval(self, value): self._value = value
[docs]class CRCField(BaseField): r"""Field representing CRC (Cyclical Redundancy Check) in a message CRC checks and calculation frequently work quite differently from other fields in a protocol and as such are treated differently by the message container. In particular, a CRCField requires special steps at either the beginning or end of the message pack/unpack process. For each CRCField, we specify the following: :param field: The underlying field defining how the checksum will be stored. This should match the size of the checksum in whatever algorithm is being used (16 bits for CRC16). :param algo: The algorithm to be used for performing the checksum. This is basically a function that takes a chunk of data and gives the checksum. Several of these are provided out of the box in ``suitcase.crc``. :param start: The offset in the overall message at which we should start the checksum. This may be positive (from the start) or negative (from the end of the message). :param end: The offset in the overall message at which we should end the checksum algo. This may be positive (from the start) or negative (from the end of the message). A quick example of a message with a checksum is in order:: class MyChecksummedMessage(Structure): soh = Magic('\x1f\x1f') message_id = UBInt16() sequence_number = UBInt8() payload_length = LengthField(UBInt16()) payload = VariableRawPayload(payload_length) # crc starts after soh and ends before crc crc = CRCField(UBInt16(), crc16_ccitt, 2, -3) eof = Magic('~') """ def __init__(self, field, algo, start, end, **kwargs): BaseField.__init__(self, **kwargs) self.field = field.create_instance(self._parent) self.field.setval(0) self.algo = algo self.start = start self.end = end self._value = None @property def bytes_required(self): return self.field.bytes_required
[docs] def validate(self, data, offset): """Raises :class:`SuitcaseChecksumException` if not valid""" recorded_checksum = self.field.getval() # convert negative offset to positive if offset < 0: offset += len(data) # replace checksum region with zero data = b''.join((data[:offset], b"\x00" * self.bytes_required, data[offset + self.bytes_required:])) actual_checksum = self.algo(data[self.start:self.end]) if recorded_checksum != actual_checksum: raise SuitcaseChecksumException( "recorded checksum %r did not match actual %r. full data: %r", recorded_checksum, actual_checksum, data)
[docs] def packed_checksum(self, data): """Given the data of the entire packet return the checksum bytes""" self.field.setval(self.algo(data[self.start:self.end])) sio = BytesIO() self.field.pack(sio) return sio.getvalue()
def getval(self): return self.field.getval() def setval(self, *args): raise SuitcaseProgrammingError("CRC will be set automatically") def pack(self, stream): # write placeholder during the first pass stream.write(b'\x00' * self.field.bytes_required) def unpack(self, data): self.field.unpack(data)
[docs]class Magic(BaseField): """Represent Byte Magic (fixed, expected sequence of bytes)""" def __init__(self, expected_sequence, **kwargs): BaseField.__init__(self, **kwargs) self.expected_sequence = expected_sequence self.bytes_required = len(self.expected_sequence) def getval(self): return self.expected_sequence def setval(self, *args): raise SuitcaseProgrammingError("One does not simply modify Magic") def pack(self, stream): stream.write(self.expected_sequence) def unpack(self, data): if not data == self.expected_sequence: raise SuitcaseParseError( "Expected sequence %r for magic field but got %r on " "message %r" % (self.expected_sequence, data, self._parent)) def __repr__(self): return "Magic(%r)" % (self.expected_sequence,)
[docs]class FieldProperty(BaseField): """Provide the ability to define "Property" getter/setters for other fields This is useful and preferable and preferable to inheritance if you want to provide a different interface for getting/setting one some field within a message. Take the test case as an example:: # define the message class MyMessage(Structure): _version = UBByteSequence(2) version = FieldProperty(_version, onget=lambda v: "%d.%02d" % (v[0], v[1]), onset=lambda v: tuple(int(x) for x in v.split(".", 1))) msg = MyMessage() msg.unpack('\x10\x03') self.assertEqual(msg._version,(16, 3)) self.assertEqual(msg.version, "16.03") msg.version = "22.7" self.assertEqual(msg._version, (22, 7)) self.assertEqual(msg.version, "22.07") :param field: The field that is wrapped by this FieldProperty. The value of this field is passed into or set by the associated get/set fns. :param onget: This is a function pointer to a function called to mutate the value returned on field access. The function receives a single argument containing the value of the wrapped field. :oaram onset: This is a function pointer to a function called to map between the property and the underlying field. The function takes a single parameter which is the value the property was set to. It should return the value that the underlying field expects (or raise an exception if appropriate). """ def __init__(self, field, onget=None, onset=None, **kwargs): BaseField.__init__(self, **kwargs) self.onget = onget self.onset = onset self.field = self._ph2f(field) self.bytes_required = 0 def _default_onget(self, value): return value def _default_onset(self, value): return value def getval(self): onget = self.onget if onget is None: onget = self._default_onget value = self.field.getval() return onget(value) def setval(self, value): onset = self.onset if onset is None: onset = self._default_onset self.field.setval(onset(value)) def unpack(self, data): pass def pack(self, stream): pass
[docs]class DispatchField(BaseField): """Decorate a field as a dispatch byte (used as conditional) A DispatchField is always used with a DispatchTarget within the same message (at some level). Example:: class MyMessage(Structure): type = DispatchField(UBInt8()) body = DispatchTarget(dispatch_field=type, dispatch_mapping={ 0x00: MessageType0, 0x01: MessageType1, }) :param field: The field containing the dispatch parameter. This is typically an integer but could be any hashable object. """ def __init__(self, field, **kwargs): BaseField.__init__(self, **kwargs) self.field = field.create_instance(self._parent) @property def bytes_required(self): return self.field.bytes_required def getval(self): return self.field.getval() def setval(self, value): return self.field.setval(value) def __repr__(self): return repr(self.field) def pack(self, stream): return self.field.pack(stream) def unpack(self, data): assert len(data) == self.bytes_required return self.field.unpack(data)
[docs]class DispatchTarget(BaseField): """Represent a conditional branch on some DispatchField Example:: class MyMessage(Structure): type = DispatchField(UBInt8()) body = DispatchTarget(dispatch_field=type, dispatch_mapping={ 0x00: MessageType0, 0x01: MessageType1, }) :param length_provider: The field providing a length value binding this message (if any). Set this field to None to leave unconstrained. :param dispatch_field: The field being target for dispatch. In most protocols this is an integer type byte or something similar. The possible values for this field act as the keys for the dispatch. :param dispatch_mapping: This is a dictionary mapping dispatch_field values to associated message types to handle the remaining processing. """ def __init__(self, length_provider, dispatch_field, dispatch_mapping, **kwargs): BaseField.__init__(self, **kwargs) if length_provider is None: self.length_provider = None else: self.length_provider = self._ph2f(length_provider) self.length_provider.associate_length_consumer(self) self.dispatch_field = self._ph2f(dispatch_field) self.dispatch_mapping = dispatch_mapping self.inverse_dispatch_mapping = dict((v, k) for (k, v) in dispatch_mapping.items()) def _lookup_msg_type(self): target_key = self.dispatch_field.getval() target = self.dispatch_mapping.get(target_key, None) if target is None: target = self.dispatch_mapping.get(None, None) return target @property def bytes_required(self): if self.length_provider is None: return None else: return self.length_provider.get_adjusted_length() def getval(self): return self._value def setval(self, value): try: vtype = type(value) key = self.inverse_dispatch_mapping[vtype] except KeyError: raise SuitcaseProgrammingError("The type specified is not in the " "dispatch table") # OK, things check out. Set both the value here and the # type byte value self._value = value value._parent = self._parent self.dispatch_field.setval(key) def pack(self, stream): return self._value._packer.write(stream) def unpack(self, data): target_msg_type = self._lookup_msg_type() if target_msg_type is None: target_msg_type = self.dispatch_mapping.get(None) if target_msg_type is None: # still none raise SuitcaseParseError("Input data contains type byte not" " contained in mapping") message_instance = target_msg_type() self.setval(message_instance) self._value.unpack(data)
[docs]class LengthField(BaseField): """Wraps an existing field marking it as a LengthField This field wraps another field which is assumed to return an integer value. A LengthField can be pointed to by a variable length field and the two will be coupled appropriately. :param length_field: The field providing the actual length value to be used. This field should return an integer value representing the length (or a multiple of the length) as a return value :param get_length: If specified, this is a function which takes a reference to the encapsulated field and is responsible for reading the length out from that field. By default, this just reads the value of the field (field is expected to be an integral value). :param set_length: If specified, this is a function which takes a reference to the encapsulated field and a length value. Its responsibility is to move the provided length value into the field. By default, the length value is just assigned to the field via `setval()`. :param multiplier: If specified, this multiplier is applied to the length. If I specify a multiplier of 8, I am saying that each bit in the length field represents 8 bytes of actual payload length. By default the multiplier is 1 (1 bit/byte). """ def __init__(self, length_field, get_length=None, set_length=None, multiplier=1, **kwargs): BaseField.__init__(self, **kwargs) self.multiplier = multiplier self.get_length = self._default_get_length if get_length is None else get_length self.set_length = self._default_set_length if set_length is None else set_length self.length_field = length_field.create_instance(self._parent) self.length_value_provider = None def __repr__(self): return repr(self.length_field) def _default_get_length(self, field): return field.getval() def _default_set_length(self, field, length): field._value = length # TODO: use setval() [problem with DependentField tests]? @property def bytes_required(self): return self.length_field.bytes_required def getval(self): return self.length_field.getval() def setval(self, value): raise SuitcaseProgrammingError("Cannot set the value of a LengthField") def associate_length_consumer(self, target_field): def _length_value_provider(): sio = BytesIO() target_field.pack(sio) target_field_length = len(sio.getvalue()) if not target_field_length % self.multiplier == 0: raise SuitcaseProgrammingError("Payload length not divisible " "by %s" % self.multiplier) return (target_field_length // self.multiplier) self.length_value_provider = _length_value_provider def pack(self, stream): if self.length_value_provider is None: raise SuitcaseException("No length_provider added to this LengthField") self.set_length(self.length_field, self.length_value_provider()) self.length_field.pack(stream) def unpack(self, data): assert len(data) == self.bytes_required return self.length_field.unpack(data) def get_adjusted_length(self): return self.get_length(self.length_field) * self.multiplier
[docs]class TypeField(BaseField): """Wraps an existing field marking it as a TypeField This field wraps another field which is assumed to return an integer value. A TypeField can be pointed to by a variable length field and will fix that field's length. :param type_field: The field providing the type id that will be used to lookup a length value :param length_mapping: This is a dictionary mapping type_field values to associated length values. For example, a TypeField could be used as a replacement for a DispatchField so that the associated DispatchTarget is non-greedy, without the existence of a seperate LengthField. This enables a greedy field to follow the dispatch field, without the usage of complex ConditionalField setups:: class Structure8Bit(Structure): value = UBInt8() class Structure16Bit(Structure): value = UBInt16() class DispatchStructure(Structure): type = TypeField(UBInt8(), {0x40: 1, 0x80: 2}) # Here the TypeField is providing both a size and type id dispatch = DispatchTarget(type, type, {0x40: Structure8Bit, 0x80: Structure16Bit}) greedy = Payload() """ def __init__(self, type_field, length_mapping, **kwargs): BaseField.__init__(self, **kwargs) self.type_field = type_field.create_instance(self._parent) self.length_mapping = length_mapping self.length_value_provider = None def _lookup_msg_length(self): target_key = self.type_field.getval() target_length = self.length_mapping.get(target_key, None) if target_length is None: target_length = self.length_mapping.get(None, None) return target_length def __repr__(self): return repr(self.type_field) @property def bytes_required(self): return self.type_field.bytes_required def getval(self): return self.type_field.getval() def setval(self, value): self.type_field.setval(value) def associate_length_consumer(self, target_field): def _length_value_provider(): sio = BytesIO() target_field.pack(sio) target_field_length = len(sio.getvalue()) if target_field_length != self.get_adjusted_length(): raise SuitcaseProgrammingError("Payload length %i does not" " match length %i specified by type" % (target_field_length, self.get_adjusted_length())) return target_field_length self.length_value_provider = _length_value_provider def pack(self, stream): if self.length_value_provider is None: raise SuitcaseException("No length_provider added to this TypeField") # This will throw a SuitcasePackException if the length is not correct self.length_value_provider() self.type_field.pack(stream) def unpack(self, data): assert len(data) == self.bytes_required return self.type_field.unpack(data) def get_adjusted_length(self): return self._lookup_msg_length()
[docs]class ConditionalField(BaseField): """Field which may or may not be included depending on some condition In some protocols, there exist fields which may or may not be present depending on the values of other fields that would have already been parsed at this point in time. Wrapping such fields in a ConditionalField allows us to define a function to examine that state and only have the field capture the bytes if the conditions are right. :param field: The field which should handle the parsing if the condition evaluates to true. :param condition: This is a function which is given access to the parent message that is expected to return a boolean value. If the value is true, then ``field`` will handle the bytes. If not, then the field will be skipped and left with its default value (None). """ def __init__(self, field, condition, **kwargs): BaseField.__init__(self, **kwargs) self.field = field.create_instance(self._parent) self.condition = condition def __repr__(self): if self.condition(self._parent): return repr(self.field) else: return "<ConditionalField: not included>" @property def bytes_required(self): if self.condition(self._parent): return self.field.bytes_required else: return 0 def pack(self, stream): if self.condition(self._parent): self.field.pack(stream) def unpack(self, data): if self.condition(self._parent): self.field.unpack(data) def getval(self): return self.field.getval() def setval(self, value): return self.field.setval(value) def associate_length_consumer(self, target_field): self.field.associate_length_consumer(target_field) def get_adjusted_length(self): return self.field.get_adjusted_length()
[docs]class Payload(BaseField): """Variable length raw (byte string) field This field is expected to be used with a LengthField. The variable length field provides a value to be used by the LengthField on message pack and vice-versa for unpack. :param length_provider: The LengthField with which this variable length payload is associated. If not included, it is assumed that the length_provider should consume the remainder of the bytes available in the string. This is only valid in cases where the developer knows that they will be dealing with a fixed sequence of bytes (already boxed). """ def __init__(self, length_provider=None, **kwargs): BaseField.__init__(self, **kwargs) if isinstance(length_provider, FieldPlaceholder): self.length_provider = self._ph2f(length_provider) self.length_provider.associate_length_consumer(self) else: self.length_provider = None @property def bytes_required(self): if self.length_provider is None: return None else: return self.length_provider.get_adjusted_length() def pack(self, stream): stream.write(self._value) def unpack(self, data): self._value = data # keep for backwards compatibility
VariableRawPayload = Payload class BaseVariableByteSequence(BaseField): def __init__(self, make_format, length_provider, **kwargs): BaseField.__init__(self, **kwargs) self.make_format = make_format if length_provider is not None: self.length_provider = self._ph2f(length_provider) self.length_provider.associate_length_consumer(self) @property def bytes_required(self): return self.length_provider.get_adjusted_length() def pack(self, stream): sfmt = self.make_format(len(self._value)) try: stream.write(struct.pack(sfmt, *self._value)) except struct.error as e: raise SuitcasePackStructException(e) def unpack(self, data): assert len(data) == self.bytes_required length = self.bytes_required sfmt = self.make_format(length) try: self._value = struct.unpack(sfmt, data) except struct.error as e: raise SuitcasePackStructException(e)
[docs]class DependentField(BaseField): """Field populated by container packet at lower level It is sometimes the case that information from another layer of a messaging protocol be needed at another higher-level of the protocol. The DependentField is a way of declaring that a message at some layer is dependent on a field with some name from the parent layer. For instance, let's suppose that my protocol had an option byte that I wanted to include in some logic handling packets at some higher layer. I could include that byte in my message as follows:: class MyDependentMessage(Structure): ll_options = DependentField('proto_options') data = UBInt8Sequence(16) class LowerLevelProtocol(Structure): type = DispatchField(UBInt16()) proto_options = UBInt8() body = DispatchTarget(None, type, { 0x00: MyDependentMessage, }) :param name: The name of the field from the parent message that we would like brought into our message namespace. """ def __init__(self, name, **kwargs): BaseField.__init__(self, **kwargs) self.bytes_required = 0 self.parent_field_name = name self.parent_field = None def _get_parent_field(self): if self.parent_field is None: message_parent = self._parent._parent target_field = message_parent.lookup_field_by_name( self.parent_field_name) self.parent_field = target_field return self.parent_field def __getattr__(self, attr): return getattr(self._get_parent_field(), attr) def pack(self, stream): pass def unpack(self, data): pass def getval(self): return self._get_parent_field().getval() def setval(self, value): return self._get_parent_field().setval(value)
[docs]class SubstructureField(BaseField): """Field which contains another non-greedy Structure. Often data-types are needed which cannot be easily described by a single field, but are representable as Structures. For example, Pascal style strings are prefixed with their length. It is often desirable to embed these data-types within another Structure, to avoid reimplementing them in every usage. A Pascal style string could be described as follows:: from suitcase.structure import Structure from suitcase.fields import Payload, UBInt16, LengthField, SubstructureField class PascalString16(Structure): length = LengthField(UBInt16()) value = Payload(length) A structure describing a name of a person might consist of two Pascal style strings. Instead of describing the ugly way:: class NameUgly(Structure): first_length = LengthField(UBInt16()) first_value = Payload(first_length) last_length = LengthField(UBInt16()) last_value = Payload(last_length) it could be defined using a SubstructureField:: class Name(Structure): first = SubstructureField(PascalString16) last = SubstructureField(PascalString16) """ def __init__(self, substructure, **kwargs): BaseField.__init__(self, **kwargs) self.substructure = substructure self._value = substructure() @property def bytes_required(self): # We return None but do not count as a greedy field to the packer return None def pack(self, stream): stream.write(self._value.pack()) def unpack(self, data, trailing): self._value = self.substructure() return self._value.unpack(data, trailing)
[docs]class FieldArray(BaseField): """Field which contains a list of some other field. In some protocols, there exist repeated substructures which are present in a variable number. The variable nature of these fields make a DispatchField and DispatchTarget combination unsuitable; instead a FieldArray may be used to pack/unpack these fields to/from a list. :param substructure: The type of the array element. Must not be a greedy field, or else the array will only ever have one element containing the entire contents of the array. :param length_provider: The field providing a length value binding this message (if any). Set this field to None to leave unconstrained. For example, one can imagine a message listing the zipcodes covered by a telephone area code. Depending on the population density, the number of zipcodes per area code could vary greatly. One implementation of this data structure could be:: from suitcase.structure import Structure from suitcase.fields import UBInt16, FieldArray class ZipcodeStructure(Structure): zipcode = UBInt16() class TelephoneZipcodes(Structure): areacode = UBInt16() zipcodes = FieldArray(ZipcodeStructure) # variable number of zipcodes """ def __init__(self, substructure, length_provider=None, **kwargs): BaseField.__init__(self, **kwargs) self.substructure = substructure self._value = list() if isinstance(length_provider, FieldPlaceholder): self.length_provider = self._ph2f(length_provider) self.length_provider.associate_length_consumer(self) else: self.length_provider = None @property def bytes_required(self): if self.length_provider is None: return None else: return self.length_provider.get_adjusted_length() def pack(self, stream): for structure in self._value: stream.write(structure.pack()) def unpack(self, data): while True: structure = self.substructure() data = structure.unpack(data, trailing=True).read() self._value.append(structure) if data == b"": break
[docs]class BaseFixedByteSequence(BaseField): """Base fixed-length byte sequence field""" def __init__(self, make_format, size, **kwargs): BaseField.__init__(self, **kwargs) self.bytes_required = size self.format = make_format(size) def pack(self, stream): try: stream.write(struct.pack(self.format, *self._value)) except struct.error as e: raise SuitcasePackStructException(e) def unpack(self, data): try: self._value = struct.unpack(self.format, data) except struct.error as e: raise SuitcasePackStructException(e)
def byte_sequence_factory_factory(make_format): def byte_sequence_factory(length_or_provider): if isinstance(length_or_provider, int): return BaseFixedByteSequence(make_format, length_or_provider) else: return BaseVariableByteSequence(make_format, length_or_provider) return byte_sequence_factory UBInt8Sequence = byte_sequence_factory_factory(lambda l: ">" + "B" * l) ULInt8Sequence = byte_sequence_factory_factory(lambda l: "<" + "B" * l) SBInt8Sequence = byte_sequence_factory_factory(lambda l: ">" + "b" * l) SLInt8Sequence = byte_sequence_factory_factory(lambda l: "<" + "b" * l)
[docs]class BaseStructField(BaseField): """Base for fields based very directly on python struct module formats It is expected that this class will be subclassed and customized by defining ``FORMAT`` at the class level. ``FORMAT`` is expected to be a format string that could be used with struct.pack/unpack. It should include endianness information. If the ``FORMAT`` includes multiple elements, the default ``_unpack`` logic assumes that each element is a single byte and will OR these together. To specialize this, _unpack should be overridden. """ def __init__(self, **kwargs): BaseField.__init__(self, **kwargs) self._value = None self._keep_bytes = getattr(self, "KEEP_BYTES", None) if self._keep_bytes is not None: self.bytes_required = self._keep_bytes else: self.bytes_required = struct.calcsize(self.PACK_FORMAT) def pack(self, stream): try: keep_bytes = getattr(self, 'KEEP_BYTES', None) if keep_bytes is not None: if self.PACK_FORMAT[0] == b">"[0]: # The element access makes this compatible with Python 2 and 3 to_write = struct.pack(self.PACK_FORMAT, self._value)[-keep_bytes:] else: to_write = struct.pack(self.PACK_FORMAT, self._value)[:keep_bytes] else: to_write = struct.pack(self.PACK_FORMAT, self._value) except struct.error as e: raise SuitcasePackStructException(e) stream.write(to_write) def unpack(self, data): value = 0 if self.UNPACK_FORMAT[0] == b">"[0]: # The element access makes this compatible with Python 2 and 3 for i, byte in enumerate(reversed(struct.unpack(self.UNPACK_FORMAT, data))): value |= (byte << (i * 8)) else: for i, byte in enumerate(struct.unpack(self.UNPACK_FORMAT, data)): value |= (byte << (i * 8)) self._value = value # ============================================================================== # Unsigned Big Endian # ==============================================================================
[docs]class UBInt8(BaseStructField): """Unsigned Big Endian 8-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">B"
[docs]class UBInt16(BaseStructField): """Unsigned Big Endian 16-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">H"
[docs]class UBInt24(BaseStructField): """Unsigned Big Endian 24-bit integer field""" KEEP_BYTES = 3 PACK_FORMAT = b">I" UNPACK_FORMAT = b">BBB"
[docs]class UBInt32(BaseStructField): """Unsigned Big Endian 32-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">I"
[docs]class UBInt40(BaseStructField): """Unsigned Big Endian 40-bit integer field""" KEEP_BYTES = 5 PACK_FORMAT = b">Q" UNPACK_FORMAT = b">BBBBB"
[docs]class UBInt48(BaseStructField): """Unsigned Big Endian 48-bit integer field""" KEEP_BYTES = 6 PACK_FORMAT = b">Q" UNPACK_FORMAT = b">BBBBBB"
[docs]class UBInt56(BaseStructField): """Unsigned Big Endian 56-bit integer field""" KEEP_BYTES = 7 PACK_FORMAT = b">Q" UNPACK_FORMAT = b">BBBBBBB"
[docs]class UBInt64(BaseStructField): """Unsigned Big Endian 64-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">Q" # ============================================================================== # Signed Big Endian # ==============================================================================
[docs]class SBInt8(BaseStructField): """Signed Big Endian 8-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">b"
[docs]class SBInt16(BaseStructField): """Signed Big Endian 16-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">h"
[docs]class SBInt24(BaseStructField): """Signed Big Endian 24-bit integer field""" KEEP_BYTES = 3 PACK_FORMAT = b">i" UNPACK_FORMAT = b">bBB"
[docs]class SBInt32(BaseStructField): """Signed Big Endian 32-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">i"
[docs]class SBInt40(BaseStructField): """Signed Big Endian 40-bit integer field""" KEEP_BYTES = 5 PACK_FORMAT = b">q" UNPACK_FORMAT = b">bBBBB"
[docs]class SBInt48(BaseStructField): """Signed Big Endian 48-bit integer field""" KEEP_BYTES = 6 PACK_FORMAT = b">q" UNPACK_FORMAT = b">bBBBBB"
[docs]class SBInt56(BaseStructField): """Signed Big Endian 56-bit integer field""" KEEP_BYTES = 7 PACK_FORMAT = b">q" UNPACK_FORMAT = b">bBBBBBB"
[docs]class SBInt64(BaseStructField): """Signed Big Endian 64-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b">q" # ============================================================================== # Unsigned Little Endian # ==============================================================================
[docs]class ULInt8(BaseStructField): """Unsigned Little Endian 8-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<B"
[docs]class ULInt16(BaseStructField): """Unsigned Little Endian 16-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<H"
[docs]class ULInt24(BaseStructField): """Unsigned Little Endian 24-bit integer field""" KEEP_BYTES = 3 PACK_FORMAT = b"<I" UNPACK_FORMAT = b"<BBB"
[docs]class ULInt32(BaseStructField): """Unsigned Little Endian 32-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<I"
[docs]class ULInt40(BaseStructField): """Unsigned Little Endian 40-bit integer field""" KEEP_BYTES = 5 PACK_FORMAT = b"<Q" UNPACK_FORMAT = b"<BBBBB"
[docs]class ULInt48(BaseStructField): """Unsigned Little Endian 48-bit integer field""" KEEP_BYTES = 6 PACK_FORMAT = b"<Q" UNPACK_FORMAT = b"<BBBBBB"
[docs]class ULInt56(BaseStructField): """Unsigned Little Endian 56-bit integer field""" KEEP_BYTES = 7 PACK_FORMAT = b"<Q" UNPACK_FORMAT = b"<BBBBBBB"
[docs]class ULInt64(BaseStructField): """Unsigned Little Endian 64-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<Q" # ============================================================================== # Signed Little Endian # ==============================================================================
[docs]class SLInt8(BaseStructField): """Signed Little Endian 8-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<b"
[docs]class SLInt16(BaseStructField): """Signed Little Endian 16-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<h"
[docs]class SLInt24(BaseStructField): """Signed Little Endian 24-bit integer field""" KEEP_BYTES = 3 PACK_FORMAT = b"<i" UNPACK_FORMAT = b"<BBb"
[docs]class SLInt32(BaseStructField): """Signed Little Endian 32-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<i"
[docs]class SLInt40(BaseStructField): """Signed Little Endian 40-bit integer field""" KEEP_BYTES = 5 PACK_FORMAT = b"<q" UNPACK_FORMAT = b"<BBBBb"
[docs]class SLInt48(BaseStructField): """Signed Little Endian 48-bit integer field""" KEEP_BYTES = 6 PACK_FORMAT = b"<q" UNPACK_FORMAT = b"<BBBBBb"
[docs]class SLInt56(BaseStructField): """Signed Little Endian 56-bit integer field""" KEEP_BYTES = 7 PACK_FORMAT = b"<q" UNPACK_FORMAT = b"<BBBBBBb"
[docs]class SLInt64(BaseStructField): """Signed Little Endian 64-bit integer field""" PACK_FORMAT = UNPACK_FORMAT = b"<q" # ============================================================================== # BitField and Bits # ==============================================================================
def bitfield_placeholder_factory_factory(cls): def _factory_fn(*args, **kwargs): return cls(*args, **kwargs) return _factory_fn class _BitFieldFieldPlaceholder(object): _global_seqno = 0 def __init__(self, cls, args, kwargs): self.cls = cls self.args = args self.kwargs = kwargs self.sequence_number = _BitFieldFieldPlaceholder._global_seqno _BitFieldFieldPlaceholder._global_seqno += 1 def create_instance(self): kwargs = self.kwargs kwargs['instantiate'] = True return self.cls(*self.args, **self.kwargs) class _BitFieldField(object): def __init__(self, *args, **kwargs): pass def __repr__(self): return repr(self.viewget()) def __new__(cls, *args, **kwargs): if 'instantiate' in kwargs: return super(_BitFieldField, cls).__new__(cls) else: return _BitFieldFieldPlaceholder(cls, args, kwargs) class _BitBool(_BitFieldField): def __init__(self, **kwargs): _BitFieldField.__init__(self, **kwargs) self.size = 1 self._value = 0 def getval(self): return self._value def setval(self, value): self._value = value def viewget(self): return (self._value == 1) def viewset(self, value): if value: self._value = 1 else: self._value = 0 class _BitNum(_BitFieldField): def __init__(self, size, **kwargs): _BitFieldField.__init__(self, **kwargs) self.size = size self._value = 0 def getval(self): return self._value viewget = getval def setval(self, value): self._value = value viewset = setval BitNum = bitfield_placeholder_factory_factory(_BitNum) BitBool = bitfield_placeholder_factory_factory(_BitBool)
[docs]class BitField(BaseField): """Represent a sequence of bytes broken down into bit segments Bit segments may be any BitFieldField instance, with the two most commonly used fields being: * BitBool(): A single bit flag that is either True or False * BitNum(bits): A multi-bit field treated like a big-endian integral value. Example Usage:: class TCPFrameHeader(Structure): source_address = UBInt16() destination_address = UBInt16() sequence_number = UBInt32() acknowledgement_number = UBInt32() options = BitField(16, data_offset=BitNum(4), reserved=BitNum(3), NS=BitBool(), CWR=BitBool(), ECE=BitBool(), URG=BitBool(), ACK=BitBool(), PSH=BitBool(), RST=BitBool(), SYN=BitBool(), FIN=BitBool() ) window_size = UBInt16() checksum = UBInt16() urgent_pointer = UBInt16() tcp_packet = TCPFrameHeader() o = tcp_packet.options o.data_offset = 3 o.NS = True o.CWR = False o. # ... so on, so forth """ def __init__(self, number_bits, field=None, **kwargs): BaseField.__init__(self, **kwargs) self._ordered_bitfields = [] self._bitfield_map = {} if number_bits % 8 != 0: raise SuitcaseProgrammingError("Number of bits must be a factor of " "8, was %d" % number_bits) self.number_bits = number_bits self.number_bytes = number_bits // 8 self.bytes_required = self.number_bytes if field is None: field = { 1: UBInt8, 2: UBInt16, 3: UBInt24, 4: UBInt32, 5: UBInt40, 6: UBInt48, 7: UBInt56, 8: UBInt64, }[self.number_bytes]() self._field = field.create_instance(self._parent) placeholders = [] for key, value in six.iteritems(kwargs): if isinstance(value, _BitFieldFieldPlaceholder): placeholders.append((key, value)) for key, placeholder in sorted(placeholders, key=lambda kv: kv[1].sequence_number): value = placeholder.create_instance() self._bitfield_map[key] = value self._ordered_bitfields.append((key, value)) def __getattr__(self, key): if key in self.__dict__.get('_bitfield_map', {}): return self._bitfield_map[key].viewget() return object.__getattribute__(self, key) def __setattr__(self, key, value): if key in self.__dict__.get('_bitfield_map', {}): self._bitfield_map[key].viewset(value) else: self.__dict__[key] = value def __repr__(self): sio = StringIO() sio.write("BitField(\n") for key, field in self._ordered_bitfields: sio.write(" %s=%r,\n" % (key, field)) sio.write(" )") return sio.getvalue() def getval(self): return self def setval(self, value): raise SuitcaseProgrammingError("Setting the value of a bitfield " "directly is prohibited") def pack(self, stream): value = 0 shift = self.number_bits for _key, field in self._ordered_bitfields: shift -= field.size mask = (2 ** field.size - 1) # mask off size bits value |= ((field.getval() & mask) << shift) self._field.setval(value) sio = BytesIO() self._field.pack(sio) out = sio.getvalue()[-self.number_bytes:] stream.write(out) def unpack(self, data): self._field.unpack(data) value = self._field.getval() shift = self.number_bits for _key, field in self._ordered_bitfields: shift -= field.size mask = (2 ** field.size - 1) fval = (value >> shift) & mask field.setval(fval)