import socket
from typing import Generator, Optional, Tuple

import asynced

# MQTT control packet types (upper four bits of the first header byte).
_CONNECT = 1
_CONNACK = 2
_PUBLISH = 3
_PUBACK = 4
_PUBREC = 5
_PUBREL = 6
_PUBCOMP = 7
_SUBSCRIBE = 8
_SUBACK = 9
_UNSUBSCRIBE = 10
_UNSUBACK = 11
_PINGREQ = 12
_PINGRESP = 13
_DISCONNECT = 14

# PUBLISH flag weights (lower four bits of the first header byte).
# _QOS is the weight of the lowest QoS bit, so dividing by it shifts the
# two-bit QoS field down to bit 0.
_DUP = 8
_QOS = 2
_RETAIN = 1

# Seconds between "available" publishes; the keep-alive sent in CONNECT is
# twice this value.
_UPDATE_SECONDS = 120

# Largest value the four-byte MQTT "remaining length" field can carry.
_MAX_REMAINING_LENGTH = 0x0FFFFFFF


class Message:
    """A value received on a subscribed topic."""

    topic: str
    value: str

    def __init__(self, topic: str, value: str):
        self.topic = topic
        self.value = value


def _getch(first: Optional[int] = None) -> Generator[None, Optional[int], int]:
    """Returns the first non-None value sent to the generator.

    If ``first`` is not None it is returned immediately without yielding;
    this lets a caller hand over a value it has already received.
    """
    got = first
    while got is None:
        got = yield None
    return got


def _get_packet(
    first: Optional[int] = None,
) -> Generator[None, Optional[int], Tuple[int, bytearray]]:
    """Decodes and returns the next packet.

    Bytes are supplied one at a time through ``send()``; None values are
    ignored. ``first``, when not None, is used as the first byte of the
    packet instead of waiting for one to be sent.

    Returns a ``(control, payload)`` tuple where ``control`` is the first
    header byte and ``payload`` is everything after the remaining-length
    field.

    Raises:
        Exception: if the remaining-length field runs past four bytes.
    """
    control = yield from _getch(first)

    # Remaining length: 7 bits per byte, least significant group first,
    # top bit set on every byte except the last.
    size = 0
    shift = 0
    while True:
        got = yield from _getch()
        size |= (got & 0x7F) << shift
        shift += 7
        if (got & 0x80) == 0:
            break
        if shift >= 28:
            # A fifth length byte is not allowed; bail out rather than
            # allocate a buffer of attacker-chosen size.
            raise Exception("Malformed remaining length")

    payload = bytearray(size)
    for i in range(size):
        payload[i] = yield from _getch()
    return control, payload


def _encode_string(value: str) -> bytearray:
    """Encodes a length-value string.

    The result is a two-byte big-endian length followed by the encoded
    text. A string whose encoding exceeds 65535 bytes raises ValueError
    when the length prefix is built.
    """
    encoded = value.encode()
    return bytearray((len(encoded) >> 8, len(encoded) & 0xFF)) + encoded


def _encode_remaining_length(length: int) -> bytearray:
    """Encodes ``length`` as an MQTT variable-length "remaining length".

    Raises:
        ValueError: if ``length`` is negative or does not fit in four bytes.
    """
    if not 0 <= length <= _MAX_REMAINING_LENGTH:
        raise ValueError("MQTT remaining length out of range")
    encoded = bytearray()
    while True:
        digit = length & 0x7F
        length >>= 7
        if length:
            digit |= 0x80  # More length bytes follow
        encoded.append(digit)
        if not length:
            return encoded


def _write_packet(
    sock,
    control: int,
    payload: bytearray | bytes,
    packet_id: Optional[int] = None,
) -> None:
    """Frames ``payload`` as one MQTT packet and writes it to ``sock``.

    Args:
        sock: object with a ``write(bytes)`` method.
        control: first header byte (packet type and flags).
        payload: bytes following the header (and packet id, if any).
        packet_id: when not None, written as a two-byte big-endian packet
            identifier between the remaining length and ``payload``.
    """
    if packet_id is not None:
        packet_id_bytes = bytearray(((packet_id >> 8) & 0xFF, packet_id & 0xFF))
    else:
        packet_id_bytes = bytearray()

    # The remaining length counts everything after itself, so it includes
    # the packet id. It is variable-length: a single raw byte is only
    # correct below 128.
    header = (
        bytearray((control,))
        + _encode_remaining_length(len(packet_id_bytes) + len(payload))
        + packet_id_bytes
    )
    print("mqtt: >", header, payload)
    sock.write(header + payload)


