728x90
confluence 데이터를 바탕으로 chat gpt 직접 구현해보려고 한다.
[구성]
1편 : confluence API 를 활용하여 데이터 저장하기 ( store )
2편 : 벡터db에 저장해놓은 데이터 업데이트하기 ( update )
개발환경 : vscode, python, fastapi, vectorDB
domain > ai > router > knowledge_router.py
from fastapi import APIRouter
router = APIRouter(
prefix = "api/knowledge"
)
@router.post("/kms/store")
def store_kmsdata_vectordb(collection_name: str, space_name:str, limit_number:int = None):
return store_kms_vectordb(path = "./static/chroma_kms", space_name = space_name, collection_name = collection_name, limit_data = limit_number)
domain>knowledge>manager>omni_confluence_manager.py
# kms 데이터 collection 저장
def store_kms_vectordb(path="./static/chroma_kms",
space_name="OSSDevelopment2",
collection_name="kms_data", lilmit_data=None) :
# kms data loader
load_result = load_kms_data_by_space(space_name)
# chunk
chunked_df = chunk_documents(load_result)
# filter
filtered_df = filter_dataframe(chunked_df)
filtered_df = filtered_df.iloc[:limit_data]
# 데이터 가공
filtered_df[['use', 'subject', 'clean_content']] = filtered_df.page_contens.apply(extract_kms_info)
embeded_df = enrich_embedding_column(filtered_df)
# collection 에 저장
collection = create_or_get_collection(path, collection_name)
add_documents(collection, embeded_df)
return len(collection.get().get('ids'))
# kms data loader
# JSON 형태로 반환이 됨
def load_kms_data_by_space(space_name="OSSDevelopment2") :
loader = ConfluenceLoader(
url=settings.kms_url,
username=settings.kms_id,
api_key=settings.kms_password,
space_key=space_name
)
load_result = loader.load()
return load_result
# chunk
def chunk_documents(documents_list) :
merged_df = pd.DataFrame()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=8000, chunk_overlap=1000)
splitted_load_result = text_splitter.split_documents(documents_list)
for doc in splitted_load_result :
document_dict = doc.metadata
document_dict['page_contents'] = doc.page_content
document_df = pd.Series(document_dict).to_frame().T
if(merged_df.empty) :
merged_df = document_df
else :
merged_df = pd.concat([merged_df, document_df])
merged_df = merged_df.reset_index(drop=True)
merged_df['contents_length'] = merged_df['page_contents'].apply(len)
return merged_df
# filter
# contents_length 가 100보다 큰 애들만 새로 담으면서 쓰레기 데이터는 날리는 작업
def filter_dataframe(documents_df) :
target_df = documents_df[documents_df.contents_length > 100].reset_index(drop=True)
class KMSDocument(BaseModel):
use : bool = Field(description = "지식데이터로 활용 가능 여부")
subject : str = Field(description = "첨부 된 데이터의 주제")
content : str = Field(description = "첨부 된 데이터를 RAG에 활용 될 수 있도록 최적화 한 데이터")
def extract_kms_info(content) :
kms_validate_parser = PydanticOutputParser(pydantic_object = KMSDocument)
template = """
사내 kms 정보를 수집 및 RAG(Retrival Argument Generation)에 활용하여 Large Language Model기반의
챗봇을 통해서 제공하려고 합니다. 수집 된 데이터의 활용 가능성 및 주제, 정리된 내용이 필요합니다.
### 아래 입력 데이터 기반으로 1.활용 가능성 / 2.주제 / 3.정리 및 다듬은 내용을 한글로 출력해주세요
{page_content}
### 출력 양식은 아래와 같습니다.
{format_instructions}
"""
extract_prompt = PromptTemplate(
template = template,
input_varibales = ['page_content'],
partial_variables = {"format_instructions" : kms_validate_parser.get_format_instructions()}
)
kms_validation_chain = extract_prompt | llm | kms_validate_parser
try :
result = kms_validation_chain.invoke({"page_content" : content})
return pd.Series({"use":result.use, "subject":result.subject, "content":result.content})
except:
return pd.Series({"use":False, "subject":"except","content":"except"})
def enrich_embedding_column(documents_df) :
documents_df['embedding'] = documents_df.clean_content.apply(lambda text : embedding.embed_query(text))
return documents_df
def create_or_get_collection(path = "./static/kms_data", collection_name = "kms_data") :
chroma_client = chromadb.PersistentClient(path)
collection_name = collection_name
try :
collection = chroma_client.get_collection(collection_name)
except :
collection = chroma_client.create_collection(name = collection_name)
return collection
def add_documents(collection, final_df) :
for idx, row in final_df.iterrows() :
metadata = {
"title" : row['title'],
"id" : row['id'],
"source" : row['source'],
"when" : row['when'],
"use" : row['use'],
"subject" : row['subject'],
}
collection.add(
ids=[str(row['id']) + '-' + str(idx)], #고유id로 사용
embeddings = [row['embedding']],
documents = row['page_contents'],
metadatas = [metadata]
)
print("chroma db 에 데이터가 성공적으로 저장되었습니다.")
참고 1.
RecursiveCharacterTextSplitter 의 split_documents 메소드 까보면
metadata, page_content 형태로 만들어줌
splitted_load_result = [
DocumentChunk(
metadata = {
"id" : "12345", "title" : " ... ", "url" : " ... ", "created_at" : " ...",
...
},
page_content = " This is the content .... "
),
DocumentCunk (...),
...
]
참고 2.
document_dict 구조
document_dict = {
"id"="12345",
"title"="12345",
"url"="12345",
....
"page_contents" : "This is the content ... "
}
**pd.Series(document_dict).to_frame().T 설명
pd.Series
document_dict 를 Pandas Series 로 변환.
Pandas Series 는 1차원 배열로, 딕셔너리의 키가 series의 index, 값이 series의 값이 됨.
이 series 의 표 형태를 dataframe 으로 변환할 준비를 함.
id 12345
title ...
url ..
...
page_contents This is...
to.frame()
siries 를 dataframe 으로 변환.
pandas 의 dataframe 은 표 형태의 데이터 구조이다.
문장 실행 결과
0
id 12345
title ...
url ..
...
page_contents This is...
여기서 dataframe은 열(column)을 가지고 있으며, 그 열 아이디는 0이다.
series 의 각 항목이 행(row) 이 됨
.T (Transpose)
데이터프레임을 전치(Transpose)함.
전치란 ? 행과 열을 뒤바꾸는 작업
문장 실행 결과
id title url ... page_contens
0 12345 ... ... ... This is ...
다시 정리하자면,
1. 딕셔너리를 series 로 변환 (pd.Series(document_dict))
2. series 를 dataframe 으로 변환 (.to_frame())
3. dataframe 을 전치하여 행과 열을 바꿈 (.T)
결국, document_df는 document_dict 의 데이터를 포함하는 하나의 행을 가진 dataFrame이 됨.
이를 통해 청크된 각 문서를 데이터 프레임 형식으로 쉽게 관리하고 병합할 수 있다.
참고 3.
merged_df = pd.concat([merged_df, document_df])
두개의 pandas dataframe을 병합하는 과정임.
pd.concat() 함수는 dataframe 을 이어 붙이는데 사용됨.
참고 4.
merged_df.reset_index(drop=True)
dataframe 의 index를 재설정 하는데 사용
reset_index 메소드는 기존 인덱스를 제거하고, 새로운 순차적 인덱스를 설정
목적 : 데이터프레임을 병합하거나 특정 작업을 수행 후, 인덱스를 깔끔하게 할당하기 위함
drop=True : 기존 인덱스 열을 삭제.
참고 5.
merged_df['contents_length'] = merged_df['page_contens'].apply(len)
page_contens 열에 있는 각 문자열의 길이를 계산하여 contens_length 라는 새로운 열을 추가
.apply(len) : 각 문자열에 대해 len 함수를 적용하여 문자열의 길이를 계산
각 청크의 내용 길이를 나타내는 contents_length 열을 가지고 있게 되어, 이후의 분석이나 처리가 가능하게됨
참고 6.
filtered_df = filtered_df.iloc[:limit_data]
Pandas dataframe에서 특정범위의 행을 선택하는 코드.
목적 : filtered_df dataframe에서 처음 limit_data개의 행만을 선택하여 다시 filtered_df로 저장하는 것. 이를 통해 dataframe의 크기를 제한 할 수 있음.
.iloc[ ] : iloc는 Pandas의 인덱싱방식을 제공하는 메서드. 특정 행과 열을 선택 할 수 있음
[:limit_data] : 슬라이싱 방법. 0부터 limit_data번째 행까지를 선택.
pandas슬라이싱에서 None은 슬라이싱의 끝을 의미함.
참고 7.
pydanticOutputParser
pydantic 라이브러리를 활용하여 json 이나 다른 데이터 형식에서 데이터를 올바르게 추출하고, 이를 정의된 스키마에 따라 구조화 하는데 사용 됨.
python의 데이터 검증 및 설정 관리 라이브러리.
'(ktds) 24.08.05 ~ > 생성형AI' 카테고리의 다른 글
langchain, openai 활용하여 응답 생성하기 (두번의 invoke, history를 이용하여 대화의 흐름 유지) (0) | 2025.01.06 |
---|