| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- #
- # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import logging
- import json
- from copy import deepcopy
- from functools import partial
- import pandas as pd
-
- from agent.component import component_class
- from agent.component.base import ComponentBase
-
-
class Canvas:
    """Executable agent graph described by a JSON DSL.

    The canvas keeps a mapping of component id -> component record.  Each
    record holds the component instance under ``"obj"`` (built through
    ``component_class`` in :meth:`load`), the ``"downstream"`` / ``"upstream"``
    id lists and, optionally, a ``"parent_id"``.  Alongside the components it
    carries the mutable run state (``path``, ``history``, ``messages``,
    ``answer``, ``reference``), which is read from the DSL by :meth:`load` and
    written back by :meth:`__str__`.

    Example DSL::

        dsl = {
            "components": {
                "begin": {
                    "obj":{
                        "component_name": "Begin",
                        "params": {},
                    },
                    "downstream": ["answer_0"],
                    "upstream": [],
                },
                "answer_0": {
                    "obj": {
                        "component_name": "Answer",
                        "params": {}
                    },
                    "downstream": ["retrieval_0"],
                    "upstream": ["begin", "generate_0"],
                },
                "retrieval_0": {
                    "obj": {
                        "component_name": "Retrieval",
                        "params": {}
                    },
                    "downstream": ["generate_0"],
                    "upstream": ["answer_0"],
                },
                "generate_0": {
                    "obj": {
                        "component_name": "Generate",
                        "params": {}
                    },
                    "downstream": ["answer_0"],
                    "upstream": ["retrieval_0"],
                }
            },
            "history": [],
            "messages": [],
            "reference": [],
            "path": [["begin"]],
            "answer": []
        }
    """

    def __init__(self, dsl: str, tenant_id=None):
        """Create a canvas from a JSON-encoded DSL string.

        Args:
            dsl: JSON text describing the canvas.  A falsy value selects a
                built-in DSL that contains a single ``begin`` component.
            tenant_id: Value handed back unchanged by :meth:`get_tenant_id`.

        Raises:
            AssertionError: Propagated from :meth:`load` when the DSL has no
                ``Begin`` or no ``Answer`` component.
        """
        self.path = []
        self.history = []
        self.messages = []
        self.answer = []
        self.reference = []
        self.components = {}
        # NOTE(review): the fallback DSL below has no 'Answer' component, so
        # load() rejects it as written -- confirm whether that is intended.
        self.dsl = json.loads(dsl) if dsl else {
            "components": {
                "begin": {
                    "obj": {
                        "component_name": "Begin",
                        "params": {
                            "prologue": "Hi there!"
                        }
                    },
                    "downstream": [],
                    "upstream": [],
                    "parent_id": ""
                }
            },
            "history": [],
            "messages": [],
            "reference": [],
            "path": [],
            "answer": []
        }
        self._tenant_id = tenant_id
        self._embed_id = ""
        self.load()

    def load(self):
        """Instantiate every component of ``self.dsl`` and restore run state.

        Each component record's ``"obj"`` dict is replaced in place by the
        component instance built from it.  For ``Categorize`` components, every
        ``to`` target found in the category descriptions is added to the
        record's ``downstream`` list when missing.

        Raises:
            AssertionError: If no ``Begin`` or no ``Answer`` component exists.
        """
        self.components = self.dsl["components"]
        cpn_nms = {cpn["obj"]["component_name"] for cpn in self.components.values()}

        # Raised explicitly (rather than via ``assert``) so the validation is
        # still enforced under ``python -O``; the exception type is unchanged.
        if "Begin" not in cpn_nms:
            raise AssertionError("There have to be an 'Begin' component.")
        if "Answer" not in cpn_nms:
            raise AssertionError("There have to be an 'Answer' component.")

        for k, cpn in self.components.items():
            cpn_name = cpn["obj"]["component_name"]
            param = component_class(cpn_name + "Param")()
            param.update(cpn["obj"]["params"])
            param.check()
            cpn["obj"] = component_class(cpn_name)(self, k, param)
            if cpn["obj"].component_name == "Categorize":
                for desc in param.category_description.values():
                    if desc["to"] not in cpn["downstream"]:
                        cpn["downstream"].append(desc["to"])

        # ``setdefault`` keeps the canvas attributes and the DSL entries bound
        # to the same list objects while tolerating a DSL that omits them.
        self.path = self.dsl.setdefault("path", [])
        self.history = self.dsl.setdefault("history", [])
        self.messages = self.dsl.setdefault("messages", [])
        self.answer = self.dsl.setdefault("answer", [])
        self.reference = self.dsl.setdefault("reference", [])
        self._embed_id = self.dsl.get("embed_id", "")

    def __str__(self):
        """Serialize the canvas, including current run state, to JSON text.

        The run state is first written back into ``self.dsl``.  Component
        instances are serialized through their own ``str()`` form, which is
        parsed back as JSON; every other value is deep-copied.
        """
        self.dsl["path"] = self.path
        self.dsl["history"] = self.history
        self.dsl["messages"] = self.messages
        self.dsl["answer"] = self.answer
        self.dsl["reference"] = self.reference
        self.dsl["embed_id"] = self._embed_id

        # "components" is inserted first so it stays the leading key of the
        # emitted JSON object.
        dsl = {"components": {}}
        for key, value in self.dsl.items():
            if key != "components":
                dsl[key] = deepcopy(value)

        for cid, cpn in self.components.items():
            dsl["components"][cid] = {
                field: json.loads(str(cpn["obj"])) if field == "obj" else deepcopy(cpn[field])
                for field in cpn
            }
        return json.dumps(dsl, ensure_ascii=False)

    def reset(self):
        """Clear all run state and call ``reset()`` on every component."""
        self.path = []
        self.history = []
        self.messages = []
        self.answer = []
        self.reference = []
        for cpn in self.components.values():
            cpn["obj"].reset()
        self._embed_id = ""

    def get_component_name(self, cid):
        """Return the display name stored for node ``cid`` in ``dsl["graph"]``.

        Returns:
            ``node["data"]["name"]`` of the first node whose ``id`` equals
            ``cid``; ``""`` when no node matches or the DSL has no graph/nodes.
        """
        # The "graph" section is optional: neither the documented DSL nor the
        # built-in fallback DSL carries one.
        graph = self.dsl.get("graph") or {}
        for node in graph.get("nodes") or []:
            if cid == node["id"]:
                return node["data"]["name"]
        return ""

    def run(self, running_hint_text="is running...🕞", **kwargs):
        """Advance the flow until the next Answer component and yield its output.

        This is a generator.  While non-Answer components execute it yields
        progress dicts ``{"content": <hint>, "running_status": True}``; it then
        runs the first queued Answer component and yields its result (the items
        produced by calling the result when ``kwargs["stream"]`` is truthy,
        otherwise the result itself).

        Args:
            running_hint_text: Suffix of the progress hint; replaced by the
                default when empty or not a string.
            **kwargs: Forwarded to every component's ``run``.  ``bypass_begin``
                and ``stream`` are also interpreted here.

        Raises:
            OverflowError: When :meth:`_find_loop` reports a repeating pattern.
            Exception: When the walk ends without any Answer component queued,
                or when a component's ``run`` raises (re-raised after logging).
        """
        if not running_hint_text or not isinstance(running_hint_text, str):
            running_hint_text = "is running...🕞"
        bypass_begin = bool(kwargs.get("bypass_begin", False))

        # An Answer component left queued by the previous call runs first.
        if self.answer:
            cpn_id = self.answer.pop(0)
            try:
                ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
            except Exception as e:
                # NOTE(review): in stream mode the fallback below is *called*
                # (``ans()``); confirm be_output() returns something callable.
                ans = ComponentBase.be_output(str(e))
            self.path[-1].append(cpn_id)
            if kwargs.get("stream"):
                for an in ans():
                    yield an
            else:
                yield ans
            return

        if not self.path:
            self.components["begin"]["obj"].run(self.history, **kwargs)
            self.path.append(["begin"])
            if bypass_begin:
                cpn = self.get_component("begin")
                # Copy so the path never aliases the component's own list.
                self.path.append(list(cpn["downstream"]))

        # Every call records the components it runs in a fresh trailing list.
        self.path.append([])

        ran = -1
        waiting = []
        without_dependent_checking = []

        def merge_into_parent(cid):
            """Append ``cid``'s output rows to its parent's output; return the parent id."""
            pid = self.components[cid]["parent_id"]
            # output() returns a pair whose second element is the DataFrame.
            _, child_out = self.components[cid]["obj"].output(allow_partial=False)
            _, parent_out = self.components[pid]["obj"].output(allow_partial=False)
            merged = pd.concat(
                [parent_out.dropna(axis=1), child_out.dropna(axis=1)], ignore_index=True
            ).dropna()
            self.components[pid]["obj"].set_output(merged)
            return pid

        def prepare2run(cpns):
            """Run the given component ids, queueing Answers and deferring blocked ones."""
            nonlocal ran, ans
            for c in cpns:
                # Skip a component that was the most recent one to run.
                if self.path[-1] and c == self.path[-1][-1]:
                    continue
                cpn = self.components[c]["obj"]
                if cpn.component_name == "Answer":
                    self.answer.append(c)
                else:
                    logging.debug(f"Canvas.prepare2run: {c}")
                    if c not in without_dependent_checking:
                        cpids = cpn.get_dependent_components()
                        # Defer until every dependency has run in this round.
                        if any(cc not in self.path[-1] for cc in cpids):
                            if c not in waiting:
                                waiting.append(c)
                            continue
                    yield "*'{}'* {}".format(self.get_component_name(c), running_hint_text)

                    if cpn.component_name.lower() == "iteration":
                        st_cpn = cpn.get_start()
                        assert st_cpn, "Start component not found for Iteration."
                        # While the start component has not ended, run it in
                        # place of the iteration component itself.
                        if not st_cpn["obj"].end():
                            cpn = st_cpn["obj"]
                            c = cpn._id

                    try:
                        ans = cpn.run(self.history, **kwargs)
                    except Exception as e:
                        logging.exception(f"Canvas.run got exception: {e}")
                        self.path[-1].append(c)
                        ran += 1
                        raise
                    self.path[-1].append(c)

            ran += 1

        last_cid = self.path[-2][-1]
        downstream = self.components[last_cid]["downstream"]
        if not downstream and self.components[last_cid].get("parent_id"):
            downstream = [merge_into_parent(last_cid)]

        for m in prepare2run(downstream):
            yield {"content": m, "running_status": True}

        while 0 <= ran < len(self.path[-1]):
            logging.debug(f"Canvas.run: {ran} {self.path}")
            cpn_id = self.path[-1][ran]
            cpn = self.get_component(cpn_id)
            if not any([cpn["downstream"], cpn.get("parent_id"), waiting]):
                break

            loop = self._find_loop()
            if loop:
                raise OverflowError(f"Too much loops: {loop}")

            if cpn["obj"].component_name.lower() in ["switch", "categorize", "relevant"]:
                # Branching components name their single successor in the
                # first cell of their output frame.
                switch_out = cpn["obj"].output()[1].iloc[0, 0]
                assert switch_out in self.components, \
                    "{}'s output: {} not valid.".format(cpn_id, switch_out)
                downstream = [switch_out]
            else:
                downstream = cpn["downstream"]

            if not downstream and cpn.get("parent_id"):
                downstream = [merge_into_parent(cpn_id)]

            for m in prepare2run(downstream):
                yield {"content": m, "running_status": True}

            if ran >= len(self.path[-1]) and waiting:
                # Nothing else can make progress: run the deferred components
                # without re-checking their dependencies.
                without_dependent_checking = waiting
                waiting = []
                for m in prepare2run(without_dependent_checking):
                    yield {"content": m, "running_status": True}
                without_dependent_checking = []
                ran -= 1

        if self.answer:
            cpn_id = self.answer.pop(0)
            ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
            self.path[-1].append(cpn_id)
            if kwargs.get("stream"):
                assert isinstance(ans, partial)
                for an in ans():
                    yield an
            else:
                yield ans

        else:
            raise Exception("The dialog flow has no way to interact with you. Please add an 'Interact' component to the end of the flow.")

    def get_component(self, cpn_id):
        """Return the component record stored under ``cpn_id`` (``KeyError`` if absent)."""
        return self.components[cpn_id]

    def get_tenant_id(self):
        """Return the tenant id given to the constructor."""
        return self._tenant_id

    def get_history(self, window_size):
        """Return the last ``window_size`` history entries as role/content dicts.

        A history value that is a non-empty list of dicts is flattened by
        joining each item's ``"content"`` with newlines; any other value is
        converted with ``str()``.

        Returns:
            A list of ``{"role": ..., "content": ...}`` dicts; empty when
            ``window_size`` is not positive.
        """
        convs = []
        if window_size <= 0:
            return convs
        for role, obj in self.history[-window_size:]:
            if isinstance(obj, list) and obj and all(isinstance(o, dict) for o in obj):
                content = '\n'.join(str(s.get("content", "")) for s in obj)
            else:
                content = str(obj)
            convs.append({"role": role, "content": content})
        return convs

    def add_user_input(self, question):
        """Append ``question`` to the history under the ``"user"`` role."""
        self.history.append(("user", question))

    def set_embedding_model(self, embed_id):
        """Remember ``embed_id`` as the canvas's embedding model id."""
        self._embed_id = embed_id

    def get_embedding_model(self):
        """Return the embedding model id last set (``""`` by default)."""
        return self._embed_id

    def _find_loop(self, max_loops=6):
        """Detect a component sequence repeating at the tail of the current round.

        The ids of the current round (``self.path[-1]``) are examined newest
        first, stopping before the first id that starts with ``answer`` or
        ``iterationitem`` (case-insensitive).  For each candidate length from 2
        up to (but excluding) half the examined length, the newest ids form a
        pattern; the pattern counts as a loop when it repeats back-to-back more
        than ``max_loops`` times with ids still left over.

        Returns:
            ``False`` when no loop is found, otherwise a description string of
            the form ``"a => b => a => b"`` built from the id parts before any
            ``":"``.
        """
        path = self.path[-1][::-1]
        if len(path) < 2:
            return False

        for i, cid in enumerate(path):
            if cid.lower().startswith(("answer", "iterationitem")):
                path = path[:i]
                break

        if len(path) < 2:
            return False

        joined = ",".join(path)
        for loc in range(2, len(path) // 2):
            pat = ",".join(path[:loc])
            path_str = joined
            if len(pat) >= len(path_str):
                return False
            loop = max_loops
            while path_str.startswith(pat) and loop >= 0:
                loop -= 1
                # Stop once consuming the pattern (plus its separator) would
                # leave nothing behind.
                if len(pat) + 1 >= len(path_str):
                    return False
                path_str = path_str[len(pat) + 1:]
            if loop < 0:
                pat = " => ".join(p.split(":")[0] for p in path[:loc])
                return pat + " => " + pat

        return False

    def get_prologue(self):
        """Return the ``prologue`` parameter of the ``begin`` component."""
        return self.components["begin"]["obj"]._param.prologue

    def set_global_param(self, **kwargs):
        """Set the ``value`` of each ``begin`` query item whose ``key`` matches a kwarg.

        Keyword names that match no query item are ignored.
        """
        for k, v in kwargs.items():
            for q in self.components["begin"]["obj"]._param.query:
                if k == q["key"]:
                    q["value"] = v

    def get_preset_param(self):
        """Return the ``query`` parameter list of the ``begin`` component."""
        return self.components["begin"]["obj"]._param.query

    def get_component_input_elements(self, cpnnm):
        """Return ``get_input_elements()`` of the component stored under ``cpnnm``."""
        return self.components[cpnnm]["obj"].get_input_elements()

    def set_component_infor(self, cpn_id, infor):
        """Forward ``infor`` to ``set_infor`` of the component stored under ``cpn_id``."""
        self.components[cpn_id]["obj"].set_infor(infor)
|