"""Integration tests for ClickZetta Volume Storage."""
-
- import os
- import tempfile
- import unittest
-
- import pytest
-
- from extensions.storage.clickzetta_volume.clickzetta_volume_storage import (
- ClickZettaVolumeConfig,
- ClickZettaVolumeStorage,
- )
-
-
class TestClickZettaVolumeStorage(unittest.TestCase):
    """Test cases for ClickZetta Volume Storage.

    ``test_user_volume_operations`` and ``test_table_volume_operations`` save,
    read and delete real files through ``ClickZettaVolumeStorage`` and are
    skipped unless the ``CLICKZETTA_USERNAME`` environment variable is set.
    The skip uses ``unittest.skipUnless`` so that it is honored both by pytest
    and by the ``unittest.main()`` entry point at the bottom of this module.

    NOTE(review): ``test_volume_path_generation`` and
    ``test_sql_prefix_generation`` build a ``ClickZettaVolumeStorage`` without
    the credentials guard. If the constructor opens a connection they need the
    same guard -- confirm against the storage implementation.
    """

    def setUp(self):
        """Build a fresh config for each test from ``CLICKZETTA_*`` env vars.

        Every connection field falls back to a placeholder value when its
        environment variable is unset; ``volume_type`` and ``table_prefix``
        are fixed to ``"table"`` and ``"test_dataset_"``.
        """
        self.config = ClickZettaVolumeConfig(
            username=os.getenv("CLICKZETTA_USERNAME", "test_user"),
            password=os.getenv("CLICKZETTA_PASSWORD", "test_pass"),
            instance=os.getenv("CLICKZETTA_INSTANCE", "test_instance"),
            service=os.getenv("CLICKZETTA_SERVICE", "uat-api.clickzetta.com"),
            workspace=os.getenv("CLICKZETTA_WORKSPACE", "quick_start"),
            vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default_ap"),
            schema_name=os.getenv("CLICKZETTA_SCHEMA", "dify"),
            volume_type="table",
            table_prefix="test_dataset_",
        )

    @unittest.skipUnless(os.getenv("CLICKZETTA_USERNAME"), "ClickZetta credentials not provided")
    def test_user_volume_operations(self):
        """Round-trip a file through a User Volume: save, read, stream, download, scan, delete."""
        config = self.config
        config.volume_type = "user"

        storage = ClickZettaVolumeStorage(config)

        test_filename = "test_file.txt"
        test_content = b"Hello, ClickZetta Volume!"

        storage.save(test_filename, test_content)
        try:
            # Check if file exists
            assert storage.exists(test_filename)

            # Load file in one call
            loaded_content = storage.load_once(test_filename)
            assert loaded_content == test_content

            # Streaming must yield the same bytes once the chunks are joined
            stream_content = b"".join(storage.load_stream(test_filename))
            assert stream_content == test_content

            # Download into a path inside a temporary directory rather than
            # into an already-open NamedTemporaryFile: reopening an open
            # temporary file by name is not portable across platforms.
            with tempfile.TemporaryDirectory() as temp_dir:
                target_path = os.path.join(temp_dir, "downloaded.txt")
                storage.download(test_filename, target_path)
                with open(target_path, "rb") as f:
                    downloaded_content = f.read()
            assert downloaded_content == test_content

            # Scan the volume root for files only
            files = storage.scan("", files=True, directories=False)
            assert test_filename in files
        finally:
            # Always remove the remote file so a failed assertion above does
            # not leave test data behind in the volume.
            storage.delete(test_filename)

        assert not storage.exists(test_filename)

    @unittest.skipUnless(os.getenv("CLICKZETTA_USERNAME"), "ClickZetta credentials not provided")
    def test_table_volume_operations(self):
        """Round-trip a file through a Table Volume using a ``<dataset_id>/<name>`` filename."""
        config = self.config
        config.volume_type = "table"

        storage = ClickZettaVolumeStorage(config)

        # The filename carries the dataset id as its first path segment
        dataset_id = "12345"
        test_filename = f"{dataset_id}/test_file.txt"
        test_content = b"Hello, Table Volume!"

        storage.save(test_filename, test_content)
        try:
            # Check if file exists
            assert storage.exists(test_filename)

            # Load file
            loaded_content = storage.load_once(test_filename)
            assert loaded_content == test_content

            # Scanning the dataset is expected to list the bare file name
            files = storage.scan(dataset_id, files=True, directories=False)
            assert "test_file.txt" in files
        finally:
            # Always remove the remote file, even when an assertion failed
            storage.delete(test_filename)

        assert not storage.exists(test_filename)

    def test_config_validation(self):
        """Check that invalid configurations are rejected with ``ValueError``."""
        # Test missing required fields
        with pytest.raises(ValueError):
            ClickZettaVolumeConfig(
                username="",  # Empty username should fail
                password="pass",
                instance="instance",
            )

        # Test invalid volume type
        with pytest.raises(ValueError):
            ClickZettaVolumeConfig(username="user", password="pass", instance="instance", volume_type="invalid_type")

        # Test external volume without volume_name
        with pytest.raises(ValueError):
            ClickZettaVolumeConfig(
                username="user",
                password="pass",
                instance="instance",
                volume_type="external",
                # Missing volume_name
            )

    def test_volume_path_generation(self):
        """Check the paths returned by ``_get_volume_path`` for table and user volumes."""
        storage = ClickZettaVolumeStorage(self.config)

        # Table volume with an explicit dataset id: the configured table
        # prefix and the dataset id form the leading path segment
        path = storage._get_volume_path("test.txt", "12345")
        assert path == "test_dataset_12345/test.txt"

        # Filename that already carries a dataset id segment is returned as-is
        path = storage._get_volume_path("12345/test.txt")
        assert path == "12345/test.txt"

        # User volume: the filename is returned unchanged
        storage._config.volume_type = "user"
        path = storage._get_volume_path("test.txt")
        assert path == "test.txt"

    def test_sql_prefix_generation(self):
        """Check the SQL prefix returned by ``_get_volume_sql_prefix`` for each volume type."""
        storage = ClickZettaVolumeStorage(self.config)

        # Test table volume SQL prefix
        prefix = storage._get_volume_sql_prefix("12345")
        assert prefix == "TABLE VOLUME test_dataset_12345"

        # Test user volume SQL prefix
        storage._config.volume_type = "user"
        prefix = storage._get_volume_sql_prefix()
        assert prefix == "USER VOLUME"

        # Test external volume SQL prefix
        storage._config.volume_type = "external"
        storage._config.volume_name = "my_external_volume"
        prefix = storage._get_volume_sql_prefix()
        assert prefix == "VOLUME my_external_volume"
-
-
# Allow running this module directly with the standard-library unittest runner.
if __name__ == "__main__":
    unittest.main()
|