当前位置: 首页 > news >正文

基于自编码器的心电图信号异常检测(Python)

使用的数据集来自PTB心电图数据库,包括14552个心电图记录,包括两类:正常心跳和异常心跳,采样频率为125Hz。

import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.io import arff
from sklearn.model_selection import train_test_split
import matplotlib
matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")
import tensorflow as tf
from tensorflow import data
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae
from tensorflow.keras import layers
from tensorflow import keras
from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix, f1_score, classification_report
import os
isGPU = tf.config.list_physical_devices('GPU')
directory_path = r'ECG Heartbeat Categorization Dataset'
for dirname, _, filenames in os.walk(directory_path):for filename in filenames:print(os.path.join(dirname, filename))
normal_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_normal.csv").iloc[:, :-1]
anomaly_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_abnormal.csv").iloc[:, :-1]
print("Shape of Normal data", normal_df.shape)
print("Shape of Abnormal data", anomaly_df.shape)
Shape of Normal data (4045, 187)
Shape of Abnormal data (10505, 187)
def plot_sample(normal, anomaly):index = np.random.randint(0, len(normal_df), 2)fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)ax[0].set_title("Normal")ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)ax[1].set_title("Anomaly")plt.tight_layout()plt.show()
plot_sample(normal_df, anomaly_df)

CLASS_NAMES = ["Normal", "Anomaly"]normal_df_copy = normal_df.copy()
anomaly_df_copy = anomaly_df.copy()
print(anomaly_df_copy.columns.equals(normal_df_copy.columns))
normal_df_copy = normal_df_copy.set_axis(range(1, 188), axis=1)
anomaly_df_copy = anomaly_df_copy.set_axis(range(1, 188), axis=1)
normal_df_copy = normal_df_copy.assign(target = CLASS_NAMES[0])
anomaly_df_copy = anomaly_df_copy.assign(target = CLASS_NAMES[1])df = pd.concat((normal_df_copy, anomaly_df_copy))
def plot_smoothed_mean(data, class_name = "normal", step_size=5, ax=None):df = pd.DataFrame(data)roll_df = df.rolling(step_size)smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)smoothed_std = roll_df.std().dropna().reset_index(drop=True)margin = 3*smoothed_stdlower_bound = (smoothed_mean - margin).values.flatten()upper_bound = (smoothed_mean + margin).values.flatten()ax.plot(smoothed_mean.index, smoothed_mean)ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")ax.set_title(class_name, fontsize=9)
fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
axes = axes.flatten()
for i, label in enumerate(CLASS_NAMES, start=1):data_group = df.groupby("target")data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i-1])
fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
plt.tight_layout()

normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
Train shape: (3438, 187), Test shape: (607, 187), anomaly shape: (10505, 187)
class AutoEncoder(Model):def __init__(self, input_dim, latent_dim):super(AutoEncoder, self).__init__()self.input_dim = input_dimself.latent_dim = latent_dimself.encoder = tf.keras.Sequential([layers.Input(shape=(input_dim,)),layers.Reshape((input_dim, 1)),  # Reshape to 3D for Conv1Dlayers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),layers.BatchNormalization(),layers.MaxPooling1D(2, padding="same"),layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),layers.BatchNormalization(),layers.MaxPooling1D(2, padding="same"),layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),layers.BatchNormalization(),layers.MaxPooling1D(2, padding="same"),])# Previously, I was using UpSampling. I am trying Transposed Convolution this time around.self.decoder = tf.keras.Sequential([layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),layers.BatchNormalization(),layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),layers.BatchNormalization(),layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),layers.BatchNormalization(),layers.Flatten(),layers.Dense(input_dim)])def call(self, X):encoded = self.encoder(X)decoded = self.decoder(encoded)return decodedinput_dim = X_train.shape[-1]
latent_dim = 32model = AutoEncoder(input_dim, latent_dim)
model.build((None, input_dim))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
model.summary()
Model: "auto_encoder"
_________________________________________________________________Layer (type)                Output Shape              Param #   
=================================================================sequential (Sequential)     (None, 24, 32)            63264     sequential_1 (Sequential)   (None, 187)               640603    =================================================================
Total params: 703867 (2.69 MB)
Trainable params: 702715 (2.68 MB)
Non-trainable params: 1152 (4.50 KB)
epochs = 100
batch_size = 128
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,validation_split=0.1, callbacks=[early_stopping])

plt.plot(history.history['loss'], label="Training loss")
plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.title("Training loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.show()

train_mae = model.evaluate(X_train, X_train, verbose=0)
test_mae = model.evaluate(X_test, X_test, verbose=0)
anomaly_mae = model.evaluate(anomaly_df, anomaly_df, verbose=0)print("Training dataset error: ", train_mae)
print("Testing dataset error: ", test_mae)
print("Anormaly dataset error: ", anomaly_mae)

Training dataset error:  0.014224529266357422
Testing dataset error:  0.01488062646239996
Anormaly dataset error:  0.043484628200531006
def predict(model, X):pred = model.predict(X, verbose=False)loss = mae(pred, X)return pred, loss

_, train_loss = predict(model, X_train)
_, test_loss = predict(model, X_test)
_, anomaly_loss = predict(model, anomaly)
threshold = np.mean(train_loss) + np.std(train_loss) # Setting threshold for distinguish normal data from anomalous databins = 40
plt.figure(figsize=(9, 5), dpi=100)
sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="anomaly")ax = plt.gca()  # Get the current Axes
ylim = ax.get_ylim()
plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold+0.009, ylim[-1]),arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.show()

def plot_examples(model, data, ax, title):pred, loss = predict(model, data)ax.plot(data.flatten(), label="Actual")ax.plot(pred[0], label = "Predicted")ax.fill_between(range(1, 188), data.flatten(), pred[0], alpha=0.3, color="r")ax.legend(shadow=True, frameon=True,facecolor="inherit", loc=1, fontsize=7)
#                bbox_to_anchor = (0, 0, 0.8, 0.25))ax.set_title(f"{title} (loss: {loss[0]:.3f})", fontsize=9.5)
fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
random_indexes = np.random.randint(0, len(X_train), size=5)for i, idx in enumerate(random_indexes):data = X_train[[idx]]plot_examples(model, data, ax=axes[0, i], title="Normal")for i, idx in enumerate(random_indexes):data = anomaly[[idx]]plot_examples(model, data, ax=axes[1, i], title="anomaly")plt.tight_layout()
fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
fig.savefig("autoencoder.png")
plt.show()

Model Evaluation

def evaluate_model(model, data):pred, loss = predict(model, data)if id(data) == id(anomaly):accuracy = np.sum(loss > threshold)/len(data)else:accuracy = np.sum(loss <= threshold)/len(data)return f"Accuracy: {accuracy:.2%}"
print("Training", evaluate_model(model, X_train))
print("Testing", evaluate_model(model, X_test))
print("Anomaly", evaluate_model(model, anomaly))
Training Accuracy: 88.66%
Testing Accuracy: 84.51%
Anomaly Accuracy: 77.34%
def prepare_labels(model, train, test, anomaly, threshold=threshold):ytrue = np.concatenate((np.ones(len(X_train)+len(X_test), dtype=int), np.zeros(len(anomaly), dtype=int)))_, train_loss = predict(model, train)_, test_loss = predict(model, test)_, anomaly_loss = predict(model, anomaly)train_pred = (train_loss <= threshold).numpy().astype(int)test_pred = (test_loss <= threshold).numpy().astype(int)anomaly_pred = (anomaly_loss < threshold).numpy().astype(int)ypred = np.concatenate((train_pred, test_pred, anomaly_pred))return ytrue, ypred
def plot_confusion_matrix(model, train, test, anomaly, threshold=threshold):ytrue, ypred = prepare_labels(model, train, test, anomaly, threshold=threshold)accuracy = accuracy_score(ytrue, ypred)precision = precision_score(ytrue, ypred)recall = recall_score(ytrue, ypred)f1 = f1_score(ytrue, ypred)print(f"""\Accuracy: {accuracy:.2%}Precision: {precision:.2%}Recall: {recall:.2%}f1: {f1:.2%}\n""")cm = confusion_matrix(ytrue, ypred)cm_norm = confusion_matrix(ytrue, ypred, normalize="true")data = np.array([f"{count}\n({pct:.2%})" for count, pct in zip(cm.ravel(), cm_norm.ravel())]).reshape(cm.shape)labels = ["Anomaly", "Normal"]plt.figure(figsize=(5, 4))sns.heatmap(cm, annot=data, fmt="", xticklabels=labels, yticklabels=labels)plt.ylabel("Actual")plt.xlabel("Predicted")plt.title("Confusion Matrix", weight="bold")plt.tight_layout()
plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)
Accuracy: 80.32%
Precision: 59.94%
Recall: 88.03%
f1: 71.32%

ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
print(classification_report(ytrue, ypred, target_names=CLASS_NAMES))

