Screenplay: Pointless Blinking With Python, asyncio, and libgpiod (and a Raspberry Pi of Course)

Setup Before Presentation

Starting Point

Greeting

In a subdirectory:

Multiple Background Threads

$ strace -f python3 code/thread-multi.py
...
[pid  4677] write(1, "hello left\n", 11hello left
) = 11
[pid  4677] clock_gettime64(CLOCK_MONOTONIC, {tv_sec=164646, tv_nsec=833862215}) = 0
[pid  4677] _newselect(0, NULL, NULL, NULL, {tv_sec=0, tv_usec=500000} <unfinished ...>
[pid  4679] <... _newselect resumed>)   = 0 (Timeout)
[pid  4679] write(1, "                        hello mi"..., 61                        hello middle
) = 61
[pid  4679] clock_gettime64(CLOCK_MONOTONIC, {tv_sec=164646, tv_nsec=845864201}) = 0
[pid  4679] _newselect(0, NULL, NULL, NULL, {tv_sec=1, tv_usec=0} <unfinished ...>
[pid  4678] <... _newselect resumed>)   = 0 (Timeout)
[pid  4678] write(1, "                                "..., 61                                                 hello right
) = 61
  • Three independent PIDs, using select() to implement time.sleep() (NULL fds)

  • Fourth (main thread) also involved occasionally, likely due to Python’s weird thread management (GIL)

  • Managed by OS scheduler

  • ⟶ scheduling jitter, heavy (?) OS load

Blink

#!/usr/bin/env python

from threading import Thread
import time

def hello_left():
    for _ in range(10):
        print('hello left')
        time.sleep(0.5)

def hello_right():
    for _ in range(10):
        print('hello right'.rjust(60))
        time.sleep(0.4)

def hello_middle():
    for _ in range(10):
        print('hello middle'.center(60))
        time.sleep(0.3)

t1 = Thread(target=hello_left)
t2 = Thread(target=hello_right)
t3 = Thread(target=hello_middle)

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Enter asyncio

  • Replace threading and time with asyncio

  • async functions, await asyncio.sleep(...)

  • async def main()

  • asyncio.run(main())

$ strace -f code/async-multi.py
...
epoll_wait(3, [], 1, 201)               = 0
...
  • Single thread!

  • Timeouts apparently multiplexed on the event loop’s timeout parameter

Blink

#!/usr/bin/env python

import asyncio

async def hello_left():
    for _ in range(5):
        print('hello left')
        await asyncio.sleep(0.5)

async def hello_right():
    for _ in range(5):
        print('hello right'.rjust(60))
        await asyncio.sleep(0.8)

async def hello_middle():
    for _ in range(4):
        print('hello middle'.center(60))
        await asyncio.sleep(1)

async def main():
    t1 = asyncio.create_task(hello_left())
    t2 = asyncio.create_task(hello_middle())
    t3 = asyncio.create_task(hello_right())

    await t1
    await t2
    await t3

asyncio.run(main())

Character Device Based GPIO

  • The way to go for GPIO on Linux

  • Alternative: sysfs GPIO

    • Unmaintained

    • Not immune to hotplug GPIO (e.g. USB GPIO controller) ⟶ fixed number range

    • Not reset when application crashes (a feature that is not wanted by most people)

    • No pullup/pulldown configuration

  • RPi.GPIO

    • Raspberry specific

    • Weird (a background thread fires event/interrupts)

    • Bound to go away (I hope)

Libgpiod V2: New Major Release with a Ton of New Features - Bartosz Golaszewski

GPIO Device

  • /dev/gpiochip0: character device

  • Opened by processes

  • Communication via ioctl()

  • Selectable ⟶ events/interrupts

  • User space library: libgpiod

$ ls -l /dev/gpiochip0
ls -l crw-rw---- 1 root gpio 254, 0 Apr  9 13:30 /dev/gpiochip0

Most Basic Feature: Setting GPIO Values

  • Request GPIO 11, 10, 27

  • … configuring for output

#!/usr/bin/env python

import gpiod
import time

REQUEST = gpiod.request_lines(
    '/dev/gpiochip0',
    consumer='mytest',
    config={(11,10,27): gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})

