315 lines
7.3 KiB
Python
315 lines
7.3 KiB
Python
import json
|
|
import sys
|
|
import pprint
|
|
import re
|
|
import dataclasses
|
|
|
|
import yaml
|
|
import click
|
|
|
|
from dataclass_wizard import YAMLWizard, DumpMeta, IS
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Parameter(YAMLWizard):
|
|
name: str
|
|
expression: str
|
|
unit: str | None
|
|
klass: str | None
|
|
min: float | None
|
|
max: float | None
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Pid(YAMLWizard):
|
|
init: dict[str, str]
|
|
target: str | None
|
|
extended: bool
|
|
pid: str
|
|
receive_address: str | None
|
|
parameters: list[Parameter]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Car(YAMLWizard):
|
|
model: str
|
|
pids: list[Pid]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Sensor(YAMLWizard):
|
|
platform: str
|
|
id: str
|
|
name: str
|
|
unit_of_measurement: str
|
|
device_class: str | None
|
|
state_class: str
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class CanBusSend(YAMLWizard):
|
|
can_id: int
|
|
data: list[int]
|
|
use_extended_id: bool
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class GlobalsSet(YAMLWizard):
|
|
id: str
|
|
value: str
|
|
|
|
|
|
Action = CanBusSend | GlobalsSet | str
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Script(YAMLWizard):
|
|
id: str
|
|
mode: str
|
|
then: list[dict[str, Action]]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class OnFrame(YAMLWizard):
|
|
can_id: int
|
|
can_id_mask: int
|
|
use_extended_id: bool
|
|
then: list[dict[str, Action]]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Canbus:
|
|
platform: str
|
|
on_frame: list[OnFrame]
|
|
id: str = "!extend can0"
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Document(YAMLWizard):
|
|
script: list[Script]
|
|
sensor: list[Sensor]
|
|
canbus: list[Canbus]
|
|
|
|
|
|
for kind in (Document, Sensor, Script, Action, CanBusSend, OnFrame, Canbus):
|
|
DumpMeta(key_transform="SNAKE", skip_if=IS(None)).bind_to(kind)
|
|
|
|
|
|
def _cname(name: str):
|
|
return re.sub(r"[^A-Za-z0-9]+", "_", name.lower())
|
|
|
|
|
|
def _id(name: str):
|
|
return "car_" + _cname(name)
|
|
|
|
|
|
def _float_or_none(d: dict, key: str) -> float | None:
|
|
if key not in d:
|
|
return None
|
|
if not d[key]:
|
|
return None
|
|
return float(d[key])
|
|
|
|
|
|
def _parse_parameter(parameter: dict) -> Parameter:
|
|
return Parameter(
|
|
name=parameter["name"],
|
|
expression=parameter["expression"],
|
|
unit=parameter.get("unit", None),
|
|
klass=parameter.get("class", None),
|
|
min=_float_or_none(parameter, "min"),
|
|
max=_float_or_none(parameter, "max"),
|
|
)
|
|
|
|
|
|
# See https://cdn.sparkfun.com/assets/4/e/5/0/2/ELM327_AT_Commands.pdf
|
|
_KNOWN_COMMANDS = ("ATSP", "ATCP", "ATSH", "ATFCSH", "ATCRA")
|
|
|
|
|
|
def _parse_pid(pid: dict) -> Pid:
|
|
init = {}
|
|
for command in pid.get("pid_init", "").split(";"):
|
|
if command:
|
|
for prefix in _KNOWN_COMMANDS:
|
|
if command.startswith(prefix):
|
|
init[prefix] = command[len(prefix) :]
|
|
break
|
|
else:
|
|
init[prefix] = None
|
|
|
|
protocol = init.get("ATSP", None)
|
|
extended = protocol in ("7",)
|
|
if extended:
|
|
target = init["ATCP"] + init["ATSH"]
|
|
elif "ATSH" in init:
|
|
target = init["ATSH"]
|
|
else:
|
|
target = None
|
|
receive_address = init.get("ATCRA", None)
|
|
return Pid(
|
|
init=init,
|
|
target=target,
|
|
extended=extended,
|
|
pid=pid["pid"],
|
|
receive_address=receive_address,
|
|
parameters=[
|
|
_parse_parameter(x) for x in pid["parameters"] if "parameters" in pid
|
|
],
|
|
)
|
|
|
|
|
|
def _parse_car(car: dict) -> Car:
|
|
return Car(model=car["car_model"], pids=[_parse_pid(x) for x in car["pids"]])
|
|
|
|
|
|
def _to_sensor(parameter: Parameter) -> Sensor:
|
|
return Sensor(
|
|
platform="template",
|
|
id=_id(parameter.name),
|
|
name=parameter.name,
|
|
unit_of_measurement=parameter.unit,
|
|
device_class=parameter.klass,
|
|
state_class="measurement",
|
|
)
|
|
|
|
|
|
def _encode_pid(pid: str) -> list[int]:
|
|
out = [len(pid) // 2]
|
|
for i in range(0, len(pid), 2):
|
|
out.append(int(pid[i : i + 2], 16))
|
|
out = (out + [0x55] * 8)[:8]
|
|
return out
|
|
|
|
|
|
def _to_poll_pid(pid: Pid) -> Action:
|
|
return CanBusSend(
|
|
can_id=int(pid.target, 16),
|
|
data=_encode_pid(pid.pid),
|
|
use_extended_id=pid.extended,
|
|
)
|
|
|
|
|
|
def _to_set_pid_index(idx: int) -> Action:
|
|
return GlobalsSet(id="pid_index", value=str(idx))
|
|
|
|
|
|
def _to_poll_script(pids: list[Pid]) -> Script:
|
|
actions = []
|
|
for i, pid in enumerate(pids):
|
|
actions.append({"globals.set": _to_set_pid_index(i)})
|
|
actions.append({"canbus.send": _to_poll_pid(pid)})
|
|
actions.append({"delay": "100ms"})
|
|
|
|
return Script(id="poll_car", mode="single", then=actions)
|
|
|
|
|
|
def _x_index(token: str) -> int:
|
|
assert token[0] == "B"
|
|
return int(token[1:])
|
|
|
|
|
|
def _rewrite_expression(exp: str) -> str:
|
|
tokens = []
|
|
token = ""
|
|
|
|
for ch in exp:
|
|
if ch in "[]():+-*/":
|
|
if token:
|
|
tokens.append(token)
|
|
tokens.append(ch)
|
|
token = ""
|
|
else:
|
|
token += ch
|
|
|
|
if token:
|
|
tokens.append(token)
|
|
|
|
# Replace sequences
|
|
next = tokens
|
|
while "[" in next:
|
|
left = next.index("[")
|
|
before = next[:left]
|
|
start, colon, end, bracket = next[left + 1 : left + 1 + 4]
|
|
after = next[left + 1 + 4 :]
|
|
assert colon == ":"
|
|
assert bracket == "]"
|
|
next = before
|
|
start = _x_index(start)
|
|
end = _x_index(end)
|
|
next.append("static_cast<float>(0")
|
|
for i in range(start, end + 1):
|
|
next.append(f"| (x[{i}] << {(end-i)*8})")
|
|
next.append(")")
|
|
next += after
|
|
|
|
for i, token in enumerate(next):
|
|
if token[0] == "B":
|
|
next[i] = f"static_cast<float>(x[{_x_index(token)}])"
|
|
|
|
return "".join(next)
|
|
|
|
|
|
def _decode_parameters(pid: Pid) -> str:
|
|
lines = []
|
|
|
|
for parameter in pid.parameters:
|
|
lines.append(f"// {parameter.expression}")
|
|
lines.append(f"{{ float value = {_rewrite_expression(parameter.expression)};")
|
|
lines.append(f"id({_id(parameter.name)}).publish_state(value); }}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _to_on_frame(pid: Pid, pid_index: int) -> OnFrame:
|
|
decoder = [
|
|
{
|
|
"if": {
|
|
"condition": {"lambda": f"return id(pid_index) == {pid_index};"},
|
|
"then": [{"lambda": _decode_parameters(pid)}],
|
|
}
|
|
}
|
|
]
|
|
|
|
return OnFrame(
|
|
can_id=int(pid.receive_address, 16),
|
|
can_id_mask=2 ** (29 if pid.extended else 11) - 1,
|
|
use_extended_id=pid.extended,
|
|
then=decoder,
|
|
# then=[{"lambda": _decode_parameters(pid)}],
|
|
)
|
|
|
|
|
|
def _to_canbus(pids: list[Pid]) -> Canbus:
|
|
on_frames = []
|
|
for i, pid in enumerate(pids):
|
|
on_frames.append(_to_on_frame(pid, i))
|
|
|
|
return Canbus(platform="esp32_can", on_frame=on_frames)
|
|
|
|
|
|
def _to_document(car: Car) -> Document:
|
|
sensors = []
|
|
scripts = []
|
|
canbuses = []
|
|
for pid in car.pids:
|
|
for parameter in pid.parameters:
|
|
sensors.append(_to_sensor(parameter))
|
|
scripts.append(_to_poll_script(car.pids))
|
|
canbuses.append(_to_canbus(car.pids))
|
|
return Document(sensor=sensors, script=scripts, canbus=canbuses)
|
|
|
|
|
|
@click.command()
|
|
@click.option("--profile", default="vehicle_profiles.json", type=click.File())
|
|
@click.option("--car-model", default="ID")
|
|
def main(profile, car_model: str):
|
|
db = json.load(profile)
|
|
|
|
for car_dict in db["cars"]:
|
|
if car_model in car_dict["car_model"]:
|
|
car = _parse_car(car_dict)
|
|
print(_to_document(car).to_yaml())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|