Don't Repeat Yourself

Don't Repeat Yourself (DRY) is a principle of software development aimed at reducing repetition of all kinds. -- wikipedia

Rust の新しい HTTP サーバーのクレート Axum をフルに活用してサーバーサイドアプリケーション開発をしてみる

この記事は Rust Advent Calendar 25日目の記事です。Merry Christmas!

今年の Web バックエンド開発関連で一番大きかったなと思っているイベントに、Axum のリリースがあります。2021年の夏頃に tokio チームからリリースされた Web アプリケーション用のライブラリです。

基本的なデザインは actix-web 等とそこまで変わらないものの、マクロレスなのが大きな特徴かなと思います。tokio 上に直接載るアプリケーションになり、独自のランタイムをもたないため、tokio のバージョン管理に悩まされずに済むのも大きなメリットかも知れません。私はあまり重要ではないと思っていますが、明示的に #![forbid(unsafe_code)] をしているのでライブラリ内部に unsafe がないのも特徴かもしれません。

github.com

現在のんびり私が作っているアプリケーションを例に、Axum を使ってアプリケーションを開発する方法についていくつか解説をします。下記の順に説明します。

  • まず、扱うドメインが金融ということで少し特殊なため、ざっくりとどういったデータを扱うかについて解説します。
  • 次に、アプリケーションアーキテクチャの全体像を簡単に説明します。
  • さらに、いわゆるクリーンアーキテクチャ [*1] を取り入れて実装してみたので、少しだけコード側のアーキテクチャの話を書きます。
  • Axum の機能ならびに tokio の機能等をフル活用して、アプリケーションを作っていきます。

リポジトリです。

github.com

前提として、次のことを想定しています。

  • 最終的には AWS 上で動くアプリケーション。具体的には下記のサービスの利用を想定しています。
    • Aurora
    • DynamoDB
    • ECS

また、Axum のバージョンは 0.4.3 でした。multipart の機能を使用するのでオンにします。

axum = { version = "0.4.3", features = ["multipart"] }

なお、例のごとく記事が長いです。目次を掲載しておきます。ご興味のあるところをかいつまんでご覧いただいても楽しめるはずです。

ドメインの説明

今回扱う題材は株式市場とします。株式のマスターデータ(Stock)と、どのマーケットで取引される株式かを記すマスターデータ(MarketKind)を用意します。さらに、その株式の時価データ(MarketData)を取り込むことができるものとします。最終的には株式の単位で時価を閲覧できたり、あるいは移動平均ボラティリティ、ボリジャーバンドなどのいくつかのテクニカルな指標を算出します[*2]

株式のマスターデータ(Stock)には、ティッカーシンボルやその株式の名称などが記載されています。これはマスターデータで、他のデータと組み合わせてはじめて意義を発揮する類のものです。主に株式に関するメタ的な情報を保持する管理目的のテーブルです。

どのマーケットかを示すマスターデータ(MarketKind)は、その株式市場のコードと名称をもつことができるようになっています。マーケットの種類というのは、たとえば東京証券取引所大阪証券取引所などといったものが日本ではあげられると思いますが、それです。このデータは管理用のもので、最終的にはそのマーケット単位で自身がどれだけの株式を保持しているかを知るためのものです。

MarketKind を用意した実装上の意義としては、複数テーブル(リポジトリ)にまたがる実装を例示したいというものです。Webアプリケーションのサンプルコードでは1種類のテーブルを扱うものが多いですが、そうではなくnパターンを例示したいので2種類目のテーブルとしてあえて追加しているという事情があります。

時価データ(MarketData)は、ある株式の時価のデータです。日単位での取得を想定しています。たとえば、Yahooファイナンスで取得できる NIKKEI 225 の csv ファイルを想像してもらうとわかりやすいと思います。今回は始値終値、高値、安値あたりを保持して、さまざまなテクニカル指標の算出に利用します。

MarketData を用意した実装上の意義としては、ミドルウェアをまたぐ実装を例示したいというものです。このデータは DynamoDB に時系列データの形で保存することを想定しています。ただ、必ずしも DynamoDB でなければならないわけではなく、RDS でも構わないデータです。あくまで例示のための区分けです。

アプリケーションのアーキテクチャ

すでにいくつか説明済みですが、AWS の各サービスを使用してアプリケーションを組み上げることを最終的には目標としています。データの保存先には Aurora と DynamoDB 、アプリケーションを動かす基盤には ECS を想定しています。

レイヤードアーキテクチャ

今回は4層のアーキテクチャを使用しています[*3]。具体的には、driver、app、kernel、adapter の4つを採用しています。kernel と adapter は DIP (依存関係逆転の原則)が適用されています。DI は、非常に単純ないわゆるコンストラクタインジェクションを採用しています。DI コンテナなどは用いずに、シンプルにモジュールという箇所に手書きをしています。

各レイヤーの担当領域

各レイヤーの担当領域は次のように定義しています。

  • driver はおもにルーターとサーバーの起動を実装しています。このレイヤーでは Axum の機能を利用してエンドポイントとサーバーの起動までを実装します。内部的に行われた処理の結果、どのようなステータスコードを返すかをハンドリングしたり、JSONシリアライズ・デシリアライズも担当します。
  • app はいわゆるユースケースのレイヤーで、アプリケーションを動作させるために必要なロジックを記述しています。複数リポジトリをまたいでアプリケーションに必要なデータ構造を返すなどします。
  • kernel はいわゆるドメインのレイヤーで、アプリケーションのコアとなる実装を記述します。具体的にはドメインモデルの生成や、各種指標の算出ロジックの記述などを行います。
  • adapter はいわゆる外部サービスとの連携のレイヤーです。RDS との接続やクエリの発行、DynamoDB との接続や操作の実装を記述します。

