|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- #
- # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import ast
- from typing import List, Tuple
-
- from core.logger import logger
- from models.enums import SupportLanguage
-
-
- class SecurePythonAnalyzer(ast.NodeVisitor):
- """
- An AST-based analyzer for detecting unsafe Python code patterns.
- """
-
- DANGEROUS_IMPORTS = {"os", "subprocess", "sys", "shutil", "socket", "ctypes", "pickle", "threading", "multiprocessing", "asyncio", "http.client", "ftplib", "telnetlib"}
-
- DANGEROUS_CALLS = {
- "eval",
- "exec",
- "open",
- "__import__",
- "compile",
- "input",
- "system",
- "popen",
- "remove",
- "rename",
- "rmdir",
- "chdir",
- "chmod",
- "chown",
- "getattr",
- "setattr",
- "globals",
- "locals",
- "shutil.rmtree",
- "subprocess.call",
- "subprocess.Popen",
- "ctypes",
- "pickle.load",
- "pickle.loads",
- "pickle.dump",
- "pickle.dumps",
- }
-
- def __init__(self):
- self.unsafe_items: List[Tuple[str, int]] = []
-
- def visit_Import(self, node: ast.Import):
- """Check for dangerous imports."""
- for alias in node.names:
- if alias.name.split(".")[0] in self.DANGEROUS_IMPORTS:
- self.unsafe_items.append((f"Import: {alias.name}", node.lineno))
- self.generic_visit(node)
-
- def visit_ImportFrom(self, node: ast.ImportFrom):
- """Check for dangerous imports from specific modules."""
- if node.module and node.module.split(".")[0] in self.DANGEROUS_IMPORTS:
- self.unsafe_items.append((f"From Import: {node.module}", node.lineno))
- self.generic_visit(node)
-
- def visit_Call(self, node: ast.Call):
- """Check for dangerous function calls."""
- if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append((f"Call: {node.func.id}", node.lineno))
- self.generic_visit(node)
-
- def visit_Attribute(self, node: ast.Attribute):
- """Check for dangerous attribute access."""
- if isinstance(node.value, ast.Name) and node.value.id in self.DANGEROUS_IMPORTS:
- self.unsafe_items.append((f"Attribute Access: {node.value.id}.{node.attr}", node.lineno))
- self.generic_visit(node)
-
- def visit_BinOp(self, node: ast.BinOp):
- """Check for possible unsafe operations like concatenating strings with commands."""
- # This could be useful to detect `eval("os." + "system")`
- if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
- self.unsafe_items.append(("Possible unsafe string concatenation", node.lineno))
- self.generic_visit(node)
-
- def visit_FunctionDef(self, node: ast.FunctionDef):
- """Check for dangerous function definitions (e.g., user-defined eval)."""
- if node.name in self.DANGEROUS_CALLS:
- self.unsafe_items.append((f"Function Definition: {node.name}", node.lineno))
- self.generic_visit(node)
-
- def visit_Assign(self, node: ast.Assign):
- """Check for assignments to variables that might lead to dangerous operations."""
- for target in node.targets:
- if isinstance(target, ast.Name) and target.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append((f"Assignment to dangerous variable: {target.id}", node.lineno))
- self.generic_visit(node)
-
- def visit_Lambda(self, node: ast.Lambda):
- """Check for lambda functions with dangerous operations."""
- if isinstance(node.body, ast.Call) and isinstance(node.body.func, ast.Name) and node.body.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append(("Lambda with dangerous function call", node.lineno))
- self.generic_visit(node)
-
- def visit_ListComp(self, node: ast.ListComp):
- """Check for list comprehensions with dangerous operations."""
- # First, visit the generators to check for any issues there
- for elem in node.generators:
- if isinstance(elem, ast.comprehension):
- self.generic_visit(elem)
-
- if isinstance(node.elt, ast.Call) and isinstance(node.elt.func, ast.Name) and node.elt.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append(("List comprehension with dangerous function call", node.lineno))
- self.generic_visit(node)
-
- def visit_DictComp(self, node: ast.DictComp):
- """Check for dictionary comprehensions with dangerous operations."""
- # Check for dangerous calls in both the key and value expressions of the dictionary comprehension
- if isinstance(node.key, ast.Call) and isinstance(node.key.func, ast.Name) and node.key.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append(("Dict comprehension with dangerous function call in key", node.lineno))
-
- if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append(("Dict comprehension with dangerous function call in value", node.lineno))
-
- # Visit other sub-nodes (e.g., the generators in the comprehension)
- self.generic_visit(node)
-
- def visit_SetComp(self, node: ast.SetComp):
- """Check for set comprehensions with dangerous operations."""
- for elt in node.generators:
- if isinstance(elt, ast.comprehension):
- self.generic_visit(elt)
-
- if isinstance(node.elt, ast.Call) and isinstance(node.elt.func, ast.Name) and node.elt.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append(("Set comprehension with dangerous function call", node.lineno))
-
- self.generic_visit(node)
-
- def visit_Yield(self, node: ast.Yield):
- """Check for yield statements that could be used to produce unsafe values."""
- if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in self.DANGEROUS_CALLS:
- self.unsafe_items.append(("Yield with dangerous function call", node.lineno))
- self.generic_visit(node)
-
-
- def analyze_code_security(code: str, language: SupportLanguage) -> Tuple[bool, List[Tuple[str, int]]]:
- """
- Analyze the provided code string and return whether it's safe and why.
-
- :param code: The source code to analyze.
- :param language: The programming language of the code.
- :return: (is_safe: bool, issues: List of (description, line number))
- """
- if language == SupportLanguage.PYTHON:
- try:
- tree = ast.parse(code)
- analyzer = SecurePythonAnalyzer()
- analyzer.visit(tree)
- return len(analyzer.unsafe_items) == 0, analyzer.unsafe_items
- except Exception as e:
- logger.error(f"[SafeCheck] Python parsing failed: {str(e)}")
- return False, [(f"Parsing Error: {str(e)}", -1)]
- else:
- logger.warning(f"[SafeCheck] Unsupported language for security analysis: {language} — defaulting to SAFE (manual review recommended)")
- return True, [(f"Unsupported language for security analysis: {language} — defaulted to SAFE, manual review recommended", -1)]
|