Source code for PyPlcnextRsc.tools.PlcDataTypeSchema

# Copyright (c) 2021 Phoenix Contact. All rights reserved.
# Licensed under the MIT. See LICENSE file in the project root for full license information.

import functools
import logging
from copy import deepcopy
from io import StringIO

__all__ = ['NewSchemaInstance', 'ReceiveAsSchemaInstance', 'DataTypeStore']

from typing import TextIO, Union

from PyPlcnextRsc import RscType, RscList, RscStruct, RscSequence, RscTpAnnotate, RscStructBuilder, RscStructMeta, RscVariant, IecType

log = logging.getLogger(__name__)


def _remove_comment(lines):
    ret = []
    is_in_block = False

    for line in lines:
        ret_line = []
        for _identifier in line:
            if "*/" in _identifier:
                if not is_in_block:
                    raise ValueError()
                _identifier = _identifier[_identifier.index("*/") + 2:]
                if _identifier:
                    ret_line.append(_identifier)
                is_in_block = False
                continue

            if is_in_block:
                continue

            if "/*" in _identifier:
                is_in_block = True
                _identifier = _identifier[:_identifier.index("/*")]
                if _identifier:
                    ret_line.append(_identifier)
                continue
            if "//" in _identifier:
                _identifier = _identifier[:_identifier.index("//")]
                if _identifier:
                    ret_line.append(_identifier)
                break

            if "#" in _identifier:  # deprecated in future to support default value
                _identifier = _identifier[:_identifier.index("#")]
                if _identifier:
                    ret_line.append(_identifier)
                break

            ret_line.append(_identifier)

        if ret_line:
            ret.append(ret_line)
    return ret


def _move_TYPE_Label(lines):
    for line in lines:
        for _identifier in line:
            if _identifier.upper() in ["TYPE", "END_TYPE"]:
                line.remove(_identifier)


def _splitSpecial(identifiers):
    Ids = identifiers
    NEED_SPLIT = {";", ":", "[", "]", "..", "=",
                  # "#", # for future
                  # ","# for future
                  }
    for symbol in NEED_SPLIT:
        tmp = []
        s_len = len(symbol)
        for identifier in Ids:

            def _doSplit(current):
                if symbol in current and len(current) > s_len:
                    left, right = current.split(symbol, 1)
                    if left:
                        tmp.append(left)
                    tmp.append(symbol)
                    if right:
                        _doSplit(right)
                else:
                    tmp.append(current)

            if symbol in identifier and len(identifier) > s_len:
                _doSplit(identifier)
            else:
                tmp.append(identifier)

        Ids = tmp

    identifiers.clear()
    identifiers.extend(Ids[:])


class _Cursor:
    def __init__(self, identifiers):
        self._identifiers = identifiers
        self._max_cursor = len(identifiers) - 1
        self._current_cursor = 0

    @property
    def max(self):
        return self._max_cursor

    @property
    def left(self):
        return self.max - self.cursor + 1

    @property
    def cursor(self):
        return self._current_cursor

    @cursor.setter
    def cursor(self, value):
        self._current_cursor = value

    def seek(self, offset, whence=1):
        if whence == 0:  # from beginning
            self.cursor = offset
        elif whence == 1:  # from current
            self.cursor += offset
        elif whence == 2:  # from ending
            if offset > 0:
                raise
            self.cursor += self._max_cursor + offset

    def __getitem__(self, length):
        seek = True
        if type(length) == tuple:
            length, seek = length
        c = self.cursor
        ret = self._identifiers[c:c + length]
        if seek:
            self.seek(length, 1)
        if length == 1:
            return ret[0]
        else:
            return ret

    def distanceToNext(self, character):
        try:
            _i = self._identifiers.index(character, self.cursor)
            return _i - self.cursor + 1
        except ValueError as e:
            raise e

    def getUntilNext(self, character, seek=False, seekLast=False):
        ret = []
        dis = self.distanceToNext(character)
        if dis == 2:
            ret.append(self[1, seek])
        else:
            ret.extend(self[dis - 1, seek])
        if seekLast:
            self.seek(1, 1)
        return ret

    def consumeCharacter(self, character):
        if self[1, False].upper() != character.upper():
            raise
        else:
            self.seek(1, 1)

    def consumeOptional(self, character):
        if self.left == 0:
            return
        if self[1, False].upper() != character.upper():
            pass
        else:
            self.seek(1, 1)


