
infinity_conn.py 23KB

import logging
import os
import re
import json
import time
import copy

import infinity
from infinity.common import ConflictType, InfinityException, SortType
from infinity.index import IndexInfo, IndexType
from infinity.connection_pool import ConnectionPool
from infinity.errors import ErrorCode
from rag import settings
from rag.settings import PAGERANK_FLD
from rag.utils import singleton
import polars as pl
from polars.series.series import Series
from api.utils.file_utils import get_project_base_directory
from rag.utils.doc_store_conn import (
    DocStoreConnection,
    MatchExpr,
    MatchTextExpr,
    MatchDenseExpr,
    FusionExpr,
    OrderByExpr,
)

logger = logging.getLogger('ragflow.infinity_conn')


def equivalent_condition_to_str(condition: dict) -> str:
    assert "_id" not in condition
    cond = list()
    for k, v in condition.items():
        if not isinstance(k, str) or k in ["kb_id"] or not v:
            continue
        if isinstance(v, list):
            inCond = list()
            for item in v:
                if isinstance(item, str):
                    inCond.append(f"'{item}'")
                else:
                    inCond.append(str(item))
            if inCond:
                strInCond = ", ".join(inCond)
                strInCond = f"{k} IN ({strInCond})"
                cond.append(strInCond)
        elif isinstance(v, str):
            cond.append(f"{k}='{v}'")
        else:
            cond.append(f"{k}={str(v)}")
    return " AND ".join(cond) if cond else "1=1"
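
# A minimal worked example, with hypothetical field names: passing
# {"doc_id": ["d1", "d2"], "available_int": 1} returns
# "doc_id IN ('d1', 'd2') AND available_int=1", while an empty condition
# falls back to the always-true filter "1=1".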


def concat_dataframes(df_list: list[pl.DataFrame], selectFields: list[str]) -> pl.DataFrame:
    """
    Concatenate multiple dataframes into one.
    """
    df_list = [df for df in df_list if not df.is_empty()]
    if df_list:
        return pl.concat(df_list)
    schema = dict()
    for field_name in selectFields:
        if field_name == 'score()':  # Workaround: the score() output column is named SCORE
            schema['SCORE'] = str
        else:
            schema[field_name] = str
    return pl.DataFrame(schema=schema)
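
# Presumably the reason for the SCORE fallback above: when "score()" is among the
# selected outputs, the result column comes back named "SCORE", so an empty
# DataFrame must mirror that name for pl.concat to line up with non-empty results.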


@singleton
class InfinityConnection(DocStoreConnection):
    def __init__(self):
        self.dbName = settings.INFINITY.get("db_name", "default_db")
        infinity_uri = settings.INFINITY["uri"]
        if ":" in infinity_uri:
            host, port = infinity_uri.split(":")
            infinity_uri = infinity.common.NetworkAddress(host, int(port))
        self.connPool = None
        logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
        for _ in range(24):
            try:
                connPool = ConnectionPool(infinity_uri)
                inf_conn = connPool.get_conn()
                res = inf_conn.show_current_node()
                if res.error_code == ErrorCode.OK and res.server_status == "started":
                    self._migrate_db(inf_conn)
                    self.connPool = connPool
                    connPool.release_conn(inf_conn)
                    break
                connPool.release_conn(inf_conn)
                logger.warning(f"Infinity status: {res.server_status}. Waiting for Infinity {infinity_uri} to become healthy.")
                time.sleep(5)
            except Exception as e:
                logger.warning(f"{str(e)}. Waiting for Infinity {infinity_uri} to become healthy.")
                time.sleep(5)
        if self.connPool is None:
            msg = f"Infinity {infinity_uri} didn't become healthy within 120s."
            logger.error(msg)
            raise Exception(msg)
        logger.info(f"Infinity {infinity_uri} is healthy.")

    def _migrate_db(self, inf_conn):
        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
        fp_mapping = os.path.join(
            get_project_base_directory(), "conf", "infinity_mapping.json"
        )
        if not os.path.exists(fp_mapping):
            raise Exception(f"Mapping file not found at {fp_mapping}")
        with open(fp_mapping) as f:
            schema = json.load(f)
        table_names = inf_db.list_tables().table_names
        for table_name in table_names:
            inf_table = inf_db.get_table(table_name)
            index_names = inf_table.list_indexes().index_names
            if "q_vec_idx" not in index_names:
                # Skip tables that weren't created by this connector
                continue
            column_names = inf_table.show_columns()["name"]
            column_names = set(column_names)
            for field_name, field_info in schema.items():
                if field_name in column_names:
                    continue
                res = inf_table.add_columns({field_name: field_info})
                assert res.error_code == infinity.ErrorCode.OK
                logger.info(
                    f"INFINITY added the following column to table {table_name}: {field_name} {field_info}"
                )
                if field_info["type"] != "varchar" or "analyzer" not in field_info:
                    continue
                inf_table.create_index(
                    f"text_idx_{field_name}",
                    IndexInfo(
                        field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
                    ),
                    ConflictType.Ignore,
                )
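
    # Note: the "q_vec_idx" check above treats that index (created by createIdx
    # below) as the marker of a table managed by this connector; only such tables
    # receive the missing columns and full-text indexes during migration.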
  128. """
  129. Database operations
  130. """
  131. def dbType(self) -> str:
  132. return "infinity"
  133. def health(self) -> dict:
  134. """
  135. Return the health status of the database.
  136. """
  137. inf_conn = self.connPool.get_conn()
  138. res = inf_conn.show_current_node()
  139. self.connPool.release_conn(inf_conn)
  140. res2 = {
  141. "type": "infinity",
  142. "status": "green" if res.error_code == 0 and res.server_status == "started" else "red",
  143. "error": res.error_msg,
  144. }
  145. return res2
  146. """
  147. Table operations
  148. """

    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        table_name = f"{indexName}_{knowledgebaseId}"
        inf_conn = self.connPool.get_conn()
        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
        fp_mapping = os.path.join(
            get_project_base_directory(), "conf", "infinity_mapping.json"
        )
        if not os.path.exists(fp_mapping):
            raise Exception(f"Mapping file not found at {fp_mapping}")
        with open(fp_mapping) as f:
            schema = json.load(f)
        vector_name = f"q_{vectorSize}_vec"
        schema[vector_name] = {"type": f"vector,{vectorSize},float"}
        inf_table = inf_db.create_table(
            table_name,
            schema,
            ConflictType.Ignore,
        )
        inf_table.create_index(
            "q_vec_idx",
            IndexInfo(
                vector_name,
                IndexType.Hnsw,
                {
                    "M": "16",
                    "ef_construction": "50",
                    "metric": "cosine",
                    "encode": "lvq",
                },
            ),
            ConflictType.Ignore,
        )
        for field_name, field_info in schema.items():
            if field_info["type"] != "varchar" or "analyzer" not in field_info:
                continue
            inf_table.create_index(
                f"text_idx_{field_name}",
                IndexInfo(
                    field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
                ),
                ConflictType.Ignore,
            )
        self.connPool.release_conn(inf_conn)
        logger.info(
            f"INFINITY created table {table_name}, vector size {vectorSize}"
        )
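
    # Usage sketch with hypothetical names: createIdx("ragflow_idx", "kb42", 768)
    # creates table "ragflow_idx_kb42" with a "q_768_vec" column indexed by HNSW
    # (cosine metric, lvq encoding), plus a full-text index for every
    # analyzer-equipped varchar field in the mapping file.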

    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        table_name = f"{indexName}_{knowledgebaseId}"
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        db_instance.drop_table(table_name, ConflictType.Ignore)
        self.connPool.release_conn(inf_conn)
        logger.info(f"INFINITY dropped table {table_name}")

    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        table_name = f"{indexName}_{knowledgebaseId}"
        try:
            inf_conn = self.connPool.get_conn()
            db_instance = inf_conn.get_database(self.dbName)
            _ = db_instance.get_table(table_name)
            self.connPool.release_conn(inf_conn)
            return True
        except Exception as e:
            logger.warning(f"INFINITY indexExist {str(e)}")
        return False
  212. """
  213. CRUD operations
  214. """
  215. def search(
  216. self, selectFields: list[str],
  217. highlightFields: list[str],
  218. condition: dict,
  219. matchExprs: list[MatchExpr],
  220. orderBy: OrderByExpr,
  221. offset: int,
  222. limit: int,
  223. indexNames: str | list[str],
  224. knowledgebaseIds: list[str],
  225. aggFields: list[str] = [],
  226. rank_feature: dict | None = None
  227. ) -> list[dict] | pl.DataFrame:
  228. """
  229. TODO: Infinity doesn't provide highlight
  230. """
  231. if isinstance(indexNames, str):
  232. indexNames = indexNames.split(",")
  233. assert isinstance(indexNames, list) and len(indexNames) > 0
  234. inf_conn = self.connPool.get_conn()
  235. db_instance = inf_conn.get_database(self.dbName)
  236. df_list = list()
  237. table_list = list()
  238. for essential_field in ["id"]:
  239. if essential_field not in selectFields:
  240. selectFields.append(essential_field)
  241. if matchExprs:
  242. for essential_field in ["score()", PAGERANK_FLD]:
  243. selectFields.append(essential_field)
  244. # Prepare expressions common to all tables
  245. filter_cond = None
  246. filter_fulltext = ""
  247. if condition:
  248. filter_cond = equivalent_condition_to_str(condition)
  249. for matchExpr in matchExprs:
  250. if isinstance(matchExpr, MatchTextExpr):
  251. if filter_cond and "filter" not in matchExpr.extra_options:
  252. matchExpr.extra_options.update({"filter": filter_cond})
  253. fields = ",".join(matchExpr.fields)
  254. filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')"
  255. if filter_cond:
  256. filter_fulltext = f"({filter_cond}) AND {filter_fulltext}"
  257. minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0)
  258. if isinstance(minimum_should_match, float):
  259. str_minimum_should_match = str(int(minimum_should_match * 100)) + "%"
  260. matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match
  261. for k, v in matchExpr.extra_options.items():
  262. if not isinstance(v, str):
  263. matchExpr.extra_options[k] = str(v)
  264. logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}")
  265. elif isinstance(matchExpr, MatchDenseExpr):
  266. if filter_cond and "filter" not in matchExpr.extra_options:
  267. matchExpr.extra_options.update({"filter": filter_fulltext})
  268. for k, v in matchExpr.extra_options.items():
  269. if not isinstance(v, str):
  270. matchExpr.extra_options[k] = str(v)
  271. logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}")
  272. elif isinstance(matchExpr, FusionExpr):
  273. logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}")
  274. order_by_expr_list = list()
  275. if orderBy.fields:
  276. for order_field in orderBy.fields:
  277. if order_field[1] == 0:
  278. order_by_expr_list.append((order_field[0], SortType.Asc))
  279. else:
  280. order_by_expr_list.append((order_field[0], SortType.Desc))
  281. total_hits_count = 0
  282. # Scatter search tables and gather the results
  283. for indexName in indexNames:
  284. for knowledgebaseId in knowledgebaseIds:
  285. table_name = f"{indexName}_{knowledgebaseId}"
  286. try:
  287. table_instance = db_instance.get_table(table_name)
  288. except Exception:
  289. continue
  290. table_list.append(table_name)
  291. builder = table_instance.output(selectFields)
  292. if len(matchExprs) > 0:
  293. for matchExpr in matchExprs:
  294. if isinstance(matchExpr, MatchTextExpr):
  295. fields = ",".join(matchExpr.fields)
  296. builder = builder.match_text(
  297. fields,
  298. matchExpr.matching_text,
  299. matchExpr.topn,
  300. matchExpr.extra_options,
  301. )
  302. elif isinstance(matchExpr, MatchDenseExpr):
  303. builder = builder.match_dense(
  304. matchExpr.vector_column_name,
  305. matchExpr.embedding_data,
  306. matchExpr.embedding_data_type,
  307. matchExpr.distance_type,
  308. matchExpr.topn,
  309. matchExpr.extra_options,
  310. )
  311. elif isinstance(matchExpr, FusionExpr):
  312. builder = builder.fusion(
  313. matchExpr.method, matchExpr.topn, matchExpr.fusion_params
  314. )
  315. else:
  316. if len(filter_cond) > 0:
  317. builder.filter(filter_cond)
  318. if orderBy.fields:
  319. builder.sort(order_by_expr_list)
  320. builder.offset(offset).limit(limit)
  321. kb_res, extra_result = builder.option({"total_hits_count": True}).to_pl()
  322. if extra_result:
  323. total_hits_count += int(extra_result["total_hits_count"])
  324. logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}")
  325. df_list.append(kb_res)
  326. self.connPool.release_conn(inf_conn)
  327. res = concat_dataframes(df_list, selectFields)
  328. if matchExprs:
  329. res = res.sort(pl.col("SCORE") + pl.col(PAGERANK_FLD), descending=True, maintain_order=True)
  330. res = res.limit(limit)
  331. logger.debug(f"INFINITY search final result: {str(res)}")
  332. return res, total_hits_count
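
    # search() returns a (DataFrame, total_hits_count) tuple; the helpers further
    # below (getTotal, getChunkIds, getFields, getHighlight) accept either that
    # tuple or a bare DataFrame such as the one assembled by get().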

    def get(
            self, chunkId: str, indexName: str, knowledgebaseIds: list[str]
    ) -> dict | None:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        df_list = list()
        assert isinstance(knowledgebaseIds, list)
        table_list = list()
        for knowledgebaseId in knowledgebaseIds:
            table_name = f"{indexName}_{knowledgebaseId}"
            table_list.append(table_name)
            table_instance = None
            try:
                table_instance = db_instance.get_table(table_name)
            except Exception:
                logger.warning(
                    f"Table not found: {table_name}; this knowledge base wasn't created in Infinity. It may have been created by another document engine.")
                continue
            kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_pl()
            logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
            df_list.append(kb_res)
        self.connPool.release_conn(inf_conn)
        res = concat_dataframes(df_list, ["id"])
        res_fields = self.getFields(res, res.columns)
        return res_fields.get(chunkId, None)

    def insert(
            self, documents: list[dict], indexName: str, knowledgebaseId: str = None
    ) -> list[str]:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        try:
            table_instance = db_instance.get_table(table_name)
        except InfinityException as e:
            # src/common/status.cppm, kTableNotExist = 3022
            if e.error_code != ErrorCode.TABLE_NOT_EXIST:
                raise
            vector_size = 0
            patt = re.compile(r"q_(?P<vector_size>\d+)_vec")
            for k in documents[0].keys():
                m = patt.match(k)
                if m:
                    vector_size = int(m.group("vector_size"))
                    break
            if vector_size == 0:
                raise ValueError("Cannot infer vector size from documents")
            self.createIdx(indexName, knowledgebaseId, vector_size)
            table_instance = db_instance.get_table(table_name)
        docs = copy.deepcopy(documents)
        for d in docs:
            assert "_id" not in d
            assert "id" in d
            for k, v in d.items():
                if k in ["important_kwd", "question_kwd", "entities_kwd"]:
                    assert isinstance(v, list)
                    d[k] = "###".join(v)
                elif k == 'kb_id':
                    if isinstance(d[k], list):
                        d[k] = d[k][0]  # d[k] is a list, but a str is needed
                elif k == "position_int":
                    assert isinstance(v, list)
                    arr = [num for row in v for num in row]
                    d[k] = "_".join(f"{num:08x}" for num in arr)
                elif k in ["page_num_int", "top_int"]:
                    assert isinstance(v, list)
                    d[k] = "_".join(f"{num:08x}" for num in v)
        ids = ["'{}'".format(d["id"]) for d in docs]
        str_ids = ", ".join(ids)
        str_filter = f"id IN ({str_ids})"
        table_instance.delete(str_filter)
        # for doc in documents:
        #     logger.info(f"insert position_int: {doc['position_int']}")
        # logger.info(f"InfinityConnection.insert {json.dumps(documents)}")
        table_instance.insert(docs)
        self.connPool.release_conn(inf_conn)
        logger.debug(f"INFINITY inserted into {table_name} {str_ids}.")
        return []
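
    # Position encoding example: position_int [[1, 2, 3, 4, 5]] is flattened and
    # stored as "00000001_00000002_00000003_00000004_00000005" (fixed-width hex
    # joined by "_"); getFields() decodes it and regroups the ints into 5-tuples.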

    def update(
            self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
    ) -> bool:
        # if 'position_int' in newValue:
        #     logger.info(f"update position_int: {newValue['position_int']}")
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        table_instance = db_instance.get_table(table_name)
        if "exist" in condition:
            del condition["exist"]
        filter = equivalent_condition_to_str(condition)
        for k, v in list(newValue.items()):
            if k.endswith("_kwd") and isinstance(v, list):
                newValue[k] = " ".join(v)
            elif k == 'kb_id':
                if isinstance(newValue[k], list):
                    newValue[k] = newValue[k][0]  # newValue[k] is a list, but a str is needed
            elif k == "position_int":
                assert isinstance(v, list)
                arr = [num for row in v for num in row]
                newValue[k] = "_".join(f"{num:08x}" for num in arr)
            elif k in ["page_num_int", "top_int"]:
                assert isinstance(v, list)
                newValue[k] = "_".join(f"{num:08x}" for num in v)
            elif k == "remove" and v in [PAGERANK_FLD]:
                del newValue[k]
                newValue[v] = 0
        logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
        table_instance.update(filter, newValue)
        self.connPool.release_conn(inf_conn)
        return True
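
    # Example with a hypothetical condition: update({"id": "chunk-1"},
    # {"remove": PAGERANK_FLD}, ...) deletes no rows; the "remove" directive is
    # rewritten so the pagerank column itself is reset to 0.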

    def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        filter = equivalent_condition_to_str(condition)
        try:
            table_instance = db_instance.get_table(table_name)
        except Exception:
            logger.warning(
                f"Skipped deleting `{filter}` from table {table_name} since the table doesn't exist."
            )
            return 0
        logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
        res = table_instance.delete(filter)
        self.connPool.release_conn(inf_conn)
        return res.deleted_rows
  458. """
  459. Helper functions for search result
  460. """
  461. def getTotal(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> int:
  462. if isinstance(res, tuple):
  463. return res[1]
  464. return len(res)
  465. def getChunkIds(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> list[str]:
  466. if isinstance(res, tuple):
  467. res = res[0]
  468. return list(res["id"])
  469. def getFields(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fields: list[str]) -> list[str, dict]:
  470. if isinstance(res, tuple):
  471. res = res[0]
  472. res_fields = {}
  473. if not fields:
  474. return {}
  475. num_rows = len(res)
  476. column_id = res["id"]
  477. for i in range(num_rows):
  478. id = column_id[i]
  479. m = {"id": id}
  480. for fieldnm in fields:
  481. if fieldnm not in res:
  482. m[fieldnm] = None
  483. continue
  484. v = res[fieldnm][i]
  485. if isinstance(v, Series):
  486. v = list(v)
  487. elif fieldnm in ["important_kwd", "question_kwd", "entities_kwd"]:
  488. assert isinstance(v, str)
  489. v = [kwd for kwd in v.split("###") if kwd]
  490. elif fieldnm == "position_int":
  491. assert isinstance(v, str)
  492. if v:
  493. arr = [int(hex_val, 16) for hex_val in v.split('_')]
  494. v = [arr[i:i + 5] for i in range(0, len(arr), 5)]
  495. else:
  496. v = []
  497. elif fieldnm in ["page_num_int", "top_int"]:
  498. assert isinstance(v, str)
  499. if v:
  500. v = [int(hex_val, 16) for hex_val in v.split('_')]
  501. else:
  502. v = []
  503. else:
  504. if not isinstance(v, str):
  505. v = str(v)
  506. # if fieldnm.endswith("_tks"):
  507. # v = rmSpace(v)
  508. m[fieldnm] = v
  509. res_fields[id] = m
  510. return res_fields

    def getHighlight(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, keywords: list[str], fieldnm: str):
        if isinstance(res, tuple):
            res = res[0]
        ans = {}
        num_rows = len(res)
        column_id = res["id"]
        for i in range(num_rows):
            id = column_id[i]
            txt = res[fieldnm][i]
            txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
            txts = []
            for t in re.split(r"[.?!;\n]", txt):
                for w in keywords:
                    t = re.sub(
                        r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])"
                        % re.escape(w),
                        r"\1<em>\2</em>\3",
                        t,
                        flags=re.IGNORECASE | re.MULTILINE,
                    )
                if not re.search(
                        r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE
                ):
                    continue
                txts.append(t)
            ans[id] = "...".join(txts)
        return ans

    def getAggregation(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fieldnm: str):
        """
        TODO: Infinity doesn't provide aggregation
        """
        return list()

    """
    SQL
    """

    def sql(self, sql: str, fetch_size: int, format: str):
        raise NotImplementedError("Not implemented")