Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,34 @@ NetworkHIDRelay
A :any:`NetworkHIDRelay` describes an `HIDRelay`_ resource available on a
remote computer.

ManagedGPIO
+++++++++++

A :any:`ManagedGPIO` resource describes a GPIO line that is managed by the
`gpio-manager <https://libgpiod.readthedocs.io/en/stable/gpio-manager.html>`__
daemon.

.. note::
The ``gpio-manager`` daemon needs to be running on the same system as the
gpiochip and configured to manage the specified GPIO line. See the
`gpiocli request <https://libgpiod.readthedocs.io/en/stable/gpiocli-request.html>`__
documentation for details.

.. code-block:: yaml

ManagedGPIO:
chip: /dev/gpiochip0
pin: 0

Arguments:
- chip (str | int): path to the gpiochip device, e.g. ``/dev/gpiochip0`` or
the gpiochip number, e.g. ``0``.
- pin (str | int): gpio pin name or offset within the gpiochip, e.g. ``0``

Used by:
- `GpioDigitalOutputDriver`_


SysfsGPIO
+++++++++

Expand All @@ -624,11 +652,42 @@ Arguments:
Used by:
- `GpioDigitalOutputDriver`_

NetworkManagedGPIO
++++++++++++++++++
A :any:`NetworkManagedGPIO` describes a `ManagedGPIO`_ resource available on a
remote computer.

NetworkSysfsGPIO
++++++++++++++++
A :any:`NetworkSysfsGPIO` describes a `SysfsGPIO`_ resource available on a
remote computer.

MatchedManagedGPIO
++++++++++++++++++
A :any:`MatchedManagedGPIO` describes a GPIO line, like a `ManagedGPIO`_.
The gpiochip is identified by matching udev properties. This allows
identification through hot-plugging or rebooting for controllers like
USB based gpiochips.

.. code-block:: yaml

MatchedManagedGPIO:
match:
'@SUBSYSTEM': 'usb'
'@ID_SERIAL_SHORT': 'D38EJ8LF'
pin: 0

The example would search for a USB gpiochip with the key ``ID_SERIAL_SHORT``
and the value ``D38EJ8LF`` and use the pin 0 of this device.
The ``ID_SERIAL_SHORT`` property is set by the usb_id builtin helper program.

Arguments:
- match (dict): key and value pairs for a udev match, see `udev Matching`_
- pin (str | int): gpio pin name or offset within the matched gpiochip.

Used by:
- `GpioDigitalOutputDriver`_

MatchedSysfsGPIO
++++++++++++++++
A :any:`MatchedSysfsGPIO` describes a GPIO line, like a `SysfsGPIO`_.
Expand Down Expand Up @@ -2357,13 +2416,16 @@ GpioDigitalOutputDriver
~~~~~~~~~~~~~~~~~~~~~~~
The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line.

This driver configures GPIO lines via
`the sysfs kernel interface <https://www.kernel.org/doc/html/latest/gpio/sysfs.html>`__.
This driver configures GPIO lines via `gpio-manager <https://libgpiod.readthedocs.io/en/stable/gpio-manager.html>`__
or `the legacy sysfs kernel interface <https://docs.kernel.org/next/admin-guide/gpio/sysfs.html>`__.
While the driver automatically exports the GPIO, it does not configure it in
any other way than as an output.

Binds to:
gpio:
- `ManagedGPIO`_
- `MatchedManagedGPIO`_
- `NetworkManagedGPIO`_
- `SysfsGPIO`_
- `MatchedSysfsGPIO`_
- `NetworkSysfsGPIO`_
Expand Down
9 changes: 9 additions & 0 deletions examples/managed-gpio/import-gpio.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
targets:
main:
resources:
RemotePlace:
name: gpio
drivers:
GpioDigitalOutputDriver: {}
options:
coordinator_address: 'labgrid:20408'
28 changes: 28 additions & 0 deletions examples/managed-gpio/managed_gpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
import time

from labgrid import Target
from labgrid.logging import basicConfig, StepLogger
from labgrid.driver import GpioDigitalOutputDriver
from labgrid.resource import ManagedGPIO

# enable info logging
basicConfig(level=logging.INFO)

# show labgrid steps on the console
StepLogger.start()

