| @@ -0,0 +1,913 @@ | |||
| import hashlib | |||
| from io import BytesIO | |||
| from unittest.mock import patch | |||
| import pytest | |||
| from faker import Faker | |||
| from werkzeug.exceptions import NotFound | |||
| from configs import dify_config | |||
| from models.account import Account, Tenant | |||
| from models.enums import CreatorUserRole | |||
| from models.model import EndUser, UploadFile | |||
| from services.errors.file import FileTooLargeError, UnsupportedFileTypeError | |||
| from services.file_service import FileService | |||
| class TestFileService: | |||
| """Integration tests for FileService using testcontainers.""" | |||
| @pytest.fixture | |||
| def mock_external_service_dependencies(self): | |||
| """Mock setup for external service dependencies.""" | |||
| with ( | |||
| patch("services.file_service.storage") as mock_storage, | |||
| patch("services.file_service.file_helpers") as mock_file_helpers, | |||
| patch("services.file_service.ExtractProcessor") as mock_extract_processor, | |||
| ): | |||
| # Setup default mock returns | |||
| mock_storage.save.return_value = None | |||
| mock_storage.load.return_value = BytesIO(b"mock file content") | |||
| mock_file_helpers.get_signed_file_url.return_value = "https://example.com/signed-url" | |||
| mock_file_helpers.verify_image_signature.return_value = True | |||
| mock_file_helpers.verify_file_signature.return_value = True | |||
| mock_extract_processor.load_from_upload_file.return_value = "extracted text content" | |||
| yield { | |||
| "storage": mock_storage, | |||
| "file_helpers": mock_file_helpers, | |||
| "extract_processor": mock_extract_processor, | |||
| } | |||
| def _create_test_account(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Helper method to create a test account for testing. | |||
| Args: | |||
| db_session_with_containers: Database session from testcontainers infrastructure | |||
| mock_external_service_dependencies: Mock dependencies | |||
| Returns: | |||
| Account: Created account instance | |||
| """ | |||
| fake = Faker() | |||
| # Create account | |||
| account = Account( | |||
| email=fake.email(), | |||
| name=fake.name(), | |||
| interface_language="en-US", | |||
| status="active", | |||
| ) | |||
| from extensions.ext_database import db | |||
| db.session.add(account) | |||
| db.session.commit() | |||
| # Create tenant for the account | |||
| tenant = Tenant( | |||
| name=fake.company(), | |||
| status="normal", | |||
| ) | |||
| db.session.add(tenant) | |||
| db.session.commit() | |||
| # Create tenant-account join | |||
| from models.account import TenantAccountJoin, TenantAccountRole | |||
| join = TenantAccountJoin( | |||
| tenant_id=tenant.id, | |||
| account_id=account.id, | |||
| role=TenantAccountRole.OWNER.value, | |||
| current=True, | |||
| ) | |||
| db.session.add(join) | |||
| db.session.commit() | |||
| # Set current tenant for account | |||
| account.current_tenant = tenant | |||
| return account | |||
| def _create_test_end_user(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Helper method to create a test end user for testing. | |||
| Args: | |||
| db_session_with_containers: Database session from testcontainers infrastructure | |||
| mock_external_service_dependencies: Mock dependencies | |||
| Returns: | |||
| EndUser: Created end user instance | |||
| """ | |||
| fake = Faker() | |||
| end_user = EndUser( | |||
| tenant_id=str(fake.uuid4()), | |||
| type="web", | |||
| name=fake.name(), | |||
| is_anonymous=False, | |||
| session_id=fake.uuid4(), | |||
| ) | |||
| from extensions.ext_database import db | |||
| db.session.add(end_user) | |||
| db.session.commit() | |||
| return end_user | |||
| def _create_test_upload_file(self, db_session_with_containers, mock_external_service_dependencies, account): | |||
| """ | |||
| Helper method to create a test upload file for testing. | |||
| Args: | |||
| db_session_with_containers: Database session from testcontainers infrastructure | |||
| mock_external_service_dependencies: Mock dependencies | |||
| account: Account instance | |||
| Returns: | |||
| UploadFile: Created upload file instance | |||
| """ | |||
| fake = Faker() | |||
| upload_file = UploadFile( | |||
| tenant_id=account.current_tenant_id if hasattr(account, "current_tenant_id") else str(fake.uuid4()), | |||
| storage_type="local", | |||
| key=f"upload_files/test/{fake.uuid4()}.txt", | |||
| name="test_file.txt", | |||
| size=1024, | |||
| extension="txt", | |||
| mime_type="text/plain", | |||
| created_by_role=CreatorUserRole.ACCOUNT, | |||
| created_by=account.id, | |||
| created_at=fake.date_time(), | |||
| used=False, | |||
| hash=hashlib.sha3_256(b"test content").hexdigest(), | |||
| source_url="", | |||
| ) | |||
| from extensions.ext_database import db | |||
| db.session.add(upload_file) | |||
| db.session.commit() | |||
| return upload_file | |||
| # Test upload_file method | |||
| def test_upload_file_success(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test successful file upload with valid parameters. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test_document.pdf" | |||
| content = b"test file content" | |||
| mimetype = "application/pdf" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| assert upload_file is not None | |||
| assert upload_file.name == filename | |||
| assert upload_file.size == len(content) | |||
| assert upload_file.extension == "pdf" | |||
| assert upload_file.mime_type == mimetype | |||
| assert upload_file.created_by == account.id | |||
| assert upload_file.created_by_role == CreatorUserRole.ACCOUNT.value | |||
| assert upload_file.used is False | |||
| assert upload_file.hash == hashlib.sha3_256(content).hexdigest() | |||
| # Verify storage was called | |||
| mock_external_service_dependencies["storage"].save.assert_called_once() | |||
| # Verify database state | |||
| from extensions.ext_database import db | |||
| db.session.refresh(upload_file) | |||
| assert upload_file.id is not None | |||
| def test_upload_file_with_end_user(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file upload with end user instead of account. | |||
| """ | |||
| fake = Faker() | |||
| end_user = self._create_test_end_user(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test_image.jpg" | |||
| content = b"test image content" | |||
| mimetype = "image/jpeg" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=end_user, | |||
| ) | |||
| assert upload_file is not None | |||
| assert upload_file.created_by == end_user.id | |||
| assert upload_file.created_by_role == CreatorUserRole.END_USER.value | |||
| def test_upload_file_with_datasets_source(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file upload with datasets source parameter. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test_document.pdf" | |||
| content = b"test file content" | |||
| mimetype = "application/pdf" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| source="datasets", | |||
| source_url="https://example.com/source", | |||
| ) | |||
| assert upload_file is not None | |||
| assert upload_file.source_url == "https://example.com/source" | |||
| def test_upload_file_invalid_filename_characters( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file upload with invalid filename characters. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test/file<name>.txt" | |||
| content = b"test content" | |||
| mimetype = "text/plain" | |||
| with pytest.raises(ValueError, match="Filename contains invalid characters"): | |||
| FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| def test_upload_file_filename_too_long(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file upload with filename that exceeds length limit. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| # Create a filename longer than 200 characters | |||
| long_name = "a" * 250 | |||
| filename = f"{long_name}.txt" | |||
| content = b"test content" | |||
| mimetype = "text/plain" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| # Verify filename was truncated (the logic truncates the base name to 200 chars + extension) | |||
| # So the total length should be <= 200 + len(extension) + 1 (for the dot) | |||
| assert len(upload_file.name) <= 200 + len(upload_file.extension) + 1 | |||
| assert upload_file.name.endswith(".txt") | |||
| # Verify the base name was truncated | |||
| base_name = upload_file.name[:-4] # Remove .txt | |||
| assert len(base_name) <= 200 | |||
| def test_upload_file_datasets_unsupported_type( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file upload for datasets with unsupported file type. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test_image.jpg" | |||
| content = b"test content" | |||
| mimetype = "image/jpeg" | |||
| with pytest.raises(UnsupportedFileTypeError): | |||
| FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| source="datasets", | |||
| ) | |||
| def test_upload_file_too_large(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file upload with file size exceeding limit. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "large_image.jpg" | |||
| # Create content larger than the limit | |||
| content = b"x" * (dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT * 1024 * 1024 + 1) | |||
| mimetype = "image/jpeg" | |||
| with pytest.raises(FileTooLargeError): | |||
| FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| # Test is_file_size_within_limit method | |||
| def test_is_file_size_within_limit_image_success( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file size check for image files within limit. | |||
| """ | |||
| extension = "jpg" | |||
| file_size = dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT * 1024 * 1024 # Exactly at limit | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is True | |||
| def test_is_file_size_within_limit_video_success( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file size check for video files within limit. | |||
| """ | |||
| extension = "mp4" | |||
| file_size = dify_config.UPLOAD_VIDEO_FILE_SIZE_LIMIT * 1024 * 1024 # Exactly at limit | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is True | |||
| def test_is_file_size_within_limit_audio_success( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file size check for audio files within limit. | |||
| """ | |||
| extension = "mp3" | |||
| file_size = dify_config.UPLOAD_AUDIO_FILE_SIZE_LIMIT * 1024 * 1024 # Exactly at limit | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is True | |||
| def test_is_file_size_within_limit_document_success( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file size check for document files within limit. | |||
| """ | |||
| extension = "pdf" | |||
| file_size = dify_config.UPLOAD_FILE_SIZE_LIMIT * 1024 * 1024 # Exactly at limit | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is True | |||
| def test_is_file_size_within_limit_image_exceeded( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file size check for image files exceeding limit. | |||
| """ | |||
| extension = "jpg" | |||
| file_size = dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT * 1024 * 1024 + 1 # Exceeds limit | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is False | |||
| def test_is_file_size_within_limit_unknown_extension( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file size check for unknown file extension. | |||
| """ | |||
| extension = "xyz" | |||
| file_size = dify_config.UPLOAD_FILE_SIZE_LIMIT * 1024 * 1024 # Uses default limit | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is True | |||
| # Test upload_text method | |||
| def test_upload_text_success(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test successful text upload. | |||
| """ | |||
| fake = Faker() | |||
| text = "This is a test text content" | |||
| text_name = "test_text.txt" | |||
| # Mock current_user | |||
| with patch("services.file_service.current_user") as mock_current_user: | |||
| mock_current_user.current_tenant_id = str(fake.uuid4()) | |||
| mock_current_user.id = str(fake.uuid4()) | |||
| upload_file = FileService.upload_text(text=text, text_name=text_name) | |||
| assert upload_file is not None | |||
| assert upload_file.name == text_name | |||
| assert upload_file.size == len(text) | |||
| assert upload_file.extension == "txt" | |||
| assert upload_file.mime_type == "text/plain" | |||
| assert upload_file.used is True | |||
| assert upload_file.used_by == mock_current_user.id | |||
| # Verify storage was called | |||
| mock_external_service_dependencies["storage"].save.assert_called_once() | |||
| def test_upload_text_name_too_long(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test text upload with name that exceeds length limit. | |||
| """ | |||
| fake = Faker() | |||
| text = "test content" | |||
| long_name = "a" * 250 # Longer than 200 characters | |||
| # Mock current_user | |||
| with patch("services.file_service.current_user") as mock_current_user: | |||
| mock_current_user.current_tenant_id = str(fake.uuid4()) | |||
| mock_current_user.id = str(fake.uuid4()) | |||
| upload_file = FileService.upload_text(text=text, text_name=long_name) | |||
| # Verify name was truncated | |||
| assert len(upload_file.name) <= 200 | |||
| assert upload_file.name == "a" * 200 | |||
| # Test get_file_preview method | |||
| def test_get_file_preview_success(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test successful file preview generation. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have document extension | |||
| upload_file.extension = "pdf" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| result = FileService.get_file_preview(file_id=upload_file.id) | |||
| assert result == "extracted text content" | |||
| mock_external_service_dependencies["extract_processor"].load_from_upload_file.assert_called_once() | |||
| def test_get_file_preview_file_not_found(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file preview with non-existent file. | |||
| """ | |||
| fake = Faker() | |||
| non_existent_id = str(fake.uuid4()) | |||
| with pytest.raises(NotFound, match="File not found"): | |||
| FileService.get_file_preview(file_id=non_existent_id) | |||
| def test_get_file_preview_unsupported_file_type( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file preview with unsupported file type. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have non-document extension | |||
| upload_file.extension = "jpg" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| with pytest.raises(UnsupportedFileTypeError): | |||
| FileService.get_file_preview(file_id=upload_file.id) | |||
| def test_get_file_preview_text_truncation(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file preview with text that exceeds preview limit. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have document extension | |||
| upload_file.extension = "pdf" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| # Mock long text content | |||
| long_text = "x" * 5000 # Longer than PREVIEW_WORDS_LIMIT | |||
| mock_external_service_dependencies["extract_processor"].load_from_upload_file.return_value = long_text | |||
| result = FileService.get_file_preview(file_id=upload_file.id) | |||
| assert len(result) == 3000 # PREVIEW_WORDS_LIMIT | |||
| assert result == "x" * 3000 | |||
| # Test get_image_preview method | |||
| def test_get_image_preview_success(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test successful image preview generation. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have image extension | |||
| upload_file.extension = "jpg" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "test_signature" | |||
| generator, mime_type = FileService.get_image_preview( | |||
| file_id=upload_file.id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| assert generator is not None | |||
| assert mime_type == upload_file.mime_type | |||
| mock_external_service_dependencies["file_helpers"].verify_image_signature.assert_called_once() | |||
| def test_get_image_preview_invalid_signature(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test image preview with invalid signature. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Mock invalid signature | |||
| mock_external_service_dependencies["file_helpers"].verify_image_signature.return_value = False | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "invalid_signature" | |||
| with pytest.raises(NotFound, match="File not found or signature is invalid"): | |||
| FileService.get_image_preview( | |||
| file_id=upload_file.id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| def test_get_image_preview_file_not_found(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test image preview with non-existent file. | |||
| """ | |||
| fake = Faker() | |||
| non_existent_id = str(fake.uuid4()) | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "test_signature" | |||
| with pytest.raises(NotFound, match="File not found or signature is invalid"): | |||
| FileService.get_image_preview( | |||
| file_id=non_existent_id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| def test_get_image_preview_unsupported_file_type( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test image preview with non-image file type. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have non-image extension | |||
| upload_file.extension = "pdf" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "test_signature" | |||
| with pytest.raises(UnsupportedFileTypeError): | |||
| FileService.get_image_preview( | |||
| file_id=upload_file.id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| # Test get_file_generator_by_file_id method | |||
| def test_get_file_generator_by_file_id_success( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test successful file generator retrieval. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "test_signature" | |||
| generator, file_obj = FileService.get_file_generator_by_file_id( | |||
| file_id=upload_file.id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| assert generator is not None | |||
| assert file_obj == upload_file | |||
| mock_external_service_dependencies["file_helpers"].verify_file_signature.assert_called_once() | |||
| def test_get_file_generator_by_file_id_invalid_signature( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file generator retrieval with invalid signature. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Mock invalid signature | |||
| mock_external_service_dependencies["file_helpers"].verify_file_signature.return_value = False | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "invalid_signature" | |||
| with pytest.raises(NotFound, match="File not found or signature is invalid"): | |||
| FileService.get_file_generator_by_file_id( | |||
| file_id=upload_file.id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| def test_get_file_generator_by_file_id_file_not_found( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file generator retrieval with non-existent file. | |||
| """ | |||
| fake = Faker() | |||
| non_existent_id = str(fake.uuid4()) | |||
| timestamp = "1234567890" | |||
| nonce = "test_nonce" | |||
| sign = "test_signature" | |||
| with pytest.raises(NotFound, match="File not found or signature is invalid"): | |||
| FileService.get_file_generator_by_file_id( | |||
| file_id=non_existent_id, | |||
| timestamp=timestamp, | |||
| nonce=nonce, | |||
| sign=sign, | |||
| ) | |||
| # Test get_public_image_preview method | |||
| def test_get_public_image_preview_success(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test successful public image preview generation. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have image extension | |||
| upload_file.extension = "jpg" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| generator, mime_type = FileService.get_public_image_preview(file_id=upload_file.id) | |||
| assert generator is not None | |||
| assert mime_type == upload_file.mime_type | |||
| mock_external_service_dependencies["storage"].load.assert_called_once() | |||
| def test_get_public_image_preview_file_not_found( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test public image preview with non-existent file. | |||
| """ | |||
| fake = Faker() | |||
| non_existent_id = str(fake.uuid4()) | |||
| with pytest.raises(NotFound, match="File not found or signature is invalid"): | |||
| FileService.get_public_image_preview(file_id=non_existent_id) | |||
| def test_get_public_image_preview_unsupported_file_type( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test public image preview with non-image file type. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| upload_file = self._create_test_upload_file( | |||
| db_session_with_containers, mock_external_service_dependencies, account | |||
| ) | |||
| # Update file to have non-image extension | |||
| upload_file.extension = "pdf" | |||
| from extensions.ext_database import db | |||
| db.session.commit() | |||
| with pytest.raises(UnsupportedFileTypeError): | |||
| FileService.get_public_image_preview(file_id=upload_file.id) | |||
| # Test edge cases and boundary conditions | |||
| def test_upload_file_empty_content(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file upload with empty content. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "empty.txt" | |||
| content = b"" | |||
| mimetype = "text/plain" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| assert upload_file is not None | |||
| assert upload_file.size == 0 | |||
| def test_upload_file_special_characters_in_name( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file upload with special characters in filename (but valid ones). | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test-file_with_underscores_and.dots.txt" | |||
| content = b"test content" | |||
| mimetype = "text/plain" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| assert upload_file is not None | |||
| assert upload_file.name == filename | |||
| def test_upload_file_different_case_extensions( | |||
| self, db_session_with_containers, mock_external_service_dependencies | |||
| ): | |||
| """ | |||
| Test file upload with different case extensions. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test.PDF" | |||
| content = b"test content" | |||
| mimetype = "application/pdf" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| ) | |||
| assert upload_file is not None | |||
| assert upload_file.extension == "pdf" # Should be converted to lowercase | |||
| def test_upload_text_empty_text(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test text upload with empty text. | |||
| """ | |||
| fake = Faker() | |||
| text = "" | |||
| text_name = "empty.txt" | |||
| # Mock current_user | |||
| with patch("services.file_service.current_user") as mock_current_user: | |||
| mock_current_user.current_tenant_id = str(fake.uuid4()) | |||
| mock_current_user.id = str(fake.uuid4()) | |||
| upload_file = FileService.upload_text(text=text, text_name=text_name) | |||
| assert upload_file is not None | |||
| assert upload_file.size == 0 | |||
| def test_file_size_limits_edge_cases(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file size limits with edge case values. | |||
| """ | |||
| # Test exactly at limit | |||
| for extension, limit_config in [ | |||
| ("jpg", dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT), | |||
| ("mp4", dify_config.UPLOAD_VIDEO_FILE_SIZE_LIMIT), | |||
| ("mp3", dify_config.UPLOAD_AUDIO_FILE_SIZE_LIMIT), | |||
| ("pdf", dify_config.UPLOAD_FILE_SIZE_LIMIT), | |||
| ]: | |||
| file_size = limit_config * 1024 * 1024 | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is True | |||
| # Test one byte over limit | |||
| file_size = limit_config * 1024 * 1024 + 1 | |||
| result = FileService.is_file_size_within_limit(extension=extension, file_size=file_size) | |||
| assert result is False | |||
| def test_upload_file_with_source_url(self, db_session_with_containers, mock_external_service_dependencies): | |||
| """ | |||
| Test file upload with source URL that gets overridden by signed URL. | |||
| """ | |||
| fake = Faker() | |||
| account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) | |||
| filename = "test.pdf" | |||
| content = b"test content" | |||
| mimetype = "application/pdf" | |||
| source_url = "https://original-source.com/file.pdf" | |||
| upload_file = FileService.upload_file( | |||
| filename=filename, | |||
| content=content, | |||
| mimetype=mimetype, | |||
| user=account, | |||
| source_url=source_url, | |||
| ) | |||
| # When source_url is provided, it should be preserved | |||
| assert upload_file.source_url == source_url | |||
| # The signed URL should only be set when source_url is empty | |||
| # Let's test that scenario | |||
| upload_file2 = FileService.upload_file( | |||
| filename="test2.pdf", | |||
| content=b"test content 2", | |||
| mimetype="application/pdf", | |||
| user=account, | |||
| source_url="", # Empty source_url | |||
| ) | |||
| # Should have the signed URL when source_url is empty | |||
| assert upload_file2.source_url == "https://example.com/signed-url" | |||