diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1.rs b/packages/elf-service/tests/acceptance/docs_extension_v1.rs index f770b6a6..1ad9fe4e 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1.rs @@ -1,660 +1,15 @@ mod excerpts; +mod helpers; mod indexing; mod l0_search; mod lifecycle; mod search_filters; mod validation_rejections; -use std::{ - collections::HashSet, - future::IntoFuture, - string::ToString, - sync::Arc, - time::{Duration, Instant}, +pub(crate) use helpers::{ + DocsContext, TEST_CONTENT, assert_doc_excerpt, assert_doc_get, assert_docs_search_l0, + cleanup_docs_filter_fixture, create_docs_search_filter_fixture, fetch_first_doc_chunk_id, + fetch_first_doc_chunk_point, payload_string, put_test_doc, put_test_doc_with, + search_doc_ids_with_filters, setup_docs_context, spawn_doc_worker, trajectory_stage_stats, + verify_docs_qdrant_filter_indexes, wait_for_doc_outbox_done, wait_for_note_outbox_done, }; - -use ahash::AHashMap; -use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing}; -use qdrant_client::qdrant::{ - CreateFieldIndexCollection, FieldType, GetPointsBuilder, PayloadSchemaType, RetrievedPoint, - value, -}; -use serde_json::Map; -use sqlx::{FromRow, PgPool}; -use tokenizers::{Tokenizer, models::wordlevel::WordLevel}; -use tokio::{ - net::TcpListener, - sync::{oneshot, oneshot::Sender}, - task::JoinHandle, - time, -}; -use uuid::Uuid; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank, chunking::ChunkingConfig}; -use elf_config::EmbeddingProviderConfig; -use elf_service::{ - DocsExcerptsGetRequest, DocsGetRequest, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, - ElfService, Providers, TextQuoteSelector, docs::DocRetrievalTrajectory, -}; -use elf_storage::{db::Db, qdrant::QdrantStore}; -use elf_testkit::TestDatabase; -use elf_worker::worker::{self, WorkerState}; - -const TEST_CONTENT: 
&str = - "ELF docs extension v1 stores evidence. Keyword: peregrine.\nSecond sentence for chunking."; -const DOCS_SEARCH_FILTER_INDEXES: [(&str, PayloadSchemaType, FieldType); 9] = [ - ("scope", PayloadSchemaType::Keyword, FieldType::Keyword), - ("status", PayloadSchemaType::Keyword, FieldType::Keyword), - ("doc_type", PayloadSchemaType::Keyword, FieldType::Keyword), - ("agent_id", PayloadSchemaType::Keyword, FieldType::Keyword), - ("updated_at", PayloadSchemaType::Datetime, FieldType::Datetime), - ("doc_ts", PayloadSchemaType::Datetime, FieldType::Datetime), - ("thread_id", PayloadSchemaType::Keyword, FieldType::Keyword), - ("domain", PayloadSchemaType::Keyword, FieldType::Keyword), - ("repo", PayloadSchemaType::Keyword, FieldType::Keyword), -]; - -#[derive(FromRow)] -struct DocOutboxCounts { - total: i64, - done: i64, - failed: i64, -} - -#[derive(FromRow)] -struct NoteOutboxCounts { - total: i64, - done: i64, - failed: i64, -} - -struct DocsContext { - test_db: TestDatabase, - service: ElfService, -} - -fn build_test_tokenizer() -> Tokenizer { - let mut vocab = AHashMap::new(); - - vocab.insert("".to_string(), 0_u32); - - let model = WordLevel::builder() - .vocab(vocab) - .unk_token("".to_string()) - .build() - .expect("Failed to build test tokenizer."); - - Tokenizer::new(model) -} - -fn payload_string(payload_value: &qdrant_client::qdrant::Value) -> Option<&str> { - match payload_value.kind.as_ref() { - Some(value::Kind::StringValue(value)) => Some(value.as_str()), - _ => None, - } -} - -fn trajectory_stage_stats<'a>( - trajectory: &'a DocRetrievalTrajectory, - stage_name: &str, -) -> Option<&'a serde_json::Value> { - trajectory.stages.iter().find(|stage| stage.stage_name == stage_name).map(|stage| &stage.stats) -} - -async fn wait_for_doc_outbox_done(pool: &PgPool, doc_id: Uuid, timeout: Duration) -> bool { - let deadline = Instant::now() + timeout; - - loop { - let row: Option = sqlx::query_as::<_, DocOutboxCounts>( - "\ -SELECT - COUNT(*) AS total, - 
COUNT(*) FILTER (WHERE status = 'DONE') AS done, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed -FROM doc_indexing_outbox -WHERE doc_id = $1", - ) - .bind(doc_id) - .fetch_optional(pool) - .await - .ok() - .flatten(); - - if let Some(row) = row.as_ref() - && row.total > 0 - && row.done == row.total - { - return true; - } - if let Some(row) = row.as_ref() - && row.failed > 0 - { - return false; - } - - if Instant::now() >= deadline { - return false; - } - - time::sleep(Duration::from_millis(200)).await; - } -} - -async fn wait_for_note_outbox_done(pool: &PgPool, note_id: Uuid, timeout: Duration) -> bool { - let deadline = Instant::now() + timeout; - - loop { - let row: Option = sqlx::query_as::<_, NoteOutboxCounts>( - "\ -SELECT - COUNT(*) AS total, - COUNT(*) FILTER (WHERE status = 'DONE') AS done, - COUNT(*) FILTER (WHERE status = 'FAILED') AS failed -FROM indexing_outbox -WHERE note_id = $1", - ) - .bind(note_id) - .fetch_optional(pool) - .await - .ok() - .flatten(); - - if let Some(row) = row.as_ref() - && row.total > 0 - && row.done == row.total - { - return true; - } - if let Some(row) = row.as_ref() - && row.failed > 0 - { - return false; - } - - if Instant::now() >= deadline { - return false; - } - - time::sleep(Duration::from_millis(200)).await; - } -} - -async fn start_embed_server() -> (String, Sender<()>) { - let app = Router::new().route("/embeddings", routing::post(embed_handler)).with_state(()); - let listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind embed server."); - let addr = listener.local_addr().expect("Failed to read embed server address."); - let (tx, rx) = oneshot::channel(); - let server = axum::serve(listener, app).with_graceful_shutdown(async move { - let _ = rx.await; - }); - - tokio::spawn(async move { - let _ = server.into_future().await; - }); - - (format!("http://{addr}"), tx) -} - -async fn embed_handler( - State(()): State<()>, - Json(payload): Json, -) -> impl IntoResponse { - let inputs = - 
payload.get("input").and_then(|value| value.as_array()).cloned().unwrap_or_default(); - let data: Vec<_> = inputs - .iter() - .enumerate() - .map(|(index, _)| { - let embedding: Vec = vec![0.1_f32; 4_096]; - - serde_json::json!({ - "index": index, - "embedding": embedding, - }) - }) - .collect(); - - (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response() -} - -async fn create_docs_search_filter_fixture( - ctx: DocsContext, -) -> (TestDatabase, ElfService, Uuid, Uuid, Uuid, JoinHandle<()>, Sender<()>) { - let DocsContext { test_db, service } = ctx; - let shared_knowledge_doc = put_test_doc_with( - &service, - "owner", - "project_shared", - None, - "Docs filter sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-25T12:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let older_shared_knowledge_doc = put_test_doc_with( - &service, - "owner", - "project_shared", - None, - "Docs old filter sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2025-01-01T10:00:00Z", - }), - TEST_CONTENT, - ) - .await; - let private_chat_doc = put_test_doc_with( - &service, - "assistant", - "agent_private", - Some("chat"), - "Docs chat sample", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "chat", - "ts": "2026-02-25T12:00:00Z", - "thread_id": "shared-chat-thread", - "role": "assistant" - }), - TEST_CONTENT, - ) - .await; - let (handle, shutdown) = spawn_doc_worker(&service).await; - - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - shared_knowledge_doc.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected shared docs outbox to reach DONE." - ); - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - older_shared_knowledge_doc.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected older shared docs outbox to reach DONE." 
- ); - assert!( - wait_for_doc_outbox_done( - &service.db.pool, - private_chat_doc.doc_id, - std::time::Duration::from_secs(15) - ) - .await, - "Expected private docs outbox to reach DONE." - ); - - ( - test_db, - service, - shared_knowledge_doc.doc_id, - older_shared_knowledge_doc.doc_id, - private_chat_doc.doc_id, - handle, - shutdown, - ) -} - -async fn cleanup_docs_filter_fixture( - test_db: TestDatabase, - _handle: JoinHandle<()>, - shutdown: Sender<()>, -) { - let _ = shutdown.send(()); - - _handle.abort(); - - let _ = _handle.await; - - test_db.cleanup().await.expect("Failed to cleanup test database."); -} - -async fn setup_docs_context() -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping docs_extension_v1; set ELF_PG_DSN to run this test."); - - return None; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!( - "Skipping docs_extension_v1; set ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test." - ); - - return None; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(SpyExtractor { - calls: Arc::new(Default::default()), - payload: serde_json::json!({ "notes": [] }), - }), - ); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - acceptance::reset_qdrant_collection( - &service.qdrant.client, - &service.qdrant.collection, - service.qdrant.vector_dim, - ) - .await - .expect("Failed to reset Qdrant memory collection."); - acceptance::reset_qdrant_collection( - &service.qdrant.client, - 
&service.cfg.storage.qdrant.docs_collection, - service.qdrant.vector_dim, - ) - .await - .expect("Failed to reset Qdrant docs collection."); - - Some(DocsContext { test_db, service }) -} - -async fn fetch_first_doc_chunk_id(db: &ElfService, doc_id: Uuid) -> Option { - sqlx::query_scalar::<_, Uuid>( - "SELECT chunk_id FROM doc_chunks WHERE doc_id = $1 ORDER BY chunk_index LIMIT 1", - ) - .bind(doc_id) - .fetch_optional(&db.db.pool) - .await - .expect("Failed to fetch doc chunk id.") -} - -async fn fetch_first_doc_chunk_point(service: &ElfService, doc_id: Uuid) -> Option { - let chunk_id = fetch_first_doc_chunk_id(service, doc_id).await?; - let response = service - .qdrant - .client - .get_points( - GetPointsBuilder::new( - service.cfg.storage.qdrant.docs_collection.clone(), - vec![chunk_id.to_string().into()], - ) - .with_payload(true), - ) - .await - .expect("Failed to fetch doc chunk point from Qdrant."); - - response.result.into_iter().next() -} - -async fn put_test_doc(service: &ElfService) -> DocsPutResponse { - put_test_doc_with( - service, - "owner", - "project_shared", - None, - "Docs v1", - serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "ts": "2026-02-25T12:00:00Z", - "uri": "acceptance://knowledge/v1" - }), - TEST_CONTENT, - ) - .await -} - -async fn put_test_doc_with( - service: &ElfService, - agent_id: &str, - scope: &str, - doc_type: Option<&str>, - title: &str, - source_ref: serde_json::Value, - content: &str, -) -> DocsPutResponse { - service - .docs_put(DocsPutRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: agent_id.to_string(), - scope: scope.to_string(), - doc_type: doc_type.map(ToString::to_string), - title: Some(title.to_string()), - write_policy: None, - source_ref, - content: content.to_string(), - }) - .await - .expect("Failed to put doc.") -} - -async fn search_doc_ids_with_filters( - service: &ElfService, - scope: Option<&str>, - doc_type: Option<&str>, - agent_id: 
Option<&str>, - updated_after: Option<&str>, - updated_before: Option<&str>, - caller_agent_id: &str, -) -> HashSet { - let results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: caller_agent_id.to_string(), - scope: scope.map(str::to_string), - status: None, - doc_type: doc_type.map(str::to_string), - sparse_mode: None, - domain: None, - repo: None, - agent_id: agent_id.map(str::to_string), - thread_id: None, - updated_after: updated_after.map(str::to_string), - updated_before: updated_before.map(str::to_string), - ts_gte: None, - ts_lte: None, - read_profile: "all_scopes".to_string(), - query: "peregrine".to_string(), - top_k: Some(20), - candidate_k: Some(50), - explain: None, - }) - .await - .expect("Failed to search docs."); - - results.items.into_iter().map(|item| item.doc_id).collect() -} - -async fn verify_docs_qdrant_filter_indexes(service: &ElfService) { - let mut payload_schema = service - .qdrant - .client - .collection_info(&service.cfg.storage.qdrant.docs_collection) - .await - .expect("Failed to fetch Qdrant docs collection info.") - .result - .expect("Qdrant collection info is missing.") - .payload_schema; - - for (field_name, payload_type, index_type) in DOCS_SEARCH_FILTER_INDEXES { - let missing_or_wrong = match payload_schema.get(field_name) { - Some(schema) => schema.data_type != payload_type as i32, - None => true, - }; - - if missing_or_wrong { - let request = CreateFieldIndexCollection { - collection_name: service.cfg.storage.qdrant.docs_collection.clone(), - wait: Some(true), - field_name: field_name.to_string(), - field_type: Some(index_type as i32), - field_index_params: None, - ordering: None, - timeout: None, - }; - - service - .qdrant - .client - .create_field_index(request) - .await - .expect("Failed to create required Qdrant payload index."); - } - } - - payload_schema = service - .qdrant - .client - 
.collection_info(&service.cfg.storage.qdrant.docs_collection) - .await - .expect("Failed to fetch Qdrant docs collection info.") - .result - .expect("Qdrant collection info is missing.") - .payload_schema; - - for (field_name, payload_type, _) in DOCS_SEARCH_FILTER_INDEXES { - let schema = payload_schema.get(field_name).expect("Expected required payload field."); - - assert_eq!( - schema.data_type, payload_type as i32, - "Unexpected payload type for {field_name}." - ); - } -} - -async fn assert_doc_get(service: &ElfService, doc_id: Uuid) { - let get_as_owner = service - .docs_get(DocsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "owner".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id, - }) - .await - .expect("Failed to get doc as owner."); - - assert_eq!(get_as_owner.scope, "project_shared"); - assert_eq!(get_as_owner.doc_type, "knowledge"); - assert_eq!(get_as_owner.agent_id, "owner"); - assert_eq!(get_as_owner.title.as_deref(), Some("Docs v1")); - - let get_as_reader = service - .docs_get(DocsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "reader".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id, - }) - .await - .expect("Failed to get doc as reader (expected project grant)."); - - assert_eq!(get_as_reader.doc_id, doc_id); -} - -async fn assert_doc_excerpt(service: &ElfService, doc_id: Uuid, content_hash: &str) { - let excerpts = service - .docs_excerpts_get(DocsExcerptsGetRequest { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - agent_id: "reader".to_string(), - read_profile: "private_plus_project".to_string(), - doc_id, - level: "L1".to_string(), - chunk_id: None, - quote: Some(TextQuoteSelector { - exact: "Keyword: peregrine.".to_string(), - prefix: Some("evidence. 
".to_string()), - suffix: Some("\nSecond".to_string()), - }), - position: None, - explain: None, - }) - .await - .expect("Failed to get excerpt."); - - assert!(excerpts.verification.verified); - assert!(excerpts.excerpt.contains("Keyword: peregrine.")); - assert_eq!(excerpts.verification.content_hash, content_hash); -} - -async fn spawn_doc_worker(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { - let (api_base, shutdown) = start_embed_server().await; - let worker_state = WorkerState { - db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), - qdrant: QdrantStore::new(&service.cfg.storage.qdrant) - .expect("Failed to build Qdrant store."), - docs_qdrant: QdrantStore::new_with_collection( - &service.cfg.storage.qdrant, - &service.cfg.storage.qdrant.docs_collection, - ) - .expect("Failed to build docs Qdrant store."), - embedding: EmbeddingProviderConfig { - provider_id: "test".to_string(), - api_base, - api_key: "test-key".to_string(), - path: "/embeddings".to_string(), - model: "test".to_string(), - dimensions: 4_096, - timeout_ms: 1_000, - default_headers: Map::new(), - }, - chunking: ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, - tokenizer: build_test_tokenizer(), - }; - let handle = tokio::spawn(async move { - let _ = worker::run_worker(worker_state).await; - }); - - (handle, shutdown) -} - -async fn assert_docs_search_l0(service: &ElfService, doc_id: Uuid) { - let results = service - .docs_search_l0(DocsSearchL0Request { - tenant_id: "t".to_string(), - project_id: "p".to_string(), - caller_agent_id: "reader".to_string(), - scope: None, - status: None, - doc_type: None, - sparse_mode: None, - domain: None, - repo: None, - agent_id: None, - thread_id: None, - updated_after: None, - updated_before: None, - ts_gte: None, - ts_lte: None, - read_profile: "private_plus_project".to_string(), - query: "peregrine".to_string(), - top_k: Some(5), - candidate_k: Some(20), - explain: None, - }) - .await - .expect("Failed 
to search docs."); - - assert!(!results.items.is_empty()); - assert_eq!(results.items[0].doc_id, doc_id); - assert_eq!(results.items[0].doc_type, "knowledge"); - assert!(results.items[0].snippet.contains("peregrine")); -} diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs new file mode 100644 index 00000000..7d7c52fe --- /dev/null +++ b/packages/elf-service/tests/acceptance/docs_extension_v1/helpers.rs @@ -0,0 +1,664 @@ +use std::{ + collections::HashSet, + future::IntoFuture, + string::ToString, + sync::Arc, + time::{Duration, Instant}, +}; + +use ahash::AHashMap; +use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing}; +use qdrant_client::qdrant::{ + CreateFieldIndexCollection, FieldType, GetPointsBuilder, PayloadSchemaType, RetrievedPoint, + value, +}; +use serde_json::Map; +use sqlx::{FromRow, PgPool}; +use tokenizers::{Tokenizer, models::wordlevel::WordLevel}; +use tokio::{ + net::TcpListener, + sync::{oneshot, oneshot::Sender}, + task::JoinHandle, + time, +}; +use uuid::Uuid; + +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank, chunking::ChunkingConfig}; +use elf_config::EmbeddingProviderConfig; +use elf_service::{ + DocsExcerptsGetRequest, DocsGetRequest, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, + ElfService, Providers, TextQuoteSelector, docs::DocRetrievalTrajectory, +}; +use elf_storage::{db::Db, qdrant::QdrantStore}; +use elf_testkit::TestDatabase; +use elf_worker::worker::{self, WorkerState}; + +pub(crate) const TEST_CONTENT: &str = + "ELF docs extension v1 stores evidence. 
Keyword: peregrine.\nSecond sentence for chunking."; +pub(crate) const DOCS_SEARCH_FILTER_INDEXES: [(&str, PayloadSchemaType, FieldType); 9] = [ + ("scope", PayloadSchemaType::Keyword, FieldType::Keyword), + ("status", PayloadSchemaType::Keyword, FieldType::Keyword), + ("doc_type", PayloadSchemaType::Keyword, FieldType::Keyword), + ("agent_id", PayloadSchemaType::Keyword, FieldType::Keyword), + ("updated_at", PayloadSchemaType::Datetime, FieldType::Datetime), + ("doc_ts", PayloadSchemaType::Datetime, FieldType::Datetime), + ("thread_id", PayloadSchemaType::Keyword, FieldType::Keyword), + ("domain", PayloadSchemaType::Keyword, FieldType::Keyword), + ("repo", PayloadSchemaType::Keyword, FieldType::Keyword), +]; + +#[derive(FromRow)] +pub(crate) struct DocOutboxCounts { + total: i64, + done: i64, + failed: i64, +} + +#[derive(FromRow)] +pub(crate) struct NoteOutboxCounts { + total: i64, + done: i64, + failed: i64, +} + +pub(crate) struct DocsContext { + pub(crate) test_db: TestDatabase, + pub(crate) service: ElfService, +} + +pub(crate) fn build_test_tokenizer() -> Tokenizer { + let mut vocab = AHashMap::new(); + + vocab.insert("".to_string(), 0_u32); + + let model = WordLevel::builder() + .vocab(vocab) + .unk_token("".to_string()) + .build() + .expect("Failed to build test tokenizer."); + + Tokenizer::new(model) +} + +pub(crate) fn payload_string(payload_value: &qdrant_client::qdrant::Value) -> Option<&str> { + match payload_value.kind.as_ref() { + Some(value::Kind::StringValue(value)) => Some(value.as_str()), + _ => None, + } +} + +pub(crate) fn trajectory_stage_stats<'a>( + trajectory: &'a DocRetrievalTrajectory, + stage_name: &str, +) -> Option<&'a serde_json::Value> { + trajectory.stages.iter().find(|stage| stage.stage_name == stage_name).map(|stage| &stage.stats) +} + +pub(crate) async fn wait_for_doc_outbox_done( + pool: &PgPool, + doc_id: Uuid, + timeout: Duration, +) -> bool { + let deadline = Instant::now() + timeout; + + loop { + let row: Option = 
sqlx::query_as::<_, DocOutboxCounts>( + "\ +SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE status = 'DONE') AS done, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed +FROM doc_indexing_outbox +WHERE doc_id = $1", + ) + .bind(doc_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(row) = row.as_ref() + && row.total > 0 + && row.done == row.total + { + return true; + } + if let Some(row) = row.as_ref() + && row.failed > 0 + { + return false; + } + + if Instant::now() >= deadline { + return false; + } + + time::sleep(Duration::from_millis(200)).await; + } +} + +pub(crate) async fn wait_for_note_outbox_done( + pool: &PgPool, + note_id: Uuid, + timeout: Duration, +) -> bool { + let deadline = Instant::now() + timeout; + + loop { + let row: Option = sqlx::query_as::<_, NoteOutboxCounts>( + "\ +SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE status = 'DONE') AS done, + COUNT(*) FILTER (WHERE status = 'FAILED') AS failed +FROM indexing_outbox +WHERE note_id = $1", + ) + .bind(note_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(row) = row.as_ref() + && row.total > 0 + && row.done == row.total + { + return true; + } + if let Some(row) = row.as_ref() + && row.failed > 0 + { + return false; + } + + if Instant::now() >= deadline { + return false; + } + + time::sleep(Duration::from_millis(200)).await; + } +} + +pub(crate) async fn start_embed_server() -> (String, Sender<()>) { + let app = Router::new().route("/embeddings", routing::post(embed_handler)).with_state(()); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind embed server."); + let addr = listener.local_addr().expect("Failed to read embed server address."); + let (tx, rx) = oneshot::channel(); + let server = axum::serve(listener, app).with_graceful_shutdown(async move { + let _ = rx.await; + }); + + tokio::spawn(async move { + let _ = server.into_future().await; + }); + + (format!("http://{addr}"), tx) +} + +pub(crate) async 
fn embed_handler( + State(()): State<()>, + Json(payload): Json, +) -> impl IntoResponse { + let inputs = + payload.get("input").and_then(|value| value.as_array()).cloned().unwrap_or_default(); + let data: Vec<_> = inputs + .iter() + .enumerate() + .map(|(index, _)| { + let embedding: Vec = vec![0.1_f32; 4_096]; + + serde_json::json!({ + "index": index, + "embedding": embedding, + }) + }) + .collect(); + + (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response() +} + +pub(crate) async fn create_docs_search_filter_fixture( + ctx: DocsContext, +) -> (TestDatabase, ElfService, Uuid, Uuid, Uuid, JoinHandle<()>, Sender<()>) { + let DocsContext { test_db, service } = ctx; + let shared_knowledge_doc = put_test_doc_with( + &service, + "owner", + "project_shared", + None, + "Docs filter sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let older_shared_knowledge_doc = put_test_doc_with( + &service, + "owner", + "project_shared", + None, + "Docs old filter sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2025-01-01T10:00:00Z", + }), + TEST_CONTENT, + ) + .await; + let private_chat_doc = put_test_doc_with( + &service, + "assistant", + "agent_private", + Some("chat"), + "Docs chat sample", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "chat", + "ts": "2026-02-25T12:00:00Z", + "thread_id": "shared-chat-thread", + "role": "assistant" + }), + TEST_CONTENT, + ) + .await; + let (handle, shutdown) = spawn_doc_worker(&service).await; + + assert!( + wait_for_doc_outbox_done( + &service.db.pool, + shared_knowledge_doc.doc_id, + std::time::Duration::from_secs(15) + ) + .await, + "Expected shared docs outbox to reach DONE." 
+ ); + assert!( + wait_for_doc_outbox_done( + &service.db.pool, + older_shared_knowledge_doc.doc_id, + std::time::Duration::from_secs(15) + ) + .await, + "Expected older shared docs outbox to reach DONE." + ); + assert!( + wait_for_doc_outbox_done( + &service.db.pool, + private_chat_doc.doc_id, + std::time::Duration::from_secs(15) + ) + .await, + "Expected private docs outbox to reach DONE." + ); + + ( + test_db, + service, + shared_knowledge_doc.doc_id, + older_shared_knowledge_doc.doc_id, + private_chat_doc.doc_id, + handle, + shutdown, + ) +} + +pub(crate) async fn cleanup_docs_filter_fixture( + test_db: TestDatabase, + _handle: JoinHandle<()>, + shutdown: Sender<()>, +) { + let _ = shutdown.send(()); + + _handle.abort(); + + let _ = _handle.await; + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +pub(crate) async fn setup_docs_context() -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping docs_extension_v1; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!( + "Skipping docs_extension_v1; set ELF_QDRANT_URL (or ELF_QDRANT_GRPC_URL) to run this test." 
+ ); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { + calls: Arc::new(Default::default()), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + acceptance::reset_qdrant_collection( + &service.qdrant.client, + &service.qdrant.collection, + service.qdrant.vector_dim, + ) + .await + .expect("Failed to reset Qdrant memory collection."); + acceptance::reset_qdrant_collection( + &service.qdrant.client, + &service.cfg.storage.qdrant.docs_collection, + service.qdrant.vector_dim, + ) + .await + .expect("Failed to reset Qdrant docs collection."); + + Some(DocsContext { test_db, service }) +} + +pub(crate) async fn fetch_first_doc_chunk_id(db: &ElfService, doc_id: Uuid) -> Option { + sqlx::query_scalar::<_, Uuid>( + "SELECT chunk_id FROM doc_chunks WHERE doc_id = $1 ORDER BY chunk_index LIMIT 1", + ) + .bind(doc_id) + .fetch_optional(&db.db.pool) + .await + .expect("Failed to fetch doc chunk id.") +} + +pub(crate) async fn fetch_first_doc_chunk_point( + service: &ElfService, + doc_id: Uuid, +) -> Option { + let chunk_id = fetch_first_doc_chunk_id(service, doc_id).await?; + let response = service + .qdrant + .client + .get_points( + GetPointsBuilder::new( + service.cfg.storage.qdrant.docs_collection.clone(), + vec![chunk_id.to_string().into()], + ) + .with_payload(true), + ) + .await + .expect("Failed to fetch doc chunk point from Qdrant."); + + response.result.into_iter().next() +} + +pub(crate) async fn 
put_test_doc(service: &ElfService) -> DocsPutResponse { + put_test_doc_with( + service, + "owner", + "project_shared", + None, + "Docs v1", + serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "ts": "2026-02-25T12:00:00Z", + "uri": "acceptance://knowledge/v1" + }), + TEST_CONTENT, + ) + .await +} + +pub(crate) async fn put_test_doc_with( + service: &ElfService, + agent_id: &str, + scope: &str, + doc_type: Option<&str>, + title: &str, + source_ref: serde_json::Value, + content: &str, +) -> DocsPutResponse { + service + .docs_put(DocsPutRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: agent_id.to_string(), + scope: scope.to_string(), + doc_type: doc_type.map(ToString::to_string), + title: Some(title.to_string()), + write_policy: None, + source_ref, + content: content.to_string(), + }) + .await + .expect("Failed to put doc.") +} + +pub(crate) async fn search_doc_ids_with_filters( + service: &ElfService, + scope: Option<&str>, + doc_type: Option<&str>, + agent_id: Option<&str>, + updated_after: Option<&str>, + updated_before: Option<&str>, + caller_agent_id: &str, +) -> HashSet { + let results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: caller_agent_id.to_string(), + scope: scope.map(str::to_string), + status: None, + doc_type: doc_type.map(str::to_string), + sparse_mode: None, + domain: None, + repo: None, + agent_id: agent_id.map(str::to_string), + thread_id: None, + updated_after: updated_after.map(str::to_string), + updated_before: updated_before.map(str::to_string), + ts_gte: None, + ts_lte: None, + read_profile: "all_scopes".to_string(), + query: "peregrine".to_string(), + top_k: Some(20), + candidate_k: Some(50), + explain: None, + }) + .await + .expect("Failed to search docs."); + + results.items.into_iter().map(|item| item.doc_id).collect() +} + +pub(crate) async fn verify_docs_qdrant_filter_indexes(service: 
&ElfService) { + let mut payload_schema = service + .qdrant + .client + .collection_info(&service.cfg.storage.qdrant.docs_collection) + .await + .expect("Failed to fetch Qdrant docs collection info.") + .result + .expect("Qdrant collection info is missing.") + .payload_schema; + + for (field_name, payload_type, index_type) in DOCS_SEARCH_FILTER_INDEXES { + let missing_or_wrong = match payload_schema.get(field_name) { + Some(schema) => schema.data_type != payload_type as i32, + None => true, + }; + + if missing_or_wrong { + let request = CreateFieldIndexCollection { + collection_name: service.cfg.storage.qdrant.docs_collection.clone(), + wait: Some(true), + field_name: field_name.to_string(), + field_type: Some(index_type as i32), + field_index_params: None, + ordering: None, + timeout: None, + }; + + service + .qdrant + .client + .create_field_index(request) + .await + .expect("Failed to create required Qdrant payload index."); + } + } + + payload_schema = service + .qdrant + .client + .collection_info(&service.cfg.storage.qdrant.docs_collection) + .await + .expect("Failed to fetch Qdrant docs collection info.") + .result + .expect("Qdrant collection info is missing.") + .payload_schema; + + for (field_name, payload_type, _) in DOCS_SEARCH_FILTER_INDEXES { + let schema = payload_schema.get(field_name).expect("Expected required payload field."); + + assert_eq!( + schema.data_type, payload_type as i32, + "Unexpected payload type for {field_name}." 
+ ); + } +} + +pub(crate) async fn assert_doc_get(service: &ElfService, doc_id: Uuid) { + let get_as_owner = service + .docs_get(DocsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "owner".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id, + }) + .await + .expect("Failed to get doc as owner."); + + assert_eq!(get_as_owner.scope, "project_shared"); + assert_eq!(get_as_owner.doc_type, "knowledge"); + assert_eq!(get_as_owner.agent_id, "owner"); + assert_eq!(get_as_owner.title.as_deref(), Some("Docs v1")); + + let get_as_reader = service + .docs_get(DocsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "reader".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id, + }) + .await + .expect("Failed to get doc as reader (expected project grant)."); + + assert_eq!(get_as_reader.doc_id, doc_id); +} + +pub(crate) async fn assert_doc_excerpt(service: &ElfService, doc_id: Uuid, content_hash: &str) { + let excerpts = service + .docs_excerpts_get(DocsExcerptsGetRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "reader".to_string(), + read_profile: "private_plus_project".to_string(), + doc_id, + level: "L1".to_string(), + chunk_id: None, + quote: Some(TextQuoteSelector { + exact: "Keyword: peregrine.".to_string(), + prefix: Some("evidence. 
".to_string()), + suffix: Some("\nSecond".to_string()), + }), + position: None, + explain: None, + }) + .await + .expect("Failed to get excerpt."); + + assert!(excerpts.verification.verified); + assert!(excerpts.excerpt.contains("Keyword: peregrine.")); + assert_eq!(excerpts.verification.content_hash, content_hash); +} + +pub(crate) async fn spawn_doc_worker(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { + let (api_base, shutdown) = start_embed_server().await; + let worker_state = WorkerState { + db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), + qdrant: QdrantStore::new(&service.cfg.storage.qdrant) + .expect("Failed to build Qdrant store."), + docs_qdrant: QdrantStore::new_with_collection( + &service.cfg.storage.qdrant, + &service.cfg.storage.qdrant.docs_collection, + ) + .expect("Failed to build docs Qdrant store."), + embedding: EmbeddingProviderConfig { + provider_id: "test".to_string(), + api_base, + api_key: "test-key".to_string(), + path: "/embeddings".to_string(), + model: "test".to_string(), + dimensions: 4_096, + timeout_ms: 1_000, + default_headers: Map::new(), + }, + chunking: ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, + tokenizer: build_test_tokenizer(), + }; + let handle = tokio::spawn(async move { + let _ = worker::run_worker(worker_state).await; + }); + + (handle, shutdown) +} + +pub(crate) async fn assert_docs_search_l0(service: &ElfService, doc_id: Uuid) { + let results = service + .docs_search_l0(DocsSearchL0Request { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + caller_agent_id: "reader".to_string(), + scope: None, + status: None, + doc_type: None, + sparse_mode: None, + domain: None, + repo: None, + agent_id: None, + thread_id: None, + updated_after: None, + updated_before: None, + ts_gte: None, + ts_lte: None, + read_profile: "private_plus_project".to_string(), + query: "peregrine".to_string(), + top_k: Some(5), + candidate_k: Some(20), + explain: None, + }) + 
.await + .expect("Failed to search docs."); + + assert!(!results.items.is_empty()); + assert_eq!(results.items[0].doc_id, doc_id); + assert_eq!(results.items[0].doc_type, "knowledge"); + assert!(results.items[0].snippet.contains("peregrine")); +} diff --git a/packages/elf-service/tests/acceptance/knowledge_pages.rs b/packages/elf-service/tests/acceptance/knowledge_pages.rs index 81dd0c5c..c8ee518c 100644 --- a/packages/elf-service/tests/acceptance/knowledge_pages.rs +++ b/packages/elf-service/tests/acceptance/knowledge_pages.rs @@ -1,457 +1,14 @@ -use std::sync::{Arc, atomic::AtomicUsize}; +mod helpers; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; -use elf_domain::knowledge::KnowledgePageKind; -use elf_service::{ - AddNoteInput, AddNoteRequest, ElfService, KnowledgePageLintRequest, - KnowledgePageRebuildRequest, KnowledgePageRebuildResponse, KnowledgePageSearchRequest, - Providers, +pub(crate) use helpers::{ + AGENT_ID, PROJECT_ID, TENANT_ID, assert_first_rebuild, insert_rebuild_sources, + knowledge_foundation_request, setup_service, }; -use elf_testkit::TestDatabase; - -const TENANT_ID: &str = "tenant_knowledge"; -const PROJECT_ID: &str = "project_knowledge"; -const AGENT_ID: &str = "agent_knowledge"; - -struct KnowledgeFixture { - service: ElfService, - _test_db: TestDatabase, -} - -#[derive(Clone, Copy)] -struct KnowledgeSourceIds { - note_id: Uuid, - event_id: Uuid, - doc_id: Uuid, - chunk_id: Uuid, - fact_id: Uuid, - proposal_id: Uuid, -} - -fn knowledge_foundation_request(ids: KnowledgeSourceIds) -> KnowledgePageRebuildRequest { - KnowledgePageRebuildRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - page_kind: KnowledgePageKind::Project, - page_key: "knowledge-foundation".to_string(), - title: Some("Knowledge Foundation".to_string()), - doc_ids: vec![ids.doc_id], - doc_chunk_ids: vec![ids.chunk_id], - note_ids: 
vec![ids.note_id], - event_ids: vec![ids.event_id], - relation_ids: vec![ids.fact_id], - proposal_ids: vec![ids.proposal_id], - provider_metadata: serde_json::json!({}), - } -} - -fn assert_first_rebuild(first: &KnowledgePageRebuildResponse) { - assert_eq!(first.page.sections.len(), 6); - assert_eq!(first.page.source_refs.len(), 6); - assert!(first.page.sections.iter().all(|section| { - section.citations.as_array().is_some_and(|citations| !citations.is_empty()) - })); - assert!(first.page.source_refs.iter().any(|source_ref| source_ref.source_kind == "doc")); - assert!(first.page.source_refs.iter().any(|source_ref| source_ref.source_kind == "doc_chunk")); - assert_eq!(first.page.page.source_coverage["coverage_complete"], true); - assert_eq!(first.page.page.rebuild_metadata["deterministic"], true); - assert_eq!( - first.page.page.rebuild_metadata["generated_by"]["runtime"], - "ElfService::knowledge_page_rebuild" - ); - assert_eq!( - first.page.page.rebuild_metadata["memory_candidate_policy"]["direct_memory_ledger_mutation_allowed"], - false - ); - assert_eq!( - first.page.page.rebuild_metadata["version_identity"]["schema"], - "elf.knowledge_page.version_identity/v1" - ); - assert_eq!( - first - .page - .page - .previous_version_diff - .as_ref() - .expect("initial rebuild should expose no-previous diff")["available"], - false - ); -} - -async fn setup_service(test_name: &str) -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); - - return None; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); - - return None; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - let extractor = 
SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: serde_json::json!({ "notes": [] }), - }; - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(extractor), - ); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - - Some(KnowledgeFixture { service, _test_db: test_db }) -} - -async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { - let response = service - .add_note(AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: "agent_private".to_string(), - notes: vec![AddNoteInput { - r#type: "fact".to_string(), - key: Some(key.to_string()), - text: text.to_string(), - structured: None, - importance: 0.7, - confidence: 0.9, - ttl_days: None, - source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), - write_policy: None, - }], - }) - .await - .expect("add_note should persist source note"); - - response.results[0].note_id.expect("source note id should be present") -} - -async fn insert_event_audit(service: &ElfService, note_id: Uuid) -> Uuid { - let decision_id = Uuid::new_v4(); - - sqlx::query( - "\ -INSERT INTO memory_ingest_decisions ( - decision_id, - tenant_id, - project_id, - agent_id, - scope, - pipeline, - note_type, - note_key, - note_id, - base_decision, - policy_decision, - note_op, - reason_code, - details, - ts -) -VALUES ($1,$2,$3,$4,'agent_private','add_event','fact','knowledge_event',$5,'remember','remember','ADD',NULL,$6,$7)", - ) - .bind(decision_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(note_id) - .bind(serde_json::json!({ "fixture": "knowledge_page_event_audit" })) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("event audit should be inserted"); - - 
decision_id -} - -async fn insert_source_document(service: &ElfService) -> (Uuid, Uuid) { - let doc_id = Uuid::new_v4(); - let chunk_id = Uuid::new_v4(); - let content = "The Knowledge Workspace compiles Source Library spans into cited derived pages."; - let content_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); - let chunk_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); - let source_ref = serde_json::json!({ - "schema": "doc_source_ref/v1", - "doc_type": "knowledge", - "uri": "docs://knowledge/workspace/source-span-fixture", - "source_record_id": doc_id, - "content_hash": content_hash, - "source_spans": [ - { - "schema": "doc_source_span/v1", - "span_id": Uuid::new_v4(), - "chunk_id": chunk_id, - "status": "captured", - "start_offset": 0, - "end_offset": content.len(), - "content_hash": content_hash, - "chunk_hash": chunk_hash - } - ] - }); - - sqlx::query( - "\ -INSERT INTO doc_documents ( - doc_id, - tenant_id, - project_id, - agent_id, - scope, - doc_type, - status, - title, - source_ref, - content, - content_bytes, - content_hash, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,'project_shared','knowledge','active','Knowledge Workspace Source',$5,$6,$7,$8,$9,$9)", - ) - .bind(doc_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(source_ref) - .bind(content) - .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) - .bind(content_hash) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("source document should be inserted"); - sqlx::query( - "\ -INSERT INTO doc_chunks ( - chunk_id, - doc_id, - chunk_index, - start_offset, - end_offset, - chunk_text, - chunk_hash, - created_at -) -VALUES ($1,$2,0,0,$3,$4,$5,$6)", - ) - .bind(chunk_id) - .bind(doc_id) - .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) - .bind(content) - .bind(chunk_hash) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - 
.expect("source document chunk should be inserted"); - - (doc_id, chunk_id) -} - -async fn insert_relation(service: &ElfService, note_id: Uuid) -> Uuid { - let subject_id = Uuid::new_v4(); - let fact_id = Uuid::new_v4(); - let evidence_id = Uuid::new_v4(); - - sqlx::query( - "\ -INSERT INTO graph_entities ( - entity_id, - tenant_id, - project_id, - canonical, - canonical_norm, - kind, - created_at, - updated_at -) -VALUES ($1,$2,$3,'ELF knowledge pages','elf knowledge pages','concept',$4,$4)", - ) - .bind(subject_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("graph entity should be inserted"); - sqlx::query( - "\ -INSERT INTO graph_facts ( - fact_id, - tenant_id, - project_id, - agent_id, - scope, - subject_entity_id, - predicate, - predicate_id, - object_entity_id, - object_value, - valid_from, - valid_to, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,'project_shared',$5,'compile from',NULL,NULL,'authoritative source memory',$6,NULL,$6,$6)", - ) - .bind(fact_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(subject_id) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("graph fact should be inserted"); - sqlx::query( - "\ -INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) -VALUES ($1,$2,$3,$4)", - ) - .bind(evidence_id) - .bind(fact_id) - .bind(note_id) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("graph fact evidence should be inserted"); - - fact_id -} - -async fn insert_applied_proposal(service: &ElfService, note_id: Uuid) -> Uuid { - let run_id = Uuid::new_v4(); - let proposal_id = Uuid::new_v4(); - let source_refs = serde_json::json!([ - { - "kind": "note", - "id": note_id, - "snapshot": { - "status": "active", - "updated_at": "1970-01-01T00:00:00Z", - "metadata": { "fixture": "knowledge_pages" }, - "source_ref": {} - } - } - ]); - let lineage = 
serde_json::json!({ "source_refs": source_refs }); - - sqlx::query( - "\ -INSERT INTO consolidation_runs ( - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - job_kind, - status, - input_refs, - source_snapshot, - lineage, - error, - created_at, - updated_at, - completed_at -) -VALUES ($1,$2,$3,$4,'elf.consolidation/v1','manual','completed',$5,$6,$7,'{}'::jsonb,$8,$8,$8)", - ) - .bind(run_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(&source_refs) - .bind(serde_json::json!({ "source_count": 1 })) - .bind(&lineage) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("consolidation run should be inserted"); - sqlx::query( - "\ -INSERT INTO consolidation_proposals ( - proposal_id, - run_id, - tenant_id, - project_id, - agent_id, - contract_schema, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - unsupported_claim_flags, - contradiction_markers, - staleness_markers, - target_ref, - proposed_payload, - reviewer_agent_id, - review_comment, - reviewed_at, - created_at, - updated_at -) -VALUES ($1,$2,$3,$4,$5,'elf.consolidation/v1','knowledge_page','create_derived_knowledge_page','applied',$6,$7,$8,$9,0.9,'[]'::jsonb,'[]'::jsonb,'[]'::jsonb,'{}'::jsonb,$10,$5,'Apply derived page proposal.',$11,$11,$11)", - ) - .bind(proposal_id) - .bind(run_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(AGENT_ID) - .bind(&source_refs) - .bind(serde_json::json!({ "source_count": 1 })) - .bind(&lineage) - .bind(serde_json::json!({ - "summary": "Create a derived knowledge page from cited source memory.", - "before": {}, - "after": { "page_key": "knowledge-foundation" } - })) - .bind(serde_json::json!({ "page_key": "knowledge-foundation" })) - .bind(OffsetDateTime::UNIX_EPOCH) - .execute(&service.db.pool) - .await - .expect("consolidation proposal should be inserted"); - proposal_id -} - -async fn insert_rebuild_sources(service: &ElfService) -> 
KnowledgeSourceIds { - let note_id = insert_source_note( - service, - "knowledge_pages_foundation", - "Fact: Derived knowledge pages are rebuilt from authoritative source memory and keep citations.", - ) - .await; - let event_id = insert_event_audit(service, note_id).await; - let (doc_id, chunk_id) = insert_source_document(service).await; - let fact_id = insert_relation(service, note_id).await; - let proposal_id = insert_applied_proposal(service, note_id).await; +use time::OffsetDateTime; - KnowledgeSourceIds { note_id, event_id, doc_id, chunk_id, fact_id, proposal_id } -} +use elf_domain::knowledge::KnowledgePageKind; +use elf_service::{KnowledgePageLintRequest, KnowledgePageSearchRequest}; #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run this test."] diff --git a/packages/elf-service/tests/acceptance/knowledge_pages/helpers.rs b/packages/elf-service/tests/acceptance/knowledge_pages/helpers.rs new file mode 100644 index 00000000..c86345ad --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages/helpers.rs @@ -0,0 +1,453 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use elf_domain::knowledge::KnowledgePageKind; +use elf_service::{ + AddNoteInput, AddNoteRequest, ElfService, KnowledgePageRebuildRequest, + KnowledgePageRebuildResponse, Providers, +}; +use elf_testkit::TestDatabase; + +pub(crate) const TENANT_ID: &str = "tenant_knowledge"; +pub(crate) const PROJECT_ID: &str = "project_knowledge"; +pub(crate) const AGENT_ID: &str = "agent_knowledge"; + +pub(crate) struct KnowledgeFixture { + pub(crate) service: ElfService, + pub(crate) _test_db: TestDatabase, +} + +#[derive(Clone, Copy)] +pub(crate) struct KnowledgeSourceIds { + pub(crate) note_id: Uuid, + pub(crate) event_id: Uuid, + pub(crate) doc_id: Uuid, + pub(crate) chunk_id: Uuid, + pub(crate) fact_id: Uuid, + 
pub(crate) proposal_id: Uuid, +} + +pub(crate) fn knowledge_foundation_request(ids: KnowledgeSourceIds) -> KnowledgePageRebuildRequest { + KnowledgePageRebuildRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + page_kind: KnowledgePageKind::Project, + page_key: "knowledge-foundation".to_string(), + title: Some("Knowledge Foundation".to_string()), + doc_ids: vec![ids.doc_id], + doc_chunk_ids: vec![ids.chunk_id], + note_ids: vec![ids.note_id], + event_ids: vec![ids.event_id], + relation_ids: vec![ids.fact_id], + proposal_ids: vec![ids.proposal_id], + provider_metadata: serde_json::json!({}), + } +} + +pub(crate) fn assert_first_rebuild(first: &KnowledgePageRebuildResponse) { + assert_eq!(first.page.sections.len(), 6); + assert_eq!(first.page.source_refs.len(), 6); + assert!(first.page.sections.iter().all(|section| { + section.citations.as_array().is_some_and(|citations| !citations.is_empty()) + })); + assert!(first.page.source_refs.iter().any(|source_ref| source_ref.source_kind == "doc")); + assert!(first.page.source_refs.iter().any(|source_ref| source_ref.source_kind == "doc_chunk")); + assert_eq!(first.page.page.source_coverage["coverage_complete"], true); + assert_eq!(first.page.page.rebuild_metadata["deterministic"], true); + assert_eq!( + first.page.page.rebuild_metadata["generated_by"]["runtime"], + "ElfService::knowledge_page_rebuild" + ); + assert_eq!( + first.page.page.rebuild_metadata["memory_candidate_policy"]["direct_memory_ledger_mutation_allowed"], + false + ); + assert_eq!( + first.page.page.rebuild_metadata["version_identity"]["schema"], + "elf.knowledge_page.version_identity/v1" + ); + assert_eq!( + first + .page + .page + .previous_version_diff + .as_ref() + .expect("initial rebuild should expose no-previous diff")["available"], + false + ); +} + +pub(crate) async fn setup_service(test_name: &str) -> Option { + let Some(test_db) = acceptance::test_db().await else { + 
eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let extractor = SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }; + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(extractor), + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + Some(KnowledgeFixture { service, _test_db: test_db }) +} + +pub(crate) async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(key.to_string()), + text: text.to_string(), + structured: None, + importance: 0.7, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), + write_policy: None, + }], + }) + .await + .expect("add_note should persist source note"); + + response.results[0].note_id.expect("source note id should be present") +} + +pub(crate) async fn insert_event_audit(service: &ElfService, note_id: Uuid) -> Uuid { + let decision_id = Uuid::new_v4(); + + sqlx::query( + "\ +INSERT INTO memory_ingest_decisions ( + decision_id, + tenant_id, + project_id, + agent_id, 
+ scope, + pipeline, + note_type, + note_key, + note_id, + base_decision, + policy_decision, + note_op, + reason_code, + details, + ts +) +VALUES ($1,$2,$3,$4,'agent_private','add_event','fact','knowledge_event',$5,'remember','remember','ADD',NULL,$6,$7)", + ) + .bind(decision_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(note_id) + .bind(serde_json::json!({ "fixture": "knowledge_page_event_audit" })) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("event audit should be inserted"); + + decision_id +} + +pub(crate) async fn insert_source_document(service: &ElfService) -> (Uuid, Uuid) { + let doc_id = Uuid::new_v4(); + let chunk_id = Uuid::new_v4(); + let content = "The Knowledge Workspace compiles Source Library spans into cited derived pages."; + let content_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); + let chunk_hash = blake3::hash(content.as_bytes()).to_hex().to_string(); + let source_ref = serde_json::json!({ + "schema": "doc_source_ref/v1", + "doc_type": "knowledge", + "uri": "docs://knowledge/workspace/source-span-fixture", + "source_record_id": doc_id, + "content_hash": content_hash, + "source_spans": [ + { + "schema": "doc_source_span/v1", + "span_id": Uuid::new_v4(), + "chunk_id": chunk_id, + "status": "captured", + "start_offset": 0, + "end_offset": content.len(), + "content_hash": content_hash, + "chunk_hash": chunk_hash + } + ] + }); + + sqlx::query( + "\ +INSERT INTO doc_documents ( + doc_id, + tenant_id, + project_id, + agent_id, + scope, + doc_type, + status, + title, + source_ref, + content, + content_bytes, + content_hash, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,'project_shared','knowledge','active','Knowledge Workspace Source',$5,$6,$7,$8,$9,$9)", + ) + .bind(doc_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(source_ref) + .bind(content) + .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) + 
.bind(content_hash) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("source document should be inserted"); + sqlx::query( + "\ +INSERT INTO doc_chunks ( + chunk_id, + doc_id, + chunk_index, + start_offset, + end_offset, + chunk_text, + chunk_hash, + created_at +) +VALUES ($1,$2,0,0,$3,$4,$5,$6)", + ) + .bind(chunk_id) + .bind(doc_id) + .bind(i32::try_from(content.len()).expect("fixture content length should fit i32")) + .bind(content) + .bind(chunk_hash) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("source document chunk should be inserted"); + + (doc_id, chunk_id) +} + +pub(crate) async fn insert_relation(service: &ElfService, note_id: Uuid) -> Uuid { + let subject_id = Uuid::new_v4(); + let fact_id = Uuid::new_v4(); + let evidence_id = Uuid::new_v4(); + + sqlx::query( + "\ +INSERT INTO graph_entities ( + entity_id, + tenant_id, + project_id, + canonical, + canonical_norm, + kind, + created_at, + updated_at +) +VALUES ($1,$2,$3,'ELF knowledge pages','elf knowledge pages','concept',$4,$4)", + ) + .bind(subject_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph entity should be inserted"); + sqlx::query( + "\ +INSERT INTO graph_facts ( + fact_id, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, + predicate_id, + object_entity_id, + object_value, + valid_from, + valid_to, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,'project_shared',$5,'compile from',NULL,NULL,'authoritative source memory',$6,NULL,$6,$6)", + ) + .bind(fact_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(subject_id) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph fact should be inserted"); + sqlx::query( + "\ +INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) +VALUES ($1,$2,$3,$4)", + ) + .bind(evidence_id) + 
.bind(fact_id) + .bind(note_id) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph fact evidence should be inserted"); + + fact_id +} + +pub(crate) async fn insert_applied_proposal(service: &ElfService, note_id: Uuid) -> Uuid { + let run_id = Uuid::new_v4(); + let proposal_id = Uuid::new_v4(); + let source_refs = serde_json::json!([ + { + "kind": "note", + "id": note_id, + "snapshot": { + "status": "active", + "updated_at": "1970-01-01T00:00:00Z", + "metadata": { "fixture": "knowledge_pages" }, + "source_ref": {} + } + } + ]); + let lineage = serde_json::json!({ "source_refs": source_refs }); + + sqlx::query( + "\ +INSERT INTO consolidation_runs ( + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + error, + created_at, + updated_at, + completed_at +) +VALUES ($1,$2,$3,$4,'elf.consolidation/v1','manual','completed',$5,$6,$7,'{}'::jsonb,$8,$8,$8)", + ) + .bind(run_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(&source_refs) + .bind(serde_json::json!({ "source_count": 1 })) + .bind(&lineage) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("consolidation run should be inserted"); + sqlx::query( + "\ +INSERT INTO consolidation_proposals ( + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + unsupported_claim_flags, + contradiction_markers, + staleness_markers, + target_ref, + proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,'elf.consolidation/v1','knowledge_page','create_derived_knowledge_page','applied',$6,$7,$8,$9,0.9,'[]'::jsonb,'[]'::jsonb,'[]'::jsonb,'{}'::jsonb,$10,$5,'Apply derived page proposal.',$11,$11,$11)", + ) + .bind(proposal_id) + .bind(run_id) + 
.bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(&source_refs) + .bind(serde_json::json!({ "source_count": 1 })) + .bind(&lineage) + .bind(serde_json::json!({ + "summary": "Create a derived knowledge page from cited source memory.", + "before": {}, + "after": { "page_key": "knowledge-foundation" } + })) + .bind(serde_json::json!({ "page_key": "knowledge-foundation" })) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("consolidation proposal should be inserted"); + + proposal_id +} + +pub(crate) async fn insert_rebuild_sources(service: &ElfService) -> KnowledgeSourceIds { + let note_id = insert_source_note( + service, + "knowledge_pages_foundation", + "Fact: Derived knowledge pages are rebuilt from authoritative source memory and keep citations.", + ) + .await; + let event_id = insert_event_audit(service, note_id).await; + let (doc_id, chunk_id) = insert_source_document(service).await; + let fact_id = insert_relation(service, note_id).await; + let proposal_id = insert_applied_proposal(service, note_id).await; + + KnowledgeSourceIds { note_id, event_id, doc_id, chunk_id, fact_id, proposal_id } +} diff --git a/packages/elf-service/tests/acceptance/trace_admin_observability.rs b/packages/elf-service/tests/acceptance/trace_admin_observability.rs index 30453fe9..34591da8 100644 --- a/packages/elf-service/tests/acceptance/trace_admin_observability.rs +++ b/packages/elf-service/tests/acceptance/trace_admin_observability.rs @@ -1,430 +1,16 @@ -use std::sync::{Arc, atomic::AtomicUsize}; +mod helpers; -use serde_json::Value; -use sqlx::PgPool; -use time::{Duration, OffsetDateTime}; -use uuid::Uuid; - -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; -use elf_service::{ - ElfService, Providers, SearchExplainRequest, TraceBundleGetRequest, TraceGetRequest, - TraceRecentListRequest, TraceRecentListResponse, TraceTrajectoryGetRequest, - search::{TraceBundleMode, TraceReplayCandidate}, +pub(crate) use 
helpers::{ + PROJECT_ID, TENANT_ID, TraceAdminObservabilityFixture, VisibilityTraceFixtureIds, + assert_trace_admin_visibility_cross_scope, insert_trace, insert_trace_candidate, + insert_trace_item, insert_trace_stage, insert_trace_stage_item, + seed_visibility_and_recent_list_traces, setup_service, trace_recent_list_page, }; -use elf_testkit::TestDatabase; - -const TENANT_ID: &str = "tenant_admin_scope"; -const PROJECT_ID: &str = "project_admin_scope"; -const TRACE_VERSION: i32 = 3; - -struct TraceAdminObservabilityFixture { - service: ElfService, - test_db: TestDatabase, -} - -struct VisibilityTraceFixtureIds { - trace_one: Uuid, - trace_two: Uuid, - trace_three: Uuid, - item_two: Uuid, -} - -async fn setup_service(test_name: &str) -> Option { - let Some(test_db) = acceptance::test_db().await else { - eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); - - return None; - }; - let Some(qdrant_url) = acceptance::test_qdrant_url() else { - eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); - - return None; - }; - let collection = test_db.collection_name("elf_acceptance"); - let docs_collection = test_db.collection_name("elf_acceptance_docs"); - let cfg = acceptance::test_config( - test_db.dsn().to_string(), - qdrant_url, - 4_096, - collection, - docs_collection, - ); - let extractor = SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: serde_json::json!({ "notes": [] }), - }; - let providers = Providers::new( - Arc::new(StubEmbedding { vector_dim: 4_096 }), - Arc::new(StubRerank), - Arc::new(extractor), - ); - let service = - acceptance::build_service(cfg, providers).await.expect("Failed to build service."); - - acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); - - Some(TraceAdminObservabilityFixture { service, test_db }) -} - -async fn insert_trace( - executor: &PgPool, - trace_id: Uuid, - agent_id: &str, - read_profile: &str, - query: &str, - created_at: OffsetDateTime, -) { 
- sqlx::query( - "\ -INSERT INTO search_traces ( - trace_id, - tenant_id, - project_id, - agent_id, - read_profile, - query, - expansion_mode, - expanded_queries, - allowed_scopes, - candidate_count, - top_k, - config_snapshot, - trace_version, - created_at, - expires_at -) - VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9, - $10, - $11, - $12, - $13, - $14, - $15 - )", - ) - .bind(trace_id) - .bind(TENANT_ID) - .bind(PROJECT_ID) - .bind(agent_id) - .bind(read_profile) - .bind(query) - .bind("full") - .bind(serde_json::json!([query])) - .bind(serde_json::json!(["agent_private", "project_shared", "org_shared"])) - .bind(10_i32) - .bind(5_i32) - .bind(serde_json::json!({ "test": true })) - .bind(TRACE_VERSION) - .bind(created_at) - .bind(created_at + Duration::minutes(60)) - .execute(executor) - .await - .expect("Failed to insert trace."); -} - -async fn insert_trace_item( - executor: &PgPool, - item_id: Uuid, - trace_id: Uuid, - note_id: Uuid, - chunk_id: Uuid, - rank: i32, -) { - sqlx::query( - "\ -INSERT INTO search_trace_items ( - item_id, - trace_id, - note_id, - chunk_id, - rank, - final_score, - explain -) -VALUES ($1, $2, $3, $4, $5, $6, $7)", - ) - .bind(item_id) - .bind(trace_id) - .bind(note_id) - .bind(chunk_id) - .bind(rank) - .bind(1.0_f32) - .bind(serde_json::json!({ - "match": { "matched_terms": [], "matched_fields": [] }, - "ranking": { - "schema": "search_ranking_explain/v2", - "policy_id": "ranking_v2:test", - "final_score": 1.0, - "terms": [] - } - })) - .execute(executor) - .await - .expect("Failed to insert trace item."); -} - -async fn insert_trace_stage( - executor: &PgPool, - stage_id: Uuid, - trace_id: Uuid, - stage_order: i32, - stage_name: &str, - created_at: OffsetDateTime, -) { - sqlx::query( - "\ -INSERT INTO search_trace_stages ( - stage_id, - trace_id, - stage_order, - stage_name, - stage_payload, - created_at -) -VALUES ($1, $2, $3, $4, $5, $6)", - ) - .bind(stage_id) - .bind(trace_id) - .bind(stage_order) - 
.bind(stage_name) - .bind(serde_json::json!({ - "stage_name": stage_name, - "metrics": { "items": 0 } - })) - .bind(created_at) - .execute(executor) - .await - .expect("Failed to insert trace stage."); -} - -async fn insert_trace_stage_item( - executor: &PgPool, - item_id: Uuid, - stage_id: Uuid, - note_id: Uuid, - chunk_id: Uuid, - metrics: Value, -) { - sqlx::query( - "\ -INSERT INTO search_trace_stage_items ( - id, - stage_id, - item_id, - note_id, - chunk_id, - metrics -) -VALUES ($1, $2, $3, $4, $5, $6)", - ) - .bind(Uuid::new_v4()) - .bind(stage_id) - .bind(item_id) - .bind(note_id) - .bind(chunk_id) - .bind(metrics) - .execute(executor) - .await - .expect("Failed to insert trace stage item."); -} - -#[allow(clippy::too_many_arguments)] -async fn insert_trace_candidate( - executor: &PgPool, - candidate_id: Uuid, - trace_id: Uuid, - note_id: Uuid, - chunk_id: Uuid, - rank: i32, - retrieval_rank: i32, - retrieval_score: f32, - created_at: OffsetDateTime, -) { - sqlx::query( - "\ -INSERT INTO search_trace_candidates ( - candidate_id, - trace_id, - note_id, - chunk_id, - chunk_index, - snippet, - candidate_snapshot, - retrieval_rank, - rerank_score, - note_scope, - note_importance, - note_updated_at, - note_hit_count, - note_last_hit_at, - created_at, - expires_at -) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", - ) - .bind(candidate_id) - .bind(trace_id) - .bind(note_id) - .bind(chunk_id) - .bind(rank) - .bind("trace candidate snippet") - .bind({ - let candidate_snapshot = TraceReplayCandidate { - note_id, - chunk_id, - chunk_index: rank, - snippet: "trace candidate snippet".to_string(), - retrieval_rank: retrieval_rank as u32, - retrieval_score: Some(retrieval_score), - rerank_score: retrieval_score, - note_scope: "agent_private".to_string(), - note_importance: 0.6, - note_updated_at: created_at, - note_hit_count: 12, - note_last_hit_at: None, - diversity_selected: None, - diversity_selected_rank: None, - 
diversity_selected_reason: None, - diversity_skipped_reason: None, - diversity_nearest_selected_note_id: None, - diversity_similarity: None, - diversity_mmr_score: None, - diversity_missing_embedding: None, - }; - - serde_json::to_value(candidate_snapshot) - .expect("Failed to serialize trace replay candidate.") - }) - .bind(retrieval_rank) - .bind(retrieval_score) - .bind("agent_private") - .bind(0.6_f32) - .bind(created_at) - .bind(12_i64) - .bind(Option::::None) - .bind(created_at) - .bind(created_at + Duration::minutes(90)) - .execute(executor) - .await - .expect("Failed to insert trace candidate."); -} -async fn seed_visibility_and_recent_list_traces( - service: &ElfService, - now: OffsetDateTime, -) -> VisibilityTraceFixtureIds { - let trace_one = Uuid::new_v4(); - let trace_two = Uuid::new_v4(); - let trace_three = Uuid::new_v4(); - let item_one = Uuid::new_v4(); - let item_two = Uuid::new_v4(); - let item_three = Uuid::new_v4(); - let note_one = Uuid::new_v4(); - let note_two = Uuid::new_v4(); - let note_three = Uuid::new_v4(); - let chunk_one = Uuid::new_v4(); - let chunk_two = Uuid::new_v4(); - let chunk_three = Uuid::new_v4(); - - insert_trace(&service.db.pool, trace_one, "agent_one", "private_only", "one", now).await; - insert_trace( - &service.db.pool, - trace_two, - "agent_two", - "private_only", - "two", - now - Duration::seconds(10), - ) - .await; - insert_trace( - &service.db.pool, - trace_three, - "agent_three", - "private_only", - "three", - now - Duration::seconds(20), - ) - .await; - insert_trace_item(&service.db.pool, item_one, trace_one, note_one, chunk_one, 1).await; - insert_trace_item(&service.db.pool, item_two, trace_two, note_two, chunk_two, 1).await; - insert_trace_item(&service.db.pool, item_three, trace_three, note_three, chunk_three, 1).await; - - VisibilityTraceFixtureIds { trace_one, trace_two, trace_three, item_two } -} - -async fn trace_recent_list_page( - service: &ElfService, - cursor_created_at: Option, - cursor_trace_id: 
Option, -) -> TraceRecentListResponse { - service - .trace_recent_list(TraceRecentListRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: "admin_agent".to_string(), - limit: Some(2), - cursor_created_at, - cursor_trace_id, - agent_id_filter: None, - read_profile: None, - created_after: None, - created_before: None, - }) - .await - .expect("Failed to list recent traces.") -} - -async fn assert_trace_admin_visibility_cross_scope( - service: &ElfService, - trace_id: Uuid, - item_id: Uuid, -) { - let cross_agent_trace_get = service - .trace_get(TraceGetRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: "different_agent".to_string(), - trace_id, - }) - .await - .expect("Expected cross-agent trace lookup to bypass agent ownership filtering."); - - assert_eq!(cross_agent_trace_get.trace.trace_id, trace_id); - assert_eq!(cross_agent_trace_get.trace.agent_id, "agent_two"); - - let cross_agent_trajectory = service - .trace_trajectory_get(TraceTrajectoryGetRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: "different_agent".to_string(), - trace_id, - }) - .await - .expect("Expected cross-agent trajectory lookup to bypass agent ownership filtering."); - - assert_eq!(cross_agent_trajectory.trace.trace_id, trace_id); - - let cross_agent_item = service - .search_explain(SearchExplainRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: "different_agent".to_string(), - result_handle: item_id, - }) - .await - .expect("Expected cross-agent trace-item lookup to bypass agent ownership filtering."); +use time::OffsetDateTime; +use uuid::Uuid; - assert_eq!(cross_agent_item.item.result_handle, item_id); -} +use elf_service::{TraceBundleGetRequest, search::TraceBundleMode}; #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. 
Set ELF_PG_DSN and ELF_QDRANT_URL to run."]
diff --git a/packages/elf-service/tests/acceptance/trace_admin_observability/helpers.rs b/packages/elf-service/tests/acceptance/trace_admin_observability/helpers.rs
new file mode 100644
index 00000000..d35c863a
--- /dev/null
+++ b/packages/elf-service/tests/acceptance/trace_admin_observability/helpers.rs
@@ -0,0 +1,489 @@
+use std::sync::{Arc, atomic::AtomicUsize};
+
+use serde_json::Value;
+use sqlx::PgPool;
+use time::{Duration, OffsetDateTime};
+use uuid::Uuid;
+
+use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank};
+use elf_service::{
+    ElfService, Providers, SearchExplainRequest, TraceGetRequest, TraceRecentListRequest,
+    TraceRecentListResponse, TraceTrajectoryGetRequest, search::TraceReplayCandidate,
+};
+use elf_testkit::TestDatabase;
+
+/// Tenant id written to every seeded trace and sent with every request built in this module.
+pub(crate) const TENANT_ID: &str = "tenant_admin_scope";
+/// Project id written to every seeded trace and sent with every request built in this module.
+pub(crate) const PROJECT_ID: &str = "project_admin_scope";
+/// Value [`insert_trace`] binds to the `trace_version` column.
+pub(crate) const TRACE_VERSION: i32 = 3;
+
+/// The service under test together with the `TestDatabase` it was configured against.
+pub(crate) struct TraceAdminObservabilityFixture {
+    pub(crate) service: ElfService,
+    pub(crate) test_db: TestDatabase,
+}
+
+/// Ids from [`seed_visibility_and_recent_list_traces`]: all three traces plus the second item.
+pub(crate) struct VisibilityTraceFixtureIds {
+    pub(crate) trace_one: Uuid,
+    pub(crate) trace_two: Uuid,
+    pub(crate) trace_three: Uuid,
+    pub(crate) item_two: Uuid,
+}
+
+/// Builds an `ElfService` backed by stub providers and resets the test database.
+///
+/// Prints a skip notice naming `test_name` and returns `None` when `acceptance::test_db` or
+/// `acceptance::test_qdrant_url` yields nothing.
+///
+/// # Panics
+///
+/// Panics if the service cannot be built or the database reset fails.
+pub(crate) async fn setup_service(test_name: &str) -> Option<TraceAdminObservabilityFixture> {
+    let Some(test_db) = acceptance::test_db().await else {
+        eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test.");
+
+        return None;
+    };
+    let Some(qdrant_url) = acceptance::test_qdrant_url() else {
+        eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test.");
+
+        return None;
+    };
+    let collection = test_db.collection_name("elf_acceptance");
+    let docs_collection = test_db.collection_name("elf_acceptance_docs");
+    let cfg = acceptance::test_config(
+        test_db.dsn().to_string(),
+        qdrant_url,
+        4_096,
+        collection,
+        docs_collection,
+    );
+    let extractor = SpyExtractor {
+        calls: Arc::new(AtomicUsize::new(0)),
+        payload: serde_json::json!({ "notes": [] }),
+    };
+    let providers = Providers::new(
+        Arc::new(StubEmbedding { vector_dim: 4_096 }),
+        Arc::new(StubRerank),
+        Arc::new(extractor),
+    );
+    let service =
+        acceptance::build_service(cfg, providers).await.expect("Failed to build service.");
+
+    acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database.");
+
+    Some(TraceAdminObservabilityFixture { service, test_db })
+}
+
+/// Inserts one `search_traces` row under [`TENANT_ID`] and [`PROJECT_ID`].
+///
+/// `expanded_queries` contains only `query`, and the row expires 60 minutes after `created_at`.
+///
+/// # Panics
+///
+/// Panics if the insert fails.
+pub(crate) async fn insert_trace(
+    executor: &PgPool,
+    trace_id: Uuid,
+    agent_id: &str,
+    read_profile: &str,
+    query: &str,
+    created_at: OffsetDateTime,
+) {
+    sqlx::query(
+        "\
+INSERT INTO search_traces (
+    trace_id,
+    tenant_id,
+    project_id,
+    agent_id,
+    read_profile,
+    query,
+    expansion_mode,
+    expanded_queries,
+    allowed_scopes,
+    candidate_count,
+    top_k,
+    config_snapshot,
+    trace_version,
+    created_at,
+    expires_at
+)
+    VALUES (
+        $1,
+        $2,
+        $3,
+        $4,
+        $5,
+        $6,
+        $7,
+        $8,
+        $9,
+        $10,
+        $11,
+        $12,
+        $13,
+        $14,
+        $15
+    )",
+    )
+    .bind(trace_id)
+    .bind(TENANT_ID)
+    .bind(PROJECT_ID)
+    .bind(agent_id)
+    .bind(read_profile)
+    .bind(query)
+    .bind("full")
+    .bind(serde_json::json!([query]))
+    .bind(serde_json::json!(["agent_private", "project_shared", "org_shared"]))
+    .bind(10_i32)
+    .bind(5_i32)
+    .bind(serde_json::json!({ "test": true }))
+    .bind(TRACE_VERSION)
+    .bind(created_at)
+    .bind(created_at + Duration::minutes(60))
+    .execute(executor)
+    .await
+    .expect("Failed to insert trace.");
+}
+
+/// Inserts one `search_trace_items` row with a `final_score` of 1.0 and a minimal
+/// `search_ranking_explain/v2` explain payload.
+///
+/// # Panics
+///
+/// Panics if the insert fails.
+pub(crate) async fn insert_trace_item(
+    executor: &PgPool,
+    item_id: Uuid,
+    trace_id: Uuid,
+    note_id: Uuid,
+    chunk_id: Uuid,
+    rank: i32,
+) {
+    sqlx::query(
+        "\
+INSERT INTO search_trace_items (
+    item_id,
+    trace_id,
+    note_id,
+    chunk_id,
+    rank,
+    final_score,
+    explain
+)
+VALUES ($1, $2, $3, $4, $5, $6, $7)",
+    )
+    .bind(item_id)
+    .bind(trace_id)
+    .bind(note_id)
+    .bind(chunk_id)
+    .bind(rank)
+    .bind(1.0_f32)
+    .bind(serde_json::json!({
+        "match": { "matched_terms": [], "matched_fields": [] },
+        "ranking": {
+            "schema": "search_ranking_explain/v2",
+            "policy_id": "ranking_v2:test",
+            "final_score": 1.0,
+            "terms": []
+        }
+    }))
+    .execute(executor)
+    .await
+    .expect("Failed to insert trace item.");
+}
+
+/// Inserts one `search_trace_stages` row whose payload echoes `stage_name` and reports 0 items.
+///
+/// # Panics
+///
+/// Panics if the insert fails.
+pub(crate) async fn insert_trace_stage(
+    executor: &PgPool,
+    stage_id: Uuid,
+    trace_id: Uuid,
+    stage_order: i32,
+    stage_name: &str,
+    created_at: OffsetDateTime,
+) {
+    sqlx::query(
+        "\
+INSERT INTO search_trace_stages (
+    stage_id,
+    trace_id,
+    stage_order,
+    stage_name,
+    stage_payload,
+    created_at
+)
+VALUES ($1, $2, $3, $4, $5, $6)",
+    )
+    .bind(stage_id)
+    .bind(trace_id)
+    .bind(stage_order)
+    .bind(stage_name)
+    .bind(serde_json::json!({
+        "stage_name": stage_name,
+        "metrics": { "items": 0 }
+    }))
+    .bind(created_at)
+    .execute(executor)
+    .await
+    .expect("Failed to insert trace stage.");
+}
+
+/// Inserts one `search_trace_stage_items` row, generating a fresh UUID for its `id` column.
+///
+/// # Panics
+///
+/// Panics if the insert fails.
+pub(crate) async fn insert_trace_stage_item(
+    executor: &PgPool,
+    item_id: Uuid,
+    stage_id: Uuid,
+    note_id: Uuid,
+    chunk_id: Uuid,
+    metrics: Value,
+) {
+    sqlx::query(
+        "\
+INSERT INTO search_trace_stage_items (
+    id,
+    stage_id,
+    item_id,
+    note_id,
+    chunk_id,
+    metrics
+)
+VALUES ($1, $2, $3, $4, $5, $6)",
+    )
+    .bind(Uuid::new_v4())
+    .bind(stage_id)
+    .bind(item_id)
+    .bind(note_id)
+    .bind(chunk_id)
+    .bind(metrics)
+    .execute(executor)
+    .await
+    .expect("Failed to insert trace stage item.");
+}
+
+/// Inserts one `search_trace_candidates` row, including a serialized `TraceReplayCandidate`.
+///
+/// `rank` is stored as `chunk_index`, `retrieval_score` also fills `rerank_score`, and the row
+/// expires 90 minutes after `created_at`.
+///
+/// # Panics
+///
+/// Panics if the snapshot cannot be serialized or the insert fails.
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn insert_trace_candidate(
+    executor: &PgPool,
+    candidate_id: Uuid,
+    trace_id: Uuid,
+    note_id: Uuid,
+    chunk_id: Uuid,
+    rank: i32,
+    retrieval_rank: i32,
+    retrieval_score: f32,
+    created_at: OffsetDateTime,
+) {
+    sqlx::query(
+        "\
+INSERT INTO search_trace_candidates (
+    candidate_id,
+    trace_id,
+    note_id,
+    chunk_id,
+    chunk_index,
+    snippet,
+    candidate_snapshot,
+    retrieval_rank,
+    rerank_score,
+    note_scope,
+    note_importance,
+    note_updated_at,
+    note_hit_count,
+    note_last_hit_at,
+    created_at,
+    expires_at
+)
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
+    )
+    .bind(candidate_id)
+    .bind(trace_id)
+    .bind(note_id)
+    .bind(chunk_id)
+    .bind(rank)
+    .bind("trace candidate snippet")
+    .bind({
+        let candidate_snapshot = TraceReplayCandidate {
+            note_id,
+            chunk_id,
+            chunk_index: rank,
+            snippet: "trace candidate snippet".to_string(),
+            retrieval_rank: retrieval_rank as u32,
+            retrieval_score: Some(retrieval_score),
+            rerank_score: retrieval_score,
+            note_scope: "agent_private".to_string(),
+            note_importance: 0.6,
+            note_updated_at: created_at,
+            note_hit_count: 12,
+            note_last_hit_at: None,
+            diversity_selected: None,
+            diversity_selected_rank: None,
+            diversity_selected_reason: None,
+            diversity_skipped_reason: None,
+            diversity_nearest_selected_note_id: None,
+            diversity_similarity: None,
+            diversity_mmr_score: None,
+            diversity_missing_embedding: None,
+        };
+
+        serde_json::to_value(candidate_snapshot)
+            .expect("Failed to serialize trace replay candidate.")
+    })
+    .bind(retrieval_rank)
+    .bind(retrieval_score)
+    .bind("agent_private")
+    .bind(0.6_f32)
+    .bind(created_at)
+    .bind(12_i64)
+    // A bare `None` has no inferable type here, so name the type of the NULL being bound.
+    .bind(Option::<OffsetDateTime>::None)
+    .bind(created_at)
+    .bind(created_at + Duration::minutes(90))
+    .execute(executor)
+    .await
+    .expect("Failed to insert trace candidate.");
+}
+
+/// Seeds traces for `agent_one`, `agent_two` and `agent_three`, created at `now`, 10 seconds
+/// earlier and 20 seconds earlier respectively, each with one rank-1 trace item.
+///
+/// # Panics
+///
+/// Panics if any insert fails.
+pub(crate) async fn seed_visibility_and_recent_list_traces(
+    service: &ElfService,
+    now: OffsetDateTime,
+) -> VisibilityTraceFixtureIds {
+    let trace_one = Uuid::new_v4();
+    let trace_two = Uuid::new_v4();
+    let trace_three = Uuid::new_v4();
+    let item_one = Uuid::new_v4();
+    let item_two = Uuid::new_v4();
+    let item_three = Uuid::new_v4();
+    let note_one = Uuid::new_v4();
+    let note_two = Uuid::new_v4();
+    let note_three = Uuid::new_v4();
+    let chunk_one = Uuid::new_v4();
+    let chunk_two = Uuid::new_v4();
+    let chunk_three = Uuid::new_v4();
+
+    insert_trace(&service.db.pool, trace_one, "agent_one", "private_only", "one", now).await;
+    insert_trace(
+        &service.db.pool,
+        trace_two,
+        "agent_two",
+        "private_only",
+        "two",
+        now - Duration::seconds(10),
+    )
+    .await;
+    insert_trace(
+        &service.db.pool,
+        trace_three,
+        "agent_three",
+        "private_only",
+        "three",
+        now - Duration::seconds(20),
+    )
+    .await;
+    insert_trace_item(&service.db.pool, item_one, trace_one, note_one, chunk_one, 1).await;
+    insert_trace_item(&service.db.pool, item_two, trace_two, note_two, chunk_two, 1).await;
+    insert_trace_item(&service.db.pool, item_three, trace_three, note_three, chunk_three, 1).await;
+
+    VisibilityTraceFixtureIds { trace_one, trace_two, trace_three, item_two }
+}
+
+/// Fetches one page of recent traces as `admin_agent`, with `limit` set to 2 and no filters.
+///
+/// # Panics
+///
+/// Panics if the listing call fails.
+pub(crate) async fn trace_recent_list_page(
+    service: &ElfService,
+    cursor_created_at: Option<OffsetDateTime>,
+    cursor_trace_id: Option<Uuid>,
+) -> TraceRecentListResponse {
+    service
+        .trace_recent_list(TraceRecentListRequest {
+            tenant_id: TENANT_ID.to_string(),
+            project_id: PROJECT_ID.to_string(),
+            agent_id: "admin_agent".to_string(),
+            limit: Some(2),
+            cursor_created_at,
+            cursor_trace_id,
+            agent_id_filter: None,
+            read_profile: None,
+            created_after: None,
+            created_before: None,
+        })
+        .await
+        .expect("Failed to list recent traces.")
+}
+
+/// Asserts that `different_agent` can fetch the trace, its trajectory and the given trace item.
+///
+/// The trace itself must be recorded as belonging to `agent_two`.
+///
+/// # Panics
+///
+/// Panics if any lookup fails or an assertion does not hold.
+pub(crate) async fn assert_trace_admin_visibility_cross_scope(
+    service: &ElfService,
+    trace_id: Uuid,
+    item_id: Uuid,
+) {
+    let cross_agent_trace_get = service
+        .trace_get(TraceGetRequest {
+            tenant_id: TENANT_ID.to_string(),
+            project_id: PROJECT_ID.to_string(),
+            agent_id: "different_agent".to_string(),
+            trace_id,
+        })
+        .await
+        .expect("Expected cross-agent trace lookup to bypass agent ownership filtering.");
+
+    assert_eq!(cross_agent_trace_get.trace.trace_id, trace_id);
+    assert_eq!(cross_agent_trace_get.trace.agent_id, "agent_two");
+
+    let cross_agent_trajectory = service
+        .trace_trajectory_get(TraceTrajectoryGetRequest {
+            tenant_id: TENANT_ID.to_string(),
+            project_id: PROJECT_ID.to_string(),
+            agent_id: "different_agent".to_string(),
+            trace_id,
+        })
+        .await
+        .expect("Expected cross-agent trajectory lookup to bypass agent ownership filtering.");
+
+    assert_eq!(cross_agent_trajectory.trace.trace_id, trace_id);
+
+    let cross_agent_item = service
+        .search_explain(SearchExplainRequest {
+            tenant_id: TENANT_ID.to_string(),
+            project_id: PROJECT_ID.to_string(),
+            agent_id: "different_agent".to_string(),
+            result_handle: item_id,
+        })
+        .await
+        .expect("Expected cross-agent trace-item lookup to bypass agent ownership filtering.");
+
+    assert_eq!(cross_agent_item.item.result_handle, item_id);
+}