# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import gzip
import logging
import random
import threading
import zlib
from io import BytesIO
from os import environ
from time import time
from typing import ( # noqa: F401
Any,
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
)
import requests
from requests.exceptions import ConnectionError
from typing_extensions import deprecated
from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
OTLPMetricExporterMixin,
)
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
from opentelemetry.exporter.otlp.proto.http import (
_OTLP_HTTP_HEADERS,
Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
_is_retryable,
_load_session_from_envvar,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401
ExportMetricsServiceRequest,
)
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
AnyValue,
ArrayValue,
InstrumentationScope,
KeyValue,
KeyValueList,
)
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 # noqa: F401
from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
)
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_KEY,
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY,
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
from opentelemetry.sdk.metrics.export import ( # noqa: F401
AggregationTemporality,
Gauge,
MetricExporter,
MetricExportResult,
MetricsData,
Sum,
)
from opentelemetry.sdk.metrics.export import ( # noqa: F401
Histogram as HistogramType,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_env_headers
_logger = logging.getLogger(__name__)
DEFAULT_COMPRESSION = Compression.NoCompression
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10 # in seconds
_MAX_RETRYS = 6
[docs]
class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin):
def __init__(
self,
endpoint: str | None = None,
certificate_file: str | None = None,
client_key_file: str | None = None,
client_certificate_file: str | None = None,
headers: dict[str, str] | None = None,
timeout: float | None = None,
compression: Compression | None = None,
session: requests.Session | None = None,
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
preferred_aggregation: dict[type, Aggregation] | None = None,
):
self._shutdown_in_progress = threading.Event()
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
_append_metrics_path(
environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
),
)
self._certificate_file = certificate_file or environ.get(
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True),
)
self._client_key_file = client_key_file or environ.get(
OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY,
environ.get(OTEL_EXPORTER_OTLP_CLIENT_KEY, None),
)
self._client_certificate_file = client_certificate_file or environ.get(
OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE,
environ.get(OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, None),
)
self._client_cert = (
(self._client_certificate_file, self._client_key_file)
if self._client_certificate_file and self._client_key_file
else self._client_certificate_file
)
headers_string = environ.get(
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""),
)
self._headers = headers or parse_env_headers(
headers_string, liberal=True
)
self._timeout = timeout or float(
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
)
)
self._compression = compression or _compression_from_env()
self._session = (
session
or _load_session_from_envvar(
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER
)
or requests.Session()
)
self._session.headers.update(self._headers)
self._session.headers.update(_OTLP_HTTP_HEADERS)
# let users override our defaults
self._session.headers.update(self._headers)
if self._compression is not Compression.NoCompression:
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
self._common_configuration(
preferred_temporality, preferred_aggregation
)
self._shutdown = False
def _export(
self, serialized_data: bytes, timeout_sec: Optional[float] = None
):
data = serialized_data
if self._compression == Compression.Gzip:
gzip_data = BytesIO()
with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
gzip_stream.write(serialized_data)
data = gzip_data.getvalue()
elif self._compression == Compression.Deflate:
data = zlib.compress(serialized_data)
if timeout_sec is None:
timeout_sec = self._timeout
# By default, keep-alive is enabled in Session's request
# headers. Backends may choose to close the connection
# while a post happens which causes an unhandled
# exception. This try/except will retry the post on such exceptions
try:
resp = self._session.post(
url=self._endpoint,
data=data,
verify=self._certificate_file,
timeout=timeout_sec,
cert=self._client_cert,
)
except ConnectionError:
resp = self._session.post(
url=self._endpoint,
data=data,
verify=self._certificate_file,
timeout=timeout_sec,
cert=self._client_cert,
)
return resp
[docs]
def export(
self,
metrics_data: MetricsData,
timeout_millis: Optional[float] = 10000,
**kwargs,
) -> MetricExportResult:
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring batch")
return MetricExportResult.FAILURE
serialized_data = encode_metrics(metrics_data).SerializeToString()
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
resp = self._export(serialized_data, deadline_sec - time())
if resp.ok:
return MetricExportResult.SUCCESS
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
if (
not _is_retryable(resp)
or retry_num + 1 == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
or self._shutdown
):
_logger.error(
"Failed to export metrics batch code: %s, reason: %s",
resp.status_code,
resp.text,
)
return MetricExportResult.FAILURE
_logger.warning(
"Transient error %s encountered while exporting metrics batch, retrying in %.2fs.",
resp.reason,
backoff_seconds,
)
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
if shutdown:
_logger.warning("Shutdown in progress, aborting retry.")
break
return MetricExportResult.FAILURE
[docs]
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
return
self._shutdown = True
self._shutdown_in_progress.set()
self._session.close()
@property
def _exporting(self) -> str:
return "metrics"
[docs]
def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True
[docs]
@deprecated(
"Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead. Deprecated since version 1.18.0.",
)
def get_resource_data(
sdk_resource_scope_data: Dict[SDKResource, Any], # ResourceDataT?
resource_class: Callable[..., PB2Resource],
name: str,
) -> List[PB2Resource]:
return _get_resource_data(sdk_resource_scope_data, resource_class, name)
def _compression_from_env() -> Compression:
compression = (
environ.get(
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"),
)
.lower()
.strip()
)
return Compression(compression)
def _append_metrics_path(endpoint: str) -> str:
if endpoint.endswith("/"):
return endpoint + DEFAULT_METRICS_EXPORT_PATH
return endpoint + f"/{DEFAULT_METRICS_EXPORT_PATH}"