diff --git a/tools/rpi/hoymiles/decoders/__init__.py b/tools/rpi/hoymiles/decoders/__init__.py index 1771a3e4..6836e73e 100644 --- a/tools/rpi/hoymiles/decoders/__init__.py +++ b/tools/rpi/hoymiles/decoders/__init__.py @@ -1,6 +1,9 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- import struct +import crcmod + +f_crc_m = crcmod.predefined.mkPredefinedCrcFun('modbus') class StatusResponse: e_keys = ['voltage','current','power','energy_total','energy_daily'] @@ -58,51 +61,148 @@ class UnknownResponse: def hex_ascii(self): return ' '.join([f'{b:02x}' for b in self.response]) + @property + def valid_crc(self): + # check crc + pcrc = struct.unpack('>H', self.response[-2:])[0] + return f_crc_m(self.response[:-2]) == pcrc + @property def dump_longs(self): + if len(self.response) < 5: + return None + res = self.response - n = len(res)/4 + + r = len(res) % 16 + res = res[:r*-1] vals = None - if n % 4 == 0: + if len(res) % 16 == 0: + n = len(res)/4 vals = struct.unpack(f'>{int(n)}L', res) return vals @property def dump_longs_pad1(self): - res = self.response[1:] - n = len(res)/4 + if len(self.response) < 7: + return None + + res = self.response[2:] + + r = len(res) % 16 + res = res[:r*-1] + + vals = None + if len(res) % 16 == 0: + n = len(res)/4 + vals = struct.unpack(f'>{int(n)}L', res) + + return vals + + @property + def dump_longs_pad2(self): + if len(self.response) < 9: + return None + + res = self.response[4:] + + r = len(res) % 16 + res = res[:r*-1] + + vals = None + if len(res) % 16 == 0: + n = len(res)/4 + vals = struct.unpack(f'>{int(n)}L', res) + + return vals + + @property + def dump_longs_pad3(self): + if len(self.response) < 11: + return None + + res = self.response[6:] + + r = len(res) % 16 + res = res[:r*-1] vals = None - if n % 4 == 0: + if len(res) % 16 == 0: + n = len(res)/4 vals = struct.unpack(f'>{int(n)}L', res) return vals @property def dump_shorts(self): - n = len(self.response)/2 + if len(self.response) < 5: + return None + + res = self.response + + r = len(res) % 4 + res = res[:r*-1] vals = None - if n % 2 == 0: - vals = struct.unpack(f'>{int(n)}H', self.response) + if len(res) % 4 == 0: + n = len(res)/2 + vals = struct.unpack(f'>{int(n)}H', res) + return vals @property def dump_shorts_pad1(self): + if len(self.response) < 6: + return None + res = self.response[1:] - n = len(res)/2 + + r = len(res) % 4 + res = res[:r*-1] vals = None - if n % 2 == 0: + if len(res) % 4 == 0: + n = len(res)/2 vals = struct.unpack(f'>{int(n)}H', res) + return vals +class HM600_Decode11(UnknownResponse): + def __init__(self, response): + self.response = response + + crc_valid = self.valid_crc + if crc_valid: + print(' payload has valid modbus crc') + self.response = response[:-2] + + status = self.response[:2] + + chunk_size = 12 + for c in range(2, len(self.response), chunk_size): + chunk = self.response[c:c+chunk_size] + print(' '.join([f'{b:02x}' for b in chunk]) + ': ') + print(' BBLHl : ' + str(struct.unpack('>BBLHl', chunk))) + print() + +class HM600_Decode12(HM600_Decode11): + def __init__(self, response): + super().__init__(response) + class DEBUG_DecodeAny(UnknownResponse): def __init__(self, response): self.response = response + crc_valid = self.valid_crc + if crc_valid: + print(' payload has valid modbus crc') + self.response = response[:-2] + + l_payload = len(self.response) + print(f' payload has {l_payload} bytes') + longs = self.dump_longs if not longs: print(' type long : unable to decode (len or not mod 4)') @@ -115,6 +215,18 @@ class DEBUG_DecodeAny(UnknownResponse): else: print(' type long pad1 : ' + str(longs)) + longs = self.dump_longs_pad2 + if not longs: + print(' type long pad2 : unable to decode (len or not mod 4)') + else: + print(' type long pad2 : ' + str(longs)) + + longs = self.dump_longs_pad3 + if not longs: + print(' type long pad3 : unable to decode (len or not mod 4)') + else: + print(' type long pad3 : ' + str(longs)) + shorts = self.dump_shorts if not shorts: print(' type short : unable to decode (len or not mod 2)')