wordclock/src/iptime.py

77 lines
1.6 KiB
Python

import random
import urllib.urequest
import asynced
import ujson
import time
def _fetch():
try:
resp = urllib.urequest.urlopen('https://juju.nz/api/iptime/now')
t = ujson.load(resp)
resp.close()
return t
except OSError as ex:
print(ex)
return None
def _jitter(mid: int) -> int:
return random.randint(mid, mid * 120 // 100)
def _sync() -> generator:
while True:
# Poll quickly until the first result comes in.
while True:
got = _fetch()
if got is not None:
yield got
break
yield from asynced.delay(_jitter(15))
# Poll slowly until the connection drops.
while True:
yield from asynced.delay(_jitter(60 * 60))
got = _fetch()
if got is None:
break
yield got
def day_sec() -> generator:
s = _sync()
# Spin until the first result comes in.
for got in s:
if got is None:
yield None, None
continue
local = time.monotonic()
base = got.get('day_sec', None)
if base is not None:
break
good = got
for got in s:
now = base + time.monotonic() - local
yield now % (60 * 60 * 24), good
if got is not None:
# Update the baseline.
b2 = got.get('day_sec', None)
if b2 is not None:
local = time.monotonic()
base = b2
good = got
def test():
for secs, meta in day_sec():
print(secs)
time.sleep(0.3)
if __name__ == '__main__':
test()