Загрузил(а) файлы в 'Project/program_files'

This commit is contained in:
Дмитрий Пунов 2022-12-10 17:03:57 +03:00
parent 4f692d3288
commit 9777b815ca
5 changed files with 741 additions and 0 deletions

View File

@ -0,0 +1,27 @@
import serial
class B7_78():
def __init__(self, folder):
self.folder = folder
self.instr = serial.Serial(
self.folder, timeout=1, inter_byte_timeout=1)
self.instr.write(b'SYST:REM\n')
self.instr.write(b'SYST:REM\n')
def getCurrentDC(self):
self.instr.write(b'MEAS:CURR:DC?\n')
return float(self.instr.read(100).decode().rstrip())
def getVoltageDC(self):
self.instr.write(b'MEAS:VOLT:DC?\n')
return float(self.instr.read(100).decode().rstrip())
def getCurrentAC(self):
self.instr.write(b'MEAS:CURR:AC?\n')
return float(self.instr.read(100).decode().rstrip())
def getVoltageAC(self):
self.instr.write(b'MEAS:VOLT:AC?\n')
return float(self.instr.read(100).decode().rstrip())

View File

@ -0,0 +1,609 @@
import sys
import pandas as pd
import check_connection
import rw_device_params
import analize
import ammeter
import voltmeter
import power_unit
from device import Ui_Device
from main_window import Ui_MainWindow
from PyQt6 import QtCore
from PyQt6.QtWidgets import QApplication, QMainWindow, QWidget, QFileDialog
import pyqtgraph as pg
style_sheet_button_enabled = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:green;}QPushButton::hover{background-color:red;}'
style_sheet_button_unabled = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:rgb(44,47,52);}QPushButton::hover{background-color:red;}'
style_sheet_polarity_plus = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:rgb(139,0,0);}QPushButton::hover{background-color:red;}'
style_sheet_polarity_minus = 'QPushButton{font-size:16px;border-width: 1px;border-style: solid;border-radius: 10px;background-color:rgb(2,14,93);}QPushButton::hover{background-color:red;}'
class ThreadCurrent(QtCore.QThread):
completed = QtCore.pyqtSignal()
def __init__(self, parent):
QtCore.QThread.__init__(self)
self.running = False
self.parent = parent
self.ammeter = ammeter
def run(self):
self.running = True
self.parent.measuredI = self.parent.ammeter.getCurrentDC()
self.completed.emit()
class ThreadVoltage(QtCore.QThread):
completed = QtCore.pyqtSignal()
def __init__(self, parent):
QtCore.QThread.__init__(self)
self.running = False
self.parent = parent
def run(self):
self.running = True
self.parent.measuredV = self.parent.voltmeter.getVoltageDC()
self.completed.emit()
class ThreadMeasureVac(QtCore.QThread):
def __init__(self, parent, start_voltage, end_voltage, step_voltage, polarity, A2, V2, PU):
QtCore.QThread.__init__(self)
self.running = False
self.parent = parent
self.step_voltage = step_voltage
self.current_voltage = start_voltage
self.end_voltage = end_voltage
self.ammeter = A2
self.voltmeter = V2
self.power_unit = PU
self.polarity = polarity
# Threads for voltmeter and ammeter
self.currThread = ThreadCurrent(self)
self.voltThread = ThreadVoltage(self)
# Variables to save measured data
self.measuredV = 0
self.measuredI = 0
def run(self):
while self.current_voltage < self.end_voltage:
self.power_unit.setVoltageDC(self.current_voltage)
self.sleep(1)
self.current_voltage += self.step_voltage
self.get_measure()
if self.polarity == True:
self.parent.I.append(self.measuredI*1e6)
self.parent.V.append(self.measuredV)
else:
self.parent.I.append(-1*self.measuredI*1e6)
self.parent.V.append(-1*self.measuredV)
self.power_unit.setVoltageDC(0)
def get_measure(self):
self.currThread.start()
self.voltThread.start()
self.sleep(2)
class ThreadMeasure(QtCore.QThread):
def __init__(self, parent, A1, V1):
QtCore.QThread.__init__(self)
self.running = False
self.parent = parent
self.ammeter = A1
self.voltmeter = V1
# Threads for voltmeter and ammeter
self.currThread = ThreadCurrent(self)
self.voltThread = ThreadVoltage(self)
# Variables to save measured data
self.measuredV = 0
self.measuredI = 0
def run(self):
while self.running != False:
self.get_measure()
self.parent.I.append(self.measuredI*1000)
self.parent.V.append(self.measuredV)
def get_measure(self):
self.currThread.start()
self.voltThread.start()
self.sleep(2)
class Window_Device(QWidget, Ui_Device):
def __init__(self, device_name):
super().__init__()
self.setupUi(self)
self.name = device_name
self.params = rw_device_params.read_device_params(self.name)
self.id_edit.setText(str(self.params[3]))
self.device_name.setText(str(self.params[2]))
self.device_status.setText(str(self.params[1]))
self.baud_rate_edit.setText(self.params[5])
self.connectSignalSlots()
self.setWindowTitle('Параметры {}'.format(self.name))
self.setFixedSize(506, 374)
def connectSignalSlots(self):
self.baud_rate_edit.textChanged.connect(self.baud_rate_changed)
self.id_edit.textChanged.connect(self.id_changed)
def baud_rate_changed(self, s):
rw_device_params.write_device_params(self.name, 'baud_rate', s)
def id_changed(self, s):
rw_device_params.write_device_params(self.name, 'id', s)
class Window(QMainWindow, Ui_MainWindow):
def __init__(self):
# Init window
super().__init__()
self.setupUi(self)
# Refresh list of connected deivces
self.update_devices()
self.connectSignalsSlots()
# Timer for updating window
self.timer_update_1 = QtCore.QTimer()
self.timer_update_1.setInterval(500)
self.timer_update_1.timeout.connect(self.update_measure_vac)
self.timer_update = QtCore.QTimer()
self.timer_update.setInterval(500)
self.timer_update.timeout.connect(self.update_measure)
self.graph_vac_1.setLabel(
'left', "<span style=\"color:white;font-size:30px\">I, мкА</span>")
self.graph_vac_1.setLabel(
'bottom', "<span style=\"color:white;font-size:30px\">U, В</span>")
self.graph_vac.setLabel(
'left', "<span style=\"color:white;font-size:30px\">I, мА</span>")
self.graph_vac.setLabel(
'bottom', "<span style=\"color:white;font-size:30px\">U, В</span>")
self.graph_vac_4.setLabel(
'left', "<span style=\"color:white;font-size:30px\">I, мкА</span>")
self.graph_vac_4.setLabel(
'bottom', "<span style=\"color:white;font-size:30px\">U, В</span>")
self.graph_vac_4.addLegend(
labelTextColor='white', labelTextSize='16pt')
# Variables
self.start_voltage = 0
self.end_voltage = 0
self.step_voltage = 0
self.exp_A = 0
self.I = []
self.V = []
self.polarity = True
self.time_estimated = 0
self.save_directory = ''
self.file_name = ''
self.thread = QtCore.QThread()
# Connect all signal slots
def connectSignalsSlots(self):
self.properties_V1.triggered.connect(
lambda: self.device('V1'))
self.properties_V2.triggered.connect(
lambda: self.device('V2'))
self.properties_A1.triggered.connect(
lambda: self.device('A1'))
self.properties_A2.triggered.connect(
lambda: self.device('A2'))
self.directory.clicked.connect(
lambda: self.choose_directory())
self.choose_folder.triggered.connect(
lambda: self.choose_directory())
self.begin_measure_1.clicked.connect(
lambda: self.measure_vac())
self.end_measure_1.clicked.connect(
lambda: self.end_of_measure_vac())
self.change_polarity_1.clicked.connect(
lambda: self.change_polarity())
self.confirm_measure_1.clicked.connect(
lambda: self.confirm_measure())
self.navigation.currentChanged.connect(
lambda: self.plot_to_run())
self.begin_measure.clicked.connect(
lambda: self.measure())
self.end_measure.clicked.connect(
lambda: self.end_of_measure())
# Updates list of connected devices
def update_devices(self):
self.devices = check_connection.check_devices_connection()
self.device_status()
# Updates device status on main_window
def device_status(self):
self.A1_status.setText(self.devices[2])
if self.A1_status.text() == 'Online':
self.A1_status.setStyleSheet('color : rgb(144,238,144);')
else:
self.A1_status.setStyleSheet('color : red;')
self.A2_status.setText(self.devices[3])
if self.A2_status.text() == 'Online':
self.A2_status.setStyleSheet('color : rgb(144,238,144);')
else:
self.A2_status.setStyleSheet('color : red;')
self.V1_Status.setText(self.devices[0])
if self.V1_Status.text() == 'Online':
self.V1_Status.setStyleSheet('color : rgb(144,238,144);')
else:
self.V1_Status.setStyleSheet('color : red;')
self.V2_status.setText(self.devices[1])
if self.V2_status.text() == 'Online':
self.V2_status.setStyleSheet('color : rgb(144,238,144);')
else:
self.V2_status.setStyleSheet('color : red;')
# Open window with device params
def device(self, device_name):
self.device_window = Window_Device(device_name)
self.device_window.show()
# Open a folder_window to choose folder
def choose_directory(self):
dlg = QFileDialog
folderpath = dlg.getExistingDirectory(self, 'Выбор директории')
self.save_directory = folderpath
self.chosen_directory.setText(self.save_directory)
# Start measurement
def measure(self):
if self.save_directory != '':
# Create voltmeter and ammeter
try:
self.A1 = ammeter.B7_78('/dev/ttyUSB0')
self.V1 = voltmeter.AKIP('/dev/usbtmc0')
except:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno2]')
# Start thread for measurments
self.thread = ThreadMeasure(
self, self.A1, self.V1)
self.thread.start()
self.thread.running = True
# Start timer
self.timer_update.start()
# Set graph params
self.graph_vac.setXRange(29, 36, padding=0)
self.graph_vac.setYRange(0, 6, padding=0)
self.graph_vac.showGrid(x=True, y=True, alpha=1)
# Set buttons properties
self.begin_measure.setEnabled(False)
self.end_measure.setEnabled(True)
self.end_measure.setStyleSheet(style_sheet_button_enabled)
self.begin_measure.setStyleSheet(style_sheet_button_unabled)
# Set experiment status
self.exp_status.setText('В процессе')
else:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno0]')
# Start measurment for vac
def measure_vac(self):
# Create voltmeter, ammeter and power_unit
try:
self.A2 = ammeter.B7_78('/dev/ttyUSB0')
self.PU = power_unit.Agilent('/dev/ttyUSB1')
self.V2 = voltmeter.AKIP('/dev/usbtmc0')
# Start thread for measurments
self.thread = ThreadMeasureVac(
self, self.start_voltage, self.end_voltage, self.step_voltage, self.polarity, self.A2, self.V2, self.PU)
self.thread.start()
# Start timer
self.timer_update_1.start()
# Set graph params
self.graph_vac_1.setXRange(-26, 26, padding=0)
self.graph_vac_1.setYRange(-130, 130, padding=0)
self.graph_vac_1.showGrid(x=True, y=True, alpha=1)
# Set buttons properties
self.begin_measure_1.setEnabled(False)
self.end_measure_1.setEnabled(True)
self.change_polarity_1.setEnabled(True)
self.change_polarity_1.setStyleSheet(style_sheet_button_enabled)
self.end_measure_1.setStyleSheet(style_sheet_button_enabled)
self.begin_measure_1.setStyleSheet(style_sheet_button_unabled)
# Set experiment status
self.exp_status_1.setText('В процессе')
except:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno2]')
# End measurement for vac
def end_of_measure_vac(self):
# Stop measurement thread
self.thread.quit()
self.timer_update_1.stop()
# Set experiment status
self.exp_status_1.setText('Окончен')
# Update button properties
self.change_polarity_1.setStyleSheet(style_sheet_button_unabled)
self.change_polarity_1.setEnabled(False)
self.confirm_measure_1.setEnabled(True)
self.begin_measure_1.setEnabled(False)
self.confirm_measure_1.setStyleSheet(style_sheet_button_enabled)
self.begin_measure_1.setStyleSheet(style_sheet_button_unabled)
self.exp_A_1.setEnabled(True)
self.start_voltage_1.setEnabled(True)
self.end_voltage_1.setEnabled(True)
self.step_voltage_1.setEnabled(True)
# Save collected data
df = pd.DataFrame(list(zip(self.V, self.I)), columns=['U', 'I'])
df.to_csv(self.save_directory+'/'+self.file_name+'mA.csv')
# Build fitted curve
self.I = df['I']
self.U = df['U']
df, params, pcov = analize.getApproxValues(pd.DataFrame(
list(zip(self.U, self.I)), columns=['U', 'I']),)
self.graph_vac_1.plot(
df['U'], df['I'], pen=pg.mkPen('yellow', width=1))
# Plot fitted curve
self.graph_vac_1.plot(self.U, self.I, symbol='o',
pen=None, symbolSize=10, symbolPen='yellow')
tmp = analize.discrepancy([self.I, self.U], params)
error = pg.ErrorBarItem(
x=self.U, y=self.I, top=tmp, bottom=tmp, beam=0.5)
self.graph_vac_1.addItem(error)
# Update value labels
self.I_n_1.setText(str(int(params[0])))
self.T_e_1.setText(str(int(params[1])))
self.I_n_6.setText(str(int(params[0])))
self.T_e_6.setText(str(int(params[1])))
# Zero variables
self.U = []
self.I = []
self.time_estimated = 0
# End measurement
def end_of_measure(self):
# Stop measurement thread
self.thread.running = False
self.timer_update.stop()
# Set experiment status
self.exp_status.setText('Окончен')
# Update button properties
self.begin_measure.setEnabled(False)
self.begin_measure.setStyleSheet(style_sheet_button_unabled)
# Save collected data
df = pd.DataFrame(list(zip(self.V, self.I)), columns=['U', 'I'])
df.to_csv(self.save_directory+'/'+'discharge_VAC.csv')
# Build fitted curve
# Update value labels
# Zero variables
self.U = []
self.I = []
self.time_estimated = 0
# Update labels and graph during vac experiment
def update_measure_vac(self):
# Update time
self.time_estimated += 0.5
# Update value labels
self.amount_of_dots_1.setText(str(len(self.I)))
self.time_from_begin_1.setText(str(self.time_estimated))
try:
self.last_A_1.setText(str(self.I[-1]))
self.last_V_1.setText(str(self.V[-1]))
except (IndexError, ValueError, KeyError):
pass
# Plot current V and I
self.graph_vac_1.plot(self.V, self.I, symbol='o',
pen=None, symbolSize=10, symbolPen='yellow')
# Update labels and graph during discharge experiment
def update_measure(self):
# Update time
self.time_estimated += 0.5
# Update value labels
self.amount_of_dots.setText(str(len(self.I)))
self.time_from_begin.setText(str(self.time_estimated))
try:
self.last_A.setText(str(self.I[-1]))
self.last_V.setText(str(self.V[-1]))
except (IndexError, ValueError):
pass
# Plot current V and I
self.graph_vac.plot(self.V, self.I, symbol='o',
pen=None, symbolSize=10, symbolPen='yellow')
# Start a thread with changed polarity
def change_polarity(self):
# Stop existing thread
self.thread.quit()
# Update polarity value
self.polarity = not self.polarity
# Creat new thread for measurement with changed polarity
self.thread = ThreadMeasureVac(
self, self.start_voltage, self.end_voltage, self.step_voltage, self.polarity, self.A2, self.V2, self.PU)
self.thread.start()
# Update button properties
if self.polarity == True:
self.change_polarity_1.setStyleSheet(style_sheet_polarity_plus)
else:
self.change_polarity_1.setStyleSheet(style_sheet_polarity_minus)
# Set experment params for vac
def confirm_measure(self):
if self.save_directory != '':
try:
# Save measure params from line_edits
self.exp_A = str(self.exp_A_1.text())
self.start_voltage = float(self.start_voltage_1.text())
self.end_voltage = float(self.end_voltage_1.text())
self.step_voltage = float(self.step_voltage_1.text())
self.file_name = str(self.exp_A_1.text())
# Set button properties
self.confirm_measure_1.setEnabled(False)
self.begin_measure_1.setEnabled(True)
self.confirm_measure_1.setStyleSheet(
style_sheet_button_unabled)
self.begin_measure_1.setStyleSheet(style_sheet_button_enabled)
self.exp_A_1.setEnabled(False)
self.start_voltage_1.setEnabled(False)
self.end_voltage_1.setEnabled(False)
self.step_voltage_1.setEnabled(False)
self.graph_vac_1.clear()
self.I_n_1.setText('Не определено')
self.T_e_1.setText('Не определено')
self.time_from_begin_1.setText('None')
self.amount_of_dots_1.setText('None')
self.last_V_1.setText('None')
self.last_A_1.setText('None')
self.exp_status_1.setText('Не начат')
self.graph_vac_1.plotItem.clear()
except ValueError:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno1]')
else:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno0]')
# Check which tab has been opened
def plot_to_run(self):
if self.navigation.currentIndex() == 3:
self.plot_all()
else:
self.graph_vac_4.clear()
# Plot all data on 3rd tab
def plot_all(self):
if self.save_directory != '':
T = [self.T_e_4, self.T_e_5, self.T_e_6]
I = [self.I_n_4, self.I_n_5, self.I_n_6]
B = [self.B_koef_4, self.B_koef_5, self.B_koef_6]
for file, color, T_e, I_n, B_koef in zip(['1.5mA.csv', '3.0mA.csv', '5.0mA.csv'], ['green', 'yellow', 'orange'], T, I, B):
try:
df = pd.read_csv(self.save_directory+'/'+file)
self.I = df['I']
self.U = df['U']
df, params, error = analize.getApproxValues(pd.DataFrame(
list(zip(self.U, self.I)), columns=['U', 'I']))
T_e.setText(str(int(params[1])) +
u"\u00B1" + str(int(error[1])))
I_n.setText(str(int(params[0])) +
u"\u00B1" + str(int(error[0])))
B_koef.setText(str('%.2f' % params[3]) +
u"\u00B1" + str('%.2f' % error[3]))
self.graph_vac_4.plot(
df['U'], df['I'], pen=pg.mkPen(color, width=1), name=file.rstrip('.csv'))
# Plot fitted curve
self.graph_vac_4.plot(self.U, self.I, symbol='o',
pen=None, symbolSize=7, symbolPen=color)
self.graph_vac_4.showGrid(x=True, y=True, alpha=1)
except FileNotFoundError:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno3]')
pass
else:
self.error.setText(QtCore.QTime.currentTime().toString(
)+' - [Errno0]')
if __name__ == "__main__":
app = QApplication(sys.argv)
win = Window()
win.show()
sys.exit(app.exec())

