Browse Source

Can now interpret contents of cmds 1 and 2

pull/2/head
Martin Grill 3 years ago
parent
commit
8ec61d40db
  1. 57
      tools/rpi/ahoy.py

57
tools/rpi/ahoy.py

@ -8,6 +8,7 @@ import argparse
import time
import struct
import crcmod
from datetime import datetime
from RF24 import RF24, RF24_PA_LOW, RF24_PA_MAX, RF24_250KBPS
radio = RF24(22, 0, 1000000)
@ -25,6 +26,7 @@ inv_ser = 114174608145 # my inverter
f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus')
f_crc8 = crcmod.mkCrcFun(0x101, initCrc=0, xorOut=0)
def ser_to_hm_addr(s):
"""
Calculate the 4 bytes that the HM devices use in their internal messages to
@ -33,6 +35,7 @@ def ser_to_hm_addr(s):
bcd = int(str(s)[-8:], base=16)
return struct.pack('>L', bcd)
def ser_to_esb_addr(s):
"""
Convert a Hoymiles inverter/DTU serial number into its
@ -48,9 +51,11 @@ def ser_to_esb_addr(s):
air_order = ser_to_hm_addr(s)[::-1] + b'\x01'
return air_order[::-1]
def compose_0x80_msg(dst_ser_no=72220200, src_ser_no=72220200, ts=None):
"""
Create a valid 0x80 request with the given parameters, and containing the current system time.
Create a valid 0x80 request with the given parameters, and containing the
current system time.
"""
if not ts:
@ -80,12 +85,50 @@ def compose_0x80_msg(dst_ser_no=72220200, src_ser_no=72220200, ts=None):
p = p + struct.pack('B', crc8)
return p
def print_addr(a):
print(f"ser# {a} ", end='')
print(f" -> HM {' '.join([f'{x:02x}' for x in ser_to_hm_addr(a)])}", end='')
print(f" -> ESB {' '.join([f'{x:02x}' for x in ser_to_esb_addr(a)])}")
def on_receive(p):
"""
Callback: get's invoked whenever a packet has been received.
:param p: Payload of the received packet.
"""
d = {}
ts = datetime.utcnow()
ts_unixtime = ts.timestamp()
print(ts.isoformat(), end=' ')
# interpret content
if p[0] == 0x95:
src, dst, cmd = struct.unpack('>LLB', p[1:10])
src_s = f'{src:08x}'
dst_s = f'{dst:08x}'
print(f'MSG src={src_s}, dst={dst_s}, cmd={cmd}, ', end=' ')
if cmd==1:
unknown1, u1, i1, p1, u2, i2, p2, unknown2 = struct.unpack(
'>HHHHHHHH', p[10:26])
print(f'u1={u1/10}V, i1={i1/100}A, p1={p1/10}W, ', end='')
print(f'u2={u2/10}V, i2={i2/100}A, p2={p2/10}W, ', end='')
print(f'unknown1={unknown1}, unknown2={unknown2}')
elif cmd==2:
uk1, uk2, uk3, uk4, uk5, u, f, p = struct.unpack(
'>HHHHHHHH', p[10:26])
print(f'u={u/10}V, f={f/100}Hz, p={p/10}W, ', end='')
print(f'uk1={uk1}, ', end='')
print(f'uk2={uk2}, ', end='')
print(f'uk3={uk3}, ', end='')
print(f'uk4={uk4}, ', end='')
print(f'uk5={uk5}')
else:
print(f'unknown cmd {cmd}')
else:
print(f'unknown frame id {p[0]}')
def main_loop():
@ -112,7 +155,8 @@ def main_loop():
radio.startListening()
if ctr<3:
radio.printPrettyDetails()
pass
# radio.printPrettyDetails()
t_end = time.monotonic_ns()+1e9
while time.monotonic_ns() < t_end:
@ -122,17 +166,20 @@ def main_loop():
payload = radio.read(size)
print(f"Received {size} bytes on pipe {pipe_number}: " +
" ".join([f"{b:02x}" for b in payload]))
on_receive(payload)
radio.stopListening() # put radio in TX mode
radio.setChannel(40)
radio.openWritingPipe(ser_to_esb_addr(inv_ser))
if ctr<3:
radio.printPrettyDetails()
pass
# radio.printPrettyDetails()
ts = int(time.time())
payload = compose_0x80_msg(src_ser_no=dtu_ser, dst_ser_no=inv_ser, ts=ts)
print(f"{ctr:5d}: len={len(payload)} | " + " ".join([f"{b:02x}" for b in payload]))
print(f"{ctr:5d}: len={len(payload)} | " + " ".join([f"{b:02x}" for b in payload]),
flush=True)
radio.write(payload) # will always yield 'True' because auto-ack is disabled
ctr = ctr + 1
@ -147,7 +194,7 @@ if __name__ == "__main__":
radio.setPALevel(RF24_PA_LOW) # RF24_PA_MAX is default
# radio.printDetails(); # (smaller) function that prints raw register values
radio.printPrettyDetails(); # (larger) function that prints human readable data
# radio.printPrettyDetails(); # (larger) function that prints human readable data
try:
main_loop()

Loading…
Cancel
Save