mirror of https://github.com/lumapu/ahoy.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
170 lines
5.7 KiB
170 lines
5.7 KiB
|
|
#handles at-commands send and response evaluation, a thread can catch the URCs of the module
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import datetime
|
|
import threading
|
|
import binascii
|
|
import serial.tools.list_ports
|
|
import _libs2.file_handling as fh
|
|
|
|
|
|
#some global variables
|
|
__version_info__ = ('2022', '12', '19')
|
|
__version_string__ = '%40s' % ('hm inverter handling version: ' + '-'.join(__version_info__))
|
|
lck2 = threading.Lock() # used for AT-commands to access serial port
|
|
com_stop = True
|
|
PROMT = '/> '
|
|
RESPO= '<< '
|
|
SENDI = '>> '
|
|
|
|
|
|
def get_version():
|
|
return __hm_handling_version__
|
|
|
|
|
|
# async reading thread of comport data (e.g. for URCs)
|
|
class AsyncComRead(threading.Thread):
|
|
def __init__(self, _com, _addTimestamp, _debug):
|
|
# calling superclass init
|
|
threading.Thread.__init__(self)
|
|
self.com = _com
|
|
self.addTS = _addTimestamp
|
|
self.debug = _debug
|
|
self.data = ''
|
|
|
|
def run(self):
|
|
global com_stop
|
|
if self.debug: print('\n +++ Thread: com reader started', end='', flush=True)
|
|
urc_millis = millis_since(0)
|
|
urc_ln = True
|
|
while not com_stop:
|
|
with lck2:
|
|
urc = []
|
|
while self.com.in_waiting:
|
|
line = self.com.readline().decode('ascii','ignore')
|
|
line = line.replace('\n','').replace('\r','')
|
|
if len(line) > 2: # filter out empty lines
|
|
if self.addTS:
|
|
urc.append('\n%s%s%s' % (timestamp(0), RESPO, line))
|
|
else:
|
|
urc.append('\n%s%s' % (RESPO, line))
|
|
|
|
#todo: return values into callback function
|
|
|
|
if not self.com.is_open or com_stop: break
|
|
if urc:
|
|
print('\r', end='', flush=True) # return to line-start
|
|
fh.my_print('\n%s' % ( (' '.join(urc)).replace('\n ', '\n') ))
|
|
# urc_millis = millis_since(0)
|
|
# urc_ln = False
|
|
print('\n%s' % (PROMT,), end = '') # print promt
|
|
|
|
# if (not urc_ln) and (millis_since(urc_millis) >= 3000):
|
|
# urc_ln = True
|
|
# print('\n%s' % (PROMT,), end = '')
|
|
|
|
time.sleep(0.1)
|
|
if self.debug: print('\n +++ Thread: com reader stopped', end='', flush=True)
|
|
|
|
|
|
#def parse_payload(_self, _data):
|
|
|
|
|
|
# opens the com-port and starts the Async com reading thread
|
|
def openCom_startComReadThread(_tp, _com, _debug):
|
|
global com_stop
|
|
#start urc com logging thread
|
|
if not _com.is_open: _com.open()
|
|
_tp = AsyncComRead(_com, True, _debug)
|
|
_tp.setDaemon = True
|
|
com_stop = False
|
|
_tp.start()
|
|
return _tp
|
|
|
|
|
|
# stops the com-reading thread and closes com-port
|
|
def stopComReadThread_closeCom(_tp, _com):
|
|
global com_stop
|
|
com_stop = True
|
|
_tp.join(1.1)
|
|
if _com.is_open: _com.close()
|
|
return com_stop
|
|
|
|
|
|
# opens com-port if the given port-string exists and starts the Async com-port reading thread
|
|
def check_port_and_openCom(_tp, _com, _portstr, MATCH_sec, TIMEOUT_sec, _DEBUG):
|
|
if (check_wait_port(_portstr, MATCH_sec, TIMEOUT_sec, _DEBUG) == 0):
|
|
fh.my_print('\n +++ Port enumerated and detected: ' + _portstr.lower())
|
|
# reopen port
|
|
_tp = openCom_startComReadThread(_tp, _com, _DEBUG)
|
|
else:
|
|
fh.my_print('\n +++ No USB com-port detected again --> end')
|
|
_tp = None
|
|
|
|
return _tp
|
|
|
|
|
|
|
|
# simply list the available com-ports of the system
|
|
def list_ports():
|
|
portlist = serial.tools.list_ports.comports()
|
|
fh.my_print('\n +++ detected ports:')
|
|
for element in portlist:
|
|
fh.my_print(' %s' % (str(element.device)))
|
|
|
|
|
|
|
|
|
|
## simple Terminal to enter and display cmds, resonses are read by urc thread
|
|
def simple_terminal(_com, _debug):
|
|
next = True
|
|
while(next):
|
|
time.sleep(1.0)
|
|
_cmd_in = input('%s'%(PROMT,))
|
|
fh.my_print('\n%s%s' % (PROMT, _cmd_in, ), False)
|
|
|
|
if "start" in _cmd_in:
|
|
#end terminal and return true
|
|
return True
|
|
elif "kill" in _cmd_in:
|
|
#ends terminal and returns false
|
|
return False
|
|
elif '_time' in _cmd_in or '_tnow' in _cmd_in:
|
|
_uartstr = 't{:d}'.format(int(time.time()))
|
|
_com.write( _uartstr.encode('ascii') )
|
|
fh.my_print('\n%s%s%s' % (timestamp(0), SENDI, _uartstr, ))
|
|
continue
|
|
|
|
_uartstr = _cmd_in.encode('ascii')
|
|
_com.write( ('%s%s' % (SENDI, _uartstr, )).encode('ascii') )
|
|
fh.my_print('\n%s%s%s' % (timestamp(0), SENDI, _uartstr, ))
|
|
|
|
return True
|
|
|
|
|
|
|
|
# calculates the time in milli-seconds from a given start time
|
|
def millis_since(thisstart):
|
|
return int(round(time.time() * 1000) - thisstart)
|
|
|
|
|
|
# different timestamp formats
|
|
def timestamp(format):
|
|
if(format==0):
|
|
return ('%s: ' % (datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")[:-3]))
|
|
elif format==1:
|
|
return str(time.time()) + ": "
|
|
elif format==2:
|
|
#for file name extention
|
|
return datetime.datetime.now().strftime("%Y-%m-%d_%H%M_")
|
|
elif format==3:
|
|
#for file name extention with seconds
|
|
return datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S_")
|
|
elif format==4:
|
|
return datetime.datetime.now().strftime("%H%M%S.%f")
|
|
else:
|
|
return datetime.datetime.now().strftime("%y/%m/%d,%H:%M:%S+01")
|
|
|
|
|