# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import random import time from typing import Generator, Optional import urequests import asynced import compat def _fetch() -> Optional[dict]: try: return urequests.get("http://worldtimeapi.org/api/ip").json() except OSError as ex: print("OSError", ex) return None except ValueError as ex: print("ValueError", ex) return None def _jitter(mid: int) -> int: return compat.randint(mid, mid * 120 // 100) def _sync() -> Generator[Optional[dict], None, None]: while True: # Poll quickly until the first result comes in. while True: got = _fetch() if got is not None: yield got break yield from asynced.delay(_jitter(5)) # Poll slowly until the connection drops. while True: yield from asynced.delay(_jitter(60 * 60)) got = _fetch() if got is None: break yield got def _get_day_sec(resp): parts = resp.get("datetime", "").split("T") if len(parts) != 2: return None hms = parts[1].split("+")[0].split(":") if len(hms) != 3: return None return float(hms[0]) * 3600 + float(hms[1]) * 60 + float(hms[2]) def day_sec() -> Generator[Optional[float], None, None]: s = _sync() # Spin until the first result comes in. for got in s: if got is None: yield None continue local = compat.monotonic() base = _get_day_sec(got) if base is not None: break good = got assert base is not None for got in s: now = base + compat.monotonic() - local yield now % (60 * 60 * 24) if got is not None: # Update the baseline. b2 = _get_day_sec(got) if b2 is not None: local = compat.monotonic() base = b2 good = got def test(): for secs in day_sec(): print(secs) compat.sleep(0.3) if __name__ == "__main__": test()