| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- #
- # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import json
- import re
- from agent.component.base import ComponentBase, ComponentParamBase
- from jinja2 import Template as Jinja2Template
-
-
- class TemplateParam(ComponentParamBase):
- """
- Define the Generate component parameters.
- """
-
- def __init__(self):
- super().__init__()
- self.content = ""
- self.parameters = []
-
- def check(self):
- self.check_empty(self.content, "[Template] Content")
- return True
-
-
- class Template(ComponentBase):
- component_name = "Template"
-
- def get_dependent_components(self):
- inputs = self.get_input_elements()
- cpnts = set([i["key"] for i in inputs if i["key"].lower().find("answer") < 0 and i["key"].lower().find("begin") < 0])
- return list(cpnts)
-
- def get_input_elements(self):
- key_set = set([])
- res = []
- for r in re.finditer(r"\{([a-z]+[:@][a-z0-9_-]+)\}", self._param.content, flags=re.IGNORECASE):
- cpn_id = r.group(1)
- if cpn_id in key_set:
- continue
- if cpn_id.lower().find("begin@") == 0:
- cpn_id, key = cpn_id.split("@")
- for p in self._canvas.get_component(cpn_id)["obj"]._param.query:
- if p["key"] != key:
- continue
- res.append({"key": r.group(1), "name": p["name"]})
- key_set.add(r.group(1))
- continue
- cpn_nm = self._canvas.get_component_name(cpn_id)
- if not cpn_nm:
- continue
- res.append({"key": cpn_id, "name": cpn_nm})
- key_set.add(cpn_id)
- return res
-
- def _run(self, history, **kwargs):
- content = self._param.content
-
- self._param.inputs = []
- for para in self.get_input_elements():
- if para["key"].lower().find("begin@") == 0:
- cpn_id, key = para["key"].split("@")
- for p in self._canvas.get_component(cpn_id)["obj"]._param.query:
- if p["key"] == key:
- value = p.get("value", "")
- self.make_kwargs(para, kwargs, value)
- break
- else:
- assert False, f"Can't find parameter '{key}' for {cpn_id}"
- continue
-
- component_id = para["key"]
- cpn = self._canvas.get_component(component_id)["obj"]
- if cpn.component_name.lower() == "answer":
- hist = self._canvas.get_history(1)
- if hist:
- hist = hist[0]["content"]
- else:
- hist = ""
- self.make_kwargs(para, kwargs, hist)
- continue
-
- _, out = cpn.output(allow_partial=False)
-
- result = ""
- if "content" in out.columns:
- result = "\n".join(
- [o if isinstance(o, str) else str(o) for o in out["content"]]
- )
-
- self.make_kwargs(para, kwargs, result)
-
- template = Jinja2Template(content)
-
- try:
- content = template.render(kwargs)
- except Exception:
- pass
-
- for n, v in kwargs.items():
- if not isinstance(v, str):
- try:
- v = json.dumps(v, ensure_ascii=False)
- except Exception:
- pass
- content = re.sub(
- r"\{%s\}" % re.escape(n), v, content
- )
- content = re.sub(
- r"(#+)", r" \1 ", content
- )
-
- return Template.be_output(content)
-
- def make_kwargs(self, para, kwargs, value):
- self._param.inputs.append(
- {"component_id": para["key"], "content": value}
- )
- try:
- value = json.loads(value)
- except Exception:
- pass
- kwargs[para["key"]] = value
|