MCP协议深度实战:打造AI工具互联互通的2026新基建

MCP协议深度实战:打造AI工具互联互通的2026新基建

Ethan
2026-04-05 发布 / 正在检测是否收录...

MCP协议深度实战:打造AI工具互联互通的2026新基建

一、引言:AI Agent的"USB标准"

MCP(Model Context Protocol)是Anthropic于2025年推出、2026年成为行业事实标准的AI Agent工具互联协议。形象地说,它就像是AI世界的"USB-C"——为AI模型提供了一种标准化的方式来连接外部工具和数据源。

截至2026年5月,MCP生态的规模已经相当可观:

  • 1000+ 官方和社区Server:覆盖数据库、文件系统、搜索引擎、API网关等
  • 20+ AI平台原生支持:Claude、Cursor、Continue.dev、Zed等
  • 跨语言SDK:Python、TypeScript、Java、Go的官方SDK都已成熟

本文将带你从零搭建多个MCP Server,并深入解析MCP协议的架构设计。

二、MCP协议核心架构

2.1 协议分层

┌────────────────────────────────────────┐
│           MCP Host (IDE/应用)           │
│   例如:Claude Desktop, Cursor, VS Code │
├────────────────────────────────────────┤
│           MCP Client (协议层)            │
│   管理连接、请求、通知                   │
├────────────────────────────────────────┤
│              JSON-RPC 2.0              │
│   传输层:stdio / SSE / WebSocket      │
├────────────────────────────────────────┤
│           MCP Server (工具实现)          │
│   数据库查询、API调用、文件操作...       │
└────────────────────────────────────────┘

2.2 三大核心原语

MCP协议围绕三个核心概念构建:

原语用途示例
ToolsAI可调用的功能查数据库、搜文件、调用API
ResourcesAI可读取的数据文档、Schema、配置
Prompts预定义的提示模板代码审查模板、SQL优化模板
// MCP协议JSON-RPC消息示例
{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "query_database",
    "arguments": {
      "sql": "SELECT * FROM orders WHERE status = 'pending' LIMIT 10"
    }
  },
  "id": "req-001"
}

// 响应
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "查询结果: [{\"id\": 1001, \"status\": \"pending\", \"amount\": 299.00}, ...]"
      }
    ]
  },
  "id": "req-001"
}

三、实战:构建三个MCP Server

3.1 MySQL数据库MCP Server(Python)

# mysql_mcp_server.py
import asyncio
import json
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationCapabilities
from mcp.server.stdio import stdio_server
import aiomysql

server = Server("mysql-explorer")

# 注册工具
@server.list_tools()
async def list_tools():
    return [
        {
            "name": "query_database",
            "description": "执行SELECT查询,返回结果集。支持参数化查询防止SQL注入",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SELECT查询语句"
                    },
                    "params": {
                        "type": "array",
                        "description": "查询参数(可选)",
                        "items": {"type": "string"}
                    }
                },
                "required": ["sql"]
            }
        },
        {
            "name": "get_table_schema",
            "description": "获取指定表的结构信息",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "表名"
                    }
                },
                "required": ["table_name"]
            }
        }
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    pool = await aiomysql.create_pool(
        host="localhost", port=3307,
        user="typecho", password="typecho123",
        db="typecho", charset="utf8mb4"
    )
    
    if name == "query_database":
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(arguments["sql"], arguments.get("params", []))
                rows = await cur.fetchall()
                return [{"type": "text", "text": json.dumps(rows, ensure_ascii=False, default=str)}]
    
    elif name == "get_table_schema":
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(f"DESCRIBE {arguments['table_name']}")
                rows = await cur.fetchall()
                return [{"type": "text", "text": json.dumps(rows, ensure_ascii=False, default=str)}]
    
    pool.close()
    await pool.wait_closed()

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream, write_stream,
            InitializationCapabilities(
                sampling={},
                experimental={},
                roots={"listChanged": True}
            )
        )

if __name__ == "__main__":
    asyncio.run(main())

3.2 文件系统MCP Server(TypeScript)

// filesystem_mcp_server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";

