1use std::collections::HashSet;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::num::NonZeroU64;
8use std::sync::mpsc::Sender;
9
10use globset::Glob;
11use walkdir::WalkDir;
12
13use crate::context::DICTIONARIES_DIRNAME;
14use crate::db::runtime::RUNTIME;
15use crate::db::Database;
16use crate::dictionary::{normalize, Entry, Metadata};
17use crate::fl;
18use crate::helpers::{Fingerprint, IsHidden};
19use crate::task::{BackgroundTask, ShutdownSignal, TaskId};
20use crate::view::notification::NotificationEvent;
21use crate::view::{Event, ViewId, ID_FEEDER};
22
/// Number of index entries handled per database transaction: the flush
/// threshold while indexing and the row limit per pass while deleting.
const BATCH_SIZE: usize = 5_000;
24
/// Per-file state shared by the scanning and flushing steps while one
/// dictionary index file is being indexed.
struct IndexFileJob<'a> {
    /// Path of the index file that is opened and read line by line.
    index_path: &'a std::path::Path,
    /// Display form of `index_path`, used in log messages.
    path_str: &'a str,
    /// `dict_id` of this file's row in `dictionary_index_meta`.
    dict_id: i64,
    /// Name shown in the progress notification (the file stem of the index file).
    dict_name: &'a str,
    /// Total number of lines in the index file; denominator of the progress percentage.
    total_lines: u64,
    /// Identifier of the pinned notification that reports indexing progress.
    notif_id: ViewId,
    /// Dictionary flags handed to `normalize` for every batch of entries.
    metadata: Metadata,
}
34
/// Decodes a base-64 number as found in the offset and size columns of an
/// index file.
///
/// Digits are written most-significant first and use the alphabet
/// `A`-`Z` (0-25), `a`-`z` (26-51), `0`-`9` (52-61), `+` (62) and `/` (63).
///
/// Returns `None` when `word` contains a character outside that alphabet or
/// when the decoded value does not fit in a `u64`. An empty string decodes
/// to `Some(0)`.
fn decode_number(word: &str) -> Option<u64> {
    word.chars().try_fold(0u64, |acc, ch| {
        let digit = match ch {
            'A'..='Z' => ch as u64 - 'A' as u64,
            'a'..='z' => ch as u64 - 'a' as u64 + 26,
            '0'..='9' => ch as u64 - '0' as u64 + 52,
            '+' => 62,
            '/' => 63,
            _ => return None,
        };
        // Checked arithmetic: an over-long field must be rejected instead of
        // panicking (overflow checks on) or silently wrapping (checks off).
        acc.checked_mul(64)?.checked_add(digit)
    })
}
68
/// Background task that indexes dictionary `.index` files into the database
/// and removes index data whose fingerprint no longer matches a file on disk.
pub struct DictionaryIndexTask {
    /// Database handle; its connection pool is used for every query the task issues.
    database: Database,
}
76
77impl DictionaryIndexTask {
78 pub fn new(database: Database) -> Self {
80 Self { database }
81 }
82
83 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %path_str)))]
88 fn detect_metadata(path_str: &str) -> (bool, bool) {
89 let file = match File::open(path_str) {
90 Ok(f) => f,
91 Err(e) => {
92 tracing::error!(path = %path_str, error = %e, "failed to open index file for metadata detection");
93 return (false, false);
94 }
95 };
96
97 let mut all_chars = false;
98 let mut case_sensitive = false;
99
100 for line in BufReader::new(file).lines() {
101 let line = match line {
102 Ok(l) => l,
103 Err(_) => continue,
104 };
105
106 let word = line.split('\t').next().unwrap_or("");
107
108 if word.is_empty() {
109 continue;
110 } else if word == "00-database-allchars" {
111 all_chars = true;
112 } else if word == "00-database-case-sensitive" || word == "00databasecasesensitive" {
113 case_sensitive = true;
114 } else if !word.starts_with("00-database-") && !word.starts_with("00database") {
115 break;
116 }
117
118 if all_chars && case_sensitive {
119 break;
120 }
121 }
122
123 (case_sensitive, all_chars)
124 }
125
126 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(path = %path_str, fingerprint = %fp_str)))]
132 fn resolve_index_state(
133 &self,
134 index_path: &std::path::Path,
135 path_str: &str,
136 fp_str: &str,
137 ) -> Option<(i64, u64, u64, bool)> {
138 let pool = self.database.pool().clone();
139
140 let meta = RUNTIME.block_on(async {
141 sqlx::query!(
142 r#"SELECT dict_id, total_lines, indexed_lines, completed
143 FROM dictionary_index_meta
144 WHERE fingerprint = ?"#,
145 fp_str,
146 )
147 .fetch_optional(&pool)
148 .await
149 });
150
151 let meta = match meta {
152 Ok(m) => m,
153 Err(e) => {
154 tracing::error!(path = %path_str, fingerprint = %fp_str, error = %e, "failed to query dictionary_index_meta");
155 return None;
156 }
157 };
158
159 if let Some(row) = meta {
160 if row.completed != 0 {
161 tracing::debug!(path = %path_str, fingerprint = %fp_str, "dictionary already indexed, skipping");
162 return None;
163 }
164
165 return Some((
166 row.dict_id?,
167 row.indexed_lines as u64,
168 row.total_lines as u64,
169 false,
170 ));
171 }
172
173 let file = match File::open(index_path) {
174 Ok(f) => f,
175 Err(e) => {
176 tracing::error!(path = %path_str, error = %e, "failed to open index file for line count");
177 return None;
178 }
179 };
180
181 let total = BufReader::new(file).lines().count() as i64;
182
183 let result = RUNTIME.block_on(async {
184 sqlx::query!(
185 r#"INSERT INTO dictionary_index_meta (fingerprint, dict_path, total_lines, indexed_lines, completed)
186 VALUES (?, ?, ?, 0, 0)"#,
187 fp_str,
188 path_str,
189 total,
190 )
191 .execute(&pool)
192 .await
193 });
194
195 if let Err(e) = result {
196 tracing::error!(path = %path_str, error = %e, "failed to insert dictionary_index_meta row");
197 return None;
198 }
199
200 let dict_id: i64 = RUNTIME.block_on(async {
201 sqlx::query_scalar!(
202 "SELECT dict_id FROM dictionary_index_meta WHERE fingerprint = ?",
203 fp_str
204 )
205 .fetch_one(&pool)
206 .await
207 .ok()?
208 })?;
209
210 Some((dict_id, 0u64, total as u64, true))
211 }
212
213 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(path = %path_str, dict_id, indexed = current_line, total = total_lines)))]
215 fn mark_completed(&self, dict_id: i64, path_str: &str, current_line: u64, total_lines: u64) {
216 let pool = self.database.pool().clone();
217
218 let result = RUNTIME.block_on(async {
219 sqlx::query!(
220 "UPDATE dictionary_index_meta SET completed = 1 WHERE dict_id = ?",
221 dict_id,
222 )
223 .execute(&pool)
224 .await
225 });
226
227 if let Err(e) = result {
228 tracing::error!(path = %path_str, error = %e, "failed to mark dictionary as completed");
229 return;
230 }
231
232 tracing::info!(path = %path_str, indexed = current_line, total = total_lines, "dictionary index complete");
233 }
234
235 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %path_str)))]
243 fn parse_index_line<'a>(path_str: &str, line: &'a str) -> Option<(&'a str, i64, i64)> {
244 let trimmed = line.trim_end();
245 let mut cols = trimmed.split('\t');
246
247 let word = cols.next()?;
248
249 let offset_str = cols.next()?;
250 let offset = match decode_number(offset_str) {
251 Some(o) => o as i64,
252 None => {
253 tracing::error!(path = %path_str, word, offset_str, "failed to decode offset");
254 return None;
255 }
256 };
257
258 let size_str = cols.next()?;
259 let size = match decode_number(size_str) {
260 Some(s) => s as i64,
261 None => {
262 tracing::error!(path = %path_str, word, size_str, "failed to decode size");
263 return None;
264 }
265 };
266
267 Some((word, offset, size))
268 }
269
270 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %job.path_str, skip_lines, total_lines = job.total_lines)))]
276 fn scan_and_batch(
277 &self,
278 job: &IndexFileJob<'_>,
279 skip_lines: u64,
280 hub: &Sender<Event>,
281 shutdown: &ShutdownSignal,
282 ) -> Option<u64> {
283 let file = match File::open(job.index_path) {
284 Ok(f) => f,
285 Err(e) => {
286 tracing::error!(path = %job.path_str, error = %e, "failed to open index file");
287 return None;
288 }
289 };
290
291 let reader = BufReader::new(file);
292 let mut lines_iter = reader.lines().enumerate();
293
294 for _ in 0..skip_lines {
295 lines_iter.next();
296 }
297
298 let mut current_line = skip_lines;
299 let mut raw_batch: Vec<Entry> = Vec::with_capacity(BATCH_SIZE);
300
301 for (_, line_result) in &mut lines_iter {
302 let line = match line_result {
303 Ok(l) => l,
304 Err(e) => {
305 tracing::error!(path = %job.path_str, line = current_line, error = %e, "failed to read line");
306 current_line += 1;
307 continue;
308 }
309 };
310
311 current_line += 1;
312
313 if let Some((word, offset, size)) = Self::parse_index_line(job.path_str, &line) {
314 raw_batch.push(Entry {
315 headword: word.to_string(),
316 offset: offset as u64,
317 size: size as u64,
318 original: None,
319 });
320 }
321
322 if raw_batch.len() >= BATCH_SIZE {
323 let normalized = normalize(&raw_batch, &job.metadata);
324 let batch: Vec<(i64, String, i64, i64, Option<String>)> = normalized
325 .into_iter()
326 .map(|e| {
327 (
328 job.dict_id,
329 e.headword,
330 e.offset as i64,
331 e.size as i64,
332 e.original,
333 )
334 })
335 .collect();
336
337 if let Err(e) = self.flush_batch(job, &batch, current_line, hub) {
338 tracing::error!(path = %job.path_str, error = %e, "failed to flush batch");
339 return None;
340 }
341
342 raw_batch.clear();
343
344 if shutdown.should_stop() {
345 return None;
346 }
347 }
348 }
349
350 if !raw_batch.is_empty() {
351 let normalized = normalize(&raw_batch, &job.metadata);
352 let batch: Vec<(i64, String, i64, i64, Option<String>)> = normalized
353 .into_iter()
354 .map(|e| {
355 (
356 job.dict_id,
357 e.headword,
358 e.offset as i64,
359 e.size as i64,
360 e.original,
361 )
362 })
363 .collect();
364
365 if let Err(e) = self.flush_batch(job, &batch, current_line, hub) {
366 tracing::error!(path = %job.path_str, error = %e, "failed to flush final batch");
367 return None;
368 }
369 }
370
371 Some(current_line)
372 }
373
374 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %index_path.display())))]
375 fn index_file(
376 &self,
377 index_path: &std::path::Path,
378 hub: &Sender<Event>,
379 shutdown: &ShutdownSignal,
380 ) {
381 let path_str = index_path.display().to_string();
382
383 let dict_name = index_path
384 .file_stem()
385 .map(|s| s.to_string_lossy().into_owned())
386 .unwrap_or_else(|| path_str.clone());
387
388 let fp = match index_path.fingerprint() {
389 Ok(fp) => fp,
390 Err(e) => {
391 tracing::error!(path = %path_str, error = %e, "failed to fingerprint index file");
392 return;
393 }
394 };
395
396 let fp_str = fp.to_string();
397
398 let (dict_id, skip_lines, total_lines, is_new) =
399 match self.resolve_index_state(index_path, &path_str, &fp_str) {
400 Some(state) => state,
401 None => {
402 return;
403 }
404 };
405
406 if is_new {
407 hub.send(Event::ReloadDictionaries).ok();
408 }
409
410 let (case_sensitive, all_chars) = Self::detect_metadata(&path_str);
411 let metadata = Metadata {
412 case_sensitive,
413 all_chars,
414 };
415
416 let notif_id = ViewId::MessageNotif(ID_FEEDER.next());
417 hub.send(Event::Notification(NotificationEvent::ShowPinned(
418 notif_id,
419 fl!(
420 "notification-dictionary-indexing",
421 name = dict_name.as_str()
422 ),
423 )))
424 .ok();
425
426 let job = IndexFileJob {
427 index_path,
428 path_str: &path_str,
429 dict_id,
430 dict_name: &dict_name,
431 total_lines,
432 notif_id,
433 metadata,
434 };
435
436 tracing::debug!(path = %path_str, dict_id, skip_lines, total_lines, case_sensitive, all_chars, "starting dictionary indexing");
437
438 match self.scan_and_batch(&job, skip_lines, hub, shutdown) {
439 Some(current_line) => {
440 self.mark_completed(dict_id, &path_str, current_line, total_lines);
441 hub.send(Event::ReloadDictionaries).ok();
442 hub.send(Event::Close(notif_id)).ok();
443 }
444 None => {
445 hub.send(Event::Close(notif_id)).ok();
446 }
447 }
448 }
449
450 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(batch_size = batch.len(), current_line, total_lines = job.total_lines)))]
451 fn flush_batch(
452 &self,
453 job: &IndexFileJob<'_>,
454 batch: &[(i64, String, i64, i64, Option<String>)],
455 current_line: u64,
456 hub: &Sender<Event>,
457 ) -> Result<(), anyhow::Error> {
458 let pool = self.database.pool().clone();
459 let indexed_lines = current_line as i64;
460
461 RUNTIME.block_on(async {
462 let mut tx = pool.begin().await?;
463
464 for (dict_id, word, offset, size, original) in batch {
465 sqlx::query!(
466 r#"INSERT OR IGNORE INTO dictionary_index_entry (dict_id, word, offset, size, original)
467 VALUES (?, ?, ?, ?, ?)"#,
468 dict_id,
469 word,
470 offset,
471 size,
472 original,
473 )
474 .execute(&mut *tx)
475 .await?;
476 }
477
478 sqlx::query!(
479 "UPDATE dictionary_index_meta SET indexed_lines = ? WHERE dict_id = ?",
480 indexed_lines,
481 job.dict_id,
482 )
483 .execute(&mut *tx)
484 .await?;
485
486 tx.commit().await?;
487
488 Ok::<_, anyhow::Error>(())
489 })?;
490
491 let progress = NonZeroU64::new(job.total_lines)
492 .and_then(|total_lines| {
493 current_line
494 .checked_mul(100)
495 .map(|value| value / total_lines.get())
496 })
497 .unwrap_or(0)
498 .min(100) as u8;
499 let msg = fl!("notification-dictionary-indexing", name = job.dict_name);
500 hub.send(Event::Notification(NotificationEvent::UpdateText(
501 job.notif_id,
502 msg,
503 )))
504 .ok();
505 hub.send(Event::Notification(NotificationEvent::UpdateProgress(
506 job.notif_id,
507 progress,
508 )))
509 .ok();
510
511 Ok(())
512 }
513
514 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(on_disk_count = on_disk_fingerprints.len())))]
523 fn delete_stale_entries(
524 &self,
525 on_disk_fingerprints: &[String],
526 hub: &Sender<Event>,
527 shutdown: &ShutdownSignal,
528 ) {
529 let pool = self.database.pool().clone();
530
531 let result = RUNTIME.block_on(async {
532 let on_disk_set: HashSet<&str> =
533 on_disk_fingerprints.iter().map(|s| s.as_str()).collect();
534
535 let db_entries = sqlx::query!(
536 "SELECT fingerprint, dict_id FROM dictionary_index_meta"
537 )
538 .fetch_all(&pool)
539 .await?;
540
541 let mut deleted_any = false;
542
543 for row in db_entries {
544 let fp = row.fingerprint;
545
546 if on_disk_set.contains(fp.as_str()) {
547 continue;
548 }
549
550 let dict_id = match row.dict_id {
551 Some(id) => id,
552 None => {
553 tracing::warn!(fingerprint = %fp, "dict_id missing for stale fingerprint, skipping");
554 continue;
555 }
556 };
557
558 tracing::info!(fingerprint = %fp, "removing stale dictionary index");
559
560 sqlx::query!(
561 "UPDATE dictionary_index_meta SET completed = 0, indexed_lines = 0 WHERE dict_id = ?",
562 dict_id,
563 )
564 .execute(&pool)
565 .await?;
566
567 let total_deleted =
568 delete_entries_for_dict(&pool, dict_id, shutdown).await?;
569
570 tracing::info!(fingerprint = %fp, total_deleted, "deleted stale dictionary index entries");
571
572 sqlx::query!(
573 "DELETE FROM dictionary_index_meta WHERE fingerprint = ?",
574 fp
575 )
576 .execute(&pool)
577 .await?;
578
579 deleted_any = true;
580
581 if shutdown.should_stop() {
582 break;
583 }
584 }
585
586 Ok::<_, anyhow::Error>(deleted_any)
587 });
588
589 match result {
590 Ok(true) => {
591 hub.send(Event::ReloadDictionaries).ok();
592 }
593 Ok(false) => {}
594 Err(e) => {
595 tracing::error!(error = %e, "failed to delete stale dictionary index entries");
596 }
597 }
598 }
599}
600
601#[cfg_attr(
615 feature = "tracing",
616 tracing::instrument(skip(pool, shutdown), fields(dict_id))
617)]
618async fn delete_entries_for_dict(
619 pool: &sqlx::SqlitePool,
620 dict_id: i64,
621 shutdown: &ShutdownSignal,
622) -> Result<u64, anyhow::Error> {
623 let batch_size = BATCH_SIZE as i64;
624 let mut total_deleted: u64 = 0;
625
626 loop {
627 let keys = sqlx::query!(
628 "SELECT word, offset FROM dictionary_index_entry WHERE dict_id = ? LIMIT ?",
629 dict_id,
630 batch_size,
631 )
632 .fetch_all(pool)
633 .await?;
634
635 if keys.is_empty() {
636 break;
637 }
638
639 let mut tx = pool.begin().await?;
640
641 for key in &keys {
642 sqlx::query!(
643 "DELETE FROM dictionary_index_entry WHERE dict_id = ? AND word = ? AND offset = ?",
644 dict_id,
645 key.word,
646 key.offset,
647 )
648 .execute(&mut *tx)
649 .await?;
650 }
651
652 tx.commit().await?;
653
654 total_deleted += keys.len() as u64;
655
656 if shutdown.should_stop() {
657 tracing::info!(total_deleted, "entry deletion interrupted by shutdown");
658 return Ok(total_deleted);
659 }
660 }
661
662 Ok(total_deleted)
663}
664
665impl BackgroundTask for DictionaryIndexTask {
666 fn id(&self) -> TaskId {
667 TaskId::DictionaryIndex
668 }
669
670 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
671 fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal) {
672 let glob = match Glob::new("**/*.index") {
673 Ok(g) => g.compile_matcher(),
674 Err(e) => {
675 tracing::error!(error = %e, "failed to compile glob pattern for dictionary index task");
676 return;
677 }
678 };
679
680 let path = std::path::Path::new(DICTIONARIES_DIRNAME);
681
682 let mut on_disk_fingerprints: Vec<String> = Vec::new();
683
684 for entry in WalkDir::new(path)
685 .min_depth(1)
686 .into_iter()
687 .filter_entry(|e| !e.is_hidden())
688 {
689 if shutdown.should_stop() {
690 return;
691 }
692
693 let entry = match entry {
694 Ok(e) => e,
695 Err(e) => {
696 tracing::error!(error = %e, "failed to read directory entry");
697 continue;
698 }
699 };
700
701 if !glob.is_match(entry.path()) {
702 continue;
703 }
704
705 if let Ok(fp) = entry.path().fingerprint() {
706 on_disk_fingerprints.push(fp.to_string());
707 }
708
709 self.index_file(entry.path(), hub, shutdown);
710 }
711
712 if shutdown.should_stop() {
713 return;
714 }
715
716 self.delete_stale_entries(&on_disk_fingerprints, hub, shutdown);
717 }
718}
719
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{runtime::RUNTIME, Database};

    /// Creates an in-memory database with all migrations applied.
    fn setup_db() -> Database {
        let database = Database::new(":memory:").expect("failed to create in-memory database");
        database.migrate().expect("failed to run migrations");
        database
    }

    /// Inserts a metadata row (the fingerprint doubles as the path) and
    /// returns its `dict_id`.
    async fn insert_meta(pool: &sqlx::SqlitePool, fingerprint: &str) -> i64 {
        let inserted = sqlx::query_scalar!(
            "INSERT INTO dictionary_index_meta (fingerprint, dict_path, total_lines) VALUES (?, ?, ?) RETURNING dict_id",
            fingerprint,
            fingerprint,
            0_i64,
        )
        .fetch_one(pool)
        .await;

        inserted.expect("failed to insert meta")
    }

    /// Inserts one zero-sized index entry for `dict_id`.
    async fn insert_entry(pool: &sqlx::SqlitePool, dict_id: i64, word: &str, offset: i64) {
        let inserted = sqlx::query!(
            "INSERT INTO dictionary_index_entry (dict_id, word, offset, size) VALUES (?, ?, ?, 0)",
            dict_id,
            word,
            offset,
        )
        .execute(pool)
        .await;

        inserted.expect("failed to insert entry");
    }

    /// Returns how many index entries are stored for `dict_id`.
    async fn count_entries(pool: &sqlx::SqlitePool, dict_id: i64) -> i64 {
        let counted = sqlx::query_scalar!(
            "SELECT COUNT(*) FROM dictionary_index_entry WHERE dict_id = ?",
            dict_id,
        )
        .fetch_one(pool)
        .await;

        counted.expect("failed to count entries")
    }

    #[test]
    fn test_delete_entries_for_dict_removes_all_entries() {
        let db = setup_db();
        let pool = db.pool();
        let never_stop = ShutdownSignal::never();

        RUNTIME.block_on(async {
            let dict_id = insert_meta(pool, "all-entries").await;

            // Same headword at five different offsets.
            for offset in 0..5_i64 {
                insert_entry(pool, dict_id, "word", offset).await;
            }

            let removed = delete_entries_for_dict(pool, dict_id, &never_stop)
                .await
                .expect("delete should succeed");
            let remaining = count_entries(pool, dict_id).await;

            assert_eq!(removed, 5);
            assert_eq!(remaining, 0);
        });
    }

    #[test]
    fn test_delete_entries_for_dict_only_removes_target_dict() {
        let db = setup_db();
        let pool = db.pool();
        let never_stop = ShutdownSignal::never();

        RUNTIME.block_on(async {
            let target = insert_meta(pool, "dict-a").await;
            let bystander = insert_meta(pool, "dict-b").await;

            insert_entry(pool, target, "apple", 0).await;
            insert_entry(pool, bystander, "banana", 0).await;
            insert_entry(pool, bystander, "cherry", 0).await;

            let removed = delete_entries_for_dict(pool, target, &never_stop)
                .await
                .expect("delete should succeed");

            assert_eq!(removed, 1);
            assert_eq!(count_entries(pool, target).await, 0);
            // Entries of the other dictionary must be untouched.
            assert_eq!(count_entries(pool, bystander).await, 2);
        });
    }
}