Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test_and_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ jobs:
COGNITE_CLIENT_SECRET: ${{ secrets.COGNITE_PROJECT_CLIENT_SECRET }}
COGNITE_TOKEN_SCOPES: ${{ secrets.COGNITE_PROJECT_SCOPES }}
COGNITE_TOKEN_URL: ${{ secrets.COGNITE_PROJECT_TOKEN_URL }}
COGNITE_PROJECT_AUTHORITY_URL: ${{ secrets.COGNITE_PROJECT_AUTHORITY_URL }}
CERTIFICATE_AUTH_PEM: ${{ secrets.CERTIFICATE_AUTH_PEM }}
KEYVAULT_CLIENT_ID: ${{ secrets.KEYVAULT_CLIENT_ID }}
KEYVAULT_TENANT_ID: ${{ secrets.KEYVAULT_TENANT_ID }}
KEYVAULT_CLIENT_SECRET: ${{ secrets.KEYVAULT_CLIENT_SECRET }}
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.12.4

### Fixed
* In the `unstabe` and configtools: Fixed the decoding of the `thumbprint` and `key` from bytes during certificate authentication towards CDF.

## 7.12.3

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.12.3"
__version__ = "7.12.4"
from .base import Extractor

__all__ = ["Extractor"]
2 changes: 1 addition & 1 deletion cognite/extractorutils/configtools/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def translate_camel(key: str) -> str:
raise ValueError(f"Invalid case style: {case_style}")


def _load_certificate_data(cert_path: str | Path, password: str | None) -> tuple[str, str] | tuple[bytes, bytes]:
def _load_certificate_data(cert_path: str | Path, password: str | None) -> tuple[bytes, bytes]:
path = Path(cert_path) if isinstance(cert_path, str) else cert_path
cert_data = Path(path).read_bytes()

