|
|
|
@@ -11,6 +11,7 @@ import docx |
|
|
|
import pandas as pd |
|
|
|
import pypandoc # type: ignore |
|
|
|
import pypdfium2 # type: ignore |
|
|
|
import webvtt # type: ignore |
|
|
|
import yaml # type: ignore |
|
|
|
from docx.document import Document |
|
|
|
from docx.oxml.table import CT_Tbl |
|
|
|
@@ -132,6 +133,8 @@ def _extract_text_by_mime_type(*, file_content: bytes, mime_type: str) -> str: |
|
|
|
return _extract_text_from_json(file_content) |
|
|
|
case "application/x-yaml" | "text/yaml": |
|
|
|
return _extract_text_from_yaml(file_content) |
|
|
|
case "text/vtt": |
|
|
|
return _extract_text_from_vtt(file_content) |
|
|
|
case _: |
|
|
|
raise UnsupportedFileTypeError(f"Unsupported MIME type: {mime_type}") |
|
|
|
|
|
|
|
@@ -139,7 +142,7 @@ def _extract_text_by_mime_type(*, file_content: bytes, mime_type: str) -> str: |
|
|
|
def _extract_text_by_file_extension(*, file_content: bytes, file_extension: str) -> str: |
|
|
|
"""Extract text from a file based on its file extension.""" |
|
|
|
match file_extension: |
|
|
|
case ".txt" | ".markdown" | ".md" | ".html" | ".htm" | ".xml" | ".vtt": |
|
|
|
case ".txt" | ".markdown" | ".md" | ".html" | ".htm" | ".xml": |
|
|
|
return _extract_text_from_plain_text(file_content) |
|
|
|
case ".json": |
|
|
|
return _extract_text_from_json(file_content) |
|
|
|
@@ -165,6 +168,8 @@ def _extract_text_by_file_extension(*, file_content: bytes, file_extension: str) |
|
|
|
return _extract_text_from_eml(file_content) |
|
|
|
case ".msg": |
|
|
|
return _extract_text_from_msg(file_content) |
|
|
|
case ".vtt": |
|
|
|
return _extract_text_from_vtt(file_content) |
|
|
|
case _: |
|
|
|
raise UnsupportedFileTypeError(f"Unsupported Extension Type: {file_extension}") |
|
|
|
|
|
|
|
@@ -462,3 +467,42 @@ def _extract_text_from_msg(file_content: bytes) -> str: |
|
|
|
return "\n".join([str(element) for element in elements]) |
|
|
|
except Exception as e: |
|
|
|
raise TextExtractionError(f"Failed to extract text from MSG: {str(e)}") from e |
|
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_vtt(vtt_bytes: bytes) -> str: |
|
|
|
text = _extract_text_from_plain_text(vtt_bytes) |
|
|
|
|
|
|
|
# remove bom |
|
|
|
text = text.lstrip("\ufeff") |
|
|
|
|
|
|
|
raw_results = [] |
|
|
|
for caption in webvtt.from_string(text): |
|
|
|
raw_results.append((caption.voice, caption.text)) |
|
|
|
|
|
|
|
# Merge consecutive utterances by the same speaker |
|
|
|
merged_results = [] |
|
|
|
if raw_results: |
|
|
|
current_speaker, current_text = raw_results[0] |
|
|
|
|
|
|
|
for i in range(1, len(raw_results)): |
|
|
|
spk, txt = raw_results[i] |
|
|
|
if spk == None: |
|
|
|
merged_results.append((None, current_text)) |
|
|
|
continue |
|
|
|
|
|
|
|
if spk == current_speaker: |
|
|
|
# If it is the same speaker, merge the utterances (joined by space) |
|
|
|
current_text += " " + txt |
|
|
|
else: |
|
|
|
# If the speaker changes, register the utterance so far and move on |
|
|
|
merged_results.append((current_speaker, current_text)) |
|
|
|
current_speaker, current_text = spk, txt |
|
|
|
|
|
|
|
# Add the last element |
|
|
|
merged_results.append((current_speaker, current_text)) |
|
|
|
else: |
|
|
|
merged_results = raw_results |
|
|
|
|
|
|
|
# Return the result in the specified format: Speaker "text" style |
|
|
|
formatted = [f'{spk or ""} "{txt}"' for spk, txt in merged_results] |
|
|
|
return "\n".join(formatted) |