| 
                        123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 | 
                        - """Integration tests for ClickZetta Volume Storage."""
 - 
 - import os
 - import tempfile
 - import unittest
 - 
 - import pytest
 - 
 - from extensions.storage.clickzetta_volume.clickzetta_volume_storage import (
 -     ClickZettaVolumeConfig,
 -     ClickZettaVolumeStorage,
 - )
 - 
 - 
 - class TestClickZettaVolumeStorage(unittest.TestCase):
 -     """Test cases for ClickZetta Volume Storage."""
 - 
 -     def setUp(self):
 -         """Set up test environment."""
 -         self.config = ClickZettaVolumeConfig(
 -             username=os.getenv("CLICKZETTA_USERNAME", "test_user"),
 -             password=os.getenv("CLICKZETTA_PASSWORD", "test_pass"),
 -             instance=os.getenv("CLICKZETTA_INSTANCE", "test_instance"),
 -             service=os.getenv("CLICKZETTA_SERVICE", "uat-api.clickzetta.com"),
 -             workspace=os.getenv("CLICKZETTA_WORKSPACE", "quick_start"),
 -             vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default_ap"),
 -             schema_name=os.getenv("CLICKZETTA_SCHEMA", "dify"),
 -             volume_type="table",
 -             table_prefix="test_dataset_",
 -         )
 - 
 -     @pytest.mark.skipif(not os.getenv("CLICKZETTA_USERNAME"), reason="ClickZetta credentials not provided")
 -     def test_user_volume_operations(self):
 -         """Test basic operations with User Volume."""
 -         config = self.config
 -         config.volume_type = "user"
 - 
 -         storage = ClickZettaVolumeStorage(config)
 - 
 -         # Test file operations
 -         test_filename = "test_file.txt"
 -         test_content = b"Hello, ClickZetta Volume!"
 - 
 -         # Save file
 -         storage.save(test_filename, test_content)
 - 
 -         # Check if file exists
 -         assert storage.exists(test_filename)
 - 
 -         # Load file
 -         loaded_content = storage.load_once(test_filename)
 -         assert loaded_content == test_content
 - 
 -         # Test streaming
 -         stream_content = b""
 -         for chunk in storage.load_stream(test_filename):
 -             stream_content += chunk
 -         assert stream_content == test_content
 - 
 -         # Test download
 -         with tempfile.NamedTemporaryFile() as temp_file:
 -             storage.download(test_filename, temp_file.name)
 -             with open(temp_file.name, "rb") as f:
 -                 downloaded_content = f.read()
 -             assert downloaded_content == test_content
 - 
 -         # Test scan
 -         files = storage.scan("", files=True, directories=False)
 -         assert test_filename in files
 - 
 -         # Delete file
 -         storage.delete(test_filename)
 -         assert not storage.exists(test_filename)
 - 
 -     @pytest.mark.skipif(not os.getenv("CLICKZETTA_USERNAME"), reason="ClickZetta credentials not provided")
 -     def test_table_volume_operations(self):
 -         """Test basic operations with Table Volume."""
 -         config = self.config
 -         config.volume_type = "table"
 - 
 -         storage = ClickZettaVolumeStorage(config)
 - 
 -         # Test file operations with dataset_id
 -         dataset_id = "12345"
 -         test_filename = f"{dataset_id}/test_file.txt"
 -         test_content = b"Hello, Table Volume!"
 - 
 -         # Save file
 -         storage.save(test_filename, test_content)
 - 
 -         # Check if file exists
 -         assert storage.exists(test_filename)
 - 
 -         # Load file
 -         loaded_content = storage.load_once(test_filename)
 -         assert loaded_content == test_content
 - 
 -         # Test scan for dataset
 -         files = storage.scan(dataset_id, files=True, directories=False)
 -         assert "test_file.txt" in files
 - 
 -         # Delete file
 -         storage.delete(test_filename)
 -         assert not storage.exists(test_filename)
 - 
 -     def test_config_validation(self):
 -         """Test configuration validation."""
 -         # Test missing required fields
 -         with pytest.raises(ValueError):
 -             ClickZettaVolumeConfig(
 -                 username="",  # Empty username should fail
 -                 password="pass",
 -                 instance="instance",
 -             )
 - 
 -         # Test invalid volume type
 -         with pytest.raises(ValueError):
 -             ClickZettaVolumeConfig(username="user", password="pass", instance="instance", volume_type="invalid_type")
 - 
 -         # Test external volume without volume_name
 -         with pytest.raises(ValueError):
 -             ClickZettaVolumeConfig(
 -                 username="user",
 -                 password="pass",
 -                 instance="instance",
 -                 volume_type="external",
 -                 # Missing volume_name
 -             )
 - 
 -     def test_volume_path_generation(self):
 -         """Test volume path generation for different types."""
 -         storage = ClickZettaVolumeStorage(self.config)
 - 
 -         # Test table volume path
 -         path = storage._get_volume_path("test.txt", "12345")
 -         assert path == "test_dataset_12345/test.txt"
 - 
 -         # Test path with existing dataset_id prefix
 -         path = storage._get_volume_path("12345/test.txt")
 -         assert path == "12345/test.txt"
 - 
 -         # Test user volume
 -         storage._config.volume_type = "user"
 -         path = storage._get_volume_path("test.txt")
 -         assert path == "test.txt"
 - 
 -     def test_sql_prefix_generation(self):
 -         """Test SQL prefix generation for different volume types."""
 -         storage = ClickZettaVolumeStorage(self.config)
 - 
 -         # Test table volume SQL prefix
 -         prefix = storage._get_volume_sql_prefix("12345")
 -         assert prefix == "TABLE VOLUME test_dataset_12345"
 - 
 -         # Test user volume SQL prefix
 -         storage._config.volume_type = "user"
 -         prefix = storage._get_volume_sql_prefix()
 -         assert prefix == "USER VOLUME"
 - 
 -         # Test external volume SQL prefix
 -         storage._config.volume_type = "external"
 -         storage._config.volume_name = "my_external_volume"
 -         prefix = storage._get_volume_sql_prefix()
 -         assert prefix == "VOLUME my_external_volume"
 - 
 - 
 - if __name__ == "__main__":
 -     unittest.main()
 
 
  |