diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index 7e059d5292..b3bbd6bea3 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -35,7 +35,7 @@ # Try import Pymodbus library or install it and import installation_required = False -required_version = '3.0.0' +required_version = '3.0.2' force_install = False try: @@ -160,7 +160,7 @@ def run(self): try: self.__log.info("Starting Modbus server") self.__run_server() - self.__add_slave(self.__server.get_slave_config_format()) + self.__server.slave = self.__add_slave(self.__server.get_slave_config_format()) self.__log.info("Modbus server started") except Exception as e: self.__log.exception('Failed to start Modbus server: %s', e) @@ -177,7 +177,7 @@ def run(self): self.__log.exception(e) def __run_server(self): - self.__server = Server(self.__config['slave'], self.__log) + self.__server = Server(self, self.__config['slave'], self.__log) self.__server.start() def __get_master(self, slave: Slave): @@ -204,10 +204,13 @@ def __get_master(self, slave: Slave): def __add_slave(self, slave_config): slave = Slave(self, self.__log, slave_config) - master = self.__get_master(slave) - slave.master = master + if slave.no_master: + master = self.__get_master(slave) + slave.master = master self.__slaves.append(slave) + + return slave def __add_slaves(self, slaves_config): for slave_config in slaves_config: @@ -217,7 +220,7 @@ def __add_slaves(self, slaves_config): self.__log.exception('Failed to add slave: %s', e) self.__log.debug('Added %d slaves', len(self.__slaves)) - + @classmethod def callback(cls, slave: Slave, queue: Queue): queue.put_nowait(slave) @@ -226,26 +229,28 @@ async def __process_requests(self): while not self.__stopped: try: slave = self.process_device_requests.get_nowait() - await self.__poll_device(slave) + await self.__process_device(slave) except Empty: await asyncio.sleep(.01) except Exception as e: self.__log.exception('Failed to poll device: %s', e) - async def __poll_device(self, slave: Slave): + async def __process_device(self, slave: Slave): self.__log.debug("Polling %s slave", slave) # check if device have attributes or telemetry to poll if slave.uplink_converter_config.attributes or slave.uplink_converter_config.telemetry: try: - connected_to_master = await slave.connect() - + if slave.no_master: + connected_to_master = True + else: + connected_to_master = await slave.connect() + if connected_to_master: self.__manage_device_connectivity_to_platform(slave) - - if connected_to_master: slave_data = await self.__read_slave_data(slave) - self.__data_to_convert.put_nowait((slave, slave_data)) + if slave_data: + self.__data_to_convert.put_nowait((slave, slave_data)) else: self.__log.error('Socket is closed, connection is lost, for device %s', slave) self.__delete_device_from_platform(slave) @@ -270,7 +275,14 @@ async def __read_slave_data(self, slave: Slave): for config_section in ('attributes', 'telemetry'): for config in getattr(slave.uplink_converter_config, config_section): try: - response = await slave.read(config['functionCode'], config['address'], config['objectsCount']) + if slave.no_master: + if config['address'] in self.__server.addresses_updated: + response = self.__server.read(config['functionCode'], config['address'], config['objectsCount']) + else: + continue + else: + response = await slave.read(config['functionCode'], config['address'], config['objectsCount']) + except asyncio.exceptions.TimeoutError: self.__log.error("Timeout error for device %s function code %s address %s", slave.device_name, config['functionCode'], config[ADDRESS_PARAMETER]) @@ -319,12 +331,20 @@ def __convert_data(self): batch_to_convert[batch_key] = [] batch_to_convert[batch_key].append(data) - - for (device_name, uplink_converter), data in batch_to_convert.items(): - converted_data: ConvertedData = uplink_converter.convert({}, data) - self.__log.trace("Converted data: %r", converted_data) - if len(converted_data['attributes']) or len(converted_data['telemetry']): - self.__data_to_save.put_nowait(converted_data) + + self.__log.trace("Data read from %s slave: %s", slave, data) + + if slave.no_master: + converted_data = ConvertedData(slave.device_name, slave.device_type) + converted_data.add_to_attributes(data['attributes']) + converted_data.add_to_telemetry(data['telemetry']) + else: + for (device_name, uplink_converter), data in batch_to_convert.items(): + converted_data: ConvertedData = uplink_converter.convert({}, data) + self.__log.trace("Converted data: %r", converted_data) + + if len(converted_data['attributes']) or len(converted_data['telemetry']): + self.__data_to_save.put_nowait(converted_data) else: sleep(.001) except Exception as e: @@ -334,7 +354,7 @@ def __save_data(self): while not self.__stopped: if not self.__data_to_save.empty(): try: - converted_data = self.__data_to_save.get_nowait() + converted_data: ConvertedData = self.__data_to_save.get_nowait() StatisticsService.count_connector_message(self.get_name(), stat_parameter_name='storageMsgPushed') self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data) self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 diff --git a/thingsboard_gateway/connectors/modbus/server.py b/thingsboard_gateway/connectors/modbus/server.py index e07d6203b0..cf43a6b3fa 100644 --- a/thingsboard_gateway/connectors/modbus/server.py +++ b/thingsboard_gateway/connectors/modbus/server.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING, Dict, Union + import asyncio from asyncio import CancelledError from threading import Thread @@ -31,6 +33,11 @@ from thingsboard_gateway.connectors.modbus.constants import ADDRESS_PARAMETER, BYTE_ORDER_PARAMETER, FUNCTION_CODE_SLAVE_INITIALIZATION, FUNCTION_TYPE, \ FUNCTION_CODE_READ, HOST_PARAMETER, IDENTITY_SECTION, METHOD_PARAMETER, OBJECTS_COUNT_PARAMETER, PORT_PARAMETER, REPACK_PARAMETER, SERIAL_CONNECTION_TYPE_PARAMETER, TAG_PARAMETER, WORD_ORDER_PARAMETER from thingsboard_gateway.gateway.constants import DEVICE_NAME_PARAMETER, TYPE_PARAMETER +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService + +if TYPE_CHECKING: + from thingsboard_gateway.connectors.modbus.modbus_connector import ModbusConnector + from thingsboard_gateway.connectors.modbus.modbus_connector import Slave SLAVE_TYPE = { 'tcp': StartAsyncTcpServer, @@ -47,26 +54,40 @@ class Server(Thread): - def __init__(self, config, logger): + class CallbackDatablock(ModbusSparseDataBlock): + def __init__(self, values, callback): + self.callback = callback + super().__init__(values) + + def setValues(self, address, value): + self.callback(address, len(value)) + super().setValues(address, value) + + def __init__(self, connector: 'ModbusConnector', config, logger): super().__init__() self.__stopped = False self.daemon = True self.name = 'Gateway Modbus Server (Slave)' - + + self.connector = connector self.__log = logger - self.__config = config + self.connector = connector self.device_name = config.get('deviceName', 'Modbus Slave') self.device_type = config.get('deviceType', 'default') self.poll_period = config.get('pollPeriod', 5000) self.__type = config.get('type', 'tcp').lower() self.__identity = self.__get_identity(self.__config) - self.__server_context = self.__get_server_context(self.__config) + self.__server_context, self.__datablock = self.__get_server_context(self.__config) self.__connection_config = self.__get_connection_config(self.__config) + self.__log.trace("Connection config loaded: %s", self.__connection_config) + self.no_master = False self.__server = None + self.slave: Slave = None + self.addresses_updated = set() try: self.loop = asyncio.new_event_loop() @@ -76,6 +97,31 @@ def __init__(self, config, logger): def __str__(self): return self.name + + def __callback(self, address, count): + if self.slave: + for n in range(count): + self.addresses_updated.add(address - 1 + n) + self.__log.trace("Updated addresses: %s", self.addresses_updated) + try: + self.slave.connector.callback(self.slave, self.slave.connector.process_device_requests) + except Exception as e: + self.__log.exception('Error sending slave callback from Server: %s', e) + + def read(self, function_code, address, objects_count): + self.__log.debug('Reading %s registers from address %s with function code %s', objects_count, address, + function_code) + + result, = self.__datablock.getValues(function_code, address, objects_count) + + self.addresses_updated.remove(address) + + StatisticsService.count_connector_message(self.connector.get_name(), + stat_parameter_name='connectorMsgsReceived') + StatisticsService.count_connector_bytes(self.connector.get_name(), result, + stat_parameter_name='connectorBytesReceived') + + return result def run(self): try: @@ -109,6 +155,9 @@ async def start_server(self): self.__server = await SLAVE_TYPE[self.__type](identity=self.__identity, context=self.__server_context, **self.__connection_config, defer_start=True, allow_reuse_address=True, allow_reuse_port=True) + + if self.__config[TYPE_PARAMETER] == SERIAL_CONNECTION_TYPE_PARAMETER: + await self.__server.start() await self.__server.serve_forever() except Exception as e: self.__stopped = True @@ -123,7 +172,8 @@ def get_slave_config_format(self): **self.__config, 'deviceName': self.device_name, 'deviceType': self.device_type, - 'pollPeriod': self.poll_period + 'pollPeriod': self.poll_period, + 'no_master': True # shows that this slave don't have a master } for (register, register_values) in self.__config.get('values', {}).items(): @@ -179,7 +229,7 @@ def __get_server_context(self, config): blocks = {} if (config.get('values') is None) or (not len(config.get('values'))): self.__log.error("No values to read from device %s", config.get(DEVICE_NAME_PARAMETER, 'Modbus Slave')) - return + return None, None for (key, value) in config.get('values').items(): values = {} @@ -210,13 +260,15 @@ def __get_server_context(self, config): except Exception as e: self.__log.error("Failed to configure value %s with error: %s, skipping...", item['value'], e) + try: if len(values): - blocks[FUNCTION_TYPE[key]] = ModbusSparseDataBlock(values) + blocks[FUNCTION_TYPE[key]] = self.CallbackDatablock(values, callback=self.__callback) except Exception as e: self.__log.error("Failed to configure block %s with error: %s", key, e) if not len(blocks): self.__log.info("%s - will be initialized without values", config.get(DEVICE_NAME_PARAMETER, 'Modbus Slave')) - return ModbusServerContext(slaves=ModbusSlaveContext(**blocks), single=True) + datablock = ModbusSlaveContext(**blocks) + return ModbusServerContext(slaves=datablock, single=True), datablock diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index dff241c0b3..db03be0b62 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -58,6 +58,7 @@ def __init__(self, connector: 'ModbusConnector', logger, config): self.callback = connector.callback + self.no_master = config.get('no_master', False) self.unit_id = config[UNIT_ID_PARAMETER] self.host = config.get(HOST_PARAMETER) self.port = config[PORT_PARAMETER] @@ -110,7 +111,8 @@ def __init__(self, connector: 'ModbusConnector', logger, config): for attr_config in self.attributes_updates_config: self.shared_attributes_keys.append(attr_config[TAG_PARAMETER]) - self.start() + if not self.no_master: + self.start() def __timer(self): self.__send_callback()