Ver código fonte

refactor(resolver): enhance schema reference resolution with improved error handling and caching

tags/2.0.0-beta.1
Harry 2 meses atrás
pai
commit
46019ea927

+ 9
- 1
api/core/plugin/impl/datasource.py Ver arquivo

@@ -16,6 +16,7 @@ from core.plugin.entities.plugin_daemon import (
PluginDatasourceProviderEntity,
)
from core.plugin.impl.base import BasePluginClient
from core.schemas.resolver import resolve_dify_schema_refs
from services.tools.tools_transform_service import ToolTransformService


@@ -32,6 +33,9 @@ class PluginDatasourceManager(BasePluginClient):
provider_name = declaration.get("identity", {}).get("name")
for datasource in declaration.get("datasources", []):
datasource["identity"]["provider"] = provider_name
# resolve refs
if datasource.get("output_schema"):
datasource["output_schema"] = resolve_dify_schema_refs(datasource["output_schema"])

return json_response

@@ -69,6 +73,9 @@ class PluginDatasourceManager(BasePluginClient):
provider_name = declaration.get("identity", {}).get("name")
for datasource in declaration.get("datasources", []):
datasource["identity"]["provider"] = provider_name
# resolve refs
if datasource.get("output_schema"):
datasource["output_schema"] = resolve_dify_schema_refs(datasource["output_schema"])

return json_response

@@ -106,7 +113,8 @@ class PluginDatasourceManager(BasePluginClient):
if data:
for datasource in data.get("declaration", {}).get("datasources", []):
datasource["identity"]["provider"] = tool_provider_id.provider_name

if datasource.get("output_schema"):
datasource["output_schema"] = resolve_dify_schema_refs(datasource["output_schema"])
return json_response