View File

@ -0,0 +1,53 @@
import re
import subprocess
import os
import rw_device_params
# Check single device connection by device ID and return status of connection
# Input example: '0000:0000'
# Output example: 'Online'
def check_connection(device_ID):
df = str(subprocess.check_output("lsusb"))
if re.search(device_ID, df):
return True
else:
return False
# Check all device connection by their ID's and returns list of connection status
# Output example: ['Online', 'Offline', 'Online', 'Offline]
def check_devices_connection():
status = []
for root, dirs, files in os.walk('data'):
for filename in files:
filename = filename.rstrip('.dat')
params = rw_device_params.read_device_params(filename)
id = params[3]
if check_connection(id):
status.append('Online')
rw_device_params.write_device_params(
filename, 'status', 'Online')
else:
status.append('Offline')
rw_device_params.write_device_params(
filename, 'status', 'Offline')
return status
# Finf ttyUSB_ file for device by it's ID
# Input example: '0000:0000'
# Output example: 'ttyUSB0'
def get_device_folder_tty(id):
folder = subprocess.check_output(
['python3', '-m', 'serial.tools.list_ports', '-q', id]).decode().rstrip('\n').strip()
return folder
def get_device_folder_tmc(id):
folder = subprocess.check_output(
['python3', '-m', 'serial.tools.list_ports', '-q', id]).decode().rstrip('\n').strip()
return folder

