Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

The Knowledge Mapper is a Python SDK for connecting your applications to the [TNO Knowledge Engine (TKE)](https://docs.knowledge-engine.eu/) network. Define knowledge interactions with decorators, use typed binding models, and let the SDK handle registration, polling, and data exchange with the network.

![architecture diagram](./docs/img/architecture.png)
The best way to learn to work with the Knowledge Mapper is to check out the [examples](/examples/). Users that are familiar with Python's [FastAPI](https://fastapi.tiangolo.com/) will recognize many concepts from that package.

## Quick Start

Expand Down
467 changes: 0 additions & 467 deletions docs/img/architecture.graphml

This file was deleted.

Binary file removed docs/img/architecture.png
Binary file not shown.
2 changes: 2 additions & 0 deletions src/knowledge_mapper/kb/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self, settings: KnowledgeBaseSettings) -> None:
name=settings.knowledge_base.name,
description=settings.knowledge_base.description,
ke_url=settings.knowledge_engine_endpoint,
lease_renewal_time=settings.knowledge_base.lease_renewal_time,
reasoner_level=settings.knowledge_base.reasoner_level,
)
self._unhandled_incoming: set[str] = {
ki.name
Expand Down
89 changes: 87 additions & 2 deletions src/knowledge_mapper/kb/knowledge_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
KnowledgeBaseInfo,
KnowledgeInteractionInfo,
PostReactInteractionInfo,
SmartConnectorLease,
)
from ..knowledge_interaction import (
Handler,
Expand All @@ -44,7 +45,15 @@ class KnowledgeBase:
Starts in unregistered state.
"""

def __init__(self, id: str, name: str, description: str, ke_url: str):
def __init__(
self,
id: str,
name: str,
description: str,
ke_url: str,
lease_renewal_time: int | None = None,
reasoner_level: int | None = None,
):
self.state = KnowledgeBaseState.UNREGISTERED
self.ki_registry: dict[str, KnowledgeInteractionContext[Any, ...]] = {}
self._ki_registry_by_id: dict[str, KnowledgeInteractionContext[Any, ...]] = {}
Expand All @@ -53,6 +62,8 @@ def __init__(self, id: str, name: str, description: str, ke_url: str):
id=id,
name=name,
description=description,
lease_renewal_time=lease_renewal_time,
reasoner_level=reasoner_level,
)
self.dependency_overrides: dict[Callable[..., Any], Callable[..., Any]] = {}

Expand Down Expand Up @@ -226,7 +237,7 @@ def wrapper(
*args,
**kwargs,
) -> BindingSet | Sequence[BindingModel]:
return func(binding_set, info, *args, **kwargs)
return func(binding_set, info, *args, **kwargs) # pyright: ignore[reportReturnType]

self._register_ki_locally(
KnowledgeInteractionContext(
Expand Down Expand Up @@ -268,6 +279,80 @@ async def sync_knowledge_interactions(self) -> None:
self._ki_registry_by_id[ki_ctx.info.id] = ki_ctx
return

async def unregister_ki(self, ki_name: str) -> None:
"""Unregister a single knowledge interaction by name from this KB at the KE
runtime, and remove it from this object's local registry.

Raises:
ValueError: If the KB is not registered, or if ``ki_name`` is unknown, or
if the KI is not currently registered at the KE runtime.
SmartConnectorNotFoundError: If the KB's smart connector or the KI is not
found in the KE runtime.
UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP
response.
"""
if self.state != KnowledgeBaseState.REGISTERED:
raise ValueError(
f"Cannot unregister KI '{ki_name}' because the KB is not registered."
)
if ki_name not in self.ki_registry:
raise ValueError(
f"Cannot unregister KI '{ki_name}': no KI with that name is "
f"registered for this KB."
)
ki_ctx = self.ki_registry[ki_name]
if (
ki_ctx.status != KnowledgeInteractionStatus.REGISTERED
or ki_ctx.info.id is None
):
raise ValueError(
f"Cannot unregister KI '{ki_name}' because it is not currently "
f"registered at the KE runtime."
)

logger.info("Unregistering KI '%s' (%s).", ki_name, ki_ctx.info.id)
await self.client.unregister_ki(kb_id=self.info.id, ki_id=ki_ctx.info.id)
self._ki_registry_by_id.pop(ki_ctx.info.id, None)
self.ki_registry.pop(ki_name, None)

async def renew_lease(self) -> SmartConnectorLease:
"""Renew this KB's smart connector lease at the KE runtime and return the new
lease.

Raises:
ValueError: If the KB is not registered.
SmartConnectorNotFoundError: If the KB's smart connector is not found in
the KE runtime, or it does not have a lease.
UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP
response.
"""
if self.state != KnowledgeBaseState.REGISTERED:
raise ValueError("Cannot renew lease because the KB is not registered.")
logger.debug("Renewing lease for KB '%s'.", self.info.id)
return await self.client.renew_lease(kb_id=self.info.id)

async def load_domain_knowledge(self, knowledge: str) -> None:
"""Load domain knowledge (Apache Jena facts/rules) into this KB's smart
connector. Replaces any previously loaded domain knowledge.

Args:
knowledge: The domain knowledge (facts and rules) as plain text in the
Apache Jena Rules syntax.

Raises:
ValueError: If the KB is not registered.
SmartConnectorNotFoundError: If the KB's smart connector is not found in
the KE runtime.
UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP
response.
"""
if self.state != KnowledgeBaseState.REGISTERED:
raise ValueError(
"Cannot load domain knowledge because the KB is not registered."
)
logger.debug("Loading domain knowledge for KB '%s'.", self.info.id)
await self.client.load_domain_knowledge(kb_id=self.info.id, knowledge=knowledge)

def ki_from_info(
self,
info: KnowledgeInteractionInfo,
Expand Down
92 changes: 91 additions & 1 deletion src/knowledge_mapper/ke/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
KnowledgeInteractionInfo,
PostReactInteractionInfo,
PostResult,
SmartConnectorLease,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -122,6 +123,48 @@ async def register_ki(
"""
...

async def unregister_ki(self, kb_id: str, ki_id: str) -> None:
"""Unregister a single knowledge interaction with the given ID from the KB
with the given ID.

Raises:
SmartConnectorNotFoundError: If no smart connector exists for the given KB
ID, or no knowledge interaction with the given ID exists for that KB.
UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP
response.
"""
...

async def renew_lease(self, kb_id: str) -> SmartConnectorLease:
"""Renew the lease of the smart connector for the given KB and return the
new lease.

Raises:
SmartConnectorNotFoundError: If no smart connector exists for the given KB
ID, or it does not have a lease.
UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP
response.
"""
...

async def load_domain_knowledge(self, kb_id: str, knowledge: str) -> None:
"""Load domain knowledge (Apache Jena facts/rules) into the smart connector
for the given KB. Replaces any previously loaded domain knowledge.

Args:
kb_id: The ID of the KB whose smart connector should be loaded with the
given domain knowledge.
knowledge: The domain knowledge (both facts and rules) as plain text in
the Apache Jena Rules syntax.

Raises:
SmartConnectorNotFoundError: If no smart connector exists for the given KB
ID.
UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP
response.
"""
...

async def poll_ki_call(self, kb_id: str) -> tuple[PollResult, HandleRequest | None]:
"""Poll the KE runtime for an incoming KI call for the given KB.

Expand Down Expand Up @@ -247,7 +290,7 @@ async def register_kb(
logger.debug("Registering knowledge base '%s' at %s.", info.id, self.ke_url)
response = await self._http.post(
f"{self.ke_url}/sc",
json=info.model_dump(by_alias=True),
json=info.model_dump(by_alias=True, exclude_none=True),
)
if not response.is_success:
raise UnexpectedHttpResponseError(response)
Expand Down Expand Up @@ -309,6 +352,53 @@ async def register_ki(
)
return registered_ki

async def unregister_ki(self, kb_id: str, ki_id: str) -> None:
logger.debug(
"Unregistering knowledge interaction '%s' for KB '%s' at %s.",
ki_id,
kb_id,
self.ke_url,
)
response = await self._http.delete(
f"{self.ke_url}/sc/ki",
headers={
"Knowledge-Base-Id": kb_id,
"Knowledge-Interaction-Id": ki_id,
},
)
if response.status_code == 404:
raise SmartConnectorNotFoundError(kb_id, self.ke_url)
if not response.is_success:
raise UnexpectedHttpResponseError(response)

async def renew_lease(self, kb_id: str) -> SmartConnectorLease:
logger.debug("Renewing lease for KB '%s' at %s.", kb_id, self.ke_url)
response = await self._http.put(
f"{self.ke_url}/sc/lease/renew",
headers={"Knowledge-Base-Id": kb_id},
)
if response.status_code == 404:
raise SmartConnectorNotFoundError(kb_id, self.ke_url)
if not response.is_success:
raise UnexpectedHttpResponseError(response)

return SmartConnectorLease.model_validate(response.json())

async def load_domain_knowledge(self, kb_id: str, knowledge: str) -> None:
logger.debug("Loading domain knowledge for KB '%s' at %s.", kb_id, self.ke_url)
response = await self._http.post(
f"{self.ke_url}/sc/knowledge",
content=knowledge.encode("utf-8"),
headers={
"Knowledge-Base-Id": kb_id,
"Content-Type": "text/plain; charset=UTF-8",
},
)
if response.status_code == 404:
raise SmartConnectorNotFoundError(kb_id, self.ke_url)
if not response.is_success:
raise UnexpectedHttpResponseError(response)

async def poll_ki_call(self, kb_id: str) -> tuple[PollResult, HandleRequest | None]:
logger.debug("Polling for KI calls...")
response = await self._http.get(
Expand Down
15 changes: 15 additions & 0 deletions src/knowledge_mapper/ke/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ class KnowledgeBaseInfo(BaseModel):
id: Annotated[str, Field(..., alias="knowledgeBaseId")]
name: Annotated[str, Field(..., alias="knowledgeBaseName")]
description: Annotated[str, Field(..., alias="knowledgeBaseDescription")]
lease_renewal_time: Annotated[
int | None, Field(default=None, alias="leaseRenewalTime", ge=30, le=3600)
] = None
reasoner_level: Annotated[
int | None, Field(default=None, alias="reasonerLevel", ge=1, le=5)
] = None


class SmartConnectorLease(BaseModel):
model_config = ConfigDict(
alias_generator=to_camel, frozen=True, populate_by_name=True
)

knowledge_base_id: str
expires: datetime


class KiTypes(StrEnum):
Expand Down
45 changes: 44 additions & 1 deletion src/knowledge_mapper/testing/fake_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""In-memory FakeClient that satisfies ClientProtocol for use in tests."""

import asyncio
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta

from knowledge_mapper.ke.client import ClientProtocol, HandleRequest, PollResult
from knowledge_mapper.ke.errors import SmartConnectorNotFoundError
from knowledge_mapper.ke.models import (
AskResult,
BindingSet,
Expand All @@ -12,6 +13,7 @@
KnowledgeBaseInfo,
KnowledgeInteractionInfo,
PostResult,
SmartConnectorLease,
)


Expand All @@ -31,6 +33,10 @@ def __init__(self, fake_url) -> None:
asyncio.Queue()
)
self._next_handle_request_id: int = 1
# Maps kb_id -> the most recently loaded domain knowledge string.
self._domain_knowledge: dict[str, str] = {}
# Maps kb_id -> number of times its lease has been renewed.
self._lease_renewals: dict[str, int] = {}

