From fc58a2edf2bf1874cbc8ad7304f08f1e6e653a72 Mon Sep 17 00:00:00 2001 From: Andreas Pachler Date: Wed, 18 Feb 2026 09:16:07 +0100 Subject: [PATCH] feat(poll_scheduler): add cron-based poll scheduling to all connectors Introduce PollScheduler, a new utility that accepts one or more cron expressions (via croniter) and computes the next monotonic poll time. The helper function compute_next_poll() provides a single drop-in replacement for the existing "current_time + poll_period" pattern used across every connector. - Add thingsboard_gateway/tb_utility/poll_scheduler.py with PollScheduleEntry, PollScheduler, and compute_next_poll() - Add unit tests in tests/unit/test_poll_scheduler.py - Integrate PollScheduler into all connectors: bacnet, ble, can, ftp, modbus, odbc, opcua, request, snmp - Add pollSchedule: null placeholder to all connector config examples - Add croniter>=1.3.8 to requirements.txt and setup.py dependencies When pollSchedule is null or absent the behaviour is unchanged (pollPeriod is used as before). When a cron schedule is supplied the next fire time is derived from the schedule instead. --- requirements.txt | 1 + setup.py | 1 + tests/unit/test_poll_scheduler.py | 129 ++++++++++++++++++ thingsboard_gateway/config/bacnet.json | 1 + thingsboard_gateway/config/ble.json | 1 + thingsboard_gateway/config/can.json | 2 + thingsboard_gateway/config/ftp.json | 1 + thingsboard_gateway/config/modbus.json | 1 + thingsboard_gateway/config/odbc.json | 1 + thingsboard_gateway/config/opcua.json | 1 + thingsboard_gateway/config/request.json | 3 + thingsboard_gateway/config/snmp.json | 2 + .../connectors/bacnet/device.py | 12 +- thingsboard_gateway/connectors/ble/device.py | 21 ++- .../connectors/can/can_connector.py | 16 ++- .../connectors/ftp/ftp_connector.py | 25 +++- .../connectors/modbus/slave.py | 14 +- .../connectors/odbc/odbc_connector.py | 14 +- .../connectors/opcua/opcua_connector.py | 8 +- .../connectors/request/request_connector.py | 9 +- .../connectors/snmp/snmp_connector.py | 15 +- .../tb_utility/poll_scheduler.py | 129 ++++++++++++++++++ 22 files changed, 390 insertions(+), 17 deletions(-) create mode 100644 tests/unit/test_poll_scheduler.py create mode 100644 thingsboard_gateway/tb_utility/poll_scheduler.py diff --git a/requirements.txt b/requirements.txt index 393f9ed81a3..3f080912b65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,4 @@ service-identity psutil cryptography PySocks +croniter>=1.3.8 diff --git a/setup.py b/setup.py index 05d1b1ced17..984ffcf7328 100644 --- a/setup.py +++ b/setup.py @@ -88,6 +88,7 @@ 'service-identity', 'psutil', 'PySocks', + 'croniter>=1.3.8', ], download_url='https://github.com/thingsboard/thingsboard-gateway/archive/%s.tar.gz' % version.VERSION, entry_points={ diff --git a/tests/unit/test_poll_scheduler.py b/tests/unit/test_poll_scheduler.py new file mode 100644 index 00000000000..dfa56fdcbb8 --- /dev/null +++ b/tests/unit/test_poll_scheduler.py @@ -0,0 +1,129 @@ +import pytest +from time import monotonic +from datetime import datetime +from unittest.mock import patch + +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduleEntry, PollScheduler, compute_next_poll +) + + +# --- Config parsing --- + +class TestPollSchedulerParsing: + def test_parse_string(self): + s = PollScheduler("*/5 * * * *") + assert s.is_active + assert len(s.entries) == 1 + assert s.entries[0].label == "*/5 * * * *" + + def test_parse_list_of_strings(self): + s = PollScheduler(["*/5 * * * *", "*/10 * * * *"]) + assert s.is_active + assert len(s.entries) == 2 + + def test_parse_list_of_dicts(self): + s = PollScheduler([{"cron": "*/5 * * * *", "label": "prod"}]) + assert s.is_active + assert len(s.entries) == 1 + assert s.entries[0].label == "prod" + + def test_parse_mixed_list(self): + s = PollScheduler([ + "*/5 * * * *", + {"cron": "*/10 * * * *", "label": "slow"} + ]) + assert len(s.entries) == 2 + assert s.entries[0].label == "*/5 * * * *" + assert s.entries[1].label == "slow" + + def test_parse_none(self): + s = PollScheduler(None) + assert not s.is_active + assert len(s.entries) == 0 + + def test_parse_empty_list(self): + s = PollScheduler([]) + assert not s.is_active + + def test_invalid_cron(self): + with pytest.raises(ValueError, match="Invalid cron expression"): + PollScheduler("not a cron") + + def test_invalid_type(self): + with pytest.raises(ValueError, match="pollSchedule must be str, list, or None"): + PollScheduler(12345) + + +# --- Schedule computation --- + +class TestScheduleComputation: + def test_next_fire_single(self): + s = PollScheduler("*/1 * * * *") + result = s.next_poll_monotonic() + assert result > monotonic() + + def test_next_fire_multiple_nearest_wins(self): + # Every minute should fire sooner than every 30 minutes + s = PollScheduler(["*/30 * * * *", "*/1 * * * *"]) + result = s.next_poll_monotonic() + now = monotonic() + # Should be within ~60 seconds (the every-minute schedule) + assert result - now <= 61.0 + + def test_next_fire_distant(self): + # Schedule for a specific unlikely time — still returns a valid future value + s = PollScheduler("0 3 1 1 *") # Jan 1 at 3am + result = s.next_poll_monotonic() + assert result > monotonic() + + +# --- compute_next_poll() --- + +class TestComputeNextPoll: + def test_compute_with_active_scheduler(self): + s = PollScheduler("*/1 * * * *") + now = monotonic() + result = compute_next_poll(now, 5.0, s) + # Should NOT be now + 5.0 (that's the fallback) + assert result != now + 5.0 + assert result > now + + def test_compute_without_scheduler(self): + now = monotonic() + result = compute_next_poll(now, 5.0, None) + assert result == now + 5.0 + + def test_compute_with_inactive_scheduler(self): + s = PollScheduler(None) + now = monotonic() + result = compute_next_poll(now, 5.0, s) + assert result == now + 5.0 + + +# --- PollScheduleEntry --- + +class TestPollScheduleEntry: + def test_next_fire_time(self): + entry = PollScheduleEntry("*/5 * * * *") + base = datetime(2025, 1, 1, 12, 3, 0) + nxt = entry.next_fire_time(base) + assert nxt == datetime(2025, 1, 1, 12, 5, 0) + + def test_label_defaults_to_cron(self): + entry = PollScheduleEntry("*/5 * * * *") + assert entry.label == "*/5 * * * *" + + def test_custom_label(self): + entry = PollScheduleEntry("*/5 * * * *", label="fast") + assert entry.label == "fast" + + +class TestSchedulerWallClockDelta: + def test_scheduler_wall_clock_delta(self): + """Verify next_poll_monotonic() - monotonic() produces a positive delta + suitable for wall-clock offset (used by Request connector pattern).""" + s = PollScheduler("*/1 * * * *") + delta = s.next_poll_monotonic() - monotonic() + assert delta > 0 + assert delta <= 61.0 diff --git a/thingsboard_gateway/config/bacnet.json b/thingsboard_gateway/config/bacnet.json index 00780e90214..a243e606ebb 100644 --- a/thingsboard_gateway/config/bacnet.json +++ b/thingsboard_gateway/config/bacnet.json @@ -31,6 +31,7 @@ "port": "47808", "mask": "24", "pollPeriod": 10000, + "pollSchedule": null, "attributes": [ { "key": "temperature", diff --git a/thingsboard_gateway/config/ble.json b/thingsboard_gateway/config/ble.json index df963bb6884..b023b2c2737 100644 --- a/thingsboard_gateway/config/ble.json +++ b/thingsboard_gateway/config/ble.json @@ -11,6 +11,7 @@ "name": "Temperature and humidity sensor", "MACAddress": "4C:65:A8:DF:85:C0", "pollPeriod": 500000, + "pollSchedule": null, "showMap": false, "timeout": 10000, "connectRetry": 5, diff --git a/thingsboard_gateway/config/can.json b/thingsboard_gateway/config/can.json index ea9d162960e..20030e63976 100644 --- a/thingsboard_gateway/config/can.json +++ b/thingsboard_gateway/config/can.json @@ -35,6 +35,7 @@ "polling": { "type": "always", "period": 5, + "pollSchedule": null, "dataInHex": "aaaa bbbb aaaa bbbb" } }, @@ -47,6 +48,7 @@ "polling": { "type": "always", "period": 30, + "pollSchedule": null, "dataInHex": "aa bb cc dd ee ff aa bb" } } diff --git a/thingsboard_gateway/config/ftp.json b/thingsboard_gateway/config/ftp.json index 3a193e43da4..6dc35784e43 100644 --- a/thingsboard_gateway/config/ftp.json +++ b/thingsboard_gateway/config/ftp.json @@ -16,6 +16,7 @@ "readMode": "FULL", "maxFileSize": 5, "pollPeriod": 500, + "pollSchedule": null, "txtFileDataView": "SLICED", "withSortingFiles": true, "attributes": [ diff --git a/thingsboard_gateway/config/modbus.json b/thingsboard_gateway/config/modbus.json index 699b492c75c..43d61d97e14 100644 --- a/thingsboard_gateway/config/modbus.json +++ b/thingsboard_gateway/config/modbus.json @@ -20,6 +20,7 @@ "retryOnEmpty": true, "retryOnInvalid": true, "pollPeriod": 5000, + "pollSchedule": null, "unitId": 1, "deviceName": "Temp Sensor", "sendDataOnlyOnChange": true, diff --git a/thingsboard_gateway/config/odbc.json b/thingsboard_gateway/config/odbc.json index 55d4c6f60ee..87705c0f133 100644 --- a/thingsboard_gateway/config/odbc.json +++ b/thingsboard_gateway/config/odbc.json @@ -20,6 +20,7 @@ "polling": { "query": "SELECT bool_v, str_v, dbl_v, long_v, entity_id, ts FROM ts_kv WHERE ts > ? ORDER BY ts ASC LIMIT 10", "period": 10, + "pollSchedule": null, "iterator": { "column": "ts", "query": "SELECT MIN(ts) - 1 FROM ts_kv", diff --git a/thingsboard_gateway/config/opcua.json b/thingsboard_gateway/config/opcua.json index 019bf94db1f..e3f46ebd265 100644 --- a/thingsboard_gateway/config/opcua.json +++ b/thingsboard_gateway/config/opcua.json @@ -4,6 +4,7 @@ "timeoutInMillis": 5000, "scanPeriodInMillis": 3600000, "pollPeriodInMillis": 5000, + "pollSchedule": null, "enableSubscriptions": true, "subCheckPeriodInMillis": 100, "subDataMaxBatchSize": 1000, diff --git a/thingsboard_gateway/config/request.json b/thingsboard_gateway/config/request.json index bdc6cf424cc..0f652a08fdb 100644 --- a/thingsboard_gateway/config/request.json +++ b/thingsboard_gateway/config/request.json @@ -20,6 +20,7 @@ "allowRedirects": true, "timeout": 0.5, "scanPeriod": 5, + "pollSchedule": null, "dataUnpackExpression": "", "converter": { "type": "json", @@ -55,6 +56,7 @@ "allowRedirects": true, "timeout": 0.5, "scanPeriod": 100, + "pollSchedule": null, "dataUnpackExpression": "", "converter": { "type": "custom", @@ -135,6 +137,7 @@ "allowRedirects": true, "timeout": 3.0, "scanPeriod": 1800, + "pollSchedule": null, "subRequests": { "cumPwr": { "url": "/${id}/hist/values/ActiveEnergy/SUM13/3600?start=RELATIVE_-1HOUR&end=RELATIVE_-1HOUR&online=false&aggregate=false", diff --git a/thingsboard_gateway/config/snmp.json b/thingsboard_gateway/config/snmp.json index 1d782dc6657..276d7b62c04 100644 --- a/thingsboard_gateway/config/snmp.json +++ b/thingsboard_gateway/config/snmp.json @@ -6,6 +6,7 @@ "ip": "snmp.live.gambitcommunications.com", "port": 161, "pollPeriod": 5000, + "pollSchedule": null, "community": "public", "attributes": [ { @@ -117,6 +118,7 @@ "deviceType": "snmp", "ip": "127.0.0.1", "pollPeriod": 5000, + "pollSchedule": null, "community": "public", "converter": "CustomSNMPConverter", "attributes": [ diff --git a/thingsboard_gateway/connectors/bacnet/device.py b/thingsboard_gateway/connectors/bacnet/device.py index e3198cd4ca5..9cf25b82d3e 100644 --- a/thingsboard_gateway/connectors/bacnet/device.py +++ b/thingsboard_gateway/connectors/bacnet/device.py @@ -25,6 +25,9 @@ from thingsboard_gateway.connectors.bacnet.entities.uplink_converter_config import UplinkConverterConfig from thingsboard_gateway.gateway.constants import UPLINK_PREFIX, CONVERTER_PARAMETER from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) from thingsboard_gateway.tb_utility.tb_utility import TBUtility @@ -74,6 +77,7 @@ def __init__(self, connector_type, config, i_am_request, reading_queue, rescan_q self.__config_poll_period = self.__config.get('pollPeriod', 10000) / 1000 self.__poll_period = self.__config_poll_period + self.__scheduler = PollScheduler(self.__config.get('pollSchedule')) self.attributes_updates = self.__config.get('attributeUpdates', []) self.shared_attributes_keys = self.__get_shared_attributes_keys() self.server_side_rpc = self.__config.get('serverSideRpc', []) @@ -138,7 +142,9 @@ async def run(self): if len(self.uplink_converter_config.objects_to_read) > 0: self.__request_process_queue.put_nowait(self) - next_poll_time = monotonic() + self.__poll_period + next_poll_time = compute_next_poll( + monotonic(), self.__poll_period, self.__scheduler + ) while not self.__stopped: current_time = monotonic() @@ -146,7 +152,9 @@ async def run(self): if len(self.uplink_converter_config.objects_to_read) > 0: self.__request_process_queue.put_nowait(self) - next_poll_time = current_time + self.__poll_period + next_poll_time = compute_next_poll( + current_time, self.__poll_period, self.__scheduler + ) sleep_time = max(0.0, next_poll_time - current_time) diff --git a/thingsboard_gateway/connectors/ble/device.py b/thingsboard_gateway/connectors/ble/device.py index 5d6d2b32c19..7ae27a75544 100644 --- a/thingsboard_gateway/connectors/ble/device.py +++ b/thingsboard_gateway/connectors/ble/device.py @@ -32,6 +32,9 @@ from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics # noqa from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.connectors.ble.error_handler import ErrorHandler +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) MAC_ADDRESS_FORMAT = { 'Darwin': '-', @@ -64,6 +67,7 @@ def __init__(self, config, logger): self._log.error(e) self.poll_period = config.get('pollPeriod', 5000) / 1000 + self.scheduler = PollScheduler(config.get('pollSchedule')) self.config = self._generate_config(config) self.adv_only = self._check_adv_mode() self.callback = config['callback'] @@ -125,10 +129,16 @@ def __load_converter(self, name): self.stopped = True async def timer(self, scanned_devices_callback): + next_poll_time = compute_next_poll( + time(), self.poll_period, self.scheduler + ) while True: try: - if time() - self.last_polled_time >= self.poll_period: + if time() >= next_poll_time: self.last_polled_time = time() + next_poll_time = compute_next_poll( + time(), self.poll_period, self.scheduler + ) await self.__process_self() await self._process_adv_data(scanned_devices_callback=scanned_devices_callback) else: @@ -284,9 +294,16 @@ async def run_client(self, scanned_devices_callback): await self.timer(scanned_devices_callback) else: + next_poll_time = compute_next_poll( + time(), self.poll_period, self.scheduler + ) while not self.stopped: await self._process_adv_data(scanned_devices_callback) - await sleep(self.poll_period) + sleep_time = max(0.0, next_poll_time - time()) + await sleep(sleep_time) + next_poll_time = compute_next_poll( + time(), self.poll_period, self.scheduler + ) async def __show_map(self, return_result=False): result = f'MAP FOR {self.name.upper()}' diff --git a/thingsboard_gateway/connectors/can/can_connector.py b/thingsboard_gateway/connectors/can/can_connector.py index 3f71614998c..e9e32246887 100644 --- a/thingsboard_gateway/connectors/can/can_connector.py +++ b/thingsboard_gateway/connectors/can/can_connector.py @@ -36,6 +36,9 @@ from thingsboard_gateway.connectors.can.bytes_can_downlink_converter import BytesCanDownlinkConverter from thingsboard_gateway.connectors.can.bytes_can_uplink_converter import BytesCanUplinkConverter from thingsboard_gateway.connectors.connector import Connector +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) class CanConnector(Connector, Thread): @@ -672,6 +675,7 @@ def __init__(self, connector: CanConnector): super().__init__() self.connector = connector self.scheduler = sched.scheduler(time.time, time.sleep) + self._poll_schedulers = {} self.events = [] self.first_run = True self.daemon = True @@ -714,5 +718,15 @@ def __poll_and_schedule(self, data, config): self.connector.get_name(), config["period"], config["nodeId"], data) self.connector.send_data_to_bus(data, config, raise_exception=self.first_run) - event = self.scheduler.enter(config["period"], 1, self.__poll_and_schedule, argument=(data, config)) + key = config.get("key", "") + if key not in self._poll_schedulers: + self._poll_schedulers[key] = PollScheduler(config.get("pollSchedule")) + ps = self._poll_schedulers[key] + if ps.is_active: + from time import monotonic as _mono + delay = max(0.0, ps.next_poll_monotonic() - _mono()) + else: + delay = config["period"] + + event = self.scheduler.enter(delay, 1, self.__poll_and_schedule, argument=(data, config)) self.events.append(event) diff --git a/thingsboard_gateway/connectors/ftp/ftp_connector.py b/thingsboard_gateway/connectors/ftp/ftp_connector.py index 81b09f1b76a..2de88fa126a 100644 --- a/thingsboard_gateway/connectors/ftp/ftp_connector.py +++ b/thingsboard_gateway/connectors/ftp/ftp_connector.py @@ -28,6 +28,9 @@ from thingsboard_gateway.connectors.ftp.file import File from thingsboard_gateway.connectors.ftp.ftp_uplink_converter import FTPUplinkConverter from thingsboard_gateway.connectors.ftp.path import Path +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) from thingsboard_gateway.gateway.entities.converted_data import ConvertedData from thingsboard_gateway.gateway.statistics.decorators import CollectAllReceivedBytesStatistics from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService @@ -149,13 +152,17 @@ def __fill_ftp_path_parameters(self): "extension": path["converter"]["extension"] } - path_parameters_list.append(Path(**base_path_config, **custom_path_config)) + p = Path(**base_path_config, **custom_path_config) + p._poll_schedule = path.get('pollSchedule') + path_parameters_list.append(p) else: base_path_config['telemetry'] = path.get('timeseries') base_path_config['attributes'] = path.get('attributes') base_path_config['device_name'] = path['devicePatternName'] base_path_config['device_type'] = path.get('devicePatternType', 'Device') - path_parameters_list.append(Path(**base_path_config)) + p = Path(**base_path_config) + p._poll_schedule = path.get('pollSchedule') + path_parameters_list.append(p) except KeyError as e: self.__log.debug("Failed to extract path arguments for path %s", str(e)) continue @@ -187,7 +194,16 @@ def __connect(self, ftp): def __process_paths(self, ftp): for path in self.paths: time_point = timer() - if time_point - path.last_polled_time >= path.poll_period or path.last_polled_time == 0: + if not hasattr(path, '_scheduler'): + path._scheduler = PollScheduler(getattr(path, '_poll_schedule', None)) + if not hasattr(path, '_next_poll_time'): + path._next_poll_time = 0 + if path._scheduler.is_active: + from time import monotonic as _mono + should_poll = _mono() >= path._next_poll_time + else: + should_poll = time_point - path.last_polled_time >= path.poll_period or path.last_polled_time == 0 + if should_poll: configuration = path.config if path.custom_converter_type == "custom": @@ -217,6 +233,9 @@ def __process_paths(self, ftp): converter = FTPUplinkConverter(configuration, self.__converter_log) path.last_polled_time = time_point + if path._scheduler.is_active: + from time import monotonic as _mono + path._next_poll_time = path._scheduler.next_poll_monotonic() if '*' in path.path: path.find_files(ftp) diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index fff28fd76cd..4cc55dec964 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -54,6 +54,9 @@ DOWNLINK_PREFIX ) from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader if TYPE_CHECKING: @@ -110,7 +113,8 @@ def __init__(self, connector: 'ModbusConnector', logger, config): self.device_name = config[DEVICE_NAME_PARAMETER] self.device_type = config.get(DEVICE_TYPE_PARAMETER, 'default') - self.poll_period = config['pollPeriod'] / 1000 + self.poll_period = config.get('pollPeriod', 5000) / 1000 + self.scheduler = PollScheduler(config.get('pollSchedule')) self.last_connect_time = 0 self.last_polled_time = 0 @@ -132,13 +136,17 @@ def __init__(self, connector: 'ModbusConnector', logger, config): async def __timer(self): if self.__master is not None: self.__send_callback(monotonic()) - next_poll_time = monotonic() + self.poll_period + next_poll_time = compute_next_poll( + monotonic(), self.poll_period, self.scheduler + ) while not self.stopped and not self.connector.is_stopped(): current_time = monotonic() if current_time >= next_poll_time: self.__send_callback(current_time) - next_poll_time = current_time + self.poll_period + next_poll_time = compute_next_poll( + current_time, self.poll_period, self.scheduler + ) sleep_time = max(0.0, next_poll_time - monotonic()) await asyncio.sleep(sleep_time) diff --git a/thingsboard_gateway/connectors/odbc/odbc_connector.py b/thingsboard_gateway/connectors/odbc/odbc_connector.py index a257c3919cc..b149365939c 100644 --- a/thingsboard_gateway/connectors/odbc/odbc_connector.py +++ b/thingsboard_gateway/connectors/odbc/odbc_connector.py @@ -39,6 +39,7 @@ import pyodbc from thingsboard_gateway.connectors.odbc.odbc_uplink_converter import OdbcUplinkConverter +from thingsboard_gateway.tb_utility.poll_scheduler import PollScheduler from thingsboard_gateway.connectors.connector import Connector @@ -271,6 +272,7 @@ def __process_rpc(self, procedure_name, query, sql_params): self.__rpc_cursor.execute("{{CALL {}}}".format(procedure_name)) def run(self): + scheduler = PollScheduler(self.__config.get("polling", {}).get("pollSchedule")) while not self.__stopped: # Initialization phase if not self.is_connected(): @@ -293,9 +295,15 @@ def run(self): try: self.__poll() if not self.__stopped: - polling_period = self.__config["polling"].get("period", self.DEFAULT_POLL_PERIOD) - self._log.debug("[%s] Next polling iteration will be in %d second(s)", self.get_name(), polling_period) - sleep(polling_period) + if scheduler.is_active: + from time import monotonic + delay = max(0.0, scheduler.next_poll_monotonic() - monotonic()) + self._log.debug("[%s] Next polling iteration (cron) in %.1f second(s)", self.get_name(), delay) + sleep(delay) + else: + polling_period = self.__config["polling"].get("period", self.DEFAULT_POLL_PERIOD) + self._log.debug("[%s] Next polling iteration will be in %d second(s)", self.get_name(), polling_period) + sleep(polling_period) except pyodbc.Warning as w: self._log.warning("[%s] Warning while polling database: %s", self.get_name(), str(w)) except pyodbc.Error as e: diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 067df868775..2d2fab9a3fe 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -27,6 +27,9 @@ from cachetools import TTLCache from thingsboard_gateway.connectors.connector import Connector +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) from thingsboard_gateway.gateway.constants import CONNECTOR_PARAMETER, RECEIVED_TS_PARAMETER, CONVERTED_TS_PARAMETER, \ DATA_RETRIEVING_STARTED, REPORT_STRATEGY_PARAMETER, ON_ATTRIBUTE_UPDATE_DEFAULT_TIMEOUT from thingsboard_gateway.gateway.entities.converted_data import ConvertedData @@ -396,6 +399,7 @@ async def start_client(self): self.__log.error("Error on fetching server limitations:\n %s", e) poll_period = int(self.__server_conf.get('pollPeriodInMillis', 5000) / 1000) + scheduler = PollScheduler(self.__server_conf.get('pollSchedule')) scan_period = int(self.__server_conf.get('scanPeriodInMillis', 3600000) / 1000) if self.__enable_subscriptions: @@ -412,7 +416,9 @@ async def start_client(self): await self.__scan_device_nodes() if not self.__enable_subscriptions and monotonic() >= self.__next_poll: - self.__next_poll = monotonic() + poll_period + self.__next_poll = compute_next_poll( + monotonic(), poll_period, scheduler + ) await self.__poll_nodes() current_time = monotonic() diff --git a/thingsboard_gateway/connectors/request/request_connector.py b/thingsboard_gateway/connectors/request/request_connector.py index 802e460174f..bdfcb1803d3 100644 --- a/thingsboard_gateway/connectors/request/request_connector.py +++ b/thingsboard_gateway/connectors/request/request_connector.py @@ -37,6 +37,7 @@ from thingsboard_gateway.connectors.connector import Connector from thingsboard_gateway.connectors.request.json_request_uplink_converter import JsonRequestUplinkConverter from thingsboard_gateway.connectors.request.json_request_downlink_converter import JsonRequestDownlinkConverter +from thingsboard_gateway.tb_utility.poll_scheduler import PollScheduler class RequestConnector(Connector, Thread): @@ -230,6 +231,7 @@ def __fill_requests(self): self.__requests_in_progress.append({"config": endpoint, "converter": converter, "next_time": time(), + "scheduler": PollScheduler(endpoint.get("pollSchedule")), "request": request}) except Exception as e: self._log.exception(e) @@ -256,7 +258,12 @@ def __fill_rpc_requests(self): def __send_request(self, request, converter_queue, logger): url = "" try: - request["next_time"] = time() + request["config"].get("scanPeriod", 10) + scheduler = request.get("scheduler") + if scheduler and scheduler.is_active: + from time import monotonic + request["next_time"] = time() + (scheduler.next_poll_monotonic() - monotonic()) + else: + request["next_time"] = time() + request["config"].get("scanPeriod", 10) if request.get("converter") is None and isinstance(request["config"].get("converter"), dict): logger.error("Converter for request to '%s' endpoint is not defined. Request will be skipped.", request["config"].get("url")) return diff --git a/thingsboard_gateway/connectors/snmp/snmp_connector.py b/thingsboard_gateway/connectors/snmp/snmp_connector.py index a509bb173b4..b53e6af75e8 100644 --- a/thingsboard_gateway/connectors/snmp/snmp_connector.py +++ b/thingsboard_gateway/connectors/snmp/snmp_connector.py @@ -24,6 +24,9 @@ from thingsboard_gateway.gateway.entities.converted_data import ConvertedData from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader +from thingsboard_gateway.tb_utility.poll_scheduler import ( + PollScheduler, compute_next_poll +) from thingsboard_gateway.tb_utility.tb_utility import TBUtility from thingsboard_gateway.tb_utility.tb_logger import init_logger @@ -94,7 +97,17 @@ async def _run(self): current_time = time() * 1000 for device in self.__devices: try: - if device.get("previous_poll_time", 0) + device.get("pollPeriod", 10000) < current_time: + if "__scheduler" not in device: + device["__scheduler"] = PollScheduler(device.get("pollSchedule")) + scheduler = device["__scheduler"] + if scheduler.is_active: + from time import monotonic as _mono + if "__next_poll_mono" not in device: + device["__next_poll_mono"] = scheduler.next_poll_monotonic() + if _mono() >= device["__next_poll_mono"]: + await self.__process_data(device) + device["__next_poll_mono"] = scheduler.next_poll_monotonic() + elif device.get("previous_poll_time", 0) + device.get("pollPeriod", 10000) < current_time: await self.__process_data(device) device["previous_poll_time"] = current_time except Exception as e: diff --git a/thingsboard_gateway/tb_utility/poll_scheduler.py b/thingsboard_gateway/tb_utility/poll_scheduler.py new file mode 100644 index 00000000000..d9c0c4eed7d --- /dev/null +++ b/thingsboard_gateway/tb_utility/poll_scheduler.py @@ -0,0 +1,129 @@ +from time import time, monotonic +from datetime import datetime +from typing import Union, List, Optional +import logging + +from croniter import croniter + +log = logging.getLogger(__name__) + + +class PollScheduleEntry: + """Wraps a single cron expression with validation and fire-time computation.""" + + def __init__(self, cron: str, label: Optional[str] = None): + self.cron = cron + self.label = label or cron + if not croniter.is_valid(cron): + raise ValueError(f"Invalid cron expression: '{cron}'") + + def next_fire_time(self, base: datetime) -> datetime: + """Return the next fire time after the given base datetime.""" + return croniter(self.cron, base).get_next(datetime) + + +class PollScheduler: + """ + Manages one or more cron schedules and computes the next poll + time as a monotonic() value. + + Accepts three config formats: + - str: single cron expression + - list: list of strings or dicts with 'cron' and optional 'label' + - None: inactive scheduler (falls back to pollPeriod) + """ + + def __init__(self, config: Union[str, list, None]): + self.entries: List[PollScheduleEntry] = [] + self._parse_config(config) + if self.entries: + labels = ", ".join(e.label for e in self.entries) + log.info( + "PollScheduler initialized with %d schedule(s): %s", + len(self.entries), labels + ) + + def _parse_config(self, config): + if config is None: + return + + if isinstance(config, str): + self.entries.append(PollScheduleEntry(config)) + return + + if isinstance(config, list): + for item in config: + if isinstance(item, str): + self.entries.append(PollScheduleEntry(item)) + elif isinstance(item, dict): + self.entries.append(PollScheduleEntry( + cron=item['cron'], + label=item.get('label') + )) + else: + raise ValueError( + f"Invalid pollSchedule entry: {item}" + ) + return + + raise ValueError( + f"pollSchedule must be str, list, or None — " + f"got {type(config).__name__}" + ) + + @property + def is_active(self) -> bool: + """Return True if at least one schedule entry is configured.""" + return len(self.entries) > 0 + + def next_poll_monotonic(self) -> float: + """ + Compute the next poll time across all schedule entries and + return it as a monotonic() timestamp. + + When multiple entries are configured, the nearest upcoming + fire time wins. + """ + now = datetime.now() + now_mono = monotonic() + + best_delta = None + best_entry = None + + for entry in self.entries: + next_fire = entry.next_fire_time(now) + delta = (next_fire - now).total_seconds() + if best_delta is None or delta < best_delta: + best_delta = delta + best_entry = entry + + target_mono = now_mono + best_delta + + log.debug( + "Poll schedule [%s]: next poll in %.1fs at %s", + best_entry.label, + best_delta, + datetime.fromtimestamp(time() + best_delta).strftime("%H:%M:%S") + ) + + return target_mono + + +def compute_next_poll(current_monotonic: float, + poll_period_sec: float, + scheduler: Optional[PollScheduler] = None) -> float: + """ + Unified next-poll-time computation. Drop-in replacement for all connectors. + + Args: + current_monotonic: The current monotonic() timestamp. + poll_period_sec: The configured poll period in seconds (fallback). + scheduler: Optional PollScheduler instance. + + Returns: + The monotonic() timestamp for the next poll. + """ + if scheduler is not None and scheduler.is_active: + return scheduler.next_poll_monotonic() + else: + return current_monotonic + poll_period_sec