response = self._request_with_plugin_daemon_response(

+ 2
- 12
api/core/schemas/registry.py Ver arquivo

@@ -100,18 +100,8 @@ class SchemaRegistry:

def _parse_uri(self, uri: str) -> tuple[str, str]:
"""Parses a schema URI to extract version and schema name"""
import re
pattern = r"^https://dify\.ai/schemas/(v\d+)/(.+)\.json$"
match = re.match(pattern, uri)
if not match:
return "", ""
version = match.group(1)
schema_name = match.group(2)
return version, schema_name
from core.schemas.resolver import parse_dify_schema_uri
return parse_dify_schema_uri(uri)

def list_versions(self) -> list[str]:
"""Returns all available versions"""

+ 372
- 171
api/core/schemas/resolver.py Ver arquivo

@@ -1,207 +1,408 @@
import logging
import re
import threading
from collections import deque
from typing import Any, Optional
from dataclasses import dataclass
from typing import Any, Optional, Union

from core.schemas.registry import SchemaRegistry


def resolve_dify_schema_refs(schema: Any, registry: Optional[SchemaRegistry] = None, max_depth: int = 10) -> Any:
logger = logging.getLogger(__name__)

# Type aliases for better clarity
SchemaType = Union[dict[str, Any], list[Any], str, int, float, bool, None]
SchemaDict = dict[str, Any]

# Pre-compiled pattern for better performance
_DIFY_SCHEMA_PATTERN = re.compile(r"^https://dify\.ai/schemas/(v\d+)/(.+)\.json$")


class SchemaResolutionError(Exception):
    """Common base class for every error raised during schema resolution."""


class CircularReferenceError(SchemaResolutionError):
    """Error describing a circular schema reference.

    Attributes:
        ref_uri: The reference URI that closed the cycle.
        ref_path: The chain of reference URIs leading to ``ref_uri``.
    """

    def __init__(self, ref_uri: str, ref_path: list[str]):
        chain = " -> ".join(ref_path)
        message = f"Circular reference detected: {ref_uri} in path {chain}"
        super().__init__(message)
        self.ref_uri = ref_uri
        self.ref_path = ref_path


class MaxDepthExceededError(SchemaResolutionError):
    """Error raised once resolution goes deeper than the configured limit.

    Attributes:
        max_depth: The depth limit that was exceeded.
    """

    def __init__(self, max_depth: int):
        message = f"Maximum resolution depth ({max_depth}) exceeded"
        super().__init__(message)
        self.max_depth = max_depth


class SchemaNotFoundError(SchemaResolutionError):
    """Error for a reference URI that has no matching schema.

    Attributes:
        ref_uri: The reference URI that could not be found.
    """

    def __init__(self, ref_uri: str):
        message = f"Schema not found: {ref_uri}"
        super().__init__(message)
        self.ref_uri = ref_uri


@dataclass
class QueueItem:
    """One unit of work in the breadth-first traversal of a schema."""

    # Node to inspect (only dicts and lists are processed further).
    current: Any
    # Container holding ``current``; None when ``current`` is the root.
    parent: Optional[Any]
    # Dict key or list index of ``current`` inside ``parent``; None for the root.
    key: Optional[Union[str, int]]
    # Distance from the root, compared against the resolver's max depth.
    depth: int
    # Reference URIs already expanded on the way to this node (cycle detection).
    ref_path: set[str]


class SchemaResolver:
    """Resolve Dify ``$ref`` references inside a schema, backed by a shared cache.

    Schemas fetched from the registry are stripped of metadata fields and
    stored in a class-level cache, so every resolver instance shares them.
    Cached entries are never handed out directly: callers always receive an
    independent deep copy, which keeps the in-place BFS rewriting from
    touching the cache or the registry's own objects.

    NOTE(review): the cache is keyed by reference URI only, so resolvers built
    with different registries share entries. Call ``clear_cache()`` when
    switching registries.
    """

    # Cleaned schemas keyed by reference URI; guarded by ``_cache_lock``.
    _cache: dict[str, SchemaDict] = {}
    _cache_lock = threading.Lock()

    def __init__(self, registry: Optional[SchemaRegistry] = None, max_depth: int = 10):
        """
        Initialize the schema resolver

        Args:
            registry: Schema registry to use (defaults to default registry)
            max_depth: Maximum depth for reference resolution
        """
        self.registry = registry or SchemaRegistry.default_registry()
        self.max_depth = max_depth

    @classmethod
    def clear_cache(cls) -> None:
        """Clear the global schema cache"""
        with cls._cache_lock:
            cls._cache.clear()

    def resolve(self, schema: SchemaType) -> SchemaType:
        """
        Resolve all Dify $ref references in the schema

        Non-container inputs and schemas without Dify refs are returned
        unchanged (the very same object). Otherwise a deep copy is rewritten
        breadth-first and returned; the input is never modified.

        A reference that would close a cycle is left in place and marked with
        ``"$circular_ref": True``; a reference whose schema cannot be found is
        left untouched. Both cases are logged as warnings, not raised.

        Args:
            schema: Schema to resolve

        Returns:
            Resolved schema with all resolvable references expanded

        Raises:
            MaxDepthExceededError: If max depth exceeded
        """
        if not isinstance(schema, (dict, list)):
            return schema

        # Fast path: avoids the deep copy and traversal for ref-free schemas.
        if not _has_dify_refs(schema):
            return schema

        import copy

        result = copy.deepcopy(schema)
        queue = deque([QueueItem(current=result, parent=None, key=None, depth=0, ref_path=set())])
        while queue:
            self._process_queue_item(queue, queue.popleft())
        return result

    def _process_queue_item(self, queue: deque, item: QueueItem) -> None:
        """Process a single queue item; primitives need no work."""
        if isinstance(item.current, dict):
            self._process_dict(queue, item)
        elif isinstance(item.current, list):
            self._process_list(queue, item)

    def _enqueue_child(self, queue: deque, item: QueueItem, key: Union[str, int], value: Any) -> None:
        """Queue ``value`` (stored at ``item.current[key]``) one level deeper."""
        next_depth = item.depth + 1
        if next_depth >= self.max_depth:
            raise MaxDepthExceededError(self.max_depth)
        queue.append(
            QueueItem(
                current=value,
                parent=item.current,
                key=key,
                depth=next_depth,
                ref_path=item.ref_path,
            )
        )

    def _process_dict(self, queue: deque, item: QueueItem) -> None:
        """Resolve the dict if it is a Dify ref, otherwise queue its containers."""
        ref_uri = item.current.get("$ref")
        if ref_uri and _is_dify_schema_ref(ref_uri):
            self._resolve_ref(queue, item, ref_uri)
            return
        for key, value in item.current.items():
            if isinstance(value, (dict, list)):
                self._enqueue_child(queue, item, key, value)

    def _process_list(self, queue: deque, item: QueueItem) -> None:
        """Queue every dict/list element of a list item."""
        for idx, value in enumerate(item.current):
            if isinstance(value, (dict, list)):
                self._enqueue_child(queue, item, idx, value)

    def _resolve_ref(self, queue: deque, item: QueueItem, ref_uri: str) -> None:
        """Replace a $ref node with its schema and queue the result for further resolution."""
        if ref_uri in item.ref_path:
            # Cycle: keep the ref, flag it, and stop descending on this branch.
            item.current["$circular_ref"] = True
            logger.warning("Circular reference detected: %s", ref_uri)
            return

        resolved_schema = self._get_resolved_schema(ref_uri)
        if not resolved_schema:
            logger.warning("Schema not found: %s", ref_uri)
            return

        new_ref_path = item.ref_path | {ref_uri}
        next_depth = item.depth + 1
        if next_depth >= self.max_depth:
            raise MaxDepthExceededError(self.max_depth)

        if item.parent is None:
            # Root: rewrite in place so the object returned by resolve() is updated.
            item.current.clear()
            item.current.update(resolved_schema)
            target = item.current
        else:
            # resolved_schema is already a private deep copy, safe to splice in.
            item.parent[item.key] = resolved_schema
            target = resolved_schema

        queue.append(
            QueueItem(
                current=target,
                parent=item.parent,
                key=item.key,
                depth=next_depth,
                ref_path=new_ref_path,
            )
        )

    def _get_resolved_schema(self, ref_uri: str) -> Optional[SchemaDict]:
        """Return an independent copy of the cleaned schema for ``ref_uri``.

        Looks in the shared cache first and falls back to the registry.
        Returns None when the registry has no (or an empty) schema.
        """
        import copy

        with self._cache_lock:
            cached = self._cache.get(ref_uri)

        if cached is None:
            schema = self.registry.get_schema(ref_uri)
            if not schema:
                return None
            # Deep copy so the cache shares no nested objects with the registry.
            cached = copy.deepcopy(_remove_metadata_fields(schema))
            with self._cache_lock:
                self._cache[ref_uri] = cached

        # Cached entries are never mutated after insertion, so copying outside
        # the lock is safe; the copy keeps BFS rewrites out of the cache.
        return copy.deepcopy(cached)


def resolve_dify_schema_refs(
schema: SchemaType,
registry: Optional[SchemaRegistry] = None,
max_depth: int = 30
) -> SchemaType:
"""
Resolve $ref references in Dify schema to actual schema content

This is a convenience function that creates a resolver and resolves the schema.
Performance optimization: quickly checks for $ref presence before processing.
Args:
schema: Schema object that may contain $ref references
registry: Optional schema registry, defaults to default registry
max_depth: Maximum depth to prevent infinite loops (default: 10)

max_depth: Maximum depth to prevent infinite loops (default: 30)
Returns:
Schema with all $ref references resolved to actual content

Raises:
RecursionError: If maximum depth is exceeded
CircularReferenceError: If circular reference detected
MaxDepthExceededError: If maximum depth exceeded
SchemaNotFoundError: If referenced schema not found
"""
if registry is None:
registry = SchemaRegistry.default_registry()

return _resolve_refs_bfs(schema, registry, max_depth)
# Fast path: if no Dify refs found, return original schema unchanged
# This avoids expensive deepcopy and BFS traversal for schemas without refs
if not _has_dify_refs(schema):
return schema
# Slow path: schema contains refs, perform full resolution
resolver = SchemaResolver(registry, max_depth)
return resolver.resolve(schema)


def _resolve_refs_bfs(schema: Any, registry: SchemaRegistry, max_depth: int) -> Any:
def _remove_metadata_fields(schema: dict) -> dict:
"""
Resolve $ref references using Breadth-First Search (BFS) approach with cycle detection

Remove metadata fields from schema that shouldn't be included in resolved output
Args:
schema: Schema object to process
registry: Schema registry for lookups
max_depth: Maximum allowed depth

schema: Schema dictionary
Returns:
Schema with references resolved
Cleaned schema without metadata fields
"""
# Create a copy and remove metadata fields
cleaned = schema.copy()
metadata_fields = ["$id", "$schema", "version"]
for field in metadata_fields:
cleaned.pop(field, None)
return cleaned

Raises:
RecursionError: If maximum depth exceeded or circular reference detected
def _is_dify_schema_ref(ref_uri: Any) -> bool:
"""
import copy

# Deep copy the schema to avoid modifying original
result = copy.deepcopy(schema)

# Queue stores tuples: (current_value, parent_container, key_or_index, depth, ref_path)
# parent_container is the dict/list that contains current_value
# key_or_index is the key (for dict) or index (for list) to access current_value in parent
# ref_path is a tuple of resolved reference URIs to detect cycles
queue = deque([(result, None, None, 0, ())])

while queue:
current, parent, key, depth, ref_path = queue.popleft()

# Process based on type
if isinstance(current, dict):
# Check if this is a $ref reference
if "$ref" in current:
ref_uri = current["$ref"]

# Only resolve Dify schema references
if _is_dify_schema_ref(ref_uri):
# Check for circular reference
if ref_uri in ref_path:
# Found a cycle - leave the ref as-is to avoid infinite loop
# Could also raise an error here if preferred
current["$circular_ref"] = True # Mark as circular for debugging
continue

resolved_schema = registry.get_schema(ref_uri)
if resolved_schema:
# Remove metadata fields from resolved schema
cleaned_schema = _remove_metadata_fields(resolved_schema)

# Check depth limit before adding to queue
if depth + 1 > max_depth:
raise RecursionError(
f"Maximum depth ({max_depth}) exceeded while resolving schema references"
)

# Update ref_path with current reference
new_ref_path = ref_path + (ref_uri,)

# Replace the reference with resolved schema
if parent is None:
# Root level replacement
result = copy.deepcopy(cleaned_schema)
# Add the resolved schema back to queue for further processing
queue.append((result, None, None, depth + 1, new_ref_path))
else:
# Update parent container (works for both dict and list)
if isinstance(parent, (dict, list)):
parent[key] = copy.deepcopy(cleaned_schema)
# Add the resolved schema to queue for further processing
queue.append((parent[key], parent, key, depth + 1, new_ref_path))
# If schema not found, leave the original ref as-is
# Non-Dify reference, leave as-is
else:
# Regular dict, add all values to queue for processing
for k, v in current.items():
if isinstance(v, (dict, list)):
# Check depth limit before adding to queue
if depth + 1 > max_depth:
raise RecursionError(
f"Maximum depth ({max_depth}) exceeded while resolving schema references"
)
queue.append((v, current, k, depth + 1, ref_path))

elif isinstance(current, list):
# Process list items
for idx, item in enumerate(current):
if isinstance(item, (dict, list)):
# Check depth limit before adding to queue (fixed: should be > not >=)
if depth + 1 > max_depth:
raise RecursionError(f"Maximum depth ({max_depth}) exceeded while resolving schema references")
queue.append((item, current, idx, depth + 1, ref_path))

# Primitive values don't need processing

return result


def _resolve_refs_recursive(schema: Any, registry: SchemaRegistry, max_depth: int, current_depth: int) -> Any:
Check if the reference URI is a Dify schema reference
Args:
ref_uri: URI to check
Returns:
True if it's a Dify schema reference
"""
Recursively resolve $ref references in schema
if not isinstance(ref_uri, str):
return False
# Use pre-compiled pattern for better performance
return bool(_DIFY_SCHEMA_PATTERN.match(ref_uri))

Args:
schema: Schema object to process
registry: Schema registry for lookups
max_depth: Maximum allowed recursion depth
current_depth: Current recursion depth

def _has_dify_refs_recursive(schema: SchemaType) -> bool:
"""
Recursively check if a schema contains any Dify $ref references
This is the fallback method when string-based detection is not possible.
Args:
schema: Schema to check for references
Returns:
Schema with references resolved

Raises:
RecursionError: If maximum depth exceeded
True if any Dify $ref is found, False otherwise
"""
# Check recursion depth
if current_depth >= max_depth:
raise RecursionError(f"Maximum recursion depth ({max_depth}) exceeded while resolving schema references")

if isinstance(schema, dict):
# Check if this is a $ref reference
if "$ref" in schema:
ref_uri = schema["$ref"]

# Only resolve Dify schema references
if _is_dify_schema_ref(ref_uri):
resolved_schema = registry.get_schema(ref_uri)
if resolved_schema:
# Remove metadata fields from resolved schema
cleaned_schema = _remove_metadata_fields(resolved_schema)
# Recursively resolve the cleaned schema in case it contains more refs
return _resolve_refs_recursive(cleaned_schema, registry, max_depth, current_depth + 1)
else:
# If schema not found, return original ref (might be external or invalid)
return schema
else:
# Non-Dify reference, return as-is
return schema
else:
# Regular dict, recursively process all values
resolved_dict = {}
for key, value in schema.items():
resolved_dict[key] = _resolve_refs_recursive(value, registry, max_depth, current_depth + 1)
return resolved_dict

# Check if this dict has a $ref field
ref_uri = schema.get("$ref")
if ref_uri and _is_dify_schema_ref(ref_uri):
return True
# Check nested values
for value in schema.values():
if _has_dify_refs_recursive(value):
return True
elif isinstance(schema, list):
# Process list items recursively
return [_resolve_refs_recursive(item, registry, max_depth, current_depth + 1) for item in schema]

else:
# Primitive value, return as-is
return schema
# Check each item in the list
for item in schema:
if _has_dify_refs_recursive(item):
return True
# Primitive types don't contain refs
return False


def _remove_metadata_fields(schema: dict) -> dict:
def _has_dify_refs_hybrid(schema: SchemaType) -> bool:
"""
Remove metadata fields from schema that shouldn't be included in resolved output
Hybrid detection: fast string scan followed by precise recursive check
Performance optimization using two-phase detection:
1. Fast string scan to quickly eliminate schemas without $ref
2. Precise recursive validation only for potential candidates
Args:
schema: Schema to check for references
Returns:
True if any Dify $ref is found, False otherwise
"""
if not isinstance(schema, dict):
return schema

# Create a copy and remove metadata fields
cleaned = schema.copy()
metadata_fields = ["$id", "$schema", "version"]

for field in metadata_fields:
cleaned.pop(field, None)

return cleaned
# Phase 1: Fast string-based pre-filtering
try:
import json
schema_str = json.dumps(schema, separators=(',', ':'))
# Quick elimination: no $ref at all
if '"$ref"' not in schema_str:
return False
# Quick elimination: no Dify schema URLs
if 'https://dify.ai/schemas/' not in schema_str:
return False
except (TypeError, ValueError, OverflowError):
# JSON serialization failed (e.g., circular references, non-serializable objects)
# Fall back to recursive detection
logger.debug("JSON serialization failed for schema, using recursive detection")
return _has_dify_refs_recursive(schema)
# Phase 2: Precise recursive validation
# Only executed for schemas that passed string pre-filtering
return _has_dify_refs_recursive(schema)


def _has_dify_refs(schema: SchemaType) -> bool:
    """
    Report whether a schema contains at least one Dify $ref reference

    Thin entry point that delegates to the hybrid detector.

    Args:
        schema: Schema to check for references

    Returns:
        True if any Dify $ref is found, False otherwise
    """
    contains_refs = _has_dify_refs_hybrid(schema)
    return contains_refs


def _is_dify_schema_ref(ref_uri: str) -> bool:
def parse_dify_schema_uri(uri: str) -> tuple[str, str]:
"""
Check if the reference URI is a Dify schema reference
Parse a Dify schema URI to extract version and schema name
Args:
uri: Schema URI to parse
Returns:
Tuple of (version, schema_name) or ("", "") if invalid
"""
if not isinstance(ref_uri, str):
return False

# Match Dify schema URI pattern: https://dify.ai/schemas/v*/name.json
pattern = r"^https://dify\.ai/schemas/(v\d+)/(.+)\.json$"
return bool(re.match(pattern, ref_uri))
match = _DIFY_SCHEMA_PATTERN.match(uri)
if not match:
return "", ""
return match.group(1), match.group(2)

+ 722
- 2
api/tests/unit_tests/core/schemas/test_resolver.py Ver arquivo

@@ -1,8 +1,21 @@
import time
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import MagicMock, patch

import pytest

from core.schemas import resolve_dify_schema_refs
from core.schemas.registry import SchemaRegistry
from core.schemas.resolver import (
MaxDepthExceededError,
SchemaResolver,
_has_dify_refs,
_has_dify_refs_hybrid,
_has_dify_refs_recursive,
_is_dify_schema_ref,
_remove_metadata_fields,
parse_dify_schema_uri,
)


class TestSchemaResolver:
@@ -11,6 +24,12 @@ class TestSchemaResolver:
def setup_method(self):
"""Setup method to initialize test resources"""
self.registry = SchemaRegistry.default_registry()
# Clear cache before each test
SchemaResolver.clear_cache()

def teardown_method(self):
"""Cleanup after each test"""
SchemaResolver.clear_cache()

def test_simple_ref_resolution(self):
"""Test resolving a simple $ref to a complete schema"""
@@ -156,5 +175,706 @@ class TestSchemaResolver:
assert resolved["type"] == "object"
# Should raise error with very low max_depth
with pytest.raises(RecursionError, match="Maximum recursion depth"):
resolve_dify_schema_refs(deep_schema, max_depth=5)
with pytest.raises(MaxDepthExceededError) as exc_info:
resolve_dify_schema_refs(deep_schema, max_depth=5)
assert exc_info.value.max_depth == 5

def test_circular_reference_detection(self):
"""Test that circular references are detected and handled"""
# Mock registry with circular reference
mock_registry = MagicMock()
mock_registry.get_schema.side_effect = lambda uri: {
"$ref": "https://dify.ai/schemas/v1/circular.json",
"type": "object"
}
schema = {"$ref": "https://dify.ai/schemas/v1/circular.json"}
resolved = resolve_dify_schema_refs(schema, registry=mock_registry)
# Should mark circular reference
assert "$circular_ref" in resolved

def test_schema_not_found_handling(self):
"""Test handling of missing schemas"""
# Mock registry that returns None for unknown schemas
mock_registry = MagicMock()
mock_registry.get_schema.return_value = None
schema = {"$ref": "https://dify.ai/schemas/v1/unknown.json"}
resolved = resolve_dify_schema_refs(schema, registry=mock_registry)
# Should keep the original $ref when schema not found
assert resolved["$ref"] == "https://dify.ai/schemas/v1/unknown.json"

def test_primitive_types_unchanged(self):
"""Test that primitive types are returned unchanged"""
assert resolve_dify_schema_refs("string") == "string"
assert resolve_dify_schema_refs(123) == 123
assert resolve_dify_schema_refs(True) is True
assert resolve_dify_schema_refs(None) is None
assert resolve_dify_schema_refs(3.14) == 3.14

def test_cache_functionality(self):
"""Test that caching works correctly"""
schema = {"$ref": "https://dify.ai/schemas/v1/file.json"}
# First resolution should fetch from registry
resolved1 = resolve_dify_schema_refs(schema)
# Mock the registry to return different data
with patch.object(self.registry, "get_schema") as mock_get:
mock_get.return_value = {"type": "different"}
# Second resolution should use cache
resolved2 = resolve_dify_schema_refs(schema)
# Should be the same as first resolution (from cache)
assert resolved1 == resolved2
# Mock should not have been called
mock_get.assert_not_called()
# Clear cache and try again
SchemaResolver.clear_cache()
# Now it should fetch again
resolved3 = resolve_dify_schema_refs(schema)
assert resolved3 == resolved1

def test_thread_safety(self):
"""Test that the resolver is thread-safe"""
schema = {
"type": "object",
"properties": {
f"prop_{i}": {"$ref": "https://dify.ai/schemas/v1/file.json"}
for i in range(10)
}
}
results = []
def resolve_in_thread():
try:
result = resolve_dify_schema_refs(schema)
results.append(result)
return True
except Exception as e:
results.append(e)
return False
# Run multiple threads concurrently
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(resolve_in_thread) for _ in range(20)]
success = all(f.result() for f in futures)
assert success
# All results should be the same
first_result = results[0]
assert all(r == first_result for r in results if not isinstance(r, Exception))

