"""GUI for a Keysight U2042XA power meter.

Shows CW/pulse power readings over time or a single trace, shares the
current reading and the set values with other processes through a
``multiprocessing`` manager ("global variables"), and optionally logs the
readings to a text file.
"""

import multiprocessing
import multiprocessing.managers
import os
import sys
import time
import traceback

import numpy as np
import pyqtgraph as pg
from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *

# Get the current script's directory
current_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory by going one level up
parent_dir = os.path.dirname(current_dir)
# Add the parent directory to sys.path so the project packages below resolve
sys.path.append(parent_dir)

from drivers import Keysight_U2042XA
import import_txt
from design_files.Keysight_U2042XA_design import Ui_Powermeter

# Address and key of the manager process that holds the shared variables.
_MANAGER_ADDRESS = ('localhost', 5001)
_MANAGER_AUTHKEY = b''

# Resolution index (comboBox_res) -> resolution keyword passed to the driver.
# Any index not listed here falls back to 'LRES'.
_TRACE_RESOLUTIONS = {0: 'HRES', 1: 'MRES'}


def _connect_sync_dict(attempts=1, delay=0.2):
    """Connect to the global-variables manager and return the shared dict proxy.

    :param attempts: Number of connection attempts before giving up.
    :param delay: Pause in seconds between two attempts.
    :raises OSError, EOFError: If the last attempt fails as well.
    """
    attempts = max(1, attempts)
    for attempt in range(attempts):
        manager = multiprocessing.managers.BaseManager(
            address=_MANAGER_ADDRESS, authkey=_MANAGER_AUTHKEY)
        try:
            manager.connect()
        except (OSError, EOFError):
            if attempt == attempts - 1:
                raise
            time.sleep(delay)
        else:
            manager.register('sync_K_U2042XA')
            return manager.sync_K_U2042XA()


def _config_path():
    """Return the path of the config file next to this script.

    The location is derived from the script itself so it does not depend on
    the directory the program was started from.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(script_dir, 'configs', 'Keysight_U2042XA_config.txt')


class WorkerSignals(QObject):
    """Defines the signals available from a running worker thread.

    Supported signals are:

    finished
        No data
    error
        tuple (exctype, value, traceback.format_exc())
    result
        object data returned from processing, anything
    progress
        list with the values emitted by the running function
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(list)


