import logging
import os
import re
import json
import time
import copy
import infinity
from infinity.common import ConflictType, InfinityException, SortType
from infinity.index import IndexInfo, IndexType
from infinity.connection_pool import ConnectionPool
from infinity.errors import ErrorCode
from rag import settings
from rag.utils import singleton
import polars as pl
from polars.series.series import Series
from api.utils.file_utils import get_project_base_directory

from rag.utils.doc_store_conn import (
    DocStoreConnection,
    MatchExpr,
    MatchTextExpr,
    MatchDenseExpr,
    FusionExpr,
    OrderByExpr,
)

logger = logging.getLogger('ragflow.infinity_conn')

def equivalent_condition_to_str(condition: dict) -> str:
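    """
    Convert a condition dict into an Infinity filter expression string.

    List values become `IN (...)` clauses and scalars become equality tests,
    joined with AND; e.g. {"doc_id": ["a", "b"]} -> "doc_id IN ('a', 'b')".
    Falsy values and the "kb_id" key are skipped; with no usable conditions
    the catch-all "1=1" is returned.
    """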
- assert "_id" not in condition
- cond = list()
- for k, v in condition.items():
- if not isinstance(k, str) or k in ["kb_id"] or not v:
- continue
- if isinstance(v, list):
- inCond = list()
- for item in v:
- if isinstance(item, str):
- inCond.append(f"'{item}'")
- else:
- inCond.append(str(item))
- if inCond:
- strInCond = ", ".join(inCond)
- strInCond = f"{k} IN ({strInCond})"
- cond.append(strInCond)
- elif isinstance(v, str):
- cond.append(f"{k}='{v}'")
- else:
- cond.append(f"{k}={str(v)}")
- return " AND ".join(cond) if cond else "1=1"


def concat_dataframes(df_list: list[pl.DataFrame], selectFields: list[str]) -> pl.DataFrame:
    """
    Concatenate multiple dataframes into one.
    """
    df_list = [df for df in df_list if not df.is_empty()]
    if df_list:
        return pl.concat(df_list)
    schema = dict()
    for field_name in selectFields:
        if field_name == 'score()':  # Workaround: Infinity returns the score() expression as a column named "SCORE"
            schema['SCORE'] = str
        else:
            schema[field_name] = str
    return pl.DataFrame(schema=schema)


@singleton
class InfinityConnection(DocStoreConnection):
    def __init__(self):
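        """
        Connect to the Infinity server configured in settings, retrying every
        5 seconds for up to 24 attempts (about 120s) until the node reports
        status "started", then run the schema migration once.
        """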
        self.dbName = settings.INFINITY.get("db_name", "default_db")
        infinity_uri = settings.INFINITY["uri"]
        if ":" in infinity_uri:
            host, port = infinity_uri.split(":")
            infinity_uri = infinity.common.NetworkAddress(host, int(port))
        self.connPool = None
        logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
        for _ in range(24):
            try:
                connPool = ConnectionPool(infinity_uri)
                inf_conn = connPool.get_conn()
                res = inf_conn.show_current_node()
                if res.error_code == ErrorCode.OK and res.server_status == "started":
                    self._migrate_db(inf_conn)
                    self.connPool = connPool
                    connPool.release_conn(inf_conn)
                    break
                connPool.release_conn(inf_conn)
                logger.warning(f"Infinity status: {res.server_status}. Waiting for Infinity {infinity_uri} to become healthy.")
                time.sleep(5)
            except Exception as e:
                logger.warning(f"{str(e)}. Waiting for Infinity {infinity_uri} to become healthy.")
                time.sleep(5)
        if self.connPool is None:
            msg = f"Infinity {infinity_uri} didn't become healthy in 120s."
            logger.error(msg)
            raise Exception(msg)
        logger.info(f"Infinity {infinity_uri} is healthy.")
    def _migrate_db(self, inf_conn):
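        """
        Bring existing tables up to date with conf/infinity_mapping.json:
        add any columns the mapping defines that a table lacks, and create
        full-text indexes for new varchar columns that declare an analyzer.
        Only tables carrying the "q_vec_idx" index are touched.
        """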
        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
        fp_mapping = os.path.join(
            get_project_base_directory(), "conf", "infinity_mapping.json"
        )
        if not os.path.exists(fp_mapping):
            raise Exception(f"Mapping file not found at {fp_mapping}")
        with open(fp_mapping) as f:
            schema = json.load(f)
        table_names = inf_db.list_tables().table_names
        for table_name in table_names:
            inf_table = inf_db.get_table(table_name)
            index_names = inf_table.list_indexes().index_names
            if "q_vec_idx" not in index_names:
                # Skip tables that were not created by RAGFlow
                continue
            column_names = inf_table.show_columns()["name"]
            column_names = set(column_names)
            for field_name, field_info in schema.items():
                if field_name in column_names:
                    continue
                res = inf_table.add_columns({field_name: field_info})
                assert res.error_code == ErrorCode.OK
                logger.info(
                    f"INFINITY added column to table {table_name}: {field_name} {field_info}"
                )
                if field_info["type"] != "varchar" or "analyzer" not in field_info:
                    continue
                inf_table.create_index(
                    f"text_idx_{field_name}",
                    IndexInfo(
                        field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
                    ),
                    ConflictType.Ignore,
                )
- """
- Database operations
- """
-
- def dbType(self) -> str:
- return "infinity"
-
- def health(self) -> dict:
- """
- Return the health status of the database.
- """
- inf_conn = self.connPool.get_conn()
- res = inf_conn.show_current_node()
- self.connPool.release_conn(inf_conn)
- res2 = {
- "type": "infinity",
- "status": "green" if res.error_code == 0 and res.server_status == "started" else "red",
- "error": res.error_msg,
- }
- return res2
-
- """
- Table operations
- """
-
    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
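        """
        Create the per-knowledgebase table `{indexName}_{knowledgebaseId}` from
        conf/infinity_mapping.json, plus an HNSW index on the vector column and
        a full-text index for every analyzed varchar column.
        """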
        table_name = f"{indexName}_{knowledgebaseId}"
        inf_conn = self.connPool.get_conn()
        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)

        fp_mapping = os.path.join(
            get_project_base_directory(), "conf", "infinity_mapping.json"
        )
        if not os.path.exists(fp_mapping):
            raise Exception(f"Mapping file not found at {fp_mapping}")
        with open(fp_mapping) as f:
            schema = json.load(f)
        vector_name = f"q_{vectorSize}_vec"
        schema[vector_name] = {"type": f"vector,{vectorSize},float"}
        inf_table = inf_db.create_table(
            table_name,
            schema,
            ConflictType.Ignore,
        )
        inf_table.create_index(
            "q_vec_idx",
            IndexInfo(
                vector_name,
                IndexType.Hnsw,
                {
                    "M": "16",
                    "ef_construction": "50",
                    "metric": "cosine",
                    "encode": "lvq",
                },
            ),
            ConflictType.Ignore,
        )
        for field_name, field_info in schema.items():
            if field_info["type"] != "varchar" or "analyzer" not in field_info:
                continue
            inf_table.create_index(
                f"text_idx_{field_name}",
                IndexInfo(
                    field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
                ),
                ConflictType.Ignore,
            )
        self.connPool.release_conn(inf_conn)
        logger.info(
            f"INFINITY created table {table_name}, vector size {vectorSize}"
        )

    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        table_name = f"{indexName}_{knowledgebaseId}"
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        db_instance.drop_table(table_name, ConflictType.Ignore)
        self.connPool.release_conn(inf_conn)
        logger.info(f"INFINITY dropped table {table_name}")

    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        table_name = f"{indexName}_{knowledgebaseId}"
        try:
            inf_conn = self.connPool.get_conn()
            db_instance = inf_conn.get_database(self.dbName)
            _ = db_instance.get_table(table_name)
            self.connPool.release_conn(inf_conn)
            return True
        except Exception as e:
            logger.warning(f"INFINITY indexExist {str(e)}")
        return False

    """
    CRUD operations
    """

    def search(
        self,
        selectFields: list[str],
        highlightFields: list[str],
        condition: dict,
        matchExprs: list[MatchExpr],
        orderBy: OrderByExpr,
        offset: int,
        limit: int,
        indexNames: str | list[str],
        knowledgebaseIds: list[str],
    ) -> tuple[pl.DataFrame, int]:
- """
- TODO: Infinity doesn't provide highlight
- """
        if isinstance(indexNames, str):
            indexNames = indexNames.split(",")
        assert isinstance(indexNames, list) and len(indexNames) > 0
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        df_list = list()
        table_list = list()
        for essential_field in ["id"]:
            if essential_field not in selectFields:
                selectFields.append(essential_field)
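        # When ranking expressions are present, also fetch the match score and
        # per-document pagerank so the merged results can be re-sorted below.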
        if matchExprs:
            for essential_field in ["score()", "pagerank_fea"]:
                selectFields.append(essential_field)

        # Prepare expressions common to all tables
        filter_cond = None
        filter_fulltext = ""
        if condition:
            filter_cond = equivalent_condition_to_str(condition)
        for matchExpr in matchExprs:
            if isinstance(matchExpr, MatchTextExpr):
                if filter_cond and "filter" not in matchExpr.extra_options:
                    matchExpr.extra_options.update({"filter": filter_cond})
                fields = ",".join(matchExpr.fields)
                filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')"
                if filter_cond:
                    filter_fulltext = f"({filter_cond}) AND {filter_fulltext}"
                minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0)
                if isinstance(minimum_should_match, float):
                    str_minimum_should_match = str(int(minimum_should_match * 100)) + "%"
                    matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match
                for k, v in matchExpr.extra_options.items():
                    if not isinstance(v, str):
                        matchExpr.extra_options[k] = str(v)
                logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}")
            elif isinstance(matchExpr, MatchDenseExpr):
                if filter_fulltext and "filter" not in matchExpr.extra_options:
                    matchExpr.extra_options.update({"filter": filter_fulltext})
                for k, v in matchExpr.extra_options.items():
                    if not isinstance(v, str):
                        matchExpr.extra_options[k] = str(v)
                logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}")
            elif isinstance(matchExpr, FusionExpr):
                logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}")

        order_by_expr_list = list()
        if orderBy.fields:
            for order_field in orderBy.fields:
                if order_field[1] == 0:
                    order_by_expr_list.append((order_field[0], SortType.Asc))
                else:
                    order_by_expr_list.append((order_field[0], SortType.Desc))

        total_hits_count = 0
        # Scatter the search across tables and gather the results
        for indexName in indexNames:
            for knowledgebaseId in knowledgebaseIds:
                table_name = f"{indexName}_{knowledgebaseId}"
                try:
                    table_instance = db_instance.get_table(table_name)
                except Exception:
                    continue
                table_list.append(table_name)
                builder = table_instance.output(selectFields)
                if len(matchExprs) > 0:
                    for matchExpr in matchExprs:
                        if isinstance(matchExpr, MatchTextExpr):
                            fields = ",".join(matchExpr.fields)
                            builder = builder.match_text(
                                fields,
                                matchExpr.matching_text,
                                matchExpr.topn,
                                matchExpr.extra_options,
                            )
                        elif isinstance(matchExpr, MatchDenseExpr):
                            builder = builder.match_dense(
                                matchExpr.vector_column_name,
                                matchExpr.embedding_data,
                                matchExpr.embedding_data_type,
                                matchExpr.distance_type,
                                matchExpr.topn,
                                matchExpr.extra_options,
                            )
                        elif isinstance(matchExpr, FusionExpr):
                            builder = builder.fusion(
                                matchExpr.method, matchExpr.topn, matchExpr.fusion_params
                            )
                else:
                    if filter_cond:
                        builder.filter(filter_cond)
                    if orderBy.fields:
                        builder.sort(order_by_expr_list)
                builder.offset(offset).limit(limit)
                kb_res, extra_result = builder.option({"total_hits_count": True}).to_pl()
                if extra_result:
                    total_hits_count += int(extra_result["total_hits_count"])
                logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}")
                df_list.append(kb_res)
        self.connPool.release_conn(inf_conn)
        res = concat_dataframes(df_list, selectFields)
        if matchExprs:
            res = res.sort(pl.col("SCORE") + pl.col("pagerank_fea"), descending=True, maintain_order=True)
        res = res.limit(limit)
        logger.debug(f"INFINITY search final result: {str(res)}")
        return res, total_hits_count

    def get(
        self, chunkId: str, indexName: str, knowledgebaseIds: list[str]
    ) -> dict | None:
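        """
        Look up a single chunk by id across the given knowledgebases and
        return its decoded fields, or None if no table contains it.
        """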
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        df_list = list()
        assert isinstance(knowledgebaseIds, list)
        table_list = list()
        for knowledgebaseId in knowledgebaseIds:
            table_name = f"{indexName}_{knowledgebaseId}"
            table_list.append(table_name)
            table_instance = db_instance.get_table(table_name)
            kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_pl()
            logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
            df_list.append(kb_res)
        self.connPool.release_conn(inf_conn)
        res = concat_dataframes(df_list, ["id"])
        res_fields = self.getFields(res, res.columns)
        return res_fields.get(chunkId, None)

    def insert(
        self, documents: list[dict], indexName: str, knowledgebaseId: str
    ) -> list[str]:
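        """
        Upsert the given chunks: rows whose ids already exist are deleted
        first, then all documents are inserted. Creates the table on demand,
        inferring the vector size from the first document's q_<size>_vec key.
        """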
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        try:
            table_instance = db_instance.get_table(table_name)
        except InfinityException as e:
            # src/common/status.cppm, kTableNotExist = 3022
            if e.error_code != ErrorCode.TABLE_NOT_EXIST:
                raise
            vector_size = 0
            patt = re.compile(r"q_(?P<vector_size>\d+)_vec")
            for k in documents[0].keys():
                m = patt.match(k)
                if m:
                    vector_size = int(m.group("vector_size"))
                    break
            if vector_size == 0:
                raise ValueError("Cannot infer vector size from documents")
            self.createIdx(indexName, knowledgebaseId, vector_size)
            table_instance = db_instance.get_table(table_name)

        docs = copy.deepcopy(documents)
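        # Infinity stores these list-valued fields as strings: keyword lists
        # are joined with "###", and position lists are flattened into "_"
        # separated fixed-width (8 hex digit) values; getFields() reverses this.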
        for d in docs:
            assert "_id" not in d
            assert "id" in d
            for k, v in d.items():
                if k in ["important_kwd", "question_kwd", "entities_kwd"]:
                    assert isinstance(v, list)
                    d[k] = "###".join(v)
                elif k == 'kb_id':
                    if isinstance(d[k], list):
                        d[k] = d[k][0]  # since d[k] is a list, but we need a str
                elif k == "position_int":
                    assert isinstance(v, list)
                    arr = [num for row in v for num in row]
                    d[k] = "_".join(f"{num:08x}" for num in arr)
                elif k in ["page_num_int", "top_int"]:
                    assert isinstance(v, list)
                    d[k] = "_".join(f"{num:08x}" for num in v)
        ids = ["'{}'".format(d["id"]) for d in docs]
        str_ids = ", ".join(ids)
        str_filter = f"id IN ({str_ids})"
        table_instance.delete(str_filter)
        table_instance.insert(docs)
        self.connPool.release_conn(inf_conn)
        logger.debug(f"INFINITY inserted into {table_name} {str_ids}.")
        return []

    def update(
        self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
    ) -> bool:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        table_instance = db_instance.get_table(table_name)
        if "exist" in condition:
            del condition["exist"]
        filter_expr = equivalent_condition_to_str(condition)
        for k, v in list(newValue.items()):
            if k.endswith("_kwd") and isinstance(v, list):
                newValue[k] = " ".join(v)
            elif k == 'kb_id':
                if isinstance(newValue[k], list):
                    newValue[k] = newValue[k][0]  # since newValue[k] is a list, but we need a str
            elif k == "position_int":
                assert isinstance(v, list)
                arr = [num for row in v for num in row]
                newValue[k] = "_".join(f"{num:08x}" for num in arr)
            elif k in ["page_num_int", "top_int"]:
                assert isinstance(v, list)
                newValue[k] = "_".join(f"{num:08x}" for num in v)
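            # A {"remove": "pagerank_fea"} request is applied by resetting the
            # named column to 0 rather than dropping it.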
- elif k == "remove" and v in ["pagerank_fea"]:
- del newValue[k]
- newValue[v] = 0
- logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
- table_instance.update(filter, newValue)
- self.connPool.release_conn(inf_conn)
- return True

    def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        filter_expr = equivalent_condition_to_str(condition)
        try:
            table_instance = db_instance.get_table(table_name)
        except Exception:
            logger.warning(
                f"Skipped deleting `{filter_expr}` from table {table_name} since the table doesn't exist."
            )
            return 0
        logger.debug(f"INFINITY delete table {table_name}, filter {filter_expr}.")
        res = table_instance.delete(filter_expr)
        self.connPool.release_conn(inf_conn)
        return res.deleted_rows

- """
- Helper functions for search result
- """
-
- def getTotal(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> int:
- if isinstance(res, tuple):
- return res[1]
- return len(res)
-
- def getChunkIds(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> list[str]:
- if isinstance(res, tuple):
- res = res[0]
- return list(res["id"])
-
    def getFields(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fields: list[str]) -> dict[str, dict]:
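        """
        Turn a search/get result into a {chunk_id: {field: value}} mapping,
        reversing the string encodings applied in insert(): "###"-joined
        keyword lists and "_"-separated hex-encoded position lists.
        """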
        if isinstance(res, tuple):
            res = res[0]
        res_fields = {}
        if not fields:
            return {}
        num_rows = len(res)
        column_id = res["id"]
        for i in range(num_rows):
            id = column_id[i]
            m = {"id": id}
            for fieldnm in fields:
                if fieldnm not in res:
                    m[fieldnm] = None
                    continue
                v = res[fieldnm][i]
                if isinstance(v, Series):
                    v = list(v)
                elif fieldnm in ["important_kwd", "question_kwd", "entities_kwd"]:
                    assert isinstance(v, str)
                    v = [kwd for kwd in v.split("###") if kwd]
                elif fieldnm == "position_int":
                    assert isinstance(v, str)
                    if v:
                        arr = [int(hex_val, 16) for hex_val in v.split('_')]
                        v = [arr[i:i + 5] for i in range(0, len(arr), 5)]
                    else:
                        v = []
                elif fieldnm in ["page_num_int", "top_int"]:
                    assert isinstance(v, str)
                    if v:
                        v = [int(hex_val, 16) for hex_val in v.split('_')]
                    else:
                        v = []
                else:
                    if not isinstance(v, str):
                        v = str(v)
                m[fieldnm] = v
            res_fields[id] = m
        return res_fields

    def getHighlight(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, keywords: list[str], fieldnm: str):
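        """
        Build highlight snippets client-side, since Infinity has no native
        highlighting: split the field into sentences, wrap keyword hits in
        <em> tags, and join the matching sentences with "...".
        """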
        if isinstance(res, tuple):
            res = res[0]
        ans = {}
        num_rows = len(res)
        column_id = res["id"]
        for i in range(num_rows):
            id = column_id[i]
            txt = res[fieldnm][i]
            txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
            txts = []
            for t in re.split(r"[.?!;\n]", txt):
                for w in keywords:
                    t = re.sub(
                        r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])"
                        % re.escape(w),
                        r"\1<em>\2</em>\3",
                        t,
                        flags=re.IGNORECASE | re.MULTILINE,
                    )
                if not re.search(
                    r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE
                ):
                    continue
                txts.append(t)
            ans[id] = "...".join(txts)
        return ans

    def getAggregation(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fieldnm: str):
        """
        TODO: Infinity doesn't provide aggregation
        """
        return list()

    """
    SQL
    """

    def sql(self, sql: str, fetch_size: int, format: str):
        raise NotImplementedError("Not implemented")