craft-software/Legacy/TF_Control/AMR_calibration.py
2025-07-04 15:52:40 +02:00

377 lines
18 KiB
Python

from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *
import multiprocessing
import multiprocessing.managers
import time
from timeit import default_timer as timer
from datetime import datetime
import csv
import random
import traceback, sys, os
import numpy as np
from scipy.optimize import curve_fit
from design_files.AMR_calibration_design import Ui_MainWindow
# Folder that contains this script, resolved to an absolute path.
current_dir = os.path.split(os.path.abspath(__file__))[0]
# One level above the script folder.
parent_dir = os.path.split(current_dir)[0]
# Make packages that live next to the script folder importable.
sys.path.append(parent_dir)
from scripts import import_txt
class WorkerSignals(QObject):
    """Signal container used by ``Worker`` to report back from its thread.

    Signals:
        finished: carries no data.
        error:    tuple ``(exctype, value, traceback.format_exc())``.
        result:   arbitrary object returned by the processed callback.
        progress: list payload (the signal is declared with type ``list``).
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(list)
class Worker(QRunnable):
    """Runnable that executes a callback and reports through ``WorkerSignals``.

    :param fn: callable to execute when the runnable is run
    :type fn: function
    :param args: positional arguments forwarded to ``fn``
    :param kwargs: keyword arguments forwarded to ``fn``; the keyword
        ``progress_callback`` is always set to this worker's ``progress``
        signal, so ``fn`` must accept it
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.signals = WorkerSignals()
        # Keep the call description around until run() is invoked.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        # The callback receives the progress signal as a keyword argument.
        self.kwargs['progress_callback'] = self.signals.progress

    @pyqtSlot()
    def run(self):
        """Call ``fn`` with the stored arguments and emit the outcome.

        On success ``result`` is emitted with the return value. On any
        exception the traceback is printed and ``error`` is emitted with
        ``(exctype, value, formatted_traceback)``. ``finished`` is emitted in
        both cases.
        """
        try:
            outcome = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            traceback.print_exc()
            self.signals.error.emit((type(exc), exc, traceback.format_exc()))
        else:
            self.signals.result.emit(outcome)
        finally:
            self.signals.finished.emit()
def get_float(Qline, default=0):
    """Return the text of a line-edit-like object converted to ``float``.

    :param Qline: object with a ``text()`` method (e.g. a ``QLineEdit``)
    :param default: value returned when the text is empty or is not a valid
        number (0 if not specified)
    :return: the parsed float, or ``default``
    """
    try:
        return float(Qline.text())
    except (TypeError, ValueError):
        # Only conversion failures fall back to the default; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        return default