def _getPrimitiveRscTypeFromHint(family):
    upper = family.upper()
    if hasattr(IecType, upper):
        return getattr(IecType, upper)


class _Schema:
    @classmethod
    def factory(cls, cursor, hints, not_add_hint=False):
        if cursor[2, False][-1] != ":":
            type_name = "#Anonymous"
        else:
            type_name = cursor[1]
            cursor.consumeCharacter(":")

        type_family = cursor[1]
        if type_family.upper() == "ARRAY":
            ins = _ArraySchema.factory(cursor, hints, not_add_hint)
            ins.name = type_name
            ins.family = "ARRAY"
        elif type_family.upper() == "STRUCT":
            ins = _StructSchema.factory(cursor, hints, not_add_hint)
            ins.name = type_name
            ins.family = "STRUCT"
        else:
            ins = cls()
            ins.name = type_name
            ins.family = type_family
            primitive = _getPrimitiveRscTypeFromHint(type_family)
            if primitive:
                ins.rsc_Type = primitive
            else:
                _h = hints.get(type_family, None)
                if _h:
                    final = deepcopy(_h)
                    final.name = ins.name
                    ins = final
                else:
                    raise ValueError("Unknow type of " + type_family)
        cursor.consumeOptional(";")
        if type_name != "#Anonymous" and not not_add_hint:
            hints[type_name] = ins
        return ins

    def __init__(self):
        self.name = ""
        self.family = ""

        self.rsc_Type = RscType.Null

    def __repr__(self):
        return f"PRIM<{self.family}>({self.name})"


class _StructSchema(_Schema):
    @classmethod
    def factory(cls, cursor: _Cursor, hints, not_add_hint=False):
        fields = []
        while cursor[1, False].upper() != "END_STRUCT":
            fields.append(_Schema.factory(cursor, hints, not_add_hint=True))
        cursor.consumeCharacter("END_STRUCT")
        ins = cls()
        ins.fields = fields
        ins.rsc_Type = RscType.Struct
        return ins

    def __init__(self):
        super().__init__()
        self.fields = None

    def __repr__(self):
        s = "STRUCT<"
        for f in self.fields:
            s += f.__repr__()
        s += '>'
        return s


class _ArraySchema(_Schema):
    @classmethod
    def factory(cls, cursor: _Cursor, hints, not_add_hint=False):
        ins = cls()
        ins.rsc_Type = RscType.Array
        cursor.getUntilNext("[", seekLast=True, seek=True)
        ins.bound_lower = int(cursor[1])
        cursor.consumeCharacter("..")
        ins.bound_upper = int(cursor[1])
        cursor.consumeCharacter("]")
        cursor.consumeCharacter("OF")
        _type = cursor.getUntilNext(";", seekLast=False, seek=False)
        if len(_type) == 1:
            primitive = _getPrimitiveRscTypeFromHint(_type[0])
            if primitive:
                ins.element_rsc_type = primitive
            else:
                next_dimension = hints.get(_type[0], None)
                if not next_dimension:
                    raise ValueError("Unknow type of " + _type[0])
                ins.next_dimension_or_struct = next_dimension
                if isinstance(next_dimension, _ArraySchema):
                    ins.element_rsc_type = RscType.Array
                elif isinstance(next_dimension, _StructSchema):
                    ins.element_rsc_type = RscType.Struct
            cursor.seek(1, 1)
        else:
            next_dimension = _Schema.factory(cursor, hints, not_add_hint)
            ins.next_dimension_or_struct = next_dimension
            ins.element_rsc_type = RscType.Array
        return ins

    def __init__(self):
        super().__init__()
        self.bound_upper = 0
        self.bound_lower = 0

        self.next_dimension_or_struct = None
        self.element_rsc_type = RscType.Null

    @property
    def size(self):
        return self.bound_upper - self.bound_lower + 1

    def __repr__(self):
        return f"ARRAY<{self.element_rsc_type.name}>[{self.bound_lower}..{self.bound_upper}]"