上位のレイヤーからのみ下位のレイヤーを呼び出すように実装しており、逆は呼び出せないように作られています。つまり、driver は app を呼び出し、app は kernel を呼び出すようにしています。kernel と adapter は DIP されているので、kernel はトレイトのみが定義されており、実際の実装は adapter に記述されています。

Rust でレイヤードアーキテクチャをきれいに実装するには

cargo のワークスペースという機能を使用すると、少なくとも下位レイヤーから上位レイヤーを呼び出してしまうという経路の違反を防ぐことができます。もちろん常にワークスペースの使用が必要というわけではありませんが、この機能を使用すると差分のないワークスペースコンパイルを省略できたりといくつかメリットがあります。

詳しくは GitHub の実装をご覧いただくとよいかと思いますが、たとえば app レイヤーは次のように kernel と adapter への経路のみが定義されている関係で、app から driver を呼び出すことはできません。

[package]
name = "stock-metrics-app"
version = "0.1.0"
edition = "2021"

[dependencies]
stock-metrics-kernel = { path = "../stock-metrics-kernel" }
stock-metrics-adapter = { path  = "../stock-metrics-adapter" }
// 続く

Dependency Injection

コンストラクタインジェクションの採用

今回はシンプルなコンストラクタインジェクションを採用しています。構造体に依存性注入させたい構造体のフィールドを記述しておき、内部でそれを使う方式です。

ヘルスチェックの実装を例に取ると、ユースケースが内部にリポジトリをもっていることがわかります。 HealthCheckUseCase はヘルスチェックのためのユースケースであり、HealthCheckRepository はヘルスチェックというドメインで使用できるリポジトリです。

Arc は並行処理時も安全に実行できる参照カウンタです。Web アプリケーションではサーバー側で並行処理を行うことが多いため、参照カウンタ Rc ではなく、スレッドセーフな参照カウンタ Arc を用いる必要があります。

pub struct HealthCheckUseCase {
    repository: Arc<HealthCheckRepository>,
}

他の言語でいうコンストラクタのようなものを Rust で用意するには new という専用の関数を使用します。この new 関数が HealthCheckRepository を外部から受け取り、HealthCheckUseCase に渡して依存性の注入を行います。

impl HealthCheckUseCase {
    pub fn new(repository: HealthCheckRepository) -> Self {
        Self {
            repository: Arc::new(repository),
        }
    }
// 続く
}

最後に、Repository を作成する処理を記述し、UseCase を作成する new 関数を呼び出すと UseCase の生成を行うことができます。HealthCheckRepository にも new 関数があり、同様に必要な依存性をここで注入しています。

// db は RDS へのコネクションプール、dynamodb は DynamoDB へのクライアントをもつ
let health_check_use_case = HealthCheckUseCase::new(HealthCheckRepository::new(db, dynamodb));

モジュール

今回はシンプルにモジュール(Modules)という構造体を用意し、それを実質 DI コンテナとみなしています。DI コンテナを使用する際には専用のライブラリを使う手があるとは思いますが、今回作る程度の規模の小さなアプリケーションでしたら Modules にコツコツ依存性の定義を書いていく方式でも悪くはないと思います。

Modules は先ほど紹介したコンストラクタインジェクションをシンプルに記述しただけのものです。そして後述しますが、これを Axum のレイヤーと呼ばれる概念を経由して呼び出すように作っています。Moduels を記事で紹介してしまうと大変なことになるので、下記にコードの例を示すにとどめます。

pub struct Modules {
    health_check_use_case: HealthCheckUseCase,
    stock_view_use_case: StockViewUseCase<RepositoriesModule>,
    stock_use_case: StockUseCase<RepositoriesModule>,
    market_kind_use_case: MarketKindUseCase<RepositoriesModule>,
    market_data_use_case: MarketDataUseCase<RepositoriesModule>,
}

pub trait ModulesExt {
    type RepositoriesModule: RepositoriesModuleExt;

    fn health_check_use_case(&self) -> &HealthCheckUseCase;
    fn stock_view_use_case(&self) -> &StockViewUseCase<Self::RepositoriesModule>;
    fn stock_use_case(&self) -> &StockUseCase<Self::RepositoriesModule>;
    fn market_kind_use_case(&self) -> &MarketKindUseCase<Self::RepositoriesModule>;
    fn market_data_use_case(&self) -> &MarketDataUseCase<Self::RepositoriesModule>;
}

impl ModulesExt for Modules {
    type RepositoriesModule = RepositoriesModule;

    fn health_check_use_case(&self) -> &HealthCheckUseCase {
        &self.health_check_use_case
    }

    fn stock_view_use_case(&self) -> &StockViewUseCase<Self::RepositoriesModule> {
        &self.stock_view_use_case
    }

    fn stock_use_case(&self) -> &StockUseCase<Self::RepositoriesModule> {
        &self.stock_use_case
    }

    fn market_kind_use_case(&self) -> &MarketKindUseCase<Self::RepositoriesModule> {
        &self.market_kind_use_case
    }

    fn market_data_use_case(&self) -> &MarketDataUseCase<Self::RepositoriesModule> {
        &self.market_data_use_case
    }
}

impl Modules {
    pub async fn new() -> Modules {
        let db = Db::new().await;
        let client = init_client().await;
        let dynamodb = DynamoDB::new(client);

        let repositories_module = Arc::new(RepositoriesModule::new(db.clone()));

        let health_check_use_case =
            HealthCheckUseCase::new(HealthCheckRepository::new(db, dynamodb));
        let stock_view_use_case = StockViewUseCase::new(repositories_module.clone());
        let stock_use_case = StockUseCase::new(repositories_module.clone());
        let market_kind_use_case = MarketKindUseCase::new(repositories_module.clone());
        let market_data_use_case = MarketDataUseCase::new(repositories_module.clone());

        Self {
            health_check_use_case,
            stock_view_use_case,
            stock_use_case,
            market_kind_use_case,
            market_data_use_case,
        }
    }
}

