| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- #
- # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import re
- from agent.component.base import ComponentBase, ComponentParamBase
-
-
- class TemplateParam(ComponentParamBase):
- """
- Define the Generate component parameters.
- """
-
- def __init__(self):
- super().__init__()
- self.content = ""
- self.parameters = []
-
- def check(self):
- self.check_empty(self.content, "[Template] Content")
- return True
-
-
- class Template(ComponentBase):
- component_name = "Template"
-
- def get_dependent_components(self):
- cpnts = set([para["component_id"].split("@")[0] for para in self._param.parameters \
- if para.get("component_id") \
- and para["component_id"].lower().find("answer") < 0 \
- and para["component_id"].lower().find("begin") < 0])
- return list(cpnts)
-
- def _run(self, history, **kwargs):
- content = self._param.content
-
- self._param.inputs = []
- for para in self._param.parameters:
- if not para.get("component_id"): continue
- component_id = para["component_id"].split("@")[0]
- if para["component_id"].lower().find("@") >= 0:
- cpn_id, key = para["component_id"].split("@")
- for p in self._canvas.get_component(cpn_id)["obj"]._param.query:
- if p["key"] == key:
- kwargs[para["key"]] = p.get("value", "")
- self._param.inputs.append(
- {"component_id": para["component_id"], "content": kwargs[para["key"]]})
- break
- else:
- assert False, f"Can't find parameter '{key}' for {cpn_id}"
- continue
-
- cpn = self._canvas.get_component(component_id)["obj"]
- if cpn.component_name.lower() == "answer":
- hist = self._canvas.get_history(1)
- if hist:
- hist = hist[0]["content"]
- else:
- hist = ""
- kwargs[para["key"]] = hist
- continue
-
- _, out = cpn.output(allow_partial=False)
- if "content" not in out.columns:
- kwargs[para["key"]] = ""
- else:
- kwargs[para["key"]] = " - "+"\n - ".join([o if isinstance(o, str) else str(o) for o in out["content"]])
- self._param.inputs.append({"component_id": para["component_id"], "content": kwargs[para["key"]]})
-
- for n, v in kwargs.items():
- content = re.sub(r"\{%s\}" % re.escape(n), str(v).replace("\\", " "), content)
-
- return Template.be_output(content)
|