338 lines
8.9 KiB
Python
338 lines
8.9 KiB
Python
# Copyright 2020 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import array
|
|
import binascii
|
|
import sys
|
|
import socket
|
|
|
|
import machine
|
|
import neopixel
|
|
import network
|
|
import micropython
|
|
|
|
import asynced
|
|
import compat
|
|
import iptime
|
|
import mqtt
|
|
import local_secrets
|
|
|
|
from typing import Optional, Generator, Iterable, Tuple
|
|
|
|
# Pin number of the on-board status LED: 2 when the interpreter's
# implementation string mentions "ESP32", 16 on anything else.
LED_BUILTIN = 2 if "ESP32" in str(sys.implementation) else 16
|
|
|
|
# fmt: off
|
|
# Maps each displayable word to the index of its first pixel.  _render()
# lights len(word) consecutive pixels starting at that index, so words that
# share a start index (e.g. "TWENTY" / "TWENTYFIVE") overlap on purpose.
RANGES = {
    # Decorations and hour words (lower case).
    "dude": 0, "ok": 5, "ten": 8, "seven": 11, "twelve": 16,
    "even": 22, "eleven": 22, "eight": 28, "four": 33, "five": 37,
    "two": 41, "three": 44, "six": 49, "one": 52,
    "past": 55, "run": 60, "nine": 62, "to": 66,
    # Minute words (upper case).
    "TEN": 69, "HALF": 73, "TWENTY": 77, "TWENTYFIVE": 77,
    "FIVE": 83, "QUARTER": 90, "A": 98,
    # AM/PM markers and the "it is" / "its" opener.
    "AM": 106, "PM": 108,
    "is": 102, "it": 99, "its": 99,
}
|
|
|
|
# 100-entry brightness lookup used by _intensity(); values rise from 0 to 255.
# NOTE(review): the name suggests a CIE lightness curve scaled to 8 bits --
# confirm against whatever generated the table.
CIEL8 = (
    0, 0, 1, 1, 1, 2, 2, 2, 3, 3,
    3, 3, 4, 4, 4, 5, 5, 6, 6, 7,
    8, 8, 9, 10, 11, 11, 12, 13, 14, 15,
    16, 17, 18, 19, 21, 22, 23, 25, 26, 28,
    30, 31, 33, 35, 37, 39, 40, 42, 44, 46,
    49, 51, 53, 55, 57, 60, 63, 66, 68, 71,
    74, 77, 80, 83, 86, 90, 93, 96, 100, 103,
    107, 111, 115, 118, 122, 127, 131, 136, 140, 144,
    149, 153, 158, 163, 168, 173, 179, 184, 189, 195,
    200, 206, 212, 218, 224, 230, 236, 242, 249, 255,
)
|
|
|
|
# Phrase for each five-minute slot of the hour, indexed by minute // 5.
# NOTE(review): "oclock" has no entry in RANGES, so _render() prints its
# "is missing" warning for that slot -- confirm this is intended.
MINUTES = (
    "oclock",
    "FIVE past",
    "TEN past",
    "QUARTER past",
    "TWENTY past",
    "TWENTYFIVE past",
    "HALF past",
    "TWENTYFIVE to",
    "TWENTY to",
    "QUARTER to",
    "TEN to",
    "FIVE to",
)

# Hour words indexed by hour % 12 (index 0 is "twelve").
HOURS = (
    "twelve",
    "one",
    "two",
    "three",
    "four",
    "five",
    "six",
    "seven",
    "eight",
    "nine",
    "ten",
    "eleven",
)

# Indexed by (hour // 12) % 2.
AMPM = ("AM", "PM")

# Openers that _prefix() chooses from.
ITS = (
    "its",
    "it is",
    "its dude",
    "it is dude",
    "its ok",
    "it is ok",
)

