Compare commits

..

13 commits
master ... v0.0

21 changed files with 174 additions and 675 deletions

1
.gitignore vendored
View file

@ -1,2 +1 @@
*.stamp *.stamp
local_secrets.py

3
.gitmodules vendored
View file

@ -1,3 +0,0 @@
[submodule "third_party/micropython"]
path = third_party/micropython
url = https://github.com/micropython/micropython.git

14
gen.py
View file

@ -1,17 +1,3 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generate the word to offset mapping for the clock."""
import pprint import pprint
import itertools import itertools

View file

@ -1,17 +1,3 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/* ITLISASAMPM */ /* ITLISASAMPM */
/* ACQUARTERDC */ /* ACQUARTERDC */
/* TWENTYFIVEX */ /* TWENTYFIVEX */

View file

View file

@ -1,75 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import functools
import time
from geoip import geolite2
import astral
import astral.sun
import dateutil.tz
import flask
import requests
import prometheus_flask_exporter # type: ignore
app = flask.Flask(__name__)
_ = prometheus_flask_exporter.PrometheusMetrics(app)
def _to_sec(dt):
return dt.hour * 60 * 60 + dt.minute * 60 + dt.second
@functools.lru_cache(maxsize=5)
def _external_ip():
r = requests.get('https://api.ipify.org?format=json')
return r.json().get('ip')
@app.route('/now')
def now():
ip = flask.request.args.get('ip')
if not ip:
ip = flask.request.remote_addr
if ip.startswith('192.168.') or ip.startswith('10.') or ip.startswith(
'127.0.'):
ip = _external_ip()
loc = geolite2.lookup(ip)
if loc is None:
flask.abort(
400,
'LOOKUP_FAILURE: unable to resolve the location of IP address %r' %
ip)
tz = dateutil.tz.gettz(loc.timezone)
epoch = time.time()
now = datetime.datetime.now(tz)
city = astral.LocationInfo((loc.ip, loc.country, loc.location[0],
loc.location[1], loc.timezone, 0))
sun = astral.sun.sun(city.observer, date=now)
dawn = sun['dawn']
dusk = sun['dusk']
resp = dict(ip=loc.ip,
country=loc.country,
latitude=loc.location[0],
longitude=loc.location[1],
timezone=loc.timezone,
dawn=dawn,
dawn_sec=_to_sec(dawn),
dusk=dusk,
dusk_sec=_to_sec(dusk),
local_time=now,
day_sec=_to_sec(now),
posix_sec=int(epoch))
return flask.jsonify(resp)

View file

@ -1,18 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import iptime.iptime
def test_parse():
_ = iptime.iptime

View file

@ -1,34 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import setup, find_packages
setup(
name='iptime',
version_format='{tag}.dev{commitcount}+{gitsha}',
setup_requires=['setuptools-git-version'],
url='https://juju.nz/src/michaelh/wordclock',
author='Michael Hope',
author_email='michaelh@juju.nz',
description='Word clock services',
packages=find_packages(),
install_requires=[
'Flask>=1.1.1',
'prometheus-flask-exporter>=0.12.1',
'astral>=1.7.1',
'python-dateutil>=2.7.5',
'python-geoip-geolite2-yplan',
'python-geoip-python3>=1.3',
'requests>=2.21',
],
)

View file

@ -1,35 +1,8 @@
# Copyright 2020 Google LLC SRC = $(wildcard *.py)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SRC = $(wildcard *.py) $(wildcard lib/*.py)
MPY_CROSS = ../third_party/micropython/mpy-cross/build/mpy-cross put: $(SRC:%.py=put-%.stamp)
put: $(SRC:%.py=build/put-%.stamp)
ampy -p /dev/ttyUSB? ls ampy -p /dev/ttyUSB? ls
build/put-%.stamp: %.py put-%.stamp: %.py
mkdir -p $(@D) ampy -p /dev/ttyUSB? put $*.py
ampy -p /dev/ttyUSB? put $< $(<F) cp -a $< $@
touch -r $< $@
build/%.mpy: %.py $(MPY_CROSS)
mkdir -p $(@D)
$(MPY_CROSS) -O1 -march=xtensa -o $@ $<
$(MPY_CROSS):
$(MAKE) -C $(dir $(@D))
clean:
rm -rf build

View file

@ -1,37 +1,20 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time import time
import compat
from typing import Generator def delay(secs: float) -> generator:
end = time.monotonic() + secs
while time.monotonic() < end:
def delay(secs: float) -> Generator[None, None, None]:
end = compat.monotonic() + secs
while compat.monotonic() < end:
yield None yield None
def fps(fps: float) -> Generator[None, None, None]: def fps(fps: float) -> generator:
yield None yield None
dt = 1 / fps dt = 1 / fps
until = compat.monotonic() + dt until = time.monotonic() + dt
while True: while True:
remain = until - compat.monotonic() remain = until - time.monotonic()
if remain > 0: if remain > 0:
compat.sleep(remain) time.sleep(remain)
until += dt until += dt
if remain < -dt: if remain < -dt:
# Catch up a bit # Catch up a bit
@ -39,12 +22,16 @@ def fps(fps: float) -> Generator[None, None, None]:
yield None yield None
def wait(stop_ms: int) -> Generator[int, None, None]: def wait(stop: int) -> generator:
"""Waits for stop_ms to pass, yielding how many ms have passed so far.""" start = time.monotonic()
start = compat.monotonic()
while True: while True:
elapsed = int((compat.monotonic() - start) * 1000) elapsed = int((time.monotonic() - start) * 1000)
elapsed = min(stop_ms, elapsed) elapsed = min(stop, elapsed)
yield elapsed yield elapsed
if elapsed >= stop_ms: if elapsed >= stop:
break break
def test():
for f in delay(2.0):
print(utime.ticks_ms())

View file

@ -1,29 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import time
import utime
def monotonic() -> float:
return utime.ticks_ms() * 1e-3
def sleep(secs: float) -> None:
utime.sleep_ms(int(secs * 1e3))
def randint(start: int, limit: int) -> int:
return start + random.getrandbits(30) % (limit - start + 1)

View file

@ -1,42 +1,27 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random import random
import time import urllib.urequest
from typing import Generator, Optional
import urequests
import asynced import asynced
import compat import ujson
import time
def _fetch() -> Optional[dict]: def _fetch():
try: try:
return urequests.get("http://worldtimeapi.org/api/ip").json() resp = urllib.urequest.urlopen('https://juju.nz/api/iptime/now')
t = ujson.load(resp)
resp.close()
return t
except OSError as ex: except OSError as ex:
print("OSError", ex) print(ex)
return None
except ValueError as ex:
print("ValueError", ex)
return None return None
def _jitter(mid: int) -> int: def _jitter(mid: int) -> int:
return compat.randint(mid, mid * 120 // 100) return random.randint(mid, mid * 120 // 100)
def _sync() -> Generator[Optional[dict], None, None]: def _sync() -> generator:
while True: while True:
# Poll quickly until the first result comes in. # Poll quickly until the first result comes in.
while True: while True:
@ -44,7 +29,7 @@ def _sync() -> Generator[Optional[dict], None, None]:
if got is not None: if got is not None:
yield got yield got
break break
yield from asynced.delay(_jitter(5)) yield from asynced.delay(_jitter(15))
# Poll slowly until the connection drops. # Poll slowly until the connection drops.
while True: while True:
@ -55,48 +40,37 @@ def _sync() -> Generator[Optional[dict], None, None]:
yield got yield got
def _get_day_sec(resp): def day_sec() -> generator:
parts = resp.get("datetime", "").split("T")
if len(parts) != 2:
return None
hms = parts[1].split("+")[0].split(":")
if len(hms) != 3:
return None
return float(hms[0]) * 3600 + float(hms[1]) * 60 + float(hms[2])
def day_sec() -> Generator[Optional[float], None, None]:
s = _sync() s = _sync()
# Spin until the first result comes in. # Spin until the first result comes in.
for got in s: for got in s:
if got is None: if got is None:
yield None yield None, None
continue continue
local = compat.monotonic() local = time.monotonic()
base = _get_day_sec(got) base = got.get('day_sec', None)
if base is not None: if base is not None:
break break
good = got good = got
assert base is not None
for got in s: for got in s:
now = base + compat.monotonic() - local now = base + time.monotonic() - local
yield now % (60 * 60 * 24) yield now % (60 * 60 * 24), good
if got is not None: if got is not None:
# Update the baseline. # Update the baseline.
b2 = _get_day_sec(got) b2 = got.get('day_sec', None)
if b2 is not None: if b2 is not None:
local = compat.monotonic() local = time.monotonic()
base = b2 base = b2
good = got good = got
def test(): def test():
for secs in day_sec(): for secs, meta in day_sec():
print(secs) print(secs)
compat.sleep(0.3) time.sleep(0.3)
if __name__ == "__main__": if __name__ == '__main__':
test() test()

View file

@ -1,7 +0,0 @@
Generator = None
Optional = None
Tuple = None
Iterable = None

View file

@ -1,36 +1,12 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time import time
import machine
import network
import compat
import wordclock import wordclock
def main(): def main():
compat.sleep(1) time.sleep(1)
wordclock.main() wordclock.main()
if __name__ == "__main__": if __name__ == '__main__':
try: main()
main()
except Exception as ex:
print(ex)
finally:
print("Resetting")
compat.sleep(10)
machine.reset()

11
src/misc.py Normal file
View file

@ -0,0 +1,11 @@
def show_stdout(f: Frame):
out = '\033\143'
stride = 11
dots = '.123456789#'
for y in range(len(f.pixels) // stride):
s = f.pixels[y * stride:(y + 1) * stride]
if y & 1 != 0:
s.reverse()
d = [dots[x * len(dots) // (Frame.Max + 1)] for x in s]
out += ''.join(d) + '\n'
print(out + '\n')

View file

@ -1,166 +0,0 @@
import socket
import asynced
from typing import Generator, Optional, Tuple
_CONNECT = 1
_CONNACK = 2
_PUBLISH = 3
_PUBACK = 4
_PUBREC = 5
_PUBREL = 6
_PUBCOMP = 7
_SUBSCRIBE = 8
_SUBACK = 9
_UNSUBSCRIBE = 10
_UNSUBACK = 11
_PINGREQ = 12
_PINGRESP = 13
_DISCONNECT = 14
_DUP = 8
_QOS = 2
_RETAIN = 1
_UPDATE_SECONDS = 120
class Message:
def __init__(self, topic: str, value: str):
self.topic = topic
self.value = value
topic: str
value: str
def _getch() -> Generator[None, Optional[int], int]:
"""Returns the first non-None value sent to the generator"""
while True:
got = yield None
if got is not None:
return got
def _get_packet() -> Generator[None, Optional[int], Tuple[int, bytearray]]:
"""Decodes and returns the next packet"""
control = yield from _getch()
size = 0
shift = 0
while True:
got = yield from _getch()
size |= (got & 0x7F) << shift
shift += 7
if (got & 0x80) == 0:
break
payload = bytearray(size)
for i in range(len(payload)):
payload[i] = yield from _getch()
return control, payload
def _encode_string(value: str) -> bytearray:
"""Encodes a length-value string"""
encoded = value.encode()
return bytearray((len(encoded) >> 8, len(encoded) & 0xFF)) + encoded
def _write_packet(
sock,
control: int,
payload: bytearray | bytes,
packet_id: Optional[int] = None,
) -> None:
if packet_id is not None:
header = bytearray(
(control, len(payload) + 2, (packet_id >> 8) & 0xFF, packet_id & 0xFF)
)
else:
header = bytearray((control, len(payload)))
print("mqtt: >", header, payload)
sock.write(header + payload)
def _send_connect(sock: socket.socket, username: str, password: str):
flags = 0x02
if username:
flags |= 0x80
if password:
flags |= 0x40
payload = (
_encode_string("MQTT")
+ b"\x04" # Protocol level
+ bytearray((flags,))
+ bytearray((0, _UPDATE_SECONDS * 2)) # Keep alive seconds
+ _encode_string("") # Client identifier
)
if username:
payload += _encode_string(username)
if password:
payload += _encode_string(password)
_write_packet(sock, _CONNECT << 4, payload)
def _send_subscribe(sock: socket.socket, topic: str):
packet_id = hash(topic) & 0xFFFF
_write_packet(
sock,
(_SUBSCRIBE << 4) | 2,
_encode_string(topic) +
# QOS
b"\x00",
packet_id,
)
def _send_publish(sock: socket.socket, topic: str, value: str):
_write_packet(sock, _PUBLISH << 4, _encode_string(topic) + value.encode())
def _sender(sock: socket.socket, path: str) -> Generator[None, None, None]:
_send_subscribe(sock, path + "/#")
while True:
_send_publish(sock, path + "/available", "online")
yield from asynced.delay(_UPDATE_SECONDS)
def _receiver() -> Generator[Optional[Message], Optional[int], None]:
while True:
control, payload = yield from _get_packet()
if (control >> 4) == _PUBLISH:
qos = (control // _QOS) & 3
topic_length = (payload[0] << 8) | payload[1]
topic = payload[2 : 2 + topic_length].decode()
has_packet_id = qos == 1 or qos == 2
packet_id_length = 2 if has_packet_id else 0
value = payload[2 + topic_length + packet_id_length :].decode()
print("mqtt: topic update:", topic, value)
yield Message(topic, value)
def client(
sock: socket.socket,
path: str,
username: str = "",
password: str = "",
) -> Generator[Optional[Message], Optional[int], None]:
_send_connect(sock, username, password)
connack, payload = yield from _get_packet()
if connack != _CONNACK << 4:
raise Exception("Expected CONNACK")
if len(payload) < 2:
raise Exception("CONNACK is too short")
if payload[1] != 0:
raise Exception("CONNACK has an error code")
receiver = _receiver()
sender = _sender(sock, path)
got = yield None
while True:
sender.send(None)
got = yield receiver.send(got)

View file

@ -1,40 +1,15 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import array import array
import binascii import random
import sys import time
import socket
import machine
import neopixel
import network
import micropython
import asynced import asynced
import compat
import iptime import iptime
import mqtt
import local_secrets
from typing import Optional, Generator, Iterable, Tuple from neopixel_write import neopixel_write
import board
import machine
import neopixel
if "ESP32" in str(sys.implementation):
LED_BUILTIN = 2
else:
LED_BUILTIN = 16
# fmt: off
RANGES = { RANGES = {
'dude': 0, 'dude': 0,
'ok': 5, 'ok': 5,
@ -87,11 +62,9 @@ AMPM = ('AM', 'PM')
ITS = ('its', 'it is', 'its dude', 'it is dude', 'its ok', 'it is ok') ITS = ('its', 'it is', 'its dude', 'it is dude', 'its ok', 'it is ok')
COLOUR = (200, 255, 200) COLOUR = (200, 255, 200)
# fmt: on
def _split(secs: int) -> Tuple[str, str, str]: def split(secs: int) -> tuple:
"""Converts a seconds-of-day into minutes, hours, and AM/PM"""
secs = int(secs) secs = int(secs)
mins = (secs // 60) % 60 mins = (secs // 60) % 60
minute = mins // 5 minute = mins // 5
@ -111,17 +84,19 @@ class Frame:
Max = 255 Max = 255
Zeros = tuple([0] * (11 * 10)) Zeros = tuple([0] * (11 * 10))
def __init__(self, n: int = 11 * 10, pixels: Optional[array.array] = None): def __init__(self, n: int = 11 * 10, pixels: list = None):
self.pixels = pixels if pixels else list(self.Zeros) if pixels is None:
pixels = list(self.Zeros)
self.pixels = pixels
def set(self, idx: int, value: int) -> "Frame": def set(self, idx: int) -> Frame:
self.pixels[idx] = value self.pixels[idx] = self.Max
return self return self
def __add__(self, other): def __add__(self, other):
return Frame( return Frame(pixels=[
pixels=[min(self.Max, x + y) for x, y in zip(self.pixels, other.pixels)] min(self.Max, x + y) for x, y in zip(self.pixels, other.pixels)
) ])
def muldiv(self, num: int, den: int): def muldiv(self, num: int, den: int):
return Frame(pixels=[x * num // den for x in self.pixels]) return Frame(pixels=[x * num // den for x in self.pixels])
@ -129,11 +104,10 @@ class Frame:
@micropython.native @micropython.native
def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame: def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
"""Merges two frames together."""
m = Frame.Max m = Frame.Max
p1 = f1.pixels p1 = f1.pixels
p2 = f2.pixels p2 = f2.pixels
o = array.array("H", Frame.Zeros) o = array.array('H', Frame.Zeros)
for i in range(len(p1)): for i in range(len(p1)):
k = (p1[i] * num1 + p2[i] * num2) // den k = (p1[i] * num1 + p2[i] * num2) // den
if k >= m: if k >= m:
@ -142,196 +116,108 @@ def merge(f1: Frame, f2: Frame, num1: int, num2: int, den: int) -> Frame:
return Frame(pixels=o) return Frame(pixels=o)
def _render(words: Iterable[str], brightness: int) -> Frame: def render(words: list) -> Frame:
"""Renders a list of words into a frame"""
f = Frame() f = Frame()
for word in words: for word in words:
for w in word.split(): for w in word.split():
idx = RANGES.get(w, None) idx = RANGES.get(w, None)
if idx is not None: if idx is not None:
for i in range(len(w)): for i in range(len(w)):
f.set(idx + i, brightness) f.set(idx + i)
else: else:
print("warning: %s is missing" % w) print('warning: %s is missing' % w)
return f return f
def _prefix() -> Tuple[str]: def prefix() -> tuple:
"""Returns a random suffix word such as 'dude'""" return (ITS[random.randint(0, len(ITS) - 1)], )
return (ITS[compat.randint(0, len(ITS) - 1)],)
def _show(f: Frame, n): def show(f: Frame, n):
"""Renders a frame to a NeoPixel handler"""
p = f.pixels p = f.pixels
buf = bytearray(len(p) * 3) buf = bytearray(len(p) * 3)
r, g, b = COLOUR r, g, b = COLOUR
i = 0 i = 0
for c in p: for c in p:
n.buf[i + 1] = (r * c) >> 8 buf[i + 1] = (r * c) >> 8
n.buf[i + 0] = (g * c) >> 8 buf[i + 0] = (g * c) >> 8
n.buf[i + 2] = (b * c) >> 8 buf[i + 2] = (b * c) >> 8
i += 3 i += 3
n.write() neopixel_write(n.pin, buf)
def _scan() -> Generator[Frame, None, None]: def scan() -> generator:
"""Returns a stream of frames that scan across the LEDs"""
f = Frame() f = Frame()
while True: while True:
for i in range(len(f.pixels)): for i in range(len(f.pixels)):
yield f.set(i, Frame.Max) yield f.set(i)
f = f.muldiv(80, 100) f = f.muldiv(80, 100)
def _intensity(num: int, den: int) -> int: def intensity(num: int, den: int) -> int:
idx = num * len(CIEL8) // den idx = num * len(CIEL8) // den
if idx >= len(CIEL8): if idx >= len(CIEL8):
return 255 return 255
return CIEL8[idx] return CIEL8[idx]
def _crossfade(src: Frame, dest: Frame, fade_time: int): def run() -> generator:
for at in asynced.wait(fade_time):
yield merge(
src,
dest,
_intensity(fade_time - at, fade_time),
_intensity(at, fade_time),
Frame.Max,
)
def _run(brightness_queue: list[int]) -> Generator[Optional[Frame], None, None]:
words = None words = None
fade_time = 2000 fade = 2000
secs = iptime.day_sec() secs = iptime.day_sec()
brightness = 255
# Flash until time is synced. # Flash until time is synced.
sc = _scan() sc = scan()
for s in secs: for s, meta in secs:
if s is not None: if s is not None and meta is not None:
break break
yield next(sc) yield next(sc)
src = Frame() src = Frame()
while True: while True:
changed = False yield src
if brightness_queue: s, m2 = next(secs)
brightness = brightness_queue[-1] nxt = split(s)
brightness_queue.clear() if nxt == words:
changed = True yield from asynced.delay(1)
continue
now = next(secs) dest = render(prefix() + nxt)
next_words = _split(now) for at in asynced.wait(fade):
if words != next_words: yield merge(src, dest, intensity(fade - at, fade),
words = next_words intensity(at, fade), 255)
changed = True words = nxt
src = dest
if changed:
next_frame = _render(_prefix() + next_words, brightness)
yield from _crossfade(src, next_frame, fade_time)
src = next_frame
else:
yield from asynced.delay(2)
def blink() -> Generator[None, None, None]: def blink() -> generator:
led = machine.Pin(LED_BUILTIN, machine.Pin.OUT) led = machine.Pin(16, machine.Pin.OUT)
while True: while True:
led.on()
yield from asynced.delay(0.03)
led.off() led.off()
yield from asynced.delay(2.9) yield from asynced.delay(0.1)
led.on()
yield from asynced.delay(0.8)
def bench() -> Generator[None, None, None]: def bench() -> generator:
yield None yield None
i = 30 i = 30
while True: while True:
start = compat.monotonic() start = time.monotonic()
for _ in range(i): yield from range(i)
yield None elapsed = time.monotonic() - start
elapsed = compat.monotonic() - start print('%d in %.3f %.3f/1 %.1f FPS' % (i, elapsed, elapsed / i,
if elapsed > 0: i / elapsed))
print("%d in %.3f %.3f/1 %.1f FPS" % (i, elapsed, elapsed / i, i / elapsed))
def _connect(wlan: network.WLAN) -> Generator[None, None, None]:
yield None
while True:
if not wlan.isconnected():
wlan.active(True)
wlan.connect(local_secrets.WLAN_ESSID, local_secrets.WLAN_PASSWORD)
else:
# Override the DNS server as MicroPython and dnsmasq seem
# to be incompatible.
cfg = list(wlan.ifconfig())
wlan.ifconfig(cfg[:3] + ["8.8.8.8"])
yield from asynced.delay(10)
def _mqtt_client(
is_connected, brightness_queue: list[int]
) -> Generator[None, None, None]:
while True:
yield from asynced.delay(3)
if not is_connected():
continue
sock = socket.socket()
try:
target = socket.getaddrinfo(local_secrets.MQTT_HOST, 1883)[0][-1]
print("mqtt: connecting to", target)
sock.connect(target)
sock.setblocking(False)
topic = "light/wc-" + binascii.hexlify(machine.unique_id()).decode()
client = mqtt.client(
sock, topic, local_secrets.MQTT_USERNAME, local_secrets.MQTT_PASSWORD
)
while True:
got = sock.read()
if not got:
client.send(None)
yield None
continue
for ch in got:
message = client.send(ch)
if not message:
continue
client.send(None)
if message.topic.endswith("/brightness"):
brightness_queue.append(int(message.value))
except Exception as ex:
print("mqtt: exception:", ex)
finally:
print("mqtt: close")
sock.close()
def main(): def main():
num = 11 * 10 num = 11 * 10
n = neopixel.NeoPixel(machine.Pin(2), num) n = neopixel.NeoPixel(board.GPIO2, num, brightness=1, auto_write=False)
wlan = network.WLAN(network.STA_IF) routines = (run(), blink(), asynced.fps(30), bench())
brightness_queue = []
routines = (
_connect(wlan),
_mqtt_client(wlan.isconnected, brightness_queue),
_run(brightness_queue),
blink(),
asynced.fps(30),
bench(),
)
while True: while True:
for r in routines: for r in routines:
f = next(r) f = next(r)
if f is None: if f is None:
continue continue
elif isinstance(f, Frame): elif isinstance(f, Frame):
_show(f, n) show(f, n)

54
srv/iptime.py Normal file
View file

@ -0,0 +1,54 @@
import datetime
import sys
import time
from geoip import geolite2
import astral
import dateutil.tz
import flask
import requests
app = flask.Flask(__name__)
def to_sec(dt):
return dt.hour * 60 * 60 + dt.minute * 60 + dt.second
@app.route('/now')
def now():
ip = flask.request.args.get('ip')
if not ip:
ip = flask.request.remote_addr
if ip.startswith('192.168.') or ip.startswith('10.'):
r = requests.get('https://api.ipify.org?format=json')
ip = r.json().get('ip')
loc = geolite2.lookup(ip)
if loc is None:
flask.abort(
400,
'LOOKUP_FAILURE: unable to resolve the location of IP address %r' %
ip)
tz = dateutil.tz.gettz(loc.timezone)
epoch = time.time()
now = datetime.datetime.now(tz)
l = astral.Location((loc.ip, loc.country, loc.location[0], loc.location[1],
loc.timezone, 0))
s = l.sun()
dawn = s['dawn']
dusk = s['dusk']
resp = dict(
ip=loc.ip,
country=loc.country,
latitude=loc.location[0],
longitude=loc.location[1],
timezone=loc.timezone,
dawn=dawn,
dawn_sec=to_sec(dawn),
dusk=dusk,
dusk_sec=to_sec(dusk),
local_time=now,
day_sec=to_sec(now),
posix_sec=int(epoch))
return flask.jsonify(resp)

@ -1 +0,0 @@
Subproject commit 678707c8b07323c5b914778708a2858387c3b60c