From 5101b22ff7f5429712ec66d93e7bce12e7ceb40b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Jonas=20S=C3=A4mann?= Date: Sun, 15 May 2022 16:09:37 +0200 Subject: [PATCH] Provide Influx2 output plugin Allready thougt about it earlier and requested in #29 --- tools/rpi/ahoy.yml.example | 9 ++ tools/rpi/hoymiles/__main__.py | 13 +++ tools/rpi/hoymiles/outputs.py | 138 ++++++++++++++++++++++++++++ tools/rpi/optional-requirements.txt | 1 + 4 files changed, 161 insertions(+) create mode 100644 tools/rpi/hoymiles/outputs.py create mode 100644 tools/rpi/optional-requirements.txt diff --git a/tools/rpi/ahoy.yml.example b/tools/rpi/ahoy.yml.example index 5c27c003..ef139c48 100644 --- a/tools/rpi/ahoy.yml.example +++ b/tools/rpi/ahoy.yml.example @@ -16,6 +16,15 @@ ahoy: user: 'username' password: 'password' + # Influx2 output + influxdb: + disabled: true + url: 'http://influxserver.local:8086' + org: 'myorg' + token: '' + bucket: 'telegraf/autogen' + measurement: 'hoymiles' + dtu: serial: 99978563001 diff --git a/tools/rpi/hoymiles/__main__.py b/tools/rpi/hoymiles/__main__.py index 0a8c435e..d97e1091 100644 --- a/tools/rpi/hoymiles/__main__.py +++ b/tools/rpi/hoymiles/__main__.py @@ -91,6 +91,8 @@ def poll_inverter(inverter, retries=4): if mqtt_client: mqtt_send_status(mqtt_client, inverter_ser, data, topic=inverter.get('mqtt', {}).get('topic', None)) + if influx_client: + influx_client.store_status(result) def mqtt_send_status(broker, inverter_ser, data, topic=None): """ @@ -225,6 +227,17 @@ if __name__ == '__main__': mqtt_client.loop_start() mqtt_client.on_message = mqtt_on_command + influx_client = None + influx_config = ahoy_config.get('influxdb', {}) + if influx_config and not influx_config.get('disabled', False): + from .outputs import InfluxOutputPlugin + influx_client = InfluxOutputPlugin( + influx_config.get('url'), + influx_config.get('token'), + org=influx_config.get('org', ''), + bucket=influx_config.get('bucket', None), + measurement=influx_config.get('measurement', 'hoymiles')) + if not radio.begin(): raise RuntimeError('Can\'t open radio') diff --git a/tools/rpi/hoymiles/outputs.py b/tools/rpi/hoymiles/outputs.py new file mode 100644 index 00000000..f1b6a2d1 --- /dev/null +++ b/tools/rpi/hoymiles/outputs.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 + +import socket +from datetime import datetime, timezone +from hoymiles.decoders import StatusResponse + +try: + from influxdb_client import InfluxDBClient +except ModuleNotFoundError: + pass + +class OutputPluginFactory: + def __init__(self, **params): + """Initialize output plugin""" + + self.inverter_ser = params.get('inverter_ser', 0) + + def store_status(self, data): + raise NotImplementedError('The current output plugin does not implement store_status') + +class InfluxOutputPlugin(OutputPluginFactory): + def __init__(self, url, token, **params): + super().__init__(**params) + + self._bucket = params.get('bucket', 'hoymiles/autogen') + self._org = params.get('org', '') + self._measurement = params.get('measurement', + f'inverter,host={socket.gethostname()}') + + client = InfluxDBClient(url, token, bucket=self._bucket) + self.api = client.write_api() + + def store_status(self, response): + """ + Publish StatusResponse object + + :param influxdb.InfluxDBClient influx_client: A connected instance to Influx database + :param str inverter_ser: inverter serial + :param hoymiles.StatusResponse data: decoded inverter StatusResponse + :type response: hoymiles.StatusResponse + :param measurement: Influx measurement name + :type measurement: str + """ + + if not isinstance(response, StatusResponse): + raise RuntimeError('Data needs to be instance of StatusResponse') + + data = response.__dict__() + + measurement = self._measurement + f',location={data["inverter_ser"]}' + + data_stack = [] + + time_rx = datetime.now() + if 'time' in data and isinstance(data['time'], datetime): + time_rx = data['time'] + + # InfluxDB uses UTC + utctime = datetime.fromtimestamp(time_rx.timestamp(), tz=timezone.utc) + + # InfluxDB requires nanoseconds + ctime = int(utctime.timestamp() * 1e9) + + # AC Data + phase_id = 0 + for phase in data['phases']: + data_stack.append(f'{measurement},phase={phase_id},type=power value={phase["power"]} {ctime}') + data_stack.append(f'{measurement},phase={phase_id},type=voltage value={phase["voltage"]} {ctime}') + data_stack.append(f'{measurement},phase={phase_id},type=current value={phase["current"]} {ctime}') + phase_id = phase_id + 1 + + # DC Data + string_id = 0 + for string in data['strings']: + data_stack.append(f'{measurement},string={string_id},type=total value={string["energy_total"]/1000:.4f} {ctime}') + data_stack.append(f'{measurement},string={string_id},type=power value={string["power"]:.2f} {ctime}') + data_stack.append(f'{measurement},string={string_id},type=voltage value={string["voltage"]:.3f} {ctime}') + data_stack.append(f'{measurement},string={string_id},type=current value={string["current"]:3f} {ctime}') + string_id = string_id + 1 + # Global + data_stack.append(f'{measurement},type=frequency value={data["frequency"]:.3f} {ctime}') + data_stack.append(f'{measurement},type=temperature value={data["temperature"]:.2f} {ctime}') + + self.api.write(self._bucket, self._org, data_stack) + +try: + import paho.mqtt.client +except ModuleNotFoundError: + pass + +class MqttOutputPlugin(OutputPluginFactory): + def __init__(self, *args, **params): + super().__init__(*args, **params) + + mqtt_client = paho.mqtt.client.Client() + mqtt_client.username_pw_set(params.get('user', None), params.get('password', None)) + mqtt_client.connect(params.get('host', '127.0.0.1'), params.get('port', 1883)) + mqtt_client.loop_start() + + self.client = mqtt_client + + def store_status(self, response, **params): + """ + Publish StatusResponse object + + :param paho.mqtt.client.Client broker: mqtt-client instance + :param str inverter_ser: inverter serial + :param hoymiles.StatusResponse data: decoded inverter StatusResponse + :param topic: custom mqtt topic prefix (default: hoymiles/{inverter_ser}) + :type topic: str + """ + + if not isinstance(response, StatusResponse): + raise RuntimeError('Data needs to be instance of StatusResponse') + + data = response.__dict__() + + topic = params.get('topic', f'hoymiles/{inverter_ser}') + + # AC Data + phase_id = 0 + for phase in data['phases']: + self.mqtt_client.publish(f'{topic}/emeter/{phase_id}/power', phase['power']) + self.mqtt_client.publish(f'{topic}/emeter/{phase_id}/voltage', phase['voltage']) + self.mqtt_client.publish(f'{topic}/emeter/{phase_id}/current', phase['current']) + phase_id = phase_id + 1 + + # DC Data + string_id = 0 + for string in data['strings']: + self.mqtt_client.publish(f'{topic}/emeter-dc/{string_id}/total', string['energy_total']/1000) + self.mqtt_client.publish(f'{topic}/emeter-dc/{string_id}/power', string['power']) + self.mqtt_client.publish(f'{topic}/emeter-dc/{string_id}/voltage', string['voltage']) + self.mqtt_client.publish(f'{topic}/emeter-dc/{string_id}/current', string['current']) + string_id = string_id + 1 + # Global + self.mqtt_client.publish(f'{topic}/frequency', data['frequency']) + self.mqtt_client.publish(f'{topic}/temperature', data['temperature']) diff --git a/tools/rpi/optional-requirements.txt b/tools/rpi/optional-requirements.txt new file mode 100644 index 00000000..19297f02 --- /dev/null +++ b/tools/rpi/optional-requirements.txt @@ -0,0 +1 @@ +influxdb-client>=1.28.0