REQUEST.set_values({11: gpiod.line.Value(1),
                    10: gpiod.line.Value(1),
                    27: gpiod.line.Value(1),
                   })

time.sleep(1)

REQUEST.set_values({11: gpiod.line.Value(0),
                    10: gpiod.line.Value(0),
                    27: gpiod.line.Value(0),
                   })

Entire Matrix On/Off

GPIO

#!/usr/bin/env python

import gpiod
import time

MATRIX = (
    (11, 10, 27,  4,  2),
    ( 0,  9, 22, 17,  3),
    ( 5, 20,  1, 25, 18),
    ( 6, 16,  7, 24, 15),
    (13, 12,  8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())

REQUEST = gpiod.request_lines(
    '/dev/gpiochip0',
    consumer='blink',
    config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})

REQUEST.set_values({i: gpiod.line.Value(1) for i in ALL_IOS})
time.sleep(0.5)
REQUEST.set_values({i: gpiod.line.Value(0) for i in ALL_IOS})

Bringing All Together

  • Continue blink.py

  • Morph hello*() in blink(ios, interval, ntimes=None) (None -> itertools.count(), else range())

  • import gpiod, next to asyncio

  • Pull in snippet set_values

  • Pull in snippet blink-raw

GPIO

Interval

11

0.5

10

0.4

27

0.3

4

0.2

2

0.1

Blink

#!/usr/bin/env python

import gpiod
import asyncio
import itertools

MATRIX = (
    (11, 10, 27,  4,  2),
    ( 0,  9, 22, 17,  3),
    ( 5, 20,  1, 25, 18),
    ( 6, 16,  7, 24, 15),
    (13, 12,  8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())

REQUEST = gpiod.request_lines(
    '/dev/gpiochip0',
    consumer='blink',
    config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})

def SET_VALUES(ios, b):
    REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})

async def blink(ios, interval, ntimes=None):
    loop = (ntimes is None) and itertools.count() or range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)

async def main():
    tasks = [
        asyncio.create_task(blink((11,), 0.5)),
        asyncio.create_task(blink((10,), 0.4)),
        asyncio.create_task(blink((27,), 0.3)),
        asyncio.create_task(blink(( 4,), 0.2)),
        asyncio.create_task(blink(( 2,), 0.1)),
    ]
    for t in tasks:
        await t

asyncio.run(main())

Modularize

  • Note this is not about clean coding 🐷

  • Cram stuff into stuff.py

  • All but main() goes there

Stuff

import asyncio
import gpiod
import itertools

MATRIX = (
    (11, 10, 27,  4,  2),
    ( 0,  9, 22, 17,  3),
    ( 5, 20,  1, 25, 18),
    ( 6, 16,  7, 24, 15),
    (13, 12,  8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())

REQUEST = gpiod.request_lines(
    '/dev/gpiochip0',
    consumer='blink',
    config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})

def SET_VALUES(ios, b):
    REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})

async def blink(ios, interval, ntimes=None):
    loop = (ntimes is None) and itertools.count() or range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)

Blink

#!/usr/bin/env python

from stuff_raw import *          # <-- this is not about "clean code"
import asyncio

async def main():
    tasks = [
        asyncio.create_task(blink((11,), 0.5)),
        asyncio.create_task(blink((10,), 0.4)),
        asyncio.create_task(blink((27,), 0.3)),
        asyncio.create_task(blink(( 4,), 0.2)),
        asyncio.create_task(blink(( 2,), 0.1)),
    ]

    for t in tasks:
        await t

asyncio.run(main())

Coroutines?

  • Hm … too much of asyncio.create_task() here

  • Instantiate coroutines first, not using create-task() immediately

    • Coroutines do nothing

    • await, or create_task()

  • Pass to all() (pull in from snippet all-initial)

  • await that instead

Blink

#!/usr/bin/env python

from stuff_raw import *
import asyncio
import numpy

async def all(coros):
    tasks = [asyncio.create_task(c) for c in coros]
    for t in tasks:
        await t