def update_single_entry(dict, name, ind, val):
    """Replace one element of a sequence stored in a dict-like object.

    The sequence stored under ``name`` is fetched with ``get``, its element at
    index ``ind`` is set to ``val`` and the whole sequence is written back
    with ``update``.

    :param dict: dict-like object offering ``get`` and ``update``
    :param name: key of the stored sequence (formatted to ``str`` before use)
    :param ind: index of the element to replace
    :param val: new value for that element
    """
    key = f"{name}"
    entry = dict.get(key)
    entry[ind] = val
    dict.update({key: entry})
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main window of the AMR calibration tool.

    The window connects to the shared "global variables" manager, keeps a
    background thread running that copies (or converts) the AMR readings into
    ``sync_converted``, and offers a calibration routine that fits a linear
    response of every AMR channel to the fluxgate reading.
    """

    # Names of the shared objects served by the global-variables manager.
    _SYNC_NAMES = ('sync_BK_9131B', 'sync_BK_9174B', 'sync_BK_9174B_2',
                   'sync_imc', 'sync_converted')
    # The conversion indexes the parameters as i, i+15, i+30 (x, y, z channel
    # of sensor group i), i.e. 15 groups and 45 fitted channels.
    _SENSORS_PER_AXIS = 15
    _N_SENSORS = 3 * _SENSORS_PER_AXIS
    # Number of coefficients of the fit function (a, b, c, d).
    _N_FIT_PARAMS = 4
    # Seconds to wait for a self-started global-variables process.
    _CONNECT_TIMEOUT = 10.0

    def __init__(self, *args, **kwargs):
        """Build the GUI, connect to the shared variables and start the
        background conversion thread."""
        super().__init__(*args, **kwargs)

        # Directory of this script and its parent directory.
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        self.parent_dir = os.path.dirname(self.current_dir)

        # State read by the worker threads. It is defined before any thread
        # is started so the threads never see a half-initialised object.
        self.running = True   # True as long as the program runs; cleared in closeEvent
        self.convert = False  # True while the combo box is set to flux density
        self.timing = 0.1     # cycle time of convert_Data in seconds
        self.params = []      # fit results; stays empty until calibrated or loaded

        # establish connection to global variables
        self._connect_global_vars()

        # import Gui from QT designer file
        self.setupUi(self)

        # lists of all spin boxes / line edits, used for saving and loading defaults
        self.SB_all = [self.sB_nPoints, self.dSB_settling_time, self.dSB_timing]
        self.lines_all = [self.line_savePath, self.line_loadPath]

        # define signals and slots
        self.action_save_default.triggered.connect(self.save_default)
        self.action_load_default.triggered.connect(self.load_default)
        self.dSB_timing.valueChanged.connect(self.set_timing)
        self.comboBox.currentIndexChanged.connect(self.select_output)
        self.Button_calibrate.clicked.connect(self.start_calibration)
        self.Button_load.clicked.connect(self.load_params)

        # read default values from config and set values in gui
        self.load_default()

        # set up the thread pool and start the permanent conversion thread
        self.threadpool = QThreadPool()
        worker = Worker(self.convert_Data)
        self.threadpool.start(worker)

        # print program name
        print('AMR calibration')

    def _connect_global_vars(self):
        """Connect to the manager on localhost:5001 and fetch the shared objects.

        If no manager can be reached, ``global_variables_TF.py`` is started as
        a child process and the connection is retried until it succeeds or
        ``_CONNECT_TIMEOUT`` seconds have passed.
        """
        manager = multiprocessing.managers.BaseManager(address=('localhost', 5001), authkey=b'')
        started_here = False
        try:
            manager.connect()
        except (OSError, EOFError):
            # Nothing is listening: start global variables ourselves.
            self.global_vars = QProcess()
            self.global_vars.start(
                os.path.join(self.current_dir, '.venv', 'Scripts', 'python.exe'),
                [os.path.join(self.current_dir, 'global_variables_TF.py')])
            started_here = True
            # The child needs some time before it accepts connections.
            deadline = time.monotonic() + self._CONNECT_TIMEOUT
            while True:
                try:
                    manager.connect()
                    break
                except (OSError, EOFError):
                    if time.monotonic() >= deadline:
                        raise
                    time.sleep(0.2)

        for name in self._SYNC_NAMES:
            manager.register(name)
            setattr(self, name, getattr(manager, name)())

        if started_here:
            print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!')

    @staticmethod
    def _linear_response(B, a, b, c, d):
        """Fit function (Dissertation Felix p.49 eq.3.2): a*Bx + b*By + c*Bz + d."""
        Bx, By, Bz = B
        return a*Bx + b*By + c*Bz + d

    def _reset_supplies(self):
        """Turn the relais off and set the coil supplies to zero / output off."""
        self.sync_BK_9131B.update({'OutputOn': [False, False, False]})  # turn off relais
        self.sync_BK_9174B.update({'setI': [0, 0]})
        update_single_entry(self.sync_BK_9174B_2, 'setI', 0, 0)
        self.sync_BK_9174B.update({'OutputOn': [False, False]})
        update_single_entry(self.sync_BK_9174B_2, 'OutputOn', 0, False)

    def _show_current_point(self, data):
        """Slot for the calibration progress signal: show ``data[0]`` in the label."""
        self.lab_currentpoint.setText(f"{data[0]}")

    def calibrate(self, progress_callback, n_points=None, settling_time=None):
        """Apply random coil currents, record fluxgate and AMR data and fit.

        For every point three random currents in [0, 0.2) are set, the
        settling time is awaited and the fluxgate ('FG') and AMR readings are
        taken from ``sync_imc``. After half of the points the BK 9131B outputs
        (relais) are switched on. Afterwards every AMR channel is fitted with
        ``_linear_response`` and the result is stored in ``self.params`` and
        written to file via ``save_params``.

        :param progress_callback: signal (or None) that receives ``[index]``
            after every recorded point and ``[0]`` once the fit is complete
        :param n_points: number of points; read from the spin box if None
        :param settling_time: wait time per point in seconds; read from the
            spin box if None
        :raises ValueError: if fewer points than fit parameters are requested
        """
        if n_points is None:
            n_points = self.sB_nPoints.value()
        if settling_time is None:
            settling_time = self.dSB_settling_time.value()
        if n_points < self._N_FIT_PARAMS:
            # curve_fit cannot determine 4 parameters from fewer data points;
            # fail before any hardware is driven.
            raise ValueError(
                f"calibration needs at least {self._N_FIT_PARAMS} points, got {n_points}")

        def report(point):
            # Widgets must not be touched from this worker thread, so the
            # current point is handed to the GUI through the signal.
            if progress_callback is not None:
                progress_callback.emit([point])

        I_all = [[random.random()*0.2 for _ in range(3)] for _ in range(n_points)]
        FG = []     # fluxgate data for each field
        V_all = []  # AMR voltages for each field, ordered [all x, all y, all z]
        completed = False
        try:
            for i, I in enumerate(I_all):
                if not self.running:  # window was closed: stop measuring
                    break
                if i == len(I_all)//2:  # turn on relais after half of the points
                    # NOTE(review): a previously disabled preset-field variant
                    # used the keys 'SetU'/'SetI' for this supply; confirm
                    # which spelling global variables expects.
                    self.sync_BK_9131B.update({'setU': [5, 5, 5]})
                    self.sync_BK_9131B.update({'setI': [1, 1, 1]})
                    self.sync_BK_9131B.update({'OutputOn': [True, True, True]})
                self.sync_BK_9174B.update({'setI': I[0:2]})
                update_single_entry(self.sync_BK_9174B_2, 'setI', 0, I[2])
                self.sync_BK_9174B.update({'OutputOn': [True, True]})
                update_single_entry(self.sync_BK_9174B_2, 'OutputOn', 0, True)
                time.sleep(settling_time)
                # get data
                FG.append([v*100 for v in self.sync_imc.get('FG')])  # times 100 to convert V to µT
                V_all.append(self.sync_imc.get('AMR_x') + self.sync_imc.get('AMR_y') + self.sync_imc.get('AMR_z'))
                report(i)
            else:
                completed = True
        finally:
            # Always leave the supplies switched off, also after an error.
            self._reset_supplies()
        if not completed:
            return

        # perform fit
        FGn = np.array(FG).transpose()  # shape expected by curve_fit
        params = []
        for n in range(self._N_SENSORS):
            V = np.array([row[n] for row in V_all])  # one channel at all fields
            params.append(curve_fit(self._linear_response, FGn, V))
        # Publish the complete set at once; convert_Data may be reading it.
        self.params = params
        report(0)  # 0 indicates that the fit is complete
        # store parameters in file
        self.save_params()

    def convert_Data(self, progress_callback):
        """Continuously publish the AMR data in ``sync_converted``.

        Runs until ``self.running`` is cleared. While ``self.convert`` is True
        and a complete parameter set is available, the voltages of each
        sensor group are converted to flux density by solving the linear
        system of Felix' diss p.49 eq. 3.3; otherwise the voltages from
        ``sync_imc`` are passed through unchanged.

        :param progress_callback: supplied by ``Worker``; not used
        """
        n = self._SENSORS_PER_AXIS
        Bx = [0] * n  # converted AMR data
        By = [0] * n
        Bz = [0] * n
        while self.running:
            start = timer()
            Vx = self.sync_imc.get('AMR_x')
            Vy = self.sync_imc.get('AMR_y')
            Vz = self.sync_imc.get('AMR_z')
            params = self.params  # snapshot; may be replaced by another thread
            if not (self.convert and len(params) >= self._N_SENSORS):
                # just pass imc data to sync_converted
                self.sync_converted.update({'AMR_x': Vx, 'AMR_y': Vy, 'AMR_z': Vz})
                time.sleep(self.timing)
                continue

            for i, V in enumerate(zip(Vx, Vy, Vz)):
                # fit coefficients of the x, y and z channel of group i
                group = (params[i][0], params[i + n][0], params[i + 2*n][0])
                V0 = np.array([coeff[3] for coeff in group])   # offset voltages
                S = np.array([coeff[0:3] for coeff in group])  # sensitivity matrix
                try:
                    B = np.linalg.solve(S, np.array(V) - V0)
                except (np.linalg.LinAlgError, ValueError):
                    B = [0, 0, 0]
                    print(i)
                Bx[i], By[i], Bz[i] = B[0], B[1], B[2]
            # write converted data in sync_converted
            self.sync_converted.update({'AMR_x': Bx, 'AMR_y': By, 'AMR_z': Bz})

            elapsed = timer() - start
            remaining = self.timing - elapsed
            if remaining >= 0:
                time.sleep(remaining)
            else:
                # conversion took longer than the cycle time
                print(elapsed)

    def start_calibration(self):
        """Start ``calibrate`` in a new thread (slot of the calibrate button)."""
        # Read the widgets here, in the GUI thread, and hand the values over.
        worker_cal = Worker(self.calibrate,
                            n_points=self.sB_nPoints.value(),
                            settling_time=self.dSB_settling_time.value())
        worker_cal.signals.progress.connect(self._show_current_point)
        self.threadpool.start(worker_cal)

    def select_output(self):
        """Set ``self.convert`` according to the combo box ('Voltage' or not)."""
        self.convert = self.comboBox.currentText() != 'Voltage'
        if self.convert and len(self.params) < self._N_SENSORS:
            print('No calibration parameters available. Voltages are passed through until a calibration is performed or loaded')

    def set_timing(self, t):
        """Store the new cycle time ``t`` (slot of the timing spin box)."""
        self.timing = t

    def save_params(self):
        """Write ``self.params`` to a time-stamped file in the save path.

        Two files are written: ``<UTC timestamp>.txt`` with the complete fit
        results and ``<UTC timestamp>_short.txt`` with the fit parameters
        only. Entries are separated by tabs. Prints 'Invalid path' if a file
        cannot be written.
        """
        stamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.gmtime(time.time()))
        path = os.path.join(self.line_savePath.text(), stamp + ".txt")
        path_short = path[:-4] + "_short.txt"
        print(path)
        try:
            with open(path, 'w') as file:
                for entry in self.params:
                    file.write(f"{entry}\t")
            with open(path_short, 'w') as file_short:
                for entry in self.params:
                    file_short.write(f"{entry[0]}\t")  # parameters without errors
        except OSError:
            print('Invalid path')

    def load_params(self):
        """Load fit parameters from the file in the load path into ``self.params``.

        Expects the format of the "_short" file written by ``save_params``:
        one line of tab separated entries, each a bracketed list of four
        numbers separated by whitespace. Prints 'could not load data' and
        leaves ``self.params`` untouched if the file cannot be read or does
        not contain a complete parameter set.
        """
        path = self.line_loadPath.text()
        try:
            with open(path, 'r', newline='') as file:
                first_row = next(csv.reader(file, delimiter='\t'))
            coeffs = []
            for cell in first_row[:self._N_SENSORS]:  # a trailing '' entry may follow
                # Strip the brackets themselves instead of a fixed number of
                # characters: the array text may start with "[ " or with "[".
                numbers = [float(x) for x in cell.strip().strip('[]').split()]
                if len(numbers) != self._N_FIT_PARAMS:
                    raise ValueError('unexpected number of coefficients')
                # dummy zero keeps the structure produced by calibrate
                coeffs.append([np.array(numbers), 0])
            if len(coeffs) != self._N_SENSORS:
                raise ValueError('unexpected number of sensors')
        except (OSError, StopIteration, ValueError):
            print('could not load data')
            return
        self.params = coeffs
        print(self.params)

    def save_default(self):
        """Save all spin box values and line texts to configs/AMR_config.txt.

        The values are written as one tab separated line and overwrite the
        old config file. The path is built from the script location so it
        does not depend on where the program is started from.
        """
        path = os.path.join(self.current_dir, 'configs', 'AMR_config.txt')
        fields = [f"{SB.value()}" for SB in self.SB_all]
        fields += [l.text() for l in self.lines_all]
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, 'w') as file:
                # every field is followed by a tab, as in earlier config files
                file.write(''.join(field + '\t' for field in fields) + '\n')
        except OSError:
            print('Invalid path')

    def load_default(self):
        """Read configs/AMR_config.txt and set the values in the GUI.

        Does nothing if the config file cannot be read. Values that cannot be
        converted for their spin box are skipped.
        """
        path = os.path.join(self.current_dir, 'configs', 'AMR_config.txt')
        try:
            row = import_txt.read_raw(path)[0]
        except Exception:
            # read_raw is a project helper whose failure modes are not visible
            # here; a missing or unreadable config simply keeps the GUI defaults.
            return
        for SB, v in zip(self.SB_all, row):
            try:
                # double spin boxes take floats, plain spin boxes take ints
                SB.setValue(float(v) if isinstance(SB, QDoubleSpinBox) else int(v))
            except (TypeError, ValueError):
                continue
        # Take the text fields from directly behind the spin box values, so a
        # possible empty trailing column (save_default ends the row with a
        # tab) cannot shift them.
        first = len(self.SB_all)
        for l, v in zip(self.lines_all, row[first:first + len(self.lines_all)]):
            l.setText(str(v))

    def closeEvent(self, event):
        """Stop the worker threads when the window is closed."""
        self.running = False
        # give the threads up to one second to leave their loops
        self.threadpool.waitForDone(1000)
        event.accept()
# Script entry point: create the Qt application, build and show the main
# window, then run the Qt event loop until the application quits.
app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()