127 lines
4.3 KiB
Python
127 lines
4.3 KiB
Python
import os
|
|
import curses
|
|
import time
|
|
|
|
# MER (in dB) required by the MODCOD most recently displayed. It is written by
# display_modcod_s / display_modcod_s2 and read by display_mer for its
# "(need ... dB)" suffix.
MERThreshold = 0
|
|
|
|
def display_percent(stdscr, id, meaning, value):
    """Draw a reading given in hundredths of a percent.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Field number; the text is written on row ``2 + id``, column 0.
        meaning: Label printed before the number.
        value: Raw reading; it is divided by 100 and shown with two decimals.
    """
    row = 2 + id
    percent = float(value) / 100
    stdscr.addstr(row, 0, f"{meaning}: {percent:.2f} %\n")
|
|
|
|
def display_mer(stdscr, id, meaning, value):
    """Draw the MER reading next to the currently required threshold.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Field number; the text is written on row ``2 + id``, column 0.
        meaning: Label printed before the number.
        value: Raw reading; it is divided by 10 and shown in dB with one
            decimal, followed by the module-level ``MERThreshold``.
    """
    measured_db = float(value) / 10
    text = f"{meaning}: {measured_db:.1f} dB (need {MERThreshold:.1f} dB)\n"
    stdscr.addstr(2 + id, 0, text)
|
|
|
|
def display_yes_no(stdscr, id, meaning, value):
    """Draw a flag field as ``yes`` or ``no``.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Field number; the text is written on row ``2 + id``, column 0.
        meaning: Label printed before the answer.
        value: Flag value; zero is shown as ``no``, anything else as ``yes``.
    """
    answer = "no" if value == 0 else "yes"
    stdscr.addstr(2 + id, 0, f"{meaning}: {answer}\n")
|
|
|
|
def display_state(stdscr, id, meaning, value):
    """Draw the receiver state as text, or as the raw number if unknown.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Field number; the text is written on row ``2 + id``, column 0.
        meaning: Label printed before the state.
        value: Integer state code used as an index into the state names.
    """
    states = (
        "initialising",
        "searching",
        "found headers",
        "DVB-S lock",
        "DVB-S2 lock",
    )
    # The lower bound matters: a negative code would otherwise index from the
    # end of the table and be shown as a valid lock state.
    if 0 <= value < len(states):
        shown = states[value]
    else:
        shown = value
    stdscr.addstr(2 + id, 0, f"{meaning}: {shown}\n")
|
|
|
|
def display_modcod_s(stdscr, id, meaning, value):
    """Draw a DVB-S MODCOD by name and reset the required-MER threshold.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Field number; the text is written on row ``2 + id``, column 0.
        meaning: Label printed before the MODCOD.
        value: Integer MODCOD code used as an index into the name table;
            codes outside the table are shown as the raw number.

    Side effects:
        Sets the module-level ``MERThreshold`` to 0.
    """
    global MERThreshold
    modcods = (
        "QPSK 1/2",
        "QPSK 2/3",
        "QPSK 3/4",
        "QPSK 5/6",
        "QPSK 6/7",
        "QPSK 7/8",
    )
    # The lower bound matters: a negative code would otherwise index from the
    # end of the table and be shown as a valid MODCOD.
    if 0 <= value < len(modcods):
        shown = modcods[value]
    else:
        shown = value
    stdscr.addstr(2 + id, 0, f"{meaning}: {shown}\n")
    # No per-MODCOD threshold table exists for this mode.
    MERThreshold = 0
|
|
|
|
def display_modcod_s2(stdscr, id, meaning, value):
    """Draw a DVB-S2 MODCOD by name and publish its required MER.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Field number; the text is written on row ``2 + id``, column 0.
        meaning: Label printed before the MODCOD.
        value: Integer MODCOD code used as an index into the name and
            threshold tables; codes outside a table are shown as the raw
            number and give a threshold of 0.

    Side effects:
        Sets the module-level ``MERThreshold`` to the threshold for ``value``.
    """
    global MERThreshold
    modcods = (
        "DummyPL", "QPSK 1/4", "QPSK 1/3", "QPSK 2/5", "QPSK 1/2", "QPSK 3/5", "QPSK 2/3",
        "QPSK 3/4", "QPSK 4/5", "QPSK 5/6", "QPSK 8/9", "QPSK 9/10", "8PSK 3/5", "8PSK 2/3",
        "8PSK 3/4", "8PSK 5/6", "8PSK 8/9", "8PSK 9/10", "16APSK 2/3", "16APSK 3/4", "16APSK 4/5",
        "16APSK 5/6", "16APSK 8/9", "16APSK 9/10", "32APSK 3/4", "32APSK 4/5", "32APSK 5/6",
        "32APSK 8/9", "32APSK 9/10",
    )
    # Indexed by the same code as ``modcods``.
    mer_thresholds = (
        0, -2.3, -1.2, -0.3, 1.0, 2.3, 3.1, 4.1, 4.7, 5.2, 6.2, 6.5,
        5.5, 6.6, 7.9, 9.4, 10.7, 11.0, 9.0, 10.2, 11.0, 11.6, 12.9,
        13.2, 12.8, 13.7, 14.3, 15.7, 16.1,
    )
    # The lower bounds matter: a negative code would otherwise index from the
    # end of the tables and report a valid-looking MODCOD and threshold.
    if 0 <= value < len(modcods):
        shown = modcods[value]
    else:
        shown = value
    stdscr.addstr(2 + id, 0, f"{meaning}: {shown}\n")

    if 0 <= value < len(mer_thresholds):
        MERThreshold = mer_thresholds[value]
    else:
        MERThreshold = 0
