### What problem does this PR solve? #9422 ### Type of change - [x] New Feature (non-breaking change which adds functionality)tags/v0.20.2
| def complete(): | def complete(): | ||||
| nonlocal hist | nonlocal hist | ||||
| need2cite = self._canvas.get_reference()["chunks"] and self._id.find("-->") < 0 | |||||
| need2cite = self._param.cite and self._canvas.get_reference()["chunks"] and self._id.find("-->") < 0 | |||||
| cited = False | cited = False | ||||
| if hist[0]["role"] == "system" and need2cite: | if hist[0]["role"] == "system" and need2cite: | ||||
| if len(hist) < 7: | if len(hist) < 7: |
| prompt = self.string_format(prompt, args) | prompt = self.string_format(prompt, args) | ||||
| for m in msg: | for m in msg: | ||||
| m["content"] = self.string_format(m["content"], args) | m["content"] = self.string_format(m["content"], args) | ||||
| if self._canvas.get_reference()["chunks"]: | |||||
| if self._param.cite and self._canvas.get_reference()["chunks"]: | |||||
| prompt += citation_prompt() | prompt += citation_prompt() | ||||
| return prompt, msg | return prompt, msg |
| nonlocal remain_candidates_to_resolve, callback | nonlocal remain_candidates_to_resolve, callback | ||||
| async with semaphore: | async with semaphore: | ||||
| try: | try: | ||||
| with trio.move_on_after(180) as cancel_scope: | |||||
| with trio.move_on_after(280) as cancel_scope: | |||||
| await self._resolve_candidate(candidate_batch, result_set, result_lock) | await self._resolve_candidate(candidate_batch, result_set, result_lock) | ||||
| remain_candidates_to_resolve = remain_candidates_to_resolve - len(candidate_batch[1]) | remain_candidates_to_resolve = remain_candidates_to_resolve - len(candidate_batch[1]) | ||||
| callback(msg=f"Resolved {len(candidate_batch[1])} pairs, {remain_candidates_to_resolve} are remained to resolve. ") | callback(msg=f"Resolved {len(candidate_batch[1])} pairs, {remain_candidates_to_resolve} are remained to resolve. ") | ||||
| logging.info(f"Created resolution prompt {len(text)} bytes for {len(candidate_resolution_i[1])} entity pairs of type {candidate_resolution_i[0]}") | logging.info(f"Created resolution prompt {len(text)} bytes for {len(candidate_resolution_i[1])} entity pairs of type {candidate_resolution_i[0]}") | ||||
| async with chat_limiter: | async with chat_limiter: | ||||
| try: | try: | ||||
| with trio.move_on_after(120) as cancel_scope: | |||||
| with trio.move_on_after(240) as cancel_scope: | |||||
| response = await trio.to_thread.run_sync(self._chat, text, [{"role": "user", "content": "Output:"}], {}) | response = await trio.to_thread.run_sync(self._chat, text, [{"role": "user", "content": "Output:"}], {}) | ||||
| if cancel_scope.cancelled_caught: | if cancel_scope.cancelled_caught: | ||||
| logging.warning("_resolve_candidate._chat timeout, skipping...") | logging.warning("_resolve_candidate._chat timeout, skipping...") |
| text = perform_variable_replacements(self._extraction_prompt, variables=prompt_variables) | text = perform_variable_replacements(self._extraction_prompt, variables=prompt_variables) | ||||
| async with chat_limiter: | async with chat_limiter: | ||||
| try: | try: | ||||
| with trio.move_on_after(80) as cancel_scope: | |||||
| with trio.move_on_after(180) as cancel_scope: | |||||
| response = await trio.to_thread.run_sync( self._chat, text, [{"role": "user", "content": "Output:"}], {}) | response = await trio.to_thread.run_sync( self._chat, text, [{"role": "user", "content": "Output:"}], {}) | ||||
| if cancel_scope.cancelled_caught: | if cancel_scope.cancelled_caught: | ||||
| logging.warning("extract_community_report._chat timeout, skipping...") | logging.warning("extract_community_report._chat timeout, skipping...") |
| ): | ): | ||||
| chunks.append(d["content_with_weight"]) | chunks.append(d["content_with_weight"]) | ||||
| subgraph = await generate_subgraph( | |||||
| LightKGExt | |||||
| if "method" not in row["kb_parser_config"].get("graphrag", {}) or row["kb_parser_config"]["graphrag"]["method"] != "general" | |||||
| else GeneralKGExt, | |||||
| tenant_id, | |||||
| kb_id, | |||||
| doc_id, | |||||
| chunks, | |||||
| language, | |||||
| row["kb_parser_config"]["graphrag"].get("entity_types", []), | |||||
| chat_model, | |||||
| embedding_model, | |||||
| callback, | |||||
| ) | |||||
| with trio.fail_after(len(chunks)*60): | |||||
| subgraph = await generate_subgraph( | |||||
| LightKGExt | |||||
| if "method" not in row["kb_parser_config"].get("graphrag", {}) or row["kb_parser_config"]["graphrag"]["method"] != "general" | |||||
| else GeneralKGExt, | |||||
| tenant_id, | |||||
| kb_id, | |||||
| doc_id, | |||||
| chunks, | |||||
| language, | |||||
| row["kb_parser_config"]["graphrag"].get("entity_types", []), | |||||
| chat_model, | |||||
| embedding_model, | |||||
| callback, | |||||
| ) | |||||
| if not subgraph: | if not subgraph: | ||||
| return | return | ||||
| return | return | ||||
| @timeout(60*60, 1) | |||||
| async def generate_subgraph( | async def generate_subgraph( | ||||
| extractor: Extractor, | extractor: Extractor, | ||||
| tenant_id: str, | tenant_id: str, |
| return res, tk_count | return res, tk_count | ||||
| @timeout(60*60, 1) | |||||
| @timeout(60*60*2, 1) | |||||
| async def do_handle_task(task): | async def do_handle_task(task): | ||||
| task_id = task["id"] | task_id = task["id"] | ||||
| task_from_page = task["from_page"] | task_from_page = task["from_page"] |