async def main():
    cols = numpy.transpose(MATRIX).tolist()

    await all([
        blink(cols[0], 0.5),
        blink(cols[1], 0.4),
        blink(cols[2], 0.3),
        blink(cols[3], 0.2),
        blink(cols[4], 0.1),
    ])

asyncio.run(main())

Not Enough: sequence()

  • Not using create_task(): sequential execution

  • Pass a limit to blink()

  • ⟶ another one: sequence()

#!/usr/bin/env python

from stuff_raw import *
import asyncio
import numpy

async def all(coros):
    tasks = [asyncio.create_task(c) for c in coros]
    for t in tasks:
        await t

async def sequence(coros):
    for c in coros:
        await c

async def main():
    cols = numpy.transpose(MATRIX).tolist()

    await sequence([
        blink(cols[0], 0.5, 2),
        blink(cols[1], 0.4, 2),
        blink(cols[2], 0.3, 2),
        blink(cols[3], 0.2, 2),
        blink(cols[4], 0.1, 2),
    ])

asyncio.run(main())

Looping: forever()

#!/usr/bin/env python

from stuff_raw2 import *
import asyncio
import numpy

async def forever(coro):
    while True:
        await coro

async def main():
    cols = numpy.transpose(MATRIX).tolist()

    await forever(
        sequence([
            blink(cols[0], 0.3, 2),
            blink(cols[1], 0.2, 2),
            blink(cols[2], 0.1, 2),
            blink(cols[3], 0.01, 2),
            blink(cols[4], 0.05, 2),
        ])
    )

asyncio.run(main())
$ code/blink-forever.py
...
RuntimeError: cannot reuse already awaited coroutine
...
  • Need to create new coroutine at each execution

  • ⟶ factory?

A Stripped-Down Program (⟶ Factory)

#!/usr/bin/env python

from stuff_raw2 import *
import asyncio

async def forever(coro):
    while True:
        await coro

async def blink(ios, interval, ntimes=None):
    loop = (ntimes is None) and itertools.count() or range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)

async def main():
    await forever(
        blink((11,), 0.3, 1),
    )

asyncio.run(main())

@program, Finally

Stuff

import asyncio
import gpiod
import itertools

MATRIX = (
    (11, 10, 27,  4,  2),
    ( 0,  9, 22, 17,  3),
    ( 5, 20,  1, 25, 18),
    ( 6, 16,  7, 24, 15),
    (13, 12,  8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())

REQUEST = gpiod.request_lines(
    '/dev/gpiochip0',
    consumer='blink',
    config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})

def SET_VALUES(ios, b):
    REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})

async def blink(ios, interval, ntimes=None):
    loop = (ntimes is None) and itertools.count() or range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)

async def all(coros):
    tasks = [asyncio.create_task(c) for c in coros]
    for t in tasks:
        await t

async def sequence(coros):
    for c in coros:
        await c

Blink

#!/usr/bin/env python

from stuff_decorator_stage1 import *
import asyncio

async def forever(factory):
    while True:
        await factory()                  # <--- call to create_coro()

async def blink(ios, interval, ntimes=None):
    loop = (ntimes is None) and itertools.count() or range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)

def create_factory_for_blink(blinkfunc):
    def factory(ios, interval, ntimes=None):
        def create_coro():
            return blinkfunc(ios, interval, ntimes)
        return create_coro
    return factory

blink = create_factory_for_blink(blink)  # <--- turn blink into blink-factory

async def main():
    await forever(
        blink((11,), 0.3, 1),            # <--- use blink-factory to instantiate coroutine
    )

asyncio.run(main())

Playground: cycle()

@program
async def cycle(ios, interval):
    import itertools

    for i in itertools.cycle(ios):
        p = blink((i,), interval, 1)
        await p()
  • Cycle row[0]

  • A-HA!!

Fast Forward: any(), And Cancellation

  • blink.py: insert prog-all-demo ⟶ explain

  • Show stuff.any() ⟶ opposite of all()

  • blink.py: insert prog-any-demo

Playground: on()

  • Show stuff.on()

  • Explain Future

  • blink.py: insert prog-any-on-demo

  • blink.py: insert prog-my-blink

Goodbye

  • blink.py: insert prog-smiley