Expand Down
4 changes: 2 additions & 2 deletions cognite/extractorutils/configtools/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ def get_cognite_client(
credential_provider = OAuthClientCertificate(
authority_url=authority_url,
client_id=self.idp_authentication.client_id,
cert_thumbprint=str(thumprint),
certificate=str(key),
cert_thumbprint=str(thumprint, "utf-8"),
Comment thread
Yaseen-A-Khan marked this conversation as resolved.
certificate=str(key, "utf-8"),
Comment on lines +447 to +448

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using str(bytes, "utf-8") is unidiomatic in Python. It is highly recommended to use .decode("utf-8") instead, which is the standard and more readable way to convert bytes to a string.

Suggested change
cert_thumbprint=str(thumprint, "utf-8"),
certificate=str(key, "utf-8"),
cert_thumbprint=thumprint.decode("utf-8"),
certificate=key.decode("utf-8"),

@Yaseen-A-Khan Yaseen-A-Khan Jun 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as the functionality remains same for key.decode("utf-8") and str(key, "utf-8"), thinking str seemed to convey that it converts to a string better, i retained it.

scopes=self.idp_authentication.scopes,
)

Expand Down
4 changes: 2 additions & 2 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
credential_provider = OAuthClientCertificate(
authority_url=client_certificate.authority_url,
client_id=client_certificate.client_id,
cert_thumbprint=str(thumbprint),
certificate=str(key),
cert_thumbprint=str(thumbprint, "utf-8"),
certificate=str(key, "utf-8"),
Comment on lines +331 to +332

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using str(bytes, "utf-8") is unidiomatic in Python. It is highly recommended to use .decode("utf-8") instead, which is the standard and more readable way to convert bytes to a string.

Suggested change
cert_thumbprint=str(thumbprint, "utf-8"),
certificate=str(key, "utf-8"),
cert_thumbprint=thumbprint.decode("utf-8"),
certificate=key.decode("utf-8"),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as the functionality remains same for key.decode("utf-8") and str(key, "utf-8"), thinking str seemed to convey that it converts to a string better, i retained it.

scopes=list(client_certificate.scopes),
)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cognite-extractor-utils"
version = "7.12.3"
version = "7.12.4"
description = "Utilities for easier development of extractors for CDF"
authors = [
{name = "Mathias Lohne", email = "mathias.lohne@cognite.com"}
Expand Down
82 changes: 82 additions & 0 deletions tests/test_unstable/test_certificate_auth_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2026 Cognite AS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import binascii
import os
from pathlib import Path

import pytest
from cognite.client import CogniteClient

from cognite.extractorutils.configtools._util import _load_certificate_data
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig

_AUTHORITY_URL = os.environ.get("COGNITE_PROJECT_AUTHORITY_URL")
_CLIENT_ID = os.environ.get("COGNITE_DEV_CLIENT_ID") or os.environ.get("COGNITE_CLIENT_ID")
_PROJECT = os.environ.get("COGNITE_PROJECT")
_BASE_URL = os.environ.get("COGNITE_BASE_URL")
_SCOPES = os.environ.get("COGNITE_TOKEN_SCOPES")
_PEM = os.environ.get("CERTIFICATE_AUTH_PEM")


@pytest.fixture(scope="module")
def cert_pem_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
if not _PEM:
raise ValueError(
f"Expected environment variable CERTIFICATE_AUTH_PEM to be set to run integration tests. Got: {_PEM}"
)
pem_bytes = base64.b64decode(_PEM)
Comment thread
Yaseen-A-Khan marked this conversation as resolved.
path = tmp_path_factory.mktemp("certs") / "cert.pem"
path.write_bytes(pem_bytes)
return path


@pytest.fixture(scope="module")
def cognite_client(cert_pem_path: Path) -> CogniteClient:
config = ConnectionConfig.model_validate(
{
"project": _PROJECT,
"base_url": _BASE_URL,
"integration": {"external_id": "extractor-utils-cert-integration-test"},
"authentication": {
"type": "client-certificate",
"client_id": _CLIENT_ID,
"path": str(cert_pem_path),
"authority_url": _AUTHORITY_URL,
"scopes": _SCOPES,
},
}
)
return config.get_cognite_client("extractor-utils-cert-integration-test")


@pytest.mark.unstable
def test_load_certificate_data_parses_pem(cert_pem_path: Path) -> None:
"""_load_certificate_data must parse the real PEM and return valid hex bytes."""
thumbprint, key = _load_certificate_data(cert_pem_path, password=None)
assert isinstance(thumbprint, bytes)
assert isinstance(key, bytes)
binascii.a2b_hex(thumbprint) # raises if thumbprint is not valid hex


@pytest.mark.unstable
def test_connection_config_certificate_auth(cognite_client: CogniteClient) -> None:
"""ConnectionConfig with client-certificate must acquire a token and grant access to the expected CDF project."""
token_info = cognite_client.iam.token.inspect()
assert token_info is not None
assert token_info.subject, "Token subject must be non-empty (confirms a real identity was issued)"
assert any(p.url_name == _PROJECT for p in token_info.projects), (
f"Expected project '{_PROJECT}' in token's project list, got: {[p.url_name for p in token_info.projects]}"
)
89 changes: 89 additions & 0 deletions tests/tests_integration/test_certificate_auth_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2026 Cognite AS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import binascii
import os
from pathlib import Path

import pytest
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import OAuthClientCertificate

from cognite.extractorutils.configtools._util import _load_certificate_data

_AUTHORITY_URL = os.environ.get("COGNITE_PROJECT_AUTHORITY_URL")
_CLIENT_ID = os.environ.get("COGNITE_CLIENT_ID")
_PROJECT = os.environ.get("COGNITE_PROJECT")
_BASE_URL = os.environ.get("COGNITE_BASE_URL")
_PEM = os.environ.get("CERTIFICATE_AUTH_PEM")
# Split by comma or whitespace; fall back to the standard CDF scope derived from base URL.
_SCOPES = [f"{_BASE_URL}/.default"]


@pytest.fixture(scope="module")
def cert_pem_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
"""Decode the base64 PEM from env and write it to a temp file."""
if not _PEM:
raise ValueError(
f"Expected environment variable CERTIFICATE_AUTH_PEM to be set to run integration tests. Got: {_PEM}"
)
pem_bytes = base64.b64decode(_PEM)
path = tmp_path_factory.mktemp("certs") / "cert.pem"
path.write_bytes(pem_bytes)
return path


@pytest.fixture(scope="module")
def cognite_client(cert_pem_path: Path) -> CogniteClient:
thumbprint, key = _load_certificate_data(cert_pem_path, password=None)
credentials = OAuthClientCertificate(
authority_url=_AUTHORITY_URL,
client_id=_CLIENT_ID,
cert_thumbprint=str(thumbprint, "utf-8"),
certificate=str(key, "utf-8"),
scopes=_SCOPES,
)
config = ClientConfig(
client_name="extractor-utils-cert-integration-test",
project=_PROJECT,
base_url=_BASE_URL,
credentials=credentials,
)
return CogniteClient(config)


def test_load_certificate_data_parses_pem(cert_pem_path: Path) -> None:
"""_load_certificate_data must parse the real PEM and return valid hex bytes."""
thumbprint, key = _load_certificate_data(cert_pem_path, password=None)
assert isinstance(thumbprint, bytes)
assert isinstance(key, bytes)
binascii.a2b_hex(thumbprint) # raises if thumbprint is not valid hex


def test_token_acquisition_succeeds(cognite_client: CogniteClient) -> None:
"""End-to-end: certificate auth must successfully acquire a token and reach CDF."""
token_info = cognite_client.iam.token.inspect()
assert token_info is not None
assert token_info.subject, "Token subject must be non-empty (confirms a real identity was issued)"


def test_api_call_with_certificate_auth(cognite_client: CogniteClient) -> None:
"""Verify the acquired token grants access to the expected CDF project."""
token_info = cognite_client.iam.token.inspect()
projects = token_info.projects
assert any(p.url_name == _PROJECT for p in projects), (
f"Expected project '{_PROJECT}' in token's project list, got: {[p.url_name for p in projects]}"
)
Loading
Loading