Browse Source

Add payload debug function print_table_unpack

Helps recognize values in unknown payloads by decoding data from payload
with different offsets, data types and print results as aligned table

Enhances DebugDecodeAny class
pull/65/head
Jan-Jonas Sämann 2 years ago
parent
commit
6b47291ac1
  1. 220
      tools/rpi/hoymiles/decoders/__init__.py

220
tools/rpi/hoymiles/decoders/__init__.py

@ -10,6 +10,52 @@ from datetime import datetime, timedelta
import crcmod
f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus')
f_crc8 = crcmod.mkCrcFun(0x101, initCrc=0, xorOut=0)
def g_unpack(s_fmt, s_buf):
"""Chunk unpack helper
:param s_fmt: struct format string
:type s_fmt: str
:param s_buf: buffer to unpack
:type s_buf: bytes
:return: decoded data iterator
:rtype: generator object
"""
cs = struct.calcsize(s_fmt)
s_exc = len(s_buf) % cs
return struct.iter_unpack(s_fmt, s_buf[:len(s_buf) - s_exc])
def print_table_unpack(s_fmt, payload, cw=6):
"""
Print table of decoded numbers with different offsets
Helps recognizing values in unknown payloads
:param s_fmt: struct format string
:type s_fmt: str
:param payload: bytes data
:type payload: bytes
:param cw: cell width
:type cw: int
:return: None
"""
l_hexlified = [f'{byte:02x}' for byte in payload]
print(f'{"Pos": <{cw}}', end='')
print(''.join([f'{num: >{cw}}' for num in range(0, len(payload))]))
print(f'{"Hex": <{cw}}', end='')
print(''.join([f'{byte: >{cw}}' for byte in l_hexlified]))
l_fmt = struct.calcsize(s_fmt)
if len(payload) >= l_fmt:
for offset in range(0, l_fmt):
print(f'{s_fmt: <{cw}}', end='')
print(' ' * cw * offset, end='')
print(''.join(
[f'{num[0]: >{cw*l_fmt}}' for num in g_unpack(s_fmt, payload[offset:])]))
class Response:
""" All Response Shared methods """
@ -140,8 +186,18 @@ class UnknownResponse(Response):
"""
return ' '.join([f'{byte:02x}' for byte in self.response])
@property
def valid_crc(self):
def validate_crc8(self):
"""
Checks if self.response has valid CRC8
:return: if crc is available and correct
:rtype: bool
"""
# check crc
pcrc = struct.unpack('>B', self.response[-1:])[0]
return f_crc8(self.response[:-1]) == pcrc
def validate_crc_m(self):
"""
Checks if self.response has valid Modbus CRC
@ -152,113 +208,10 @@ class UnknownResponse(Response):
pcrc = struct.unpack('>H', self.response[-2:])[0]
return f_crc_m(self.response[:-2]) == pcrc
@property
def dump_longs(self):
"""Get all data, interpreted as long"""
if len(self.response) < 3:
return None
res = self.response
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_longs_pad1(self):
"""Get all data, interpreted as long"""
if len(self.response) < 5:
return None
res = self.response[2:]
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_longs_pad2(self):
"""Get all data, interpreted as long"""
if len(self.response) < 7:
return None
def unpack_table(self, *args):
"""Access shared debug function"""
print_table_unpack(*args)
res = self.response[4:]
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_longs_pad3(self):
"""Get all data, interpreted as long"""
if len(self.response) < 9:
return None
res = self.response[6:]
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_shorts(self):
"""Get all data, interpreted as short"""
if len(self.response) < 3:
return None
res = self.response
rem = len(res) % 4
res = res[:rem*-1]
vals = None
if len(res) % 4 == 0:
rlen = len(res)/2
vals = struct.unpack(f'>{int(rlen)}H', res)
return vals
@property
def dump_shorts_pad1(self):
"""Get all data, interpreted as short"""
if len(self.response) < 4:
return None
res = self.response[1:]
rem = len(res) % 4
res = res[:rem*-1]
vals = None
if len(res) % 4 == 0:
rlen = len(res)/2
vals = struct.unpack(f'>{int(rlen)}H', res)
return vals
class EventsResponse(UnknownResponse):
""" Hoymiles micro-inverter event log decode helper """
@ -337,7 +290,7 @@ class EventsResponse(UnknownResponse):
def __init__(self, *args, **params):
super().__init__(*args, **params)
crc_valid = self.valid_crc
crc_valid = self.validate_crc_m()
if crc_valid:
print(' payload has valid modbus crc')
self.response = self.response[:-2]
@ -365,7 +318,12 @@ class DebugDecodeAny(UnknownResponse):
def __init__(self, *args, **params):
super().__init__(*args, **params)
crc_valid = self.valid_crc
crc8_valid = self.validate_crc8()
if crc8_valid:
print(' payload has valid crc8')
self.response = self.response[:-1]
crc_valid = self.validate_crc_m()
if crc_valid:
print(' payload has valid modbus crc')
self.response = self.response[:-2]
@ -373,41 +331,17 @@ class DebugDecodeAny(UnknownResponse):
l_payload = len(self.response)
print(f' payload has {l_payload} bytes')
longs = self.dump_longs
if not longs:
print(' type long : unable to decode (len or not mod 4)')
else:
print(' type long : ' + str(longs))
print()
print('Field view: int')
print_table_unpack('>B', self.response)
longs = self.dump_longs_pad1
if not longs:
print(' type long pad1 : unable to decode (len or not mod 4)')
else:
print(' type long pad1 : ' + str(longs))
print()
print('Field view: shorts')
print_table_unpack('>H', self.response)
longs = self.dump_longs_pad2
if not longs:
print(' type long pad2 : unable to decode (len or not mod 4)')
else:
print(' type long pad2 : ' + str(longs))
longs = self.dump_longs_pad3
if not longs:
print(' type long pad3 : unable to decode (len or not mod 4)')
else:
print(' type long pad3 : ' + str(longs))
shorts = self.dump_shorts
if not shorts:
print(' type short : unable to decode (len or not mod 2)')
else:
print(' type short : ' + str(shorts))
shorts = self.dump_shorts_pad1
if not shorts:
print(' type short pad1: unable to decode (len or not mod 2)')
else:
print(' type short pad1: ' + str(shorts))
print()
print('Field view: longs')
print_table_unpack('>L', self.response)
try:
if len(self.response) > 2:

Loading…
Cancel
Save