def test_mixed_nested_structures(self):
"""Test resolving refs in complex mixed structures"""
complex_schema = {
"type": "object",
"properties": {
"files": {
"type": "array",
"items": {"$ref": "https://dify.ai/schemas/v1/file.json"}
},
"nested": {
"type": "object",
"properties": {
"qa": {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"},
"data": {
"type": "array",
"items": {
"type": "object",
"properties": {
"general": {"$ref": "https://dify.ai/schemas/v1/general_structure.json"}
}
}
}
}
}
}
}
resolved = resolve_dify_schema_refs(complex_schema, max_depth=20)
# Check structure is preserved
assert resolved["type"] == "object"
assert "files" in resolved["properties"]
assert "nested" in resolved["properties"]
# Check refs are resolved
assert resolved["properties"]["files"]["items"]["type"] == "object"
assert resolved["properties"]["files"]["items"]["title"] == "File Schema"
assert resolved["properties"]["nested"]["properties"]["qa"]["type"] == "object"
assert resolved["properties"]["nested"]["properties"]["qa"]["title"] == "Q&A Structure Schema"


class TestUtilityFunctions:
    """Exercise the module-level helper functions of the resolver"""

    def test_is_dify_schema_ref(self):
        """Only strings matching the Dify schema URI pattern are accepted"""
        accepted = (
            "https://dify.ai/schemas/v1/file.json",
            "https://dify.ai/schemas/v2/complex_name.json",
            "https://dify.ai/schemas/v999/test-file.json",
        )
        rejected = (
            "https://example.com/schema.json",
            "https://dify.ai/other/path.json",
            "not a uri",
            "",
            None,
            123,
            ["list"],
        )
        for candidate in accepted:
            assert _is_dify_schema_ref(candidate)
        for candidate in rejected:
            assert not _is_dify_schema_ref(candidate)

    def test_has_dify_refs(self):
        """_has_dify_refs finds Dify refs at any nesting level and ignores everything else"""
        file_ref = {"$ref": "https://dify.ai/schemas/v1/file.json"}

        containing_refs = [
            {"$ref": "https://dify.ai/schemas/v1/file.json"},
            {"type": "object", "properties": {"data": dict(file_ref)}},
            [{"type": "string"}, dict(file_ref)],
            {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "nested": {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"},
                    },
                },
            },
        ]
        for candidate in containing_refs:
            assert _has_dify_refs(candidate)

        ref_free = [
            {"type": "string"},
            {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "number"},
                },
            },
            [
                {"type": "string"},
                {"type": "number"},
                {"type": "object", "properties": {"name": {"type": "string"}}},
            ],
        ]
        # References that do not point at Dify schemas must not count
        foreign_refs = [
            {"$ref": "https://example.com/schema.json"},
            {
                "type": "object",
                "properties": {
                    "external": {"$ref": "https://example.com/external.json"},
                },
            },
        ]
        primitives = ["string", 123, True, None]
        for candidate in ref_free + foreign_refs + primitives:
            assert not _has_dify_refs(candidate)

    def test_has_dify_refs_hybrid_vs_recursive(self):
        """Hybrid and recursive detection must agree on every sample"""
        samples = []
        # No refs
        samples += [
            {"type": "string"},
            {"type": "object", "properties": {"name": {"type": "string"}}},
            [{"type": "string"}, {"type": "number"}],
        ]
        # With Dify refs
        samples += [
            {"$ref": "https://dify.ai/schemas/v1/file.json"},
            {
                "type": "object",
                "properties": {
                    "data": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                },
            },
            [
                {"type": "string"},
                {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"},
            ],
        ]
        # With non-Dify refs
        samples += [
            {"$ref": "https://example.com/schema.json"},
            {
                "type": "object",
                "properties": {
                    "external": {"$ref": "https://example.com/external.json"},
                },
            },
        ]
        # Complex nested
        samples.append(
            {
                "type": "object",
                "properties": {
                    "level1": {
                        "type": "object",
                        "properties": {
                            "level2": {
                                "type": "array",
                                "items": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                            },
                        },
                    },
                },
            }
        )
        # Edge cases
        samples += [
            {"description": "This mentions $ref but is not a reference"},
            {"$ref": "not-a-url"},
        ]
        # Primitive types
        samples += ["string", 123, True, None, []]

        for schema in samples:
            via_hybrid = _has_dify_refs_hybrid(schema)
            via_recursive = _has_dify_refs_recursive(schema)
            assert via_hybrid == via_recursive, f"Mismatch for schema: {schema}"

    def test_parse_dify_schema_uri(self):
        """parse_dify_schema_uri yields (version, name), or a pair of empty strings if invalid"""
        expectations = {
            # Valid URIs
            "https://dify.ai/schemas/v1/file.json": ("v1", "file"),
            "https://dify.ai/schemas/v2/complex_name.json": ("v2", "complex_name"),
            "https://dify.ai/schemas/v999/test-file.json": ("v999", "test-file"),
            # Invalid URIs
            "https://example.com/schema.json": ("", ""),
            "invalid": ("", ""),
            "": ("", ""),
        }
        for uri, expected in expectations.items():
            assert parse_dify_schema_uri(uri) == expected

    def test_remove_metadata_fields(self):
        """_remove_metadata_fields drops $id/$schema/version on a copy only"""
        original = {
            "$id": "should be removed",
            "$schema": "should be removed",
            "version": "should be removed",
            "type": "object",
            "title": "should remain",
            "properties": {},
        }
        stripped = _remove_metadata_fields(original)

        for dropped in ("$id", "$schema", "version"):
            assert dropped not in stripped
        assert stripped["type"] == "object"
        assert stripped["title"] == "should remain"
        assert "properties" in stripped
        # Original should be unchanged
        assert "$id" in original

class TestSchemaResolverClass:
    """Test SchemaResolver class specifically"""

    def setup_method(self):
        """Start every test with an empty cache.

        The resolver cache is class-level state shared by all instances, so
        entries left by other tests would otherwise leak into these.
        """
        SchemaResolver.clear_cache()

    def teardown_method(self):
        """Leave no cached schemas behind for later tests"""
        SchemaResolver.clear_cache()

    def test_resolver_initialization(self):
        """Test resolver initialization"""
        # Default initialization
        resolver = SchemaResolver()
        assert resolver.max_depth == 10
        assert resolver.registry is not None

        # Custom initialization
        custom_registry = MagicMock()
        resolver = SchemaResolver(registry=custom_registry, max_depth=5)
        assert resolver.max_depth == 5
        assert resolver.registry is custom_registry

    def test_cache_sharing(self):
        """Test that cache is shared between resolver instances"""
        schema = {"$ref": "https://dify.ai/schemas/v1/file.json"}

        # First resolver populates cache
        resolver1 = SchemaResolver()
        result1 = resolver1.resolve(schema)

        # Second resolver should use the same cache
        resolver2 = SchemaResolver()
        with patch.object(resolver2.registry, "get_schema") as mock_get:
            result2 = resolver2.resolve(schema)
            # Should not call registry since it's in cache
            mock_get.assert_not_called()

        assert result1 == result2

    def test_resolver_with_list_schema(self):
        """Test resolver with list as root schema"""
        list_schema = [
            {"$ref": "https://dify.ai/schemas/v1/file.json"},
            {"type": "string"},
            {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"},
        ]

        resolver = SchemaResolver()
        resolved = resolver.resolve(list_schema)

        assert isinstance(resolved, list)
        assert len(resolved) == 3
        assert resolved[0]["type"] == "object"
        assert resolved[0]["title"] == "File Schema"
        assert resolved[1] == {"type": "string"}
        assert resolved[2]["type"] == "object"
        assert resolved[2]["title"] == "Q&A Structure Schema"


class TestPerformance:
"""Performance-related tests"""
def test_cache_performance(self):
"""Test that caching improves performance"""
SchemaResolver.clear_cache()
# Create a schema with many references to the same schema
schema = {
"type": "object",
"properties": {
f"prop_{i}": {"$ref": "https://dify.ai/schemas/v1/file.json"}
for i in range(50) # Reduced to avoid depth issues
}
}
# First run (no cache) - run multiple times to warm up
results1 = []
for _ in range(3):
SchemaResolver.clear_cache()
start = time.perf_counter()
result1 = resolve_dify_schema_refs(schema)
time_no_cache = time.perf_counter() - start
results1.append(time_no_cache)
avg_time_no_cache = sum(results1) / len(results1)
# Second run (with cache) - run multiple times
results2 = []
for _ in range(3):
start = time.perf_counter()
result2 = resolve_dify_schema_refs(schema)
time_with_cache = time.perf_counter() - start
results2.append(time_with_cache)
avg_time_with_cache = sum(results2) / len(results2)
# Cache should make it faster (more lenient check)
assert result1 == result2
# Cache should provide some performance benefit
assert avg_time_with_cache <= avg_time_no_cache
def test_fast_path_performance_no_refs(self):
"""Test that schemas without $refs use fast path and avoid deep copying"""
# Create a moderately complex schema without any $refs (typical plugin output_schema)
no_refs_schema = {
"type": "object",
"properties": {
f"property_{i}": {
"type": "object",
"properties": {
"name": {"type": "string"},
"value": {"type": "number"},
"items": {
"type": "array",
"items": {"type": "string"}
}
}
}
for i in range(50)
}
}
# Measure fast path (no refs) performance
fast_times = []
for _ in range(10):
start = time.perf_counter()
result_fast = resolve_dify_schema_refs(no_refs_schema)
elapsed = time.perf_counter() - start
fast_times.append(elapsed)
avg_fast_time = sum(fast_times) / len(fast_times)
# Most importantly: result should be identical to input (no copying)
assert result_fast is no_refs_schema
# Create schema with $refs for comparison (same structure size)
with_refs_schema = {
"type": "object",
"properties": {
f"property_{i}": {"$ref": "https://dify.ai/schemas/v1/file.json"}
for i in range(20) # Fewer to avoid depth issues but still comparable
}
}
# Measure slow path (with refs) performance
SchemaResolver.clear_cache()
slow_times = []
for _ in range(10):
SchemaResolver.clear_cache()
start = time.perf_counter()
result_slow = resolve_dify_schema_refs(with_refs_schema, max_depth=50)
elapsed = time.perf_counter() - start
slow_times.append(elapsed)
avg_slow_time = sum(slow_times) / len(slow_times)
# The key benefit: fast path should be reasonably fast (main goal is no deep copy)
# and definitely avoid the expensive BFS resolution
# Even if detection has some overhead, it should still be faster for typical cases
print(f"Fast path (no refs): {avg_fast_time:.6f}s")
print(f"Slow path (with refs): {avg_slow_time:.6f}s")
# More lenient check: fast path should be at least somewhat competitive
# The main benefit is avoiding deep copy and BFS, not necessarily being 5x faster
assert avg_fast_time < avg_slow_time * 2 # Should not be more than 2x slower
def test_batch_processing_performance(self):
    """Resolve 100 ref-free schemas in one batch and check speed and identity.

    Simulates the plugin-tool scenario of many schemas without ``$ref``:
    every result must be the same object as its input, and the mean time
    per schema must stay below one millisecond.
    """
    batch_size = 100
    # Odd field indexes are strings, even ones are numbers.
    type_by_parity = ("number", "string")
    plain_schemas = [
        {
            "type": "object",
            "properties": {
                f"field_{j}": {"type": type_by_parity[j % 2]} for j in range(10)
            },
        }
        for _ in range(batch_size)
    ]

    # Time the whole batch in a single window.
    began = time.perf_counter()
    resolved = list(map(resolve_dify_schema_refs, plain_schemas))
    batch_elapsed = time.perf_counter() - began

    # Identity with the input shows that nothing was copied.
    assert all(out is src for src, out in zip(plain_schemas, resolved))

    # Each schema should take < 0.001 seconds on average.
    per_schema = batch_elapsed / len(plain_schemas)
    assert per_schema < 0.001
def test_has_dify_refs_performance(self):
    """Check ``_has_dify_refs`` on a 100-level nested schema without refs.

    The detector must report no refs, and its average run time over 50
    calls must stay under 10 ms.
    """
    depth_levels = 100
    runs = 50

    # Build a single chain: each level holds exactly one nested object.
    deep_schema = {"type": "object", "properties": {}}
    cursor = deep_schema
    for depth in range(depth_levels):
        child = {"type": "object", "properties": {}}
        cursor["properties"][f"level_{depth}"] = child
        cursor = child

    samples = []
    for _ in range(runs):
        began = time.perf_counter()
        found = _has_dify_refs(deep_schema)
        samples.append(time.perf_counter() - began)
    mean_elapsed = sum(samples) / len(samples)

    # No refs anywhere, and detection completes in less than 10ms.
    assert not found
    assert mean_elapsed < 0.01
def test_hybrid_vs_recursive_performance(self):
    """Compare ``_has_dify_refs_hybrid`` with ``_has_dify_refs_recursive``.

    Five schemas are checked: small, medium and deeply nested ones without
    refs, one with a Dify ``$ref`` and one with a non-Dify ``$ref``.  For
    each, both detectors must return the expected answer; for the cases
    where no Dify ref is expected, the hybrid detector's average time must
    additionally stay below five times the recursive detector's.
    """
    runs = 10

    def medium_field():
        # Fresh sub-schema per field so no dict is shared between fields.
        return {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "value": {"type": "number"},
                "items": {"type": "array", "items": {"type": "string"}},
            },
        }

    # Large case: a 50-level chain of nested objects, built up front.
    deep_schema = {"type": "object", "properties": {}}
    cursor = deep_schema
    for depth in range(50):
        child = {"type": "object", "properties": {}}
        cursor["properties"][f"level_{depth}"] = child
        cursor = child

    # (label, schema, expected detector answer)
    cases = [
        # Small schema without refs (most common case).
        (
            "small_no_refs",
            {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "value": {"type": "number"},
                },
            },
            False,
        ),
        # Medium schema without refs.
        (
            "medium_no_refs",
            {
                "type": "object",
                "properties": {f"field_{idx}": medium_field() for idx in range(20)},
            },
            False,
        ),
        # Large, deeply nested schema without refs.
        ("large_no_refs", deep_schema, False),
        # Schema containing a Dify ref.
        (
            "with_dify_refs",
            {
                "type": "object",
                "properties": {
                    "file": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                    "data": {"type": "string"},
                },
            },
            True,
        ),
        # Schema containing only a non-Dify ref.
        (
            "with_external_refs",
            {
                "type": "object",
                "properties": {
                    "external": {"$ref": "https://example.com/schema.json"},
                    "data": {"type": "string"},
                },
            },
            False,
        ),
    ]

    for label, schema, expected in cases:
        # Correctness comes first, before any timing.
        assert _has_dify_refs_hybrid(schema) == expected
        assert _has_dify_refs_recursive(schema) == expected

        hybrid_samples = []
        for _ in range(runs):
            began = time.perf_counter()
            hybrid_answer = _has_dify_refs_hybrid(schema)
            hybrid_samples.append(time.perf_counter() - began)

        recursive_samples = []
        for _ in range(runs):
            began = time.perf_counter()
            recursive_answer = _has_dify_refs_recursive(schema)
            recursive_samples.append(time.perf_counter() - began)

        mean_hybrid = sum(hybrid_samples) / len(hybrid_samples)
        mean_recursive = sum(recursive_samples) / len(recursive_samples)
        print(f"{label}: hybrid={mean_hybrid:.6f}s, recursive={mean_recursive:.6f}s")

        # Both detectors must agree with each other and with the expectation.
        assert hybrid_answer == recursive_answer == expected

        if expected:
            # The timing bound only applies to schemas without Dify refs.
            continue

        # Hybrid may be somewhat slower (the original author attributes this
        # to JSON serialization overhead) but must be at most 5x slower.
        assert mean_hybrid < mean_recursive * 5
