- import logging
 - import os
 - import re
 - import json
 - import time
 - import copy
 - import infinity
 - from infinity.common import ConflictType, InfinityException, SortType
 - from infinity.index import IndexInfo, IndexType
 - from infinity.connection_pool import ConnectionPool
 - from infinity.errors import ErrorCode
 - from rag import settings
 - from rag.utils import singleton
 - import polars as pl
 - from polars.series.series import Series
 - from api.utils.file_utils import get_project_base_directory
 - 
 - from rag.utils.doc_store_conn import (
 -     DocStoreConnection,
 -     MatchExpr,
 -     MatchTextExpr,
 -     MatchDenseExpr,
 -     FusionExpr,
 -     OrderByExpr,
 - )
 - 
 - logger = logging.getLogger('ragflow.infinity_conn')
 - 
 - def equivalent_condition_to_str(condition: dict) -> str|None:
 -     assert "_id" not in condition
 -     cond = list()
 -     for k, v in condition.items():
 -         if not isinstance(k, str) or k in ["kb_id"] or not v:
 -             continue
 -         if isinstance(v, list):
 -             inCond = list()
 -             for item in v:
 -                 if isinstance(item, str):
 -                     inCond.append(f"'{item}'")
 -                 else:
 -                     inCond.append(str(item))
 -             if inCond:
 -                 strInCond = ", ".join(inCond)
 -                 strInCond = f"{k} IN ({strInCond})"
 -                 cond.append(strInCond)
 -         elif isinstance(v, str):
 -             cond.append(f"{k}='{v}'")
 -         else:
 -             cond.append(f"{k}={str(v)}")
 -     return " AND ".join(cond) if cond else "1=1"
 - 
 - 
 - def concat_dataframes(df_list: list[pl.DataFrame], selectFields: list[str]) -> pl.DataFrame:
 -     """
 -     Concatenate multiple dataframes into one.
 -     """
 -     df_list = [df for df in df_list if not df.is_empty()]
 -     if df_list:
 -         return pl.concat(df_list)
 -     schema = dict()
 -     for fieldnm in selectFields:
 -         schema[fieldnm] = str
 -     return pl.DataFrame(schema=schema)
 - 
 - @singleton
 - class InfinityConnection(DocStoreConnection):
 -     def __init__(self):
 -         self.dbName = settings.INFINITY.get("db_name", "default_db")
 -         infinity_uri = settings.INFINITY["uri"]
 -         if ":" in infinity_uri:
 -             host, port = infinity_uri.split(":")
 -             infinity_uri = infinity.common.NetworkAddress(host, int(port))
 -         self.connPool = None
 -         logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
 -         for _ in range(24):
 -             try:
 -                 connPool = ConnectionPool(infinity_uri)
 -                 inf_conn = connPool.get_conn()
 -                 res = inf_conn.show_current_node()
 -                 if res.error_code == ErrorCode.OK and res.server_status=="started":
 -                     self._migrate_db(inf_conn)
 -                     self.connPool = connPool
 -                     connPool.release_conn(inf_conn)
 -                     break
 -                 connPool.release_conn(inf_conn)
 -                 logger.warn(f"Infinity status: {res.server_status}. Waiting Infinity {infinity_uri} to be healthy.")
 -                 time.sleep(5)
 -             except Exception as e:
 -                 logger.warning(f"{str(e)}. Waiting Infinity {infinity_uri} to be healthy.")
 -                 time.sleep(5)
 -         if self.connPool is None:
 -             msg = f"Infinity {infinity_uri} didn't become healthy in 120s."
 -             logger.error(msg)
 -             raise Exception(msg)
 -         logger.info(f"Infinity {infinity_uri} is healthy.")
 - 
 -     def _migrate_db(self, inf_conn):
 -         inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
 -         fp_mapping = os.path.join(
 -             get_project_base_directory(), "conf", "infinity_mapping.json"
 -         )
 -         if not os.path.exists(fp_mapping):
 -             raise Exception(f"Mapping file not found at {fp_mapping}")
 -         schema = json.load(open(fp_mapping))
 -         table_names = inf_db.list_tables().table_names
 -         for table_name in table_names:
 -             inf_table = inf_db.get_table(table_name)
 -             index_names = inf_table.list_indexes().index_names
 -             if "q_vec_idx" not in index_names:
 -                 # Skip tables not created by me
 -                 continue
 -             column_names = inf_table.show_columns()["name"]
 -             column_names = set(column_names)
 -             for field_name, field_info in schema.items():
 -                 if field_name in column_names:
 -                     continue
 -                 res = inf_table.add_columns({field_name: field_info})
 -                 assert res.error_code == infinity.ErrorCode.OK
 -                 logger.info(
 -                     f"INFINITY added following column to table {table_name}: {field_name} {field_info}"
 -                 )
 -                 if field_info["type"] != "varchar" or "analyzer" not in field_info:
 -                     continue
 -                 inf_table.create_index(
 -                     f"text_idx_{field_name}",
 -                     IndexInfo(
 -                         field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
 -                     ),
 -                     ConflictType.Ignore,
 -                 )
 - 
 -     """
 -     Database operations
 -     """
 - 
 -     def dbType(self) -> str:
 -         return "infinity"
 - 
 -     def health(self) -> dict:
 -         """
 -         Return the health status of the database.
 -         TODO: Infinity-sdk provides health() to wrap `show global variables` and `show tables`
 -         """
 -         inf_conn = self.connPool.get_conn()
 -         res = inf_conn.show_current_node()
 -         self.connPool.release_conn(inf_conn)
 -         res2 = {
 -             "type": "infinity",
 -             "status": "green" if res.error_code == 0 and res.server_status == "started" else "red",
 -             "error": res.error_msg,
 -         }
 -         return res2
 - 
 -     """
 -     Table operations
 -     """
 - 
 -     def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
 -         table_name = f"{indexName}_{knowledgebaseId}"
 -         inf_conn = self.connPool.get_conn()
 -         inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
 - 
 -         fp_mapping = os.path.join(
 -             get_project_base_directory(), "conf", "infinity_mapping.json"
 -         )
 -         if not os.path.exists(fp_mapping):
 -             raise Exception(f"Mapping file not found at {fp_mapping}")
 -         schema = json.load(open(fp_mapping))
 -         vector_name = f"q_{vectorSize}_vec"
 -         schema[vector_name] = {"type": f"vector,{vectorSize},float"}
 -         inf_table = inf_db.create_table(
 -             table_name,
 -             schema,
 -             ConflictType.Ignore,
 -         )
 -         inf_table.create_index(
 -             "q_vec_idx",
 -             IndexInfo(
 -                 vector_name,
 -                 IndexType.Hnsw,
 -                 {
 -                     "M": "16",
 -                     "ef_construction": "50",
 -                     "metric": "cosine",
 -                     "encode": "lvq",
 -                 },
 -             ),
 -             ConflictType.Ignore,
 -         )
 -         for field_name, field_info in schema.items():
 -             if field_info["type"] != "varchar" or "analyzer" not in field_info:
 -                 continue
 -             inf_table.create_index(
 -                 f"text_idx_{field_name}",
 -                 IndexInfo(
 -                     field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
 -                 ),
 -                 ConflictType.Ignore,
 -             )
 -         self.connPool.release_conn(inf_conn)
 -         logger.info(
 -             f"INFINITY created table {table_name}, vector size {vectorSize}"
 -         )
 - 
 -     def deleteIdx(self, indexName: str, knowledgebaseId: str):
 -         table_name = f"{indexName}_{knowledgebaseId}"
 -         inf_conn = self.connPool.get_conn()
 -         db_instance = inf_conn.get_database(self.dbName)
 -         db_instance.drop_table(table_name, ConflictType.Ignore)
 -         self.connPool.release_conn(inf_conn)
 -         logger.info(f"INFINITY dropped table {table_name}")
 - 
 -     def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
 -         table_name = f"{indexName}_{knowledgebaseId}"
 -         try:
 -             inf_conn = self.connPool.get_conn()
 -             db_instance = inf_conn.get_database(self.dbName)
 -             _ = db_instance.get_table(table_name)
 -             self.connPool.release_conn(inf_conn)
 -             return True
 -         except Exception as e:
 -             logger.warning(f"INFINITY indexExist {str(e)}")
 -         return False
 - 
 -     """
 -     CRUD operations
 -     """
 - 
 -     def search(
 -             self,
 -             selectFields: list[str],
 -             highlightFields: list[str],
 -             condition: dict,
 -             matchExprs: list[MatchExpr],
 -             orderBy: OrderByExpr,
 -             offset: int,
 -             limit: int,
 -             indexNames: str | list[str],
 -             knowledgebaseIds: list[str],
 -     ) -> tuple[pl.DataFrame, int]:
 -         """
 -         TODO: Infinity doesn't provide highlight
 -         """
 -         if isinstance(indexNames, str):
 -             indexNames = indexNames.split(",")
 -         assert isinstance(indexNames, list) and len(indexNames) > 0
 -         inf_conn = self.connPool.get_conn()
 -         db_instance = inf_conn.get_database(self.dbName)
 -         df_list = list()
 -         table_list = list()
 -         for essential_field in ["id"]:
 -             if essential_field not in selectFields:
 -                 selectFields.append(essential_field)
 -         if matchExprs:
 -             for essential_field in ["score()", "pagerank_fea"]:
 -                 selectFields.append(essential_field)
 - 
 -         # Prepare expressions common to all tables
 -         filter_cond = None
 -         filter_fulltext = ""
 -         if condition:
 -             filter_cond = equivalent_condition_to_str(condition)
 -         for matchExpr in matchExprs:
 -             if isinstance(matchExpr, MatchTextExpr):
 -                 if filter_cond and "filter" not in matchExpr.extra_options:
 -                     matchExpr.extra_options.update({"filter": filter_cond})
 -                 fields = ",".join(matchExpr.fields)
 -                 filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')"
 -                 if filter_cond:
 -                     filter_fulltext = f"({filter_cond}) AND {filter_fulltext}"
 -                 minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0)
 -                 if isinstance(minimum_should_match, float):
 -                     str_minimum_should_match = str(int(minimum_should_match * 100)) + "%"
 -                     matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match
 -                 for k, v in matchExpr.extra_options.items():
 -                     if not isinstance(v, str):
 -                         matchExpr.extra_options[k] = str(v)
 -                 logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}")
 -             elif isinstance(matchExpr, MatchDenseExpr):
 -                 if filter_cond and "filter" not in matchExpr.extra_options:
 -                     matchExpr.extra_options.update({"filter": filter_fulltext})
 -                 for k, v in matchExpr.extra_options.items():
 -                     if not isinstance(v, str):
 -                         matchExpr.extra_options[k] = str(v)
 -                 logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}")
 -             elif isinstance(matchExpr, FusionExpr):
 -                 logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}")
 - 
 -         order_by_expr_list = list()
 -         if orderBy.fields:
 -             for order_field in orderBy.fields:
 -                 if order_field[1] == 0:
 -                     order_by_expr_list.append((order_field[0], SortType.Asc))
 -                 else:
 -                     order_by_expr_list.append((order_field[0], SortType.Desc))
 - 
 -         total_hits_count = 0
 -         # Scatter search tables and gather the results
 -         for indexName in indexNames:
 -             for knowledgebaseId in knowledgebaseIds:
 -                 table_name = f"{indexName}_{knowledgebaseId}"
 -                 try:
 -                     table_instance = db_instance.get_table(table_name)
 -                 except Exception:
 -                     continue
 -                 table_list.append(table_name)
 -                 builder = table_instance.output(selectFields)
 -                 if len(matchExprs) > 0:
 -                     for matchExpr in matchExprs:
 -                         if isinstance(matchExpr, MatchTextExpr):
 -                             fields = ",".join(matchExpr.fields)
 -                             builder = builder.match_text(
 -                                 fields,
 -                                 matchExpr.matching_text,
 -                                 matchExpr.topn,
 -                                 matchExpr.extra_options,
 -                             )
 -                         elif isinstance(matchExpr, MatchDenseExpr):
 -                             builder = builder.match_dense(
 -                                 matchExpr.vector_column_name,
 -                                 matchExpr.embedding_data,
 -                                 matchExpr.embedding_data_type,
 -                                 matchExpr.distance_type,
 -                                 matchExpr.topn,
 -                                 matchExpr.extra_options,
 -                             )
 -                         elif isinstance(matchExpr, FusionExpr):
 -                             builder = builder.fusion(
 -                                 matchExpr.method, matchExpr.topn, matchExpr.fusion_params
 -                             )
 -                 else:
 -                     if len(filter_cond) > 0:
 -                         builder.filter(filter_cond)
 -                 if orderBy.fields:
 -                     builder.sort(order_by_expr_list)
 -                 builder.offset(offset).limit(limit)
 -                 kb_res, extra_result = builder.option({"total_hits_count": True}).to_pl()
 -                 if extra_result:
 -                     total_hits_count += int(extra_result["total_hits_count"])
 -                 logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}")
 -                 df_list.append(kb_res)
 -         self.connPool.release_conn(inf_conn)
 -         res = concat_dataframes(df_list, selectFields)
 -         if matchExprs:
 -             res = res.sort(pl.col("SCORE") + pl.col("pagerank_fea"), descending=True, maintain_order=True)
 -         res = res.limit(limit)
 -         logger.debug(f"INFINITY search final result: {str(res)}")
 -         return res, total_hits_count
 - 
 -     def get(
 -             self, chunkId: str, indexName: str, knowledgebaseIds: list[str]
 -     ) -> dict | None:
 -         inf_conn = self.connPool.get_conn()
 -         db_instance = inf_conn.get_database(self.dbName)
 -         df_list = list()
 -         assert isinstance(knowledgebaseIds, list)
 -         table_list = list()
 -         for knowledgebaseId in knowledgebaseIds:
 -             table_name = f"{indexName}_{knowledgebaseId}"
 -             table_list.append(table_name)
 -             table_instance = db_instance.get_table(table_name)
 -             kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_pl()
 -             logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
 -             df_list.append(kb_res)
 -         self.connPool.release_conn(inf_conn)
 -         res = concat_dataframes(df_list, ["id"])
 -         res_fields = self.getFields(res, res.columns)
 -         return res_fields.get(chunkId, None)
 - 
 -     def insert(
 -             self, documents: list[dict], indexName: str, knowledgebaseId: str
 -     ) -> list[str]:
 -         inf_conn = self.connPool.get_conn()
 -         db_instance = inf_conn.get_database(self.dbName)
 -         table_name = f"{indexName}_{knowledgebaseId}"
 -         try:
 -             table_instance = db_instance.get_table(table_name)
 -         except InfinityException as e:
 -             # src/common/status.cppm, kTableNotExist = 3022
 -             if e.error_code != ErrorCode.TABLE_NOT_EXIST:
 -                 raise
 -             vector_size = 0
 -             patt = re.compile(r"q_(?P<vector_size>\d+)_vec")
 -             for k in documents[0].keys():
 -                 m = patt.match(k)
 -                 if m:
 -                     vector_size = int(m.group("vector_size"))
 -                     break
 -             if vector_size == 0:
 -                 raise ValueError("Cannot infer vector size from documents")
 -             self.createIdx(indexName, knowledgebaseId, vector_size)
 -             table_instance = db_instance.get_table(table_name)
 - 
 -         docs = copy.deepcopy(documents)
 -         for d in docs:
 -             assert "_id" not in d
 -             assert "id" in d
 -             for k, v in d.items():
 -                 if k in ["important_kwd", "question_kwd", "entities_kwd"]:
 -                     assert isinstance(v, list)
 -                     d[k] = "###".join(v)
 -                 elif k == 'kb_id':
 -                     if isinstance(d[k], list):
 -                         d[k] = d[k][0] # since d[k] is a list, but we need a str
 -                 elif k == "position_int":
 -                     assert isinstance(v, list)
 -                     arr = [num for row in v for num in row]
 -                     d[k] = "_".join(f"{num:08x}" for num in arr)
 -                 elif k in ["page_num_int", "top_int"]:
 -                     assert isinstance(v, list)
 -                     d[k] = "_".join(f"{num:08x}" for num in v)
 -         ids = ["'{}'".format(d["id"]) for d in docs]
 -         str_ids = ", ".join(ids)
 -         str_filter = f"id IN ({str_ids})"
 -         table_instance.delete(str_filter)
 -         # for doc in documents:
 -         #     logger.info(f"insert position_int: {doc['position_int']}")
 -         # logger.info(f"InfinityConnection.insert {json.dumps(documents)}")
 -         table_instance.insert(docs)
 -         self.connPool.release_conn(inf_conn)
 -         logger.debug(f"INFINITY inserted into {table_name} {str_ids}.")
 -         return []
 - 
 -     def update(
 -             self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
 -     ) -> bool:
 -         # if 'position_int' in newValue:
 -         #     logger.info(f"update position_int: {newValue['position_int']}")
 -         inf_conn = self.connPool.get_conn()
 -         db_instance = inf_conn.get_database(self.dbName)
 -         table_name = f"{indexName}_{knowledgebaseId}"
 -         table_instance = db_instance.get_table(table_name)
 -         if "exist" in condition:
 -             del condition["exist"]
 -         filter = equivalent_condition_to_str(condition)
 -         for k, v in list(newValue.items()):
 -             if k.endswith("_kwd") and isinstance(v, list):
 -                 newValue[k] = " ".join(v)
 -             elif k == 'kb_id':
 -                 if isinstance(newValue[k], list):
 -                     newValue[k] = newValue[k][0] # since d[k] is a list, but we need a str
 -             elif k == "position_int":
 -                 assert isinstance(v, list)
 -                 arr = [num for row in v for num in row]
 -                 newValue[k] = "_".join(f"{num:08x}" for num in arr)
 -             elif k in ["page_num_int", "top_int"]:
 -                 assert isinstance(v, list)
 -                 newValue[k] = "_".join(f"{num:08x}" for num in v)
 -             elif k == "remove" and v in ["pagerank_fea"]:
 -                 del newValue[k]
 -                 newValue[v] = 0
 -         logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
 -         table_instance.update(filter, newValue)
 -         self.connPool.release_conn(inf_conn)
 -         return True
 - 
 -     def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int:
 -         inf_conn = self.connPool.get_conn()
 -         db_instance = inf_conn.get_database(self.dbName)
 -         table_name = f"{indexName}_{knowledgebaseId}"
 -         filter = equivalent_condition_to_str(condition)
 -         try:
 -             table_instance = db_instance.get_table(table_name)
 -         except Exception:
 -             logger.warning(
 -                 f"Skipped deleting `{filter}` from table {table_name} since the table doesn't exist."
 -             )
 -             return 0
 -         logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
 -         res = table_instance.delete(filter)
 -         self.connPool.release_conn(inf_conn)
 -         return res.deleted_rows
 - 
 -     """
 -     Helper functions for search result
 -     """
 - 
 -     def getTotal(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> int:
 -         if isinstance(res, tuple):
 -             return res[1]
 -         return len(res)
 - 
 -     def getChunkIds(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> list[str]:
 -         if isinstance(res, tuple):
 -             res = res[0]
 -         return list(res["id"])
 - 
 -     def getFields(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fields: list[str]) -> list[str, dict]:
 -         if isinstance(res, tuple):
 -             res = res[0]
 -         res_fields = {}
 -         if not fields:
 -             return {}
 -         num_rows = len(res)
 -         column_id = res["id"]
 -         for i in range(num_rows):
 -             id = column_id[i]
 -             m = {"id": id}
 -             for fieldnm in fields:
 -                 if fieldnm not in res:
 -                     m[fieldnm] = None
 -                     continue
 -                 v = res[fieldnm][i]
 -                 if isinstance(v, Series):
 -                     v = list(v)
 -                 elif fieldnm in ["important_kwd", "question_kwd", "entities_kwd"]:
 -                     assert isinstance(v, str)
 -                     v = [kwd for kwd in v.split("###") if kwd]
 -                 elif fieldnm == "position_int":
 -                     assert isinstance(v, str)
 -                     if v:
 -                         arr = [int(hex_val, 16) for hex_val in v.split('_')]
 -                         v = [arr[i:i + 5] for i in range(0, len(arr), 5)]
 -                     else:
 -                         v = []
 -                 elif fieldnm in ["page_num_int", "top_int"]:
 -                     assert isinstance(v, str)
 -                     if v:
 -                         v = [int(hex_val, 16) for hex_val in v.split('_')]
 -                     else:
 -                         v = []
 -                 else:
 -                     if not isinstance(v, str):
 -                         v = str(v)
 -                     # if fieldnm.endswith("_tks"):
 -                     #     v = rmSpace(v)
 -                 m[fieldnm] = v
 -             res_fields[id] = m
 -         return res_fields
 - 
 -     def getHighlight(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, keywords: list[str], fieldnm: str):
 -         if isinstance(res, tuple):
 -             res = res[0]
 -         ans = {}
 -         num_rows = len(res)
 -         column_id = res["id"]
 -         for i in range(num_rows):
 -             id = column_id[i]
 -             txt = res[fieldnm][i]
 -             txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
 -             txts = []
 -             for t in re.split(r"[.?!;\n]", txt):
 -                 for w in keywords:
 -                     t = re.sub(
 -                         r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])"
 -                         % re.escape(w),
 -                         r"\1<em>\2</em>\3",
 -                         t,
 -                         flags=re.IGNORECASE | re.MULTILINE,
 -                     )
 -                 if not re.search(
 -                         r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE
 -                 ):
 -                     continue
 -                 txts.append(t)
 -             ans[id] = "...".join(txts)
 -         return ans
 - 
 -     def getAggregation(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fieldnm: str):
 -         """
 -         TODO: Infinity doesn't provide aggregation
 -         """
 -         return list()
 - 
 -     """
 -     SQL
 -     """
 - 
 -     def sql(sql: str, fetch_size: int, format: str):
 -         raise NotImplementedError("Not implemented")
 
 
  |