# [Repository web-page residue; not part of the module.]
# You cannot select more than 25 topics. Topics must start with a letter or
# number, can include dashes ('-') and can be up to 35 characters long.
#
# 93 lines
# 2.3 KiB

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import urllib.urequest
import asynced
import ujson
import time
def _fetch():
    """Fetch and decode the current time record from the iptime endpoint.

    Returns:
        The decoded JSON response, or None when the request or the decode
        fails. Failures are printed rather than raised so that callers can
        simply retry later.
    """
    try:
        resp = urllib.urequest.urlopen('https://juju.nz/api/iptime/now')
        try:
            return ujson.load(resp)
        finally:
            # Always release the response, even when the body is not valid
            # JSON; otherwise every failed decode leaks the open response.
            resp.close()
    except (OSError, ValueError) as ex:
        # Best effort: report the failure and signal "no result".
        print(ex)
        return None
def _jitter(mid: int) -> int:
return random.randint(mid, mid * 120 // 100)
def _sync() -> generator:
    """Endlessly poll `_fetch`, yielding each successful result.

    Between polls the generator also passes through whatever
    `asynced.delay` yields (presumably None placeholders while waiting;
    confirm against the `asynced` module).
    """
    while True:
        # Fast phase: retry roughly every 15 delay units until a fetch
        # succeeds.
        reply = _fetch()
        while reply is None:
            yield from asynced.delay(_jitter(15))
            reply = _fetch()

        # Slow phase: hand out the result, then refresh roughly every
        # 60 * 60 delay units. A failed fetch drops straight back to the
        # fast phase without an extra delay.
        while reply is not None:
            yield reply
            yield from asynced.delay(_jitter(60 * 60))
            reply = _fetch()
def day_sec() -> generator:
    """Yield `(seconds_of_day, meta)` estimates driven by `_sync`.

    While no usable response has arrived, `(None, None)` is yielded for
    every None item produced by `_sync`. After the first response carrying
    a 'day_sec' value, each item from `_sync` produces one estimate: the
    last 'day_sec' baseline advanced by the monotonic time elapsed since it
    was received, wrapped to one day. `meta` is the response that supplied
    the baseline in use.
    """
    source = _sync()

    # Spin until the first response with a 'day_sec' value comes in.
    for got in source:
        if got is not None:
            local = time.monotonic()
            base = got.get('day_sec', None)
            if base is not None:
                break
        else:
            yield None, None

    meta = got
    for update in source:
        # The estimate is emitted before `update` is applied, so a fresh
        # baseline only takes effect from the following item.
        now = base + time.monotonic() - local
        yield now % (60 * 60 * 24), meta

        if update is None:
            continue
        fresh = update.get('day_sec', None)
        if fresh is None:
            continue
        # Update the baseline.
        local = time.monotonic()
        base = fresh
        meta = update
def test():
    """Print each seconds-of-day estimate from `day_sec`, sleeping 0.3 between them."""
    for secs, _meta in day_sec():
        print(secs)
        time.sleep(0.3)
# Run the demo loop only when executed as a script, not on import.
if __name__ == '__main__':
    test()