cad5b14382
mcp-py 的 run_code/secure_sandbox 此前全是桩。落地两层防御:
1) 静态守卫 sandbox.SecureSandbox.static_guard(纯 AST,执行前第一道)
- 拦危险导入(os/sys/subprocess/socket/ctypes/pickle/requests…)、危险调用
(eval/exec/compile/__import__/open…)、逃逸属性(__subclasses__/__globals__…)、语法错误。
- 返回 (放行, 原因)。
2) 隔离执行 interpreter.CodeInterpreter.execute(Docker,真隔离)
- network_disabled 禁网;user=65534 非 root + cap_drop=ALL + no-new-privileges;
read_only 根 + /tmp tmpfs;mem/memswap(禁swap)/nano_cpus/pids_limit 限资源;
python -I 隔离模式;wait 超时即 kill;容器一次性 remove。
- 无 Docker SDK/daemon 时 available()=False 优雅降级,不阻断服务。
gateway:run_code(标准档 256m/0.5cpu/10s) 与 secure_sandbox(紧档 128m/5s) 均走
守卫→隔离,结果整理为 stdout/stderr/exit 可读文本。pyproject 启用 docker 依赖。
验证:
- 守卫 6 单测(放行安全码 / 拦危险导入·调用·逃逸属性 / 语法错误)全过。
- 隔离 4 项实跑(真 Docker):sum(range(10))→45 exit0;非root uid=65534;
禁网 urlopen 失败(DNS解析错);while True 超时 3s 被 kill。
- 无 Docker 降级测过。
生产加固:可把执行运行时换 gVisor(runsc)/Kata(已在注释/PROGRESS 标注)。
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
62 lines
1.9 KiB
Python
62 lines
1.9 KiB
Python
"""静态守卫与隔离器的单测(守卫纯逻辑无依赖;隔离器测无 Docker 时的降级)。"""
|
|
|
|
import asyncio
|
|
|
|
from sundynix_mcp_py.interpreter import CodeInterpreter
|
|
from sundynix_mcp_py.sandbox import SecureSandbox
|
|
|
|
|
|
def test_guard_allows_safe_code():
|
|
g = SecureSandbox()
|
|
for code in [
|
|
"x = sum(range(10))\nprint(x)",
|
|
"import math, json\nprint(math.sqrt(2))",
|
|
"import re\nprint(re.findall(r'\\d+', 'a1b2'))",
|
|
]:
|
|
ok, reason = g.static_guard(code)
|
|
assert ok, f"安全代码被误拒: {code} → {reason}"
|
|
|
|
|
|
def test_guard_blocks_dangerous_imports():
|
|
g = SecureSandbox()
|
|
for code in [
|
|
"import os\nos.listdir('/')",
|
|
"import subprocess",
|
|
"from socket import socket",
|
|
"import ctypes",
|
|
"import pickle",
|
|
"import requests",
|
|
]:
|
|
ok, reason = g.static_guard(code)
|
|
assert not ok, f"危险导入未拦: {code}"
|
|
assert "禁止导入" in reason
|
|
|
|
|
|
def test_guard_blocks_dangerous_calls():
|
|
g = SecureSandbox()
|
|
for code in ["eval('1+1')", "exec('x=1')", "__import__('os')", "open('/etc/passwd')"]:
|
|
ok, reason = g.static_guard(code)
|
|
assert not ok and "禁止调用" in reason, f"危险调用未拦: {code}"
|
|
|
|
|
|
def test_guard_blocks_escape_attrs():
|
|
g = SecureSandbox()
|
|
ok, reason = g.static_guard("().__class__.__subclasses__()")
|
|
assert not ok and "禁止访问属性" in reason
|
|
ok2, _ = g.static_guard("(lambda: 0).__globals__")
|
|
assert not ok2
|
|
|
|
|
|
def test_guard_rejects_syntax_error():
|
|
g = SecureSandbox()
|
|
ok, reason = g.static_guard("def f(:\n pass")
|
|
assert not ok and "语法错误" in reason
|
|
|
|
|
|
def test_interpreter_degrades_without_docker():
|
|
ci = CodeInterpreter()
|
|
if ci.available():
|
|
return # 本机有 Docker:跳过降级断言(执行路径走集成验证)
|
|
r = asyncio.run(ci.execute("print(1)"))
|
|
assert r["degraded"] is True and r["ok"] is False
|