| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #
- # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
-
-
- from PIL import Image
-
- from rag.app.picture import vision_llm_chunk as picture_vision_llm_chunk
- from rag.prompts import vision_llm_figure_describe_prompt
-
-
- def vision_figure_parser_figure_data_wraper(figures_data_without_positions):
- return [(
- (figure_data[1], [figure_data[0]]),
- [(0, 0, 0, 0, 0)]
- ) for figure_data in figures_data_without_positions if isinstance(figure_data[1], Image.Image)]
-
-
- class VisionFigureParser:
- def __init__(self, vision_model, figures_data, *args, **kwargs):
- self.vision_model = vision_model
- self._extract_figures_info(figures_data)
- assert len(self.figures) == len(self.descriptions)
- assert not self.positions or (len(self.figures) == len(self.positions))
-
- def _extract_figures_info(self, figures_data):
- self.figures = []
- self.descriptions = []
- self.positions = []
-
- for item in figures_data:
- # position
- if len(item) == 2 and isinstance(item[1], list) and len(item[1]) == 1 and isinstance(item[1][0], tuple) and len(item[1][0]) == 5:
- img_desc = item[0]
- assert len(img_desc) == 2 and isinstance(img_desc[0], Image.Image) and isinstance(img_desc[1], list), "Should be (figure, [description])"
- self.figures.append(img_desc[0])
- self.descriptions.append(img_desc[1])
- self.positions.append(item[1])
- else:
- assert len(item) == 2 and isinstance(item, tuple) and isinstance(item[1], list), f"get {len(item)=}, {item=}"
- self.figures.append(item[0])
- self.descriptions.append(item[1])
-
- def _assemble(self):
- self.assembled = []
- self.has_positions = len(self.positions) != 0
- for i in range(len(self.figures)):
- figure = self.figures[i]
- desc = self.descriptions[i]
- pos = self.positions[i] if self.has_positions else None
-
- figure_desc = (figure, desc)
-
- if pos is not None:
- self.assembled.append((figure_desc, pos))
- else:
- self.assembled.append((figure_desc,))
-
- return self.assembled
-
- def __call__(self, **kwargs):
- callback = kwargs.get("callback", lambda prog, msg: None)
-
- for idx, img_binary in enumerate(self.figures or []):
- figure_num = idx # 0-based
-
- txt = picture_vision_llm_chunk(
- binary=img_binary,
- vision_model=self.vision_model,
- prompt=vision_llm_figure_describe_prompt(),
- callback=callback,
- )
-
- if txt:
- self.descriptions[figure_num] = txt + "\n".join(self.descriptions[figure_num])
-
- self._assemble()
-
- return self.assembled
|