使用 python 连接安捷伦 34970A 的串口设备
近日需要通过 Python 连接一台安捷伦 34970A 的串口设备,并实现控制其参数。因为安捷伦自带 VISA 的驱动,所以直接通过 PyVISA 连接 下面对控制过程进行记录,以留后续查阅:
- 安装 Python 环境
- 安装 PyVISA
- 安装 Keysight IO Library Suite
Keysight IO Library Suite
TIP
IO library Suite 是"是德仪器"的通用驱动,是使用 IVI 驱动的前提条件。安装 IVI 驱动之前需要先下载安装 IOlibrary suite 驱动。IO library Suite 支持绝大多数的仪器控制接口如 GPIB、RS232、LAN 甚至包括 PXI 总线仪器等。其中包含了灵活方便的软件工具如 Connection Expert、IO Monitor 等。仪器的 IVI 驱动是专门针对某一类仪器编写的驱动,对 VISA 函数做了针对性的封装。参考 IVI 驱动中的范例和函数帮助文档可以更容易更快地完成自己的开发应用。
在 IOconnect expert 中可以看到连接的设备,以及进行指令测试
- 通过官方的工具 Agilent BenchLink Data Logger 3 连接设备,发送命令
- 用 IO Monitor 来监听,给设备发送的具体命令,以及命令的返回值
- 在结合文档Keysight 34970A Command Reference 来查找每个命令的含义
PyVISA
TIP
PyVISA 是一个 Python 库,用于通过 VISA(Virtual Instrument Software Architecture)接口与仪器进行通信。
安装依赖库
pip install pyvisa pyvisa-py psutil zeroconf -i https://mirrors.aliyun.com/pypi/simple/
连接设备
TIP
一旦连接到仪器,你可以发送命令和读取数据。大多数仪器使用 SCPI(Standard Commands for Programmable Instruments)命令集
import pyvisa
import time
# 创建 Resource Manager
rm = pyvisa.ResourceManager()
# 列出可用的仪器资源
resources = rm.list_resources()
print("visa资源列表:", resources)
# 链接34970 可以通过 Keysight IO Library Suit 查看
v34970A = rm.open_resource('ASRL4::INSTR')
idn = v34970A.query('*IDN?') # 设备信息
print(idn)
完整示例
以下是一个完整的示例,演示如何与采集器进行基本的通信:
import pyvisa
import time
# 创建 Resource Manager
rm = pyvisa.ResourceManager()
# 列出可用的仪器资源
resources = rm.list_resources()
print("Available resources:", resources)
# 链接34970
v34970A = rm.open_resource('ASRL4::INSTR')
idn = v34970A.query('*IDN?') # 设备信息
print(idn)
v34970A.write('SYST:ERR?') # 检查是否有错误
print("SYST:ERR?", v34970A.read())
v34970A.write('INST:DMM:INST?') # 检查DMM是否安装,0是未安装,1是安装
print("INST:DMM:INST?", v34970A.read())
v34970A.write('INST:DMM?') # 返回DMM的状态
print("INST:DMM?", v34970A.read())
v34970A.write('SYST:CTYP? 100') # 返回100模块标识
print("SYST:CTYP? 100", v34970A.read())
v34970A.write('SYST:CTYP? 200') # 返回200模块标识
print("SYST:CTYP? 200", v34970A.read())
v34970A.write('STAT:OPER:COND?') # 读取条件存储器
print("STAT:OPER:COND?", v34970A.read())
v34970A.write('DATA:POINts?') # 返回内存中的数据
print("DATA:POINts?", v34970A.read())
v34970A.write('ROUTe:SCAN?') # 返回通道列表
print("ROUTe:SCAN?", v34970A.read())
v34970A.write('DISP?') # 返回面板显示设置,0是关闭,1是打开
print("DISP?", v34970A.read())
v34970A.write('*RST') # 重置仪器
v34970A.write('MMEM:LOG:ENAB OFF') # 从扫描列表中清除所有通道
v34970A.write('ROUT:SCAN (@)') # 从扫描列表中清除所有通道
v34970A.write('DISP ON') # 关闭面板显示
v34970A.write('OUTP:ALAR:MODE LATC') # 设置警告输出模式:LATC锁存模式
v34970A.write('OUTP:ALAR:SLOP NEG') # 设置警告输出斜率为负-
v34970A.write('CONF:TEMP RTD,85 ,(@101)') # 将101通道设置为等待触发状态
v34970A.write('TEMP:NPLC 1 ,(@101)') # 设置101通道的集成时间:1PLC
v34970A.write('UNIT:TEMP C,(@101)') # 设置101通道的温度单位:摄氏度 C
v34970A.write('TEMP:TRAN:RTD:TYPE 85,(@101)') # 设置101通道的传感类型:85
v34970A.write('TEMP:TRAN:RTD:RES 1000,(@101)') # 设置101通道的电阻:1000
v34970A.write('ROUT:CHAN:DEL 0.001,(@101)') # 设置101通道的信号延迟:0.001
v34970A.write('ROUT:CHAN:DEL:AUTO ON,(@101)') # 设置101通道的自动延迟:开启
v34970A.write('CALC:SCAL:GAIN 1,(@101)') # 设置101通道的增益值:1
v34970A.write('CALC:SCAL:OFFS -0.92,(@101)') # 设置101通道的偏移值:-0.92
v34970A.write("CALC:SCAL:UNIT 'C',(@101)") # 设置101通道的读取单位:摄氏度 C
v34970A.write('CALC:SCAL:STAT ON,(@101)') # 设置101通道的Mx+B缩放:启动
v34970A.write('OUTP:ALAR1:SOUR (@101)') # 清除101通道的警告输出行
v34970A.write('CALC:LIM:UPP MAX,(@101)') # 设置101通道的警告上限:最大
v34970A.write('CALC:LIM:LOW:STAT OFF ,(@101)') # 设置101通道的下限警告:关闭
v34970A.write('CALC:LIM:UPP:STAT OFF ,(@101)') # 设置101通道的上限警告:关闭
v34970A.write('CALC:LIM:LOW 0,(@101)') # 设置101通道的警告下限:0
v34970A.write('CALC:LIM:UPP 1,(@101)') # 设置101通道的警告上限:1
v34970A.write('ROUT:SCAN (@101,102,103,104,105,106,107,108)') # 指定扫描列表
v34970A.write('TRIG:TIM 10.000') # 设置触发间隔为10秒
v34970A.write('TRIG:SOUR Timer') # 选择触发
v34970A.write('TRIG:COUN INF') # 选择计数器:连续计数器
v34970A.write('SYST:DATE 2024,9,18') # 设置日期
v34970A.write('SYST:TIME 11,40,28') # 设置时间
v34970A.write('FORM:READ:ALAR ON') # 开启警告信息
v34970A.write('FORM:READ:CHAN ON') # 开启通道信息
v34970A.write('FORM:READ:TIME ON') # 允许包含时间戳
v34970A.write('FORM:READ:UNIT ON') # 允许包含单位
v34970A.write('FORM:READ:TIME:TYPE REL') # 开启相对时间格式
v34970A.write('INIT') # 初始化命令,对指定通道进行扫描
while True:
v34970A.write('DATA:POIN?') # 查询缓存中有多少组数据
count = v34970A.read().replace('+', '').strip()
print("DATA:POIN?", count)
v34970A.write('R? '+count) # 读取缓存中所有的数据,并在内存中删除
print("R? ", v34970A.read())
time.sleep(10) # 休息10秒
# v34970A.write('ABOR') # 停止
简化代码
其中读取数据只用到了最后 10 行代码,前面都是一些配置命令,那可以用 Agilent BenchLink Data Logger 3 设置要配置参数,然后导出成Config.blcfg
文件,通过读取文件的方式来发送配置命令,简化之后的代码:
import pyvisa
import time
# 创建 Resource Manager
rm = pyvisa.ResourceManager()
# 列出可用的仪器资源
resources = rm.list_resources()
print("Available resources:", resources)
# 链接34970
v34970A = rm.open_resource('ASRL4::INSTR')
idn = v34970A.query('*IDN?') # 设备信息
print(idn)
with open('Config.blcfg', 'r') as file:
for line in file:
if not line.startswith('#'): # 跳过注释行
v34970A.write(line)
v34970A.write('INIT') # 初始化命令,对指定通道进行扫描
while True:
v34970A.write('DATA:POIN?') # 查询缓存中有多少组数据
count = v34970A.read().replace('+', '').strip()
print("DATA:POIN?", count)
v34970A.write('R? '+count) # 读取缓存中所有的数据,并在内存中删除
print("R? ", v34970A.read())
time.sleep(10) # 休息10秒
# v34970A.write('ABOR') # 停止
保存到数据库
前面通过 Python 已经实现了每 10 秒采集一次数据,并打印出来,但实际的需求是:
- 每 10 秒采集一次数据
- 每 5 分钟对前面采集的数据进行平均,并保存到数据库
- 这里保存到数据库采用的是调用后端 API 的方式,而不是直接保存到数据库
- 采用 tkinter 写一个简易的 GUI 界面
import datetime
import json
import tkinter as tk
import tkinter.filedialog
from collections import deque
import pyvisa
import time
import decimal
import threading
import tkinter.scrolledtext as scrolledtext
import requests
# 创建窗口
root = tk.Tk()
root.title('Agilent_34970A')
# 创建 Resource Manager
rm = pyvisa.ResourceManager()
v34970A = rm.open_resource('ASRL4::INSTR')
def on_open_button_click():
# 链接34970
log_text.delete(1.0, tk.END)
v34970A.write('*RST') # 重置仪器
idn = v34970A.query('*IDN?') # 设备信息
log_text.insert(1.0, idn)
thread = threading.Thread(target=long_running_task)
thread.start()
def long_running_task():
# 导入配置文件的日志记录
filetypes = (
('Text files', '*.blcfg'),
('All files', '*.*')
)
filename = tk.filedialog.askopenfilename(
title='Select file',
initialdir="/",
filetypes=filetypes
)
log_text.insert(tk.END, filename + "\n")
if filename:
with open(filename, 'r') as file:
for line in file:
if not line.startswith('#'):
v34970A.write(line) # 导入配置文件
v34970A.write('SYST:ERR?') # 检查是否有错误
t = line.replace("\n", "") + "=>" + v34970A.read()
log_text.insert(tk.END, t)
# 自动滚动到底部
log_text.yview(tk.END)
log_text.insert(tk.END, "end\n")
log_text.yview(tk.END)
run_button.config(state=tk.NORMAL)
def on_run_button_click():
# 获取当前日期和时间
now = datetime.datetime.now()
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S")
global running
if running:
# 停止循环
running = False
v34970A.write('ABOR') # 停止
run_button.config(text="开始扫描")
log_text.insert(tk.END, formatted_now + " 停止扫描\n")
else:
# 开始循环
running = True
run_button.config(text="停止扫描")
log_text.insert(tk.END, formatted_now + " 开始扫描\n")
thread = threading.Thread(target=data_running_task)
thread.start()
def data_running_task():
data_points = {
"101": [],
"102": [],
"103": [],
"104": [],
"105": [],
"106": [],
"107": [],
"108": [],
} # 存储数据点
start_time = time.time() # 记录开始时间
v34970A.write('INIT') # 初始化命令,对指定通道进行扫描
while True:
current_time = time.time()
elapsed_minutes = (current_time - start_time) / 60
v34970A.write('DATA:POIN?') # 查询缓存中有多少数据
count = v34970A.read().replace('+', '').strip()
v34970A.write('R? ' + count) # 读取缓存中所有数值,并在内存中删除
res = v34970A.read()
if count == '8':
res_list = res.replace('#3311', '').split(",0,")
single_list = [str(decimal.Decimal(i.split(' ')[0])) for i in res_list]
# 获取当前日期和时间
now = datetime.datetime.now()
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S")
log_text.insert(tk.END, formatted_now + " " + ",".join(single_list) + "\n")
# 将数据添加到队列
data_points["101"].append(float(single_list[0]))
data_points["102"].append(float(single_list[1]))
data_points["103"].append(float(single_list[2]))
data_points["104"].append(float(single_list[3]))
data_points["105"].append(float(single_list[4]))
data_points["106"].append(float(single_list[5]))
data_points["107"].append(float(single_list[6]))
data_points["108"].append(float(single_list[7]))
# 每5分钟保存一次数据
if elapsed_minutes >= 5:
api_data = save_api(data_points)
log_text.insert(tk.END, api_data + "\n")
# 清空数据列表并重置计时器
data_points = {
"101": [],
"102": [],
"103": [],
"104": [],
"105": [],
"106": [],
"107": [],
"108": [],
}
start_time = time.time()
# 自动滚动到底部
log_text.yview(tk.END)
# 自动滚动到底部
log_text.yview(tk.END)
time.sleep(10)
def on_window_close():
"""处理窗口关闭事件"""
v34970A.write('ABOR') # 停止
root.destroy()
def save_api(data_points):
now = datetime.datetime.now()
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S")
"""保存到数据库"""
url = "API接口地址"
payload = json.dumps({
"inflow_101": round(sum(data_points["101"]) / len(data_points["101"]), 3),
"outflow_102": round(sum(data_points["102"]) / len(data_points["102"]), 3),
"inflow_103": round(sum(data_points["103"]) / len(data_points["103"]), 3),
"outflow_104": round(sum(data_points["104"]) / len(data_points["104"]), 3),
"wet_bulb_105": round(sum(data_points["105"]) / len(data_points["105"]), 3),
"dry_bulb_106": round(sum(data_points["106"]) / len(data_points["106"]), 3),
"wet_bulb_107": round(sum(data_points["107"]) / len(data_points["107"]), 3),
"dry_bulb_108": round(sum(data_points["108"]) / len(data_points["108"]), 3),
"test_time": formatted_now
})
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
return response.text
# 创建一个多行文本框
log_text = scrolledtext.ScrolledText(root, wrap=tk.WORD, height=10)
log_text.pack(fill=tk.BOTH, expand=True)
# 创建一个容器来放置按钮
button_frame = tk.Frame(root)
button_frame.pack(fill=tk.X)
# 创建导入按钮
open_button = tk.Button(button_frame, text='导入配置文件', command=on_open_button_click)
open_button.pack(side=tk.LEFT, padx=5, pady=5)
# 定义状态变量
running = False
# 创建开始/停止按钮
run_button = tk.Button(button_frame, text="开始扫描", command=on_run_button_click)
run_button.pack(side=tk.RIGHT, padx=5, pady=5)
run_button.config(state=tk.DISABLED)
# 绑定窗口关闭事件
root.protocol("WM_DELETE_WINDOW", on_window_close)
# 设置窗口大小可变
root.minsize(800, 400) # 设置最小尺寸
root.mainloop()