craft-software/Legacy/TF_Control/evaporate_LN2.py
2025-07-04 15:52:40 +02:00

249 lines
11 KiB
Python

from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *
import multiprocessing
import multiprocessing.managers
import time
from timeit import default_timer as timer
from datetime import datetime
import collections
from simple_pid import PID
import traceback, sys, os
import numpy as np
import pyqtgraph as pg
from design_files.evaporate_LN2_design import Ui_MainWindow
# Absolute path of the folder that contains this script.
current_dir = os.path.split(os.path.abspath(__file__))[0]
# The folder one level above the script's folder.
parent_dir = os.path.split(current_dir)[0]
# Put the parent folder on the module search path so that packages located
# next to this script's folder can be imported further down.
sys.path.append(parent_dir)
from scripts import import_txt
class WorkerSignals(QObject):
    '''
    Container for the Qt signals a worker emits while it runs.

    Signals:
    finished: emitted without payload
    error: tuple payload, filled as (exctype, value, traceback.format_exc())
    result: object payload, whatever the processing callback returned
    progress: list payload, intermediate values reported by the callback
    '''

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(list)
class Worker(QRunnable):
    '''
    Runnable that executes a callback and reports the outcome through signals.

    :param fn: The function to run on this worker. Supplied args and kwargs
    are passed through to it, together with an extra keyword argument
    "progress_callback" holding the worker's progress signal.
    :type fn: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    '''

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.signals = WorkerSignals()
        # Hand the progress signal to the callback through its keyword arguments
        kwargs['progress_callback'] = self.signals.progress
        # Keep callback and arguments until run() is invoked
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    @pyqtSlot()
    def run(self):
        '''
        Call the stored function with the stored args and kwargs.

        Emits "result" with the return value on success, "error" with
        (exctype, value, formatted traceback) on failure, and "finished"
        in both cases.
        '''
        try:
            outcome = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:  # deliberately catches everything, like a bare except
            traceback.print_exc()
            self.signals.error.emit((type(exc), exc, traceback.format_exc()))
        else:
            self.signals.result.emit(outcome)  # pass the return value on
        finally:
            self.signals.finished.emit()  # always announce completion
def get_float(Qline, default=0):
    '''
    Read the text of a line-edit-like widget and convert it to float.

    :param Qline: object with a "text()" method returning the string to parse
    (e.g. a QLineEdit)
    :param default: value returned when the text is empty or cannot be
    converted (0 if not specified)
    :return: the parsed float, or "default"
    '''
    try:
        return float(Qline.text())
    except Exception:
        # Best-effort by design: any ordinary failure yields the default.
        # SystemExit / KeyboardInterrupt are no longer swallowed.
        return default
def update_single_entry(dict, name, ind, val):
    '''
    Update a single value inside a list stored in global vars.

    The current value of dict.get(name) is fetched, the element at index
    "ind" is replaced by "val" and the whole object is written back with
    dict.update(). (Presumably the write-back is needed because "dict" is a
    manager proxy that hands out copies - confirm against global variables.)

    :param dict: mapping-like object offering get() and update()
    :param name: key of the entry; it is converted to str before use
    :param ind: index (or key) inside the stored entry that is replaced
    :param val: new value
    '''
    key = f"{name}"
    data = dict.get(key)  # get data
    data[ind] = val  # replace entry
    dict.update({key: data})  # send data back
