From 7fb51c5b867819072464f2a58fc55e72d9e248ff Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 30 Jun 2026 10:25:02 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Split provenance loader modules after strict validation.","authority":"manual"} --- .../elf-service/src/provenance/loaders.rs | 331 +----------------- .../src/provenance/loaders/bundle_tables.rs | 171 +++++++++ .../src/provenance/loaders/history_events.rs | 190 ++++++++++ .../loaders/history_events/tests.rs | 168 +++++++++ 4 files changed, 536 insertions(+), 324 deletions(-) create mode 100644 packages/elf-service/src/provenance/loaders/bundle_tables.rs create mode 100644 packages/elf-service/src/provenance/loaders/history_events.rs create mode 100644 packages/elf-service/src/provenance/loaders/history_events/tests.rs diff --git a/packages/elf-service/src/provenance/loaders.rs b/packages/elf-service/src/provenance/loaders.rs index c6706dcc..1bcabb41 100644 --- a/packages/elf-service/src/provenance/loaders.rs +++ b/packages/elf-service/src/provenance/loaders.rs @@ -1,327 +1,10 @@ -use std::collections::HashMap; +mod bundle_tables; +mod history_events; -use serde_json::Value; -use sqlx::PgPool; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::{ - Result, - provenance::{ - history::{self}, - types::{ - MemoryHistoryEvent, NoteProvenanceIndexingOutbox, NoteProvenanceIngestDecision, - NoteProvenanceNoteVersion, NoteProvenanceRecentTrace, - constants::{ - NOTE_PROVENANCE_HISTORY_LIMIT, NOTE_PROVENANCE_INGEST_DECISIONS_LIMIT, - NOTE_PROVENANCE_NOTE_VERSIONS_LIMIT, NOTE_PROVENANCE_OUTBOX_LIMIT, - NOTE_PROVENANCE_RECENT_TRACES_LIMIT, - }, - requests::ValidatedNoteProvenanceRequest, - rows::{ - NoteDerivedProposalRow, NoteIndexingOutboxRow, NoteIngestDecisionRow, - NoteProposalReviewRow, NoteRecentTraceRow, NoteVersionRow, - }, - }, +pub(super) use self::{ + bundle_tables::{ + load_indexing_outbox, load_ingest_decisions, load_note_versions, + load_recent_traces_for_note, }, + history_events::load_memory_history_events, }; -use elf_storage::models::MemoryNote; - -pub(super) async fn load_ingest_decisions( - pool: &PgPool, - req: &ValidatedNoteProvenanceRequest, -) -> Result> { - let rows: Vec = sqlx::query_as::<_, NoteIngestDecisionRow>( - "\ -SELECT - decision_id, - tenant_id, - project_id, - agent_id, - scope, - pipeline, - note_type, - note_key, - note_id, - note_version_id, - base_decision, - policy_decision, - note_op, - reason_code, - details, - ts -FROM memory_ingest_decisions -WHERE note_id = $1 AND tenant_id = $2 AND project_id = $3 -ORDER BY ts DESC -LIMIT $4", - ) - .bind(req.note_id) - .bind(&req.tenant_id) - .bind(&req.project_id) - .bind(NOTE_PROVENANCE_INGEST_DECISIONS_LIMIT) - .fetch_all(pool) - .await?; - - Ok(rows.into_iter().map(NoteProvenanceIngestDecision::from).collect()) -} - -pub(super) async fn load_note_versions( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - note_id: Uuid, -) -> Result> { - let rows: Vec = sqlx::query_as::<_, NoteVersionRow>( - "\ -SELECT - memory_note_versions.version_id, - memory_note_versions.note_id, - memory_note_versions.op, - memory_note_versions.prev_snapshot, - memory_note_versions.new_snapshot, - memory_note_versions.reason, - memory_note_versions.actor, - memory_note_versions.ts -FROM memory_note_versions -JOIN memory_notes n ON n.note_id = memory_note_versions.note_id -WHERE memory_note_versions.note_id = $1 - AND n.tenant_id = $2 - AND n.project_id = $3 -ORDER BY memory_note_versions.ts DESC -LIMIT $4", - ) - .bind(note_id) - .bind(tenant_id) - .bind(project_id) - .bind(NOTE_PROVENANCE_NOTE_VERSIONS_LIMIT) - .fetch_all(pool) - .await?; - - Ok(rows.into_iter().map(NoteProvenanceNoteVersion::from).collect()) -} - -pub(super) async fn load_indexing_outbox( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - note_id: Uuid, -) -> Result> { - let rows: Vec = sqlx::query_as::<_, NoteIndexingOutboxRow>( - "\ -SELECT - indexing_outbox.outbox_id, - indexing_outbox.note_id, - indexing_outbox.op, - indexing_outbox.embedding_version, - indexing_outbox.status, - indexing_outbox.attempts, - indexing_outbox.last_error, - indexing_outbox.available_at, - indexing_outbox.created_at, - indexing_outbox.updated_at -FROM indexing_outbox -JOIN memory_notes n ON n.note_id = indexing_outbox.note_id -WHERE indexing_outbox.note_id = $1 - AND n.tenant_id = $2 - AND n.project_id = $3 -ORDER BY indexing_outbox.updated_at DESC -LIMIT $4", - ) - .bind(note_id) - .bind(tenant_id) - .bind(project_id) - .bind(NOTE_PROVENANCE_OUTBOX_LIMIT) - .fetch_all(pool) - .await?; - - Ok(rows.into_iter().map(NoteProvenanceIndexingOutbox::from).collect()) -} - -pub(super) async fn load_recent_traces_for_note( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - note_id: Uuid, -) -> Result> { - let rows: Vec = sqlx::query_as::<_, NoteRecentTraceRow>( - "\ -SELECT - trace_id, - tenant_id, - project_id, - agent_id, - read_profile, - query, - created_at -FROM search_traces -WHERE tenant_id = $1 - AND project_id = $2 - AND trace_id IN (SELECT DISTINCT trace_id FROM search_trace_items WHERE note_id = $3) -ORDER BY created_at DESC, trace_id DESC -LIMIT $4", - ) - .bind(tenant_id) - .bind(project_id) - .bind(note_id) - .bind(NOTE_PROVENANCE_RECENT_TRACES_LIMIT) - .fetch_all(pool) - .await?; - - Ok(rows.into_iter().map(to_recent_trace).collect()) -} - -pub(super) async fn load_memory_history_events( - pool: &PgPool, - req: &ValidatedNoteProvenanceRequest, - note: &MemoryNote, -) -> Result> { - let decisions = load_ingest_decisions(pool, req).await?; - let versions = load_note_versions(pool, &req.tenant_id, &req.project_id, req.note_id).await?; - let proposal_ref = serde_json::json!([{ "kind": "note", "id": req.note_id }]); - let target_ref = serde_json::json!({ "kind": "note", "id": req.note_id }); - let proposals = load_derived_proposals_for_note(pool, req, &proposal_ref, &target_ref).await?; - let reviews = load_proposal_reviews_for_note(pool, req, &proposal_ref, &target_ref).await?; - let mut decision_by_version = HashMap::new(); - - for decision in &decisions { - if let Some(version_id) = decision.note_version_id { - decision_by_version.insert(version_id, decision); - } - } - - let mut events = Vec::new(); - - for version in &versions { - events.push(history::version_history_event( - version, - decision_by_version.get(&version.version_id), - )); - } - for decision in &decisions { - if history::should_emit_decision_event(decision) { - events.push(history::decision_history_event(req.note_id, decision)); - } - } - - if let Some(expires_at) = note.expires_at - && expires_at <= OffsetDateTime::now_utc() - && !events.iter().any(|event| event.event_type == "expire") - { - events.push(history::expire_history_event(note, expires_at)); - } - - for proposal in proposals { - events.push(history::derived_proposal_history_event(req.note_id, proposal)); - } - for review in reviews { - events.push(history::proposal_review_history_event(req.note_id, review)); - } - - events.sort_by(|left, right| { - left.ts.cmp(&right.ts).then_with(|| left.event_id.cmp(&right.event_id)) - }); - - let history_limit = NOTE_PROVENANCE_HISTORY_LIMIT as usize; - - if events.len() > history_limit { - let drop_count = events.len() - history_limit; - - events.drain(0..drop_count); - } - - Ok(events) -} - -pub(super) async fn load_derived_proposals_for_note( - pool: &PgPool, - req: &ValidatedNoteProvenanceRequest, - proposal_ref: &Value, - target_ref: &Value, -) -> Result> { - let rows = sqlx::query_as::<_, NoteDerivedProposalRow>( - "\ -SELECT - proposal_id, - run_id, - agent_id, - proposal_kind, - apply_intent, - review_state, - source_refs, - source_snapshot, - lineage, - diff, - confidence, - COALESCE(target_ref, '{}'::jsonb) AS target_ref, - COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, - created_at -FROM consolidation_proposals -WHERE tenant_id = $1 - AND project_id = $2 - AND (source_refs @> $3 OR target_ref @> $4) -ORDER BY created_at DESC, proposal_id DESC -LIMIT $5", - ) - .bind(&req.tenant_id) - .bind(&req.project_id) - .bind(proposal_ref) - .bind(target_ref) - .bind(NOTE_PROVENANCE_HISTORY_LIMIT) - .fetch_all(pool) - .await?; - - Ok(rows) -} - -pub(super) async fn load_proposal_reviews_for_note( - pool: &PgPool, - req: &ValidatedNoteProvenanceRequest, - proposal_ref: &Value, - target_ref: &Value, -) -> Result> { - let rows = sqlx::query_as::<_, NoteProposalReviewRow>( - "\ -SELECT - reviews.review_id, - reviews.proposal_id, - reviews.run_id, - reviews.reviewer_agent_id, - reviews.action, - reviews.from_review_state, - reviews.to_review_state, - reviews.review_comment, - reviews.created_at, - proposals.proposal_kind, - proposals.apply_intent, - proposals.diff -FROM consolidation_proposal_reviews reviews -JOIN consolidation_proposals proposals - ON proposals.proposal_id = reviews.proposal_id -WHERE reviews.tenant_id = $1 - AND reviews.project_id = $2 - AND (proposals.source_refs @> $3 OR proposals.target_ref @> $4) -ORDER BY reviews.created_at DESC, reviews.review_id DESC -LIMIT $5", - ) - .bind(&req.tenant_id) - .bind(&req.project_id) - .bind(proposal_ref) - .bind(target_ref) - .bind(NOTE_PROVENANCE_HISTORY_LIMIT) - .fetch_all(pool) - .await?; - - Ok(rows) -} - -fn to_recent_trace(item: NoteRecentTraceRow) -> NoteProvenanceRecentTrace { - NoteProvenanceRecentTrace { - trace_id: item.trace_id, - tenant_id: item.tenant_id, - project_id: item.project_id, - agent_id: item.agent_id, - read_profile: item.read_profile, - query: item.query, - created_at: item.created_at, - } -} diff --git a/packages/elf-service/src/provenance/loaders/bundle_tables.rs b/packages/elf-service/src/provenance/loaders/bundle_tables.rs new file mode 100644 index 00000000..828d068f --- /dev/null +++ b/packages/elf-service/src/provenance/loaders/bundle_tables.rs @@ -0,0 +1,171 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::{ + Result, + provenance::types::{ + NoteProvenanceIndexingOutbox, NoteProvenanceIngestDecision, NoteProvenanceNoteVersion, + NoteProvenanceRecentTrace, + constants::{ + NOTE_PROVENANCE_INGEST_DECISIONS_LIMIT, NOTE_PROVENANCE_NOTE_VERSIONS_LIMIT, + NOTE_PROVENANCE_OUTBOX_LIMIT, NOTE_PROVENANCE_RECENT_TRACES_LIMIT, + }, + requests::ValidatedNoteProvenanceRequest, + rows::{NoteIndexingOutboxRow, NoteIngestDecisionRow, NoteRecentTraceRow, NoteVersionRow}, + }, +}; + +pub(in crate::provenance) async fn load_ingest_decisions( + pool: &PgPool, + req: &ValidatedNoteProvenanceRequest, +) -> Result> { + let rows: Vec = sqlx::query_as::<_, NoteIngestDecisionRow>( + "\ +SELECT + decision_id, + tenant_id, + project_id, + agent_id, + scope, + pipeline, + note_type, + note_key, + note_id, + note_version_id, + base_decision, + policy_decision, + note_op, + reason_code, + details, + ts +FROM memory_ingest_decisions +WHERE note_id = $1 AND tenant_id = $2 AND project_id = $3 +ORDER BY ts DESC +LIMIT $4", + ) + .bind(req.note_id) + .bind(&req.tenant_id) + .bind(&req.project_id) + .bind(NOTE_PROVENANCE_INGEST_DECISIONS_LIMIT) + .fetch_all(pool) + .await?; + + Ok(rows.into_iter().map(NoteProvenanceIngestDecision::from).collect()) +} + +pub(in crate::provenance) async fn load_note_versions( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + note_id: Uuid, +) -> Result> { + let rows: Vec = sqlx::query_as::<_, NoteVersionRow>( + "\ +SELECT + memory_note_versions.version_id, + memory_note_versions.note_id, + memory_note_versions.op, + memory_note_versions.prev_snapshot, + memory_note_versions.new_snapshot, + memory_note_versions.reason, + memory_note_versions.actor, + memory_note_versions.ts +FROM memory_note_versions +JOIN memory_notes n ON n.note_id = memory_note_versions.note_id +WHERE memory_note_versions.note_id = $1 + AND n.tenant_id = $2 + AND n.project_id = $3 +ORDER BY memory_note_versions.ts DESC +LIMIT $4", + ) + .bind(note_id) + .bind(tenant_id) + .bind(project_id) + .bind(NOTE_PROVENANCE_NOTE_VERSIONS_LIMIT) + .fetch_all(pool) + .await?; + + Ok(rows.into_iter().map(NoteProvenanceNoteVersion::from).collect()) +} + +pub(in crate::provenance) async fn load_indexing_outbox( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + note_id: Uuid, +) -> Result> { + let rows: Vec = sqlx::query_as::<_, NoteIndexingOutboxRow>( + "\ +SELECT + indexing_outbox.outbox_id, + indexing_outbox.note_id, + indexing_outbox.op, + indexing_outbox.embedding_version, + indexing_outbox.status, + indexing_outbox.attempts, + indexing_outbox.last_error, + indexing_outbox.available_at, + indexing_outbox.created_at, + indexing_outbox.updated_at +FROM indexing_outbox +JOIN memory_notes n ON n.note_id = indexing_outbox.note_id +WHERE indexing_outbox.note_id = $1 + AND n.tenant_id = $2 + AND n.project_id = $3 +ORDER BY indexing_outbox.updated_at DESC +LIMIT $4", + ) + .bind(note_id) + .bind(tenant_id) + .bind(project_id) + .bind(NOTE_PROVENANCE_OUTBOX_LIMIT) + .fetch_all(pool) + .await?; + + Ok(rows.into_iter().map(NoteProvenanceIndexingOutbox::from).collect()) +} + +pub(in crate::provenance) async fn load_recent_traces_for_note( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + note_id: Uuid, +) -> Result> { + let rows: Vec = sqlx::query_as::<_, NoteRecentTraceRow>( + "\ +SELECT + trace_id, + tenant_id, + project_id, + agent_id, + read_profile, + query, + created_at +FROM search_traces +WHERE tenant_id = $1 + AND project_id = $2 + AND trace_id IN (SELECT DISTINCT trace_id FROM search_trace_items WHERE note_id = $3) +ORDER BY created_at DESC, trace_id DESC +LIMIT $4", + ) + .bind(tenant_id) + .bind(project_id) + .bind(note_id) + .bind(NOTE_PROVENANCE_RECENT_TRACES_LIMIT) + .fetch_all(pool) + .await?; + + Ok(rows.into_iter().map(to_recent_trace).collect()) +} + +fn to_recent_trace(item: NoteRecentTraceRow) -> NoteProvenanceRecentTrace { + NoteProvenanceRecentTrace { + trace_id: item.trace_id, + tenant_id: item.tenant_id, + project_id: item.project_id, + agent_id: item.agent_id, + read_profile: item.read_profile, + query: item.query, + created_at: item.created_at, + } +} diff --git a/packages/elf-service/src/provenance/loaders/history_events.rs b/packages/elf-service/src/provenance/loaders/history_events.rs new file mode 100644 index 00000000..23368a22 --- /dev/null +++ b/packages/elf-service/src/provenance/loaders/history_events.rs @@ -0,0 +1,190 @@ +use std::collections::HashMap; + +use serde_json::Value; +use sqlx::PgPool; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + Result, + provenance::{ + history::{self}, + loaders::bundle_tables, + types::{ + MemoryHistoryEvent, NoteProvenanceIngestDecision, NoteProvenanceNoteVersion, + constants::NOTE_PROVENANCE_HISTORY_LIMIT, + requests::ValidatedNoteProvenanceRequest, + rows::{NoteDerivedProposalRow, NoteProposalReviewRow}, + }, + }, +}; +use elf_storage::models::MemoryNote; + +pub(in crate::provenance) async fn load_memory_history_events( + pool: &PgPool, + req: &ValidatedNoteProvenanceRequest, + note: &MemoryNote, +) -> Result> { + let decisions = bundle_tables::load_ingest_decisions(pool, req).await?; + let versions = + bundle_tables::load_note_versions(pool, &req.tenant_id, &req.project_id, req.note_id) + .await?; + let proposal_ref = serde_json::json!([{ "kind": "note", "id": req.note_id }]); + let target_ref = serde_json::json!({ "kind": "note", "id": req.note_id }); + let proposals = load_derived_proposals_for_note(pool, req, &proposal_ref, &target_ref).await?; + let reviews = load_proposal_reviews_for_note(pool, req, &proposal_ref, &target_ref).await?; + + Ok(build_memory_history_events( + req.note_id, + note, + &decisions, + &versions, + proposals, + reviews, + OffsetDateTime::now_utc(), + )) +} + +fn build_memory_history_events( + note_id: Uuid, + note: &MemoryNote, + decisions: &[NoteProvenanceIngestDecision], + versions: &[NoteProvenanceNoteVersion], + proposals: Vec, + reviews: Vec, + now: OffsetDateTime, +) -> Vec { + let mut decision_by_version = HashMap::new(); + + for decision in decisions { + if let Some(version_id) = decision.note_version_id { + decision_by_version.insert(version_id, decision); + } + } + + let mut events = Vec::new(); + + for version in versions { + events.push(history::version_history_event( + version, + decision_by_version.get(&version.version_id), + )); + } + for decision in decisions { + if history::should_emit_decision_event(decision) { + events.push(history::decision_history_event(note_id, decision)); + } + } + + if let Some(expires_at) = note.expires_at + && expires_at <= now + && !events.iter().any(|event| event.event_type == "expire") + { + events.push(history::expire_history_event(note, expires_at)); + } + + for proposal in proposals { + events.push(history::derived_proposal_history_event(note_id, proposal)); + } + for review in reviews { + events.push(history::proposal_review_history_event(note_id, review)); + } + + events.sort_by(|left, right| { + left.ts.cmp(&right.ts).then_with(|| left.event_id.cmp(&right.event_id)) + }); + + let history_limit = NOTE_PROVENANCE_HISTORY_LIMIT as usize; + + if events.len() > history_limit { + let drop_count = events.len() - history_limit; + + events.drain(0..drop_count); + } + + events +} + +async fn load_derived_proposals_for_note( + pool: &PgPool, + req: &ValidatedNoteProvenanceRequest, + proposal_ref: &Value, + target_ref: &Value, +) -> Result> { + let rows = sqlx::query_as::<_, NoteDerivedProposalRow>( + "\ +SELECT + proposal_id, + run_id, + agent_id, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + created_at +FROM consolidation_proposals +WHERE tenant_id = $1 + AND project_id = $2 + AND (source_refs @> $3 OR target_ref @> $4) +ORDER BY created_at DESC, proposal_id DESC +LIMIT $5", + ) + .bind(&req.tenant_id) + .bind(&req.project_id) + .bind(proposal_ref) + .bind(target_ref) + .bind(NOTE_PROVENANCE_HISTORY_LIMIT) + .fetch_all(pool) + .await?; + + Ok(rows) +} + +async fn load_proposal_reviews_for_note( + pool: &PgPool, + req: &ValidatedNoteProvenanceRequest, + proposal_ref: &Value, + target_ref: &Value, +) -> Result> { + let rows = sqlx::query_as::<_, NoteProposalReviewRow>( + "\ +SELECT + reviews.review_id, + reviews.proposal_id, + reviews.run_id, + reviews.reviewer_agent_id, + reviews.action, + reviews.from_review_state, + reviews.to_review_state, + reviews.review_comment, + reviews.created_at, + proposals.proposal_kind, + proposals.apply_intent, + proposals.diff +FROM consolidation_proposal_reviews reviews +JOIN consolidation_proposals proposals + ON proposals.proposal_id = reviews.proposal_id +WHERE reviews.tenant_id = $1 + AND reviews.project_id = $2 + AND (proposals.source_refs @> $3 OR proposals.target_ref @> $4) +ORDER BY reviews.created_at DESC, reviews.review_id DESC +LIMIT $5", + ) + .bind(&req.tenant_id) + .bind(&req.project_id) + .bind(proposal_ref) + .bind(target_ref) + .bind(NOTE_PROVENANCE_HISTORY_LIMIT) + .fetch_all(pool) + .await?; + + Ok(rows) +} + +#[cfg(test)] mod tests; diff --git a/packages/elf-service/src/provenance/loaders/history_events/tests.rs b/packages/elf-service/src/provenance/loaders/history_events/tests.rs new file mode 100644 index 00000000..7d3975c0 --- /dev/null +++ b/packages/elf-service/src/provenance/loaders/history_events/tests.rs @@ -0,0 +1,168 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::provenance::types::{ + NoteProvenanceIngestDecision, NoteProvenanceNoteVersion, + constants::NOTE_PROVENANCE_HISTORY_LIMIT, + rows::{NoteDerivedProposalRow, NoteProposalReviewRow}, +}; +use elf_storage::models::MemoryNote; + +#[test] +fn history_events_link_versions_emit_decisions_and_insert_expiry() { + let note_id = Uuid::from_u128(1); + let version_id = Uuid::from_u128(2); + let decision = ingest_decision(note_id, Uuid::from_u128(3), Some(version_id), "ADD", 20); + let ignored = ingest_decision(note_id, Uuid::from_u128(4), None, "NONE", 10); + let version = note_version(note_id, version_id, "ADD", 20); + let note = memory_note(note_id, Some(ts(30))); + let events = super::build_memory_history_events( + note_id, + ¬e, + &[decision, ignored], + &[version], + vec![derived_proposal(40)], + vec![proposal_review(50)], + ts(60), + ); + let event_types: Vec<&str> = events.iter().map(|event| event.event_type.as_str()).collect(); + + assert_eq!(event_types, vec!["ignore", "add", "expire", "derived", "applied"]); + assert_eq!(events[1].related_decision_id, Some(Uuid::from_u128(3))); + assert_eq!(events[1].related_note_version_id, Some(version_id)); + assert_eq!(events[2].source_table, "memory_notes"); + assert_eq!(events[3].source_table, "consolidation_proposals"); + assert_eq!(events[4].source_table, "consolidation_proposal_reviews"); +} + +#[test] +fn history_events_trim_oldest_events_after_deterministic_sort() { + let note_id = Uuid::from_u128(11); + let note = memory_note(note_id, None); + let decisions: Vec = (0..=NOTE_PROVENANCE_HISTORY_LIMIT) + .map(|idx| ingest_decision(note_id, Uuid::from_u128(idx as u128 + 100), None, "NONE", idx)) + .collect(); + let events = super::build_memory_history_events( + note_id, + ¬e, + &decisions, + &[], + Vec::new(), + Vec::new(), + ts(NOTE_PROVENANCE_HISTORY_LIMIT + 10), + ); + + assert_eq!(events.len(), NOTE_PROVENANCE_HISTORY_LIMIT as usize); + assert_eq!(events[0].ts, ts(1)); + assert_eq!(events.last().expect("event").ts, ts(NOTE_PROVENANCE_HISTORY_LIMIT)); +} + +fn ingest_decision( + note_id: Uuid, + decision_id: Uuid, + note_version_id: Option, + note_op: &str, + ts_value: i64, +) -> NoteProvenanceIngestDecision { + NoteProvenanceIngestDecision { + decision_id, + tenant_id: "tenant-a".to_string(), + project_id: "project-a".to_string(), + agent_id: "agent-a".to_string(), + scope: "project".to_string(), + pipeline: "add_note".to_string(), + note_type: "fact".to_string(), + note_key: None, + note_id: Some(note_id), + note_version_id, + base_decision: "remember".to_string(), + policy_decision: if note_op == "NONE" { "ignore" } else { "remember" }.to_string(), + note_op: note_op.to_string(), + reason_code: None, + details: serde_json::json!({}), + ts: ts(ts_value), + } +} + +fn note_version( + note_id: Uuid, + version_id: Uuid, + op: &str, + ts_value: i64, +) -> NoteProvenanceNoteVersion { + NoteProvenanceNoteVersion { + version_id, + note_id, + op: op.to_string(), + prev_snapshot: None, + new_snapshot: Some(serde_json::json!({ "note_id": note_id })), + reason: "add_note".to_string(), + actor: "agent-a".to_string(), + ts: ts(ts_value), + } +} + +fn derived_proposal(ts_value: i64) -> NoteDerivedProposalRow { + NoteDerivedProposalRow { + proposal_id: Uuid::from_u128(20), + run_id: Uuid::from_u128(21), + agent_id: "agent-a".to_string(), + proposal_kind: "memory".to_string(), + apply_intent: "review".to_string(), + review_state: "pending".to_string(), + source_refs: serde_json::json!([]), + source_snapshot: serde_json::json!({}), + lineage: serde_json::json!({}), + diff: serde_json::json!({}), + confidence: 0.9, + target_ref: serde_json::json!({}), + proposed_payload: serde_json::json!({}), + created_at: ts(ts_value), + } +} + +fn proposal_review(ts_value: i64) -> NoteProposalReviewRow { + NoteProposalReviewRow { + review_id: Uuid::from_u128(30), + proposal_id: Uuid::from_u128(20), + run_id: Uuid::from_u128(21), + reviewer_agent_id: "reviewer-a".to_string(), + action: "apply".to_string(), + from_review_state: "pending".to_string(), + to_review_state: "applied".to_string(), + review_comment: Some("ok".to_string()), + created_at: ts(ts_value), + proposal_kind: "memory".to_string(), + apply_intent: "review".to_string(), + diff: serde_json::json!({}), + } +} + +fn memory_note(note_id: Uuid, expires_at: Option) -> MemoryNote { + let created_at = ts(0); + + MemoryNote { + note_id, + tenant_id: "tenant-a".to_string(), + project_id: "project-a".to_string(), + agent_id: "agent-a".to_string(), + scope: "project".to_string(), + r#type: "fact".to_string(), + key: None, + text: "A durable English note.".to_string(), + importance: 0.5, + confidence: 0.8, + status: "active".to_string(), + created_at, + updated_at: created_at, + expires_at, + embedding_version: "v1".to_string(), + source_ref: serde_json::json!({}), + hit_count: 0, + last_hit_at: None, + } +} + +fn ts(value: i64) -> OffsetDateTime { + OffsetDateTime::from_unix_timestamp(value).expect("valid timestamp") +}