def _send_connect(sock: socket.socket, username: str, password: str):
    """Sends a CONNECT packet with an empty client identifier.

    A non-empty ``username`` / ``password`` sets the matching connect flag
    and is appended to the payload.

    NOTE: MQTT 3.1.1 requires the user name flag to be set whenever the
    password flag is set, so a password should not be passed without a
    username.
    """
    flags = 0x02  # Clean session
    if username:
        flags |= 0x80
    if password:
        flags |= 0x40

    keep_alive = _UPDATE_SECONDS * 2
    payload = (
        _encode_string("MQTT")
        + b"\x04"  # Protocol level
        + bytearray((flags,))
        + bytearray((keep_alive >> 8, keep_alive & 0xFF))  # Keep alive seconds
        + _encode_string("")  # Client identifier
    )
    if username:
        payload += _encode_string(username)
    if password:
        payload += _encode_string(password)
    _write_packet(sock, _CONNECT << 4, payload)


def _send_subscribe(sock: socket.socket, topic: str):
    """Sends a SUBSCRIBE packet requesting ``topic`` at QoS 0."""
    # The packet identifier must be non-zero; fall back to 1 when the
    # masked hash happens to be 0.
    packet_id = (hash(topic) & 0xFFFF) or 1
    _write_packet(
        sock,
        (_SUBSCRIBE << 4) | 2,  # SUBSCRIBE carries fixed flag bits 0b0010
        _encode_string(topic)
        # QOS
        + b"\x00",
        packet_id,
    )


def _send_publish(sock: socket.socket, topic: str, value: str):
    """Sends a PUBLISH packet (no flags set, no packet id) for ``topic``."""
    _write_packet(sock, _PUBLISH << 4, _encode_string(topic) + value.encode())


def _sender(sock: socket.socket, path: str) -> Generator[None, None, None]:
    """Subscribes to ``path + "/#"`` and then reports availability forever.

    After subscribing, publishes "online" to ``path + "/available"`` and
    delegates to ``asynced.delay(_UPDATE_SECONDS)`` before repeating.
    """
    _send_subscribe(sock, path + "/#")
    while True:
        _send_publish(sock, path + "/available", "online")
        yield from asynced.delay(_UPDATE_SECONDS)


def _receiver() -> Generator[Optional[Message], Optional[int], None]:
    """Decodes incoming packets and yields a Message for every PUBLISH.

    Bytes are supplied through ``send()``. The generator yields None while
    it needs more input and a Message once a PUBLISH packet is complete.
    Packets of any other type are read and dropped.
    """
    pending = None
    while True:
        control, payload = yield from _get_packet(pending)
        pending = None
        if (control >> 4) == _PUBLISH:
            qos = (control // _QOS) & 3
            topic_length = (payload[0] << 8) | payload[1]
            topic = payload[2 : 2 + topic_length].decode()
            # A packet id follows the topic only for QoS 1 and 2.
            has_packet_id = qos == 1 or qos == 2
            packet_id_length = 2 if has_packet_id else 0
            value = payload[2 + topic_length + packet_id_length :].decode()
            print("mqtt: topic update:", topic, value)
            # Whatever is sent in to resume us here may already be the
            # first byte of the next packet; keep it instead of dropping it.
            pending = yield Message(topic, value)


def client(
    sock: socket.socket,
    path: str,
    username: str = "",
    password: str = "",
) -> Generator[Optional[Message], Optional[int], None]:
    """Runs an MQTT session over ``sock`` as a generator.

    Sends CONNECT, waits for CONNACK, then alternates between stepping the
    sender (subscribe plus periodic availability publish) and feeding
    received bytes to the packet decoder.

    Drive it by sending each received byte (or None when there is nothing
    to deliver) with ``send()``. Each step yields None, or a Message when
    a PUBLISH for a subscribed topic has been fully received.

    Raises:
        Exception: if the first packet is not a CONNACK, is too short, or
            carries a non-zero return code.
    """
    _send_connect(sock, username, password)

    connack, payload = yield from _get_packet()
    if connack != _CONNACK << 4:
        raise Exception("Expected CONNACK")
    if len(payload) < 2:
        raise Exception("CONNACK is too short")
    if payload[1] != 0:
        raise Exception("CONNACK has an error code")

    receiver = _receiver()
    sender = _sender(sock, path)
    # Advance the receiver to its first yield so that the very first value
    # passed to send() may be a byte; sending non-None to a generator that
    # has not started raises TypeError.
    next(receiver)

    got = yield None
    while True:
        sender.send(None)
        got = yield receiver.send(got)