This document describes classes designed to enhance the capability of MicroPython's `asyncio` when used in a microcontroller context.
1. Introduction
 1.1 API Design Callbacks vs. asynchronous interfaces.
 1.2 Switches Electrical considerations.
2. Installation and usage
3. Interfacing switches
 3.1 ESwitch class Switch debouncer with event interface.
 3.2 Switch class Switch debouncer with callbacks.
4. Interfacing pushbuttons Access short, long and double-click events.
 4.1 EButton class Debounced pushbutton with Event-based interface.
 4.2 Pushbutton class Debounced pushbutton with callback interface.
  4.2.1 The suppress constructor argument
  4.2.2 The sense constructor argument
 4.3 ESP32Touch class
 4.4 Keyboard class Retrieve characters from a keypad.
 4.5 SwArray class Interface a crosspoint array of switches or buttons.
 4.6 Suppress mode Reduce the number of events/callbacks.
5. ADC monitoring Pause until an ADC goes out of bounds.
 5.1 AADC class
 5.2 Design note
6. Quadrature encoders Asynchronous interface for rotary encoders.
 6.1 Encoder class
7. Ringbuf Queue A MicroPython optimised queue primitive.
8. Delay_ms class A flexible retriggerable delay with callback or Event interface.
9. Message Broker A flexible means of messaging between tasks.
 9.1 Further examples
 9.2 User agents User defined Agent classes.
 9.3 Wildcard subscriptions
 9.4 Notes
10. Additional functions
 10.1 launch Run a coro or callback interchangeably.
 10.2 set_global_exception Simplify debugging with a global exception handler.
The classes presented here include asynchronous interfaces to switches, pushbuttons, incremental encoders and ADCs. Specifically they are interfaces to devices defined in the `machine` module rather than device drivers for external hardware: as such they are grouped with synchronisation primitives. There are also synchronisation primitives providing a microcontroller-optimised alternative to the existing CPython-compatible primitives.
The traditional interface to asynchronous external events is via a callback. When the event occurs, the device driver runs a user-specified callback. Some classes described here offer a callback interface. Where callbacks are used the term `callable` implies a Python `callable`: namely a function, bound method, coroutine or bound coroutine. Any of these may be supplied as a callback.
Newer class designs abandon callbacks in favour of asynchronous interfaces. This is done by exposing `Event` or asynchronous iterator interfaces. It is arguable that callbacks are outdated. Handling of arguments and return values is inelegant and there are usually better ways using asynchronous coding. In particular MicroPython's `asyncio` implements asynchronous interfaces in an efficient manner. A task waiting on an `Event` consumes minimal resources. If a user wishes to use a callback it may readily be achieved using patterns like the following. In this case the device is an asynchronous iterator:
async def run_callback(device, callback, *args):
    async for result in device:
        callback(result, *args)

or, where the device presents an `Event`:

async def run_callback(device, callback, *args):
    while True:
        await device.wait()  # Wait on the Event
        device.clear()  # Clear it down
        callback(*args)  # Run the user callback
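The `Event` pattern can be tried without hardware. The following is a minimal sketch in which a plain `asyncio.Event` stands in for a driver's bound `Event`; the `demo` coroutine and the simulated closures are illustrative assumptions, not part of the library.

```python
import asyncio

async def run_callback(device, callback, *args):
    # Adapter: run a synchronous callback each time the Event is set.
    while True:
        await device.wait()  # Wait on the Event
        device.clear()  # Clear it down
        callback(*args)

async def demo():
    results = []
    device = asyncio.Event()  # Stand-in for a driver's bound Event
    task = asyncio.create_task(run_callback(device, results.append, "closed"))
    for _ in range(3):  # Simulate three contact closures
        device.set()
        await asyncio.sleep(0.01)  # Give the adapter task time to run
    task.cancel()
    return results

results = asyncio.run(demo())
print(results)
```

Each simulated closure results in one call to the callback, so the list ends up with three entries.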
From an electrical standpoint switches and pushbuttons are identical, however from a programming perspective a switch is either open or closed, while a pushbutton may be subject to single or double clicks, or to long presses. Consequently switch drivers expose a simpler interface with a consequent saving in code size.
All switch drivers rely on millisecond-level timing: callback functions must be designed to terminate rapidly. This applies to all functions in the application; coroutines should yield regularly. If these constraints are not met, switch events can be missed.
All switches are prone to contact bounce, with a consequent risk of spurious events: the drivers presented here embody debouncing. The phenomenon of contact bounce is discussed in this document.
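To illustrate the idea of debouncing, here is a minimal sketch of one common strategy: accept the first change of level, then ignore further changes for a fixed period. It operates on made-up timestamped samples and is not necessarily how the drivers implement it; the function name and the sample data are assumptions.

```python
def debounce(samples, debounce_ms=50):
    """Return accepted (time_ms, level) transitions.

    samples is a list of (time_ms, level) raw readings in time order.
    """
    events = []
    state = samples[0][1]  # Initial level is taken as the resting state
    quiet_until = samples[0][0]
    for t, level in samples:
        if t >= quiet_until and level != state:
            state = level
            events.append((t, level))
            quiet_until = t + debounce_ms  # Ignore bounce for a while
    return events

# A closure at 100ms and an opening at 400ms, each with contact bounce
raw = [(0, 1), (100, 0), (102, 1), (105, 0), (110, 1), (115, 0),
       (400, 1), (403, 0), (409, 1)]
events = debounce(raw)
print(events)
```

Only two transitions are reported despite the eight raw changes of level.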
Two ways of wiring switches are supported. For small numbers of switches, the switch may link a pin to gnd with the pin being configured as an input with a pull up resistor. Interfacing such a switch is simple:
import asyncio
from machine import Pin
from primitives import ESwitch
es = ESwitch(Pin(16, Pin.IN, Pin.PULL_UP))

async def closure():
    while True:
        es.close.clear()  # Clear the Event
        await es.close.wait()  # Wait for contact closure
        print("Closed")  # Run code

asyncio.run(closure())
As the number of switches increases, consumption of GPIO pins can be problematic. A solution is to wire the switches as a crosspoint array with the driver polling each row in turn and reading the columns. This is the usual configuration of keypads.
Crosspoint connection requires precautions to cater for the case where multiple contacts are closed simultaneously, as this can have the effect of linking two output pins. Risk of damage is averted by defining the outputs as open drain. This allows for one key rollover: if a second key is pressed before the first is released, the keys will be read correctly. Invalid contact closures may be registered if more than two contacts are closed. This also applies where the matrix comprises switches rather than buttons. In this case diode isolation is required:
Whether or not diodes are used the column input pins must be pulled up. Scanning of the array occurs rapidly, and built-in pull-up resistors have a high value. If the capacitance between wires is high, spurious closures may be registered. To prevent this it is wise to add physical resistors between the input pins and 3.3V. A value in the region of 1KΩ to 5KΩ is recommended.
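The row-by-row polling described above can be sketched in plain Python. This is a simulation only: the set `closed` stands in for the hardware, and the scan code numbering (`row * n_cols + col`) is an assumption made for illustration rather than a statement about the drivers.

```python
def scan_matrix(n_rows, n_cols, closed):
    """Return the scan codes of all closed contacts.

    closed is a set of (row, col) pairs standing in for the hardware.
    """
    codes = []
    for row in range(n_rows):  # Drive one row low at a time
        for col in range(n_cols):  # Read each pulled-up column input
            level = 0 if (row, col) in closed else 1
            if level == 0:  # A low reading means the contact is closed
                codes.append(row * n_cols + col)  # Assumed numbering
    return codes

codes = scan_matrix(4, 4, {(0, 0), (2, 3)})
print(codes)
```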
The latest release build of firmware or a newer preview build is recommended. To install the library, connect the target hardware to WiFi and issue:
import mip
mip.install("github:peterhinch/micropython-async/v3/primitives")
For any target including non-networked ones use mpremote:
$ mpremote mip install "github:peterhinch/micropython-async/v3/primitives"
Drivers are imported with:
from primitives import Switch, Pushbutton, AADC
There is a test/demo program for the Switch and Pushbutton classes. On import this lists available tests. It assumes a Pyboard with a switch or pushbutton between X1 and Gnd. It is run as follows:
from primitives.tests.switches import *
test_sw() # For example
The test for the `AADC` class requires a Pyboard with pins X1 and X5 linked. It is run as follows:

from primitives.tests.adctest import test
test()
The `primitives` module provides `ESwitch` and `Switch` classes. The former is a minimal driver providing an `Event` interface. The latter supports callbacks and `Event` instances.
from primitives import ESwitch #
This provides a debounced interface to a switch connected to gnd or to 3V3. A pullup or pull down resistor should be supplied to ensure a valid logic level when the switch is open. The default constructor arg `lopen=1` is for a switch connected between the pin and gnd, with a pullup to 3V3. Typically the pullup is internal, the pin being configured as follows:
from machine import Pin
pin_id = 0 # Depends on hardware
pin = Pin(pin_id, Pin.IN, Pin.PULL_UP)
Constructor arguments:
- `pin` A `Pin` instance: should be initialised as an input with a pullup or down as appropriate.
- `lopen=1` Electrical level when switch is open circuit i.e. 1 is 3.3V, 0 is gnd.

Methods:
- `__call__()` Call syntax e.g. `myswitch()` returns the logical debounced state of the switch i.e. 0 if open, 1 if closed.
- `deinit()` No args. Cancels the polling task and clears bound `Event` instances.

Class variable:
- `debounce_ms=50` Debounce time in ms.

Bound objects:
- `close` An `Event` instance. Set on contact closure.
- `open` An `Event` instance. Set on contact open.

Application code is responsible for clearing the `Event` instances.

Usage example:
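import asyncio
from machine import Pin
from primitives import ESwitch
es = ESwitch(Pin("Y1", Pin.IN, Pin.PULL_UP))

async def closure():
    while True:
        es.close.clear()  # Application code clears the Event
        await es.close.wait()
        print("Closed")

async def open():
    while True:
        es.open.clear()
        await es.open.wait()
        print("Open")

async def main():
    asyncio.create_task(open())
    await closure()  # Run forever

asyncio.run(main())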
from primitives import Switch #
This can run callbacks or schedule coros on contact closure and/or opening. As an alternative to a callback based interface, bound `Event` objects may be triggered on switch state changes.

This assumes a normally open switch connected between a pin and ground. The pin should be initialised as an input with a pullup. A `callable` may be specified to run on contact closure or opening; where the `callable` is a coroutine it will be converted to a `Task` and will run asynchronously. Debouncing is implicit: contact bounce will not cause spurious execution of the `callable`.
Constructor argument (mandatory):
- `pin` The initialised Pin instance.

Methods:
- `close_func(func, args=())` Args: `func` a `callable` to run on contact closure, `args` a tuple of arguments for the `callable`.
- `open_func(func, args=())` Args: `func` a `callable` to run on contact open, `args` a tuple of arguments for the `callable`.
- `__call__()` Call syntax e.g. `myswitch()` returns the physical debounced state of the switch i.e. 0 if grounded, 1 if connected to `3V3`.
- `deinit()` No args. Cancels the running task.

Class attribute:
- `debounce_ms=50` Debounce time in ms.
from pyb import LED
from machine import Pin
import asyncio
from primitives import Switch

async def pulse(led, ms):
    led.on()
    await asyncio.sleep_ms(ms)
    led.off()

async def my_app():
    pin = Pin('X1', Pin.IN, Pin.PULL_UP)  # Hardware: switch to gnd
    red = LED(1)
    sw = Switch(pin)
    sw.close_func(pulse, (red, 1000))  # Note how coro and args are passed
    await asyncio.sleep(60)  # Dummy application code

asyncio.run(my_app())  # Run main application code
This enables a task to wait on a switch state as represented by a bound `Event` instance. A bound contact closure `Event` is created by passing `None` to `close_func`, in which case the `Event` is named `.close`. Likewise a `.open` `Event` is created by passing `None` to `open_func`.
The `primitives` module provides the following classes for interfacing pushbuttons. The following support normally open or normally closed buttons connected to gnd or to 3V3:
- `EButton` Provides an `Event` based interface.
- `Pushbutton` Offers `Event`s and/or callbacks.

The following support normally open pushbuttons connected in a crosspoint array.
- `Keyboard` An asynchronous iterator responding to button presses.
- `SwArray` As above, but also supporting open, double and long events. The latter can also support switches in a diode-isolated array.
from primitives import EButton #
This extends the functionality of `ESwitch` to provide additional events for long and double presses.

This can support normally open or normally closed switches, connected to gnd (with a pullup) or to 3V3 (with a pull-down). The `Pin` object should be initialised appropriately. The default state of the switch can be passed in the optional `sense` parameter on the constructor, otherwise the assumption is that on instantiation the button is not pressed.

The EButton class uses logical rather than physical state: a button's state is considered `True` if pressed, otherwise `False` regardless of its physical implementation.
Constructor arguments:
- `pin` Mandatory. The initialised Pin instance.
- `suppress=False`. See Suppress mode.
- `sense=None`. Optionally define the electrical connection: see section 4.2.2.

Methods:
- `__call__()` Call syntax e.g. `mybutton()` Returns the logical debounced state of the button (`True` corresponds to pressed).
- `rawstate()` Returns the logical instantaneous state of the button. There is probably no reason to use this.
- `deinit()` No args. Cancels the running task and clears all events.

Bound `Event`s:
- `press` Set on button press.
- `release` Set on button release.
- `long` Set if button press is longer than `EButton.long_press_ms`.
- `double` Set if two button presses occur within `EButton.double_click_ms`.

Application code is responsible for clearing any `Event`s that are used.

Class attributes:
- `debounce_ms=50` Debounce time in ms.
- `long_press_ms=1000` Threshold time in ms for a long press.
- `double_click_ms=400` Threshold time in ms for a double-click.
In most applications it can be assumed that, at power-up, pushbuttons are not pressed. The default `None` value uses this assumption to read the pin state and to assign the result to the `False` (not pressed) state at power up. This works with normally open or normally closed buttons wired to either supply rail, without programmer intervention.

In certain use cases this assumption does not hold, and `sense` must explicitly be specified. This defines the logical state of the un-pressed button. Hence `sense=0` defines a button connected in such a way that when it is not pressed, the voltage on the pin is gnd.

Whenever the pin value changes, the new value is compared with `sense` to determine whether the button is closed or open.
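The comparison with `sense` can be sketched as follows. `FakePin`, `resolve_sense` and `is_pressed` are hypothetical names used for illustration; the sketch assumes that "pressed" simply means the pin level differs from the un-pressed level, which is one reading of the description above.

```python
class FakePin:
    """Stand-in for machine.Pin: calling it returns the current level."""
    def __init__(self, level):
        self.level = level

    def __call__(self):
        return self.level

def resolve_sense(pin, sense=None):
    # With sense=None, assume the button is not pressed at this moment
    # and take the current pin level as the un-pressed level.
    return pin() if sense is None else sense

def is_pressed(pin, sense):
    # Pressed when the pin level differs from the un-pressed level.
    return pin() != sense

pin = FakePin(1)  # Button to gnd with pullup: reads 1 when released
sense = resolve_sense(pin)
released_state = is_pressed(pin, sense)
pin.level = 0  # Simulate a press
pressed_state = is_pressed(pin, sense)
print(sense, released_state, pressed_state)
```

The same two functions work unchanged for a button wired to 3V3 with a pull-down, where the resting level is 0.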
from primitives import Pushbutton #
This can support normally open or normally closed switches, connected to gnd (with a pullup) or to 3V3 (with a pull-down). The `Pin` object should be initialised appropriately. The default state of the switch can be passed in the optional `sense` parameter on the constructor, otherwise the assumption is that on instantiation the button is not pressed.

The Pushbutton class uses logical rather than physical state: a button's state is considered `True` if pressed, otherwise `False` regardless of its physical implementation.

`callable` instances may be specified to run on button press, release, double click or long press events; where the `callable` is a coroutine it will be converted to a `Task` and will run asynchronously.

Please see the note on timing in section 3.
Constructor arguments:
- `pin` Mandatory. The initialised Pin instance.
- `suppress`. See section 4.2.1.
- `sense`. Option to define electrical connection. See section 4.2.2.

Methods:
1. `press_func(func=False, args=())` Args: `func` a `callable` to run on button push, `args` a tuple of arguments for the `callable`.
2. `release_func(func=False, args=())` Args: `func` a `callable` to run on button release, `args` a tuple of arguments for the `callable`.
3. `long_func(func=False, args=())` Args: `func` a `callable` to run on long button push, `args` a tuple of arguments for the `callable`.
4. `double_func(func=False, args=())` Args: `func` a `callable` to run on double button push, `args` a tuple of arguments for the `callable`.
5. `__call__()` Call syntax e.g. `mybutton()` Returns the logical debounced state of the button (`True` corresponds to pressed).
6. `rawstate()` Returns the logical instantaneous state of the button. There is probably no reason to use this.
7. `deinit()` No args. Cancels the running debounce task.

Methods 1 - 4 may be called at any time. If `False` is passed for a callable, any existing callback will be disabled. If `None` is passed, a bound `Event` is created. See below for `Event` names.

Class variables:
- `debounce_ms` Debounce time in ms. Default 50.
- `long_press_ms` Threshold time in ms for a long press. Default 1000.
- `double_click_ms` Threshold time in ms for a double-click. Default 400.

If these variables are changed, it should be done prior to instantiating the class. The double click time must be less than the long press time.
A simple Pyboard demo:

from pyb import LED
from machine import Pin
import asyncio
from primitives import Pushbutton

def toggle(led):
    led.toggle()

async def my_app():
    pin = Pin('X1', Pin.IN, Pin.PULL_UP)  # Pushbutton to gnd
    red = LED(1)
    pb = Pushbutton(pin)
    pb.press_func(toggle, (red,))  # Note how function and args are passed
    await asyncio.sleep(60)  # Dummy

asyncio.run(my_app())  # Run main application code
See Suppress mode for the purpose of this arg.

Note: `suppress` affects the behaviour of the `release_func` only. Other callbacks including `press_func` behave normally. If the `suppress = True` constructor argument is set, the `release_func` will be launched as follows:
- If `double_func` does not exist on rapid button release.
- If `double_func` exists, after the expiration of the double-click timer.
- If `long_func` exists and the press duration causes `long_func` to be launched, `release_func` will not be launched.
- If `double_func` exists and a double-click occurs, `release_func` will not be launched.

In the typical case where `long_func` and `double_func` are both defined, this ensures that only one of `long_func`, `double_func` and `release_func` run. In the case of a single short press, the `release_func` will be delayed until the expiry of the double-click timer (because until that time a second click might occur).
In most applications it can be assumed that, at power-up, pushbuttons are not pressed. The default `None` value uses this assumption to assign the `False` (not pressed) state at power up. It therefore works with normally open or normally closed buttons wired to either supply rail, without programmer intervention.

In certain use cases this assumption does not hold, and `sense` must explicitly be specified. This defines the logical state at power-up regardless of whether, at that time, the button is pressed. Hence `sense=0` defines a button connected in such a way that when it is not pressed, the voltage on the pin is 0.

When the pin value changes, the new value is compared with `sense` to determine if the button is closed or open. This is to allow the designer to specify if the `closed` state of the button is active `high` or active `low`.
Event names, where None
is passed to a method listed below, are as follows:
method | Event |
press_func | press |
release_func | release |
long_func | long |
double_func | double |
from primitives import ESP32Touch #
This subclass of `Pushbutton` supports ESP32 touchpads providing a callback based interface. See the official docs.

API and usage are as per `Pushbutton` with the following provisos:
- The `sense` constructor arg is not supported.
- The `Pin` instance passed to the constructor must support the touch interface. It is instantiated without args, as per the example below.
- There is an additional classmethod `threshold` which takes an integer arg. The arg represents the detection threshold as a percentage.

The driver determines the untouched state by periodically polling `machine.TouchPad.read()` and storing its maximum value. If it reads a value below `maximum * threshold / 100` a touch is deemed to have occurred. Default threshold is currently 80% but this is subject to change.
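The threshold rule can be checked with a few lines of plain Python. The readings below are made-up values and `is_touched` is a hypothetical helper; the sketch only illustrates the `maximum * threshold / 100` comparison described above.

```python
def is_touched(reading, maximum, threshold=80):
    # A touch is deemed to occur when the reading drops below the given
    # percentage of the largest (untouched) value seen so far.
    return reading < maximum * threshold / 100

readings = [600, 610, 605, 450, 300, 608]  # Made-up touchpad values
maximum = 0
touched = []
for r in readings:
    maximum = max(maximum, r)  # Track the untouched baseline
    touched.append(is_touched(r, maximum))
print(touched)
```

With a baseline of 610 and a threshold of 80% the trip point is 488, so only the readings of 450 and 300 count as touches.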
Example usage:

from machine import Pin
import asyncio
from primitives import ESP32Touch

ESP32Touch.threshold(70)  # optional

async def main():
    tb = ESP32Touch(Pin(15), suppress=True)
    tb.press_func(lambda : print("press"))
    tb.double_func(lambda : print("double"))
    tb.long_func(lambda : print("long"))
    tb.release_func(lambda : print("release"))
    while True:
        await asyncio.sleep(1)

asyncio.run(main())
If a touchpad is touched on initialisation no callbacks will occur even when the pad is released. Initial button state is always `False`. Normal behaviour will commence with subsequent touches.

The best threshold value depends on physical design. Directly touching a large pad will result in a low value from `machine.TouchPad.read()`. A small pad covered with an insulating film will yield a smaller change.
from primitives import Keyboard #
A `Keyboard` provides an interface to a set of pushbuttons arranged as a crosspoint array. If a key is pressed its array index (scan code) is placed on a queue. Keypresses are retrieved with `async for`. The driver operates by polling each row, reading the response of each column. 1-key rollover is supported - this is the case where a key is pressed before the prior key has been released.

Constructor mandatory args:
- `rowpins` A list or tuple of initialised open drain output pins.
- `colpins` A list or tuple of initialised input pins (pulled up).

Constructor optional keyword only args:
- `bufsize` Size of keyboard buffer.
- `db_delay=50` Debounce delay in ms.

Methods:
- `deinit()` Cancels the running task.
- `__getitem__(self, scan_code)` Returns a `bool` being the instantaneous debounced state of a given pin. Enables code that causes actions after a button press, for example on release or auto-repeat while pressed.

The `Keyboard` class is subclassed from Ringbuf Queue enabling scan codes to be retrieved with an asynchronous iterator.
Example usage:
import asyncio
from primitives import Keyboard
from machine import Pin
rowpins = [Pin(p, Pin.OPEN_DRAIN) for p in range(10, 14)]
colpins = [Pin(p, Pin.IN, Pin.PULL_UP) for p in range(16, 20)]

async def main():
    kp = Keyboard(rowpins, colpins)
    async for scan_code in kp:
        print(scan_code)
        if not scan_code:
            break  # Quit on key with code 0

asyncio.run(main())
In typical use the scan code would be used as the index into a string of keyboard characters ordered to match the physical layout of the keys. If data is not removed from the buffer, on overflow the oldest scan code is discarded. There is no limit on the number of rows or columns; however, if more than 256 keys are used, the `bufsize` arg would need to be adapted to handle scan codes > 255. In this case an `array` or `list` object would be passed.
Usage example. Keypresses on a numeric keypad are sent to a UART with auto repeat. Optionally link GPIO0 and GPIO1 to view the result.
import asyncio
from primitives import Keyboard
from machine import Pin, UART
cmap = b"123456789*0#"  # Numeric keypad character map

async def repeat(kpad, scan_code, uart):  # Send at least one char
    ch = cmap[scan_code : scan_code + 1]  # Get character
    uart.write(ch)
    await asyncio.sleep_ms(400)  # Longer initial delay
    while kpad[scan_code]:  # While key is pressed
        uart.write(ch)
        await asyncio.sleep_ms(150)  # Faster repeat

async def receiver(uart):
    sreader = asyncio.StreamReader(uart)
    while True:
        res = await sreader.readexactly(1)
        print('Received', res)

async def main():  # Run forever
    rowpins = [Pin(p, Pin.OPEN_DRAIN) for p in range(10, 13)]
    colpins = [Pin(p, Pin.IN, Pin.PULL_UP) for p in range(16, 20)]
    uart = UART(0, 9600, tx=0, rx=1)
    asyncio.create_task(receiver(uart))
    kpad = Keyboard(rowpins, colpins)
    async for scan_code in kpad:
        rpt = asyncio.create_task(repeat(kpad, scan_code, uart))

asyncio.run(main())
from primitives.sw_array import SwArray, CLOSE, OPEN, LONG, DOUBLE, SUPPRESS
An `SwArray` is similar to a `Keyboard` except that single, double and long presses are supported. Items in the array may be switches or pushbuttons, however if switches are used they must be diode-isolated. For the reason see Switches. It is an asynchronous iterator with events being retrieved with `async for`: this returns a pair of integers being the scan code and a bit representing the event which occurred.
Constructor mandatory args:
- `rowpins` A list or tuple of initialised open drain output pins.
- `colpins` A list or tuple of initialised input pins (pulled up).
- `cfg` An integer defining conditions requiring a response. See Module Constants below.

Constructor optional keyword only args:
- `bufsize` Size of buffer.

Methods:
- `deinit()` Cancels the running task.
- `__getitem__(self, scan_code)` Returns a `bool` being the instantaneous debounced state of a given pin. Enables code that causes actions after a button press. For example after a press a pin might periodically be polled to achieve auto-repeat until released.

Synchronous bound method:
- `keymap()` Return an integer representing a bitmap of the debounced state of all switches in the array. 1 == closed.

Class variables:
- `debounce_ms = 50` Assumed maximum duration of contact bounce.
- `long_press_ms = 1000` Threshold for long press detection.
- `double_click_ms = 400` Threshold for double-click detection.
Module constants.

The following constants are provided to simplify defining the `cfg` arg. This may be defined as a bitwise `or` of selected constants. For example if the `CLOSE` bit is specified, switch closures will be reported. An omitted event will be ignored. Where the array comprises switches it is usual to specify only `CLOSE` and/or `OPEN`. This invokes a more efficient mode of operation because timing is not required.
- `CLOSE` Report contact closure.
- `OPEN` Contact opening.
- `LONG` Contact closure longer than `long_press_ms`.
- `DOUBLE` Two closures in less than `double_click_ms`.
- `SUPPRESS` Disambiguate. For explanation see Suppress mode. If all the above bits are set, a double click will result in `DOUBLE` and `OPEN` responses. If the `OPEN` bit were clear, only `DOUBLE` would occur.

The `SwArray` class is subclassed from Ringbuf Queue. This is an asynchronous iterator, enabling scan codes and event types to be retrieved as state changes occur. The event type is a single bit corresponding to the above constants.
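Building and testing a `cfg` bitmask might look like the sketch below. The numeric values given to the constants here are placeholders chosen for illustration: real code should import them from `primitives.sw_array` rather than rely on particular values. The helpers `wanted` and `describe` are hypothetical.

```python
# Placeholder bit values: assumptions for this sketch only. In real
# code import the constants from primitives.sw_array instead.
CLOSE, OPEN, LONG, DOUBLE, SUPPRESS = 1, 2, 4, 8, 16
NAMES = {CLOSE: "CLOSE", OPEN: "OPEN", LONG: "LONG", DOUBLE: "DOUBLE"}

cfg = CLOSE | LONG | DOUBLE  # Report these; OPEN events are ignored

def wanted(cfg, evt):
    # True if the single-bit event type is enabled in cfg.
    return bool(cfg & evt)

def describe(scan_code, evt):
    # Turn a (scan_code, event bit) pair into readable text.
    return f"key {scan_code}: {NAMES[evt]}"

print(wanted(cfg, OPEN), wanted(cfg, LONG))
print(describe(5, DOUBLE))
```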
Usage example:
import asyncio
from primitives.sw_array import SwArray, CLOSE, OPEN, LONG, DOUBLE, SUPPRESS
from machine import Pin
rowpins = [Pin(p, Pin.OPEN_DRAIN) for p in range(10, 14)]
colpins = [Pin(p, Pin.IN, Pin.PULL_UP) for p in range(16, 20)]
cfg = CLOSE | OPEN  # Events to report: add LONG, DOUBLE, SUPPRESS as needed

async def main():
    swa = SwArray(rowpins, colpins, cfg)
    async for scan_code, evt in swa:
        print(scan_code, evt)
        if not scan_code:
            break  # Quit on key with code 0

asyncio.run(main())
The pushbutton drivers support a mode known as suppress
. This option reduces
the number of events (or callbacks) that occur in the case of a double click.
Consider a button double-click. By default with suppress=False
the following
events will occur in order:
Similarly a long press will trigger `press`, `long` and `release` in that order. Some applications may require only one event to be triggered. Setting `suppress=True` ensures this. Outcomes are as follows:
Occurrence | Events set | Time of primary event |
Short press | press, release | After .double_click_ms |
Double press | double, release | When the second press occurs |
Long press | long, release | After long_press_ms |
The tradeoff is that the `press` and `release` events are delayed: the soonest it is possible to detect the lack of a double click is `.double_click_ms` after a short button press. Hence in the case of a short press when `suppress` is `True`, `press` and `release` events are set on expiration of the double click timer.
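The outcomes in the table can be modelled with a small pure function. This is a sketch of the decision only, under the assumption that the double-click interval is measured from the first press; `primary_event` and its parameters are hypothetical and the drivers' internal logic may differ.

```python
def primary_event(press_ms, second_press_after_ms=None,
                  long_press_ms=1000, double_click_ms=400):
    """Return (outcome, report_time_ms) for one gesture in suppress mode.

    press_ms: how long the first press was held.
    second_press_after_ms: delay from the first press to a second press,
    or None if no second press occurred.
    report_time_ms is measured from the first press.
    """
    if (second_press_after_ms is not None
            and second_press_after_ms < double_click_ms):
        return "double", second_press_after_ms  # At the second press
    if press_ms >= long_press_ms:
        return "long", long_press_ms  # When the long press timer expires
    return "short", double_click_ms  # Only once a double is ruled out

print(primary_event(120))       # Short press
print(primary_event(120, 250))  # Double press
print(primary_event(1500))      # Long press
```

Note how the short press is the slowest to report: nothing can be concluded until the double-click interval has elapsed.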
The following script may be used to demonstrate the effect of `suppress`. As written, it assumes a Pi Pico with a push button attached between GPIO 18 and Gnd, with the primitives installed.

from machine import Pin
import asyncio
from primitives import Pushbutton
btn = Pin(18, Pin.IN, Pin.PULL_UP)  # Adapt for your hardware

async def main():
    pb = Pushbutton(btn, suppress=True)
    pb.release_func(print, ("SHORT",))
    pb.double_func(print, ("DOUBLE",))
    pb.long_func(print, ("LONG",))
    await asyncio.sleep(60)  # Run for one minute

asyncio.run(main())
The `primitives.aadc` module provides the `AADC` (asynchronous ADC) class. This provides for coroutines which pause until the value returned by an ADC goes outside predefined bounds. Bounds may be absolute or relative to the current value. Data from ADCs is usually noisy. Relative bounds provide a simple (if crude) means of eliminating this. Absolute bounds can be used to raise an alarm or log data, if the value goes out of range. Typical usage:

import asyncio
from machine import ADC
import pyb
from primitives import AADC

aadc = AADC(ADC(pyb.Pin.board.X1))

async def foo():
    while True:
        value = await aadc(2000)  # Trigger if value changes by 2000
        print(value)

asyncio.run(foo())
from primitives import AADC #
`AADC` instances are awaitable. This is the principal mode of use.

Constructor argument:
- `adc` An instance of `machine.ADC`.

Awaiting an instance:
Function call syntax is used with zero, one or two unsigned integer args. These determine the bounds for the ADC value.
- No args: bounds are those set when the instance was last awaited.
- One integer arg: relative bounds are used. The current ADC value +- the arg.
- Two args `lower` and `upper`: absolute bounds.

Synchronous methods:
- `read_u16` arg `last=False` Get the current data from the ADC. If `last` is `True`, returns the last data read from the ADC. Returns a 16-bit unsigned int as per `machine.ADC.read_u16`.
- `sense(normal)` By default a task awaiting an `AADC` instance will pause until the value returned by the ADC exceeds the specified bounds. Issuing `sense(False)` inverts this logic: a task will pause until the ADC value is within the specified bounds. Issuing `sense(True)` restores normal operation.
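The three calling conventions and the effect of `sense` can be modelled in plain Python. This sketch is not the driver's code: `make_bounds`, `triggered` and the `previous` default are hypothetical, and treating the bounds as inclusive is an assumption.

```python
def make_bounds(current, *args, previous=(0, 65535)):
    """Return (lower, upper) for zero, one or two integer args."""
    if len(args) == 0:
        return previous  # Reuse the bounds from the last await
    if len(args) == 1:
        return current - args[0], current + args[0]  # Relative bounds
    return args[0], args[1]  # Absolute bounds

def triggered(value, bounds, normal=True):
    lower, upper = bounds
    inside = lower <= value <= upper  # Inclusive: an assumption
    # Normal sense: trigger when outside. Inverted: trigger when inside.
    return not inside if normal else inside

rel = make_bounds(30_000, 2000)
print(rel, triggered(33_000, rel), triggered(30_500, rel))
```

With relative bounds, small noisy fluctuations around the last value do not trigger; only a change larger than the arg does.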
In the sample below the coroutine pauses until the ADC is in range, then pauses until it goes out of range.

import asyncio
from machine import ADC
from primitives import AADC

aadc = AADC(ADC('X1'))

async def foo():
    while True:
        aadc.sense(False)  # Inverted logic: pause until within bounds
        value = await aadc(25_000, 39_000)  # Wait until in range
        print('In range:', value)
        aadc.sense(True)  # Normal logic: pause until outside bounds
        value = await aadc()  # Wait until out of range
        print('Out of range:', value)

asyncio.run(foo())
The `AADC` class uses the `asyncio` stream I/O mechanism. This is not the most obvious design. It was chosen because the plan for `asyncio` is that it will include an option for prioritising I/O. I wanted this class to be able to use this for applications requiring rapid response.
The Encoder class is an asynchronous driver for control knobs based on quadrature encoder switches such as this Adafruit product. The driver is not intended for applications such as CNC machines. Drivers for NC machines must never miss an edge. Contact bounce or vibration induced jitter can cause transitions to occur at a high rate; these must be tracked which challenges software based solutions.
Another issue affecting some solutions is that callbacks occur in an interrupt context. This can lead to concurrency issues. These issues, along with general discussion of MicroPython encoder drivers, are covered in this doc.
This driver runs the user supplied callback in an `asyncio` context, so that the callback runs only when other tasks have yielded to the scheduler. This ensures that the callback runs with the same rules as apply to any `asyncio` task. This offers safety, even if the task triggers complex application behaviour.

The `Encoder` can be instantiated in such a way that its effective resolution can be reduced. A virtual encoder with lower resolution can be useful in some applications. In particular it can track the "clicks" of a mechanical detent.
The driver allows limits to be assigned to the virtual encoder's value so that a dial running from (say) 0 to 100 may be implemented. If limits are used, encoder values no longer approximate absolute angles: the user might continue to rotate the dial when its value is "stuck" at an endstop.
The callback only runs if a change in position of the virtual encoder has occurred. In consequence of the callback running in an `asyncio` context, by the time it is scheduled, the encoder's position may have changed by more than one increment. The callback receives two args, the absolute value of the virtual encoder at the time it was triggered and the signed change in this value since the previous time the callback ran.
from primitives import Encoder #
Existing users: the `delay` parameter is now a constructor arg rather than a class variable.

Constructor arguments:
- `pin_x`, `pin_y` Initialised `machine.Pin` instances for the switch. Should be set as `Pin.IN` and have pullups.
- `v=0` Initial value.
- `div=1` A value > 1 causes the motion rate of the encoder to be divided down, to produce a virtual encoder with lower resolution. This can enable tracking of mechanical detents - typical values are then 4 or 2 pulses per click.
- `vmin=None` By default the `value` of the encoder can vary without limit. Optionally maximum and/or minimum limits can be set.
- `vmax=None` As above. If `vmin` and/or `vmax` are specified, a `ValueError` will be thrown if the initial value `v` does not conform with the limits.
- `mod=None` An integer `N > 0` causes the divided value to be reduced modulo `N` - useful for controlling rotary devices.
- `callback=lambda a, b : None` Optional callback function. The callback receives two integer args, `v` being the virtual encoder's current value and `delta` being the signed difference between the current value and the previous one. Further args may be appended by the following.
- `args=()` An optional tuple of positional args for the callback.
- `delay=100` After motion is detected the driver waits for `delay` ms before reading the current position. A delay limits the rate at which the callback is invoked and improves debouncing. This is a minimal approach. See this script for a way to create a callback which runs only when the encoder stops moving.

Synchronous method:
- `value()` No args. Returns an integer being the virtual encoder's current value.
Not all combinations of arguments make mathematical sense. The order in which operations are applied is:
- Apply division if specified.
- Restrict the divided value by any maximum or minimum.
- Reduce modulo N if specified.
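The order of operations can be expressed as a short function. This is a sketch only: `virtual_value` is a hypothetical helper, and the use of floor division is an assumption, since the driver may round or track position differently.

```python
def virtual_value(raw, div=1, vmin=None, vmax=None, mod=None):
    """Map a raw encoder count to a virtual encoder value."""
    v = raw // div  # 1. Apply division (floor division assumed)
    if vmin is not None:
        v = max(v, vmin)  # 2. Restrict by any minimum...
    if vmax is not None:
        v = min(v, vmax)  # ...and by any maximum
    if mod is not None:
        v %= mod  # 3. Reduce modulo N
    return v

print(virtual_value(40, div=4))
print(virtual_value(1000, div=4, vmax=100))
print(virtual_value(52, div=4, mod=10))
```

Because limits are applied before the modulo step, combining `vmax` with a smaller `mod` is an example of arguments that make little mathematical sense together.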
An `Encoder` instance is an asynchronous iterator. This enables it to be used as follows, with successive values being retrieved with `async for`:

from machine import Pin
import asyncio
from primitives import Encoder

async def main():
    px = Pin(16, Pin.IN, Pin.PULL_UP)  # Change to match hardware
    py = Pin(17, Pin.IN, Pin.PULL_UP)
    enc = Encoder(px, py, div=4)  # div matches mechanical detents
    async for value in enc:
        print(f"Value = {value}")

asyncio.run(main())
See this doc for further information on encoders and their limitations.
from primitives import RingbufQueue #
The API of the `Queue` aims for CPython compatibility. This is at some cost to efficiency. As the name suggests, the `RingbufQueue` class uses a pre-allocated circular buffer which may be of any mutable type supporting the buffer protocol e.g. `list`, `array` or `bytearray`.

It should be noted that `Queue`, `RingbufQueue` (and CPython's `Queue`) are not thread safe. See Threading.
Attributes of RingbufQueue:
- It is of fixed size, whereas Queue can grow to arbitrary size.
- It uses pre-allocated buffers of various types (Queue uses a list).
- It is an asynchronous iterator allowing retrieval with async for.
- It has an "overwrite oldest data" synchronous write mode.
Constructor mandatory arg:
- Buffer for the queue, e.g. list, bytearray or array. If an integer is passed, a list of this size is created. A buffer of size N can hold a maximum of N-1 items. Note that, where items on the queue are suitably limited, bytearrays or arrays are more efficient than lists.
Synchronous methods (immediate return):
- qsize No arg. Returns the number of items in the queue.
- empty No arg. Returns True if the queue is empty.
- full No arg. Returns True if the queue is full.
- get_nowait No arg. Returns an object from the queue. Raises IndexError if the queue is empty.
- put_nowait Arg: the object to put on the queue. Raises IndexError if the queue is full. If the calling code ignores the exception the oldest item in the queue will be overwritten. In some applications this can be of use.
- peek No arg. Returns oldest entry without removing it from the queue. This is a superset of the CPython compatible methods.
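The N-1 capacity and the overwrite behaviour of put_nowait both follow from the usual circular buffer bookkeeping. The following is a minimal sketch under that assumption. TinyRing is a hypothetical stand-in written for illustration, not the library's RingbufQueue, and only some of the synchronous methods are modelled.

```python
class TinyRing:
    """Hypothetical stand-in for illustration; not the library's RingbufQueue."""

    def __init__(self, buf):
        # An int creates a list of that size, as described for the constructor.
        self._q = [0] * buf if isinstance(buf, int) else buf
        self._size = len(self._q)
        self._wi = 0  # Write index: next slot to be written
        self._ri = 0  # Read index: oldest unread item

    def empty(self):
        return self._ri == self._wi

    def full(self):
        # Full when advancing the write index would meet the read index.
        return (self._wi + 1) % self._size == self._ri

    def qsize(self):
        return (self._wi - self._ri) % self._size

    def put_nowait(self, v):
        self._q[self._wi] = v
        self._wi = (self._wi + 1) % self._size
        if self._wi == self._ri:  # Queue was full: discard the oldest item
            self._ri = (self._ri + 1) % self._size
            raise IndexError("queue full, oldest item overwritten")

    def get_nowait(self):
        if self.empty():
            raise IndexError("queue empty")
        v = self._q[self._ri]
        self._ri = (self._ri + 1) % self._size
        return v


q = TinyRing(4)  # Four slots, so at most three items
for n in (1, 2, 3):
    q.put_nowait(n)
print(q.full(), q.qsize())  # True 3
try:
    q.put_nowait(4)  # Overwrites the oldest item (1), then raises
except IndexError:
    pass
print([q.get_nowait() for _ in range(3)])  # [2, 3, 4]
```

One slot is always left unused so that equal read and write indices can only mean "empty", which is why a buffer of size N holds N-1 items.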
Asynchronous methods:
- put Arg: the object to put on the queue. If the queue is full, it will block until space is available.
- get Return an object from the queue. If empty, block until an item is available.
Retrieving items from the queue:
The RingbufQueue is an asynchronous iterator. Results are retrieved using
async for:
async def handle_queued_data(q):
    async for obj in q:
        await asyncio.sleep(0)  # See below
        # Process obj
The sleep is necessary if you have multiple tasks waiting on the queue,
otherwise one task hogs all the data.
The following illustrates putting items onto a RingbufQueue where the queue is
not allowed to stall: when it becomes full, new items overwrite the oldest ones
in the queue:
def add_item(q, data):
    try:
        q.put_nowait(data)
    except IndexError:
        pass  # Queue was full: the oldest item has been overwritten
from primitives import Delay_ms
This implements the software equivalent of a retriggerable monostable or a
watchdog timer. It has an internal boolean running state. When instantiated
the Delay_ms instance does nothing, with running False until triggered.
Then running becomes True and a timer is initiated. This can be prevented
from timing out by triggering it again (with a new timeout duration). So long
as it is triggered before the time specified in the preceding trigger it will
never time out.
If it does time out the running state will revert to False. This can be
interrogated by the object's running() method. In addition a callable can
be specified to the constructor. A callable can be a callback or a coroutine.
A callback will execute when a timeout occurs; where the callable is a
coroutine it will be converted to a Task and run asynchronously.
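The retriggerable behaviour described above can be sketched with standard asyncio calls. This is a simplified illustration that runs under CPython, not the library's Delay_ms. The class name RetriggerableDelay and its parameter names are hypothetical, and only a synchronous callback is supported.

```python
import asyncio


class RetriggerableDelay:
    """Hypothetical sketch of a retriggerable delay; not the library's Delay_ms."""

    def __init__(self, func=None, args=(), duration=1000):
        self._func = func
        self._args = args
        self._duration = duration  # Default period in ms
        self._task = None
        self._running = False

    def trigger(self, duration=0):
        # Restart the timer: cancel any pending timeout and start a new one.
        if self._task is not None:
            self._task.cancel()
        self._running = True
        ms = duration if duration > 0 else self._duration
        self._task = asyncio.create_task(self._timeout(ms))

    async def _timeout(self, ms):
        await asyncio.sleep(ms / 1000)
        self._running = False  # Timed out
        self._task = None
        if self._func is not None:
            self._func(*self._args)

    def running(self):
        return self._running


async def demo():
    log = []
    d = RetriggerableDelay(log.append, ("timed out",), duration=200)
    for _ in range(5):  # Retrigger every 20 ms: the 200 ms delay never expires
        d.trigger()
        await asyncio.sleep(0.02)
    held_off = d.running() and not log
    await asyncio.sleep(0.5)  # Triggering has ceased: the timeout now occurs
    return held_off, d.running(), log


result = asyncio.run(demo())
print(result)
```

Each call to trigger discards the pending timeout and starts a fresh one, so the callback runs only once triggering has stopped for a full period.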
Constructor arguments (defaults in brackets):
- The callable to call on timeout (default None).
- A tuple of arguments for the callable.
- Unused arg, retained to avoid breaking code.
- duration Integer, default 1000 ms. The default timer period where no value is passed to the trigger method.
Synchronous methods:
- trigger Optional argument duration=0. A timeout will occur after duration ms unless retriggered. If no arg is passed the period will be that of the duration passed to the constructor. The method can be called from a hard or soft ISR. It is now valid for duration to be less than the current time outstanding.
- stop No argument. Cancels the timeout, setting the running state to False. The timer can be restarted by issuing trigger again. Also clears the Event described in wait below.
- running No argument. Returns the running status of the object.
- __call__ Alias for running.
- rvalue No argument. If a timeout has occurred and a callback has run, returns the return value of the callback. If a coroutine was passed, returns the Task instance. This allows the Task to be cancelled or awaited.
- callback Allows the callable and its args to be assigned, reassigned or disabled at run time.
- deinit No args. Cancels the running task. See Object scope.
- clear No args. Clears the Event described in wait below.
- set No args. Sets the Event described in wait below.
Asynchronous method:
- wait One or more tasks may wait on a Delay_ms instance. Pause until the delay instance has timed out.
In this example a Delay_ms instance is created with the default duration of
1 sec. It is repeatedly triggered for 5 secs, preventing the callback from
running. One second after the triggering ceases, the callback runs.
import asyncio
from primitives import Delay_ms

async def my_app():
    d = Delay_ms(callback, ('Callback running',))
    print('Holding off callback')
    for _ in range(10):  # Hold off for 5 secs
        await asyncio.sleep_ms(500)
        d.trigger()  # Retrigger before the 1 s default expires
    print('Callback will run in 1s')
    await asyncio.sleep(2)

def callback(v):
    print(v)

try:
    asyncio.run(my_app())
finally:
    asyncio.new_event_loop()  # Clear retained state
This example illustrates multiple tasks waiting on a Delay_ms. No callback is
used.
import asyncio
from primitives import Delay_ms

async def foo(n, d):
    await d.wait()
    d.clear()  # Task waiting on the Event must clear it
    print('Done in foo no.', n)

async def my_app():
    d = Delay_ms()
    tasks = [None] * 4  # For CPython compatibility must store a reference, see Note
    for n in range(4):
        tasks[n] = asyncio.create_task(foo(n, d))
    d.trigger()  # Assumed here: start the delay using the default duration
    print('Waiting on d')
    await d.wait()
    print('Done in my_app.')
    await asyncio.sleep(1)
    print('Test complete.')

try:
    asyncio.run(my_app())
finally:
    _ = asyncio.new_event_loop()  # Clear retained state
from primitives import Broker
The Broker class provides a flexible means of messaging between running tasks.
It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task
publishes to a topic. Objects subscribed to that topic will receive the message.
This enables one to one, one to many, many to one or many to many messaging.
A task subscribes to a topic via an agent. This is stored by the broker. When
the broker publishes a message, every agent subscribed to the message topic
will be triggered. In the simplest case the agent is a Queue instance: the
broker puts the topic and message onto the subscriber's queue for retrieval.
More advanced agents can perform actions in response to a message, such as
calling a function, launching a task or lighting an LED.
Agents may be subscribed and unsubscribed dynamically. The publishing task has
no "knowledge" of the number or type of agents subscribed to a topic. The module
is not threadsafe: Broker methods should not be called from a hard ISR or from
another thread.
All Broker methods are synchronous.
- Constructor This has no args.
- subscribe(topic, agent, *args) The agent will be triggered by messages with a matching topic. Any additional args will be passed to the agent when it is triggered.
- unsubscribe(topic, agent, *args) The agent will stop being triggered. If args were passed on subscription, the same args must be passed.
- publish(topic, message) All agent instances subscribed to topic will be triggered, receiving topic and message plus any further args that were passed to subscribe.
The topic arg is typically a string but may be any hashable object. A message
is an arbitrary Python object.
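The publish-subscribe mechanism can be sketched in a few lines of plain Python. TinyBroker is a hypothetical illustration, not the library's Broker: it handles only function agents, and storing subscriptions as a set of (agent, args) pairs is an assumption made for the sketch.

```python
class TinyBroker:
    """Hypothetical sketch of publish-subscribe; not the library's Broker."""

    def __init__(self):
        self._subs = {}  # topic -> set of (agent, args) pairs

    def subscribe(self, topic, agent, *args):
        self._subs.setdefault(topic, set()).add((agent, args))

    def unsubscribe(self, topic, agent, *args):
        # The same args must be supplied to identify the subscription.
        self._subs.get(topic, set()).discard((agent, args))

    def publish(self, topic, message):
        for agent, args in self._subs.get(topic, ()):
            agent(topic, message, *args)  # Only function agents are modelled


received = []


def recv(topic, message, tag):
    received.append((topic, message, tag))


broker = TinyBroker()
broker.subscribe("led", recv, "red")
broker.subscribe("led", recv, "green")
broker.publish("led", 1)  # Both subscriptions are triggered
broker.unsubscribe("led", recv, "green")
broker.publish("led", 0)  # Only the "red" subscription remains
broker.publish("unused", 99)  # No subscribers: the message is dropped
print(sorted(received))
```

The publisher never inspects who is subscribed, which is what allows one to one, one to many and many to many messaging with the same publish call.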
An agent may be an instance of any of the following types. Args refers to any
arguments passed to the agent on subscription.
- RingbufQueue Received messages are queued as a 2-tuple (topic, message) assuming no subscription args, otherwise (topic, message, (args...)).
- Queue Received messages are queued as described above.
- function Called when a message is received. Args: topic, message plus any further subscription args.
- bound method Called when a message is received. Args: topic, message plus any further args.
- coroutine Converted to a task when a message is received. Args: topic, message plus any further subscription args.
- bound coroutine Converted to a task when a message is received. Args: topic, message plus any further subscription args.
- Event Set when a message is received.
- user_agent Instance of a user class. See user agents below.
Note that synchronous agent instances must run to completion quickly otherwise
the publish method will be slowed. See Notes for further details on queue
behaviour.
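How different agent types might be triggered can be sketched with standard CPython asyncio objects. This is an assumption-laden illustration: trigger_agent is a hypothetical function, asyncio.Queue stands in for the library's queue classes, and the library's own dispatch logic may differ.

```python
import asyncio
import inspect


def trigger_agent(agent, topic, message, args=()):
    """Hypothetical dispatcher; the library's own logic may differ."""
    if isinstance(agent, asyncio.Queue):
        # Queue agents receive a 2-tuple, or a 3-tuple if args were given.
        agent.put_nowait((topic, message, args) if args else (topic, message))
    elif isinstance(agent, asyncio.Event):
        agent.set()
    elif inspect.iscoroutinefunction(agent):
        return asyncio.create_task(agent(topic, message, *args))
    elif callable(agent):
        agent(topic, message, *args)
    else:
        raise ValueError("invalid agent type")


async def demo():
    calls = []
    q = asyncio.Queue()
    ev = asyncio.Event()

    async def coro_agent(topic, message):
        calls.append(("coro", topic, message))

    def func_agent(topic, message, extra):
        calls.append(("func", topic, message, extra))

    for agent, args in ((q, ()), (ev, ()), (coro_agent, ()), (func_agent, (42,))):
        trigger_agent(agent, "t", "hello", args)
    await asyncio.sleep(0)  # Let the task created for coro_agent run
    return q.get_nowait(), ev.is_set(), calls


outcome = asyncio.run(demo())
print(outcome)
```

The function agent runs inside the dispatching call, which is why a slow synchronous agent delays the publisher, whereas the coroutine agent only runs once the publisher yields.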
Broker class variable: Verbose=True enables printing of debug messages.
import asyncio
from primitives import Broker, RingbufQueue

broker = Broker()

async def sender(t):
    for x in range(t):
        await asyncio.sleep(1)
        broker.publish("foo_topic", f"test {x}")

async def receiver():
    queue = RingbufQueue(20)
    broker.subscribe("foo_topic", queue)
    async for topic, message in queue:
        print(topic, message)

async def main():
    rx = asyncio.create_task(receiver())
    await sender(10)
    await asyncio.sleep(2)
    rx.cancel()  # Stop the receiver before exiting

asyncio.run(main())
An interesting application is to extend MQTT into the Python code (see mqtt_as). This is as simple as:
async def messages(client):
    async for topic, msg, retained in client.queue:
        broker.publish(topic.decode(), msg.decode())
Assuming the MQTT client is subscribed to multiple topics, message strings are
directed to agents, each dedicated to handling a topic. An agent might operate
an interface or queue the message for a running task.
The following illustrates a use case for passing args to an agent (pin nos.
are for Pyboard 1.1).
import asyncio
from primitives import Broker
from machine import Pin

red = Pin("A13", Pin.OUT, value=0)  # Pin nos. for Pyboard V1.1
green = Pin("A14", Pin.OUT, value=0)
broker = Broker()

async def flash():
    broker.publish("led", 1)
    await asyncio.sleep(1)
    broker.publish("led", 0)

def recv(topic, message, led):
    led(message)  # Light or extinguish an LED

async def main():
    broker.subscribe("led", recv, red)
    broker.subscribe("led", recv, green)
    for _ in range(10):
        await flash()
        await asyncio.sleep(1)
    broker.unsubscribe("led", recv, green)  # Arg(s) must be passed
    for _ in range(3):
        await flash()
        await asyncio.sleep(1)

asyncio.run(main())
A task can wait on multiple topics using a RingbufQueue:
import asyncio
from primitives import Broker, RingbufQueue

broker = Broker()

async def receiver():
    q = RingbufQueue(10)
    broker.subscribe("foo_topic", q)
    broker.subscribe("bar_topic", q)
    async for topic, message in q:
        print(f"Received Topic: {topic} Message: {message}")

async def sender(t):
    for x in range(t):
        await asyncio.sleep(1)
        broker.publish("foo_topic", f"test {x}")
        broker.publish("bar_topic", f"test {x}")
        broker.publish("ignore me", f"test {x}")

async def main():
    rx = asyncio.create_task(receiver())
    await sender(10)
    await asyncio.sleep(2)
    rx.cancel()  # Stop the receiver before exiting

asyncio.run(main())
Here the receiver task waits on two topics. The asynchronous iterator returns
messages as they are published.
An agent can be an instance of a user class. The class must be a subclass of
Agent, and it must support a synchronous .put method. Arguments are topic
and message, followed by any further args passed on subscription. The method
should run to completion quickly.
import asyncio
from primitives import Broker, Agent

broker = Broker()

class MyAgent(Agent):
    def put(self, topic, message, arg):
        print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}")

async def sender(t):
    for x in range(t):
        await asyncio.sleep(1)
        broker.publish("foo_topic", f"test {x}")

async def main():
    broker.subscribe("foo_topic", MyAgent(), 42)
    await sender(10)

asyncio.run(main())
In the case of publications whose topics are strings, a single call to
.subscribe can subscribe an agent to multiple topics. This is by wildcard
matching. By default exact matching is used, however this can be changed to use
regular expressions as in this code fragment:
from primitives import Broker, RegExp
broker.subscribe(RegExp(".*_topic"), some_agent)
In this case some_agent would be triggered by publications to topics such as
foo_topic because the string ".*_topic" matches these by the rules of
regular expressions.
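The effect of the pattern can be checked with the standard re module. This sketch assumes that matching behaves like re.match, which anchors at the start of the topic only. How RegExp applies the pattern internally is not stated here, so treat that as an assumption.

```python
import re

pattern = re.compile(".*_topic")
for topic in ("foo_topic", "bar_topic", "ignore me"):
    # re.match anchors at the start of the string only
    print(topic, pattern.match(topic) is not None)
```

Under the same assumption a topic such as "foo_topic_x" would also match, because nothing anchors the end of the pattern. Appending "$" to the pattern restricts it to topics that end in "_topic".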
As in the real world, publication carries no guarantee of readership. If at the
time of publication there are no tasks with subscribed agent instances, the
message will silently be lost.
Arguments must be hashable objects. Mutable objects such as lists and
dictionaries are not permitted. If an object can be added to a set it is
valid. In general, interfaces such as Pin instances are OK.
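The "can it be added to a set" rule is easy to test before subscribing. The helper is_valid_arg below is hypothetical and is shown only to make the rule concrete.

```python
def is_valid_arg(obj):
    """Hypothetical check: an object that can be added to a set is hashable."""
    try:
        {obj}
    except TypeError:
        return False
    return True


print(is_valid_arg(42))          # True
print(is_valid_arg(("a", 1)))    # True: a tuple of hashable items
print(is_valid_arg([1, 2]))      # False: lists are mutable
print(is_valid_arg({"k": "v"}))  # False: dicts are mutable
```

Where a list of values is needed as a subscription arg, converting it to a tuple is usually sufficient.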
An agent can be subscribed to multiple topics. An agent may be subscribed
to a topic multiple times only if each instance has different arguments.
If a message causes a queue to fill, a message will silently be lost. It is the
responsibility of the subscriber to avoid this. In the case of a Queue
instance the lost message is the one causing the overflow. In the case of
RingbufQueue the oldest message in the queue is discarded. In some
applications this behaviour is preferable. In general RingbufQueue is
preferred as it is optimised for microcontroller use and supports retrieval by
an asynchronous iterator.
If either queue type is subscribed with args, a publication will create a queue
entry that is a 3-tuple (topic, message, (args...)). There is no obvious use
case for this.
An agent instance is owned by a subscribing task but is executed by a
publishing task. If a function used as an agent throws an exception, the
traceback will point to a Broker.publish call.
The Broker class throws a ValueError if .subscribe is called with an
invalid agent type. There are a number of non-fatal conditions which can occur
such as a queue overflow or an attempt to unsubscribe an agent twice. The
Broker will report these if Broker.Verbose=True.
Import as follows:
from primitives import launch
launch enables a function to accept a coro or a callback interchangeably. It
accepts the callable plus a tuple of args. If a callback is passed, launch
runs it and returns the callback's return value. If a coro is passed, it is
converted to a task and run asynchronously. The return value is the task
instance. A usage example is in primitives/
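The behaviour described can be sketched for CPython as follows. launch_sketch is a hypothetical equivalent written for illustration, not the library's code, and it relies on inspect.iscoroutine to detect that a coro was passed.

```python
import asyncio
import inspect


def launch_sketch(func, tup_args):
    """Hypothetical equivalent of launch, for illustration only."""
    res = func(*tup_args)
    if inspect.iscoroutine(res):  # A coro was passed: run it as a task
        res = asyncio.create_task(res)
    return res


def callback(x):
    return x + 1


async def coro(x):
    await asyncio.sleep(0)
    return x * 2


async def demo():
    a = launch_sketch(callback, (1,))  # Runs at once, returns 2
    task = launch_sketch(coro, (3,))  # Returns a Task
    return a, isinstance(task, asyncio.Task), await task


launched = asyncio.run(demo())
print(launched)  # (2, True, 6)
```

A driver written this way lets the user supply either kind of callable without the driver needing separate code paths.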
Import as follows:
from primitives import set_global_exception
set_global_exception is a convenience function to enable a global exception
handler to simplify debugging. The function takes no args. It is called as
follows:
import asyncio
from primitives import set_global_exception
async def main():
    set_global_exception()
    # Main body of application code omitted

try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()  # Clear retained state
This is explained in the tutorial. In essence if an exception occurs in a task, the default behaviour is for the task to stop but for the rest of the code to continue to run. This means that the failure can be missed and the sequence of events can be hard to deduce. A global handler ensures that the entire application stops, allowing the traceback and other debug prints to be studied.