Skip to content

Commit da7c64b

Browse files
committed
feat: generate anvil-index.json during HF ingestion
- Added `hf_update_item_success` and `hf_get_ingestion_items` to persistence. - Updated worker to track file metadata (size, etag) and generate `anvil-index.json` at the end of ingestion. - Implemented retry logic for index file upload.
1 parent a1d06ea commit da7c64b

4 files changed

Lines changed: 134 additions & 12 deletions

File tree

anvil-core/src/persistence.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,6 +1403,33 @@ impl Persistence {
14031403
Ok(())
14041404
}
14051405

1406+
/// Marks a single `hf_ingestion_items` row as successfully stored.
///
/// Runs one `UPDATE` that sets `state` to `'stored'`, writes the supplied
/// `size` and `etag`, and stamps `finished_at` with the database's `now()`
/// for the row whose primary key equals `id`.
///
/// # Parameters
/// - `id`: value matched against `hf_ingestion_items.id`.
/// - `size`: value written to the `size` column.
/// - `etag`: value written to the `etag` column.
///
/// # Errors
/// Propagates the error if a client cannot be obtained from `global_pool`
/// or if executing the statement fails.
///
/// NOTE(review): the value returned by `execute` is discarded, so an `id`
/// that matches no row still yields `Ok(())` — confirm this is intended.
pub async fn hf_update_item_success(
1407+
&self,
1408+
id: i64,
1409+
size: i64,
1410+
etag: &str,
1411+
) -> Result<()> {
1412+
let client = self.global_pool.get().await?;
1413+
client
1414+
.execute(
1415+
r#"UPDATE hf_ingestion_items SET state='stored'::hf_item_state, size=$2, etag=$3, finished_at=now() WHERE id=$1"#,
1416+
&[&id, &size, &etag],
1417+
)
1418+
.await?;
1419+
Ok(())
1420+
}
1421+
1422+
/// Fetches the stored items belonging to one ingestion.
///
/// Selects `path, size, etag, finished_at` from `hf_ingestion_items` for rows
/// whose `ingestion_id` matches and whose `state` is `'stored'`; items in any
/// other state are excluded by the query.
///
/// # Returns
/// One tuple per matching row, in column order:
/// `(path, size, etag, finished_at)`. The last three are `Option`s.
/// The query has no `ORDER BY`, so callers must not rely on row order.
///
/// # Errors
/// Propagates the error if a client cannot be obtained from `global_pool`
/// or if the query fails.
///
/// NOTE(review): presumably `r.get(n)` is `tokio_postgres::Row::get`, which
/// panics rather than returning an error when a column's type does not match
/// the requested Rust type — verify against the table schema.
pub async fn hf_get_ingestion_items(&self, ingestion_id: i64) -> Result<Vec<(String, Option<i64>, Option<String>, Option<DateTime<Utc>>)>> {
1423+
let client = self.global_pool.get().await?;
1424+
let rows = client
1425+
.query(
1426+
"SELECT path, size, etag, finished_at FROM hf_ingestion_items WHERE ingestion_id=$1 AND state='stored'::hf_item_state",
1427+
&[&ingestion_id],
1428+
)
1429+
.await?;
1430+
Ok(rows.into_iter().map(|r| (r.get(0), r.get(1), r.get(2), r.get(3))).collect())
1431+
}
1432+
14061433
pub async fn hf_status_summary(
14071434
&self,
14081435
id: i64,

anvil-core/src/worker.rs

Lines changed: 105 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,20 @@ use crate::cluster::ClusterState;
55
use crate::object_manager::ObjectManager;
66
use crate::persistence::Persistence;
77
use crate::tasks::{HFIngestionItemState, HFIngestionState, TaskStatus, TaskType};
8+
use crate::persistence::Object;
89
use anyhow::{Result, anyhow};
910
use serde::Deserialize;
10-
use serde_json::Value as JsonValue;
11+
use serde_json::{Value as JsonValue, json};
12+
use std::collections::HashMap;
13+
use std::convert::Infallible;
14+
use std::boxed::Box;
15+
use std::pin::Pin;
1116
use std::sync::Arc;
1217
use std::time::Duration;
1318
use tokio_postgres::Row;
1419
use tonic::Status;
1520
use tracing::{debug, error, info, warn};
21+
use futures_util::{StreamExt, Stream};
1622

1723
#[derive(Debug)]
1824
struct Task {
@@ -322,11 +328,11 @@ async fn handle_hf_ingestion(
322328
.put_object(tenant_id, &target_bucket, &full_key, &scopes, reader)
323329
.await;
324330
match res {
325-
Ok(_obj) => {
326-
info!(key = %full_key, "Upload successful");
327-
break;
328-
}
329-
Err(e) if attempt < 3 => {
331+
Ok(obj) => {
332+
info!(key = %full_key, "Upload successful");
333+
persistence.hf_update_item_success(item_id, obj.size, &obj.etag).await?;
334+
break;
335+
} Err(e) if attempt < 3 => {
330336
warn!(
331337
attempt,
332338
key = %full_key,
@@ -351,16 +357,105 @@ async fn handle_hf_ingestion(
351357
}
352358
}
353359
}
354-
persistence
355-
.hf_update_item_state(item_id, HFIngestionItemState::Stored, None)
356-
.await?;
357-
debug!(item_id, "Item state set to stored.");
360+
358361
}
359362

360363
info!(
361364
ingestion_id,
362365
"Ingestion task completed successfully."
363366
);
367+
368+
// --- Generate and upload anvil-index.json ---
369+
let items = persistence.hf_get_ingestion_items(ingestion_id).await?;
370+
let mut file_map = HashMap::new();
371+
let mut total_bytes = 0;
372+
for (path, size_opt, etag_opt, finished_at_opt) in items {
373+
let mut meta = json!({});
374+
if let Some(s) = size_opt {
375+
meta["size"] = json!(s);
376+
total_bytes += s;
377+
}
378+
if let Some(e) = etag_opt {
379+
meta["etag"] = json!(e);
380+
}
381+
if let Some(f) = finished_at_opt {
382+
meta["last_modified"] = json!(f.to_rfc3339());
383+
}
384+
file_map.insert(path, meta);
385+
}
386+
387+
let index_json = json!({
388+
"meta": {
389+
"source_repo": repo_str,
390+
"revision": revision,
391+
"generated_at": chrono::Utc::now().to_rfc3339(),
392+
"total_files": file_map.len(),
393+
"total_bytes": total_bytes
394+
},
395+
"files": file_map,
396+
});
397+
398+
let index_content_data = serde_json::to_vec_pretty(&index_json)?;
399+
let index_key = if target_prefix.is_empty() {
400+
"anvil-index.json".to_string()
401+
} else {
402+
format!("{}/anvil-index.json", target_prefix.trim_end_matches('/'))
403+
};
404+
info!(index_key = %index_key, "Uploading anvil-index.json");
405+
406+
// Upload index file, using retry logic adapted from above for robustness
407+
let mut attempt = 0;
408+
loop {
409+
attempt += 1;
410+
info!("Putting anvil-index.json, attempt {}", attempt);
411+
let current_index_content = index_content_data.clone();
412+
let index_stream: Pin<Box<dyn Stream<Item = Result<Vec<u8>, Status>> + Send + 'static>> = Box::pin(
413+
futures_util::stream::once(async move {
414+
Ok(current_index_content)
415+
})
416+
.map(|item: Result<Vec<u8>, Infallible>| {
417+
item.map_err(|e| match e {})
418+
})
419+
);
420+
421+
let res: Result<Object, Status> = object_manager.put_object(
422+
tenant_id,
423+
&target_bucket,
424+
&index_key,
425+
&vec!["object:write|*".to_string()], // Scopes
426+
index_stream,
427+
).await;
428+
match res {
429+
Ok(_) => {
430+
info!(key = %index_key, "anvil-index.json upload successful");
431+
break;
432+
}
433+
Err(e) if attempt < 3 => {
434+
warn!(
435+
attempt,
436+
key = %index_key,
437+
error = %e.to_string(),
438+
"anvil-index.json upload attempt failed. Retrying..."
439+
);
440+
let jitter = (rand::random::<u64>() % 200) as u64;
441+
tokio::time::sleep(std::time::Duration::from_millis(
442+
500 * attempt as u64 + jitter,
443+
))
444+
.await;
445+
continue;
446+
}
447+
Err(e) => {
448+
error!(
449+
key = %index_key,
450+
error = %e,
451+
"anvil-index.json upload failed permanently"
452+
);
453+
return Err(anyhow::anyhow!(e.to_string()));
454+
}
455+
}
456+
}
457+
// --- End anvil-index.json upload ---
458+
364459
info!(ingestion_id, "Updating ingestion state to completed.");
365460
persistence
366461
.hf_update_ingestion_state(ingestion_id, HFIngestionState::Completed, None)

docs/admin/01-deployment.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ This guide covers the fundamentals of deploying Anvil. The architecture is flexi
2121

2222
A single-node deployment is the simplest way to run Anvil and is perfect for development and testing. It consists of one Anvil instance and its required PostgreSQL databases.
2323

24-
For a complete, working `docker-compose.yml` and a step-by-step tutorial for this setup, please refer to the [**Getting Started Guide**](../fundamentals/getting-started).
24+
For a complete, working `docker-compose.yml` and a step-by-step tutorial for this setup, please refer to the getting started guide.
2525

2626
## 3. Multi-Node Deployment (Single Region)
2727

docs/scenarios/04-s3-gateway.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tags: [scenario, s3, aws-cli, rclone, sdk]
1111
1212
One of Anvil's most powerful features is its S3-compatible API gateway. This allows you to leverage the vast ecosystem of existing S3 tools, libraries, and SDKs to interact with your Anvil cluster without needing to write any custom code.
1313

14-
> **Note:** While this guide focuses on S3-compatible tools, the `anvil` is the recommended primary interface for most operations. See the [Getting Started](./getting-started) guide for `anvil` examples.
14+
> **Note:** While this guide focuses on S3-compatible tools, the `anvil` is the recommended primary interface for most operations. See the getting started guide for `anvil` examples.
1515
1616
### 4.1. Configuring S3 Clients
1717

0 commit comments

Comments
 (0)