"""Convert a vehicle-profile JSON file into a YAML configuration document.

For every car in the profile whose model name matches ``--car-model`` the
script prints a YAML document containing:

* one template sensor per decoded parameter,
* a ``poll_car`` script that sends each PID request over CAN, and
* ``on_frame`` handlers whose lambdas decode the responses and publish the
  values to the sensors.

NOTE(review): the emitted keys (``canbus.send``, ``globals.set``,
``publish_state`` ...) look like ESPHome configuration -- confirm.
"""

import dataclasses
import json
import pprint
import re
import sys

import click
import yaml
from dataclass_wizard import IS, DumpMeta, YAMLWizard


@dataclasses.dataclass
class Parameter(YAMLWizard):
    """One value decoded from a PID response, as described in the profile."""

    name: str
    expression: str
    unit: str | None
    klass: str | None
    min: float | None
    max: float | None


@dataclasses.dataclass
class Pid(YAMLWizard):
    """A PID request together with the adapter settings parsed for it.

    ``init`` maps each recognised AT-command prefix to its argument text,
    ``target`` and ``receive_address`` are hexadecimal CAN-id strings (or
    ``None`` when the profile does not provide them), and ``pid`` is the
    request payload as a hexadecimal string.
    """

    init: dict[str, str]
    target: str | None
    extended: bool
    pid: str
    receive_address: str | None
    parameters: list[Parameter]


@dataclasses.dataclass
class Car(YAMLWizard):
    """A car model and the PIDs that can be polled on it."""

    model: str
    pids: list[Pid]


@dataclasses.dataclass
class Sensor(YAMLWizard):
    """One entry of the emitted ``sensor`` list."""

    platform: str
    id: str
    name: str
    unit_of_measurement: str
    device_class: str | None
    state_class: str


@dataclasses.dataclass
class CanBusSend(YAMLWizard):
    """Payload of a ``canbus.send`` action."""

    can_id: int
    data: list[int]
    use_extended_id: bool


@dataclasses.dataclass
class GlobalsSet(YAMLWizard):
    """Payload of a ``globals.set`` action."""

    id: str
    value: str


Action = CanBusSend | GlobalsSet | str


@dataclasses.dataclass
class Script(YAMLWizard):
    """One entry of the emitted ``script`` list."""

    id: str
    mode: str
    then: list[dict[str, Action]]


@dataclasses.dataclass
class OnFrame(YAMLWizard):
    """A CAN frame trigger and the actions to run when it fires."""

    can_id: int
    can_id_mask: int
    use_extended_id: bool
    then: list[dict[str, Action]]


@dataclasses.dataclass
class Canbus:
    """One entry of the emitted ``canbus`` list."""

    platform: str
    on_frame: list[OnFrame]
    id: str = "!extend can0"


@dataclasses.dataclass
class Document(YAMLWizard):
    """Top-level structure that is serialised to YAML."""

    script: list[Script]
    sensor: list[Sensor]
    canbus: list[Canbus]


# Dump with snake_case keys and leave out fields whose value is None.
for kind in (Document, Sensor, Script, Action, CanBusSend, OnFrame, Canbus):
    DumpMeta(key_transform="SNAKE", skip_if=IS(None)).bind_to(kind)


def _cname(name: str) -> str:
    """Lower-case *name* and replace each non-alphanumeric run with ``_``."""
    return re.sub(r"[^A-Za-z0-9]+", "_", name.lower())


def _id(name: str) -> str:
    """Return the sensor id for a parameter name (``car_`` + ``_cname``)."""
    return "car_" + _cname(name)


def _float_or_none(d: dict, key: str) -> float | None:
    """Return ``d[key]`` as a float, or ``None`` when it is missing or empty.

    A numeric zero is a real value and is returned as ``0.0``; only absent
    keys and empty/falsy non-numeric values (``None``, ``""``) yield ``None``.
    """
    value = d.get(key)
    # Numbers are converted directly so that a limit of 0 is not mistaken
    # for "no limit" by the truthiness check below.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    if not value:
        return None
    return float(value)


def _parse_parameter(parameter: dict) -> Parameter:
    """Build a ``Parameter`` from one profile entry.

    ``name`` and ``expression`` are required (``KeyError`` otherwise); the
    remaining keys are optional.
    """
    return Parameter(
        name=parameter["name"],
        expression=parameter["expression"],
        unit=parameter.get("unit"),
        klass=parameter.get("class"),
        min=_float_or_none(parameter, "min"),
        max=_float_or_none(parameter, "max"),
    )


