依赖包
pycryptodome==3.23.0
PyJWT==2.10.1
PyYAML==6.0.3
SQLAlchemy==2.0.45
PyMySQL==1.1.2
fastmcp==2.14.1
main方法
from fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from tools.BaiduMapService import baidu_weather_forecasts_tool
from tools.TencentSearchPro import tencent_cloud_search_tool

# Initialise the MCP server and register the tools it exposes.
mcp = FastMCP("cmiot-mcp-server")
mcp.add_tool(tencent_cloud_search_tool)
mcp.add_tool(baidu_weather_forecasts_tool)


# Custom endpoint: health probe.
# The decorator must be applied through the ``mcp`` instance; a bare
# ``.custom_route(...)`` is a syntax error and never registers the route.
@mcp.custom_route("/probe", methods=["GET", "POST"])
async def probe(request: Request) -> Response:
    """Answer a health check with a fixed ``{"status": "ok"}`` JSON body.

    :param request: The incoming HTTP request (not inspected).
    :return: A JSON response carrying the static status payload.
    """
    return JSONResponse({"status": "ok"})


if __name__ == "__main__":
    # Serve MCP over HTTP on every interface, port 8000, under /mcp.
    mcp.run(
        transport="http",
        host="0.0.0.0",
        port=8000,
        path="/mcp",
    )
Tool
# Schema of a single search hit: title, text content and source link.
_search_hit_properties = {
    "title": {"type": "string", "description": "标题"},
    "content": {"type": "string", "description": "内容"},
    "url": {"type": "string", "description": "内容链接"},
}

# Schema of the tool's structured output: an object whose "results" key
# holds an array of search hits.
_search_output_schema = {
    "type": "object",
    "properties": {
        "results": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": _search_hit_properties,
            },
        },
    },
}

# Wrap the search coroutine as a tool named "tencent_cloud_search".
tencent_cloud_search_tool = Tool.from_function(
    query_tencent_cloud_search,
    name="tencent_cloud_search",
    description="联网搜索功能。如果用户想要了解最近的或者实时新闻之类的信息,用本接口搜索互联网数据。",
    tags={"最近", "最新", "实时", "实时新闻", "实时天气"},
    output_schema=_search_output_schema,
)
具体实现
async def query_tencent_cloud_search(query: str) -> dict:
    """Search the internet for recent or real-time information.

    Intended for questions about recent events or live news.

    :param query: What the user wants to search the web for.
    :return: ``{"results": [...]}``. Per the original contract each hit
        carries a title (``title``), text content (``content``) and a link
        (``url``). The list is empty when the caller could not be
        authenticated (resolved uid is 0).
    """
    # Local import keeps this block self-contained.
    import time

    # Use a monotonic clock for the elapsed-time measurement: wall-clock
    # differences are skewed by NTP or manual clock adjustments.
    started = time.perf_counter()

    uid = extract_user_id_from_auth_header()
    if uid == 0:
        # uid 0 is the "not authenticated" sentinel: return nothing.
        return {"results": []}

    # NOTE(review): this call is not awaited, so it presumably runs
    # synchronously; if it performs blocking network I/O it will stall the
    # event loop for its duration -- confirm and consider offloading it.
    results = tencentcloud_search_online(query)

    total_cost = time.perf_counter() - started
    logger.info(f"uid: {uid}, cost: {total_cost}s, query: {query}, results: {results}")
    return {"results": results}
鉴权
import hmac
from datetime import datetime

from fastmcp.server.dependencies import get_http_headers

from dao.UserDao import user_secret_key_dict
from utils.LoggerManager import logger
from utils.RSAHelper import parse_rsa_decrypted

# A credential is accepted for this many seconds after its embedded timestamp.
_TOKEN_TTL_SECONDS = 300
# Authorization scheme prefix expected in front of the encrypted credential.
_BEARER_PREFIX = "bearer "


def _secrets_match(expected, provided) -> bool:
    """Return True only when both secrets are present and equal.

    A missing secret on either side never matches, so an unknown uid cannot
    authenticate through a ``None == None`` comparison. Strings and bytes are
    compared in constant time.
    """
    if expected is None or provided is None:
        return False
    if isinstance(expected, str) and isinstance(provided, str):
        # Encode first: compare_digest only accepts ASCII-only str values.
        return hmac.compare_digest(
            expected.encode("utf-8"), provided.encode("utf-8")
        )
    if isinstance(expected, (bytes, bytearray)) and isinstance(
        provided, (bytes, bytearray)
    ):
        return hmac.compare_digest(expected, provided)
    return expected == provided


def _authenticate_request():
    """Resolve the uid for the current HTTP request, or 0 if it is rejected."""
    headers = get_http_headers()
    if not headers:
        return 0

    authorization = headers.get("authorization")
    if not authorization:
        return 0

    # Only strip the scheme when it really is "Bearer "; blindly dropping the
    # first seven characters would corrupt any other header value.
    prefix_len = len(_BEARER_PREFIX)
    if authorization[:prefix_len].lower() != _BEARER_PREFIX:
        logger.info("extract_user_id: authorization header is not a Bearer credential")
        return 0
    token = authorization[prefix_len:].strip()
    # The credential itself is deliberately NOT logged: anyone with log
    # access could replay it while it is still within its validity window.

    # Decrypt the credential into its three fields.
    uid, secret_key, timestamp = parse_rsa_decrypted(token)

    # Treat the credential as expired once more than five minutes have passed
    # since its timestamp.
    # NOTE(review): timestamps in the future are not rejected, so a credential
    # stamped far ahead stays valid until then -- confirm this is intended.
    if timestamp + _TOKEN_TTL_SECONDS < int(datetime.now().timestamp()):
        return 0

    # The secret inside the credential must equal the one stored for this uid.
    if not _secrets_match(user_secret_key_dict.get(uid), secret_key):
        return 0
    return uid


def extract_user_id_from_auth_header():
    """Authenticate the current request from its ``Authorization`` header.

    The header must carry ``Bearer <credential>``. The credential is decrypted
    into ``(uid, secret_key, timestamp)`` and accepted only if it is not older
    than five minutes and its secret equals the stored secret for that uid.

    :return: The authenticated uid, or 0 when the header is missing,
        malformed, expired, carries the wrong secret, or any step raises.
    """
    # uid defaults to 0, meaning "not authenticated".
    result_uid = 0
    try:
        result_uid = _authenticate_request()
    except Exception as e:
        # Best-effort boundary: any failure means "not authenticated".
        logger.info(f"extract_user_id Exception: {e}")
    logger.info(f"extract_user_id result_uid: {result_uid}")
    return result_uid