DatvRx/status.py
2024-06-04 19:03:30 +02:00

127 lines
4.3 KiB
Python

import os
import curses
import time
# MER threshold (shown in dB by display_mer as the "need" value). It is
# rewritten by display_modcod_s / display_modcod_s2 each time a MODCOD is drawn.
MERThreshold = 0
def display_percent(stdscr, id, meaning, value):
    """Draw a percentage field on the row reserved for status ``id``.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: status id; the text is written at row ``2 + id``, column 0.
        meaning: label printed before the value.
        value: raw reading; it is divided by 100 and printed with two
            decimals followed by ``%``.
    """
    row = 2 + id
    text = f"{meaning}: {float(value)/100:.2f} %\n"
    stdscr.addstr(row, 0, text)
def display_mer(stdscr, id, meaning, value):
    """Draw the MER field next to the currently required threshold.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: status id; the text is written at row ``2 + id``, column 0.
        meaning: label printed before the value.
        value: raw reading; it is divided by 10 and printed in dB with one
            decimal.

    The module-level ``MERThreshold`` is printed as the "need" figure.
    """
    row = 2 + id
    text = f"{meaning}: {float(value)/10:.1f} dB (need {MERThreshold:.1f} dB)\n"
    stdscr.addstr(row, 0, text)
def display_yes_no(stdscr, id, meaning, value):
    """Draw a boolean field as ``yes`` / ``no``.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: status id; the text is written at row ``2 + id``, column 0.
        meaning: label printed before the answer.
        value: any value other than 0 is shown as ``yes``; 0 is ``no``.
    """
    line = f"{meaning}: {'yes' if value != 0 else 'no'}\n"
    stdscr.addstr(2 + id, 0, line)
def display_state(stdscr, id, meaning, value):
    """Draw the demodulator state as text.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: status id; the text is written at row ``2 + id``, column 0.
        meaning: label printed before the state.
        value: integer state code, used as an index into the state table.

    Codes outside the table, including negative ones, are printed as the
    raw number.
    """
    states = (
        "initialising",
        "searching",
        "found headers",
        "DVB-S lock",
        "DVB-S2 lock",
    )
    # The explicit lower bound matters: a negative code would otherwise be
    # accepted and wrap around to the end of the table.
    label = states[value] if 0 <= value < len(states) else value
    stdscr.addstr(2 + id, 0, f"{meaning}: {label}\n")
def display_modcod_s(stdscr, id, meaning, value):
    """Draw the DVB-S code rate and reset the MER threshold.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: status id; the text is written at row ``2 + id``, column 0.
        meaning: label printed before the code rate.
        value: integer index into the DVB-S code-rate table.

    Indices outside the table, including negative ones, are printed as the
    raw number. After drawing, the module-level ``MERThreshold`` is set
    to 0.
    """
    global MERThreshold
    modcods = (
        "QPSK 1/2",
        "QPSK 2/3",
        "QPSK 3/4",
        "QPSK 5/6",
        "QPSK 6/7",
        "QPSK 7/8",
    )
    # The explicit lower bound matters: a negative index would otherwise
    # wrap around to the end of the table.
    label = modcods[value] if 0 <= value < len(modcods) else value
    stdscr.addstr(2 + id, 0, f"{meaning}: {label}\n")
    MERThreshold = 0
def display_modcod_s2(stdscr, id, meaning, value):
    """Draw the DVB-S2 MODCOD and update the MER threshold to match.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: status id; the text is written at row ``2 + id``, column 0.
        meaning: label printed before the MODCOD name.
        value: integer MODCOD index into the tables below.

    Indices outside the name table, including negative ones, are printed as
    the raw number. The module-level ``MERThreshold`` is set to the table
    entry for ``value``, or to 0 when ``value`` is outside the table.
    """
    global MERThreshold
    modcods = (
        "DummyPL", "QPSK 1/4", "QPSK 1/3", "QPSK 2/5", "QPSK 1/2", "QPSK 3/5", "QPSK 2/3",
        "QPSK 3/4", "QPSK 4/5", "QPSK 5/6", "QPSK 8/9", "QPSK 9/10", "8PSK 3/5", "8PSK 2/3",
        "8PSK 3/4", "8PSK 5/6", "8PSK 8/9", "8PSK 9/10", "16APSK 2/3", "16APSK 3/4", "16APSK 4/5",
        "16APSK 5/6", "16APSK 8/9", "16APSK 9/10", "32APSK 3/4", "32APSK 4/5", "32APSK 5/6",
        "32APSK 8/9", "32APSK 9/10",
    )
    # Indexed in parallel with `modcods`.
    mer_thresholds = (
        0, -2.3, -1.2, -0.3, 1.0, 2.3, 3.1, 4.1, 4.7, 5.2, 6.2, 6.5,
        5.5, 6.6, 7.9, 9.4, 10.7, 11.0, 9.0, 10.2, 11.0, 11.6, 12.9,
        13.2, 12.8, 13.7, 14.3, 15.7, 16.1,
    )
    # Both lookups carry an explicit lower bound: a negative index would
    # otherwise wrap around and pick an entry from the end of the table.
    label = modcods[value] if 0 <= value < len(modcods) else value
    stdscr.addstr(2 + id, 0, f"{meaning}: {label}\n")
    MERThreshold = mer_thresholds[value] if 0 <= value < len(mer_thresholds) else 0
