Overview
The pysmu Professional Data Analysis Tool is a Python-based data analysis system for the ADALM1000. It handles data acquisition, analysis, and report generation, and provides comprehensive statistical analysis, frequency-domain analysis, anomaly detection, and professional report generation.
Features
Advanced analysis
Basic statistics: mean, standard deviation, min/max, median, quartiles, etc.
Advanced statistical metrics: skewness, kurtosis, coefficient of variation, SNR, total harmonic distortion, etc.
Frequency-domain analysis: power spectral density, dominant frequency components, bandwidth analysis
Stability analysis: sliding-window statistics, trend analysis
Anomaly detection: outlier detection via Z-score and IQR methods
Power analysis: instantaneous power, apparent power, power factor, impedance calculation
Professional visualization
Time-domain signal comparison: dual-channel voltage waveforms
Power spectral density plot: frequency-domain characteristics
Statistical distribution histogram: data distribution visualization
Stability analysis plot: sliding-window statistical trends
Anomaly detection chart: anomaly-rate comparison
Key metrics dashboard: overall assessment table
Multi-format output
CSV data files: raw data and analysis results
PNG charts: 300 DPI professional figures
PDF report: cover page, charts, data tables, and raw data
JSON summary report: machine-readable analysis results
Installation requirements
Required dependencies
pip install numpy pandas matplotlib scipy pysmu
System requirements
- Python 3.7+
- Windows/Linux/macOS
- ADALM1000 device and driver
- libsmu C++ library
Usage
Basic usage
# Enter the examples directory
cd c:\Users\Administrator\Downloads\libsmu-master\bindings\python\examples
# Run the professional analysis tool
python professional_analysis_english.py
Program flow
- Device detection: automatically detect connected ADALM1000 devices
- Device configuration: configure channel modes and target values
- Data acquisition: acquire the specified number of samples
- Data analysis: run the full statistical and frequency-domain analysis
- Result saving: generate CSV, PNG, PDF, and JSON files
- Report generation: create the professional analysis report
Parameter control
Main configurable parameters
1. Sampling parameters
# In the run_professional_analysis() method
sample_count = 2000 # number of samples (default: 2000)
sample_rate = 100000 # sampling rate (default: 100 kHz)
2. Channel configuration
# Channel A configuration (voltage source mode)
target_voltage_a = idx % 6 # target voltage (0-5 V)
dev.channels['A'].mode = Mode.SVMI # source voltage, measure current
# Channel B configuration (current source mode)
target_current_b = 0.05 # target current (default: 50 mA)
dev.channels['B'].mode = Mode.SIMV # source current, measure voltage
3. Analysis parameters
# Stability-analysis window size
window_size = 100 # sliding window size (default: 100)
# Anomaly-detection threshold
threshold = 3 # Z-score threshold (default: 3)
# Frequency-domain analysis parameters
fundamental_freq = None # fundamental frequency (auto-detected)
4. Output control
# Output directory
output_dir = "analysis_results" # default output directory
# Chart resolution
dpi = 300 # chart DPI (default: 300)
# Displayed sample count
display_samples = 1000 # samples shown in charts (default: 1000)
Custom configuration example
# Create a custom analyzer
analyzer = ProfessionalAnalyzer(output_dir="my_analysis")
# Modify sampling parameters
# In run_professional_analysis(), change:
sample_count = 5000 # increase the sample count
# Modify channel configuration
target_voltage_a = 2.5 # set a fixed voltage
target_current_b = 0.1 # increase the current to 100 mA
# Modify analysis parameters
window_size = 200 # larger window
threshold = 2.5 # lower anomaly-detection threshold
Interpreting the output files
File structure
analysis_results/
├── csv_data/
│   ├── device_0_raw_data_YYYYMMDD_HHMMSS.csv
│   └── device_0_analysis_YYYYMMDD_HHMMSS.csv
├── plots/
│   └── device_0_analysis_YYYYMMDD_HHMMSS.png
└── reports/
    ├── device_0_report_YYYYMMDD_HHMMSS.pdf
    └── summary_report_YYYYMMDD_HHMMSS.json
1. Raw data file (raw_data.csv)
The original table here was lost in conversion; the columns below are reconstructed from save_raw_data_csv() in the source.

| Column | Description | Unit |
|---|---|---|
| Sample_Index | Sample number, starting at 1 | - |
| Timestamp | Time offset, assuming a 100 kHz sampling rate | s |
| Channel_A_Voltage | Channel A voltage | V |
| Channel_A_Current | Channel A current | A |
| Channel_B_Voltage | Channel B voltage | V |
| Channel_B_Current | Channel B current | A |
| Channel_A_Power | Channel A instantaneous power | W |
| Channel_B_Power | Channel B instantaneous power | W |
2. Analysis results file (analysis.csv)
The original table here was lost in conversion; the columns below are reconstructed from save_analysis_results_csv() in the source.

| Column | Description | Example |
|---|---|---|
| Device | Device index | 0 |
| Channel | Channel name | A |
| Category | Analysis category | Basic_Statistics |
| Parameter | Metric name | mean |
| Value | Metric value | 0.003477 |
| Unit | Unit of the value | V |
Analysis categories
Basic_Statistics
- mean: average value
- std: standard deviation
- min/max: minimum/maximum value
- median: median
- rms: root mean square
Advanced_Statistics
- snr_db: signal-to-noise ratio (dB)
- thd: total harmonic distortion (%)
- cv: coefficient of variation (%)
- skewness: skewness
- kurtosis: kurtosis
Frequency_Analysis
- dominant_frequency: dominant frequency (Hz)
- bandwidth_3db: 3 dB bandwidth (Hz)
- total_power: total power (W)
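To make these categories concrete, here is a minimal numpy sketch (synthetic data, for illustration only) that computes a few of the basic and advanced metrics the same way the tool does, in particular snr_db = 20 * log10(|mean| / std):

```python
import numpy as np

# Synthetic channel data: 2.5 V DC with small Gaussian noise (made up for illustration).
rng = np.random.default_rng(0)
voltages = 2.5 + rng.normal(0.0, 0.001, 2000)

mean = np.mean(voltages)
std = np.std(voltages)
rms = np.sqrt(np.mean(voltages ** 2))       # root mean square
cv = std / abs(mean) * 100                  # coefficient of variation (%)
snr_db = 20 * np.log10(abs(mean) / std)     # same formula as advanced_voltage_analysis()

print(f"mean={mean:.4f} V  rms={rms:.4f} V  cv={cv:.4f} %  snr={snr_db:.1f} dB")
```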
3. PDF report contents
Page 1: cover and summary
- Device information (serial number, firmware version, hardware version)
- Test time and sample count
- Key metrics summary
- Overall quality assessment
Page 2: analysis charts
- Time-domain signal comparison: dual-channel voltage waveforms
- Power spectral density: frequency-domain characteristics
- Voltage distribution histogram: statistical distribution
- Stability analysis: sliding-window statistics
- Anomaly detection results: anomaly-rate comparison
- Key metrics dashboard: overall assessment table
Page 3: detailed data tables
- Basic statistics comparison
- Advanced statistical metrics
- Numerical data grouped by category
Page 4: raw data samples
- Detailed data for the first 50 samples
- Includes timestamps and dual-channel measurements
- Total sample count statistics
4. Chart interpretation
Time-domain signal plot
- X axis: time (s)
- Y axis: voltage (V)
- Blue line: Channel A voltage
- Red line: Channel B voltage
Power spectral density plot
- X axis: frequency (Hz)
- Y axis: power spectral density (log scale)
- Purpose: identify signal frequency components and noise
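The dominant-frequency readout behind this plot can be reproduced with a few lines of numpy. The sketch below uses a synthetic 1 kHz sine at the tool's default 100 kHz sample rate and skips the DC bin, mirroring frequency_domain_analysis():

```python
import numpy as np

# Synthetic test signal: 1 kHz sine sampled at 100 kHz (the tool's default rate).
sample_rate = 100000
t = np.arange(2000) / sample_rate
sig = np.sin(2 * np.pi * 1000 * t)

# PSD from the FFT; the dominant component is found while excluding the DC bin,
# as frequency_domain_analysis() does.
fft_data = np.fft.fft(sig)
freqs = np.fft.fftfreq(len(sig), 1 / sample_rate)
psd = np.abs(fft_data) ** 2
half = len(freqs) // 2
dominant = freqs[:half][np.argmax(psd[1:half]) + 1]
print(f"Dominant frequency: {dominant:.0f} Hz")  # → 1000 Hz
```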
Stability analysis plot
- X axis: window index
- Y axis: voltage (V)
- Solid line: sliding-window mean
- Shaded band: standard-deviation range
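The sliding-window curve in this plot comes from statistics computed over windows with 50% overlap. A small self-contained sketch of that scheme (synthetic data; window size 100 as in the tool's default):

```python
import numpy as np

def windowed_stats(signal_data, window_size=100):
    """Mean/std over sliding windows with 50% overlap, as in stability_analysis()."""
    means, stds = [], []
    for i in range(0, len(signal_data) - window_size, window_size // 2):
        window = signal_data[i:i + window_size]
        means.append(np.mean(window))
        stds.append(np.std(window))
    return np.array(means), np.array(stds)

# Synthetic 1.0 V signal with mild noise (illustration only).
rng = np.random.default_rng(2)
sig = 1.0 + rng.normal(0.0, 0.001, 2000)
means, stds = windowed_stats(sig)
print(len(means))  # → 38 windows for 2000 samples
```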
Anomaly detection chart
- X axis: detection method
- Y axis: anomaly rate (%)
- Blue: Channel A anomaly rate
- Red: Channel B anomaly rate
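Both bars on this chart can be reproduced with numpy alone. The sketch below injects three obvious spikes into synthetic data and flags them with the same Z-score (threshold 3) and 1.5 * IQR rules used by detect_anomalies():

```python
import numpy as np

# Synthetic channel data with three injected outliers (illustration only).
rng = np.random.default_rng(1)
data = rng.normal(0.0, 0.001, 2000)
data[[100, 500, 900]] = 0.05

# Z-score method, threshold = 3 (the tool's default)
z = np.abs((data - data.mean()) / data.std())
zscore_idx = np.where(z > 3)[0]

# IQR method with 1.5 * IQR fences
q25, q75 = np.percentile(data, [25, 75])
iqr = q75 - q25
iqr_idx = np.where((data < q25 - 1.5 * iqr) | (data > q75 + 1.5 * iqr))[0]

print(f"Z-score anomaly rate: {len(zscore_idx) / len(data) * 100:.2f} %")
print(f"IQR anomaly rate:     {len(iqr_idx) / len(data) * 100:.2f} %")
```

Note that the IQR fences are much tighter than three standard deviations, so the IQR method typically flags more points than the Z-score method on the same data.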
5. Quality assessment criteria
The original table here was lost in conversion; the thresholds below are reconstructed from the assessment logic in create_professional_plots().

| Parameter | Excellent | Good | Fair |
|---|---|---|---|
| Average voltage difference (V) | < 0.001 | < 0.01 | ≥ 0.01 |
| Standard deviation (V) | < 0.0001 | < 0.001 | ≥ 0.001 |
| SNR (dB) | > 60 | > 40 | ≤ 40 |
| THD (%) | < 1 | < 5 | ≥ 5 |
Troubleshooting
Common issues
1. Device not detected
Error: No devices detected
Solutions:
- Check the ADALM1000 connection
- Confirm the driver is installed
- Verify the libsmu installation
2. Missing dependencies
Missing dependencies: No module named 'pandas'
Solution:
pip install pandas scipy matplotlib
3. Data acquisition failure
Data acquisition failed: timeout
Solutions:
- Reduce the sample count
- Check the stability of the device connection
- Restart the device and the program
4. Out of memory
Solutions:
- Reduce the sample count (sample_count)
- Reduce the displayed sample count (display_samples)
- Close unnecessary programs
Performance tuning
1. Faster acquisition
# Reduce the sample count
sample_count = 1000 # down from 2000
# Reduce analysis complexity
window_size = 50 # smaller window
2. Higher analysis precision
# Increase the sample count
sample_count = 5000 # up to 5000
# Increase the window size
window_size = 200 # larger window
3. Smaller output files
# Lower the chart resolution
dpi = 150 # down from 300
# Save less raw data
# Comment out the CSV-saving code you do not need
Extending the tool
Adding a custom analysis
def custom_analysis(self, signal_data):
    """
    Custom analysis function
    """
    # Add your analysis logic here; np.custom_function is a placeholder
    result = np.custom_function(signal_data)
    return {'custom_metric': result}
# Call it from advanced_voltage_analysis:
custom_result = self.custom_analysis(voltages)
Changing the output format
# Add Excel output
def save_excel_report(self, device_idx, analysis_results):
    filename = f"device_{device_idx}_report_{self.timestamp}.xlsx"
    # Implement the Excel-saving logic here
Adding real-time monitoring
# Real-time data-stream processing
def real_time_analysis(self):
    while True:
        samples = session.get_samples(100)
        # real-time analysis logic
        time.sleep(0.1)
Support
Contact
- Project home: libsmu GitHub
- Documentation: pysmu Documentation
- Issue reports: GitHub Issues
- Or just contact me (🤣 I wrote it, after all)
Version information
- Current version: 1.0.0
- Compatible pysmu: ≥1.0.0
- Python requirement: ≥3.7
Changelog
v1.0.0 (2025-08-23)
- Initial release
- Complete statistical analysis
- PDF report generation
- Multi-format data output
- Professional chart visualization
Note: This tool is intended only for data analysis with the ADALM1000. Make sure the device is properly connected and the driver is installed before use.
A note before the code: I could not get Chinese font rendering to work at all (every character came out as a little box), so what follows is the all-English version:
Font configuration set
pysmu Professional Data Analysis Tool
==================================================
Detected 1 device(s)
Configuring device 0: serial 2032205054383031323130323130323 : fw 2.17 : hw F
Channel A: SVMI mode, target voltage 0V
Channel B: SIMV mode, target current 0.05A
Starting professional data acquisition...
Successfully acquired 2000 samples
============================================================
Device 0 Professional Data Analysis
============================================================
Raw data saved: analysis_results\csv_data\device_0_raw_data_20250823_110101.csv
Performing advanced data analysis...
Analysis results saved: analysis_results\csv_data\device_0_analysis_20250823_110101.csv
Generating professional charts...
Analysis chart saved: analysis_results\plots\device_0_analysis_20250823_110101.png
(This is the tool's console output log.)
Generating PDF report...
PDF report generated: analysis_results\reports\device_0_report_20250823_110101.pdf
=== Key Analysis Results ===
Channel A - Average Voltage: 0.003477V, SNR: 37.0dB, THD: 0.000%
Channel B - Average Voltage: 5.112401V, SNR: infdB, THD: 0.000%
=== Output Files ===
Raw data: analysis_results\csv_data\device_0_raw_data_20250823_110101.csv
Analysis results: analysis_results\csv_data\device_0_analysis_20250823_110101.csv
Analysis charts: analysis_results\plots\device_0_analysis_20250823_110101.png
PDF report: analysis_results\reports\device_0_report_20250823_110101.pdf
Professional analysis completed! All files saved in: analysis_results
Summary report saved: analysis_results\reports\summary_report_20250823_110101.json
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
pysmu Professional Data Analysis Tool (English Version)
Features:
1. Advanced statistical analysis (including frequency domain analysis, correlation analysis, etc.)
2. Data export (CSV, Excel formats)
3. PDF report generation (with charts and detailed analysis)
4. Professional visualization (multiple chart types)
5. Data quality assessment and anomaly detection
6. Calibration recommendations and maintenance reminders
7. Batch device testing support
"""
from __future__ import print_function
import sys
import os
import csv
import json
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.backends.backend_pdf import PdfPages
from scipy import stats, signal
from scipy.fft import fft, fftfreq
import warnings
warnings.filterwarnings('ignore')
# Set font configuration
import matplotlib
matplotlib.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial']
matplotlib.rcParams['axes.unicode_minus'] = False
print("Font configuration set")
try:
from pysmu import Session, Mode
except ImportError:
print("Error: Cannot import pysmu library")
print("Please ensure pysmu and libsmu are properly installed")
sys.exit(1)
class ProfessionalAnalyzer:
"""
Professional Data Analyzer Class
"""
def __init__(self, output_dir="analysis_results"):
"""
Initialize analyzer
Args:
output_dir: Output directory
"""
self.output_dir = output_dir
self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.create_output_directory()
# Analysis results storage
self.analysis_results = {}
self.device_info = {}
self.raw_data = {}
def create_output_directory(self):
"""
Create output directory structure
"""
dirs = [
self.output_dir,
os.path.join(self.output_dir, 'csv_data'),
os.path.join(self.output_dir, 'plots'),
os.path.join(self.output_dir, 'reports')
]
for dir_path in dirs:
if not os.path.exists(dir_path):
os.makedirs(dir_path)
print(f"Created directory: {dir_path}")
def advanced_voltage_analysis(self, samples, channel_name, sample_rate=100000):
"""
Advanced voltage data analysis
Args:
samples: Sample data
channel_name: Channel name
sample_rate: Sampling rate
Returns:
dict: Detailed analysis results
"""
if channel_name == 'A':
voltages = np.array([x[0][0] for x in samples])
currents = np.array([x[0][1] for x in samples])
else:
voltages = np.array([x[1][0] for x in samples])
currents = np.array([x[1][1] for x in samples])
# Basic statistics
basic_stats = {
'mean': np.mean(voltages),
'std': np.std(voltages),
'var': np.var(voltages),
'min': np.min(voltages),
'max': np.max(voltages),
'range': np.ptp(voltages),
'median': np.median(voltages),
'q25': np.percentile(voltages, 25),
'q75': np.percentile(voltages, 75),
'iqr': np.percentile(voltages, 75) - np.percentile(voltages, 25),
'rms': np.sqrt(np.mean(voltages**2)),
'peak_to_peak': np.ptp(voltages),
'count': len(voltages)
}
# Advanced statistics
advanced_stats = {
'skewness': stats.skew(voltages),
'kurtosis': stats.kurtosis(voltages),
'cv': basic_stats['std'] / abs(basic_stats['mean']) * 100 if basic_stats['mean'] != 0 else float('inf'),
'snr_db': 20 * np.log10(abs(basic_stats['mean']) / basic_stats['std']) if basic_stats['std'] != 0 else float('inf'),
'thd': self.calculate_thd(voltages, sample_rate),
'crest_factor': basic_stats['max'] / basic_stats['rms'] if basic_stats['rms'] != 0 else float('inf')
}
# Frequency domain analysis
freq_analysis = self.frequency_domain_analysis(voltages, sample_rate)
# Stability analysis
stability_analysis = self.stability_analysis(voltages)
# Anomaly detection
anomaly_detection = self.detect_anomalies(voltages)
# Power analysis (if current data available)
power_analysis = self.power_analysis(voltages, currents)
return {
'basic_stats': basic_stats,
'advanced_stats': advanced_stats,
'frequency_analysis': freq_analysis,
'stability_analysis': stability_analysis,
'anomaly_detection': anomaly_detection,
'power_analysis': power_analysis,
'raw_voltages': voltages,
'raw_currents': currents
}
def calculate_thd(self, signal_data, sample_rate, fundamental_freq=None):
"""
Calculate Total Harmonic Distortion (THD)
Args:
signal_data: Signal data
sample_rate: Sampling rate
fundamental_freq: Fundamental frequency (auto-detect if None)
Returns:
float: THD value (%)
"""
try:
# FFT analysis
fft_data = np.abs(fft(signal_data))
freqs = fftfreq(len(signal_data), 1/sample_rate)
# Take positive frequency part only
positive_freqs = freqs[:len(freqs)//2]
positive_fft = fft_data[:len(fft_data)//2]
if fundamental_freq is None:
# Auto-detect fundamental frequency
fundamental_idx = np.argmax(positive_fft[1:]) + 1  # Exclude DC component
fundamental_freq = positive_freqs[fundamental_idx]
# Calculate fundamental and harmonic amplitudes
fundamental_amplitude = np.max(positive_fft)
# Calculate energy of first 5 harmonics
harmonic_energy = 0
for i in range(2, 6): # 2nd to 5th harmonics
harmonic_freq = fundamental_freq * i
if harmonic_freq < np.max(positive_freqs):
harmonic_idx = np.argmin(np.abs(positive_freqs - harmonic_freq))
harmonic_energy += positive_fft[harmonic_idx]**2
# THD calculation
thd = np.sqrt(harmonic_energy) / fundamental_amplitude * 100
return thd
except Exception:
return 0.0
def frequency_domain_analysis(self, signal_data, sample_rate):
"""
Frequency domain analysis
Args:
signal_data: Signal data
sample_rate: Sampling rate
Returns:
dict: Frequency domain analysis results
"""
try:
# FFT analysis
fft_data = fft(signal_data)
freqs = fftfreq(len(signal_data), 1/sample_rate)
# Power spectral density
psd = np.abs(fft_data)**2
# Take positive frequency part only
positive_freqs = freqs[:len(freqs)//2]
positive_psd = psd[:len(psd)//2]
# Dominant frequency components
dominant_freq_idx = np.argmax(positive_psd[1:]) + 1  # Exclude DC
dominant_freq = positive_freqs[dominant_freq_idx]
# Bandwidth analysis
total_power = np.sum(positive_psd)
half_power = total_power / 2
# 3dB bandwidth
max_power_idx = np.argmax(positive_psd)
half_max_power = positive_psd[max_power_idx] / 2
bandwidth_3db = 0
for i in range(1, len(positive_psd)):
if positive_psd[i] >= half_max_power:
bandwidth_3db = positive_freqs[i]
break
return {
'dominant_frequency': dominant_freq,
'bandwidth_3db': bandwidth_3db,
'total_power': total_power,
'dc_component': positive_psd[0],
'ac_power': total_power - positive_psd[0],
'frequencies': positive_freqs,
'psd': positive_psd
}
except Exception as e:
return {
'dominant_frequency': 0,
'bandwidth_3db': 0,
'total_power': 0,
'dc_component': 0,
'ac_power': 0,
'error': str(e)
}
def stability_analysis(self, signal_data, window_size=100):
"""
Stability analysis
Args:
signal_data: Signal data
window_size: Sliding window size
Returns:
dict: Stability analysis results
"""
try:
# Sliding window statistics
windowed_means = []
windowed_stds = []
for i in range(0, len(signal_data) - window_size, window_size//2):
window = signal_data[i:i+window_size]
windowed_means.append(np.mean(window))
windowed_stds.append(np.std(window))
windowed_means = np.array(windowed_means)
windowed_stds = np.array(windowed_stds)
# Stability indicators
mean_stability = np.std(windowed_means) / np.mean(np.abs(windowed_means)) * 100 if np.mean(np.abs(windowed_means)) != 0 else float('inf')
std_stability = np.std(windowed_stds) / np.mean(windowed_stds) * 100 if np.mean(windowed_stds) != 0 else float('inf')
# Trend analysis
time_indices = np.arange(len(windowed_means))
slope, intercept, r_value, p_value, std_err = stats.linregress(time_indices, windowed_means)
return {
'mean_stability_percent': mean_stability,
'std_stability_percent': std_stability,
'trend_slope': slope,
'trend_r_squared': r_value**2,
'trend_p_value': p_value,
'windowed_means': windowed_means,
'windowed_stds': windowed_stds
}
except Exception as e:
return {
'mean_stability_percent': float('inf'),
'std_stability_percent': float('inf'),
'trend_slope': 0,
'trend_r_squared': 0,
'trend_p_value': 1,
'error': str(e)
}
def detect_anomalies(self, signal_data, threshold=3):
"""
Anomaly detection
Args:
signal_data: Signal data
threshold: Z-score threshold
Returns:
dict: Anomaly detection results
"""
try:
# Z-score anomaly detection
z_scores = np.abs(stats.zscore(signal_data))
anomaly_indices = np.where(z_scores > threshold)[0]
# IQR anomaly detection
q25, q75 = np.percentile(signal_data, [25, 75])
iqr = q75 - q25
lower_bound = q25 - 1.5 * iqr
upper_bound = q75 + 1.5 * iqr
iqr_anomaly_indices = np.where((signal_data < lower_bound) | (signal_data > upper_bound))[0]
return {
'zscore_anomalies': len(anomaly_indices),
'zscore_anomaly_rate': len(anomaly_indices) / len(signal_data) * 100,
'zscore_anomaly_indices': anomaly_indices.tolist(),
'iqr_anomalies': len(iqr_anomaly_indices),
'iqr_anomaly_rate': len(iqr_anomaly_indices) / len(signal_data) * 100,
'iqr_anomaly_indices': iqr_anomaly_indices.tolist(),
'iqr_bounds': {'lower': lower_bound, 'upper': upper_bound}
}
except Exception as e:
return {
'zscore_anomalies': 0,
'zscore_anomaly_rate': 0,
'iqr_anomalies': 0,
'iqr_anomaly_rate': 0,
'error': str(e)
}
def power_analysis(self, voltages, currents):
"""
Power analysis
Args:
voltages: Voltage data
currents: Current data
Returns:
dict: Power analysis results
"""
try:
# Instantaneous power
instantaneous_power = voltages * currents
# Average power
average_power = np.mean(instantaneous_power)
# RMS power
rms_voltage = np.sqrt(np.mean(voltages**2))
rms_current = np.sqrt(np.mean(currents**2))
apparent_power = rms_voltage * rms_current
# Power factor
power_factor = average_power / apparent_power if apparent_power != 0 else 0
# Resistance calculation
resistance = rms_voltage / rms_current if rms_current != 0else float('inf')
return {
'average_power': average_power,
'apparent_power': apparent_power,
'power_factor': power_factor,
'rms_voltage': rms_voltage,
'rms_current': rms_current,
'resistance': resistance,
'instantaneous_power': instantaneous_power
}
except Exception as e:
return {
'average_power': 0,
'apparent_power': 0,
'power_factor': 0,
'rms_voltage': 0,
'rms_current': 0,
'resistance': float('inf'),
'error': str(e)
}
def save_raw_data_csv(self, device_idx, samples):
"""
Save raw data to CSV file
Args:
device_idx: Device index
samples: Sample data
"""
filename = f"device_{device_idx}_raw_data_{self.timestamp}.csv"
filepath = os.path.join(self.output_dir, 'csv_data', filename)
# Prepare data
data = []
for i, sample in enumerate(samples):
data.append({
'Sample_Index': i + 1,
'Timestamp': i / 100000, # Assume 100kHz sampling rate
'Channel_A_Voltage': sample[0][0],
'Channel_A_Current': sample[0][1],
'Channel_B_Voltage': sample[1][0],
'Channel_B_Current': sample[1][1],
'Channel_A_Power': sample[0][0] * sample[0][1],
'Channel_B_Power': sample[1][0] * sample[1][1]
})
# Save to CSV
df = pd.DataFrame(data)
df.to_csv(filepath, index=False, encoding='utf-8-sig')
print(f"Raw data saved: {filepath}")
return filepath
def save_analysis_results_csv(self, device_idx, analysis_results):
"""
Save analysis results to CSV file
Args:
device_idx: Device index
analysis_results: Analysis results
"""
filename = f"device_{device_idx}_analysis_{self.timestamp}.csv"
filepath = os.path.join(self.output_dir, 'csv_data', filename)
# Prepare analysis results data
results_data = []
for channel in ['A', 'B']:
if channel in analysis_results:
result = analysis_results[channel]
# Basic statistics
for key, value in result['basic_stats'].items():
results_data.append({
'Device': device_idx,
'Channel': channel,
'Category': 'Basic_Statistics',
'Parameter': key,
'Value': value,
'Unit': 'V' if 'voltage' in key.lower() or key in ['mean', 'std', 'min', 'max', 'range', 'median', 'rms'] else ''
})
# Advanced statistics
for key, value in result['advanced_stats'].items():
unit = ''
if key == 'cv':
unit = '%'
elif key == 'snr_db':
unit = 'dB'
elif key == 'thd':
unit = '%'
results_data.append({
'Device': device_idx,
'Channel': channel,
'Category': 'Advanced_Statistics',
'Parameter': key,
'Value': value,
'Unit': unit
})
# Frequency domain analysis
for key, value in result['frequency_analysis'].items():
if key not in ['frequencies', 'psd', 'error']:
unit = 'Hz' if 'freq' in key.lower() else 'W' if 'power' in key.lower() else ''
results_data.append({
'Device': device_idx,
'Channel': channel,
'Category': 'Frequency_Analysis',
'Parameter': key,
'Value': value,
'Unit': unit
})
# Save to CSV
df = pd.DataFrame(results_data)
df.to_csv(filepath, index=False, encoding='utf-8-sig')
print(f"Analysis results saved: {filepath}")
return filepath
def create_professional_plots(self, device_idx, analysis_results):
"""
Create professional charts
Args:
device_idx: Device index
analysis_results: Analysis results
"""
# Set chart style - use clean default style
plt.style.use('default')
# Create large chart
fig = plt.figure(figsize=(20, 16))
gs = fig.add_gridspec(4, 4, hspace=0.3, wspace=0.3)
# Chart title
fig.suptitle(f'Device {device_idx} Professional Data Analysis Report\nTime: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
fontsize=16, fontweight='bold')
# 1. Time domain signal comparison (2x2)
ax1 = fig.add_subplot(gs[0:2, 0:2])
voltages_a = analysis_results['A']['raw_voltages'][:1000] # Limit display points
voltages_b = analysis_results['B']['raw_voltages'][:1000]
time_axis = np.arange(len(voltages_a)) / 100000  # Assume 100kHz sampling rate
ax1.plot(time_axis, voltages_a, 'b-', linewidth=1, alpha=0.8, label='Channel A')
ax1.plot(time_axis, voltages_b, 'r-', linewidth=1, alpha=0.8, label='Channel B')
ax1.set_title('Time Domain Signal Comparison', fontsize=14, fontweight='bold')
ax1.set_xlabel('Time (s)')
ax1.set_ylabel('Voltage (V)')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. Frequency spectrum analysis (top right)
ax2 = fig.add_subplot(gs[0, 2:])
freq_a = analysis_results['A']['frequency_analysis']
freq_b = analysis_results['B']['frequency_analysis']

if 'frequencies' in freq_a and 'psd' in freq_a:
freqs_a = freq_a['frequencies'][:len(freq_a['frequencies'])//10] # Limit display range
psd_a = freq_a['psd'][:len(freq_a['psd'])//10]
ax2.semilogy(freqs_a, psd_a, 'b-', alpha=0.8, label='Channel A')
if 'frequencies' in freq_b and 'psd' in freq_b:
freqs_b = freq_b['frequencies'][:len(freq_b['frequencies'])//10]
psd_b = freq_b['psd'][:len(freq_b['psd'])//10]
ax2.semilogy(freqs_b, psd_b, 'r-', alpha=0.8, label='Channel B')
ax2.set_title('Power Spectral Density', fontsize=12, fontweight='bold')
ax2.set_xlabel('Frequency (Hz)')
ax2.set_ylabel('Power Spectral Density')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. Statistical distribution comparison (right middle)
ax3 = fig.add_subplot(gs[1, 2:])
ax3.hist(voltages_a, bins=50, alpha=0.6, color='blue', label='Channel A', density=True)
ax3.hist(voltages_b, bins=50, alpha=0.6, color='red', label='Channel B', density=True)
ax3.axvline(np.mean(voltages_a), color='blue', linestyle='--', alpha=0.8)
ax3.axvline(np.mean(voltages_b), color='red', linestyle='--', alpha=0.8)
ax3.set_title('Voltage Distribution Histogram', fontsize=12, fontweight='bold')
ax3.set_xlabel('Voltage (V)')
ax3.set_ylabel('Probability Density')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. Stability analysis (bottom left)
ax4 = fig.add_subplot(gs[2, 0:2])
stability_a = analysis_results['A']['stability_analysis']
stability_b = analysis_results['B']['stability_analysis']
if 'windowed_means' in stability_a:
window_indices = np.arange(len(stability_a['windowed_means']))
ax4.plot(window_indices, stability_a['windowed_means'], 'b-o', markersize=3, label='Channel A Mean')
ax4.fill_between(window_indices,
stability_a['windowed_means'] - stability_a['windowed_stds'],
stability_a['windowed_means'] + stability_a['windowed_stds'],
alpha=0.2, color='blue')
if 'windowed_means' in stability_b:
window_indices = np.arange(len(stability_b['windowed_means']))
ax4.plot(window_indices, stability_b['windowed_means'], 'r-o', markersize=3, label='Channel B Mean')
ax4.fill_between(window_indices,
stability_b['windowed_means'] - stability_b['windowed_stds'],
stability_b['windowed_means'] + stability_b['windowed_stds'],
alpha=0.2, color='red')
ax4.set_title('Stability Analysis (Sliding Window)', fontsize=12, fontweight='bold')
ax4.set_xlabel('Window Index')
ax4.set_ylabel('Voltage (V)')
ax4.legend()
ax4.grid(True, alpha=0.3)
# 5. Anomaly detection (bottom right)
ax5 = fig.add_subplot(gs[2, 2:])
anomaly_a = analysis_results['A']['anomaly_detection']
anomaly_b = analysis_results['B']['anomaly_detection']
categories = ['Z-score Anomalies', 'IQR Anomalies']
anomaly_rates_a = [anomaly_a.get('zscore_anomaly_rate', 0), anomaly_a.get('iqr_anomaly_rate', 0)]
anomaly_rates_b = [anomaly_b.get('zscore_anomaly_rate', 0), anomaly_b.get('iqr_anomaly_rate', 0)]
x = np.arange(len(categories))
width = 0.35
ax5.bar(x - width/2, anomaly_rates_a, width, label='Channel A', color='blue', alpha=0.7)
ax5.bar(x + width/2, anomaly_rates_b, width, label='Channel B', color='red', alpha=0.7)
ax5.set_title('Anomaly Detection Results', fontsize=12, fontweight='bold')
ax5.set_xlabel('Detection Method')
ax5.set_ylabel('Anomaly Rate (%)')
ax5.set_xticks(x)
ax5.set_xticklabels(categories)
ax5.legend()
ax5.grid(True, alpha=0.3)
# 6. Key metrics dashboard (bottom)
ax6 = fig.add_subplot(gs[3, :])
ax6.axis('off')
# Create metrics table
metrics_data = []
metrics_data.append(['Parameter', 'Channel A', 'Channel B', 'Difference', 'Assessment'])
# Average voltage
mean_a = analysis_results['A']['basic_stats']['mean']
mean_b = analysis_results['B']['basic_stats']['mean']
diff_mean = abs(mean_a - mean_b)
eval_mean = 'Excellent' if diff_mean < 0.001 else 'Good' if diff_mean < 0.01 else 'Fair'
metrics_data.append(['Average Voltage (V)', f'{mean_a:.6f}', f'{mean_b:.6f}', f'{diff_mean:.6f}', eval_mean])
# Standard deviation
std_a = analysis_results['A']['basic_stats']['std']
std_b = analysis_results['B']['basic_stats']['std']
diff_std = abs(std_a - std_b)
eval_std = 'Excellent' if max(std_a, std_b) < 0.0001 else 'Good' if max(std_a, std_b) < 0.001 else 'Fair'
metrics_data.append(['Standard Deviation (V)', f'{std_a:.6f}', f'{std_b:.6f}', f'{diff_std:.6f}', eval_std])
# Signal-to-noise ratio
snr_a = analysis_results['A']['advanced_stats']['snr_db']
snr_b = analysis_results['B']['advanced_stats']['snr_db']
diff_snr = abs(snr_a - snr_b)
eval_snr = 'Excellent' if min(snr_a, snr_b) > 60 else 'Good' if min(snr_a, snr_b) > 40 else 'Fair'
metrics_data.append(['SNR (dB)', f'{snr_a:.1f}', f'{snr_b:.1f}', f'{diff_snr:.1f}', eval_snr])
# THD
thd_a = analysis_results['A']['advanced_stats']['thd']
thd_b = analysis_results['B']['advanced_stats']['thd']
diff_thd = abs(thd_a - thd_b)
eval_thd = 'Excellent' if max(thd_a, thd_b) < 1 else 'Good' if max(thd_a, thd_b) < 5 else 'Fair'
metrics_data.append(['THD (%)', f'{thd_a:.3f}', f'{thd_b:.3f}', f'{diff_thd:.3f}', eval_thd])
# Create table
table = ax6.table(cellText=metrics_data[1:], colLabels=metrics_data[0],
cellLoc='center', loc='center', bbox=[0, 0, 1, 1])
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 2)
# Set table style
for i in range(len(metrics_data)):
for j in range(len(metrics_data[0])):
cell = table[(i, j)]
if i == 0: # Header
cell.set_facecolor('#4CAF50')
cell.set_text_props(weight='bold', color='white')
elif j == 4: # Assessment column
if i > 0:
eval_text = metrics_data[i][j]
if eval_text == 'Excellent':
cell.set_facecolor('#E8F5E8')
elif eval_text == 'Good':
cell.set_facecolor('#FFF3E0')
else:
cell.set_facecolor('#FFEBEE')
# Save chart with high resolution
plot_filename = f"device_{device_idx}_analysis_{self.timestamp}.png"
plot_filepath = os.path.join(self.output_dir, 'plots', plot_filename)
plt.savefig(plot_filepath, dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
print(f"Analysis chart saved: {plot_filepath}")
# Display chart (non-blocking)
plt.show(block=False)
plt.pause(0.1) # Brief pause to ensure display
# Close chart to free memory
plt.close(fig)
return plot_filepath
def generate_pdf_report(self, device_idx, analysis_results, device_info):
"""
Generate PDF report
Args:
device_idx: Device index
analysis_results: Analysis results
device_info: Device information
"""
report_filename = f"device_{device_idx}_report_{self.timestamp}.pdf"
report_filepath = os.path.join(self.output_dir, 'reports', report_filename)
with PdfPages(report_filepath) as pdf:
# First page: Cover and summary
fig, ax = plt.subplots(figsize=(8.5, 11))
ax.axis('off')
# Title
ax.text(0.5, 0.9, 'pysmu Professional Data Analysis Report',
horizontalalignment='center', fontsize=24, fontweight='bold')
# Device information
ax.text(0.5, 0.8, f'Device {device_idx}',
horizontalalignment='center', fontsize=18)
ax.text(0.5, 0.75, f'Serial Number: {device_info.get("serial", "Unknown")}',
horizontalalignment='center', fontsize=14)
ax.text(0.5, 0.7, f'Firmware Version: {device_info.get("fwver", "Unknown")}',
horizontalalignment='center', fontsize=14)
ax.text(0.5, 0.65, f'Hardware Version: {device_info.get("hwver", "Unknown")}',
horizontalalignment='center', fontsize=14)
# Test time
ax.text(0.5, 0.55, f'Test Time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
horizontalalignment='center', fontsize=14)
# Summary information
ax.text(0.1, 0.45, 'Test Summary:', fontsize=16, fontweight='bold')
summary_text = f"""
· Sample Count: {analysis_results['A']['basic_stats']['count']} samples
· Channel A Average Voltage: {analysis_results['A']['basic_stats']['mean']:.6f} V
· Channel B Average Voltage: {analysis_results['B']['basic_stats']['mean']:.6f} V
· Channel A SNR: {analysis_results['A']['advanced_stats']['snr_db']:.1f} dB
· Channel B SNR: {analysis_results['B']['advanced_stats']['snr_db']:.1f} dB
· Channel A THD: {analysis_results['A']['advanced_stats']['thd']:.3f} %
· Channel B THD: {analysis_results['B']['advanced_stats']['thd']:.3f} %
"""
ax.text(0.1, 0.35, summary_text, fontsize=12, verticalalignment='top')
# Quality assessment
ax.text(0.1, 0.15, 'Overall Assessment:', fontsize=16, fontweight='bold')
# Simple quality assessment logic
snr_avg = (analysis_results['A']['advanced_stats']['snr_db'] +
analysis_results['B']['advanced_stats']['snr_db']) / 2
thd_max = max(analysis_results['A']['advanced_stats']['thd'],
analysis_results['B']['advanced_stats']['thd'])
if snr_avg > 60 and thd_max < 1:
quality = "Excellent - Device performance is outstanding"
elif snr_avg > 40 and thd_max < 5:
quality = "Good - Device performance meets requirements"
else:
quality = "Fair - Recommend checking device calibration"
ax.text(0.1, 0.1, quality, fontsize=14, color='green' if 'Excellent' in quality else 'orange' if 'Good' in quality else 'red')
pdf.savefig(fig, bbox_inches='tight')
plt.close(fig)
# Second page: Analysis charts with row layout
fig = plt.figure(figsize=(8.5, 11))
gs = fig.add_gridspec(6, 1, hspace=0.4)
# Chart title
fig.suptitle(f'Device {device_idx} Analysis Charts\nTime: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
fontsize=14, fontweight='bold')
# 1. Time domain signal comparison (Row 1)
ax1 = fig.add_subplot(gs[0])
voltages_a = analysis_results['A']['raw_voltages'][:1000]
voltages_b = analysis_results['B']['raw_voltages'][:1000]
time_axis = np.arange(len(voltages_a)) / 100000
ax1.plot(time_axis, voltages_a, 'b-', linewidth=1, alpha=0.8, label='Channel A')
ax1.plot(time_axis, voltages_b, 'r-', linewidth=1, alpha=0.8, label='Channel B')
ax1.set_title('Time Domain Signal Comparison', fontsize=10, fontweight='bold')
ax1.set_xlabel('Time (s)', fontsize=8)
ax1.set_ylabel('Voltage (V)', fontsize=8)
ax1.legend(fontsize=8)
ax1.grid(True, alpha=0.3)
ax1.tick_params(labelsize=7)
# 2. Frequency spectrum analysis (Row 2)
ax2 = fig.add_subplot(gs[1])
freq_a = analysis_results['A']['frequency_analysis']
freq_b = analysis_results['B']['frequency_analysis']
if 'frequencies' in freq_a and 'psd' in freq_a:
freqs_a = freq_a['frequencies'][:len(freq_a['frequencies'])//10]
psd_a = freq_a['psd'][:len(freq_a['psd'])//10]
ax2.semilogy(freqs_a, psd_a, 'b-', alpha=0.8, label='Channel A')
if 'frequencies' in freq_b and 'psd' in freq_b:
freqs_b = freq_b['frequencies'][:len(freq_b['frequencies'])//10]
psd_b = freq_b['psd'][:len(freq_b['psd'])//10]
ax2.semilogy(freqs_b, psd_b, 'r-', alpha=0.8, label='Channel B')
ax2.set_title('Power Spectral Density', fontsize=10, fontweight='bold')
ax2.set_xlabel('Frequency (Hz)', fontsize=8)
ax2.set_ylabel('Power Spectral Density', fontsize=8)
ax2.legend(fontsize=8)
ax2.grid(True, alpha=0.3)
ax2.tick_params(labelsize=7)
# 3. Statistical distribution comparison (Row 3)
ax3 = fig.add_subplot(gs[2])
ax3.hist(voltages_a, bins=50, alpha=0.6, color='blue', label='Channel A', density=True)
ax3.hist(voltages_b, bins=50, alpha=0.6, color='red', label='Channel B', density=True)
ax3.axvline(np.mean(voltages_a), color='blue', linestyle='--', alpha=0.8)
ax3.axvline(np.mean(voltages_b), color='red', linestyle='--', alpha=0.8)
ax3.set_title('Voltage Distribution Histogram', fontsize=10, fontweight='bold')
ax3.set_xlabel('Voltage (V)', fontsize=8)
ax3.set_ylabel('Probability Density', fontsize=8)
ax3.legend(fontsize=8)
ax3.grid(True, alpha=0.3)
ax3.tick_params(labelsize=7)
# 4. Stability analysis (Row 4)
ax4 = fig.add_subplot(gs[3])
stability_a = analysis_results['A']['stability_analysis']
stability_b = analysis_results['B']['stability_analysis']
if 'windowed_means' in stability_a:
window_indices = np.arange(len(stability_a['windowed_means']))
ax4.plot(window_indices, stability_a['windowed_means'], 'b-o', markersize=2, label='Channel A Mean')
ax4.fill_between(window_indices,
stability_a['windowed_means'] - stability_a['windowed_stds'],
stability_a['windowed_means'] + stability_a['windowed_stds'],
alpha=0.2, color='blue')
if 'windowed_means' in stability_b:
window_indices = np.arange(len(stability_b['windowed_means']))
ax4.plot(window_indices, stability_b['windowed_means'], 'r-o', markersize=2, label='Channel B Mean')
ax4.fill_between(window_indices,
stability_b['windowed_means'] - stability_b['windowed_stds'],
stability_b['windowed_means'] + stability_b['windowed_stds'],
alpha=0.2, color='red')
ax4.set_title('Stability Analysis (Sliding Window)', fontsize=10, fontweight='bold')
ax4.set_xlabel('Window Index', fontsize=8)
ax4.set_ylabel('Voltage (V)', fontsize=8)
ax4.legend(fontsize=8)
ax4.grid(True, alpha=0.3)
ax4.tick_params(labelsize=7)
            # 5. Anomaly detection (Row 5)
            ax5 = fig.add_subplot(gs[4])
            anomaly_a = analysis_results['A']['anomaly_detection']
            anomaly_b = analysis_results['B']['anomaly_detection']
            categories = ['Z-score Anomalies', 'IQR Anomalies']
            anomaly_rates_a = [anomaly_a.get('zscore_anomaly_rate', 0), anomaly_a.get('iqr_anomaly_rate', 0)]
            anomaly_rates_b = [anomaly_b.get('zscore_anomaly_rate', 0), anomaly_b.get('iqr_anomaly_rate', 0)]
            x = np.arange(len(categories))
            width = 0.35
            ax5.bar(x - width / 2, anomaly_rates_a, width, label='Channel A', color='blue', alpha=0.7)
            ax5.bar(x + width / 2, anomaly_rates_b, width, label='Channel B', color='red', alpha=0.7)
            ax5.set_title('Anomaly Detection Results', fontsize=10, fontweight='bold')
            ax5.set_xlabel('Detection Method', fontsize=8)
            ax5.set_ylabel('Anomaly Rate (%)', fontsize=8)
            ax5.set_xticks(x)
            ax5.set_xticklabels(categories, fontsize=7)
            ax5.legend(fontsize=8)
            ax5.grid(True, alpha=0.3)
            ax5.tick_params(labelsize=7)
            # 6. Key metrics dashboard (Row 6)
            ax6 = fig.add_subplot(gs[5])
            ax6.axis('off')
            # Create metrics table
            metrics_data = []
            metrics_data.append(['Parameter', 'Channel A', 'Channel B', 'Difference', 'Assessment'])
            # Average voltage
            mean_a = analysis_results['A']['basic_stats']['mean']
            mean_b = analysis_results['B']['basic_stats']['mean']
            diff_mean = abs(mean_a - mean_b)
            eval_mean = 'Excellent' if diff_mean < 0.001 else 'Good' if diff_mean < 0.01 else 'Fair'
            metrics_data.append(['Average Voltage (V)', f'{mean_a:.6f}', f'{mean_b:.6f}', f'{diff_mean:.6f}', eval_mean])
            # Standard deviation
            std_a = analysis_results['A']['basic_stats']['std']
            std_b = analysis_results['B']['basic_stats']['std']
            diff_std = abs(std_a - std_b)
            eval_std = 'Excellent' if max(std_a, std_b) < 0.0001 else 'Good' if max(std_a, std_b) < 0.001 else 'Fair'
            metrics_data.append(['Standard Deviation (V)', f'{std_a:.6f}', f'{std_b:.6f}', f'{diff_std:.6f}', eval_std])
            # Signal-to-noise ratio
            snr_a = analysis_results['A']['advanced_stats']['snr_db']
            snr_b = analysis_results['B']['advanced_stats']['snr_db']
            diff_snr = abs(snr_a - snr_b)
            eval_snr = 'Excellent' if min(snr_a, snr_b) > 60 else 'Good' if min(snr_a, snr_b) > 40 else 'Fair'
            metrics_data.append(['SNR (dB)', f'{snr_a:.1f}', f'{snr_b:.1f}', f'{diff_snr:.1f}', eval_snr])
            # THD
            thd_a = analysis_results['A']['advanced_stats']['thd']
            thd_b = analysis_results['B']['advanced_stats']['thd']
            diff_thd = abs(thd_a - thd_b)
            eval_thd = 'Excellent' if max(thd_a, thd_b) < 1 else 'Good' if max(thd_a, thd_b) < 5 else 'Fair'
            metrics_data.append(['THD (%)', f'{thd_a:.3f}', f'{thd_b:.3f}', f'{diff_thd:.3f}', eval_thd])
            # Create table
            table = ax6.table(cellText=metrics_data[1:], colLabels=metrics_data[0],
                              cellLoc='center', loc='center', bbox=[0, 0, 1, 1])
            table.auto_set_font_size(False)
            table.set_fontsize(7)
            table.scale(1, 1.2)
            # Set table style
            for i in range(len(metrics_data)):
                for j in range(len(metrics_data[0])):
                    cell = table[(i, j)]
                    if i == 0:  # Header
                        cell.set_facecolor('#4CAF50')
                        cell.set_text_props(weight='bold', color='white')
                    elif j == 4 and i > 0:  # Assessment column
                        eval_text = metrics_data[i][j]
                        if eval_text == 'Excellent':
                            cell.set_facecolor('#E8F5E8')
                        elif eval_text == 'Good':
                            cell.set_facecolor('#FFF3E0')
                        else:
                            cell.set_facecolor('#FFEBEE')
            # Save to PDF with high resolution
            pdf.savefig(fig, dpi=300, bbox_inches='tight')
            plt.close(fig)
            # Third page: Detailed data tables
            fig, ax = plt.subplots(figsize=(8.5, 11))
            ax.axis('off')
            ax.text(0.5, 0.95, 'Detailed Analysis Data',
                    horizontalalignment='center', fontsize=18, fontweight='bold')
            # Create detailed data table
            detailed_data = []
            detailed_data.append(['Category', 'Parameter', 'Channel A', 'Channel B', 'Unit'])
            # Basic statistical data
            basic_params = [
                ('Mean', 'mean', 'V'),
                ('Std Dev', 'std', 'V'),
                ('Minimum', 'min', 'V'),
                ('Maximum', 'max', 'V'),
                ('Median', 'median', 'V'),
                ('RMS', 'rms', 'V')
            ]
            for param_name, param_key, unit in basic_params:
                val_a = analysis_results['A']['basic_stats'][param_key]
                val_b = analysis_results['B']['basic_stats'][param_key]
                detailed_data.append(['Basic Stats', param_name, f'{val_a:.6f}', f'{val_b:.6f}', unit])
            # Advanced statistical data
            advanced_params = [
                ('Skewness', 'skewness', ''),
                ('Kurtosis', 'kurtosis', ''),
                ('CV', 'cv', '%'),
                ('SNR', 'snr_db', 'dB'),
                ('THD', 'thd', '%'),
                ('Crest Factor', 'crest_factor', '')
            ]
            for param_name, param_key, unit in advanced_params:
                val_a = analysis_results['A']['advanced_stats'][param_key]
                val_b = analysis_results['B']['advanced_stats'][param_key]
                if param_key in ['cv', 'thd']:
                    detailed_data.append(['Advanced Stats', param_name, f'{val_a:.3f}', f'{val_b:.3f}', unit])
                elif param_key == 'snr_db':
                    detailed_data.append(['Advanced Stats', param_name, f'{val_a:.1f}', f'{val_b:.1f}', unit])
                else:
                    detailed_data.append(['Advanced Stats', param_name, f'{val_a:.4f}', f'{val_b:.4f}', unit])
            # Create table
            table = ax.table(cellText=detailed_data[1:], colLabels=detailed_data[0],
                             cellLoc='center', loc='center', bbox=[0, 0.1, 1, 0.8])
            table.auto_set_font_size(False)
            table.set_fontsize(9)
            table.scale(1, 1.5)
            # Set table style
            for i in range(len(detailed_data)):
                for j in range(len(detailed_data[0])):
                    cell = table[(i, j)]
                    if i == 0:  # Header
                        cell.set_facecolor('#2196F3')
                        cell.set_text_props(weight='bold', color='white')
                    elif j == 0:  # Category column
                        cell.set_facecolor('#E3F2FD')
                        cell.set_text_props(weight='bold')
            pdf.savefig(fig, bbox_inches='tight')
            plt.close(fig)
            # Fourth page: Raw data display
            fig, ax = plt.subplots(figsize=(8.5, 11))
            ax.axis('off')
            ax.text(0.5, 0.95, 'Raw Data Sample (First 50 samples)',
                    horizontalalignment='center', fontsize=16, fontweight='bold')
            # Create raw data table
            raw_data_samples = self.raw_data[device_idx][:50]  # Show first 50 samples
            raw_data_table = []
            raw_data_table.append(['Sample', 'Time (s)', 'Ch A Voltage (V)', 'Ch A Current (A)', 'Ch B Voltage (V)', 'Ch B Current (A)'])
            for i, sample in enumerate(raw_data_samples):
                raw_data_table.append([
                    str(i + 1),
                    f'{i / 100000:.6f}',  # timestamp assumes the default 100 kHz sample rate
                    f'{sample[0][0]:.6f}',
                    f'{sample[0][1]:.6f}',
                    f'{sample[1][0]:.6f}',
                    f'{sample[1][1]:.6f}'
                ])
            # Create table with smaller font for more data
            table = ax.table(cellText=raw_data_table[1:], colLabels=raw_data_table[0],
                             cellLoc='center', loc='center', bbox=[0, 0.1, 1, 0.8])
            table.auto_set_font_size(False)
            table.set_fontsize(6)
            table.scale(1, 0.8)
            # Set table style
            for i in range(len(raw_data_table)):
                for j in range(len(raw_data_table[0])):
                    cell = table[(i, j)]
                    if i == 0:  # Header
                        cell.set_facecolor('#2196F3')
                        cell.set_text_props(weight='bold', color='white')
                    elif i % 2 == 0:  # Alternate row colors
                        cell.set_facecolor('#F5F5F5')
            # Add note
            ax.text(0.5, 0.05, f'Note: Showing first 50 samples out of {len(self.raw_data[device_idx])} total samples',
                    horizontalalignment='center', fontsize=10, style='italic')
            pdf.savefig(fig, bbox_inches='tight')
            plt.close(fig)
        print(f"PDF report generated: {report_filepath}")
        return report_filepath
    def run_professional_analysis(self):
        """
        Run professional analysis workflow
        """
        print("pysmu Professional Data Analysis Tool")
        print("=" * 50)
        # Create session
        try:
            session = Session()
        except Exception as e:
            print(f"Failed to create session: {e}")
            return
        # Check devices
        if not session.devices:
            print("Error: No devices detected")
            print("Please ensure:")
            print("1. ADALM1000 device is connected")
            print("2. Drivers are properly installed")
            print("3. libsmu C++ library is installed")
            return
        print(f"Detected {len(session.devices)} device(s)")
        # Configure devices
        for idx, dev in enumerate(session.devices):
            print(f"\nConfiguring device {idx}: {dev}")
            # Save device information
            self.device_info[idx] = {
                'serial': dev.serial,
                'fwver': dev.fwver,
                'hwver': dev.hwver
            }
            # Channel A: Source voltage mode
            target_voltage_a = idx % 6
            dev.channels['A'].mode = Mode.SVMI
            dev.channels['A'].constant(target_voltage_a)
            print(f"  Channel A: SVMI mode, target voltage {target_voltage_a}V")
            # Channel B: Source current mode
            target_current_b = 0.05
            dev.channels['B'].mode = Mode.SIMV
            dev.channels['B'].constant(target_current_b)
            print(f"  Channel B: SIMV mode, target current {target_current_b}A")
        # Data acquisition
        print("\nStarting professional data acquisition...")
        sample_count = 2000  # Increase sample count for better analysis precision
        try:
            all_samples = session.get_samples(sample_count)
            print(f"Successfully acquired {sample_count} samples")
        except Exception as e:
            print(f"Data acquisition failed: {e}")
            return
        # Analyze data for each device
        for dev_idx, samples in enumerate(all_samples):
            print(f"\n{'=' * 60}")
            print(f"Device {dev_idx} Professional Data Analysis")
            print(f"{'=' * 60}")
            # Save raw data
            self.raw_data[dev_idx] = samples
            csv_filepath = self.save_raw_data_csv(dev_idx, samples)
            # Perform advanced analysis
            print("\nPerforming advanced data analysis...")
            analysis_a = self.advanced_voltage_analysis(samples, 'A')
            analysis_b = self.advanced_voltage_analysis(samples, 'B')
            self.analysis_results[dev_idx] = {
                'A': analysis_a,
                'B': analysis_b
            }
            # Save analysis results
            analysis_csv_filepath = self.save_analysis_results_csv(dev_idx, self.analysis_results[dev_idx])
            # Generate charts
            print("\nGenerating professional charts...")
            plot_filepath = self.create_professional_plots(dev_idx, self.analysis_results[dev_idx])
            # Generate PDF report
            print("\nGenerating PDF report...")
            pdf_filepath = self.generate_pdf_report(dev_idx, self.analysis_results[dev_idx], self.device_info[dev_idx])
            # Print key results
            print("\n=== Key Analysis Results ===")
            print(f"Channel A - Average Voltage: {analysis_a['basic_stats']['mean']:.6f}V, SNR: {analysis_a['advanced_stats']['snr_db']:.1f}dB, THD: {analysis_a['advanced_stats']['thd']:.3f}%")
            print(f"Channel B - Average Voltage: {analysis_b['basic_stats']['mean']:.6f}V, SNR: {analysis_b['advanced_stats']['snr_db']:.1f}dB, THD: {analysis_b['advanced_stats']['thd']:.3f}%")
            print("\n=== Output Files ===")
            print(f"Raw data: {csv_filepath}")
            print(f"Analysis results: {analysis_csv_filepath}")
            print(f"Analysis charts: {plot_filepath}")
            print(f"PDF report: {pdf_filepath}")
        print(f"\nProfessional analysis completed! All files saved in: {self.output_dir}")
        # Generate summary report
        self.generate_summary_report()
    def generate_summary_report(self):
        """
        Generate summary report
        """
        summary_filename = f"summary_report_{self.timestamp}.json"
        summary_filepath = os.path.join(self.output_dir, 'reports', summary_filename)
        summary_data = {
            'timestamp': self.timestamp,
            'analysis_time': datetime.now().isoformat(),
            'devices_analyzed': len(self.analysis_results),
            'device_info': self.device_info,
            'analysis_summary': {}
        }
        for dev_idx, results in self.analysis_results.items():
            summary_data['analysis_summary'][dev_idx] = {
                'channel_A': {
                    'mean_voltage': results['A']['basic_stats']['mean'],
                    'std_voltage': results['A']['basic_stats']['std'],
                    'snr_db': results['A']['advanced_stats']['snr_db'],
                    'thd_percent': results['A']['advanced_stats']['thd']
                },
                'channel_B': {
                    'mean_voltage': results['B']['basic_stats']['mean'],
                    'std_voltage': results['B']['basic_stats']['std'],
                    'snr_db': results['B']['advanced_stats']['snr_db'],
                    'thd_percent': results['B']['advanced_stats']['thd']
                }
            }
        with open(summary_filepath, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"Summary report saved: {summary_filepath}")
def main():
    """
    Main function
    """
    # Check dependencies
    try:
        import pandas as pd
        from scipy import stats, signal
    except ImportError as e:
        print(f"Missing dependencies: {e}")
        print("Please install: pip install pandas scipy")
        return
    # Create analyzer and run
    analyzer = ProfessionalAnalyzer()
    analyzer.run_professional_analysis()


if __name__ == '__main__':
    main()
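
The `windowed_means` / `windowed_stds` arrays consumed by the stability panel can be reproduced in isolation. The sketch below is a minimal standalone example, not the tool's actual `stability_analysis` implementation (which is outside this excerpt); it assumes non-overlapping windows of the default `window_size = 100`:

```python
import numpy as np

def sliding_window_stats(values, window_size=100):
    """Return (windowed_means, windowed_stds) over non-overlapping windows,
    i.e. the two arrays plotted in the 'Stability Analysis' panel."""
    values = np.asarray(values, dtype=float)
    n_windows = len(values) // window_size
    # Drop the trailing partial window, then reshape to (n_windows, window_size)
    windows = values[:n_windows * window_size].reshape(n_windows, window_size)
    return windows.mean(axis=1), windows.std(axis=1)

# Example: 2000 simulated samples around 2.5 V (the default sample_count)
rng = np.random.default_rng(0)
means, stds = sliding_window_stats(rng.normal(2.5, 0.001, 2000), window_size=100)
```

With 2000 samples and a window size of 100, both arrays have 20 entries, one per window; a drifting `means` curve or a growing `stds` curve indicates instability over the acquisition.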
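
Similarly, the Z-score and IQR anomaly rates shown in the 'Anomaly Detection Results' chart can be sketched without hardware. This is a hypothetical standalone version (the tool's real `anomaly_detection` method is not part of this excerpt); it uses the default Z-score threshold of 3 and the conventional 1.5×IQR whiskers:

```python
import numpy as np

def anomaly_rates(values, z_threshold=3.0):
    """Return (zscore_rate, iqr_rate) as percentages of flagged samples."""
    values = np.asarray(values, dtype=float)
    # Z-score method: flag samples more than z_threshold std devs from the mean
    std = values.std()
    z = np.zeros_like(values) if std == 0 else (values - values.mean()) / std
    zscore_rate = 100.0 * np.mean(np.abs(z) > z_threshold)
    # IQR method: flag samples outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    outliers = (values < q1 - 1.5 * iqr) | (values > q3 + 1.5 * iqr)
    iqr_rate = 100.0 * np.mean(outliers)
    return zscore_rate, iqr_rate

# Example: a clean 2.5 V signal with two injected spikes
rng = np.random.default_rng(0)
signal = np.concatenate([rng.normal(2.5, 0.001, 998), [3.5, 1.5]])
z_rate, i_rate = anomaly_rates(signal)
```

The IQR method, being robust to the outliers themselves, typically flags at least as many samples as the Z-score method on spiky data, which is why the two bars in the chart can differ substantially.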