# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tracks the time of day by periodically polling a JSON time endpoint."""

import random
import urllib.urequest
import asynced
import ujson
import time


def _fetch():
    """Fetch and decode one reply from the time endpoint.

    Returns:
        The decoded JSON value, or None if the request or the decoding
        failed with OSError or ValueError (the error is printed).
    """
    try:
        resp = urllib.urequest.urlopen('https://juju.nz/api/iptime/now')
        try:
            return ujson.load(resp)
        finally:
            # Always release the connection, including when decoding fails.
            resp.close()
    except (OSError, ValueError) as ex:
        print(ex)
        return None


def _jitter(mid: int) -> int:
    """Return a random integer between `mid` and `mid` plus 20 %."""
    return random.randint(mid, mid * 120 // 100)


def _sync() -> generator:
    """Generator that polls the endpoint forever and yields the replies.

    Polls with a short (jittered 15) delay until a fetch succeeds, then
    with a long (jittered 60 * 60) delay until a fetch fails, and repeats.
    Whatever `asynced.delay` yields while waiting is passed through.
    NOTE(review): `day_sec` treats None items as "no new reply", so
    `asynced.delay` presumably yields None while waiting, and the delay
    argument is presumably in seconds - confirm against `asynced`.
    """
    while True:
        # Poll quickly until the first result comes in.
        while True:
            got = _fetch()
            if got is not None:
                yield got
                break
            yield from asynced.delay(_jitter(15))

        # Poll slowly until the connection drops.
        while True:
            yield from asynced.delay(_jitter(60 * 60))
            got = _fetch()
            if got is None:
                break
            yield got


def day_sec() -> generator:
    """Generator yielding `(seconds_into_day, reply)` tuples.

    Yields `(None, None)` until a reply containing a 'day_sec' entry has
    been received. After that, each item is the last 'day_sec' baseline
    advanced by the `time.monotonic()` time elapsed since it was received,
    modulo 24 hours, together with the reply that supplied the baseline.
    """
    s = _sync()

    # Spin until the first result comes in.
    for got in s:
        if got is None:
            yield None, None
            continue
        local = time.monotonic()
        base = got.get('day_sec', None)
        if base is not None:
            break

    good = got
    for got in s:
        # Report using the current baseline; a reply arriving with this item
        # only takes effect from the next item onwards.
        now = base + time.monotonic() - local
        yield now % (60 * 60 * 24), good
        if got is not None:
            # Update the baseline.
            b2 = got.get('day_sec', None)
            if b2 is not None:
                local = time.monotonic()
                base = b2
                good = got


def test():
    """Print the tracked seconds-into-day value roughly every 0.3 s, forever."""
    for secs, meta in day_sec():
        print(secs)
        time.sleep(0.3)


if __name__ == '__main__':
    test()