Compare commits

..

30 commits
v0.0 ... master

Author SHA1 Message Date
Michael Hope c7d0f0e03b wordclock: finish implementing mqtt dimming support 2024-03-03 11:18:14 +01:00
Michael Hope ac77aa9ed1 wordclock: rename secrets to local_secrets to prevent colliding with the stdlib 2024-03-03 10:29:54 +01:00
Michael Hope c3ff8528bd wordclock: move lib/ files to root on the device and improve stamping 2024-03-03 10:29:13 +01:00
Michael Hope 840728ed05 wordclock: add typing everywhere 2024-03-01 17:13:18 +01:00
Michael Hope a0e254d964 wordclock: tidy up the Makefile and compile the modules 2024-02-29 21:39:00 +01:00
Michael Hope d537de05fc wordclock: drop the duplicate WLAN setup 2024-02-27 08:06:31 +01:00
Michael Hope a8c6c04e11 wordclock: run black and isort 2024-02-27 08:05:22 +01:00
Michael Hope e7d92b5013 wordclock: update to work with the latest MicroPython 2024-02-27 07:58:25 +01:00
Michael Hope ffead18f6e wordclock: add the copyrights everywhere 2020-12-01 21:15:19 +01:00
Michael Hope af5e3d44b8 wordclock: various tidy ups
Handle more errors so the clock doesn't stop.

Reset if any other exception occurs.

Handle an empty response from iptime.

Blink less often.

Dim the clock outside hours.
2020-11-29 20:51:54 +01:00
Michael Hope a08ad836c2 iptime: update to match astral 2.2 2020-10-25 19:53:32 +00:00
Michael Hope 8573ab58c1 iptime: cache the external IP 2020-01-05 13:44:19 +01:00
Michael Hope c2a63719a9 iptime: add Prometheus metrics 2020-01-02 15:40:29 +01:00
Michael Hope 1c86e5c746 iptime: also support requests from localhost. 2020-01-02 11:37:04 +01:00
Michael Hope 7750379117 iptime: add the missing requests dependency. 2020-01-02 11:34:25 +01:00
Michael Hope 52a984c726 iptime: fix the pyflake8 errors 2020-01-02 11:32:47 +01:00
Michael Hope 721c9fe70b iptime: added a setup script and tests 2020-01-02 11:27:54 +01:00
Michael Hope 4a3cf46a58 Added a license and basic readme. 2019-03-10 20:35:33 +01:00
Michael Hope 34f63bac66 hw: imported the SCAD and rendered layers 2019-03-10 20:35:03 +01:00
Michael Hope 67dfd2e378 src: renamed hw to src 2018-12-29 16:55:08 +01:00
Michael Hope 86a5c4d90e Added .gitignore 2018-12-29 16:54:53 +01:00
Michael Hope a07a986168 worldclock: improve performance, add intensity scaling.
Now does ~25 FPS when transitioning.  Fades based on intensity instead
of PWM level.
2018-12-29 16:28:55 +01:00
Michael Hope 018f224745 wordclock: created v1 2018-12-29 14:24:16 +01:00
Michael Hope 5fd0345df8 worldclock: add a blink and benchmark coroutine. 2018-12-10 16:34:22 +01:00
Michael Hope fc66f73494 iptime: switch to the real retry time. 2018-12-10 16:33:52 +01:00
Michael Hope 67ecf3b1bf asynced: allow fps() to ignore the first run and to catch up. 2018-12-10 16:33:33 +01:00
Michael Hope f11308e4e8 wordclock: added a Makefile, main 2018-12-10 15:53:00 +01:00
Michael Hope 8174e85c90 wordclock: moved to circuitpython, fixed esp8266 bugs 2018-12-10 15:52:34 +01:00
Michael Hope b82355d215 iptime: add a systemd file 2018-12-09 16:08:07 +01:00
Michael Hope 03fa7d15f7 iptime: lookup the external IP address if this comes from inside the network. 2018-12-09 16:07:46 +01:00
21 changed files with 676 additions and 175 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
*.stamp
local_secrets.py

