From 66e98ff4556ec063a8d278d3bed174294ea971c2 Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Wed, 24 Jun 2026 03:32:46 +0530 Subject: [PATCH 1/7] fix(cert-auth): decode certificate bytes correctly before passing to OAuthClientCertificate --- cognite/extractorutils/configtools/_util.py | 2 +- cognite/extractorutils/configtools/elements.py | 4 ++-- cognite/extractorutils/unstable/configuration/models.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cognite/extractorutils/configtools/_util.py b/cognite/extractorutils/configtools/_util.py index 7f247b11..1a641b7c 100644 --- a/cognite/extractorutils/configtools/_util.py +++ b/cognite/extractorutils/configtools/_util.py @@ -83,7 +83,7 @@ def translate_camel(key: str) -> str: raise ValueError(f"Invalid case style: {case_style}") -def _load_certificate_data(cert_path: str | Path, password: str | None) -> tuple[str, str] | tuple[bytes, bytes]: +def _load_certificate_data(cert_path: str | Path, password: str | None) -> tuple[bytes, bytes]: path = Path(cert_path) if isinstance(cert_path, str) else cert_path cert_data = Path(path).read_bytes() diff --git a/cognite/extractorutils/configtools/elements.py b/cognite/extractorutils/configtools/elements.py index aa077818..8be9e930 100644 --- a/cognite/extractorutils/configtools/elements.py +++ b/cognite/extractorutils/configtools/elements.py @@ -444,8 +444,8 @@ def get_cognite_client( credential_provider = OAuthClientCertificate( authority_url=authority_url, client_id=self.idp_authentication.client_id, - cert_thumbprint=str(thumprint), - certificate=str(key), + cert_thumbprint=str(thumprint, "utf-8"), + certificate=str(key, "utf-8"), scopes=self.idp_authentication.scopes, ) diff --git a/cognite/extractorutils/unstable/configuration/models.py b/cognite/extractorutils/unstable/configuration/models.py index 60f919f8..394443ea 100644 --- a/cognite/extractorutils/unstable/configuration/models.py +++ b/cognite/extractorutils/unstable/configuration/models.py @@ -328,8 +328,8 @@ def get_cognite_client(self, client_name: str) -> CogniteClient: credential_provider = OAuthClientCertificate( authority_url=client_certificate.authority_url, client_id=client_certificate.client_id, - cert_thumbprint=str(thumbprint), - certificate=str(key), + cert_thumbprint=str(thumbprint, "utf-8"), + certificate=str(key, "utf-8"), scopes=list(client_certificate.scopes), ) From 0d37f91001a7aacddf4a212e79771dd09a386c97 Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Wed, 24 Jun 2026 04:00:08 +0530 Subject: [PATCH 2/7] adding unit tests --- tests/tests_unit/test_certificate_auth.py | 209 ++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 tests/tests_unit/test_certificate_auth.py diff --git a/tests/tests_unit/test_certificate_auth.py b/tests/tests_unit/test_certificate_auth.py new file mode 100644 index 00000000..3fcf2450 --- /dev/null +++ b/tests/tests_unit/test_certificate_auth.py @@ -0,0 +1,209 @@ +# Copyright 2024 Cognite AS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import binascii +import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from cognite.client.credentials import OAuthClientCertificate +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + +# MSAL internal that calls binascii.a2b_hex on the thumbprint during JWT assertion building. +# If this import breaks after a MSAL upgrade, verify a2b_hex is still called on cert_thumbprint. +from msal.oauth2cli.assertion import _encode_thumbprint + +from cognite.extractorutils.configtools._util import _load_certificate_data +from cognite.extractorutils.configtools.elements import ( + AuthenticatorConfig, + CertificateConfig, + CogniteConfig, +) +from cognite.extractorutils.unstable.configuration.models import ConnectionConfig + +_FAKE_THUMB = b"A5BFE559AA1234567890ABCDEF123456DEADBEEF" +_FAKE_KEY = b"-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA...\n-----END RSA PRIVATE KEY-----\n" +_AUTHORITY_URL = "https://login.microsoftonline.com/test-tenant" +_CLIENT_ID = "test-client-id" +_SCOPES = ["https://api.cognitedata.com/.default"] + + +@pytest.fixture(scope="module") +def self_signed_cert_pem(tmp_path_factory: pytest.TempPathFactory) -> Path: + """Generate a self-signed RSA cert + private key in a single .pem file.""" + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "test")]) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)) + .not_valid_after(datetime.datetime(2099, 1, 1, tzinfo=datetime.timezone.utc)) + .sign(key, hashes.SHA256()) + ) + pem = cert.public_bytes(serialization.Encoding.PEM) + pem += key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + path = tmp_path_factory.mktemp("certs") / "test_cert.pem" + path.write_bytes(pem) + return path + + +def test_load_certificate_data_returns_bytes(self_signed_cert_pem: Path) -> None: + """_load_certificate_data must return (bytes, bytes) — establishes the type contract.""" + thumbprint, key = _load_certificate_data(self_signed_cert_pem, password=None) + assert isinstance(thumbprint, bytes), "thumbprint must be bytes" + assert isinstance(key, bytes), "private key must be bytes" + + +def test_bare_str_produces_repr_with_b_prefix(self_signed_cert_pem: Path) -> None: + """Document the bug: bare str() wraps bytes in b'...' repr, making a2b_hex fail.""" + thumbprint, _ = _load_certificate_data(self_signed_cert_pem, password=None) + broken = str(thumbprint) + assert broken.startswith("b'"), f"Expected b'...' repr, got: {broken!r}" + with pytest.raises(binascii.Error, match=r"(?i)odd"): + binascii.a2b_hex(broken) + + +def test_utf8_decode_produces_clean_hex(self_signed_cert_pem: Path) -> None: + """str(bytes, 'utf-8') yields a clean even-length hex string that a2b_hex accepts.""" + thumbprint, _ = _load_certificate_data(self_signed_cert_pem, password=None) + clean = str(thumbprint, "utf-8") + assert not clean.startswith("b'"), f"Unexpected b'...' prefix: {clean!r}" + assert len(clean) % 2 == 0, f"Hex string must have even length, got {len(clean)}" + binascii.a2b_hex(clean) # must not raise + + +def test_oauth_certificate_constructs_with_correct_strings(self_signed_cert_pem: Path) -> None: + """OAuthClientCertificate must accept decoded thumbprint and key without raising.""" + thumbprint, key = _load_certificate_data(self_signed_cert_pem, password=None) + # ConfidentialClientApplication validates authority via network; mock it out. + with patch("msal.ConfidentialClientApplication.__init__", return_value=None): + OAuthClientCertificate( + authority_url=_AUTHORITY_URL, + client_id=_CLIENT_ID, + cert_thumbprint=str(thumbprint, "utf-8"), + certificate=str(key, "utf-8"), + scopes=_SCOPES, + ) + + +def test_oauth_certificate_broken_str_raises_on_auth(self_signed_cert_pem: Path) -> None: + """Regression anchor: bare str() thumbprint causes binascii.Error during JWT assertion building. + + OAuthClientCertificate delegates JWT creation to msal.oauth2cli.assertion._encode_thumbprint, + which calls binascii.a2b_hex. Verify this call path fails with the broken repr string. + """ + thumbprint, _ = _load_certificate_data(self_signed_cert_pem, password=None) + broken = str(thumbprint) # intentionally broken — b'...' repr + + with pytest.raises(binascii.Error, match=r"(?i)odd"): + _encode_thumbprint(broken) + + +def test_elements_passes_decoded_strings_to_oauth(tmp_path: Path) -> None: + """CogniteConfig.get_cognite_client() (elements.py) must pass decoded strings to OAuthClientCertificate.""" + cert_path = tmp_path / "cert.pem" + cert_path.write_bytes(b"placeholder") # contents irrelevant; _load_certificate_data is mocked + + mock_oauth = MagicMock() + + config = CogniteConfig( + project="my-project", + host="https://api.cognitedata.com", + idp_authentication=AuthenticatorConfig( + client_id=_CLIENT_ID, + scopes=_SCOPES, + certificate=CertificateConfig( + path=str(cert_path), + password=None, + authority_url=_AUTHORITY_URL, + ), + ), + ) + + # _load_certificate_data is imported directly into elements, so patch there. + with ( + patch( + "cognite.extractorutils.configtools.elements._load_certificate_data", return_value=(_FAKE_THUMB, _FAKE_KEY) + ), + patch("cognite.extractorutils.configtools.elements.OAuthClientCertificate", mock_oauth), + patch("cognite.extractorutils.configtools.elements.ClientConfig"), + patch("cognite.extractorutils.configtools.elements.CogniteClient"), + ): + config.get_cognite_client("test-extractor") + + mock_oauth.assert_called_once() + kwargs = mock_oauth.call_args.kwargs + # Variable named to match the source in elements.py: `thumprint` + thumprint = kwargs["cert_thumbprint"] + key = kwargs["certificate"] + + assert not thumprint.startswith("b'"), f"cert_thumbprint must not be a bytes repr; got {thumprint!r}" + assert not key.startswith("b'"), f"certificate must not be a bytes repr; got {key!r}" + assert thumprint == _FAKE_THUMB.decode("utf-8"), f"cert_thumbprint must equal decoded bytes; got {thumprint!r}" + assert key == _FAKE_KEY.decode("utf-8"), f"certificate must equal decoded bytes; got {key!r}" + + +def test_models_passes_decoded_strings_to_oauth(tmp_path: Path) -> None: + """ConnectionConfig.get_cognite_client() (models.py) must pass decoded strings to OAuthClientCertificate.""" + cert_path = tmp_path / "cert.pem" + cert_path.write_bytes(b"placeholder") + + mock_oauth = MagicMock() + + config = ConnectionConfig.model_validate( + { + "project": "my-project", + "base_url": "https://api.cognitedata.com", + "integration": {"external_id": "test-extractor"}, + "authentication": { + "type": "client-certificate", + "client_id": _CLIENT_ID, + "path": str(cert_path), + "authority_url": _AUTHORITY_URL, + "scopes": " ".join(_SCOPES), + }, + } + ) + + with ( + patch( + "cognite.extractorutils.unstable.configuration.models._load_certificate_data", + return_value=(_FAKE_THUMB, _FAKE_KEY), + ), + patch("cognite.extractorutils.unstable.configuration.models.OAuthClientCertificate", mock_oauth), + patch("cognite.extractorutils.unstable.configuration.models.ClientConfig"), + patch("cognite.extractorutils.unstable.configuration.models.CogniteClient"), + ): + config.get_cognite_client("test-extractor") + + mock_oauth.assert_called_once() + kwargs = mock_oauth.call_args.kwargs + thumbprint = kwargs["cert_thumbprint"] + key = kwargs["certificate"] + + assert not thumbprint.startswith("b'"), f"cert_thumbprint must not be a bytes repr; got {thumbprint!r}" + assert not key.startswith("b'"), f"certificate must not be a bytes repr; got {key!r}" + assert thumbprint == _FAKE_THUMB.decode("utf-8"), f"cert_thumbprint must equal decoded bytes; got {thumbprint!r}" + assert key == _FAKE_KEY.decode("utf-8"), f"certificate must equal decoded bytes; got {key!r}" From bd5d815d2c804cecca8afaadb7b2c2f2bf48a79f Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Mon, 29 Jun 2026 14:38:55 +0530 Subject: [PATCH 3/7] Added integration tests, updated the version and changelog for release --- .github/workflows/test_and_build.yml | 2 + CHANGELOG.md | 5 ++ cognite/extractorutils/__init__.py | 2 +- pyproject.toml | 2 +- .../test_certificate_auth_integration.py | 78 +++++++++++++++++ .../test_certificate_auth_integration.py | 85 +++++++++++++++++++ tests/tests_unit/test_certificate_auth.py | 24 ++---- 7 files changed, 181 insertions(+), 17 deletions(-) create mode 100644 tests/test_unstable/test_certificate_auth_integration.py create mode 100644 tests/tests_integration/test_certificate_auth_integration.py diff --git a/.github/workflows/test_and_build.yml b/.github/workflows/test_and_build.yml index f763a821..2743e7c3 100644 --- a/.github/workflows/test_and_build.yml +++ b/.github/workflows/test_and_build.yml @@ -39,6 +39,8 @@ jobs: COGNITE_CLIENT_SECRET: ${{ secrets.COGNITE_PROJECT_CLIENT_SECRET }} COGNITE_TOKEN_SCOPES: ${{ secrets.COGNITE_PROJECT_SCOPES }} COGNITE_TOKEN_URL: ${{ secrets.COGNITE_PROJECT_TOKEN_URL }} + COGNITE_PROJECT_AUTHORITY_URL: ${{ secrets.COGNITE_PROJECT_AUTHORITY_URL }} + CERTIFICATE_AUTH_PEM: ${{ secrets.CERTIFICATE_AUTH_PEM }} KEYVAULT_CLIENT_ID: ${{ secrets.KEYVAULT_CLIENT_ID }} KEYVAULT_TENANT_ID: ${{ secrets.KEYVAULT_TENANT_ID }} KEYVAULT_CLIENT_SECRET: ${{ secrets.KEYVAULT_CLIENT_SECRET }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 31b729b4..a4cc470d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## 7.12.4 + +### Fixed +* In the `unstabe` and configtools: Fixed the decoding of the `thumbprint` and `key` from bytes during certificate authentication. + ## 7.12.3 ### Fixed diff --git a/cognite/extractorutils/__init__.py b/cognite/extractorutils/__init__.py index 8997628f..3b52740b 100644 --- a/cognite/extractorutils/__init__.py +++ b/cognite/extractorutils/__init__.py @@ -16,7 +16,7 @@ Cognite extractor utils is a Python package that simplifies the development of new extractors. """ -__version__ = "7.12.3" +__version__ = "7.12.4" from .base import Extractor __all__ = ["Extractor"] diff --git a/pyproject.toml b/pyproject.toml index 2d2978b4..d3eafc21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cognite-extractor-utils" -version = "7.12.3" +version = "7.12.4" description = "Utilities for easier development of extractors for CDF" authors = [ {name = "Mathias Lohne", email = "mathias.lohne@cognite.com"} diff --git a/tests/test_unstable/test_certificate_auth_integration.py b/tests/test_unstable/test_certificate_auth_integration.py new file mode 100644 index 00000000..6b3b4c0e --- /dev/null +++ b/tests/test_unstable/test_certificate_auth_integration.py @@ -0,0 +1,78 @@ +# Copyright 2026 Cognite AS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import binascii +import os +from pathlib import Path + +import pytest +from cognite.client import CogniteClient + +from cognite.extractorutils.configtools._util import _load_certificate_data +from cognite.extractorutils.unstable.configuration.models import ConnectionConfig + +_AUTHORITY_URL = os.environ.get("COGNITE_AUTHORITY_URL") +_CLIENT_ID = os.environ.get("COGNITE_DEV_CLIENT_ID") or os.environ.get("COGNITE_CLIENT_ID") +_PROJECT = os.environ.get("COGNITE_DEV_PROJECT") +_BASE_URL = os.environ.get("COGNITE_DEV_BASE_URL") +_SCOPES = os.environ.get("COGNITE_DEV_TOKEN_SCOPES") +_PEM = os.environ.get("CERTIFICATE_AUTH_PEM") + + +@pytest.fixture(scope="module") +def cert_pem_path(tmp_path_factory: pytest.TempPathFactory) -> Path: + pem_bytes = base64.b64decode(_PEM) + path = tmp_path_factory.mktemp("certs") / "cert.pem" + path.write_bytes(pem_bytes) + return path + + +@pytest.fixture(scope="module") +def cognite_client(cert_pem_path: Path) -> CogniteClient: + config = ConnectionConfig.model_validate( + { + "project": _PROJECT, + "base_url": _BASE_URL, + "integration": {"external_id": "extractor-utils-cert-integration-test"}, + "authentication": { + "type": "client-certificate", + "client_id": _CLIENT_ID, + "path": str(cert_pem_path), + "authority_url": _AUTHORITY_URL, + "scopes": _SCOPES, + }, + } + ) + return config.get_cognite_client("extractor-utils-cert-integration-test") + + +@pytest.mark.unstable +def test_load_certificate_data_parses_pem(cert_pem_path: Path) -> None: + """_load_certificate_data must parse the real PEM and return valid hex bytes.""" + thumbprint, key = _load_certificate_data(cert_pem_path, password=None) + assert isinstance(thumbprint, bytes) + assert isinstance(key, bytes) + binascii.a2b_hex(thumbprint) # raises if thumbprint is not valid hex + + +@pytest.mark.unstable +def test_connection_config_certificate_auth(cognite_client: CogniteClient) -> None: + """ConnectionConfig with client-certificate must acquire a token and grant access to the expected CDF project.""" + token_info = cognite_client.iam.token.inspect() + assert token_info is not None + assert token_info.subject, "Token subject must be non-empty (confirms a real identity was issued)" + assert any(p.url_name == _PROJECT for p in token_info.projects), ( + f"Expected project '{_PROJECT}' in token's project list, got: {[p.url_name for p in token_info.projects]}" + ) diff --git a/tests/tests_integration/test_certificate_auth_integration.py b/tests/tests_integration/test_certificate_auth_integration.py new file mode 100644 index 00000000..d63f1e2b --- /dev/null +++ b/tests/tests_integration/test_certificate_auth_integration.py @@ -0,0 +1,85 @@ +# Copyright 2026 Cognite AS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import binascii +import os +from pathlib import Path + +import pytest +from cognite.client import CogniteClient +from cognite.client.config import ClientConfig +from cognite.client.credentials import OAuthClientCertificate + +from cognite.extractorutils.configtools._util import _load_certificate_data + +_AUTHORITY_URL = os.environ.get("COGNITE_AUTHORITY_URL") +_CLIENT_ID = os.environ.get("COGNITE_CLIENT_ID") +_PROJECT = os.environ.get("COGNITE_PROJECT") +_BASE_URL = os.environ.get("COGNITE_BASE_URL") +_PEM = os.environ.get("CERTIFICATE_AUTH_PEM") +# Split by comma or whitespace; fall back to the standard CDF scope derived from base URL. +_SCOPES = [f"{_BASE_URL}/.default"] + + +@pytest.fixture(scope="module") +def cert_pem_path(tmp_path_factory: pytest.TempPathFactory) -> Path: + """Decode the base64 PEM from env and write it to a temp file.""" + pem_bytes = base64.b64decode(_PEM) + path = tmp_path_factory.mktemp("certs") / "cert.pem" + path.write_bytes(pem_bytes) + return path + + +@pytest.fixture(scope="module") +def cognite_client(cert_pem_path: Path) -> CogniteClient: + thumbprint, key = _load_certificate_data(cert_pem_path, password=None) + credentials = OAuthClientCertificate( + authority_url=_AUTHORITY_URL, + client_id=_CLIENT_ID, + cert_thumbprint=str(thumbprint, "utf-8"), + certificate=str(key, "utf-8"), + scopes=_SCOPES, + ) + config = ClientConfig( + client_name="extractor-utils-cert-integration-test", + project=_PROJECT, + base_url=_BASE_URL, + credentials=credentials, + ) + return CogniteClient(config) + + +def test_load_certificate_data_parses_pem(cert_pem_path: Path) -> None: + """_load_certificate_data must parse the real PEM and return valid hex bytes.""" + thumbprint, key = _load_certificate_data(cert_pem_path, password=None) + assert isinstance(thumbprint, bytes) + assert isinstance(key, bytes) + binascii.a2b_hex(thumbprint) # raises if thumbprint is not valid hex + + +def test_token_acquisition_succeeds(cognite_client: CogniteClient) -> None: + """End-to-end: certificate auth must successfully acquire a token and reach CDF.""" + token_info = cognite_client.iam.token.inspect() + assert token_info is not None + assert token_info.subject, "Token subject must be non-empty (confirms a real identity was issued)" + + +def test_api_call_with_certificate_auth(cognite_client: CogniteClient) -> None: + """Verify the acquired token grants access to the expected CDF project.""" + token_info = cognite_client.iam.token.inspect() + projects = token_info.projects + assert any(p.url_name == _PROJECT for p in projects), ( + f"Expected project '{_PROJECT}' in token's project list, got: {[p.url_name for p in projects]}" + ) diff --git a/tests/tests_unit/test_certificate_auth.py b/tests/tests_unit/test_certificate_auth.py index 3fcf2450..624bed5d 100644 --- a/tests/tests_unit/test_certificate_auth.py +++ b/tests/tests_unit/test_certificate_auth.py @@ -76,47 +76,41 @@ def test_load_certificate_data_returns_bytes(self_signed_cert_pem: Path) -> None assert isinstance(key, bytes), "private key must be bytes" -def test_bare_str_produces_repr_with_b_prefix(self_signed_cert_pem: Path) -> None: +def test_bare_str_produces_repr_with_b_prefix() -> None: """Document the bug: bare str() wraps bytes in b'...' repr, making a2b_hex fail.""" - thumbprint, _ = _load_certificate_data(self_signed_cert_pem, password=None) - broken = str(thumbprint) + broken = str(_FAKE_THUMB) assert broken.startswith("b'"), f"Expected b'...' repr, got: {broken!r}" with pytest.raises(binascii.Error, match=r"(?i)odd"): binascii.a2b_hex(broken) -def test_utf8_decode_produces_clean_hex(self_signed_cert_pem: Path) -> None: +def test_utf8_decode_produces_clean_hex() -> None: """str(bytes, 'utf-8') yields a clean even-length hex string that a2b_hex accepts.""" - thumbprint, _ = _load_certificate_data(self_signed_cert_pem, password=None) - clean = str(thumbprint, "utf-8") + clean = str(_FAKE_THUMB, "utf-8") assert not clean.startswith("b'"), f"Unexpected b'...' prefix: {clean!r}" assert len(clean) % 2 == 0, f"Hex string must have even length, got {len(clean)}" binascii.a2b_hex(clean) # must not raise -def test_oauth_certificate_constructs_with_correct_strings(self_signed_cert_pem: Path) -> None: +def test_oauth_certificate_constructs_with_correct_strings() -> None: """OAuthClientCertificate must accept decoded thumbprint and key without raising.""" - thumbprint, key = _load_certificate_data(self_signed_cert_pem, password=None) - # ConfidentialClientApplication validates authority via network; mock it out. with patch("msal.ConfidentialClientApplication.__init__", return_value=None): OAuthClientCertificate( authority_url=_AUTHORITY_URL, client_id=_CLIENT_ID, - cert_thumbprint=str(thumbprint, "utf-8"), - certificate=str(key, "utf-8"), + cert_thumbprint=str(_FAKE_THUMB, "utf-8"), + certificate=str(_FAKE_KEY, "utf-8"), scopes=_SCOPES, ) -def test_oauth_certificate_broken_str_raises_on_auth(self_signed_cert_pem: Path) -> None: +def test_oauth_certificate_broken_str_raises_on_auth() -> None: """Regression anchor: bare str() thumbprint causes binascii.Error during JWT assertion building. OAuthClientCertificate delegates JWT creation to msal.oauth2cli.assertion._encode_thumbprint, which calls binascii.a2b_hex. Verify this call path fails with the broken repr string. """ - thumbprint, _ = _load_certificate_data(self_signed_cert_pem, password=None) - broken = str(thumbprint) # intentionally broken — b'...' repr - + broken = str(_FAKE_THUMB) # intentionally broken — b'...' repr with pytest.raises(binascii.Error, match=r"(?i)odd"): _encode_thumbprint(broken) From 7ce96d120434925529acbdf7f48eed94d0e73150 Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Mon, 29 Jun 2026 15:01:20 +0530 Subject: [PATCH 4/7] updated the env variable names --- tests/test_unstable/test_certificate_auth_integration.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_unstable/test_certificate_auth_integration.py b/tests/test_unstable/test_certificate_auth_integration.py index 6b3b4c0e..6e3d8ba2 100644 --- a/tests/test_unstable/test_certificate_auth_integration.py +++ b/tests/test_unstable/test_certificate_auth_integration.py @@ -23,11 +23,11 @@ from cognite.extractorutils.configtools._util import _load_certificate_data from cognite.extractorutils.unstable.configuration.models import ConnectionConfig -_AUTHORITY_URL = os.environ.get("COGNITE_AUTHORITY_URL") +_AUTHORITY_URL = os.environ.get("COGNITE_PROJECT_AUTHORITY_URL") _CLIENT_ID = os.environ.get("COGNITE_DEV_CLIENT_ID") or os.environ.get("COGNITE_CLIENT_ID") -_PROJECT = os.environ.get("COGNITE_DEV_PROJECT") -_BASE_URL = os.environ.get("COGNITE_DEV_BASE_URL") -_SCOPES = os.environ.get("COGNITE_DEV_TOKEN_SCOPES") +_PROJECT = os.environ.get("COGNITE_PROJECT") +_BASE_URL = os.environ.get("COGNITE_BASE_URL") +_SCOPES = os.environ.get("COGNITE_TOKEN_SCOPES") _PEM = os.environ.get("CERTIFICATE_AUTH_PEM") From 012f110d2de78c2dc3f31c266a58a34cd64ba8c1 Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Mon, 29 Jun 2026 15:51:25 +0530 Subject: [PATCH 5/7] fix integration test --- tests/tests_integration/test_certificate_auth_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_integration/test_certificate_auth_integration.py b/tests/tests_integration/test_certificate_auth_integration.py index d63f1e2b..edf423b3 100644 --- a/tests/tests_integration/test_certificate_auth_integration.py +++ b/tests/tests_integration/test_certificate_auth_integration.py @@ -24,7 +24,7 @@ from cognite.extractorutils.configtools._util import _load_certificate_data -_AUTHORITY_URL = os.environ.get("COGNITE_AUTHORITY_URL") +_AUTHORITY_URL = os.environ.get("COGNITE_PROJECT_AUTHORITY_URL") _CLIENT_ID = os.environ.get("COGNITE_CLIENT_ID") _PROJECT = os.environ.get("COGNITE_PROJECT") _BASE_URL = os.environ.get("COGNITE_BASE_URL") From 4bb2609904e7c3a727584c20ff541d2ca6c345a6 Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Mon, 29 Jun 2026 16:03:32 +0530 Subject: [PATCH 6/7] update description --- tests/tests_unit/test_certificate_auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_unit/test_certificate_auth.py b/tests/tests_unit/test_certificate_auth.py index 624bed5d..a3c10e7c 100644 --- a/tests/tests_unit/test_certificate_auth.py +++ b/tests/tests_unit/test_certificate_auth.py @@ -1,4 +1,4 @@ -# Copyright 2024 Cognite AS +# Copyright 2026 Cognite AS # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From ea8fff66c06e6616fc0fb4cffc9938891adb60ca Mon Sep 17 00:00:00 2001 From: Yaseen Ahmed Khan Date: Mon, 29 Jun 2026 23:52:35 +0530 Subject: [PATCH 7/7] added a safe check for CERTIFICATE_AUTH_PEM to be set, updated the change log to specify the certicate auth is towards CDF. --- CHANGELOG.md | 2 +- tests/test_unstable/test_certificate_auth_integration.py | 4 ++++ tests/tests_integration/test_certificate_auth_integration.py | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4cc470d..af2f582d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ Changes are grouped as follows ## 7.12.4 ### Fixed -* In the `unstabe` and configtools: Fixed the decoding of the `thumbprint` and `key` from bytes during certificate authentication. +* In the `unstabe` and configtools: Fixed the decoding of the `thumbprint` and `key` from bytes during certificate authentication towards CDF. ## 7.12.3 diff --git a/tests/test_unstable/test_certificate_auth_integration.py b/tests/test_unstable/test_certificate_auth_integration.py index 6e3d8ba2..4fcfe4f4 100644 --- a/tests/test_unstable/test_certificate_auth_integration.py +++ b/tests/test_unstable/test_certificate_auth_integration.py @@ -33,6 +33,10 @@ @pytest.fixture(scope="module") def cert_pem_path(tmp_path_factory: pytest.TempPathFactory) -> Path: + if not _PEM: + raise ValueError( + f"Expected environment variable CERTIFICATE_AUTH_PEM to be set to run integration tests. Got: {_PEM}" + ) pem_bytes = base64.b64decode(_PEM) path = tmp_path_factory.mktemp("certs") / "cert.pem" path.write_bytes(pem_bytes) diff --git a/tests/tests_integration/test_certificate_auth_integration.py b/tests/tests_integration/test_certificate_auth_integration.py index edf423b3..7641be51 100644 --- a/tests/tests_integration/test_certificate_auth_integration.py +++ b/tests/tests_integration/test_certificate_auth_integration.py @@ -36,6 +36,10 @@ @pytest.fixture(scope="module") def cert_pem_path(tmp_path_factory: pytest.TempPathFactory) -> Path: """Decode the base64 PEM from env and write it to a temp file.""" + if not _PEM: + raise ValueError( + f"Expected environment variable CERTIFICATE_AUTH_PEM to be set to run integration tests. Got: {_PEM}" + ) pem_bytes = base64.b64decode(_PEM) path = tmp_path_factory.mktemp("certs") / "cert.pem" path.write_bytes(pem_bytes)