class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and
    wrap-up.

    :param fn: The function to run on this worker thread. Supplied args and
        kwargs are passed through to it, together with the keyword argument
        ``progress_callback`` (the ``progress`` signal).
    :param args: Arguments to pass to the function
    :param kwargs: Keywords to pass to the function
    """

    def __init__(self, fn, *args, **kwargs):
        super(Worker, self).__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

        # Add the callback to our kwargs
        self.kwargs['progress_callback'] = self.signals.progress

    @pyqtSlot()
    def run(self):
        """Run the stored function with the stored args and kwargs.

        Emits ``result`` on success, ``error`` if the function raises, and
        ``finished`` in both cases.
        """
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done


def get_float(Qline, default=0):
    """Return the text of a QLineEdit as float.

    :param Qline: Widget whose ``text()`` is converted.
    :param default: Returned if the text is empty or cannot be converted.
    """
    try:
        return float(Qline.text())
    except (TypeError, ValueError):
        return default


class MainWindow(QMainWindow, Ui_Powermeter):
    """Main window of the power meter GUI."""

    def __init__(self, *args, **kwargs):
        # Get the current script's directory
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        # Get the parent directory by going one level up
        self.parent_dir = os.path.dirname(self.current_dir)

        # establish connection to global variables
        try:
            self.sync_K_U2042XA = _connect_sync_dict()
        except Exception:
            # Open global variables if no connection can be made (i.e. it is
            # not running), then connect to it. The process needs a moment to
            # start listening, hence the retries.
            self.global_vars = QProcess()
            self.global_vars.start(self.current_dir+"\\env\\Scripts\\python.exe",
                                   [self.current_dir+'\\global_variables.py'])
            self.sync_K_U2042XA = _connect_sync_dict(attempts=25, delay=0.2)
            print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!')

        # Fill in the variables used by this program:
        #   P        : power value from CW or pulsed measurement
        #   mode     : 0 = CW, 1 = pulse, 2 = trace
        #   DC       : duty cycle, 0.001-99.999
        #   Trac_par : trace parameters [duration, trigger delay,
        #              trigger level ('' = auto), resolution (0=high, 1=med, 2=low)]
        self.sync_K_U2042XA.update({'P': 0, 'mode': 0, 'DC': 0.01, 'Trac_par': [0, 0, '', 0]})

        # import Gui from QT designer file
        super(MainWindow, self).__init__(*args, **kwargs)
        self.setupUi(self)

        # setup plot
        temp = [time.time(), time.time()-1]
        pen1 = pg.mkPen(color=(255, 0, 0), width=2)
        # The number of plots can easily be scaled up by looping over a list
        # of graph widgets instead of using a single one.
        w = self.graphWidget_in
        w.setBackground('w')
        w.setLabel('left', 'Power [dBm]')
        w.setLabel('bottom', 'Time')
        w.showGrid(x=True, y=True, alpha=0.5)
        axis = pg.DateAxisItem()
        w.setAxisItems({'bottom': axis})
        w.setTitle('P_input')
        self.plot_P_in = w.plot(temp, [1, 0], pen=pen1, name='Power')

        # set up pyQT threadpool
        self.threadpool = QThreadPool()

        # define signals and slots
        self.actionSet_default.triggered.connect(self.set_default)
        self.actionReset_default.triggered.connect(self.read_default)
        self.button_connect.clicked.connect(self.start_meas)
        self.button_updateTrace.clicked.connect(self.set_Trac_par)
        self.line_Nplot.editingFinished.connect(self.set_Npoints)
        self.line_saveInterval.editingFinished.connect(self.change_timing)
        self.checkBox_disableplots.stateChanged.connect(self.set_displot)
        self.comboBox_mode.currentIndexChanged.connect(self.set_mode)
        self.line_Dutycycle.editingFinished.connect(self.set_DC)
        self.line_ATT.editingFinished.connect(self.set_ATT)

        # define constants
        self.Power = np.zeros((1, 1))  # power data, one row per reading
        self.t = [time.time()]  # timestamps belonging to the rows of self.Power
        self.Npoints = 200  # number of points to plot
        self.running = True  # True while app is running
        self.disable_plot = False  # disables plotting to improve performance; set by checkBox_disableplots
        self.timing_save = 0.5  # save interval in seconds
        self.Ptrace = [0, 1]  # dummy trace so the program does not trip when trace is activated
        self.ATT = 0  # attenuation that is added to every reading
        # 'Old' set values to compare against the global variables. Its length
        # is only 2 so it differs from set_new in the first iteration, which
        # ensures that the parameters are sent to the device once.
        self.set_old = [0, 0]
        self.set_new = [0, 0, 0]  # newest set values: [mode, DC, Trac_par]
        self.lines_config_float = [self.line_freq, self.line_ATT, self.line_Duration,
                                   self.line_TrigDelay, self.line_Nplot]
        self.lines_config_strings = [self.line_TrigLevel, self.line_devAdr,
                                     self.line_filePath, self.line_saveInterval]
        self.checkboxes_config = [self.checkBox_disableplots, self.checkBox_save]

        # read default values from config file and set them in gui
        self.read_default()

        # write gui values in global variables and set the local values
        self.set_mode()
        self.set_Trac_par()
        self.set_DC()
        self.set_ATT()
        self.set_Npoints()

        # set save timing from gui
        self.change_timing()

        # Start the standard threads only now: the save loop reads attributes
        # (running, t, Power, timing_save) that must exist before it runs.
        worker_save = Worker(self.save)
        self.threadpool.start(worker_save)

    def start_meas(self):
        """Connect to the device and start the measurement thread.

        The last constructor argument of the driver is 1 if the calibrate
        checkbox is checked and 0 otherwise.
        """
        cal = 1 if self.checkBox_calibrate.isChecked() else 0
        address = self.line_devAdr.text()
        freq = get_float(self.line_freq)
        self.PM = Keysight_U2042XA.KeysightU2042XA(address, freq, cal)

        # start thread for communication with device
        worker = Worker(self.update_P)
        worker.signals.progress.connect(self.update_gui)
        self.threadpool.start(worker)

        # turn off connect button
        self.button_connect.setEnabled(False)

    def update_P(self, progress_callback):
        """Measurement loop: get power readings, either single values or a trace.

        Runs in a worker thread until ``self.running`` is False, then deletes
        the device object.

        :param progress_callback: Signal that is emitted with ``[P]`` after a
            CW/pulse reading and with ``[-999]`` after a trace was read.
        """
        while self.running:
            # get new set values from global variables
            for i, key in enumerate(('mode', 'DC', 'Trac_par')):
                self.set_new[i] = self.sync_K_U2042XA.get(key)
            mode = self.set_new[0]
            trac_par = self.set_new[2]

            if self.set_new != self.set_old:  # update device settings if something changed
                if mode == 0:  # CW measurement
                    self.PM.activate_CW()
                    # Drop the newest row and its timestamp (after a trace it
                    # holds the dummy value -999). Skipped if nothing is stored.
                    if self.t:
                        self.Power = np.delete(self.Power, np.s_[-1::], 0)
                        self.t.pop(-1)
                elif mode == 2:  # trace measurement
                    self.PM.activate_trace()
                    level = trac_par[2]
                    if level == '':
                        self.PM.set_trigger_level()  # set trigger level to auto
                    else:
                        self.PM.set_trigger_level(level)  # set trigger level to value
                    # Use the values from the global variables, not the text
                    # in the gui: the gui is only updated from them below.
                    self.PM.set_trigger_delay(float(trac_par[1]))
                    self.PM.set_trace_time(float(trac_par[0]))
                # NOTE(review): this touches widgets from the worker thread;
                # consider routing it through a signal.
                self.update_gui_setValues(self.set_new)  # show the set values in the gui

            if mode == 0 or mode == 1:
                # CW or pulse: read current value and emit signal with it
                P = self.PM.read()
                self.sync_K_U2042XA.update({'P': P})  # pass current value to global variables
                progress_callback.emit([P])  # signal expects a list
                time.sleep(0.1)
            else:
                # trace: read the trace with the selected resolution
                res = _TRACE_RESOLUTIONS.get(trac_par[3], 'LRES')
                self.Ptrace = self.PM.read_trace(res)
                # dummy value -999 so other programs can recognise that no
                # single power value is available
                self.sync_K_U2042XA.update({'P': [-999]})
                progress_callback.emit([-999])  # triggers update_gui, which plots the trace

            # copy so later changes of set_new do not alter set_old
            self.set_old = self.set_new.copy()

        del self.PM  # release the device object once self.running is False

    def update_gui(self, P):
        """Show the reading in the power label and update the plot.

        :param P: List whose first element is the reading (``-999`` in trace
            mode). In CW/pulse mode the value is appended to the stored data
            and plotted versus time; in trace mode the last trace is plotted.
        """
        # set numbers
        self.line_Power_in.setText(f"{P[0]+self.ATT:.3f}")

        if self.set_new[0] == 0 or self.set_new[0] == 1:
            # CW or pulse: extend the stored data and update the P vs t plot
            self.Power = np.vstack([self.Power, np.array(P)+self.ATT])
            self.t.append(time.time())
            if not self.disable_plot:
                self.plot_P_in.setData(self.t[-self.Npoints:], self.Power[-self.Npoints:, 0])
        elif not self.disable_plot:
            # trace: plot it against an evenly spaced time axis
            n_samples = len(self.Ptrace)
            if n_samples == 0:
                return  # nothing to plot
            dx = get_float(self.line_Duration) / n_samples
            x = [i*dx for i in range(n_samples)]
            self.Ptrace = [value+self.ATT for value in self.Ptrace]  # add attenuation to trace
            self.plot_P_in.setData(x, self.Ptrace)

    def update_gui_setValues(self, setV):
        """Set the gui set values to the values from the global variables.

        :param setV: ``[mode, DC, Trac_par]`` with ``Trac_par`` being
            ``[duration, trigger delay, trigger level ('' = auto),
            resolution (0=high, 1=med, 2=low)]``.
        """
        trac_par = setV[2]
        self.comboBox_mode.setCurrentIndex(setV[0])
        self.line_Dutycycle.setText(str(setV[1]))
        self.line_Duration.setText(str(trac_par[0]))
        self.line_TrigDelay.setText(str(trac_par[1]))
        self.line_TrigLevel.setText(str(trac_par[2]))
        self.comboBox_res.setCurrentIndex(trac_par[3])

    def set_Trac_par(self):
        """Write the trace parameters from the gui to the global variables.

        An empty trigger level field is stored as ``''`` (auto trigger level).
        """
        dur = get_float(self.line_Duration)
        delay = get_float(self.line_TrigDelay)
        res = self.comboBox_res.currentIndex()
        if self.line_TrigLevel.text().strip() == '':
            lev = ''
        else:
            lev = get_float(self.line_TrigLevel)
        self.sync_K_U2042XA.update({'Trac_par': [dur, delay, lev, res]})

    def set_DC(self):
        """Write the duty cycle from the gui to the global variables."""
        self.sync_K_U2042XA.update({'DC': get_float(self.line_Dutycycle)})

    def set_mode(self):
        """Write the measurement mode from the gui to the global variables."""
        self.sync_K_U2042XA.update({'mode': self.comboBox_mode.currentIndex()})

    def set_ATT(self):
        """Set the attenuation to the current gui value."""
        self.ATT = get_float(self.line_ATT)

    def change_timing(self):
        """Update the interval used by ``save``.

        Falls back to 1 s if no valid, positive value is given.
        """
        interval = get_float(self.line_saveInterval, 1)
        self.timing_save = interval if interval > 0 else 1

    def save(self, progress_callback):
        """Periodically append the newest reading to the file in line_filePath.

        Runs in a worker thread until ``self.running`` is False. Writes only
        while the save checkbox is checked. The full path including the file
        extension must be given; a header line is written if the file does
        not exist yet.

        :param progress_callback: Unused; supplied by ``Worker``.
        """
        while self.running:
            # NOTE(review): the widgets are read from the worker thread here.
            if self.checkBox_save.isChecked() and self.t and len(self.Power):
                path = self.line_filePath.text()
                stamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(self.t[-1]))
                value = self.Power[-1][0]
                try:
                    write_header = not os.path.isfile(path)
                    with open(path, 'a') as file:
                        if write_header:
                            file.write('date\tPower[dBm]\n')
                        file.write(stamp+'\t')
                        file.write(f"{value}\n")
                except OSError as err:
                    # keep the save loop alive, e.g. while the path is edited
                    print(f"Could not write to '{path}': {err}")
            time.sleep(self.timing_save)

    def set_Npoints(self):
        """Set the number of points to plot from the gui.

        The previous value is kept if the text is not a positive number.
        """
        try:
            n_points = int(float(self.line_Nplot.text()))
        except (ValueError, OverflowError):
            return
        if n_points > 0:
            self.Npoints = n_points

    def set_displot(self):
        """Store the state of the disable-plots checkbox.

        Stored in a variable so the checkbox does not need to be read out in
        every iteration.
        """
        self.disable_plot = self.checkBox_disableplots.isChecked()

    def set_default(self):
        """Save the current set values to the config file.

        Writes one tab separated line with the entries of
        ``lines_config_float``, ``lines_config_strings`` and
        ``checkboxes_config`` followed by the resolution index. Overwrites
        the old values in the config file.
        """
        entries = [f"{get_float(line)}" for line in self.lines_config_float]
        entries += [line.text() for line in self.lines_config_strings]
        entries += [str(box.isChecked()) for box in self.checkboxes_config]
        entries.append(str(self.comboBox_res.currentIndex()))
        with open(_config_path(), 'w') as file:
            file.write('\t'.join(entries)+'\n')

    def read_default(self):
        """Read the default values from the config file and set them in the gui.

        Does nothing if the config file cannot be read.
        """
        try:
            vals = import_txt.read_raw(_config_path())
        except Exception:
            return  # no usable config file

        row = vals[0]
        n_float = len(self.lines_config_float)
        n_string = len(self.lines_config_strings)

        formats = ['.3e', '.2f', '.2e', '.2e', '.0f']
        for line, value, fmt in zip(self.lines_config_float, row, formats):
            # convert the string from the file to float so it can be formatted
            line.setText(format(float(value), fmt))

        for line, value in zip(self.lines_config_strings, row[n_float:]):
            line.setText(value)

        for box, value in zip(self.checkboxes_config, row[n_float+n_string:]):
            box.setChecked(value == 'True')

        self.comboBox_res.setCurrentIndex(int(row[-1]))

        self.change = True

    def closeEvent(self, event):
        """Stop all threads when the window is closed.

        ``self.running`` is set to False so the worker loops end; the threads
        get up to one second to finish.
        """
        self.running = False
        self.threadpool.waitForDone(1000)
        event.accept()


if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    app.exec()