precision recall f1-score support
Normal 0.94 0.77 0.85 10505
Anomaly 0.60 0.88 0.71 4045
accuracy 0.80 14550
macro avg 0.77 0.83 0.78 14550
weighted avg 0.85 0.80 0.81 14550

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

相关文章:

  • 近期学习文章
  • 基于单电阻采样的电流重构
  • <Rust><iced><resvg>基于rust使用iced构建GUI实例:使用resvg库实现svg转png
  • 数据仓库的实际应用示例-广告投放平台为例
  • 解决Qt中 -lGL无法找到的问题
  • Java数据类型与运算符
  • 小游戏app看广告app开发案例
  • NodeJs实现对本地 mysql 数据库的增删改查
  • Flink 窗口函数
  • 网络编程5----初识http
  • 类AAAAAAAAAAAA迭代
  • SpringBoot使用Redisson实现可重入分布式锁
  • 如何正确理解和评估品牌价值?
  • C语言循环中获取之前变量的值
  • fataadmin导出Exel文件图片太大
  • Angular 响应式表单之下拉框
  • Java小白进阶笔记(3)-初级面向对象
  • JS字符串转数字方法总结
  • Kibana配置logstash,报表一体化
  • MySQL主从复制读写分离及奇怪的问题
  • 从0到1:PostCSS 插件开发最佳实践
  • 基于 Ueditor 的现代化编辑器 Neditor 1.5.4 发布
  • 如何设计一个微型分布式架构?
  • 它承受着该等级不该有的简单, leetcode 564 寻找最近的回文数
  • ​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​
  • # MySQL server 层和存储引擎层是怎么交互数据的?
  • #知识分享#笔记#学习方法
  • (6)设计一个TimeMap
  • (cos^2 X)的定积分,求积分 ∫sin^2(x) dx
  • (Python) SOAP Web Service (HTTP POST)
  • (二)构建dubbo分布式平台-平台功能导图
  • (附源码)spring boot网络空间安全实验教学示范中心网站 毕业设计 111454
  • (三十五)大数据实战——Superset可视化平台搭建
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • (转)AS3正则:元子符,元序列,标志,数量表达符
  • (转)创业的注意事项
  • *_zh_CN.properties 国际化资源文件 struts 防乱码等
  • .NET 6 在已知拓扑路径的情况下使用 Dijkstra,A*算法搜索最短路径
  • .NET Remoting Basic(10)-创建不同宿主的客户端与服务器端
  • .net反编译工具
  • .Net开发笔记(二十)创建一个需要授权的第三方组件
  • .NET实现之(自动更新)
  • /proc/vmstat 详解
  • @Conditional注解详解
  • @EventListener注解使用说明
  • [ element-ui:table ] 设置table中某些行数据禁止被选中,通过selectable 定义方法解决
  • []常用AT命令解释()
  • [C#]猫叫人醒老鼠跑 C#的委托及事件
  • [CISCN2019 华东北赛区]Web2
  • [cogs2652]秘术「天文密葬法」
  • [EULAR文摘] 脊柱放射学持续进展是否显著影响关节功能
  • [IOI2018] werewolf 狼人
  • [luogu4162 SCOI2009] 最长距离(最短路)
  • [python基础] python 2与python 3之间的区别 —— round