cad5b14382
mcp-py 的 run_code/secure_sandbox 此前全是桩。落地两层防御:
1) 静态守卫 sandbox.SecureSandbox.static_guard(纯 AST,执行前第一道)
- 拦危险导入(os/sys/subprocess/socket/ctypes/pickle/requests…)、危险调用
(eval/exec/compile/__import__/open…)、逃逸属性(__subclasses__/__globals__…)、语法错误。
- 返回 (放行, 原因)。
2) 隔离执行 interpreter.CodeInterpreter.execute(Docker,真隔离)
- network_disabled 禁网;user=65534 非 root + cap_drop=ALL + no-new-privileges;
read_only 根 + /tmp tmpfs;mem/memswap(禁swap)/nano_cpus/pids_limit 限资源;
python -I 隔离模式;wait 超时即 kill;容器一次性 remove。
- 无 Docker SDK/daemon 时 available()=False 优雅降级,不阻断服务。
gateway:run_code(标准档 256m/0.5cpu/10s) 与 secure_sandbox(紧档 128m/5s) 均走
守卫→隔离,结果整理为 stdout/stderr/exit 可读文本。pyproject 启用 docker 依赖。
验证:
- 守卫 6 单测(放行安全码 / 拦危险导入·调用·逃逸属性 / 语法错误)全过。
- 隔离 4 项实跑(真 Docker):sum(range(10))→45 exit0;非root uid=65534;
禁网 urlopen 失败(DNS解析错);while True 超时 3s 被 kill。
- 无 Docker 降级测过。
生产加固:可把执行运行时换 gVisor(runsc)/Kata(已在注释/PROGRESS 标注)。
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
70 lines
2.8 KiB
Python
70 lines
2.8 KiB
Python
"""安全代码沙箱 · 静态代码守卫(AST 扫描,纯逻辑,执行前的第一道防线)。
|
||
|
||
真正的执行隔离在 interpreter.CodeInterpreter(Docker:禁网/非 root/丢能力/限资源/一次性)。
|
||
本守卫是纵深防御:在进容器前先拦掉明显的逃逸/越权/原生调用向量。
|
||
生产可把执行运行时换成 gVisor(runsc) / Kata 进一步硬化。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import ast
|
||
|
||
# 危险模块:导入即拒(执行 / 原生 / 网络 / 序列化反序列化 / 进程等逃逸或越权向量)。
|
||
_DENY_MODULES = {
|
||
"os", "sys", "subprocess", "socket", "ctypes", "importlib", "pickle",
|
||
"marshal", "shutil", "multiprocessing", "threading", "pty", "signal",
|
||
"mmap", "fcntl", "resource", "requests", "urllib", "http", "ftplib",
|
||
"telnetlib", "smtplib", "asyncio", "platform", "pdb", "gc",
|
||
}
|
||
|
||
# 危险内置调用:拒(动态执行 / 文件 / 解释器内省入口)。
|
||
_DENY_CALLS = {
|
||
"eval", "exec", "compile", "__import__", "open", "input",
|
||
"breakpoint", "exit", "quit", "globals", "vars", "memoryview",
|
||
}
|
||
|
||
# 危险属性:拒(典型沙箱逃逸链:obj.__class__.__subclasses__()... / __globals__ ...)。
|
||
_DENY_ATTRS = {
|
||
"__subclasses__", "__globals__", "__builtins__", "__bases__",
|
||
"__mro__", "__reduce__", "__reduce_ex__", "__code__", "__dict__",
|
||
"__getattribute__", "__base__",
|
||
}
|
||
|
||
|
||
class SecureSandbox:
|
||
"""Harness · 静态代码守卫。"""
|
||
|
||
def static_guard(self, code: str) -> tuple[bool, str]:
|
||
"""AST 扫描危险导入 / 调用 / 属性。返回 (是否放行, 拒绝原因)。"""
|
||
try:
|
||
tree = ast.parse(code)
|
||
except SyntaxError as e:
|
||
return False, f"语法错误:{e.msg}"
|
||
for node in ast.walk(tree):
|
||
if isinstance(node, ast.Import):
|
||
for alias in node.names:
|
||
top = alias.name.split(".")[0]
|
||
if top in _DENY_MODULES:
|
||
return False, f"禁止导入模块:{alias.name}"
|
||
elif isinstance(node, ast.ImportFrom):
|
||
top = (node.module or "").split(".")[0]
|
||
if top in _DENY_MODULES:
|
||
return False, f"禁止导入模块:{node.module}"
|
||
elif isinstance(node, ast.Call):
|
||
name = _call_name(node.func)
|
||
if name in _DENY_CALLS:
|
||
return False, f"禁止调用:{name}"
|
||
elif isinstance(node, ast.Attribute):
|
||
if node.attr in _DENY_ATTRS:
|
||
return False, f"禁止访问属性:{node.attr}"
|
||
return True, ""
|
||
|
||
|
||
def _call_name(func: ast.AST) -> str:
|
||
"""取被调用者的名字:普通名(eval) 或属性末段(o.system)。"""
|
||
if isinstance(func, ast.Name):
|
||
return func.id
|
||
if isinstance(func, ast.Attribute):
|
||
return func.attr
|
||
return ""
|