diff --git a/pycomm3/logix_driver.py b/pycomm3/logix_driver.py index a561099..1c39cfc 100644 --- a/pycomm3/logix_driver.py +++ b/pycomm3/logix_driver.py @@ -94,6 +94,20 @@ AtomicValueType = Union[int, float, bool, str] TagValueType = Union[AtomicValueType, List[AtomicValueType], Dict[str, "TagValueType"]] ReadWriteReturnType = Union[Tag, List[Tag]] +BIT_ACCESSIBLE_TYPES = { + "SINT", + "INT", + "DINT", + "LINT", + "USINT", + "UINT", + "UDINT", + "ULINT", + "BYTE", + "WORD", + "DWORD", + "LWORD", +} class LogixDriver(CIPDriver): @@ -426,7 +440,7 @@ def get_tag_list(self, program: str = None, cache: bool = True) -> List[dict]: tags = self._get_tag_list(program) if cache: - self._tags = {tag["tag_name"]: tag for tag in tags} + self._tags = {util.tag_lookup_key(tag["tag_name"]): tag for tag in tags} self._cache = None @@ -626,7 +640,7 @@ def _isolate_user_tags(self, all_tags, program=None): if program is not None: name = f"Program:{program}.{name}" - self._cache["tag_name:id"][name] = tag["instance_id"] + self._cache["tag_name:id"][util.tag_lookup_key(name)] = tag["instance_id"] user_tags.append(self._create_tag(name, tag)) @@ -810,7 +824,9 @@ def _parse_template_data(self, data, template, symbol_type): else: data_type["attributes"].append(member) - data_type["internal_tags"][member] = info + # Add tag name field for a case-sensitive tag name + info["tag_name"] = member + data_type["internal_tags"][util.tag_lookup_key(member)] = info if info["data_type_name"] == "BOOL" and 'bit' in info: # bit members aren't really 'struct' members since they are aliased to bits of other members @@ -820,10 +836,10 @@ def _parse_template_data(self, data, template, symbol_type): if ( # determine if struct is a string or not data_type["attributes"] == ["LEN", "DATA"] - and data_type["internal_tags"]["DATA"]["data_type_name"] == "SINT" - and data_type["internal_tags"]["DATA"].get("array") + and data_type["internal_tags"]["data"]["data_type_name"] == "SINT" + and data_type["internal_tags"]["data"].get("array") ): - data_type["string"] = data_type["internal_tags"]["DATA"]["array"] + data_type["string"] = data_type["internal_tags"]["data"]["array"] data_type["type_class"] = FixedSizeString(template["structure_size"] - 4) else: @@ -1243,14 +1259,19 @@ def get_tag_info(self, tag_name: str) -> Optional[dict]: """ base, *attrs = tag_name.split(".") - if base.startswith("Program:"): + if base.lower().startswith("program:"): base = f"{base}.{attrs.pop(0)}" + if len(attrs) and attrs[-1].isdigit(): + bit = int(attrs.pop(-1)) + tag_info = self._get_tag_info(base, attrs) + self._validate_bit_access(tag_info, bit) + return self._get_bit_tag_info(tag_info, bit) return self._get_tag_info(base, attrs) def _get_tag_info(self, base, attrs) -> Optional[dict]: def _recurse_attrs(attrs, data): cur, *remain = attrs - curr_tag = util.strip_array(cur) + curr_tag = util.tag_lookup_key(cur) if not len(remain): return data[curr_tag] else: @@ -1260,7 +1281,7 @@ def _recurse_attrs(attrs, data): return None try: - data = self._tags[util.strip_array(base)] + data = self._tags[util.tag_lookup_key(base)] if not len(attrs): return data else: @@ -1274,6 +1295,33 @@ def _recurse_attrs(attrs, data): self.__log.exception(_msg) raise RequestError(_msg) from err + @staticmethod + def _validate_bit_access(tag_info: dict, bit: int): + data_type = tag_info["data_type_name"] + if data_type not in BIT_ACCESSIBLE_TYPES: + raise RequestError(f"Tag type {data_type!r} does not support bit access") + + bit_count = DataTypes.get(data_type).size * 8 + if bit >= bit_count: + raise RequestError( + f"Bit index {bit} out of range for {data_type}, expected 0-{bit_count - 1}" + ) + + @staticmethod + def _get_bit_tag_info(tag_info: dict, bit: int) -> dict: + bit_info = { + **tag_info, + "tag_type": "atomic", + "data_type": "BOOL", + "data_type_name": "BOOL", + "type_class": DataTypes.get("BOOL"), + "bit": bit, + } + bit_info.pop("array", None) + if "tag_name" in bit_info: + bit_info["tag_name"] = f'{bit_info["tag_name"]}.{bit}' + return bit_info + def _parse_requested_tags(self, tags, rw="r"): requests = {} @@ -1310,23 +1358,27 @@ def _parse_tag_request(self, tag: str, rw="r") -> dict: bool_elements = None base, *attrs = tag.split(".") - if base.startswith("Program:"): + if base.lower().startswith("program:"): base = f"{base}.{attrs.pop(0)}" + explicit_bit = False if len(attrs) and attrs[-1].isdigit(): bit = int(attrs.pop(-1)) + explicit_bit = True tag = base if not len(attrs) else f"{base}.{'.'.join(attrs)}" tag_info = self._get_tag_info(base, attrs) + if explicit_bit: + self._validate_bit_access(tag_info, bit) if tag_info["data_type"] == "DWORD": _tag, idx = util.get_array_index(tag) - if idx is not None: + if idx is not None and not explicit_bit: tag = f"{_tag}[0]" if rw == "r" else f"{_tag}[{idx // 32}]" - bit = idx - bool_elements = None if implicit_element or elements == 1 else elements - total_size = (bit or 0) + elements - elements = (total_size // 32) + (1 if total_size % 32 else 0) + bit = idx + bool_elements = None if implicit_element or elements == 1 else elements + total_size = (bit or 0) + elements + elements = (total_size // 32) + (1 if total_size % 32 else 0) return { "user_tag": request_tag, # tag name from user, without element request diff --git a/pycomm3/util.py b/pycomm3/util.py index 76e99ef..fe60896 100644 --- a/pycomm3/util.py +++ b/pycomm3/util.py @@ -30,6 +30,18 @@ from typing import Tuple +def tag_lookup_key(tag_name: str) -> str: + """Strip array portions of the tag, and ensure lower case strings for consistent lookups + + Args: + tag_name (str): Tag name to create key from + + Returns: + str: Tag lookup key for the given tag name + """ + return strip_array(tag_name).lower() + + def strip_array(tag: str) -> str: """ Strip off the array portion of the tag diff --git a/tests/offline/test_util.py b/tests/offline/test_util.py index ae55144..40863d7 100644 --- a/tests/offline/test_util.py +++ b/tests/offline/test_util.py @@ -1,4 +1,4 @@ -from pycomm3.util import strip_array, get_array_index +from pycomm3.util import strip_array, get_array_index, tag_lookup_key TEST_TAG = "This is a tag" @@ -21,3 +21,7 @@ def test_get_array_index_returns_index_value(): TEST_ARRAY = "[123]" EXPECTED = (TEST_TAG, 123) assert EXPECTED == get_array_index(TEST_TAG + TEST_ARRAY) + + +def test_tag_lookup_key_strips_array_and_lowercases_tag(): + assert 'mixed_case_tag' == tag_lookup_key('Mixed_Case_Tag[123]')