| from collections.abc import Mapping, Sequence | from collections.abc import Mapping, Sequence | ||||
| from typing import Any | from typing import Any | ||||
| import httpx | |||||
| from sqlalchemy import select | from sqlalchemy import select | ||||
| from constants import AUDIO_EXTENSIONS, DOCUMENT_EXTENSIONS, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS | from constants import AUDIO_EXTENSIONS, DOCUMENT_EXTENSIONS, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS | ||||
| file = File( | file = File( | ||||
| id=mapping.get("id"), | id=mapping.get("id"), | ||||
| filename=row.name, | filename=row.name, | ||||
| extension=row.extension, | |||||
| extension="." + row.extension, | |||||
| mime_type=row.mime_type, | mime_type=row.mime_type, | ||||
| tenant_id=tenant_id, | tenant_id=tenant_id, | ||||
| type=file_type, | type=file_type, | ||||
| url = mapping.get("url") | url = mapping.get("url") | ||||
| if not url: | if not url: | ||||
| raise ValueError("Invalid file url") | raise ValueError("Invalid file url") | ||||
| resp = ssrf_proxy.head(url, follow_redirects=True) | |||||
| resp.raise_for_status() | |||||
| # Try to extract filename from response headers or URL | |||||
| content_disposition = resp.headers.get("Content-Disposition") | |||||
| if content_disposition: | |||||
| filename = content_disposition.split("filename=")[-1].strip('"') | |||||
| resp = ssrf_proxy.head(url, follow_redirects=True) | |||||
| if resp.status_code == httpx.codes.OK: | |||||
| # Try to extract filename from response headers or URL | |||||
| content_disposition = resp.headers.get("Content-Disposition") | |||||
| if content_disposition: | |||||
| filename = content_disposition.split("filename=")[-1].strip('"') | |||||
| else: | |||||
| filename = url.split("/")[-1].split("?")[0] | |||||
| # Create the File object | |||||
| file_size = int(resp.headers.get("Content-Length", -1)) | |||||
| mime_type = str(resp.headers.get("Content-Type", "")) | |||||
| else: | else: | ||||
| filename = url.split("/")[-1].split("?")[0] | |||||
| filename = "" | |||||
| file_size = -1 | |||||
| mime_type = "" | |||||
| # If filename is empty, set a default one | # If filename is empty, set a default one | ||||
| if not filename: | if not filename: | ||||
| filename = "unknown_file" | filename = "unknown_file" | ||||
| # Determine file extension | # Determine file extension | ||||
| extension = "." + filename.split(".")[-1] if "." in filename else ".bin" | extension = "." + filename.split(".")[-1] if "." in filename else ".bin" | ||||
| # Create the File object | |||||
| file_size = int(resp.headers.get("Content-Length", -1)) | |||||
| mime_type = str(resp.headers.get("Content-Type", "")) | |||||
| if not mime_type: | if not mime_type: | ||||
| mime_type, _ = mimetypes.guess_type(url) | mime_type, _ = mimetypes.guess_type(url) | ||||
| file = File( | file = File( |