167 lines
4.1 KiB
Python
167 lines
4.1 KiB
Python
import socket
|
|
|
|
import asynced
|
|
|
|
from typing import Generator, Optional, Tuple
|
|
|
|
_CONNECT = 1
|
|
_CONNACK = 2
|
|
_PUBLISH = 3
|
|
_PUBACK = 4
|
|
_PUBREC = 5
|
|
_PUBREL = 6
|
|
_PUBCOMP = 7
|
|
_SUBSCRIBE = 8
|
|
_SUBACK = 9
|
|
_UNSUBSCRIBE = 10
|
|
_UNSUBACK = 11
|
|
_PINGREQ = 12
|
|
_PINGRESP = 13
|
|
_DISCONNECT = 14
|
|
|
|
_DUP = 8
|
|
_QOS = 2
|
|
_RETAIN = 1
|
|
|
|
_UPDATE_SECONDS = 120
|
|
|
|
|
|
class Message:
|
|
def __init__(self, topic: str, value: str):
|
|
self.topic = topic
|
|
self.value = value
|
|
|
|
topic: str
|
|
value: str
|
|
|
|
|
|
def _getch() -> Generator[None, Optional[int], int]:
|
|
"""Returns the first non-None value sent to the generator"""
|
|
while True:
|
|
got = yield None
|
|
if got is not None:
|
|
return got
|
|
|
|
|
|
def _get_packet() -> Generator[None, Optional[int], Tuple[int, bytearray]]:
|
|
"""Decodes and returns the next packet"""
|
|
control = yield from _getch()
|
|
size = 0
|
|
shift = 0
|
|
while True:
|
|
got = yield from _getch()
|
|
size |= (got & 0x7F) << shift
|
|
shift += 7
|
|
if (got & 0x80) == 0:
|
|
break
|
|
payload = bytearray(size)
|
|
for i in range(len(payload)):
|
|
payload[i] = yield from _getch()
|
|
return control, payload
|
|
|
|
|
|
def _encode_string(value: str) -> bytearray:
|
|
"""Encodes a length-value string"""
|
|
encoded = value.encode()
|
|
return bytearray((len(encoded) >> 8, len(encoded) & 0xFF)) + encoded
|
|
|
|
|
|
def _write_packet(
|
|
sock,
|
|
control: int,
|
|
payload: bytearray | bytes,
|
|
packet_id: Optional[int] = None,
|
|
) -> None:
|
|
if packet_id is not None:
|
|
header = bytearray(
|
|
(control, len(payload) + 2, (packet_id >> 8) & 0xFF, packet_id & 0xFF)
|
|
)
|
|
else:
|
|
header = bytearray((control, len(payload)))
|
|
print("mqtt: >", header, payload)
|
|
sock.write(header + payload)
|
|
|
|
|
|
def _send_connect(sock: socket.socket, username: str, password: str):
|
|
flags = 0x02
|
|
if username:
|
|
flags |= 0x80
|
|
if password:
|
|
flags |= 0x40
|
|
payload = (
|
|
_encode_string("MQTT")
|
|
+ b"\x04" # Protocol level
|
|
+ bytearray((flags,))
|
|
+ bytearray((0, _UPDATE_SECONDS * 2)) # Keep alive seconds
|
|
+ _encode_string("") # Client identifier
|
|
)
|
|
if username:
|
|
payload += _encode_string(username)
|
|
if password:
|
|
payload += _encode_string(password)
|
|
_write_packet(sock, _CONNECT << 4, payload)
|
|
|
|
|
|
def _send_subscribe(sock: socket.socket, topic: str):
|
|
packet_id = hash(topic) & 0xFFFF
|
|
_write_packet(
|
|
sock,
|
|
(_SUBSCRIBE << 4) | 2,
|
|
_encode_string(topic) +
|
|
# QOS
|
|
b"\x00",
|
|
packet_id,
|
|
)
|
|
|
|
|
|
def _send_publish(sock: socket.socket, topic: str, value: str):
|
|
_write_packet(sock, _PUBLISH << 4, _encode_string(topic) + value.encode())
|
|
|
|
|
|
def _sender(sock: socket.socket, path: str) -> Generator[None, None, None]:
|
|
_send_subscribe(sock, path + "/#")
|
|
|
|
while True:
|
|
_send_publish(sock, path + "/available", "online")
|
|
yield from asynced.delay(_UPDATE_SECONDS)
|
|
|
|
|
|
def _receiver() -> Generator[Optional[Message], Optional[int], None]:
|
|
while True:
|
|
control, payload = yield from _get_packet()
|
|
if (control >> 4) == _PUBLISH:
|
|
qos = (control // _QOS) & 3
|
|
topic_length = (payload[0] << 8) | payload[1]
|
|
topic = payload[2 : 2 + topic_length].decode()
|
|
has_packet_id = qos == 1 or qos == 2
|
|
packet_id_length = 2 if has_packet_id else 0
|
|
value = payload[2 + topic_length + packet_id_length :].decode()
|
|
print("mqtt: topic update:", topic, value)
|
|
yield Message(topic, value)
|
|
|
|
|
|
def client(
|
|
sock: socket.socket,
|
|
path: str,
|
|
username: str = "",
|
|
password: str = "",
|
|
) -> Generator[Optional[Message], Optional[int], None]:
|
|
_send_connect(sock, username, password)
|
|
|
|
connack, payload = yield from _get_packet()
|
|
if connack != _CONNACK << 4:
|
|
raise Exception("Expected CONNACK")
|
|
|
|
if len(payload) < 2:
|
|
raise Exception("CONNACK is too short")
|
|
if payload[1] != 0:
|
|
raise Exception("CONNACK has an error code")
|
|
|
|
receiver = _receiver()
|
|
sender = _sender(sock, path)
|
|
|
|
got = yield None
|
|
while True:
|
|
sender.send(None)
|
|
got = yield receiver.send(got)
|