# Demodulator state most recently reported through status id 1. The MODCOD
# field (id 18) is decoded with the DVB-S table while this is 3 ("DVB-S lock")
# and with the DVB-S2 table while it is 4 ("DVB-S2 lock").
_demod_state = 0


def display_status(stdscr, id, value):
    """Render one status report on the screen row reserved for its id.

    Args:
        stdscr: curses window (any object with ``addstr(y, x, text)``).
        id: integer status id. Ids with a known meaning are labelled with
            it; other non-negative ids are shown as ``id: value``. Negative
            ids are ignored because they have no label and no valid row.
        value: raw value text received for that id.

    Numeric fields (state, error rates, MER, MODCOD, yes/no flags) are
    converted with ``int`` and passed to the matching ``display_*`` helper.
    If the text is not an integer the field is shown raw instead of raising.

    The state reported under id 1 is remembered between calls, because the
    MODCOD number (id 18) means different things for DVB-S and DVB-S2. While
    no lock state has been seen, MODCOD is shown as the raw number.
    """
    global _demod_state
    meanings = (
        "", "State", "LNA Gain", "Puncture Rate", "I Symbol Power", "Q Symbol Power",
        "Carrier Frequency", "I Constellation", "Q Constellation", "Symbol Rate",
        "Viterbi Error Rate", "BER", "MER", "Service Provider", "Service", "Null Ratio",
        "ES PID", "ES Type", "MODCOD", "Short Frames", "Pilot Symbols", "LDPC Error Count",
        "BCH Error Count", "BCH Uncorrected", "LNB Voltage Enabled", "LNB H Polarisation",
        "AGC1 Gain", "AGC2 Gain",
    )
    yes_no_ids = (19, 20, 23, 24, 25)
    numeric_ids = (1, 10, 11, 12, 18) + yes_no_ids

    if id < 0:
        # A negative id would index `meanings` from the end and address a
        # row above the status area.
        return

    row = 2 + id
    if id >= len(meanings):
        stdscr.addstr(row, 0, f"{id}: {value}\n")
        return

    meaning = meanings[id]
    if id not in numeric_ids:
        stdscr.addstr(row, 0, f"{meaning}: {value}\n")
        return

    try:
        number = int(value)
    except (TypeError, ValueError):
        # Malformed numeric field: show what was received rather than crash.
        stdscr.addstr(row, 0, f"{meaning}: {value}\n")
        return

    if id in yes_no_ids:
        display_yes_no(stdscr, id, meaning, number)
    elif id == 1:
        _demod_state = number
        display_state(stdscr, id, meaning, number)
    elif id in (10, 11):
        display_percent(stdscr, id, meaning, number)
    elif id == 12:
        display_mer(stdscr, id, meaning, number)
    elif _demod_state == 3:
        # id 18 (MODCOD) while locked to DVB-S.
        display_modcod_s(stdscr, id, meaning, number)
    elif _demod_state == 4:
        # id 18 (MODCOD) while locked to DVB-S2.
        display_modcod_s2(stdscr, id, meaning, number)
    else:
        # id 18 (MODCOD) without a lock: no table applies, show the number.
        stdscr.addstr(row, 0, f"{meaning}: {value}\n")
def main(stdscr, fifo_path="longmynd_main_status"):
    """Show LongMynd status messages read from a FIFO until interrupted.

    Args:
        stdscr: curses window supplied by ``curses.wrapper``.
        fifo_path: path of the status FIFO to read; defaults to
            ``longmynd_main_status`` in the current directory.

    Each line of the form ``$<id>,<value>`` is passed to ``display_status``.
    Lines that do not start with ``$``, or whose id is not an integer, are
    skipped. If the FIFO cannot be opened, the error is shown for two seconds
    and the function returns.
    """
    stdscr.clear()
    stdscr.addstr(1, 0, "LongMynd DATV Receiver Status\n")
    # Opening a FIFO for reading can block until a writer connects, so put
    # the title on screen before attempting it.
    stdscr.refresh()
    try:
        status_fifo = open(fifo_path, "r")
    except OSError as exc:
        stdscr.addstr(2, 0, f"Failed to open status fifo: {exc}\n")
        stdscr.refresh()
        time.sleep(2)
        return

    # `with` closes the FIFO on any exit, including Ctrl-C.
    with status_fifo:
        stdscr.scrollok(True)
        stdscr.addstr(2, 0, "Listening\n")
        stdscr.refresh()
        while True:
            line = status_fifo.readline()
            if not line:
                # End of file: no writer currently has the FIFO open. Wait
                # briefly and retry rather than spinning at full speed.
                time.sleep(0.1)
                continue

            message = line.strip()
            if not message.startswith('$'):
                continue

            # Split on the first comma only, so text values that contain
            # commas are kept whole.
            fields = message[1:].split(',', 1)
            value = fields[1] if len(fields) > 1 else ''
            try:
                status_id = int(fields[0])
            except ValueError:
                # Malformed id (for example a bare "$"): ignore the line.
                continue

            try:
                display_status(stdscr, status_id, value)
            except (ValueError, curses.error):
                # ValueError: a numeric field carried non-numeric text.
                # curses.error: the row for this id is outside the window,
                # e.g. the terminal has too few lines. Skip this update and
                # keep the viewer running.
                continue
            stdscr.refresh()
# Start the viewer only when this file is run as a script; main() receives
# its window from curses.wrapper.
if __name__ == "__main__":
    curses.wrapper(main)