from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *
import multiprocessing
import multiprocessing.managers
import time
from timeit import default_timer as timer
from datetime import datetime
import csv
import random
import traceback
import sys
import os
import numpy as np
from scipy.optimize import curve_fit
from design_files.AMR_calibration_design import Ui_MainWindow

# Get the current script's directory
current_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory by going one level up
parent_dir = os.path.dirname(current_dir)
# Add the parent directory to sys.path so that "scripts" can be imported
sys.path.append(parent_dir)
from scripts import import_txt


class WorkerSignals(QObject):
    '''
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished: No data
    error: tuple (exctype, value, traceback.format_exc())
    result: object data returned from processing, anything
    progress: list with progress information chosen by the callback
    '''
    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(list)


class Worker(QRunnable):
    '''
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param fn: The function callback to run on this worker thread. Supplied args and
               kwargs will be passed through to the runner. The callback additionally
               receives the keyword argument "progress_callback" (the progress signal).
    :type fn: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    '''

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Add the callback to our kwargs
        self.kwargs['progress_callback'] = self.signals.progress

    @pyqtSlot()
    def run(self):
        '''
        Run the stored function with the stored args/kwargs and report the outcome
        through the signals (error or result, then always finished).
        '''
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException:
            # Thread boundary: nothing may escape from run(), so everything is caught,
            # printed and forwarded through the error signal.
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done


def get_float(Qline, default=0):
    '''
    Get the text of a QLineEdit and convert it to float.

    :param Qline: object with a text() method (e.g. a QLineEdit)
    :param default: value returned if the text is empty or cannot be converted
    :return: the converted float, or "default"
    '''
    try:
        out = float(Qline.text())
    except Exception:
        # deliberately best effort: any failure yields the default value
        out = default
    return(out)


def update_single_entry(dict, name, ind, val):
    '''
    Update a single value in global variables.

    Gets the current value of dict[name], replaces the entry at index "ind" by "val"
    and sends the whole entry back.

    :param dict: dict-like object (e.g. a manager proxy) offering get() and update()
    :param name: key of the entry to modify
    :param ind: index inside the stored sequence
    :param val: new value for that index
    '''
    data = dict.get(f"{name}")  # get data
    data[ind] = val  # replace entry
    dict.update({f"{name}": data})  # send data back


def _linear_response(B, a, b, c, d):
    '''Fit model V = a*Bx + b*By + c*Bz + d (Dissertation Felix p.49 eq.3.2).'''
    Bx, By, Bz = B
    return a*Bx + b*By + c*Bz + d


class _GlobalVariablesManager(multiprocessing.managers.BaseManager):
    '''Manager client for the global-variables server (typeids are registered on this subclass only).'''


