Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ service-identity
psutil
cryptography
PySocks
croniter>=1.3.8
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
'service-identity',
'psutil',
'PySocks',
'croniter>=1.3.8',
],
download_url='https://github.com/thingsboard/thingsboard-gateway/archive/%s.tar.gz' % version.VERSION,
entry_points={
Expand Down
129 changes: 129 additions & 0 deletions tests/unit/test_poll_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import pytest
from time import monotonic
from datetime import datetime
from unittest.mock import patch

from thingsboard_gateway.tb_utility.poll_scheduler import (
PollScheduleEntry, PollScheduler, compute_next_poll
)


# --- Config parsing ---

class TestPollSchedulerParsing:
def test_parse_string(self):
s = PollScheduler("*/5 * * * *")
assert s.is_active
assert len(s.entries) == 1
assert s.entries[0].label == "*/5 * * * *"

def test_parse_list_of_strings(self):
s = PollScheduler(["*/5 * * * *", "*/10 * * * *"])
assert s.is_active
assert len(s.entries) == 2

def test_parse_list_of_dicts(self):
s = PollScheduler([{"cron": "*/5 * * * *", "label": "prod"}])
assert s.is_active
assert len(s.entries) == 1
assert s.entries[0].label == "prod"

def test_parse_mixed_list(self):
s = PollScheduler([
"*/5 * * * *",
{"cron": "*/10 * * * *", "label": "slow"}
])
assert len(s.entries) == 2
assert s.entries[0].label == "*/5 * * * *"
assert s.entries[1].label == "slow"

def test_parse_none(self):
s = PollScheduler(None)
assert not s.is_active
assert len(s.entries) == 0

def test_parse_empty_list(self):
s = PollScheduler([])
assert not s.is_active

def test_invalid_cron(self):
with pytest.raises(ValueError, match="Invalid cron expression"):
PollScheduler("not a cron")

def test_invalid_type(self):
with pytest.raises(ValueError, match="pollSchedule must be str, list, or None"):
PollScheduler(12345)


# --- Schedule computation ---

class TestScheduleComputation:
def test_next_fire_single(self):
s = PollScheduler("*/1 * * * *")
result = s.next_poll_monotonic()
assert result > monotonic()

def test_next_fire_multiple_nearest_wins(self):
# Every minute should fire sooner than every 30 minutes
s = PollScheduler(["*/30 * * * *", "*/1 * * * *"])
result = s.next_poll_monotonic()
now = monotonic()
# Should be within ~60 seconds (the every-minute schedule)
assert result - now <= 61.0

def test_next_fire_distant(self):
# Schedule for a specific unlikely time — still returns a valid future value
s = PollScheduler("0 3 1 1 *") # Jan 1 at 3am
result = s.next_poll_monotonic()
assert result > monotonic()


# --- compute_next_poll() ---

class TestComputeNextPoll:
def test_compute_with_active_scheduler(self):
s = PollScheduler("*/1 * * * *")
now = monotonic()
result = compute_next_poll(now, 5.0, s)
# Should NOT be now + 5.0 (that's the fallback)
assert result != now + 5.0
assert result > now

def test_compute_without_scheduler(self):
now = monotonic()
result = compute_next_poll(now, 5.0, None)
assert result == now + 5.0

def test_compute_with_inactive_scheduler(self):
s = PollScheduler(None)
now = monotonic()
result = compute_next_poll(now, 5.0, s)
assert result == now + 5.0


# --- PollScheduleEntry ---

class TestPollScheduleEntry:
def test_next_fire_time(self):
entry = PollScheduleEntry("*/5 * * * *")
base = datetime(2025, 1, 1, 12, 3, 0)
nxt = entry.next_fire_time(base)
assert nxt == datetime(2025, 1, 1, 12, 5, 0)

def test_label_defaults_to_cron(self):
entry = PollScheduleEntry("*/5 * * * *")
assert entry.label == "*/5 * * * *"

def test_custom_label(self):
entry = PollScheduleEntry("*/5 * * * *", label="fast")
assert entry.label == "fast"


class TestSchedulerWallClockDelta:
def test_scheduler_wall_clock_delta(self):
"""Verify next_poll_monotonic() - monotonic() produces a positive delta
suitable for wall-clock offset (used by Request connector pattern)."""
s = PollScheduler("*/1 * * * *")
delta = s.next_poll_monotonic() - monotonic()
assert delta > 0
assert delta <= 61.0
1 change: 1 addition & 0 deletions thingsboard_gateway/config/bacnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"port": "47808",
"mask": "24",
"pollPeriod": 10000,
"pollSchedule": null,
"attributes": [
{
"key": "temperature",
Expand Down
1 change: 1 addition & 0 deletions thingsboard_gateway/config/ble.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"name": "Temperature and humidity sensor",
"MACAddress": "4C:65:A8:DF:85:C0",
"pollPeriod": 500000,
"pollSchedule": null,
"showMap": false,
"timeout": 10000,
"connectRetry": 5,
Expand Down
2 changes: 2 additions & 0 deletions thingsboard_gateway/config/can.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"polling": {
"type": "always",
"period": 5,
"pollSchedule": null,
"dataInHex": "aaaa bbbb aaaa bbbb"
}
},
Expand All @@ -47,6 +48,7 @@
"polling": {
"type": "always",
"period": 30,
"pollSchedule": null,
"dataInHex": "aa bb cc dd ee ff aa bb"
}
}
Expand Down
1 change: 1 addition & 0 deletions thingsboard_gateway/config/ftp.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"readMode": "FULL",
"maxFileSize": 5,
"pollPeriod": 500,
"pollSchedule": null,
"txtFileDataView": "SLICED",
"withSortingFiles": true,
"attributes": [
Expand Down
1 change: 1 addition & 0 deletions thingsboard_gateway/config/modbus.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"retryOnEmpty": true,
"retryOnInvalid": true,
"pollPeriod": 5000,
"pollSchedule": null,
"unitId": 1,
"deviceName": "Temp Sensor",
"sendDataOnlyOnChange": true,
Expand Down
1 change: 1 addition & 0 deletions thingsboard_gateway/config/odbc.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"polling": {
"query": "SELECT bool_v, str_v, dbl_v, long_v, entity_id, ts FROM ts_kv WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"period": 10,
"pollSchedule": null,
"iterator": {
"column": "ts",
"query": "SELECT MIN(ts) - 1 FROM ts_kv",
Expand Down
1 change: 1 addition & 0 deletions thingsboard_gateway/config/opcua.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"timeoutInMillis": 5000,
"scanPeriodInMillis": 3600000,
"pollPeriodInMillis": 5000,
"pollSchedule": null,
"enableSubscriptions": true,
"subCheckPeriodInMillis": 100,
"subDataMaxBatchSize": 1000,
Expand Down
3 changes: 3 additions & 0 deletions thingsboard_gateway/config/request.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"allowRedirects": true,
"timeout": 0.5,
"scanPeriod": 5,
"pollSchedule": null,
"dataUnpackExpression": "",
"converter": {
"type": "json",
Expand Down Expand Up @@ -55,6 +56,7 @@
"allowRedirects": true,
"timeout": 0.5,
"scanPeriod": 100,
"pollSchedule": null,
"dataUnpackExpression": "",
"converter": {
"type": "custom",
Expand Down Expand Up @@ -135,6 +137,7 @@
"allowRedirects": true,
"timeout": 3.0,
"scanPeriod": 1800,
"pollSchedule": null,
"subRequests": {
"cumPwr": {
"url": "/${id}/hist/values/ActiveEnergy/SUM13/3600?start=RELATIVE_-1HOUR&end=RELATIVE_-1HOUR&online=false&aggregate=false",
Expand Down
2 changes: 2 additions & 0 deletions thingsboard_gateway/config/snmp.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"ip": "snmp.live.gambitcommunications.com",
"port": 161,
"pollPeriod": 5000,
"pollSchedule": null,
"community": "public",
"attributes": [
{
Expand Down Expand Up @@ -117,6 +118,7 @@
"deviceType": "snmp",
"ip": "127.0.0.1",
"pollPeriod": 5000,
"pollSchedule": null,
"community": "public",
"converter": "CustomSNMPConverter",
"attributes": [
Expand Down
12 changes: 10 additions & 2 deletions thingsboard_gateway/connectors/bacnet/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
from thingsboard_gateway.connectors.bacnet.entities.uplink_converter_config import UplinkConverterConfig
from thingsboard_gateway.gateway.constants import UPLINK_PREFIX, CONVERTER_PARAMETER
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.poll_scheduler import (
PollScheduler, compute_next_poll
)
from thingsboard_gateway.tb_utility.tb_utility import TBUtility


Expand Down Expand Up @@ -74,6 +77,7 @@ def __init__(self, connector_type, config, i_am_request, reading_queue, rescan_q

self.__config_poll_period = self.__config.get('pollPeriod', 10000) / 1000
self.__poll_period = self.__config_poll_period
self.__scheduler = PollScheduler(self.__config.get('pollSchedule'))
self.attributes_updates = self.__config.get('attributeUpdates', [])
self.shared_attributes_keys = self.__get_shared_attributes_keys()
self.server_side_rpc = self.__config.get('serverSideRpc', [])
Expand Down Expand Up @@ -138,15 +142,19 @@ async def run(self):
if len(self.uplink_converter_config.objects_to_read) > 0:
self.__request_process_queue.put_nowait(self)

next_poll_time = monotonic() + self.__poll_period
next_poll_time = compute_next_poll(
monotonic(), self.__poll_period, self.__scheduler
)

while not self.__stopped:
current_time = monotonic()
if current_time >= next_poll_time:
if len(self.uplink_converter_config.objects_to_read) > 0:
self.__request_process_queue.put_nowait(self)

next_poll_time = current_time + self.__poll_period
next_poll_time = compute_next_poll(
current_time, self.__poll_period, self.__scheduler
)

sleep_time = max(0.0, next_poll_time - current_time)

Expand Down
21 changes: 19 additions & 2 deletions thingsboard_gateway/connectors/ble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics # noqa
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.connectors.ble.error_handler import ErrorHandler
from thingsboard_gateway.tb_utility.poll_scheduler import (
PollScheduler, compute_next_poll
)

MAC_ADDRESS_FORMAT = {
'Darwin': '-',
Expand Down Expand Up @@ -64,6 +67,7 @@ def __init__(self, config, logger):
self._log.error(e)

self.poll_period = config.get('pollPeriod', 5000) / 1000
self.scheduler = PollScheduler(config.get('pollSchedule'))
self.config = self._generate_config(config)
self.adv_only = self._check_adv_mode()
self.callback = config['callback']
Expand Down Expand Up @@ -125,10 +129,16 @@ def __load_converter(self, name):
self.stopped = True

async def timer(self, scanned_devices_callback):
next_poll_time = compute_next_poll(
time(), self.poll_period, self.scheduler
)
while True:
try:
if time() - self.last_polled_time >= self.poll_period:
if time() >= next_poll_time:
self.last_polled_time = time()
next_poll_time = compute_next_poll(
time(), self.poll_period, self.scheduler
)
await self.__process_self()
await self._process_adv_data(scanned_devices_callback=scanned_devices_callback)
else:
Expand Down Expand Up @@ -284,9 +294,16 @@ async def run_client(self, scanned_devices_callback):

await self.timer(scanned_devices_callback)
else:
next_poll_time = compute_next_poll(
time(), self.poll_period, self.scheduler
)
while not self.stopped:
await self._process_adv_data(scanned_devices_callback)
await sleep(self.poll_period)
sleep_time = max(0.0, next_poll_time - time())
await sleep(sleep_time)
next_poll_time = compute_next_poll(
time(), self.poll_period, self.scheduler
)

async def __show_map(self, return_result=False):
result = f'MAP FOR {self.name.upper()}'
Expand Down
16 changes: 15 additions & 1 deletion thingsboard_gateway/connectors/can/can_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
from thingsboard_gateway.connectors.can.bytes_can_downlink_converter import BytesCanDownlinkConverter
from thingsboard_gateway.connectors.can.bytes_can_uplink_converter import BytesCanUplinkConverter
from thingsboard_gateway.connectors.connector import Connector
from thingsboard_gateway.tb_utility.poll_scheduler import (
PollScheduler, compute_next_poll
)


class CanConnector(Connector, Thread):
Expand Down Expand Up @@ -672,6 +675,7 @@ def __init__(self, connector: CanConnector):
super().__init__()
self.connector = connector
self.scheduler = sched.scheduler(time.time, time.sleep)
self._poll_schedulers = {}
self.events = []
self.first_run = True
self.daemon = True
Expand Down Expand Up @@ -714,5 +718,15 @@ def __poll_and_schedule(self, data, config):
self.connector.get_name(), config["period"], config["nodeId"], data)
self.connector.send_data_to_bus(data, config, raise_exception=self.first_run)

event = self.scheduler.enter(config["period"], 1, self.__poll_and_schedule, argument=(data, config))
key = config.get("key", "")
if key not in self._poll_schedulers:
self._poll_schedulers[key] = PollScheduler(config.get("pollSchedule"))
ps = self._poll_schedulers[key]
if ps.is_active:
from time import monotonic as _mono
delay = max(0.0, ps.next_poll_monotonic() - _mono())
else:
delay = config["period"]

event = self.scheduler.enter(delay, 1, self.__poll_and_schedule, argument=(data, config))
self.events.append(event)
Loading
Loading