はじめに
最近注目を集めているModel Context Protocol(MCP)は、大規模言語モデル(LLM)に外部ツールやサービスへのアクセス能力を提供するための標準プロトコルです。中でも公式が提供しているRust SDKはあまり注目されていませんが、私自身が必要としているためこのドキュメントを作成します。
以前は自前で実装していましたが、公式SDKが公開されたことでそちらを検討するのが良いと考えました。私の実装と比較してかなり洗練されている点が多く、多くの学びを得ることができました。
この記事では、MCP Rust SDKの内部実装を深掘りし、どのようにRustの強力な型システムと非同期プログラミングモデルが活用されているかを解説します。コードの詳細な分析を通して、Rustの優れた設計パターンや実装テクニックを学びましょう。
このブログが良ければ読者になったり、nwiizoをフォロワーしてくれるのもありがたいです。
MCP とは何か?
記事を始める前に、まず MCP (Model Context Protocol) について簡単に説明しましょう。MCP についてより詳しい情報は、公式ドキュメント modelcontextprotocol.io や Anthropic の Model Context Protocol に関する記事 を参照してください。

MCP は Cline や Cursor などの LLM クライアントが外部サービスと連携するためのプロトコルです。従来の LLM は学習したデータに基づいて「考える」ことしかできませんでしたが、MCP を通じて外部と連携し、「行動する」能力を持つことができます。
具体的には、MCP を使うことで以下のようなことが可能になります。
- Notion のファイル編集
- Supabase のデータベースクエリ
- Cloudflare のステータスチェック
- ローカルファイルの編集や操作
MCP がプロトコルとして統一されていることで、LLM プロバイダーやサービスを柔軟に切り替えることができるという大きなメリットがあります。
MCP の仕組み
MCP は基本的に JSON-RPC ベースのプロトコルで、詳細な仕様は modelcontextprotocol.io/docs/concepts/transports#message-format で確認できます。主要な構成要素は以下のとおりです。
- リソース(Resources):データへのアクセスを提供(REST API の GET に相当)
- ツール(Tools):アクションの実行を可能にする(REST API の POST に相当)
- プロンプト(Prompts):LLM がどのようにサービスを使うべきかのガイダンス
MCP の実装をサポートするための公式 SDK が複数の言語で提供されています(2024年3月27日 現在)。
ちなみに今後MCPがどうなってゆくかはRoadmapが存在しているのでぜひ、こちらを読んでもらいたいです。
SDKの全体構成 - 明確な関心の分離
MCP Rust SDKは、複数のクレートに明確に分離されており、それぞれが特定の責任を担っています。
rust-sdk/
├── crates/
│   ├── mcp-core/      # プロトコルの基本型とインターフェース
│   ├── mcp-client/    # クライアント実装
│   ├── mcp-server/    # サーバー実装
│   └── mcp-macros/    # ツール実装を簡素化するマクロ
└── examples/
    ├── clients/       # クライアント使用例
    ├── servers/       # サーバー実装例
    └── macros/        # マクロ使用例
