diff --git a/src/app.rs b/src/app.rs index dc15c0c..352cd1d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,7 +1,11 @@ -use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; +use std::sync::Arc; + use chrono::NaiveDateTime; +use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; +use tokio::sync::mpsc; use crate::domain::models::*; +use crate::infrastructure::api::ApiClient; use crate::infrastructure::db::Db; use crate::ui::{Focus, NetworkStatus, Popup}; @@ -19,11 +23,18 @@ pub struct App { pub should_quit: bool, pub task_list_scroll: u16, pub detail_scroll: u16, - pub db: Db, + pub db: Arc, + pub api_client: Arc, + sync_tx: mpsc::Sender, +} + +pub enum SyncCommand { + TriggerSync, + Shutdown, } impl App { - pub fn new(db: Db) -> Self { + pub fn new(db: Arc, api_client: Arc, sync_tx: mpsc::Sender) -> Self { let lists = db.get_lists(); let tasks = if !lists.is_empty() { let list_id = &lists[0].id; @@ -47,9 +58,15 @@ impl App { task_list_scroll: 0, detail_scroll: 0, db, + api_client, + sync_tx, } } + fn trigger_sync(&self) { + let _ = self.sync_tx.try_send(SyncCommand::TriggerSync); + } + pub fn handle_key(&mut self, key: KeyEvent) { if let Some(ref popup) = self.show_popup.clone() { self.handle_popup_key(key, popup); @@ -167,6 +184,7 @@ impl App { &list.id, &serde_json::to_string(&list).unwrap_or_default(), ).ok(); + self.trigger_sync(); self.load_lists(); } Focus::TaskList => { @@ -188,13 +206,14 @@ impl App { list_id, &serde_json::to_string(&task).unwrap_or_default(), ).ok(); + self.trigger_sync(); self.load_tasks(); } } Focus::Detail => { if !self.tasks.is_empty() { let task = &mut self.tasks[self.selected_task]; - task.title = input; + task.title = input; self.db.update_task(task).ok(); self.db.push_sync( SyncAction::Update, @@ -202,9 +221,8 @@ impl App { &task.list_id, &serde_json::to_string(task).unwrap_or_default(), ).ok(); + self.trigger_sync(); self.load_tasks(); - if !self.tasks.is_empty() && self.selected_task < self.tasks.len() { - } } } } @@ -257,6 +275,7 @@ impl App { &task.list_id, &serde_json::to_string(task).unwrap_or_default(), ).ok(); + self.trigger_sync(); self.load_tasks(); } self.show_popup = None; @@ -285,6 +304,7 @@ impl App { &list_id, "", ).ok(); + self.trigger_sync(); self.load_lists(); if self.selected_list >= self.lists.len() { self.selected_list = self.lists.len().saturating_sub(1); @@ -304,6 +324,7 @@ impl App { &list_id, "", ).ok(); + self.trigger_sync(); self.load_tasks(); if self.selected_task >= self.tasks.len() { self.selected_task = self.tasks.len().saturating_sub(1); @@ -354,6 +375,7 @@ impl App { &list_id, &payload.to_string(), ).ok(); + self.trigger_sync(); self.selected_task = new_index as usize; self.load_tasks(); } diff --git a/src/infrastructure/api.rs b/src/infrastructure/api.rs index 0116a35..f61b02a 100644 --- a/src/infrastructure/api.rs +++ b/src/infrastructure/api.rs @@ -1 +1,431 @@ -// TODO: Fase 5 - Google Tasks API integration +use std::path::PathBuf; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration}; + +use crate::domain::models::*; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OAuthToken { + pub access_token: String, + pub refresh_token: Option, + pub expires_at: Option>, +} + +#[derive(Debug)] +pub enum ApiError { + Network(String), + Auth(String), + Api(String), +} + +pub struct ApiClient { + client: Client, + client_id: String, + client_secret: String, + token: Arc>>, + token_path: PathBuf, +} + +impl ApiClient { + pub fn new(client_id: String, client_secret: String) -> Self { + let token_path = dirs::config_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join("task_app") + .join("token.json"); + + Self { + client: Client::new(), + client_id, + client_secret, + token: Arc::new(Mutex::new(None)), + token_path, + } + } + + pub async fn load_token(&self) -> Option { + let content = std::fs::read_to_string(&self.token_path).ok()?; + serde_json::from_str(&content).ok() + } + + pub async fn save_token(&self, token: &OAuthToken) { + if let Some(parent) = self.token_path.parent() { + std::fs::create_dir_all(parent).ok(); + } + if let Ok(content) = serde_json::to_string_pretty(token) { + std::fs::write(&self.token_path, content).ok(); + } + } + + pub async fn authenticate(&self) -> Result<(String, String), ApiError> { + let params = serde_json::json!({ + "client_id": self.client_id, + "scope": "https://www.googleapis.com/auth/tasks", + }); + + let resp = self + .client + .post("https://oauth2.googleapis.com/device/code") + .json(¶ms) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + let data: serde_json::Value = resp + .json() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + let url = data["verification_url"] + .as_str() + .unwrap_or(&data["verification_url"].to_string()) + .to_string(); + let code = data["user_code"] + .as_str() + .unwrap_or("") + .to_string(); + let device_code = data["device_code"].as_str().unwrap_or("").to_string(); + let interval = data["interval"].as_u64().unwrap_or(5); + + tokio::spawn({ + let client = self.client.clone(); + let client_id = self.client_id.clone(); + let client_secret = self.client_secret.clone(); + let device_code = device_code.clone(); + let token = self.token.clone(); + let token_path = self.token_path.clone(); + + async move { + loop { + sleep(Duration::from_secs(interval)).await; + + let poll = serde_json::json!({ + "client_id": client_id, + "client_secret": client_secret, + "device_code": device_code, + "grant_type": "urn:ietf:params:oauth:grant_type:device_code", + }); + + if let Ok(resp) = client + .post("https://oauth2.googleapis.com/token") + .json(&poll) + .send() + .await + { + if let Ok(data) = resp.json::().await { + if let Some(access_token) = data["access_token"].as_str() { + let expires_in = data["expires_in"].as_i64().unwrap_or(3600); + let oauth_token = OAuthToken { + access_token: access_token.to_string(), + refresh_token: data["refresh_token"].as_str().map(|s| s.to_string()), + expires_at: Some(Utc::now() + chrono::Duration::seconds(expires_in)), + }; + + if let Ok(content) = serde_json::to_string_pretty(&oauth_token) { + std::fs::write(&token_path, content).ok(); + } + + let mut t = token.lock().await; + *t = Some(oauth_token); + break; + } + } + } + } + } + }); + + Ok((url, code)) + } + + pub async fn refresh_access_token(&self, refresh_token: &str) -> Result<(), ApiError> { + let params = serde_json::json!({ + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": refresh_token, + "grant_type": "refresh_token", + }); + + let resp = self + .client + .post("https://oauth2.googleapis.com/token") + .json(¶ms) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + let data: serde_json::Value = resp + .json() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + if let Some(access_token) = data["access_token"].as_str() { + let expires_in = data["expires_in"].as_i64().unwrap_or(3600); + let token = OAuthToken { + access_token: access_token.to_string(), + refresh_token: Some(refresh_token.to_string()), + expires_at: Some(Utc::now() + chrono::Duration::seconds(expires_in)), + }; + + self.save_token(&token).await; + let mut t = self.token.lock().await; + *t = Some(token); + Ok(()) + } else { + Err(ApiError::Auth("Failed to refresh token".to_string())) + } + } + + pub async fn ensure_token(&self) -> Result { + let mut token = self.token.lock().await; + if let Some(ref t) = *token { + if let Some(expires_at) = t.expires_at { + if Utc::now() < expires_at { + return Ok(t.access_token.clone()); + } + } + if let Some(ref refresh) = t.refresh_token { + let refresh_token = refresh.clone(); + drop(token); + self.refresh_access_token(&refresh_token).await?; + let t2 = self.token.lock().await; + if let Some(ref t) = *t2 { + return Ok(t.access_token.clone()); + } + } + Err(ApiError::Auth("Token expired and no refresh token".to_string())) + } else if let Some(saved) = self.load_token().await { + *token = Some(saved); + if let Some(ref t) = *token { + Ok(t.access_token.clone()) + } else { + Err(ApiError::Auth("No token available".to_string())) + } + } else { + Err(ApiError::Auth("Not authenticated".to_string())) + } + } + + pub async fn fetch_lists(&self) -> Result, ApiError> { + let token = self.ensure_token().await?; + + let resp = self + .client + .get("https://tasks.googleapis.com/tasks/v1/users/@me/lists") + .bearer_auth(&token) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + let data: serde_json::Value = resp + .json() + .await + .map_err(|e| ApiError::Api(e.to_string()))?; + + let empty = vec![]; + let items = data["items"].as_array().unwrap_or(&empty); + let lists = items + .iter() + .map(|item| TaskList { + id: item["id"].as_str().unwrap_or("").to_string(), + title: item["title"].as_str().unwrap_or("").to_string(), + }) + .collect(); + + Ok(lists) + } + + pub async fn fetch_tasks(&self, list_id: &str) -> Result, ApiError> { + let token = self.ensure_token().await?; + + let url = format!( + "https://tasks.googleapis.com/tasks/v1/lists/{}/tasks?showCompleted=true&showHidden=true", + list_id + ); + + let resp = self + .client + .get(&url) + .bearer_auth(&token) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + let data: serde_json::Value = resp + .json() + .await + .map_err(|e| ApiError::Api(e.to_string()))?; + + let empty = vec![]; + let items = data["items"].as_array().unwrap_or(&empty); + let tasks = items + .iter() + .enumerate() + .map(|(i, item)| { + let due_str = item["due"].as_str().and_then(|s| { + chrono::NaiveDateTime::parse_from_str( + &s.replace("T", " ").replace("Z", "").chars().take(16).collect::(), + "%Y-%m-%d %H:%M", + ) + .ok() + }); + + Task { + id: item["id"].as_str().unwrap_or("").to_string(), + list_id: list_id.to_string(), + title: item["title"].as_str().unwrap_or("").to_string(), + notes: item["notes"].as_str().map(|s| s.to_string()), + status: if item["status"].as_str() == Some("completed") { + TaskStatus::Completed + } else { + TaskStatus::NeedsAction + }, + due: due_str, + position: i as i64, + } + }) + .collect(); + + Ok(tasks) + } + + pub async fn create_task(&self, list_id: &str, task: &Task) -> Result<(), ApiError> { + let token = self.ensure_token().await?; + + let mut body = serde_json::json!({ + "title": task.title, + }); + + if let Some(ref notes) = task.notes { + body["notes"] = serde_json::Value::String(notes.clone()); + } + if let Some(due) = task.due { + body["due"] = serde_json::Value::String(due.format("%Y-%m-%dT%H:%M:00.000Z").to_string()); + } + if task.status == TaskStatus::Completed { + body["status"] = serde_json::Value::String("completed".to_string()); + } + + let url = format!( + "https://tasks.googleapis.com/tasks/v1/lists/{}/tasks", + list_id + ); + + let resp = self + .client + .post(&url) + .bearer_auth(&token) + .json(&body) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + if !resp.status().is_success() { + return Err(ApiError::Api(format!("Create failed: {}", resp.status()))); + } + + Ok(()) + } + + pub async fn update_task(&self, list_id: &str, task: &Task) -> Result<(), ApiError> { + let token = self.ensure_token().await?; + + let mut body = serde_json::json!({ + "title": task.title, + }); + + if let Some(ref notes) = task.notes { + body["notes"] = serde_json::Value::String(notes.clone()); + } + if let Some(due) = task.due { + body["due"] = serde_json::Value::String(due.format("%Y-%m-%dT%H:%M:00.000Z").to_string()); + } + body["status"] = serde_json::Value::String(match task.status { + TaskStatus::Completed => "completed".to_string(), + TaskStatus::NeedsAction => "needsAction".to_string(), + }); + + let url = format!( + "https://tasks.googleapis.com/tasks/v1/lists/{}/tasks/{}", + list_id, task.id + ); + + let resp = self + .client + .patch(&url) + .bearer_auth(&token) + .json(&body) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + if !resp.status().is_success() { + return Err(ApiError::Api(format!("Update failed: {}", resp.status()))); + } + + Ok(()) + } + + pub async fn delete_task(&self, list_id: &str, task_id: &str) -> Result<(), ApiError> { + let token = self.ensure_token().await?; + + let url = format!( + "https://tasks.googleapis.com/tasks/v1/lists/{}/tasks/{}", + list_id, task_id + ); + + let resp = self + .client + .delete(&url) + .bearer_auth(&token) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + if !resp.status().is_success() { + return Err(ApiError::Api(format!("Delete failed: {}", resp.status()))); + } + + Ok(()) + } + + pub async fn move_task( + &self, + list_id: &str, + task_id: &str, + prev: Option<&str>, + sibling: Option<&str>, + ) -> Result<(), ApiError> { + let token = self.ensure_token().await?; + + let mut url = format!( + "https://tasks.googleapis.com/tasks/v1/lists/{}/tasks/{}/move", + list_id, task_id + ); + + if let Some(p) = prev { + url.push_str(&format!("&previous={}", p)); + } + if let Some(s) = sibling { + url.push_str(&format!("&destinationTaskList={}", s)); + } + + let resp = self + .client + .post(&url) + .bearer_auth(&token) + .send() + .await + .map_err(|e| ApiError::Network(e.to_string()))?; + + if !resp.status().is_success() { + return Err(ApiError::Api(format!("Move failed: {}", resp.status()))); + } + + Ok(()) + } +} diff --git a/src/infrastructure/db.rs b/src/infrastructure/db.rs index 0ad54b7..f1c621c 100644 --- a/src/infrastructure/db.rs +++ b/src/infrastructure/db.rs @@ -1,10 +1,12 @@ -use rusqlite::{params, Connection, Result as SqlResult}; +use std::sync::Mutex; + use chrono::NaiveDateTime; +use rusqlite::{params, Connection, Result as SqlResult}; use crate::domain::models::*; pub struct Db { - conn: Connection, + conn: Mutex, } impl Db { @@ -38,11 +40,12 @@ impl Db { created_at TEXT NOT NULL );", )?; - Ok(Self { conn }) + Ok(Self { conn: Mutex::new(conn) }) } pub fn get_lists(&self) -> Vec { - let mut stmt = self.conn + let conn = self.conn.lock().unwrap(); + let mut stmt = conn .prepare("SELECT id, title FROM task_lists ORDER BY title") .unwrap(); stmt.query_map([], |row| { @@ -57,7 +60,8 @@ impl Db { } pub fn insert_list(&self, list: &TaskList) -> SqlResult<()> { - self.conn.execute( + let conn = self.conn.lock().unwrap(); + conn.execute( "INSERT OR REPLACE INTO task_lists (id, title) VALUES (?1, ?2)", params![list.id, list.title], )?; @@ -65,13 +69,15 @@ impl Db { } pub fn delete_list(&self, list_id: &str) -> SqlResult<()> { - self.conn.execute("DELETE FROM tasks WHERE list_id = ?1", params![list_id])?; - self.conn.execute("DELETE FROM task_lists WHERE id = ?1", params![list_id])?; + let conn = self.conn.lock().unwrap(); + conn.execute("DELETE FROM tasks WHERE list_id = ?1", params![list_id])?; + conn.execute("DELETE FROM task_lists WHERE id = ?1", params![list_id])?; Ok(()) } pub fn get_tasks(&self, list_id: &str) -> Vec { - let mut stmt = self.conn + let conn = self.conn.lock().unwrap(); + let mut stmt = conn .prepare( "SELECT id, list_id, title, notes, status, due, position FROM tasks WHERE list_id = ?1 ORDER BY position ASC", @@ -104,19 +110,18 @@ impl Db { TaskStatus::Completed => "completed", TaskStatus::NeedsAction => "needsAction", }; + let conn = self.conn.lock().unwrap(); let position = if task.position == 0 { - let max: i64 = self.conn - .query_row( - "SELECT COALESCE(MAX(position), -1) + 1 FROM tasks WHERE list_id = ?1", - params![task.list_id], - |row| row.get(0), - ) - .unwrap_or(0); - max + conn.query_row( + "SELECT COALESCE(MAX(position), -1) + 1 FROM tasks WHERE list_id = ?1", + params![task.list_id], + |row| row.get(0), + ) + .unwrap_or(0) } else { task.position }; - self.conn.execute( + conn.execute( "INSERT OR REPLACE INTO tasks (id, list_id, title, notes, status, due, position, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", params![ @@ -139,7 +144,8 @@ impl Db { TaskStatus::Completed => "completed", TaskStatus::NeedsAction => "needsAction", }; - self.conn.execute( + let conn = self.conn.lock().unwrap(); + conn.execute( "UPDATE tasks SET title=?1, notes=?2, status=?3, due=?4, position=?5, updated_at=?6 WHERE id=?7", params![ @@ -156,12 +162,14 @@ impl Db { } pub fn delete_task(&self, task_id: &str) -> SqlResult<()> { - self.conn.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?; + let conn = self.conn.lock().unwrap(); + conn.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?; Ok(()) } pub fn reorder_task(&self, task_id: &str, new_position: i64) -> SqlResult<()> { - let (old_position, list_id): (i64, String) = self.conn.query_row( + let conn = self.conn.lock().unwrap(); + let (old_position, list_id): (i64, String) = conn.query_row( "SELECT position, list_id FROM tasks WHERE id = ?1", params![task_id], |row| Ok((row.get(0)?, row.get(1)?)), @@ -172,20 +180,20 @@ impl Db { } if new_position > old_position { - self.conn.execute( + conn.execute( "UPDATE tasks SET position = position - 1 WHERE list_id = ?1 AND position > ?2 AND position <= ?3", params![list_id, old_position, new_position], )?; } else { - self.conn.execute( + conn.execute( "UPDATE tasks SET position = position + 1 WHERE list_id = ?1 AND position >= ?2 AND position < ?3", params![list_id, new_position, old_position], )?; } - self.conn.execute( + conn.execute( "UPDATE tasks SET position = ?1 WHERE id = ?2", params![new_position, task_id], )?; @@ -194,7 +202,9 @@ impl Db { } pub fn replace_all_lists(&self, lists: &[TaskList]) -> SqlResult<()> { - self.conn.execute("DELETE FROM task_lists", [])?; + let conn = self.conn.lock().unwrap(); + conn.execute("DELETE FROM task_lists", [])?; + drop(conn); for list in lists { self.insert_list(list)?; } @@ -202,7 +212,9 @@ impl Db { } pub fn replace_all_tasks(&self, list_id: &str, tasks: &[Task]) -> SqlResult<()> { - self.conn.execute("DELETE FROM tasks WHERE list_id = ?1", params![list_id])?; + let conn = self.conn.lock().unwrap(); + conn.execute("DELETE FROM tasks WHERE list_id = ?1", params![list_id])?; + drop(conn); for (i, task) in tasks.iter().enumerate() { let mut t = task.clone(); if t.position == 0 { @@ -215,8 +227,14 @@ impl Db { } pub fn push_sync(&self, action: SyncAction, task_id: &str, list_id: &str, payload: &str) -> SqlResult<()> { - let action_str = serde_json::to_string(&action).unwrap_or_default(); - self.conn.execute( + let action_str = match action { + SyncAction::Create => "Create", + SyncAction::Update => "Update", + SyncAction::Delete => "Delete", + SyncAction::Reorder => "Reorder", + }; + let conn = self.conn.lock().unwrap(); + conn.execute( "INSERT INTO sync_queue (action, task_id, list_id, payload, created_at) VALUES (?1, ?2, ?3, ?4, ?5)", params![ @@ -231,13 +249,20 @@ impl Db { } pub fn drain_sync(&self) -> Vec { + let conn = self.conn.lock().unwrap(); let items: Vec = { - let mut stmt = self.conn + let mut stmt = conn .prepare("SELECT id, action, task_id, list_id, payload, created_at FROM sync_queue ORDER BY id") .unwrap(); stmt.query_map([], |row| { let action_str: String = row.get(1)?; - let action: SyncAction = serde_json::from_str(&action_str).unwrap_or(SyncAction::Update); + let action = match action_str.as_str() { + "\"Create\"" | "Create" => SyncAction::Create, + "\"Update\"" | "Update" => SyncAction::Update, + "\"Delete\"" | "Delete" => SyncAction::Delete, + "\"Reorder\"" | "Reorder" => SyncAction::Reorder, + _ => SyncAction::Update, + }; Ok(SyncQueueItem { id: row.get(0)?, action, @@ -253,7 +278,7 @@ impl Db { }; if !items.is_empty() { - self.conn.execute("DELETE FROM sync_queue", []).unwrap(); + conn.execute("DELETE FROM sync_queue", []).unwrap(); } items diff --git a/src/main.rs b/src/main.rs index c26e983..013661b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,16 +4,20 @@ mod infrastructure; mod ui; use std::io; +use std::sync::Arc; use crossterm::event::{self, Event}; use crossterm::terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}; use crossterm::ExecutableCommand; use ratatui::backend::CrosstermBackend; use ratatui::Terminal; +use tokio::sync::Mutex; -use crate::app::App; +use crate::app::{App, SyncCommand}; +use crate::domain::models::*; +use crate::infrastructure::api::ApiClient; use crate::infrastructure::db::Db; -use crate::ui::{draw, AppView}; +use crate::ui::{draw, AppView, NetworkStatus}; fn main() -> io::Result<()> { let db_path = dirs::data_dir() @@ -23,7 +27,7 @@ fn main() -> io::Result<()> { std::fs::create_dir_all(db_path.parent().unwrap()).ok(); - let db = Db::new(db_path.to_str().unwrap()).expect("Failed to open database"); + let db = Arc::new(Db::new(db_path.to_str().unwrap()).expect("Failed to open database")); enable_raw_mode()?; let mut stdout = io::stdout(); @@ -31,10 +35,35 @@ fn main() -> io::Result<()> { let backend = CrosstermBackend::new(stdout); let mut terminal = Terminal::new(backend)?; - let mut app = App::new(db); + let api_client = Arc::new(ApiClient::new( + std::env::var("GOOGLE_CLIENT_ID").unwrap_or_default(), + std::env::var("GOOGLE_CLIENT_SECRET").unwrap_or_default(), + )); + + let network_status = Arc::new(Mutex::new(NetworkStatus::Online)); + let (sync_tx, mut sync_rx) = tokio::sync::mpsc::channel::(32); + + let mut app = App::new(db.clone(), api_client.clone(), sync_tx.clone()); + + let network_clone = network_status.clone(); + let db_clone = db.clone(); + let api_clone = api_client.clone(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + run_sync_engine(db_clone, api_clone, network_clone, &mut sync_rx).await; + }); + }); while !app.should_quit { terminal.draw(|frame| { + let status = { + let guard = network_status.blocking_lock(); + guard.clone() + }; + app.network_status = status; + let view = AppView { lists: &app.lists, tasks: &app.tasks, @@ -61,3 +90,102 @@ fn main() -> io::Result<()> { io::stdout().execute(LeaveAlternateScreen)?; Ok(()) } + +async fn run_sync_engine( + db: Arc, + api: Arc, + network_status: Arc>, + rx: &mut tokio::sync::mpsc::Receiver, +) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + + loop { + tokio::select! { + _ = interval.tick() => { + process_sync_queue(&db, &api, &network_status).await; + } + cmd = rx.recv() => { + match cmd { + Some(SyncCommand::TriggerSync) => { + process_sync_queue(&db, &api, &network_status).await; + } + Some(SyncCommand::Shutdown) | None => break, + } + } + } + } +} + +async fn process_sync_queue( + db: &Arc, + api: &Arc, + network_status: &Arc>, +) { + let token_available = api.load_token().await.is_some(); + if !token_available { + *network_status.lock().await = NetworkStatus::Offline; + return; + } + + let items = db.drain_sync(); + + if items.is_empty() { + *network_status.lock().await = NetworkStatus::Online; + return; + } + + *network_status.lock().await = NetworkStatus::Syncing; + + let mut all_ok = true; + for item in &items { + let result = match item.action { + SyncAction::Create => { + let task = serde_json::from_str::(&item.payload).unwrap_or_else(|_| Task { + id: item.task_id.clone(), + list_id: item.list_id.clone(), + title: String::new(), + notes: None, + status: TaskStatus::NeedsAction, + due: None, + position: 0, + }); + api.create_task(&item.list_id, &task).await + } + SyncAction::Update => { + let task = serde_json::from_str::(&item.payload).unwrap_or_else(|_| Task { + id: item.task_id.clone(), + list_id: item.list_id.clone(), + title: String::new(), + notes: None, + status: TaskStatus::NeedsAction, + due: None, + position: 0, + }); + api.update_task(&item.list_id, &task).await + } + SyncAction::Delete => { + api.delete_task(&item.list_id, &item.task_id).await + } + SyncAction::Reorder => { + api.move_task(&item.list_id, &item.task_id, None, None).await + } + }; + + if result.is_err() { + let _ = db.push_sync( + item.action.clone(), + &item.task_id, + &item.list_id, + &item.payload, + ); + all_ok = false; + break; + } + } + + *network_status.lock().await = if all_ok { + NetworkStatus::Online + } else { + NetworkStatus::Offline + }; +}