View File

@ -0,0 +1,19 @@
import check_connection
import serial
class Agilent():
def __init__(self, folder):
self.folder = folder
self.instr = serial.Serial(
self.folder, timeout=1, inter_byte_timeout=1)
msg = 'OUTput on\n'
self.instr.write(msg.encode('ascii'))
def setVoltageDC(self, volt):
self.instr.write(('VOLTage '+str(volt)+'\n').encode('ascii'))
def getID(self):
self.instr.write(('*IDN?\n').encode('ascii'))
print(self.instr.read(100).decode().rstrip())

View File

@ -0,0 +1,33 @@
# Dict for all devices params
all_params = {'func': 0, 'status': 1, 'name': 2,
'id': 3, 'type_of_connection': 4, 'baud_rate': 5}
# Read device params from .dat device file
def read_device_params(name):
file = open('data/{}.dat'.format(name), 'r')
params = file.readline().split()
file.close()
return params
# Change device params in .dat device file
def write_device_params(name, param, value):
try:
file = open('data/{}.dat'.format(name), 'r+')
params = file.readline().split()
if value != '':
params[all_params[param]] = str(value)
else:
params[all_params[param]] = 'none'
file.seek(0)
file.truncate()
file.write(' '.join(params))
file.close()
except IndexError:
print('ErrorType = FileError: File.dat for this device does not exist')