DIP (依存関係逆転の原則)

今回のコードでは kernel と adapter に対して DIP が適用されている関係で、kernel に Repository のトレイトのみが用意され、実際の実装は adapter で行われるという形になっています。app レイヤーで最終的に呼び出されるのは kernel のみになります。

DIP 適用のメリットは、仮にデータソースをまたいだ CRUD 操作をひとつのドメインに対して行いたい際に同じインターフェースで扱えることや、データソースの変更が行われたとしても、ドメインレイヤーやアプリケーションレイヤーの実装には影響が出ないといった保守運用のしやすさにつながる点です。

kernel では、Repository は次のようにインタフェースのみが用意されています。MarketKind の Repository を例に取ります。

#[async_trait]
pub trait MarketKindRepository {
    async fn find(&self, id: &Id<MarketKind>) -> anyhow::Result<Option<MarketKind>>;
    async fn insert(&self, source: NewMarketKind) -> anyhow::Result<Id<MarketKind>>;
}

実装は adapter に存在します。DatabaseRepositoryImpl という型は、各ドメインモデルに対する Repository の実装を意味します。

impl MarketKindRepository for DatabaseRepositoryImpl<MarketKind> {
    async fn find(&self, id: &Id<MarketKind>) -> anyhow::Result<Option<MarketKind>> {
        let pool = self.pool.0.clone();
        let market_kind = query_as::<_, MarketKindTable>("select * from market_kind where id = ?")
            .bind(id.value.to_string())
            .fetch_one(&*pool)
            .await
            .ok();
        match market_kind {
            Some(mk) => {
                let mk = mk.try_into()?;
                Ok(Some(mk))
            }
            None => Ok(None),
        }
    }
// 以下、インタフェースを満たすのに必要なだけ。
}

このアプリケーションでは最終的に app の UseCase で Repository は呼び出されますが、UseCase は kernel のインタフェースのみを知っていれば adapter の実装がどうであれ呼び出し可能です。依存関係の定義の関係で、DI コンテナとして使用している Modules は adapter に設置する必要はありますが、Repository に属する関数の呼び出し時に必要なのは kernel の情報のみであることがわかります。

use std::sync::Arc;

use derive_new::new;
use stock_metrics_adapter::modules::RepositoriesModuleExt;
use stock_metrics_kernel::repository::{market_kind::MarketKindRepository, stock::StockRepository};

use crate::model::stock_view::StockView;

#[derive(new)]
pub struct StockViewUseCase<R: RepositoriesModuleExt> {
    repositories: Arc<R>,
}

impl<R: RepositoriesModuleExt> StockViewUseCase<R> {
    pub async fn show_specific_stock(&self, id: String) -> anyhow::Result<Option<StockView>> {
        let stock = self
            .repositories
            .stock_repository()
            .find(id.try_into()?)
            .await?;
        match stock {
            Some(stock) => {
                let market_kind = self
                    .repositories
                    .market_kind_repository()
                    .find(&stock.market_kind)
                    .await?;
                Ok(market_kind.map(|market_kind| StockView::new(stock, market_kind)))
            }
            None => Ok(None),
        }
    }
}

採用した以外の方法

ひとつは Cake Pattern を用いた実装が考えられます。これは同様にトレイトを採用している Scala でよく知られた DI の手法です。ただ、ボイラープレートが増えて少し好みがわかれる方法ではあると思います。

また、DI 用のライブラリがあるといえばあるので、それを利用する方法も考えられます。が、私は Rust ではこうしたライブラリは使用したことがなく、どれがいいかなどの情報は持ち合わせていません。デファクトスタンダードのようなライブラリもないように見受けられます。

使用したクレート

主なクレートを少しだけ紹介します。

  • axum: これから紹介する予定の Web サーバーを実装するためのクレートです。
  • sqlx: 今回 MySQL との接続を行いますが、そのために使用したクレートです。
  • aws-sdk-rust: DynamoDB 関連の接続のための使用したクレートです。

その他のクレートは、リポジトリ内の Cargo.toml をご覧ください。

Axum をフル活用してアプリケーションを実装する

今回実装したアプリケーションがどのような構造をもつアプリケーションだったかを簡単にご紹介しました。株式の管理に関するもので、4層のレイヤードアーキテクチャをしたアプリケーションでした。

ここから、Axum のもつ機能や tokio が提供する機能を使用して、本番使用しても問題ないようなアプリケーションを実装できるのかを見ていきます。

ルーティングの基本的な定義方法

単純にパスを指定するとステータスコードを返すだけのエンドポイントから、REST でよく行われるパスに ID を含むリクエストを送るものや、JSON のやりとりをするものといった代表的なパターンを簡単にご紹介します。

Router

まず、Axum ではエンドポイントの管理を Router という構造体が担っています。この構造体はいわゆるビルダーパターンのようになっていて、route という関数を経由してエンドポイント情報を登録させます。

Axum にはルーターの他に、ハンドラ(Handler)という概念があります。詳しい実装については後述しますが、このハンドラを各 HTTP メソッドを示す関数に渡すことで、裏でエンドポイントが Router に登録されます。これを繰り返してエンドポイントを登録していき、サーバーが所定のパスへのアクセスに対してレスポンスを返すことができるようになります。

最終的にこの Router は、サービス(Service)に直されサーバーの起動時に使用されます。

