Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions .github/workflows/server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install
run: |
python -m pip install --upgrade pip
pip install .
minor=$(python3 --version | cut -d. -f2)
if [ "$minor" -ge 7 ]; then
python -m pip install --upgrade "pip==24.0" --only-binary=:all:
else
python -m pip install --upgrade "pip==21.3.1" --only-binary=:all:
fi
python -m pip install --no-deps .
test-unit:
runs-on: ubuntu-latest
needs: build-server
Expand All @@ -57,9 +62,9 @@ jobs:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install '.[test]'
python -m pip install .
python -m pip install --upgrade "pip==24.0" --only-binary=:all:
python -m pip install --only-binary=:all: -r requirements-ci-py39.txt
python -m pip install --no-deps .
- name: Test unit tests
run: |
set -o pipefail
Expand Down
2 changes: 1 addition & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ WORKDIR /home/recceiver

COPY --from=build /home/recceiver/venv venv

CMD venv/bin/twistd --pidfile= --nodaemon recceiver
CMD ["venv/bin/twistd", "--pidfile=", "--nodaemon", "recceiver"]
10 changes: 7 additions & 3 deletions server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ classifiers = [
dependencies = [
"channelfinder @ https://github.com/ChannelFinder/pyCFClient/archive/refs/tags/v3.2.0.zip",
"dataclasses; python_version<'3.7'",
"requests",
"twisted",
"requests>=2.27.1,<2.28; python_version<'3.7'",
"requests>=2.31,<2.32; python_version<'3.8'",
"requests>=2.32.3,<2.33; python_version>='3.8'",
"twisted>=21.7,<22; python_version<'3.7'",
"twisted>=22.10,<23; python_version<'3.8'",
"twisted>=24.11,<24.12; python_version>='3.8'",
]
optional-dependencies.test = [ "pytest", "testcontainers>=4" ]
optional-dependencies.test = [ "pytest>=8.3,<8.4", "testcontainers>=4.8.2,<4.9" ]
urls.Repository = "https://github.com/ChannelFinder/recsync"

[tool.setuptools]
Expand Down
8 changes: 6 additions & 2 deletions server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __init__(self):
self.write = log.msg

def flush(self):
pass
# Required by logging.StreamHandler; this handler writes directly to Twisted logs.
return None


class RecService(service.MultiService):
Expand Down Expand Up @@ -134,7 +135,7 @@ class Maker(object):

options = Options

def makeService(self, opts):
def make_service(self, opts):
ctrl = ProcessorController(cfile=opts["config"])
conf = ctrl.config("recceiver")
S = RecService(conf)
Expand All @@ -156,3 +157,6 @@ def makeService(self, opts):
root.setLevel(lvl)

return S

def makeService(self, opts): # NOSONAR - Twisted IServiceMaker API requires this exact name.
return self.make_service(opts)
99 changes: 48 additions & 51 deletions server/recceiver/cfstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class IocInfo:
host: str
hostname: str
ioc_name: str
ioc_IP: str
ioc_ip: str
owner: str
time: str
port: int
Expand Down Expand Up @@ -271,7 +271,7 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter):
self.cf_config = CFConfig.loads(conf)
self.name = name # Override name from service.Service
self.channel_ioc_ids: Dict[str, List[str]] = defaultdict(list)
self.iocs: Dict[str, IocInfo] = dict()
self.iocs: Dict[str, IocInfo] = {}
self.client: Optional[ChannelFinderClient] = None
self.current_time: Callable[[Optional[str]], str] = get_current_time
self.lock: DeferredLock = DeferredLock()
Expand Down Expand Up @@ -465,7 +465,7 @@ def transaction_to_record_infos(self, ioc_info: IocInfo, transaction: CommitTran
for record_id, (record_infos_to_add) in transaction.record_infos_to_add.items():
# find intersection of these sets
if record_id not in record_infos:
_log.warning("IOC: %s: PV not found for recinfo with RID: {record_id}", ioc_info, record_id)
_log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id)
continue
recinfo_wl = [p for p in self.record_property_names_list if p in record_infos_to_add.keys()]
if recinfo_wl:
Expand Down Expand Up @@ -595,7 +595,7 @@ def _commit_with_thread(self, transaction: CommitTransaction):
host=host,
hostname=transaction.client_infos.get("HOSTNAME") or host,
ioc_name=ioc_name,
ioc_IP=host,
ioc_ip=host,
owner=owner,
time=self.current_time(self.cf_config.timezone),
port=port,
Expand All @@ -612,25 +612,25 @@ def _commit_with_thread(self, transaction: CommitTransaction):
if not poll_success:
raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}")