class MainWindow(QMainWindow, Ui_MainWindow):
    '''
    Main window of the LN2 evaporation tool.

    Connects to the global-variables manager on localhost:5001, starts a
    worker that drives the BK 9132B output depending on the temperature read
    from the LS 336, and shows temperature, resistance and power in the gui.
    '''

    CONNECT_TIMEOUT_S = 10  # max. time to wait for a global-variables process started by this app
    WORKER_STOP_TIMEOUT_MS = 5000  # max. time closeEvent waits for the worker to switch the heater off

    def __init__(self, *args, **kwargs):
        #import Gui from QT designer file
        super().__init__(*args, **kwargs)
        # Get the current script's directory
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        # Get the parent directory by going one level up
        self.parent_dir = os.path.dirname(self.current_dir)

        #establish connection to global variables
        manager = multiprocessing.managers.BaseManager(address=('localhost',5001), authkey=b'')
        try: #try to connect to global variables
            self._connect_global_vars(manager)
        except Exception: #open global variables, if no connection can be made (i.e. it is not running). Then connect to it
            self.global_vars = QProcess()
            self.global_vars.start(self.current_dir+"\\.venv\\Scripts\\python.exe", [self.current_dir+'\\global_variables_TF.py'])
            # the new process needs some time until it listens, so keep trying for a while
            self._connect_global_vars(manager, timeout=self.CONNECT_TIMEOUT_S)
            print('!!!\nI opened global variables myself. If you close me, global variables will shut down too. Consider starting global variables in own instance for more security\n!!!')

        self.setupUi(self)

        #define signals and slots
        self.actionMake_current_values_default.triggered.connect(self.save_default)
        self.actionReset_default_values.triggered.connect(self.load_default)

        #define constants
        self.running = True #true while app is running
        self.timing_ = 1 #wait time (s) between two iterations in function "update_Data".
        self.SB_all = [self.dSB_T_stop, self.dSB_P] #list of all Spinboxes, helps for saving and loading of default values
        self.checkBoxes_all = [self.checkBox_PID]

        #read default values from config and set values in gui
        self.load_default()

        #set up pyQT threadpool
        self.threadpool = QThreadPool()

        #start standard threads
        worker = Worker(self.update_Data)
        worker.signals.progress.connect(self.update_gui) #The values from update_Data must be transmitted via a signal, so that the gui is set in the main thread.
        self.threadpool.start(worker)

    def _connect_global_vars(self, manager, timeout=0):
        '''
        Connects "manager" and fetches the proxies sync_BK_9132B and sync_LS_336.

        :param manager: BaseManager pointing at the global-variables server
        :param timeout: seconds during which a failed connect is retried;
        0 means a single attempt
        :raises OSError, EOFError: if no connection could be made in time
        '''
        deadline = time.monotonic() + timeout
        while True:
            try:
                manager.connect()
                break
            except (OSError, EOFError):
                if time.monotonic() >= deadline:
                    raise
                time.sleep(0.2)
        manager.register('sync_BK_9132B')
        manager.register('sync_LS_336')
        self.sync_BK_9132B = manager.sync_BK_9132B()
        self.sync_LS_336 = manager.sync_LS_336()

    def _set_heater(self, voltage, current, output_on):
        '''sends set voltage, set current and output state (index 0 of each entry) to the BK 9132B'''
        #['setU', 'setI', 'OutputOn']
        update_single_entry(self.sync_BK_9132B,'setU',0,voltage)
        update_single_entry(self.sync_BK_9132B,'setI',0,current)
        update_single_entry(self.sync_BK_9132B,'OutputOn',0,output_on)

    @staticmethod
    def _heater_voltage(U, I, P_target):
        '''
        Calculates the resistance R = U/I and the voltage needed to reach P_target.

        :return: (R, set_U). If R cannot be measured (I is zero) or is not a
        finite positive number, set_U is 1 V so that R can be measured in the
        next iteration. This avoids sending 0, nan or inf to the device.
        '''
        try:
            R = U/I #get R
        except ZeroDivisionError:
            R = np.nan
        if np.isfinite(R) and R > 0:
            return R, np.sqrt(P_target*R) #calculate U so that P max is reached
        return R, 1 #set 1 V to measure R in next iteration

    def update_Data(self, progress_callback):
        '''
        Control loop, runs in a worker thread until self.running is False.

        Gets data from global variables, switches the heater depending on the
        temperature and triggers update_gui via progress_callback, which
        receives [T_is, R, P].
        NOTE(review): the spin boxes are read from the worker thread here -
        confirm this is acceptable.
        '''
        #set small voltage to get a value of R
        self._set_heater(1, 2, True)
        time.sleep(2)
        R = 40 #shown until the first measurement; presumably the nominal heater resistance - confirm
        P = 0
        try:
            while self.running:
                T_is = self.sync_LS_336.get('T')[3] #get value of PT-100 (fresh value, so the stop criterion does not lag one iteration)
                if T_is < self.dSB_T_stop.value():
                    #turn heater on
                    U = self.sync_BK_9132B.get('U')[0] #get U
                    I = self.sync_BK_9132B.get('I')[0] #get I
                    P = self.sync_BK_9132B.get('P')[0]
                    R, set_U = self._heater_voltage(U, I, self.dSB_P.value())
                    self._set_heater(set_U, 2, True) #send new values to device
                else:
                    #turn off heater
                    self._set_heater(0, 0, False)
                    P = self.sync_BK_9132B.get('P')[0]
                progress_callback.emit([T_is,R,P])
                time.sleep(self.timing_)
        finally:
            #turn off heater when programm is closed or the loop failed
            self._set_heater(0, 0, False)

    def update_gui(self,data):
        '''sets labels to new values. data is [T_is, R, P] as emitted by update_Data'''
        self.Label_T_is.setText(f"{data[0]}")
        self.Label_R.setText(f"{data[1]}")
        self.Label_P_is.setText(f"{data[2]}")

    def _config_path(self):
        '''returns the path of the config file. It is built from the location of this file, so it is independent from where the program is started'''
        return os.path.join(self.current_dir, 'configs', 'evaporate_LN2_config.txt')

    def save_default(self):
        '''
        Saves current set values to txt file in subdirectory configs.

        Writes one line with the values of all spin boxes followed by the
        states of all check boxes, each followed by a tab. Overwrites old
        values in config file.
        '''
        fields = [f"{SB.value()}" for SB in self.SB_all]
        fields += [str(c.isChecked()) for c in self.checkBoxes_all]
        line = ''.join(field+'\t' for field in fields)+'\n'
        with open(self._config_path(),'w') as file: #"with" makes sure the file is closed
            file.write(line)

    def load_default(self):
        '''
        Reads default values from config file in subdirectory configs and sets the values in gui.

        If the config file cannot be read or is empty, it does nothing.
        '''
        try: #exit function if config file does not exist
            vals = import_txt.read_raw(self._config_path())
        except Exception:
            return
        if len(vals) == 0: #empty config file
            return
        row = vals[0]
        for SB,v in zip(self.SB_all,row):
            if isinstance(SB, QDoubleSpinBox):
                SB.setValue(float(v)) #convert string in txt to float, so number can be set in dSB
            else:
                SB.setValue(int(v)) #convert string in txt to int, so number can be set in SB
        for c,v in zip(self.checkBoxes_all,row[len(self.SB_all):]):
            c.setChecked(v == 'True')

    def closeEvent(self,event):
        '''when window is closed self.running is set to False, so the worker leaves its loop and turns off the heater'''
        self.running = False
        #wait until the worker is done instead of sleeping a fixed time
        if not self.threadpool.waitForDone(self.WORKER_STOP_TIMEOUT_MS):
            print('Worker did not stop in time. Check that the heater output is off!')
        event.accept()
# Script entry: create the Qt application, build and show the main window,
# then run the Qt event loop until the window is closed.
# NOTE: there is no `if __name__ == "__main__"` guard, so importing this
# module starts the GUI as well.
app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()