def test_string_matching_edge_cases(self):
    """Exercise both ref detectors on inputs that could fool string matching.

    Covers three situations: ``$ref`` appearing only inside a description
    string, a Dify-looking URL used as a plain default next to a real Dify
    ``$ref``, and a schema holding a ``datetime`` value alongside a real
    Dify ``$ref``.
    """
    import datetime

    # Order matters for parity with the original checks: hybrid first.
    detectors = (_has_dify_refs_hybrid, _has_dify_refs_recursive)

    # Case 1: "$ref" occurs only as text in a description, so neither
    # detector should report a ref.
    description_only = {
        "type": "object",
        "properties": {
            "description": {
                "type": "string",
                "description": "This field explains how $ref works in JSON Schema",
            }
        },
    }
    for detect in detectors:
        assert not detect(description_only)

    # Case 2: a Dify URL as an ordinary default value plus a genuine ref;
    # the genuine ref ("actual_ref") is what should trigger detection.
    url_and_ref = {
        "type": "object",
        "properties": {
            "config": {
                "type": "object",
                "properties": {
                    "dify_url": {
                        "type": "string",
                        "default": "https://dify.ai/schemas/info",
                    },
                    "actual_ref": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                },
            }
        },
    }
    for detect in detectors:
        assert detect(url_and_ref)

    # Case 3: a datetime value is not JSON serializable; the hybrid detector
    # is expected to fall back to the recursive walk and still find the ref.
    with_datetime = {
        "type": "object",
        "timestamp": datetime.datetime.now(),
        "data": {"$ref": "https://dify.ai/schemas/v1/file.json"},
    }
    for detect in detectors:
        assert detect(with_datetime)

Carregando…
Cancelar
Salvar