この設計はRustエコシステムでよく見られる「関心の分離」パターンに従っています。各クレートがひとつの責任を持ち、依存関係も明確です。こうすることで、メンテナンス性と再利用性が大幅に向上します。
特に注目すべきは、コア型定義とプロトコル実装をmcp-coreに分離している点です。これにより、クライアントとサーバーが共通の型定義を使いながら、それぞれ独立して実装・進化できる柔軟性を確保しています。
mcp-core: 堅牢な基盤となる型定義
mcp-coreクレートは、MCPプロトコルの心臓部とも言える基本型とインターフェースを提供しています。ここでの実装がSDK全体の品質を大きく左右します。
JSON-RPCメッセージの巧妙な実装
MCPはJSON-RPCプロトコルをベースにしていますが、その実装が非常に興味深いものになっています
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] #[serde(untagged, try_from = "JsonRpcRaw")] pub enum JsonRpcMessage { Request(JsonRpcRequest), Response(JsonRpcResponse), Notification(JsonRpcNotification), Error(JsonRpcError), Nil, // used to respond to notifications } #[derive(Debug, Serialize, Deserialize)] struct JsonRpcRaw { jsonrpc: String, #[serde(skip_serializing_if = "Option::is_none")] id: Option<u64>, #[serde(skip_serializing_if = "Option::is_none")] method: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] params: Option<Value>, #[serde(skip_serializing_if = "Option::is_none")] result: Option<Value>, #[serde(skip_serializing_if = "Option::is_none")] error: Option<ErrorData>, } impl TryFrom<JsonRpcRaw> for JsonRpcMessage { type Error = String; fn try_from(raw: JsonRpcRaw) -> Result<Self, <Self as TryFrom<JsonRpcRaw>>::Error> { // If it has an error field, it's an error response if raw.error.is_some() { return Ok(JsonRpcMessage::Error(JsonRpcError { jsonrpc: raw.jsonrpc, id: raw.id, error: raw.error.unwrap(), })); } // If it has a result field, it's a response if raw.result.is_some() { return Ok(JsonRpcMessage::Response(JsonRpcResponse { jsonrpc: raw.jsonrpc, id: raw.id, result: raw.result, error: None, })); } // If we have a method, it's either a notification or request if let Some(method) = raw.method { if raw.id.is_none() { return Ok(JsonRpcMessage::Notification(JsonRpcNotification { jsonrpc: raw.jsonrpc, method, params: raw.params, })); } return Ok(JsonRpcMessage::Request(JsonRpcRequest { jsonrpc: raw.jsonrpc, id: raw.id, method, params: raw.params, })); } // If we have no method and no result/error, it's a nil response if raw.id.is_none() && raw.result.is_none() && raw.error.is_none() { return Ok(JsonRpcMessage::Nil); } // If we get here, something is wrong with the message Err(format!( "Invalid JSON-RPC message format: id={:?}, method={:?}, result={:?}, error={:?}", raw.id, raw.method, raw.result, raw.error )) } }
この実装の素晴らしい点は3つあります。
- #[serde(untagged)]アノテーションの活用:JSONデータの構造に基づいて適切な列挙型バリアントに自動的にデシリアライズします。これにより、外部向けのJSONはシンプルな形式を維持できます。
- try_from = "JsonRpcRaw"による変換の分離:複雑な変換ロジックを別の型に委譲し、コードの見通しを良くしています。これはRustの型システムを活用した優れたパターンです。
- 段階的な判断ロジック:各メッセージタイプの判定条件を明確にし、順番に評価することで複雑な条件分岐を読みやすく実装しています。 
これらの工夫により、複雑なJSON-RPCプロトコルの処理を堅牢かつ読みやすいコードで実現しています。特に注目すべきは、Rustの型システムを最大限に活用し、コンパイル時の型チェックでバグを防ぐ設計になっている点です。
豊かなコンテンツ型システム
MCPはさまざまなコンテンツ型(テキスト、画像、リソースなど)をサポートしています。その実装も非常に洗練されています。
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] pub enum Content { Text(TextContent), Image(ImageContent), Resource(EmbeddedResource), } impl Content { pub fn text<S: Into<String>>(text: S) -> Self { Content::Text(TextContent { text: text.into(), annotations: None, }) } pub fn image<S: Into<String>, T: Into<String>>(data: S, mime_type: T) -> Self { Content::Image(ImageContent { data: data.into(), mime_type: mime_type.into(), annotations: None, }) } pub fn resource(resource: ResourceContents) -> Self { Content::Resource(EmbeddedResource { resource, annotations: None, }) } // その他のメソッド... }
この実装には、使いやすさと型安全性を両立する工夫がいくつもあります。
- タグ付き列挙型の活用: - #[serde(tag = "type")]は、JSONに「type」フィールドを追加し、その値に基づいて適切な型にデシリアライズします。これはJSONとRustの型を自然にマッピングする優れた方法です。
- ファクトリメソッド: - text(),- image(),- resource()などのメソッドは、わかりやすい方法でコンテンツを作成できるようにしています。これは、制約を守りながら簡潔にオブジェクトを作成するのに役立ちます。
- ジェネリックな引数: - S: Into<String>のようなトレイト境界を使うことで、文字列リテラル、- String、- &strなど、さまざまな文字列型を引数として受け入れることができます。これは使い勝手を大幅に向上させます。
この設計は、使いやすいAPIと堅牢な内部実装のバランスが見事です。とりわけ、列挙型とそのバリアントを活用してドメインの概念を表現する点はRustらしいアプローチと言えるでしょう。
mcp-client: 柔軟なトランスポートと抽象化
mcp-clientクレートは、MCPサーバーとの通信を担当します。特に注目すべきは、トランスポート層の抽象化です。
トランスポートの抽象化
MCPサーバーとの通信には複数の方法(標準入出力、HTTP、WebSocketなど)が考えられます。このSDKはそれらを抽象化するための優れた設計を採用しています。
/// A message that can be sent through the transport #[derive(Debug)] pub struct TransportMessage { /// The JSON-RPC message to send pub message: JsonRpcMessage, /// Channel to receive the response on (None for notifications) pub response_tx: Option<oneshot::Sender<Result<JsonRpcMessage, Error>>>, } /// A generic asynchronous transport trait with channel-based communication #[async_trait] pub trait Transport { type Handle: TransportHandle; /// Start the transport and establish the underlying connection. /// Returns the transport handle for sending messages. async fn start(&self) -> Result<Self::Handle, Error>; /// Close the transport and free any resources. async fn close(&self) -> Result<(), Error>; } #[async_trait] pub trait TransportHandle: Send + Sync + Clone + 'static { async fn send(&self, message: JsonRpcMessage) -> Result<JsonRpcMessage, Error>; }
この抽象化にはいくつもの巧妙な工夫があります。
- 関連型(associated type)の活用: - type Handle: TransportHandleという関連型を使うことで、トランスポートとそのハンドルを型レベルで紐づけています。これにより、異なるトランスポート実装が異なるハンドル型を持つことができます。
- 非同期トレイト: - #[async_trait]マクロを使って、非同期メソッドをトレイトに含められるようにしています。これは標準のRustでは直接サポートされていない機能です。
- 分離された開始と通信: - startメソッドで接続を確立し、その結果として得られるハンドルを使って通信するという2段階のパターンを採用しています。これにより、接続のライフサイクルとメッセージ送受信を明確に分離できます。
このような抽象化により、新しいトランスポート実装を追加するのが容易になりますし、クライアント側のコードはトランスポートの詳細を気にせず書けるようになります。
StdioTransportの実装
標準入出力を使ったトランスポート実装も見てみましょう:
pub struct StdioTransport { command: String, args: Vec<String>, env: HashMap<String, String>, } impl StdioTransport { pub fn new<S: Into<String>>( command: S, args: Vec<String>, env: HashMap<String, String>, ) -> Self { Self { command: command.into(), args, env, } } async fn spawn_process(&self) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> { let mut command = Command::new(&self.command); command .envs(&self.env) .args(&self.args) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .kill_on_drop(true); // Set process group only on Unix systems #[cfg(unix)] command.process_group(0); // don't inherit signal handling from parent process // Hide console window on Windows #[cfg(windows)] command.creation_flags(0x08000000); // CREATE_NO_WINDOW flag let mut process = command .spawn() .map_err(|e| Error::StdioProcessError(e.to_string()))?; let stdin = process .stdin .take() .ok_or_else(|| Error::StdioProcessError("Failed to get stdin".into()))?; let stdout = process .stdout .take() .ok_or_else(|| Error::StdioProcessError("Failed to get stdout".into()))?; let stderr = process .stderr .take() .ok_or_else(|| Error::StdioProcessError("Failed to get stderr".into()))?; Ok((process, stdin, stdout, stderr)) } }
この実装の素晴らしい点を見てみましょう:
- プラットフォーム固有の最適化: - #[cfg(unix)]と- #[cfg(windows)]を使って、各OSに最適な設定を行っています。これはRustの条件付きコンパイルの機能をうまく活用した例です。
- リソース管理: - kill_on_drop(true)を使って、オブジェクトが破棄された時に子プロセスも確実に終了するよう保証しています。これはリソースリークを防ぐための重要な安全策です。
- エラーハンドリング: - ok_or_elseのような関数を使って、エラーケースを明確に処理しています。これにより、どのような状況でもプログラムが予測可能な動作をするようになります。
この実装は、複雑な子プロセス操作を安全かつ効率的に行うための優れた例です。特に、クロスプラットフォームな動作を保証するための配慮が随所に見られます。
クライアント本体の実装
最後に、クライアント本体の実装を見てみましょう:
pub struct McpClient<S> where S: Service<JsonRpcMessage, Response = JsonRpcMessage> + Clone + Send + Sync + 'static, S::Error: Into<Error>, S::Future: Send, { service: Mutex<S>, next_id: AtomicU64, server_capabilities: Option<ServerCapabilities>, server_info: Option<Implementation>, } impl<S> McpClient<S> where S: Service<JsonRpcMessage, Response = JsonRpcMessage> + Clone + Send + Sync + 'static, S::Error: Into<Error>, S::Future: Send, { pub fn new(service: S) -> Self { Self { service: Mutex::new(service), next_id: AtomicU64::new(1), server_capabilities: None, server_info: None, } } /// Send a JSON-RPC request and check we don't get an error response. async fn send_request<R>(&self, method: &str, params: Value) -> Result<R, Error> where R: for<'de> Deserialize<'de>, { let mut service = self.service.lock().await; service.ready().await.map_err(|_| Error::NotReady)?; let id = self.next_id.fetch_add(1, Ordering::SeqCst); let request = JsonRpcMessage::Request(JsonRpcRequest { jsonrpc: "2.0".to_string(), id: Some(id), method: method.to_string(), params: Some(params.clone()), }); let response_msg = service .call(request) .await .map_err(|e| Error::McpServerError { server: self .server_info .as_ref() .map(|s| s.name.clone()) .unwrap_or("".to_string()), method: method.to_string(), // we don't need include params because it can be really large source: Box::new(e.into()), })?; // ... レスポンス処理 ... } }
この実装には、Rustの現代的な非同期プログラミング技術が凝縮されています。
- Tower Serviceの活用:低レベルのトランスポート詳細を抽象化するために、Tower crateの - Serviceトレイトを使用しています。これはミドルウェアの組み合わせや機能拡張を容易にします。
- ジェネリックな戻り値型: - send_request<R>のようなジェネリック関数を使って、様々な型のレスポンスを受け取れるようにしています。これはクライアントAPIを使いやすくする工夫です。
- スレッドセーフなカウンター: - AtomicU64を使って、スレッドセーフなID生成を実現しています。これは並行処理を安全に行うための基本的なテクニックです。
- 非同期排他制御: - Mutex<S>を使って、非同期コンテキストでのサービスアクセスを管理しています。- tokio::sync::Mutexはブロッキングせずに排他制御を行える優れたプリミティブです。
これらの機能を組み合わせることで、堅牢で効率的、かつ使いやすいクライアントAPIを実現しています。特にTowerのサービス抽象化を活用することで、将来的な拡張性も確保されています。
mcp-server: モジュラーなサーバー設計
mcp-serverクレートは、MCPサーバーをRustで実装するためのフレームワークを提供しています。ここでもいくつか興味深い実装が見られます。
ByteTransportの実装
#[pin_project] pub struct ByteTransport<R, W> { // Reader is a BufReader on the underlying stream (stdin or similar) buffering // the underlying data across poll calls, we clear one line (\n) during each // iteration of poll_next from this buffer #[pin] reader: BufReader<R>, #[pin] writer: W, } impl<R, W> ByteTransport<R, W> where R: AsyncRead, W: AsyncWrite, { pub fn new(reader: R, writer: W) -> Self { Self { // Default BufReader capacity is 8 * 1024, increase this to 2MB to the file size limit // allows the buffer to have the capacity to read very large calls reader: BufReader::with_capacity(2 * 1024 * 1024, reader), writer, } } } impl<R, W> Stream for ByteTransport<R, W> where R: AsyncRead + Unpin, W: AsyncWrite + Unpin, { type Item = Result<JsonRpcMessage, TransportError>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Poll実装... } }
この実装には、Rustの非同期I/Oに関する高度な知識が詰まっています。
- 巨大なバッファサイズ:デフォルトの8KBではなく2MBという大きなバッファを使用し、大量のデータを効率的に処理できるようにしています。これは実際のユースケースに基づく現実的な最適化でしょう。 
- pin-projectの活用:非同期処理でピン留めが必要なフィールドを持つ構造体を安全に扱うために、 - pin-projectクレートを使用しています。これは非同期Rustの複雑な問題を解決するための定石です。
- Streamトレイトの実装: - Streamトレイトを実装することで、メッセージを非同期ストリームとして扱えるようにしています。これは非同期処理パターンとの自然な統合を可能にします。
このようなトランスポート実装により、サーバーは効率的に大量のメッセージを処理できるようになります。また、バッファ管理や非同期I/Oの複雑さは抽象化されるため、上位層のコードはビジネスロジックに集中できます。
優れたRouterトレイト
MCPサーバーの中核となるのがRouterトレイトです。
pub trait Router: Send + Sync + 'static { fn name(&self) -> String; fn instructions(&self) -> String; fn capabilities(&self) -> ServerCapabilities; fn list_tools(&self) -> Vec<mcp_core::tool::Tool>; fn call_tool( &self, tool_name: &str, arguments: Value, ) -> Pin<Box<dyn Future<Output = Result<Vec<Content>, ToolError>> + Send + 'static>>; fn list_resources(&self) -> Vec<mcp_core::resource::Resource>; fn read_resource( &self, uri: &str, ) -> Pin<Box<dyn Future<Output = Result<String, ResourceError>> + Send + 'static>>; fn list_prompts(&self) -> Vec<Prompt>; fn get_prompt(&self, prompt_name: &str) -> PromptFuture; // 以下はデフォルト実装を持つヘルパーメソッド fn create_response(&self, id: Option<u64>) -> JsonRpcResponse { ... } fn handle_initialize(&self, req: JsonRpcRequest) -> impl Future<Output = Result<JsonRpcResponse, RouterError>> + Send { ... } // その他のハンドラメソッド... }
この設計の素晴らしさは以下の点にあります。
- 最小限の実装要件:ユーザーが実装すべきメソッドは基本的な機能に限られており、複雑なプロトコル処理はデフォルト実装として提供されています。これにより、ルーターの実装がシンプルになり、ドメインロジックに集中できます。 
- Futureを返すメソッド:ツール呼び出しなどの処理は非同期で行われるケースが多いため、 - Pin<Box<dyn Future<...>>>を返すメソッドになっています。これにより、実装者は任意の非同期処理を行う自由を持ちます。
- 明確なトレイト境界: - Send + Sync + 'staticという境界により、マルチスレッド環境での使用を安全に行えるようになっています。これは実際のサーバー環境では不可欠な制約です。
この設計は、「使いやすさ」と「柔軟性」のバランスがとれた素晴らしい例です。初心者でも簡単に基本的なルーターを実装できますが、高度なユースケースに対応する拡張性も備えています。
RouterServiceの実装
pub struct RouterService<T>(pub T); impl<T> Service<JsonRpcRequest> for RouterService<T> where T: Router + Clone + Send + Sync + 'static, { type Response = JsonRpcResponse; type Error = BoxError; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } fn call(&mut self, req: JsonRpcRequest) -> Self::Future { let this = self.0.clone(); Box::pin(async move { let result = match req.method.as_str() { "initialize" => this.handle_initialize(req).await, "tools/list" => this.handle_tools_list(req).await, "tools/call" => this.handle_tools_call(req).await, "resources/list" => this.handle_resources_list(req).await, "resources/read" => this.handle_resources_read(req).await, "prompts/list" => this.handle_prompts_list(req).await, "prompts/get" => this.handle_prompts_get(req).await, _ => { let mut response = this.create_response(req.id); response.error = Some(RouterError::MethodNotFound(req.method).into()); Ok(response) } }; result.map_err(BoxError::from) }) } }
この実装は、デザインパターンの「アダプターパターン」を思わせる優れた例です。
- シンプルなラッパー型: - RouterService<T>(pub T)というシンプルな新型でRouterトレイトをTowerのServiceトレイトに適応させています。これは非常にエレガントなアプローチです。
- メソッドディスパッチ:リクエストのmethod文字列に基づいて適切なハンドラメソッドに処理をディスパッチしています。これはルーティングのための直感的で効率的な実装です。 
- Clone要件:非同期クロージャ内でルーターを使用するために- Cloneトレイト境界を要求しています。これにより、所有権の問題を簡単に解決できます。
このようなラッパー型とディスパッチロジックにより、開発者はRouterトレイトの実装に集中でき、ServiceやTowerのような低レベルの詳細を気にする必要がなくなります。これは抽象化の良い例です。
mcp-macros: 宣言的ツール定義の魔法
最後に、mcp-macrosクレートの中核である#[tool]マクロを見てみましょう:
#[proc_macro_attribute] pub fn tool(args: TokenStream, input: TokenStream) -> TokenStream { let args = parse_macro_input!(args as MacroArgs); let input_fn = parse_macro_input!(input as ItemFn); // Extract function details let fn_name = &input_fn.sig.ident; let fn_name_str = fn_name.to_string(); // Generate PascalCase struct name from the function name let struct_name = format_ident!("{}", { fn_name_str.to_case(Case::Pascal) }); // Use provided name or function name as default let tool_name = args.name.unwrap_or(fn_name_str); let tool_description = args.description.unwrap_or_default(); // パラメータの抽出処理... // 実装の生成 let params_struct_name = format_ident!("{}Parameters", struct_name); let expanded = quote! { #[derive(serde::Deserialize, schemars::JsonSchema)] struct #params_struct_name { #(#param_defs,)* } #input_fn #[derive(Default)] struct #struct_name; #[async_trait::async_trait] impl mcp_core::handler::ToolHandler for #struct_name { fn name(&self) -> &'static str { #tool_name } fn description(&self) -> &'static str { #tool_description } fn schema(&self) -> serde_json::Value { mcp_core::handler::generate_schema::<#params_struct_name>() .expect("Failed to generate schema") } async fn call(&self, params: serde_json::Value) -> Result<serde_json::Value, mcp_core::handler::ToolError> { let params: #params_struct_name = serde_json::from_value(params) .map_err(|e| mcp_core::handler::ToolError::InvalidParameters(e.to_string()))?; // Extract parameters and call the function let result = #fn_name(#(params.#param_names,)*).await .map_err(|e| mcp_core::handler::ToolError::ExecutionError(e.to_string()))?; Ok(serde_json::to_value(result).expect("should serialize")) } } }; TokenStream::from(expanded) }
このマクロは、Rustの宣言的プログラミングの可能性を示す素晴らしい例です。
- 関数からのメタデータ抽出:関数の名前や引数リストを解析して、ツールの基本情報を自動的に取得します。 
- パラメータ構造体の自動生成:関数の引数リストから自動的にパラメータ構造体を生成し、 - serdeと- schemarsのデリバティブを適用してJSON対応にします。
- ツールハンドラの自動実装:抽出した情報を元に、 - ToolHandlerトレイトを自動的に実装します。これにより、開発者はツールのビジネスロジックだけに集中できます。
このマクロを使うと、以下のように簡潔なコードでツールを定義できます。
#[tool( name = "calculator", description = "Perform basic arithmetic operations", params( x = "First number in the calculation", y = "Second number in the calculation", operation = "The operation to perform (add, subtract, multiply, divide)" ) )] async fn calculator(x: i32, y: i32, operation: String) -> Result<i32, ToolError> { match operation.as_str() { "add" => Ok(x + y), "subtract" => Ok(x - y), "multiply" => Ok(x * y), "divide" => { if y == 0 { Err(ToolError::ExecutionError("Division by zero".into())) } else { Ok(x / y) } } _ => Err(ToolError::InvalidParameters(format!( "Unknown operation: {}", operation ))), } }
通常なら数十行のボイラープレートコードが必要なところを、このマクロによって数行のアノテーションだけで実現できています。これは開発者体験を大幅に向上させる素晴らしい例です。
Webフレームワークとの統合
MCPはしばしばWebアプリケーションと統合されます。そのための優れた実装例を見てみましょう:
async fn sse_handler(State(app): State<App>) -> Sse<impl Stream<Item = Result<Event, io::Error>>> { // it's 4KB const BUFFER_SIZE: usize = 1 << 12; let session = session_id(); tracing::debug!(%session, "sse connection"); let (c2s_read, c2s_write) = tokio::io::simplex(BUFFER_SIZE); let (s2c_read, s2c_write) = tokio::io::simplex(BUFFER_SIZE); app.txs .write() .await .insert(session.clone(), Arc::new(Mutex::new(c2s_write))); { let session = session.clone(); tokio::spawn(async move { let router = RouterService(counter::CounterRouter::new()); let server = Server::new(router); let bytes_transport = ByteTransport::new(c2s_read, s2c_write); let _result = server .run(bytes_transport) .await .inspect_err(|e| tracing::error!(?e, "server run error")); app.txs.write().await.remove(&session); }); } let stream = futures::stream::once(futures::future::ok( Event::default() .event("endpoint") .data(format!("?sessionId={session}")), )) .chain( FramedRead::new(s2c_read, common::jsonrpc_frame_codec::JsonRpcFrameCodec) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) .and_then(move |bytes| match std::str::from_utf8(&bytes) { Ok(message) => futures::future::ok(Event::default().event("message").data(message)), Err(e) => futures::future::err(io::Error::new(io::ErrorKind::InvalidData, e)), }), ); Sse::new(stream) }
この実装は、Webアプリケーションとバックエンドサービスを統合する優れた例です。
- 単方向チャネルの活用: - tokio::io::simplexを使って、クライアントからサーバーへの通信とサーバーからクライアントへの通信を分離しています。これは各方向の流れを独立して最適化できるようにします。
- バックグラウンドタスク:MCPサーバーを - tokio::spawnを使ってバックグラウンドタスクとして実行しています。これによりWebハンドラーは応答を待つことなく、すぐにSSEストリームを返すことができます。
- SSEストリームの構築: - futures::stream::onceと- .chain()を組み合わせて、初期メッセージと継続的なメッセージストリームを連結しています。これはストリーミングAPIの標準的なパターンです。
この実装パターンは、MCPサーバーをWebアプリケーションに統合する効果的な方法を示しています。特に注目すべきは、非同期処理とストリーミングを効果的に組み合わせている点です。
SDKの設計思想分析
このSDKの実装から、いくつかの重要な設計思想が読み取れます。
堅牢性と型安全性への徹底したこだわり
このSDKは、Rustの型システムを徹底的に活用して堅牢性を確保しています。
- トレイト境界(Send + Sync + 'staticなど)の明示的な指定
- ジェネリックパラメータを使ったAPI設計
- Result型による包括的なエラーハンドリング
- async/awaitと- Futureの適切な組み合わせ
これらの特徴は、SDKの開発者がRustの強みをよく理解し、それを活かそうとしていることを示しています。特に、コンパイル時に多くのエラーを捕捉できるように設計されており、実行時の予期せぬ動作を最小限に抑える工夫が随所に見られます。
拡張性と将来性を考えた設計
SDKは将来の拡張を見据えた柔軟な設計になっています。
このような設計により、MCPプロトコル自体が進化しても、SDKを大きく書き換えることなく対応できるでしょう。また、ユーザーが独自の機能を追加するための拡張ポイントが多く用意されています。
開発者体験の重視
SDKは、使いやすさにも重点を置いています。
- マクロによるボイラープレートコードの削減
- 直感的なビルダースタイルAPI
- 豊富なデフォルト実装
これらの機能は、SDKを使う開発者の負担を軽減し、本質的なビジネスロジックに集中できるようにするための工夫です。特に#[tool]マクロは、開発者体験を大幅に向上させる優れた例です。
パフォーマンスへの配慮
実装には、パフォーマンスを考慮した数々の工夫が見られます。
- 大きなバッファサイズ(2MB)の使用
- 非同期I/Oの全面採用
- ロックの最小化と効率的な並行処理
これらの最適化は、MCPが大量のデータや複雑なコンテキストを扱うAIユースケースを想定していることを示唆しています。
優れた実装パターンのまとめ
このSDKから学べる優れたRust実装パターンをまとめましょう。
1. 関心の明確な分離
SDKは複数のクレートに分かれており、各クレートが明確な責任を持っています。これは保守性と再利用性を高める優れた設計原則です。
2. トランスポート抽象化
異なる通信方法(Stdio、SSEなど)を統一的なインターフェースで扱うための抽象化は、拡張性と柔軟性の高いコードを書くための良い例です。
3. Tower Serviceパターン
Towerのサービス抽象化を活用して、ミドルウェアの組み合わせやサービス合成を容易にする設計は、現代的なRustサーバー実装のベストプラクティスです。
4. プロシージャルマクロの効果的な活用
ボイラープレートコードを削減し、宣言的なスタイルでコードを書けるようにするマクロの活用は、開発者体験を向上させる優れた方法です。
5. 非同期プログラミングのベストプラクティス
Pin、Box<dyn Future>、async_traitなどを適切に組み合わせた非同期処理の実装は、Rustの非同期プログラミングの洗練されたパターンを示しています。
おわりに
MCP Rust SDKの内部実装を深掘りすることで、Rustの強力な型システムと非同期プログラミングモデルを最大限に活用した素晴らしい設計パターンを学ぶことができました。
このSDKは、「型安全性」「拡張性」「使いやすさ」「パフォーマンス」のバランスが優れており、大規模なRustアプリケーションを設計する際の参考になります。特に、トランスポート抽象化、サービス指向設計、プロシージャルマクロの活用は、他のRustプロジェクトでも応用できる価値のある実践例です。
MCPプロトコルの実装を検討している方はもちろん、Rustでの堅牢なライブラリ設計に興味がある方にとっても、このSDKのコードベースは探求する価値のある宝庫と言えるでしょう。
次回のブログではサンプルを見ながら実際に色々動かしてみたいと思います。