- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
-
- import math
- import os
- from copy import deepcopy
-
- # math, cv2 and numpy are used directly in this module, so import them explicitly
- # instead of relying on the star import from .operators
- import cv2
- import numpy as np
- import onnxruntime as ort
- from huggingface_hub import snapshot_download
-
- from .operators import *
- from rag.settings import cron_logger
-
-
- class Recognizer(object):
- def __init__(self, label_list, task_name, model_dir=None):
- """
- If you have trouble downloading HuggingFace models, this might help ^_^
-
- For Linux:
- export HF_ENDPOINT=https://hf-mirror.com
-
- For Windows:
- Good luck
- ^_-
-
- """
- if not model_dir:
- model_dir = snapshot_download(repo_id="InfiniFlow/deepdoc")
-
- model_file_path = os.path.join(model_dir, task_name + ".onnx")
- if not os.path.exists(model_file_path):
- raise ValueError("Model file not found: {}".format(
- model_file_path))
- if ort.get_device() == "GPU":
- self.ort_sess = ort.InferenceSession(model_file_path, providers=['CUDAExecutionProvider'])
- else:
- self.ort_sess = ort.InferenceSession(model_file_path, providers=['CPUExecutionProvider'])
- self.input_names = [node.name for node in self.ort_sess.get_inputs()]
- self.output_names = [node.name for node in self.ort_sess.get_outputs()]
- self.input_shape = self.ort_sess.get_inputs()[0].shape[2:4]
- self.label_list = label_list
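-
- # Construction sketch (the label list, task name and path below are illustrative,
- # not the project's canonical values):
- #   rec = Recognizer(label_list=["text", "title", "table", "figure"],
- #                    task_name="layout",
- #                    model_dir="/path/to/deepdoc/models")
- # Passing model_dir=None pulls the weights via snapshot_download("InfiniFlow/deepdoc")
- # and expects a "<task_name>.onnx" file inside the downloaded folder.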
-
- @staticmethod
- def sort_Y_firstly(arr, threshold):
- # sort by top (y) first, then by x0
- arr = sorted(arr, key=lambda r: (r["top"], r["x0"]))
- for i in range(len(arr) - 1):
- for j in range(i, -1, -1):
- # swap neighbors whose tops are within the threshold so x0 decides the order
- if abs(arr[j + 1]["top"] - arr[j]["top"]) < threshold \
- and arr[j + 1]["x0"] < arr[j]["x0"]:
- tmp = deepcopy(arr[j])
- arr[j] = deepcopy(arr[j + 1])
- arr[j + 1] = deepcopy(tmp)
- return arr
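-
- # Worked example (hypothetical boxes), threshold=10:
- #   [{"top": 12, "x0": 300}, {"top": 15, "x0": 20}] -> the x0=20 box comes first,
- # because the tops differ by less than the threshold and count as the same line.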
-
- @staticmethod
- def sort_X_firstly(arr, threshold, copy=True):
- # sort by x0 first, then by top (y)
- arr = sorted(arr, key=lambda r: (r["x0"], r["top"]))
- for i in range(len(arr) - 1):
- for j in range(i, -1, -1):
- # swap neighbors whose x0 are within the threshold so top decides the order
- if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threshold \
- and arr[j + 1]["top"] < arr[j]["top"]:
- tmp = deepcopy(arr[j]) if copy else arr[j]
- arr[j] = deepcopy(arr[j + 1]) if copy else arr[j + 1]
- arr[j + 1] = deepcopy(tmp) if copy else tmp
- return arr
-
- @staticmethod
- def sort_C_firstly(arr, thr=0):
- # sort by column ("C") first, then by top (y)
- arr = Recognizer.sort_X_firstly(arr, thr)
- for i in range(len(arr) - 1):
- for j in range(i, -1, -1):
- # bubble items with a smaller column index (or same column, smaller top) forward
- if "C" not in arr[j] or "C" not in arr[j + 1]:
- continue
- if arr[j + 1]["C"] < arr[j]["C"] \
- or (
- arr[j + 1]["C"] == arr[j]["C"]
- and arr[j + 1]["top"] < arr[j]["top"]
- ):
- tmp = arr[j]
- arr[j] = arr[j + 1]
- arr[j + 1] = tmp
- return arr
-
- @staticmethod
- def sort_R_firstly(arr, thr=0):
- # sort by row ("R") first, then by x0
- arr = Recognizer.sort_Y_firstly(arr, thr)
- for i in range(len(arr) - 1):
- for j in range(i, -1, -1):
- if "R" not in arr[j] or "R" not in arr[j + 1]:
- continue
- if arr[j + 1]["R"] < arr[j]["R"] \
- or (
- arr[j + 1]["R"] == arr[j]["R"]
- and arr[j + 1]["x0"] < arr[j]["x0"]
- ):
- tmp = arr[j]
- arr[j] = arr[j + 1]
- arr[j + 1] = tmp
- return arr
-
- @staticmethod
- def overlapped_area(a, b, ratio=True):
- tp, btm, x0, x1 = a["top"], a["bottom"], a["x0"], a["x1"]
- if b["x0"] > x1 or b["x1"] < x0:
- return 0
- if b["bottom"] < tp or b["top"] > btm:
- return 0
- x0_ = max(b["x0"], x0)
- x1_ = min(b["x1"], x1)
- assert x0_ <= x1_, "Bad horizontal overlap! T:{},B:{},X0:{},X1:{} ==> {}".format(
- tp, btm, x0, x1, b)
- tp_ = max(b["top"], tp)
- btm_ = min(b["bottom"], btm)
- assert tp_ <= btm_, "Bad vertical overlap! T:{},B:{},X0:{},X1:{} ==> {}".format(
- tp, btm, x0, x1, b)
- ov = (btm_ - tp_) * (x1_ - x0_) if x1 - \
- x0 != 0 and btm - tp != 0 else 0
- if ov > 0 and ratio:
- ov /= (x1 - x0) * (btm - tp)
- return ov
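-
- # Worked example (hypothetical boxes): a = {"top": 0, "bottom": 10, "x0": 0, "x1": 10},
- # b = {"top": 5, "bottom": 15, "x0": 0, "x1": 10} -> intersection 10 * 5 = 50,
- # area of a = 100, so overlapped_area(a, b) is 0.5 (or 50 with ratio=False).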
-
- @staticmethod
- def layouts_cleanup(boxes, layouts, far=2, thr=0.7):
- def notOverlapped(a, b):
- return any([a["x1"] < b["x0"],
- a["x0"] > b["x1"],
- a["bottom"] < b["top"],
- a["top"] > b["bottom"]])
-
- i = 0
- while i + 1 < len(layouts):
- j = i + 1
- while j < min(i + far, len(layouts)) \
- and (layouts[i].get("type", "") != layouts[j].get("type", "")
- or notOverlapped(layouts[i], layouts[j])):
- j += 1
- if j >= min(i + far, len(layouts)):
- i += 1
- continue
- if Recognizer.overlapped_area(layouts[i], layouts[j]) < thr \
- and Recognizer.overlapped_area(layouts[j], layouts[i]) < thr:
- i += 1
- continue
-
- if layouts[i].get("score") and layouts[j].get("score"):
- if layouts[i]["score"] > layouts[j]["score"]:
- layouts.pop(j)
- else:
- layouts.pop(i)
- continue
-
- area_i, area_i_1 = 0, 0
- for b in boxes:
- if not notOverlapped(b, layouts[i]):
- area_i += Recognizer.overlapped_area(b, layouts[i], False)
- if not notOverlapped(b, layouts[j]):
- area_i_1 += Recognizer.overlapped_area(b, layouts[j], False)
-
- if area_i > area_i_1:
- layouts.pop(j)
- else:
- layouts.pop(i)
-
- return layouts
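-
- # Usage sketch (inputs are assumed to be the usual box/layout dicts): when two nearby
- # layout candidates of the same type overlap by at least thr in either direction, the
- # weaker one (lower score, or, lacking scores, covering less OCR-box area) is dropped:
- #   layouts = Recognizer.layouts_cleanup(ocr_boxes, layout_candidates, far=2, thr=0.7)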
-
- def create_inputs(self, imgs, im_info):
- """generate input for different model type
- Args:
- imgs (list(numpy)): list of images (np.ndarray)
- im_info (list(dict)): list of image info
- Returns:
- inputs (dict): input of model
- """
- inputs = {}
-
- im_shape = []
- scale_factor = []
- if len(imgs) == 1:
- inputs['image'] = np.array((imgs[0],)).astype('float32')
- inputs['im_shape'] = np.array(
- (im_info[0]['im_shape'],)).astype('float32')
- inputs['scale_factor'] = np.array(
- (im_info[0]['scale_factor'],)).astype('float32')
- return inputs
-
- for e in im_info:
- im_shape.append(np.array((e['im_shape'],)).astype('float32'))
- scale_factor.append(np.array((e['scale_factor'],)).astype('float32'))
-
- inputs['im_shape'] = np.concatenate(im_shape, axis=0)
- inputs['scale_factor'] = np.concatenate(scale_factor, axis=0)
-
- imgs_shape = [[e.shape[1], e.shape[2]] for e in imgs]
- max_shape_h = max([e[0] for e in imgs_shape])
- max_shape_w = max([e[1] for e in imgs_shape])
- padding_imgs = []
- for img in imgs:
- im_c, im_h, im_w = img.shape[:]
- padding_im = np.zeros(
- (im_c, max_shape_h, max_shape_w), dtype=np.float32)
- padding_im[:, :im_h, :im_w] = img
- padding_imgs.append(padding_im)
- inputs['image'] = np.stack(padding_imgs, axis=0)
- return inputs
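-
- # Shape sketch (illustrative sizes): for two CHW images of shape (3, 800, 600) and
- # (3, 640, 640), both are zero-padded to (3, 800, 640) and stacked, so
- # inputs["image"] is (2, 3, 800, 640); "im_shape" and "scale_factor" are (2, 2) float32.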
-
- @staticmethod
- def find_overlapped(box, boxes_sorted_by_y, naive=False):
- if not boxes_sorted_by_y:
- return
- bxs = boxes_sorted_by_y
- s, e, ii = 0, len(bxs), 0
- while s < e and not naive:
- ii = (e + s) // 2
- pv = bxs[ii]
- if box["bottom"] < pv["top"]:
- e = ii
- continue
- if box["top"] > pv["bottom"]:
- s = ii + 1
- continue
- break
- while s < ii:
- if box["top"] > bxs[s]["bottom"]:
- s += 1
- break
- while e - 1 > ii:
- if box["bottom"] < bxs[e - 1]["top"]:
- e -= 1
- break
-
- max_overlapped_i, max_overlapped = None, 0
- for i in range(s, e):
- ov = Recognizer.overlapped_area(bxs[i], box)
- if ov <= max_overlapped:
- continue
- max_overlapped_i = i
- max_overlapped = ov
-
- return max_overlapped_i
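-
- # Usage sketch (names are illustrative): boxes_sorted_by_y must already be sorted by
- # "top" (e.g. via Recognizer.sort_Y_firstly); the binary search above narrows the
- # candidate range before picking the box with the largest overlap:
- #   idx = Recognizer.find_overlapped(text_box, layout_boxes_sorted_by_y)
- #   hit = layout_boxes_sorted_by_y[idx] if idx is not None else None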
-
- @staticmethod
- def find_overlapped_with_threashold(box, boxes, thr=0.3):
- if not boxes:
- return
- max_overlapped_i, max_overlapped, _max_overlapped = None, thr, 0
- s, e = 0, len(boxes)
- for i in range(s, e):
- ov = Recognizer.overlapped_area(box, boxes[i])
- _ov = Recognizer.overlapped_area(boxes[i], box)
- if (ov, _ov) < (max_overlapped, _max_overlapped):
- continue
- max_overlapped_i = i
- max_overlapped = ov
- _max_overlapped = _ov
-
- return max_overlapped_i
-
- def preprocess(self, image_list):
- inputs = []
- if "scale_factor" in self.input_names:
- preprocess_ops = []
- for op_info in [
- {'interp': 2, 'keep_ratio': False, 'target_size': [800, 608], 'type': 'LinearResize'},
- {'is_scale': True, 'mean': [0.485, 0.456, 0.406], 'std': [0.229, 0.224, 0.225], 'type': 'StandardizeImage'},
- {'type': 'Permute'},
- {'stride': 32, 'type': 'PadStride'}
- ]:
- new_op_info = op_info.copy()
- op_type = new_op_info.pop('type')
- preprocess_ops.append(eval(op_type)(**new_op_info))
-
- for im_path in image_list:
- im, im_info = preprocess(im_path, preprocess_ops)
- inputs.append({"image": np.array((im,)).astype('float32'),
- "scale_factor": np.array((im_info["scale_factor"],)).astype('float32')})
- else:
- hh, ww = self.input_shape
- for img in image_list:
- h, w = img.shape[:2]
- img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
- img = cv2.resize(np.array(img).astype('float32'), (ww, hh))
- # Scale input pixel values to 0 to 1
- img /= 255.0
- img = img.transpose(2, 0, 1)
- img = img[np.newaxis, :, :, :].astype(np.float32)
- inputs.append({self.input_names[0]: img, "scale_factor": [w/ww, h/hh]})
- return inputs
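-
- # Note: models that declare a "scale_factor" input go through the operator pipeline
- # above; otherwise the image is resized to self.input_shape, e.g. a 1000x700 (h x w)
- # BGR page becomes a (1, 3, hh, ww) RGB float tensor in [0, 1], with
- # scale_factor [700/ww, 1000/hh] kept for mapping boxes back to the original size.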
-
- def postprocess(self, boxes, inputs, thr):
- if "scale_factor" in self.input_names:
- bb = []
- for b in boxes:
- clsid, bbox, score = int(b[0]), b[2:], b[1]
- if score < thr:
- continue
- if clsid >= len(self.label_list):
- cron_logger.warning("bad category id: {}".format(clsid))
- continue
- bb.append({
- "type": self.label_list[clsid].lower(),
- "bbox": [float(t) for t in bbox.tolist()],
- "score": float(score)
- })
- return bb
-
- def xywh2xyxy(x):
- # [x, y, w, h] to [x1, y1, x2, y2]
- y = np.copy(x)
- y[:, 0] = x[:, 0] - x[:, 2] / 2
- y[:, 1] = x[:, 1] - x[:, 3] / 2
- y[:, 2] = x[:, 0] + x[:, 2] / 2
- y[:, 3] = x[:, 1] + x[:, 3] / 2
- return y
-
- def compute_iou(box, boxes):
- # Compute xmin, ymin, xmax, ymax for both boxes
- xmin = np.maximum(box[0], boxes[:, 0])
- ymin = np.maximum(box[1], boxes[:, 1])
- xmax = np.minimum(box[2], boxes[:, 2])
- ymax = np.minimum(box[3], boxes[:, 3])
-
- # Compute intersection area
- intersection_area = np.maximum(0, xmax - xmin) * np.maximum(0, ymax - ymin)
-
- # Compute union area
- box_area = (box[2] - box[0]) * (box[3] - box[1])
- boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
- union_area = box_area + boxes_area - intersection_area
-
- # Compute IoU
- iou = intersection_area / union_area
-
- return iou
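-
- # Worked example (hypothetical boxes): box = [0, 0, 10, 10] vs boxes = [[5, 5, 15, 15]]
- # -> intersection 25, union 100 + 100 - 25 = 175, IoU is about 0.143.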
-
- def iou_filter(boxes, scores, iou_threshold):
- sorted_indices = np.argsort(scores)[::-1]
-
- keep_boxes = []
- while sorted_indices.size > 0:
- # Pick the box with the highest remaining score
- box_id = sorted_indices[0]
- keep_boxes.append(box_id)
-
- # Compute IoU of the picked box with the rest
- ious = compute_iou(boxes[box_id, :], boxes[sorted_indices[1:], :])
-
- # Remove boxes with IoU over the threshold
- keep_indices = np.where(ious < iou_threshold)[0]
-
- # print(keep_indices.shape, sorted_indices.shape)
- sorted_indices = sorted_indices[keep_indices + 1]
-
- return keep_boxes
-
- boxes = np.squeeze(boxes).T
- # Filter out object confidence scores below threshold
- scores = np.max(boxes[:, 4:], axis=1)
- boxes = boxes[scores > thr, :]
- scores = scores[scores > thr]
- if len(boxes) == 0: return []
-
- # Get the class with the highest confidence
- class_ids = np.argmax(boxes[:, 4:], axis=1)
- boxes = boxes[:, :4]
- input_shape = np.array([inputs["scale_factor"][0], inputs["scale_factor"][1], inputs["scale_factor"][0], inputs["scale_factor"][1]])
- boxes = np.multiply(boxes, input_shape, dtype=np.float32)
- boxes = xywh2xyxy(boxes)
-
- unique_class_ids = np.unique(class_ids)
- indices = []
- for class_id in unique_class_ids:
- class_indices = np.where(class_ids == class_id)[0]
- class_boxes = boxes[class_indices, :]
- class_scores = scores[class_indices]
- class_keep_boxes = iou_filter(class_boxes, class_scores, 0.2)
- indices.extend(class_indices[class_keep_boxes])
-
- return [{
- "type": self.label_list[class_ids[i]].lower(),
- "bbox": [float(t) for t in boxes[i].tolist()],
- "score": float(scores[i])
- } for i in indices]
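-
- # Output sketch: each detection is a dict shaped like
- #   {"type": "table", "bbox": [x0, y0, x1, y1], "score": 0.93}
- # with "type" taken from self.label_list (lowercased); the values shown are illustrative.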
-
- def __call__(self, image_list, thr=0.7, batch_size=16):
- res = []
- imgs = []
- for i in range(len(image_list)):
- if not isinstance(image_list[i], np.ndarray):
- imgs.append(np.array(image_list[i]))
- else: imgs.append(image_list[i])
-
- batch_loop_cnt = math.ceil(float(len(imgs)) / batch_size)
- for i in range(batch_loop_cnt):
- start_index = i * batch_size
- end_index = min((i + 1) * batch_size, len(imgs))
- batch_image_list = imgs[start_index:end_index]
- inputs = self.preprocess(batch_image_list)
- print("preprocess")
- for ins in inputs:
- bb = self.postprocess(self.ort_sess.run(None, {k:v for k,v in ins.items() if k in self.input_names})[0], ins, thr)
- res.append(bb)
-
- #seeit.save_results(image_list, res, self.label_list, threshold=thr)
-
- return res
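-
- # End-to-end usage sketch (labels, file name and threshold are illustrative):
- #   import numpy as np
- #   from PIL import Image
- #   rec = Recognizer(["text", "title", "table", "figure"], "layout")
- #   pages = [np.array(Image.open("page1.png"))]
- #   detections = rec(pages, thr=0.5, batch_size=16)
- #   # detections[0] -> list of {"type", "bbox", "score"} dicts for the first page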
-
-
-