현업에서 사용하는 One-Class SVM 구현: 금융 이상 거래 탐지
1. 소개: 금융 데이터에서의 이상 탐지
금융 산업에서 이상 거래 탐지는 매우 중요한 문제입니다. 정상 거래 데이터만 가지고 비정상적인 패턴을 찾아내야 하는 경우가 많은데, 이럴 때 One-Class SVM이 효과적인 솔루션이 될 수 있습니다. 이 글에서는 파이썬의 scikit-learn 라이브러리를 사용하여 One-Class SVM을 구현하고 금융 거래 데이터에 적용하는 방법을 알아보겠습니다.
2. 필요한 라이브러리 설치 및 임포트
# 필요한 라이브러리 설치
# pip install scikit-learn numpy pandas matplotlib seaborn
# 라이브러리 임포트
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
# 시각화 설정
plt.rcParams['font.family'] = 'Malgun Gothic' # 한글 폰트 설정
plt.rcParams['axes.unicode_minus'] = False # 마이너스 기호 깨짐 방지
sns.set(style='whitegrid')
3. 데이터 준비
3.1 신용카드 거래 데이터 샘플 생성
실제 데이터가 없는 경우, 간단한 시뮬레이션 데이터를 생성하여 사용할 수 있습니다.
# 정상 거래 데이터 생성
np.random.seed(42)
n_samples = 10000
# 정상 거래 특성: 금액, 시간(24시간), 위치(위도/경도), 카테고리(원핫인코딩 형태)
normal_amount = np.random.lognormal(mean=10, sigma=1, size=n_samples) # 금액 (로그정규분포)
normal_time = np.random.normal(loc=14, scale=4, size=n_samples) % 24 # 시간 (정규분포, 주로 낮 시간대)
normal_lat = np.random.normal(loc=37.5, scale=0.1, size=n_samples) # 위도 (서울 중심)
normal_lng = np.random.normal(loc=127.0, scale=0.1, size=n_samples) # 경도 (서울 중심)
normal_category = np.eye(5)[np.random.choice(5, n_samples, p=[0.4, 0.3, 0.2, 0.05, 0.05])] # 카테고리 (5가지)
# 데이터프레임 생성
normal_data = np.column_stack([normal_amount, normal_time, normal_lat, normal_lng, normal_category])
normal_df = pd.DataFrame(normal_data,
columns=['금액', '시간', '위도', '경도', '카테고리_1', '카테고리_2',
'카테고리_3', '카테고리_4', '카테고리_5'])
# 이상치 데이터 생성 (100개)
n_anomalies = 100
anomaly_amount = np.random.lognormal(mean=13, sigma=1, size=n_anomalies) # 비정상적으로 큰 금액
anomaly_time = np.random.normal(loc=3, scale=2, size=n_anomalies) % 24 # 새벽 시간대
anomaly_lat = np.random.normal(loc=35.0, scale=5, size=n_anomalies) # 평소와 다른 위치
anomaly_lng = np.random.normal(loc=135.0, scale=5, size=n_anomalies) # 평소와 다른 위치
anomaly_category = np.eye(5)[np.random.choice(5, n_anomalies, p=[0.05, 0.05, 0.05, 0.4, 0.45])] # 다른 카테고리 분포
# 이상치 데이터프레임 생성
anomaly_data = np.column_stack([anomaly_amount, anomaly_time, anomaly_lat, anomaly_lng, anomaly_category])
anomaly_df = pd.DataFrame(anomaly_data,
columns=['금액', '시간', '위도', '경도', '카테고리_1', '카테고리_2',
'카테고리_3', '카테고리_4', '카테고리_5'])
# 테스트를 위해 정상 데이터와 이상치 데이터 결합
anomaly_df['정상여부'] = 0 # 0: 이상치
normal_df['정상여부'] = 1 # 1: 정상
# 훈련용 데이터는 정상 데이터만 사용
# 테스트용 데이터는 정상 + 이상치 혼합 사용
train_ratio = 0.7
train_normal = normal_df.sample(frac=train_ratio, random_state=42)
test_normal = normal_df.drop(train_normal.index)
test_data = pd.concat([test_normal, anomaly_df])
print(f"훈련 데이터 크기: {train_normal.shape}")
print(f"테스트 데이터 크기: {test_data.shape}")
3.2 데이터 전처리
# 특성과 레이블 분리
X_train = train_normal.drop('정상여부', axis=1)
X_test = test_data.drop('정상여부', axis=1)
y_test = test_data['정상여부']
# 데이터 스케일링 (One-Class SVM에 중요)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print("데이터 스케일링 완료!")
4. One-Class SVM 모델 구현 및 학습
# One-Class SVM 모델 정의
# nu: 이상치로 간주할 데이터 비율 (0~1 사이)
# kernel: 커널 함수 ('rbf', 'linear', 'poly', 'sigmoid')
# gamma: RBF 커널의 파라미터
ocsvm = OneClassSVM(nu=0.01, kernel='rbf', gamma='auto')
# 정상 데이터만으로 학습
print("모델 학습 중...")
ocsvm.fit(X_train_scaled)
print("모델 학습 완료!")
# 모델 예측 (1: 정상, -1: 이상치로 출력됨)
pred_train = ocsvm.predict(X_train_scaled)
pred_test = ocsvm.predict(X_test_scaled)
# 모델의 출력값을 우리 데이터의 레이블과 일치시키기 위해 변환
# -1 -> 0 (이상치), 1 -> 1 (정상)
pred_train = np.where(pred_train == -1, 0, 1)
pred_test = np.where(pred_test == -1, 0, 1)
# 신뢰도 점수 계산
score_train = ocsvm.decision_function(X_train_scaled)
score_test = ocsvm.decision_function(X_test_scaled)
5. 모델 평가
# 혼동 행렬 및 분류 보고서
print("테스트 데이터에 대한 평가 결과:")
print("\n혼동 행렬:")
cm = confusion_matrix(y_test, pred_test)
print(cm)
print("\n분류 보고서:")
target_names = ['이상치', '정상']
print(classification_report(y_test, pred_test, target_names=target_names))
# 시각화: 혼동 행렬
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=target_names, yticklabels=target_names)
plt.title('혼동 행렬')
plt.ylabel('실제 클래스')
plt.xlabel('예측 클래스')
plt.show()
# ROC 곡선 및 AUC 계산
fpr, tpr, _ = roc_curve(y_test, score_test)
roc_auc = auc(fpr, tpr)
# 시각화: ROC 곡선
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC 곡선 (AUC = {roc_auc:.3f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('One-Class SVM ROC 곡선')
plt.legend(loc="lower right")
plt.show()
6. 이상치 탐지 시각화 및 분석
# 결과 시각화: 신뢰도 점수 분포
plt.figure(figsize=(10, 6))
sns.histplot(score_test[y_test == 1], color='green', alpha=0.5, label='정상 거래', bins=50)
sns.histplot(score_test[y_test == 0], color='red', alpha=0.5, label='이상 거래', bins=50)
plt.axvline(x=0, color='blue', linestyle='--', label='결정 경계')
plt.legend()
plt.title('One-Class SVM 신뢰도 점수 분포')
plt.xlabel('신뢰도 점수')
plt.ylabel('빈도')
plt.tight_layout()
plt.show()
# 탐지된 이상치 분석
anomalies_detected = X_test.iloc[pred_test == 0].copy()
anomalies_detected['실제_정상여부'] = y_test.iloc[pred_test == 0].values
anomalies_detected['신뢰도_점수'] = score_test[pred_test == 0]
print("탐지된 이상치 샘플 (상위 10개):")
print(anomalies_detected.sort_values('신뢰도_점수').head(10))
# 오탐(False Positive) 분석 - 정상이지만 이상치로 탐지된 거래
false_positives = anomalies_detected[anomalies_detected['실제_정상여부'] == 1]
print(f"\n오탐 건수: {len(false_positives)} (전체 정상 거래 중 {len(false_positives)/sum(y_test==1)*100:.2f}%)")
# 미탐(False Negative) 분석 - 이상치이지만 정상으로 탐지된 거래
false_negatives = X_test.iloc[(pred_test == 1) & (y_test == 0)]
print(f"\n미탐 건수: {len(false_negatives)} (전체 이상 거래 중 {len(false_negatives)/sum(y_test==0)*100:.2f}%)")
7. 개인별 맞춤형 모델 구현 예시
# 고객 ID별 데이터 그룹화 시뮬레이션
np.random.seed(42)
# 고객 ID 생성 (10명의 고객)
customer_ids = [f'고객_{i+1}' for i in range(10)]
# 각 고객별 정상 거래 패턴 생성
customer_data = {}
customer_models = {}
for cid in customer_ids:
# 각 고객별 특성 생성 (고객마다 다른 패턴)
n_transactions = np.random.randint(300, 700) # 거래 건수
# 고객별 고유 패턴 설정
amount_mean = np.random.uniform(8, 12) # 평균 거래 금액 (로그 스케일)
time_mean = np.random.uniform(9, 18) # 평균 거래 시간
lat_base = np.random.uniform(37.4, 37.6) # 기본 위도
lng_base = np.random.uniform(126.9, 127.1) # 기본 경도
# 고객별 카테고리 선호도
category_prefs = np.random.dirichlet(np.ones(5))
# 정상 거래 생성
c_amount = np.random.lognormal(mean=amount_mean, sigma=0.8, size=n_transactions)
c_time = np.random.normal(loc=time_mean, scale=3, size=n_transactions) % 24
c_lat = np.random.normal(loc=lat_base, scale=0.05, size=n_transactions)
c_lng = np.random.normal(loc=lng_base, scale=0.05, size=n_transactions)
c_category = np.eye(5)[np.random.choice(5, n_transactions, p=category_prefs)]
# 데이터프레임 생성
c_data = np.column_stack([c_amount, c_time, c_lat, c_lng, c_category])
c_df = pd.DataFrame(c_data, columns=['금액', '시간', '위도', '경도',
'카테고리_1', '카테고리_2', '카테고리_3',
'카테고리_4', '카테고리_5'])
# 고객별 데이터 저장
customer_data[cid] = c_df
# 고객별 One-Class SVM 모델 학습
scaler = StandardScaler()
c_scaled = scaler.fit_transform(c_df)
# 모델 파라미터는 고객별로 최적화할 수 있음
model = OneClassSVM(nu=0.02, kernel='rbf', gamma='auto')
model.fit(c_scaled)
# 모델과 스케일러 함께 저장
customer_models[cid] = {'model': model, 'scaler': scaler}
print(f"{len(customer_ids)}명의 고객별 맞춤형 모델 생성 완료!")
# 고객별 이상 거래 탐지 함수
def detect_anomaly(customer_id, transaction):
"""
고객의 거래가 이상치인지 탐지
Args:
customer_id: 고객 ID
transaction: 거래 데이터 (DataFrame 형태)
Returns:
is_normal: 정상 여부 (True/False)
score: 신뢰도 점수
"""
if customer_id not in customer_models:
return False, -999 # 알 수 없는 고객
# 고객의 모델과 스케일러 가져오기
model = customer_models[customer_id]['model']
scaler = customer_models[customer_id]['scaler']
# 데이터 스케일링
scaled_transaction = scaler.transform(transaction)
# 예측
prediction = model.predict(scaled_transaction)[0]
score = model.decision_function(scaled_transaction)[0]
# 결과 반환 (1: 정상, -1: 이상치)
is_normal = prediction == 1
return is_normal, score
# 테스트: 고객별 정상/이상 거래 시뮬레이션
for cid in customer_ids[:3]: # 처음 3명의 고객만 테스트
# 정상 거래 예시
normal_transaction = customer_data[cid].iloc[[0]]
# 이상 거래 예시 (금액을 100배로 변경)
abnormal_transaction = customer_data[cid].iloc[[0]].copy()
abnormal_transaction['금액'] = abnormal_transaction['금액'] * 100
# 결과 출력
is_normal, score = detect_anomaly(cid, normal_transaction)
print(f"{cid} 정상 거래 테스트: {'정상' if is_normal else '이상'} (점수: {score:.4f})")
is_normal, score = detect_anomaly(cid, abnormal_transaction)
print(f"{cid} 이상 거래 테스트: {'정상' if is_normal else '이상'} (점수: {score:.4f})")
print("-" * 50)
8. 파라미터 튜닝 및 최적화
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
# 이상치 탐지를 위한 사용자 정의 점수 함수
def anomaly_detection_score(y_true, y_pred):
# One-Class SVM 출력 변환 (-1 -> 0, 1 -> 1)
y_pred_transformed = np.where(y_pred == -1, 0, 1)
# 혼동 행렬 계산
tn, fp, fn, tp = confusion_matrix(y_true, y_pred_transformed).ravel()
# 사용자 정의 점수: 정밀도와 재현율의 조화평균 (F1 스코어)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
return f1
# 교차 검증을 위한 데이터 준비 (정상 데이터에 소량의 이상치 포함)
cv_data = pd.concat([train_normal.sample(frac=0.5), anomaly_df.sample(n=20)])
X_cv = cv_data.drop('정상여부', axis=1)
y_cv = cv_data['정상여부']
# X_cv 데이터 스케일링
X_cv_scaled = scaler.transform(X_cv)
# 그리드 서치를 위한 파라미터 그리드
param_grid = {
'nu': [0.01, 0.05, 0.1, 0.2],
'kernel': ['rbf'],
'gamma': ['scale', 'auto', 0.1, 0.01, 0.001]
}
# 사용자 정의 점수 함수를 사용한 그리드 서치
custom_scorer = make_scorer(anomaly_detection_score)
# One-Class SVM 모델
ocsvm_tuning = OneClassSVM()
# 그리드 서치 (주의: OneClassSVM은 일반적인 교차 검증과 맞지 않음)
# 여기서는 예시로 제공하지만, 실제로는 더 복잡한 검증 방법 필요
print("파라미터 튜닝 중...")
grid_search = GridSearchCV(
estimator=ocsvm_tuning,
param_grid=param_grid,
scoring=custom_scorer,
cv=3,
n_jobs=-1
)
grid_search.fit(X_cv_scaled, y_cv)
print("최적 파라미터:", grid_search.best_params_)
print("최고 점수:", grid_search.best_score_)
# 최적 파라미터로 모델 재학습
best_ocsvm = OneClassSVM(**grid_search.best_params_)
best_ocsvm.fit(X_train_scaled)
# 최적화된 모델 평가
best_pred_test = best_ocsvm.predict(X_test_scaled)
best_pred_test = np.where(best_pred_test == -1, 0, 1)
print("\n최적화된 모델 성능:")
print(classification_report(y_test, best_pred_test, target_names=target_names))
9. 모델 저장 및 배포
import joblib
import datetime
# 현재 시간 기준 모델 버전 생성
model_version = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# 모델, 스케일러 및 관련 메타데이터 저장
model_info = {
'model': best_ocsvm,
'scaler': scaler,
'train_date': datetime.datetime.now(),
'model_version': model_version,
'parameters': best_ocsvm.get_params(),
'features': X_train.columns.tolist()
}
# 모델 저장
model_filename = f'one_class_svm_model_{model_version}.pkl'
joblib.dump(model_info, model_filename)
print(f"모델 저장 완료: {model_filename}")
# 저장된 모델 로드 및 사용 예시
loaded_model_info = joblib.load(model_filename)
loaded_ocsvm = loaded_model_info['model']
loaded_scaler = loaded_model_info['scaler']
# 새로운 거래 데이터에 적용 예시
new_transaction = pd.DataFrame({
'금액': [50000],
'시간': [22],
'위도': [37.55],
'경도': [127.02],
'카테고리_1': [0],
'카테고리_2': [0],
'카테고리_3': [0],
'카테고리_4': [1],
'카테고리_5': [0]
})
# 스케일링 및 예측
new_scaled = loaded_scaler.transform(new_transaction)
new_pred = loaded_ocsvm.predict(new_scaled)[0]
new_score = loaded_ocsvm.decision_function(new_scaled)[0]
print(f"새 거래 예측 결과: {'정상' if new_pred == 1 else '이상'} (점수: {new_score:.4f})")
10. 실시간 이상 탐지 시스템 구현 예시
# 실시간 탐지 시스템 모의 구현 (간단한 예시)
import time
from collections import deque
class RealTimeAnomalyDetector:
def __init__(self, model, scaler, history_length=100, threshold=None):
self.model = model
self.scaler = scaler
self.history = deque(maxlen=history_length)
self.threshold = threshold # 사용자 정의 임계값
def detect(self, transaction):
"""
실시간 이상 거래 탐지
Args:
transaction: 거래 정보 (DataFrame)
Returns:
is_anomaly: 이상 여부
score: 이상 점수
alert_level: 경고 수준 (상/중/하)
"""
# 스케일링
scaled_transaction = self.scaler.transform(transaction)
# 모델 예측
prediction = self.model.predict(scaled_transaction)[0]
score = self.model.decision_function(scaled_transaction)[0]
# 임계값 적용 (만약 사용자 정의 임계값이 있다면)
if self.threshold is not None:
is_anomaly = score < self.threshold
else:
is_anomaly = prediction == -1
# 경고 수준 결정
if not is_anomaly:
alert_level = "정상"
else:
if score < -0.5:
alert_level = "상"
elif score < -0.2:
alert_level = "중"
else:
alert_level = "하"
# 결과 이력에 추가
result = {
'timestamp': datetime.datetime.now(),
'transaction': transaction.iloc[0].to_dict(),
'is_anomaly': is_anomaly,
'score': score,
'alert_level': alert_level
}
self.history.append(result)
return is_anomaly, score, alert_level
def get_recent_alerts(self):
"""최근 이상 거래 경고 반환"""
return [h for h in self.history if h['is_anomaly']]
# 실시간 탐지 시스템 초기화
rt_detector = RealTimeAnomalyDetector(
model=loaded_ocsvm,
scaler=loaded_scaler,
threshold=-0.1 # 사용자 정의 임계값 예시
)
# 실시간 시뮬레이션
def generate_transaction():
"""임의의 거래 생성"""
if np.random.random() > 0.9: # 10% 확률로 이상 거래 생성
amount = np.random.lognormal(mean=13, sigma=1) # 크게 높은 금액
time_of_day = np.random.normal(loc=3, scale=2) % 24 # 새벽 시간
lat = np.random.normal(loc=35, scale=5) # 다른 위치
lng = np.random.normal(loc=135, scale=5) # 다른 위치
category = np.zeros(5)
category[np.random.choice([3, 4])] = 1 # 드문 카테고리
else: # 정상 거래
amount = np.random.lognormal(mean=10, sigma=1)
time_of_day = np.random.normal(loc=14, scale=4) % 24
lat = np.random.normal(loc=37.5, scale=0.1)
lng = np.random.normal(loc=127.0, scale=0.1)
category = np.zeros(5)
category[np.random.choice([0, 1, 2], p=[0.4, 0.3, 0.3])] = 1
return pd.DataFrame({
'금액': [amount],
'시간': [time_of_day],
'위도': [lat],
'경도': [lng],
'카테고리_1': [category[0]],
'카테고리_2': [category[1]],
'카테고리_3': [category[2]],
'카테고리_4': [category[3]],
'카테고리_5': [category[4]]
})
# 실시간 처리 시뮬레이션 (20개 거래)
print("실시간 이상 거래 탐지 시뮬레이션 시작...")
for i in range(20):
# 새 거래 생성
transaction = generate_transaction()
# 이상 탐지
start_time = time.time()
is_anomaly, score, alert_level = rt_detector.detect(transaction)
detection_time = time.time() - start_time
# 결과 출력
status = "이상" if is_anomaly else "정상"
print(f"거래 {i+1}: {status} (점수: {score:.4f}, 경고: {alert_level}, 소요시간: {detection_time*1000:.2f}ms)")
# 실제 시스템에서는 이 부분에서 알림 발송 등의 처리 수행
if is_anomaly:
print(f" ⚠️ 경고! 이상 거래 탐지: 금액={transaction['금액'][0]:.2f}, 시간={transaction['시간'][0]:.1f}시")
# 실시간성 시뮬레이션을 위한 짧은 지연
time.sleep(0.5)
# 시뮬레이션 결과 요약
alerts = rt_detector.get_recent_alerts()
print(f"\n총 {len(alerts)}개의 이상 거래 탐지됨")
11. 결론 및 현업 적용 시 고려사항
One-Class SVM은 금융 이상 거래 탐지에 매우 효과적인 방법입니다. 특히 정상 거래 데이터만으로 학습할 수 있어 새로운 형태의 사기에 대응할 수 있다는 장점이 있습니다. 하지만 실제 현업에서 적용할 때는 다음과 같은 점을 고려해야 합니다:
개인화된 모델링: 각 고객별로 별도의 모델을 구축하면 더 정확한 이상 탐지가 가능합니다.
실시간 처리: 대규모 거래 데이터를 실시간으로 처리하기 위한 인프라 구축이 필요합니다.
특성 엔지니어링: 거래 금액, 시간, 위치 외에도 거래 빈도, 거래 패턴 등의 파생 변수를 생성하면 모델의 성능을 향상시킬 수 있습니다.
임계값 최적화: 오탐과 미탐 사이의 균형을 맞추기 위해 신뢰도 점수의 임계값을 최적화해야 합니다.
모델 모니터링 및 업데이트: 시간이 지남에 따라 정상 패턴이 변할 수 있으므로, 주기적인 모델 재학습이 필요합니다.
정상 패턴만으로 학습하는 One-Class SVM의 철학은 "정상만 알면 된다"는 단순하지만 강력한 원칙을 따릅니다. 이 방법론으로 금융 산업에서 보다 안전하고 효율적인 이상 거래 탐지 시스템을 구축할 수 있습니다.
'AI 개발' 카테고리의 다른 글
| 쿠팡 로켓배송의 수요 예측 재고 예측 - 딥러닝 기반 시계열 예측 (0) | 2025.05.21 |
|---|---|
| Hugging Face AI Agents 만드는 방법 (허깅페이스 AI 에이전트 개발) (1) | 2025.05.08 |
| 금융에서 사용하는 AI - 이상 거래 탐지 시스템 구현 : Isolation Forest (1) | 2025.05.03 |
| CLIP 이란? OpenAI에서 개발한 멀티모달 모델 (1) | 2025.04.24 |
| Dockerizing MCP – Docker 도커와 MCP 핵심 요약 (0) | 2025.04.23 |