import sys import pandas as pd import check_connection import rw_device_params import analize import ammeter import voltmeter import power_unit from device import Ui_Device from main_window import Ui_MainWindow from PyQt6 import QtCore from PyQt6.QtWidgets import QApplication, QMainWindow, QWidget, QFileDialog import pyqtgraph as pg style_sheet_button_enabled = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:green;}QPushButton::hover{background-color:red;}' style_sheet_button_unabled = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:rgb(44,47,52);}QPushButton::hover{background-color:red;}' style_sheet_polarity_plus = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:rgb(139,0,0);}QPushButton::hover{background-color:red;}' style_sheet_polarity_minus = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:rgb(2,14,93);}QPushButton::hover{background-color:red;}' class ThreadCurrent(QtCore.QThread): completed = QtCore.pyqtSignal() def __init__(self, parent): QtCore.QThread.__init__(self) self.running = False self.parent = parent self.ammeter = ammeter def run(self): self.running = True self.parent.measuredI = self.parent.ammeter.getCurrentDC() self.completed.emit() class ThreadVoltage(QtCore.QThread): completed = QtCore.pyqtSignal() def __init__(self, parent): QtCore.QThread.__init__(self) self.running = False self.parent = parent def run(self): self.running = True self.parent.measuredV = self.parent.voltmeter.getVoltageDC() self.completed.emit() class ThreadMeasureVac(QtCore.QThread): def __init__(self, parent, start_voltage, end_voltage, step_voltage, polarity, A2, V2, PU): QtCore.QThread.__init__(self) self.running = False self.parent = parent self.step_voltage = step_voltage self.current_voltage = start_voltage self.end_voltage = end_voltage self.ammeter = A2 self.voltmeter = V2 self.power_unit = PU self.polarity = polarity # Threads for voltmeter and ammeter self.currThread = ThreadCurrent(self) self.voltThread = ThreadVoltage(self) # Variables to save measured data self.measuredV = 0 self.measuredI = 0 def run(self): while self.current_voltage < self.end_voltage: self.power_unit.setVoltageDC(self.current_voltage) self.sleep(1) self.current_voltage += self.step_voltage self.get_measure() if self.polarity == True: self.parent.I.append(self.measuredI*1e6) self.parent.V.append(self.measuredV) else: self.parent.I.append(-1*self.measuredI*1e6) self.parent.V.append(-1*self.measuredV) self.power_unit.setVoltageDC(0) def get_measure(self): self.currThread.start() self.voltThread.start() self.sleep(2) class ThreadMeasure(QtCore.QThread): def __init__(self, parent, A1, V1): QtCore.QThread.__init__(self) self.running = False self.parent = parent self.ammeter = A1 self.voltmeter = V1 # Threads for voltmeter and ammeter self.currThread = ThreadCurrent(self) self.voltThread = ThreadVoltage(self) # Variables to save measured data self.measuredV = 0 self.measuredI = 0 def run(self): while self.running != False: self.get_measure() self.parent.I.append(self.measuredI*1000) self.parent.V.append(self.measuredV) def get_measure(self): self.currThread.start() self.voltThread.start() self.sleep(2) class Window_Device(QWidget, Ui_Device): def __init__(self, device_name): super().__init__() self.setupUi(self) self.name = device_name self.params = rw_device_params.read_device_params(self.name) self.id_edit.setText(str(self.params[3])) self.device_name.setText(str(self.params[2])) self.device_status.setText(str(self.params[1])) self.baud_rate_edit.setText(self.params[5]) self.connectSignalSlots() self.setWindowTitle('Параметры {}'.format(self.name)) self.setFixedSize(506, 374) def connectSignalSlots(self): self.baud_rate_edit.textChanged.connect(self.baud_rate_changed) self.id_edit.textChanged.connect(self.id_changed) def baud_rate_changed(self, s): rw_device_params.write_device_params(self.name, 'baud_rate', s) def id_changed(self, s): rw_device_params.write_device_params(self.name, 'id', s) class Window(QMainWindow, Ui_MainWindow): def __init__(self): # Init window super().__init__() self.setupUi(self) # Refresh list of connected deivces self.update_devices() self.connectSignalsSlots() # Timer for updating window self.timer_update_1 = QtCore.QTimer() self.timer_update_1.setInterval(500) self.timer_update_1.timeout.connect(self.update_measure_vac) self.timer_update = QtCore.QTimer() self.timer_update.setInterval(500) self.timer_update.timeout.connect(self.update_measure) self.graph_vac_1.setLabel( 'left', "I, мкА") self.graph_vac_1.setLabel( 'bottom', "U, В") self.graph_vac.setLabel( 'left', "I, мА") self.graph_vac.setLabel( 'bottom', "U, В") self.graph_vac_4.setLabel( 'left', "I, мкА") self.graph_vac_4.setLabel( 'bottom', "U, В") self.graph_vac_4.addLegend( labelTextColor='white', labelTextSize='16pt') # Variables self.start_voltage = 0 self.end_voltage = 0 self.step_voltage = 0 self.exp_A = 0 self.I = [] self.V = [] self.polarity = True self.time_estimated = 0 self.save_directory = '' self.file_name = '' self.thread = QtCore.QThread() # Connect all signal slots def connectSignalsSlots(self): self.properties_V1.triggered.connect( lambda: self.device('V1')) self.properties_V2.triggered.connect( lambda: self.device('V2')) self.properties_A1.triggered.connect( lambda: self.device('A1')) self.properties_A2.triggered.connect( lambda: self.device('A2')) self.directory.clicked.connect( lambda: self.choose_directory()) self.choose_folder.triggered.connect( lambda: self.choose_directory()) self.begin_measure_1.clicked.connect( lambda: self.measure_vac()) self.end_measure_1.clicked.connect( lambda: self.end_of_measure_vac()) self.change_polarity_1.clicked.connect( lambda: self.change_polarity()) self.confirm_measure_1.clicked.connect( lambda: self.confirm_measure()) self.navigation.currentChanged.connect( lambda: self.plot_to_run()) self.begin_measure.clicked.connect( lambda: self.measure()) self.end_measure.clicked.connect( lambda: self.end_of_measure()) # Updates list of connected devices def update_devices(self): self.devices = check_connection.check_devices_connection() self.device_status() # Updates device status on main_window def device_status(self): self.A1_status.setText(self.devices[2]) if self.A1_status.text() == 'Online': self.A1_status.setStyleSheet('color : rgb(144,238,144);') else: self.A1_status.setStyleSheet('color : red;') self.A2_status.setText(self.devices[3]) if self.A2_status.text() == 'Online': self.A2_status.setStyleSheet('color : rgb(144,238,144);') else: self.A2_status.setStyleSheet('color : red;') self.V1_Status.setText(self.devices[0]) if self.V1_Status.text() == 'Online': self.V1_Status.setStyleSheet('color : rgb(144,238,144);') else: self.V1_Status.setStyleSheet('color : red;') self.V2_status.setText(self.devices[1]) if self.V2_status.text() == 'Online': self.V2_status.setStyleSheet('color : rgb(144,238,144);') else: self.V2_status.setStyleSheet('color : red;') # Open window with device params def device(self, device_name): self.device_window = Window_Device(device_name) self.device_window.show() # Open a folder_window to choose folder def choose_directory(self): dlg = QFileDialog folderpath = dlg.getExistingDirectory(self, 'Выбор директории') self.save_directory = folderpath self.chosen_directory.setText(self.save_directory) # Start measurement def measure(self): if self.save_directory != '': # Create voltmeter and ammeter try: self.A1 = ammeter.B7_78('/dev/ttyUSB0') self.V1 = voltmeter.AKIP('/dev/usbtmc0') except: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno2]') # Start thread for measurments self.thread = ThreadMeasure( self, self.A1, self.V1) self.thread.start() self.thread.running = True # Start timer self.timer_update.start() # Set graph params self.graph_vac.setXRange(29, 36, padding=0) self.graph_vac.setYRange(0, 6, padding=0) self.graph_vac.showGrid(x=True, y=True, alpha=1) # Set buttons properties self.begin_measure.setEnabled(False) self.end_measure.setEnabled(True) self.end_measure.setStyleSheet(style_sheet_button_enabled) self.begin_measure.setStyleSheet(style_sheet_button_unabled) # Set experiment status self.exp_status.setText('В процессе') else: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno0]') # Start measurment for vac def measure_vac(self): # Create voltmeter, ammeter and power_unit try: self.A2 = ammeter.B7_78('/dev/ttyUSB0') self.PU = power_unit.Agilent('/dev/ttyUSB1') self.V2 = voltmeter.AKIP('/dev/usbtmc0') # Start thread for measurments self.thread = ThreadMeasureVac( self, self.start_voltage, self.end_voltage, self.step_voltage, self.polarity, self.A2, self.V2, self.PU) self.thread.start() # Start timer self.timer_update_1.start() # Set graph params self.graph_vac_1.setXRange(-26, 26, padding=0) self.graph_vac_1.setYRange(-130, 130, padding=0) self.graph_vac_1.showGrid(x=True, y=True, alpha=1) # Set buttons properties self.begin_measure_1.setEnabled(False) self.end_measure_1.setEnabled(True) self.change_polarity_1.setEnabled(True) self.change_polarity_1.setStyleSheet(style_sheet_button_enabled) self.end_measure_1.setStyleSheet(style_sheet_button_enabled) self.begin_measure_1.setStyleSheet(style_sheet_button_unabled) # Set experiment status self.exp_status_1.setText('В процессе') except: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno2]') # End measurement for vac def end_of_measure_vac(self): # Stop measurement thread self.thread.quit() self.timer_update_1.stop() # Set experiment status self.exp_status_1.setText('Окончен') # Update button properties self.change_polarity_1.setStyleSheet(style_sheet_button_unabled) self.change_polarity_1.setEnabled(False) self.confirm_measure_1.setEnabled(True) self.begin_measure_1.setEnabled(False) self.confirm_measure_1.setStyleSheet(style_sheet_button_enabled) self.begin_measure_1.setStyleSheet(style_sheet_button_unabled) self.exp_A_1.setEnabled(True) self.start_voltage_1.setEnabled(True) self.end_voltage_1.setEnabled(True) self.step_voltage_1.setEnabled(True) # Save collected data df = pd.DataFrame(list(zip(self.V, self.I)), columns=['U', 'I']) df.to_csv(self.save_directory+'/'+self.file_name+'mA.csv') # Build fitted curve self.I = df['I'] self.U = df['U'] df, params, pcov = analize.getApproxValues(pd.DataFrame( list(zip(self.U, self.I)), columns=['U', 'I']),) self.graph_vac_1.plot( df['U'], df['I'], pen=pg.mkPen('yellow', width=1)) # Plot fitted curve self.graph_vac_1.plot(self.U, self.I, symbol='o', pen=None, symbolSize=10, symbolPen='yellow') tmp = analize.discrepancy([self.I, self.U], params) error = pg.ErrorBarItem( x=self.U, y=self.I, top=tmp, bottom=tmp, beam=0.5) self.graph_vac_1.addItem(error) # Update value labels self.I_n_1.setText(str(int(params[0]))) self.T_e_1.setText(str(int(params[1]))) self.I_n_6.setText(str(int(params[0]))) self.T_e_6.setText(str(int(params[1]))) # Zero variables self.U = [] self.I = [] self.time_estimated = 0 # End measurement def end_of_measure(self): # Stop measurement thread self.thread.running = False self.timer_update.stop() # Set experiment status self.exp_status.setText('Окончен') # Update button properties self.begin_measure.setEnabled(False) self.begin_measure.setStyleSheet(style_sheet_button_unabled) # Save collected data df = pd.DataFrame(list(zip(self.V, self.I)), columns=['U', 'I']) df.to_csv(self.save_directory+'/'+'discharge_VAC.csv') # Build fitted curve # Update value labels # Zero variables self.U = [] self.I = [] self.time_estimated = 0 # Update labels and graph during vac experiment def update_measure_vac(self): # Update time self.time_estimated += 0.5 # Update value labels self.amount_of_dots_1.setText(str(len(self.I))) self.time_from_begin_1.setText(str(self.time_estimated)) try: self.last_A_1.setText(str(self.I[-1])) self.last_V_1.setText(str(self.V[-1])) except (IndexError, ValueError, KeyError): pass # Plot current V and I self.graph_vac_1.plot(self.V, self.I, symbol='o', pen=None, symbolSize=10, symbolPen='yellow') # Update labels and graph during discharge experiment def update_measure(self): # Update time self.time_estimated += 0.5 # Update value labels self.amount_of_dots.setText(str(len(self.I))) self.time_from_begin.setText(str(self.time_estimated)) try: self.last_A.setText(str(self.I[-1])) self.last_V.setText(str(self.V[-1])) except (IndexError, ValueError): pass # Plot current V and I self.graph_vac.plot(self.V, self.I, symbol='o', pen=None, symbolSize=10, symbolPen='yellow') # Start a thread with changed polarity def change_polarity(self): # Stop existing thread self.thread.quit() # Update polarity value self.polarity = not self.polarity # Creat new thread for measurement with changed polarity self.thread = ThreadMeasureVac( self, self.start_voltage, self.end_voltage, self.step_voltage, self.polarity, self.A2, self.V2, self.PU) self.thread.start() # Update button properties if self.polarity == True: self.change_polarity_1.setStyleSheet(style_sheet_polarity_plus) else: self.change_polarity_1.setStyleSheet(style_sheet_polarity_minus) # Set experment params for vac def confirm_measure(self): if self.save_directory != '': try: # Save measure params from line_edits self.exp_A = str(self.exp_A_1.text()) self.start_voltage = float(self.start_voltage_1.text()) self.end_voltage = float(self.end_voltage_1.text()) self.step_voltage = float(self.step_voltage_1.text()) self.file_name = str(self.exp_A_1.text()) # Set button properties self.confirm_measure_1.setEnabled(False) self.begin_measure_1.setEnabled(True) self.confirm_measure_1.setStyleSheet( style_sheet_button_unabled) self.begin_measure_1.setStyleSheet(style_sheet_button_enabled) self.exp_A_1.setEnabled(False) self.start_voltage_1.setEnabled(False) self.end_voltage_1.setEnabled(False) self.step_voltage_1.setEnabled(False) self.graph_vac_1.clear() self.I_n_1.setText('Не определено') self.T_e_1.setText('Не определено') self.time_from_begin_1.setText('None') self.amount_of_dots_1.setText('None') self.last_V_1.setText('None') self.last_A_1.setText('None') self.exp_status_1.setText('Не начат') self.graph_vac_1.plotItem.clear() except ValueError: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno1]') else: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno0]') # Check which tab has been opened def plot_to_run(self): if self.navigation.currentIndex() == 3: self.plot_all() else: self.graph_vac_4.clear() # Plot all data on 3rd tab def plot_all(self): if self.save_directory != '': T = [self.T_e_4, self.T_e_5, self.T_e_6] I = [self.I_n_4, self.I_n_5, self.I_n_6] B = [self.B_koef_4, self.B_koef_5, self.B_koef_6] for file, color, T_e, I_n, B_koef in zip(['1.5mA.csv', '3.0mA.csv', '5.0mA.csv'], ['green', 'yellow', 'orange'], T, I, B): try: df = pd.read_csv(self.save_directory+'/'+file) self.I = df['I'] self.U = df['U'] df, params, error = analize.getApproxValues(pd.DataFrame( list(zip(self.U, self.I)), columns=['U', 'I'])) T_e.setText(str(int(params[1])) + u"\u00B1" + str(int(error[1]))) I_n.setText(str(int(params[0])) + u"\u00B1" + str(int(error[0]))) B_koef.setText(str('%.2f' % params[3]) + u"\u00B1" + str('%.2f' % error[3])) self.graph_vac_4.plot( df['U'], df['I'], pen=pg.mkPen(color, width=1), name=file.rstrip('.csv')) # Plot fitted curve self.graph_vac_4.plot(self.U, self.I, symbol='o', pen=None, symbolSize=7, symbolPen=color) self.graph_vac_4.showGrid(x=True, y=True, alpha=1) except FileNotFoundError: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno3]') pass else: self.error.setText(QtCore.QTime.currentTime().toString( )+' - [Errno0]') if __name__ == "__main__": app = QApplication(sys.argv) win = Window() win.show() sys.exit(app.exec())