# Copyright (c) 2021 Phoenix Contact. All rights reserved.
# Licensed under the MIT. See LICENSE file in the project root for full license information.
import warnings
from copy import deepcopy
from PyPlcnextRsc.common.exceptions import InvalidOperationException, CommonRemotingClientException
from PyPlcnextRsc.common.tag_type import RscType
from PyPlcnextRsc.common.types import RscTpNamedTuple
__all__ = [
'RscStructBuilder',
'RscStructMeta',
'GetFieldFromInstance',
"RscStruct",
"isRscStructInstance",
"isRscStructType",
]
RscStruct = RscTpNamedTuple
"""
Represent the struct type with Rsc
.. note::
At current version ,this is equal to NamedTuple
Usage:
.. code:: python
class ST_Prototype0(RscStruct):
F0: IecAnnotation.INT
F1: IecAnnotation.INT
F2: IecAnnotation.BOOL
class ST_Prototype1(RscStruct):
F1: bool
F2: int
F3: RscSequence
class ST_Prototype2(RscStruct):
Field1: RscTpAnnotate[int, Marshal(rscType=RscType.Int16)]
Field2: bool
Field3: str
Field_another_struct: ST_Prototype1
Field5: RscSequence
"""
[docs]def isRscStructInstance(value):
"""
Test whether the given value is the instance of RscStruct
"""
return hasattr(value, "_field_defaults") and hasattr(value, "_fields") and type(value) != type
[docs]def isRscStructType(value):
"""
Test whether the given value is the prototype (or called 'type') of RscStruct
"""
return hasattr(value, "_field_defaults") and hasattr(value, "_fields") and type(value) == type
[docs]class RscStructBuilder:
"""
This is a wrapper around RscStruct , supporting filling fields step by step
Normally this is auto created by :py:class:`PyPlcnextRsc.tools.PlcDataTypeSchema`
.. note::
This class has overload the __eq__ method , so this builder instance is compareable to *RscStruct*
.. tip::
Since V0.1.4 :
'_friendlyMode':
A syntactic sugar for Array (as a field in Struct) operation has been added
'Struct.ArrField = [100, 200, 300]' in now allowed (the old way is 'Struct.ArrField[:] = [100, 200, 300]').
Usage:
.. code:: python
class ST_Prototype(RscStruct):
F0: IecAnnotation.INT
F1: IecAnnotation.INT
F2: IecAnnotation.BOOL
# Not possible to init fields' value separately:
# st = ST_Prototype()
# st.F0 = 0
# st.F1 = 100 # Illegal ! , because 'RscStruct' is tuple in fact
# st.F2 = False
# You can only instance it by the following way:
# st = ST_Prototype(0,100,False)
# use RscStructBuilder:
builder = RscStructBuilder(ST_Prototype)
builder(F0 = 0,F1 = 100,F2 = False) # by __call__
builder.F1 = 200 # by set attribute
builder.F2 = True
# not necessary for user to call _getRscStruct because it will auto invoke while sending to PLC
st = builder._getRscStruct()
:param prototype: RscStruct prototype
:type prototype: :py:class:`~PyPlcnextRsc.common.objects.rsc_struct.RscStruct`
:param defaults: dict with field name and it's default value if supplied, default it None.
:type defaults: dict
"""
def __init__(self, prototype, defaults=None):
self._proto = prototype
from PyPlcnextRsc.common.transport import DataTagContext
self._proto_context = DataTagContext.factory(prototype)
field_names = prototype._fields
value_map = deepcopy(prototype._field_defaults)
if defaults is None:
defaults = {}
for field, value in defaults.items():
if field in field_names:
value_map[field] = value
else:
warnings.warn("Have Unknow default value")
self._value_map = value_map
self._last_value_map = {}
self._last_result = None
self._call_param_checker = None
# self._repr_fmt = '(' + ', '.join(f'{name}=%r' for name in field_names) + ')'
self._friendlyMode = True # a switch to control syntactic sugar for Struct.ArrField[:] = [1,2,3] to Struct.ArrField = [1,2,3]
def __getattribute__(self, item):
if item.startswith("_"):
return super().__getattribute__(item)
else:
if item in self._proto._fields:
return self._value_map.get(item, None)
else:
raise AttributeError(f'{self._proto.__name__} object has no attribute {item}')
def __setattr__(self, key, value):
from PyPlcnextRsc.common.objects.rsc_sequence import RscList
if key.startswith("_"):
super().__setattr__(key, value)
else:
field_names = self._proto._fields
if key in field_names:
idx = field_names.index(key)
inner_val = self._value_map[key]
# the following statement is for : Struct.ArrField = [1,2,3]
# only a syntactic sugar , but maybe sometime it will not be desired, use 'self._friendlyMode' to switch.
if self._friendlyMode:
if isinstance(inner_val, RscList) and not isinstance(value, RscList):
inner_val._internal_setter(value)
elif isinstance(inner_val, RscStructBuilder) and not isinstance(value, RscStructBuilder):
inner_val._internal_setter(value)
else:
self._proto_context.subTag[idx].checkValueValid(value)
self._value_map[key] = value
else:
# if not in friendlyMode directly push value in map
self._proto_context.subTag[idx].checkValueValid(value)
self._value_map[key] = value
else:
raise AttributeError(f'{self._proto.__name__} object has no attribute {key}')
def _internal_setter(self, value):
if isinstance(value, (list, tuple)):
if len(self._proto._fields) != len(value):
raise ValueError("Must keep the same size if using list or tuple to fill struct")
self.__call__(*value)
elif isinstance(value, dict):
for field_key, val in value.items():
self.__setattr__(field_key, val)
def __call__(self, *args, **kwargs):
field_names = self._proto._fields
if not self._call_param_checker:
num_fields = len(field_names)
arg_list = ', '.join(field_names)
if num_fields == 1:
arg_list += ','
self._call_param_checker = eval(f"lambda {arg_list}:({arg_list})")
values = self._call_param_checker(*args, **kwargs)
for key, value in zip(field_names, values):
self.__setattr__(key, value)
# idx = field_names.index(key)
# self._proto_context.subTag[idx].checkValueValid(value)
# self._value_map[key] = value
return self
def __repr__(self):
return self._proto.__name__ + str(self._value_map)
def _getRscStruct(self):
def create_new():
proto = self._proto
value_map = self._value_map
if len(value_map) != len(proto._fields):
raise ValueError("Missing parameters")
return proto(**value_map)
if self._last_value_map == self._value_map:
if self._last_result is None:
self._last_result = create_new()
return self._last_result
else:
self._last_result = create_new()
self._last_value_map = deepcopy(self._value_map)
return self._last_result
def __eq__(self, other):
if isinstance(other, tuple):
try:
return self._getRscStruct() == other
except:
return False
else:
return super(RscStructBuilder, self).__eq__(other)
def GetFieldFromInstance(struct_like_instance):
if hasattr(struct_like_instance, "_getRscStruct"):
struct_like_instance = struct_like_instance._getRscStruct()
if isRscStructInstance(struct_like_instance):
# for RscStruct
for fieldName in struct_like_instance._fields:
if fieldName.startswith('_'):
continue
yield getattr(struct_like_instance, fieldName)
else:
# for other class
for fieldName, val in struct_like_instance.__dict__.items():
if fieldName.startswith('_'):
continue
yield val
def _GetAsRscStruct(fieldValuesFromRsc, ctx, strictMode=True):
tmp_ret = []
if strictMode:
if ctx.rsc_type != RscType.Struct:
raise InvalidOperationException("must pass in struct annotation")
if ctx.fieldCounts != len(fieldValuesFromRsc):
raise CommonRemotingClientException(f"struct field size not match ,should be {ctx.fieldCounts},but is {len(fieldValuesFromRsc)}")
for idx, field in enumerate(fieldValuesFromRsc):
t = ctx.subTag[idx].rsc_type
if field.GetType() != t and t != RscType.Null:
raise CommonRemotingClientException(
f"struct<{ctx.name}> field '{ctx.subTag[idx].name}' type mismatch ,excepted '{ctx.subTag[idx].rsc_type.name}' ,but is '{field.GetType().name}'")
for idx, field in enumerate(fieldValuesFromRsc):
currentFieldCtx = ctx.subTag[idx]
tp = currentFieldCtx.rsc_type
if tp == RscType.Null: # any
tmp_ret.append(field.GetValue())
elif tp == RscType.Struct:
tmp_ret.append(_GetAsRscStruct(field.GetValue(), currentFieldCtx, strictMode))
else:
tmp_ret.append(field.GetValue())
return ctx.annotation(*tmp_ret)