您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

security.py 7.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. #
  2. # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import ast
  17. from typing import List, Tuple
  18. from core.logger import logger
  19. from models.enums import SupportLanguage
  20. class SecurePythonAnalyzer(ast.NodeVisitor):
  21. """
  22. An AST-based analyzer for detecting unsafe Python code patterns.
  23. """
  24. DANGEROUS_IMPORTS = {"os", "subprocess", "sys", "shutil", "socket", "ctypes", "pickle", "threading", "multiprocessing", "asyncio", "http.client", "ftplib", "telnetlib"}
  25. DANGEROUS_CALLS = {
  26. "eval",
  27. "exec",
  28. "open",
  29. "__import__",
  30. "compile",
  31. "input",
  32. "system",
  33. "popen",
  34. "remove",
  35. "rename",
  36. "rmdir",
  37. "chdir",
  38. "chmod",
  39. "chown",
  40. "getattr",
  41. "setattr",
  42. "globals",
  43. "locals",
  44. "shutil.rmtree",
  45. "subprocess.call",
  46. "subprocess.Popen",
  47. "ctypes",
  48. "pickle.load",
  49. "pickle.loads",
  50. "pickle.dump",
  51. "pickle.dumps",
  52. }
  53. def __init__(self):
  54. self.unsafe_items: List[Tuple[str, int]] = []
  55. def visit_Import(self, node: ast.Import):
  56. """Check for dangerous imports."""
  57. for alias in node.names:
  58. if alias.name.split(".")[0] in self.DANGEROUS_IMPORTS:
  59. self.unsafe_items.append((f"Import: {alias.name}", node.lineno))
  60. self.generic_visit(node)
  61. def visit_ImportFrom(self, node: ast.ImportFrom):
  62. """Check for dangerous imports from specific modules."""
  63. if node.module and node.module.split(".")[0] in self.DANGEROUS_IMPORTS:
  64. self.unsafe_items.append((f"From Import: {node.module}", node.lineno))
  65. self.generic_visit(node)
  66. def visit_Call(self, node: ast.Call):
  67. """Check for dangerous function calls."""
  68. if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_CALLS:
  69. self.unsafe_items.append((f"Call: {node.func.id}", node.lineno))
  70. self.generic_visit(node)
  71. def visit_Attribute(self, node: ast.Attribute):
  72. """Check for dangerous attribute access."""
  73. if isinstance(node.value, ast.Name) and node.value.id in self.DANGEROUS_IMPORTS:
  74. self.unsafe_items.append((f"Attribute Access: {node.value.id}.{node.attr}", node.lineno))
  75. self.generic_visit(node)
  76. def visit_BinOp(self, node: ast.BinOp):
  77. """Check for possible unsafe operations like concatenating strings with commands."""
  78. # This could be useful to detect `eval("os." + "system")`
  79. if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
  80. self.unsafe_items.append(("Possible unsafe string concatenation", node.lineno))
  81. self.generic_visit(node)
  82. def visit_FunctionDef(self, node: ast.FunctionDef):
  83. """Check for dangerous function definitions (e.g., user-defined eval)."""
  84. if node.name in self.DANGEROUS_CALLS:
  85. self.unsafe_items.append((f"Function Definition: {node.name}", node.lineno))
  86. self.generic_visit(node)
  87. def visit_Assign(self, node: ast.Assign):
  88. """Check for assignments to variables that might lead to dangerous operations."""
  89. for target in node.targets:
  90. if isinstance(target, ast.Name) and target.id in self.DANGEROUS_CALLS:
  91. self.unsafe_items.append((f"Assignment to dangerous variable: {target.id}", node.lineno))
  92. self.generic_visit(node)
  93. def visit_Lambda(self, node: ast.Lambda):
  94. """Check for lambda functions with dangerous operations."""
  95. if isinstance(node.body, ast.Call) and isinstance(node.body.func, ast.Name) and node.body.func.id in self.DANGEROUS_CALLS:
  96. self.unsafe_items.append(("Lambda with dangerous function call", node.lineno))
  97. self.generic_visit(node)
  98. def visit_ListComp(self, node: ast.ListComp):
  99. """Check for list comprehensions with dangerous operations."""
  100. # First, visit the generators to check for any issues there
  101. for elem in node.generators:
  102. if isinstance(elem, ast.comprehension):
  103. self.generic_visit(elem)
  104. if isinstance(node.elt, ast.Call) and isinstance(node.elt.func, ast.Name) and node.elt.func.id in self.DANGEROUS_CALLS:
  105. self.unsafe_items.append(("List comprehension with dangerous function call", node.lineno))
  106. self.generic_visit(node)
  107. def visit_DictComp(self, node: ast.DictComp):
  108. """Check for dictionary comprehensions with dangerous operations."""
  109. # Check for dangerous calls in both the key and value expressions of the dictionary comprehension
  110. if isinstance(node.key, ast.Call) and isinstance(node.key.func, ast.Name) and node.key.func.id in self.DANGEROUS_CALLS:
  111. self.unsafe_items.append(("Dict comprehension with dangerous function call in key", node.lineno))
  112. if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in self.DANGEROUS_CALLS:
  113. self.unsafe_items.append(("Dict comprehension with dangerous function call in value", node.lineno))
  114. # Visit other sub-nodes (e.g., the generators in the comprehension)
  115. self.generic_visit(node)
  116. def visit_SetComp(self, node: ast.SetComp):
  117. """Check for set comprehensions with dangerous operations."""
  118. for elt in node.generators:
  119. if isinstance(elt, ast.comprehension):
  120. self.generic_visit(elt)
  121. if isinstance(node.elt, ast.Call) and isinstance(node.elt.func, ast.Name) and node.elt.func.id in self.DANGEROUS_CALLS:
  122. self.unsafe_items.append(("Set comprehension with dangerous function call", node.lineno))
  123. self.generic_visit(node)
  124. def visit_Yield(self, node: ast.Yield):
  125. """Check for yield statements that could be used to produce unsafe values."""
  126. if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in self.DANGEROUS_CALLS:
  127. self.unsafe_items.append(("Yield with dangerous function call", node.lineno))
  128. self.generic_visit(node)
  129. def analyze_code_security(code: str, language: SupportLanguage) -> Tuple[bool, List[Tuple[str, int]]]:
  130. """
  131. Analyze the provided code string and return whether it's safe and why.
  132. :param code: The source code to analyze.
  133. :param language: The programming language of the code.
  134. :return: (is_safe: bool, issues: List of (description, line number))
  135. """
  136. if language == SupportLanguage.PYTHON:
  137. try:
  138. tree = ast.parse(code)
  139. analyzer = SecurePythonAnalyzer()
  140. analyzer.visit(tree)
  141. return len(analyzer.unsafe_items) == 0, analyzer.unsafe_items
  142. except Exception as e:
  143. logger.error(f"[SafeCheck] Python parsing failed: {str(e)}")
  144. return False, [(f"Parsing Error: {str(e)}", -1)]
  145. else:
  146. logger.warning(f"[SafeCheck] Unsupported language for security analysis: {language} — defaulting to SAFE (manual review recommended)")
  147. return True, [(f"Unsupported language for security analysis: {language} — defaulted to SAFE, manual review recommended", -1)]