### What problem does this PR solve?

This PR addresses critical memory leaks in the task executor's image processing pipeline. The current implementation fails to properly dispose of PIL Image objects and BytesIO buffers during chunk processing, leading to progressive memory accumulation that can cause the task executor to consume excessive memory over time.

### Background context

- The `upload_to_minio` function processes images from document chunks and converts them to JPEG format for storage.
- PIL Image objects hold significant memory resources that must be explicitly closed to prevent memory leaks.
- BytesIO objects also consume memory and should be properly disposed of after use.
- In high-throughput scenarios with many image-containing documents, these memory leaks can lead to out-of-memory errors and degraded performance.

### Specific issues fixed

- PIL Image objects were not being explicitly closed after processing.
- BytesIO buffers lacked proper cleanup in all code paths.
- Converted images (RGBA/P to RGB) were not disposing of the original image object.
- Memory references to large image data were not being cleared promptly.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Performance Improvement

### Changes made

- Added explicit `d["image"].close()` calls after image processing operations.
- Implemented proper cleanup of converted images when changing formats from RGBA/P to RGB.
- Enhanced BytesIO cleanup with `try/finally` blocks to ensure disposal in all code paths.
- Added explicit `del d["image"]` to clear memory references after processing.

This fix ensures stable memory usage during long-running document processing tasks and prevents potential out-of-memory conditions in production environments.

tags/v0.20.0
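The cleanup order described under "Changes made" can be sketched in isolation. This is a minimal illustration, not the project's code: `FakeImage` is a hypothetical stand-in that mimics only the `mode`, `convert`, `save`, and `close` members of a Pillow image so the example runs without Pillow, and `to_jpeg_bytes` is an assumed helper name. Unlike the PR, the sketch also closes the image when `save` raises.

```python
from io import BytesIO


class FakeImage:
    """Hypothetical stand-in for a Pillow image (mode/convert/save/close only)."""

    def __init__(self, mode):
        self.mode = mode
        self.closed = False

    def convert(self, mode):
        # Like Pillow, conversion returns a new object and leaves the original open.
        return FakeImage(mode)

    def save(self, fp, format=None):
        if self.closed:
            raise ValueError("operation on closed image")
        fp.write(b"jpeg-bytes:" + self.mode.encode())

    def close(self):
        self.closed = True


def to_jpeg_bytes(image):
    """Encode `image`, releasing the original, any converted copy, and the buffer."""
    buffer = BytesIO()
    try:
        if image.mode in ("RGBA", "P"):
            converted = image.convert("RGB")
            image.close()      # release the original before dropping the reference
            image = converted
        try:
            image.save(buffer, format="JPEG")
        finally:
            image.close()      # release the image even if encoding fails
        return buffer.getvalue()  # bytes copy; stays valid after the buffer closes
    finally:
        buffer.close()
```

Closing the original before rebinding the name matters because the converted copy is a separate object; without the explicit `close()`, the original is released only when the garbage collector gets to it.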
```diff
@@ -289,18 +289,26 @@ async def build_chunks(task, progress_callback):
                     return
                 output_buffer = BytesIO()
-                if isinstance(d["image"], bytes):
-                    output_buffer = BytesIO(d["image"])
-                else:
-                    # If the image is in RGBA mode, convert it to RGB mode before saving it in JPEG format.
-                    if d["image"].mode in ("RGBA", "P"):
-                        d["image"] = d["image"].convert("RGB")
-                    d["image"].save(output_buffer, format='JPEG')
-                async with minio_limiter:
-                    await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue()))
-                d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
-                del d["image"]
-                docs.append(d)
+                try:
+                    if isinstance(d["image"], bytes):
+                        output_buffer.write(d["image"])
+                        output_buffer.seek(0)
+                    else:
+                        # If the image is in RGBA mode, convert it to RGB mode before saving it in JPEG format.
+                        if d["image"].mode in ("RGBA", "P"):
+                            converted_image = d["image"].convert("RGB")
+                            d["image"].close()  # Close original image
+                            d["image"] = converted_image
+                        d["image"].save(output_buffer, format='JPEG')
+                        d["image"].close()  # Close PIL image after saving
+                    async with minio_limiter:
+                        await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue()))
+                    d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
+                    del d["image"]  # Remove image reference
+                    docs.append(d)
+                finally:
+                    output_buffer.close()  # Ensure BytesIO is always closed
             except Exception:
                 logging.exception(
                     "Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["id"]))
```
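In the change above, the upload is awaited inside the `try` body, so `output_buffer.getvalue()` runs while the buffer is still open and the `finally` clause closes it afterwards. The sketch below shows the same lifetime rule with the standard library only; it uses a `with` block, which closes the buffer on every path just as `try/finally` does. `upload_from_buffer` and the `put` callback are hypothetical names for illustration, not part of the project.

```python
from io import BytesIO


def upload_from_buffer(payload, put):
    """Stage `payload` in a BytesIO, pass its bytes to `put`, and always close it."""
    with BytesIO() as buffer:   # closed on normal exit and when put() raises
        buffer.write(payload)
        put(buffer.getvalue())  # must be read while the buffer is still open


def closed_buffer_is_unreadable():
    """Return True if reading from a closed BytesIO raises ValueError."""
    buffer = BytesIO(b"data")
    buffer.close()
    try:
        buffer.getvalue()
    except ValueError:
        return True
    return False
```

Because `getvalue()` returns an independent `bytes` object, whatever `put` stored remains valid after the buffer is closed.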