| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465 |
- """
- TestContainers-based integration test configuration for Dify API.
-
- This module provides containerized test infrastructure using TestContainers library
- to spin up real database and service instances for integration testing. This approach
- ensures tests run against actual service implementations rather than mocks, providing
- more reliable and realistic test scenarios.
- """
-
- import logging
- import os
- from collections.abc import Generator
- from pathlib import Path
-
- import pytest
- from flask import Flask
- from flask.testing import FlaskClient
- from sqlalchemy import Engine, text
- from sqlalchemy.orm import Session
- from testcontainers.core.container import DockerContainer
- from testcontainers.core.waiting_utils import wait_for_logs
- from testcontainers.postgres import PostgresContainer
- from testcontainers.redis import RedisContainer
-
- from app_factory import create_app
- from extensions.ext_database import db
-
# Configure logging for test containers.
# NOTE: logging.basicConfig is a no-op when the root logger already has
# handlers (e.g. configured earlier by pytest), so this only takes effect
# when nothing else has configured logging first.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
-
-
class DifyTestContainers:
    """
    Manages all test containers required for Dify integration tests.

    This class provides a centralized way to manage multiple containers
    needed for comprehensive integration testing, including databases,
    caches, and search engines.
    """

    def __init__(self):
        """Initialize container management with default configurations."""
        self.postgres: PostgresContainer | None = None
        self.redis: RedisContainer | None = None
        self.dify_sandbox: DockerContainer | None = None
        self.dify_plugin_daemon: DockerContainer | None = None
        self._containers_started = False
        logger.info("DifyTestContainers initialized - ready to manage test containers")

    def _execute_sql_on_main_db(self, host, port, sql):
        """
        Execute a single SQL statement against the main PostgreSQL database.

        Connects with autocommit enabled (required for statements such as
        CREATE DATABASE / CREATE EXTENSION that cannot run inside a
        transaction) and always releases the cursor and connection, even
        when the statement fails.

        Args:
            host: Host IP of the PostgreSQL container.
            port: Mapped host port of the PostgreSQL container.
            sql: SQL statement to execute.

        Raises:
            Exception: Propagates any psycopg2 import, connect, or execute
                error; callers decide whether the failure is fatal.
        """
        # Imported lazily so this module can still be imported when the
        # optional driver is missing; a missing driver surfaces to callers
        # as an ImportError instead of a NameError.
        import psycopg2

        conn = psycopg2.connect(
            host=host,
            port=port,
            user=self.postgres.username,
            password=self.postgres.password,
            database=self.postgres.dbname,
        )
        try:
            conn.autocommit = True
            cursor = conn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
        finally:
            conn.close()

    def start_containers_with_env(self):
        """
        Start all required containers for integration testing.

        This method initializes and starts PostgreSQL, Redis
        containers with appropriate configurations for Dify testing. Containers
        are started in dependency order to ensure proper initialization.
        Connection details are exported through environment variables so the
        application config (re)built later picks them up.
        """
        if self._containers_started:
            logger.info("Containers already started - skipping container startup")
            return

        logger.info("Starting test containers for Dify integration tests...")

        # Start PostgreSQL container for main application database
        # PostgreSQL is used for storing user data, workflows, and application state
        logger.info("Initializing PostgreSQL container...")
        self.postgres = PostgresContainer(
            image="postgres:14-alpine",
        )
        self.postgres.start()
        db_host = self.postgres.get_container_host_ip()
        db_port = self.postgres.get_exposed_port(5432)
        os.environ["DB_HOST"] = db_host
        os.environ["DB_PORT"] = str(db_port)
        os.environ["DB_USERNAME"] = self.postgres.username
        os.environ["DB_PASSWORD"] = self.postgres.password
        os.environ["DB_DATABASE"] = self.postgres.dbname
        logger.info(
            "PostgreSQL container started successfully - Host: %s, Port: %s User: %s, Database: %s",
            db_host,
            db_port,
            self.postgres.username,
            self.postgres.dbname,
        )

        # Wait for PostgreSQL to be ready
        logger.info("Waiting for PostgreSQL to be ready to accept connections...")
        wait_for_logs(self.postgres, "is ready to accept connections", timeout=30)
        logger.info("PostgreSQL container is ready and accepting connections")

        # Install uuid-ossp extension for UUID generation.
        # Best-effort: a failure is logged but does not abort startup.
        logger.info("Installing uuid-ossp extension...")
        try:
            self._execute_sql_on_main_db(db_host, db_port, 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')
            logger.info("uuid-ossp extension installed successfully")
        except Exception as e:
            logger.warning("Failed to install uuid-ossp extension: %s", e)

        # Create plugin database for dify-plugin-daemon.
        # Best-effort: the database may already exist from a previous run.
        logger.info("Creating plugin database...")
        try:
            self._execute_sql_on_main_db(db_host, db_port, "CREATE DATABASE dify_plugin;")
            logger.info("Plugin database created successfully")
        except Exception as e:
            logger.warning("Failed to create plugin database: %s", e)

        # Set up storage environment variables (local filesystem via OpenDAL)
        os.environ["STORAGE_TYPE"] = "opendal"
        os.environ["OPENDAL_SCHEME"] = "fs"
        os.environ["OPENDAL_FS_ROOT"] = "storage"

        # Start Redis container for caching and session management
        # Redis is used for storing session data, cache entries, and temporary data
        logger.info("Initializing Redis container...")
        self.redis = RedisContainer(image="redis:6-alpine", port=6379)
        self.redis.start()
        redis_host = self.redis.get_container_host_ip()
        redis_port = self.redis.get_exposed_port(6379)
        os.environ["REDIS_HOST"] = redis_host
        os.environ["REDIS_PORT"] = str(redis_port)
        logger.info("Redis container started successfully - Host: %s, Port: %s", redis_host, redis_port)

        # Wait for Redis to be ready
        logger.info("Waiting for Redis to be ready to accept connections...")
        wait_for_logs(self.redis, "Ready to accept connections", timeout=30)
        logger.info("Redis container is ready and accepting connections")

        # Start Dify Sandbox container for code execution environment
        # Dify Sandbox provides a secure environment for executing user code
        logger.info("Initializing Dify Sandbox container...")
        self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:latest")
        self.dify_sandbox.with_exposed_ports(8194)
        self.dify_sandbox.env = {
            "API_KEY": "test_api_key",
        }
        self.dify_sandbox.start()
        sandbox_host = self.dify_sandbox.get_container_host_ip()
        sandbox_port = self.dify_sandbox.get_exposed_port(8194)
        os.environ["CODE_EXECUTION_ENDPOINT"] = f"http://{sandbox_host}:{sandbox_port}"
        os.environ["CODE_EXECUTION_API_KEY"] = "test_api_key"
        logger.info("Dify Sandbox container started successfully - Host: %s, Port: %s", sandbox_host, sandbox_port)

        # Wait for Dify Sandbox to be ready
        logger.info("Waiting for Dify Sandbox to be ready to accept connections...")
        wait_for_logs(self.dify_sandbox, "config init success", timeout=60)
        logger.info("Dify Sandbox container is ready and accepting connections")

        # Start Dify Plugin Daemon container for plugin management
        # Dify Plugin Daemon provides plugin lifecycle management and execution
        logger.info("Initializing Dify Plugin Daemon container...")
        self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.3.0-local")
        self.dify_plugin_daemon.with_exposed_ports(5002)
        self.dify_plugin_daemon.env = {
            "DB_HOST": db_host,
            "DB_PORT": str(db_port),
            "DB_USERNAME": self.postgres.username,
            "DB_PASSWORD": self.postgres.password,
            "DB_DATABASE": "dify_plugin",
            "REDIS_HOST": redis_host,
            "REDIS_PORT": str(redis_port),
            "REDIS_PASSWORD": "",
            "SERVER_PORT": "5002",
            "SERVER_KEY": "test_plugin_daemon_key",
            "MAX_PLUGIN_PACKAGE_SIZE": "52428800",
            "PPROF_ENABLED": "false",
            # NOTE(review): points the daemon at db_host:5001 for the inner
            # API — presumably the host running the Dify API; confirm this is
            # reachable from inside the daemon container.
            "DIFY_INNER_API_URL": f"http://{db_host}:5001",
            "DIFY_INNER_API_KEY": "test_inner_api_key",
            "PLUGIN_REMOTE_INSTALLING_HOST": "0.0.0.0",
            "PLUGIN_REMOTE_INSTALLING_PORT": "5003",
            "PLUGIN_WORKING_PATH": "/app/storage/cwd",
            "FORCE_VERIFYING_SIGNATURE": "false",
            "PYTHON_ENV_INIT_TIMEOUT": "120",
            "PLUGIN_MAX_EXECUTION_TIMEOUT": "600",
            "PLUGIN_STDIO_BUFFER_SIZE": "1024",
            "PLUGIN_STDIO_MAX_BUFFER_SIZE": "5242880",
            "PLUGIN_STORAGE_TYPE": "local",
            "PLUGIN_STORAGE_LOCAL_ROOT": "/app/storage",
            "PLUGIN_INSTALLED_PATH": "plugin",
            "PLUGIN_PACKAGE_CACHE_PATH": "plugin_packages",
            "PLUGIN_MEDIA_CACHE_PATH": "assets",
        }

        # The plugin daemon is optional: tests that do not exercise plugins
        # should still run when this container cannot start.
        try:
            self.dify_plugin_daemon.start()
            plugin_daemon_host = self.dify_plugin_daemon.get_container_host_ip()
            plugin_daemon_port = self.dify_plugin_daemon.get_exposed_port(5002)
            os.environ["PLUGIN_DAEMON_URL"] = f"http://{plugin_daemon_host}:{plugin_daemon_port}"
            os.environ["PLUGIN_DAEMON_KEY"] = "test_plugin_daemon_key"
            logger.info(
                "Dify Plugin Daemon container started successfully - Host: %s, Port: %s",
                plugin_daemon_host,
                plugin_daemon_port,
            )

            # Wait for Dify Plugin Daemon to be ready
            logger.info("Waiting for Dify Plugin Daemon to be ready to accept connections...")
            wait_for_logs(self.dify_plugin_daemon, "start plugin manager daemon", timeout=60)
            logger.info("Dify Plugin Daemon container is ready and accepting connections")
        except Exception as e:
            logger.warning("Failed to start Dify Plugin Daemon container: %s", e)
            logger.info("Continuing without plugin daemon - some tests may be limited")
            self.dify_plugin_daemon = None

        self._containers_started = True
        logger.info("All test containers started successfully")

    def stop_containers(self):
        """
        Stop and clean up all test containers.

        This method ensures proper cleanup of all containers to prevent
        resource leaks and conflicts between test runs. Failures to stop an
        individual container are logged but never raised, so teardown always
        completes.
        """
        if not self._containers_started:
            logger.info("No containers to stop - containers were not started")
            return

        logger.info("Stopping and cleaning up test containers...")
        containers = [self.redis, self.postgres, self.dify_sandbox, self.dify_plugin_daemon]
        for container in containers:
            if container:
                try:
                    container_name = container.image
                    logger.info("Stopping container: %s", container_name)
                    container.stop()
                    logger.info("Successfully stopped container: %s", container_name)
                except Exception as e:
                    # Log error but don't fail the test cleanup
                    logger.warning("Failed to stop container %s: %s", container, e)

        self._containers_started = False
        logger.info("All test containers stopped and cleaned up successfully")
-
-
# Global container manager instance.
# A single module-level instance is reused for the whole pytest session;
# `start_containers_with_env` is idempotent, so repeated fixture setup is safe.
_container_manager = DifyTestContainers()
-
-
- def _get_migration_dir() -> Path:
- conftest_dir = Path(__file__).parent
- return conftest_dir.parent.parent / "migrations"
-
-
def _get_engine_url(engine: Engine):
    """
    Render the engine's database URL as a plain string, password included.

    Falls back to ``str(engine.url)`` when ``render_as_string`` is not
    available. '%' is doubled so the result survives ConfigParser-style
    interpolation (e.g. in an alembic.ini ``sqlalchemy.url``).
    """
    try:
        rendered = engine.url.render_as_string(hide_password=False)
    except AttributeError:
        rendered = str(engine.url)
    return rendered.replace("%", "%%")
-
-
# DDL installing uuidv7()/uuidv7_boundary() SQL functions into the test
# database. Executed once, before the ORM schema is created, so tables whose
# defaults call uuidv7() can be created successfully.
_UUIDv7SQL = r"""
/* Main function to generate a uuidv7 value with millisecond precision */
CREATE FUNCTION uuidv7() RETURNS uuid
AS
$$
-- Replace the first 48 bits of a uuidv4 with the current
-- number of milliseconds since 1970-01-01 UTC
-- and set the "ver" field to 7 by setting additional bits
SELECT encode(
               set_bit(
                       set_bit(
                               overlay(uuid_send(gen_random_uuid()) placing
                                       substring(int8send((extract(epoch from clock_timestamp()) * 1000)::bigint) from
                                                 3)
                                       from 1 for 6),
                               52, 1),
                       53, 1), 'hex')::uuid;
$$ LANGUAGE SQL VOLATILE PARALLEL SAFE;

COMMENT ON FUNCTION uuidv7 IS
    'Generate a uuid-v7 value with a 48-bit timestamp (millisecond precision) and 74 bits of randomness';

CREATE FUNCTION uuidv7_boundary(timestamptz) RETURNS uuid
AS
$$
    /* uuid fields: version=0b0111, variant=0b10 */
SELECT encode(
               overlay('\x00000000000070008000000000000000'::bytea
                       placing substring(int8send(floor(extract(epoch from $1) * 1000)::bigint) from 3)
                       from 1 for 6),
               'hex')::uuid;
$$ LANGUAGE SQL STABLE STRICT PARALLEL SAFE;

COMMENT ON FUNCTION uuidv7_boundary(timestamptz) IS
    'Generate a non-random uuidv7 with the given timestamp (first 48 bits) and all random bits to 0.
As the smallest possible uuidv7 for that timestamp, it may be used as a boundary for partitions.';
"""
-
-
def _create_app_with_containers() -> Flask:
    """
    Create Flask application configured to use test containers.

    This function creates a Flask application instance that is configured
    to connect to the test containers instead of the default development
    or production databases.

    Must be called AFTER the containers have started, because the config is
    rebuilt from the environment variables they export.

    Returns:
        Flask: Configured Flask application for containerized testing
    """
    logger.info("Creating Flask application with test container configuration...")

    # Re-create the config after environment variables have been set
    from configs import dify_config

    # Force re-creation of config with new environment variables.
    # HACK: dify_config is a module-level singleton that already snapshotted
    # the environment at import time; clearing its __dict__ and re-running
    # __init__ makes it re-read the container connection settings in place,
    # so every module holding a reference to it sees the new values.
    dify_config.__dict__.clear()
    dify_config.__init__()

    # Create and configure the Flask application
    logger.info("Initializing Flask application...")
    app = create_app()
    logger.info("Flask application created successfully")

    # Initialize database schema
    logger.info("Creating database schema...")

    with app.app_context():
        # Install the uuidv7 SQL functions first: tables created by
        # create_all() may reference them in column defaults.
        with db.engine.connect() as conn, conn.begin():
            conn.execute(text(_UUIDv7SQL))
        db.create_all()
        # Alternative schema setup via alembic migrations, kept for reference;
        # the test suite currently uses db.create_all() instead.
        # migration_dir = _get_migration_dir()
        # alembic_config = Config()
        # alembic_config.config_file_name = str(migration_dir / "alembic.ini")
        # alembic_config.set_main_option("sqlalchemy.url", _get_engine_url(db.engine))
        # alembic_config.set_main_option("script_location", str(migration_dir))
        # alembic_command.upgrade(revision="head", config=alembic_config)
    logger.info("Database schema created successfully")

    logger.info("Flask application configured and ready for testing")
    return app
-
-
@pytest.fixture(scope="session")
def set_up_containers_and_env() -> Generator[DifyTestContainers, None, None]:
    """
    Session-scoped fixture to manage test containers.

    This fixture ensures containers are started once per test session
    and properly cleaned up when all tests are complete. This approach
    improves test performance by reusing containers across multiple tests.

    Starting the containers also exports their connection settings as
    environment variables, which `flask_app_with_containers` relies on.

    Yields:
        DifyTestContainers: Container manager instance
    """
    logger.info("=== Starting test session container management ===")
    _container_manager.start_containers_with_env()
    logger.info("Test containers ready for session")
    yield _container_manager
    # pytest resumes the generator here during session teardown.
    logger.info("=== Cleaning up test session containers ===")
    _container_manager.stop_containers()
    logger.info("Test session container cleanup completed")
-
-
@pytest.fixture(scope="session")
def flask_app_with_containers(set_up_containers_and_env) -> Flask:
    """
    Session-scoped Flask application wired to the test containers.

    The application is built once per session, after the container fixture
    has exported all connection settings to the environment.

    Args:
        set_up_containers_and_env: Session fixture that started the containers

    Returns:
        Flask: Configured Flask application
    """
    logger.info("=== Creating session-scoped Flask application ===")
    flask_app = _create_app_with_containers()
    logger.info("Session-scoped Flask application created successfully")
    return flask_app
-
-
@pytest.fixture
def flask_req_ctx_with_containers(flask_app_with_containers) -> Generator[None, None, None]:
    """
    Provide an active Flask request context for the duration of a test.

    Args:
        flask_app_with_containers: Flask application fixture

    Yields:
        None: The request context is pushed while the test runs and popped
            automatically afterwards.
    """
    logger.debug("Creating Flask request context...")
    request_ctx = flask_app_with_containers.test_request_context()
    with request_ctx:
        logger.debug("Flask request context active")
        yield
    logger.debug("Flask request context closed")
-
-
@pytest.fixture
def test_client_with_containers(flask_app_with_containers) -> Generator[FlaskClient, None, None]:
    """
    Provide a Flask test client bound to the containerized application.

    The client can issue HTTP requests against the app for integration tests.

    Args:
        flask_app_with_containers: Flask application fixture

    Yields:
        FlaskClient: Test client instance
    """
    logger.debug("Creating Flask test client...")
    client = flask_app_with_containers.test_client()
    # FlaskClient is its own context manager; entering it preserves the
    # request context until the block exits.
    with client:
        logger.debug("Flask test client ready")
        yield client
    logger.debug("Flask test client closed")
-
-
@pytest.fixture
def db_session_with_containers(flask_app_with_containers) -> Generator[Session, None, None]:
    """
    Provide a SQLAlchemy session connected to the test PostgreSQL container.

    The session lives inside an application context and is always closed
    after the test, even when the test fails.

    Args:
        flask_app_with_containers: Flask application fixture

    Yields:
        Session: Database session instance
    """
    logger.debug("Creating database session...")
    with flask_app_with_containers.app_context():
        db_session = db.session()
        logger.debug("Database session created and ready")
        try:
            yield db_session
        finally:
            db_session.close()
            logger.debug("Database session closed")
|