# Per-channel (red, green, blue) scale applied to every pixel by _show().
COLOUR = (200, 255, 200)
|
|
# fmt: on
|
|
|
|
|
|
def _split(secs: int) -> Tuple[str, str, str]:
    """Maps a seconds-of-day value to (minutes phrase, hour word, AM/PM).

    From 35 minutes past onwards the phrase counts down "to" the next hour,
    so the hour word and the AM/PM marker both refer to that next hour.
    """
    total_minutes = int(secs) // 60
    minute_of_hour = total_minutes % 60
    hour = (total_minutes // 60) % 24

    if minute_of_hour >= 35:
        # "... to <next hour>": name the upcoming hour instead.
        hour += 1

    meridiem = AMPM[(hour // 12) % len(AMPM)]
    hour_word = HOURS[hour % len(HOURS)]
    return MINUTES[minute_of_hour // 5], hour_word, meridiem
|
|
|
|
|
|
def to_rgb(i, colour) -> tuple:
    """Scales the first three channels of colour by i / 256.

    Each channel is multiplied by i and shifted right by eight bits, so
    i == 255 yields slightly less than the original channel value.
    """
    red = (i * colour[0]) >> 8
    green = (i * colour[1]) >> 8
    blue = (i * colour[2]) >> 8
    return (red, green, blue)
|
|
|
|
|
|
class Frame:
    """A flat sequence of per-pixel brightness values for the LED grid."""

    # Largest brightness a pixel may hold; __add__ saturates at this value.
    Max = 255
    # Immutable all-dark template for the default 11 x 10 grid.
    Zeros = tuple([0] * (11 * 10))

    def __init__(self, n: int = 11 * 10, pixels: Optional[array.array] = None):
        """Creates a frame.

        Args:
            n: number of pixels to allocate when `pixels` is not supplied.
            pixels: existing buffer to adopt as-is (not copied).  Only `None`
                triggers allocation of a fresh all-zero list of length `n`.
        """
        # Previously `n` was ignored and every frame had len(Zeros) pixels;
        # the default value of `n` keeps that size for existing callers.
        self.pixels = pixels if pixels is not None else [0] * n

    def set(self, idx: int, value: int) -> "Frame":
        """Stores `value` at pixel `idx` in place and returns this frame."""
        self.pixels[idx] = value
        return self

    def __add__(self, other):
        """Returns a new frame holding the pixel-wise sum, capped at Max."""
        return Frame(
            pixels=[min(self.Max, x + y) for x, y in zip(self.pixels, other.pixels)]
        )

    def muldiv(self, num: int, den: int):
        """Returns a new frame with every pixel scaled by num / den (floored)."""
        return Frame(pixels=[x * num // den for x in self.pixels])
|
|
|
|
|
|
@micropython.native
def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
    """Blends two frames pixel by pixel.

    Each output pixel is (f1 * num1 + f2 * num2) // den, capped at Frame.Max.
    The result is a new Frame backed by an array of unsigned 16-bit values
    sized from Frame.Zeros; only the first len(f1.pixels) entries are written.
    """
    limit = Frame.Max
    first = f1.pixels
    second = f2.pixels
    blended = array.array("H", Frame.Zeros)
    for idx in range(len(first)):
        value = (first[idx] * num1 + second[idx] * num2) // den
        blended[idx] = limit if value >= limit else value
    return Frame(pixels=blended)
|
|
|
|
|
|
def _render(words: Iterable[str], brightness: int) -> Frame:
    """Builds a frame with the pixels of every known word lit.

    Each entry of `words` may hold several space-separated words.  A word
    found in RANGES lights len(word) consecutive pixels from its start index
    at `brightness`; an unknown word only produces a printed warning.
    """
    frame = Frame()
    for phrase in words:
        for token in phrase.split():
            start = RANGES.get(token)
            if start is None:
                print("warning: %s is missing" % token)
                continue
            for offset in range(len(token)):
                frame.set(start + offset, brightness)
    return frame
|
|
|
|
|
|
def _prefix() -> Tuple[str]:
    """Returns a randomly chosen opener from ITS (e.g. 'it is') as a 1-tuple"""
    # Presumably compat.randint is inclusive at both ends, like
    # random.randint -- hence the len(ITS) - 1 upper bound.
    last = len(ITS) - 1
    choice = compat.randint(0, last)
    return (ITS[choice],)
|
|
|
|
|
|
def _show(f: Frame, n):
    """Renders a frame to a NeoPixel handler.

    Every pixel brightness is scaled by the COLOUR channels (then shifted
    right by eight bits) and written straight into `n.buf`, three bytes per
    pixel with green at offset 0, red at offset 1 and blue at offset 2.
    `n.write()` is then called once.

    Args:
        f: frame whose `pixels` are displayed.
        n: object exposing a writable `buf` of at least 3 * len(f.pixels)
            entries and a `write()` method.
    """
    # The scratch bytearray the original allocated here on every frame was
    # never read or written, so it has been removed.
    r, g, b = COLOUR
    out = n.buf
    i = 0
    for c in f.pixels:
        out[i + 1] = (r * c) >> 8
        out[i + 0] = (g * c) >> 8
        out[i + 2] = (b * c) >> 8
        i += 3
    n.write()
|
|
|
|
|
|
def _scan() -> Generator[Frame, None, None]:
    """Yields an endless stream of frames with a bright dot sweeping across.

    Each step sets the next pixel to Frame.Max and yields the frame; the
    frame is then replaced by a copy scaled to 80%, which leaves a fading
    trail behind the dot.
    """
    frame = Frame()
    while True:
        position = 0
        while position < len(frame.pixels):
            frame.set(position, Frame.Max)
            yield frame
            frame = frame.muldiv(80, 100)
            position += 1
|
|
|
|
|
|
def _intensity(num: int, den: int) -> int:
    """Looks up the CIEL8 brightness for the fraction num / den.

    The fraction is scaled to an index into CIEL8.  An index at or beyond
    the end of the table returns 255; an index below zero returns the first
    (dimmest) entry.

    Raises:
        ZeroDivisionError: if `den` is zero.
    """
    idx = num * len(CIEL8) // den
    if idx >= len(CIEL8):
        return 255
    if idx < 0:
        # Without this guard a negative index would wrap around through
        # Python's negative indexing and return a bright value.
        return CIEL8[0]
    return CIEL8[idx]
|
|
|
|
|
|
def _crossfade(src: Frame, dest: Frame, fade_time: int):
    """Yields blended frames that fade from `src` to `dest`.

    For every value produced by asynced.wait(fade_time) the two frames are
    merged, weighting `src` by the remaining share and `dest` by the elapsed
    share, both mapped through _intensity().
    NOTE(review): assumes asynced.wait yields the elapsed time in the same
    unit as `fade_time` -- confirm against asynced.
    """
    for elapsed in asynced.wait(fade_time):
        outgoing = _intensity(fade_time - elapsed, fade_time)
        incoming = _intensity(elapsed, fade_time)
        yield merge(src, dest, outgoing, incoming, Frame.Max)
|
|
|
|
|
|
def _run(brightness_queue: list[int]) -> Generator[Optional[Frame], None, None]:
    """Main display routine: yields the frames to show.

    Until iptime.day_sec() produces a non-None value, frames from _scan()
    are yielded.  After that, whenever the words for the current time change
    or a new brightness arrives, the display crossfades from the previous
    frame to a freshly rendered one; otherwise the routine idles through
    asynced.delay(2).

    Args:
        brightness_queue: list that another routine appends requested
            brightness values to.  Only the most recent entry is used and
            the list is emptied once it has been read.
    """
    words = None
    # NOTE(review): presumably milliseconds -- confirm against asynced.wait.
    fade_time = 2000
    secs = iptime.day_sec()
    brightness = 255

    # Flash until time is synced.
    sc = _scan()
    for s in secs:
        if s is not None:
            break
        yield next(sc)

    src = Frame()
    while True:
        changed = False
        if brightness_queue:
            # The value comes from outside (MQTT); keep it within the range a
            # pixel can hold so a negative or oversized request cannot end up
            # in the unsigned pixel buffers downstream.
            brightness = max(0, min(Frame.Max, brightness_queue[-1]))
            brightness_queue.clear()
            changed = True

        now = next(secs)
        next_words = _split(now)
        if words != next_words:
            words = next_words
            changed = True

        if changed:
            next_frame = _render(_prefix() + next_words, brightness)
            yield from _crossfade(src, next_frame, fade_time)
            src = next_frame
        else:
            yield from asynced.delay(2)
|
|
|
|
|
|
def blink() -> Generator[None, None, None]:
    """Endlessly pulses the built-in LED: on, delay 0.03, off, delay 2.9.

    NOTE(review): the delay unit is whatever asynced.delay takes
    (presumably seconds) -- confirm against asynced.
    """
    led = machine.Pin(LED_BUILTIN, machine.Pin.OUT)
    phases = ((led.on, 0.03), (led.off, 2.9))
    while True:
        for switch, pause in phases:
            switch()
            yield from asynced.delay(pause)
|
|
|
|
|
|
def bench() -> Generator[None, None, None]:
    """Prints how long every batch of 30 scheduler passes took.

    The routine yields None once up front, then repeatedly times 30 yields
    with compat.monotonic() and prints the total, the time per pass and the
    resulting rate.  A batch with a non-positive elapsed time prints nothing.
    """
    yield None
    batch = 30
    while True:
        began = compat.monotonic()
        remaining = batch
        while remaining > 0:
            yield None
            remaining -= 1
        took = compat.monotonic() - began
        if took > 0:
            per_pass = took / batch
            rate = batch / took
            print("%d in %.3f %.3f/1 %.1f FPS" % (batch, took, per_pass, rate))
|
|
|
|
|
|
def _connect(wlan: network.WLAN) -> Generator[None, None, None]:
    """Keeps the WLAN interface connected, re-checking after each delay(10).

    While disconnected the interface is activated and asked to join the
    network named in local_secrets.  While connected the fourth ifconfig
    entry (DNS server) is rewritten to 8.8.8.8 on every pass.
    """
    yield None

    while True:
        if wlan.isconnected():
            # Override the DNS server as MicroPython and dnsmasq seem
            # to be incompatible.
            settings = list(wlan.ifconfig())
            wlan.ifconfig(settings[:3] + ["8.8.8.8"])
        else:
            wlan.active(True)
            wlan.connect(local_secrets.WLAN_ESSID, local_secrets.WLAN_PASSWORD)
        yield from asynced.delay(10)
|
|
|
|
|
|
def _mqtt_client(
    is_connected, brightness_queue: list[int]
) -> Generator[None, None, None]:
    """Maintains an MQTT connection and queues received brightness values.

    After each asynced.delay(3), and only when `is_connected()` is true, a
    socket is opened to local_secrets.MQTT_HOST on port 1883 and switched to
    non-blocking mode.  Received bytes are fed one at a time into the
    mqtt.client handler; a completed message whose topic ends in
    "/brightness" has its value appended to `brightness_queue` as an int.
    Any exception is printed, the socket is closed, and the outer loop
    starts over.

    Args:
        is_connected: zero-argument callable reporting whether the network
            is up.
        brightness_queue: list that parsed brightness values are appended to.
    """
    while True:
        yield from asynced.delay(3)

        if not is_connected():
            continue

        sock = socket.socket()
        try:
            target = socket.getaddrinfo(local_secrets.MQTT_HOST, 1883)[0][-1]
            print("mqtt: connecting to", target)
            sock.connect(target)
            sock.setblocking(False)

            device_id = binascii.hexlify(machine.unique_id()).decode()
            topic = "light/wc-" + device_id
            client = mqtt.client(
                sock, topic, local_secrets.MQTT_USERNAME, local_secrets.MQTT_PASSWORD
            )
            while True:
                chunk = sock.read()
                if chunk:
                    for byte in chunk:
                        message = client.send(byte)
                        if message:
                            # NOTE(review): presumably advances the handler
                            # past the delivered message -- confirm in mqtt.
                            client.send(None)
                            if message.topic.endswith("/brightness"):
                                brightness_queue.append(int(message.value))
                else:
                    # Nothing received: poke the handler with None and hand
                    # control back to the scheduler.
                    client.send(None)
                    yield None
        except Exception as ex:
            print("mqtt: exception:", ex)
        finally:
            print("mqtt: close")
            sock.close()
|
|
|
|
|
|
def main():
|
|
num = 11 * 10
|
|
n = neopixel.NeoPixel(machine.Pin(2), num)
|
|
wlan = network.WLAN(network.STA_IF)
|
|
brightness_queue = []
|
|
routines = (
|
|
_connect(wlan),
|
|
_mqtt_client(wlan.isconnected, brightness_queue),
|
|
_run(brightness_queue),
|
|
blink(),
|
|
asynced.fps(30),
|
|
bench(),
|
|
)
|
|
while True:
|
|
for r in routines:
|
|
f = next(r)
|
|
if f is None:
|
|
continue
|
|
elif isinstance(f, Frame):
|
|
_show(f, n)
|