您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. #
  2. # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import ABC
  17. import builtins
  18. import json
  19. import os
  20. import logging
  21. from functools import partial
  22. from typing import Any, Tuple, Union
  23. import pandas as pd
  24. from agent import settings
  25. _FEEDED_DEPRECATED_PARAMS = "_feeded_deprecated_params"
  26. _DEPRECATED_PARAMS = "_deprecated_params"
  27. _USER_FEEDED_PARAMS = "_user_feeded_params"
  28. _IS_RAW_CONF = "_is_raw_conf"
  29. class ComponentParamBase(ABC):
  30. def __init__(self):
  31. self.output_var_name = "output"
  32. self.infor_var_name = "infor"
  33. self.message_history_window_size = 22
  34. self.query = []
  35. self.inputs = []
  36. self.debug_inputs = []
  37. def set_name(self, name: str):
  38. self._name = name
  39. return self
  40. def check(self):
  41. raise NotImplementedError("Parameter Object should be checked.")
  42. def output(self):
  43. return None
  44. @classmethod
  45. def _get_or_init_deprecated_params_set(cls):
  46. if not hasattr(cls, _DEPRECATED_PARAMS):
  47. setattr(cls, _DEPRECATED_PARAMS, set())
  48. return getattr(cls, _DEPRECATED_PARAMS)
  49. def _get_or_init_feeded_deprecated_params_set(self, conf=None):
  50. if not hasattr(self, _FEEDED_DEPRECATED_PARAMS):
  51. if conf is None:
  52. setattr(self, _FEEDED_DEPRECATED_PARAMS, set())
  53. else:
  54. setattr(
  55. self,
  56. _FEEDED_DEPRECATED_PARAMS,
  57. set(conf[_FEEDED_DEPRECATED_PARAMS]),
  58. )
  59. return getattr(self, _FEEDED_DEPRECATED_PARAMS)
  60. def _get_or_init_user_feeded_params_set(self, conf=None):
  61. if not hasattr(self, _USER_FEEDED_PARAMS):
  62. if conf is None:
  63. setattr(self, _USER_FEEDED_PARAMS, set())
  64. else:
  65. setattr(self, _USER_FEEDED_PARAMS, set(conf[_USER_FEEDED_PARAMS]))
  66. return getattr(self, _USER_FEEDED_PARAMS)
  67. def get_user_feeded(self):
  68. return self._get_or_init_user_feeded_params_set()
  69. def get_feeded_deprecated_params(self):
  70. return self._get_or_init_feeded_deprecated_params_set()
  71. @property
  72. def _deprecated_params_set(self):
  73. return {name: True for name in self.get_feeded_deprecated_params()}
  74. def __str__(self):
  75. return json.dumps(self.as_dict(), ensure_ascii=False)
  76. def as_dict(self):
  77. def _recursive_convert_obj_to_dict(obj):
  78. ret_dict = {}
  79. for attr_name in list(obj.__dict__):
  80. if attr_name in [_FEEDED_DEPRECATED_PARAMS, _DEPRECATED_PARAMS, _USER_FEEDED_PARAMS, _IS_RAW_CONF]:
  81. continue
  82. # get attr
  83. attr = getattr(obj, attr_name)
  84. if isinstance(attr, pd.DataFrame):
  85. ret_dict[attr_name] = attr.to_dict()
  86. continue
  87. if attr and type(attr).__name__ not in dir(builtins):
  88. ret_dict[attr_name] = _recursive_convert_obj_to_dict(attr)
  89. else:
  90. ret_dict[attr_name] = attr
  91. return ret_dict
  92. return _recursive_convert_obj_to_dict(self)
  93. def update(self, conf, allow_redundant=False):
  94. update_from_raw_conf = conf.get(_IS_RAW_CONF, True)
  95. if update_from_raw_conf:
  96. deprecated_params_set = self._get_or_init_deprecated_params_set()
  97. feeded_deprecated_params_set = (
  98. self._get_or_init_feeded_deprecated_params_set()
  99. )
  100. user_feeded_params_set = self._get_or_init_user_feeded_params_set()
  101. setattr(self, _IS_RAW_CONF, False)
  102. else:
  103. feeded_deprecated_params_set = (
  104. self._get_or_init_feeded_deprecated_params_set(conf)
  105. )
  106. user_feeded_params_set = self._get_or_init_user_feeded_params_set(conf)
  107. def _recursive_update_param(param, config, depth, prefix):
  108. if depth > settings.PARAM_MAXDEPTH:
  109. raise ValueError("Param define nesting too deep!!!, can not parse it")
  110. inst_variables = param.__dict__
  111. redundant_attrs = []
  112. for config_key, config_value in config.items():
  113. # redundant attr
  114. if config_key not in inst_variables:
  115. if not update_from_raw_conf and config_key.startswith("_"):
  116. setattr(param, config_key, config_value)
  117. else:
  118. setattr(param, config_key, config_value)
  119. # redundant_attrs.append(config_key)
  120. continue
  121. full_config_key = f"{prefix}{config_key}"
  122. if update_from_raw_conf:
  123. # add user feeded params
  124. user_feeded_params_set.add(full_config_key)
  125. # update user feeded deprecated param set
  126. if full_config_key in deprecated_params_set:
  127. feeded_deprecated_params_set.add(full_config_key)
  128. # supported attr
  129. attr = getattr(param, config_key)
  130. if type(attr).__name__ in dir(builtins) or attr is None:
  131. setattr(param, config_key, config_value)
  132. else:
  133. # recursive set obj attr
  134. sub_params = _recursive_update_param(
  135. attr, config_value, depth + 1, prefix=f"{prefix}{config_key}."
  136. )
  137. setattr(param, config_key, sub_params)
  138. if not allow_redundant and redundant_attrs:
  139. raise ValueError(
  140. f"cpn `{getattr(self, '_name', type(self))}` has redundant parameters: `{[redundant_attrs]}`"
  141. )
  142. return param
  143. return _recursive_update_param(param=self, config=conf, depth=0, prefix="")
  144. def extract_not_builtin(self):
  145. def _get_not_builtin_types(obj):
  146. ret_dict = {}
  147. for variable in obj.__dict__:
  148. attr = getattr(obj, variable)
  149. if attr and type(attr).__name__ not in dir(builtins):
  150. ret_dict[variable] = _get_not_builtin_types(attr)
  151. return ret_dict
  152. return _get_not_builtin_types(self)
  153. def validate(self):
  154. self.builtin_types = dir(builtins)
  155. self.func = {
  156. "ge": self._greater_equal_than,
  157. "le": self._less_equal_than,
  158. "in": self._in,
  159. "not_in": self._not_in,
  160. "range": self._range,
  161. }
  162. home_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
  163. param_validation_path_prefix = home_dir + "/param_validation/"
  164. param_name = type(self).__name__
  165. param_validation_path = "/".join(
  166. [param_validation_path_prefix, param_name + ".json"]
  167. )
  168. validation_json = None
  169. try:
  170. with open(param_validation_path, "r") as fin:
  171. validation_json = json.loads(fin.read())
  172. except BaseException:
  173. return
  174. self._validate_param(self, validation_json)
  175. def _validate_param(self, param_obj, validation_json):
  176. default_section = type(param_obj).__name__
  177. var_list = param_obj.__dict__
  178. for variable in var_list:
  179. attr = getattr(param_obj, variable)
  180. if type(attr).__name__ in self.builtin_types or attr is None:
  181. if variable not in validation_json:
  182. continue
  183. validation_dict = validation_json[default_section][variable]
  184. value = getattr(param_obj, variable)
  185. value_legal = False
  186. for op_type in validation_dict:
  187. if self.func[op_type](value, validation_dict[op_type]):
  188. value_legal = True
  189. break
  190. if not value_legal:
  191. raise ValueError(
  192. "Plase check runtime conf, {} = {} does not match user-parameter restriction".format(
  193. variable, value
  194. )
  195. )
  196. elif variable in validation_json:
  197. self._validate_param(attr, validation_json)
  198. @staticmethod
  199. def check_string(param, descr):
  200. if type(param).__name__ not in ["str"]:
  201. raise ValueError(
  202. descr + " {} not supported, should be string type".format(param)
  203. )
  204. @staticmethod
  205. def check_empty(param, descr):
  206. if not param:
  207. raise ValueError(
  208. descr + " does not support empty value."
  209. )
  210. @staticmethod
  211. def check_positive_integer(param, descr):
  212. if type(param).__name__ not in ["int", "long"] or param <= 0:
  213. raise ValueError(
  214. descr + " {} not supported, should be positive integer".format(param)
  215. )
  216. @staticmethod
  217. def check_positive_number(param, descr):
  218. if type(param).__name__ not in ["float", "int", "long"] or param <= 0:
  219. raise ValueError(
  220. descr + " {} not supported, should be positive numeric".format(param)
  221. )
  222. @staticmethod
  223. def check_nonnegative_number(param, descr):
  224. if type(param).__name__ not in ["float", "int", "long"] or param < 0:
  225. raise ValueError(
  226. descr
  227. + " {} not supported, should be non-negative numeric".format(param)
  228. )
  229. @staticmethod
  230. def check_decimal_float(param, descr):
  231. if type(param).__name__ not in ["float", "int"] or param < 0 or param > 1:
  232. raise ValueError(
  233. descr
  234. + " {} not supported, should be a float number in range [0, 1]".format(
  235. param
  236. )
  237. )
  238. @staticmethod
  239. def check_boolean(param, descr):
  240. if type(param).__name__ != "bool":
  241. raise ValueError(
  242. descr + " {} not supported, should be bool type".format(param)
  243. )
  244. @staticmethod
  245. def check_open_unit_interval(param, descr):
  246. if type(param).__name__ not in ["float"] or param <= 0 or param >= 1:
  247. raise ValueError(
  248. descr + " should be a numeric number between 0 and 1 exclusively"
  249. )
  250. @staticmethod
  251. def check_valid_value(param, descr, valid_values):
  252. if param not in valid_values:
  253. raise ValueError(
  254. descr
  255. + " {} is not supported, it should be in {}".format(param, valid_values)
  256. )
  257. @staticmethod
  258. def check_defined_type(param, descr, types):
  259. if type(param).__name__ not in types:
  260. raise ValueError(
  261. descr + " {} not supported, should be one of {}".format(param, types)
  262. )
  263. @staticmethod
  264. def check_and_change_lower(param, valid_list, descr=""):
  265. if type(param).__name__ != "str":
  266. raise ValueError(
  267. descr
  268. + " {} not supported, should be one of {}".format(param, valid_list)
  269. )
  270. lower_param = param.lower()
  271. if lower_param in valid_list:
  272. return lower_param
  273. else:
  274. raise ValueError(
  275. descr
  276. + " {} not supported, should be one of {}".format(param, valid_list)
  277. )
  278. @staticmethod
  279. def _greater_equal_than(value, limit):
  280. return value >= limit - settings.FLOAT_ZERO
  281. @staticmethod
  282. def _less_equal_than(value, limit):
  283. return value <= limit + settings.FLOAT_ZERO
  284. @staticmethod
  285. def _range(value, ranges):
  286. in_range = False
  287. for left_limit, right_limit in ranges:
  288. if (
  289. left_limit - settings.FLOAT_ZERO
  290. <= value
  291. <= right_limit + settings.FLOAT_ZERO
  292. ):
  293. in_range = True
  294. break
  295. return in_range
  296. @staticmethod
  297. def _in(value, right_value_list):
  298. return value in right_value_list
  299. @staticmethod
  300. def _not_in(value, wrong_value_list):
  301. return value not in wrong_value_list
  302. def _warn_deprecated_param(self, param_name, descr):
  303. if self._deprecated_params_set.get(param_name):
  304. logging.warning(
  305. f"{descr} {param_name} is deprecated and ignored in this version."
  306. )
  307. def _warn_to_deprecate_param(self, param_name, descr, new_param):
  308. if self._deprecated_params_set.get(param_name):
  309. logging.warning(
  310. f"{descr} {param_name} will be deprecated in future release; "
  311. f"please use {new_param} instead."
  312. )
  313. return True
  314. return False
  315. class ComponentBase(ABC):
  316. component_name: str
  317. def __str__(self):
  318. """
  319. {
  320. "component_name": "Begin",
  321. "params": {}
  322. }
  323. """
  324. out = getattr(self._param, self._param.output_var_name)
  325. if isinstance(out, pd.DataFrame) and "chunks" in out:
  326. del out["chunks"]
  327. setattr(self._param, self._param.output_var_name, out)
  328. return """{{
  329. "component_name": "{}",
  330. "params": {},
  331. "output": {},
  332. "inputs": {}
  333. }}""".format(self.component_name,
  334. self._param,
  335. json.dumps(json.loads(str(self._param)).get("output", {}), ensure_ascii=False),
  336. json.dumps(json.loads(str(self._param)).get("inputs", []), ensure_ascii=False)
  337. )
  338. def __init__(self, canvas, id, param: ComponentParamBase):
  339. from agent.canvas import Canvas # Local import to avoid cyclic dependency
  340. assert isinstance(canvas, Canvas), "canvas must be an instance of Canvas"
  341. self._canvas = canvas
  342. self._id = id
  343. self._param = param
  344. self._param.check()
  345. def get_dependent_components(self):
  346. cpnts = set([para["component_id"].split("@")[0] for para in self._param.query \
  347. if para.get("component_id") \
  348. and para["component_id"].lower().find("answer") < 0 \
  349. and para["component_id"].lower().find("begin") < 0])
  350. return list(cpnts)
  351. def run(self, history, **kwargs):
  352. logging.debug("{}, history: {}, kwargs: {}".format(self, json.dumps(history, ensure_ascii=False),
  353. json.dumps(kwargs, ensure_ascii=False)))
  354. self._param.debug_inputs = []
  355. try:
  356. res = self._run(history, **kwargs)
  357. self.set_output(res)
  358. except Exception as e:
  359. self.set_output(pd.DataFrame([{"content": str(e)}]))
  360. raise e
  361. return res
  362. def _run(self, history, **kwargs):
  363. raise NotImplementedError()
  364. def output(self, allow_partial=True) -> Tuple[str, Union[pd.DataFrame, partial]]:
  365. o = getattr(self._param, self._param.output_var_name)
  366. if not isinstance(o, partial):
  367. if not isinstance(o, pd.DataFrame):
  368. if isinstance(o, list):
  369. return self._param.output_var_name, pd.DataFrame(o).dropna()
  370. if o is None:
  371. return self._param.output_var_name, pd.DataFrame()
  372. return self._param.output_var_name, pd.DataFrame([{"content": str(o)}])
  373. return self._param.output_var_name, o
  374. if allow_partial or not isinstance(o, partial):
  375. if not isinstance(o, partial) and not isinstance(o, pd.DataFrame):
  376. return pd.DataFrame(o if isinstance(o, list) else [o]).dropna()
  377. return self._param.output_var_name, o
  378. outs = None
  379. for oo in o():
  380. if not isinstance(oo, pd.DataFrame):
  381. outs = pd.DataFrame(oo if isinstance(oo, list) else [oo]).dropna()
  382. else:
  383. outs = oo.dropna()
  384. return self._param.output_var_name, outs
  385. def reset(self):
  386. setattr(self._param, self._param.output_var_name, None)
  387. self._param.inputs = []
  388. def set_output(self, v):
  389. setattr(self._param, self._param.output_var_name, v)
  390. def set_infor(self, v):
  391. setattr(self._param, self._param.infor_var_name, v)
  392. def _fetch_outputs_from(self, sources: list[dict[str, Any]]) -> list[pd.DataFrame]:
  393. outs = []
  394. for q in sources:
  395. if q.get("component_id"):
  396. if "@" in q["component_id"] and q["component_id"].split("@")[0].lower().find("begin") >= 0:
  397. cpn_id, key = q["component_id"].split("@")
  398. for p in self._canvas.get_component(cpn_id)["obj"]._param.query:
  399. if p["key"] == key:
  400. outs.append(pd.DataFrame([{"content": p.get("value", "")}]))
  401. break
  402. else:
  403. assert False, f"Can't find parameter '{key}' for {cpn_id}"
  404. continue
  405. if q["component_id"].lower().find("answer") == 0:
  406. txt = []
  407. for r, c in self._canvas.history[::-1][:self._param.message_history_window_size][::-1]:
  408. txt.append(f"{r.upper()}:{c}")
  409. txt = "\n".join(txt)
  410. outs.append(pd.DataFrame([{"content": txt}]))
  411. continue
  412. outs.append(self._canvas.get_component(q["component_id"])["obj"].output(allow_partial=False)[1])
  413. elif q.get("value"):
  414. outs.append(pd.DataFrame([{"content": q["value"]}]))
  415. return outs
  416. def get_input(self):
  417. if self._param.debug_inputs:
  418. return pd.DataFrame([{"content": v["value"]} for v in self._param.debug_inputs if v.get("value")])
  419. reversed_cpnts = []
  420. if len(self._canvas.path) > 1:
  421. reversed_cpnts.extend(self._canvas.path[-2])
  422. reversed_cpnts.extend(self._canvas.path[-1])
  423. up_cpns = self.get_upstream()
  424. reversed_up_cpnts = [cpn for cpn in reversed_cpnts if cpn in up_cpns]
  425. if self._param.query:
  426. self._param.inputs = []
  427. outs = self._fetch_outputs_from(self._param.query)
  428. for out in outs:
  429. records = out.to_dict("records")
  430. content: str
  431. if len(records) > 1:
  432. content = "\n".join(
  433. [str(d["content"]) for d in records]
  434. )
  435. else:
  436. content = records[0]["content"]
  437. self._param.inputs.append({
  438. "component_id": records[0].get("component_id"),
  439. "content": content
  440. })
  441. if outs:
  442. df = pd.concat(outs, ignore_index=True)
  443. if "content" in df:
  444. df = df.drop_duplicates(subset=['content']).reset_index(drop=True)
  445. return df
  446. upstream_outs = []
  447. for u in reversed_up_cpnts[::-1]:
  448. if self.get_component_name(u) in ["switch", "concentrator"]:
  449. continue
  450. if self.component_name.lower() == "generate" and self.get_component_name(u) == "retrieval":
  451. o = self._canvas.get_component(u)["obj"].output(allow_partial=False)[1]
  452. if o is not None:
  453. o["component_id"] = u
  454. upstream_outs.append(o)
  455. continue
  456. #if self.component_name.lower()!="answer" and u not in self._canvas.get_component(self._id)["upstream"]: continue
  457. if self.component_name.lower().find("switch") < 0 \
  458. and self.get_component_name(u) in ["relevant", "categorize"]:
  459. continue
  460. if u.lower().find("answer") >= 0:
  461. for r, c in self._canvas.history[::-1]:
  462. if r == "user":
  463. upstream_outs.append(pd.DataFrame([{"content": c, "component_id": u}]))
  464. break
  465. break
  466. if self.component_name.lower().find("answer") >= 0 and self.get_component_name(u) in ["relevant"]:
  467. continue
  468. o = self._canvas.get_component(u)["obj"].output(allow_partial=False)[1]
  469. if o is not None:
  470. o["component_id"] = u
  471. upstream_outs.append(o)
  472. break
  473. assert upstream_outs, "Can't inference the where the component input is. Please identify whose output is this component's input."
  474. df = pd.concat(upstream_outs, ignore_index=True)
  475. if "content" in df:
  476. df = df.drop_duplicates(subset=['content']).reset_index(drop=True)
  477. self._param.inputs = []
  478. for _, r in df.iterrows():
  479. self._param.inputs.append({"component_id": r["component_id"], "content": r["content"]})
  480. return df
  481. def get_input_elements(self):
  482. assert self._param.query, "Please verify the input parameters first."
  483. eles = []
  484. for q in self._param.query:
  485. if q.get("component_id"):
  486. cpn_id = q["component_id"]
  487. if cpn_id.split("@")[0].lower().find("begin") >= 0:
  488. cpn_id, key = cpn_id.split("@")
  489. eles.extend(self._canvas.get_component(cpn_id)["obj"]._param.query)
  490. continue
  491. eles.append({"name": self._canvas.get_component_name(cpn_id), "key": cpn_id})
  492. else:
  493. eles.append({"key": q["value"], "name": q["value"], "value": q["value"]})
  494. return eles
  495. def get_stream_input(self):
  496. reversed_cpnts = []
  497. if len(self._canvas.path) > 1:
  498. reversed_cpnts.extend(self._canvas.path[-2])
  499. reversed_cpnts.extend(self._canvas.path[-1])
  500. up_cpns = self.get_upstream()
  501. reversed_up_cpnts = [cpn for cpn in reversed_cpnts if cpn in up_cpns]
  502. for u in reversed_up_cpnts[::-1]:
  503. if self.get_component_name(u) in ["switch", "answer"]:
  504. continue
  505. return self._canvas.get_component(u)["obj"].output()[1]
  506. @staticmethod
  507. def be_output(v):
  508. return pd.DataFrame([{"content": v}])
  509. def get_component_name(self, cpn_id):
  510. return self._canvas.get_component(cpn_id)["obj"].component_name.lower()
  511. def debug(self, **kwargs):
  512. return self._run([], **kwargs)
  513. def get_parent(self):
  514. pid = self._canvas.get_component(self._id)["parent_id"]
  515. return self._canvas.get_component(pid)["obj"]
  516. def get_upstream(self):
  517. cpn_nms = self._canvas.get_component(self._id)['upstream']
  518. return cpn_nms