Anomaly Detection using Autoencoders
Anomaly Detection using Autoencoders
텐서플로우에서 오토인코더를 사용하여 사기 탐지를 수행한다.
오토인코더(autoencoder)가 무엇인지, 어떻게 동작하는지, 어디에 사용하는지 알아보고 마지막으로 비정상 탐지를 위한 오토인코더를 구현한다.
오토인코더는 입력 데이터의 숨은 표현을 포함하는 중간에 좁은 병목 레이어(narrow bottlenect layer)를 갖는 신경망을 사용하여 고차원 입력 데이터를 재구성하기 위해 사용되는 생산적 비지도 심층 학습(generative unsupervised deep learning) 알고리즘이다.
Source https://lilianweng.github.io/lil-log/2018/08/12/from-autoencoder-to-beta-vae.html
오토인코더는 인코더와 디코더로 구성된다.
- 인코더(encoder) 네트워크 : 고차원 입력 데이터를 받아들여 이를 숨은 저차원 데이터로 변환한다. 인코더 네트워크의 입력 크기는 출력의 크기보다 더 크다.
- 디코더(decoder) 네트워크 : 디코더 네트워크는 인코더 네트워크의 출력으로부터 입력을 받는다. 디코더의 목적은 입력 데이터를 재구축하는 것이다. 디코더 네트워크의 출력 크기는 입력 크기보다 더 크다.
오토인코더는 고차원 입력데이터를 받아들여 bottleneck hidden layer에서 이를 숨은 공간 표현(latent-space representation)으로 압축한다. 디코더는 원래의 입력 데이터를 재구축하기 위해 입력으로 데이터의 숨은 표현을 가져온다.
Autoencoders Usage
- 차원감소(Dimensionality Reduction). 인코더는 입력을 선형 및 비선형 대이터의 차원을 줄이기 위해 히든레이어로 인코딩한다. 그래서 PCA보다 훨씬 강력하다.
- 추천엔진(Recommendation Engines)
- 이상탐지(Anomaly Detection) : 오토인코더는 훈련의 일부로 재구성 오류(reconstruction error)를 최소화하려 한다. 이상치는 재구성 손실의 크기를 점검하여 탐지된다.
- 이미지 잡음제거(Denoising images) : 변질된 이이지가 원래 버전으로 복원될 수 있다.
- 이미지 인식(image recognition) : 겹겹이 쌓인 오토인코더(stacked autoencoder)는 이미지의 다른 특성을 학습하여 이미지 이식에 사용된다.
- 이미지 생성(image generation) : 오토인코더의 한 종류인 변형 오토인코더(VAE, Variational Autoencoder)는 이미지 생성에 사용된다.
다른 형태의 오토인코더에 대해서는 이글을 참조하자.
오토인코더를 이용한 이상탐지(anomaly detection)
고차원 데이터셋에서 이상치를 탐지하기 위해 다음단계를 따라해보자. 이 단계는 불균형 데이터셋(unbalanced dataset)에도 적용 가능하다.
- 훈련동안에는 인코더에 오직 정상 거래(normal transaction)만을 입력한다. 병목 레이어(bottlenect layer)가 정상적인 입력 데이터의 숨은 표현을 학습한다.
- 디코더는 원래 입력 데이터의 정상 거래를 재구성하기 위해 병목 레이어 출력을 사용한다.
- 사기 거래(fraudulent transaction)는 정상 거래와 차이가 있다. 오토인코더는 사기 거래를 재구성하는 것에 문제가 생기고 이로인해 재구성 오류(reconstruction error)가 높아진다.
- 여러분은 재구성 오류에 대한 특정 임계치를 기준으로 새로운 거래가 사기라고 표시할 수 있다.
오토인코더를 사용한 이상탐지 구현
이 글에서 데이터셋은 Credit Card Fraud Detection from Kaggle을 사용한다.
[다운로드 1, 2, 3, 4, 5, 6, 7]
필요 라이브러리 Import
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, recall_score, accuracy_score, precision_score
RANDOM_SEED = 2021
TEST_PCT = 0.3
LABELS = ["Normal","Fraud"]
데이터셋 Read
dataset = pd.read_csv("creditcard.csv")
탐험적 데이터 분석(Explortory Data Analysis)
#check for any nullvalues
print("Any nulls in the dataset ",dataset.isnull().values.any() )
print('-------')
print("No. of unique labels ", len(dataset['Class'].unique()))
print("Label values ",dataset.Class.unique())
#0 is for normal credit card transaction
#1 is for fraudulent credit card transaction
print('-------')
print("Break down of the Normal and Fraud Transactions")
print(pd.value_counts(dataset['Class'], sort = True) )
데이터 시각화
데이터셋에서 정상과 사기 거래의 수를 도표로 그려본다.
#Visualizing the imbalanced dataset
count_classes = pd.value_counts(dataset['Class'], sort = True)
count_classes.plot(kind = 'bar', rot=0)
plt.xticks(range(len(dataset['Class'].unique())), dataset.Class.unique())
plt.title("Frequency by observation number")
plt.xlabel("Class")
plt.ylabel("Number of Observations");
전체 정상과 사기 거래를 시각화 해본다.
# Save the normal and fradulent transactions in separate dataframe
normal_dataset = dataset[dataset.Class == 0]
fraud_dataset = dataset[dataset.Class == 1]
#Visualize transactionamounts for normal and fraudulent transactions
bins = np.linspace(200, 2500, 100)
plt.hist(normal_dataset.Amount, bins=bins, alpha=1, density=True, label='Normal')
plt.hist(fraud_dataset.Amount, bins=bins, alpha=0.5, density=True, label='Fraud')
plt.legend(loc='upper right')
plt.title("Transaction amount vs Percentage of transactions")
plt.xlabel("Transaction amount (USD)")
plt.ylabel("Percentage of transactions");
plt.show()
훈련/테스트 데이터셋 생성
데이터셋을 확인해 본다.
'Time'과 'Amount'는 스케일(scale)되지 않은 컬럼이다. 따라서 이 두 컬럼에 StandardScalar(sklearn의 표준화 함수)를 적용한다. 값을 0과 1시이로 정규화하는 것은 이 데이터셋에 잘 동작하지 않는다.
- 표준화(Standardization) : 평균을 기준으로 얼마만큼 떨어져 있는가?
$\frac{(x - \mu)}{\sigma}$
- $\mu$ : 평균, $\sigma$ : 표준편차
- 정규화(Normalization) : 전체 구간을 0~1사이로 나누어 관찰, 데이터셋에서 특정 데이터가 갖는 위치 관찰
$\frac{(x - x_{min})}{(x_{max} - x_{min})}$
$\frac{(x - x_{min})}{(x_{max} - x_{min})}$
sc=StandardScaler()
dataset['Time'] = sc.fit_transform(dataset['Time'].values.reshape(-1, 1))
dataset['Amount'] = sc.fit_transform(dataset['Amount'].values.reshape(-1, 1))
데이터셋에서 마지막 컬럼(class)이 목표 변수이다.
raw_data = dataset.values
# The last element contains if the transaction is normal which is represented by a 0 and if fraud then 1
labels = raw_data[:, -1]
# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]
train_data, test_data, train_labels, test_labels = train_test_split(
data, labels, test_size=0.2, random_state=2021
)
정규화(Normailzaiotn)
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)
train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)
train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)
오토인코더 훈련에는 정상 거래만을 사용한다!
정상 데이터는 목표변수에 0값을 갖는다. 정상과 사기 데이터셋을 만들기 위해 목표 변수를 사용한다.
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)
#creating normal and fraud datasets
normal_train_data = train_data[~train_labels]
normal_test_data = test_data[~test_labels]
fraud_train_data = train_data[train_labels]
fraud_test_data = test_data[test_labels]
print(" No. of records in Fraud Train Data=",len(fraud_train_data))
print(" No. of records in Normal Train data=",len(normal_train_data))
print(" No. of records in Fraud Test Data=",len(fraud_test_data))
print(" No. of records in Normal Test data=",len(normal_test_data))
훈련 파라미터값 설정
nb_epoch = 50
batch_size = 64
input_dim = normal_train_data.shape[1] #num of columns, 30
encoding_dim = 14
hidden_dim_1 = int(encoding_dim / 2) #
hidden_dim_2=4
learning_rate = 1e-7
오토인코더 생성
오토인코더의 구조는 아래와 같다.
image by author
#input Layer
input_layer = tf.keras.layers.Input(shape=(input_dim, ))
#Encoder
encoder = tf.keras.layers.Dense(encoding_dim, activation="tanh", activity_regularizer=tf.keras.regularizers.l2(learning_rate))(input_layer)
encoder=tf.keras.layers.Dropout(0.2)(encoder)
encoder = tf.keras.layers.Dense(hidden_dim_1, activation='relu')(encoder)
encoder = tf.keras.layers.Dense(hidden_dim_2, activation=tf.nn.leaky_relu)(encoder)
# Decoder
decoder = tf.keras.layers.Dense(hidden_dim_1, activation='relu')(encoder)
decoder=tf.keras.layers.Dropout(0.2)(decoder)
decoder = tf.keras.layers.Dense(encoding_dim, activation='relu')(decoder)
decoder = tf.keras.layers.Dense(input_dim, activation='tanh')(decoder)
#Autoencoder
autoencoder = tf.keras.Model(inputs=input_layer, outputs=decoder)
autoencoder.summary()
checkpoints와 early stopping을 위한 콜백 정의
cp = tf.keras.callbacks.ModelCheckpoint(filepath="autoencoder_fraud.h5",
mode='min', monitor='val_loss', verbose=2, save_best_only=True)
# define our early stopping
early_stop = tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
min_delta=0.0001,
patience=10,
verbose=1,
mode='min',
restore_best_weights=True
컴파일
autoencoder.compile(metrics=['accuracy'],
loss='mean_squared_error',
optimizer='adam')
훈련
history = autoencoder.fit(normal_train_data, normal_train_data,
epochs=nb_epoch,
batch_size=batch_size,
shuffle=True,
validation_data=(test_data, test_data),
verbose=1,
callbacks=[cp, early_stop]
).history
훈련/테스트 손실 그래프
plt.plot(history['loss'], linewidth=2, label='Train')
plt.plot(history['val_loss'], linewidth=2, label='Test')
plt.legend(loc='upper right')
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
#plt.ylim(ymin=0.70,ymax=1)
plt.show()
테스트셋에서 이상 탐지
이상치는 재구성 손실(reconstruction loss)이 더 높은 데이터 포인트이다
테스트 데이터에서 재구성 손실을 계산하려면 테스트 데이터를 예측하고 테스트 데이타와 재구성된 테스트 데이터간 MSE(Mean Square Error)를 계산한다.
test_x_predictions = autoencoder.predict(test_data)
mse = np.mean(np.power(test_data - test_x_predictions, 2), axis=1)
error_df = pd.DataFrame({'Reconstruction_error': mse,
'True_class': test_labels})
테스트 데이터셋을 도표로 그리고 임계치값 조정이 필요하다면 각각의 재구성 오류가 시각화를 위해 임계치값을 설정된다.
threshold_fixed = 50
groups = error_df.groupby('True_class')
fig, ax = plt.subplots()
for name, group in groups:
ax.plot(group.index, group.Reconstruction_error, marker='o', ms=3.5, linestyle='',
label= "Fraud" if name == 1 else "Normal")
ax.hlines(threshold_fixed, ax.get_xlim()[0], ax.get_xlim()[1], colors="r", zorder=100, label='Threshold')
ax.legend()
plt.title("Reconstruction error for normal and fraud data")
plt.ylabel("Reconstruction error")
plt.xlabel("Data point index")
plt.show();
고정된 임계치보다 더 큰 재구성 오류가 있는 점을 이상치로 탐지한다. 위 그림에서는 임계치로 52값이 좋다.
이상 탐지 성능 평가
threshold_fixed =52
pred_y = [1 if e > threshold_fixed else 0 for e in error_df.Reconstruction_error.values]
error_df['pred'] =pred_y
conf_matrix = confusion_matrix(error_df.True_class, pred_y)
plt.figure(figsize=(4, 4))
sns.heatmap(conf_matrix, xticklabels=LABELS, yticklabels=LABELS, annot=True, fmt="d");
plt.title("Confusion matrix")
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()
# print Accuracy, precision and recall
print(" Accuracy: ",accuracy_score(error_df['True_class'], error_df['pred']))
print(" Recall: ",recall_score(error_df['True_class'], error_df['pred']))
print(" Precision: ",precision_score(error_df['True_class'], error_df['pred']))
여기서 사용한 데이터셋은 엄청나게 불균형하기 때문에 높은 정확도(accuracy)를 보이지만 낮은 재현률(recall)과 정밀도(precision)를 보인다.
정밀도와 재현률을 더 개선하기 위한 것으로는 좀더 관련된 특성, 오토인코더의 다른 구조, 다른 하이퍼파라미터 또는 다른 알고리즘을 추가할 수 있다.
Conclusion
오토인코더는 많은 긍정 데이터와 단지 약간의 부정 데이터만있는 불균형한 데이터셋이 있을 때 이상 탐지 알고리즘으로 사용될 수 있다. 오토인코더는 재구성 오류를 최소화하기 위해 훈련된다. 오토인코더를 정상 또는 긍정 데이터에 대해 훈련할 때 우리는 이상치는 정상 또는 긍정 데이터보다 더 높은 재구성 오류를 갖는다고 가정한다.