def remove_channel(self, recordName: str, iocid: str) -> None:
def remove_channel(self, record_name: str, iocid: str) -> None:
"""Remove channel from self.iocs and self.channel_ioc_ids.

Args:
recordName: The name of the record to remove.
record_name: The name of the record to remove.
iocid: The IOC ID of the record to remove from.
"""
self.channel_ioc_ids[recordName].remove(iocid)
self.channel_ioc_ids[record_name].remove(iocid)
if iocid not in self.iocs:
if len(self.channel_ioc_ids[recordName]) == 0:
del self.channel_ioc_ids[recordName]
if len(self.channel_ioc_ids[record_name]) == 0:
del self.channel_ioc_ids[record_name]
return
self.iocs[iocid].channelcount -= 1
if self.iocs[iocid].channelcount <= 0:
if self.iocs[iocid].channelcount < 0:
_log.error("Channel count negative: %s", iocid)
self.iocs.pop(iocid)
if len(self.channel_ioc_ids[recordName]) == 0:
del self.channel_ioc_ids[recordName]
if len(self.channel_ioc_ids[record_name]) == 0:
del self.channel_ioc_ids[record_name]

def clean_service(self) -> None:
"""Marks all channels belonging to this recceiver (as found by the recceiver id) as 'Inactive'."""
Expand Down Expand Up @@ -738,7 +738,7 @@ def handle_channel_is_old(
if cf_config.alias_enabled:
if cf_channel.name in record_info_by_name:
for alias_name in record_info_by_name[cf_channel.name].aliases:
# TODO Remove? This code couldn't have been working....
# Legacy alias handling retained to avoid changing runtime behavior.
alias_channel = CFChannel(alias_name, "", [])
if alias_name in channel_ioc_ids:
last_alias_ioc_id = channel_ioc_ids[alias_name][-1]
Expand Down Expand Up @@ -1035,23 +1035,22 @@ def update_existing_channel_diff_iocid(
channels.append(existing_channel)
_log.debug("Add existing channel with different IOC: %s", existing_channel)
# in case, alias exists, update their properties too
if cf_config.alias_enabled:
if channel_name in record_info_by_name:
alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)]
for p in new_properties:
alias_properties.append(p)
for alias_name in record_info_by_name[channel_name].aliases:
if alias_name in existing_channels:
ach = existing_channels[alias_name]
ach.properties = __merge_property_lists(
alias_properties,
ach,
managed_properties,
)
channels.append(ach)
else:
channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties))
_log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid)
if cf_config.alias_enabled and channel_name in record_info_by_name:
alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)]
for p in new_properties:
alias_properties.append(p)
for alias_name in record_info_by_name[channel_name].aliases:
if alias_name in existing_channels:
ach = existing_channels[alias_name]
ach.properties = __merge_property_lists(
alias_properties,
ach,
managed_properties,
)
channels.append(ach)
else:
channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties))
_log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid)


def create_new_channel(
Expand All @@ -1078,14 +1077,13 @@ def create_new_channel(

channels.append(CFChannel(channel_name, ioc_info.owner, new_properties))
_log.debug("Add new channel: %s", channel_name)
if cf_config.alias_enabled:
if channel_name in record_info_by_name:
alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)]
for p in new_properties:
alias_properties.append(p)
for alias in record_info_by_name[channel_name].aliases:
channels.append(CFChannel(alias, ioc_info.owner, alias_properties))
_log.debug("Add new alias: %s from %s", alias, channel_name)
if cf_config.alias_enabled and channel_name in record_info_by_name:
alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)]
for p in new_properties:
alias_properties.append(p)
for alias in record_info_by_name[channel_name].aliases:
channels.append(CFChannel(alias, ioc_info.owner, alias_properties))
_log.debug("Add new alias: %s from %s", alias, channel_name)


class IOCMissingInfoError(Exception):
Expand Down Expand Up @@ -1141,7 +1139,7 @@ def _update_channelfinder(
for ch in client.findByArgs(prepare_find_args(cf_config=cf_config, args=[("iocid", iocid)]))
]

