#!/usr/bin/env python3 # -*- coding: UTF-8 -*- # (c) Xavier 2022 # pour débugger… en ligne de commande !-) # import pdb; pdb.set_trace() # DATV # for Python3 from tkinter import * import tkinter.messagebox import tkinter.filedialog import tkinter as tk import tkinter.ttk as ttk import os import time import json import shutil import vlc import sys import usb.core import usb.util from PIL import Image, ImageTk from ffpyplayer.player import MediaPlayer import threading import subprocess import socket import cv2 try: import configparser as configparser except ImportError: import ConfigParser as configparser # Var fileconfig = "config.json" fileflux = "flux.datv" DEVS = ['/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/ttyUSB2'] F1 = ['1', '2', '3', '4', '5', '6', '7', '8', '9'] F2 = ['0', '250', '500', '750'] SR = ['66', '125', '250', '333', '500', '1000', '1500'] class DATV(tk.Frame): def __init__(self, root): super().__init__(root) self.champs = { 'IP': tk.StringVar(), 'PORT': tk.StringVar(), 'DEV': tk.StringVar(), 'FREQ': tk.StringVar(), 'F1': tk.StringVar(), 'F2': tk.StringVar(), 'SR': tk.StringVar(), } self.root = root self._create_bar() self.pack() self.RunVIDEO() def lireparam(self): with open(fileconfig) as mon_fichier: config = json.load(mon_fichier) return config def ecrireparam(self): self.StopStream() jsonString = '{' cpt = 0 for v, k in self.champs.items(): jsonString += '"' + v + '": "' + k.get() + '"' if cpt < 6: jsonString += ",\n" cpt = cpt + 1 jsonString += '}' file = open(fileconfig, "w") file.write(jsonString) file.close() self.Clean() self.RunVIDEO() @staticmethod def firstparam(fileconfig): jsonString = '{"IP": "127.0.0.1", "PORT": "1234", "DEV": "/dev/ttyUSB0", "FREQ": "10491500", "SR": "1500"}' file = open(fileconfig, "w") file.write(jsonString) file.close() def StopStream(self): #self.Clean() #self.photo = PhotoImage(file='img/btnjaune.gif') #espace_image = Canvas(self, width=120, height=120, bg='red') #espace_image.grid(row=3, columnspan=2, column=0, padx=10, pady=10) #espace_image.create_image(60, 60, image=self.photo) os.system('bash longmynd/stopsrv.sh') def RunStream(self): self.Clean() dev = usb.core.find(idVendor=0x0403, idProduct=0x6010) if dev is not None: data = self.lireparam() freq = data['FREQ'] + data['F1'] + data['F2'] print('IP=' + data['IP'] + ':' + data['PORT'] + ' freq=' + freq + ' SR=' + data['SR']) os.system('bash ./longmynd/gosrv.sh ' + data['IP'] + ' ' + data['PORT'] + ' ' + freq + ' ' + data['SR'] + '> flux.datv') time.sleep(2) return else: self.photo = PhotoImage(file='img/btnrouge.gif') espace_image = Canvas(self, width=120, height=120, bg='red') espace_image.grid(row=3, columnspan=2, column=0, padx=10, pady=10) espace_image.create_image(60, 60, image=self.photo) return def RunVIDEO(self): #Stream self.RunStream() # Créer un Canvas pour afficher la vidéo self.Canvas_video = tk.Canvas(self, bg='black', width=800, height=600) self.Canvas_video.grid(row=0, column=0, padx=1, pady=1) #self.photo = PhotoImage(file='nosignal.png') #self.Canvas_video.create_image(450, 300, image=self.photo) # Afficher la vidéo UDP self.RunVLC() def RunVLC(self): # Créer une instance VLC pour la vidéo data = self.lireparam() # Assurez-vous de gérer les erreurs de création de la fenêtre try: self.Instance = vlc.Instance('--no-xlib') self.player = self.Instance.media_player_new() m = self.Instance.media_new(str('udp://@'+data['IP']+':'+data['PORT'])) # Path, unicode self.player.set_media(m) # set the window id where to render VLC's video output h = self.Canvas_video.winfo_id() # .winfo_visualid()? self.player.set_xwindow(h) # fails on Windows self.player.play() # == -1 except Exception as e: print(f"Erreur de création de la sortie vidéo: {e}") tk.messagebox.showerror("Erreur VLC", f"Erreur de création de la sortie vidéo: {e}") def TestBeacon(self): self.Clean self.StopStream() dev = usb.core.find(idVendor=0x0403, idProduct=0x6010) if dev is not None: self.photo = PhotoImage(file='img/btnvert.gif') espace_image = Canvas(self, width=120, height=120, bg='green') espace_image.grid(row=3, columnspan=2, column=0, padx=10, pady=10) espace_image.create_image(60, 60, image=self.photo) data = self.lireparam() freq = "10491500" symb = "1500" os.system('bash ./longmynd/gosrv.sh ' + data['IP'] + ' ' + data['PORT'] + ' ' + freq + ' ' + symb + '> flux.datv') time.sleep(5) self.RunVIDEO() return else: self.photo = PhotoImage(file='img/btnrouge.gif') espace_image = Canvas(self, width=120, height=120, bg='red') espace_image.grid(row=3, columnspan=2, column=0, padx=10, pady=10) espace_image.create_image(60, 60, image=self.photo) return def Testconnexion(self): self.Clean() connected = False dev = usb.core.find(idVendor=0x0403, idProduct=0x6010) if dev is not None: connected = True if connected: self.photo = PhotoImage(file='img/btnvert.gif') espace_image = Canvas(self, width=120, height=120, bg='green') espace_image.grid(row=3, columnspan=2, column=0, padx=10, pady=10) espace_image.create_image(60, 60, image=self.photo) else: self.photo = PhotoImage(file='img/btnrouge.gif') espace_image = Canvas(self, width=120, height=120, bg='red') espace_image.grid(row=3, columnspan=2, column=0, padx=10, pady=10) espace_image.create_image(60, 60, image=self.photo) def Apropos(self): tk.messagebox.showinfo("A propos", "Reception DATV via Linux...\n Dev F4IYT Xavier\n") def Version(self): tk.messagebox.showinfo("Version", "DATVLin \n(C) 2023 F4IYT Xavier\n2.00") def Clean(self): for c in self.winfo_children(): c.destroy() self._create_bar() def _create_bar(self): logo = PhotoImage(file='img/datv_ico.png') menubar = Menu(self) self.icon = PhotoImage(file='./img/datv_ico.png') menubar.add_cascade(label="Command", image=self.icon) menufichier = Menu(menubar, tearoff=0) menufichier.add_command(label="Ouvrir memoire", command=app.quit) menufichier.add_command(label="Enregistrer memoire", command=app.quit) menufichier.add_separator() menufichier.add_command(label="Config", command=self._create_config) menufichier.add_separator() menufichier.add_command(label="Nettoyer Frame", command=self.Clean) menufichier.add_command(label="Quitter", command=app.quit) menubar.add_cascade(label="Fichier", menu=menufichier) menudatv = Menu(menubar, tearoff=0) menudatv.add_command(label="Test Connection", command=self.Testconnexion) menudatv.add_command(label="Test Beacon", command=self.TestBeacon) menudatv.add_command(label="Run Srv Stream", command=self.RunStream) menudatv.add_command(label="Stop Srv Stream", command=self.StopStream) menudatv.add_command(label="Config", command=self._create_config) menudatv.add_separator() menudatv.add_command(label="Run Video Stream", command=self.RunVIDEO) menudatv.add_command(label="Status DATV Rx", command=run_troisieme_program) menudatv.add_separator() menubar.add_cascade(label="DATV", menu=menudatv) menuaide = Menu(menubar, tearoff=0) menuaide.add_command(label="A propos", command=self.Apropos) menuaide.add_command(label="Version", command=self.Version) menubar.add_cascade(label="Aide", menu=menuaide) app.config(menu=menubar) def _create_config(self): self.Clean() self.champs = { 'IP': tk.StringVar(), 'PORT': tk.StringVar(), 'DEV': tk.StringVar(), 'FREQ': tk.StringVar(), 'F1': tk.StringVar(), 'F2': tk.StringVar(), 'SR': tk.StringVar(), } data = self.lireparam() print(data) Label0 = Label(self, text='CONFIG SRV STREAM: ') Label0.grid(column=0, row=0, sticky='w', pady=2) Label2 = Label(self, text='IP : ') Label2.grid(column=0, row=2, sticky='w') Champ2 = Entry(self, textvariable=self.champs['IP'], width=16) Champ2.focus_set() Champ2.grid(column=1, row=2, sticky='sw', columnspan=3, padx=10) Champ2.insert(0, data['IP']) Label3 = Label(self, text='PORT : ') Label3.grid(column=9, row=2, sticky='w') Champ3 = Entry(self, textvariable=self.champs['PORT'], width=7) Champ3.focus_set() Champ3.grid(column=10, row=2, sticky='sw', columnspan=3, padx=10) Champ3.insert(0, data['PORT']) Label4 = Label(self, text='Dev : ') Label4.grid(column=0, row=3, sticky='w') combo = ttk.Combobox(self, values=DEVS, textvariable=self.champs['DEV'], width=12) combo['state'] = 'readonly' combo.set(data['DEV']) combo.grid(column=1, row=3, columnspan=3) Label6 = Label(self, text='Fréquence : ') Label6.grid(column=0, row=4, sticky='w') Champ6 = Entry(self, textvariable=self.champs['FREQ'], width=4) Champ6.focus_set() Champ6.grid(column=1, row=4, sticky='sw', columnspan=3, padx=10) Champ6.insert(0, data['FREQ']) combo5 = ttk.Combobox(self, values=F1, textvariable=self.champs['F1'], width=2) combo5['state'] = 'readonly' combo5.set(data['F1']) combo5.grid(column=3, row=4) combo6 = ttk.Combobox(self, values=F2, textvariable=self.champs['F2'], width=3) combo6['state'] = 'readonly' combo6.set(data['F2']) combo6.grid(column=4, row=4) Label7 = Label(self, text='MHz') Label7.grid(column=6, row=4, sticky='w') Label4 = Label(self, text='Symbol : ') Label4.grid(column=9, row=4, sticky='w') combo4 = ttk.Combobox(self, values=SR, textvariable=self.champs['SR'], width=7) combo4['state'] = 'readonly' combo4.set(data['SR']) combo4.grid(column=10, row=4, columnspan=2) button = tk.Button(self, text="Valider", command=self.ecrireparam) button.grid(column=0, row=5) button = tk.Button(self, text="Fermer", command=self.Clean) button.grid(column=2, row=5) # Fonction exécutant le second programme def run_second_program(): # Commande pour exécuter le second programme, remplacez-la par la commande appropriée command = ["python3", "waterfall.py"] proc = subprocess.run(command) # Fonction exécutant le troisieme programme def run_troisieme_program(): # Commande pour exécuter le second programme, remplacez-la par la commande appropriée title = "DATV Status" command = ["gnome-terminal", "--title", title, "--", "./status", ""] proc = subprocess.run(command) if __name__ == '__main__': if not os.path.exists(fileconfig): firstparam(fileconfig) print("START INIT") app = Tk() app.title("DATV Srv STREAM") app.geometry("1024x800") DATV(app) # Création du thread pour exécuter le second programme thread = threading.Thread(target=run_second_program) # Démarrage du thread thread.start() app.mainloop() os.system('bash longmynd/stopsrv.sh') #kill(proc.pid)