t = Target("main")
r = ManagedGPIO(t, name=None, chip="/dev/gpiochip0", pin=0)
d = GpioDigitalOutputDriver(t, name=None)

p = t.get_driver("DigitalOutputProtocol")
print(t.resources)
p.set(True)
print(p.get())
time.sleep(2)
p.set(False)
print(p.get())
time.sleep(2)
p.set(True)
print(p.get())
25 changes: 25 additions & 0 deletions examples/managed-gpio/managed_gpio_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging
import time

from labgrid import Environment
from labgrid.logging import basicConfig, StepLogger

# enable info logging
basicConfig(level=logging.INFO)

# show labgrid steps on the console
StepLogger.start()

e = Environment("import-gpio.yaml")
t = e.get_target()

p = t.get_driver("DigitalOutputProtocol")
print(t.resources)
p.set(True)
print(p.get())
time.sleep(2)
p.set(False)
print(p.get())
time.sleep(2)
p.set(True)
print(p.get())
44 changes: 33 additions & 11 deletions labgrid/driver/gpiodriver.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""All GPIO-related drivers"""
from typing import Union

import attr

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..resource.remote import NetworkSysfsGPIO
from ..resource.base import ManagedGPIO, SysfsGPIO
from ..resource.remote import NetworkManagedGPIO, NetworkSysfsGPIO
from ..resource.udev import MatchedManagedGPIO, MatchedSysfsGPIO
from ..step import step
from .common import Driver
from ..util.agentwrapper import AgentWrapper
Expand All @@ -12,22 +16,34 @@
@target_factory.reg_driver
@attr.s(eq=False)
class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol):
gpio: Union[ManagedGPIO, MatchedManagedGPIO, NetworkManagedGPIO, SysfsGPIO, MatchedSysfsGPIO, NetworkSysfsGPIO]

bindings = {
"gpio": {"SysfsGPIO", "MatchedSysfsGPIO", "NetworkSysfsGPIO"},
"gpio": {
"ManagedGPIO",
"MatchedManagedGPIO",
"NetworkManagedGPIO",
"SysfsGPIO",
"MatchedSysfsGPIO",
"NetworkSysfsGPIO",
},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
if isinstance(self.gpio, NetworkSysfsGPIO):
host = self.gpio.host
else:
host = None
host = self.gpio.host if isinstance(self.gpio, (NetworkSysfsGPIO, NetworkManagedGPIO)) else None

self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('sysfsgpio')

self.is_sysfs = isinstance(self.gpio, (SysfsGPIO, MatchedSysfsGPIO, NetworkSysfsGPIO))

if self.is_sysfs:
self.proxy = self.wrapper.load('sysfsgpio')
else:
self.proxy = self.wrapper.load('managed_gpio')

def on_deactivate(self):
self.wrapper.close()
Expand All @@ -36,10 +52,16 @@ def on_deactivate(self):

@Driver.check_active
@step(args=['status'])
def set(self, status):
self.proxy.set(self.gpio.index, status)
def set(self, status: bool) -> None:
if self.is_sysfs:
self.proxy.set(self.gpio.index, status)
else:
self.proxy.set(self.gpio.chip, self.gpio.pin, status)

@Driver.check_active
@step(result=True)
def get(self):
return self.proxy.get(self.gpio.index)
def get(self) -> bool:
if self.is_sysfs:
return self.proxy.get(self.gpio.index)

return self.proxy.get(self.gpio.chip, self.gpio.pin)
14 changes: 10 additions & 4 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ def power(self):
name = self.args.name
target = self._get_target(place)
from ..resource.power import NetworkPowerPort, PDUDaemonPort
from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkSysfsGPIO
from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkManagedGPIO, NetworkSysfsGPIO
from ..resource import TasmotaPowerPort, NetworkYKUSHPowerPort

drv = None
Expand All @@ -960,7 +960,7 @@ def power(self):
drv = self._get_driver_or_new(target, "TasmotaPowerDriver", name=name)
elif isinstance(resource, NetworkYKUSHPowerPort):
drv = self._get_driver_or_new(target, "YKUSHPowerDriver", name=name)
elif isinstance(resource, NetworkSysfsGPIO):
elif isinstance(resource, (NetworkManagedGPIO, NetworkSysfsGPIO)):
self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name)
drv = self._get_driver_or_new(target, "DigitalOutputPowerDriver", name=name)
if drv:
Expand All @@ -980,7 +980,13 @@ def digital_io(self):
name = self.args.name
target = self._get_target(place)
from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput, WaveshareModbusTCPCoil
from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay
from ..resource.remote import (
NetworkDeditecRelais8,
NetworkHIDRelay,
NetworkLXAIOBusPIO,
NetworkManagedGPIO,
NetworkSysfsGPIO,
)

drv = None
try:
Expand All @@ -999,7 +1005,7 @@ def digital_io(self):
drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkDeditecRelais8):
drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name)
elif isinstance(resource, NetworkSysfsGPIO):
elif isinstance(resource, (NetworkManagedGPIO, NetworkSysfsGPIO)):
drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkLXAIOBusPIO):
drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name)
Expand Down
33 changes: 32 additions & 1 deletion labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from urllib.parse import urlsplit
import warnings
from pathlib import Path
from typing import Dict, Type
from typing import Any, Dict, Type
from socket import gethostname, getfqdn

import attr
Expand Down Expand Up @@ -639,6 +639,37 @@ def _get_params(self):
exports["SNMPEthernetPort"] = EthernetPortExport


@attr.s(eq=False)
class ManagedGPIOExport(ResourceExport):
"""ResourceExport for GPIO lines accessed via gpio-manager D-Bus service"""

def __attrs_post_init__(self):
super().__attrs_post_init__()

if self.cls == "ManagedGPIO":
from ..resource.base import ManagedGPIO

self.local = ManagedGPIO(target=None, name=None, **self.local_params)
elif self.cls == "MatchedManagedGPIO":
from ..resource.udev import MatchedManagedGPIO

self.local = MatchedManagedGPIO(target=None, name=None, **self.local_params)

self.data["cls"] = "NetworkManagedGPIO"

def _get_params(self) -> dict[str, Any]:
"""Helper function to return parameters"""
return {
"host": self.host,
"chip": self.local.chip,
"pin": self.local.pin,
}


exports["ManagedGPIO"] = ManagedGPIOExport
exports["MatchedManagedGPIO"] = ManagedGPIOExport


@attr.s(eq=False)
class GPIOSysFSExport(ResourceExport):
_gpio_sysfs_path_prefix = "/sys/class/gpio"
Expand Down
3 changes: 2 additions & 1 deletion labgrid/resource/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import SerialPort, NetworkInterface, EthernetPort, SysfsGPIO
from .base import SerialPort, NetworkInterface, EthernetPort, ManagedGPIO, SysfsGPIO
from .ethernetport import SNMPEthernetPort
from .serialport import RawSerialPort, NetworkSerialPort
from .modbus import ModbusTCPCoil, WaveshareModbusTCPCoil
Expand All @@ -15,6 +15,7 @@
HIDRelay,
IMXUSBLoader,
LXAUSBMux,
MatchedManagedGPIO,
MatchedSysfsGPIO,
MXSUSBLoader,
RKUSBLoader,
Expand Down
11 changes: 11 additions & 0 deletions labgrid/resource/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ class EthernetPort(Resource):
interface = attr.ib(default=None)


@target_factory.reg_resource
@attr.s(eq=False)
class ManagedGPIO(Resource):
"""The basic ManagedGPIO contains an index

Args:
chip (str): path to gpiochip device.
pin (str | int): name or index of target gpio line."""
chip = attr.ib(default=None, validator=attr.validators.instance_of(str))
pin = attr.ib(default=None, validator=attr.validators.instance_of((str, int)))

@target_factory.reg_resource
@attr.s(eq=False)
class SysfsGPIO(Resource):
Expand Down
13 changes: 13 additions & 0 deletions labgrid/resource/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,19 @@ def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()

@target_factory.reg_resource
@attr.s(eq=False)
class NetworkManagedGPIO(NetworkResource, ManagedResource):
manager_cls = RemotePlaceManager

"""The NetworkManagedGPIO describes a remotely accessible gpio line"""
chip = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(str)))
pin = attr.ib(validator=attr.validators.optional(attr.validators.instance_of((str, int))))

def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()


@target_factory.reg_resource
@attr.s(eq=False)
Expand Down
Loading
Loading