Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 67 additions & 28 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import importlib.util
import json
import os
import urllib.parse
from typing import Any, List, Literal, Optional, TYPE_CHECKING

import pyarrow as pa

from influxdb_client_3.version import USER_AGENT
from influxdb_client_3.write_client._sync import rest_client as rest

if TYPE_CHECKING:
import pandas as pd
import polars as pl
Expand All @@ -14,7 +18,7 @@
from influxdb_client_3.exceptions import InfluxDBError
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client import WriteOptions, Point
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings, DefaultWriteOptions, WriteType
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
Expand Down Expand Up @@ -185,15 +189,19 @@ def _parse_timeout(to: str) -> int:
class InfluxDBClient3:
def __init__(
self,
host=None,
host='localhost',
org=None,
database=None,
token=None,
auth_scheme=None,
enable_gzip=False,
gzip_threshold=None,
write_client_options=None,
flight_client_options=None,
write_port_overwrite=None,
query_port_overwrite=None,
disable_grpc_compression=False,
point_settings=None,
**kwargs):
"""
Initialize an InfluxDB client.
Expand All @@ -212,6 +220,12 @@ def __init__(
:type flight_client_options: dict[str, any]
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
:type disable_grpc_compression: bool
:param point_settings The settings for Points
:type point_settings: PointSettings
:param enable_gzip: Enable GZIP compression for write requests.
:type enable_gzip: bool
:param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True.
:type gzip_threshold: int
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
Expand Down Expand Up @@ -293,14 +307,44 @@ def __init__(
if write_port_overwrite is not None:
port = write_port_overwrite

self._client = _InfluxDBClient(
url=f"{scheme}://{hostname}:{port}",
auth_schema = 'Token' if auth_scheme is None else auth_scheme
default_header = {
'User-Agent': USER_AGENT
}
if self._token is not None:
default_header['Authorization'] = f'{auth_schema} {self._token}'
self.base_url = f"{scheme}://{hostname}:{port}"
self.default_header = default_header
self.rest_client = rest.RestClient(
base_url=self.base_url,
default_header=default_header,
verify_ssl=kwargs.get('verify_ssl', True),
ssl_ca_cert=kwargs.get('ssl_ca_cert', None),
cert_file=kwargs.get('cert_file', None),
cert_key_file=kwargs.get('cert_key_file', None),
cert_key_password=kwargs.get('cert_key_password', None),
ssl_context=kwargs.get('ssl_context', None),
proxy=kwargs.get('proxy', None),
proxy_headers=kwargs.get('proxy_headers', None),
retries=kwargs.get('retries', False)
)

if point_settings is None:
point_settings = PointSettings()

self._write_api = _WriteApi(
Comment thread
NguyenHoangSon96 marked this conversation as resolved.
token=self._token,
bucket=self._database,
org=self._org,
gzip_threshold=gzip_threshold,
enable_gzip=enable_gzip,
auth_scheme=auth_scheme,
timeout=write_timeout,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
default_header=default_header,
rest_client=self.rest_client,
point_settings=point_settings,
**self._write_client_options
)

if query_port_overwrite is not None:
port = query_port_overwrite
Expand Down Expand Up @@ -658,32 +702,28 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
except ArrowException as e:
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

def get_server_version(self) -> str:
def get_server_version(self) -> Optional[str]:
"""
Get the version of the connected InfluxDB server.
Retrieves the server version by querying the designated endpoint and
extracting the version information from either response headers or
the response body.

This method makes a ping request to the server and extracts the version information
from either the response headers or response body.
This method interacts with a REST API endpoint to fetch the server's
version details, which might be stored in a specific HTTP header or
available in the response body as part of a JSON structure.

:return: The version string of the InfluxDB server.
:rtype: str
:return: The version string of the server if available, otherwise None.
:rtype: Optional[str]
"""
version = None
(resp_body, _, header) = self._client.api_client.call_api(
resource_path="/ping",
method="GET",
response_type=object
)

for key, value in header.items():
resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)
for key, value in resp.getheaders().items():
if key.lower() == "x-influxdb-version":
version = value
break

if version is None and isinstance(resp_body, dict):
version = resp_body['version']
return value

return version
string_body = resp.get_string_body()
if string_body is not None:
return json.loads(string_body)['version']
return None

def flush(self):
"""
Expand All @@ -702,7 +742,6 @@ def close(self):
"""Close the client and clean up resources."""
self._write_api.close()
self._query_api.close()
self._client.close()

def __enter__(self):
return self
Expand Down
10 changes: 2 additions & 8 deletions influxdb_client_3/write_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@

from __future__ import absolute_import

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
from influxdb_client_3.version import VERSION
from influxdb_client_3.write_client.client.write.point import Point

from influxdb_client_3.write_client.service.write_service import WriteService

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.domain.write_precision import WritePrecision

from influxdb_client_3.write_client.configuration import Configuration
from influxdb_client_3.version import VERSION
__version__ = VERSION
Loading
Loading