class MainWindow(QMainWindow, Ui_MainWindow):
    '''
    Main window of the AMR calibration tool.

    Connects to the shared "global variables" manager, continuously forwards AMR data
    (raw voltage or converted flux density) to sync_converted, and performs/loads/saves
    the calibration (fit parameters) used for the conversion.
    '''

    N_GROUPS = 15               # sensors per axis; group i consists of sensors i, i+15, i+30
    N_SENSORS = 3 * N_GROUPS    # total number of fitted sensors
    N_FIT_PARAMS = 4            # a, b, c, d of the fit model
    SHARED_NAMES = ('sync_BK_9131B', 'sync_BK_9174B', 'sync_BK_9174B_2', 'sync_imc', 'sync_converted')

    def __init__(self, *args, **kwargs):
        # import Gui from QT designer file
        super().__init__(*args, **kwargs)

        # Get the current script's directory
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        # Get the parent directory by going one level up
        self.parent_dir = os.path.dirname(self.current_dir)

        # establish connection to global variables
        self._connect_global_variables()

        self.setupUi(self)

        # set up pyQT threadpool
        self.threadpool = QThreadPool()

        # define signals and slots
        self.action_save_default.triggered.connect(self.save_default)
        self.action_load_default.triggered.connect(self.load_default)
        self.dSB_timing.valueChanged.connect(self.set_timing)
        self.comboBox.currentIndexChanged.connect(self.select_output)
        self.Button_calibrate.clicked.connect(self.start_calibration)
        self.Button_load.clicked.connect(self.load_params)

        # define constants
        self.running = True   # True as long as program is running. Is set to False in close event
        self.convert = False  # True while ComboBox is set to flux density. If True, voltage data is converted to flux density
        self.timing = 0.1     # wait time in function "convert_Data"
        self.params = None    # fit parameters; None until a calibration was performed or loaded
        self.SB_all = [self.sB_nPoints, self.dSB_settling_time, self.dSB_timing]  # all spin boxes, used for saving/loading default values
        self.lines_all = [self.line_savePath, self.line_loadPath]  # all line edits, used for saving/loading default values

        # read default values from config and set values in gui
        self.load_default()

        # start standard threads only now: the thread reads self.running, self.convert,
        # self.timing and self.params, which must exist before it starts
        worker = Worker(self.convert_Data)
        self.threadpool.start(worker)

        # print program name
        print('AMR calibration')

    def _connect_global_variables(self, timeout=10.0):
        '''
        Connect to the global-variables manager and create the proxies self.sync_*.

        If no connection can be made (i.e. the server is not running), the server script
        is started as a child process and the connection is retried until "timeout"
        seconds have passed.
        '''
        for name in self.SHARED_NAMES:
            _GlobalVariablesManager.register(name)
        manager = _GlobalVariablesManager(address=('localhost', 5001), authkey=b'')
        try:
            manager.connect()
        except Exception:
            # open global variables ourselves, then connect to it
            self.global_vars = QProcess()
            self.global_vars.start(self.current_dir+"\\.venv\\Scripts\\python.exe", [self.current_dir+'\\global_variables_TF.py'])
            deadline = time.monotonic() + timeout
            while True:
                try:
                    manager.connect()
                    break
                except (OSError, EOFError):
                    # the freshly started server needs some time before it accepts connections
                    if time.monotonic() >= deadline:
                        raise
                    time.sleep(0.2)
            print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!')
        self.sync_BK_9131B = manager.sync_BK_9131B()
        self.sync_BK_9174B = manager.sync_BK_9174B()
        self.sync_BK_9174B_2 = manager.sync_BK_9174B_2()
        self.sync_imc = manager.sync_imc()
        self.sync_converted = manager.sync_converted()

    @pyqtSlot(list)
    def _show_current_point(self, data):
        '''Show the index of the current calibration point (runs in the GUI thread).'''
        self.lab_currentpoint.setText(f"{data[0]}")

    def _coils_off(self):
        '''Switch off the relais and reset the coil power supplies to zero.'''
        self.sync_BK_9131B.update({'OutputOn': [False, False, False]})  # turn off relais
        self.sync_BK_9174B.update({'setI': [0, 0]})
        update_single_entry(self.sync_BK_9174B_2, 'setI', 0, 0)
        self.sync_BK_9174B.update({'OutputOn': [False, False]})
        update_single_entry(self.sync_BK_9174B_2, 'OutputOn', 0, False)

    def calibrate(self, progress_callback):
        '''
        Set "number of points" random fields and record the FG and AMR response.
        With this data a 3 dimensional fit is performed for every sensor and the
        resulting parameters are stored in self.params and written to file.

        :param progress_callback: progress signal; emits [index of current point],
                                  and [0] once the fit is complete
        '''
        n_points = self.sB_nPoints.value()
        if n_points < self.N_FIT_PARAMS:
            # the fit has four free parameters and needs at least as many points
            print('calibration needs at least 4 points')
            return

        # list of coil currents for all fields (random fields). For preset fields use e.g.
        # I_all = [[1,0,0],[0,1,0],[0,0,1],[1,1,0],[1,0,1],[0,1,1],[1,1,1]]
        I_all = [[random.random()*0.2, random.random()*0.2, random.random()*0.2] for _ in range(n_points)]
        FG = []     # store fluxgate data for each field
        V_all = []  # store output voltage of AMR sensors for each field. Order: [15 times X sensors, 15 times Y, 15 times Z]
        aborted = False

        # get data for fitting
        try:
            for i, I in enumerate(I_all):
                if not self.running:  # window was closed, stop measuring
                    aborted = True
                    break
                if i == len(I_all)//2:  # turn on relais after half of the points
                    self.sync_BK_9131B.update({'setU': [5, 5, 5]})
                    self.sync_BK_9131B.update({'setI': [1, 1, 1]})
                    self.sync_BK_9131B.update({'OutputOn': [True, True, True]})
                self.sync_BK_9174B.update({'setI': I[0:2]})
                update_single_entry(self.sync_BK_9174B_2, 'setI', 0, I[2])
                self.sync_BK_9174B.update({'OutputOn': [True, True]})
                update_single_entry(self.sync_BK_9174B_2, 'OutputOn', 0, True)
                time.sleep(self.dSB_settling_time.value())
                # get data
                FG.append([v*100 for v in self.sync_imc.get('FG')])  # multiply times 100 to convert V to µT
                V_all.append(self.sync_imc.get('AMR_x') + self.sync_imc.get('AMR_y') + self.sync_imc.get('AMR_z'))
                # update "current point" label (via signal, widgets must only be touched by the GUI thread)
                progress_callback.emit([i])
        finally:
            # never leave the coils energised, also not if reading data failed
            self._coils_off()
        if aborted:
            return

        # perform fit
        FGn = np.array(FG).transpose()  # convert fluxgate data to numpy array so curve_fit can use it
        params = []  # collected locally, so convert_Data never sees a partially filled list
        for n in range(self.N_SENSORS):
            V = np.array([sub[n] for sub in V_all])  # voltage data of one sensor at all fields
            params.append(curve_fit(_linear_response, FGn, V))
        self.params = params

        # update "current point" label to 0 to indicate that the fit is complete
        progress_callback.emit([0])

        # store parameters in file
        self.save_params()

    def convert_Data(self, progress_callback):
        '''
        Is constantly running while self.running is True. Passes the AMR data from
        sync_imc to sync_converted. If self.convert is True and fit parameters are
        available, the voltage data is converted to flux density first; otherwise
        the raw voltage data is passed on.

        :param progress_callback: progress signal supplied by Worker (unused)
        '''
        n = self.N_GROUPS
        Bx = [0 for _ in range(n)]  # store converted AMR data
        By = [0 for _ in range(n)]
        Bz = [0 for _ in range(n)]
        while self.running:
            params = self.params  # snapshot, self.params may be replaced by another thread
            if self.convert and params is not None:
                start = timer()
                # get current voltage data from imc
                Vx = self.sync_imc.get('AMR_x')
                Vy = self.sync_imc.get('AMR_y')
                Vz = self.sync_imc.get('AMR_z')
                # solve system of linear equations according to Felix' diss p.49 eq. 3.3.
                for i, V in enumerate(zip(Vx, Vy, Vz)):
                    V = np.array(V)  # convert tuple from zip into np.array
                    group = (params[i][0], params[i+n][0], params[i+2*n][0])  # parameters of the x, y, z sensor in group number i
                    V0 = np.array([p[3] for p in group])    # offset voltages of all sensors in group number i
                    S = np.array([p[0:3] for p in group])   # sensitivity matrix of group number i
                    try:
                        B = np.linalg.solve(S, V-V0)  # solve the linear equation
                    except (np.linalg.LinAlgError, ValueError):
                        B = [0, 0, 0]
                        print(i)
                    Bx[i] = B[0]
                    By[i] = B[1]
                    Bz[i] = B[2]
                # write converted data in sync_converted
                self.sync_converted.update({'AMR_x': Bx})
                self.sync_converted.update({'AMR_y': By})
                self.sync_converted.update({'AMR_z': Bz})
                elapsed = timer() - start
                remaining = self.timing - elapsed
                if remaining >= 0:
                    time.sleep(remaining)
                else:
                    print(elapsed)  # conversion took longer than the requested timing
            else:
                # just pass imc data to sync_converted which is accessed by "Main.py" for plotting the AMR data
                Vx = self.sync_imc.get('AMR_x')
                Vy = self.sync_imc.get('AMR_y')
                Vz = self.sync_imc.get('AMR_z')
                self.sync_converted.update({'AMR_x': Vx})
                self.sync_converted.update({'AMR_y': Vy})
                self.sync_converted.update({'AMR_z': Vz})
                time.sleep(self.timing)

    def start_calibration(self):
        '''Is called when "perform calibration" is clicked. Starts a new thread with self.calibrate().'''
        worker_cal = Worker(self.calibrate)
        worker_cal.signals.progress.connect(self._show_current_point)
        self.threadpool.start(worker_cal)

    def select_output(self):
        '''
        Is triggered when the ComboBox is switched between "voltage" and "flux density".
        Sets self.convert to the corresponding value.
        '''
        self.convert = self.comboBox.currentText() != 'Voltage'

    def set_timing(self, t):
        '''Is triggered when timing is changed. Sets variable self.timing to the new value.'''
        self.timing = t

    def save_params(self):
        '''
        Save self.params (the fit parameters) to file. Is called at the end of self.calibrate.

        Two files are written into the directory given in the save path line: one with the
        complete fit results, and one ("_short") with only the parameters, without errors.
        '''
        extension = "\\" + time.strftime("%Y-%m-%d_%H-%M-%S", time.gmtime(time.time()))+".txt"
        path = self.line_savePath.text() + extension
        path_short = path[:-4] + "_short.txt"
        print(path)
        # create list of only the parameters, without errors
        params_short = [L[0] for L in self.params]
        try:
            with open(path, 'w') as file:
                for i in self.params:
                    file.write(f"{i}\t")
            with open(path_short, 'w') as file_short:
                for i in params_short:
                    file_short.write(f"{i}\t")
        except OSError:
            print('Invalid path')

    def load_params(self):
        '''
        Load the fit parameters from the file given in the load path line and set
        self.params to the new values. Is called when the load button is pushed.

        The file is expected in the "_short" format written by save_params: one row of
        tab separated arrays "[a b c d]". If the file cannot be read or does not contain
        45 sets of 4 parameters, self.params is left unchanged.
        '''
        path = self.line_loadPath.text()
        try:
            with open(path, 'r') as file:
                rows = list(csv.reader(file, delimiter='\t'))
            # recreate arrays
            coeffs = []
            for r in rows[0][0:self.N_SENSORS]:  # last entry is '' at index 45
                # strip the brackets and split at whitespace; numpy pads with a varying number
                # of spaces, so fixed character offsets would cut off signs or digits
                coeff_list = [float(x) for x in r.strip().strip('[]').split()]
                if len(coeff_list) != self.N_FIT_PARAMS:
                    raise ValueError(f"expected {self.N_FIT_PARAMS} parameters, got {len(coeff_list)}")
                # append a list with a dummy zero, so the data structure is as it was when self.params is created by the program
                coeffs.append([np.array(coeff_list), 0])
            if len(coeffs) != self.N_SENSORS:
                raise ValueError(f"expected {self.N_SENSORS} parameter sets, got {len(coeffs)}")
        except (OSError, IndexError, ValueError):
            print('could not load data')
            return
        self.params = coeffs
        print(self.params)

    def save_default(self):
        '''
        Save current set values to txt file in subdirectory configs. Saves values from
        all spin boxes and text lines. Overwrites old values in config file.
        '''
        # the path is built from the script location, so it is independent from where the program is started
        path = self.current_dir+'\\configs\\AMR_config.txt'
        entries = [f"{SB.value()}" for SB in self.SB_all] + [l.text() for l in self.lines_all]
        try:
            with open(path, 'w') as file:
                for entry in entries:
                    file.write(entry+'\t')
                file.write('\n')
        except OSError:
            print('could not save config')

    def load_default(self):
        '''
        Read default values from config file in subdirectory configs and set the values
        in gui. (If no config file exists, it does nothing.)
        '''
        # the path is built from the script location, so it is independent from where the program is started
        path = self.current_dir+'\\configs\\AMR_config.txt'
        try:  # exit function if config file does not exist or is unusable
            vals = import_txt.read_raw(path)
            for l, v in zip(self.lines_all, vals[0][-len(self.lines_all):]):
                l.setText(v)
        except Exception:
            return
        for SB, v in zip(self.SB_all, vals[0]):
            if isinstance(SB, QDoubleSpinBox):
                v = float(v)  # convert string in txt to float, so number can be set in dSB
            else:
                v = int(v)  # convert string in txt to int, so number can be set in SB
            SB.setValue(v)

    def closeEvent(self, event):
        '''When window is closed self.running is set to False, so all threads stop.'''
        self.running = False
        time.sleep(1)  # give the threads time to notice the flag
        event.accept()


app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()