cadmus_core/db/migrations/
mod.rs

1use crate::db::types::UnixTimestamp;
2use anyhow::Error;
3use once_cell::sync::Lazy;
4use sqlx::SqlitePool;
5use std::collections::HashMap;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Mutex;
9
10type MigrationFn =
11    for<'a> fn(&'a SqlitePool) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>;
12
13static REGISTRY: Lazy<Mutex<HashMap<&'static str, MigrationFn>>> =
14    Lazy::new(|| Mutex::new(HashMap::new()));
15
/// Registers an `async fn` as a Cadmus runtime database migration.
///
/// The macro takes a stable string literal ID and an `async fn` definition.
/// The function is registered into the global migration registry automatically
/// before `main` runs, via `ctor`.
///
/// Migrations can be defined anywhere inside `cadmus-core` — co-locate them
/// with the feature they belong to (e.g. `library/migrations.rs`).
///
/// Doc comments (and any other attributes) must be written at the very start
/// of the invocation, *before* the ID literal, as shown in the example below.
/// They are forwarded onto the generated function and onto the `MIGRATION_ID`
/// constant inside the generated submodule, making them visible in rustdoc.
/// The macro also appends the migration ID and a rerun SQL snippet
/// automatically.
///
/// # Arguments
///
/// * `$id` — Stable string literal identifying this migration in
///   `_cadmus_migrations`. Never change after deployment. The registry is a
///   map keyed by this ID, so two migrations must not share one.
/// * The `async fn` — Written as `async fn name(pool: &SqlitePool) { ... }`
///   with **no** return type: the macro pattern does not accept one and
///   supplies `Result<(), anyhow::Error>` itself, so the body must evaluate
///   to that type. The parameter type you write is matched but not forwarded;
///   the generated function always takes `&::sqlx::SqlitePool`.
///
/// The generated inner module is named after `$name`, so `$name` must be
/// unique across all `migration!` calls in the crate.
///
/// # Expansion
///
/// Each invocation produces:
///
/// * a private `async fn $name` containing the given body, and
/// * a `pub mod $name` holding the `MIGRATION_ID` constant and a
///   `ctor`-annotated function that passes the ID and a boxing adapter to
///   `db::migrations::register`.
///
/// # Use Cases
///
/// Migrations are one-time operations that run once per deployment. They can:
/// - **Transform data** — backfill metadata, normalize formats, compute derived values
/// - **Import legacy data** — load from filesystem/config into the database
/// - **Cleanup** — prune obsolete rows, remove temporary data, optimize storage
/// - **Any procedural setup** — not limited to SQL; can call filesystem operations,
///   external APIs, or other async code
///
/// **Note:** Schema updates (creating/altering tables) should be handled via `.sql`
/// files in `crates/core/migrations/`, not runtime migrations. SQL migrations are
/// applied by `sqlx-cli` or the `migrate!` macro before runtime migrations run.
///
/// # SQL Best Practices
///
/// When executing SQL in migrations, use SQLx's typed query macros for compile-time
/// verification:
///
/// - Use `sqlx::query!` for `INSERT`, `UPDATE`, `DELETE`, and `SELECT` returning rows
/// - Use `sqlx::query_as!` when mapping results into a named struct
/// - Use `sqlx::query_scalar!` for single-column results
/// - Pass `&mut **tx` (double deref) when executing against a transaction
///
/// # Example
///
/// ```rust
/// mod my_migrations {
///     use sqlx::SqlitePool;
///
///     cadmus_core::migration!(
///         /// Backfills metadata from legacy storage.
///         "v1_backfill_metadata",
///         async fn backfill_metadata(pool: &SqlitePool) {
///             // sqlx::query!(...).execute(pool).await?;
///             Ok(())
///         }
///     );
/// }
/// ```
#[macro_export]
macro_rules! migration {
    // Public entry point: parses the user-facing syntax and forwards the pieces
    // to the `@internal` rule. `$pool_ty` is matched here but not passed on.
    ($(#[$attr:meta])* $id:literal, async fn $name:ident($pool:ident : &$pool_ty:ty) $body:block) => {
        $crate::migration!(@internal $(#[$attr])* $id, $name, $pool, $body);
    };
    // Internal rule: emits the migration function and its registration module.
    (@internal $(#[$attr:meta])* $id:literal, $name:ident, $pool:ident, $body:block) => {
        $(#[$attr])*
        #[doc = ""]
        #[doc = concat!("**Migration ID:** `", $id, "`")]
        #[doc = ""]
        #[doc = "To re-run this migration, delete its tracking row:"]
        #[doc = concat!("```sql\nDELETE FROM _cadmus_migrations WHERE id = '", $id, "';\n```")]
        #[cfg_attr(feature = "otel", tracing::instrument(skip($pool), fields(migration_id = $id)))]
        async fn $name($pool: &::sqlx::SqlitePool) -> ::std::result::Result<(), ::anyhow::Error> {
            $body
        }

        /// Module generated by the [`crate::migration!`] macro.
        #[allow(non_snake_case)]
        pub mod $name {
            $(#[$attr])*
            #[doc = ""]
            #[doc = concat!("**Migration ID:** `", $id, "`")]
            #[doc = ""]
            #[doc = "To re-run this migration, delete its tracking row:"]
            #[doc = concat!("```sql\nDELETE FROM _cadmus_migrations WHERE id = '", $id, "';\n```")]
            /// The stable ID used to track this migration in `_cadmus_migrations`.
            #[allow(dead_code)]
            pub const MIGRATION_ID: &str = $id;

            // Constructor function (via `ctor`) that adds this migration to
            // the global registry under `$id`.
            #[$crate::ctor::ctor]
            fn __register() {
                // Adapter with the plain `fn` signature the registry stores:
                // it boxes and pins the future returned by the async
                // migration function defined in the parent scope.
                fn __boxed(
                    pool: &::sqlx::SqlitePool,
                ) -> ::std::pin::Pin<
                    ::std::boxed::Box<
                        dyn ::std::future::Future<
                            Output = ::std::result::Result<(), ::anyhow::Error>,
                        > + ::std::marker::Send
                        + '_,
                    >,
                > {
                    ::std::boxed::Box::pin(super::$name(pool))
                }
                $crate::db::migrations::register($id, __boxed);
            }
        }
    };
}
128
// Example module, compiled only when the Cargo feature named "test" is enabled.
// NOTE(review): this is `feature = "test"`, not `cfg(test)` — confirm that a
// feature gate (rather than the built-in test cfg) is what is intended.
#[cfg(feature = "test")]
mod example;
131
132/// Registers a migration function under the given stable `id`.
133///
134/// # Panics
135///
136/// Panics if the registry lock is poisoned (should never happen in normal use).
137pub fn register(id: &'static str, f: MigrationFn) {
138    let mut registry = REGISTRY
139        .lock()
140        .expect("migration registry lock should not be poisoned");
141    registry.insert(id, f);
142}
143
144/// Runs all registered migrations that have not yet been executed.
145///
146/// Reads the `_cadmus_migrations` table to determine which migrations have
147/// already run. Executes pending migrations ordered by ID, recording each
148/// outcome (`success` or `failed`) before continuing.
149///
150/// On failure of any individual migration, the error is logged and recorded
151/// as `failed` in the tracking table. Execution continues with the remaining
152/// migrations so a single failure does not block subsequent independent ones.
153pub struct MigrationRunner {
154    pool: SqlitePool,
155}
156
157impl MigrationRunner {
158    pub(super) fn new(pool: SqlitePool) -> Self {
159        Self { pool }
160    }
161
162    /// Execute all pending registered migrations against the database.
163    #[cfg_attr(feature = "otel", tracing::instrument(skip(self)))]
164    pub async fn run_all(&self) -> Result<(), Error> {
165        let registry = REGISTRY
166            .lock()
167            .expect("migration registry lock should not be poisoned")
168            .clone();
169
170        if registry.is_empty() {
171            return Ok(());
172        }
173
174        let already_run: Vec<String> =
175            sqlx::query_scalar!("SELECT id FROM _cadmus_migrations WHERE status = 'success'")
176                .fetch_all(&self.pool)
177                .await?;
178
179        let mut pending: Vec<(&'static str, MigrationFn)> = registry
180            .into_iter()
181            .filter(|(id, _)| !already_run.contains(&id.to_string()))
182            .collect();
183
184        if pending.is_empty() {
185            tracing::info!("no pending runtime migrations");
186            return Ok(());
187        }
188
189        pending.sort_by_key(|(id, _)| *id);
190
191        tracing::info!(count = pending.len(), "running pending runtime migrations");
192
193        for (id, migration_fn) in pending {
194            tracing::info!(migration_id = id, "running migration");
195
196            let result = migration_fn(&self.pool).await;
197            let status = match &result {
198                Ok(_) => "success",
199                Err(_) => "failed",
200            };
201
202            let executed_at = UnixTimestamp::now();
203
204            sqlx::query!(
205                "INSERT INTO _cadmus_migrations (id, executed_at, status) VALUES (?, ?, ?)
206                 ON CONFLICT(id) DO UPDATE SET executed_at = excluded.executed_at, status = excluded.status",
207                id,
208                executed_at,
209                status,
210            )
211            .execute(&self.pool)
212            .await?;
213
214            match result {
215                Ok(_) => tracing::info!(migration_id = id, "migration succeeded"),
216                Err(e) => tracing::error!(migration_id = id, error = %e, "migration failed"),
217            }
218        }
219
220        Ok(())
221    }
222}