### What problem does this PR solve?

Fix nursery.start_soon. Close #5575

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
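For context on the failure mode: trio's `nursery.start_soon` takes an async *callable* (optionally followed by positional arguments) and creates the coroutine itself when it spawns the task. Handing it an already-created coroutine object, as in `start_soon(fn(args))`, raises a `TypeError` at the `start_soon` call. Wrapping the call in a zero-argument `lambda` gives trio a callable again, and the patch normalizes the call sites below to that lambda form. A minimal standalone sketch (toy `work` function, not RAGFlow code):

```python
import trio

async def work(n):
    await trio.sleep(0)   # a trio checkpoint, standing in for real async work
    print("processed", n)

async def main():
    async with trio.open_nursery() as nursery:
        # Broken: work(1) builds a coroutine object right here, but
        # start_soon expects a callable -> TypeError at spawn time.
        # nursery.start_soon(work(1))

        # This PR's pattern: a zero-argument callable returning the coroutine.
        nursery.start_soon(lambda: work(1))

        # trio's documented form also works: function plus separate args.
        nursery.start_soon(work, 2)

trio.run(main)
```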
In the entity-resolution pass (`_resolve_candidate`):

```diff
     for candidate_resolution_i in candidate_resolution.items():
         if not candidate_resolution_i[1]:
             continue
-        nursery.start_soon(self._resolve_candidate(candidate_resolution_i, resolution_result))
+        nursery.start_soon(lambda: self._resolve_candidate(candidate_resolution_i, resolution_result))
 connect_graph = nx.Graph()
 removed_entities = []
```
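One caveat worth noting with the lambda form: a lambda created inside a loop captures the loop variable by reference, and the spawned tasks do not actually run until the parent task hits a checkpoint (at the latest, the end of the `async with` block). If nothing awaits inside the loop, every task can end up seeing the final value of the captured variable. The usual guard is to freeze the value with a default argument; a hypothetical sketch, not part of this diff:

```python
import trio

async def handle(item):
    await trio.sleep(0)
    print("handling", item)

async def main():
    async with trio.open_nursery() as nursery:
        for item in ["a", "b", "c"]:
            # A bare `lambda: handle(item)` would spawn three tasks that all
            # read `item` after the loop has finished, i.e. all see "c".
            # The default argument binds the current value per iteration.
            nursery.start_soon(lambda item=item: handle(item))

trio.run(main)
```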
In the extraction and merging passes (`_process_single_content`, `_merge_nodes`, `_merge_edges`):

```diff
 async with trio.open_nursery() as nursery:
     for i, (cid, ck) in enumerate(chunks):
         ck = truncate(ck, int(self._llm.max_length*0.8))
-        nursery.start_soon(self._process_single_content, (cid, ck), i, len(chunks), out_results)
+        nursery.start_soon(lambda: self._process_single_content((cid, ck), i, len(chunks), out_results))
 maybe_nodes = defaultdict(list)
 maybe_edges = defaultdict(list)

 all_entities_data = []
 async with trio.open_nursery() as nursery:
     for en_nm, ents in maybe_nodes.items():
-        nursery.start_soon(self._merge_nodes, en_nm, ents, all_entities_data)
+        nursery.start_soon(lambda: self._merge_nodes(en_nm, ents, all_entities_data))
 now = trio.current_time()
 if callback:
     callback(msg = f"Entities merging done, {now-start_ts:.2f}s.")

 all_relationships_data = []
 async with trio.open_nursery() as nursery:
     for (src, tgt), rels in maybe_edges.items():
-        nursery.start_soon(self._merge_edges, src, tgt, rels, all_relationships_data)
+        nursery.start_soon(lambda: self._merge_edges(src, tgt, rels, all_relationships_data))
 now = trio.current_time()
 if callback:
     callback(msg = f"Relationships merging done, {now-start_ts:.2f}s.")
```
In the mind-map extraction pass (`_process_document`), both spawn sites change:

```diff
     for i in range(len(sections)):
         section_cnt = num_tokens_from_string(sections[i])
         if cnt + section_cnt >= token_count and texts:
-            nursery.start_soon(self._process_document, "".join(texts), prompt_variables, res)
+            nursery.start_soon(lambda: self._process_document("".join(texts), prompt_variables, res))
             texts = []
             cnt = 0
         texts.append(sections[i])
         cnt += section_cnt
     if texts:
-        nursery.start_soon(self._process_document, "".join(texts), prompt_variables, res)
+        nursery.start_soon(lambda: self._process_document("".join(texts), prompt_variables, res))
 if not res:
     return MindMapResult(output={"id": "root", "children": []})
 merge_json = reduce(self._merge, res)
```
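Aside from the `start_soon` change, the surrounding loop encodes a batching policy: sections are accumulated until the running token count would reach `token_count`, then the joined batch is handed to a worker task and the accumulator resets, with a final flush after the loop. Pulled out on its own, the policy looks roughly like this (`count_tokens` stands in for `num_tokens_from_string`; this helper is illustrative, not in the diff):

```python
def batch_by_tokens(sections, token_budget, count_tokens):
    """Yield joined batches whose token totals stay below the budget."""
    batch, cnt = [], 0
    for section in sections:
        section_cnt = count_tokens(section)
        # Flush before the batch would cross the budget,
        # but never emit an empty batch.
        if cnt + section_cnt >= token_budget and batch:
            yield "".join(batch)
            batch, cnt = [], 0
        batch.append(section)
        cnt += section_cnt
    if batch:  # trailing partial batch
        yield "".join(batch)
```

Each yielded batch corresponds to one `"".join(texts)` string handed to `_process_document` above.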
In the keyword, question-proposal, and tagging passes:

```diff
         return
     async with trio.open_nursery() as nursery:
         for d in docs:
-            nursery.start_soon(doc_keyword_extraction, chat_mdl, d, task["parser_config"]["auto_keywords"])
+            nursery.start_soon(lambda: doc_keyword_extraction(chat_mdl, d, task["parser_config"]["auto_keywords"]))
     progress_callback(msg="Keywords generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))
 if task["parser_config"].get("auto_questions", 0):
```

```diff
             d["question_tks"] = rag_tokenizer.tokenize("\n".join(d["question_kwd"]))
     async with trio.open_nursery() as nursery:
         for d in docs:
-            nursery.start_soon(doc_question_proposal, chat_mdl, d, task["parser_config"]["auto_questions"])
+            nursery.start_soon(lambda: doc_question_proposal(chat_mdl, d, task["parser_config"]["auto_questions"]))
     progress_callback(msg="Question generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st))
 if task["kb_parser_config"].get("tag_kb_ids", []):
```

```diff
             d[TAG_FLD] = json.loads(cached)
     async with trio.open_nursery() as nursery:
         for d in docs_to_tag:
-            nursery.start_soon(doc_content_tagging, chat_mdl, d, topn_tags)
+            nursery.start_soon(lambda: doc_content_tagging(chat_mdl, d, topn_tags))
     progress_callback(msg="Tagging {} chunks completed in {:.2f}s".format(len(docs), timer() - st))
 return docs
```