craft-software/Legacy/TF_Control/B_Field_Compensation_control.py
2025-07-04 15:52:40 +02:00

599 lines
29 KiB
Python

from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *
import multiprocessing
import multiprocessing.managers
import time
import traceback,sys,os
import numpy as np
import pyqtgraph as pg
from scripts import import_txt
# Get the current script's directory
current_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory by going one level up
parent_dir = os.path.dirname(current_dir)
# Add the parent directory to sys.path
sys.path.append(parent_dir)
from design_files.B_Field_Compensation_design import Ui_MainWindow
class WorkerSignals(QObject):
'''
Defines the signals available from a running worker thread.
Supported signals are:
finished: No data
error: tuple (exctype, value, traceback.format_exc() )
result: object data returned from processing, anything
progress: int indicating % progress
'''
finished = pyqtSignal()
error = pyqtSignal(tuple)
result = pyqtSignal(object)
progress = pyqtSignal(list)
class Worker(QRunnable):
'''
Worker thread
Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
:param callback: The function callback to run on this worker thread. Supplied args and
kwargs will be passed through to the runner.
:type callback: function
:param args: Arguments to pass to the callback function
:param kwargs: Keywords to pass to the callback function
'''
def __init__(self, fn, *args, **kwargs):
super(Worker, self).__init__()
# Store constructor arguments (re-used for processing)
self.fn = fn
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
# Add the callback to our kwargs
self.kwargs['progress_callback'] = self.signals.progress
@pyqtSlot()
def run(self):
'''
Initialise the runner function with passed args, kwargs.
'''
# Retrieve args/kwargs here; and fire processing using them
try:
result = self.fn(*self.args, **self.kwargs)
except:
traceback.print_exc()
exctype, value = sys.exc_info()[:2]
self.signals.error.emit((exctype, value, traceback.format_exc()))
else:
self.signals.result.emit(result) # Return the result of the processing
finally:
self.signals.finished.emit() # Done
def get_float(Qline,default = 0): #gets value from QLineEdit and converts it to float. If text is empty or cannot be converted, it returns "default" which is 0, if not specified
try:
out = float(Qline.text())
except:
out = default
return(out)
def update_single_entry(dict,name,ind,val): #updates a single value in global vars. To do so it gets the current value of "dict('name')", replaces "val" at indec "ind" and sends this back
data = dict.get(f"{name}") #get data
data[ind] = val #replace entry
dict.update({f"{name}":data}) #send data back
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self, *args, **kwargs):
# Get the current script's directory
self.current_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory by going one level up
self.parent_dir = os.path.dirname(current_dir)
#establish connection to global variables
try: #try to connect to global variables
self.manager = multiprocessing.managers.BaseManager(address=('localhost',5001), authkey=b'')
self.manager.connect()
self.manager.register('syncdict')
self.manager.register('sync_imc')
self.manager.register('sync_BK_9174B')
self.manager.register('sync_BK_9174B_2')
self.manager.register('sync_BK_9131B')
self.syncdict = self.manager.syncdict()
self.sync_imc = self.manager.sync_imc()
self.sync_BK_9174B = self.manager.sync_BK_9174B()
self.sync_BK_9174B_2 = self.manager.sync_BK_9174B_2()
self.sync_BK_9131B = self.manager.sync_BK_9131B()
except: #open global variables, if no connection can be made (i.e. it is not running). Then connect to it
# subprocess.call(['D:\\Python instrument drivers\\env\\Scripts\\python.exe', 'D:\\Python instrument drivers\\StandAlones\\global_variables.py'])
self.global_vars = QProcess()
self.global_vars.start(self.current_dir+"\\.venv\\Scripts\\python.exe", [self.current_dir+'\\global_variables_TF.py'])
self.manager.connect()
self.manager.register('syncdict')
self.manager.register('sync_imc')
self.manager.register('sync_BK_9174B')
self.manager.register('sync_BK_9174B_2')
self.manager.register('sync_BK_9131B')
self.syncdict = self.manager.syncdict()
self.sync_imc = self.manager.sync_imc()
self.sync_BK_9174B = self.manager.sync_BK_9174B()
self.sync_BK_9174B_2 = self.manager.sync_BK_9174B_2()
self.sync_BK_9131B = self.manager.sync_BK_9131B()
print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!')
#fill in variables, if they are not defined in global variables
self.syncdict.update({'B_set':[0,0,0], 'Start_Compensation':False, 'Status_Compensation': 0, 'Temperature_Caution': False})
# Status_Compensation: 0: No B-field compensation performed
# 1: Compensation is running
# 2: Compensation finished and succeeded
# 3: Compensation failed
#import Gui from QT designer file
super(MainWindow, self).__init__(*args, **kwargs)
self.setupUi(self)
#setup plot
self.graphWidget_x.setBackground('w')
self.graphWidget_x.setTitle("Compensation on x-axis")
self.graphWidget_x.setLabel('left', 'B_x (µT)')
self.graphWidget_x.setLabel('bottom', '#iterations')
self.graphWidget_y.setBackground('w')
self.graphWidget_y.setTitle("Compensation on y-axis")
self.graphWidget_y.setLabel('left', 'B_y (µT)')
self.graphWidget_y.setLabel('bottom', '#iterations')
self.graphWidget_z.setBackground('w')
self.graphWidget_z.setTitle("Compensation on z-axis")
self.graphWidget_z.setLabel('left', 'B_z (µT)')
self.graphWidget_z.setLabel('bottom', '#iterations')
pen1 = pg.mkPen(color=(255, 0, 0), width=2)
pen2 = pg.mkPen(color=(0, 0, 255), width=2)
self.plot_x_1 = self.graphWidget_x.plot([0,1],[1,0],pen = pen1, name = 'B_x_set')
self.plot_x_2 = self.graphWidget_x.plot([0,1],[1,0],pen = pen2, name = 'B_x')
self.graphWidget_x.addLegend()
self.plot_y_1 = self.graphWidget_y.plot([0,1],[1,0],pen = pen1, name = 'B_y_set')
self.plot_y_2 = self.graphWidget_y.plot([0,1],[1,0],pen = pen2, name = 'B_y')
self.graphWidget_y.addLegend()
self.plot_z_1 = self.graphWidget_z.plot([0,1],[1,0],pen = pen1, name = 'B_z_set')
self.plot_z_2 = self.graphWidget_z.plot([0,1],[1,0],pen = pen2, name = 'B_z')
self.graphWidget_z.addLegend()
#set up pyQT threadpool
self.threadpool = QThreadPool()
#define and standard threads.
# worker_save = Worker(self.save)
# self.threadpool.start(worker_save)
#define signals and slots
self.actionSet_default.triggered.connect(self.set_default)
self.actionReset_default.triggered.connect(self.read_default)
self.line_B_x_set.editingFinished.connect(self.set_B_set)
self.line_B_y_set.editingFinished.connect(self.set_B_set)
self.line_B_z_set.editingFinished.connect(self.set_B_set)
self.Button_Set_Field.clicked.connect(self.set_start_compensation)
self.checkBox_y2_coil.stateChanged.connect(self.set_y2_coil_Enabled)
self.comboBox_Temp_Sensor.currentIndexChanged.connect(self.set_temp_sensor)
self.line_Max_Temp.editingFinished.connect(self.set_max_temperature)
self.checkBox_Turn_Off_Coils.stateChanged.connect(self.set_Turn_Off_Coils)
#define constants
self.running = True
self.B_set = [0,0,0] #list containing x,y,z set values
self.Start_Compensation = False # boolean, which has to be set true in order to start compensation
self.Coil_Constant = [0,0,0] #Coil constant in muT/A
self.set_old = [0,0,1] #variable to save the 'old' set values to compare them to the global variables. It differs from set_new in the first iteration. This ensures that new parameters are send to the device
self.set_new = [0,0,0] #variable to save the new set values to compare them to the old ones
self.lines_config_float = [self.line_B_x_set,self.line_B_y_set, self.line_B_z_set, self.line_Max_Difference, self.line_Max_iterations, self.line_B_equil_time, self.line_HHC_param_x, self.line_HHC_param_y, self.line_HHC_param_y2, self.line_HHC_param_z, self.line_Max_Temp]#is used for config file
self.lines_config_strings = []#is used for config file
self.checkboxes_config = [self.checkBox_y2_coil, self.checkBox_Turn_Off_Coils]#is used for config file
self.combobox_config = [self.comboBox_Temp_Sensor]#is used for config file
self.y2_coil_Enabled = False # Boolean if second y-coil pair should be used
self.temp_sensor = 0 # Selected temperature sensor Channel
self.max_temperature = 320 # Maximum temperature, at which coils should be turned off
self.Turn_Off_Coils = False #Boolean if Coils should be turned off for temperatures above self.max_temperature
self.LS336_never_connected_before = True
#read default values from config and set them in gui
self.read_default()
#write values from gui to global variables.
self.set_B_set()
self.set_max_temperature()
# Start standard thread
self.worker = Worker(self.update_all)
self.threadpool.start(self.worker)
# Start temperature monitoring thread
worker_temperature = Worker(self.temperature_thread)
self.threadpool.start(worker_temperature)
def update_all(self, progress_callback):
#Checks if global variables changed from last iteration.
while self.running == True:
for i,n in enumerate(['B_set', 'Start_Compensation']): #get new set values from global variables and compare to old ones.
self.set_new[i] = self.syncdict.get(n)
if self.set_new != self.set_old: #if a button is clicked or global variables are changed the program checks which setting has been changed.
if self.set_new[0] != self.set_old[0]: #if B_set is changed new B_set parameters are saved locally
self.B_set = self.set_new[0]
if self.set_new[1] != self.set_old[1]: #if Start_Compensation is changed new Start_Compensation is saved locally
self.Start_Compensation = self.set_new[1]
self.update_setValues(self.set_new) #Change GUI text lines
self.set_old = self.set_new[:] #List needs to be sliced so that only values are taken and not just a pointer is created
if self.Start_Compensation == True: #Start Field Compensation
# Update compensation status in GUI
self.line_iteration_step.setText('Starting compensation.')
self.line_iteration_step.setStyleSheet("background-color: white")
# Update global variable
self.syncdict.update({'Status_Compensation':1})
i = 0 #iteration number
I = [0,0,0] #HHC Current
# Pull coil constants from GUI lines
self.Coil_Constant = [get_float(self.line_HHC_param_x), get_float(self.line_HHC_param_y), get_float(self.line_HHC_param_z)]
#set HHC current to 0A
self.sync_BK_9174B.update({'setI':[0, 0]})
if self.checkBox_y2_coil.isChecked() == True:
self.sync_BK_9174B_2.update({'setI':[0, 0]})
else:
update_single_entry(self.sync_BK_9174B_2, "setI", 0, 0)
#set max. voltage of HHC to 70V (in order to get constant current mode)
self.sync_BK_9174B.update({'setU':[70, 70]})
if self.checkBox_y2_coil.isChecked() == True:
self.sync_BK_9174B_2.update({'setU':[70, 70]})
else:
update_single_entry(self.sync_BK_9174B_2, "setU", 0, 70)
#turn on power supplies for HHC
self.sync_BK_9174B.update({'OutputOn':[True, True]})
if self.checkBox_y2_coil.isChecked() == True:
self.sync_BK_9174B_2.update({'OutputOn':[True, True]})
else:
update_single_entry(self.sync_BK_9174B_2, "OutputOn", 0, True)
#Turn off x,y,z-coil relais
self.sync_BK_9131B.update({'OutputOn':[False, False, False]})
#Set current and voltage of x,y,z-coil relais
self.sync_BK_9131B.update({'setI':[1,1,1]})
self.sync_BK_9131B.update({'setU':[5,5,5]})
time.sleep(1)
B_measured = self.sync_imc.get('FG') # Pull fluxgate data from global variables
for x in [0,1,2]:
B_measured[x] = B_measured[x]*100 #Conversion to muT
# Create array with all measured B-values for plotting
B_array = np.empty(shape=(0,3))
B_array = np.append(B_array,[B_measured],axis=0) # Save first measurement point already taken
# Uptdate plots
self.plot_x_1.setData([0,1],[self.B_set[0],self.B_set[0]])
self.plot_x_2.setData(np.arange(1),B_array[:,0])
self.plot_y_1.setData([0,1],[self.B_set[1],self.B_set[1]])
self.plot_y_2.setData(np.arange(1),B_array[:,1])
self.plot_z_1.setData([0,1],[self.B_set[2],self.B_set[2]])
self.plot_z_2.setData(np.arange(1),B_array[:,2])
# Calculate the difference between the set B-field and the measured values
B_Delta_x = (self.B_set[0] - B_measured[0])
B_Delta_y = (self.B_set[1] - B_measured[1])
B_Delta_z = (self.B_set[2] - B_measured[2])
# Create lists containing old and new coil-flip status
FlipStatus_new = [False, False, False]
FlipStatus_old = [False, False, False]
# Iterations are performed until the maximum number of iterations is reached or the difference is smaller than the given one
while i <= get_float(self.line_Max_iterations) and (np.abs(B_Delta_x) > get_float(self.line_Max_Difference) or np.abs(B_Delta_y) > get_float(self.line_Max_Difference) or np.abs(B_Delta_z) > get_float(self.line_Max_Difference)):
for k in [0,1,2]: # iterate on all three axes
I[k] = I[k] + (self.B_set[k] - B_measured[k])*(1/self.Coil_Constant[k]) # calculate new current set value
if I[k] < 0: # if current is negative, change the new flip status
FlipStatus_new[k] = True
else:
FlipStatus_new[k] = False
if self.checkBox_y2_coil.isChecked() == True and I[k] > 3:
I[k] = 3
if self.checkBox_y2_coil.isChecked() == False and I[k] >1.5:
I[k] = 1.5
print(I)
print(FlipStatus_new)
print(f"old{FlipStatus_old}")
if FlipStatus_new != FlipStatus_old: # if one of the flip status has changed compared to the old status, switch on the relais for flipping the current direction
self.sync_BK_9131B.update({'OutputOn':FlipStatus_new})
print("Switch")
time.sleep(0.5)
# set old flip status to the new one
FlipStatus_old = FlipStatus_new.copy()
for k in [0,1,2]: # iterate on all three axes and set the current of the power supplies
if k==0: # in case of x-direction
update_single_entry(self.sync_BK_9174B, "setI", k, np.abs(I[k]))
print(self.y2_coil_Enabled)
print(I[1])
if k==1: # in case of y-direction
if self.y2_coil_Enabled == True:
I_set = np.abs(I[k])/2
if I_set > 1.5:
I_set = 1.5
update_single_entry(self.sync_BK_9174B, "setI", 1, I_set)
update_single_entry(self.sync_BK_9174B_2, "setI", 1, I_set)
print("Klappt")
else: #Using just one power supply is enough
I_set = np.abs(I[k])
if I_set > 1.5:
I_set = 1.5
update_single_entry(self.sync_BK_9174B, "setI", 1, I_set)
if k==2: # in case of z-direction use BK_9174B_2
update_single_entry(self.sync_BK_9174B_2, "setI", 0, np.abs(I[k]))
# else: # in case of x- or y-direction use BK_9174B
# update_single_entry(self.sync_BK_9174B, "setI", k, np.abs(I[k]))
time.sleep(get_float(self.line_B_equil_time)) # wait for a given time period
B_measured = self.sync_imc.get('FG') # Pull fluxgate data from global variables
for x in [0,1,2]:
B_measured[x] = B_measured[x]*100 #Conversion to muT
# Calculate the difference between the set B-field and the measured values
B_Delta_x = (self.B_set[0] - B_measured[0])
B_Delta_y = (self.B_set[1] - B_measured[1])
B_Delta_z = (self.B_set[2] - B_measured[2])
B_array = np.append(B_array,[B_measured],axis=0) # append measured field for plotting
# plot B-fields of all three axes
self.plot_x_1.setData([0,i+2],[self.B_set[0],self.B_set[0]])
self.plot_x_2.setData(np.arange(i+2),B_array[:,0])
self.plot_y_1.setData([0,i+2],[self.B_set[1],self.B_set[1]])
self.plot_y_2.setData(np.arange(i+2),B_array[:,1])
self.plot_z_1.setData([0,i+2],[self.B_set[2],self.B_set[2]])
self.plot_z_2.setData(np.arange(i+2),B_array[:,2])
# if one B_set value is zero, calculating the percentual deviation is not possible
if self.B_set[0] != 0:
B_dev_x = 100*(self.B_set[0] - B_measured[0])/self.B_set[0]
self.line_B_x_dev.setText(str(np.round(B_dev_x,4)))
else:
self.line_B_x_dev.setText('/')
if self.B_set[1] != 0:
B_dev_y = 100*(self.B_set[1] - B_measured[1])/self.B_set[1]
self.line_B_y_dev.setText(str(np.round(B_dev_y,4)))
else:
self.line_B_y_dev.setText('/')
if self.B_set[2] != 0:
B_dev_z = 100*(self.B_set[2] - B_measured[2])/self.B_set[2]
self.line_B_z_dev.setText(str(np.round(B_dev_z,4)))
else:
self.line_B_z_dev.setText('/')
self.line_iteration_step.setText(str(i))
i=i+1
self.syncdict.update({'Start_Compensation':False})
if i <= get_float(self.line_Max_iterations):
self.line_iteration_step.setText('Compensation finished!')
self.line_iteration_step.setStyleSheet("background-color: lightgreen")
self.syncdict.update({'Status_Compensation':2})
else:
self.line_iteration_step.setText('Compensation failed!')
self.line_iteration_step.setStyleSheet("background-color: red")
self.syncdict.update({'Status_Compensation':3})
print("Closed")
def temperature_thread(self,progress_callback):
Caution_Message_Written = False
while self.running == True:
if self.temp_sensor > 0: #Read temperature if sensor is selected
if self.LS336_never_connected_before == True: #If never connected to LS336 before: Connect to LS336:
print("Try to connect to LS336")
try:
self.manager.register('sync_LS_336')
self.sync_LS_336 = self.manager.sync_LS_336()
LS336_connected = True
except:
print("Connecting to LS336 failed!")
LS336_connected = False
time.sleep(1)
self.LS336_never_connected_before = False
if LS336_connected == True:
T = self.sync_LS_336.get('T')[int(self.temp_sensor - 1)]
else:
T = 0
self.line_coil_temperature.setText(f"{np.round(T,2)}")
if self.Turn_Off_Coils == True and T > self.max_temperature:
if Caution_Message_Written == False:
print("Caution: Coil temperature above maximum!")
self.syncdict.update({'Temperature_Caution': True}) # Set temperature warning in global params
self.line_coil_temperature.setStyleSheet("background-color: red")
Caution_Message_Written = True
#turn off power supplies
self.sync_BK_9174B.update({'OutputOn':[False, False]})
self.sync_BK_9174B_2.update({'OutputOn':[False, False]})
if T <= self.max_temperature and Caution_Message_Written == True:
self.line_coil_temperature.setStyleSheet("background-color: white")
self.syncdict.update({'Temperature_Caution': False}) # Set temperature warning in global params to False
Caution_Message_Written = False
else:
LS336_never_connected_before = False
def update_setValues(self,values):
#sets setvalues obtained from update_all in gui
self.line_B_x_set.setText(f"{values[0][0]}")
self.line_B_y_set.setText(f"{values[0][1]}")
self.line_B_z_set.setText(f"{values[0][2]}")
def set_B_set(self):
#updates the B_set values in global variables. The change will be detected by update_all
B_set = [get_float(self.line_B_x_set), get_float(self.line_B_y_set), get_float(self.line_B_z_set)]
self.syncdict.update({'B_set':B_set})
def set_start_compensation(self):
self.syncdict.update({'Start_Compensation':True})
def set_y2_coil_Enabled(self):
if self.y2_coil_Enabled == False:
self.y2_coil_Enabled = True
else:
self.y2_coil_Enabled = False
#Turn off BK_9174B_2 CH2
update_single_entry(self.sync_BK_9174B_2, "setI", 1, 0)
update_single_entry(self.sync_BK_9174B_2, "OutputOn", 1, False)
def set_temp_sensor(self, value):
self.temp_sensor = value #0: No Sensor, 1:A, 2:B, 3:C, 4:D
def set_max_temperature(self):
self.max_temperature = get_float(self.line_Max_Temp)
def set_Turn_Off_Coils(self):
self.Turn_Off_Coils = self.checkBox_Turn_Off_Coils.isChecked()
def save(self, progress_callback):
#if save checkbox is checked it writes measurement values to file specified in line.filePath. There the full path including file extension must be given.
while self.running == True:
time.sleep(self.timing_save) #wait is at beginning so first point is not corrupted when app just started.
if self.checkBox_save.isChecked() == True:
path = self.line_filePath.text()
if os.path.isfile(path) == False:
with open(path,'a') as file:
file.write('date\tV Ch:1[V]\tI Ch:1[A]\tP Ch:1[W]\tV Ch:2[V]\tI Ch:2[A]\tP Ch:2[W]\tV Ch:3[V]\tI Ch:3[A]\tP Ch:3[W]\n')
file = open(path,'a')
file.write(time.strftime("%Y-%m-%d_%H-%M-%S",time.localtime(self.t[-1]))+'\t')
for i in [0,1,2]: #Loop for all three channels
file.write(f"{self.Voltage[-1,i]}\t")
file.write(f"{self.Current[-1,i]}\t")
file.write(f"{self.Power[-1,i]}\t")
file.write('\n')
file.close
def set_default(self):
#saves current set values to txt file in subdirectory configs. All entries that are saved are defined in self.lines_config
#Overwrites old values in config file.
current_dir = os.path.dirname(os.path.abspath(__file__))
path = current_dir+'\\configs\\B_Field_Compensation_config.txt' #To make shure the config file is at the right place, independent from where the program is started the location of the file is retrieved
file = open(path,'w')
for l in self.lines_config_float:
temp = f"{get_float(l)}"
file.write(temp+'\t')
for l in self.lines_config_strings:
file.write(l.text()+'\t')
for c in self.checkboxes_config:
file.write(str(c.isChecked())+'\t')
for c in self.combobox_config:
file.write(str(c.currentIndex())+'\t')
file.write('\n')
file.close
def read_default(self):
#reads default values from config file in subdirectory config and sets the values in gui. Then self.change is set to true so values are send
#to device. (If no config file exists, it does nothing.)
current_dir = os.path.dirname(os.path.abspath(__file__))
path = current_dir+'\\configs\\B_Field_Compensation_config.txt' #To make shure the config file is read from the right place, independent from where the program is started the location of the file is retrieved
try: #exit function if config file does not exist
vals = import_txt.read_raw(path)
except:
print('no config file found on')
print(path)
return
formats = ['.2f', '.2f', '.2f','.2f', '.2f','.0f','.2f', '.2f' , '.2f' , '.2f', '.2f']
for l,v,f in zip(self.lines_config_float,vals[0],formats):
v = float(v) #convert string in txt to float, so number can be formatted according to "formats" when it's set
l.setText(format(v,f))
for l,v in zip(self.lines_config_strings,vals[0][len(self.lines_config_float):]):
l.setText(v)
for c,v in zip(self.checkboxes_config,vals[0][len(self.lines_config_float)+len(self.lines_config_strings):]):
c.setChecked(v == 'True')
for c,v in zip(self.combobox_config,vals[0][len(self.lines_config_float)+len(self.lines_config_strings)+len(self.checkboxes_config):]):
c.setCurrentIndex(int(v))
self.change = True
def closeEvent(self,event): #when window is closed self.running is set to False, so all threads stop
self.running = False
time.sleep(1)
event.accept()
app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()