From 2ae6046fae722a016622839407c104ad20f3e691 Mon Sep 17 00:00:00 2001 From: Alessandro Gallo Date: Wed, 17 Jun 2026 17:09:14 +0200 Subject: [PATCH] [CRE-1299] - support white listed requests --- libs/opsqueue_python/src/common.rs | 13 +- opsqueue/src/common/submission.rs | 234 +++++++++++++++++++++++++---- 2 files changed, 214 insertions(+), 33 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index f19eaa6..e5d2d92 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -386,6 +386,7 @@ pub struct Submission { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, + pub strategic_metadata: StrategicMetadataMap, } impl From for Submission { @@ -395,6 +396,7 @@ impl From for Submission { chunks_total: value.chunks_total.into(), chunks_done: value.chunks_done.into(), metadata: value.metadata, + strategic_metadata: value.strategic_metadata, } } } @@ -403,11 +405,12 @@ impl From for Submission { impl Submission { fn __repr__(&self) -> String { format!( - "Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?})", + "Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?})", self.id.__repr__(), self.chunks_total, self.chunks_done, - self.metadata + self.metadata, + self.strategic_metadata ) } } @@ -448,7 +451,7 @@ pub struct SubmissionCompleted { pub id: SubmissionId, pub chunks_total: u64, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub completed_at: DateTime, } @@ -459,7 +462,7 @@ pub struct SubmissionFailed { pub chunks_total: u64, pub chunks_done: Option, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub failed_at: DateTime, pub failed_chunk_id: u64, } @@ -471,7 +474,7 @@ pub struct SubmissionCancelled { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub cancelled_at: DateTime, } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 69040f1..3c18bcf 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -151,6 +151,7 @@ pub struct Submission { pub chunks_done: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub otel_trace_carrier: String, } @@ -166,7 +167,7 @@ pub struct SubmissionCompleted { pub chunks_total: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub completed_at: DateTime, pub otel_trace_carrier: String, } @@ -179,7 +180,7 @@ pub struct SubmissionFailed { pub chunks_done: Option, pub chunk_size: ChunkSize, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub failed_at: DateTime, pub failed_chunk_id: ChunkIndex, pub otel_trace_carrier: String, @@ -196,7 +197,7 @@ pub struct SubmissionCancelled { pub chunks_total: ChunkCount, pub chunks_done: ChunkCount, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub cancelled_at: DateTime, } @@ -224,6 +225,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size: ChunkSize::default(), metadata: None, + strategic_metadata: StrategicMetadataMap::default(), otel_trace_carrier, } } @@ -243,6 +245,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size, metadata, + strategic_metadata: StrategicMetadataMap::default(), otel_trace_carrier, }; let chunks = chunks @@ -268,7 +271,7 @@ pub mod db { db::{Connection, True, WriterConnection, WriterPool}, }; use chunk::ChunkSize; - use sqlx::{query, query_as, Sqlite}; + use sqlx::{query, Sqlite}; use axum_prometheus::metrics::{counter, histogram}; @@ -366,7 +369,6 @@ pub mod db { pub(crate) async fn insert_submission( submission: Submission, chunks: Vec, - strategic_metadata: StrategicMetadataMap, mut conn: impl WriterConnection, ) -> Result<(), DatabaseError> { use axum_prometheus::metrics::counter; @@ -379,12 +381,16 @@ pub mod db { .transaction(move |mut tx| { async move { insert_submission_raw(&submission, &mut tx).await?; - insert_submission_metadata_raw(&submission, &strategic_metadata, &mut tx) - .await?; + insert_submission_metadata_raw( + &submission, + &submission.strategic_metadata, + &mut tx, + ) + .await?; super::chunk::db::insert_many_chunks(&chunks, &mut tx).await?; super::chunk::db::insert_many_chunks_metadata( &chunks, - &strategic_metadata, + &submission.strategic_metadata, &mut tx, ) .await?; @@ -421,6 +427,7 @@ pub mod db { chunks_done: ChunkCount::zero(), chunk_size, metadata, + strategic_metadata, otel_trace_carrier, }; let iter = chunks_contents @@ -431,7 +438,7 @@ pub mod db { Chunk::new(submission_id, chunk_index.try_into().unwrap(), uri) }) .collect(); - insert_submission(submission, iter, strategic_metadata, &mut conn).await?; + insert_submission(submission, iter, &mut conn).await?; // Empty submissions get special handling: we mark them as completed right away. // See https://github.com/channable/opsqueue/issues/86 for rationale. if len == 0 { @@ -461,15 +468,18 @@ pub mod db { id: SubmissionId, mut conn: impl Connection, ) -> Result> { - let submission = query_as!( - Submission, + let submission_row = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS "strategic_metadata!: sqlx::types::Json" , otel_trace_carrier FROM submissions WHERE id = $1 "#, @@ -477,9 +487,18 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - match submission { + match submission_row { None => Err(E::R(SubmissionNotFound(id))), - Some(submission) => Ok(submission), + Some(row) => Ok(Submission { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.0, + otel_trace_carrier: row.otel_trace_carrier, + }), } } @@ -535,16 +554,19 @@ pub mod db { // NOTE: The order is important here; a concurrent writer could move a submission // from InProgress to Completed/Failed in-between the queries. - let submission = query_as!( - Submission, + let submission_row = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS "strategic_metadata!: sqlx::types::Json" , otel_trace_carrier FROM submissions WHERE id = $1 "#, @@ -552,7 +574,17 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(submission) = submission { + if let Some(row) = submission_row { + let submission = Submission { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.0, + otel_trace_carrier: row.otel_trace_carrier, + }; return Ok(Some(SubmissionStatus::InProgress(submission))); } @@ -567,7 +599,7 @@ pub mod db { , ( SELECT json_group_object(metadata_key, metadata_value) FROM submissions_metadata WHERE submission_id = submissions_completed.id - ) AS "strategic_metadata: sqlx::types::Json" + ) AS "strategic_metadata!: sqlx::types::Json" , completed_at AS "completed_at: DateTime" , otel_trace_carrier FROM submissions_completed WHERE id = $1 @@ -583,7 +615,7 @@ pub mod db { chunks_total: row.chunks_total, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row.strategic_metadata.0, completed_at: row.completed_at, otel_trace_carrier: row.otel_trace_carrier, }; @@ -602,7 +634,7 @@ pub mod db { , ( SELECT json_group_object(metadata_key, metadata_value) FROM submissions_metadata WHERE submission_id = submissions_failed.id - ) AS "strategic_metadata: sqlx::types::Json" + ) AS "strategic_metadata!: sqlx::types::Json" , failed_at AS "failed_at: DateTime" , failed_chunk_id AS "failed_chunk_id: ChunkIndex" , otel_trace_carrier @@ -620,7 +652,7 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row.strategic_metadata.0, failed_at: row.failed_at, failed_chunk_id: row.failed_chunk_id, otel_trace_carrier: row.otel_trace_carrier, @@ -644,7 +676,7 @@ pub mod db { , ( SELECT json_group_object(metadata_key, metadata_value) FROM submissions_metadata WHERE submission_id = submissions_cancelled.id - ) AS "strategic_metadata: sqlx::types::Json" + ) AS "strategic_metadata!: sqlx::types::Json" , cancelled_at AS "cancelled_at: DateTime" FROM submissions_cancelled WHERE id = $1 "#, @@ -659,7 +691,7 @@ pub mod db { chunks_total: row.chunks_total, chunks_done: row.chunks_done, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row.strategic_metadata.0, cancelled_at: row.cancelled_at, }; return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission))); @@ -1004,6 +1036,8 @@ pub mod test { use assert_matches::*; use chrono::Utc; use chunk::ChunkSize; + use itertools::Itertools; + use sqlx::{Row, SqliteConnection}; use crate::common::StrategicMetadataMap; use crate::db::{Connection as _, WriterPool}; @@ -1011,6 +1045,145 @@ pub mod test { use super::db::*; use super::*; + async fn explain_query_plan(query: &str, conn: &mut SqliteConnection) -> String { + sqlx::raw_sql(&format!("EXPLAIN QUERY PLAN {query}")) + .fetch_all(&mut *conn) + .await + .unwrap_or_else(|_| panic!("Invalid query: \n{query}\n")) + .into_iter() + .map(|row| { + let id = row.get::("id"); + let parent = row.get::("parent"); + let detail = row.get::("detail"); + format!("{id}, {parent}, {detail}") + }) + .join("\n") + } + + fn assert_non_regressing_query_plan(query: &str, explained: &str) { + assert!( + !explained.contains("MATERIALIZED"), + "Query should contain no materialization, but it did.\n\nQuery: {query}\n\nPlan:\n\n{explained}" + ); + assert!( + !explained.contains("B-TREE"), + "Query should contain no temporary B-tree construction, but it did.\n\nQuery: {query}\n\nPlan:\n\n{explained}" + ); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_in_progress(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunks_done + , chunk_size + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS strategic_metadata + , otel_trace_carrier + FROM submissions WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions USING INDEX sqlite_autoindex_submissions_1 (id=?) + 15, 0, CORRELATED SCALAR SUBQUERY 1 + 20, 15, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_completed(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunk_size + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_completed.id + ) AS strategic_metadata + , completed_at + , otel_trace_carrier + FROM submissions_completed WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions_completed USING INDEX sqlite_autoindex_submissions_completed_1 (id=?) + 14, 0, CORRELATED SCALAR SUBQUERY 1 + 19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_failed(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunks_done + , chunk_size + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_failed.id + ) AS strategic_metadata + , failed_at + , failed_chunk_id + , otel_trace_carrier + FROM submissions_failed WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions_failed USING INDEX sqlite_autoindex_submissions_failed_1 (id=?) + 15, 0, CORRELATED SCALAR SUBQUERY 1 + 20, 15, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_cancelled(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunks_done + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_cancelled.id + ) AS strategic_metadata + , cancelled_at + FROM submissions_cancelled WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions_cancelled USING INDEX sqlite_autoindex_submissions_cancelled_1 (id=?) + 14, 0, CORRELATED SCALAR SUBQUERY 1 + 19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + #[sqlx::test] pub async fn test_insert_submission(db: sqlx::SqlitePool) { let db = WriterPool::new(db); @@ -1024,7 +1197,7 @@ pub mod test { ChunkSize::default(), ) .unwrap(); - insert_submission(submission, chunks, Default::default(), &mut conn) + insert_submission(submission, chunks, &mut conn) .await .expect("insertion failed"); @@ -1041,11 +1214,16 @@ pub mod test { ChunkSize(1), ) .unwrap(); - insert_submission(submission.clone(), chunks, Default::default(), &mut conn) + insert_submission(submission.clone(), chunks, &mut conn) .await .expect("insertion failed"); let fetched_submission = get_submission(submission.id, &mut conn).await.unwrap(); + // When fetched from DB with no metadata rows, json_group_object returns '{}'. + let submission = Submission { + strategic_metadata: Default::default(), + ..submission + }; assert_eq!(fetched_submission, submission); } @@ -1103,7 +1281,7 @@ pub mod test { ChunkSize::default(), ) .unwrap(); - insert_submission(submission.clone(), chunks, Default::default(), &mut conn) + insert_submission(submission.clone(), chunks, &mut conn) .await .expect("insertion failed"); @@ -1128,7 +1306,7 @@ pub mod test { ChunkSize::default(), ) .unwrap(); - insert_submission(submission.clone(), chunks, Default::default(), &mut conn) + insert_submission(submission.clone(), chunks, &mut conn) .await .expect("insertion failed");