# pyright: basic from PyQt6.QtGui import * from PyQt6.QtWidgets import * from PyQt6.QtCore import * # import multiprocessing # import multiprocessing.managers import time import traceback,sys,os import numpy as np import pyqtgraph as pg # pyright: ignore #import import_txt # Get the current script's directory current_dir = os.path.dirname(os.path.abspath(__file__)) # Get the parent directory by going one level up parent_dir = os.path.dirname(current_dir) # Add the parent directory to sys.path sys.path.append(parent_dir) from BK9132B_driver import BK_9132B from BK_9132B_design import Ui_MainWindow as BK9132B_control_window class WorkerSignals(QObject): ''' Defines the signals available from a running worker thread. Supported signals are: finished: No data error: tuple (exctype, value, traceback.format_exc() ) result: object data returned from processing, anything progress: int indicating % progress ''' finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(list) class Worker(QRunnable): ''' Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() # Add the callback to our kwargs self.kwargs['progress_callback'] = self.signals.progress @pyqtSlot() def run(self): ''' Initialise the runner function with passed args, kwargs. ''' # Retrieve args/kwargs here; and fire processing using them try: result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Done def get_float(Qline,default = 0): #gets value from QLineEdit and converts it to float. If text is empty or cannot be converted, it returns "default" which is 0, if not specified try: out = float(Qline.text()) except: out = default return(out) class BK9132B_Controller(QMainWindow, BK9132B_control_window): def setup_plots(self): self.graphWidget.setBackground('w') self.graphWidget.setTitle("Voltage, Current, Power") self.graphWidget.setLabel('left', 'Voltage [V], Current [A], Power [W]') self.graphWidget.setLabel('bottom', 'Time (H)') axis = pg.DateAxisItem() self.graphWidget.setAxisItems({'bottom':axis}) temp = [time.time(),time.time()-1] pen1 = pg.mkPen(color=(255, 0, 0), width=2) pen2 = pg.mkPen(color=(0, 0, 255), width=2) pen3 = pg.mkPen(color=(0, 255, 0), width=2) pen4 = pg.mkPen(color=(255, 255, 0), width=2) pen5 = pg.mkPen(color=(255, 0, 255), width=2) pen6 = pg.mkPen(color=(0, 255, 255), width=2) pen7 = pg.mkPen(color=(255, 127, 127), width=2) pen8 = pg.mkPen(color=(127, 255, 70), width=2) pen9 = pg.mkPen(color=(255, 127, 70), width=2) self.plot_1 = self.graphWidget.plot(temp,[1,0],pen = pen1, name = 'V Ch: 1') self.plot_2 = self.graphWidget.plot(temp,[1,0],pen = pen2, name = 'I Ch: 1') self.plot_3 = self.graphWidget.plot(temp,[1,0],pen = pen3, name = 'P Ch: 1') self.plot_4 = self.graphWidget.plot(temp,[1,0],pen = pen4, name = 'V Ch: 2') self.plot_5 = self.graphWidget.plot(temp,[1,0],pen = pen5, name = 'I Ch: 2') self.plot_6 = self.graphWidget.plot(temp,[1,0],pen = pen6, name = 'P Ch: 2') self.plot_7 = self.graphWidget.plot(temp,[1,0],pen = pen7, name = 'V Ch: 3') self.plot_8 = self.graphWidget.plot(temp,[1,0],pen = pen8, name = 'I Ch: 3') self.plot_9 = self.graphWidget.plot(temp,[1,0],pen = pen9, name = 'P Ch: 3') self.graphWidget.addLegend() def __init__(self, driver: BK_9132B, *args, **kwargs): self.driver = driver # Get the current script's directory self.current_dir = os.path.dirname(os.path.abspath(__file__)) # Get the parent directory by going one level up self.parent_dir = os.path.dirname(current_dir) #establish connection # to global variables # try: #try to connect to global variables # manager = multiprocessing.managers.BaseManager(address=('localhost',5001), authkey=b'') # manager.connect() # manager.register('sync_BK_9132B') # self.sync_BK_9132B = manager.sync_BK_9132B() # except: #open global variables, if no connection can be made (i.e. it is not running). Then connect to it # # subprocess.call(['D:\\Python instrument drivers\\env\\Scripts\\python.exe', 'D:\\Python instrument drivers\\StandAlones\\global_variables.py']) # self.global_vars = QProcess() # self.global_vars.start(self.current_dir+"\\env\\Scripts\\python.exe", [self.current_dir+'\\global_variables.py']) # manager.connect() # manager.register('sync_BK_9132B') # self.sync_BK_9132B = manager.sync_BK_9132B() # print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!') #fill in variables, if they are not defined in global variables # self.sync_BK_9132B.update({'setU':[0,0,0], 'setI':[0,0,0], 'OutputOn':[False, False, False], 'U':[0,0,0], 'I':[0,0,0], 'P':[0,0,0]}) #import Gui from QT designer file super(BK9132B_Controller, self).__init__(*args, **kwargs) self.setupUi(self) #setup plot self.setup_plots() #set up pyQT threadpool self.threadpool = QThreadPool() #define signals and slots self.actionSet_default.triggered.connect(self.set_default) self.actionReset_default.triggered.connect(self.read_default) self.button_connect.clicked.connect(self.start_meas) self.line_Nplot.editingFinished.connect(self.set_Npoints) self.line_saveInterval.editingFinished.connect(self.change_timing) self.checkBox_disableplots.stateChanged.connect(self.set_displot) self.checkBox_V_1.stateChanged.connect(self.plot_hide) self.checkBox_I_1.stateChanged.connect(self.plot_hide) self.checkBox_P_1.stateChanged.connect(self.plot_hide) self.checkBox_V_2.stateChanged.connect(self.plot_hide) self.checkBox_I_2.stateChanged.connect(self.plot_hide) self.checkBox_P_2.stateChanged.connect(self.plot_hide) self.checkBox_V_3.stateChanged.connect(self.plot_hide) self.checkBox_I_3.stateChanged.connect(self.plot_hide) self.checkBox_P_3.stateChanged.connect(self.plot_hide) self.V_set_1.editingFinished.connect(self.set_V) self.I_set_1.editingFinished.connect(self.set_current) self.Output_1.stateChanged.connect(self.set_Output) self.V_set_2.editingFinished.connect(self.set_V) self.I_set_2.editingFinished.connect(self.set_current) self.Output_2.stateChanged.connect(self.set_Output) self.V_set_3.editingFinished.connect(self.set_V) self.I_set_3.editingFinished.connect(self.set_current) self.Output_3.stateChanged.connect(self.set_Output) #define constants self.Voltage = np.zeros((1,3)) #store voltage data self.Current = np.zeros((1,3)) #store current data self.Power = np.zeros((1,3)) #store power data self.t = [time.time()] #store timestamps self.Npoints = 200 #number of point to plot self.running = True #true while app is running self.disable_plot = False #constant to disable plot to improve performance. Is changed by checkbox checkBox_disableplots self.timing_save = 5 #save intervall [s] self.set_old = [0,0,1] #variable to save the 'old' set values to compare them to the global variables. It differs from set_new in the first iteration. This ensures that new parameters are send to the device self.set_new = [0,0,0] #variable to save the new set values to compare them to the old ones self.lines_config_float = [self.V_set_1,self.I_set_1,self.V_set_2,self.I_set_2,self.V_set_3,self.I_set_3,self.line_Nplot]#is used for config file self.lines_config_strings = [self.line_devAdr,self.line_filePath,self.line_saveInterval]#is used for config file self.checkboxes_config = [self.Output_1, self.Output_2, self.Output_3,self.checkBox_disableplots,self.checkBox_save, self.checkBox_V_1, self.checkBox_I_1, self.checkBox_P_1, self.checkBox_V_2, self.checkBox_I_2, self.checkBox_P_2, self.checkBox_V_3, self.checkBox_I_3, self.checkBox_P_3,]#is used for config file self.start_meas() #read default values from config and set them in gui self.read_default() # #write values from gui to global variables. self.set_V() self.set_current() self.set_Output() #update save intervall to the gui value self.change_timing() #define and standard threads. worker_save = Worker(self.save) self.threadpool.start(worker_save) def start_meas(self): #Connect to device address = self.line_devAdr.text() self.BK = BK_9132B(address) #start thread for communication with device self.worker = Worker(self.update_all) self.worker.signals.progress.connect(self.update_gui) self.threadpool.start(self.worker) def update_all(self, progress_callback): #get values from device and write them to global variables. Checks if global variables changed from last iteration. Also pass it to upddate_gui with emit(T) while self.running == True: # for i,n in enumerate(['setU', 'setI', 'OutputOn']): #get new set values from global variables and compare to old ones. # self.set_new[i] = self.sync_BK_9132B.get(n) # if self.set_new != self.set_old: #if a button is clicked or global variables are changed the program checks which setting has been changed. # if self.set_new[0] != self.set_old[0]: #if setU is changed new setU parameters are sent to device # for CH in [1,2,3]: #for loop for all 3 channels # self.BK.set_Voltage(CH, self.set_new[0][CH-1]) #CH is element of 1-3 but setU parameters have indices of 0-2 # if self.set_new[1] != self.set_old[1]: #if setI is changed new setI parameters are sent to device # for CH in [1,2,3]: #for loop for all 3 channels # self.BK.set_Current(CH, self.set_new[1][CH-1]) #CH is element of 1-3 but setI parameters have indices of 0-2 # if self.set_new[2] != self.set_old[2]: #if OutputOn is changed new OutputOn parameters are sent to device # for CH in [1,2,3]: #for loop for all 3 channels # self.BK.set_Output(CH, self.set_new[2][CH-1]) #CH is element of 1-3 but OutputOn parameters have indices of 0-2 self.update_setValues(self.set_new) #Change GUI U = self.BK.read_Voltage() #read Voltage of all three channels I = self.BK.read_Current() #read Current of all three channels P = self.BK.read_Power() #read Power of all three channels #Write measurement values into global variables # self.sync_BK_9132B.update({'U':U}) # self.sync_BK_9132B.update({'I':I}) # self.sync_BK_9132B.update({'P':P}) progress_callback.emit([U,I,P]) #Emits list of all three lists U,I,P self.set_old = self.set_new[:] #List needs to be sliced so that only values are taken and not just a pointer is created time.sleep(0.1) #disconnect device when self.running is set to False def update_gui(self, List): #Convert List into original format U = List[0] I = List[1] P = List[2] #set numbers self.V_1.setText(str(U[0])) self.I_1.setText(str(I[0])) self.P_1.setText(str(P[0])) self.V_2.setText(str(U[1])) self.I_2.setText(str(I[1])) self.P_2.setText(str(P[1])) self.V_3.setText(str(U[2])) self.I_3.setText(str(I[2])) self.P_3.setText(str(P[2])) #Create database for plotting self.Voltage = np.vstack([self.Voltage, np.array(U)]) self.Current = np.vstack([self.Current, np.array(I)]) self.Power = np.vstack([self.Power, np.array(P)]) # x = range(len(self.Temperature)) self.t.append(time.time()) #plot if self.disable_plot == False: self.plot_1.setData(self.t[-self.Npoints:],self.Voltage[-self.Npoints:,0]) self.plot_2.setData(self.t[-self.Npoints:],self.Current[-self.Npoints:,0]) self.plot_3.setData(self.t[-self.Npoints:],self.Power[-self.Npoints:,0]) self.plot_4.setData(self.t[-self.Npoints:],self.Voltage[-self.Npoints:,1]) self.plot_5.setData(self.t[-self.Npoints:],self.Current[-self.Npoints:,1]) self.plot_6.setData(self.t[-self.Npoints:],self.Power[-self.Npoints:,1]) self.plot_7.setData(self.t[-self.Npoints:],self.Voltage[-self.Npoints:,2]) self.plot_8.setData(self.t[-self.Npoints:],self.Current[-self.Npoints:,2]) self.plot_9.setData(self.t[-self.Npoints:],self.Power[-self.Npoints:,2]) def update_setValues(self, setV): #sets setvalues obtained from update_all in gui ['setU', 'setI', 'OutputOn'] self.V_set_1.setText(f"{setV[0][0]}") self.I_set_1.setText(f"{setV[1][0]}") self.Output_1.setChecked(setV[2][0]) self.V_set_2.setText(f"{setV[0][1]}") self.I_set_2.setText(f"{setV[1][1]}") self.Output_2.setChecked(setV[2][1]) self.V_set_3.setText(f"{setV[0][2]}") self.I_set_3.setText(f"{setV[1][2]}") self.Output_3.setChecked(setV[2][2]) def set_V(self): # Updates the voltage ##updates the set voltage in global variables. The change will be detected by update_all and it will be passed to the device setU = [get_float(self.V_set_1), get_float(self.V_set_2), get_float(self.V_set_3)] for channel,voltage in enumerate(setU): self.BK.set_Voltage(channel+1, voltage) # todo: fix the indexing mismatch at some point #self.driver.set_Voltage(CH= # self.sync_BK_9132B.update({'setU':setU}) def set_current(self): # Updates the current #updates the set current in global variables. The change will be detected by update_all and it will be passed to the device setI = [get_float(self.I_set_1), get_float(self.I_set_2), get_float(self.I_set_3)] for channel,current in enumerate(setI): self.BK.set_Current(channel+1, current) # todo: fix the indexing mismatch at some point def set_Output(self): # Updates the output (MG: what does output mean? Power, i.e. voltage * current?) #updates the set Output in global variables. The change will be detected by update_all and it will be passed to the device setOutput = [self.Output_1.isChecked(), self.Output_2.isChecked(), self.Output_3.isChecked()] for channel,current in enumerate(setOutput): self.BK.set_Output(channel+1, current) # todo: fix the indexing mismatch at some point def set_Npoints(self): #sets the number of points to plot self.Npoints = int(self.line_Nplot.text()) def set_displot(self): #sets variable to disable plot so checkbox state does not need be read out every iteration self.disable_plot = self.checkBox_disableplots.isChecked() def plot_hide(self): #shows or hides plots according to the checkboxes next to the plot area boxes = [self.checkBox_V_1, self.checkBox_I_1, self.checkBox_P_1, self.checkBox_V_2, self.checkBox_I_2, self.checkBox_P_2, self.checkBox_V_3, self.checkBox_I_3, self.checkBox_P_3] plots = plots = [self.plot_1, self.plot_2, self.plot_3, self.plot_4, self.plot_5, self.plot_6, self.plot_7, self.plot_8, self.plot_9] for b,p in zip(boxes,plots): if b.isChecked() == True: p.show() else: p.hide() def change_timing(self): #updates the timing which is used for "save". If no value it given, it is set to 1 s self.timing_save = get_float(self.line_saveInterval,1) def save(self, progress_callback): #if save checkbox is checked it writes measurement values to file specified in line.filePath. There the full path including file extension must be given. while self.running == True: time.sleep(self.timing_save) #wait is at beginning so first point is not corrupted when app just started. if self.checkBox_save.isChecked() == True: path = self.line_filePath.text() if os.path.isfile(path) == False: with open(path,'a') as file: file.write('date\tV Ch:1[V]\tI Ch:1[A]\tP Ch:1[W]\tV Ch:2[V]\tI Ch:2[A]\tP Ch:2[W]\tV Ch:3[V]\tI Ch:3[A]\tP Ch:3[W]\n') file = open(path,'a') file.write(time.strftime("%Y-%m-%d_%H-%M-%S",time.localtime(self.t[-1]))+'\t') for i in range(3): #Loop for all three channels file.write(f"{self.Voltage[-1,i]}\t") file.write(f"{self.Current[-1,i]}\t") file.write(f"{self.Power[-1,i]}\t") file.write('\n') file.close def set_default(self): #saves current set values to txt file in subdirectory configs. All entries that are saved are defined in self.lines_config #Overwrites old values in config file. current_dir = os.path.dirname(os.path.abspath(__file__)) path = current_dir+'\\configs\\BK9132B_config.txt' #To make shure the config file is at the right place, independent from where the program is started the location of the file is retrieved file = open(path,'w') for l in self.lines_config_float: temp = f"{get_float(l)}" file.write(temp+'\t') for l in self.lines_config_strings: file.write(l.text()+'\t') for c in self.checkboxes_config: file.write(str(c.isChecked())+'\t') file.write('\n') file.close # will fix this later with proper YAML config def read_default(self): #reads default values from config file in subdirectory config and sets the values in gui. Then self.change is set to true so values are send #to device. (If no config file exists, it does nothing.) # current_dir = os.path.dirname(os.path.abspath(__file__)) # path = current_dir+'\\configs\\BK9132B_config.txt' #To make shure the config file is read from the right place, independent from where the program is started the location of the file is retrieved # try: #exit function if config file does not exist # vals = import_txt.read_raw(path) # except: # print('no config file found on') # print(path) return # formats = ['.2f', '.2f', '.2f','.2f','.2f','.0f'] # for l,v,f in zip(self.lines_config_float,vals[0],formats): # v = float(v) #convert string in txt to float, so number can be formatted according to "formats" when it's set # l.setText(format(v,f)) # for l,v in zip(self.lines_config_strings,vals[0][len(self.lines_config_float):]): # l.setText(v) # for c,v in zip(self.checkboxes_config,vals[0][len(self.lines_config_float)+len(self.lines_config_strings):]): # c.setChecked(v == 'True') # self.change = True def closeEvent(self, event): #when window is closed self.running is set to False, so all threads stop self.running = False del(self.BK) time.sleep(1) # MG: again, why waste a second here? event.accept() if __name__ == "__main__": psu_driver = BK_9132B(address='USB0::0x1111::0x2222::0x1234::0::INSTR', useSim=True) app = QApplication(sys.argv) window = BK9132B_Controller(psu_driver) window.show() app.exec()