from PyQt6.QtGui import * from PyQt6.QtWidgets import * from PyQt6.QtCore import * import multiprocessing import multiprocessing.managers import time import traceback,sys,os import numpy as np import pyqtgraph as pg from scripts import import_txt # Get the current script's directory current_dir = os.path.dirname(os.path.abspath(__file__)) # Get the parent directory by going one level up parent_dir = os.path.dirname(current_dir) # Add the parent directory to sys.path sys.path.append(parent_dir) from design_files.B_Field_Compensation_design import Ui_MainWindow class WorkerSignals(QObject): ''' Defines the signals available from a running worker thread. Supported signals are: finished: No data error: tuple (exctype, value, traceback.format_exc() ) result: object data returned from processing, anything progress: int indicating % progress ''' finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(list) class Worker(QRunnable): ''' Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() # Add the callback to our kwargs self.kwargs['progress_callback'] = self.signals.progress @pyqtSlot() def run(self): ''' Initialise the runner function with passed args, kwargs. ''' # Retrieve args/kwargs here; and fire processing using them try: result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Done def get_float(Qline,default = 0): #gets value from QLineEdit and converts it to float. If text is empty or cannot be converted, it returns "default" which is 0, if not specified try: out = float(Qline.text()) except: out = default return(out) def update_single_entry(dict,name,ind,val): #updates a single value in global vars. To do so it gets the current value of "dict('name')", replaces "val" at indec "ind" and sends this back data = dict.get(f"{name}") #get data data[ind] = val #replace entry dict.update({f"{name}":data}) #send data back class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self, *args, **kwargs): # Get the current script's directory self.current_dir = os.path.dirname(os.path.abspath(__file__)) # Get the parent directory by going one level up self.parent_dir = os.path.dirname(current_dir) #establish connection to global variables try: #try to connect to global variables self.manager = multiprocessing.managers.BaseManager(address=('localhost',5001), authkey=b'') self.manager.connect() self.manager.register('syncdict') self.manager.register('sync_imc') self.manager.register('sync_BK_9174B') self.manager.register('sync_BK_9174B_2') self.manager.register('sync_BK_9131B') self.syncdict = self.manager.syncdict() self.sync_imc = self.manager.sync_imc() self.sync_BK_9174B = self.manager.sync_BK_9174B() self.sync_BK_9174B_2 = self.manager.sync_BK_9174B_2() self.sync_BK_9131B = self.manager.sync_BK_9131B() except: #open global variables, if no connection can be made (i.e. it is not running). Then connect to it # subprocess.call(['D:\\Python instrument drivers\\env\\Scripts\\python.exe', 'D:\\Python instrument drivers\\StandAlones\\global_variables.py']) self.global_vars = QProcess() self.global_vars.start(self.current_dir+"\\.venv\\Scripts\\python.exe", [self.current_dir+'\\global_variables_TF.py']) self.manager.connect() self.manager.register('syncdict') self.manager.register('sync_imc') self.manager.register('sync_BK_9174B') self.manager.register('sync_BK_9174B_2') self.manager.register('sync_BK_9131B') self.syncdict = self.manager.syncdict() self.sync_imc = self.manager.sync_imc() self.sync_BK_9174B = self.manager.sync_BK_9174B() self.sync_BK_9174B_2 = self.manager.sync_BK_9174B_2() self.sync_BK_9131B = self.manager.sync_BK_9131B() print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!') #fill in variables, if they are not defined in global variables self.syncdict.update({'B_set':[0,0,0], 'Start_Compensation':False, 'Status_Compensation': 0, 'Temperature_Caution': False}) # Status_Compensation: 0: No B-field compensation performed # 1: Compensation is running # 2: Compensation finished and succeeded # 3: Compensation failed #import Gui from QT designer file super(MainWindow, self).__init__(*args, **kwargs) self.setupUi(self) #setup plot self.graphWidget_x.setBackground('w') self.graphWidget_x.setTitle("Compensation on x-axis") self.graphWidget_x.setLabel('left', 'B_x (µT)') self.graphWidget_x.setLabel('bottom', '#iterations') self.graphWidget_y.setBackground('w') self.graphWidget_y.setTitle("Compensation on y-axis") self.graphWidget_y.setLabel('left', 'B_y (µT)') self.graphWidget_y.setLabel('bottom', '#iterations') self.graphWidget_z.setBackground('w') self.graphWidget_z.setTitle("Compensation on z-axis") self.graphWidget_z.setLabel('left', 'B_z (µT)') self.graphWidget_z.setLabel('bottom', '#iterations') pen1 = pg.mkPen(color=(255, 0, 0), width=2) pen2 = pg.mkPen(color=(0, 0, 255), width=2) self.plot_x_1 = self.graphWidget_x.plot([0,1],[1,0],pen = pen1, name = 'B_x_set') self.plot_x_2 = self.graphWidget_x.plot([0,1],[1,0],pen = pen2, name = 'B_x') self.graphWidget_x.addLegend() self.plot_y_1 = self.graphWidget_y.plot([0,1],[1,0],pen = pen1, name = 'B_y_set') self.plot_y_2 = self.graphWidget_y.plot([0,1],[1,0],pen = pen2, name = 'B_y') self.graphWidget_y.addLegend() self.plot_z_1 = self.graphWidget_z.plot([0,1],[1,0],pen = pen1, name = 'B_z_set') self.plot_z_2 = self.graphWidget_z.plot([0,1],[1,0],pen = pen2, name = 'B_z') self.graphWidget_z.addLegend() #set up pyQT threadpool self.threadpool = QThreadPool() #define and standard threads. # worker_save = Worker(self.save) # self.threadpool.start(worker_save) #define signals and slots self.actionSet_default.triggered.connect(self.set_default) self.actionReset_default.triggered.connect(self.read_default) self.line_B_x_set.editingFinished.connect(self.set_B_set) self.line_B_y_set.editingFinished.connect(self.set_B_set) self.line_B_z_set.editingFinished.connect(self.set_B_set) self.Button_Set_Field.clicked.connect(self.set_start_compensation) self.checkBox_y2_coil.stateChanged.connect(self.set_y2_coil_Enabled) self.comboBox_Temp_Sensor.currentIndexChanged.connect(self.set_temp_sensor) self.line_Max_Temp.editingFinished.connect(self.set_max_temperature) self.checkBox_Turn_Off_Coils.stateChanged.connect(self.set_Turn_Off_Coils) #define constants self.running = True self.B_set = [0,0,0] #list containing x,y,z set values self.Start_Compensation = False # boolean, which has to be set true in order to start compensation self.Coil_Constant = [0,0,0] #Coil constant in muT/A self.set_old = [0,0,1] #variable to save the 'old' set values to compare them to the global variables. It differs from set_new in the first iteration. This ensures that new parameters are send to the device self.set_new = [0,0,0] #variable to save the new set values to compare them to the old ones self.lines_config_float = [self.line_B_x_set,self.line_B_y_set, self.line_B_z_set, self.line_Max_Difference, self.line_Max_iterations, self.line_B_equil_time, self.line_HHC_param_x, self.line_HHC_param_y, self.line_HHC_param_y2, self.line_HHC_param_z, self.line_Max_Temp]#is used for config file self.lines_config_strings = []#is used for config file self.checkboxes_config = [self.checkBox_y2_coil, self.checkBox_Turn_Off_Coils]#is used for config file self.combobox_config = [self.comboBox_Temp_Sensor]#is used for config file self.y2_coil_Enabled = False # Boolean if second y-coil pair should be used self.temp_sensor = 0 # Selected temperature sensor Channel self.max_temperature = 320 # Maximum temperature, at which coils should be turned off self.Turn_Off_Coils = False #Boolean if Coils should be turned off for temperatures above self.max_temperature self.LS336_never_connected_before = True #read default values from config and set them in gui self.read_default() #write values from gui to global variables. self.set_B_set() self.set_max_temperature() # Start standard thread self.worker = Worker(self.update_all) self.threadpool.start(self.worker) # Start temperature monitoring thread worker_temperature = Worker(self.temperature_thread) self.threadpool.start(worker_temperature) def update_all(self, progress_callback): #Checks if global variables changed from last iteration. while self.running == True: for i,n in enumerate(['B_set', 'Start_Compensation']): #get new set values from global variables and compare to old ones. self.set_new[i] = self.syncdict.get(n) if self.set_new != self.set_old: #if a button is clicked or global variables are changed the program checks which setting has been changed. if self.set_new[0] != self.set_old[0]: #if B_set is changed new B_set parameters are saved locally self.B_set = self.set_new[0] if self.set_new[1] != self.set_old[1]: #if Start_Compensation is changed new Start_Compensation is saved locally self.Start_Compensation = self.set_new[1] self.update_setValues(self.set_new) #Change GUI text lines self.set_old = self.set_new[:] #List needs to be sliced so that only values are taken and not just a pointer is created if self.Start_Compensation == True: #Start Field Compensation # Update compensation status in GUI self.line_iteration_step.setText('Starting compensation.') self.line_iteration_step.setStyleSheet("background-color: white") # Update global variable self.syncdict.update({'Status_Compensation':1}) i = 0 #iteration number I = [0,0,0] #HHC Current # Pull coil constants from GUI lines self.Coil_Constant = [get_float(self.line_HHC_param_x), get_float(self.line_HHC_param_y), get_float(self.line_HHC_param_z)] #set HHC current to 0A self.sync_BK_9174B.update({'setI':[0, 0]}) if self.checkBox_y2_coil.isChecked() == True: self.sync_BK_9174B_2.update({'setI':[0, 0]}) else: update_single_entry(self.sync_BK_9174B_2, "setI", 0, 0) #set max. voltage of HHC to 70V (in order to get constant current mode) self.sync_BK_9174B.update({'setU':[70, 70]}) if self.checkBox_y2_coil.isChecked() == True: self.sync_BK_9174B_2.update({'setU':[70, 70]}) else: update_single_entry(self.sync_BK_9174B_2, "setU", 0, 70) #turn on power supplies for HHC self.sync_BK_9174B.update({'OutputOn':[True, True]}) if self.checkBox_y2_coil.isChecked() == True: self.sync_BK_9174B_2.update({'OutputOn':[True, True]}) else: update_single_entry(self.sync_BK_9174B_2, "OutputOn", 0, True) #Turn off x,y,z-coil relais self.sync_BK_9131B.update({'OutputOn':[False, False, False]}) #Set current and voltage of x,y,z-coil relais self.sync_BK_9131B.update({'setI':[1,1,1]}) self.sync_BK_9131B.update({'setU':[5,5,5]}) time.sleep(1) B_measured = self.sync_imc.get('FG') # Pull fluxgate data from global variables for x in [0,1,2]: B_measured[x] = B_measured[x]*100 #Conversion to muT # Create array with all measured B-values for plotting B_array = np.empty(shape=(0,3)) B_array = np.append(B_array,[B_measured],axis=0) # Save first measurement point already taken # Uptdate plots self.plot_x_1.setData([0,1],[self.B_set[0],self.B_set[0]]) self.plot_x_2.setData(np.arange(1),B_array[:,0]) self.plot_y_1.setData([0,1],[self.B_set[1],self.B_set[1]]) self.plot_y_2.setData(np.arange(1),B_array[:,1]) self.plot_z_1.setData([0,1],[self.B_set[2],self.B_set[2]]) self.plot_z_2.setData(np.arange(1),B_array[:,2]) # Calculate the difference between the set B-field and the measured values B_Delta_x = (self.B_set[0] - B_measured[0]) B_Delta_y = (self.B_set[1] - B_measured[1]) B_Delta_z = (self.B_set[2] - B_measured[2]) # Create lists containing old and new coil-flip status FlipStatus_new = [False, False, False] FlipStatus_old = [False, False, False] # Iterations are performed until the maximum number of iterations is reached or the difference is smaller than the given one while i <= get_float(self.line_Max_iterations) and (np.abs(B_Delta_x) > get_float(self.line_Max_Difference) or np.abs(B_Delta_y) > get_float(self.line_Max_Difference) or np.abs(B_Delta_z) > get_float(self.line_Max_Difference)): for k in [0,1,2]: # iterate on all three axes I[k] = I[k] + (self.B_set[k] - B_measured[k])*(1/self.Coil_Constant[k]) # calculate new current set value if I[k] < 0: # if current is negative, change the new flip status FlipStatus_new[k] = True else: FlipStatus_new[k] = False if self.checkBox_y2_coil.isChecked() == True and I[k] > 3: I[k] = 3 if self.checkBox_y2_coil.isChecked() == False and I[k] >1.5: I[k] = 1.5 print(I) print(FlipStatus_new) print(f"old{FlipStatus_old}") if FlipStatus_new != FlipStatus_old: # if one of the flip status has changed compared to the old status, switch on the relais for flipping the current direction self.sync_BK_9131B.update({'OutputOn':FlipStatus_new}) print("Switch") time.sleep(0.5) # set old flip status to the new one FlipStatus_old = FlipStatus_new.copy() for k in [0,1,2]: # iterate on all three axes and set the current of the power supplies if k==0: # in case of x-direction update_single_entry(self.sync_BK_9174B, "setI", k, np.abs(I[k])) print(self.y2_coil_Enabled) print(I[1]) if k==1: # in case of y-direction if self.y2_coil_Enabled == True: I_set = np.abs(I[k])/2 if I_set > 1.5: I_set = 1.5 update_single_entry(self.sync_BK_9174B, "setI", 1, I_set) update_single_entry(self.sync_BK_9174B_2, "setI", 1, I_set) print("Klappt") else: #Using just one power supply is enough I_set = np.abs(I[k]) if I_set > 1.5: I_set = 1.5 update_single_entry(self.sync_BK_9174B, "setI", 1, I_set) if k==2: # in case of z-direction use BK_9174B_2 update_single_entry(self.sync_BK_9174B_2, "setI", 0, np.abs(I[k])) # else: # in case of x- or y-direction use BK_9174B # update_single_entry(self.sync_BK_9174B, "setI", k, np.abs(I[k])) time.sleep(get_float(self.line_B_equil_time)) # wait for a given time period B_measured = self.sync_imc.get('FG') # Pull fluxgate data from global variables for x in [0,1,2]: B_measured[x] = B_measured[x]*100 #Conversion to muT # Calculate the difference between the set B-field and the measured values B_Delta_x = (self.B_set[0] - B_measured[0]) B_Delta_y = (self.B_set[1] - B_measured[1]) B_Delta_z = (self.B_set[2] - B_measured[2]) B_array = np.append(B_array,[B_measured],axis=0) # append measured field for plotting # plot B-fields of all three axes self.plot_x_1.setData([0,i+2],[self.B_set[0],self.B_set[0]]) self.plot_x_2.setData(np.arange(i+2),B_array[:,0]) self.plot_y_1.setData([0,i+2],[self.B_set[1],self.B_set[1]]) self.plot_y_2.setData(np.arange(i+2),B_array[:,1]) self.plot_z_1.setData([0,i+2],[self.B_set[2],self.B_set[2]]) self.plot_z_2.setData(np.arange(i+2),B_array[:,2]) # if one B_set value is zero, calculating the percentual deviation is not possible if self.B_set[0] != 0: B_dev_x = 100*(self.B_set[0] - B_measured[0])/self.B_set[0] self.line_B_x_dev.setText(str(np.round(B_dev_x,4))) else: self.line_B_x_dev.setText('/') if self.B_set[1] != 0: B_dev_y = 100*(self.B_set[1] - B_measured[1])/self.B_set[1] self.line_B_y_dev.setText(str(np.round(B_dev_y,4))) else: self.line_B_y_dev.setText('/') if self.B_set[2] != 0: B_dev_z = 100*(self.B_set[2] - B_measured[2])/self.B_set[2] self.line_B_z_dev.setText(str(np.round(B_dev_z,4))) else: self.line_B_z_dev.setText('/') self.line_iteration_step.setText(str(i)) i=i+1 self.syncdict.update({'Start_Compensation':False}) if i <= get_float(self.line_Max_iterations): self.line_iteration_step.setText('Compensation finished!') self.line_iteration_step.setStyleSheet("background-color: lightgreen") self.syncdict.update({'Status_Compensation':2}) else: self.line_iteration_step.setText('Compensation failed!') self.line_iteration_step.setStyleSheet("background-color: red") self.syncdict.update({'Status_Compensation':3}) print("Closed") def temperature_thread(self,progress_callback): Caution_Message_Written = False while self.running == True: if self.temp_sensor > 0: #Read temperature if sensor is selected if self.LS336_never_connected_before == True: #If never connected to LS336 before: Connect to LS336: print("Try to connect to LS336") try: self.manager.register('sync_LS_336') self.sync_LS_336 = self.manager.sync_LS_336() LS336_connected = True except: print("Connecting to LS336 failed!") LS336_connected = False time.sleep(1) self.LS336_never_connected_before = False if LS336_connected == True: T = self.sync_LS_336.get('T')[int(self.temp_sensor - 1)] else: T = 0 self.line_coil_temperature.setText(f"{np.round(T,2)}") if self.Turn_Off_Coils == True and T > self.max_temperature: if Caution_Message_Written == False: print("Caution: Coil temperature above maximum!") self.syncdict.update({'Temperature_Caution': True}) # Set temperature warning in global params self.line_coil_temperature.setStyleSheet("background-color: red") Caution_Message_Written = True #turn off power supplies self.sync_BK_9174B.update({'OutputOn':[False, False]}) self.sync_BK_9174B_2.update({'OutputOn':[False, False]}) if T <= self.max_temperature and Caution_Message_Written == True: self.line_coil_temperature.setStyleSheet("background-color: white") self.syncdict.update({'Temperature_Caution': False}) # Set temperature warning in global params to False Caution_Message_Written = False else: LS336_never_connected_before = False def update_setValues(self,values): #sets setvalues obtained from update_all in gui self.line_B_x_set.setText(f"{values[0][0]}") self.line_B_y_set.setText(f"{values[0][1]}") self.line_B_z_set.setText(f"{values[0][2]}") def set_B_set(self): #updates the B_set values in global variables. The change will be detected by update_all B_set = [get_float(self.line_B_x_set), get_float(self.line_B_y_set), get_float(self.line_B_z_set)] self.syncdict.update({'B_set':B_set}) def set_start_compensation(self): self.syncdict.update({'Start_Compensation':True}) def set_y2_coil_Enabled(self): if self.y2_coil_Enabled == False: self.y2_coil_Enabled = True else: self.y2_coil_Enabled = False #Turn off BK_9174B_2 CH2 update_single_entry(self.sync_BK_9174B_2, "setI", 1, 0) update_single_entry(self.sync_BK_9174B_2, "OutputOn", 1, False) def set_temp_sensor(self, value): self.temp_sensor = value #0: No Sensor, 1:A, 2:B, 3:C, 4:D def set_max_temperature(self): self.max_temperature = get_float(self.line_Max_Temp) def set_Turn_Off_Coils(self): self.Turn_Off_Coils = self.checkBox_Turn_Off_Coils.isChecked() def save(self, progress_callback): #if save checkbox is checked it writes measurement values to file specified in line.filePath. There the full path including file extension must be given. while self.running == True: time.sleep(self.timing_save) #wait is at beginning so first point is not corrupted when app just started. if self.checkBox_save.isChecked() == True: path = self.line_filePath.text() if os.path.isfile(path) == False: with open(path,'a') as file: file.write('date\tV Ch:1[V]\tI Ch:1[A]\tP Ch:1[W]\tV Ch:2[V]\tI Ch:2[A]\tP Ch:2[W]\tV Ch:3[V]\tI Ch:3[A]\tP Ch:3[W]\n') file = open(path,'a') file.write(time.strftime("%Y-%m-%d_%H-%M-%S",time.localtime(self.t[-1]))+'\t') for i in [0,1,2]: #Loop for all three channels file.write(f"{self.Voltage[-1,i]}\t") file.write(f"{self.Current[-1,i]}\t") file.write(f"{self.Power[-1,i]}\t") file.write('\n') file.close def set_default(self): #saves current set values to txt file in subdirectory configs. All entries that are saved are defined in self.lines_config #Overwrites old values in config file. current_dir = os.path.dirname(os.path.abspath(__file__)) path = current_dir+'\\configs\\B_Field_Compensation_config.txt' #To make shure the config file is at the right place, independent from where the program is started the location of the file is retrieved file = open(path,'w') for l in self.lines_config_float: temp = f"{get_float(l)}" file.write(temp+'\t') for l in self.lines_config_strings: file.write(l.text()+'\t') for c in self.checkboxes_config: file.write(str(c.isChecked())+'\t') for c in self.combobox_config: file.write(str(c.currentIndex())+'\t') file.write('\n') file.close def read_default(self): #reads default values from config file in subdirectory config and sets the values in gui. Then self.change is set to true so values are send #to device. (If no config file exists, it does nothing.) current_dir = os.path.dirname(os.path.abspath(__file__)) path = current_dir+'\\configs\\B_Field_Compensation_config.txt' #To make shure the config file is read from the right place, independent from where the program is started the location of the file is retrieved try: #exit function if config file does not exist vals = import_txt.read_raw(path) except: print('no config file found on') print(path) return formats = ['.2f', '.2f', '.2f','.2f', '.2f','.0f','.2f', '.2f' , '.2f' , '.2f', '.2f'] for l,v,f in zip(self.lines_config_float,vals[0],formats): v = float(v) #convert string in txt to float, so number can be formatted according to "formats" when it's set l.setText(format(v,f)) for l,v in zip(self.lines_config_strings,vals[0][len(self.lines_config_float):]): l.setText(v) for c,v in zip(self.checkboxes_config,vals[0][len(self.lines_config_float)+len(self.lines_config_strings):]): c.setChecked(v == 'True') for c,v in zip(self.combobox_config,vals[0][len(self.lines_config_float)+len(self.lines_config_strings)+len(self.checkboxes_config):]): c.setCurrentIndex(int(v)) self.change = True def closeEvent(self,event): #when window is closed self.running is set to False, so all threads stop self.running = False time.sleep(1) event.accept() app = QApplication(sys.argv) window = MainWindow() window.show() app.exec()