Browse Source

Merge pull request #65 from Sprinterfreak/pypackage

Add payload debug function print_table_unpack
pull/69/head
lumapu 3 years ago
committed by GitHub
parent
commit
b995cc0a93
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 229
      tools/rpi/hoymiles/decoders/__init__.py

229
tools/rpi/hoymiles/decoders/__init__.py

@ -10,6 +10,52 @@ from datetime import datetime, timedelta
import crcmod import crcmod
f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus') f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus')
f_crc8 = crcmod.mkCrcFun(0x101, initCrc=0, xorOut=0)
def g_unpack(s_fmt, s_buf):
"""Chunk unpack helper
:param s_fmt: struct format string
:type s_fmt: str
:param s_buf: buffer to unpack
:type s_buf: bytes
:return: decoded data iterator
:rtype: generator object
"""
cs = struct.calcsize(s_fmt)
s_exc = len(s_buf) % cs
return struct.iter_unpack(s_fmt, s_buf[:len(s_buf) - s_exc])
def print_table_unpack(s_fmt, payload, cw=6):
"""
Print table of decoded numbers with different offsets
Helps recognizing values in unknown payloads
:param s_fmt: struct format string
:type s_fmt: str
:param payload: bytes data
:type payload: bytes
:param cw: cell width
:type cw: int
:return: None
"""
l_hexlified = [f'{byte:02x}' for byte in payload]
print(f'{"Pos": <{cw}}', end='')
print(''.join([f'{num: >{cw}}' for num in range(0, len(payload))]))
print(f'{"Hex": <{cw}}', end='')
print(''.join([f'{byte: >{cw}}' for byte in l_hexlified]))
l_fmt = struct.calcsize(s_fmt)
if len(payload) >= l_fmt:
for offset in range(0, l_fmt):
print(f'{s_fmt: <{cw}}', end='')
print(' ' * cw * offset, end='')
print(''.join(
[f'{num[0]: >{cw*l_fmt}}' for num in g_unpack(s_fmt, payload[offset:])]))
class Response: class Response:
""" All Response Shared methods """ """ All Response Shared methods """
@ -140,8 +186,18 @@ class UnknownResponse(Response):
""" """
return ' '.join([f'{byte:02x}' for byte in self.response]) return ' '.join([f'{byte:02x}' for byte in self.response])
@property def validate_crc8(self):
def valid_crc(self): """
Checks if self.response has valid CRC8
:return: if crc is available and correct
:rtype: bool
"""
# check crc
pcrc = struct.unpack('>B', self.response[-1:])[0]
return f_crc8(self.response[:-1]) == pcrc
def validate_crc_m(self):
""" """
Checks if self.response has valid Modbus CRC Checks if self.response has valid Modbus CRC
@ -152,113 +208,10 @@ class UnknownResponse(Response):
pcrc = struct.unpack('>H', self.response[-2:])[0] pcrc = struct.unpack('>H', self.response[-2:])[0]
return f_crc_m(self.response[:-2]) == pcrc return f_crc_m(self.response[:-2]) == pcrc
@property def unpack_table(self, *args):
def dump_longs(self): """Access shared debug function"""
"""Get all data, interpreted as long""" print_table_unpack(*args)
if len(self.response) < 3:
return None
res = self.response
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_longs_pad1(self):
"""Get all data, interpreted as long"""
if len(self.response) < 5:
return None
res = self.response[2:]
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_longs_pad2(self):
"""Get all data, interpreted as long"""
if len(self.response) < 7:
return None
res = self.response[4:]
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_longs_pad3(self):
"""Get all data, interpreted as long"""
if len(self.response) < 9:
return None
res = self.response[6:]
rem = len(res) % 16
res = res[:rem*-1]
vals = None
if len(res) % 16 == 0:
rlen = len(res)/4
vals = struct.unpack(f'>{int(rlen)}L', res)
return vals
@property
def dump_shorts(self):
"""Get all data, interpreted as short"""
if len(self.response) < 3:
return None
res = self.response
rem = len(res) % 4
res = res[:rem*-1]
vals = None
if len(res) % 4 == 0:
rlen = len(res)/2
vals = struct.unpack(f'>{int(rlen)}H', res)
return vals
@property
def dump_shorts_pad1(self):
"""Get all data, interpreted as short"""
if len(self.response) < 4:
return None
res = self.response[1:]
rem = len(res) % 4
res = res[:rem*-1]
vals = None
if len(res) % 4 == 0:
rlen = len(res)/2
vals = struct.unpack(f'>{int(rlen)}H', res)
return vals
class EventsResponse(UnknownResponse): class EventsResponse(UnknownResponse):
""" Hoymiles micro-inverter event log decode helper """ """ Hoymiles micro-inverter event log decode helper """
@ -337,7 +290,7 @@ class EventsResponse(UnknownResponse):
def __init__(self, *args, **params): def __init__(self, *args, **params):
super().__init__(*args, **params) super().__init__(*args, **params)
crc_valid = self.valid_crc crc_valid = self.validate_crc_m()
if crc_valid: if crc_valid:
print(' payload has valid modbus crc') print(' payload has valid modbus crc')
self.response = self.response[:-2] self.response = self.response[:-2]
@ -365,7 +318,12 @@ class DebugDecodeAny(UnknownResponse):
def __init__(self, *args, **params): def __init__(self, *args, **params):
super().__init__(*args, **params) super().__init__(*args, **params)
crc_valid = self.valid_crc crc8_valid = self.validate_crc8()
if crc8_valid:
print(' payload has valid crc8')
self.response = self.response[:-1]
crc_valid = self.validate_crc_m()
if crc_valid: if crc_valid:
print(' payload has valid modbus crc') print(' payload has valid modbus crc')
self.response = self.response[:-2] self.response = self.response[:-2]
@ -373,41 +331,17 @@ class DebugDecodeAny(UnknownResponse):
l_payload = len(self.response) l_payload = len(self.response)
print(f' payload has {l_payload} bytes') print(f' payload has {l_payload} bytes')
longs = self.dump_longs print()
if not longs: print('Field view: int')
print(' type long : unable to decode (len or not mod 4)') print_table_unpack('>B', self.response)
else:
print(' type long : ' + str(longs))
longs = self.dump_longs_pad1 print()
if not longs: print('Field view: shorts')
print(' type long pad1 : unable to decode (len or not mod 4)') print_table_unpack('>H', self.response)
else:
print(' type long pad1 : ' + str(longs))
longs = self.dump_longs_pad2 print()
if not longs: print('Field view: longs')
print(' type long pad2 : unable to decode (len or not mod 4)') print_table_unpack('>L', self.response)
else:
print(' type long pad2 : ' + str(longs))
longs = self.dump_longs_pad3
if not longs:
print(' type long pad3 : unable to decode (len or not mod 4)')
else:
print(' type long pad3 : ' + str(longs))
shorts = self.dump_shorts
if not shorts:
print(' type short : unable to decode (len or not mod 2)')
else:
print(' type short : ' + str(shorts))
shorts = self.dump_shorts_pad1
if not shorts:
print(' type short pad1: unable to decode (len or not mod 2)')
else:
print(' type short pad1: ' + str(shorts))
try: try:
if len(self.response) > 2: if len(self.response) > 2:
@ -423,6 +357,9 @@ class DebugDecodeAny(UnknownResponse):
# 1121-Series Intervers, 1 MPPT, 1 Phase # 1121-Series Intervers, 1 MPPT, 1 Phase
class Hm300Decode02(EventsResponse):
""" Inverter generic events log """
class Hm300Decode0B(StatusResponse): class Hm300Decode0B(StatusResponse):
""" 1121-series mirco-inverters status data """ """ 1121-series mirco-inverters status data """
@ -476,6 +413,9 @@ class Hm300Decode12(EventsResponse):
# 1141-Series Inverters, 2 MPPT, 1 Phase # 1141-Series Inverters, 2 MPPT, 1 Phase
class Hm600Decode02(EventsResponse):
""" Inverter generic events log """
class Hm600Decode0B(StatusResponse): class Hm600Decode0B(StatusResponse):
""" 1141-series mirco-inverters status data """ """ 1141-series mirco-inverters status data """
@ -558,6 +498,9 @@ class Hm600Decode12(EventsResponse):
# 1161-Series Inverters, 2 MPPT, 1 Phase # 1161-Series Inverters, 2 MPPT, 1 Phase
class Hm1200Decode02(EventsResponse):
""" Inverter generic events log """
class Hm1200Decode0B(StatusResponse): class Hm1200Decode0B(StatusResponse):
""" 1161-series mirco-inverters status data """ """ 1161-series mirco-inverters status data """

Loading…
Cancel
Save