DatvRx/waterfall.py
2024-06-04 19:03:30 +02:00

190 lines
5.6 KiB
Python

#!/usr/bin/python3
# ZR6TG - Tom - 2022/06/23
# basic signals detection code ported from https://github.com/m0dts/QO-100-WB-Live-Tune
import asyncio
import pygame
import websockets
import os
import pygame.gfxdraw
# CONFIGURATION

# WebSocket endpoint the FFT frames are read from (official BATC FFT feed).
FFT_URL = "wss://eshail.batc.org.uk/wb/fft"

# Colours, given as (red, green, blue) tuples.
COL_BG = (20, 29, 43)          # window background
COL_SPECTRUM = (217, 127, 43)  # fill colour of the spectrum trace
COL_TEXT = (224, 224, 224)     # signal labels

# Size of the pygame window, in pixels.
WIDTH, HEIGHT = 800, 480
class Graphics:
    """Draw the spectrum trace and per-signal labels onto a pygame surface.

    The horizontal scale is laid out for FFT_BINS bins drawn between a
    MARGIN-pixel border on the left and on the right of the window.
    """

    # Horizontal border, in pixels, kept free on each side of the trace.
    MARGIN = 25
    # Number of FFT bins the horizontal scale is computed for.
    FFT_BINS = 922
    # Frequency span covered by one FFT frame, in the same unit as
    # start_freq (presumably MHz -- confirm against the feed documentation).
    SPAN = 9

    # (exclusive upper bound of the measured width, reported rate) pairs,
    # checked in ascending order by align_symbolrate().
    _SYMBOLRATE_STEPS = (
        (0.002, 0),
        (0.065, 0.035),
        (0.086, 0.066),
        (0.195, 0.125),
        (0.277, 0.250),
        (0.388, 0.333),
        (0.700, 0.500),
        (1.2, 1.0),
        (1.6, 1.5),
        (2.2, 2),
    )

    def __init__(self, width, height):
        """Store the drawing area size and create the fonts.

        Args:
            width: Width of the drawing surface in pixels.
            height: Height of the drawing surface in pixels.
        """
        # Frequency assigned to the left edge of the spectrum.
        self.start_freq = 10490.5
        self.width = width
        self.height = height
        # Horizontal pixels per FFT bin.
        self.x_tab = (self.width - 2 * self.MARGIN) / self.FFT_BINS
        self.font = pygame.font.SysFont('freesans', 20)
        self.bigfont = pygame.font.SysFont('freesans', 180)
        self.mediumfont = pygame.font.SysFont('freesans', 50)

    def align_symbolrate(self, width):
        """Snap a measured signal width to the nearest listed rate.

        Args:
            width: Measured width, in the same unit as SPAN.

        Returns:
            0 for widths below 0.002, the matching entry of
            _SYMBOLRATE_STEPS for widths below 2.2, otherwise the width
            truncated to an integer.
        """
        for upper_bound, rate in self._SYMBOLRATE_STEPS:
            if width < upper_bound:
                return rate
        return int(width)

    async def find_signals(self, fft_data, noise_level=11000, signal_threshold=18000):
        """Locate signals in one frame of FFT magnitudes.

        A signal starts where the 3-bin moving average rises above
        signal_threshold and ends where it falls below it again.  A signal
        that is still above the threshold at the last bin is not reported.

        Args:
            fft_data: Sequence of numeric bin magnitudes.
            noise_level: Magnitude treated as the noise floor when the
                signal edges are refined.
            signal_threshold: Moving-average level that marks the start and
                end of a signal.

        Returns:
            A list of dicts with the keys 'start', 'end', 'mid' (bin
            positions), 'freq', 'signal_strength' (mean magnitude divided
            by 255) and 'signal_bw' (width snapped by align_symbolrate).
        """
        signals = []
        in_signal = False
        start_signal = 0
        for i in range(2, len(fft_data)):
            smoothed = (fft_data[i] + fft_data[i - 1] + fft_data[i - 2]) / 3
            if not in_signal:
                if smoothed > signal_threshold:
                    in_signal = True
                    start_signal = i
            elif smoothed < signal_threshold:
                in_signal = False
                signal = self._measure_signal(fft_data, start_signal, i, noise_level)
                if signal is not None:
                    signals.append(signal)
        return signals

    def _measure_signal(self, fft_data, start_signal, end_signal, noise_level):
        """Refine the edges of one candidate; return its dict or None to drop it."""
        bins = len(fft_data)
        span = end_signal - start_signal

        # Mean magnitude over the central part (30 % .. 80 %) of the candidate.
        acc = 0
        count = 0
        j = int(start_signal + (0.3 * span))
        while j < start_signal + (0.8 * span):
            acc += fft_data[j]
            count += 1
            j += 1
        if count == 0:
            return None
        signal_strength = acc / count

        # A bin belongs to the signal top once it reaches 75 % of the way
        # from the noise floor to the mean magnitude.
        plateau = 0.75 * (signal_strength - noise_level)

        # Find the real start: walk right while the bins are below the top.
        j = start_signal
        while j < bins and (fft_data[j] - noise_level) < plateau:
            start_signal = j
            j += 1
        if j >= bins:
            # No bin up to the end of the frame reaches the top level; an
            # unbounded scan would index past the data here, so the
            # candidate is discarded instead.
            return None

        # Find the real end: walk left while the bins are below the top.
        end_signal_orig = end_signal
        j = end_signal
        while (fft_data[j] - noise_level) < plateau:
            if j <= 0:
                # Reached the first bin without finding the top: keep the
                # unrefined end.
                end_signal = end_signal_orig
                break
            end_signal = j
            j -= 1

        mid_signal = start_signal + ((end_signal - start_signal) / 2)
        signal_freq = self.start_freq + (((mid_signal + 1) / bins * self.SPAN))
        signal_bw = self.align_symbolrate((end_signal - start_signal) * (self.SPAN / bins))
        if signal_bw < 0.033:
            return None
        return {
            'start': start_signal,
            'end': end_signal,
            'mid': mid_signal,
            'freq': signal_freq,
            'signal_strength': signal_strength / 255,
            'signal_bw': signal_bw,
        }

    async def update(self, window, fft):
        """Draw one FFT frame and the labels of the signals found in it.

        Args:
            window: pygame surface to draw on.
            fft: Indexable sequence of byte values; every two consecutive
                bytes are decoded as one little-endian unsigned sample.  A
                trailing odd byte is ignored.
        """
        fft_data = []
        polygon_data = []
        for x in range(0, len(fft) - 1, 2):
            value = int.from_bytes([fft[x], fft[x + 1]], 'little')
            fft_data.append(value)
            # One vertical pixel per 255 raw units, measured up from the
            # bottom edge of the drawing area.
            polygon_data.append((self.MARGIN + (x / 2 * self.x_tab), self.height - value / 255))

        signals = await self.find_signals(fft_data)

        # Close the outline at the bottom-left corner of the trace area.
        polygon_data.append((self.MARGIN, self.height))
        # The gfxdraw polygon functions reject outlines with fewer than three
        # points, which happens for an empty or one-sample frame.
        if len(polygon_data) >= 3:
            pygame.gfxdraw.filled_polygon(window, polygon_data, COL_SPECTRUM)
            pygame.gfxdraw.aapolygon(window, polygon_data, (197, 244, 103))

        for sig in signals:
            label_x = self.MARGIN + (int(sig['mid']) * self.x_tab)
            label_top = self.height - int(sig['signal_strength'])

            # Frequency label, centred above the signal.
            freq_label = str(round(sig['freq'], 2))
            text_width, text_height = self.font.size(freq_label)
            text = self.font.render(freq_label, True, COL_TEXT)
            window.blit(text, (label_x - text_width / 2, label_top - 60))

            # Symbol-rate label, one line below the frequency.
            rate_label = str(int(sig['signal_bw'] * 1000)) + "Ks"
            text_width, text_height = self.font.size(rate_label)
            text = self.font.render(rate_label, True, COL_TEXT)
            window.blit(text, (label_x - text_width / 2, label_top - 40))
