Compare commits

..

No commits in common. "c7d0f0e03baf3970a4e0b0659cbb07aff5211dda" and "d537de05fc43d4a4c6d1a5c2c60a4cd45336e223" have entirely different histories.

10 changed files with 82 additions and 344 deletions

2
.gitignore vendored
View file

@ -1,2 +1,2 @@
*.stamp
local_secrets.py
secrets.py

3
.gitmodules vendored
View file

@ -1,3 +0,0 @@
[submodule "third_party/micropython"]
path = third_party/micropython
url = https://github.com/micropython/micropython.git

View file

@ -11,25 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SRC = $(wildcard *.py) $(wildcard lib/*.py)
SRC = $(wildcard *.py)
MPY_CROSS = ../third_party/micropython/mpy-cross/build/mpy-cross
put: $(SRC:%.py=build/put-%.stamp)
put: $(SRC:%.py=put-%.stamp)
ampy -p /dev/ttyUSB? ls
build/put-%.stamp: %.py
mkdir -p $(@D)
ampy -p /dev/ttyUSB? put $< $(<F)
touch -r $< $@
build/%.mpy: %.py $(MPY_CROSS)
mkdir -p $(@D)
$(MPY_CROSS) -O1 -march=xtensa -o $@ $<
$(MPY_CROSS):
$(MAKE) -C $(dir $(@D))
clean:
rm -rf build
put-%.stamp: %.py
ampy -p /dev/ttyUSB? put $*.py
cp -a $< $@

View file

@ -15,16 +15,14 @@ import time
import compat
from typing import Generator
def delay(secs: float) -> Generator[None, None, None]:
def delay(secs: float) -> generator:
end = compat.monotonic() + secs
while compat.monotonic() < end:
yield None
def fps(fps: float) -> Generator[None, None, None]:
def fps(fps: float) -> generator:
yield None
dt = 1 / fps
until = compat.monotonic() + dt
@ -39,12 +37,16 @@ def fps(fps: float) -> Generator[None, None, None]:
yield None
def wait(stop_ms: int) -> Generator[int, None, None]:
"""Waits for stop_ms to pass, yielding how many ms have passed so far."""
def wait(stop: int) -> generator:
start = compat.monotonic()
while True:
elapsed = int((compat.monotonic() - start) * 1000)
elapsed = min(stop_ms, elapsed)
elapsed = min(stop, elapsed)
yield elapsed
if elapsed >= stop_ms:
if elapsed >= stop:
break
def test():
for f in delay(2.0):
print(utime.ticks_ms())

View file

@ -14,16 +14,14 @@
import random
import time
import utime
def monotonic():
return time.ticks_ms() * 1e-3
def monotonic() -> float:
return utime.ticks_ms() * 1e-3
def sleep(secs):
time.sleep_ms(int(secs * 1e3))
def sleep(secs: float) -> None:
utime.sleep_ms(int(secs * 1e3))
def randint(start: int, limit: int) -> int:
def randint(start, limit):
return start + random.getrandbits(30) % (limit - start + 1)

View file

@ -13,7 +13,6 @@
# limitations under the License.
import random
import time
from typing import Generator, Optional
import urequests
@ -21,7 +20,7 @@ import asynced
import compat
def _fetch() -> Optional[dict]:
def _fetch():
try:
return urequests.get("http://worldtimeapi.org/api/ip").json()
except OSError as ex:
@ -36,7 +35,7 @@ def _jitter(mid: int) -> int:
return compat.randint(mid, mid * 120 // 100)
def _sync() -> Generator[Optional[dict], None, None]:
def _sync() -> generator:
while True:
# Poll quickly until the first result comes in.
while True:
@ -65,12 +64,12 @@ def _get_day_sec(resp):
return float(hms[0]) * 3600 + float(hms[1]) * 60 + float(hms[2])
def day_sec() -> Generator[Optional[float], None, None]:
def day_sec() -> generator:
s = _sync()
# Spin until the first result comes in.
for got in s:
if got is None:
yield None
yield None, None
continue
local = compat.monotonic()
base = _get_day_sec(got)
@ -78,10 +77,9 @@ def day_sec() -> Generator[Optional[float], None, None]:
break
good = got
assert base is not None
for got in s:
now = base + compat.monotonic() - local
yield now % (60 * 60 * 24)
yield now % (60 * 60 * 24), good
if got is not None:
# Update the baseline.
@ -93,7 +91,7 @@ def day_sec() -> Generator[Optional[float], None, None]:
def test():
for secs in day_sec():
for secs, meta in day_sec():
print(secs)
compat.sleep(0.3)

View file

@ -1,7 +0,0 @@
Generator = None
Optional = None
Tuple = None
Iterable = None

View file

@ -1,166 +0,0 @@
import socket
import asynced
from typing import Generator, Optional, Tuple
_CONNECT = 1
_CONNACK = 2
_PUBLISH = 3
_PUBACK = 4
_PUBREC = 5
_PUBREL = 6
_PUBCOMP = 7
_SUBSCRIBE = 8
_SUBACK = 9
_UNSUBSCRIBE = 10
_UNSUBACK = 11
_PINGREQ = 12
_PINGRESP = 13
_DISCONNECT = 14
_DUP = 8
_QOS = 2
_RETAIN = 1
_UPDATE_SECONDS = 120
class Message:
def __init__(self, topic: str, value: str):
self.topic = topic
self.value = value
topic: str
value: str
def _getch() -> Generator[None, Optional[int], int]:
"""Returns the first non-None value sent to the generator"""
while True:
got = yield None
if got is not None:
return got
def _get_packet() -> Generator[None, Optional[int], Tuple[int, bytearray]]:
"""Decodes and returns the next packet"""
control = yield from _getch()
size = 0
shift = 0
while True:
got = yield from _getch()
size |= (got & 0x7F) << shift
shift += 7
if (got & 0x80) == 0:
break
payload = bytearray(size)
for i in range(len(payload)):
payload[i] = yield from _getch()
return control, payload
def _encode_string(value: str) -> bytearray:
"""Encodes a length-value string"""
encoded = value.encode()
return bytearray((len(encoded) >> 8, len(encoded) & 0xFF)) + encoded
def _write_packet(
sock,
control: int,
payload: bytearray | bytes,
packet_id: Optional[int] = None,
) -> None:
if packet_id is not None:
header = bytearray(
(control, len(payload) + 2, (packet_id >> 8) & 0xFF, packet_id & 0xFF)
)
else:
header = bytearray((control, len(payload)))
print("mqtt: >", header, payload)
sock.write(header + payload)
def _send_connect(sock: socket.socket, username: str, password: str):
flags = 0x02
if username:
flags |= 0x80
if password:
flags |= 0x40
payload = (
_encode_string("MQTT")
+ b"\x04" # Protocol level
+ bytearray((flags,))
+ bytearray((0, _UPDATE_SECONDS * 2)) # Keep alive seconds
+ _encode_string("") # Client identifier
)
if username:
payload += _encode_string(username)
if password:
payload += _encode_string(password)
_write_packet(sock, _CONNECT << 4, payload)
def _send_subscribe(sock: socket.socket, topic: str):
packet_id = hash(topic) & 0xFFFF
_write_packet(
sock,
(_SUBSCRIBE << 4) | 2,
_encode_string(topic) +
# QOS
b"\x00",
packet_id,
)
def _send_publish(sock: socket.socket, topic: str, value: str):
_write_packet(sock, _PUBLISH << 4, _encode_string(topic) + value.encode())
def _sender(sock: socket.socket, path: str) -> Generator[None, None, None]:
_send_subscribe(sock, path + "/#")
while True:
_send_publish(sock, path + "/available", "online")
yield from asynced.delay(_UPDATE_SECONDS)
def _receiver() -> Generator[Optional[Message], Optional[int], None]:
while True:
control, payload = yield from _get_packet()
if (control >> 4) == _PUBLISH:
qos = (control // _QOS) & 3
topic_length = (payload[0] << 8) | payload[1]
topic = payload[2 : 2 + topic_length].decode()
has_packet_id = qos == 1 or qos == 2
packet_id_length = 2 if has_packet_id else 0
value = payload[2 + topic_length + packet_id_length :].decode()
print("mqtt: topic update:", topic, value)
yield Message(topic, value)
def client(
sock: socket.socket,
path: str,
username: str = "",
password: str = "",
) -> Generator[Optional[Message], Optional[int], None]:
_send_connect(sock, username, password)
connack, payload = yield from _get_packet()
if connack != _CONNACK << 4:
raise Exception("Expected CONNACK")
if len(payload) < 2:
raise Exception("CONNACK is too short")
if payload[1] != 0:
raise Exception("CONNACK has an error code")
receiver = _receiver()
sender = _sender(sock, path)
got = yield None
while True:
sender.send(None)
got = yield receiver.send(got)

View file

@ -12,27 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import array
import binascii
import sys
import socket
import random
import secrets
import time
import machine
import neopixel
import network
import micropython
import asynced
import compat
import iptime
import mqtt
import local_secrets
from typing import Optional, Generator, Iterable, Tuple
if "ESP32" in str(sys.implementation):
LED_BUILTIN = 2
else:
LED_BUILTIN = 16
# fmt: off
RANGES = {
@ -90,8 +80,7 @@ COLOUR = (200, 255, 200)
# fmt: on
def _split(secs: int) -> Tuple[str, str, str]:
"""Converts a seconds-of-day into minutes, hours, and AM/PM"""
def split(secs: int) -> tuple:
secs = int(secs)
mins = (secs // 60) % 60
minute = mins // 5
@ -111,11 +100,13 @@ class Frame:
Max = 255
Zeros = tuple([0] * (11 * 10))
def __init__(self, n: int = 11 * 10, pixels: Optional[array.array] = None):
self.pixels = pixels if pixels else list(self.Zeros)
def __init__(self, n: int = 11 * 10, pixels: list = None):
if pixels is None:
pixels = list(self.Zeros)
self.pixels = pixels
def set(self, idx: int, value: int) -> "Frame":
self.pixels[idx] = value
def set(self, idx: int) -> "Frame":
self.pixels[idx] = self.Max
return self
def __add__(self, other):
@ -129,7 +120,6 @@ class Frame:
@micropython.native
def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
"""Merges two frames together."""
m = Frame.Max
p1 = f1.pixels
p2 = f2.pixels
@ -142,27 +132,24 @@ def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
return Frame(pixels=o)
def _render(words: Iterable[str], brightness: int) -> Frame:
"""Renders a list of words into a frame"""
def render(words: list) -> Frame:
f = Frame()
for word in words:
for w in word.split():
idx = RANGES.get(w, None)
if idx is not None:
for i in range(len(w)):
f.set(idx + i, brightness)
f.set(idx + i)
else:
print("warning: %s is missing" % w)
return f
def _prefix() -> Tuple[str]:
"""Returns a random suffix word such as 'dude'"""
def prefix() -> tuple:
return (ITS[compat.randint(0, len(ITS) - 1)],)
def _show(f: Frame, n):
"""Renders a frame to a NeoPixel handler"""
def show(f: Frame, n):
p = f.pixels
buf = bytearray(len(p) * 3)
r, g, b = COLOUR
@ -176,96 +163,89 @@ def _show(f: Frame, n):
n.write()
def _scan() -> Generator[Frame, None, None]:
"""Returns a stream of frames that scan across the LEDs"""
def scan() -> generator:
f = Frame()
while True:
for i in range(len(f.pixels)):
yield f.set(i, Frame.Max)
yield f.set(i)
f = f.muldiv(80, 100)
def _intensity(num: int, den: int) -> int:
def intensity(num: int, den: int) -> int:
idx = num * len(CIEL8) // den
if idx >= len(CIEL8):
return 255
return CIEL8[idx]
def _crossfade(src: Frame, dest: Frame, fade_time: int):
for at in asynced.wait(fade_time):
yield merge(
src,
dest,
_intensity(fade_time - at, fade_time),
_intensity(at, fade_time),
Frame.Max,
)
def brightness(secs):
hour = int(secs) // 60 // 60
if hour >= 7 and hour < 22:
return 255
return 1200
def _run(brightness_queue: list[int]) -> Generator[Optional[Frame], None, None]:
def run() -> generator:
words = None
fade_time = 2000
fade = 2000
secs = iptime.day_sec()
brightness = 255
# Flash until time is synced.
sc = _scan()
for s in secs:
if s is not None:
sc = scan()
for s, meta in secs:
if s is not None and meta is not None:
break
yield next(sc)
src = Frame()
while True:
changed = False
if brightness_queue:
brightness = brightness_queue[-1]
brightness_queue.clear()
changed = True
now = next(secs)
next_words = _split(now)
if words != next_words:
words = next_words
changed = True
if changed:
next_frame = _render(_prefix() + next_words, brightness)
yield from _crossfade(src, next_frame, fade_time)
src = next_frame
else:
yield None
s, m2 = next(secs)
nxt = split(s)
if nxt == words:
yield from asynced.delay(2)
continue
dest = render(prefix() + nxt)
for at in asynced.wait(fade):
yield merge(
src,
dest,
intensity(fade - at, fade),
intensity(at, fade),
brightness(s),
)
words = nxt
src = dest
def blink() -> Generator[None, None, None]:
led = machine.Pin(LED_BUILTIN, machine.Pin.OUT)
def blink() -> generator:
led = machine.Pin(16, machine.Pin.OUT)
while True:
led.on()
yield from asynced.delay(0.03)
led.off()
yield from asynced.delay(0.03)
led.on()
yield from asynced.delay(2.9)
def bench() -> Generator[None, None, None]:
def bench() -> generator:
yield None
i = 30
while True:
start = compat.monotonic()
for _ in range(i):
yield None
yield from range(i)
elapsed = compat.monotonic() - start
if elapsed > 0:
print("%d in %.3f %.3f/1 %.1f FPS" % (i, elapsed, elapsed / i, i / elapsed))
def _connect(wlan: network.WLAN) -> Generator[None, None, None]:
def connect(wlan) -> generator:
yield None
while True:
if not wlan.isconnected():
wlan.active(True)
wlan.connect(local_secrets.WLAN_ESSID, local_secrets.WLAN_PASSWORD)
wlan.connect(secrets.WLAN_ESSID, secrets.WLAN_PASSWORD)
else:
# Override the DNS server as MicroPython and dnsmasq seem
# to be incompatible.
@ -274,64 +254,15 @@ def _connect(wlan: network.WLAN) -> Generator[None, None, None]:
yield from asynced.delay(10)
def _mqtt_client(
is_connected, brightness_queue: list[int]
) -> Generator[None, None, None]:
while True:
yield from asynced.delay(3)
if not is_connected():
continue
sock = socket.socket()
try:
target = socket.getaddrinfo(local_secrets.MQTT_HOST, 1883)[0][-1]
print("mqtt: connecting to", target)
sock.connect(target)
sock.setblocking(False)
topic = "light/wc-" + binascii.hexlify(machine.unique_id()).decode()
client = mqtt.client(
sock, topic, local_secrets.MQTT_USERNAME, local_secrets.MQTT_PASSWORD
)
while True:
got = sock.read()
if not got:
client.send(None)
yield None
continue
for ch in got:
message = client.send(ch)
if not message:
continue
client.send(None)
if message.topic.endswith("/brightness"):
brightness_queue.append(int(message.value))
except Exception as ex:
print("mqtt: exception:", ex)
finally:
print("mqtt: close")
sock.close()
def main():
num = 11 * 10
n = neopixel.NeoPixel(machine.Pin(2), num)
wlan = network.WLAN(network.STA_IF)
brightness_queue = []
routines = (
_connect(wlan),
_mqtt_client(wlan.isconnected, brightness_queue),
_run(brightness_queue),
blink(),
asynced.fps(30),
bench(),
)
routines = (connect(wlan), run(), blink(), asynced.fps(30), bench())
while True:
for r in routines:
f = next(r)
if f is None:
continue
elif isinstance(f, Frame):
_show(f, n)
show(f, n)

@ -1 +0,0 @@
Subproject commit 678707c8b07323c5b914778708a2858387c3b60c