feat(mcp-py): 代码沙箱落地 —— AST 静态守卫 + Docker 隔离执行(弃用桩)

mcp-py 的 run_code/secure_sandbox 此前全是桩。落地两层防御:

1) 静态守卫 sandbox.SecureSandbox.static_guard(纯 AST,执行前第一道)
   - 拦危险导入(os/sys/subprocess/socket/ctypes/pickle/requests…)、危险调用
     (eval/exec/compile/__import__/open…)、逃逸属性(__subclasses__/__globals__…)、语法错误。
   - 返回 (放行, 原因)。

2) 隔离执行 interpreter.CodeInterpreter.execute(Docker,真隔离)
   - network_disabled 禁网;user=65534 非 root + cap_drop=ALL + no-new-privileges;
     read_only 根 + /tmp tmpfs;mem/memswap(禁swap)/nano_cpus/pids_limit 限资源;
     python -I 隔离模式;wait 超时即 kill;容器一次性 remove。
   - 无 Docker SDK/daemon 时 available()=False 优雅降级,不阻断服务。

gateway:run_code(标准档 256m/0.5cpu/10s) 与 secure_sandbox(紧档 128m/5s) 均走
守卫→隔离,结果整理为 stdout/stderr/exit 可读文本。pyproject 启用 docker 依赖。

验证:
- 守卫 6 单测(放行安全码 / 拦危险导入·调用·逃逸属性 / 语法错误)全过。
- 隔离 4 项实跑(真 Docker):sum(range(10))→45 exit0;非root uid=65534;
  禁网 urlopen 失败(DNS解析错);while True 超时 3s 被 kill。
- 无 Docker 降级测过。

生产加固:可把执行运行时换 gVisor(runsc)/Kata(已在注释/PROGRESS 标注)。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Blizzard
2026-06-18 11:26:08 +08:00
parent 9657a07bb5
commit cad5b14382
6 changed files with 245 additions and 25 deletions
+62 -12
View File
@@ -1,19 +1,69 @@
"""安全代码沙箱gVisor / KataVM 强隔离 + 静态代码守卫。"""
"""安全代码沙箱 · 静态代码守卫(AST 扫描,纯逻辑,执行前的第一道防线)。
真正的执行隔离在 interpreter.CodeInterpreterDocker:禁网/非 root/丢能力/限资源/一次性)。
本守卫是纵深防御:在进容器前先拦掉明显的逃逸/越权/原生调用向量。
生产可把执行运行时换成 gVisor(runsc) / Kata 进一步硬化。
"""
from __future__ import annotations
import ast
# 危险模块:导入即拒(执行 / 原生 / 网络 / 序列化反序列化 / 进程等逃逸或越权向量)。
_DENY_MODULES = {
"os", "sys", "subprocess", "socket", "ctypes", "importlib", "pickle",
"marshal", "shutil", "multiprocessing", "threading", "pty", "signal",
"mmap", "fcntl", "resource", "requests", "urllib", "http", "ftplib",
"telnetlib", "smtplib", "asyncio", "platform", "pdb", "gc",
}
# 危险内置调用:拒(动态执行 / 文件 / 解释器内省入口)。
_DENY_CALLS = {
"eval", "exec", "compile", "__import__", "open", "input",
"breakpoint", "exit", "quit", "globals", "vars", "memoryview",
}
# 危险属性:拒(典型沙箱逃逸链:obj.__class__.__subclasses__()... / __globals__ ...)。
_DENY_ATTRS = {
"__subclasses__", "__globals__", "__builtins__", "__bases__",
"__mro__", "__reduce__", "__reduce_ex__", "__code__", "__dict__",
"__getattribute__", "__base__",
}
class SecureSandbox:
"""Harness: Secure Code Sandbox"""
"""Harness · 静态代码守卫"""
def static_guard(self, code: str) -> bool:
"""静态代码守卫:危险调用/导入检测,返回是否放行"""
# TODO: AST 扫描,拦截 os/subprocess/网络等高危调用
return True
def static_guard(self, code: str) -> tuple[bool, str]:
"""AST 扫描危险导入 / 调用 / 属性。返回 (是否放行, 拒绝原因)"""
try:
tree = ast.parse(code)
except SyntaxError as e:
return False, f"语法错误:{e.msg}"
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
top = alias.name.split(".")[0]
if top in _DENY_MODULES:
return False, f"禁止导入模块:{alias.name}"
elif isinstance(node, ast.ImportFrom):
top = (node.module or "").split(".")[0]
if top in _DENY_MODULES:
return False, f"禁止导入模块:{node.module}"
elif isinstance(node, ast.Call):
name = _call_name(node.func)
if name in _DENY_CALLS:
return False, f"禁止调用:{name}"
elif isinstance(node, ast.Attribute):
if node.attr in _DENY_ATTRS:
return False, f"禁止访问属性:{node.attr}"
return True, ""
async def run(self, code: str, *, runtime: str = "gvisor") -> str:
"""在 gVisor/KataVM 隔离环境中执行代码。"""
if not self.static_guard(code):
raise PermissionError("static code guard rejected")
# TODO: 调度到 gVisor(runsc) / Kata 容器执行并回收
return ""
def _call_name(func: ast.AST) -> str:
"""取被调用者的名字:普通名(eval) 或属性末段(o.system)。"""
if isinstance(func, ast.Name):
return func.id
if isinstance(func, ast.Attribute):
return func.attr
return ""