_RSC_DEFAULT_VALUES = {
    RscType.Int64: 0,
    RscType.Int32: 0,
    RscType.Int16: 0,
    RscType.Int8: 0,
    RscType.Uint64: 0,
    RscType.Uint32: 0,
    RscType.Uint16: 0,
    RscType.Uint8: 0,
    RscType.Bool: False,
    RscType.Real64: 0.0,
    RscType.Real32: 0.0,
    RscType.Utf8String: "",
    RscType.IecTime: 0,
    RscType.IecTime64: 0,
    RscType.IecDate64: 0,
    RscType.IecDateTime64: 0,
    RscType.IecTimeOfDay64: 0
}


def _make_array_for_write(schema) -> RscList:
    ret = RscList()
    if schema.bound_lower != 0:
        ret.setOffset(schema.bound_lower)
    size = schema.size
    ret.setDesireLength(size)
    element_type = schema.element_rsc_type
    if element_type == RscType.Array:
        next_dimension_schema = schema.next_dimension_or_struct
        ret.setNextDimension(_make_array_for_write(next_dimension_schema), size)
    elif element_type == RscType.Struct:
        ret.setElementAnnotate(_getRscStructTypeFromSchema(schema.next_dimension_or_struct), size, _make_struct_for_write(schema.next_dimension_or_struct))
    else:
        ret.setElementRscType(element_type, size, _RSC_DEFAULT_VALUES[element_type])

    return ret


@functools.lru_cache(maxsize=None, typed=True)
def _getRscStructTypeFromSchema(schema) -> RscStruct:
    if log.isEnabledFor(logging.DEBUG):
        log.debug(f"make RscStructType for schema({schema.name})")
    param = {}
    for field in schema.fields:
        if field.rsc_Type == RscType.Array:
            param[field.name] = RscSequence
        elif field.rsc_Type == RscType.Struct:
            param[field.name] = _getRscStructTypeFromSchema(field)
        else:
            _default = _RSC_DEFAULT_VALUES[field.rsc_Type]
            param[field.name] = RscTpAnnotate[type(_default), field.rsc_Type]
    return RscStruct(schema.name, **param)


def _make_struct_for_write(schema) -> RscStructBuilder:
    tp = _getRscStructTypeFromSchema(schema)
    field_defaults = {}
    for field in schema.fields:
        if field.rsc_Type == RscType.Array:
            field_defaults[field.name] = _make_array_for_write(field)
        elif field.rsc_Type == RscType.Struct:
            field_defaults[field.name] = _make_struct_for_write(field)
        else:
            field_defaults[field.name] = _RSC_DEFAULT_VALUES[field.rsc_Type]
    return RscStructBuilder(tp, field_defaults)


def _receiveArray(schema: _ArraySchema, value, builderMode=False):
    assert isinstance(value, RscSequence)
    should_element_size = schema.size
    should_element_rsc_type = schema.element_rsc_type
    fact_element_size = len(value)  # don't care the syntax warning
    fact_element_rsc_type = value.getElementRscType()
    if should_element_rsc_type != fact_element_rsc_type:
        raise ValueError(f"Expect type {should_element_rsc_type.name} in Array({schema.name}) but is {fact_element_rsc_type.name}")
    if should_element_size != fact_element_size:
        raise ValueError(f"Expect element count is {should_element_size} in Array({schema.name}) but is {fact_element_size}")

    ret = RscList()
    if schema.bound_lower != 0:
        ret.setOffset(schema.bound_lower)
    if should_element_rsc_type == RscType.Array:
        inner = None
        for current in value:  # don't care the syntax warning
            inner = _receiveArray(schema.next_dimension_or_struct, current)
            ret.append(inner)
        ret.setNextDimension(inner)
    elif should_element_rsc_type == RscType.Struct:
        inner = None
        for current in value:  # don't care the syntax warning
            inner = _receiveStruct(schema.next_dimension_or_struct, current, builderMode=builderMode)
            ret.append(inner)
        ret.setElementAnnotate(type(inner))
    else:
        # direct copy if rsc_type is primitive
        ret.extend(value[:])
        ret.setElementRscType(should_element_rsc_type)
    return ret