use crate::{
    module::Modules,
    routes::{
        health::{hc, hc_db, hc_dynamo},
        market_data::upload_market_data,
        market_kind::create_market_kind,
        stock_view::{create_stock, stock_view},
    },
};
use axum::{
    handler::{get, post},
    AddExtensionLayer, Router,
};
use dotenv::dotenv;
use std::{net::SocketAddr, sync::Arc};

pub async fn startup(modules: Arc<Modules>) {
    let hc_router = Router::new()
        // `get` 関数にハンドラを入れ、GET 用のエンドポイントを作る。
        // `hc` が「ハンドラ」にあたる。`hc` の中身については後述する。
        // POST の場合は `post` など、各リクエストメソッドに対応した関数がある。
        .route("/", get(hc))
        .route("/db", get(hc_db))
        .route("/dynamo", get(hc_dynamo));
    let stocks_router = Router::new()
        .route("/", post(create_stock))
        .route("/:id", get(stock_view));
    let market_kind_router = Router::new().route("/", post(create_market_kind));
    let market_data_router = Router::new().route("/:stock_id", post(upload_market_data));

    let app = Router::new()
        .nest("/hc", hc_router)
        .nest("/stocks", stocks_router)
        .nest("/market_kind", market_kind_router)
        .nest("/market_data", market_data_router)
        .layer(AddExtensionLayer::new(modules));

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    tracing::info!("Server listening on {}", addr);

    axum::Server::bind(&addr)
        // 最後ルーターはサービスに直されてサーバーの起動時に使われる
        .serve(app.into_make_service())
        .await
        .unwrap_or_else(|_| panic!("Server cannot launch!"))
}

単純なルーティング

もっとも単純なルーティングとして、/hc にアクセスすると成功のステータスコードを返すというアプリケーションのヘルスチェックのためのハンドラを用意しています。それは、次のように記述できます。

pub async fn hc() -> impl IntoResponse {
    // tracing::debug! マクロについては後ほど説明するが、tracing の機能。
    tracing::debug!("Access health check endpoint from user!");
    StatusCode::NO_CONTENT
}

impl IntoResponse についてですが、IntoResponse はトレイトで、このトレイトを実装した型であれば返すことができます。つまり、StatusCode には IntoResponse が実装されています。IntoResponse トレイトを実装すると、自分のカスタムレスポンスを返すことができるようになります。

この記事では、StatusCode とレスポンスボディが含まれる場合はそのレスポンスボディのタプルを返しているケースが多いですが、struct Response のような独自型を定義して、タプルを使用せず別の方法をとることもできると思います。その際、Response 型に IntoResponse の実装を記述しておけば問題なく取り扱いできるはずです。

パスに ID のような変数を含むもの

たとえば /stock/:id というエンドポイントを作りたいときです。:id の部分に ID を入れるとそれをルーティング時にパースして ID として別個認識し、取り出せるというものです。他のクレートでもある機能は Axum にも同様にあります。

Path という構造体を用いるとパスの取り出しを実装できます。なぜこの記述だけで取り出せるかについては、のちの「JSON を含むリクエストの受け取りを含むもの」でも説明しますが FromRequest というトレイトを実装しておりそれが裏で絡んでいるからです。

Extension については後述しますが、これは依存性を注入したり、アプリケーション全体で保持させたい状態をハンドラ間(ルーター間)で引き回す際に使用できるものです。今回私が作ったアプリケーションでは、ほぼすべてのハンドラに登場し、ここからユースケースの取り出し等を行っています。

// ルーターの定義方法は次の通り。
// `/stocks/1` とアクセスすると、`:id` に `1` が入る。
let stocks_router = Router::new()
        // 間にいろいろ
    .route("/stocks/:id", get(stock_view));