if old_channels is not None:
if old_channels:
handle_channels(
old_channels,
new_channels,
Expand Down Expand Up @@ -1169,7 +1167,7 @@ def _update_channelfinder(
recceiverid,
ioc_info.hostname,
ioc_info.ioc_name,
ioc_info.ioc_IP,
ioc_info.ioc_ip,
ioc_info.ioc_id,
)
if (
Expand Down Expand Up @@ -1221,26 +1219,26 @@ def cf_set_chunked(client: ChannelFinderClient, channels: List[CFChannel], chunk


def create_ioc_properties(
owner: str, iocTime: str, recceiverid: str, hostName: str, iocName: str, iocIP: str, iocid: str
owner: str, ioc_time: str, recceiverid: str, host_name: str, ioc_name: str, ioc_ip: str, iocid: str
) -> List[CFProperty]:
"""Create the properties from an IOC.

Args:
owner: The owner of the properties.
iocTime: The time of the properties.
ioc_time: The time of the properties.
recceiverid: The recceiver ID of the properties.
hostName: The host name of the properties.
iocName: The IOC name of the properties.
iocIP: The IOC IP of the properties.
host_name: The host name of the properties.
ioc_name: The IOC name of the properties.
ioc_ip: The IOC IP of the properties.
iocid: The IOC ID of the properties.
"""
return [
CFProperty(CFPropertyName.HOSTNAME.value, owner, hostName),
CFProperty(CFPropertyName.IOC_NAME.value, owner, iocName),
CFProperty(CFPropertyName.HOSTNAME.value, owner, host_name),
CFProperty(CFPropertyName.IOC_NAME.value, owner, ioc_name),
CFProperty(CFPropertyName.IOC_ID.value, owner, iocid),
CFProperty(CFPropertyName.IOC_IP.value, owner, iocIP),
CFProperty(CFPropertyName.IOC_IP.value, owner, ioc_ip),
CFProperty.active(owner),
CFProperty.time(owner, iocTime),
CFProperty.time(owner, ioc_time),
CFProperty(CFPropertyName.RECCEIVER_ID.value, owner, recceiverid),
]

Expand All @@ -1265,7 +1263,7 @@ def create_default_properties(
recceiverid,
last_ioc_info.hostname,
last_ioc_info.ioc_name,
last_ioc_info.ioc_IP,
last_ioc_info.ioc_ip,
last_ioc_info.ioc_id,
)

Expand Down Expand Up @@ -1302,13 +1300,12 @@ def get_current_time(timezone: Optional[str] = None) -> str:
return str(datetime.datetime.now())


def prepare_find_args(cf_config: CFConfig, args, size=0) -> List[Tuple[str, str]]:
def prepare_find_args(cf_config: CFConfig, args) -> List[Tuple[str, str]]:
"""Prepare the find arguments.

Args:
cf_config: The configuration.
args: The arguments.
size: The size.
"""
size_limit = int(cf_config.cf_query_limit)
if size_limit > 0:
Expand Down
21 changes: 10 additions & 11 deletions server/recceiver/dbstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ def __init__(self, name, conf):
self.trecinfo = self.conf.get("table.recinfo", "recinfo")
self.mykey = int(self.conf["idkey"])

def decCount(self, X, D):
def dec_count(self, _result, deferred):
assert len(self.Ds) > 0
self.Ds.remove(D)
self.Ds.remove(deferred)
if self.done:
self.pool.close()

def waitFor(self, D):
self.Ds.add(D)
D.addBoth(self.decCount, D)
return D
def wait_for(self, deferred):
self.Ds.add(deferred)
deferred.addBoth(self.dec_count, deferred)
return deferred

def startService(self):
_log.info("Start DBService")
Expand All @@ -54,23 +54,22 @@ def startService(self):
continue
dbargs[key] = val

if self.conf["dbtype"] == "sqlite3":
if "isolation_level" not in dbargs:
dbargs["isolation_level"] = "IMMEDIATE"
if self.conf["dbtype"] == "sqlite3" and "isolation_level" not in dbargs:
dbargs["isolation_level"] = "IMMEDIATE"

# workaround twisted bug #3629
dbargs["check_same_thread"] = False

self.pool = db.ConnectionPool(self.conf["dbtype"], self.conf["dbname"], **dbargs)

self.waitFor(self.pool.runInteraction(self.cleanupDB))
self.wait_for(self.pool.runInteraction(self.cleanupDB))

def stopService(self):
_log.info("Stop DBService")

service.Service.stopService(self)

self.waitFor(self.pool.runInteraction(self.cleanupDB))
self.wait_for(self.pool.runInteraction(self.cleanupDB))

assert len(self.Ds) > 0
self.done = True
Expand Down
2 changes: 1 addition & 1 deletion server/recceiver/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CommitTransaction:


class IProcessor(service.IService):
def commit(transaction):
def commit(self, transaction):
"""Consume and process the provided ITransaction.
Returns either a Deferred or None.
Expand Down
Loading
Loading