class Socket:
    """Holds the websocket connection to FFT_URL."""

    def __init__(self):
        # Stays None until startup() has been awaited.
        self.websocket = None

    async def startup(self):
        """Connect to FFT_URL and keep the connection on the instance."""
        connection = await websockets.connect(FFT_URL)
        self.websocket = connection

    async def updateFFT(self):
        """Wait for the next message on the connection and return it."""
        frame = await self.websocket.recv()
        return frame
async def main():
    """Open the window, connect to the FFT feed and redraw on every frame.

    Returns when the window is closed or Escape is pressed.  The websocket
    is closed on the way out, including when a frame fails with an exception.
    """
    window = pygame.display.set_mode((WIDTH, HEIGHT))
    pygame.mouse.set_visible(False)
    graphics = Graphics(WIDTH, HEIGHT)
    socket = Socket()
    await socket.startup()
    try:
        running = True
        while running:
            # Drain the event queue on every frame; if it is never read the
            # window system treats the window as unresponsive and a close
            # request can never be honoured.
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                elif event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE:
                    running = False
            if not running:
                break
            fft = await socket.updateFFT()
            window.fill(COL_BG)
            await graphics.update(window, fft)
            pygame.display.flip()
    finally:
        if socket.websocket is not None:
            await socket.websocket.close()
if __name__ == "__main__":
    pygame.init()
    try:
        asyncio.run(main())
    finally:
        # Shut pygame down even when main() ends with an exception
        # (dropped connection, Ctrl-C, ...).
        pygame.quit()