def _receiveStruct(schema: _StructSchema, value, builderMode=False):
    assert isinstance(value, RscStructMeta)
    should_fields_entries = schema.fields
    should_field_count = len(should_fields_entries)
    fact_field_count = len(value)
    if should_field_count != fact_field_count:
        raise ValueError(f"Expect field count {should_field_count} in Struct({schema.name}) but is {fact_field_count}")
    tp = _getRscStructTypeFromSchema(schema)
    _values = {}
    for idx in range(should_field_count):
        current_schema = should_fields_entries[idx]
        current_value = value[idx]
        if isinstance(current_schema, _ArraySchema):
            if current_value.GetType() != RscType.Array:
                raise ValueError(f"Expect field {current_schema.name} in Struct({schema.name}) is Array but is {current_value.GetType()}")
            _values[current_schema.name] = _receiveArray(current_schema, current_value.GetValue())
        elif isinstance(current_schema, _StructSchema):
            if current_value.GetType() != RscType.Struct:
                raise ValueError(f"Expect field {current_schema.name} in Struct({schema.name}) is Struct but is {current_value.GetType()}")
            _values[current_schema.name] = _receiveStruct(current_schema, current_value.GetValue(), builderMode=builderMode)
        else:
            if current_schema.rsc_Type != current_value.GetType():
                raise ValueError(f"Expect field {current_schema.name} in Struct({schema.name}) is {current_schema.rsc_Type.name} but is {current_value.GetType()}")
            _values[current_schema.name] = current_value.GetValue()
    if builderMode:
        return RscStructBuilder(tp, _values)
    else:
        return tp(**_values)  # don't care the syntax warning


####################################################################################
####################################################################################


