# Copyright (c) 2021 Phoenix Contact. All rights reserved.
# Licensed under the MIT. See LICENSE file in the project root for full license information.
from collections.abc import Iterable, Sized
from copy import deepcopy
from PyPlcnextRsc.common.exceptions import InvalidOperationException
from PyPlcnextRsc.common.tag_type import RscType
__all__ = [
"RscSequence",
"RscList",
"RscTuple",
]
[docs]class RscSequence:
"""
COMMON ABSTRACT CLASS FOR :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList` & :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscTuple`
ALSO USED IN USER STRUCT DEF
"""
[docs] def setElementRscType(self, rscType: RscType):
"""
Configure the sequence by passing in primitive type (:py:class:`~PyPlcnextRsc.common.tag_type.RscType`)
.. warning::
This method only support primitive types,
If next-dimension is *Array*, use :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence.setNextDimension` instead.
else if element is *Struct*, use :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence.setElementAnnotate` instead.
:param rscType: the element type
:type rscType: :py:class:`~PyPlcnextRsc.common.tag_type.RscType`
:return: self
"""
if rscType == RscType.Array:
raise InvalidOperationException("use setNextDimension instead")
if rscType == RscType.Struct:
raise InvalidOperationException("use setElementAnnotate instead")
from PyPlcnextRsc.common.transport.rsc_datatag_ctx import DataTagContext
ctx = DataTagContext(rscType=rscType)
self.setElementContext(ctx)
return self
[docs] def setElementAnnotate(self, annotate):
"""
Configure by annotate
Used if element type is Primitive type or :py:class:`~PyPlcnextRsc.common.objects.rsc_struct.RscStruct`
:param annotate: field type annotation ,
such as 'RscTpAnnotate[int, Marshal(rscType=RscType.Int16)]' or defined :py:class:`~PyPlcnextRsc.common.objects.rsc_struct.RscStruct`
:return: self
"""
from PyPlcnextRsc.common.objects.rsc_struct import RscStructBuilder
from PyPlcnextRsc.common.transport.rsc_datatag_ctx import DataTagContext
if isinstance(annotate, RscStructBuilder):
annotate = annotate._proto
ctx = DataTagContext.factory(annotate)
self.setElementContext(ctx)
return self
[docs] def setNextDimension(self, nextDimension):
"""
Configure by next :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence` or :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`
.. warning::
Only used for define a multi-dimension array.
:param nextDimension: next configured :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence` or :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`
:type nextDimension: :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence` or factory of :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`
:return: self
"""
if id(nextDimension) == id(self):
raise InvalidOperationException("can not add self as next dimension")
if isinstance(nextDimension, RscSequence):
setattr(self, '_next_dimension', nextDimension)
# factory
elif hasattr(nextDimension, 'getSingleton'):
setattr(self, '_next_dimension', nextDimension.getSingleton())
else:
raise InvalidOperationException("should pass in 'RscSequence' instance or 'RscList.factory' instance")
return self
# ----------------------------------------------------------------
[docs] def getElementRscType(self) -> RscType:
"""
Get the :py:class:`~PyPlcnextRsc.common.tag_type.RscType` of the element.
:return: the :py:class:`~PyPlcnextRsc.common.tag_type.RscType` of the element.
:rtype: RscType
"""
# 注意这个不能用于向RSC发送(array标签不完整)
if hasattr(self, '_element_context'):
return self._element_context.rsc_type
if hasattr(self, '_next_dimension'):
return RscType.Array
return RscType.Null
[docs] def setElementContext(self, context):
"""
Set the DataTagContext of the element, this is used internally.
:param context: DataTagContext of the element
"""
# also called when reading array from PLC
setattr(self, '_element_context', context)
return self
[docs] def getElementContext(self):
"""
Get the DataTagContext of the element,, this is used internally.
:return: DataTagContext
"""
# used when writing array to PLC
ret = getattr(self, '_element_context', None)
if ret is None:
_nd = getattr(self, '_next_dimension', None)
if _nd is None:
raise InvalidOperationException("missing array information")
from PyPlcnextRsc.common.transport import DataTagContext
ret = DataTagContext(rscType=RscType.Array)
ret.subTag.append(_nd.getElementContext())
self.setElementContext(ret)
return ret
[docs] def setDesireLength(self, length):
"""
if the desire length is set, the framework will check the length before sending to device.
this will auto set by :py:class:`PyPlcnextRsc.tools.PlcDataTypeSchema` internally.
:param length: the desired length of this sequence.
:type length: int
"""
self._desire_length = length
[docs] def getDesireLength(self):
"""
Get the desired length internal. if not set return -1
:return: the desired length , -1 if not set
:rtype: int
"""
if hasattr(self, '_desire_length'):
return self._desire_length
else:
return -1
[docs]class RscList(list, RscSequence):
"""
Use List to represent an array for transferring with device.
.. tip::
Since V0.1.4 :
'_friendlyMode' :
if an element is struct type in this array , use this method is quick !
.. code:: python
MyArrayOfStruct[1] = {"Field1": 200, "Field2": False} # or (200,False) or [200,False]
MyArrayOfStruct[2] = (300, True)
and this can override all above values (this example showed '[:]' you can
use other slice which you want such as '[0:8]' '[1:-1]'... )
.. code:: python
MyArrayOfStruct[:] = [{"Field1": 200, "Field2": False}, (200, True), (300, True)]
"""
class _factory:
FACTORIES = {}
@classmethod
def get(cls, funcName, *args):
key = (funcName, args)
if key in cls.FACTORIES:
return cls.FACTORIES[key]
else:
ret = cls(funcName, *args)
cls.FACTORIES[key] = ret
return ret
def __init__(self, funcName, *args):
self._funcName = funcName
self._arguments = args
self._singleton = RscList()
if hasattr(self._singleton, funcName):
getattr(self._singleton, funcName)(*args)
else:
raise InvalidOperationException("wrong funcName :" + self._funcName)
def create(self, *args, **kwargs):
ret = RscList(*args, **kwargs)
return getattr(ret, self._funcName)(*self._arguments)
def getSingleton(self):
return self._singleton
[docs] @classmethod
def factory(cls, funcName, *args):
"""
Make a *RscList* factory, this is only a helper function to create :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList` of same structure.
Usage:
.. code:: python
middle_layer_factory = RscList.factory('createNextDimension', RscType.Int16, 3)
middle1 = middle_layer_factory.create()
middle1[0].extend((1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 11))
middle1[1].extend((1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 11))
middle1[2].extend((1, 3, 2, 1, 1, 1, 1, 1, 1, 1, 11))
middle2 = middle_layer_factory.create()
middle2[0].extend((2, 1, 0, 1, 1, 1, 1, 1, 1, 1, 11))
middle2[1].extend((2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 11))
middle2[2].extend((2, 3, 2, 1, 1, 1, 1, 1, 1, 1, 11))
outer = RscList((middle1, middle2)).setNextDimension(middle_layer_factory)
:param funcName: the exist function name for configure the :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`.
:type funcName: str
:param args: the function arguments corresponding to the 'funcName'.
:return: return a factory instance , which has a **create()** method to call for construct a new :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`
by the provided parameters.
"""
return cls._factory.get(funcName, *args)
[docs] def setOffset(self, offset: int):
"""
Set the start offset of this array object , because in *IEC61131*, an array can be defined with lower bound not equal to *0*
.. warning::
Although lower bound not equal to *0* is acceptable by this method, but still not recommend to use that kind of array definition! because it can be confused.
and slice function of python can not work normally if use negative index.
:param offset: lower bound of the array
:type offset: int
"""
self._offset = offset
def _useFriendlyMode(self):
return getattr(self, "_friendlyMode", True) is True
def __getitem__(self, item):
return super().__getitem__(self.__applyOffset(item))
def __setitem__(self, key, gived_value):
offset = self.__applyOffset(key)
_friendly = self._useFriendlyMode()
if type(offset) == slice:
# this part handle 'RscListInstance[:] = [[1,2],[1,2],[1,2]]',transmitting the operation to element RscStructBuilder's _internal_setter if condition fits
if _friendly and isinstance(gived_value, (Iterable, Sized)):
vals = super().__getitem__(offset)
if len(vals) != len(gived_value):
raise ValueError(f"Expect {len(vals)} element{'s' if len(vals) > 1 else ''} in array, but supplied {len(gived_value)} !")
if self.getElementRscType() == RscType.Struct:
for selfVal, gived_val in zip(vals, gived_value):
assert hasattr(selfVal, '_getRscStruct')
if not hasattr(gived_value, "_getRscStruct"):
selfVal._internal_setter(gived_val)
else:
# the value index in RawList (because use slice we can't ensure the order, so here use self.index()
idx = self.index(selfVal)
super().__setitem__(idx, gived_val)
# elif self.getElementRscType() == RscType.Array:
# ...
else:
return super().__setitem__(offset, gived_value)
else:
return super().__setitem__(offset, gived_value)
elif _friendly:
# this part handle 'RscListInstance[X] = [1,2]',
# transmitting the operation to element(RscStructBuilder or RscList)'s _internal_setter if condition fits
selfVal = super().__getitem__(offset)
if self.getElementRscType() == RscType.Struct and hasattr(selfVal, '_getRscStruct') and not hasattr(gived_value, "_getRscStruct"):
selfVal._internal_setter(gived_value)
elif self.getElementRscType() == RscType.Array and isinstance(selfVal, RscList) and not isinstance(gived_value, RscList):
selfVal._internal_setter(gived_value)
else:
return super().__setitem__(offset, gived_value)
else:
return super().__setitem__(offset, gived_value)
def _internal_setter(self, value):
"""fill this array at once using tuple or list or other array-like object"""
if not isinstance(value, (Iterable, Sized)):
raise ValueError("Must use Iterable and Sized type value (such as tuple, list) to fill the array")
if len(self) != len(value):
raise ValueError(f"Expect {len(self)} element{'s' if len(self) > 1 else ''} in array, but is {len(value)} !")
self[:] = value # override the content
def __applyOffset(self, item):
# deal with situation if defined like ARRAY[-10..10] or ARRAY[1..10]
if hasattr(self, "_offset") and self._offset != 0:
_offset = self._offset
_t = type(item)
if _t == int:
item = item - _offset
elif _t == slice:
start = item.start - _offset
stop = item.stop - _offset
item = slice(start, stop, item.step)
return item
def __repr__(self):
return f"RscList<{self.getElementRscType().name}>" + super().__repr__()
[docs] def createNextDimension(self, elementRscType: RscType, count: int, reserve: int = 0, default: any = None):
"""
Helper function to create next dimension ,only support for primitive python type as next dimension's element type!
:param elementRscType: :py:class:`~PyPlcnextRsc.common.tag_type.RscType` of the element
:type elementRscType: :py:class:`~PyPlcnextRsc.common.tag_type.RscType`
:param count: the count of element in current dimension to reserve
:type count: int
:param reserve: the count of element in next dimension to reserve
:type count: int
:param default: the value to fill in next dimension
:return: self
"""
self.setNextDimension(RscList().setElementRscType(elementRscType, reserve, default), count)
# self.clear()
# for i in range(count):
# inner = RscList().setElementRscType(elementRscType, reserve, default)
# self.append(inner)
return self
[docs] def reserve(self, reserve: int = 0, default: any = None, use_deepcopy: bool = False):
"""
Use the provided value(default) to fill the array
:param reserve: element count to reserve
:type reserve: int
:param default: the value to fill
:param use_deepcopy: true if use deepcopy to fill the element,default is False
:type use_deepcopy: bool
"""
if reserve > 0:
left = reserve - len(self)
if left > 0:
if use_deepcopy:
for _ in range(left):
self.append(deepcopy(default))
else:
self.extend([default] * left)
[docs] def setElementRscType(self, rscType: RscType, reserve: int = 0, default: any = None):
"""
Overload the :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence.setElementRscType`, add the :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList.reserve` method.
:param rscType: the element type
:type rscType: :py:class:`~PyPlcnextRsc.common.tag_type.RscType`
:param reserve: element count to reserve
:type reserve: int
:param default: the value to fill
"""
self.reserve(reserve, default)
super().setElementRscType(rscType)
return self
[docs] def setElementAnnotate(self, annotate, reserve: int = 0, default: any = None):
"""
Overload the :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence.setElementAnnotate`, add the :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList.reserve` method.
:param annotate: field type annotation ,
such as 'RscTpAnnotate[int, Marshal(rscType=RscType.Int16)]' or defined :py:class:`~PyPlcnextRsc.common.objects.rsc_struct.RscStruct`
:param reserve: element count to reserve
:type reserve: int
:param default: the value to fill
"""
self.reserve(reserve, default, True)
super().setElementAnnotate(annotate)
return self
[docs] def setNextDimension(self, nextDimension, count: int = None):
"""
Overload the :py:func:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence.setNextDimension`, add support for reserve.
:param nextDimension: next configured :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence` or :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`
:type nextDimension: :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscSequence` or factory of :py:class:`~PyPlcnextRsc.common.objects.rsc_sequence.RscList`
:param count: element count to reserve
:type count: int
:return: self
"""
super().setNextDimension(nextDimension)
if count:
self.clear()
# is factory
if hasattr(nextDimension, 'getSingleton'):
self.reserve(count, nextDimension.create(), True)
else:
self.reserve(count, nextDimension, True)
return self
[docs] def __eq__(self, other):
"""
Overload the list.__eq__, now must the other element's :py:class:`~PyPlcnextRsc.common.tag_type.RscType`
same with self's element type is possible to return True
:param other: other instance to compare
:return: true if 'other' is same with self
"""
if super(RscList, self).__eq__(other):
try:
return other.getElementRscType() == self.getElementRscType()
except:
pass
return False
[docs]class RscTuple(tuple, RscSequence):
"""
Use Tuple to represent an array for transferring with device
"""
def __repr__(self):
return f"RscTuple<{self.getElementRscType().name}>" + super().__repr__()
[docs] def setOffset(self, offset):
"""
see doc from :py:func:`PyPlcnextRsc.common.objects.rsc_sequence.RscList.setOffset`
"""
self._offset = offset
def __getitem__(self, item):
return super().__getitem__(self.__applyOffset(item))
def __applyOffset(self, item):
if hasattr(self, "_offset") and self._offset != 0:
_offset = self._offset
_t = type(item)
if _t == int:
item = item - _offset
elif _t == slice:
start = item.start - _offset
stop = item.stop - _offset
item = slice(start, stop, item.step)
return item
[docs] def __eq__(self, other):
"""
Overload the tuple.__eq__, now must the other element's :py:class:`~PyPlcnextRsc.common.tag_type.RscType`
same with self's element type is possible to return True
:param other: other instance to compare
:return: true if 'other' is same with self
"""
if super(RscTuple, self).__eq__(other):
try:
return other.getElementRscType() == self.getElementRscType()
except:
pass
return False