pub async fn stock_view(
    Path(id): Path<String>,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    let res = modules.stock_view_use_case().show_specific_stock(id).await;
    match res {
        Ok(sv) => sv
            .map(|sv| {
                let json: JsonStockView = sv.into();
                (StatusCode::OK, Json(json))
            })
            .ok_or_else(|| StatusCode::NOT_FOUND),
        Err(err) => {
            error!("Unexpected error: {:?}", err);
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}

JSON を含むリクエストの受け取りを含むもの

JSON を含むリクエストとしてありえるのは、たとえば POST メソッドを使って新規リソースを作成する際、JSON で登録内容を送信する方法です。サーバーで JSON を解析し、その内容を元に各ストレージレイヤーにアクセスしてデータを保存するというのは、Web アプリケーションでは一般的な方法かと思われます。

Axum は Serde クレートの Deserialize が実装された構造体等であれば、裏で自動でパースします。もっともシンプルな記述は下記のようにできます。単に関数の引数にデシリアライズしたい構造体をもたせておくと、Axum が裏で自動で構造体に値を詰めます。

// 別ファイルに記述されている想定。
#[derive(Deserialize, Debug)]
pub struct JsonCreateStock {
    name: String,
    ticker_symbol: String,
    market_kind: String,
}

// 引数に JsonCreateStock を入れておくと、裏で JSON パースしてくれる
pub async fn create_stock(
    source: JsonCreateStock,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    let res = modules.stock_use_case().register_stock(source.into()).await;
    res.map(|_| StatusCode::CREATED).map_err(|err| {
        error!("Unexpected error: {:?}", err);
        StatusCode::INTERNAL_SERVER_ERROR
    })
}

JSON の各フィールドの値を受け取りのタイミングでバリデーションチェックすることができます。Axum のリクエストは FromRequest というトレイトでいろいろ処理を追加すると、自身でカスタマイズすることができるようになっています。下記は validator というクレートと組み合わせて、フィールドの値のバリデーションチェックを追加してみた例です。

// Validate を derive する。
#[derive(Deserialize, Debug, Validate)]
pub struct JsonCreateStock {
    // validator クレートが提供するマクロ。
    // 詳しくは validator というクレートを参照。
    #[validate(length(min = 1, max = 255))]
    name: String,
    #[validate(length(min = 1, max = 255))]
    ticker_symbol: String,
    market_kind: String,
}
use thiserror::Error;

// 今回使用するためのエラー用の型。
#[derive(Debug, Error)]
pub enum AppError {
    #[error(transparent)]
    Validation(#[from] validator::ValidationErrors),
    #[error(transparent)]
    JsonRejection(#[from] axum::extract::rejection::JsonRejection),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        match self {
            AppError::Validation(_) => {
                let msg = format!("Input validation error: [{}]", self).replace('\n', ", ");
                (StatusCode::BAD_REQUEST, msg)
            }
            AppError::JsonRejection(_) => (StatusCode::BAD_REQUEST, self.to_string()),
        }
        .into_response()
    }
}

// FromRequest は、リクエストから特定の型への取り出し処理を実装することができる。
// これを使って、カスタマイズしたリクエスト用の型を定義できたりする。

#[async_trait]
impl<T, B> FromRequest<B> for ValidatedRequest<T>
where
    T: DeserializeOwned + Validate,
    B: http_body::Body + Send,
    B::Data: Send,
    B::Error: Into<BoxError>,
{
    type Rejection = AppError;

    async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
        let Json(value) = Json::<T>::from_request(req).await?;
        value.validate()?;
        Ok(ValidatedRequest(value))
    }
}

これらをまとめると、先ほどの create_stock 関数は次のようにアップデートされます。リクエストが送られてきた際にバリデーションチェックが先に走り、不正なリクエストは 400 で返します。JSON のフィールドに対するバリデーションチェックは、ルーターの内部でやりたい(たとえばドメインレイヤーなど)ケースも多いとは思うので常に使ったらよいというわけではありませんが、考え方のひとつとして持っておくと実装の幅が広がりそうです。

pub async fn create_stock(
    ValidatedRequest(source): ValidatedRequest<JsonCreateStock>,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    let res = modules.stock_use_case().register_stock(source.into()).await;
    res.map(|_| StatusCode::CREATED).map_err(|err| {
        error!("Unexpected error: {:?}", err);
        StatusCode::INTERNAL_SERVER_ERROR
    })
}

JSON を HTTP レスポンスボディに含むレスポンスを返すもの

JSON をレスポンスボディに含むレスポンスを返すには、Json という型に包んで返します。この型に包むことができる型は、Serde の Serialize を derive したもののみです。

下記は少し実装を隠蔽してしまっているのでわかりにくいかもしれませんが、JSON を返すハンドラの例です。JsonStockView という型は Serialize を derive しており Json に包むことが可能です。

#[derive(Serialize)]
pub struct JsonStockView {
    id: String,
    name: String,
    ticker_symbol: String,
    market_kind: String,
}

pub async fn stock_view(
    Path(id): Path<String>,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    let res = modules.stock_view_use_case().show_specific_stock(id).await;
    match res {
        Ok(sv) => sv
            .map(|sv| {
                let json: JsonStockView = sv.into();
                (StatusCode::OK, Json(json))
            })
            .ok_or_else(|| StatusCode::NOT_FOUND),
        Err(err) => {
            error!("Unexpected error: {:?}", err);
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}

DI の扱い方

Axum にはレイヤーという機能があり、これを使うことで DI を実現できます。先ほどから登場していますが Extension という型がそれにあたります。Extension はハンドラ間で状態の共有をするために使われる機能ですが、これを用いると DI を管理できます。

今回私の作ったアプリケーションでは、Modules という構造体で実質的に DI コンテナの役割を果たすように作っています。なので、この ModulesExtension を経由して各ハンドラに配布しておけば、そこから自由に必要なモジュールの呼び出しをできるようになっています。

各ハンドラでは先ほどからも登場している通り、次のように Extension の引数での定義と呼び出しができます。

pub async fn hc_db(
    Extension(module): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    module
        .health_check_use_case()
        .diagnose_db_conn()
        .await
        .map(|_| StatusCode::NO_CONTENT)
        .map_err(|err| {
            error!("{:?}", err);
            StatusCode::SERVICE_UNAVAILABLE
        })
}

Extension に渡される Arc<Modules> はどのタイミングで渡されているかと言うと、ルーターです。Routerlayer という関数が用意されており、そこに渡すとあとは裏側で自動で各ハンドラに配ってくれるようです。

// Modules は起動の瞬間に new して、startup 関数に渡す
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    init_app();

    let modules = Modules::new().await;
    let _ = startup(Arc::new(modules)).await;

    Ok(())
}

pub async fn startup(modules: Arc<Modules>) {
    let hc_router = Router::new()
        .route("/", get(hc))
        .route("/db", get(hc_db))
        .route("/dynamo", get(hc_dynamo));
    let stocks_router = Router::new()
        .route("/", post(create_stock))
        .route("/:id", get(stock_view));
    let market_kind_router = Router::new().route("/", post(create_market_kind));
    let market_data_router = Router::new().route("/:stock_id", post(upload_market_data));

    let app = Router::new()
        .nest("/hc", hc_router)
        .nest("/stocks", stocks_router)
        .nest("/market_kind", market_kind_router)
        .nest("/market_data", market_data_router)
        // ↓ここで Modules を入れ込む。
        .layer(AddExtensionLayer::new(modules));

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    tracing::info!("Server listening on {}", addr);

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap_or_else(|_| panic!("Server cannot launch!"))
}

multipart/formdata の扱い方

multipart も扱えます。扱う際には、Axum の multipart 用の機能をオンにする必要があります。Cargo.toml の Axum の依存を次のように変えます。

axum = { version = "0.4.3", features = ["multipart"] }

csv ファイル形式の時価データをアップロードする機能の実装です。この部分では csv というクレートを使用して csv ファイルの読み込みを行い、中身をアプリケーション内部で扱うことができる構造体の形に詰め直しています。csv ファイルのサイズの上限はとくに考えていなかったため、一旦 100MB としています[*4]

pub async fn upload_market_data(
    Path(stock_id): Path<String>,
    ContentLengthLimit(mut multipart): ContentLengthLimit<Multipart, { 100 * 1024 * 1024 }>,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    while let Some(field) = multipart.next_field().await.unwrap() {
        let bytes = field.bytes().await.unwrap();
        let bytes = String::from_utf8(bytes.to_vec()).unwrap();

        let mut rdr = ReaderBuilder::new()
            .has_headers(true)
            .from_reader(bytes.as_bytes());

        let mut market_data_list = Vec::new();
        for result in rdr.deserialize() {
            let market_data: CsvMarketData = result.unwrap();
            market_data_list.push(market_data);
        }

        let market_data_list: Vec<MarketData> = market_data_list
            .into_iter()
            .filter_map(|csv| csv.to_market_data(&stock_id).ok())
            .collect();

        let _ = modules
            .market_data_use_case()
            .register_market_data(market_data_list)
            .map_err(|err| {
                error!("Unexpected error: {:?}", err);
                StatusCode::INTERNAL_SERVER_ERROR
            })?;
    }
    Ok(StatusCode::CREATED)
}

tracing を使ってトレーシングをする

アプリケーションは作って終了というわけではなく、運用する必要があります。運用には監視が必要なわけですが、そのためにはよいロギングができるとよいです。どこのハンドラの処理で吐き出されたログかがわかるととてもよいですよね。

tracing を使用すると、プロセス内部の処理を追いかけることができるようになります。Zipkin や Jaeger などが類似のライブラリとしては代表的でしょうか。tokio が提供するトレーシング用のクレートで、tokio を使用したアプリケーションを作っているのであれば、入れておいて損はないと思います。詳しい実装方法などについてはこの記事がよいと思います。

Axum は tokio チームが作っているというのもあるからか、tokio の周辺ツールと非常にシームレスに連携できます。ドキュメントのとおりに tracing に関するマクロを追加するだけで実装できます。tracing そのものは actix-web などのクレートにも対応してはいます。

もっともシンプルな例ですが、ヘルスチェックに次のようなデバッグログを仕込んでみます。

pub async fn hc() -> impl IntoResponse {
    tracing::debug!("Access health check endpoint from user!");
    StatusCode::NO_CONTENT
}

これをサーバーで動かすと、ログの結果として次のように出力できます。

2021-12-25T15:08:39.484032Z DEBUG stock_metrics_driver::routes::health: Access health check endpoint from user!

さらに、トレーシングの文脈でいうスパン(span)のような機能もあります。スパンは #[tracing::instrument] というマクロを関数に追加すると、裏で実行できるようになっています。例として、MarketKind を作成するハンドラに追加して、様子を見てみます。

// skip(module) は、関数の引数のうちロギング対象としたくない引数を定義しておくと、ロギングしなくなる。
#[tracing::instrument(skip(modules))]
pub async fn create_market_kind(
    ValidatedRequest(source): ValidatedRequest<JsonCreateMarketKind>,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    let res = modules
        .market_kind_use_case()
        .register_market_kind(source.into())
        .await;
    res.map(|id| (StatusCode::CREATED, id)).map_err(|err| {
        error!("Unexpected error: {:?}", err);
        StatusCode::INTERNAL_SERVER_ERROR
    })
}

このエンドポイントに、次のような JSON 定義をもつ HTTP リクエストを送信してみます。

{
    "name": "東京証券取引所",
    "code": "TSE"
}

sqlx に関する INFO ログが出力されていますが、その情報の中に先ほど関数の引数にあった JsonCreateMarketKind の情報が追加されています。今回はかなり雑にログを吐き出させてしまっているのでわかりにくいかもしれませんが、ログの情報を何段階か追加することで、処理がどのように行われていっているかをトレーシングの結果から追いかけやすくなるはずです。

...
2021-12-25T15:13:24.934399Z DEBUG hyper::proto::h1::conn: incoming body completed
2021-12-25T15:13:25.150209Z  INFO create_market_kind{source=JsonCreateMarketKind { code: "TSE", name: "東京証券取引所" }}: sqlx::query: insert into market_kind (id, …; rows: 0, elapsed: 208.464ms

insert into
  market_kind (id, code, name, created_at, updated_at)
values
  (?, ?, ?, ?, ?)
  
2021-12-25T15:13:25.150468Z DEBUG hyper::proto::h1::io: flushed 133 bytes
...

トレーシングそのものは、マイクロサービス化に応じて経路が複雑化し、監視が難しくなりつつある現代のアプリケーションでは必須の機能だと思っています。とくに本番運用するとなると、この機能を楽に使用できるかどうかはアプリケーションエンジニアにとっては重要な話題です。tracing は思った以上にシームレスに Axum と連携でき、運用面でも費用対効果のよいアプリケーションを実装しやすくなっていると感じました。

Tips

DTO (Data Transfer Object) の詰め替えには From/TryFrom を実装

レイヤードアーキテクチャにすると、レイヤー間でオブジェクトの詰替えのような作業を行うことがあります。これはたとえば下位レイヤーが上位レイヤーに依存しないという規則を守らせる際にとくに重要で、上位レイヤーのうちに下位レイヤーのデータ構造に直してから引数に渡すといった作業が必要になります。今回作ったアプリケーションでも、app レイヤーから kernel レイヤーへのデータの変換などが行われています。

その際に便利なのが Rust の標準ライブラリに入っている FromTryFrom といったトレイトです。これを構造体へ追加で実装しておくだけで、from()into() といった関数を呼び出して DTO の変換をかけられるようになります。

下記は、MarketKind を新規作成する際に app レイヤー用のデータから kernel レイヤー用のデータへ変換する作業を行っている例です。

#[derive(new)]
pub struct CreateMarketKind {
    pub code: i32,
    pub name: String,
}

impl TryFrom<CreateMarketKind> for NewMarketKind {
    type Error = anyhow::Error;
    fn try_from(c: CreateMarketKind) -> anyhow::Result<Self> {
        let market_kind_id = Id::gen();
        Ok(NewMarketKind::new(
            market_kind_id,
            MarketCode(c.code),
            MarketName(c.name),
        ))
    }
}

呼び出し自体は次のようにして済ませられます。try_into() 関数を呼び出すだけです。

impl<R: RepositoriesModuleExt> MarketKindUseCase<R> {
    pub async fn register_market_kind(&self, source: CreateMarketKind) -> anyhow::Result<String> {
        self.repositories
            .market_kind_repository()
            .insert(source.try_into()?)
            .await
            .map(|id| id.value.to_string())
    }
}

各関数の Result 型は anyhow::Result にしておき、エラーの定義には thiserror を使う

今回書いたコードの至るところに登場しているイディオムですが、各関数の Result 型は anyhow::Result にしておき、独自にエラー型を定義した際には thiserror で拡張するという方法ととっています。

今回は独自のエラー型をビジネスロジックごとに正しく定義するという手間を省くために anyhow! マクロを使ってエラー型を作成しています。

しかし、本来エラー型は、エラーになる異常処理のケース単位で型を用意しておくことが多いかと思います。その際には thiserror でエラーメッセージを付与したり、サードパーティライブラリのエラー型を吸収したりします。

ID 用の型を節約する

エンティティに使用される ID 型は、エンティティの数が増えるにつれてだんだん構造体の定義型自体も増えていく傾向にあります。たとえば、下記のように Stock に使用したい ID 型を用意すると次のように StockId などという構造体を用意するかもしれません。ここにさらに MarketData に使用する ID 型を増やすとなると、MarketDataId のような構造体を追加する必要が出てきます。

pub struct StockId(pub Ulid);

都度構造体を追加するのは少し手間かもしれません。

そこで、PhantomData を活用したイディオムがあります。T には Id を使用する元のエンティティ名が入ります。つまり、Stock というエンティティに使用するなら Id<Stock> のような型になります。

#[derive(new, Debug, Clone)]
pub struct Id<T> {
    pub value: Ulid,
    _marker: PhantomData<T>,
}

すべてのパターンの ID 型で共通する実装は、T 型に対する実装を用意しておけばよいです。

impl<T> Id<T> {
    pub fn gen() -> Id<T> {
        Id::new(Ulid::new())
    }
}

個別のエンティティに対する実装を生やしたくなったら、エンティティの型をはめた実装を用意すればよいです。

impl Id<Stock> {
    pub fn id(&self) -> String {
        self.value.to_string()
    }
}

あるいは、マクロを利用して構造体を自動生成させてしまう手も考えられます。今回は実装はしていませんが、好みに応じて使い分けられるかもしれません。

トレイトを使用した DI に使用される型引数を削る

Rust でトレイトを使用した DI では、動的ディスパッチを使用する方法と静的ディスパッチを使用する方法の2つがあると思います。

  • 動的ディスパッチ: Box<dyn TraitName>Arc<dyn TraitName> など。
  • 静的ディスパッチ: 型引数にトレイト側の名前を入れておく。あるいは、関連型を使用する。

たとえば動的ディスパッチの場合は下記のように、

struct SomeUseCase {
    repo_a: Arc<dyn RepositoryA>,
    repo_b: Arc<dyn RepositoryB>,
}

静的ディスパッチの場合は下記のようにといった具合です。

struct SomeUseCase<A: RepositoryA, B: RepositoryB> {
    repo_a: A,
    repo_b: B,
}

動的ディスパッチの場合は、型が複雑になりすぎずに済み型の解決が比較的楽になる傾向にあると思います。最近だと Wasmer という OSS の実装で、動的ディスパッチをあえて積極的に使っているようなコードをみた記憶があります。

一方で、動的ディスパッチには静的ディスパッチと比較するとオーバーヘッドが発生したり、Web アプリケーションの場合は、Axum がそうだと思うのですが、動的ディスパッチ用のトレイト実装がいくつか生えておらず、そもそも対応していないのでコンパイルエラーといったケースがあるように見受けられます。

静的ディスパッチの場合は速度面は問題ないですし、他の Web アプリケーション構築用のクレートが対応しているように見えるので、Rust では最もオーソドックスな手法かと思われます。

一方で、先ほど示した UseCase に対する DI の例からもわかるように、依存するフィールドが増えるとその分型引数名が増えるというデメリットがあると思います。関連型を使用する手はひとつ、型引数の数を減らすのに有力な方法ではあると思うのですが、その分余分なトレイトを1枚噛ませる必要が出てくるなど、手間が増えそうです。

静的ディスパッチしつつ型引数を増やさないためには、リポジトリの依存をまとめたモジュール(ModulesDependencies みたいな名前)を作って、それを経由して呼び出すという方法がひとつ考えられます。下記のような実装を用意しておき、それを UseCase の型引数にひとつ与えるだけでよくなります。

pub struct RepositoriesModule {
    stock_repository: DatabaseRepositoryImpl<Stock>,
    market_kind_repository: DatabaseRepositoryImpl<MarketKind>,
}

pub trait RepositoriesModuleExt {
    type StockRepo: StockRepository;
    type MarketKindRepo: MarketKindRepository;
    fn stock_repository(&self) -> &Self::StockRepo;
    fn market_kind_repository(&self) -> &Self::MarketKindRepo;
}

impl RepositoriesModuleExt for RepositoriesModule {
    type StockRepo = DatabaseRepositoryImpl<Stock>;
    type MarketKindRepo = DatabaseRepositoryImpl<MarketKind>;

    fn stock_repository(&self) -> &Self::StockRepo {
        &self.stock_repository
    }

    fn market_kind_repository(&self) -> &Self::MarketKindRepo {
        &self.market_kind_repository
    }
}

この処理は StockRepositoryMarketKindReposiory の2つのリポジトリを呼び出す必要のあるコードですが、型引数は1つで済むようになりました。

pub struct StockViewUseCase<R: RepositoriesModuleExt> {
    repositories: Arc<R>,
}

impl<R: RepositoriesModuleExt> StockViewUseCase<R> {
    pub async fn show_specific_stock(&self, id: String) -> anyhow::Result<Option<StockView>> {
        let stock = self
            .repositories
            .stock_repository()
            .find(id.try_into()?)
            .await?;
        match stock {
            Some(stock) => {
                let market_kind = self
                    .repositories
                    .market_kind_repository()
                    .find(&stock.market_kind)
                    .await?;
                Ok(market_kind.map(|market_kind| StockView::new(stock, market_kind)))
            }
            None => Ok(None),
        }
    }
}

デメリットとしては、モジュールに大きくリポジトリの依存をまとめてしまうと、そのモジュールで管理するリポジトリの量が増えたときに大変です。また、Stock から Bondリポジトリは呼び出せしまってはいけないのだが、なぜか呼び出せてしまうミスなどが発生してしまうことは考えられます。その際は、適切な量に再度モジュールを切り直す必要があるでしょう。がいずれにせよ、リポジトリひとつひとつの型引数を UseCase に渡し続けるよりかは、かなり状況はよくなると思います。

for ループやパターンマッチングより filter や map などのアダプタを使う

私は下記のようにパターンマッチングでもよいケースや for ループでもよいケースであっても、可能な限りアダプタ(mapmap_err を Rust ではそう呼びます)を採用するようにしています。これを用いると、変数のスコープを狭めることができるようになり、 Rust 特有の所有権やライフタイム問題を起こさずコーディングしやすくなるかなと体感的に思っています。

pub async fn delete_market_kind(
    Path(id): Path<String>,
    Extension(modules): Extension<Arc<Modules>>,
) -> Result<impl IntoResponse, StatusCode> {
    modules
        .market_kind_use_case()
        .delete_market_kind(id)
        .await
        .map(|_| StatusCode::NO_CONTENT)
        .map_err(|err| {
            error!("Unexpected error: {:?}", err);
            StatusCode::INTERNAL_SERVER_ERROR
        })
}

また、Rust 特有の事情として、for ループは所有権とライフタイムがループ1回ごとにチェックされるので、他の言語ではコンパイルが通るケースも通らなくなることがある、という難しさがあると思っています。1回ループを回すとその時点で内部で確保した変数の所有権はすべて解放されてしまうので、追加で clone が必要になったり、必要以上に Rc する必要が出てきたりとだんだんコードが難しくなります。が、アダプタを使用した際にはこのような問題は起こりにくくなります。

アダプタの使いこなしは慣れが必要ですが、宣言的にコードを記述できるようになるためコードの見通しがよくなると思います。また、アダプタはコンパイラ側の最適化が走りやすく、ゼロコスト抽象化の恩恵をより受けられやすくなるなどのメリットもあるようです。

まとめ

普段 Web アプリケーションを作っている者から見ても、Axum ならびに Rust の Web のエコシステムがどんどん充実してきていることが改めてわかりました。ただ一方で、「他の言語だったらこれあるのにな」と感じたものはまだまだなかったり、突然クレートのバージョンを上げるとコンパイルエラーが出始めるケースなど、枯れているとは言い切れないのも事実だと思います。

また、今回は記事では紹介しなかったのですが、Axum 使用中にもハマりポイントが何箇所かありました。が、まだクレートそのものが出たてで Axum の使用例が少なく、「このワークアラウンドで正しいのだろうか」と思う場面も多々ありました。この記事は、そうしたワークアラウンドの紹介のためのものでもあります。

実装そのものはまだ未完成の段階で、一旦未完成ながらも紹介可能な箇所を紹介しました。examples ディレクトリの充実やテストの充実なども今後必要だと思います。

記事が長くなりすぎてしまったので、Zenn で本を書いて出そうかなと思い始めました。

みなさまよいお年を!

*1:レイヤードアーキテクチャといったほうが正しいのかもしれません。ドライバ、アプリ、カーネル、アダプタの4層のレイヤーをもつアプリケーションを実装しています。

*2:ただし、執筆時点ではテクニカル指標の算出はまだ実装できていません。

*3:処理内容は複雑ではないものの4層にしてしまったので、単に面倒臭さを増しただけと思っていますが、今回は試してみたかったからということで。

*4:この部分は同期的にやってしまうとレスポンスが返るまでクライアントを止めてしまう可能性があるので、キューに詰めてタスク管理させて別のサーバーで処理させるなどしたいなとは思っています。