Compare commits
3 Commits
cb350b6288
...
d9c183fbd8
Author | SHA1 | Date | |
---|---|---|---|
d9c183fbd8 | |||
c5cfcb00f7 | |||
c2782327c3 |
@@ -1,21 +1,13 @@
|
||||
"""
|
||||
Usage: python backdoor_detection.py your_file_path
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
from typing import List, Tuple, Dict
|
||||
import sys
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"}
|
||||
OUTPUT_FORMATS = ["html", "md", "txt"]
|
||||
|
||||
|
||||
def read_file_content(file_path: str) -> str:
|
||||
"""
|
||||
Reads and returns the content of a specified file. Exits the program with an error if the file does not exist or cannot be read.
|
||||
|
||||
:param file_path: The full path to the file.
|
||||
:return: The text content of the file.
|
||||
:raises FileNotFoundError: If the file does not exist.
|
||||
:raises IOError: If the file cannot be read.
|
||||
"""
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
return file.read()
|
||||
@@ -27,61 +19,150 @@ def read_file_content(file_path: str) -> str:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def find_dangerous_functions(file_content: str) -> Dict[str, List[Tuple[int, str]]]:
|
||||
"""
|
||||
Searches the given code text for potentially dangerous function calls and classifies results by risk level.
|
||||
Ignores comments in the code.
|
||||
def remove_comments(code: str, extension: str) -> str:
|
||||
if extension == ".py":
|
||||
return code.split("#")[0].strip()
|
||||
elif extension in {".js", ".cpp"}:
|
||||
code = re.sub(r"//.*", "", code)
|
||||
code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL)
|
||||
return code.strip()
|
||||
return code.strip()
|
||||
|
||||
:param file_content: String content of the code file.
|
||||
:return: Dictionary with risk levels as keys and lists of tuples (line number, matched line content) as values.
|
||||
"""
|
||||
# Define dangerous functions and their risk levels
|
||||
patterns: Dict[str, str] = {
|
||||
r"\bsystem\(": "high",
|
||||
r"\bexec\(": "high",
|
||||
r"\bpopen\(": "medium",
|
||||
r"\beval\(": "high",
|
||||
r"\bsubprocess\.run\(": "medium",
|
||||
|
||||
def find_dangerous_functions(
|
||||
file_content: str, file_extension: str
|
||||
) -> Dict[str, List[Tuple[int, str]]]:
|
||||
patterns = {
|
||||
".py": {
|
||||
r"\bsystem\(": "high",
|
||||
r"\bexec\(": "high",
|
||||
r"\bpopen\(": "medium",
|
||||
r"\beval\(": "high",
|
||||
r"\bsubprocess\.run\(": "medium",
|
||||
},
|
||||
".js": {
|
||||
r"\beval\(": "high",
|
||||
r"\bexec\(": "high",
|
||||
r"\bchild_process\.exec\(": "high",
|
||||
},
|
||||
".cpp": {
|
||||
r"\bsystem\(": "high",
|
||||
},
|
||||
}
|
||||
# Store results classified by risk level
|
||||
classified_results = {"high": [], "medium": [], "low": []}
|
||||
risk_patterns = patterns.get(file_extension, {})
|
||||
classified_results = {"high": [], "medium": [], "low": [], "none": []}
|
||||
for line_number, line in enumerate(file_content.split("\n"), start=1):
|
||||
# Remove comments from the line
|
||||
clean_line = line.split("#")[0].strip()
|
||||
if not clean_line: # Skip empty or comment-only lines
|
||||
clean_line = remove_comments(line, file_extension)
|
||||
if not clean_line:
|
||||
continue
|
||||
found = False
|
||||
for pattern, risk_level in patterns.items():
|
||||
for pattern, risk_level in risk_patterns.items():
|
||||
if re.search(pattern, clean_line):
|
||||
classified_results[risk_level].append((line_number, clean_line))
|
||||
found = True
|
||||
break # Stop checking other patterns once a match is found
|
||||
break
|
||||
if not found:
|
||||
classified_results["none"].append((line_number, clean_line))
|
||||
return classified_results
|
||||
|
||||
|
||||
def main(file_path: str):
|
||||
"""
|
||||
Main function that reads file content, checks for dangerous functions, and outputs classified results by risk level.
|
||||
def output_results(
|
||||
results: Dict[str, List[Tuple[int, str]]], output_format: str, file_path: str
|
||||
):
|
||||
# Create the 'results' directory if it does not exist
|
||||
results_dir = "../results"
|
||||
if not os.path.exists(results_dir):
|
||||
os.makedirs(results_dir)
|
||||
|
||||
:param file_path: File path input from the command line.
|
||||
"""
|
||||
file_content = read_file_content(file_path)
|
||||
classified_dangerous = find_dangerous_functions(file_content)
|
||||
for risk_level in [
|
||||
"high",
|
||||
"medium",
|
||||
]: # Only iterate over high and medium risk levels
|
||||
occurrences = classified_dangerous[risk_level]
|
||||
if occurrences:
|
||||
print(f"Dangerous functions found at risk level {risk_level}:")
|
||||
for line_num, func in occurrences:
|
||||
print(f" Line {line_num}: {func}")
|
||||
base_name = os.path.basename(file_path)
|
||||
output_file = os.path.join(
|
||||
results_dir, f"{os.path.splitext(base_name)[0]}.{output_format}"
|
||||
)
|
||||
|
||||
if output_format == "html":
|
||||
output_html(results, output_file)
|
||||
elif output_format == "md":
|
||||
output_markdown(results, output_file)
|
||||
elif output_format == "txt":
|
||||
output_text(results, output_file)
|
||||
|
||||
|
||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name: str):
|
||||
html_output = f"<html><head><title>Analysis of {file_name}</title></head><body>"
|
||||
html_output += "<h1>Security Analysis Report</h1>"
|
||||
for risk_level, entries in results.items():
|
||||
html_output += f"<h2>{risk_level.capitalize()} Risk</h2><ul>"
|
||||
for line_num, line in entries:
|
||||
html_output += f"<li>Line {line_num}: {line}</li>"
|
||||
html_output += "</ul>"
|
||||
html_output += "</body></html>"
|
||||
with open(file_name, "w") as file:
|
||||
file.write(html_output)
|
||||
|
||||
|
||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name: str):
|
||||
md_output = f"# Security Analysis Report for {file_name}\n"
|
||||
for risk_level, entries in results.items():
|
||||
md_output += f"## {risk_level.capitalize()} Risk\n"
|
||||
for line_num, line in entries:
|
||||
md_output += f"- Line {line_num}: {line}\n"
|
||||
with open(file_name, "w") as file:
|
||||
file.write(md_output)
|
||||
|
||||
|
||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name: str):
|
||||
text_output = f"Security Analysis Report for {file_name}\n"
|
||||
for risk_level, entries in results.items():
|
||||
text_output += f"{risk_level.capitalize()} Risk:\n"
|
||||
for line_num, line in entries:
|
||||
text_output += f" Line {line_num}: {line}\n"
|
||||
with open(file_name, "w") as file:
|
||||
file.write(text_output)
|
||||
|
||||
|
||||
def process_path(path: str, output_format: str):
|
||||
if os.path.isdir(path):
|
||||
for root, dirs, files in os.walk(path):
|
||||
for file in files:
|
||||
file_extension = os.path.splitext(file)[1]
|
||||
if file_extension in SUPPORTED_EXTENSIONS:
|
||||
file_path = os.path.join(root, file)
|
||||
print(f"Processing {file_path}...")
|
||||
file_results = find_dangerous_functions(
|
||||
read_file_content(file_path), file_extension
|
||||
)
|
||||
output_results(file_results, output_format, file_path)
|
||||
elif os.path.isfile(path):
|
||||
file_extension = os.path.splitext(path)[1]
|
||||
if file_extension in SUPPORTED_EXTENSIONS:
|
||||
file_results = find_dangerous_functions(
|
||||
read_file_content(path), file_extension
|
||||
)
|
||||
output_results(file_results, output_format, path)
|
||||
else:
|
||||
print(f"No dangerous functions found at risk level {risk_level}.")
|
||||
print("Unsupported file type.")
|
||||
else:
|
||||
print("Invalid path.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def test():
|
||||
print("hello world")
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: python backdoor_detection.py <path> <output_format>")
|
||||
sys.exit(1)
|
||||
path = sys.argv[1]
|
||||
output_format = sys.argv[2]
|
||||
if output_format not in OUTPUT_FORMATS:
|
||||
print(
|
||||
f"Unsupported output format. Supported formats are: {', '.join(OUTPUT_FORMATS)}"
|
||||
)
|
||||
sys.exit(1)
|
||||
process_path(path, output_format)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: python script.py <file_path>")
|
||||
sys.exit(1)
|
||||
main(sys.argv[1])
|
||||
main()
|
||||
|
1
results/test_backdoor_detection.html
Normal file
1
results/test_backdoor_detection.html
Normal file
@@ -0,0 +1 @@
|
||||
<html><head><title>Analysis of ../results\test_backdoor_detection.html</title></head><body><h1>Security Analysis Report</h1><h2>High Risk</h2><ul><li>Line 8: os.system('rm -rf /')</li><li>Line 9: exec('print("Hello")')</li><li>Line 10: eval('2 + 2')</li><li>Line 13: self.assertIn((2, "os.system('rm -rf /')"), results["high"])</li><li>Line 14: self.assertIn((3, "exec('print(\"Hello\")')"), results["high"])</li><li>Line 15: self.assertIn((4, "eval('2 + 2')"), results["high"])</li><li>Line 40: eval('2 + 2')</li><li>Line 45: (3, "eval('2 + 2')"),</li></ul><h2>Medium Risk</h2><ul><li>Line 19: subprocess.run(['ls', '-l'])</li><li>Line 21: os.popen('ls')</li><li>Line 24: self.assertIn((2, "subprocess.run(['ls', '-l'])"), results["medium"])</li><li>Line 25: self.assertIn((4, "os.popen('ls')"), results["medium"])</li><li>Line 41: subprocess.run(['echo', 'hello'])</li><li>Line 49: (4, "subprocess.run(['echo', 'hello'])"),</li></ul><h2>Low Risk</h2><ul></ul><h2>None Risk</h2><ul><li>Line 1: import unittest</li><li>Line 2: from detection.backdoor_detection import find_dangerous_functions</li><li>Line 5: class TestBackdoorDetection(unittest.TestCase):</li><li>Line 6: def test_high_risk_detection(self):</li><li>Line 7: content = """import os</li><li>Line 11: """</li><li>Line 12: results = find_dangerous_functions(content)</li><li>Line 17: def test_medium_risk_detection(self):</li><li>Line 18: content = """import subprocess</li><li>Line 20: import os</li><li>Line 22: """</li><li>Line 23: results = find_dangerous_functions(content)</li><li>Line 27: def test_no_risk_detection(self):</li><li>Line 28: content = """a = 10</li><li>Line 29: b = a + 5</li><li>Line 30: print('This should not be detected as risky.')</li><li>Line 31: """</li><li>Line 32: results = find_dangerous_functions(content)</li><li>Line 33: self.assertEqual(len(results["high"]), 0)</li><li>Line 34: self.assertEqual(len(results["medium"]), 0)</li><li>Line 35: self.assertEqual(len(results["low"]), 0)</li><li>Line 37: def test_inclusion_of_comments(self):</li><li>Line 38: content = """</li><li>Line 39: print('This is a safe line')</li><li>Line 42: """</li><li>Line 43: results = find_dangerous_functions(content)</li><li>Line 44: self.assertIn(</li><li>Line 46: results["high"],</li><li>Line 47: )</li><li>Line 48: self.assertIn(</li><li>Line 50: results["medium"],</li><li>Line 51: )</li><li>Line 54: if __name__ == "__main__":</li><li>Line 55: unittest.main()</li></ul></body></html>
|
53
results/test_backdoor_detection.md
Normal file
53
results/test_backdoor_detection.md
Normal file
@@ -0,0 +1,53 @@
|
||||
# Security Analysis Report for ../results\test_backdoor_detection.md
|
||||
## High Risk
|
||||
- Line 8: os.system('rm -rf /')
|
||||
- Line 9: exec('print("Hello")')
|
||||
- Line 10: eval('2 + 2')
|
||||
- Line 13: self.assertIn((2, "os.system('rm -rf /')"), results["high"])
|
||||
- Line 14: self.assertIn((3, "exec('print(\"Hello\")')"), results["high"])
|
||||
- Line 15: self.assertIn((4, "eval('2 + 2')"), results["high"])
|
||||
- Line 40: eval('2 + 2')
|
||||
- Line 45: (3, "eval('2 + 2')"),
|
||||
## Medium Risk
|
||||
- Line 19: subprocess.run(['ls', '-l'])
|
||||
- Line 21: os.popen('ls')
|
||||
- Line 24: self.assertIn((2, "subprocess.run(['ls', '-l'])"), results["medium"])
|
||||
- Line 25: self.assertIn((4, "os.popen('ls')"), results["medium"])
|
||||
- Line 41: subprocess.run(['echo', 'hello'])
|
||||
- Line 49: (4, "subprocess.run(['echo', 'hello'])"),
|
||||
## Low Risk
|
||||
## None Risk
|
||||
- Line 1: import unittest
|
||||
- Line 2: from detection.backdoor_detection import find_dangerous_functions
|
||||
- Line 5: class TestBackdoorDetection(unittest.TestCase):
|
||||
- Line 6: def test_high_risk_detection(self):
|
||||
- Line 7: content = """import os
|
||||
- Line 11: """
|
||||
- Line 12: results = find_dangerous_functions(content)
|
||||
- Line 17: def test_medium_risk_detection(self):
|
||||
- Line 18: content = """import subprocess
|
||||
- Line 20: import os
|
||||
- Line 22: """
|
||||
- Line 23: results = find_dangerous_functions(content)
|
||||
- Line 27: def test_no_risk_detection(self):
|
||||
- Line 28: content = """a = 10
|
||||
- Line 29: b = a + 5
|
||||
- Line 30: print('This should not be detected as risky.')
|
||||
- Line 31: """
|
||||
- Line 32: results = find_dangerous_functions(content)
|
||||
- Line 33: self.assertEqual(len(results["high"]), 0)
|
||||
- Line 34: self.assertEqual(len(results["medium"]), 0)
|
||||
- Line 35: self.assertEqual(len(results["low"]), 0)
|
||||
- Line 37: def test_inclusion_of_comments(self):
|
||||
- Line 38: content = """
|
||||
- Line 39: print('This is a safe line')
|
||||
- Line 42: """
|
||||
- Line 43: results = find_dangerous_functions(content)
|
||||
- Line 44: self.assertIn(
|
||||
- Line 46: results["high"],
|
||||
- Line 47: )
|
||||
- Line 48: self.assertIn(
|
||||
- Line 50: results["medium"],
|
||||
- Line 51: )
|
||||
- Line 54: if __name__ == "__main__":
|
||||
- Line 55: unittest.main()
|
53
results/test_backdoor_detection.txt
Normal file
53
results/test_backdoor_detection.txt
Normal file
@@ -0,0 +1,53 @@
|
||||
Security Analysis Report for ../results\test_backdoor_detection.txt
|
||||
High Risk:
|
||||
Line 8: os.system('rm -rf /')
|
||||
Line 9: exec('print("Hello")')
|
||||
Line 10: eval('2 + 2')
|
||||
Line 13: self.assertIn((2, "os.system('rm -rf /')"), results["high"])
|
||||
Line 14: self.assertIn((3, "exec('print(\"Hello\")')"), results["high"])
|
||||
Line 15: self.assertIn((4, "eval('2 + 2')"), results["high"])
|
||||
Line 40: eval('2 + 2')
|
||||
Line 45: (3, "eval('2 + 2')"),
|
||||
Medium Risk:
|
||||
Line 19: subprocess.run(['ls', '-l'])
|
||||
Line 21: os.popen('ls')
|
||||
Line 24: self.assertIn((2, "subprocess.run(['ls', '-l'])"), results["medium"])
|
||||
Line 25: self.assertIn((4, "os.popen('ls')"), results["medium"])
|
||||
Line 41: subprocess.run(['echo', 'hello'])
|
||||
Line 49: (4, "subprocess.run(['echo', 'hello'])"),
|
||||
Low Risk:
|
||||
None Risk:
|
||||
Line 1: import unittest
|
||||
Line 2: from detection.backdoor_detection import find_dangerous_functions
|
||||
Line 5: class TestBackdoorDetection(unittest.TestCase):
|
||||
Line 6: def test_high_risk_detection(self):
|
||||
Line 7: content = """import os
|
||||
Line 11: """
|
||||
Line 12: results = find_dangerous_functions(content)
|
||||
Line 17: def test_medium_risk_detection(self):
|
||||
Line 18: content = """import subprocess
|
||||
Line 20: import os
|
||||
Line 22: """
|
||||
Line 23: results = find_dangerous_functions(content)
|
||||
Line 27: def test_no_risk_detection(self):
|
||||
Line 28: content = """a = 10
|
||||
Line 29: b = a + 5
|
||||
Line 30: print('This should not be detected as risky.')
|
||||
Line 31: """
|
||||
Line 32: results = find_dangerous_functions(content)
|
||||
Line 33: self.assertEqual(len(results["high"]), 0)
|
||||
Line 34: self.assertEqual(len(results["medium"]), 0)
|
||||
Line 35: self.assertEqual(len(results["low"]), 0)
|
||||
Line 37: def test_inclusion_of_comments(self):
|
||||
Line 38: content = """
|
||||
Line 39: print('This is a safe line')
|
||||
Line 42: """
|
||||
Line 43: results = find_dangerous_functions(content)
|
||||
Line 44: self.assertIn(
|
||||
Line 46: results["high"],
|
||||
Line 47: )
|
||||
Line 48: self.assertIn(
|
||||
Line 50: results["medium"],
|
||||
Line 51: )
|
||||
Line 54: if __name__ == "__main__":
|
||||
Line 55: unittest.main()
|
@@ -1,5 +1,9 @@
|
||||
import unittest
|
||||
from detection.backdoor_detection import find_dangerous_functions
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.append(os.path.abspath("../detection"))
|
||||
from backdoor_detection import find_dangerous_functions
|
||||
|
||||
|
||||
class TestBackdoorDetection(unittest.TestCase):
|
||||
@@ -9,7 +13,8 @@ class TestBackdoorDetection(unittest.TestCase):
|
||||
exec('print("Hello")') # high risk
|
||||
eval('2 + 2') # high risk
|
||||
"""
|
||||
results = find_dangerous_functions(content)
|
||||
file_extension = ".py"
|
||||
results = find_dangerous_functions(content, file_extension)
|
||||
self.assertIn((2, "os.system('rm -rf /')"), results["high"])
|
||||
self.assertIn((3, "exec('print(\"Hello\")')"), results["high"])
|
||||
self.assertIn((4, "eval('2 + 2')"), results["high"])
|
||||
@@ -20,7 +25,8 @@ class TestBackdoorDetection(unittest.TestCase):
|
||||
import os
|
||||
os.popen('ls') # medium risk
|
||||
"""
|
||||
results = find_dangerous_functions(content)
|
||||
file_extension = ".py"
|
||||
results = find_dangerous_functions(content, file_extension)
|
||||
self.assertIn((2, "subprocess.run(['ls', '-l'])"), results["medium"])
|
||||
self.assertIn((4, "os.popen('ls')"), results["medium"])
|
||||
|
||||
@@ -29,7 +35,8 @@ class TestBackdoorDetection(unittest.TestCase):
|
||||
b = a + 5
|
||||
print('This should not be detected as risky.')
|
||||
"""
|
||||
results = find_dangerous_functions(content)
|
||||
file_extension = ".py"
|
||||
results = find_dangerous_functions(content, file_extension)
|
||||
self.assertEqual(len(results["high"]), 0)
|
||||
self.assertEqual(len(results["medium"]), 0)
|
||||
self.assertEqual(len(results["low"]), 0)
|
||||
@@ -40,7 +47,8 @@ class TestBackdoorDetection(unittest.TestCase):
|
||||
eval('2 + 2') # This should be high risk
|
||||
subprocess.run(['echo', 'hello']) # This should be medium risk
|
||||
"""
|
||||
results = find_dangerous_functions(content)
|
||||
file_extension = ".py"
|
||||
results = find_dangerous_functions(content, file_extension)
|
||||
self.assertIn(
|
||||
(3, "eval('2 + 2')"),
|
||||
results["high"],
|
||||
|
Reference in New Issue
Block a user