LlamaIndex

LlamaIndex를 이용한 Python 실시간 데이터 처리

Pro.Dev 2025. 1. 22. 12:31
반응형

LlamaIndex를 이용한 Python 실시간 데이터 처리

실시간으로 생성되는 데이터를 효율적으로 관리하고 검색하는 것은 많은 애플리케이션에서 중요한 요구사항입니다. 이번 글에서는 LlamaIndex를 활용해 실시간 데이터를 인덱싱하고 검색할 수 있는 Python 기반의 시스템을 구현하는 방법을 소개합니다.


1. 프로젝트 개요

목표

  • 실시간으로 생성되는 데이터를 LlamaIndex를 통해 인덱싱합니다.
  • 인덱싱된 데이터에 대해 자연어 질의를 통해 정보를 검색합니다.

주요 기능

  • 실시간 데이터 스트림 처리
  • LlamaIndex 기반의 데이터 인덱싱 및 검색
  • 지속적인 데이터 업데이트

2. 개발 환경 준비

2.1 필수 라이브러리 설치

다음 명령어를 실행하여 필요한 라이브러리를 설치합니다:

pip install llama-index openai

참고: OpenAI API 사용을 위해 API 키를 발급받아야 합니다.

2.2 기본 설정

Python 3.8 이상의 환경에서 개발하는 것을 권장하며, 가상 환경(venv, Conda)을 사용하면 프로젝트 의존성 관리를 간편하게 할 수 있습니다.


3. 실시간 데이터 처리 시스템 구현

3.1 데이터 스트림 시뮬레이션

실시간 데이터를 시뮬레이션하기 위해 queue.Queue를 활용합니다. 외부 API, IoT 센서 또는 로그 파일과 같은 실제 데이터를 대체합니다.

import time
import queue
from threading import Thread

# 실시간 데이터 스트림 생성
data_stream = queue.Queue()

# 데이터 생성 스레드
def generate_data():
    sample_data = [
        "Alice logged in",
        "Bob uploaded a file",
        "Charlie updated the settings",
        "Alice logged out",
        "Diana created a new account"
    ]
    for item in sample_data:
        data_stream.put(item)
        time.sleep(2)  # 2초마다 데이터 추가

Thread(target=generate_data, daemon=True).start()

3.2 실시간 데이터 인덱싱

스트림에서 데이터를 읽어와 LlamaIndex로 인덱싱합니다.

import os
from llama_index import GPTSimpleVectorIndex, Document

# OpenAI API 키 설정
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# 초기 인덱스 생성
index = GPTSimpleVectorIndex([])

# 데이터 스트림 처리 함수
def process_stream():
    print("실시간 데이터 처리 시작...")
    while True:
        if not data_stream.empty():
            new_data = data_stream.get()
            print(f"새 데이터 수신: {new_data}")
            document = Document(new_data)
            index.insert(document)
            print("데이터 인덱싱 완료.")
        time.sleep(1)

Thread(target=process_stream, daemon=True).start()

3.3 실시간 검색 구현

사용자가 실시간으로 데이터를 검색할 수 있는 인터페이스를 제공합니다.

# 검색 질의 처리
while True:
    query = input("질문을 입력하세요 (종료하려면 'exit' 입력): ")
    if query.lower() == "exit":
        print("프로그램을 종료합니다.")
        break
    response = index.query(query)
    print(f"답변: {response}")

4. 실행 결과

실행 흐름

  1. 데이터 스트림 생성: 새로운 데이터가 실시간으로 생성되고 스트림에 추가됩니다.
  2. 실시간 인덱싱: 새로운 데이터가 수신되면 LlamaIndex에 추가됩니다.
  3. 질의 및 응답: 사용자가 입력한 질문에 대해 현재 인덱싱된 데이터를 기반으로 답변이 생성됩니다.

실행 예시

새 데이터 수신: Alice logged in
데이터 인덱싱 완료.

새 데이터 수신: Bob uploaded a file
데이터 인덱싱 완료.

질문을 입력하세요 (종료하려면 'exit' 입력): Alice는 무엇을 했나요?
답변: Alice logged in

질문을 입력하세요 (종료하려면 'exit' 입력): exit
프로그램을 종료합니다.

5. 확장 가능성

5.1 데이터 소스 확장

  • API 연동: 외부 API에서 데이터를 실시간으로 가져와 처리할 수 있습니다.
  • IoT 센서 데이터: IoT 장치에서 스트리밍되는 데이터를 처리하여 실시간 분석에 활용할 수 있습니다.

5.2 검색 성능 최적화

  • 멀티스레딩: 데이터 처리와 검색을 병렬로 처리하여 성능을 최적화합니다.
  • 데이터 정규화: 데이터의 일관성을 유지하여 검색 결과의 정확도를 높입니다.

5.3 사용자 인터페이스 개발

  • 웹 애플리케이션: Flask나 FastAPI를 사용해 실시간 검색 기능을 제공하는 웹 애플리케이션을 구현할 수 있습니다.
  • 대시보드: Streamlit을 활용해 실시간 데이터와 검색 결과를 시각화할 수 있습니다.

참고 자료

반응형