|
|
|
|
# Receiver state most recently reported in field 1, or None if none has been
# seen yet. Field 18 (MODCOD) needs it to choose between the DVB-S and DVB-S2
# tables.
_receiver_state = None


def display_status(stdscr, id, value):
    """Draw one status field on its own screen row.

    Args:
        stdscr: Window object providing ``addstr``.
        id: Integer field number. Known fields are labelled by name and
            drawn on row ``2 + id``; unknown non-negative fields are drawn
            as ``"<id>: <value>"``; negative ids are ignored.
        value: Field value as text. Numeric fields are parsed with ``int``;
            if that fails the raw text is shown instead of raising.

    Side effects:
        Field 1 updates the remembered receiver state that selects the
        MODCOD table used for field 18.
    """
    global _receiver_state
    meanings = (
        "", "State", "LNA Gain", "Puncture Rate", "I Symbol Power", "Q Symbol Power",
        "Carrier Frequency", "I Constellation", "Q Constellation", "Symbol Rate",
        "Viterbi Error Rate", "BER", "MER", "Service Provider", "Service", "Null Ratio",
        "ES PID", "ES Type", "MODCOD", "Short Frames", "Pilot Symbols", "LDPC Error Count",
        "BCH Error Count", "BCH Uncorrected", "LNB Voltage Enabled", "LNB H Polarisation",
        "AGC1 Gain", "AGC2 Gain",
    )
    yes_no_ids = (19, 20, 23, 24, 25)
    percent_ids = (10, 11)
    numeric_ids = (1, 12, 18) + yes_no_ids + percent_ids

    if id < 0:
        # A negative id would index ``meanings`` from the end and has no
        # sensible screen row, so it is dropped.
        return
    if id >= len(meanings):
        stdscr.addstr(2 + id, 0, f"{id}: {value}\n")
        return

    meaning = meanings[id]
    number = None
    if id in numeric_ids:
        try:
            number = int(value)
        except (TypeError, ValueError):
            # Empty or malformed value: fall through to the raw display.
            number = None

    if number is None:
        stdscr.addstr(2 + id, 0, f"{meaning}: {value}\n")
    elif id in yes_no_ids:
        display_yes_no(stdscr, id, meaning, number)
    elif id == 1:
        _receiver_state = number
        display_state(stdscr, id, meaning, number)
    elif id in percent_ids:
        display_percent(stdscr, id, meaning, number)
    elif id == 12:
        display_mer(stdscr, id, meaning, number)
    elif _receiver_state == 3:
        # id == 18 while locked to DVB-S (state 3 in display_state's table).
        display_modcod_s(stdscr, id, meaning, number)
    elif _receiver_state == 4:
        # id == 18 while locked to DVB-S2 (state 4 in display_state's table).
        display_modcod_s2(stdscr, id, meaning, number)
    else:
        # id == 18 without a lock state to pick a table: show the raw code.
        stdscr.addstr(2 + id, 0, f"{meaning}: {value}\n")
|
|
|
|
def main(stdscr):
    """Show live receiver status read from the ``longmynd_main_status`` FIFO.

    Each input line of the form ``$<id>,<value>`` is passed to
    ``display_status``; other lines are ignored. The function returns when
    the FIFO cannot be opened (after showing the error for two seconds) or
    when the writing side closes it.

    Args:
        stdscr: The curses window supplied by ``curses.wrapper``.
    """
    # Clear through curses rather than by shelling out to ``clear``, which
    # writes to the terminal behind curses' back.
    stdscr.clear()
    stdscr.addstr(1, 0, "LongMynd DATV Receiver Status\n")

    try:
        status_fifo = open("longmynd_main_status", "r")
    except OSError as e:
        stdscr.addstr(2, 0, f"Failed to open status fifo: {e}\n")
        stdscr.refresh()
        time.sleep(2)
        return

    # The context manager closes the FIFO on every exit path.
    with status_fifo:
        stdscr.scrollok(True)
        stdscr.addstr(2, 0, "Listening\n")
        stdscr.refresh()

        while True:
            line = status_fifo.readline()
            if not line:
                # readline() returns '' at end of file, i.e. once the writer
                # has closed the FIFO. Looping on would spin without ever
                # blocking, so stop instead.
                break

            status_message = line.strip()
            if not status_message.startswith('$'):
                continue

            parts = status_message[1:].split(',')
            try:
                n = int(parts[0])
            except ValueError:
                # Malformed field number: skip the line, keep the viewer up.
                continue
            m_str = parts[1] if len(parts) > 1 else ''

            try:
                display_status(stdscr, n, m_str)
            except (ValueError, curses.error):
                # Best effort: a value that cannot be parsed, or a row that
                # does not fit on this terminal, must not take down the
                # whole display.
                pass
            stdscr.refresh()
|
|
|
|
if __name__ == "__main__":
    # Run only when executed as a script. curses.wrapper supplies the screen
    # object that main() receives as its argument.
    curses.wrapper(main)
|
|
|