async def ke_is_available(self) -> bool:
return True
Expand Down Expand Up @@ -72,6 +78,43 @@ async def register_ki(
self._knowledge_interactions.setdefault(kb_id, []).append(registered)
return registered

async def unregister_ki(self, kb_id: str, ki_id: str) -> None:
if kb_id not in self._knowledge_bases:
raise SmartConnectorNotFoundError(kb_id, self._ke_url)
kis = self._knowledge_interactions.get(kb_id, [])
for i, ki in enumerate(kis):
if ki.id == ki_id:
kis.pop(i)
return
raise SmartConnectorNotFoundError(kb_id, self._ke_url)

async def renew_lease(self, kb_id: str) -> SmartConnectorLease:
if kb_id not in self._knowledge_bases:
raise SmartConnectorNotFoundError(kb_id, self._ke_url)
self._lease_renewals[kb_id] = self._lease_renewals.get(kb_id, 0) + 1
info = self._knowledge_bases[kb_id]
# Default to 60 seconds if leaseRenewalTime is unset.
renewal_seconds = info.lease_renewal_time or 60
return SmartConnectorLease(
knowledge_base_id=kb_id,
expires=datetime.now(tz=UTC) + timedelta(seconds=renewal_seconds),
)

async def load_domain_knowledge(self, kb_id: str, knowledge: str) -> None:
if kb_id not in self._knowledge_bases:
raise SmartConnectorNotFoundError(kb_id, self._ke_url)
self._domain_knowledge[kb_id] = knowledge

@property
def loaded_domain_knowledge(self) -> dict[str, str]:
"""Return the most-recently-loaded domain knowledge per KB id (for tests)."""
return dict(self._domain_knowledge)

@property
def lease_renewals(self) -> dict[str, int]:
"""Return the number of lease renewals per KB id (for tests)."""
return dict(self._lease_renewals)

async def poll_ki_call(self, kb_id: str) -> tuple[PollResult, HandleRequest | None]:
return await self._incoming_calls.get()

Expand Down
Loading
Loading