| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 | 
							- import re
 - from typing import Type
 - 
 - from flask import current_app
 - from langchain.embeddings import OpenAIEmbeddings
 - from langchain.tools import BaseTool
 - from pydantic import Field, BaseModel
 - 
 - from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
 - from core.embedding.cached_embedding import CacheEmbedding
 - from core.index.keyword_table_index.keyword_table_index import KeywordTableIndex, KeywordTableConfig
 - from core.index.vector_index.vector_index import VectorIndex
 - from core.llm.llm_builder import LLMBuilder
 - from extensions.ext_database import db
 - from models.dataset import Dataset, DocumentSegment
 - 
 - 
 - class DatasetRetrieverToolInput(BaseModel):
 -     dataset_id: str = Field(..., description="ID of dataset to be queried. MUST be UUID format.")
 -     query: str = Field(..., description="Query for the dataset to be used to retrieve the dataset.")
 - 
 - 
 - class DatasetRetrieverTool(BaseTool):
 -     """Tool for querying a Dataset."""
 -     name: str = "dataset"
 -     args_schema: Type[BaseModel] = DatasetRetrieverToolInput
 -     description: str = "use this to retrieve a dataset. "
 - 
 -     tenant_id: str
 -     dataset_id: str
 -     k: int = 3
 - 
 -     @classmethod
 -     def from_dataset(cls, dataset: Dataset, **kwargs):
 -         description = dataset.description
 -         if not description:
 -             description = 'useful for when you want to answer queries about the ' + dataset.name
 - 
 -         description = description.replace('\n', '').replace('\r', '')
 -         description += '\nID of dataset MUST be ' + dataset.id
 -         return cls(
 -             tenant_id=dataset.tenant_id,
 -             dataset_id=dataset.id,
 -             description=description,
 -             **kwargs
 -         )
 - 
 -     def _run(self, dataset_id: str, query: str) -> str:
 -         pattern = r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b'
 -         match = re.search(pattern, dataset_id, re.IGNORECASE)
 -         if match:
 -             dataset_id = match.group()
 - 
 -         dataset = db.session.query(Dataset).filter(
 -             Dataset.tenant_id == self.tenant_id,
 -             Dataset.id == dataset_id
 -         ).first()
 - 
 -         if not dataset:
 -             return f'[{self.name} failed to find dataset with id {dataset_id}.]'
 - 
 -         if dataset.indexing_technique == "economy":
 -             # use keyword table query
 -             kw_table_index = KeywordTableIndex(
 -                 dataset=dataset,
 -                 config=KeywordTableConfig(
 -                     max_keywords_per_chunk=5
 -                 )
 -             )
 - 
 -             documents = kw_table_index.search(query, search_kwargs={'k': self.k})
 -             return str("\n".join([document.page_content for document in documents]))
 -         else:
 -             model_credentials = LLMBuilder.get_model_credentials(
 -                 tenant_id=dataset.tenant_id,
 -                 model_provider=LLMBuilder.get_default_provider(dataset.tenant_id, 'text-embedding-ada-002'),
 -                 model_name='text-embedding-ada-002'
 -             )
 - 
 -             embeddings = CacheEmbedding(OpenAIEmbeddings(
 -                 **model_credentials
 -             ))
 - 
 -             vector_index = VectorIndex(
 -                 dataset=dataset,
 -                 config=current_app.config,
 -                 embeddings=embeddings
 -             )
 - 
 -             if self.k > 0:
 -                 documents = vector_index.search(
 -                     query,
 -                     search_type='similarity',
 -                     search_kwargs={
 -                         'k': self.k
 -                     }
 -                 )
 -             else:
 -                 documents = []
 - 
 -             hit_callback = DatasetIndexToolCallbackHandler(dataset.id)
 -             hit_callback.on_tool_end(documents)
 -             document_context_list = []
 -             index_node_ids = [document.metadata['doc_id'] for document in documents]
 -             segments = DocumentSegment.query.filter(DocumentSegment.completed_at.isnot(None),
 -                                                     DocumentSegment.status == 'completed',
 -                                                     DocumentSegment.enabled == True,
 -                                                     DocumentSegment.index_node_id.in_(index_node_ids)
 -                                                     ).all()
 - 
 -             if segments:
 -                 index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
 -                 sorted_segments = sorted(segments,
 -                                          key=lambda segment: index_node_id_to_position.get(segment.index_node_id,
 -                                                                                            float('inf')))
 -                 for segment in sorted_segments:
 -                     if segment.answer:
 -                         document_context_list.append(f'question:{segment.content} \nanswer:{segment.answer}')
 -                     else:
 -                         document_context_list.append(segment.content)
 - 
 -             return str("\n".join(document_context_list))
 - 
 -     async def _arun(self, tool_input: str) -> str:
 -         raise NotImplementedError()
 
 
  |