diff --git a/packages/elf-service/src/add_event/service.rs b/packages/elf-service/src/add_event/service.rs index f78df0dc..0c0bb339 100644 --- a/packages/elf-service/src/add_event/service.rs +++ b/packages/elf-service/src/add_event/service.rs @@ -1,312 +1,3 @@ -use serde_json; -use sqlx::{PgConnection, Postgres, Transaction}; -use time::{Duration, OffsetDateTime}; - -use crate::{ - ElfService, Error, ResolveUpdateArgs, Result, UpdateDecision, - access::ORG_PROJECT_ID, - add_event::{ - audit, materialize, - policy::{self}, - rejection, - types::{ - AddEventContext, AddEventRequest, AddEventResponse, AddEventResult, ExtractedNote, - ExtractorOutput, NoteProcessingData, PersistExtractedNoteArgs, - }, - validation::{self}, - }, - ingestion_profiles::{self, IngestionProfileRef}, -}; -use elf_domain::{memory_policy::MemoryPolicyDecision, ttl, writegate::WritePolicyAudit}; - -impl ElfService { - /// Extracts notes from an event transcript and optionally persists the accepted results. - pub async fn add_event(&self, req: AddEventRequest) -> Result { - validation::validate_add_event_request(&req)?; - - let resolved_profile = ingestion_profiles::resolve_add_event_profile( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - req.ingestion_profile.as_ref(), - ) - .await?; - let (messages, message_policy_applied, write_policy_audits) = - validation::apply_write_policies_to_messages(req.messages.as_slice())?; - let message_texts: Vec = - messages.iter().map(|message| message.content.clone()).collect(); - let messages_json = - serde_json::to_string(&messages).map_err(|_| Error::InvalidRequest { - message: "Failed to serialize messages for extractor.".to_string(), - })?; - let extractor_messages = resolved_profile.build_extractor_messages( - &messages_json, - self.cfg.memory.max_notes_per_add_event, - self.cfg.memory.max_note_chars, - )?; - let llm_cfg = resolved_profile.resolved_llm_config(&self.cfg.providers.llm_extractor); - let extracted_raw = self.providers.extractor.extract(&llm_cfg, &extractor_messages).await?; - let max_notes = self.cfg.memory.max_notes_per_add_event as usize; - let mut extracted: ExtractorOutput = serde_json::from_value(extracted_raw.clone()) - .map_err(|_| Error::InvalidRequest { - message: "Extractor output is missing notes array.".to_string(), - })?; - - if extracted.notes.len() > max_notes { - extracted.notes.truncate(max_notes); - } - - let extracted_json = serde_json::to_value(&extracted).map_err(|_| { - Error::InvalidRequest { message: "Failed to serialize extracted notes.".to_string() } - })?; - let base_now = OffsetDateTime::now_utc(); - let embed_version = crate::embedding_version(&self.cfg); - let dry_run = req.dry_run.unwrap_or(false); - let mut results = Vec::with_capacity(extracted.notes.len()); - - for (note_idx, note) in extracted.notes.into_iter().enumerate() { - let now = base_now + Duration::microseconds(note_idx as i64); - - results.push( - self.process_extracted_note( - &req, - &resolved_profile.profile_ref, - &message_texts, - &message_policy_applied, - write_policy_audits.as_ref(), - note, - now, - embed_version.as_str(), - dry_run, - ) - .await?, - ); - } - - Ok(AddEventResponse { - extracted: extracted_json, - results, - ingestion_profile: Some(resolved_profile.profile_ref), - }) - } - - #[allow(clippy::too_many_arguments)] - async fn process_extracted_note( - &self, - req: &AddEventRequest, - ingestion_profile: &IngestionProfileRef, - message_texts: &[String], - message_policy_applied: &[bool], - write_policy_audits: Option<&Vec>, - note: ExtractedNote, - now: OffsetDateTime, - embed_version: &str, - dry_run: bool, - ) -> Result { - let note_data = NoteProcessingData::from_request_and_note(req, ¬e); - let effective_project_id = if note_data.scope.trim() == "org_shared" { - ORG_PROJECT_ID - } else { - req.project_id.as_str() - }; - let ctx = AddEventContext { - tenant_id: req.tenant_id.as_str(), - project_id: effective_project_id, - agent_id: req.agent_id.as_str(), - scope: note_data.scope.as_str(), - now, - }; - let mut tx = self.db.pool.begin().await?; - - if let Some(result) = rejection::record_extracted_note_rejections( - &mut tx, - &self.cfg, - &ctx, - ingestion_profile, - ¬e, - ¬e_data, - message_texts, - message_policy_applied, - write_policy_audits, - ) - .await? - { - tx.commit().await?; - - return Ok(result); - } - - let result = self - .apply_extracted_note_decision( - req, - ingestion_profile, - &mut tx, - &ctx, - ¬e, - ¬e_data, - note_data.note_type.as_str(), - effective_project_id, - now, - embed_version, - dry_run, - write_policy_audits, - ) - .await?; - - tx.commit().await?; - - Ok(result) - } - - #[allow(clippy::too_many_arguments)] - async fn apply_extracted_note_decision( - &self, - req: &AddEventRequest, - ingestion_profile: &IngestionProfileRef, - tx: &mut Transaction<'_, Postgres>, - ctx: &AddEventContext<'_>, - note: &ExtractedNote, - note_data: &NoteProcessingData, - note_type: &str, - project_id: &str, - now: OffsetDateTime, - embed_version: &str, - dry_run: bool, - write_policy_audits: Option<&Vec>, - ) -> Result { - let decision = self.resolve_extracted_note_update(note, req, note_data, tx, now).await?; - let metadata = decision.metadata(); - let base_decision = policy::base_decision_for_update( - &decision, - note_data.structured_present, - note_data.graph_present, - ); - let (policy_decision, decision_policy_rule, min_confidence, min_importance) = - policy::resolve_policy_for_update(&self.cfg, note_data, base_decision); - let ignore_reason_code = policy::ignore_reason_code_for_policy( - base_decision, - policy_decision, - metadata.matched_dup, - ); - let should_apply = matches!( - policy_decision, - MemoryPolicyDecision::Remember | MemoryPolicyDecision::Update - ); - let mut result = policy::build_result_from_decision( - &decision, - policy_decision, - note_data.reason.clone(), - note_data.structured_present || note_data.graph_present, - ); - - policy::apply_policy_ignore_adjustments( - &mut result, - &decision, - policy_decision, - ignore_reason_code, - ); - - let mut note_version_id = None; - - if should_apply && !dry_run { - let persist_args = PersistExtractedNoteArgs { - req, - project_id, - structured: note_data.structured.as_ref(), - key: note.key.as_deref(), - reason: note.reason.as_ref(), - note_type, - text: note_data.text.as_str(), - scope: note_data.scope.as_str(), - importance: note_data.importance, - confidence: note_data.confidence, - expires_at: ttl::compute_expires_at( - note_data.ttl_days, - note_data.note_type.as_str(), - &self.cfg, - now, - ), - source_ref: serde_json::json!({ - "evidence": note_data.evidence.clone(), - "reason": note_data.reason.clone().unwrap_or_default(), - "ingestion_profile": serde_json::json!({ - "id": ingestion_profile.id, - "version": ingestion_profile.version, - }), - }), - now, - embed_version, - }; - let persisted = materialize::persist_extracted_note_decision( - tx, - persist_args, - decision, - policy_decision, - ) - .await?; - - result = persisted.0; - note_version_id = persisted.1; - } - - result.write_policy_audits = write_policy_audits.cloned(); - - audit::record_ingest_decision( - tx, - &self.cfg, - ctx, - note, - note_data.note_type.as_str(), - result.note_id, - note_version_id, - base_decision, - policy_decision, - result.op, - result.reason_code.as_deref(), - decision_policy_rule.as_deref(), - metadata.similarity_best, - metadata.key_match, - metadata.matched_dup, - min_confidence, - min_importance, - Some(ingestion_profile.id.as_str()), - Some(ingestion_profile.version), - note_data.structured_present, - note_data.graph_present, - write_policy_audits.cloned(), - ) - .await?; - - Ok(result) - } - - async fn resolve_extracted_note_update( - &self, - note: &ExtractedNote, - req: &AddEventRequest, - note_data: &NoteProcessingData, - tx: &mut PgConnection, - now: OffsetDateTime, - ) -> Result { - crate::resolve_update( - tx, - ResolveUpdateArgs { - cfg: &self.cfg, - providers: &self.providers, - tenant_id: req.tenant_id.as_str(), - project_id: if note_data.scope.trim() == "org_shared" { - ORG_PROJECT_ID - } else { - req.project_id.as_str() - }, - agent_id: req.agent_id.as_str(), - scope: note_data.scope.as_str(), - note_type: note_data.note_type.as_str(), - key: note.key.as_deref(), - text: note_data.text.as_str(), - now, - }, - ) - .await - } -} +mod decision; +mod entrypoint; +mod processing; diff --git a/packages/elf-service/src/add_event/service/decision.rs b/packages/elf-service/src/add_event/service/decision.rs new file mode 100644 index 00000000..79ff1248 --- /dev/null +++ b/packages/elf-service/src/add_event/service/decision.rs @@ -0,0 +1,171 @@ +use sqlx::{PgConnection, Postgres, Transaction}; +use time::OffsetDateTime; + +use crate::{ + ElfService, ResolveUpdateArgs, Result, UpdateDecision, + access::ORG_PROJECT_ID, + add_event::{ + audit, materialize, + policy::{self}, + types::{ + AddEventContext, AddEventRequest, AddEventResult, ExtractedNote, NoteProcessingData, + PersistExtractedNoteArgs, + }, + }, + ingestion_profiles::IngestionProfileRef, +}; +use elf_domain::{memory_policy::MemoryPolicyDecision, ttl, writegate::WritePolicyAudit}; + +impl ElfService { + #[allow(clippy::too_many_arguments)] + pub(in crate::add_event) async fn apply_extracted_note_decision( + &self, + req: &AddEventRequest, + ingestion_profile: &IngestionProfileRef, + tx: &mut Transaction<'_, Postgres>, + ctx: &AddEventContext<'_>, + note: &ExtractedNote, + note_data: &NoteProcessingData, + note_type: &str, + project_id: &str, + now: OffsetDateTime, + embed_version: &str, + dry_run: bool, + write_policy_audits: Option<&Vec>, + ) -> Result { + let decision = self.resolve_extracted_note_update(note, req, note_data, tx, now).await?; + let metadata = decision.metadata(); + let base_decision = policy::base_decision_for_update( + &decision, + note_data.structured_present, + note_data.graph_present, + ); + let (policy_decision, decision_policy_rule, min_confidence, min_importance) = + policy::resolve_policy_for_update(&self.cfg, note_data, base_decision); + let ignore_reason_code = policy::ignore_reason_code_for_policy( + base_decision, + policy_decision, + metadata.matched_dup, + ); + let should_apply = matches!( + policy_decision, + MemoryPolicyDecision::Remember | MemoryPolicyDecision::Update + ); + let mut result = policy::build_result_from_decision( + &decision, + policy_decision, + note_data.reason.clone(), + note_data.structured_present || note_data.graph_present, + ); + + policy::apply_policy_ignore_adjustments( + &mut result, + &decision, + policy_decision, + ignore_reason_code, + ); + + let mut note_version_id = None; + + if should_apply && !dry_run { + let persist_args = PersistExtractedNoteArgs { + req, + project_id, + structured: note_data.structured.as_ref(), + key: note.key.as_deref(), + reason: note.reason.as_ref(), + note_type, + text: note_data.text.as_str(), + scope: note_data.scope.as_str(), + importance: note_data.importance, + confidence: note_data.confidence, + expires_at: ttl::compute_expires_at( + note_data.ttl_days, + note_data.note_type.as_str(), + &self.cfg, + now, + ), + source_ref: serde_json::json!({ + "evidence": note_data.evidence.clone(), + "reason": note_data.reason.clone().unwrap_or_default(), + "ingestion_profile": serde_json::json!({ + "id": ingestion_profile.id, + "version": ingestion_profile.version, + }), + }), + now, + embed_version, + }; + let persisted = materialize::persist_extracted_note_decision( + tx, + persist_args, + decision, + policy_decision, + ) + .await?; + + result = persisted.0; + note_version_id = persisted.1; + } + + result.write_policy_audits = write_policy_audits.cloned(); + + audit::record_ingest_decision( + tx, + &self.cfg, + ctx, + note, + note_data.note_type.as_str(), + result.note_id, + note_version_id, + base_decision, + policy_decision, + result.op, + result.reason_code.as_deref(), + decision_policy_rule.as_deref(), + metadata.similarity_best, + metadata.key_match, + metadata.matched_dup, + min_confidence, + min_importance, + Some(ingestion_profile.id.as_str()), + Some(ingestion_profile.version), + note_data.structured_present, + note_data.graph_present, + write_policy_audits.cloned(), + ) + .await?; + + Ok(result) + } + + async fn resolve_extracted_note_update( + &self, + note: &ExtractedNote, + req: &AddEventRequest, + note_data: &NoteProcessingData, + tx: &mut PgConnection, + now: OffsetDateTime, + ) -> Result { + crate::resolve_update( + tx, + ResolveUpdateArgs { + cfg: &self.cfg, + providers: &self.providers, + tenant_id: req.tenant_id.as_str(), + project_id: if note_data.scope.trim() == "org_shared" { + ORG_PROJECT_ID + } else { + req.project_id.as_str() + }, + agent_id: req.agent_id.as_str(), + scope: note_data.scope.as_str(), + note_type: note_data.note_type.as_str(), + key: note.key.as_deref(), + text: note_data.text.as_str(), + now, + }, + ) + .await + } +} diff --git a/packages/elf-service/src/add_event/service/entrypoint.rs b/packages/elf-service/src/add_event/service/entrypoint.rs new file mode 100644 index 00000000..9e8888ac --- /dev/null +++ b/packages/elf-service/src/add_event/service/entrypoint.rs @@ -0,0 +1,82 @@ +use time::{Duration, OffsetDateTime}; + +use crate::{ + ElfService, Error, Result, + add_event::{ + types::{AddEventRequest, AddEventResponse, ExtractorOutput}, + validation, + }, + ingestion_profiles, +}; + +impl ElfService { + /// Extracts notes from an event transcript and optionally persists the accepted results. + pub async fn add_event(&self, req: AddEventRequest) -> Result { + validation::validate_add_event_request(&req)?; + + let resolved_profile = ingestion_profiles::resolve_add_event_profile( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.ingestion_profile.as_ref(), + ) + .await?; + let (messages, message_policy_applied, write_policy_audits) = + validation::apply_write_policies_to_messages(req.messages.as_slice())?; + let message_texts: Vec = + messages.iter().map(|message| message.content.clone()).collect(); + let messages_json = + serde_json::to_string(&messages).map_err(|_| Error::InvalidRequest { + message: "Failed to serialize messages for extractor.".to_string(), + })?; + let extractor_messages = resolved_profile.build_extractor_messages( + &messages_json, + self.cfg.memory.max_notes_per_add_event, + self.cfg.memory.max_note_chars, + )?; + let llm_cfg = resolved_profile.resolved_llm_config(&self.cfg.providers.llm_extractor); + let extracted_raw = self.providers.extractor.extract(&llm_cfg, &extractor_messages).await?; + let max_notes = self.cfg.memory.max_notes_per_add_event as usize; + let mut extracted: ExtractorOutput = serde_json::from_value(extracted_raw.clone()) + .map_err(|_| Error::InvalidRequest { + message: "Extractor output is missing notes array.".to_string(), + })?; + + if extracted.notes.len() > max_notes { + extracted.notes.truncate(max_notes); + } + + let extracted_json = serde_json::to_value(&extracted).map_err(|_| { + Error::InvalidRequest { message: "Failed to serialize extracted notes.".to_string() } + })?; + let base_now = OffsetDateTime::now_utc(); + let embed_version = crate::embedding_version(&self.cfg); + let dry_run = req.dry_run.unwrap_or(false); + let mut results = Vec::with_capacity(extracted.notes.len()); + + for (note_idx, note) in extracted.notes.into_iter().enumerate() { + let now = base_now + Duration::microseconds(note_idx as i64); + + results.push( + self.process_extracted_note( + &req, + &resolved_profile.profile_ref, + &message_texts, + &message_policy_applied, + write_policy_audits.as_ref(), + note, + now, + embed_version.as_str(), + dry_run, + ) + .await?, + ); + } + + Ok(AddEventResponse { + extracted: extracted_json, + results, + ingestion_profile: Some(resolved_profile.profile_ref), + }) + } +} diff --git a/packages/elf-service/src/add_event/service/processing.rs b/packages/elf-service/src/add_event/service/processing.rs new file mode 100644 index 00000000..39303890 --- /dev/null +++ b/packages/elf-service/src/add_event/service/processing.rs @@ -0,0 +1,84 @@ +use time::OffsetDateTime; + +use crate::{ + ElfService, Result, + access::ORG_PROJECT_ID, + add_event::{ + rejection, + types::{ + AddEventContext, AddEventRequest, AddEventResult, ExtractedNote, NoteProcessingData, + }, + }, + ingestion_profiles::IngestionProfileRef, +}; +use elf_domain::writegate::WritePolicyAudit; + +impl ElfService { + #[allow(clippy::too_many_arguments)] + pub(in crate::add_event) async fn process_extracted_note( + &self, + req: &AddEventRequest, + ingestion_profile: &IngestionProfileRef, + message_texts: &[String], + message_policy_applied: &[bool], + write_policy_audits: Option<&Vec>, + note: ExtractedNote, + now: OffsetDateTime, + embed_version: &str, + dry_run: bool, + ) -> Result { + let note_data = NoteProcessingData::from_request_and_note(req, ¬e); + let effective_project_id = if note_data.scope.trim() == "org_shared" { + ORG_PROJECT_ID + } else { + req.project_id.as_str() + }; + let ctx = AddEventContext { + tenant_id: req.tenant_id.as_str(), + project_id: effective_project_id, + agent_id: req.agent_id.as_str(), + scope: note_data.scope.as_str(), + now, + }; + let mut tx = self.db.pool.begin().await?; + + if let Some(result) = rejection::record_extracted_note_rejections( + &mut tx, + &self.cfg, + &ctx, + ingestion_profile, + ¬e, + ¬e_data, + message_texts, + message_policy_applied, + write_policy_audits, + ) + .await? + { + tx.commit().await?; + + return Ok(result); + } + + let result = self + .apply_extracted_note_decision( + req, + ingestion_profile, + &mut tx, + &ctx, + ¬e, + ¬e_data, + note_data.note_type.as_str(), + effective_project_id, + now, + embed_version, + dry_run, + write_policy_audits, + ) + .await?; + + tx.commit().await?; + + Ok(result) + } +} diff --git a/packages/elf-service/src/add_event/validation.rs b/packages/elf-service/src/add_event/validation.rs index 0c4e1d95..59dc03ef 100644 --- a/packages/elf-service/src/add_event/validation.rs +++ b/packages/elf-service/src/add_event/validation.rs @@ -1,240 +1,14 @@ -use crate::{ - Error, NoteOp, REJECT_EVIDENCE_MISMATCH, REJECT_WRITE_POLICY_MISMATCH, Result, - add_event::types::{ - AddEventRequest, AddEventResult, EventMessage, EvidenceQuote, ProcessedEventOutput, - }, - structured_fields::{self, StructuredFields}, -}; -use elf_config::Config; -use elf_domain::{ - english_gate, evidence, - memory_policy::MemoryPolicyDecision, - writegate::{self, NoteInput, WritePolicyAudit, WritePolicyError}, +mod evidence; +mod request; +mod structured; +mod write_policy; +mod writegate; + +pub(super) use self::{ + evidence::reject_extracted_note_if_evidence_invalid, request::validate_add_event_request, + structured::reject_extracted_note_if_structured_invalid, + write_policy::apply_write_policies_to_messages, + writegate::reject_extracted_note_if_writegate_rejects, }; pub(super) const REJECT_STRUCTURED_INVALID: &str = "REJECT_STRUCTURED_INVALID"; - -pub(super) fn validate_add_event_request(req: &AddEventRequest) -> Result<()> { - if req.messages.is_empty() { - return Err(Error::InvalidRequest { message: "Messages list is empty.".to_string() }); - } - if req.tenant_id.trim().is_empty() - || req.project_id.trim().is_empty() - || req.agent_id.trim().is_empty() - { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - if let Some(scope) = req.scope.as_ref() - && scope.trim().is_empty() - { - return Err(Error::InvalidRequest { - message: "scope must not be empty when provided.".to_string(), - }); - } - if let Some(profile) = req.ingestion_profile.as_ref() { - if profile.id.trim().is_empty() { - return Err(Error::InvalidRequest { - message: "ingestion_profile.id must not be empty.".to_string(), - }); - } - - if let Some(version) = profile.version - && version <= 0 - { - return Err(Error::InvalidRequest { - message: "ingestion_profile.version must be greater than zero.".to_string(), - }); - } - } - - for (idx, msg) in req.messages.iter().enumerate() { - if !english_gate::is_english_natural_language(msg.content.as_str()) { - return Err(Error::NonEnglishInput { field: format!("$.messages[{idx}].content") }); - } - } - - Ok(()) -} - -pub(super) fn apply_write_policies_to_messages( - messages: &[EventMessage], -) -> Result { - let mut message_policy_applied = Vec::with_capacity(messages.len()); - let mut write_policy_audits = Vec::new(); - let mut transformed_messages = Vec::with_capacity(messages.len()); - - for message in messages { - let (transformed_message, audit) = apply_write_policy_to_message(message)?; - - message_policy_applied.push(audit.is_some()); - - if let Some(audit) = audit { - write_policy_audits.push(audit); - } - - transformed_messages.push(transformed_message); - } - - Ok(( - transformed_messages, - message_policy_applied, - if write_policy_audits.is_empty() { None } else { Some(write_policy_audits) }, - )) -} - -pub(super) fn apply_write_policy_to_message( - message: &EventMessage, -) -> Result<(EventMessage, Option)> { - let result = - writegate::apply_write_policy(message.content.as_str(), message.write_policy.as_ref()) - .map_err(|err| { - let message = match err { - WritePolicyError::InvalidSpan => "Invalid write_policy span provided.", - WritePolicyError::OverlappingOps => "Overlapping write_policy spans provided.", - }; - - Error::InvalidRequest { message: message.to_string() } - })?; - let has_policy = message.write_policy.is_some(); - let mut transformed = message.clone(); - - transformed.content = result.transformed; - - Ok((transformed, if has_policy { Some(result.audit) } else { None })) -} - -pub(super) fn reject_extracted_note_if_evidence_invalid( - cfg: &Config, - reason: Option<&String>, - evidence: &[EvidenceQuote], - message_texts: &[String], - message_policy_applied: &[bool], -) -> Option { - if evidence.is_empty() - || evidence.len() < cfg.security.evidence_min_quotes as usize - || evidence.len() > cfg.security.evidence_max_quotes as usize - { - return Some(AddEventResult { - note_id: None, - op: NoteOp::Rejected, - policy_decision: MemoryPolicyDecision::Reject, - reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), - reason: reason.cloned(), - field_path: None, - write_policy_audits: None, - }); - } - - for quote in evidence { - if quote.quote.len() > cfg.security.evidence_max_quote_chars as usize { - return Some(AddEventResult { - note_id: None, - op: NoteOp::Rejected, - policy_decision: MemoryPolicyDecision::Reject, - reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), - reason: reason.cloned(), - field_path: None, - write_policy_audits: None, - }); - } - if !evidence::evidence_matches(message_texts, quote.message_index, quote.quote.as_str()) { - let reason_code = - message_policy_applied.get(quote.message_index).is_some_and(|applied| *applied); - - return Some(AddEventResult { - note_id: None, - op: NoteOp::Rejected, - policy_decision: MemoryPolicyDecision::Reject, - reason_code: Some(if reason_code { - REJECT_WRITE_POLICY_MISMATCH.to_string() - } else { - REJECT_EVIDENCE_MISMATCH.to_string() - }), - reason: reason.cloned(), - field_path: None, - write_policy_audits: None, - }); - } - } - - None -} - -pub(super) fn reject_extracted_note_if_structured_invalid( - structured: Option<&StructuredFields>, - text: &str, - evidence: &[EvidenceQuote], - reason: Option<&String>, -) -> Option { - let structured = structured?; - - if structured.is_effectively_empty() { - return None; - } - - let event_evidence: Vec<(usize, String)> = - evidence.iter().map(|q| (q.message_index, q.quote.clone())).collect(); - - if let Err(err) = structured_fields::validate_structured_fields( - structured, - text, - &serde_json::json!({}), - Some(event_evidence.as_slice()), - ) { - tracing::info!(error = %err, "Rejecting extracted note due to invalid structured fields."); - - let field_path = extract_structured_rejection_field_path(&err); - - return Some(AddEventResult { - note_id: None, - op: NoteOp::Rejected, - policy_decision: MemoryPolicyDecision::Reject, - reason_code: Some(REJECT_STRUCTURED_INVALID.to_string()), - reason: reason.cloned(), - field_path, - write_policy_audits: None, - }); - } - - None -} - -pub(super) fn reject_extracted_note_if_writegate_rejects( - cfg: &Config, - reason: Option<&String>, - note_type: &str, - scope: &str, - text: &str, -) -> Option { - let gate_input = NoteInput { - note_type: note_type.to_string(), - scope: scope.to_string(), - text: text.to_string(), - }; - - if let Err(code) = writegate::writegate(&gate_input, cfg) { - return Some(AddEventResult { - note_id: None, - op: NoteOp::Rejected, - policy_decision: MemoryPolicyDecision::Reject, - reason_code: Some(crate::writegate_reason_code(code).to_string()), - reason: reason.cloned(), - field_path: None, - write_policy_audits: None, - }); - } - - None -} - -fn extract_structured_rejection_field_path(err: &Error) -> Option { - match err { - Error::NonEnglishInput { field } => Some(field.clone()), - Error::InvalidRequest { message } if message.starts_with("structured.") => - message.split_whitespace().next().map(ToString::to_string), - _ => None, - } -} diff --git a/packages/elf-service/src/add_event/validation/evidence.rs b/packages/elf-service/src/add_event/validation/evidence.rs new file mode 100644 index 00000000..30e4668e --- /dev/null +++ b/packages/elf-service/src/add_event/validation/evidence.rs @@ -0,0 +1,63 @@ +use crate::{ + NoteOp, REJECT_EVIDENCE_MISMATCH, REJECT_WRITE_POLICY_MISMATCH, + add_event::types::{AddEventResult, EvidenceQuote}, +}; +use elf_config::Config; +use elf_domain::{evidence, memory_policy::MemoryPolicyDecision}; + +pub(in crate::add_event) fn reject_extracted_note_if_evidence_invalid( + cfg: &Config, + reason: Option<&String>, + evidence: &[EvidenceQuote], + message_texts: &[String], + message_policy_applied: &[bool], +) -> Option { + if evidence.is_empty() + || evidence.len() < cfg.security.evidence_min_quotes as usize + || evidence.len() > cfg.security.evidence_max_quotes as usize + { + return Some(AddEventResult { + note_id: None, + op: NoteOp::Rejected, + policy_decision: MemoryPolicyDecision::Reject, + reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), + reason: reason.cloned(), + field_path: None, + write_policy_audits: None, + }); + } + + for quote in evidence { + if quote.quote.len() > cfg.security.evidence_max_quote_chars as usize { + return Some(AddEventResult { + note_id: None, + op: NoteOp::Rejected, + policy_decision: MemoryPolicyDecision::Reject, + reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), + reason: reason.cloned(), + field_path: None, + write_policy_audits: None, + }); + } + if !evidence::evidence_matches(message_texts, quote.message_index, quote.quote.as_str()) { + let reason_code = + message_policy_applied.get(quote.message_index).is_some_and(|applied| *applied); + + return Some(AddEventResult { + note_id: None, + op: NoteOp::Rejected, + policy_decision: MemoryPolicyDecision::Reject, + reason_code: Some(if reason_code { + REJECT_WRITE_POLICY_MISMATCH.to_string() + } else { + REJECT_EVIDENCE_MISMATCH.to_string() + }), + reason: reason.cloned(), + field_path: None, + write_policy_audits: None, + }); + } + } + + None +} diff --git a/packages/elf-service/src/add_event/validation/request.rs b/packages/elf-service/src/add_event/validation/request.rs new file mode 100644 index 00000000..8b69704f --- /dev/null +++ b/packages/elf-service/src/add_event/validation/request.rs @@ -0,0 +1,47 @@ +use crate::{Error, Result, add_event::types::AddEventRequest}; +use elf_domain::english_gate; + +pub(in crate::add_event) fn validate_add_event_request(req: &AddEventRequest) -> Result<()> { + if req.messages.is_empty() { + return Err(Error::InvalidRequest { message: "Messages list is empty.".to_string() }); + } + if req.tenant_id.trim().is_empty() + || req.project_id.trim().is_empty() + || req.agent_id.trim().is_empty() + { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + if let Some(scope) = req.scope.as_ref() + && scope.trim().is_empty() + { + return Err(Error::InvalidRequest { + message: "scope must not be empty when provided.".to_string(), + }); + } + if let Some(profile) = req.ingestion_profile.as_ref() { + if profile.id.trim().is_empty() { + return Err(Error::InvalidRequest { + message: "ingestion_profile.id must not be empty.".to_string(), + }); + } + + if let Some(version) = profile.version + && version <= 0 + { + return Err(Error::InvalidRequest { + message: "ingestion_profile.version must be greater than zero.".to_string(), + }); + } + } + + for (idx, msg) in req.messages.iter().enumerate() { + if !english_gate::is_english_natural_language(msg.content.as_str()) { + return Err(Error::NonEnglishInput { field: format!("$.messages[{idx}].content") }); + } + } + + Ok(()) +} diff --git a/packages/elf-service/src/add_event/validation/structured.rs b/packages/elf-service/src/add_event/validation/structured.rs new file mode 100644 index 00000000..a2076c92 --- /dev/null +++ b/packages/elf-service/src/add_event/validation/structured.rs @@ -0,0 +1,57 @@ +use crate::{ + Error, NoteOp, + add_event::{ + types::{AddEventResult, EvidenceQuote}, + validation::REJECT_STRUCTURED_INVALID, + }, + structured_fields::{self, StructuredFields}, +}; +use elf_domain::memory_policy::MemoryPolicyDecision; + +pub(in crate::add_event) fn reject_extracted_note_if_structured_invalid( + structured: Option<&StructuredFields>, + text: &str, + evidence: &[EvidenceQuote], + reason: Option<&String>, +) -> Option { + let structured = structured?; + + if structured.is_effectively_empty() { + return None; + } + + let event_evidence: Vec<(usize, String)> = + evidence.iter().map(|q| (q.message_index, q.quote.clone())).collect(); + + if let Err(err) = structured_fields::validate_structured_fields( + structured, + text, + &serde_json::json!({}), + Some(event_evidence.as_slice()), + ) { + tracing::info!(error = %err, "Rejecting extracted note due to invalid structured fields."); + + let field_path = extract_structured_rejection_field_path(&err); + + return Some(AddEventResult { + note_id: None, + op: NoteOp::Rejected, + policy_decision: MemoryPolicyDecision::Reject, + reason_code: Some(REJECT_STRUCTURED_INVALID.to_string()), + reason: reason.cloned(), + field_path, + write_policy_audits: None, + }); + } + + None +} + +fn extract_structured_rejection_field_path(err: &Error) -> Option { + match err { + Error::NonEnglishInput { field } => Some(field.clone()), + Error::InvalidRequest { message } if message.starts_with("structured.") => + message.split_whitespace().next().map(ToString::to_string), + _ => None, + } +} diff --git a/packages/elf-service/src/add_event/validation/write_policy.rs b/packages/elf-service/src/add_event/validation/write_policy.rs new file mode 100644 index 00000000..20913df7 --- /dev/null +++ b/packages/elf-service/src/add_event/validation/write_policy.rs @@ -0,0 +1,52 @@ +use crate::{ + Error, Result, + add_event::types::{EventMessage, ProcessedEventOutput}, +}; +use elf_domain::writegate::{self, WritePolicyAudit, WritePolicyError}; + +pub(in crate::add_event) fn apply_write_policies_to_messages( + messages: &[EventMessage], +) -> Result { + let mut message_policy_applied = Vec::with_capacity(messages.len()); + let mut write_policy_audits = Vec::new(); + let mut transformed_messages = Vec::with_capacity(messages.len()); + + for message in messages { + let (transformed_message, audit) = apply_write_policy_to_message(message)?; + + message_policy_applied.push(audit.is_some()); + + if let Some(audit) = audit { + write_policy_audits.push(audit); + } + + transformed_messages.push(transformed_message); + } + + Ok(( + transformed_messages, + message_policy_applied, + if write_policy_audits.is_empty() { None } else { Some(write_policy_audits) }, + )) +} + +fn apply_write_policy_to_message( + message: &EventMessage, +) -> Result<(EventMessage, Option)> { + let result = + writegate::apply_write_policy(message.content.as_str(), message.write_policy.as_ref()) + .map_err(|err| { + let message = match err { + WritePolicyError::InvalidSpan => "Invalid write_policy span provided.", + WritePolicyError::OverlappingOps => "Overlapping write_policy spans provided.", + }; + + Error::InvalidRequest { message: message.to_string() } + })?; + let has_policy = message.write_policy.is_some(); + let mut transformed = message.clone(); + + transformed.content = result.transformed; + + Ok((transformed, if has_policy { Some(result.audit) } else { None })) +} diff --git a/packages/elf-service/src/add_event/validation/writegate.rs b/packages/elf-service/src/add_event/validation/writegate.rs new file mode 100644 index 00000000..20000a70 --- /dev/null +++ b/packages/elf-service/src/add_event/validation/writegate.rs @@ -0,0 +1,34 @@ +use crate::{NoteOp, add_event::types::AddEventResult}; +use elf_config::Config; +use elf_domain::{ + memory_policy::MemoryPolicyDecision, + writegate::{self, NoteInput}, +}; + +pub(in crate::add_event) fn reject_extracted_note_if_writegate_rejects( + cfg: &Config, + reason: Option<&String>, + note_type: &str, + scope: &str, + text: &str, +) -> Option { + let gate_input = NoteInput { + note_type: note_type.to_string(), + scope: scope.to_string(), + text: text.to_string(), + }; + + if let Err(code) = writegate::writegate(&gate_input, cfg) { + return Some(AddEventResult { + note_id: None, + op: NoteOp::Rejected, + policy_decision: MemoryPolicyDecision::Reject, + reason_code: Some(crate::writegate_reason_code(code).to_string()), + reason: reason.cloned(), + field_path: None, + write_policy_audits: None, + }); + } + + None +} diff --git a/packages/elf-service/src/ingestion_profiles/admin.rs b/packages/elf-service/src/ingestion_profiles/admin.rs index f9aea896..de92ef14 100644 --- a/packages/elf-service/src/ingestion_profiles/admin.rs +++ b/packages/elf-service/src/ingestion_profiles/admin.rs @@ -1,273 +1,4 @@ -use time::OffsetDateTime; - -use crate::{ - ElfService, Error, Result, - ingestion_profiles::{ - ADD_EVENT_PIPELINE, profile, - storage::{self}, - types::{ - AdminIngestionProfileCreateRequest, AdminIngestionProfileDefaultGetRequest, - AdminIngestionProfileDefaultResponse, AdminIngestionProfileDefaultSetRequest, - AdminIngestionProfileGetRequest, AdminIngestionProfileListRequest, - AdminIngestionProfileResponse, AdminIngestionProfileSummary, - AdminIngestionProfileVersionsListRequest, AdminIngestionProfileVersionsListResponse, - AdminIngestionProfilesListResponse, IngestionProfileSelector, - }, - }, -}; - -impl ElfService { - /// Creates a new ingestion profile version. - pub async fn admin_ingestion_profile_create( - &self, - req: AdminIngestionProfileCreateRequest, - ) -> Result { - let profile_id = req.profile_id.trim().to_string(); - let created_by = req.created_by.trim().to_string(); - - if profile_id.is_empty() { - return Err(Error::InvalidRequest { - message: "profile_id must be non-empty.".to_string(), - }); - } - if created_by.is_empty() { - return Err(Error::InvalidRequest { - message: "created_by must be non-empty.".to_string(), - }); - } - if !req.profile.is_object() { - return Err(Error::InvalidRequest { - message: "profile must be a JSON object.".to_string(), - }); - } - - let _ = profile::parse_profile(req.profile.clone())?; - let version = match req.version { - Some(version) if version > 0 => version, - Some(_) => { - return Err(Error::InvalidRequest { - message: "version must be greater than 0.".to_string(), - }); - }, - None => - storage::next_profile_version( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - profile_id.as_str(), - ) - .await?, - }; - let row = storage::insert_profile_metadata( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - profile_id.as_str(), - version, - req.profile, - created_by.as_str(), - ) - .await?; - let row = row.ok_or_else(|| Error::Conflict { - message: format!( - "Ingestion profile '{}' version {} already exists for tenant '{}' project '{}' pipeline '{}'.", - profile_id, version, req.tenant_id, req.project_id, ADD_EVENT_PIPELINE, - ), - })?; - - Ok(AdminIngestionProfileResponse { - profile_id: row.profile_id, - version: row.version, - profile: row.profile, - created_at: row.created_at, - created_by: row.created_by, - }) - } - - /// Lists the latest visible ingestion profile versions. - pub async fn admin_ingestion_profiles_list( - &self, - req: AdminIngestionProfileListRequest, - ) -> Result { - let rows = storage::list_latest_profile_summaries( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - ) - .await?; - let profiles = rows - .into_iter() - .map(|row| AdminIngestionProfileSummary { - profile_id: row.profile_id, - version: row.version, - created_at: row.created_at, - created_by: row.created_by, - }) - .collect(); - - Ok(AdminIngestionProfilesListResponse { profiles }) - } - - /// Fetches one ingestion profile version. - pub async fn admin_ingestion_profile_get( - &self, - req: AdminIngestionProfileGetRequest, - ) -> Result { - let selector = IngestionProfileSelector { - id: req.profile_id.trim().to_string(), - version: req.version, - }; - - if selector.id.is_empty() { - return Err(Error::InvalidRequest { - message: "profile_id must be non-empty.".to_string(), - }); - } - - if let Some(version) = selector.version - && version <= 0 - { - return Err(Error::InvalidRequest { - message: "version must be greater than 0.".to_string(), - }); - } - - let row = storage::select_profile_metadata( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - &selector, - ) - .await?; - - Ok(AdminIngestionProfileResponse { - profile_id: row.profile_id, - version: row.version, - profile: row.profile, - created_at: row.created_at, - created_by: row.created_by, - }) - } - - /// Lists all versions for one ingestion profile. - pub async fn admin_ingestion_profile_versions_list( - &self, - req: AdminIngestionProfileVersionsListRequest, - ) -> Result { - let profile_id = req.profile_id.trim().to_string(); - - if profile_id.is_empty() { - return Err(Error::InvalidRequest { - message: "profile_id must be non-empty.".to_string(), - }); - } - - let rows = storage::list_profile_version_summaries( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - profile_id.as_str(), - ) - .await?; - let profiles = rows - .into_iter() - .map(|row| AdminIngestionProfileSummary { - profile_id: row.profile_id, - version: row.version, - created_at: row.created_at, - created_by: row.created_by, - }) - .collect(); - - Ok(AdminIngestionProfileVersionsListResponse { profiles }) - } - - /// Reads the default ingestion profile pointer. - pub async fn admin_ingestion_profile_default_get( - &self, - req: AdminIngestionProfileDefaultGetRequest, - ) -> Result { - storage::seed_default_profile( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - ) - .await?; - - let row = storage::select_default_row( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - ) - .await?; - let row = match row { - Some(row) => row, - None => { - let selector = storage::select_default_selector( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - ) - .await?; - - return Ok(AdminIngestionProfileDefaultResponse { - profile_id: selector.id, - version: selector.version, - updated_at: OffsetDateTime::now_utc(), - }); - }, - }; - - Ok(AdminIngestionProfileDefaultResponse { - profile_id: row.profile_id, - version: row.version, - updated_at: row.updated_at, - }) - } - - /// Updates the default ingestion profile pointer. - pub async fn admin_ingestion_profile_default_set( - &self, - req: AdminIngestionProfileDefaultSetRequest, - ) -> Result { - let profile_id = req.profile_id.trim().to_string(); - - if profile_id.is_empty() { - return Err(Error::InvalidRequest { - message: "profile_id must be non-empty.".to_string(), - }); - } - - if let Some(version) = req.version - && version <= 0 - { - return Err(Error::InvalidRequest { - message: "version must be greater than 0.".to_string(), - }); - } - - let selector = IngestionProfileSelector { id: profile_id.clone(), version: req.version }; - let row = storage::select_profile_metadata( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - &selector, - ) - .await?; - let version = row.version; - let row = storage::upsert_default_row( - &self.db.pool, - req.tenant_id.as_str(), - req.project_id.as_str(), - row.profile_id, - version, - ) - .await?; - - Ok(AdminIngestionProfileDefaultResponse { - profile_id: row.profile_id, - version: row.version, - updated_at: row.updated_at, - }) - } -} +mod create; +mod defaults; +mod read; +mod versions; diff --git a/packages/elf-service/src/ingestion_profiles/admin/create.rs b/packages/elf-service/src/ingestion_profiles/admin/create.rs new file mode 100644 index 00000000..bdfff59b --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/admin/create.rs @@ -0,0 +1,76 @@ +use crate::{ + ElfService, Error, Result, + ingestion_profiles::{ + ADD_EVENT_PIPELINE, profile, storage, + types::{AdminIngestionProfileCreateRequest, AdminIngestionProfileResponse}, + }, +}; + +impl ElfService { + /// Creates a new ingestion profile version. + pub async fn admin_ingestion_profile_create( + &self, + req: AdminIngestionProfileCreateRequest, + ) -> Result { + let profile_id = req.profile_id.trim().to_string(); + let created_by = req.created_by.trim().to_string(); + + if profile_id.is_empty() { + return Err(Error::InvalidRequest { + message: "profile_id must be non-empty.".to_string(), + }); + } + if created_by.is_empty() { + return Err(Error::InvalidRequest { + message: "created_by must be non-empty.".to_string(), + }); + } + if !req.profile.is_object() { + return Err(Error::InvalidRequest { + message: "profile must be a JSON object.".to_string(), + }); + } + + let _ = profile::parse_profile(req.profile.clone())?; + let version = match req.version { + Some(version) if version > 0 => version, + Some(_) => { + return Err(Error::InvalidRequest { + message: "version must be greater than 0.".to_string(), + }); + }, + None => + storage::next_profile_version( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + profile_id.as_str(), + ) + .await?, + }; + let row = storage::insert_profile_metadata( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + profile_id.as_str(), + version, + req.profile, + created_by.as_str(), + ) + .await?; + let row = row.ok_or_else(|| Error::Conflict { + message: format!( + "Ingestion profile '{}' version {} already exists for tenant '{}' project '{}' pipeline '{}'.", + profile_id, version, req.tenant_id, req.project_id, ADD_EVENT_PIPELINE, + ), + })?; + + Ok(AdminIngestionProfileResponse { + profile_id: row.profile_id, + version: row.version, + profile: row.profile, + created_at: row.created_at, + created_by: row.created_by, + }) + } +} diff --git a/packages/elf-service/src/ingestion_profiles/admin/defaults.rs b/packages/elf-service/src/ingestion_profiles/admin/defaults.rs new file mode 100644 index 00000000..7b044f09 --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/admin/defaults.rs @@ -0,0 +1,103 @@ +use time::OffsetDateTime; + +use crate::{ + ElfService, Error, Result, + ingestion_profiles::{ + storage, + types::{ + AdminIngestionProfileDefaultGetRequest, AdminIngestionProfileDefaultResponse, + AdminIngestionProfileDefaultSetRequest, IngestionProfileSelector, + }, + }, +}; + +impl ElfService { + /// Reads the default ingestion profile pointer. + pub async fn admin_ingestion_profile_default_get( + &self, + req: AdminIngestionProfileDefaultGetRequest, + ) -> Result { + storage::seed_default_profile( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + ) + .await?; + + let row = storage::select_default_row( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + ) + .await?; + let row = match row { + Some(row) => row, + None => { + let selector = storage::select_default_selector( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + ) + .await?; + + return Ok(AdminIngestionProfileDefaultResponse { + profile_id: selector.id, + version: selector.version, + updated_at: OffsetDateTime::now_utc(), + }); + }, + }; + + Ok(AdminIngestionProfileDefaultResponse { + profile_id: row.profile_id, + version: row.version, + updated_at: row.updated_at, + }) + } + + /// Updates the default ingestion profile pointer. + pub async fn admin_ingestion_profile_default_set( + &self, + req: AdminIngestionProfileDefaultSetRequest, + ) -> Result { + let profile_id = req.profile_id.trim().to_string(); + + if profile_id.is_empty() { + return Err(Error::InvalidRequest { + message: "profile_id must be non-empty.".to_string(), + }); + } + + if let Some(version) = req.version + && version <= 0 + { + return Err(Error::InvalidRequest { + message: "version must be greater than 0.".to_string(), + }); + } + + let selector = IngestionProfileSelector { id: profile_id.clone(), version: req.version }; + let row = storage::select_profile_metadata( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + &selector, + ) + .await?; + let version = row.version; + let row = storage::upsert_default_row( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + row.profile_id, + version, + ) + .await?; + + Ok(AdminIngestionProfileDefaultResponse { + profile_id: row.profile_id, + version: row.version, + updated_at: row.updated_at, + }) + } +} diff --git a/packages/elf-service/src/ingestion_profiles/admin/read.rs b/packages/elf-service/src/ingestion_profiles/admin/read.rs new file mode 100644 index 00000000..515025a0 --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/admin/read.rs @@ -0,0 +1,78 @@ +use crate::{ + ElfService, Error, Result, + ingestion_profiles::{ + storage, + types::{ + AdminIngestionProfileGetRequest, AdminIngestionProfileListRequest, + AdminIngestionProfileResponse, AdminIngestionProfileSummary, + AdminIngestionProfilesListResponse, IngestionProfileSelector, + }, + }, +}; + +impl ElfService { + /// Lists the latest visible ingestion profile versions. + pub async fn admin_ingestion_profiles_list( + &self, + req: AdminIngestionProfileListRequest, + ) -> Result { + let rows = storage::list_latest_profile_summaries( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + ) + .await?; + let profiles = rows + .into_iter() + .map(|row| AdminIngestionProfileSummary { + profile_id: row.profile_id, + version: row.version, + created_at: row.created_at, + created_by: row.created_by, + }) + .collect(); + + Ok(AdminIngestionProfilesListResponse { profiles }) + } + + /// Fetches one ingestion profile version. + pub async fn admin_ingestion_profile_get( + &self, + req: AdminIngestionProfileGetRequest, + ) -> Result { + let selector = IngestionProfileSelector { + id: req.profile_id.trim().to_string(), + version: req.version, + }; + + if selector.id.is_empty() { + return Err(Error::InvalidRequest { + message: "profile_id must be non-empty.".to_string(), + }); + } + + if let Some(version) = selector.version + && version <= 0 + { + return Err(Error::InvalidRequest { + message: "version must be greater than 0.".to_string(), + }); + } + + let row = storage::select_profile_metadata( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + &selector, + ) + .await?; + + Ok(AdminIngestionProfileResponse { + profile_id: row.profile_id, + version: row.version, + profile: row.profile, + created_at: row.created_at, + created_by: row.created_by, + }) + } +} diff --git a/packages/elf-service/src/ingestion_profiles/admin/versions.rs b/packages/elf-service/src/ingestion_profiles/admin/versions.rs new file mode 100644 index 00000000..a3bf1a82 --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/admin/versions.rs @@ -0,0 +1,45 @@ +use crate::{ + ElfService, Error, Result, + ingestion_profiles::{ + storage, + types::{ + AdminIngestionProfileSummary, AdminIngestionProfileVersionsListRequest, + AdminIngestionProfileVersionsListResponse, + }, + }, +}; + +impl ElfService { + /// Lists all versions for one ingestion profile. + pub async fn admin_ingestion_profile_versions_list( + &self, + req: AdminIngestionProfileVersionsListRequest, + ) -> Result { + let profile_id = req.profile_id.trim().to_string(); + + if profile_id.is_empty() { + return Err(Error::InvalidRequest { + message: "profile_id must be non-empty.".to_string(), + }); + } + + let rows = storage::list_profile_version_summaries( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + profile_id.as_str(), + ) + .await?; + let profiles = rows + .into_iter() + .map(|row| AdminIngestionProfileSummary { + profile_id: row.profile_id, + version: row.version, + created_at: row.created_at, + created_by: row.created_by, + }) + .collect(); + + Ok(AdminIngestionProfileVersionsListResponse { profiles }) + } +} diff --git a/packages/elf-service/src/ingestion_profiles/storage.rs b/packages/elf-service/src/ingestion_profiles/storage.rs index 00995e8b..fb68ef89 100644 --- a/packages/elf-service/src/ingestion_profiles/storage.rs +++ b/packages/elf-service/src/ingestion_profiles/storage.rs @@ -1,228 +1,14 @@ mod defaults; - -pub(super) use self::defaults::{ - seed_default_profile, select_default_row, select_default_selector, upsert_default_row, +mod metadata; +mod rows; + +pub(super) use self::{ + defaults::{ + seed_default_profile, select_default_row, select_default_selector, upsert_default_row, + }, + metadata::{ + insert_profile_metadata, list_latest_profile_summaries, list_profile_version_summaries, + next_profile_version, select_profile, select_profile_metadata, + }, + rows::{ProfileMetadataRow, ProfileRow, ProfileSummaryRow}, }; - -use serde_json::Value; -use sqlx::{FromRow, PgPool}; -use time::OffsetDateTime; - -use crate::{ - Error, Result, - ingestion_profiles::{ADD_EVENT_PIPELINE, types::IngestionProfileSelector}, -}; - -#[derive(FromRow)] -pub(super) struct ProfileRow { - pub(super) profile_id: String, - pub(super) version: i32, - pub(super) profile: Value, -} - -#[derive(FromRow)] -pub(super) struct ProfileMetadataRow { - pub(super) profile_id: String, - pub(super) version: i32, - pub(super) profile: Value, - pub(super) created_at: OffsetDateTime, - pub(super) created_by: String, -} - -#[derive(FromRow)] -pub(super) struct ProfileSummaryRow { - pub(super) profile_id: String, - pub(super) version: i32, - pub(super) created_at: OffsetDateTime, - pub(super) created_by: String, -} - -pub(super) async fn next_profile_version( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - profile_id: &str, -) -> Result { - let version = sqlx::query_scalar::<_, i32>( - "SELECT COALESCE(MAX(version), 0) + 1 FROM memory_ingestion_profiles WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(profile_id) - .fetch_one(pool) - .await?; - - Ok(version) -} - -pub(super) async fn insert_profile_metadata( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - profile_id: &str, - version: i32, - profile: Value, - created_by: &str, -) -> Result> { - let row = sqlx::query_as::<_, ProfileMetadataRow>( - "\ -INSERT INTO memory_ingestion_profiles ( - tenant_id, - project_id, - pipeline, - profile_id, - version, - profile, - created_by -) VALUES ($1,$2,$3,$4,$5,$6::jsonb,$7) -ON CONFLICT DO NOTHING -RETURNING profile_id, version, profile, created_at, created_by", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(profile_id) - .bind(version) - .bind(profile) - .bind(created_by) - .fetch_optional(pool) - .await?; - - Ok(row) -} - -pub(super) async fn list_latest_profile_summaries( - pool: &PgPool, - tenant_id: &str, - project_id: &str, -) -> Result> { - let rows = sqlx::query_as::<_, ProfileSummaryRow>( - "\ -SELECT DISTINCT ON (profile_id) - profile_id, version, created_at, created_by -FROM memory_ingestion_profiles -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 -ORDER BY profile_id, version DESC", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .fetch_all(pool) - .await?; - - Ok(rows) -} - -pub(super) async fn select_profile_metadata( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - selector: &IngestionProfileSelector, -) -> Result { - let row = if let Some(version) = selector.version { - sqlx::query_as::<_, ProfileMetadataRow>( - "\ -SELECT profile_id, version, profile, created_at, created_by -FROM memory_ingestion_profiles -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 AND version=$5", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(selector.id.as_str()) - .bind(version) - .fetch_optional(pool) - .await? - } else { - sqlx::query_as::<_, ProfileMetadataRow>( - "\ -SELECT profile_id, version, profile, created_at, created_by -FROM memory_ingestion_profiles -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 -ORDER BY version DESC -LIMIT 1", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(selector.id.as_str()) - .fetch_optional(pool) - .await? - }; - - row.ok_or_else(|| Error::InvalidRequest { - message: format!( - "Ingestion profile '{}' not found for tenant '{}' project '{}' pipeline '{}'.", - selector.id, tenant_id, project_id, ADD_EVENT_PIPELINE, - ), - }) -} - -pub(super) async fn list_profile_version_summaries( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - profile_id: &str, -) -> Result> { - let rows = sqlx::query_as::<_, ProfileSummaryRow>( - "\ -SELECT profile_id, version, created_at, created_by -FROM memory_ingestion_profiles -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 -ORDER BY version DESC", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(profile_id) - .fetch_all(pool) - .await?; - - Ok(rows) -} - -pub(super) async fn select_profile( - pool: &PgPool, - tenant_id: &str, - project_id: &str, - selector: &IngestionProfileSelector, -) -> Result { - let row = if let Some(version) = selector.version { - sqlx::query_as::<_, ProfileRow>( - "\ -SELECT profile_id, version, profile -FROM memory_ingestion_profiles -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 AND version=$5", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(selector.id.as_str()) - .bind(version) - .fetch_optional(pool) - .await? - } else { - sqlx::query_as::<_, ProfileRow>( - "\ -SELECT profile_id, version, profile -FROM memory_ingestion_profiles -WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 -ORDER BY version DESC -LIMIT 1", - ) - .bind(tenant_id) - .bind(project_id) - .bind(ADD_EVENT_PIPELINE) - .bind(selector.id.as_str()) - .fetch_optional(pool) - .await? - }; - - row.ok_or_else(|| Error::InvalidRequest { - message: format!( - "Ingestion profile '{}' not found for tenant '{}' project '{}' pipeline '{}'.", - selector.id, tenant_id, project_id, ADD_EVENT_PIPELINE - ), - }) -} diff --git a/packages/elf-service/src/ingestion_profiles/storage/metadata.rs b/packages/elf-service/src/ingestion_profiles/storage/metadata.rs new file mode 100644 index 00000000..ceaa075c --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/storage/metadata.rs @@ -0,0 +1,201 @@ +use serde_json::Value; +use sqlx::PgPool; + +use crate::{ + Error, Result, + ingestion_profiles::{ + ADD_EVENT_PIPELINE, + storage::{ProfileMetadataRow, ProfileRow, ProfileSummaryRow}, + types::IngestionProfileSelector, + }, +}; + +pub(in crate::ingestion_profiles) async fn next_profile_version( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + profile_id: &str, +) -> Result { + let version = sqlx::query_scalar::<_, i32>( + "SELECT COALESCE(MAX(version), 0) + 1 FROM memory_ingestion_profiles WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(profile_id) + .fetch_one(pool) + .await?; + + Ok(version) +} + +pub(in crate::ingestion_profiles) async fn insert_profile_metadata( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + profile_id: &str, + version: i32, + profile: Value, + created_by: &str, +) -> Result> { + let row = sqlx::query_as::<_, ProfileMetadataRow>( + "\ +INSERT INTO memory_ingestion_profiles ( + tenant_id, + project_id, + pipeline, + profile_id, + version, + profile, + created_by +) VALUES ($1,$2,$3,$4,$5,$6::jsonb,$7) +ON CONFLICT DO NOTHING +RETURNING profile_id, version, profile, created_at, created_by", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(profile_id) + .bind(version) + .bind(profile) + .bind(created_by) + .fetch_optional(pool) + .await?; + + Ok(row) +} + +pub(in crate::ingestion_profiles) async fn list_latest_profile_summaries( + pool: &PgPool, + tenant_id: &str, + project_id: &str, +) -> Result> { + let rows = sqlx::query_as::<_, ProfileSummaryRow>( + "\ +SELECT DISTINCT ON (profile_id) + profile_id, version, created_at, created_by +FROM memory_ingestion_profiles +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 +ORDER BY profile_id, version DESC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .fetch_all(pool) + .await?; + + Ok(rows) +} + +pub(in crate::ingestion_profiles) async fn select_profile_metadata( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + selector: &IngestionProfileSelector, +) -> Result { + let row = if let Some(version) = selector.version { + sqlx::query_as::<_, ProfileMetadataRow>( + "\ +SELECT profile_id, version, profile, created_at, created_by +FROM memory_ingestion_profiles +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 AND version=$5", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(selector.id.as_str()) + .bind(version) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, ProfileMetadataRow>( + "\ +SELECT profile_id, version, profile, created_at, created_by +FROM memory_ingestion_profiles +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 +ORDER BY version DESC +LIMIT 1", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(selector.id.as_str()) + .fetch_optional(pool) + .await? + }; + + row.ok_or_else(|| Error::InvalidRequest { + message: format!( + "Ingestion profile '{}' not found for tenant '{}' project '{}' pipeline '{}'.", + selector.id, tenant_id, project_id, ADD_EVENT_PIPELINE, + ), + }) +} + +pub(in crate::ingestion_profiles) async fn list_profile_version_summaries( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + profile_id: &str, +) -> Result> { + let rows = sqlx::query_as::<_, ProfileSummaryRow>( + "\ +SELECT profile_id, version, created_at, created_by +FROM memory_ingestion_profiles +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 +ORDER BY version DESC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(profile_id) + .fetch_all(pool) + .await?; + + Ok(rows) +} + +pub(in crate::ingestion_profiles) async fn select_profile( + pool: &PgPool, + tenant_id: &str, + project_id: &str, + selector: &IngestionProfileSelector, +) -> Result { + let row = if let Some(version) = selector.version { + sqlx::query_as::<_, ProfileRow>( + "\ +SELECT profile_id, version, profile +FROM memory_ingestion_profiles +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 AND version=$5", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(selector.id.as_str()) + .bind(version) + .fetch_optional(pool) + .await? + } else { + sqlx::query_as::<_, ProfileRow>( + "\ +SELECT profile_id, version, profile +FROM memory_ingestion_profiles +WHERE tenant_id=$1 AND project_id=$2 AND pipeline=$3 AND profile_id=$4 +ORDER BY version DESC +LIMIT 1", + ) + .bind(tenant_id) + .bind(project_id) + .bind(ADD_EVENT_PIPELINE) + .bind(selector.id.as_str()) + .fetch_optional(pool) + .await? + }; + + row.ok_or_else(|| Error::InvalidRequest { + message: format!( + "Ingestion profile '{}' not found for tenant '{}' project '{}' pipeline '{}'.", + selector.id, tenant_id, project_id, ADD_EVENT_PIPELINE + ), + }) +} diff --git a/packages/elf-service/src/ingestion_profiles/storage/rows.rs b/packages/elf-service/src/ingestion_profiles/storage/rows.rs new file mode 100644 index 00000000..88d29d6f --- /dev/null +++ b/packages/elf-service/src/ingestion_profiles/storage/rows.rs @@ -0,0 +1,27 @@ +use serde_json::Value; +use sqlx::FromRow; +use time::OffsetDateTime; + +#[derive(FromRow)] +pub(in crate::ingestion_profiles) struct ProfileRow { + pub(in crate::ingestion_profiles) profile_id: String, + pub(in crate::ingestion_profiles) version: i32, + pub(in crate::ingestion_profiles) profile: Value, +} + +#[derive(FromRow)] +pub(in crate::ingestion_profiles) struct ProfileMetadataRow { + pub(in crate::ingestion_profiles) profile_id: String, + pub(in crate::ingestion_profiles) version: i32, + pub(in crate::ingestion_profiles) profile: Value, + pub(in crate::ingestion_profiles) created_at: OffsetDateTime, + pub(in crate::ingestion_profiles) created_by: String, +} + +#[derive(FromRow)] +pub(in crate::ingestion_profiles) struct ProfileSummaryRow { + pub(in crate::ingestion_profiles) profile_id: String, + pub(in crate::ingestion_profiles) version: i32, + pub(in crate::ingestion_profiles) created_at: OffsetDateTime, + pub(in crate::ingestion_profiles) created_by: String, +} diff --git a/packages/elf-service/src/sharing/grants.rs b/packages/elf-service/src/sharing/grants.rs index fe5bf0cf..77f55a3f 100644 --- a/packages/elf-service/src/sharing/grants.rs +++ b/packages/elf-service/src/sharing/grants.rs @@ -1,298 +1,4 @@ -use sqlx::FromRow; -use uuid::Uuid; - -use crate::{ - ElfService, Error, Result, - access::ORG_PROJECT_ID, - sharing::{ - sql::{AGENT_SPACE_GRANT_UPSERT_SQL, PROJECT_SPACE_GRANT_UPSERT_SQL}, - types::{ - GranteeKind, ShareScope, SpaceGrantItem, SpaceGrantRevokeRequest, - SpaceGrantRevokeResponse, SpaceGrantUpsertRequest, SpaceGrantUpsertResponse, - SpaceGrantsListRequest, SpaceGrantsListResponse, - }, - }, -}; - -impl ElfService { - /// Creates or reactivates a shared-scope grant. - pub async fn space_grant_upsert( - &self, - req: SpaceGrantUpsertRequest, - ) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let scope = req.scope.as_str(); - let scope_allowed = match scope { - "project_shared" => self.cfg.scopes.write_allowed.project_shared, - "org_shared" => self.cfg.scopes.write_allowed.org_shared, - _ => false, - }; - - if !scope_allowed { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - if req.grantee_kind == GranteeKind::Agent - && req.grantee_agent_id.as_ref().is_none_or(|id| id.trim().is_empty()) - { - return Err(Error::InvalidRequest { - message: "grantee_agent_id is required for agent grantee_kind.".to_string(), - }); - } - - let grantee_agent_id = req - .grantee_agent_id - .as_ref() - .map(|value| value.trim()) - .filter(|value| !value.is_empty()) - .map(ToString::to_string); - - if req.grantee_kind == GranteeKind::Project && grantee_agent_id.is_some() { - return Err(Error::InvalidRequest { - message: "grantee_agent_id must be empty for project grantee_kind.".to_string(), - }); - } - - let grantee_agent_id_ref = grantee_agent_id.as_deref(); - let now = time::OffsetDateTime::now_utc(); - let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; - - if req.grantee_kind == GranteeKind::Project { - self.upsert_project_grant(tenant_id, effective_project_id, scope, agent_id, now) - .await?; - } else { - self.upsert_agent_grant( - tenant_id, - effective_project_id, - scope, - agent_id, - grantee_agent_id_ref, - now, - ) - .await?; - } - - Ok(SpaceGrantUpsertResponse { - scope: scope.to_string(), - grantee_kind: req.grantee_kind, - grantee_agent_id, - granted: true, - }) - } - - async fn upsert_project_grant( - &self, - tenant_id: &str, - project_id: &str, - scope: &str, - agent_id: &str, - now: time::OffsetDateTime, - ) -> Result<()> { - sqlx::query(PROJECT_SPACE_GRANT_UPSERT_SQL) - .bind(Uuid::new_v4()) - .bind(tenant_id) - .bind(project_id) - .bind(scope) - .bind(agent_id) - .bind("project") - .bind::>(None) - .bind(agent_id) - .bind(now) - .execute(&self.db.pool) - .await?; - - Ok(()) - } - - async fn upsert_agent_grant( - &self, - tenant_id: &str, - project_id: &str, - scope: &str, - agent_id: &str, - grantee_agent_id: Option<&str>, - now: time::OffsetDateTime, - ) -> Result<()> { - sqlx::query(AGENT_SPACE_GRANT_UPSERT_SQL) - .bind(Uuid::new_v4()) - .bind(tenant_id) - .bind(project_id) - .bind(scope) - .bind(agent_id) - .bind("agent") - .bind(grantee_agent_id) - .bind(agent_id) - .bind(now) - .execute(&self.db.pool) - .await?; - - Ok(()) - } - - /// Revokes a shared-scope grant. - pub async fn space_grant_revoke( - &self, - req: SpaceGrantRevokeRequest, - ) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let scope = req.scope.as_str(); - let grantee_agent_id = req - .grantee_agent_id - .as_deref() - .map(|value| value.trim()) - .filter(|value| !value.is_empty()); - - if req.grantee_kind == GranteeKind::Agent && grantee_agent_id.is_none() { - return Err(Error::InvalidRequest { - message: "grantee_agent_id is required for agent grantee_kind.".to_string(), - }); - } - if req.grantee_kind == GranteeKind::Project && grantee_agent_id.is_some() { - return Err(Error::InvalidRequest { - message: "grantee_agent_id must be empty for project grantee_kind.".to_string(), - }); - } - - let scope_allowed = match scope { - "project_shared" => self.cfg.scopes.write_allowed.project_shared, - "org_shared" => self.cfg.scopes.write_allowed.org_shared, - _ => false, - }; - - if !scope_allowed { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - - let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; - let revocation = sqlx::query( - "\ -UPDATE memory_space_grants -SET revoked_at = $7, - revoked_by_agent_id = $8 -WHERE tenant_id = $1 - AND project_id = $2 - AND scope = $3 - AND space_owner_agent_id = $4 - AND grantee_kind = $5 - AND ((grantee_kind = 'project' AND grantee_agent_id IS NULL) - OR (grantee_kind = 'agent' AND grantee_agent_id = $6)) - AND revoked_at IS NULL", - ) - .bind(tenant_id) - .bind(effective_project_id) - .bind(scope) - .bind(agent_id) - .bind(match req.grantee_kind { - GranteeKind::Project => "project", - GranteeKind::Agent => "agent", - }) - .bind(grantee_agent_id) - .bind(time::OffsetDateTime::now_utc()) - .bind(agent_id) - .execute(&self.db.pool) - .await?; - - if revocation.rows_affected() == 0 { - return Err(Error::InvalidRequest { message: "No active grant found.".to_string() }); - } - - Ok(SpaceGrantRevokeResponse { revoked: true }) - } - - /// Lists active grants for a shared scope. - pub async fn space_grants_list( - &self, - req: SpaceGrantsListRequest, - ) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let scope = req.scope.as_str(); - let scope_allowed = match scope { - "project_shared" => self.cfg.scopes.write_allowed.project_shared, - "org_shared" => self.cfg.scopes.write_allowed.org_shared, - _ => false, - }; - - if !scope_allowed { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - - let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; - - #[derive(FromRow)] - struct Row { - scope: String, - grantee_kind: String, - grantee_agent_id: Option, - granted_by_agent_id: String, - granted_at: time::OffsetDateTime, - } - - let rows = sqlx::query_as::<_, Row>( - "\ -SELECT scope, grantee_kind, grantee_agent_id, granted_by_agent_id, granted_at -FROM memory_space_grants -WHERE tenant_id = $1 - AND project_id = $2 - AND space_owner_agent_id = $3 - AND scope = $4 - AND revoked_at IS NULL -ORDER BY granted_at DESC", - ) - .bind(tenant_id) - .bind(effective_project_id) - .bind(agent_id) - .bind(scope) - .fetch_all(&self.db.pool) - .await?; - let mut grants = Vec::with_capacity(rows.len()); - - for row in rows { - let grantee_kind = match row.grantee_kind.as_str() { - "agent" => GranteeKind::Agent, - "project" => GranteeKind::Project, - _ => continue, - }; - let scope = match row.scope.as_str() { - "project_shared" => ShareScope::ProjectShared, - "org_shared" => ShareScope::OrgShared, - _ => continue, - }; - - grants.push(SpaceGrantItem { - scope, - grantee_kind, - grantee_agent_id: row.grantee_agent_id, - granted_by_agent_id: row.granted_by_agent_id, - granted_at: row.granted_at, - }); - } - - Ok(SpaceGrantsListResponse { grants }) - } -} +mod list; +mod revoke; +mod rows; +mod upsert; diff --git a/packages/elf-service/src/sharing/grants/list.rs b/packages/elf-service/src/sharing/grants/list.rs new file mode 100644 index 00000000..cddfa0c4 --- /dev/null +++ b/packages/elf-service/src/sharing/grants/list.rs @@ -0,0 +1,83 @@ +use crate::{ + ElfService, Error, Result, + access::ORG_PROJECT_ID, + sharing::{ + grants::rows::SpaceGrantRow, + types::{ + GranteeKind, ShareScope, SpaceGrantItem, SpaceGrantsListRequest, + SpaceGrantsListResponse, + }, + }, +}; + +impl ElfService { + /// Lists active grants for a shared scope. + pub async fn space_grants_list( + &self, + req: SpaceGrantsListRequest, + ) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let scope = req.scope.as_str(); + let scope_allowed = match scope { + "project_shared" => self.cfg.scopes.write_allowed.project_shared, + "org_shared" => self.cfg.scopes.write_allowed.org_shared, + _ => false, + }; + + if !scope_allowed { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; + let rows = sqlx::query_as::<_, SpaceGrantRow>( + "\ +SELECT scope, grantee_kind, grantee_agent_id, granted_by_agent_id, granted_at +FROM memory_space_grants +WHERE tenant_id = $1 + AND project_id = $2 + AND space_owner_agent_id = $3 + AND scope = $4 + AND revoked_at IS NULL +ORDER BY granted_at DESC", + ) + .bind(tenant_id) + .bind(effective_project_id) + .bind(agent_id) + .bind(scope) + .fetch_all(&self.db.pool) + .await?; + let mut grants = Vec::with_capacity(rows.len()); + + for row in rows { + let grantee_kind = match row.grantee_kind.as_str() { + "agent" => GranteeKind::Agent, + "project" => GranteeKind::Project, + _ => continue, + }; + let scope = match row.scope.as_str() { + "project_shared" => ShareScope::ProjectShared, + "org_shared" => ShareScope::OrgShared, + _ => continue, + }; + + grants.push(SpaceGrantItem { + scope, + grantee_kind, + grantee_agent_id: row.grantee_agent_id, + granted_by_agent_id: row.granted_by_agent_id, + granted_at: row.granted_at, + }); + } + + Ok(SpaceGrantsListResponse { grants }) + } +} diff --git a/packages/elf-service/src/sharing/grants/revoke.rs b/packages/elf-service/src/sharing/grants/revoke.rs new file mode 100644 index 00000000..a30286c9 --- /dev/null +++ b/packages/elf-service/src/sharing/grants/revoke.rs @@ -0,0 +1,88 @@ +use time::OffsetDateTime; + +use crate::{ + ElfService, Error, Result, + access::ORG_PROJECT_ID, + sharing::types::{GranteeKind, SpaceGrantRevokeRequest, SpaceGrantRevokeResponse}, +}; + +impl ElfService { + /// Revokes a shared-scope grant. + pub async fn space_grant_revoke( + &self, + req: SpaceGrantRevokeRequest, + ) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let scope = req.scope.as_str(); + let grantee_agent_id = req + .grantee_agent_id + .as_deref() + .map(|value| value.trim()) + .filter(|value| !value.is_empty()); + + if req.grantee_kind == GranteeKind::Agent && grantee_agent_id.is_none() { + return Err(Error::InvalidRequest { + message: "grantee_agent_id is required for agent grantee_kind.".to_string(), + }); + } + if req.grantee_kind == GranteeKind::Project && grantee_agent_id.is_some() { + return Err(Error::InvalidRequest { + message: "grantee_agent_id must be empty for project grantee_kind.".to_string(), + }); + } + + let scope_allowed = match scope { + "project_shared" => self.cfg.scopes.write_allowed.project_shared, + "org_shared" => self.cfg.scopes.write_allowed.org_shared, + _ => false, + }; + + if !scope_allowed { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; + let revocation = sqlx::query( + "\ +UPDATE memory_space_grants +SET revoked_at = $7, + revoked_by_agent_id = $8 +WHERE tenant_id = $1 + AND project_id = $2 + AND scope = $3 + AND space_owner_agent_id = $4 + AND grantee_kind = $5 + AND ((grantee_kind = 'project' AND grantee_agent_id IS NULL) + OR (grantee_kind = 'agent' AND grantee_agent_id = $6)) + AND revoked_at IS NULL", + ) + .bind(tenant_id) + .bind(effective_project_id) + .bind(scope) + .bind(agent_id) + .bind(match req.grantee_kind { + GranteeKind::Project => "project", + GranteeKind::Agent => "agent", + }) + .bind(grantee_agent_id) + .bind(OffsetDateTime::now_utc()) + .bind(agent_id) + .execute(&self.db.pool) + .await?; + + if revocation.rows_affected() == 0 { + return Err(Error::InvalidRequest { message: "No active grant found.".to_string() }); + } + + Ok(SpaceGrantRevokeResponse { revoked: true }) + } +} diff --git a/packages/elf-service/src/sharing/grants/rows.rs b/packages/elf-service/src/sharing/grants/rows.rs new file mode 100644 index 00000000..4f61d711 --- /dev/null +++ b/packages/elf-service/src/sharing/grants/rows.rs @@ -0,0 +1,11 @@ +use sqlx::FromRow; +use time::OffsetDateTime; + +#[derive(FromRow)] +pub(in crate::sharing) struct SpaceGrantRow { + pub(in crate::sharing) scope: String, + pub(in crate::sharing) grantee_kind: String, + pub(in crate::sharing) grantee_agent_id: Option, + pub(in crate::sharing) granted_by_agent_id: String, + pub(in crate::sharing) granted_at: OffsetDateTime, +} diff --git a/packages/elf-service/src/sharing/grants/upsert.rs b/packages/elf-service/src/sharing/grants/upsert.rs new file mode 100644 index 00000000..b2bb4f1c --- /dev/null +++ b/packages/elf-service/src/sharing/grants/upsert.rs @@ -0,0 +1,134 @@ +use uuid::Uuid; + +use crate::{ + ElfService, Error, Result, + access::ORG_PROJECT_ID, + sharing::{ + sql::{AGENT_SPACE_GRANT_UPSERT_SQL, PROJECT_SPACE_GRANT_UPSERT_SQL}, + types::{GranteeKind, SpaceGrantUpsertRequest, SpaceGrantUpsertResponse}, + }, +}; + +impl ElfService { + /// Creates or reactivates a shared-scope grant. + pub async fn space_grant_upsert( + &self, + req: SpaceGrantUpsertRequest, + ) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let scope = req.scope.as_str(); + let scope_allowed = match scope { + "project_shared" => self.cfg.scopes.write_allowed.project_shared, + "org_shared" => self.cfg.scopes.write_allowed.org_shared, + _ => false, + }; + + if !scope_allowed { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + if req.grantee_kind == GranteeKind::Agent + && req.grantee_agent_id.as_ref().is_none_or(|id| id.trim().is_empty()) + { + return Err(Error::InvalidRequest { + message: "grantee_agent_id is required for agent grantee_kind.".to_string(), + }); + } + + let grantee_agent_id = req + .grantee_agent_id + .as_ref() + .map(|value| value.trim()) + .filter(|value| !value.is_empty()) + .map(ToString::to_string); + + if req.grantee_kind == GranteeKind::Project && grantee_agent_id.is_some() { + return Err(Error::InvalidRequest { + message: "grantee_agent_id must be empty for project grantee_kind.".to_string(), + }); + } + + let grantee_agent_id_ref = grantee_agent_id.as_deref(); + let now = time::OffsetDateTime::now_utc(); + let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; + + if req.grantee_kind == GranteeKind::Project { + self.upsert_project_grant(tenant_id, effective_project_id, scope, agent_id, now) + .await?; + } else { + self.upsert_agent_grant( + tenant_id, + effective_project_id, + scope, + agent_id, + grantee_agent_id_ref, + now, + ) + .await?; + } + + Ok(SpaceGrantUpsertResponse { + scope: scope.to_string(), + grantee_kind: req.grantee_kind, + grantee_agent_id, + granted: true, + }) + } + + async fn upsert_project_grant( + &self, + tenant_id: &str, + project_id: &str, + scope: &str, + agent_id: &str, + now: time::OffsetDateTime, + ) -> Result<()> { + sqlx::query(PROJECT_SPACE_GRANT_UPSERT_SQL) + .bind(Uuid::new_v4()) + .bind(tenant_id) + .bind(project_id) + .bind(scope) + .bind(agent_id) + .bind("project") + .bind::>(None) + .bind(agent_id) + .bind(now) + .execute(&self.db.pool) + .await?; + + Ok(()) + } + + async fn upsert_agent_grant( + &self, + tenant_id: &str, + project_id: &str, + scope: &str, + agent_id: &str, + grantee_agent_id: Option<&str>, + now: time::OffsetDateTime, + ) -> Result<()> { + sqlx::query(AGENT_SPACE_GRANT_UPSERT_SQL) + .bind(Uuid::new_v4()) + .bind(tenant_id) + .bind(project_id) + .bind(scope) + .bind(agent_id) + .bind("agent") + .bind(grantee_agent_id) + .bind(agent_id) + .bind(now) + .execute(&self.db.pool) + .await?; + + Ok(()) + } +} diff --git a/packages/elf-service/src/sharing/publish.rs b/packages/elf-service/src/sharing/publish.rs index 80ee853c..1d567df4 100644 --- a/packages/elf-service/src/sharing/publish.rs +++ b/packages/elf-service/src/sharing/publish.rs @@ -1,201 +1,2 @@ -use time::OffsetDateTime; - -use crate::{ - ElfService, Error, InsertVersionArgs, Result, - access::{self, ORG_PROJECT_ID}, - sharing::types::{ - PublishNoteRequest, PublishNoteResponse, UnpublishNoteRequest, UnpublishNoteResponse, - }, -}; -use elf_storage::models::MemoryNote; - -impl ElfService { - /// Publishes an owned note into a shared scope. - pub async fn publish_note(&self, req: PublishNoteRequest) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let mut tx = self.db.pool.begin().await?; - let mut note: MemoryNote = sqlx::query_as::<_, MemoryNote>( - "\ -SELECT * -FROM memory_notes -WHERE note_id = $1 - AND tenant_id = $2 - AND project_id IN ($3, $4) -FOR UPDATE", - ) - .bind(req.note_id) - .bind(tenant_id) - .bind(project_id) - .bind(ORG_PROJECT_ID) - .fetch_optional(&mut *tx) - .await? - .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; - - if note.agent_id != agent_id { - return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); - } - if note.status != "active" { - return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); - } - if note.expires_at.map(|ts| ts <= OffsetDateTime::now_utc()).unwrap_or(false) { - return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); - } - - let scope = req.scope.as_str(); - let scope_allowed = match scope { - "project_shared" => self.cfg.scopes.write_allowed.project_shared, - "org_shared" => self.cfg.scopes.write_allowed.org_shared, - _ => false, - }; - - if !scope_allowed { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - - let target_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; - - access::ensure_active_project_scope_grant( - &mut *tx, - tenant_id, - target_project_id, - scope, - agent_id, - ) - .await?; - - if note.scope == scope && note.project_id == target_project_id { - return Ok(PublishNoteResponse { note_id: note.note_id, scope: note.scope }); - } - - let now = OffsetDateTime::now_utc(); - let prev_snapshot = crate::note_snapshot(¬e); - - note.scope = scope.to_string(); - note.project_id = target_project_id.to_string(); - note.updated_at = now; - - crate::insert_version( - &mut *tx, - InsertVersionArgs { - note_id: note.note_id, - op: "PUBLISH", - prev_snapshot: Some(prev_snapshot), - new_snapshot: Some(crate::note_snapshot(¬e)), - reason: "publish_note", - actor: agent_id, - ts: now, - }, - ) - .await?; - sqlx::query( - "UPDATE memory_notes SET scope = $1, project_id = $2, updated_at = $3 WHERE note_id = $4", - ) - .bind(scope) - .bind(note.project_id.as_str()) - .bind(now) - .bind(note.note_id) - .execute(&mut *tx) - .await?; - crate::enqueue_outbox_tx(&mut *tx, note.note_id, "UPSERT", ¬e.embedding_version, now) - .await?; - - tx.commit().await?; - - Ok(PublishNoteResponse { note_id: note.note_id, scope: note.scope }) - } - - /// Returns a previously published note to its non-shared scope. - pub async fn unpublish_note(&self, req: UnpublishNoteRequest) -> Result { - let tenant_id = req.tenant_id.trim(); - let project_id = req.project_id.trim(); - let agent_id = req.agent_id.trim(); - - if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { - return Err(Error::InvalidRequest { - message: "tenant_id, project_id, and agent_id are required.".to_string(), - }); - } - - let mut tx = self.db.pool.begin().await?; - let mut note: MemoryNote = sqlx::query_as::<_, MemoryNote>( - "\ -SELECT * -FROM memory_notes -WHERE note_id = $1 - AND tenant_id = $2 - AND project_id IN ($3, $4) -FOR UPDATE", - ) - .bind(req.note_id) - .bind(tenant_id) - .bind(project_id) - .bind(ORG_PROJECT_ID) - .fetch_optional(&mut *tx) - .await? - .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; - - if note.agent_id != agent_id { - return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); - } - if note.status != "active" { - return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); - } - if note.expires_at.map(|ts| ts <= OffsetDateTime::now_utc()).unwrap_or(false) { - return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); - } - if !self.cfg.scopes.write_allowed.agent_private { - return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); - } - if note.scope == "agent_private" { - return Ok(UnpublishNoteResponse { note_id: note.note_id, scope: note.scope }); - } - - let now = OffsetDateTime::now_utc(); - let prev_snapshot = crate::note_snapshot(¬e); - - if note.scope == "org_shared" && note.project_id == ORG_PROJECT_ID { - note.project_id = project_id.to_string(); - } - - note.scope = "agent_private".to_string(); - note.updated_at = now; - - crate::insert_version( - &mut *tx, - InsertVersionArgs { - note_id: note.note_id, - op: "UNPUBLISH", - prev_snapshot: Some(prev_snapshot), - new_snapshot: Some(crate::note_snapshot(¬e)), - reason: "unpublish_note", - actor: agent_id, - ts: now, - }, - ) - .await?; - sqlx::query( - "UPDATE memory_notes SET scope = $1, project_id = $2, updated_at = $3 WHERE note_id = $4", - ) - .bind(note.scope.as_str()) - .bind(note.project_id.as_str()) - .bind(now) - .bind(note.note_id) - .execute(&mut *tx) - .await?; - crate::enqueue_outbox_tx(&mut *tx, note.note_id, "UPSERT", ¬e.embedding_version, now) - .await?; - - tx.commit().await?; - - Ok(UnpublishNoteResponse { note_id: note.note_id, scope: note.scope }) - } -} +mod note; +mod unpublish; diff --git a/packages/elf-service/src/sharing/publish/note.rs b/packages/elf-service/src/sharing/publish/note.rs new file mode 100644 index 00000000..e4379f90 --- /dev/null +++ b/packages/elf-service/src/sharing/publish/note.rs @@ -0,0 +1,113 @@ +use time::OffsetDateTime; + +use crate::{ + ElfService, Error, InsertVersionArgs, Result, + access::{self, ORG_PROJECT_ID}, + sharing::types::{PublishNoteRequest, PublishNoteResponse}, +}; +use elf_storage::models::MemoryNote; + +impl ElfService { + /// Publishes an owned note into a shared scope. + pub async fn publish_note(&self, req: PublishNoteRequest) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let mut tx = self.db.pool.begin().await?; + let mut note: MemoryNote = sqlx::query_as::<_, MemoryNote>( + "\ +SELECT * +FROM memory_notes +WHERE note_id = $1 + AND tenant_id = $2 + AND project_id IN ($3, $4) +FOR UPDATE", + ) + .bind(req.note_id) + .bind(tenant_id) + .bind(project_id) + .bind(ORG_PROJECT_ID) + .fetch_optional(&mut *tx) + .await? + .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; + + if note.agent_id != agent_id { + return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); + } + if note.status != "active" { + return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); + } + if note.expires_at.map(|ts| ts <= OffsetDateTime::now_utc()).unwrap_or(false) { + return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); + } + + let scope = req.scope.as_str(); + let scope_allowed = match scope { + "project_shared" => self.cfg.scopes.write_allowed.project_shared, + "org_shared" => self.cfg.scopes.write_allowed.org_shared, + _ => false, + }; + + if !scope_allowed { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + let target_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; + + access::ensure_active_project_scope_grant( + &mut *tx, + tenant_id, + target_project_id, + scope, + agent_id, + ) + .await?; + + if note.scope == scope && note.project_id == target_project_id { + return Ok(PublishNoteResponse { note_id: note.note_id, scope: note.scope }); + } + + let now = OffsetDateTime::now_utc(); + let prev_snapshot = crate::note_snapshot(¬e); + + note.scope = scope.to_string(); + note.project_id = target_project_id.to_string(); + note.updated_at = now; + + crate::insert_version( + &mut *tx, + InsertVersionArgs { + note_id: note.note_id, + op: "PUBLISH", + prev_snapshot: Some(prev_snapshot), + new_snapshot: Some(crate::note_snapshot(¬e)), + reason: "publish_note", + actor: agent_id, + ts: now, + }, + ) + .await?; + sqlx::query( + "UPDATE memory_notes SET scope = $1, project_id = $2, updated_at = $3 WHERE note_id = $4", + ) + .bind(scope) + .bind(note.project_id.as_str()) + .bind(now) + .bind(note.note_id) + .execute(&mut *tx) + .await?; + crate::enqueue_outbox_tx(&mut *tx, note.note_id, "UPSERT", ¬e.embedding_version, now) + .await?; + + tx.commit().await?; + + Ok(PublishNoteResponse { note_id: note.note_id, scope: note.scope }) + } +} diff --git a/packages/elf-service/src/sharing/publish/unpublish.rs b/packages/elf-service/src/sharing/publish/unpublish.rs new file mode 100644 index 00000000..3a8a91b1 --- /dev/null +++ b/packages/elf-service/src/sharing/publish/unpublish.rs @@ -0,0 +1,96 @@ +use time::OffsetDateTime; + +use crate::{ + ElfService, Error, InsertVersionArgs, Result, + access::ORG_PROJECT_ID, + sharing::types::{UnpublishNoteRequest, UnpublishNoteResponse}, +}; +use elf_storage::models::MemoryNote; + +impl ElfService { + /// Returns a previously published note to its non-shared scope. + pub async fn unpublish_note(&self, req: UnpublishNoteRequest) -> Result { + let tenant_id = req.tenant_id.trim(); + let project_id = req.project_id.trim(); + let agent_id = req.agent_id.trim(); + + if tenant_id.is_empty() || project_id.is_empty() || agent_id.is_empty() { + return Err(Error::InvalidRequest { + message: "tenant_id, project_id, and agent_id are required.".to_string(), + }); + } + + let mut tx = self.db.pool.begin().await?; + let mut note: MemoryNote = sqlx::query_as::<_, MemoryNote>( + "\ +SELECT * +FROM memory_notes +WHERE note_id = $1 + AND tenant_id = $2 + AND project_id IN ($3, $4) +FOR UPDATE", + ) + .bind(req.note_id) + .bind(tenant_id) + .bind(project_id) + .bind(ORG_PROJECT_ID) + .fetch_optional(&mut *tx) + .await? + .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; + + if note.agent_id != agent_id { + return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); + } + if note.status != "active" { + return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); + } + if note.expires_at.map(|ts| ts <= OffsetDateTime::now_utc()).unwrap_or(false) { + return Err(Error::InvalidRequest { message: "Note not found.".to_string() }); + } + if !self.cfg.scopes.write_allowed.agent_private { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + if note.scope == "agent_private" { + return Ok(UnpublishNoteResponse { note_id: note.note_id, scope: note.scope }); + } + + let now = OffsetDateTime::now_utc(); + let prev_snapshot = crate::note_snapshot(¬e); + + if note.scope == "org_shared" && note.project_id == ORG_PROJECT_ID { + note.project_id = project_id.to_string(); + } + + note.scope = "agent_private".to_string(); + note.updated_at = now; + + crate::insert_version( + &mut *tx, + InsertVersionArgs { + note_id: note.note_id, + op: "UNPUBLISH", + prev_snapshot: Some(prev_snapshot), + new_snapshot: Some(crate::note_snapshot(¬e)), + reason: "unpublish_note", + actor: agent_id, + ts: now, + }, + ) + .await?; + sqlx::query( + "UPDATE memory_notes SET scope = $1, project_id = $2, updated_at = $3 WHERE note_id = $4", + ) + .bind(note.scope.as_str()) + .bind(note.project_id.as_str()) + .bind(now) + .bind(note.note_id) + .execute(&mut *tx) + .await?; + crate::enqueue_outbox_tx(&mut *tx, note.note_id, "UPSERT", ¬e.embedding_version, now) + .await?; + + tx.commit().await?; + + Ok(UnpublishNoteResponse { note_id: note.note_id, scope: note.scope }) + } +}