3
.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "third_party/micropython"]
path = third_party/micropython
url = https://github.com/micropython/micropython.git

14
gen.py
View file

@ -1,3 +1,17 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generate the word to offset mapping for the clock."""
import pprint
import itertools

View file

@ -1,3 +1,17 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/* ITLISASAMPM */
/* ACQUARTERDC */
/* TWENTYFIVEX */

0
iptime/__init__.py Normal file
View file

75
iptime/iptime.py Normal file
View file

@ -0,0 +1,75 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import functools
import time
from geoip import geolite2
import astral
import astral.sun
import dateutil.tz
import flask
import requests
import prometheus_flask_exporter # type: ignore
app = flask.Flask(__name__)
_ = prometheus_flask_exporter.PrometheusMetrics(app)
def _to_sec(dt):
return dt.hour * 60 * 60 + dt.minute * 60 + dt.second
@functools.lru_cache(maxsize=5)
def _external_ip():
r = requests.get('https://api.ipify.org?format=json')
return r.json().get('ip')
@app.route('/now')
def now():
ip = flask.request.args.get('ip')
if not ip:
ip = flask.request.remote_addr
if ip.startswith('192.168.') or ip.startswith('10.') or ip.startswith(
'127.0.'):
ip = _external_ip()
loc = geolite2.lookup(ip)
if loc is None:
flask.abort(
400,
'LOOKUP_FAILURE: unable to resolve the location of IP address %r' %
ip)
tz = dateutil.tz.gettz(loc.timezone)
epoch = time.time()
now = datetime.datetime.now(tz)
city = astral.LocationInfo((loc.ip, loc.country, loc.location[0],
loc.location[1], loc.timezone, 0))
sun = astral.sun.sun(city.observer, date=now)
dawn = sun['dawn']
dusk = sun['dusk']
resp = dict(ip=loc.ip,
country=loc.country,
latitude=loc.location[0],
longitude=loc.location[1],
timezone=loc.timezone,
dawn=dawn,
dawn_sec=_to_sec(dawn),
dusk=dusk,
dusk_sec=_to_sec(dusk),
local_time=now,
day_sec=_to_sec(now),
posix_sec=int(epoch))
return flask.jsonify(resp)

18
iptime/test_iptime.py Normal file
View file

@ -0,0 +1,18 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import iptime.iptime
def test_parse():
_ = iptime.iptime

34
setup.py Normal file
View file

@ -0,0 +1,34 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import setup, find_packages
setup(
name='iptime',
version_format='{tag}.dev{commitcount}+{gitsha}',
setup_requires=['setuptools-git-version'],
url='https://juju.nz/src/michaelh/wordclock',
author='Michael Hope',
author_email='michaelh@juju.nz',
description='Word clock services',
packages=find_packages(),
install_requires=[
'Flask>=1.1.1',
'prometheus-flask-exporter>=0.12.1',
'astral>=1.7.1',
'python-dateutil>=2.7.5',
'python-geoip-geolite2-yplan',
'python-geoip-python3>=1.3',
'requests>=2.21',
],
)

View file

@ -1,8 +1,35 @@
SRC = $(wildcard *.py)
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SRC = $(wildcard *.py) $(wildcard lib/*.py)
put: $(SRC:%.py=put-%.stamp)
MPY_CROSS = ../third_party/micropython/mpy-cross/build/mpy-cross
put: $(SRC:%.py=build/put-%.stamp)
ampy -p /dev/ttyUSB? ls
put-%.stamp: %.py
ampy -p /dev/ttyUSB? put $*.py
cp -a $< $@
build/put-%.stamp: %.py
mkdir -p $(@D)
ampy -p /dev/ttyUSB? put $< $(<F)
touch -r $< $@
build/%.mpy: %.py $(MPY_CROSS)
mkdir -p $(@D)
$(MPY_CROSS) -O1 -march=xtensa -o $@ $<
$(MPY_CROSS):
$(MAKE) -C $(dir $(@D))
clean:
rm -rf build

View file

@ -1,20 +1,37 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import compat
def delay(secs: float) -> generator:
end = time.monotonic() + secs
while time.monotonic() < end:
from typing import Generator
def delay(secs: float) -> Generator[None, None, None]:
end = compat.monotonic() + secs
while compat.monotonic() < end:
yield None
def fps(fps: float) -> generator:
def fps(fps: float) -> Generator[None, None, None]:
yield None
dt = 1 / fps
until = time.monotonic() + dt
until = compat.monotonic() + dt
while True:
remain = until - time.monotonic()
remain = until - compat.monotonic()
if remain > 0:
time.sleep(remain)
compat.sleep(remain)
until += dt
if remain < -dt:
# Catch up a bit
@ -22,16 +39,12 @@ def fps(fps: float) -> generator:
yield None
def wait(stop: int) -> generator:
start = time.monotonic()
def wait(stop_ms: int) -> Generator[int, None, None]:
"""Waits for stop_ms to pass, yielding how many ms have passed so far."""
start = compat.monotonic()
while True:
elapsed = int((time.monotonic() - start) * 1000)
elapsed = min(stop, elapsed)
elapsed = int((compat.monotonic() - start) * 1000)
elapsed = min(stop_ms, elapsed)
yield elapsed
if elapsed >= stop:
if elapsed >= stop_ms:
break
def test():
for f in delay(2.0):
print(utime.ticks_ms())

29
src/compat.py Normal file
View file

@ -0,0 +1,29 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import time
import utime
def monotonic() -> float:
return utime.ticks_ms() * 1e-3
def sleep(secs: float) -> None:
utime.sleep_ms(int(secs * 1e3))
def randint(start: int, limit: int) -> int:
return start + random.getrandbits(30) % (limit - start + 1)

View file

@ -1,27 +1,42 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import urllib.urequest
import time
from typing import Generator, Optional
import urequests
import asynced
import ujson
import time
import compat
def _fetch():
def _fetch() -> Optional[dict]:
try:
resp = urllib.urequest.urlopen('https://juju.nz/api/iptime/now')
t = ujson.load(resp)
resp.close()
return t
return urequests.get("http://worldtimeapi.org/api/ip").json()
except OSError as ex:
print(ex)
print("OSError", ex)
return None
except ValueError as ex:
print("ValueError", ex)
return None
def _jitter(mid: int) -> int:
return random.randint(mid, mid * 120 // 100)
return compat.randint(mid, mid * 120 // 100)
def _sync() -> generator:
def _sync() -> Generator[Optional[dict], None, None]:
while True:
# Poll quickly until the first result comes in.
while True:
@ -29,7 +44,7 @@ def _sync() -> generator:
if got is not None:
yield got
break
yield from asynced.delay(_jitter(15))
yield from asynced.delay(_jitter(5))
# Poll slowly until the connection drops.
while True:
@ -40,37 +55,48 @@ def _sync() -> generator:
yield got
def day_sec() -> generator:
def _get_day_sec(resp):
parts = resp.get("datetime", "").split("T")
if len(parts) != 2:
return None
hms = parts[1].split("+")[0].split(":")
if len(hms) != 3:
return None
return float(hms[0]) * 3600 + float(hms[1]) * 60 + float(hms[2])
def day_sec() -> Generator[Optional[float], None, None]:
s = _sync()
# Spin until the first result comes in.
for got in s:
if got is None:
yield None, None
yield None
continue
local = time.monotonic()
base = got.get('day_sec', None)
local = compat.monotonic()
base = _get_day_sec(got)
if base is not None:
break
good = got
assert base is not None
for got in s:
now = base + time.monotonic() - local
yield now % (60 * 60 * 24), good
now = base + compat.monotonic() - local
yield now % (60 * 60 * 24)
if got is not None:
# Update the baseline.
b2 = got.get('day_sec', None)
b2 = _get_day_sec(got)
if b2 is not None:
local = time.monotonic()
local = compat.monotonic()
base = b2
good = got
def test():
for secs, meta in day_sec():
for secs in day_sec():
print(secs)
time.sleep(0.3)
compat.sleep(0.3)
if __name__ == '__main__':
if __name__ == "__main__":
test()

7
src/lib/typing.py Normal file
View file

@ -0,0 +1,7 @@
Generator = None
Optional = None
Tuple = None
Iterable = None

View file

@ -1,12 +1,36 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import machine
import network
import compat
import wordclock
def main():
time.sleep(1)
compat.sleep(1)
wordclock.main()
if __name__ == '__main__':
main()
if __name__ == "__main__":
try:
main()
except Exception as ex:
print(ex)
finally:
print("Resetting")
compat.sleep(10)
machine.reset()

View file

@ -1,11 +0,0 @@
def show_stdout(f: Frame):
out = '\033\143'
stride = 11
dots = '.123456789#'
for y in range(len(f.pixels) // stride):
s = f.pixels[y * stride:(y + 1) * stride]
if y & 1 != 0:
s.reverse()
d = [dots[x * len(dots) // (Frame.Max + 1)] for x in s]
out += ''.join(d) + '\n'
print(out + '\n')

166
src/mqtt.py Normal file
View file

@ -0,0 +1,166 @@
import socket
import asynced
from typing import Generator, Optional, Tuple
_CONNECT = 1
_CONNACK = 2
_PUBLISH = 3
_PUBACK = 4
_PUBREC = 5
_PUBREL = 6
_PUBCOMP = 7
_SUBSCRIBE = 8
_SUBACK = 9
_UNSUBSCRIBE = 10
_UNSUBACK = 11
_PINGREQ = 12
_PINGRESP = 13
_DISCONNECT = 14
_DUP = 8
_QOS = 2
_RETAIN = 1
_UPDATE_SECONDS = 120
class Message:
def __init__(self, topic: str, value: str):
self.topic = topic
self.value = value
topic: str
value: str
def _getch() -> Generator[None, Optional[int], int]:
"""Returns the first non-None value sent to the generator"""
while True:
got = yield None
if got is not None:
return got
def _get_packet() -> Generator[None, Optional[int], Tuple[int, bytearray]]:
"""Decodes and returns the next packet"""
control = yield from _getch()
size = 0
shift = 0
while True:
got = yield from _getch()
size |= (got & 0x7F) << shift
shift += 7
if (got & 0x80) == 0:
break
payload = bytearray(size)
for i in range(len(payload)):
payload[i] = yield from _getch()
return control, payload
def _encode_string(value: str) -> bytearray:
"""Encodes a length-value string"""
encoded = value.encode()
return bytearray((len(encoded) >> 8, len(encoded) & 0xFF)) + encoded
def _write_packet(
sock,
control: int,
payload: bytearray | bytes,
packet_id: Optional[int] = None,
) -> None:
if packet_id is not None:
header = bytearray(
(control, len(payload) + 2, (packet_id >> 8) & 0xFF, packet_id & 0xFF)
)
else:
header = bytearray((control, len(payload)))
print("mqtt: >", header, payload)
sock.write(header + payload)
def _send_connect(sock: socket.socket, username: str, password: str):
flags = 0x02
if username:
flags |= 0x80
if password:
flags |= 0x40
payload = (
_encode_string("MQTT")
+ b"\x04" # Protocol level
+ bytearray((flags,))
+ bytearray((0, _UPDATE_SECONDS * 2)) # Keep alive seconds
+ _encode_string("") # Client identifier
)
if username:
payload += _encode_string(username)
if password:
payload += _encode_string(password)
_write_packet(sock, _CONNECT << 4, payload)
def _send_subscribe(sock: socket.socket, topic: str):
packet_id = hash(topic) & 0xFFFF
_write_packet(
sock,
(_SUBSCRIBE << 4) | 2,
_encode_string(topic) +
# QOS
b"\x00",
packet_id,
)
def _send_publish(sock: socket.socket, topic: str, value: str):
_write_packet(sock, _PUBLISH << 4, _encode_string(topic) + value.encode())
def _sender(sock: socket.socket, path: str) -> Generator[None, None, None]:
_send_subscribe(sock, path + "/#")
while True:
_send_publish(sock, path + "/available", "online")
yield from asynced.delay(_UPDATE_SECONDS)
def _receiver() -> Generator[Optional[Message], Optional[int], None]:
while True:
control, payload = yield from _get_packet()
if (control >> 4) == _PUBLISH:
qos = (control // _QOS) & 3
topic_length = (payload[0] << 8) | payload[1]
topic = payload[2 : 2 + topic_length].decode()
has_packet_id = qos == 1 or qos == 2
packet_id_length = 2 if has_packet_id else 0
value = payload[2 + topic_length + packet_id_length :].decode()
print("mqtt: topic update:", topic, value)
yield Message(topic, value)
def client(
sock: socket.socket,
path: str,
username: str = "",
password: str = "",
) -> Generator[Optional[Message], Optional[int], None]:
_send_connect(sock, username, password)
connack, payload = yield from _get_packet()
if connack != _CONNACK << 4:
raise Exception("Expected CONNACK")
if len(payload) < 2:
raise Exception("CONNACK is too short")
if payload[1] != 0:
raise Exception("CONNACK has an error code")
receiver = _receiver()
sender = _sender(sock, path)
got = yield None
while True:
sender.send(None)
got = yield receiver.send(got)

View file

@ -1,15 +1,40 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import array
import random
import time
import binascii
import sys
import socket
import asynced
import iptime
from neopixel_write import neopixel_write
import board
import machine
import neopixel
import network
import micropython
import asynced
import compat
import iptime
import mqtt
import local_secrets
from typing import Optional, Generator, Iterable, Tuple
if "ESP32" in str(sys.implementation):
LED_BUILTIN = 2
else:
LED_BUILTIN = 16
# fmt: off
RANGES = {
'dude': 0,
'ok': 5,
@ -62,9 +87,11 @@ AMPM = ('AM', 'PM')
ITS = ('its', 'it is', 'its dude', 'it is dude', 'its ok', 'it is ok')
COLOUR = (200, 255, 200)
# fmt: on
def split(secs: int) -> tuple:
def _split(secs: int) -> Tuple[str, str, str]:
"""Converts a seconds-of-day into minutes, hours, and AM/PM"""
secs = int(secs)
mins = (secs // 60) % 60
minute = mins // 5
@ -84,19 +111,17 @@ class Frame:
Max = 255
Zeros = tuple([0] * (11 * 10))
def __init__(self, n: int = 11 * 10, pixels: list = None):
if pixels is None:
pixels = list(self.Zeros)
self.pixels = pixels
def __init__(self, n: int = 11 * 10, pixels: Optional[array.array] = None):
self.pixels = pixels if pixels else list(self.Zeros)
def set(self, idx: int) -> Frame:
self.pixels[idx] = self.Max
def set(self, idx: int, value: int) -> "Frame":
self.pixels[idx] = value
return self
def __add__(self, other):
return Frame(pixels=[
min(self.Max, x + y) for x, y in zip(self.pixels, other.pixels)
])
return Frame(
pixels=[min(self.Max, x + y) for x, y in zip(self.pixels, other.pixels)]
)
def muldiv(self, num: int, den: int):
return Frame(pixels=[x * num // den for x in self.pixels])
@ -104,10 +129,11 @@ class Frame:
@micropython.native
def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
"""Merges two frames together."""
m = Frame.Max
p1 = f1.pixels
p2 = f2.pixels
o = array.array('H', Frame.Zeros)
o = array.array("H", Frame.Zeros)
for i in range(len(p1)):
k = (p1[i] * num1 + p2[i] * num2) // den
if k >= m:
@ -116,108 +142,196 @@ def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
return Frame(pixels=o)
def render(words: list) -> Frame:
def _render(words: Iterable[str], brightness: int) -> Frame:
"""Renders a list of words into a frame"""
f = Frame()
for word in words:
for w in word.split():
idx = RANGES.get(w, None)
if idx is not None:
for i in range(len(w)):
f.set(idx + i)
f.set(idx + i, brightness)
else:
print('warning: %s is missing' % w)
print("warning: %s is missing" % w)
return f
def prefix() -> tuple:
return (ITS[random.randint(0, len(ITS) - 1)], )
def _prefix() -> Tuple[str]:
"""Returns a random suffix word such as 'dude'"""
return (ITS[compat.randint(0, len(ITS) - 1)],)
def show(f: Frame, n):
def _show(f: Frame, n):
"""Renders a frame to a NeoPixel handler"""
p = f.pixels
buf = bytearray(len(p) * 3)
r, g, b = COLOUR
i = 0
for c in p:
buf[i + 1] = (r * c) >> 8
buf[i + 0] = (g * c) >> 8
buf[i + 2] = (b * c) >> 8
n.buf[i + 1] = (r * c) >> 8
n.buf[i + 0] = (g * c) >> 8
n.buf[i + 2] = (b * c) >> 8
i += 3
neopixel_write(n.pin, buf)
n.write()
def scan() -> generator:
def _scan() -> Generator[Frame, None, None]:
"""Returns a stream of frames that scan across the LEDs"""
f = Frame()
while True:
for i in range(len(f.pixels)):
yield f.set(i)
yield f.set(i, Frame.Max)
f = f.muldiv(80, 100)
def intensity(num: int, den: int) -> int:
def _intensity(num: int, den: int) -> int:
idx = num * len(CIEL8) // den
if idx >= len(CIEL8):
return 255
return CIEL8[idx]
def run() -> generator:
def _crossfade(src: Frame, dest: Frame, fade_time: int):
for at in asynced.wait(fade_time):
yield merge(
src,
dest,
_intensity(fade_time - at, fade_time),
_intensity(at, fade_time),
Frame.Max,
)
def _run(brightness_queue: list[int]) -> Generator[Optional[Frame], None, None]:
words = None
fade = 2000
fade_time = 2000
secs = iptime.day_sec()
brightness = 255
# Flash until time is synced.
sc = scan()
for s, meta in secs:
if s is not None and meta is not None:
sc = _scan()
for s in secs:
if s is not None:
break
yield next(sc)
src = Frame()
while True:
yield src
s, m2 = next(secs)
nxt = split(s)
if nxt == words:
yield from asynced.delay(1)
continue
dest = render(prefix() + nxt)
for at in asynced.wait(fade):
yield merge(src, dest, intensity(fade - at, fade),
intensity(at, fade), 255)
words = nxt
src = dest
changed = False
if brightness_queue:
brightness = brightness_queue[-1]
brightness_queue.clear()
changed = True
now = next(secs)
next_words = _split(now)
if words != next_words:
words = next_words
changed = True
if changed:
next_frame = _render(_prefix() + next_words, brightness)
yield from _crossfade(src, next_frame, fade_time)
src = next_frame
else:
yield from asynced.delay(2)
def blink() -> generator:
led = machine.Pin(16, machine.Pin.OUT)
def blink() -> Generator[None, None, None]:
led = machine.Pin(LED_BUILTIN, machine.Pin.OUT)
while True:
led.off()
yield from asynced.delay(0.1)
led.on()
yield from asynced.delay(0.8)
yield from asynced.delay(0.03)
led.off()
yield from asynced.delay(2.9)
def bench() -> generator:
def bench() -> Generator[None, None, None]:
yield None
i = 30
while True:
start = time.monotonic()
yield from range(i)
elapsed = time.monotonic() - start
print('%d in %.3f %.3f/1 %.1f FPS' % (i, elapsed, elapsed / i,
i / elapsed))
start = compat.monotonic()
for _ in range(i):
yield None
elapsed = compat.monotonic() - start
if elapsed > 0:
print("%d in %.3f %.3f/1 %.1f FPS" % (i, elapsed, elapsed / i, i / elapsed))
def _connect(wlan: network.WLAN) -> Generator[None, None, None]:
yield None
while True:
if not wlan.isconnected():
wlan.active(True)
wlan.connect(local_secrets.WLAN_ESSID, local_secrets.WLAN_PASSWORD)
else:
# Override the DNS server as MicroPython and dnsmasq seem
# to be incompatible.
cfg = list(wlan.ifconfig())
wlan.ifconfig(cfg[:3] + ["8.8.8.8"])
yield from asynced.delay(10)
def _mqtt_client(
is_connected, brightness_queue: list[int]
) -> Generator[None, None, None]:
while True:
yield from asynced.delay(3)
if not is_connected():
continue
sock = socket.socket()
try:
target = socket.getaddrinfo(local_secrets.MQTT_HOST, 1883)[0][-1]
print("mqtt: connecting to", target)
sock.connect(target)
sock.setblocking(False)
topic = "light/wc-" + binascii.hexlify(machine.unique_id()).decode()
client = mqtt.client(
sock, topic, local_secrets.MQTT_USERNAME, local_secrets.MQTT_PASSWORD
)
while True:
got = sock.read()
if not got:
client.send(None)
yield None
continue
for ch in got:
message = client.send(ch)
if not message:
continue
client.send(None)
if message.topic.endswith("/brightness"):
brightness_queue.append(int(message.value))
except Exception as ex:
print("mqtt: exception:", ex)
finally:
print("mqtt: close")
sock.close()
def main():
num = 11 * 10
n = neopixel.NeoPixel(board.GPIO2, num, brightness=1, auto_write=False)
routines = (run(), blink(), asynced.fps(30), bench())
n = neopixel.NeoPixel(machine.Pin(2), num)
wlan = network.WLAN(network.STA_IF)
brightness_queue = []
routines = (
_connect(wlan),
_mqtt_client(wlan.isconnected, brightness_queue),
_run(brightness_queue),
blink(),
asynced.fps(30),
bench(),
)
while True:
for r in routines:
f = next(r)
if f is None:
continue
elif isinstance(f, Frame):
show(f, n)
_show(f, n)

View file

@ -1,54 +0,0 @@
import datetime
import sys
import time
from geoip import geolite2
import astral
import dateutil.tz
import flask
import requests
app = flask.Flask(__name__)
def to_sec(dt):
return dt.hour * 60 * 60 + dt.minute * 60 + dt.second
@app.route('/now')
def now():
ip = flask.request.args.get('ip')
if not ip:
ip = flask.request.remote_addr
if ip.startswith('192.168.') or ip.startswith('10.'):
r = requests.get('https://api.ipify.org?format=json')
ip = r.json().get('ip')
loc = geolite2.lookup(ip)
if loc is None:
flask.abort(
400,
'LOOKUP_FAILURE: unable to resolve the location of IP address %r' %
ip)
tz = dateutil.tz.gettz(loc.timezone)
epoch = time.time()
now = datetime.datetime.now(tz)
l = astral.Location((loc.ip, loc.country, loc.location[0], loc.location[1],
loc.timezone, 0))
s = l.sun()
dawn = s['dawn']
dusk = s['dusk']
resp = dict(
ip=loc.ip,
country=loc.country,
latitude=loc.location[0],
longitude=loc.location[1],
timezone=loc.timezone,
dawn=dawn,
dawn_sec=to_sec(dawn),
dusk=dusk,
dusk_sec=to_sec(dusk),
local_time=now,
day_sec=to_sec(now),
posix_sec=int(epoch))
return flask.jsonify(resp)

1
third_party/micropython vendored Submodule

@ -0,0 +1 @@
Subproject commit 678707c8b07323c5b914778708a2858387c3b60c