cadmus_core/db/migrations/mod.rs
1use crate::db::types::UnixTimestamp;
2use anyhow::Error;
3use once_cell::sync::Lazy;
4use sqlx::SqlitePool;
5use std::collections::HashMap;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Mutex;
9
10type MigrationFn =
11 for<'a> fn(&'a SqlitePool) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>;
12
13static REGISTRY: Lazy<Mutex<HashMap<&'static str, MigrationFn>>> =
14 Lazy::new(|| Mutex::new(HashMap::new()));
15
16/// Registers an `async fn` as a Cadmus runtime database migration.
17///
18/// The macro takes a stable string literal ID and an `async fn` definition.
19/// The function is registered into the global migration registry automatically
20/// before `main` runs, via `ctor`.
21///
22/// Migrations can be defined anywhere inside `cadmus-core` — co-locate them
23/// with the feature they belong to (e.g. `library/migrations.rs`).
24///
25/// Doc comments placed before the `async fn` are forwarded onto the generated
26/// function and the generated submodule, making them visible in rustdoc.
27/// The macro also appends the migration ID and a rerun SQL snippet
28/// automatically.
29///
30/// # Arguments
31///
32/// * `$id` — Stable string literal identifying this migration in
33/// `_cadmus_migrations`. Never change after deployment.
34/// * The `async fn` — Must accept `&SqlitePool` and return
35/// `Result<(), anyhow::Error>`.
36///
37/// The generated inner module is named after `$name`, so `$name` must be
38/// unique across all `migration!` calls in the crate.
39///
40/// # Use Cases
41///
42/// Migrations are one-time operations that run once per deployment. They can:
43/// - **Transform data** — backfill metadata, normalize formats, compute derived values
44/// - **Import legacy data** — load from filesystem/config into the database
45/// - **Cleanup** — prune obsolete rows, remove temporary data, optimize storage
46/// - **Any procedural setup** — not limited to SQL; can call filesystem operations,
47/// external APIs, or other async code
48///
49/// **Note:** Schema updates (creating/altering tables) should be handled via `.sql`
50/// files in `crates/core/migrations/`, not runtime migrations. SQL migrations are
51/// applied by `sqlx-cli` or the `migrate!` macro before runtime migrations run.
52///
53/// # SQL Best Practices
54///
55/// When executing SQL in migrations, use SQLx's typed query macros for compile-time
56/// verification:
57///
58/// - Use `sqlx::query!` for `INSERT`, `UPDATE`, `DELETE`, and `SELECT` returning rows
59/// - Use `sqlx::query_as!` when mapping results into a named struct
60/// - Use `sqlx::query_scalar!` for single-column results
61/// - Pass `&mut **tx` (double deref) when executing against a transaction
62///
63/// # Example
64///
65/// ```rust
66/// mod my_migrations {
67/// use sqlx::SqlitePool;
68///
69/// cadmus_core::migration!(
70/// /// Backfills metadata from legacy storage.
71/// "v1_backfill_metadata",
72/// async fn backfill_metadata(pool: &SqlitePool) {
73/// // sqlx::query!(...).execute(pool).await?;
74/// Ok(())
75/// }
76/// );
77/// }
78/// ```
79#[macro_export]
80macro_rules! migration {
81 ($(#[$attr:meta])* $id:literal, async fn $name:ident($pool:ident : &$pool_ty:ty) $body:block) => {
82 $crate::migration!(@internal $(#[$attr])* $id, $name, $pool, $body);
83 };
84 (@internal $(#[$attr:meta])* $id:literal, $name:ident, $pool:ident, $body:block) => {
85 $(#[$attr])*
86 #[doc = ""]
87 #[doc = concat!("**Migration ID:** `", $id, "`")]
88 #[doc = ""]
89 #[doc = "To re-run this migration, delete its tracking row:"]
90 #[doc = concat!("```sql\nDELETE FROM _cadmus_migrations WHERE id = '", $id, "';\n```")]
91 #[cfg_attr(feature = "otel", tracing::instrument(skip($pool), fields(migration_id = $id)))]
92 async fn $name($pool: &::sqlx::SqlitePool) -> ::std::result::Result<(), ::anyhow::Error> {
93 $body
94 }
95
96 /// Module generated by the [`crate::migration!`] macro.
97 #[allow(non_snake_case)]
98 pub mod $name {
99 $(#[$attr])*
100 #[doc = ""]
101 #[doc = concat!("**Migration ID:** `", $id, "`")]
102 #[doc = ""]
103 #[doc = "To re-run this migration, delete its tracking row:"]
104 #[doc = concat!("```sql\nDELETE FROM _cadmus_migrations WHERE id = '", $id, "';\n```")]
105 /// The stable ID used to track this migration in `_cadmus_migrations`.
106 #[allow(dead_code)]
107 pub const MIGRATION_ID: &str = $id;
108
109 #[$crate::ctor::ctor]
110 fn __register() {
111 fn __boxed(
112 pool: &::sqlx::SqlitePool,
113 ) -> ::std::pin::Pin<
114 ::std::boxed::Box<
115 dyn ::std::future::Future<
116 Output = ::std::result::Result<(), ::anyhow::Error>,
117 > + ::std::marker::Send
118 + '_,
119 >,
120 > {
121 ::std::boxed::Box::pin(super::$name(pool))
122 }
123 $crate::db::migrations::register($id, __boxed);
124 }
125 }
126 };
127}
128
129#[cfg(feature = "test")]
130mod example;
131
132/// Registers a migration function under the given stable `id`.
133///
134/// # Panics
135///
136/// Panics if the registry lock is poisoned (should never happen in normal use).
137pub fn register(id: &'static str, f: MigrationFn) {
138 let mut registry = REGISTRY
139 .lock()
140 .expect("migration registry lock should not be poisoned");
141 registry.insert(id, f);
142}
143
144/// Runs all registered migrations that have not yet been executed.
145///
146/// Reads the `_cadmus_migrations` table to determine which migrations have
147/// already run. Executes pending migrations ordered by ID, recording each
148/// outcome (`success` or `failed`) before continuing.
149///
150/// On failure of any individual migration, the error is logged and recorded
151/// as `failed` in the tracking table. Execution continues with the remaining
152/// migrations so a single failure does not block subsequent independent ones.
153pub struct MigrationRunner {
154 pool: SqlitePool,
155}
156
157impl MigrationRunner {
158 pub(super) fn new(pool: SqlitePool) -> Self {
159 Self { pool }
160 }
161
162 /// Execute all pending registered migrations against the database.
163 #[cfg_attr(feature = "otel", tracing::instrument(skip(self)))]
164 pub async fn run_all(&self) -> Result<(), Error> {
165 let registry = REGISTRY
166 .lock()
167 .expect("migration registry lock should not be poisoned")
168 .clone();
169
170 if registry.is_empty() {
171 return Ok(());
172 }
173
174 let already_run: Vec<String> =
175 sqlx::query_scalar!("SELECT id FROM _cadmus_migrations WHERE status = 'success'")
176 .fetch_all(&self.pool)
177 .await?;
178
179 let mut pending: Vec<(&'static str, MigrationFn)> = registry
180 .into_iter()
181 .filter(|(id, _)| !already_run.contains(&id.to_string()))
182 .collect();
183
184 if pending.is_empty() {
185 tracing::info!("no pending runtime migrations");
186 return Ok(());
187 }
188
189 pending.sort_by_key(|(id, _)| *id);
190
191 tracing::info!(count = pending.len(), "running pending runtime migrations");
192
193 for (id, migration_fn) in pending {
194 tracing::info!(migration_id = id, "running migration");
195
196 let result = migration_fn(&self.pool).await;
197 let status = match &result {
198 Ok(_) => "success",
199 Err(_) => "failed",
200 };
201
202 let executed_at = UnixTimestamp::now();
203
204 sqlx::query!(
205 "INSERT INTO _cadmus_migrations (id, executed_at, status) VALUES (?, ?, ?)
206 ON CONFLICT(id) DO UPDATE SET executed_at = excluded.executed_at, status = excluded.status",
207 id,
208 executed_at,
209 status,
210 )
211 .execute(&self.pool)
212 .await?;
213
214 match result {
215 Ok(_) => tracing::info!(migration_id = id, "migration succeeded"),
216 Err(e) => tracing::error!(migration_id = id, error = %e, "migration failed"),
217 }
218 }
219
220 Ok(())
221 }
222}