const server = new Server(
  { name: "filesystem-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// 安全白名单:AI只能访问这些目录
const ALLOWED_DIRECTORIES = [
  "/home/user/projects",
  "/var/data/documents",
];

function isPathSafe(targetPath: string): boolean {
  const resolved = path.resolve(targetPath);
  return ALLOWED_DIRECTORIES.some(dir => resolved.startsWith(dir));
}

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "read_file",
      description: "读取文件内容",
      inputSchema: {
        type: "object",
        properties: {
          path: { type: "string", description: "文件路径" },
          encoding: { type: "string", enum: ["utf-8", "base64"], default: "utf-8" }
        },
        required: ["path"]
      }
    },
    {
      name: "search_files",
      description: "在目录中搜索文件(支持glob模式)",
      inputSchema: {
        type: "object",
        properties: {
          directory: { type: "string" },
          pattern: { type: "string" }
        },
        required: ["directory", "pattern"]
      }
    }
  ]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  
  if (name === "read_file") {
    const filePath = args.path as string;
    if (!isPathSafe(filePath)) {
      return {
        content: [{ type: "text", text: "Error: 访问路径不在白名单中" }],
        isError: true
      };
    }
    const content = await fs.readFile(filePath, "utf-8");
    return {
      content: [{ type: "text", text: content }]
    };
  }
  
  if (name === "search_files") {
    const dir = args.directory as string;
    if (!isPathSafe(dir)) {
      return {
        content: [{ type: "text", text: "Error: 访问路径不在白名单中" }],
        isError: true
      };
    }
    const entries = await fs.readdir(dir, { withFileTypes: true });
    const matched = entries
      .filter(e => e.name.includes(args.pattern as string))
      .map(e => `${e.isDirectory() ? '📁' : '📄'} ${e.name}`);
    
    return {
      content: [{ type: "text", text: matched.join('\n') }]
    };
  }
  
  throw new Error(`Unknown tool: ${name}`);
});

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
}

main();

3.3 Spring Boot MCP Server(Java)

// Spring Boot MCP Server - 将现有API自动转化为MCP工具
@SpringBootApplication
@McpServer(
    name = "user-management-server",
    version = "1.0.0"
)
public class McpServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(McpServerApplication.class, args);
    }
}

@Component
public class UserManagementTools {
    
    @McpTool(
        name = "get_user",
        description = "根据ID查询用户信息"
    )
    public User getUser(@McpParam("用户ID") Long userId) {
        return userRepository.findById(userId)
            .orElseThrow(() -> new NotFoundException("用户不存在"));
    }
    
    @McpTool(
        name = "search_users",
        description = "根据关键词搜索用户"
    )
    public List<User> searchUsers(
        @McpParam("搜索关键词") String keyword,
        @McpParam(value = "分页大小", required = false) Integer limit
    ) {
        return userRepository.search(keyword, limit != null ? limit : 20);
    }
    
    @McpTool(
        name = "export_users_csv",
        description = "导出用户列表为CSV格式"
    )
    public String exportUsersCsv(
        @McpParam("部门ID") Long departmentId
    ) {
        List<User> users = userRepository.findByDepartment(departmentId);
        return CsvUtils.toCsv(users);
    }
}

// application.yml 配置
mcp:
  server:
    transport: stdio        # 支持 stdio / sse / websocket
    tools:
      auto-scan: true       # 自动扫描@McpTool注解

四、MCP与A2A协议的关系

这是2026年AI协议领域最常见的问题:

维度MCPA2A
定位AI到工具的连接协议Agent到Agent的通信协议
类比USB-C(设备连接)TCP/IP(设备间通信)
通信流单向:AI调用工具双向:Agent间协作
发起者AnthropicGoogle
成熟度生产就绪2026年5月正式发布

两者是互补关系而非竞争关系。一个典型的AI应用会同时使用:

  • MCP:让单个Agent拥有一套标准化工具
  • A2A:让多个Agent(可能由不同框架构建)协作完成复杂任务

五、生产环境MCP部署架构

                  ┌──────────────────┐
                  │   AI 应用/IDE    │
                  └────────┬─────────┘
                           │
                  ┌────────▼─────────┐
                  │   MCP Gateway    │
                  │  (路由/鉴权/限流) │
                  └──┬───────┬──────┘
                     │       │
          ┌──────────▼─┐ ┌──▼──────────┐
          │  MySQL MCP │ │ Redis MCP   │
          │  Server    │ │ Server      │
          └────────────┘ └─────────────┘
                     │       │
          ┌──────────▼─┐ ┌──▼──────────┐
          │  GitHub MCP│ │  Slack MCP  │
          │  Server    │ │  Server     │
          └────────────┘ └─────────────┘

关键生产实践:

  • 安全隔离:每个MCP Server运行在独立的沙箱中
  • 权限控制:细粒度的用户-工具访问策略
  • 速率限制:防止AI模型的工具调用频率过高导致后端压力
  • 日志审计:所有工具调用留痕,满足合规要求

六、结语

MCP协议正在成为AI Agent的"新基建"。对于技术团队,现在的关键行动项是:

  1. 盘点内部API:识别哪些API适合暴露给AI Agent
  2. 搭建MCP Server:将高频使用的API封装为MCP工具
  3. 制定安全策略:确认AI可以访问哪些数据和功能
  4. 关注A2A:随着多Agent场景增多,A2A协议也将变得重要

MCP不是在创造新的工作量,而是在为AI Agent时代做基础设施准备。


发布日期:2026年4月5日 | 作者:Ethan | 分类:AI、MCP协议

© 版权声明
THE END
喜欢就支持一下吧
点赞 1 分享 收藏

评论 (0)

取消

Warning: file_put_contents(/var/www/html/usr/cache/pagecache/d2/d285fddfc230b27a84baf054724a1b6a.cache): failed to open stream: No such file or directory in /var/www/html/usr/plugins/PageCache/Plugin.php on line 188