Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure tests are deterministic and faster #77

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 mypy pytest black colorama
python -m pip install flake8 mypy pytest black colorama stime
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi

- name: Run tests with pytest
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ classifiers = [
"Documentation" = "https://simple-pid.readthedocs.io/"

[project.optional-dependencies]
test = ["pytest"]
test = [
"pytest",
"stime",
]
doc = [
"furo==2023.3.27",
"myst-parser==1.0.0",
Expand Down
18 changes: 9 additions & 9 deletions simple_pid/pid.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ def _clamp(value, limits):
class PID(object):
"""A simple PID controller."""

MIN_DT = 1e-16

def __init__(
self,
Kp=1.0,
Expand Down Expand Up @@ -98,27 +100,25 @@ def __init__(
# Set initial state of the controller
self._integral = _clamp(starting_output, output_limits)

def __call__(self, input_, dt=None):
def __call__(self, input_):
"""
Update the PID controller.

Call the PID controller with *input_* and calculate and return a control output if
sample_time seconds has passed since the last update. If no new output is calculated,
return the previous output instead (or None if no value has been calculated yet).

:param dt: If set, uses this value for timestep instead of real time. This can be used in
simulations when simulation time is different from real time.
"""
if not self.auto_mode:
return self._last_output

now = self.time_fn()
if dt is None:
dt = now - self._last_time if (now - self._last_time) else 1e-16
elif dt <= 0:
raise ValueError('dt has negative value {}, must be positive'.format(dt))
dt = now - self._last_time if (now - self._last_time) else PID.MIN_DT

if self.sample_time is not None and dt < self.sample_time and self._last_output is not None:
if (
self.sample_time is not None
and dt < self.sample_time - PID.MIN_DT
and self._last_output is not None
):
# Only update every sample_time seconds
return self._last_output

Expand Down
2 changes: 1 addition & 1 deletion simple_pid/pid.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class PID(object):
time_fn: Optional[Callable[[], float]] = ...,
starting_output: float = ...,
) -> None: ...
def __call__(self, input_: float, dt: Optional[float] = ...) -> Optional[float]: ...
def __call__(self, input_: float) -> Optional[float]: ...
def __repr__(self) -> str: ...
@property
def components(self) -> _Components: ...
Expand Down
88 changes: 55 additions & 33 deletions tests/test_pid.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import time
import stime
import pytest
from simple_pid import PID

Expand All @@ -25,59 +26,63 @@ def test_P_negative_setpoint():


def test_I():
pid = PID(0, 10, 0, setpoint=10, sample_time=0.1)
time.sleep(0.1)
stime.reset(0)
pid = PID(0, 10, 0, setpoint=10, sample_time=0.1, time_fn=stime.monotonic)
stime.tick(0.1)

assert round(pid(0)) == 10.0 # Make sure we are close to expected value
time.sleep(0.1)
stime.tick(0.1)

assert round(pid(0)) == 20.0


def test_I_negative_setpoint():
pid = PID(0, 10, 0, setpoint=-10, sample_time=0.1)
time.sleep(0.1)
stime.reset(0)
pid = PID(0, 10, 0, setpoint=-10, sample_time=0.1, time_fn=stime.monotonic)
stime.tick(0.1)

assert round(pid(0)) == -10.0
time.sleep(0.1)
stime.tick(0.1)

assert round(pid(0)) == -20.0


def test_D():
pid = PID(0, 0, 0.1, setpoint=10, sample_time=0.1)
stime.reset(0)
pid = PID(0, 0, 0.1, setpoint=10, sample_time=0.1, time_fn=stime.monotonic)

# Should not compute derivative when there is no previous input (don't assume 0 as first input)
assert pid(0) == 0
time.sleep(0.1)
stime.tick(0.1)

# Derivative is 0 when input is the same
assert pid(0) == 0
assert pid(0) == 0
time.sleep(0.1)
stime.tick(0.1)

assert round(pid(5)) == -5
time.sleep(0.1)
stime.tick(0.1)
assert round(pid(15)) == -10


def test_D_negative_setpoint():
pid = PID(0, 0, 0.1, setpoint=-10, sample_time=0.1)
time.sleep(0.1)
stime.reset(0)
pid = PID(0, 0, 0.1, setpoint=-10, sample_time=0.1, time_fn=stime.monotonic)
stime.tick(0.1)

# Should not compute derivative when there is no previous input (don't assume 0 as first input)
assert pid(0) == 0
time.sleep(0.1)
stime.tick(0.1)

# Derivative is 0 when input is the same
assert pid(0) == 0
assert pid(0) == 0
time.sleep(0.1)
stime.tick(0.1)

assert round(pid(5)) == -5
time.sleep(0.1)
stime.tick(0.1)
assert round(pid(-5)) == 10
time.sleep(0.1)
stime.tick(0.1)
assert round(pid(-15)) == 10


Expand All @@ -89,11 +94,14 @@ def test_desired_state():


def test_output_limits():
pid = PID(100, 20, 40, setpoint=10, output_limits=(0, 100), sample_time=None)
time.sleep(0.1)
stime.reset(0)
pid = PID(
100, 20, 40, setpoint=10, output_limits=(0, 100), sample_time=None, time_fn=stime.monotonic
)
stime.tick(0.1)

assert 0 <= pid(0) <= 100
time.sleep(0.1)
stime.tick(0.1)

assert 0 <= pid(-100) <= 100

Expand Down Expand Up @@ -156,7 +164,8 @@ def test_starting_output():


def test_auto_mode():
pid = PID(1, 0, 0, setpoint=10, sample_time=None)
stime.reset(0)
pid = PID(1, 0, 0, setpoint=10, sample_time=None, time_fn=stime.monotonic)

# Ensure updates happen by default
assert pid(0) == 10
Expand All @@ -175,7 +184,7 @@ def test_auto_mode():

# Last update time should be reset to avoid huge dt
pid.auto_mode = False
time.sleep(1)
stime.tick(1)
pid.auto_mode = True
assert pid.time_fn() - pid._last_time < 0.01

Expand All @@ -186,11 +195,12 @@ def test_auto_mode():


def test_separate_components():
pid = PID(1, 0, 1, setpoint=10, sample_time=0.1)
stime.reset(0)
pid = PID(1, 0, 1, setpoint=10, sample_time=0.1, time_fn=stime.monotonic)

assert pid(0) == 10
assert pid.components == (10, 0, 0)
time.sleep(0.1)
stime.tick(0.1)

assert round(pid(5)) == -45
assert tuple(round(term) for term in pid.components) == (5, 0, -50)
Expand Down Expand Up @@ -236,42 +246,54 @@ def test_repr():


def test_converge_system():
pid = PID(1, 0.8, 0.04, setpoint=5, output_limits=(-5, 5))
stime.reset(0)
pid = PID(1, 0.8, 0.04, setpoint=5, output_limits=(-5, 5), time_fn=stime.monotonic)
pv = 0 # Process variable

def update_system(c, dt):
# Calculate a simple system model
return pv + c * dt - 1 * dt

start_time = time.time()
start_time = stime.time()
last_time = start_time

while time.time() - start_time < 120:
for _ in range(5):
stime.tick()
c = pid(pv)
pv = update_system(c, time.time() - last_time)
pv = update_system(c, stime.time() - last_time)

last_time = time.time()
last_time = stime.time()

# Check if system has converged
assert abs(pv - 5) < 0.1


def test_converge_diff_on_error():
pid = PID(1, 0.8, 0.04, setpoint=5, output_limits=(-5, 5), differential_on_measurement=False)
stime.reset(0)
pid = PID(
1,
0.8,
0.04,
setpoint=5,
output_limits=(-5, 5),
differential_on_measurement=False,
time_fn=stime.monotonic,
)
pv = 0 # Process variable

def update_system(c, dt):
# Calculate a simple system model
return pv + c * dt - 1 * dt

start_time = time.time()
start_time = stime.time()
last_time = start_time

while time.time() - start_time < 12:
for _ in range(5):
stime.tick()
c = pid(pv)
pv = update_system(c, time.time() - last_time)
pv = update_system(c, stime.time() - last_time)

last_time = time.time()
last_time = stime.time()

# Check if system has converged
assert abs(pv - 5) < 0.1
Expand Down