epik/downloader/
queue.rs

1// Download Queue Management for v1.2.0
2// Supports priority queue, pause/resume, scheduling, and bandwidth allocation
3
4use crate::{Error, Result};
5use chrono::{Datelike, Timelike};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11// Download priority levels
12#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
13pub enum DownloadPriority {
14    Low = 1,
15    Normal = 2,
16    High = 3,
17    Critical = 4,
18}
19
20// Download status for queue items
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub enum DownloadStatus {
23    Queued,
24    Downloading,
25    Paused,
26    Completed,
27    Failed(String),
28    Cancelled,
29}
30
31// Scheduled download time window
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct ScheduleWindow {
34    // Start hour (0-23)
35    pub start_hour: u8,
36    // End hour (0-23)
37    pub end_hour: u8,
38    // Days of week (0=Monday, 6=Sunday), empty means daily
39    pub days: Vec<u8>,
40}
41
42impl ScheduleWindow {
43    // Check if current time is within window
44    pub fn is_active_now(&self) -> bool {
45        let now = chrono::Local::now();
46        let current_hour = now.hour() as u8;
47        let current_day = now.weekday().number_from_monday() as u8 - 1;
48
49        let hour_ok = if self.start_hour <= self.end_hour {
50            current_hour >= self.start_hour && current_hour < self.end_hour
51        } else {
52            // Handle overnight window (e.g., 22:00-06:00)
53            current_hour >= self.start_hour || current_hour < self.end_hour
54        };
55
56        if !hour_ok {
57            return false;
58        }
59
60        if self.days.is_empty() {
61            return true;
62        }
63
64        self.days.contains(&current_day)
65    }
66}
67
68// Queue item with metadata
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct QueueItem {
71    pub id: String,
72    pub app_name: String,
73    pub title: String,
74    pub priority: DownloadPriority,
75    pub status: DownloadStatus,
76    pub size_bytes: u64,
77    pub downloaded_bytes: u64,
78    pub schedule: Option<ScheduleWindow>,
79    pub created_at: chrono::DateTime<chrono::Utc>,
80    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
81    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
82    // Maximum bandwidth for this download in Mbps (None = no limit)
83    pub bandwidth_limit_mbps: Option<u32>,
84}
85
86impl QueueItem {
87    pub fn new(app_name: String, title: String, size_bytes: u64) -> Self {
88        Self {
89            id: uuid::Uuid::new_v4().to_string(),
90            app_name,
91            title,
92            priority: DownloadPriority::Normal,
93            status: DownloadStatus::Queued,
94            size_bytes,
95            downloaded_bytes: 0,
96            schedule: None,
97            created_at: chrono::Utc::now(),
98            started_at: None,
99            completed_at: None,
100            bandwidth_limit_mbps: None,
101        }
102    }
103
104    pub fn with_priority(mut self, priority: DownloadPriority) -> Self {
105        self.priority = priority;
106        self
107    }
108
109    pub fn with_schedule(mut self, schedule: ScheduleWindow) -> Self {
110        self.schedule = Some(schedule);
111        self
112    }
113
114    pub fn with_bandwidth_limit(mut self, mbps: u32) -> Self {
115        self.bandwidth_limit_mbps = Some(mbps);
116        self
117    }
118
119    pub fn progress_percent(&self) -> f32 {
120        if self.size_bytes == 0 {
121            100.0
122        } else {
123            (self.downloaded_bytes as f32 / self.size_bytes as f32) * 100.0
124        }
125    }
126
127    pub fn is_scheduled(&self) -> bool {
128        self.schedule
129            .as_ref()
130            .map(|s| !s.is_active_now())
131            .unwrap_or(false)
132    }
133}
134
135// Download Queue Manager
136pub struct DownloadQueue {
137    items: Arc<RwLock<HashMap<String, QueueItem>>>,
138    download_order: Arc<RwLock<Vec<String>>>,
139    // Maximum concurrent downloads
140    _max_concurrent: usize,
141    // Global bandwidth limit in Mbps (None = no limit)
142    _global_bandwidth_limit_mbps: Option<u32>,
143}
144
145impl DownloadQueue {
146    pub fn new(max_concurrent: usize, global_bandwidth_limit: Option<u32>) -> Self {
147        Self {
148            items: Arc::new(RwLock::new(HashMap::new())),
149            download_order: Arc::new(RwLock::new(Vec::new())),
150            _max_concurrent: max_concurrent,
151            _global_bandwidth_limit_mbps: global_bandwidth_limit,
152        }
153    }
154
155    // Add item to queue
156    pub async fn enqueue(&self, item: QueueItem) -> Result<String> {
157        let id = item.id.clone();
158        let mut items = self.items.write().await;
159        let mut order = self.download_order.write().await;
160
161        items.insert(id.clone(), item);
162        order.push(id.clone());
163        self.resort_queue(&mut order, &items).await;
164
165        Ok(id)
166    }
167
168    // Remove item from queue
169    pub async fn dequeue(&self, id: &str) -> Result<Option<QueueItem>> {
170        let mut items = self.items.write().await;
171        let mut order = self.download_order.write().await;
172
173        order.retain(|item_id| item_id != id);
174        Ok(items.remove(id))
175    }
176
177    // Pause a download
178    pub async fn pause(&self, id: &str) -> Result<()> {
179        let mut items = self.items.write().await;
180        if let Some(item) = items.get_mut(id) {
181            match item.status {
182                DownloadStatus::Downloading | DownloadStatus::Queued => {
183                    item.status = DownloadStatus::Paused;
184                    Ok(())
185                }
186                _ => Err(Error::Other(
187                    "Can only pause downloading or queued downloads".to_string(),
188                )),
189            }
190        } else {
191            Err(Error::Other(format!("Queue item not found: {}", id)))
192        }
193    }
194
195    // Resume a paused download
196    pub async fn resume(&self, id: &str) -> Result<()> {
197        let mut items = self.items.write().await;
198        if let Some(item) = items.get_mut(id) {
199            if item.status == DownloadStatus::Paused {
200                item.status = DownloadStatus::Queued;
201            }
202            Ok(())
203        } else {
204            Err(Error::Other(format!("Queue item not found: {}", id)))
205        }
206    }
207
208    // Cancel a download
209    pub async fn cancel(&self, id: &str) -> Result<()> {
210        let mut items = self.items.write().await;
211        if let Some(item) = items.get_mut(id) {
212            item.status = DownloadStatus::Cancelled;
213            Ok(())
214        } else {
215            Err(Error::Other(format!("Queue item not found: {}", id)))
216        }
217    }
218
219    // Update progress for a download
220    pub async fn update_progress(&self, id: &str, downloaded_bytes: u64) -> Result<()> {
221        let mut items = self.items.write().await;
222        if let Some(item) = items.get_mut(id) {
223            item.downloaded_bytes = downloaded_bytes.min(item.size_bytes);
224            Ok(())
225        } else {
226            Err(Error::Other(format!("Queue item not found: {}", id)))
227        }
228    }
229
230    // Mark download as completed
231    pub async fn mark_completed(&self, id: &str) -> Result<()> {
232        let mut items = self.items.write().await;
233        if let Some(item) = items.get_mut(id) {
234            item.status = DownloadStatus::Completed;
235            item.downloaded_bytes = item.size_bytes;
236            item.completed_at = Some(chrono::Utc::now());
237            Ok(())
238        } else {
239            Err(Error::Other(format!("Queue item not found: {}", id)))
240        }
241    }
242
243    // Mark download as failed
244    pub async fn mark_failed(&self, id: &str, error: String) -> Result<()> {
245        let mut items = self.items.write().await;
246        if let Some(item) = items.get_mut(id) {
247            item.status = DownloadStatus::Failed(error);
248            Ok(())
249        } else {
250            Err(Error::Other(format!("Queue item not found: {}", id)))
251        }
252    }
253
254    // Get item by ID
255    pub async fn get(&self, id: &str) -> Result<Option<QueueItem>> {
256        let items = self.items.read().await;
257        Ok(items.get(id).cloned())
258    }
259
260    // Get all items
261    pub async fn list_all(&self) -> Result<Vec<QueueItem>> {
262        let items = self.items.read().await;
263        let order = self.download_order.read().await;
264
265        let mut result = Vec::new();
266        for id in order.iter() {
267            if let Some(item) = items.get(id) {
268                result.push(item.clone());
269            }
270        }
271        Ok(result)
272    }
273
274    // Get next download item to process (respecting priority and scheduling)
275    pub async fn get_next_to_download(&self) -> Result<Option<QueueItem>> {
276        let items = self.items.read().await;
277        let order = self.download_order.read().await;
278
279        for id in order.iter() {
280            if let Some(item) = items.get(id) {
281                match item.status {
282                    DownloadStatus::Queued => {
283                        // Check if scheduled and should wait
284                        if item.is_scheduled() {
285                            continue;
286                        }
287                        return Ok(Some(item.clone()));
288                    }
289                    DownloadStatus::Paused => {
290                        // Skip paused items
291                        continue;
292                    }
293                    _ => continue,
294                }
295            }
296        }
297        Ok(None)
298    }
299
300    // Get active downloads count
301    pub async fn active_count(&self) -> usize {
302        let items = self.items.read().await;
303        items
304            .values()
305            .filter(|item| item.status == DownloadStatus::Downloading)
306            .count()
307    }
308
309    // Get queued downloads count
310    pub async fn queued_count(&self) -> usize {
311        let items = self.items.read().await;
312        items
313            .values()
314            .filter(|item| item.status == DownloadStatus::Queued)
315            .count()
316    }
317
318    // Reorder queue based on priority
319    async fn resort_queue(&self, order: &mut Vec<String>, items: &HashMap<String, QueueItem>) {
320        order.sort_by(|a, b| {
321            let item_a = items
322                .get(a)
323                .map(|i| i.priority)
324                .unwrap_or(DownloadPriority::Normal);
325            let item_b = items
326                .get(b)
327                .map(|i| i.priority)
328                .unwrap_or(DownloadPriority::Normal);
329            item_b.cmp(&item_a) // Reverse for descending order
330        });
331    }
332
333    // Set priority for a queued item
334    pub async fn set_priority(&self, id: &str, priority: DownloadPriority) -> Result<()> {
335        let mut items = self.items.write().await;
336        if let Some(item) = items.get_mut(id) {
337            if item.status == DownloadStatus::Queued || item.status == DownloadStatus::Paused {
338                item.priority = priority;
339                let mut order = self.download_order.write().await;
340                self.resort_queue(&mut order, &items).await;
341                Ok(())
342            } else {
343                Err(Error::Other(
344                    "Can only change priority of queued or paused downloads".to_string(),
345                ))
346            }
347        } else {
348            Err(Error::Other(format!("Queue item not found: {}", id)))
349        }
350    }
351
352    // Get queue statistics
353    pub async fn get_stats(&self) -> Result<QueueStats> {
354        let items = self.items.read().await;
355
356        let mut total_size = 0u64;
357        let mut total_downloaded = 0u64;
358        let mut queued = 0;
359        let mut downloading = 0;
360        let mut paused = 0;
361        let mut completed = 0;
362        let mut failed = 0;
363
364        for item in items.values() {
365            total_size += item.size_bytes;
366            total_downloaded += item.downloaded_bytes;
367
368            match item.status {
369                DownloadStatus::Queued => queued += 1,
370                DownloadStatus::Downloading => downloading += 1,
371                DownloadStatus::Paused => paused += 1,
372                DownloadStatus::Completed => completed += 1,
373                DownloadStatus::Failed(_) => failed += 1,
374                DownloadStatus::Cancelled => {}
375            }
376        }
377
378        Ok(QueueStats {
379            total_items: items.len(),
380            queued,
381            downloading,
382            paused,
383            completed,
384            failed,
385            total_size_bytes: total_size,
386            total_downloaded_bytes: total_downloaded,
387            average_progress_percent: if items.is_empty() {
388                0.0
389            } else {
390                (total_downloaded as f32 / total_size.max(1) as f32) * 100.0
391            },
392        })
393    }
394}
395
396// Queue statistics
397#[derive(Debug, Clone, Serialize, Deserialize)]
398pub struct QueueStats {
399    pub total_items: usize,
400    pub queued: usize,
401    pub downloading: usize,
402    pub paused: usize,
403    pub completed: usize,
404    pub failed: usize,
405    pub total_size_bytes: u64,
406    pub total_downloaded_bytes: u64,
407    pub average_progress_percent: f32,
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413
414    #[tokio::test]
415    async fn test_enqueue() {
416        let queue = DownloadQueue::new(4, None);
417        let item = QueueItem::new("test_game".to_string(), "Test Game".to_string(), 1024);
418
419        let id = queue.enqueue(item).await.unwrap();
420        let retrieved = queue.get(&id).await.unwrap();
421
422        assert!(retrieved.is_some());
423        assert_eq!(retrieved.unwrap().status, DownloadStatus::Queued);
424    }
425
426    #[tokio::test]
427    async fn test_pause_resume() {
428        let queue = DownloadQueue::new(4, None);
429        let item = QueueItem::new("test_game".to_string(), "Test Game".to_string(), 1024);
430        let id = queue.enqueue(item).await.unwrap();
431
432        // Simulate download
433        let mut item = queue.get(&id).await.unwrap().unwrap();
434        item.status = DownloadStatus::Downloading;
435        let mut items = queue.items.write().await;
436        items.insert(id.clone(), item);
437        drop(items);
438
439        queue.pause(&id).await.unwrap();
440        let item = queue.get(&id).await.unwrap().unwrap();
441        assert_eq!(item.status, DownloadStatus::Paused);
442
443        queue.resume(&id).await.unwrap();
444        let item = queue.get(&id).await.unwrap().unwrap();
445        assert_eq!(item.status, DownloadStatus::Queued);
446    }
447
448    #[tokio::test]
449    async fn test_priority() {
450        let queue = DownloadQueue::new(4, None);
451        let item1 = QueueItem::new("game1".to_string(), "Game 1".to_string(), 1024);
452        let item2 = QueueItem::new("game2".to_string(), "Game 2".to_string(), 2048)
453            .with_priority(DownloadPriority::High);
454
455        let id1 = queue.enqueue(item1).await.unwrap();
456        let id2 = queue.enqueue(item2).await.unwrap();
457
458        let all = queue.list_all().await.unwrap();
459        // High priority should come first
460        assert_eq!(all[0].id, id2);
461        assert_eq!(all[1].id, id1);
462    }
463}