趋近智
自编码器通过最小化输入和输出之间的重构误差,善于学习数据的压缩表示。这一特点使其非常适合异常检测。其主要思路很简单:在表示正常状态或行为的数据上训练自编码器。当遇到显著偏离正常状态的数据(异常)时,由于自编码器仅在正常模式上进行过训练,它将难以准确重构此类数据。这会导致相比正常数据点更高的重构误差。通过设定一个误差阈值,我们可以区分正常实例和潜在异常。
图像数据的一个例子是MNIST数字。例如,可以将一个数字类别(例如“1”)视为正常,而其他数字类别则视为异常。
1. 数据准备
假设我们加载MNIST数据集。我们将数字“1”的图像指定为正常数据并用于训练。其他数字(“0”、“2”-“9”)将在评估期间作为异常。通过归一化像素值(例如,缩放到[0, 1])来预处理图像,如果使用全连接自编码器,则可能将其展平;如果使用卷积自编码器,则保持其2D结构。
# 数据加载(使用TensorFlow/Keras进行说明)
import tensorflow as tf
import numpy as np
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
# 归一化并重塑
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
# 选择“正常”数据(例如数字1)进行训练
x_train_normal = x_train[y_train == 1]
x_val_normal = x_test[y_test == 1][:500] # 使用部分测试集中的“1”用于验证阈值
x_test_normal = x_test[y_test == 1][500:]
x_test_anomalies = x_test[y_test != 1]
print(f"训练集正常数据形状: {x_train_normal.shape}")
print(f"验证集正常数据形状: {x_val_normal.shape}")
print(f"测试集正常数据形状: {x_test_normal.shape}")
print(f"测试集异常数据形状: {x_test_anomalies.shape}")
# 示例:为全连接自编码器展平图像
# x_train_normal = x_train_normal.reshape((len(x_train_normal), np.prod(x_train_normal.shape[1:])))
# x_val_normal = x_val_normal.reshape((len(x_val_normal), np.prod(x_val_normal.shape[1:])))
# x_test_normal = x_test_normal.reshape((len(x_test_normal), np.prod(x_test_normal.shape[1:])))
# x_test_anomalies = x_test_anomalies.reshape((len(x_test_anomalies), np.prod(x_test_anomalies.shape[1:])))
# input_shape = x_train_normal.shape[1]
# 示例:为卷积自编码器重塑(添加通道维度)
x_train_normal = np.expand_dims(x_train_normal, axis=-1)
x_val_normal = np.expand_dims(x_val_normal, axis=-1)
x_test_normal = np.expand_dims(x_test_normal, axis=-1)
x_test_anomalies = np.expand_dims(x_test_anomalies, axis=-1)
input_shape = x_train_normal.shape[1:]
2. 模型构建(卷积自编码器示例)
卷积自编码器 (CAE) 通常适合图像数据。
# CAE模型定义 (TensorFlow/Keras)
from tensorflow.keras import layers, models
latent_dim = 32
encoder_inputs = tf.keras.Input(shape=input_shape)
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same', strides=2)(encoder_inputs)
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same', strides=2)(x)
# 需要形状信息:最后一个Conv2D层的输出形状
# 假设输入28x28,经过两次步长为2的卷积后,形状为7x7x64
shape_before_flattening = tf.keras.backend.int_shape(x)[1:]
x = layers.Flatten()(x)
encoder_outputs = layers.Dense(latent_dim, name='encoder_output')(x)
encoder = models.Model(encoder_inputs, encoder_outputs, name='encoder')
decoder_inputs = tf.keras.Input(shape=(latent_dim,))
# 需要与编码器中展平前的形状匹配
x = layers.Dense(np.prod(shape_before_flattening))(decoder_inputs)
x = layers.Reshape(shape_before_flattening)(x)
x = layers.Conv2DTranspose(64, (3, 3), activation='relu', padding='same', strides=2)(x)
x = layers.Conv2DTranspose(32, (3, 3), activation='relu', padding='same', strides=2)(x)
decoder_outputs = layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x) # 灰度图输出通道为1
decoder = models.Model(decoder_inputs, decoder_outputs, name='decoder')
autoencoder = models.Model(encoder_inputs, decoder(encoder_outputs), name='autoencoder')
autoencoder.compile(optimizer='adam', loss='mse')
autoencoder.summary()
3. 训练
仅在 x_train_normal 数据上训练模型。
# 训练调用
history = autoencoder.fit(x_train_normal, x_train_normal,
epochs=20, # 根据需要调整训练轮次
batch_size=128,
shuffle=True,
validation_data=(x_val_normal, x_val_normal)) # 此处也使用正常验证数据
4. 计算重构误差
使用训练好的模型预测(重构)样本并计算每个样本的MSE。
# 计算验证集正常数据的重构误差
reconstructions_val = autoencoder.predict(x_val_normal)
val_errors = tf.keras.losses.mse(
tf.reshape(x_val_normal, (len(x_val_normal), -1)),
tf.reshape(reconstructions_val, (len(reconstructions_val), -1))
) # 计算每个样本的MSE
# 计算测试集正常数据的误差
reconstructions_test_normal = autoencoder.predict(x_test_normal)
test_normal_errors = tf.keras.losses.mse(
tf.reshape(x_test_normal, (len(x_test_normal), -1)),
tf.reshape(reconstructions_test_normal, (len(reconstructions_test_normal), -1))
)
# 计算测试集异常数据的误差
reconstructions_test_anomalies = autoencoder.predict(x_test_anomalies)
test_anomaly_errors = tf.keras.losses.mse(
tf.reshape(x_test_anomalies, (len(x_test_anomalies), -1)),
tf.reshape(reconstructions_test_anomalies, (len(reconstructions_test_anomalies), -1))
)
5. 阈值选择
使用来自正常验证集(val_errors)的误差来确定阈值。
# 示例:使用验证误差的第99百分位数作为阈值
threshold = np.percentile(val_errors.numpy(), 99)
print(f"异常阈值(第99百分位数): {threshold:.6f}")
# 替代方案:使用均值 + k * 标准差
# threshold = np.mean(val_errors.numpy()) + 3 * np.std(val_errors.numpy())
# print(f"异常阈值(均值 + 3*标准差): {threshold:.6f}")
6. 评估
根据阈值对测试样本进行分类并进行评估。
# 分类测试样本
normal_predictions = test_normal_errors <= threshold
anomaly_predictions = test_anomaly_errors > threshold
# 组合结果进行评估(示例)
true_positives = np.sum(anomaly_predictions)
false_negatives = len(test_anomaly_errors) - true_positives
true_negatives = np.sum(normal_predictions)
false_positives = len(test_normal_errors) - true_negatives
print(f"检测到的异常(真阳性): {true_positives} / {len(test_anomaly_errors)}")
print(f"正确分类为正常的样本(真阴性): {true_negatives} / {len(test_normal_errors)}")
print(f"错误分类为异常的正常样本(假阳性): {false_positives}")
print(f"漏报的异常(假阴性): {false_negatives}")
# 计算精确率、召回率、F1分数等指标
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
print(f"精确率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1_score:.4f}")
绘制正常和异常测试数据的重构误差直方图,可以提供分离的视觉确认。
正常(蓝色)与异常(红色)测试样本的重构误差分布。异常通常表现出更高的误差,可以通过阈值进行分离。(注意:数值为示意)
这种实用方法说明了自编码器经过适当训练后,可以作为识别数据集中异常模式或离群点的有效工具,适用于图像处理、时间序列分析和网络安全等多种领域。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造