autohuttli/genpids.py

315 lines
7.3 KiB
Python

import json
import sys
import pprint
import re
import dataclasses
import yaml
import click
from dataclass_wizard import YAMLWizard, DumpMeta, IS
@dataclasses.dataclass
class Parameter(YAMLWizard):
name: str
expression: str
unit: str | None
klass: str | None
min: float | None
max: float | None
@dataclasses.dataclass
class Pid(YAMLWizard):
init: dict[str, str]
target: str | None
extended: bool
pid: str
receive_address: str | None
parameters: list[Parameter]
@dataclasses.dataclass
class Car(YAMLWizard):
model: str
pids: list[Pid]
@dataclasses.dataclass
class Sensor(YAMLWizard):
platform: str
id: str
name: str
unit_of_measurement: str
device_class: str | None
state_class: str
@dataclasses.dataclass
class CanBusSend(YAMLWizard):
can_id: int
data: list[int]
use_extended_id: bool
@dataclasses.dataclass
class GlobalsSet(YAMLWizard):
id: str
value: str
Action = CanBusSend | GlobalsSet | str
@dataclasses.dataclass
class Script(YAMLWizard):
id: str
mode: str
then: list[dict[str, Action]]
@dataclasses.dataclass
class OnFrame(YAMLWizard):
can_id: int
can_id_mask: int
use_extended_id: bool
then: list[dict[str, Action]]
@dataclasses.dataclass
class Canbus:
platform: str
on_frame: list[OnFrame]
id: str = "!extend can0"
@dataclasses.dataclass
class Document(YAMLWizard):
script: list[Script]
sensor: list[Sensor]
canbus: list[Canbus]
for kind in (Document, Sensor, Script, Action, CanBusSend, OnFrame, Canbus):
DumpMeta(key_transform="SNAKE", skip_if=IS(None)).bind_to(kind)
def _cname(name: str):
return re.sub(r"[^A-Za-z0-9]+", "_", name.lower())
def _id(name: str):
return "car_" + _cname(name)
def _float_or_none(d: dict, key: str) -> float | None:
if key not in d:
return None
if not d[key]:
return None
return float(d[key])
def _parse_parameter(parameter: dict) -> Parameter:
return Parameter(
name=parameter["name"],
expression=parameter["expression"],
unit=parameter.get("unit", None),
klass=parameter.get("class", None),
min=_float_or_none(parameter, "min"),
max=_float_or_none(parameter, "max"),
)
# See https://cdn.sparkfun.com/assets/4/e/5/0/2/ELM327_AT_Commands.pdf
_KNOWN_COMMANDS = ("ATSP", "ATCP", "ATSH", "ATFCSH", "ATCRA")
def _parse_pid(pid: dict) -> Pid:
init = {}
for command in pid.get("pid_init", "").split(";"):
if command:
for prefix in _KNOWN_COMMANDS:
if command.startswith(prefix):
init[prefix] = command[len(prefix) :]
break
else:
init[prefix] = None
protocol = init.get("ATSP", None)
extended = protocol in ("7",)
if extended:
target = init["ATCP"] + init["ATSH"]
elif "ATSH" in init:
target = init["ATSH"]
else:
target = None
receive_address = init.get("ATCRA", None)
return Pid(
init=init,
target=target,
extended=extended,
pid=pid["pid"],
receive_address=receive_address,
parameters=[
_parse_parameter(x) for x in pid["parameters"] if "parameters" in pid
],
)
def _parse_car(car: dict) -> Car:
return Car(model=car["car_model"], pids=[_parse_pid(x) for x in car["pids"]])
def _to_sensor(parameter: Parameter) -> Sensor:
return Sensor(
platform="template",
id=_id(parameter.name),
name=parameter.name,
unit_of_measurement=parameter.unit,
device_class=parameter.klass,
state_class="measurement",
)
def _encode_pid(pid: str) -> list[int]:
out = [len(pid) // 2]
for i in range(0, len(pid), 2):
out.append(int(pid[i : i + 2], 16))
out = (out + [0x55] * 8)[:8]
return out
def _to_poll_pid(pid: Pid) -> Action:
return CanBusSend(
can_id=int(pid.target, 16),
data=_encode_pid(pid.pid),
use_extended_id=pid.extended,
)
def _to_set_pid_index(idx: int) -> Action:
return GlobalsSet(id="pid_index", value=str(idx))
def _to_poll_script(pids: list[Pid]) -> Script:
actions = []
for i, pid in enumerate(pids):
actions.append({"globals.set": _to_set_pid_index(i)})
actions.append({"canbus.send": _to_poll_pid(pid)})
actions.append({"delay": "100ms"})
return Script(id="poll_car", mode="single", then=actions)
def _x_index(token: str) -> int:
assert token[0] == "B"
return int(token[1:])
def _rewrite_expression(exp: str) -> str:
tokens = []
token = ""
for ch in exp:
if ch in "[]():+-*/":
if token:
tokens.append(token)
tokens.append(ch)
token = ""
else:
token += ch
if token:
tokens.append(token)
# Replace sequences
next = tokens
while "[" in next:
left = next.index("[")
before = next[:left]
start, colon, end, bracket = next[left + 1 : left + 1 + 4]
after = next[left + 1 + 4 :]
assert colon == ":"
assert bracket == "]"
next = before
start = _x_index(start)
end = _x_index(end)
next.append("static_cast<float>(0")
for i in range(start, end + 1):
next.append(f"| (x[{i}] << {(end-i)*8})")
next.append(")")
next += after
for i, token in enumerate(next):
if token[0] == "B":
next[i] = f"static_cast<float>(x[{_x_index(token)}])"
return "".join(next)
def _decode_parameters(pid: Pid) -> str:
lines = []
for parameter in pid.parameters:
lines.append(f"// {parameter.expression}")
lines.append(f"{{ float value = {_rewrite_expression(parameter.expression)};")
lines.append(f"id({_id(parameter.name)}).publish_state(value); }}")
return "\n".join(lines)
def _to_on_frame(pid: Pid, pid_index: int) -> OnFrame:
decoder = [
{
"if": {
"condition": {"lambda": f"return id(pid_index) == {pid_index};"},
"then": [{"lambda": _decode_parameters(pid)}],
}
}
]
return OnFrame(
can_id=int(pid.receive_address, 16),
can_id_mask=2 ** (29 if pid.extended else 11) - 1,
use_extended_id=pid.extended,
then=decoder,
# then=[{"lambda": _decode_parameters(pid)}],
)
def _to_canbus(pids: list[Pid]) -> Canbus:
on_frames = []
for i, pid in enumerate(pids):
on_frames.append(_to_on_frame(pid, i))
return Canbus(platform="esp32_can", on_frame=on_frames)
def _to_document(car: Car) -> Document:
sensors = []
scripts = []
canbuses = []
for pid in car.pids:
for parameter in pid.parameters:
sensors.append(_to_sensor(parameter))
scripts.append(_to_poll_script(car.pids))
canbuses.append(_to_canbus(car.pids))
return Document(sensor=sensors, script=scripts, canbus=canbuses)
@click.command()
@click.option("--profile", default="vehicle_profiles.json", type=click.File())
@click.option("--car-model", default="ID")
def main(profile, car_model: str):
db = json.load(profile)
for car_dict in db["cars"]:
if car_model in car_dict["car_model"]:
car = _parse_car(car_dict)
print(_to_document(car).to_yaml())
if __name__ == "__main__":
main()