You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. #
  2. # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import logging
  17. import json
  18. from copy import deepcopy
  19. from functools import partial
  20. import pandas as pd
  21. from agent.component import component_class
  22. from agent.component.base import ComponentBase
  23. class Canvas:
  24. """
  25. dsl = {
  26. "components": {
  27. "begin": {
  28. "obj":{
  29. "component_name": "Begin",
  30. "params": {},
  31. },
  32. "downstream": ["answer_0"],
  33. "upstream": [],
  34. },
  35. "answer_0": {
  36. "obj": {
  37. "component_name": "Answer",
  38. "params": {}
  39. },
  40. "downstream": ["retrieval_0"],
  41. "upstream": ["begin", "generate_0"],
  42. },
  43. "retrieval_0": {
  44. "obj": {
  45. "component_name": "Retrieval",
  46. "params": {}
  47. },
  48. "downstream": ["generate_0"],
  49. "upstream": ["answer_0"],
  50. },
  51. "generate_0": {
  52. "obj": {
  53. "component_name": "Generate",
  54. "params": {}
  55. },
  56. "downstream": ["answer_0"],
  57. "upstream": ["retrieval_0"],
  58. }
  59. },
  60. "history": [],
  61. "messages": [],
  62. "reference": [],
  63. "path": [["begin"]],
  64. "answer": []
  65. }
  66. """
  67. def __init__(self, dsl: str, tenant_id=None):
  68. self.path = []
  69. self.history = []
  70. self.messages = []
  71. self.answer = []
  72. self.components = {}
  73. self.dsl = json.loads(dsl) if dsl else {
  74. "components": {
  75. "begin": {
  76. "obj": {
  77. "component_name": "Begin",
  78. "params": {
  79. "prologue": "Hi there!"
  80. }
  81. },
  82. "downstream": [],
  83. "upstream": [],
  84. "parent_id": ""
  85. }
  86. },
  87. "history": [],
  88. "messages": [],
  89. "reference": [],
  90. "path": [],
  91. "answer": []
  92. }
  93. self._tenant_id = tenant_id
  94. self._embed_id = ""
  95. self.load()
  96. def load(self):
  97. self.components = self.dsl["components"]
  98. cpn_nms = set([])
  99. for k, cpn in self.components.items():
  100. cpn_nms.add(cpn["obj"]["component_name"])
  101. assert "Begin" in cpn_nms, "There have to be an 'Begin' component."
  102. assert "Answer" in cpn_nms, "There have to be an 'Answer' component."
  103. for k, cpn in self.components.items():
  104. cpn_nms.add(cpn["obj"]["component_name"])
  105. param = component_class(cpn["obj"]["component_name"] + "Param")()
  106. param.update(cpn["obj"]["params"])
  107. param.check()
  108. cpn["obj"] = component_class(cpn["obj"]["component_name"])(self, k, param)
  109. if cpn["obj"].component_name == "Categorize":
  110. for _, desc in param.category_description.items():
  111. if desc["to"] not in cpn["downstream"]:
  112. cpn["downstream"].append(desc["to"])
  113. self.path = self.dsl["path"]
  114. self.history = self.dsl["history"]
  115. self.messages = self.dsl["messages"]
  116. self.answer = self.dsl["answer"]
  117. self.reference = self.dsl["reference"]
  118. self._embed_id = self.dsl.get("embed_id", "")
  119. def __str__(self):
  120. self.dsl["path"] = self.path
  121. self.dsl["history"] = self.history
  122. self.dsl["messages"] = self.messages
  123. self.dsl["answer"] = self.answer
  124. self.dsl["reference"] = self.reference
  125. self.dsl["embed_id"] = self._embed_id
  126. dsl = {
  127. "components": {}
  128. }
  129. for k in self.dsl.keys():
  130. if k in ["components"]:
  131. continue
  132. dsl[k] = deepcopy(self.dsl[k])
  133. for k, cpn in self.components.items():
  134. if k not in dsl["components"]:
  135. dsl["components"][k] = {}
  136. for c in cpn.keys():
  137. if c == "obj":
  138. dsl["components"][k][c] = json.loads(str(cpn["obj"]))
  139. continue
  140. dsl["components"][k][c] = deepcopy(cpn[c])
  141. return json.dumps(dsl, ensure_ascii=False)
  142. def reset(self):
  143. self.path = []
  144. self.history = []
  145. self.messages = []
  146. self.answer = []
  147. self.reference = []
  148. for k, cpn in self.components.items():
  149. self.components[k]["obj"].reset()
  150. self._embed_id = ""
  151. def get_component_name(self, cid):
  152. for n in self.dsl["graph"]["nodes"]:
  153. if cid == n["id"]:
  154. return n["data"]["name"]
  155. return ""
  156. def run(self, running_hint_text = "is running...🕞", **kwargs):
  157. if not running_hint_text or not isinstance(running_hint_text, str):
  158. running_hint_text = "is running...🕞"
  159. bypass_begin = bool(kwargs.get("bypass_begin", False))
  160. if self.answer:
  161. cpn_id = self.answer[0]
  162. self.answer.pop(0)
  163. try:
  164. ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
  165. except Exception as e:
  166. ans = ComponentBase.be_output(str(e))
  167. self.path[-1].append(cpn_id)
  168. if kwargs.get("stream"):
  169. for an in ans():
  170. yield an
  171. else:
  172. yield ans
  173. return
  174. if not self.path:
  175. self.components["begin"]["obj"].run(self.history, **kwargs)
  176. self.path.append(["begin"])
  177. if bypass_begin:
  178. cpn = self.get_component("begin")
  179. downstream = cpn["downstream"]
  180. self.path.append(downstream)
  181. self.path.append([])
  182. ran = -1
  183. waiting = []
  184. without_dependent_checking = []
  185. def prepare2run(cpns):
  186. nonlocal ran, ans
  187. for c in cpns:
  188. if self.path[-1] and c == self.path[-1][-1]:
  189. continue
  190. cpn = self.components[c]["obj"]
  191. if cpn.component_name == "Answer":
  192. self.answer.append(c)
  193. else:
  194. logging.debug(f"Canvas.prepare2run: {c}")
  195. if c not in without_dependent_checking:
  196. cpids = cpn.get_dependent_components()
  197. if any([cc not in self.path[-1] for cc in cpids]):
  198. if c not in waiting:
  199. waiting.append(c)
  200. continue
  201. yield "*'{}'* {}".format(self.get_component_name(c), running_hint_text)
  202. if cpn.component_name.lower() == "iteration":
  203. st_cpn = cpn.get_start()
  204. assert st_cpn, "Start component not found for Iteration."
  205. if not st_cpn["obj"].end():
  206. cpn = st_cpn["obj"]
  207. c = cpn._id
  208. try:
  209. ans = cpn.run(self.history, **kwargs)
  210. except Exception as e:
  211. logging.exception(f"Canvas.run got exception: {e}")
  212. self.path[-1].append(c)
  213. ran += 1
  214. raise e
  215. self.path[-1].append(c)
  216. ran += 1
  217. downstream = self.components[self.path[-2][-1]]["downstream"]
  218. if not downstream and self.components[self.path[-2][-1]].get("parent_id"):
  219. cid = self.path[-2][-1]
  220. pid = self.components[cid]["parent_id"]
  221. o, _ = self.components[cid]["obj"].output(allow_partial=False)
  222. oo, _ = self.components[pid]["obj"].output(allow_partial=False)
  223. self.components[pid]["obj"].set_output(pd.concat([oo, o], ignore_index=True).dropna())
  224. downstream = [pid]
  225. for m in prepare2run(downstream):
  226. yield {"content": m, "running_status": True}
  227. while 0 <= ran < len(self.path[-1]):
  228. logging.debug(f"Canvas.run: {ran} {self.path}")
  229. cpn_id = self.path[-1][ran]
  230. cpn = self.get_component(cpn_id)
  231. if not any([cpn["downstream"], cpn.get("parent_id"), waiting]):
  232. break
  233. loop = self._find_loop()
  234. if loop:
  235. raise OverflowError(f"Too much loops: {loop}")
  236. downstream = []
  237. if cpn["obj"].component_name.lower() in ["switch", "categorize", "relevant"]:
  238. switch_out = cpn["obj"].output()[1].iloc[0, 0]
  239. assert switch_out in self.components, \
  240. "{}'s output: {} not valid.".format(cpn_id, switch_out)
  241. downstream = [switch_out]
  242. else:
  243. downstream = cpn["downstream"]
  244. if not downstream and cpn.get("parent_id"):
  245. pid = cpn["parent_id"]
  246. _, o = cpn["obj"].output(allow_partial=False)
  247. _, oo = self.components[pid]["obj"].output(allow_partial=False)
  248. self.components[pid]["obj"].set_output(pd.concat([oo.dropna(axis=1), o.dropna(axis=1)], ignore_index=True).dropna())
  249. downstream = [pid]
  250. for m in prepare2run(downstream):
  251. yield {"content": m, "running_status": True}
  252. if ran >= len(self.path[-1]) and waiting:
  253. without_dependent_checking = waiting
  254. waiting = []
  255. for m in prepare2run(without_dependent_checking):
  256. yield {"content": m, "running_status": True}
  257. without_dependent_checking = []
  258. ran -= 1
  259. if self.answer:
  260. cpn_id = self.answer[0]
  261. self.answer.pop(0)
  262. ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
  263. self.path[-1].append(cpn_id)
  264. if kwargs.get("stream"):
  265. assert isinstance(ans, partial)
  266. for an in ans():
  267. yield an
  268. else:
  269. yield ans
  270. else:
  271. raise Exception("The dialog flow has no way to interact with you. Please add an 'Interact' component to the end of the flow.")
  272. def get_component(self, cpn_id):
  273. return self.components[cpn_id]
  274. def get_tenant_id(self):
  275. return self._tenant_id
  276. def get_history(self, window_size):
  277. convs = []
  278. if window_size <= 0:
  279. return convs
  280. for role, obj in self.history[window_size * -1:]:
  281. if isinstance(obj, list) and obj and all([isinstance(o, dict) for o in obj]):
  282. convs.append({"role": role, "content": '\n'.join([str(s.get("content", "")) for s in obj])})
  283. else:
  284. convs.append({"role": role, "content": str(obj)})
  285. return convs
  286. def add_user_input(self, question):
  287. self.history.append(("user", question))
  288. def set_embedding_model(self, embed_id):
  289. self._embed_id = embed_id
  290. def get_embedding_model(self):
  291. return self._embed_id
  292. def _find_loop(self, max_loops=6):
  293. path = self.path[-1][::-1]
  294. if len(path) < 2:
  295. return False
  296. for i in range(len(path)):
  297. if path[i].lower().find("answer") == 0 or path[i].lower().find("iterationitem") == 0:
  298. path = path[:i]
  299. break
  300. if len(path) < 2:
  301. return False
  302. for loc in range(2, len(path) // 2):
  303. pat = ",".join(path[0:loc])
  304. path_str = ",".join(path)
  305. if len(pat) >= len(path_str):
  306. return False
  307. loop = max_loops
  308. while path_str.find(pat) == 0 and loop >= 0:
  309. loop -= 1
  310. if len(pat)+1 >= len(path_str):
  311. return False
  312. path_str = path_str[len(pat)+1:]
  313. if loop < 0:
  314. pat = " => ".join([p.split(":")[0] for p in path[0:loc]])
  315. return pat + " => " + pat
  316. return False
  317. def get_prologue(self):
  318. return self.components["begin"]["obj"]._param.prologue
  319. def set_global_param(self, **kwargs):
  320. for k, v in kwargs.items():
  321. for q in self.components["begin"]["obj"]._param.query:
  322. if k != q["key"]:
  323. continue
  324. q["value"] = v
  325. def get_preset_param(self):
  326. return self.components["begin"]["obj"]._param.query
  327. def get_component_input_elements(self, cpnnm):
  328. return self.components[cpnnm]["obj"].get_input_elements()
  329. def set_component_infor(self, cpn_id, infor):
  330. self.components[cpn_id]["obj"].set_infor(infor)