|
import gradio as gr |
|
import os |
|
import time |
|
|
|
import sys |
|
import pinecone |
|
|
|
from openai import OpenAI |
|
from langchain_pinecone import PineconeVectorStore |
|
from pinecone import Pinecone, ServerlessSpec |
|
|
|
from langchain_openai import ChatOpenAI |
|
from langchain_openai import OpenAIEmbeddings |
|
|
|
from langchain.chains import RetrievalQA |
|
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
from langchain_text_splitters import CharacterTextSplitter |
|
|
|
import glob |
|
|
|
|
|
from langdetect import detect |
|
from googletrans import Translator |
|
|
|
class CNC_QA:
    """Retrieval-augmented QA assistant for Mitsubishi CNC documentation.

    On construction it connects to (or creates) a Pinecone vector index,
    optionally ingests the PDF manuals found under ./Doc, and builds a
    LangChain RetrievalQA chain backed by an OpenAI chat model.  User
    messages are translated to English before retrieval and the answer is
    translated back to the user's language.
    """

    def __init__(self):
        print("Initializing CNC_QA")
        # Enable allocation tracing for diagnostics.  The original set
        # 'PYTHINTRACEMALLOC' (typo), which silently did nothing.
        os.environ['PYTHONTRACEMALLOC'] = '1'

        self.PINECONE_INDEX = "meldas"
        self.PINECONE_ENV = "gcp-starter"

        # Flipped to True by initialize_vectorstore() when a brand-new
        # index is created and the corpus therefore needs ingesting.
        self.add_files = False

        # PDF manuals to ingest on first run.
        self.files = glob.glob('./Doc/*')

        self.vectorstore = self.initialize_vectorstore(index_name=self.PINECONE_INDEX)

        if self.add_files:
            self.add_documents()

        llm = self.load_llm(model_id="gpt-4o")

        # 'stuff' chain type: all retrieved chunks are packed into a
        # single prompt.  Source documents are returned so echo() can
        # show citations.
        self.bot = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type='stuff',
            retriever=self.vectorstore.as_retriever(),
            return_source_documents=True,
        )

    def load_embedding_model(self, model_name):
        """Return an OpenAIEmbeddings client for *model_name*."""
        print(f'loading embedding model:{model_name}')
        embeddings = OpenAIEmbeddings(
            model=model_name,
        )
        return embeddings

    def initialize_vectorstore(self, index_name):
        """Connect to (or create) the Pinecone index and wrap it in a store.

        If the index does not exist it is created (dimension 1536 matches
        text-embedding-3-small) and ``self.add_files`` is set so the
        caller knows the document corpus must be ingested.

        Returns a PineconeVectorStore bound to the index.
        """
        model_name = "text-embedding-3-small"
        embeddings = self.load_embedding_model(model_name=model_name)

        print(f'loading vectorstore:{index_name}')
        self.pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

        existing_indexes = [index_info["name"] for index_info in self.pc.list_indexes()]
        if index_name not in existing_indexes:
            print(f'Index:{self.PINECONE_INDEX} is not found....')
            print(f'Creating new Index:{self.PINECONE_INDEX}')
            self.add_files = True
            self.pc.create_index(
                name=self.PINECONE_INDEX,
                dimension=1536,  # must match the embedding model's output size
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Poll until the serverless index is ready to accept writes.
            while not self.pc.describe_index(self.PINECONE_INDEX).status["ready"]:
                time.sleep(1)
            print(f'Created new Index:{self.PINECONE_INDEX}')

        self.show_index()
        index = self.pc.Index(self.PINECONE_INDEX)
        vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
        return vectorstore

    def show_index(self):
        """Print the index statistics once the index reports ready."""
        print(f'detail of Index:{self.PINECONE_INDEX}')
        index = self.pc.Index(self.PINECONE_INDEX)
        while not self.pc.describe_index(self.PINECONE_INDEX).status["ready"]:
            time.sleep(1)
        print(index.describe_index_stats())

    def delete_documents(self):
        """Drop the entire Pinecone index (irreversible)."""
        print('delete documents.....')
        self.pc.delete_index(self.PINECONE_INDEX)
        # BUG FIX: the original wrote `self.show_index` without calling it
        # (a no-op).  Calling it here would fail because the index no
        # longer exists, so report completion instead.
        print(f'Index:{self.PINECONE_INDEX} deleted')

    def add_documents(self):
        """Load every PDF in self.files, chunk it, and upsert the chunks."""
        print('add documents.....')
        for i, file in enumerate(self.files):
            print(f'{i+1}/{len(self.files)}:{file}')
            loader = PyPDFLoader(file)
            documents = loader.load_and_split()
            # Re-chunk to a uniform size with overlap for retrieval quality.
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
            docs = text_splitter.split_documents(documents)
            self.vectorstore.add_documents(documents=docs)
        print('Documents are loaded on database')
        self.show_index()

    def load_llm(self, model_id):
        """Return a deterministic (temperature=0) ChatOpenAI client."""
        print(f'load llm:{model_id}')
        llm = ChatOpenAI(
            model=model_id,
            temperature=0,
            max_tokens=None,
            timeout=None,
            max_retries=2,
        )
        return llm

    def echo(self, message, history):
        """Gradio ChatInterface callback: answer *message* with sources.

        Special inputs:
          'meldas_del'   -- delete the whole index (admin command).
          'Who are you?' -- canned self-introduction.
        Anything else is translated to English, answered by the RetrievalQA
        chain, translated back to the user's language, and returned with
        the primary source document, its page content, and the list of all
        reference documents appended.  *history* is required by the Gradio
        callback signature but unused.
        """
        if message == 'meldas_del':
            self.delete_documents()
        elif message == "Who are you?":
            message = "I am MELDAS. I will answer your question about Mitsubishi CNC"
        else:
            text_en, lang_original = self.translate_message(message, 'en')
            ans = self.bot(text_en)

            # Best-effort extraction of answer and citations.  The bare
            # `except:` clauses were narrowed so real problems
            # (KeyboardInterrupt, SystemExit) still propagate.
            try:
                result, lang = self.translate_message(ans['result'], lang_original)
            except Exception:
                result = ''
            try:
                source = ans['source_documents'][0].metadata['source']
            except (KeyError, IndexError, AttributeError):
                source = ''
            try:
                page_content = ans['source_documents'][0].page_content
            except (KeyError, IndexError, AttributeError):
                page_content = ''

            OtherSource = ''
            for source_documents in ans.get('source_documents', []):
                OtherSource += source_documents.metadata['source'] + '\n'

            message = result + '\n\n' + '■Document\n' + source + '\n\n' + '■Page\n' + page_content + '\n\n' + '■References Documents\n' + OtherSource

        return message

    def translate_message(self, message, lang_dest):
        """Translate *message* into *lang_dest* via googletrans.

        Returns a tuple ``(translated_text, detected_source_language)``.
        """
        import asyncio

        async def translate_text(text, dest='en'):
            # googletrans 4.x exposes an async, context-managed Translator.
            async with Translator() as translator:
                result = await translator.translate(text, dest=dest)
                print(result)
                return result.text, result.src

        text, lang = asyncio.run(translate_text(text=message, dest=lang_dest))

        print(f'元言語:{lang} -> 翻訳言語:{lang_dest}')
        print(message)
        print(text)
        return text, lang
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    print("start")

    # Build the QA backend (connects to Pinecone / OpenAI on startup).
    assistant = CNC_QA()

    # Serve the bot through a Gradio chat UI with a sample prompt.
    chat_ui = gr.ChatInterface(
        fn=assistant.echo,
        examples=["Who are you?"],
        title="MELDAS AI",
    )
    chat_ui.launch(debug=True)
|
|