
fix(nursery): Fix Closure Trap Issues in Trio Concurrent Tasks (#7106)

## Problem Description

Multiple files in the RAGFlow project contain closure trap issues when using lambda functions with `trio.open_nursery()`. The bug causes concurrent tasks created in loops to reference the same variable, so all tasks end up processing the same data (the data from the last iteration) rather than each task processing its own item from the loop.

## Issue Details

When a `lambda` is used to create a closure and passed to `nursery.start_soon()` inside a loop, the lambda captures a reference to the loop variable rather than its value. For example:

```python
# Problematic code
async with trio.open_nursery() as nursery:
    for d in docs:
        nursery.start_soon(lambda: doc_keyword_extraction(chat_mdl, d, topn))
```

In this pattern, by the time the concurrent tasks begin execution, `d` has already taken its value from after the loop ended (typically the last element), so all tasks use the same data.

## Fix Solution

Changed how concurrent tasks are created with `nursery.start_soon()` by leveraging Trio's API design and passing the function and its arguments separately:

```python
# Fixed code
async with trio.open_nursery() as nursery:
    for d in docs:
        nursery.start_soon(doc_keyword_extraction, chat_mdl, d, topn)
```

This way, each task uses the argument values as they were when the task was spawned, rather than references captured through closures.

## Fixed Files

Fixed closure traps in the following files:

1. `rag/svr/task_executor.py`: 3 fixes, involving document keyword extraction, question generation, and tag processing
2. `rag/raptor.py`: 1 fix, involving document summarization
3. `graphrag/utils.py`: 2 fixes, involving graph node and edge processing
4. `graphrag/entity_resolution.py`: 2 fixes, involving entity resolution and graph node merging
5. `graphrag/general/mind_map_extractor.py`: 2 fixes, involving document processing
6. `graphrag/general/extractor.py`: 3 fixes, involving content processing and graph node/edge merging
7. `graphrag/general/community_reports_extractor.py`: 1 fix, involving community report extraction

## Potential Impact

This fix resolves a serious concurrency issue that could have caused:

- Data processing errors (processing duplicate data)
- Performance degradation (all tasks working on the same data)
- Inconsistent results (some data not being processed)

After the fix, all concurrent tasks should correctly process their respective data, improving system correctness and reliability.
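For readers unfamiliar with the trap, here is a minimal, self-contained sketch (the `worker`/`docs` names are illustrative, not from the RAGFlow codebase) that reproduces the bug and shows the fix:

```python
import trio

async def worker(item):
    print("processing", item)

async def main():
    docs = ["a", "b", "c"]

    async with trio.open_nursery() as nursery:
        for d in docs:
            # Buggy: each lambda closes over the variable d itself. The tasks
            # only begin running after the loop has finished, when d == "c",
            # so this prints "processing c" three times.
            nursery.start_soon(lambda: worker(d))

    async with trio.open_nursery() as nursery:
        for d in docs:
            # Fixed: start_soon binds the argument value at spawn time,
            # so each task processes its own document.
            nursery.start_soon(worker, d)

trio.run(main)
```

If keyword arguments are needed, Trio's documented idiom is `functools.partial`; binding the loop variable as a lambda default (`lambda d=d: worker(d)`) also works, but passing positional arguments through `start_soon` is the most direct fix.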
`graphrag/entity_resolution.py`:

```python
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import itertools
import re
from dataclasses import dataclass
from typing import Any, Callable

import editdistance
import networkx as nx
import trio

from graphrag.general.extractor import Extractor
from graphrag.entity_resolution_prompt import ENTITY_RESOLUTION_PROMPT
from graphrag.utils import perform_variable_replacements, chat_limiter, GraphChange
from rag.llm.chat_model import Base as CompletionLLM
from rag.nlp import is_english

DEFAULT_RECORD_DELIMITER = "##"
DEFAULT_ENTITY_INDEX_DELIMITER = "<|>"
DEFAULT_RESOLUTION_RESULT_DELIMITER = "&&"

@dataclass
class EntityResolutionResult:
    """Entity resolution result class definition."""

    graph: nx.Graph
    change: GraphChange


class EntityResolution(Extractor):
    """Entity resolution class definition."""

    _resolution_prompt: str
    _output_formatter_prompt: str
    _record_delimiter_key: str
    _entity_index_delimiter_key: str
    _resolution_result_delimiter_key: str

    def __init__(
        self,
        llm_invoker: CompletionLLM,
    ):
        """Init method definition."""
        super().__init__(llm_invoker)
        self._llm = llm_invoker
        self._resolution_prompt = ENTITY_RESOLUTION_PROMPT
        self._record_delimiter_key = "record_delimiter"
        self._entity_index_delimiter_key = "entity_index_delimiter"
        self._resolution_result_delimiter_key = "resolution_result_delimiter"
        self._input_text_key = "input_text"
    async def __call__(self, graph: nx.Graph,
                       subgraph_nodes: set[str],
                       prompt_variables: dict[str, Any] | None = None,
                       callback: Callable | None = None) -> EntityResolutionResult:
        """Call method definition."""
        if prompt_variables is None:
            prompt_variables = {}

        # Wire defaults into the prompt variables
        self.prompt_variables = {
            **prompt_variables,
            self._record_delimiter_key: prompt_variables.get(self._record_delimiter_key)
            or DEFAULT_RECORD_DELIMITER,
            self._entity_index_delimiter_key: prompt_variables.get(self._entity_index_delimiter_key)
            or DEFAULT_ENTITY_INDEX_DELIMITER,
            self._resolution_result_delimiter_key: prompt_variables.get(self._resolution_result_delimiter_key)
            or DEFAULT_RESOLUTION_RESULT_DELIMITER,
        }

        # Group nodes by entity type, then pair up similar names within each
        # type; only pairs that touch the given subgraph are considered.
        nodes = sorted(graph.nodes())
        entity_types = sorted(set(graph.nodes[node].get('entity_type', '-') for node in nodes))
        node_clusters = {entity_type: [] for entity_type in entity_types}
        for node in nodes:
            node_clusters[graph.nodes[node].get('entity_type', '-')].append(node)

        candidate_resolution = {entity_type: [] for entity_type in entity_types}
        for k, v in node_clusters.items():
            candidate_resolution[k] = [
                (a, b) for a, b in itertools.combinations(v, 2)
                if (a in subgraph_nodes or b in subgraph_nodes) and self.is_similarity(a, b)
            ]
        num_candidates = sum(len(candidates) for _, candidates in candidate_resolution.items())
        if callback:
            callback(msg=f"Identified {num_candidates} candidate pairs")

        # Ask the LLM about each batch of candidate pairs concurrently. The
        # function and its arguments are passed to start_soon separately, so
        # every task gets its own batch (no closure trap).
        resolution_result = set()
        resolution_batch_size = 100
        async with trio.open_nursery() as nursery:
            for candidate_resolution_i in candidate_resolution.items():
                if not candidate_resolution_i[1]:
                    continue
                for i in range(0, len(candidate_resolution_i[1]), resolution_batch_size):
                    candidate_batch = candidate_resolution_i[0], candidate_resolution_i[1][i:i + resolution_batch_size]
                    nursery.start_soon(self._resolve_candidate, candidate_batch, resolution_result)
        if callback:
            callback(msg=f"Resolved {num_candidates} candidate pairs, {len(resolution_result)} of them are selected to merge.")

        change = GraphChange()
        connect_graph = nx.Graph()
        connect_graph.add_edges_from(resolution_result)
        # Merge each connected component of matched pairs into a single node.
        async with trio.open_nursery() as nursery:
            for sub_connect_graph in nx.connected_components(connect_graph):
                merging_nodes = list(sub_connect_graph)
                nursery.start_soon(self._merge_graph_nodes, graph, merging_nodes, change)

        # Update pagerank
        pr = nx.pagerank(graph)
        for node_name, pagerank in pr.items():
            graph.nodes[node_name]["pagerank"] = pagerank

        return EntityResolutionResult(
            graph=graph,
            change=change,
        )
    async def _resolve_candidate(self, candidate_resolution_i: tuple[str, list[tuple[str, str]]],
                                 resolution_result: set[tuple[str, str]]):
        gen_conf = {"temperature": 0.5}
        pair_txt = [
            f'When determining whether two {candidate_resolution_i[0]}s are the same, you should only focus on critical properties and overlook noisy factors.\n']
        for index, candidate in enumerate(candidate_resolution_i[1]):
            pair_txt.append(
                f'Question {index + 1}: name of {candidate_resolution_i[0]} A is {candidate[0]}, name of {candidate_resolution_i[0]} B is {candidate[1]}')
        num_questions = len(candidate_resolution_i[1])
        sent = 'question above' if num_questions == 1 else f'above {num_questions} questions'
        pair_txt.append(
            f'\nUse domain knowledge of {candidate_resolution_i[0]}s to help understand the text and answer the {sent} in the format: For Question i, Yes, {candidate_resolution_i[0]} A and {candidate_resolution_i[0]} B are the same {candidate_resolution_i[0]}./No, {candidate_resolution_i[0]} A and {candidate_resolution_i[0]} B are different {candidate_resolution_i[0]}s. For Question i+1, (repeat the above procedures)')
        pair_prompt = '\n'.join(pair_txt)
        variables = {
            **self.prompt_variables,
            self._input_text_key: pair_prompt
        }
        text = perform_variable_replacements(self._resolution_prompt, variables=variables)
        logging.info(f"Created resolution prompt {len(text)} bytes for {len(candidate_resolution_i[1])} entity pairs of type {candidate_resolution_i[0]}")
        async with chat_limiter:
            # This lambda is safe: it is created once per call, not in a loop,
            # so it captures this call's own `text` and `gen_conf`.
            response = await trio.to_thread.run_sync(lambda: self._chat(text, [{"role": "user", "content": "Output:"}], gen_conf))
        logging.debug(f"_resolve_candidate chat prompt: {text}\nchat response: {response}")
        result = self._process_results(
            len(candidate_resolution_i[1]), response,
            self.prompt_variables.get(self._record_delimiter_key, DEFAULT_RECORD_DELIMITER),
            self.prompt_variables.get(self._entity_index_delimiter_key, DEFAULT_ENTITY_INDEX_DELIMITER),
            self.prompt_variables.get(self._resolution_result_delimiter_key, DEFAULT_RESOLUTION_RESULT_DELIMITER))
        for result_i in result:
            # result_i is (question_index, "yes"); map it back to the pair.
            resolution_result.add(candidate_resolution_i[1][result_i[0] - 1])
    def _process_results(
            self,
            records_length: int,
            results: str,
            record_delimiter: str,
            entity_index_delimiter: str,
            resolution_result_delimiter: str
    ) -> list:
        ans_list = []
        records = [r.strip() for r in results.split(record_delimiter)]
        for record in records:
            # Extract the question index, e.g. "<|>3<|>".
            pattern_int = rf"{re.escape(entity_index_delimiter)}(\d+){re.escape(entity_index_delimiter)}"
            match_int = re.search(pattern_int, record)
            res_int = int(str(match_int.group(1) if match_int else '0'))
            if res_int > records_length:
                continue

            # Extract the verdict, e.g. "&&Yes&&".
            pattern_bool = rf"{re.escape(resolution_result_delimiter)}([a-zA-Z]+){re.escape(resolution_result_delimiter)}"
            match_bool = re.search(pattern_bool, record)
            res_bool = str(match_bool.group(1) if match_bool else '')

            if res_int and res_bool and res_bool.lower() == 'yes':
                ans_list.append((res_int, "yes"))

        return ans_list
    def is_similarity(self, a, b):
        # English names are compared by edit distance; otherwise fall back to
        # requiring more than one shared character.
        if is_english(a) and is_english(b):
            if editdistance.eval(a, b) <= min(len(a), len(b)) // 2:
                return True

        if len(set(a) & set(b)) > 1:
            return True

        return False
```
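For context, here is a minimal, hypothetical sketch of how this class might be driven end to end; the `my_llm`/`my_graph` setup is a placeholder and not part of the file above:

```python
import networkx as nx
import trio

from graphrag.entity_resolution import EntityResolution

async def resolve(llm_invoker, graph: nx.Graph):
    # Treat every node as a resolution candidate; a real caller would
    # typically pass only the nodes of a freshly added subgraph.
    er = EntityResolution(llm_invoker)
    result = await er(graph, subgraph_nodes=set(graph.nodes()))
    return result.graph, result.change

# Run under trio, e.g.:
# merged_graph, change = trio.run(resolve, my_llm, my_graph)
```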