# See https://cdn.sparkfun.com/assets/4/e/5/0/2/ELM327_AT_Commands.pdf
_KNOWN_COMMANDS = ("ATSP", "ATCP", "ATSH", "ATFCSH", "ATCRA")


def _parse_pid(pid: dict) -> Pid:
    """Build a ``Pid`` from one profile entry.

    ``pid_init`` is a ``;``-separated list of AT commands.  Commands that
    start with one of ``_KNOWN_COMMANDS`` are stored in ``init`` under that
    prefix; anything else is ignored.  A missing ``parameters`` key is
    treated as an empty list.

    Raises:
        KeyError: if ``pid`` has no ``"pid"`` key, or if protocol ``"7"`` is
            selected without both ``ATCP`` and ``ATSH`` being present.
    """
    init: dict[str, str] = {}
    for raw_command in pid.get("pid_init", "").split(";"):
        command = raw_command.strip()
        if not command:
            continue
        for prefix in _KNOWN_COMMANDS:
            if command.startswith(prefix):
                init[prefix] = command[len(prefix):].strip()
                break
        # Unrecognised commands are dropped on purpose: recording them under
        # a known prefix would overwrite a value that was really parsed.

    # Protocol "7" is the one treated as using extended CAN identifiers.
    extended = init.get("ATSP") in ("7",)
    if extended:
        # The full identifier is the ATCP argument followed by the ATSH one.
        target = init["ATCP"] + init["ATSH"]
    else:
        target = init.get("ATSH")

    return Pid(
        init=init,
        target=target,
        extended=extended,
        pid=pid["pid"],
        receive_address=init.get("ATCRA"),
        parameters=[_parse_parameter(x) for x in pid.get("parameters", [])],
    )


def _parse_car(car: dict) -> Car:
    """Build a ``Car`` from one entry of the profile's ``cars`` list."""
    return Car(model=car["car_model"], pids=[_parse_pid(x) for x in car["pids"]])


def _to_sensor(parameter: Parameter) -> Sensor:
    """Return the template sensor that will receive *parameter*'s value."""
    return Sensor(
        platform="template",
        id=_id(parameter.name),
        name=parameter.name,
        unit_of_measurement=parameter.unit,
        device_class=parameter.klass,
        state_class="measurement",
    )