[docs]def NewSchemaInstance(schema): """ Construct a new data type instance defined by schema this instance is used for Write complex data to PLCnext .. note:: In the created instance , all values have been set to its' default value, means that if user don't change the element (field) in it, it can also send to PLCnext successfully , but all elements (or fields) are *0* , *0.0* , *False* or *""(empty str)* :param schema: type schema, get from :py:func:`PyPlcnextRsc.tools.PlcDataTypeSchema.DataTypeStore.__getitem__` :return: a new data type instance for user to fill elements or fields """ if isinstance(schema, _ArraySchema): return _make_array_for_write(schema) elif isinstance(schema, _StructSchema): return _make_struct_for_write(schema)
[docs]def ReceiveAsSchemaInstance(schema, variant: RscVariant, builderMode: bool = False): """ Receive the certain value from :py:class:`~PyPlcnextRsc.common.objects.rsc_variant.RscVariant` as data_type defined in schema :param schema: type schema :param variant: from IDataAccessService or ISubscriptionService or some other service , which represent the meta-value of the data_type defined in schema :type variant: :py:class:`~PyPlcnextRsc.common.objects.rsc_variant.RscVariant` :param builderMode: if set true, all the struct received will be the :py:class:`~PyPlcnextRsc.common.objects.rsc_struct.RscStructBuilder`, so it is possible for user to change the fields' value and send back to device directly. :type builderMode: bool :return: final value , which has the same element type or fields defined in schema """ if isinstance(schema, _ArraySchema): if isinstance(variant, RscVariant): if variant.GetType() != RscType.Array: raise ValueError(f"Type Mismatch! Expect Array({schema.name}) but is {variant.GetType().name}") _val = variant.GetValue() return _receiveArray(schema, _val, builderMode=builderMode) elif isinstance(variant, RscSequence): return _receiveArray(schema, variant, builderMode=builderMode) else: raise ValueError(f"Unknow Value type:{type(variant)}") elif isinstance(schema, _StructSchema): if isinstance(variant, RscVariant): if variant.GetType() != RscType.Struct: raise ValueError(f"Type Mismatch! Expect Struct({schema.name}) but is {variant.GetType().name}") _val = variant.GetValue() return _receiveStruct(schema, _val, builderMode=builderMode) elif isinstance(variant, RscStructMeta): return _receiveStruct(schema, variant, builderMode=builderMode) else: raise ValueError(f"Unknow Value type:{type(variant)}")
[docs]class DataTypeStore: """ This is a helper function for python user to create the equivalent variable model to *IEC61131*, it is the most convenient way to construct a complex value such as **Array** and **Struct** for sending or receiving. Basic usage: .. code:: python from PyPlcnextRsc import Device, RscVariant, GUISupplierExample from PyPlcnextRsc.Arp.Plc.Gds.Services import IDataAccessService, WriteItem from PyPlcnextRsc.tools import DataTypeStore if __name__ == "__main__": TypeStore = DataTypeStore.fromString( ''' TYPE DemoStruct : STRUCT Field1 : INT; Field2 : BOOL; END_STRUCT DemoArray : ARRAY[0..10] OF INT; END_TYPE ''') with Device('192.168.1.10', secureInfoSupplier=GUISupplierExample) as device: # create DemoStruct demo1 = TypeStore.NewSchemaInstance("DemoStruct") demo1.Field1 = 123 demo1.Field2 = True # create DemoArray demo2 = TypeStore.NewSchemaInstance("DemoArray") demo2[:] = [i * 2 for i in range(11)] # get raw data access service data_access_service = IDataAccessService(device) # Write demo1 to PLCnext data_access_service.Write((WriteItem("Arp.Plc.Eclr/demo1", RscVariant.of(demo1)),)) # Read demo1 read_item = data_access_service.Read(("Arp.Plc.Eclr/demo1",))[0] rcv_demo1 = TypeStore.ReceiveAsSchemaInstance("DemoStruct", read_item.Value) print(rcv_demo1) # --------------- data_access_service.Write((WriteItem("Arp.Plc.Eclr/demo2", RscVariant.of(demo2)),)) read_item = data_access_service.Read(("Arp.Plc.Eclr/demo2",))[0] rcv_demo2 = TypeStore.ReceiveAsSchemaInstance("DemoArray", read_item.Value) print(rcv_demo2) """
[docs] @classmethod def fromFile(cls, file_or_filename: Union[str, TextIO]): """ Create the DataTypeStore using local file. :param file_or_filename: file object or the path of the local file. :type file_or_filename: TextIO or str """ try: file_contents = file_or_filename.read() return cls.fromString(file_contents) except AttributeError: with open(file_or_filename, 'r') as f: ins = cls(f) return ins
[docs] @classmethod def fromString(cls, string: str): """ Using DataType string to create the DataTypeStore :param string: the DataType code. :type string: str """ return cls(StringIO(string))
def __init__(self, SIO): lines = list(filter(lambda _l: len(_l) != 0, [_l.split() for _l in SIO.readlines()])) lines = _remove_comment(lines) _move_TYPE_Label(lines) identifiers = [] for _l in lines: identifiers.extend(_l) _splitSpecial(identifiers) # print(identifiers) cursor = _Cursor(identifiers) hints = dict() if cursor.max < 0: ... # print("EMPTY") while cursor.left > 0: _Schema.factory(cursor, hints) self._schemas = hints
[docs] def __getitem__(self, item) -> any: """ Get type schema :param item: type named defined in DataTypeStore :type item: str :return: type schema """ return self._getSchema(item)
def _getSchema(self, item): _t = self._schemas.get(item, None) if not _t: raise ValueError("Unknow Type of " + item) return _t
[docs] def NewSchemaInstance(self, data_type_name: str): """ Construct a new data type instance defined by schema this instance is used for Write complex data to PLCnext .. note:: In the created instance , all values have been set to its' default value, means that if user don't change the element (field) in it, it can also send to PLCnext successfully ,but all elements (or fields) are *0*,*0.0*,*False* or *""(empty str)* :param data_type_name: type named defined in DataTypeStore :type data_type_name: str :return: a new data type instance for user to fill elements or fields """ schema = self._getSchema(data_type_name) return NewSchemaInstance(schema)
[docs] def ReceiveAsSchemaInstance(self, data_type_name: str, variant: RscVariant, builderMode: bool = False): """ Receive the certain value from PLCnext as data_type defined in schema :param data_type_name: type named defined in DataTypeStore :type data_type_name: str :param variant: from IDataAccessService or ISubscriptionService or some other service , which represent the meta-value of the data_type defined in schema :type variant: :py:class:`~PyPlcnextRsc.common.objects.rsc_variant.RscVariant` :param builderMode: if set true, all the struct received will be the :py:class:`~PyPlcnextRsc.common.objects.rsc_struct.RscStructBuilder`, so it is possible for user to change the fields' value and send back to device directly. :type builderMode: bool :return: final value , which has the same element type or fields defined in schema """ schema = self._schemas.get(data_type_name, None) return ReceiveAsSchemaInstance(schema, variant, builderMode)