一句话概括:读完这篇,你就能给自己的 AI 写"专属技能"——定时发消息、自动查数据、甚至帮你摸鱼!
适合人群:用过 MCP、想进阶的开发者,想让 AI 更懂你需求的人
兄弟们,上一篇聊了 MCP 是什么、怎么配置现成的 MCP Server。
配置完发现没?别人的 MCP Server,终究是别人的。
- 想让 AI 定时给你发日报提醒?没有现成的
- 想让 AI 直接调用你公司的内部 API?更没有了
- 想让 AI 操作你的 NAS、智能家居?想都别想
怎么办?自己写一个!
别怕,比你想的简单多了。今天这篇,手把手教你从零开发一个 MCP Server。
开发 MCP Server 前要懂的三件事
1. MCP Server 本质是什么?
一句话:一个程序,通过特定协议和 AI 客户端通信。
通信方式有三种:
| 传输方式 | 适用场景 | 配置示例 |
|---|---|---|
| stdio | 本地命令行启动 | "command": "python", "args": ["server.py"] |
| SSE | HTTP 长连接,适合远程服务 | "url": "http://localhost:8000/sse" |
| streamable-http | 现代化 HTTP,推荐用于生产 | "url": "http://localhost:8000/mcp" |
新手推荐 stdio,最简单,今天重点讲这个。
2. 选择你的武器
| 语言 | 优势 | 官方 SDK |
|---|---|---|
| Python | 简单易上手、生态丰富 | mcp |
| TypeScript/Node.js | 类型安全、前端友好 | @modelcontextprotocol/sdk |
| Go / Rust | 性能强、单二进制部署 | 社区实现 |
两种语言都演示,任选其一即可。
3. MCP Server 能提供什么能力?
三种"能力":
| 能力类型 | 是什么 | 用途 |
|---|---|---|
| Tools | AI 可调用的函数 | 执行操作、返回结果 |
| Resources | AI 可访问的资源 | 文件内容、数据库记录 |
| Prompts | 预定义的提示词模板 | 快速发起特定任务 |
最常用的是 Tools,今天重点讲这个。
Python 方式:5 分钟上手
安装依赖
pip install "mcp[cli]"
实战一:Hello World(最简版本)
# hello_mcp.py
from mcp.server.fastmcp import FastMCP
# 创建 Server 实例
mcp = FastMCP("hello-world")
# 定义工具 - 装饰器自动处理参数解析
@mcp.tool()
def say_hello(name: str) -> str:
"""向指定的人说 Hello"""
return f"Hello, {name}! 🎉,我是来自Mcp的自定义问候"
# 启动服务器
if __name__ == "__main__":
mcp.run() # 默认使用 stdio 传输
就这么简单!@mcp.tool() 装饰器自动:
- 从函数签名生成 JSON Schema
- 从 docstring 提取工具描述
- 处理参数解析和返回值
如何将 MCP 工具添加到 AI 客户端:
- 启动 MCP Server:
python hello_mcp.py
- 在 AI 客户端配置文件中添加 MCP 服务器:
{
"mcpServers": {
"hello-world": {
"command": "python",
"args": ["/path/to/hello_mcp.py"]
}
}
}
- 重启 AI 客户端,即可使用
say_hello工具。 - 使用效果:

实战二:定时提醒 MCP Server
from mcp.server.fastmcp import FastMCP
from datetime import datetime, timedelta
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.asyncio import AsyncIOExecutor
import asyncio
import sys
import subprocess
import threading
import logging
import json
import os
from typing import Dict, Any, Optional
# 配置日志,避免干扰MCP协议
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)] # 输出到stderr,不干扰stdout的MCP通信
)
logger = logging.getLogger(__name__)
mcp = FastMCP("reminder-enhanced")
scheduler = AsyncIOScheduler()
reminders = {} # 存储所有提醒
REMINDER_FILE = "reminders.json" # 持久化存储文件
def check_notification_dependencies():
"""检查通知功能的依赖"""
deps = {}
if sys.platform == "darwin":
deps['osascript'] = subprocess.run(['which', 'osascript'], capture_output=True).returncode == 0
elif sys.platform.startswith("linux"):
deps['notify-send'] = subprocess.run(['which', 'notify-send'], capture_output=True).returncode == 0
deps['zenity'] = subprocess.run(['which', 'zenity'], capture_output=True).returncode == 0
elif sys.platform == "win32":
try:
import plyer
deps['plyer'] = True
except ImportError:
deps['plyer'] = False
return deps
def show_desktop_notification(title: str, message: str):
"""显示桌面通知,带有更好的错误处理"""
def _notify():
try:
deps = check_notification_dependencies()
notified = False
if sys.platform == "darwin": # macOS
if deps.get('osascript', False):
script = f'display notification "{message}" with title "{title}" subtitle "提醒"'
subprocess.run(["osascript", "-e", script], check=True, capture_output=True)
logger.info(f"macOS通知已发送:{title}")
notified = True
elif sys.platform.startswith("linux"): # Linux
# 尝试使用notify-send
if deps.get('notify-send', False):
result = subprocess.run(
["notify-send", "-u", "normal", "-t", "10000", title, message],
capture_output=True,
text=True
)
if result.returncode == 0:
logger.info(f"Linux通知已发送:{title}")
notified = True
else:
logger.error(f"notify-send失败:{result.stderr}")
# 如果notify-send失败,尝试zenity
if not notified and deps.get('zenity', False):
try:
subprocess.run(
["zenity", "--info", "--title", title, "--text", message, "--timeout", "10"],
capture_output=True
)
logger.info(f"Zenity通知已发送:{title}")
notified = True
except Exception as e:
logger.error(f"Zenity也失败:{e}")
elif sys.platform == "win32": # Windows
if deps.get('plyer', False):
try:
from plyer import notification
notification.notify(
title=title,
message=message,
timeout=10
)
logger.info(f"Windows通知已发送:{title}")
notified = True
except ImportError:
logger.error("Windows系统需要安装plyer库:pip install plyer")
if not notified:
logger.warning(f"无法发送桌面通知,系统可能不支持:{title}")
except subprocess.CalledProcessError as e:
logger.error(f"桌面通知调用失败:{e}")
except Exception as e:
logger.error(f"显示桌面通知时发生错误:{e}")
# 在新线程中执行通知,避免阻塞定时任务
thread = threading.Thread(target=_notify)
thread.daemon = True
thread.start()
def load_reminders():
"""从文件加载提醒"""
global reminders
if os.path.exists(REMINDER_FILE):
try:
with open(REMINDER_FILE, 'r', encoding='utf-8') as f:
reminders = json.load(f)
# 将时间字符串转换回datetime对象
for rid, reminder in reminders.items():
if isinstance(reminder.get('trigger_time'), str):
try:
reminder['trigger_time'] = datetime.fromisoformat(reminder['trigger_time'])
except ValueError:
# 如果解析失败,尝试其他格式
try:
reminder['trigger_time'] = datetime.strptime(reminder['trigger_time'], '%Y-%m-%d %H:%M:%S')
except ValueError:
logger.error(f"无法解析时间格式: {reminder['trigger_time']}")
continue
if isinstance(reminder.get('created_time'), str):
try:
reminder['created_time'] = datetime.fromisoformat(reminder['created_time'])
except ValueError:
try:
reminder['created_time'] = datetime.strptime(reminder['created_time'], '%Y-%m-%d %H:%M:%S')
except ValueError:
logger.error(f"无法解析创建时间格式: {reminder['created_time']}")
continue
logger.info(f"已从 {REMINDER_FILE} 加载 {len(reminders)} 个提醒")
except Exception as e:
logger.error(f"加载提醒失败:{e}")
reminders = {}
else:
logger.info("提醒文件不存在,创建空提醒列表")
def save_reminders():
"""将提醒保存到文件"""
try:
# 将datetime对象转换为字符串进行存储
reminders_for_save = {}
for rid, reminder in reminders.items():
reminder_copy = reminder.copy()
if isinstance(reminder_copy.get('trigger_time'), datetime):
reminder_copy['trigger_time'] = reminder_copy['trigger_time'].isoformat()
if isinstance(reminder_copy.get('created_time'), datetime):
reminder_copy['created_time'] = reminder_copy['created_time'].isoformat()
reminders_for_save[rid] = reminder_copy
with open(REMINDER_FILE, 'w', encoding='utf-8') as f:
json.dump(reminders_for_save, f, ensure_ascii=False, indent=2)
logger.info(f"已保存 {len(reminders)} 个提醒到 {REMINDER_FILE}")
except Exception as e:
logger.error(f"保存提醒失败:{e}")
async def reminder_callback(reminder_id: str):
"""提醒触发时的回调"""
reminder = reminders.get(reminder_id)
if reminder:
# 触发桌面通知
show_desktop_notification(reminder['title'], reminder['message'])
# 使用logger输出到stderr,不干扰MCP协议
logger.info(f"⏰ 提醒已触发:{reminder['title']} - {reminder['message']}")
# 从内存中移除提醒
if reminder_id in reminders:
del reminders[reminder_id]
# 保存更改
save_reminders()
def ensure_scheduler_running():
"""确保调度器正在运行"""
try:
if not scheduler.running:
scheduler.start()
except Exception as e:
# 在非事件循环环境中,我们只记录提醒但不设置定时器
logger.warning(f"无法启动调度器:{e},将仅保存提醒记录")
@mcp.tool()
def create_reminder(title: str, message: str, delay_minutes: float) -> str:
"""创建一个定时提醒,支持分钟和小时"""
try:
# 验证输入参数
if not title.strip():
return "错误:提醒标题不能为空"
if not message.strip():
return "错误:提醒消息不能为空"
if delay_minutes <= 0:
return "错误:延迟时间必须大于0"
reminder_id = f"r_{datetime.now().strftime('%Y%m%d%H%M%S')}_{int(delay_minutes * 60)}"
trigger_time = datetime.now() + timedelta(minutes=delay_minutes)
reminders[reminder_id] = {
"id": reminder_id,
"title": title,
"message": message,
"trigger_time": trigger_time,
"created_time": datetime.now()
}
# 尝试添加定时任务,但不抛出错误
try:
if not scheduler.running:
scheduler.start()
scheduler.add_job(
reminder_callback,
'date',
run_date=trigger_time,
args=[reminder_id],
id=reminder_id
)
job_status = "ok"
except Exception as e:
logger.warning(f"无法设置定时任务:{e},提醒已保存但不会触发通知")
job_status = "WARN(仅保存,无定时任务)"
# 保存提醒到持久化存储
save_reminders()
time_str = trigger_time.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"提醒已创建:{title}, 触发时间:{time_str}")
return (
f"提醒已创建!{job_status}\n"
f"ID: {reminder_id}\n"
f"标题: {title}\n"
f"消息: {message}\n"
f"延迟时间: {delay_minutes} 分钟\n"
f"触发时间: {time_str}"
)
except Exception as e:
error_msg = f"创建提醒时发生错误:{str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool()
def create_reminder_hours(title: str, message: str, delay_hours: float) -> str:
"""创建一个按时计的定时提醒"""
try:
delay_minutes = delay_hours * 60
return create_reminder(title, message, delay_minutes)
except Exception as e:
error_msg = f"创建提醒时发生错误:{str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool()
def list_reminders() -> str:
"""列出所有待处理的提醒"""
if not reminders:
return "暂无待处理的提醒"
lines = ["待处理提醒:\n"]
for rid, r in reminders.items():
trigger_time = r['trigger_time']
if isinstance(trigger_time, datetime):
time_str = trigger_time.strftime('%Y-%m-%d %H:%M:%S')
else:
time_str = str(trigger_time)
lines.append(f"- [{rid}] {r['title']} - {time_str}")
return "\n".join(lines)
@mcp.tool()
def cancel_reminder(reminder_id: str) -> str:
"""取消一个提醒"""
if reminder_id in reminders:
try:
scheduler.remove_job(reminder_id)
except Exception as e:
logger.warning(f"移除定时任务时出错:{e}")
del reminders[reminder_id]
save_reminders()
return f"已取消提醒 {reminder_id}"
return f"未找到提醒 {reminder_id}"
@mcp.tool()
def get_reminder_status(reminder_id: str) -> str:
"""获取提醒的状态"""
if reminder_id in reminders:
r = reminders[reminder_id]
trigger_time = r['trigger_time']
if isinstance(trigger_time, datetime):
time_str = trigger_time.strftime('%Y-%m-%d %H:%M:%S')
else:
time_str = str(trigger_time)
time_left = trigger_time - datetime.now()
hours_left = time_left.total_seconds() / 3600
return (
f"提醒状态:\n"
f"ID: {reminder_id}\n"
f"标题: {r['title']}\n"
f"消息: {r['message']}\n"
f"触发时间: {time_str}\n"
f"剩余时间: {hours_left:.2f} 小时"
)
return f"未找到提醒 {reminder_id}"
@mcp.tool()
def clear_all_reminders() -> str:
"""清除所有提醒"""
try:
# 移除所有定时任务
for job in scheduler.get_jobs():
job.remove()
# 清空提醒列表
reminders.clear()
save_reminders()
return f"已清除所有 {len(reminders)} 个提醒"
except Exception as e:
error_msg = f"清除提醒时发生错误:{str(e)}"
logger.error(error_msg)
return error_msg
def init_reminder_system():
"""初始化提醒系统"""
# 加载持久化数据
load_reminders()
logger.info("提醒系统初始化完成")
# 启动时初始化
init_reminder_system()
async def start_scheduler():
"""启动调度器"""
if not scheduler.running:
scheduler.start()
logger.info("调度器已启动")
def run_mcp_server():
"""运行MCP服务器,正确处理事件循环"""
import asyncio
try:
# 检查是否已有事件循环
try:
loop = asyncio.get_running_loop()
loop_is_running = True
except RuntimeError:
# 没有运行的事件循环,创建一个新的
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop_is_running = False
# 启动调度器
if not scheduler.running:
scheduler.start()
# 运行MCP服务器
if loop_is_running:
# 如果事件循环已经运行,使用create_task
loop.create_task(mcp.run_async())
else:
# 否则直接运行
mcp.run()
except KeyboardInterrupt:
print("\n正在关闭MCP服务器...")
if scheduler.running:
scheduler.shutdown()
print("服务器已关闭")
except Exception as e:
print(f"运行MCP服务器时出错: {e}")
if scheduler.running:
scheduler.shutdown()
raise
if __name__ == "__main__":
run_mcp_server()
如何将定时提醒 MCP 工具添加到 AI 客户端:
- 启动 MCP Server:
python your_reminder_server.py
- 在 AI 客户端配置文件中添加 MCP 服务器:
{
"mcpServers": {
"reminder-enhanced": {
"command": "python",
"args": ["/path/to/your_reminder_server.py"]
}
}
}
- 重启 AI 客户端,即可使用以下工具:
create_reminder: 创建分钟级定时提醒create_reminder_hours: 创建小时级定时提醒list_reminders: 列出所有待处理的提醒cancel_reminder: 取消指定提醒get_reminder_status: 获取提醒的状态clear_all_reminders: 清除所有提醒
实战三:Nacos 服务状态查询 MCP Server
# nacos_status_mcp.py
from mcp.server.fastmcp import FastMCP
import urllib.request
import json
from typing import Optional
mcp = FastMCP("nacos-status")
@mcp.tool()
def get_nacos_service_status(
service_name: str,
nacos_host: str = "localhost",
nacos_port: int = 8848,
namespace_id: str = "public"
) -> str:
"""查询 Nacos 中指定服务的状态
Args:
service_name: 要查询的服务名称
nacos_host: Nacos 服务器地址,默认为 localhost
nacos_port: Nacos 服务器端口,默认为 8848
namespace_id: 命名空间ID,默认为 public
"""
# 构建 Nacos 服务详情查询的 URL (Nacos 2.4.3 版本)
detail_url = f"http://{nacos_host}:{nacos_port}/nacos/v1/ns/instance/list?serviceName={urllib.parse.quote(service_name)}&namespaceId={namespace_id}"
try:
# 直接查询具体服务的详情
with urllib.request.urlopen(detail_url, timeout=10) as response:
service_detail = json.loads(response.read().decode())
# 检查服务是否存在
hosts = service_detail.get("hosts", [])
if not hosts and service_detail.get("dom") is None:
return f"服务 {service_name} 不存在于 Nacos 中或没有任何实例注册"
# 提取服务信息
service_info = service_detail.get("name", service_name)
# 统计健康和不健康实例数量
healthy_count = sum(1 for host in hosts if host.get("healthy", False))
total_count = len(hosts)
# 构建返回信息
result = f"📋 Nacos 服务状态: {service_info}\n"
result += f"实例总数: {total_count}\n"
result += f"健康实例: {healthy_count}\n"
result += f"不健康实例: {total_count - healthy_count}\n\n"
result += "实例详情:\n"
for host in hosts:
ip = host.get("ip", "N/A")
port = host.get("port", "N/A")
healthy = "✅ 健康" if host.get("healthy", False) else "❌ 不健康"
enabled = "启用" if host.get("enabled", False) else "禁用"
weight = host.get("weight", "N/A")
result += f" - {ip}:{port} [{healthy}, {enabled}, 权重:{weight}]\n"
return result
except Exception as e:
return f"查询 Nacos 服务状态失败: {str(e)}"
@mcp.tool()
def get_nacos_services_list(
nacos_host: str = "localhost",
nacos_port: int = 8848,
namespace_id: str = "public"
) -> str:
"""获取 Nacos 中所有的服务列表
Args:
nacos_host: Nacos 服务器地址,默认为 localhost
nacos_port: Nacos 服务器端口,默认为 8848
namespace_id: 命名空间ID,默认为 public
"""
url = f"http://{nacos_host}:{nacos_port}/nacos/v1/ns/catalog/services?namespaceId={namespace_id}&pageNo=1&pageSize=1000"
try:
with urllib.request.urlopen(url, timeout=10) as response:
data = json.loads(response.read().decode())
service_list = data.get("serviceList", [])
if not service_list:
return "Nacos 中没有注册任何服务"
result = f"📋 Nacos 服务列表 (共 {len(service_list)} 个服务):\n"
for service in service_list:
service_name = service.get("name", "N/A")
count = service.get("count", "N/A")
result += f" - {service_name} (实例数: {count})\n"
return result
except Exception as e:
return f"获取 Nacos 服务列表失败: {str(e)}"
if __name__ == "__main__":
mcp.run()
如何将 Nacos 服务状态查询 MCP 工具添加到 AI 客户端:
- 启动 MCP Server:
python nacos_status_mcp.py
- 在 AI 客户端配置文件中添加 MCP 服务器:
{
"mcpServers": {
"nacos-status": {
"command": "python",
"args": ["/path/to/nacos_status_mcp.py"]
}
}
}
- 重启 AI 客户端,即可使用以下工具:
get_nacos_service_status: 查询指定 Nacos 服务的状态get_nacos_services_list: 获取 Nacos 中所有的服务列表
使用效果:

Node.js/TypeScript 方式
安装依赖
npm install @modelcontextprotocol/sdk zod
实战一:Hello World
// hello_mcp.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
const server = new McpServer({
name: "hello-world",
version: "1.0.0",
});
// 定义工具
server.tool(
"say_hello",
"向指定的人说 Hello",
{ name: z.string().describe("对方的名字") },
async ({ name }) => ({
content: [{ type: "text", text: `Hello, ${name}! 🎉,我是来自MCP的问候` }],
})
);
// 启动服务器
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
}
main().catch(console.error);
如何将 TypeScript Hello World MCP 工具添加到 AI 客户端:
- 启动 MCP Server:
npx tsx hello_mcp.ts
- 在 AI 客户端配置文件中添加 MCP 服务器:
{
"mcpServers": {
"hello-world": {
"command": "npx",
"args": ["tsx", "/path/to/hello_mcp.ts"]
}
}
}
或者,如果您已编译为 JavaScript:
{
"mcpServers": {
"hello-world": {
"command": "node",
"args": ["/path/to/hello_mcp.js"]
}
}
}
- 重启 AI 客户端,即可使用
say_hello工具。
运行 TypeScript
# 方式一:使用 tsx 直接运行
npx tsx hello_mcp.ts
# 方式二:编译后运行
npx tsc && node hello_mcp.js
实战二:Nacos 服务状态查询
// nacos_status_mcp.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
const server = new McpServer({
name: "nacos-status",
version: "1.0.0",
});
server.tool(
"get_nacos_service_status",
"查询 Nacos 中指定服务的状态",
{
service_name: z.string().describe("要查询的服务名称"),
nacos_host: z.string().optional().default("localhost").describe("Nacos 服务器地址"),
nacos_port: z.number().optional().default(8848).describe("Nacos 服务器端口"),
namespace_id: z.string().optional().default("public").describe("命名空间ID")
},
async ({ service_name, nacos_host = "localhost", nacos_port = 8848, namespace_id = "public" }) => {
try {
// 查询具体服务的详情 (Nacos 2.4.3 版本)
const detailUrl = `http://${nacos_host}:${nacos_port}/nacos/v1/ns/instance/list?serviceName=${encodeURIComponent(service_name)}&namespaceId=${namespace_id}`;
const detailResponse = await fetch(detailUrl);
const serviceDetail = await detailResponse.json();
const hosts = serviceDetail.hosts || [];
if (hosts.length === 0 && serviceDetail.dom === undefined) {
return {
content: [{
type: "text",
text: `服务 ${service_name} 不存在于 Nacos 中或没有任何实例注册`
}]
};
}
// 统计健康和不健康实例数量
const healthyCount = hosts.filter((host: any) => host.healthy).length;
const totalCount = hosts.length;
// 构建返回信息
let result = `📋 Nacos 服务状态: ${serviceDetail.name || service_name}\n`;
result += `实例总数: ${totalCount}\n`;
result += `健康实例: ${healthyCount}\n`;
result += `不健康实例: ${totalCount - healthyCount}\n\n`;
result += "实例详情:\n";
hosts.forEach((host: any) => {
const ip = host.ip || "N/A";
const port = host.port || "N/A";
const healthy = host.healthy ? "✅ 健康" : "❌ 不健康";
const enabled = host.enabled ? "启用" : "禁用";
const weight = host.weight || "N/A";
result += ` - ${ip}:${port} [${healthy}, ${enabled}, 权重:${weight}]\n`;
});
return {
content: [{ type: "text", text: result }]
};
} catch (e) {
return {
content: [{ type: "text", text: `查询 Nacos 服务状态失败: ${e}` }]
};
}
}
);
server.tool(
"get_nacos_services_list",
"获取 Nacos 中所有的服务列表",
{
nacos_host: z.string().optional().default("localhost").describe("Nacos 服务器地址"),
nacos_port: z.number().optional().default(8848).describe("Nacos 服务器端口"),
namespace_id: z.string().optional().default("public").describe("命名空间ID")
},
async ({ nacos_host = "localhost", nacos_port = 8848, namespace_id = "public" }) => {
try {
const url = `http://${nacos_host}:${nacos_port}/nacos/v1/ns/catalog/services?namespaceId=${namespace_id}&pageNo=1&pageSize=1000`;
const response = await fetch(url);
const data = await response.json();
const serviceList = data.serviceList || [];
if (serviceList.length === 0) {
return {
content: [{
type: "text",
text: "Nacos 中没有注册任何服务"
}]
};
}
let result = `📋 Nacos 服务列表 (共 ${serviceList.length} 个服务):\n`;
serviceList.forEach((service: any) => {
const serviceName = service.name || "N/A";
const count = service.count || "N/A";
result += ` - ${serviceName} (实例数: ${count})\n`;
});
return {
content: [{ type: "text", text: result }]
};
} catch (e) {
return {
content: [{ type: "text", text: `获取 Nacos 服务列表失败: ${e}` }]
};
}
}
);
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
}
main().catch(console.error);
如何将 TypeScript Nacos 服务状态查询 MCP 工具添加到 AI 客户端:
- 启动 MCP Server:
npx tsx nacos_status_mcp.ts
- 在 AI 客户端配置文件中添加 MCP 服务器:
{
"mcpServers": {
"nacos-status": {
"command": "npx",
"args": ["tsx", "/path/to/nacos_status_mcp.ts"]
}
}
}
- 重启 AI 客户端,即可使用以下工具:
get_nacos_service_status: 查询指定 Nacos 服务的状态get_nacos_services_list: 获取 Nacos 中所有的服务列表
使用 Java SDK 开发 MCP Server
Java 也有官方的 MCP SDK,可以用来开发 MCP Server。这与 Python 和 Node.js 一样,都是使用官方提供的 SDK 来实现 MCP 协议。
安装 Java SDK 依赖
使用官方的 Java SDK 依赖:
Maven 依赖
<!-- Source: https://mvnrepository.com/artifact/io.modelcontextprotocol.sdk/mcp -->
<!-- 当前包括了 STDIO, SSE, and Streamable HTTP transport ,其他依赖见 https://java.sdk.modelcontextprotocol.io/latest/quickstart/ -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp</artifactId>
<version>1.0.0</version>
</dependency>
Gradle 依赖
implementation 'io.modelcontextprotocol.sdk:mcp:1.0.0'
Java MCP Server 实现示例
使用 Java SDK 开发 MCP Server 的方式与 Python 和 Node.js 类似:
更多其他传输提供者方式(http、sse) 详细参考官方文档:https://java.sdk.modelcontextprotocol.io/latest/server/#stdio
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.json.jackson2.JacksonMcpJsonMapper;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures;
import io.modelcontextprotocol.server.McpSyncServer;
import io.modelcontextprotocol.server.transport.StdioServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
public class NacosStatusMcpServer {
public static void main(String[] args) throws Exception {
StdioServerTransportProvider transportProvider =
new StdioServerTransportProvider(new JacksonMcpJsonMapper(new ObjectMapper()));
// 创建服务器配置
McpSyncServer syncServer = McpServer.sync(transportProvider)
.serverInfo("nacos-status-java", "1.0.0")
.capabilities(McpSchema.ServerCapabilities.builder()
.resources(false, true) // Enable resource support with list changes
.tools(true) // Enable tool support with list changes
.prompts(true) // Enable prompt support with list changes
.completions() // Enable completions support
.logging() // Enable logging support
.build())
.build();
// 定义查询 Nacos 服务状态的工具
// Sync tool specification using builder
var serviceStatusTool = McpServerFeatures.SyncToolSpecification.builder()
.tool(McpSchema.Tool.builder()
.name("get_nacos_service_status")
.description("查询 Nacos 中指定服务的状态")
.inputSchema(new McpSchema.JsonSchema("object", Map.of(
"service_name", Map.of("type", "string", "description", "要查询的服务名称"),
"nacos_host",
Map.of("type", "string", "description", "Nacos 服务器地址", "default", "localhost"),
"nacos_port",
Map.of("type", "number", "description", "Nacos 服务器端口", "default", 8848),
"namespace_id",
Map.of("type", "string", "description", "命名空间ID", "default", "public")
), List.of("service_name"), false, null, null))
.build())
.callHandler((exchange, request) -> {
// Access arguments via request.arguments()
Map<String, Object> params = request.arguments();
String serviceName = params.getOrDefault("service_name", "").toString();
String host = params.get("nacos_host") != null ? params.get("nacos_host").toString() : "localhost";
int port =
params.get("nacos_port") != null ? Integer.parseInt(params.get("nacos_port").toString()) :
8848;
String namespace =
params.get("namespace_id") != null ? params.get("namespace_id").toString() : "public";
// 调用实际的 Nacos 查询逻辑
String result = queryNacosServiceStatus(serviceName, host, port, namespace);
// Tool implementation
return McpSchema.CallToolResult.builder()
.content(List.of(new McpSchema.TextContent(result)))
.build();
})
.build();
syncServer.addTool(serviceStatusTool);
// 定义查询 Nacos 服务列表的工具
var serviceListTool = McpServerFeatures.SyncToolSpecification.builder()
.tool(McpSchema.Tool.builder()
.name("get_nacos_services_list")
.description("获取 Nacos 中所有的服务列表")
.inputSchema(new McpSchema.JsonSchema("object", Map.of(
"nacos_host",
Map.of("type", "string", "description", "Nacos 服务器地址", "default", "localhost"),
"nacos_port",
Map.of("type", "number", "description", "Nacos 服务器端口", "default", 8848),
"namespace_id",
Map.of("type", "string", "description", "命名空间ID", "default", "public")
), List.of("service_name"), false, null, null))
.build())
.callHandler((exchange, request) -> {
// Access arguments via request.arguments()
Map<String, Object> params = request.arguments();
String host = params.get("nacos_host") != null ? params.get("nacos_host").toString() : "localhost";
int port =
params.get("nacos_port") != null ? Integer.parseInt(params.get("nacos_port").toString()) :
8848;
String namespace =
params.get("namespace_id") != null ? params.get("namespace_id").toString() : "public";
// 调用实际的 Nacos 查询逻辑
String result = queryNacosServicesList(host, port, namespace);
// Tool implementation
return McpSchema.CallToolResult.builder()
.content(List.of(new McpSchema.TextContent(result)))
.build();
})
.build();
syncServer.addTool(serviceListTool);
syncServer.notifyToolsListChanged();
}
// 实际的 Nacos 查询逻辑 - 服务状态
private static String queryNacosServiceStatus(String serviceName, String host, int port, String namespace) {
try {
String url = String.format("http://%s:%d/nacos/v1/ns/instance/list?serviceName=%s&namespaceId=%s",
host, port, serviceName, namespace);
// 使用 Java 内置 HTTP 客户端进行请求
java.net.http.HttpClient client = java.net.http.HttpClient.newHttpClient();
java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder()
.uri(java.net.URI.create(url))
.timeout(java.time.Duration.ofSeconds(10))
.build();
java.net.http.HttpResponse<String> response = client.send(request,
java.net.http.HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
return "查询失败: HTTP " + response.statusCode();
}
// 解析响应结果 (需要 Jackson 或其他 JSON 库)
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
com.fasterxml.jackson.databind.JsonNode rootNode = mapper.readTree(response.body());
com.fasterxml.jackson.databind.JsonNode hostsNode = rootNode.get("hosts");
if (hostsNode == null || hostsNode.size() == 0) {
return String.format("服务 %s 不存在于 Nacos 中或没有任何实例注册", serviceName);
}
// 构建结果
StringBuilder result = new StringBuilder();
result.append(String.format("📋 Nacos 服务状态: %s\n", rootNode.get("name").asText()));
result.append(String.format("实例总数: %d\n", hostsNode.size()));
int healthyCount = 0;
for (com.fasterxml.jackson.databind.JsonNode host : hostsNode) {
if (host.get("healthy").asBoolean()) {
healthyCount++;
}
}
result.append(String.format("健康实例: %d\n", healthyCount));
result.append(String.format("不健康实例: %d\n\n", hostsNode.size() - healthyCount));
result.append("实例详情:\n");
for (com.fasterxml.jackson.databind.JsonNode host : hostsNode) {
String ip = host.get("ip").asText();
int instancePort = host.get("port").asInt();
String healthy = host.get("healthy").asBoolean() ? "健康" : "不健康";
String enabled = host.get("enabled").asBoolean() ? "启用" : "禁用";
double weight = host.get("weight").asDouble();
result.append(
String.format(" - %s:%d [%s, %s, 权重:%.2f]\n", ip, instancePort, healthy, enabled, weight));
}
return result.toString();
} catch (Exception e) {
return String.format("查询 Nacos 服务状态失败: %s", e.getMessage());
}
}
// 实际的 Nacos 查询逻辑 - 服务列表
private static String queryNacosServicesList(String host, int port, String namespace) {
try {
String url =
String.format("http://%s:%d/nacos/v1/ns/catalog/services?namespaceId=%s&pageNo=1&pageSize=1000",
host, port, namespace);
java.net.http.HttpClient client = java.net.http.HttpClient.newHttpClient();
java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder()
.uri(java.net.URI.create(url))
.timeout(java.time.Duration.ofSeconds(10))
.build();
java.net.http.HttpResponse<String> response = client.send(request,
java.net.http.HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
return "查询失败: HTTP " + response.statusCode();
}
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
com.fasterxml.jackson.databind.JsonNode rootNode = mapper.readTree(response.body());
com.fasterxml.jackson.databind.JsonNode serviceListNode = rootNode.get("serviceList");
if (serviceListNode == null || serviceListNode.size() == 0) {
return "Nacos 中没有注册任何服务";
}
StringBuilder result = new StringBuilder();
result.append(String.format("📋 Nacos 服务列表 (共 %d 个服务):\n", serviceListNode.size()));
for (com.fasterxml.jackson.databind.JsonNode service : serviceListNode) {
String serviceName = service.get("name").asText();
int count = service.get("count").asInt();
result.append(String.format(" - %s (实例数: %d)\n", serviceName, count));
}
return result.toString();
} catch (Exception e) {
return String.format("获取 Nacos 服务列表失败: %s", e.getMessage());
}
}
}
在 Spring Boot 中快速定义 MCP 组件
如果您正在使用 Spring Boot,可以利用 Spring AI 提供的 MCP 服务器启动器:
Spring Boot 依赖
<!-- Source: https://mvnrepository.com/artifact/org.springframework.ai/spring-ai-mcp-server-spring-boot-starter -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-mcp-server-spring-boot-starter</artifactId>
<version>1.0.0-M6</version>
<scope>compile</scope>
</dependency>
Spring Boot MCP 组件示例
import org.springframework.ai.mcp.server.annotation.McpTool;
import org.springframework.stereotype.Component;
@Component
public class NacosMcpTools {
@McpTool(
name = "get_nacos_service_status",
description = "查询 Nacos 中指定服务的状态"
)
public String getNacosServiceStatus(
String serviceName,
String nacosHost,
Integer nacosPort,
String namespaceId
) {
// 设置默认值
if (nacosHost == null) nacosHost = "localhost";
if (nacosPort == null) nacosPort = 8848;
if (namespaceId == null) namespaceId = "public";
return queryNacosServiceStatus(serviceName, nacosHost, nacosPort, namespaceId);
}
@McpTool(
name = "get_nacos_services_list",
description = "获取 Nacos 中所有的服务列表"
)
public String getNacosServicesList(
String nacosHost,
Integer nacosPort,
String namespaceId
) {
// 设置默认值
if (nacosHost == null) nacosHost = "localhost";
if (nacosPort == null) nacosPort = 8848;
if (namespaceId == null) namespaceId = "public";
return queryNacosServicesList(nacosHost, nacosPort, namespaceId);
}
// 实现查询逻辑的方法(与上面相同)
private String queryNacosServiceStatus(String serviceName, String host, int port, String namespace) {
// 实现查询逻辑
// ...
return "服务状态结果";
}
private String queryNacosServicesList(String host, int port, String namespace) {
// 实现查询逻辑
// ...
return "服务列表结果";
}
}
配置 Java MCP Server 到 AI 客户端
- 编译 Java 项目:
javac -cp ".:mcp-1.0.0.jar:jackson-core-2.15.2.jar:jackson-databind-2.15.2.jar" NacosStatusMcpServer.java
- 启动 MCP Server:
java -cp ".:mcp-1.0.0.jar:jackson-core-2.15.2.jar:jackson-databind-2.15.2.jar" NacosStatusMcpServer
- 在 AI 客户端配置文件中添加 MCP 服务器:
{
"mcpServers": {
"nacos-status-java": {
"command": "java",
"args": [
"-cp",
"/path/to/your/classes:/path/to/dependencies/*",
"NacosStatusMcpServer"
]
}
}
}
- 重启 AI 客户端,即可使用 Java 实现的 Nacos 服务查询工具。
Java SDK 的优势
使用 Java SDK 的优势包括:
- 协议兼容性:确保完全符合 MCP 协议规范
- 类型安全:利用 Java 的强类型系统减少错误
- 性能:在 Java 环境中运行效率更高
- 集成性:更容易与现有的 Java 应用集成
- Spring Boot 支持:可以轻松集成到 Spring Boot 应用中
这样,Java 开发者就可以使用与 Python 和 Node.js 相同的模式来开发 MCP Server 了。
三种传输方式详解
1. stdio(推荐本地使用)
# 默认方式,无需修改
if __name__ == "__main__":
mcp.run() # 等同于 mcp.run(transport="stdio")
配置到 AI 客户端:
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/path/to/server.py"]
}
}
}
2. SSE(Server-Sent Events)
if __name__ == "__main__":
mcp.run(transport="sse") # 启动 HTTP 服务器
配置到 AI 客户端:
{
"mcpServers": {
"my-server": {
"url": "http://localhost:8000/sse"
}
}
}
3. Streamable HTTP(推荐远程部署)
if __name__ == "__main__":
mcp.run(transport="streamable-http")
配置到 AI 客户端:
{
"mcpServers": {
"my-server": {
"url": "http://your-server:8000/mcp"
}
}
}
如何验证 MCP Server 是否正常
方式一:使用 MCP Inspector(推荐)
官方调试神器,可视化测试:
npx -y @modelcontextprotocol/inspector python your_server.py
会自动打开浏览器界面,可以:
- 查看所有工具定义
- 测试工具调用
- 查看请求/响应日志
方式二:命令行测试
# 测试 Python MCP Server
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | python your_server.py
# 如果校验比较严格,需要强制要求先完成初始化握手,才会接受 tools/list 等后续请求,那上面的方式可能无法正常返回方法列表,需要先建立链接,如下所示
(
# 1. initialize 请求(必须先发这个)
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test-cli","version":"1.0"}}}'
# 2. initialized 通知(很多 server 要求这个,注意:通知**没有 id**)
echo '{"jsonrpc":"2.0","method":"notifications/initialized","params":{}}'
# 3. 再发 tools/list
echo '{"jsonrpc":"2.0","id":3,"method":"tools/list","params":{}}'
) | python your_server.py
# 测试 Node.js MCP Server
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | npx tsx your_server.ts
正常输出应该包含 tools/list 的响应。
调用其中的某个mcp方法示例:
# 第一个输入主要是为了建立连接,第二个输入 tools/call 才是真正的 mcp 方法调用,参数根据具体的调用方法决定是否调用
( echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test-cli","version":"1.0"}}}'; \
echo '{"jsonrpc":"2.0","id":10,"method":"tools/call","params":{"name":"get_nacos_services_list","arguments":{"nacos_host":"172.16.18.164","nacos_port":28848,"namespace_id":"public"}}}'; \
) | python nacos_status_mcp.py
方式三:配置到 AI 客户端测试
以 Claude Desktop 为例:
-
编辑配置文件:
- macOS:
~/Library/Application Support/Claude/claude_desktop_config.json - Windows:
%APPDATA%\Claude\claude_desktop_config.json
- macOS:
-
添加你的 MCP Server:
{
"mcpServers": {
"hello": {
"command": "python",
"args": ["/path/to/hello_mcp.py"]
}
}
}
-
重启 Claude Desktop
-
测试:对 AI 说 "用 say_hello 工具跟小明打个招呼"
方式四:HTTP 模式测试
如果使用 SSE 或 streamable-http:
# 启动服务器
python your_server.py # 默认监听 127.0.0.1:8000
# 另一个终端测试
curl http://localhost:8000/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/list"}'
调试技巧
使用 Context 记录日志
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
mcp = FastMCP("debug-example")
@mcp.tool()
async def my_tool(data: str, ctx: Context[ServerSession, None]) -> str:
"""带日志的工具"""
await ctx.debug(f"收到数据: {data}")
await ctx.info("开始处理...")
# ... 处理逻辑
await ctx.info("处理完成!")
return f"结果: {data}"
常见问题排查
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| AI 找不到工具 | 函数没有 @mcp.tool() 装饰器 | 添加装饰器 |
| 参数解析失败 | 类型注解缺失或错误 | 使用正确的 Python 类型 |
| AI 不知道工具用途 | 缺少 docstring | 添加 """描述""" |
| 启动后无响应 | print 干扰协议 | 用 ctx.info() 或 logging |
| HTTP 模式连不上 | 端口被占用或未启动 | 检查端口和防火墙 |
MCP Server 发布指南
Python 项目结构
my-mcp-server/
├── src/
│ └── server.py
├── pyproject.toml
└── README.md
pyproject.toml
[project]
name = "my-mcp-server"
version = "0.1.0"
description = "My awesome MCP Server"
dependencies = ["mcp"]
[project.scripts]
my-mcp-server = "src.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
发布到 PyPI
pip install build twine
python -m build
twine upload dist/*
总结
开发 MCP Server 的核心心法:
| 要点 | 说明 |
|---|---|
| 选对 SDK | Python 用 mcp,Node.js 用 @modelcontextprotocol/sdk |
| 装饰器定义工具 | Python: @mcp.tool(),TS: server.tool() |
| 类型注解必须 | FastMCP 自动从签名生成 JSON Schema |
| docstring 不能省 | AI 靠它理解工具用途 |
| 调试用 Inspector | npx @modelcontextprotocol/inspector 是神器 |
| 选对传输方式 | 本地用 stdio,远程用 streamable-http |
福利:学习资源
官方文档: https://modelcontextprotocol.io
Python SDK: https://github.com/modelcontextprotocol/python-sdk
TypeScript SDK: https://github.com/modelcontextprotocol/typescript-sdk
Java SDK https://github.com/modelcontextprotocol/java-sdk
社区 MCP Server 合集: https://github.com/punkpeye/awesome-mcp-servers
更多其他语言的 mcp sdk https://github.com/modelcontextprotocol

觉得有用的话,点个赞 + 转发 + 关注!
评论区见,有问题随时问!
#MCP #AI开发 #Python #TypeScript #开发者工具 #技术干货
Q.E.D.