def _encode_pid(pid: str) -> list[int]:
    """Encode a hexadecimal PID string as an 8-byte frame payload.

    The first byte is ``len(pid) // 2``, followed by the PID bytes; the
    result is padded with ``0x55`` and cut to exactly 8 bytes.
    """
    payload = [int(pid[i : i + 2], 16) for i in range(0, len(pid), 2)]
    frame = [len(pid) // 2, *payload]
    return (frame + [0x55] * 8)[:8]


def _to_poll_pid(pid: Pid) -> Action:
    """Return the ``canbus.send`` payload that requests *pid*.

    Raises:
        TypeError: if ``pid.target`` is ``None``.
    """
    return CanBusSend(
        can_id=int(pid.target, 16),
        data=_encode_pid(pid.pid),
        use_extended_id=pid.extended,
    )


def _to_set_pid_index(idx: int) -> Action:
    """Return the ``globals.set`` payload storing *idx* in ``pid_index``."""
    return GlobalsSet(id="pid_index", value=str(idx))


def _to_poll_script(pids: list[Pid]) -> Script:
    """Return the ``poll_car`` script that requests every PID in turn.

    For each PID the script records its index in ``pid_index``, sends the
    request and then waits 100 ms before moving on.
    """
    actions = []
    for index, pid in enumerate(pids):
        actions.extend(
            (
                {"globals.set": _to_set_pid_index(index)},
                {"canbus.send": _to_poll_pid(pid)},
                {"delay": "100ms"},
            )
        )
    return Script(id="poll_car", mode="single", then=actions)


def _x_index(token: str) -> int:
    """Return the byte index of a ``B<n>`` token (``"B4"`` -> ``4``).

    Raises:
        ValueError: if *token* does not start with ``B`` or the remainder is
            not an integer.
    """
    if not token.startswith("B"):
        raise ValueError(f"expected a byte reference like 'B4', got {token!r}")
    return int(token[1:])


# C++ type used in the generated casts.  A template argument is mandatory for
# ``static_cast``; ``float`` matches the ``float value = ...`` statement the
# expression is embedded in.
_CPP_VALUE_TYPE = "float"


def _rewrite_expression(exp: str) -> str:
    """Translate a profile expression into a C++ expression over ``x``.

    * ``[Bs:Be]`` becomes the bytes ``x[s]`` .. ``x[e]`` OR-ed together with
      ``x[s]`` in the most significant position.
    * A lone ``Bn`` becomes ``x[n]``.
    * Both forms are wrapped in ``static_cast<float>(...)``; every other
      token is copied through unchanged.

    Whitespace in *exp* is discarded.

    Raises:
        ValueError: if a ``[`` is not followed by ``Bs : Be ]``.
    """
    tokens: list[str] = []
    current = ""
    for ch in exp:
        if ch in "[]():+-*/":
            if current:
                tokens.append(current)
            tokens.append(ch)
            current = ""
        elif not ch.isspace():
            # Dropping whitespace keeps " B4" recognisable as a byte token.
            current += ch
    if current:
        tokens.append(current)

    # Replace sequences
    rewritten = tokens
    while "[" in rewritten:
        left = rewritten.index("[")
        group = rewritten[left + 1 : left + 5]
        if len(group) != 4 or group[1] != ":" or group[3] != "]":
            raise ValueError(f"malformed byte range in expression {exp!r}")
        start = _x_index(group[0])
        end = _x_index(group[2])
        combined = [f"static_cast<{_CPP_VALUE_TYPE}>(0"]
        combined.extend(
            f"| (x[{i}] << {(end - i) * 8})" for i in range(start, end + 1)
        )
        combined.append(")")
        rewritten = rewritten[:left] + combined + rewritten[left + 5 :]

    parts = [
        f"static_cast<{_CPP_VALUE_TYPE}>(x[{_x_index(part)}])"
        if part[0] == "B"
        else part
        for part in rewritten
    ]
    return "".join(parts)


def _decode_parameters(pid: Pid) -> str:
    """Return the lambda source that decodes and publishes *pid*'s values.

    Each parameter contributes a comment with the original expression and a
    scoped block that evaluates the rewritten expression and publishes it to
    the parameter's sensor.
    """
    lines = []
    for parameter in pid.parameters:
        lines.append(f"// {parameter.expression}")
        lines.append(f"{{ float value = {_rewrite_expression(parameter.expression)};")
        lines.append(f"id({_id(parameter.name)}).publish_state(value); }}")
    return "\n".join(lines)


def _to_on_frame(pid: Pid, pid_index: int) -> OnFrame:
    """Return the frame handler that decodes responses to *pid*.

    The decoding lambda only runs while ``pid_index`` equals *pid_index*.
    The id mask is all ones over 29 bits for extended ids and over 11 bits
    otherwise.

    Raises:
        TypeError: if ``pid.receive_address`` is ``None``.
    """
    decoder = [
        {
            "if": {
                "condition": {"lambda": f"return id(pid_index) == {pid_index};"},
                "then": [{"lambda": _decode_parameters(pid)}],
            }
        }
    ]
    return OnFrame(
        can_id=int(pid.receive_address, 16),
        can_id_mask=2 ** (29 if pid.extended else 11) - 1,
        use_extended_id=pid.extended,
        then=decoder,
    )


def _to_canbus(pids: list[Pid]) -> Canbus:
    """Return the ``esp32_can`` entry with one frame handler per PID."""
    on_frames = [_to_on_frame(pid, index) for index, pid in enumerate(pids)]
    return Canbus(platform="esp32_can", on_frame=on_frames)


def _to_document(car: Car) -> Document:
    """Assemble the sensors, polling script and CAN handlers for *car*."""
    sensors = [
        _to_sensor(parameter) for pid in car.pids for parameter in pid.parameters
    ]
    scripts = [_to_poll_script(car.pids)]
    canbuses = [_to_canbus(car.pids)]
    return Document(sensor=sensors, script=scripts, canbus=canbuses)


@click.command()
@click.option("--profile", default="vehicle_profiles.json", type=click.File())
@click.option("--car-model", default="ID")
def main(profile, car_model: str):
    """Print a YAML document for every car whose model contains CAR_MODEL."""
    db = json.load(profile)
    for car_dict in db["cars"]:
        # Substring match: one --car-model value may select several cars.
        if car_model in car_dict["car_model"]:
            car = _parse_car(car_dict)
            print(_to_document(car).to_yaml())


if __name__ == "__main__":
    main()