Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

canvas.py 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. #
  2. # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. import traceback
  18. from abc import ABC
  19. from copy import deepcopy
  20. from functools import partial
  21. from agent.component import component_class
  22. from agent.component.base import ComponentBase
  23. from agent.settings import flow_logger, DEBUG
  24. class Canvas(ABC):
  25. """
  26. dsl = {
  27. "components": {
  28. "begin": {
  29. "obj":{
  30. "component_name": "Begin",
  31. "params": {},
  32. },
  33. "downstream": ["answer_0"],
  34. "upstream": [],
  35. },
  36. "answer_0": {
  37. "obj": {
  38. "component_name": "Answer",
  39. "params": {}
  40. },
  41. "downstream": ["retrieval_0"],
  42. "upstream": ["begin", "generate_0"],
  43. },
  44. "retrieval_0": {
  45. "obj": {
  46. "component_name": "Retrieval",
  47. "params": {}
  48. },
  49. "downstream": ["generate_0"],
  50. "upstream": ["answer_0"],
  51. },
  52. "generate_0": {
  53. "obj": {
  54. "component_name": "Generate",
  55. "params": {}
  56. },
  57. "downstream": ["answer_0"],
  58. "upstream": ["retrieval_0"],
  59. }
  60. },
  61. "history": [],
  62. "messages": [],
  63. "reference": [],
  64. "path": [["begin"]],
  65. "answer": []
  66. }
  67. """
  68. def __init__(self, dsl: str, tenant_id=None):
  69. self.path = []
  70. self.history = []
  71. self.messages = []
  72. self.answer = []
  73. self.components = {}
  74. self.dsl = json.loads(dsl) if dsl else {
  75. "components": {
  76. "begin": {
  77. "obj": {
  78. "component_name": "Begin",
  79. "params": {
  80. "prologue": "Hi there!"
  81. }
  82. },
  83. "downstream": [],
  84. "upstream": []
  85. }
  86. },
  87. "history": [],
  88. "messages": [],
  89. "reference": [],
  90. "path": [],
  91. "answer": []
  92. }
  93. self._tenant_id = tenant_id
  94. self._embed_id = ""
  95. self.load()
  96. def load(self):
  97. self.components = self.dsl["components"]
  98. cpn_nms = set([])
  99. for k, cpn in self.components.items():
  100. cpn_nms.add(cpn["obj"]["component_name"])
  101. assert "Begin" in cpn_nms, "There have to be an 'Begin' component."
  102. assert "Answer" in cpn_nms, "There have to be an 'Answer' component."
  103. for k, cpn in self.components.items():
  104. cpn_nms.add(cpn["obj"]["component_name"])
  105. param = component_class(cpn["obj"]["component_name"] + "Param")()
  106. param.update(cpn["obj"]["params"])
  107. param.check()
  108. cpn["obj"] = component_class(cpn["obj"]["component_name"])(self, k, param)
  109. if cpn["obj"].component_name == "Categorize":
  110. for _, desc in param.category_description.items():
  111. if desc["to"] not in cpn["downstream"]:
  112. cpn["downstream"].append(desc["to"])
  113. self.path = self.dsl["path"]
  114. self.history = self.dsl["history"]
  115. self.messages = self.dsl["messages"]
  116. self.answer = self.dsl["answer"]
  117. self.reference = self.dsl["reference"]
  118. self._embed_id = self.dsl.get("embed_id", "")
  119. def __str__(self):
  120. self.dsl["path"] = self.path
  121. self.dsl["history"] = self.history
  122. self.dsl["messages"] = self.messages
  123. self.dsl["answer"] = self.answer
  124. self.dsl["reference"] = self.reference
  125. self.dsl["embed_id"] = self._embed_id
  126. dsl = {
  127. "components": {}
  128. }
  129. for k in self.dsl.keys():
  130. if k in ["components"]:continue
  131. dsl[k] = deepcopy(self.dsl[k])
  132. for k, cpn in self.components.items():
  133. if k not in dsl["components"]:
  134. dsl["components"][k] = {}
  135. for c in cpn.keys():
  136. if c == "obj":
  137. dsl["components"][k][c] = json.loads(str(cpn["obj"]))
  138. continue
  139. dsl["components"][k][c] = deepcopy(cpn[c])
  140. return json.dumps(dsl, ensure_ascii=False)
  141. def reset(self):
  142. self.path = []
  143. self.history = []
  144. self.messages = []
  145. self.answer = []
  146. self.reference = []
  147. for k, cpn in self.components.items():
  148. self.components[k]["obj"].reset()
  149. self._embed_id = ""
  150. def run(self, **kwargs):
  151. ans = ""
  152. if self.answer:
  153. cpn_id = self.answer[0]
  154. self.answer.pop(0)
  155. try:
  156. ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
  157. except Exception as e:
  158. ans = ComponentBase.be_output(str(e))
  159. self.path[-1].append(cpn_id)
  160. if kwargs.get("stream"):
  161. assert isinstance(ans, partial)
  162. return ans
  163. self.history.append(("assistant", ans.to_dict("records")))
  164. return ans
  165. if not self.path:
  166. self.components["begin"]["obj"].run(self.history, **kwargs)
  167. self.path.append(["begin"])
  168. self.path.append([])
  169. ran = -1
  170. def prepare2run(cpns):
  171. nonlocal ran, ans
  172. for c in cpns:
  173. if self.path[-1] and c == self.path[-1][-1]: continue
  174. cpn = self.components[c]["obj"]
  175. if cpn.component_name == "Answer":
  176. self.answer.append(c)
  177. else:
  178. if DEBUG: print("RUN: ", c)
  179. cpids = cpn.get_dependent_components()
  180. if any([c not in self.path[-1] for c in cpids]):
  181. continue
  182. ans = cpn.run(self.history, **kwargs)
  183. self.path[-1].append(c)
  184. ran += 1
  185. prepare2run(self.components[self.path[-2][-1]]["downstream"])
  186. while 0 <= ran < len(self.path[-1]):
  187. if DEBUG: print(ran, self.path)
  188. cpn_id = self.path[-1][ran]
  189. cpn = self.get_component(cpn_id)
  190. if not cpn["downstream"]: break
  191. loop = self._find_loop()
  192. if loop: raise OverflowError(f"Too much loops: {loop}")
  193. if cpn["obj"].component_name.lower() in ["switch", "categorize", "relevant"]:
  194. switch_out = cpn["obj"].output()[1].iloc[0, 0]
  195. assert switch_out in self.components, \
  196. "{}'s output: {} not valid.".format(cpn_id, switch_out)
  197. try:
  198. prepare2run([switch_out])
  199. except Exception as e:
  200. for p in [c for p in self.path for c in p][::-1]:
  201. if p.lower().find("answer") >= 0:
  202. self.get_component(p)["obj"].set_exception(e)
  203. prepare2run([p])
  204. break
  205. traceback.print_exc()
  206. break
  207. continue
  208. try:
  209. prepare2run(cpn["downstream"])
  210. except Exception as e:
  211. for p in [c for p in self.path for c in p][::-1]:
  212. if p.lower().find("answer") >= 0:
  213. self.get_component(p)["obj"].set_exception(e)
  214. prepare2run([p])
  215. break
  216. traceback.print_exc()
  217. break
  218. if self.answer:
  219. cpn_id = self.answer[0]
  220. self.answer.pop(0)
  221. ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
  222. self.path[-1].append(cpn_id)
  223. if kwargs.get("stream"):
  224. assert isinstance(ans, partial)
  225. return ans
  226. self.history.append(("assistant", ans.to_dict("records")))
  227. return ans
  228. def get_component(self, cpn_id):
  229. return self.components[cpn_id]
  230. def get_tenant_id(self):
  231. return self._tenant_id
  232. def get_history(self, window_size):
  233. convs = []
  234. for role, obj in self.history[window_size * -1:]:
  235. if isinstance(obj, list) and obj and all([isinstance(o, dict) for o in obj]):
  236. convs.append({"role": role, "content": '\n'.join([str(s.get("content", "")) for s in obj])})
  237. else:
  238. convs.append({"role": role, "content": str(obj)})
  239. return convs
  240. def add_user_input(self, question):
  241. self.history.append(("user", question))
  242. def set_embedding_model(self, embed_id):
  243. self._embed_id = embed_id
  244. def get_embedding_model(self):
  245. return self._embed_id
  246. def _find_loop(self, max_loops=6):
  247. path = self.path[-1][::-1]
  248. if len(path) < 2: return False
  249. for i in range(len(path)):
  250. if path[i].lower().find("answer") >= 0:
  251. path = path[:i]
  252. break
  253. if len(path) < 2: return False
  254. for l in range(2, len(path) // 2):
  255. pat = ",".join(path[0:l])
  256. path_str = ",".join(path)
  257. if len(pat) >= len(path_str): return False
  258. loop = max_loops
  259. while path_str.find(pat) == 0 and loop >= 0:
  260. loop -= 1
  261. if len(pat)+1 >= len(path_str):
  262. return False
  263. path_str = path_str[len(pat)+1:]
  264. if loop < 0:
  265. pat = " => ".join([p.split(":")[0] for p in path[0:l]])
  266. return pat + " => " + pat
  267. return False
  268. def get_prologue(self):
  269. return self.components["begin"]["obj"]._param.prologue