diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6eababf2..c0cf8703 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+* Added OpenTelemetry tracing support via `ydb.opentelemetry.enable_tracing()`
+
## 3.28.4 ##
* Fix iam module lazy loading
@@ -66,7 +68,7 @@
* Make DeadlineExceeded not retriable
## 3.23.4 ##
-* Allow rollback after TLI
+* Allow rollback after TLI
## 3.23.3 ##
* Make attach session error readable
diff --git a/docs/index.rst b/docs/index.rst
index 3e53104e..cbe2c5dd 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -26,6 +26,12 @@ Python client for `YDB `_ — a fault-tolerant distributed SQ
coordination
scheme
+.. toctree::
+ :hidden:
+ :caption: Observability
+
+ opentelemetry
+
.. toctree::
:hidden:
:caption: Reference
@@ -103,6 +109,15 @@ use the ``@ydb_retry`` decorator. Skipping this section is a common source of pr
incidents.
+Observability
+-------------
+
+The :doc:`opentelemetry` page explains how to add distributed tracing to your
+application using OpenTelemetry. One call to ``enable_tracing()`` instruments
+query sessions, transactions, and connection pool operations — so you can
+visualize request flow in Jaeger, Grafana, or any OpenTelemetry-compatible backend.
+
+
API Reference
-------------
diff --git a/docs/opentelemetry.rst b/docs/opentelemetry.rst
new file mode 100644
index 00000000..c4eb810e
--- /dev/null
+++ b/docs/opentelemetry.rst
@@ -0,0 +1,253 @@
+OpenTelemetry Tracing
+=====================
+
+The SDK provides built-in distributed tracing via `OpenTelemetry <https://opentelemetry.io/>`_.
+When enabled, key YDB operations — such as session creation, query execution, transaction
+commit/rollback, and driver initialization — produce OpenTelemetry spans. Trace
+context is automatically propagated to the YDB server through gRPC metadata using the
+`W3C Trace Context <https://www.w3.org/TR/trace-context/>`_ standard.
+
+Tracing is **zero-cost when disabled**: the SDK uses no-op stubs by default, so there is
+no overhead unless you explicitly opt in.
+
+
+Installation
+------------
+
+OpenTelemetry packages are not included by default. Install the SDK with the
+``opentelemetry`` extra:
+
+.. code-block:: sh
+
+ pip install ydb[opentelemetry]
+
+This pulls in ``opentelemetry-api``. You will also need ``opentelemetry-sdk`` and an
+exporter for your tracing backend, for example:
+
+.. code-block:: sh
+
+ # OTLP/gRPC exporter (works with Jaeger, Tempo, and others)
+ pip install opentelemetry-exporter-otlp-proto-grpc
+
+
+Enabling Tracing
+----------------
+
+Call ``enable_tracing()`` once, **after** configuring your OpenTelemetry tracer provider
+and **before** creating a ``Driver``:
+
+.. code-block:: python
+
+ from opentelemetry import trace
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import BatchSpanProcessor
+ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+ from opentelemetry.sdk.resources import Resource
+
+ import ydb
+ from ydb.opentelemetry import enable_tracing
+
+ # 1. Set up OpenTelemetry
+ resource = Resource(attributes={"service.name": "my-service"})
+ provider = TracerProvider(resource=resource)
+ provider.add_span_processor(
+ BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
+ )
+ trace.set_tracer_provider(provider)
+
+ # 2. Enable YDB tracing
+ enable_tracing()
+
+ # 3. Use the SDK as usual — spans are created automatically
+ with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
+ driver.wait(timeout=5)
+ with ydb.QuerySessionPool(driver) as pool:
+ pool.execute_with_retries("SELECT 1")
+
+ provider.shutdown()
+
+``enable_tracing()`` accepts an optional ``tracer`` argument. If omitted, the SDK
+obtains a tracer named ``"ydb.sdk"`` from the global tracer provider.
+
+Calling ``enable_tracing()`` again has no effect until you call ``disable_tracing()``,
+which removes the hooks so you can reconfigure or turn instrumentation off.
+
+
+What Is Instrumented
+--------------------
+
+The following operations produce spans:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 35 20 45
+
+ * - Span Name
+ - Kind
+ - Description
+ * - ``ydb.Driver.Initialize``
+ - INTERNAL
+ - Driver wait / endpoint discovery.
+ * - ``ydb.CreateSession``
+ - CLIENT
+ - Creating a new query session.
+ * - ``ydb.ExecuteQuery``
+ - CLIENT
+ - Executing a query (including ``execute_with_retries``).
+ * - ``ydb.BeginTransaction``
+ - CLIENT
+ - Explicitly beginning a transaction via ``.begin()``.
+ * - ``ydb.Commit``
+ - CLIENT
+ - Committing an explicit transaction.
+ * - ``ydb.Rollback``
+ - CLIENT
+ - Rolling back a transaction.
+ * - ``ydb.RunWithRetry``
+ - INTERNAL
+ - Umbrella span wrapping the whole retryable block (``retry_operation_*`` / ``retry_tx_*`` / ``execute_with_retries``).
+ * - ``ydb.Try``
+ - INTERNAL
+ - A single retry attempt. From the **second** attempt onward carries
+ ``ydb.retry.backoff_ms`` — how long the retrier slept before starting this
+ attempt (``0`` on the skip-yield retry path: ``Aborted`` / ``BadSession`` /
+ ``NotFound`` / ``InternalError``, where the protocol prescribes immediate
+ retry without backoff). The very first ``ydb.Try`` omits the attribute
+ entirely because nothing preceded it.
+
+All spans are nested under the currently active span, so wrapping your application
+logic in a parent span produces a complete trace tree:
+
+.. code-block:: python
+
+ tracer = trace.get_tracer(__name__)
+
+ with tracer.start_as_current_span("handle-request"):
+ pool.execute_with_retries("SELECT 1")
+ # ↳ ydb.CreateSession (if a new session is needed)
+ # ↳ ydb.ExecuteQuery
+
+
+Span Attributes
+---------------
+
+Every YDB RPC (CLIENT-kind) span carries these semantic attributes:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Attribute
+ - Description
+ * - ``db.system.name``
+ - Always ``"ydb"``.
+ * - ``db.namespace``
+ - Database path (e.g. ``"/local"``).
+ * - ``server.address``
+ - Host from the connection string.
+ * - ``server.port``
+ - Port from the connection string.
+ * - ``network.peer.address``
+ - Actual node host from the discovery endpoint map (set once the session is attached to a node).
+ * - ``network.peer.port``
+ - Actual node port from the discovery endpoint map.
+ * - ``ydb.node.dc``
+ - Data-center / location reported by discovery for the node (e.g. ``"vla"``, ``"sas"``).
+
+Additional attributes are set when available:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Attribute
+ - Description
+ * - ``ydb.node.id``
+ - YDB node that handled the request.
+
+On errors, the span also records:
+
+- ``error.type`` — ``"ydb_error"``, ``"transport_error"``, or the Python exception class name.
+- ``db.response.status_code`` — the YDB status code name (e.g. ``"SCHEME_ERROR"``).
+
+
+Trace Context Propagation
+-------------------------
+
+When tracing is enabled, the SDK automatically injects trace context headers into
+every gRPC call to YDB using the globally configured OpenTelemetry propagator
+(``opentelemetry.propagate.inject``). By default, OpenTelemetry uses the
+`W3C Trace Context <https://www.w3.org/TR/trace-context/>`_ propagator, which adds
+``traceparent`` and ``tracestate`` headers.
+
+YDB server expects W3C Trace Context headers, so the default propagator configuration
+works out of the box. This allows the server to correlate client spans with
+server-side processing, enabling end-to-end trace visibility across the entire
+request path.
+
+
+Async Usage
+-----------
+
+Tracing works identically with the async driver. Call ``enable_tracing()`` once at
+startup:
+
+.. code-block:: python
+
+ import asyncio
+ import ydb
+ from ydb.opentelemetry import enable_tracing
+
+ enable_tracing()
+
+ async def main():
+ async with ydb.aio.Driver(
+ endpoint="grpc://localhost:2136",
+ database="/local",
+ ) as driver:
+ await driver.wait(timeout=5)
+ async with ydb.aio.QuerySessionPool(driver) as pool:
+ await pool.execute_with_retries("SELECT 1")
+
+ asyncio.run(main())
+
+
+
+Using a Custom Tracer
+---------------------
+
+To use a specific tracer instead of the global one:
+
+.. code-block:: python
+
+ from opentelemetry import trace
+
+ my_tracer = trace.get_tracer("my.custom.tracer")
+ enable_tracing(tracer=my_tracer)
+
+
+Running the Examples
+--------------------
+
+The runnable script is ``examples/opentelemetry/otel_example.py`` (bank table + concurrent
+Serializable transactions and ``app_startup`` / ``example_tli`` application spans). **Start
+Docker (YDB or the full stack) first**, then install and run on the host — see
+``examples/opentelemetry/README.md`` for the full order of commands and environment variables.
+
+**Full stack in one command** (YDB + OTLP + Tempo + Grafana; the ``otel-example`` service is built from ``examples/opentelemetry/Dockerfile`` and runs the script once):
+
+.. code-block:: sh
+
+ docker compose -f examples/opentelemetry/compose-e2e.yaml up --build
+
+The first run builds the ``otel-example`` image from the local SDK source; subsequent runs reuse the cached image. Pass ``--build`` again if you change the SDK or the demo script.
+
+**Typical local run** (YDB in Docker, script on the host — Compose **before** ``pip`` / ``python``):
+
+.. code-block:: sh
+
+ docker compose up -d
+ pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
+ python examples/opentelemetry/otel_example.py
+
+Open `http://localhost:3000 <http://localhost:3000>`_ (Grafana) to explore traces via Tempo.
diff --git a/examples/opentelemetry/Dockerfile b/examples/opentelemetry/Dockerfile
new file mode 100644
index 00000000..326721a1
--- /dev/null
+++ b/examples/opentelemetry/Dockerfile
@@ -0,0 +1,21 @@
+# Isolated image for the OpenTelemetry demo. Build context is the repository root.
+#
+# docker compose -f examples/opentelemetry/compose-e2e.yaml build otel-example
+#
+# A separate ``.dockerignore`` at the repo root keeps the context small.
+
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# Dependency layer: copy only what setup.py needs so changes to the demo script do
+# not bust the cached pip install.
+COPY setup.py pyproject.toml README.md requirements.txt ./
+COPY ydb ./ydb
+COPY examples/opentelemetry/requirements.txt ./examples/opentelemetry/requirements.txt
+RUN pip install --no-cache-dir -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
+
+# Demo script.
+COPY examples/opentelemetry/otel_example.py ./examples/opentelemetry/otel_example.py
+
+CMD ["python", "examples/opentelemetry/otel_example.py"]
diff --git a/examples/opentelemetry/README.md b/examples/opentelemetry/README.md
new file mode 100644
index 00000000..1af90f6d
--- /dev/null
+++ b/examples/opentelemetry/README.md
@@ -0,0 +1,74 @@
+# OpenTelemetry example (YDB Python SDK)
+
+Async demo in [`otel_example.py`](otel_example.py): OTLP export, `enable_tracing()`,
+`app_startup` and `example_tli` application spans, bank table, Serializable transactions (TLI-style load).
+
+Most steps assume the **repository root** as the current directory; the install step also shows the variant from this folder.
+
+## 1. Start YDB (or the full stack) with Docker **first**
+
+Without running containers, the example has nothing to connect to.
+
+**Only YDB** (minimal `docker-compose.yml` in the repo root — enough for the script on the host):
+
+```sh
+cd /path/to/ydb-python-sdk
+docker compose up -d
+# wait until the ydb container is healthy / port 2136 is open, then continue
+```
+
+**Full stack** (YDB + OTLP collector + Tempo + Grafana; the `otel-example` service is built from a `Dockerfile` and runs the script once inside Compose). The compose file is `compose-e2e.yaml` next to this README.
+
+```sh
+cd /path/to/ydb-python-sdk
+docker compose -f examples/opentelemetry/compose-e2e.yaml up --build
+```
+
+From this folder the build context is still resolved correctly (it is `../..` relative to the compose file):
+
+```sh
+cd /path/to/ydb-python-sdk/examples/opentelemetry
+docker compose -f compose-e2e.yaml up --build
+```
+
+The first run builds the `otel-example` image from the local SDK source (`Dockerfile` in this folder, `.dockerignore` at the repo root keeps the context small). Subsequent runs reuse the cached image; pass `--build` if you change the SDK or the demo script.
+
+Grafana: http://localhost:3000
+
+**Logs for `otel-example`:** the container name is prefixed (e.g. `opentelemetry-otel-example-1`); use `docker compose -f examples/opentelemetry/compose-e2e.yaml ps` or `docker ps -a` to find it. The service is one-shot (`restart: "no"`) — it may already have exited.
+
+## 2. Install dependencies (on the host, for a local `python` run)
+
+**From the repository root** (editable SDK + pins from this example):
+
+```sh
+python3 -m venv .venv
+source .venv/bin/activate # Windows: .venv\Scripts\activate
+pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
+```
+
+**If your shell is already in** `examples/opentelemetry/` (same result):
+
+```sh
+pip install -e '../..[opentelemetry]' -r requirements.txt
+```
+
+`requirements.txt` includes a merge of the repository’s core `requirements.txt` (grpc, ``packaging``, …) plus the OpenTelemetry lines. The `-e` install is only needed to register the package; otherwise this example prepends the repo to ``sys.path``.
+
+**Without** `pip -e` (``ydb`` from the checkout via `sys.path`): from this directory run `pip install -r requirements.txt`, then ``python otel_example.py``.
+
+## 3. Run the example (after YDB from step 1 is up)
+
+```sh
+python examples/opentelemetry/otel_example.py
+```
+
+Defaults: YDB `grpc://localhost:2136`, OTLP `http://localhost:4317` (for a local collector, if you use one).
+
+## Environment (Docker / overrides)
+
+| Variable | Meaning |
+|----------|---------|
+| `YDB_ENDPOINT` | e.g. `grpc://ydb:2136` inside the Compose network |
+| `YDB_DATABASE` | default `/local` |
+| `OTEL_EXPORTER_OTLP_ENDPOINT` | e.g. `http://otel-collector:4317` |
diff --git a/examples/opentelemetry/compose-e2e.yaml b/examples/opentelemetry/compose-e2e.yaml
new file mode 100644
index 00000000..f8402d50
--- /dev/null
+++ b/examples/opentelemetry/compose-e2e.yaml
@@ -0,0 +1,91 @@
+# Full OpenTelemetry demo: YDB (server-side tracing config), collector, Tempo, Prometheus, Grafana,
+# and a one-shot container that runs otel_example.py once.
+#
+# Run from this directory (paths below are relative to this file):
+# cd examples/opentelemetry && docker compose -f compose-e2e.yaml up
+#
+# Or from the repository root:
+# docker compose -f examples/opentelemetry/compose-e2e.yaml up
+
+services:
+ ydb:
+ image: ydbplatform/local-ydb:trunk
+ restart: always
+ platform: linux/amd64
+ environment:
+ YDB_DEFAULT_LOG_LEVEL: NOTICE
+ GRPC_TLS_PORT: "2135"
+ GRPC_PORT: "2136"
+ MON_PORT: "8765"
+ YDB_USE_IN_MEMORY_PDISKS: "true"
+ command: ["--config-path", "/ydb_config/ydb-config-with-tracing.yaml"]
+ ports:
+ - "2135:2135"
+ - "2136:2136"
+ - "8765:8765"
+ volumes:
+ - ./ydb_config:/ydb_config:ro
+ healthcheck:
+ test: bash -c "exec 6<> /dev/tcp/localhost/2136"
+ interval: 5s
+ timeout: 2s
+ retries: 30
+ start_period: 30s
+
+ otel-collector:
+ image: otel/opentelemetry-collector-contrib:latest
+ command: ["--config=/etc/otelcol/config.yaml"]
+ volumes:
+ - ./otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
+ ports:
+ - "4317:4317"
+ - "4318:4318"
+ - "9464:9464"
+ - "13133:13133"
+ - "13317:55679"
+
+ prometheus:
+ image: prom/prometheus:latest
+ volumes:
+ - ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro
+ ports:
+ - "9090:9090"
+ depends_on: [otel-collector]
+
+ tempo:
+ image: grafana/tempo:2.4.1
+ command: ["-config.file=/etc/tempo.yaml"]
+ volumes:
+ - ./tempo.yaml:/etc/tempo.yaml:ro
+ ports:
+ - "3200:3200"
+ depends_on: [otel-collector]
+
+ grafana:
+ image: grafana/grafana:10.4.2
+ environment:
+ GF_AUTH_ANONYMOUS_ENABLED: "true"
+ GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"
+ volumes:
+ - ./grafana/provisioning:/etc/grafana/provisioning:ro
+ - ./grafana/dashboards:/var/lib/grafana/dashboards:ro
+ ports:
+ - "3000:3000"
+ depends_on: [prometheus, tempo]
+
+ otel-example:
+ # Built from the local SDK source (no volume mount, no relative-path tricks).
+ build:
+ context: ../..
+ dockerfile: examples/opentelemetry/Dockerfile
+ environment:
+ YDB_ENDPOINT: grpc://ydb:2136
+ YDB_DATABASE: /local
+ OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
+ OTEL_SERVICE_NAME: ydb-otel-example
+ depends_on:
+ ydb:
+ condition: service_healthy
+ otel-collector:
+ condition: service_started
+ restart: "no"
diff --git a/examples/opentelemetry/grafana/dashboards/README.md b/examples/opentelemetry/grafana/dashboards/README.md
new file mode 100644
index 00000000..eb47493a
--- /dev/null
+++ b/examples/opentelemetry/grafana/dashboards/README.md
@@ -0,0 +1,5 @@
+This folder is intentionally left empty.
+
+Grafana is provisioned with Tempo + Prometheus datasources; use **Explore** to search traces.
+
+
diff --git a/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml
new file mode 100644
index 00000000..5ccefdc1
--- /dev/null
+++ b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml
@@ -0,0 +1,13 @@
+apiVersion: 1
+
+providers:
+ - name: 'default'
+ orgId: 1
+ folder: ''
+ type: file
+ disableDeletion: true
+ editable: false
+ options:
+ path: /var/lib/grafana/dashboards
+
+
diff --git a/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml
new file mode 100644
index 00000000..05ba5bd9
--- /dev/null
+++ b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml
@@ -0,0 +1,22 @@
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ type: prometheus
+ access: proxy
+ url: http://prometheus:9090
+ isDefault: true
+ editable: false
+
+ - name: Tempo
+ type: tempo
+ access: proxy
+ url: http://tempo:3200
+ editable: false
+ jsonData:
+ tracesToMetrics:
+ datasourceUid: Prometheus
+ serviceMap:
+ datasourceUid: Prometheus
+
+
diff --git a/examples/opentelemetry/otel-collector-config.yaml b/examples/opentelemetry/otel-collector-config.yaml
new file mode 100644
index 00000000..7f784445
--- /dev/null
+++ b/examples/opentelemetry/otel-collector-config.yaml
@@ -0,0 +1,44 @@
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+ http:
+ endpoint: 0.0.0.0:4318
+
+processors:
+ batch: { }
+
+exporters:
+ prometheus:
+ endpoint: 0.0.0.0:9464
+ resource_to_telemetry_conversion:
+ enabled: true
+
+ otlp/tempo:
+ endpoint: tempo:4317
+ tls:
+ insecure: true
+
+ debug:
+ verbosity: detailed
+
+extensions:
+ health_check:
+ endpoint: 0.0.0.0:13133
+
+ zpages:
+ endpoint: 0.0.0.0:55679
+
+service:
+ extensions: [ health_check, zpages ]
+ pipelines:
+ metrics:
+ receivers: [ otlp ]
+ processors: [ batch ]
+ exporters: [ prometheus ]
+
+ traces:
+ receivers: [ otlp ]
+ processors: [ batch ]
+ exporters: [ otlp/tempo, debug ]
diff --git a/examples/opentelemetry/otel_example.py b/examples/opentelemetry/otel_example.py
new file mode 100644
index 00000000..6ec0c5a8
--- /dev/null
+++ b/examples/opentelemetry/otel_example.py
@@ -0,0 +1,92 @@
+"""OpenTelemetry + YDB demo: bank table and concurrent transactions (TLI-style workload).
+
+Uses ``disable_discovery=True`` so a single ``grpc://…`` to local YDB (e.g. ``local-ydb`` in Docker
+from the host) is not replaced by internal discovery addresses.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import ydb
+from ydb.opentelemetry import enable_tracing
+from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+
+def _env(name: str, default: str) -> str:
+ v = os.environ.get(name)
+ return v if v is not None and v != "" else default
+
+
+async def _first_amount(tx) -> int:
+ async with await tx.execute("SELECT amount FROM bank WHERE id = 1") as results:
+ async for rs in results:
+ for row in rs.rows:
+ return int(row["amount"])
+ raise RuntimeError("no row for id=1")
+
+
+async def _bank_read_update(tx) -> None:
+ count = await _first_amount(tx)
+ async with await tx.execute(
+ "UPDATE bank SET amount = $amt + 1 WHERE id = 1",
+ {"$amt": (count, ydb.PrimitiveType.Int32)},
+ ):
+ pass
+
+
+async def main() -> None:
+ endpoint = _env("YDB_ENDPOINT", "grpc://localhost:2136")
+ database = _env("YDB_DATABASE", "/local")
+ otlp_endpoint = _env("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
+
+ resource = Resource(attributes={"service.name": _env("OTEL_SERVICE_NAME", "ydb-otel-example")})
+ provider = TracerProvider(resource=resource)
+ provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint)))
+ trace.set_tracer_provider(provider)
+
+ tracer = trace.get_tracer(__name__)
+ enable_tracing(tracer)
+
+ async with ydb.aio.Driver(
+ endpoint=endpoint,
+ database=database,
+ disable_discovery=True,
+ ) as driver:
+ await driver.wait(timeout=60)
+
+ async with ydb.aio.QuerySessionPool(driver) as pool:
+ with tracer.start_as_current_span("app_startup") as startup:
+ startup.set_attribute("app.message", "hello")
+
+ await pool.execute_with_retries("DROP TABLE IF EXISTS bank")
+ await pool.execute_with_retries("CREATE TABLE bank (id Int32, amount Int32, PRIMARY KEY (id))")
+
+ print("Insert row...")
+ await pool.execute_with_retries("INSERT INTO bank (id, amount) VALUES (1, 0)")
+
+ print("Preparing queries...")
+ await pool.retry_tx_async(_bank_read_update)
+
+ print("Emulation TLI...")
+
+ async def concurrent_task(task_num: int) -> None:
+ with tracer.start_as_current_span("example_tli") as act:
+ act.set_attribute("app.message", f"concurrent task {task_num}")
+ await pool.retry_tx_async(_bank_read_update)
+
+ await asyncio.gather(*(concurrent_task(i) for i in range(10)))
+
+ final_rows = await pool.execute_with_retries("SELECT amount FROM bank WHERE id = 1")
+ amount = int(list(final_rows[0].rows)[0]["amount"])
+ print(f"Final amount (after serializable retries): {amount}")
+
+ provider.shutdown()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/opentelemetry/prometheus.yaml b/examples/opentelemetry/prometheus.yaml
new file mode 100644
index 00000000..64b31821
--- /dev/null
+++ b/examples/opentelemetry/prometheus.yaml
@@ -0,0 +1,7 @@
+global:
+ scrape_interval: 5s
+
+scrape_configs:
+ - job_name: otel-collector
+ static_configs:
+ - targets: ["otel-collector:9464"]
diff --git a/examples/opentelemetry/requirements.txt b/examples/opentelemetry/requirements.txt
new file mode 100644
index 00000000..fc6d399a
--- /dev/null
+++ b/examples/opentelemetry/requirements.txt
@@ -0,0 +1,10 @@
+# Core ydb import/runtime (grpc, ``packaging``, etc.) — same as repository root ``requirements.txt``.
+-r ../../requirements.txt
+# Extras and OTLP (``ydb[opentelemetry]`` only brings ``opentelemetry-api``)
+# With editable install:
+# (repository root) pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
+# (this directory) pip install -e '../..[opentelemetry]' -r requirements.txt
+# Without ``pip -e``, setting ``sys.path`` is enough for ``ydb`` from the checkout, but you still need these lines:
+# pip install -r requirements.txt
+opentelemetry-sdk>=1.0.0
+opentelemetry-exporter-otlp-proto-grpc>=1.0.0
diff --git a/examples/opentelemetry/tempo.yaml b/examples/opentelemetry/tempo.yaml
new file mode 100644
index 00000000..43dbb19c
--- /dev/null
+++ b/examples/opentelemetry/tempo.yaml
@@ -0,0 +1,15 @@
+server:
+ http_listen_port: 3200
+
+distributor:
+ receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+
+storage:
+ trace:
+ backend: local
+ local:
+ path: /tmp/tempo
diff --git a/examples/opentelemetry/ydb_config/README.md b/examples/opentelemetry/ydb_config/README.md
new file mode 100644
index 00000000..6e36fae1
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/README.md
@@ -0,0 +1,28 @@
+# YDB server-side tracing (OpenTelemetry)
+
+This folder is used to keep a **custom YDB config** that enables server-side OpenTelemetry tracing.
+
+## 1) Export the default config from a running container
+
+If YDB is running as `ydb-local`:
+
+```bash
+docker cp ydb-local:/ydb_data/cluster/kikimr_configs/config.yaml ./ydb_config/ydb-config.yaml
+```
+
+## 2) Enable OpenTelemetry exporter in the config
+
+Edit `ydb-config.yaml` and add the contents of `otel-tracing-snippet.yaml` (usually as a top-level section).
+
+Default OTLP endpoint (inside docker-compose network): `grpc://otel-collector:4317`
+Default service name (so you can find it in Tempo/Grafana): `ydb`
+
+## 3) Run with the overridden config
+
+Restart YDB (the main `compose-e2e.yaml` will automatically use `--config-path` if `ydb-config.yaml` exists):
+
+```bash
+docker compose -f compose-e2e.yaml up -d --force-recreate ydb
+```
+
+Now you should see additional server-side traces in Tempo/Grafana (service name defaults to `ydb` in the snippet).
diff --git a/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml
new file mode 100644
index 00000000..bd5978d2
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml
@@ -0,0 +1,26 @@
+tracing_config:
+ backend:
+ opentelemetry:
+ collector_url: grpc://otel-collector:4317
+ service_name: ydb
+ external_throttling:
+ - scope:
+ database: /local
+ max_traces_per_minute: 60
+ max_traces_burst: 3
+ # Highest tracing detail for *sampled* traces (YDB-generated trace-id).
+ # Note: requests with an external `traceparent` are traced at level 13 (Detailed) per YDB docs.
+ sampling:
+ - scope:
+ database: /local
+ fraction: 1
+ level: 15
+ max_traces_per_minute: 1000
+ max_traces_burst: 100
+ uploader:
+ max_exported_spans_per_second: 30
+ max_spans_in_batch: 100
+ max_bytes_in_batch: 10485760 # 10 MiB
+ max_export_requests_inflight: 3
+ max_batch_accumulation_milliseconds: 5000
+ span_export_timeout_seconds: 120
diff --git a/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml
new file mode 100644
index 00000000..ef93d0e6
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml
@@ -0,0 +1,349 @@
+actor_system_config:
+ batch_executor: 2
+ executor:
+ - name: System
+ spin_threshold: 0
+ threads: 2
+ type: BASIC
+ - name: User
+ spin_threshold: 0
+ threads: 3
+ type: BASIC
+ - name: Batch
+ spin_threshold: 0
+ threads: 2
+ type: BASIC
+ - name: IO
+ threads: 1
+ time_per_mailbox_micro_secs: 100
+ type: IO
+ - name: IC
+ spin_threshold: 10
+ threads: 1
+ time_per_mailbox_micro_secs: 100
+ type: BASIC
+ io_executor: 3
+ scheduler:
+ progress_threshold: 10000
+ resolution: 1024
+ spin_threshold: 0
+ service_executor:
+ - executor_id: 4
+ service_name: Interconnect
+ sys_executor: 0
+ user_executor: 1
+blob_storage_config:
+ service_set:
+ availability_domains: 1
+ groups:
+ - erasure_species: 0
+ group_generation: 1
+ group_id: 0
+ rings:
+ - fail_domains:
+ - vdisk_locations:
+ - node_id: 1
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisk_slot_id: 0
+ pdisks:
+ - node_id: 1
+ path: SectorMap:1:64
+ pdisk_category: 0
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisks:
+ - vdisk_id:
+ domain: 0
+ group_generation: 1
+ group_id: 0
+ ring: 0
+ vdisk: 0
+ vdisk_location:
+ node_id: 1
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisk_slot_id: 0
+channel_profile_config:
+ profile:
+ - channel:
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ profile_id: 0
+ - channel:
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ profile_id: 1
+domains_config:
+ domain:
+ - domain_id: 1
+ name: local
+ storage_pool_types:
+ - kind: hdd
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdd1
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdd2
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdde
+ pool_config:
+ box_id: 1
+ encryption_mode: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ security_config:
+ default_users:
+ - name: root
+ password: '1234'
+ state_storage:
+ - ring:
+ nto_select: 1
+ ring:
+ - node:
+ - 1
+ use_ring_specific_node_selection: true
+ ssid: 1
+feature_flags:
+ enable_drain_on_shutdown: false
+ enable_mvcc_snapshot_reads: true
+ enable_persistent_query_stats: true
+ enable_public_api_external_blobs: false
+ enable_scheme_transactions_at_scheme_shard: true
+federated_query_config:
+ audit:
+ enabled: false
+ uaconfig:
+ uri: ''
+ checkpoint_coordinator:
+ checkpointing_period_millis: 1000
+ enabled: true
+ max_inflight: 1
+ storage:
+ endpoint: ''
+ common:
+ ids_prefix: pt
+ use_bearer_for_ydb: true
+ control_plane_proxy:
+ enabled: true
+ request_timeout: 30s
+ control_plane_storage:
+ available_binding:
+ - DATA_STREAMS
+ - OBJECT_STORAGE
+ available_connection:
+ - YDB_DATABASE
+ - CLICKHOUSE_CLUSTER
+ - DATA_STREAMS
+ - OBJECT_STORAGE
+ - MONITORING
+ enabled: true
+ storage:
+ endpoint: ''
+ db_pool:
+ enabled: true
+ storage:
+ endpoint: ''
+ enabled: false
+ gateways:
+ dq:
+ default_settings: []
+ enabled: true
+ pq:
+ cluster_mapping: []
+ solomon:
+ cluster_mapping: []
+ nodes_manager:
+ enabled: true
+ pending_fetcher:
+ enabled: true
+ pinger:
+ ping_period: 30s
+ private_api:
+ enabled: true
+ private_proxy:
+ enabled: true
+ resource_manager:
+ enabled: true
+ token_accessor:
+ enabled: true
+grpc_config:
+ ca: /ydb_certs/ca.pem
+ cert: /ydb_certs/cert.pem
+ host: '[::]'
+ key: /ydb_certs/key.pem
+ services:
+ - nbs
+ - legacy
+ - tablet_service
+ - yql
+ - discovery
+ - cms
+ - locking
+ - kesus
+ - pq
+ - pqcd
+ - pqv1
+ - topic
+ - datastreams
+ - scripting
+ - clickhouse_internal
+ - rate_limiter
+ - analytics
+ - export
+ - import
+ - yq
+ - keyvalue
+ - monitoring
+ - auth
+ - query_service
+ - view
+interconnect_config:
+ start_tcp: true
+kafka_proxy_config:
+ enable_kafka_proxy: true
+ listening_port: 9092
+kqpconfig:
+ settings:
+ - name: _ResultRowsLimit
+ value: '1000'
+log_config:
+ default_level: 5
+ entry: []
+ sys_log: false
+nameservice_config:
+ node:
+ - address: ::1
+ host: localhost
+ node_id: 1
+ port: 19001
+ walle_location:
+ body: 1
+ data_center: '1'
+ rack: '1'
+net_classifier_config:
+ cms_config_timeout_seconds: 30
+ net_data_file_path: /ydb_data/netData.tsv
+ updater_config:
+ net_data_update_interval_seconds: 60
+ retry_interval_seconds: 30
+pqcluster_discovery_config:
+ enabled: false
+pqconfig:
+ check_acl: false
+ cluster_table_path: ''
+ clusters_update_timeout_sec: 1
+ enable_proto_source_id_info: true
+ enabled: true
+ max_storage_node_port: 65535
+ meta_cache_timeout_sec: 1
+ quoting_config:
+ enable_quoting: false
+ require_credentials_in_new_protocol: false
+ root: ''
+ topics_are_first_class_citizen: true
+ version_table_path: ''
+sqs_config:
+ enable_dead_letter_queues: true
+ enable_sqs: false
+ force_queue_creation_v2: true
+ force_queue_deletion_v2: true
+ scheme_cache_hard_refresh_time_seconds: 0
+ scheme_cache_soft_refresh_time_seconds: 0
+static_erasure: none
+system_tablets:
+ default_node:
+ - 1
+ flat_schemeshard:
+ - info:
+ tablet_id: 72057594046678944
+ flat_tx_coordinator:
+ - node:
+ - 1
+ tx_allocator:
+ - node:
+ - 1
+ tx_mediator:
+ - node:
+ - 1
+table_service_config:
+ resource_manager:
+ channel_buffer_size: 262144
+ mkql_heavy_program_memory_limit: 1048576
+ mkql_light_program_memory_limit: 65536
+ verbose_memory_limit_exception: true
+ sql_version: 1
+tracing_config:
+ backend:
+ opentelemetry:
+ collector_url: grpc://otel-collector:4317
+ service_name: ydb
+ external_throttling:
+ - scope:
+ database: /local
+ max_traces_per_minute: 1000
+ max_traces_burst: 100
+ sampling:
+ - scope:
+ database: /local
+ fraction: 1.0
+ level: 15
+ max_traces_per_minute: 1000
+# max_traces_burst: 100
+ uploader:
+ max_exported_spans_per_second: 30
+ max_spans_in_batch: 100
+ max_bytes_in_batch: 10485760 # 10 MiB
+ max_export_requests_inflight: 3
+ max_batch_accumulation_milliseconds: 5000
+ span_export_timeout_seconds: 120
diff --git a/pyproject.toml b/pyproject.toml
index 41e7ef6f..0b08f0b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@ module = [
"requests.*",
"ydb.public.api.*",
"contrib.ydb.public.api.*",
+ "opentelemetry.*",
]
ignore_missing_imports = true
diff --git a/setup.py b/setup.py
index 0eba99d5..7cadc459 100644
--- a/setup.py
+++ b/setup.py
@@ -37,5 +37,6 @@
options={"bdist_wheel": {"universal": True}},
extras_require={
"yc": ["yandexcloud", ],
+ "opentelemetry": ["opentelemetry-api>=1.0.0"],
}
)
diff --git a/test-requirements.txt b/test-requirements.txt
index a2260081..0f8d784b 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -43,6 +43,7 @@ sqlalchemy==1.4.26
pylint-protobuf
cython
freezegun>=1.3.0
+opentelemetry-sdk>=1.0.0
# pytest-cov
yandexcloud
-e .
diff --git a/tests/tracing/__init__.py b/tests/tracing/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/tracing/conftest.py b/tests/tracing/conftest.py
new file mode 100644
index 00000000..26c39cef
--- /dev/null
+++ b/tests/tracing/conftest.py
@@ -0,0 +1,45 @@
+"""Shared fixtures for OpenTelemetry tracing tests.
+
+Sets up an in-memory TracerProvider so that spans created by the SDK
+can be collected and inspected without any external backend.
+"""
+
+import pytest
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+
+_provider = TracerProvider()
+_exporter = InMemorySpanExporter()
+_provider.add_span_processor(SimpleSpanProcessor(_exporter))
+trace.set_tracer_provider(_provider)
+
+
+@pytest.fixture()
+def otel_setup():
+ """Enable SDK tracing, yield the exporter, then restore noop defaults.
+
+ Each test gets a clean exporter (cleared before and after).
+ """
+ _exporter.clear()
+
+ from ydb.opentelemetry import disable_tracing, enable_tracing
+
+ disable_tracing()
+ enable_tracing()
+
+ yield _exporter
+
+ # Restore noop state
+ disable_tracing()
+ _exporter.clear()
+
+
+class FakeDriverConfig:
+ def __init__(self, endpoint="test_endpoint:1337", database="/test_database"):
+ self.endpoint = endpoint
+ self.database = database
+ self.query_client_settings = None
+ self.tracer = None
diff --git a/tests/tracing/test_tracing_async.py b/tests/tracing/test_tracing_async.py
new file mode 100644
index 00000000..6b4e96ad
--- /dev/null
+++ b/tests/tracing/test_tracing_async.py
@@ -0,0 +1,527 @@
+"""Unit tests for OpenTelemetry tracing — asynchronous SDK operations.
+
+Mirrors the sync tests but exercises the async code paths in ydb.aio.query.
+"""
+
+from opentelemetry.trace import StatusCode, SpanKind
+from ydb.opentelemetry.tracing import SpanName
+from ydb.query.transaction import QueryTxStateEnum
+from .conftest import FakeDriverConfig
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import asyncio
+import pytest
+
+
+async def _empty_async_iter():
+ return
+ yield # noqa: makes this an async generator
+
+
+def _get_spans(exporter, name=None):
+ spans = exporter.get_finished_spans()
+ if name is not None:
+ spans = [s for s in spans if s.name == name]
+ return spans
+
+
+def _get_single_span(exporter, name):
+ spans = _get_spans(exporter, name)
+ assert (
+ len(spans) == 1
+ ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}"
+ return spans[0]
+
+
+def _make_async_session_mock(driver_config=None, peer=None):
+ """Create a mock that behaves like an async QuerySession after create()."""
+ cfg = driver_config or FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+
+ session = MagicMock()
+ session._driver = driver
+ session._session_id = "test-session-id"
+ session._node_id = 12345
+ session._peer = peer
+ session.session_id = "test-session-id"
+ session.node_id = 12345
+ return session, driver
+
+
+def _make_async_tx(session, driver):
+ """Create a real async QueryTxContext wired to mocked session/driver."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.aio.query.transaction import QueryTxContext
+
+ tx = QueryTxContext(driver, session, QuerySerializableReadWrite())
+ tx._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx._tx_state.tx_id = "test-tx-id"
+ return tx
+
+
+def _make_fresh_async_tx(session, driver):
+ """Create a real async QueryTxContext in NOT_INITIALIZED state (for begin())."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.aio.query.transaction import QueryTxContext
+
+ return QueryTxContext(driver, session, QuerySerializableReadWrite())
+
+
+class TestAsyncCreateSessionSpan:
+ @pytest.mark.asyncio
+ async def test_create_session_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = None
+ qs._closed = False
+
+ with patch.object(QuerySession, "_create_call", new_callable=AsyncMock):
+ with patch.object(QuerySession, "_attach", new_callable=AsyncMock):
+ await qs.create()
+
+ span = _get_single_span(exporter, SpanName.CREATE_SESSION)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+
+ def test_async_connection_peer_attributes_are_resolved(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.connection import Connection
+ from ydb.connection import EndpointOptions
+ from ydb.opentelemetry.tracing import create_ydb_span
+ from ydb.query.session import _resolve_peer
+
+ cfg = FakeDriverConfig()
+ endpoint_options = EndpointOptions(
+ node_id=12345,
+ address="node.example.net",
+ port=2136,
+ location="dc-a",
+ )
+
+ with patch("ydb.aio.connection.channel_factory", return_value=MagicMock()):
+ with patch("ydb.aio.connection._stubs_list", ()):
+ connection = Connection(
+ endpoint="grpc://node.example.net:2136",
+ driver_config=cfg,
+ endpoint_options=endpoint_options,
+ )
+
+ driver = MagicMock()
+ driver._store.connections_by_node_id = {12345: connection}
+
+ span = create_ydb_span(
+ SpanName.CREATE_SESSION,
+ cfg,
+ node_id=12345,
+ peer=_resolve_peer(driver, 12345),
+ )
+ span.end()
+
+ span = _get_single_span(exporter, SpanName.CREATE_SESSION)
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "node.example.net"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-a"
+
+
+class TestAsyncExecuteQuerySpan:
+ @pytest.mark.asyncio
+ async def test_session_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._peer = ("n1", 2136, "dc-a")
+ qs._closed = False
+
+ fake_stream = _empty_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1;")
+ async for _ in result:
+ pass
+
+ span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+
+ @pytest.mark.asyncio
+ async def test_tx_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ fake_stream = _empty_async_iter()
+ with patch.object(type(tx), "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ tx._prev_stream = None
+ result = await tx.execute("SELECT 1;")
+ async for _ in result:
+ pass
+
+ span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.tx.id" not in attrs
+ assert "ydb.session.id" not in attrs
+
+
+class TestAsyncBeginTransactionSpan:
+ @pytest.mark.asyncio
+ async def test_begin_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_fresh_async_tx(session, driver)
+
+ with patch.object(type(tx), "_begin_call", new_callable=AsyncMock):
+ await tx.begin()
+
+ span = _get_single_span(exporter, SpanName.BEGIN_TRANSACTION)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+ assert span.status.status_code == StatusCode.UNSET
+
+ @pytest.mark.asyncio
+ async def test_begin_sets_error_status_on_failure(self, otel_setup):
+ from ydb import issues
+
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_fresh_async_tx(session, driver)
+
+ exc = issues.Unavailable("bad node")
+ with patch.object(type(tx), "_begin_call", new_callable=AsyncMock, side_effect=exc):
+ with pytest.raises(issues.Unavailable):
+ await tx.begin()
+
+ span = _get_single_span(exporter, SpanName.BEGIN_TRANSACTION)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "UNAVAILABLE"
+ assert len(span.events) > 0
+
+
+class TestAsyncCommitSpan:
+ @pytest.mark.asyncio
+ async def test_commit_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ with patch.object(type(tx), "_commit_call", new_callable=AsyncMock):
+ await tx.commit()
+
+ span = _get_single_span(exporter, SpanName.COMMIT)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.tx.id" not in attrs
+ assert "ydb.session.id" not in attrs
+
+
+class TestAsyncRollbackSpan:
+ @pytest.mark.asyncio
+ async def test_rollback_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ with patch.object(type(tx), "_rollback_call", new_callable=AsyncMock):
+ await tx.rollback()
+
+ span = _get_single_span(exporter, SpanName.ROLLBACK)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.tx.id" not in attrs
+ assert "ydb.session.id" not in attrs
+
+
+class TestAsyncCommitRollbackErrorRecording:
+ """Async commit/rollback: the span must record the exception (event +
+ StatusCode.ERROR + error.type + db.response.status_code) when the underlying
+ RPC raises, just like the sync path.
+ """
+
+ @pytest.mark.asyncio
+ async def test_commit_records_exception_on_failure(self, otel_setup):
+ from ydb import issues
+
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ exc = issues.Aborted("boom")
+ with patch.object(type(tx), "_commit_call", new_callable=AsyncMock, side_effect=exc):
+ with pytest.raises(issues.Aborted):
+ await tx.commit()
+
+ span = _get_single_span(exporter, SpanName.COMMIT)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "ABORTED"
+ assert any(e.name == "exception" for e in span.events)
+
+ @pytest.mark.asyncio
+ async def test_rollback_records_exception_on_failure(self, otel_setup):
+ from ydb import issues
+
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ exc = issues.Unavailable("boom")
+ with patch.object(type(tx), "_rollback_call", new_callable=AsyncMock, side_effect=exc):
+ with pytest.raises(issues.Unavailable):
+ await tx.rollback()
+
+ span = _get_single_span(exporter, SpanName.ROLLBACK)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "UNAVAILABLE"
+ assert any(e.name == "exception" for e in span.events)
+
+
+class TestAsyncErrorHandling:
+ @pytest.mark.asyncio
+ async def test_error_sets_error_status_and_attributes(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb import issues
+
+ exc = issues.SchemeError("Table not found")
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, side_effect=exc):
+ with pytest.raises(issues.SchemeError):
+ await qs.execute("SELECT * FROM non_existing_table")
+
+ span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+ assert len(span.events) > 0
+
+
+class TestAsyncRetryPolicySpans:
+ @pytest.mark.asyncio
+ async def test_success_emits_single_try(self, otel_setup):
+ from ydb.retries import retry_operation_async
+
+ exporter = otel_setup
+
+ async def callee():
+ return 7
+
+ assert await retry_operation_async(callee) == 7
+
+ run = _get_single_span(exporter, SpanName.RUN_WITH_RETRY)
+ assert run.kind == SpanKind.INTERNAL
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 1
+ assert tries[0].parent.span_id == run.context.span_id
+ assert "ydb.retry.backoff_ms" not in dict(tries[0].attributes)
+ assert tries[0].status.status_code == StatusCode.UNSET
+
+ @pytest.mark.asyncio
+ async def test_retry_failed_tries_set_error_status(self, otel_setup):
+ """Failed async attempts must set ``ydb.Try`` status to ERROR (not UNSET)."""
+ from ydb import issues
+ from ydb.retries import BackoffSettings, RetrySettings, retry_operation_async
+
+ exporter = otel_setup
+ counter = {"n": 0}
+
+ async def flaky():
+ counter["n"] += 1
+ if counter["n"] < 3:
+ raise issues.Unavailable("transient")
+ return "ok"
+
+ retry_settings = RetrySettings(
+ max_retries=5,
+ fast_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.05),
+ slow_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.05),
+ )
+
+ assert await retry_operation_async(flaky, retry_settings) == "ok"
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 3
+ assert tries[0].status.status_code == StatusCode.ERROR
+ assert tries[1].status.status_code == StatusCode.ERROR
+ assert tries[2].status.status_code == StatusCode.UNSET
+
+ @pytest.mark.asyncio
+ async def test_context_cancel_during_backoff_records_exception(self, otel_setup):
+ """Inter-attempt sleep is outside ``ydb.Try``; cancellation during
+ ``asyncio.sleep`` is recorded on ``ydb.RunWithRetry`` (``record_exception``).
+ """
+ from ydb import issues
+ from ydb.retries import BackoffSettings, RetrySettings, retry_operation_async
+
+ exporter = otel_setup
+ calls = {"n": 0}
+
+ async def flaky():
+ calls["n"] += 1
+ raise issues.Unavailable("transient")
+
+ retry_settings = RetrySettings(
+ max_retries=10,
+ fast_backoff_settings=BackoffSettings(ceiling=0, slot_duration=10.0),
+ slow_backoff_settings=BackoffSettings(ceiling=0, slot_duration=10.0),
+ )
+
+ task = asyncio.ensure_future(retry_operation_async(flaky, retry_settings))
+ for _ in range(10):
+ await asyncio.sleep(0.01)
+ if calls["n"] >= 1:
+ break
+ task.cancel()
+ with pytest.raises(asyncio.CancelledError):
+ await task
+
+ run = _get_single_span(exporter, SpanName.RUN_WITH_RETRY)
+ assert run.status.status_code == StatusCode.ERROR
+ # TracingSpan / OTel will attach the cancellation as span events (record_exception) when enabled.
+ assert run.events is not None
+ # First attempt: ``ydb.Try``; cancel hits ``ydb.RunWithRetry`` during the inter-attempt sleep.
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) >= 1
+
+
+class TestAsyncRetrySpanNesting:
+ @pytest.mark.asyncio
+ async def test_execute_query_is_child_of_try_under_run_with_retry(self, otel_setup):
+ """``ydb.RunWithRetry`` -> ``ydb.Try`` -> ``ydb.ExecuteQuery`` (deep nesting).
+
+ The previous implementation produced sibling spans because ``ydb.Try`` was
+ opened *after* the awaitable was created, leaving the gRPC span without an
+ active ``ydb.Try`` context. This test pins the corrected nesting.
+ """
+ from ydb.aio.query.session import QuerySession
+ from ydb.retries import retry_operation_async
+
+ exporter = otel_setup
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._peer = ("n1", 2136, "dc-a")
+ qs._closed = False
+
+ async def callee():
+ fake_stream = _empty_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1;")
+ async for _ in result:
+ pass
+ return "ok"
+
+ assert await retry_operation_async(callee) == "ok"
+
+ run = _get_single_span(exporter, SpanName.RUN_WITH_RETRY)
+ try_span = _get_single_span(exporter, SpanName.TRY)
+ exec_span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+
+ assert try_span.parent.span_id == run.context.span_id
+ assert exec_span.parent.span_id == try_span.context.span_id
+ assert exec_span.context.trace_id == run.context.trace_id
+
+
+class TestAsyncConcurrentSpansIsolation:
+ @pytest.mark.asyncio
+ async def test_parallel_executes_do_not_become_parent_child(self, otel_setup):
+ """Two concurrent execute calls must produce sibling spans, not parent-child."""
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ async def _slow_async_iter():
+ await asyncio.sleep(0.5)
+ return
+ yield # noqa
+
+ def _make_session():
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 1
+ qs._closed = False
+ return qs
+
+ async def do_execute(qs):
+ fake_stream = _slow_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1")
+ async for _ in result:
+ pass
+
+ qs1 = _make_session()
+ qs2 = _make_session()
+ await asyncio.gather(do_execute(qs1), do_execute(qs2))
+
+ spans = _get_spans(exporter, SpanName.EXECUTE_QUERY)
+ assert len(spans) == 2
+
+ ids = {s.context.span_id for s in spans}
+ for s in spans:
+ if s.parent is not None:
+ assert s.parent.span_id not in ids, "Concurrent spans must be siblings, not parent-child"
diff --git a/tests/tracing/test_tracing_sync.py b/tests/tracing/test_tracing_sync.py
new file mode 100644
index 00000000..9f8bbc42
--- /dev/null
+++ b/tests/tracing/test_tracing_sync.py
@@ -0,0 +1,638 @@
+"""Unit tests for OpenTelemetry tracing — synchronous SDK operations.
+
+Uses an in-memory span exporter to verify that correct spans, attributes,
+parent-child relationships, and error handling are produced by the SDK.
+No real YDB connection is needed.
+"""
+
+from unittest.mock import MagicMock, patch
+from opentelemetry import trace
+from opentelemetry.trace import StatusCode, SpanKind
+from ydb.opentelemetry.tracing import SpanName, _registry, create_ydb_span
+from ydb.query.transaction import QueryTxStateEnum
+from .conftest import FakeDriverConfig
+
+import pytest
+
+
+def _get_spans(exporter, name=None):
+ spans = exporter.get_finished_spans()
+ if name is not None:
+ spans = [s for s in spans if s.name == name]
+ return spans
+
+
+def _get_single_span(exporter, name):
+ spans = _get_spans(exporter, name)
+ assert (
+ len(spans) == 1
+ ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}"
+ return spans[0]
+
+
+def _make_session_mock(driver_config=None, peer=None):
+ """Create a mock that behaves like a sync QuerySession after create()."""
+ cfg = driver_config or FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+
+ session = MagicMock()
+ session._driver = driver
+ session._session_id = "test-session-id"
+ session._node_id = 12345
+ session._peer = peer
+ session.session_id = "test-session-id"
+ session.node_id = 12345
+ return session, driver
+
+
+def _make_tx(session, driver):
+ """Create a real QueryTxContext wired to mocked session/driver."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.query.transaction import QueryTxContext
+
+ tx = QueryTxContext(driver, session, QuerySerializableReadWrite())
+ # Simulate that the transaction has been started (so commit/rollback create spans)
+ tx._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx._tx_state.tx_id = "test-tx-id"
+ return tx
+
+
+def _make_fresh_tx(session, driver):
+ """Create a real QueryTxContext in NOT_INITIALIZED state (for begin())."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.query.transaction import QueryTxContext
+
+ return QueryTxContext(driver, session, QuerySerializableReadWrite())
+
+
+class TestCreateSessionSpan:
+ def test_create_session_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = None
+ qs._closed = False
+
+ with patch.object(QuerySession, "_create_call", return_value=None):
+ with patch.object(QuerySession, "_attach", return_value=None):
+ qs.create()
+
+ span = _get_single_span(exporter, SpanName.CREATE_SESSION)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+ assert span.status.status_code == StatusCode.UNSET
+
+
+class TestExecuteQuerySpan:
+ def test_session_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._peer = ("node-7.cluster", 2136, "dc-east")
+ qs._closed = False
+
+ fake_stream = iter([]) # empty stream that raises StopIteration immediately
+ with patch.object(QuerySession, "_execute_call", return_value=fake_stream):
+ result = qs.execute("SELECT 1;")
+ # Consume the iterator to finish the span
+ list(result)
+
+ span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+ assert attrs["network.peer.address"] == "node-7.cluster"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-east"
+ assert attrs["ydb.node.id"] == 12345
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+ def test_tx_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ fake_stream = iter([])
+ with patch.object(type(tx), "_execute_call", return_value=fake_stream):
+ tx._prev_stream = None
+ result = tx.execute("SELECT 1;")
+ list(result)
+
+ span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+
+class TestBeginTransactionSpan:
+ def test_begin_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_fresh_tx(session, driver)
+
+ with patch.object(type(tx), "_begin_call", return_value=None):
+ tx.begin()
+
+ span = _get_single_span(exporter, SpanName.BEGIN_TRANSACTION)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+ assert span.status.status_code == StatusCode.UNSET
+
+ def test_begin_sets_error_status_on_failure(self, otel_setup):
+ from ydb import issues
+
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_fresh_tx(session, driver)
+
+ exc = issues.Unavailable("bad node")
+ with patch.object(type(tx), "_begin_call", side_effect=exc):
+ with pytest.raises(issues.Unavailable):
+ tx.begin()
+
+ span = _get_single_span(exporter, SpanName.BEGIN_TRANSACTION)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "UNAVAILABLE"
+ assert len(span.events) > 0
+
+
+class TestCommitSpan:
+ def test_commit_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ with patch.object(type(tx), "_commit_call", return_value=None):
+ tx.commit()
+
+ span = _get_single_span(exporter, SpanName.COMMIT)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+
+class TestRollbackSpan:
+ def test_rollback_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ with patch.object(type(tx), "_rollback_call", return_value=None):
+ tx.rollback()
+
+ span = _get_single_span(exporter, SpanName.ROLLBACK)
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+
+class TestCommitRollbackErrorRecording:
+ """When the underlying RPC raises, the span must:
+ - end with ``StatusCode.ERROR``
+ - have ``error.type`` and ``db.response.status_code`` set
+ - have the exception recorded as a span event (``record_exception``)
+ """
+
+ def test_commit_records_exception_on_failure(self, otel_setup):
+ from ydb import issues
+
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ exc = issues.Aborted("boom")
+ with patch.object(type(tx), "_commit_call", side_effect=exc):
+ with pytest.raises(issues.Aborted):
+ tx.commit()
+
+ span = _get_single_span(exporter, SpanName.COMMIT)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "ABORTED"
+ assert any(e.name == "exception" for e in span.events)
+
+ def test_rollback_records_exception_on_failure(self, otel_setup):
+ from ydb import issues
+
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ exc = issues.Unavailable("boom")
+ with patch.object(type(tx), "_rollback_call", side_effect=exc):
+ with pytest.raises(issues.Unavailable):
+ tx.rollback()
+
+ span = _get_single_span(exporter, SpanName.ROLLBACK)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "UNAVAILABLE"
+ assert any(e.name == "exception" for e in span.events)
+
+
+class TestErrorHandling:
+ def test_error_sets_error_status_and_attributes(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb import issues
+
+ exc = issues.SchemeError("Table not found")
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ with patch.object(QuerySession, "_execute_call", side_effect=exc):
+ with pytest.raises(issues.SchemeError):
+ qs.execute("SELECT * FROM non_existing_table")
+
+ span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+ assert len(span.events) > 0
+
+
+class TestNoSpansWhenDisabled:
+ def test_no_spans_without_enable_tracing(self):
+ """Without enable_tracing(), the registry uses noop — no spans are created."""
+
+ from tests.tracing.conftest import _exporter
+
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _exporter.clear()
+
+ with create_ydb_span(SpanName.CREATE_SESSION, FakeDriverConfig()).attach_context():
+ pass
+
+ assert len(_exporter.get_finished_spans()) == 0
+
+
+class TestParentChildRelationship:
+ def test_sdk_span_is_child_of_user_span(self, otel_setup):
+ exporter = otel_setup
+
+ tracer = trace.get_tracer("test.tracer")
+
+ with tracer.start_as_current_span("user.operation"):
+ with create_ydb_span(SpanName.EXECUTE_QUERY, FakeDriverConfig(), node_id=1).attach_context():
+ pass
+
+ spans = exporter.get_finished_spans()
+ ydb_span = next(s for s in spans if s.name == SpanName.EXECUTE_QUERY)
+ user_span = next(s for s in spans if s.name == "user.operation")
+
+ assert ydb_span.parent is not None
+ assert ydb_span.parent.span_id == user_span.context.span_id
+ assert ydb_span.context.trace_id == user_span.context.trace_id
+
+
+class TestTraceMetadataInjection:
+ def test_get_trace_metadata_returns_traceparent(self, otel_setup):
+ from ydb.opentelemetry.tracing import get_trace_metadata
+
+ tracer = trace.get_tracer("test.tracer")
+
+ with tracer.start_as_current_span("test.span"):
+ metadata = get_trace_metadata()
+
+ keys = [k for k, v in metadata]
+ assert "traceparent" in keys
+
+
+class TestDriverInitializeSpan:
+ def test_driver_initialize_emits_internal_span(self, otel_setup):
+ exporter = otel_setup
+
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span(SpanName.DRIVER_INITIALIZE, cfg, kind="internal").attach_context():
+ pass
+
+ span = _get_single_span(exporter, SpanName.DRIVER_INITIALIZE)
+ assert span.kind == SpanKind.INTERNAL
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+
+
+class TestCommonAttributes:
+ @pytest.mark.parametrize(
+ "endpoint,expected_host,expected_port",
+ [
+ ("grpc://host.example.com:2136", "host.example.com", 2136),
+ ("localhost:2136", "localhost", 2136),
+ ("[::1]:2136", "[::1]", 2136),
+ ],
+ )
+ def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_port):
+ exporter = otel_setup
+ cfg = FakeDriverConfig(endpoint=endpoint, database="/mydb")
+
+ with create_ydb_span("ydb.Test", cfg).attach_context():
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert attrs["server.address"] == expected_host
+ assert attrs["server.port"] == expected_port
+ assert attrs["db.namespace"] == "/mydb"
+
+ def test_peer_attributes_are_optional(self, otel_setup):
+ exporter = otel_setup
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span("ydb.Test", cfg).attach_context():
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert "network.peer.address" not in attrs
+ assert "network.peer.port" not in attrs
+
+ def test_peer_attributes_emitted_when_known(self, otel_setup):
+ exporter = otel_setup
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span("ydb.Test", cfg, peer=("peer.example.com", 2137, "dc-west")).attach_context():
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert attrs["network.peer.address"] == "peer.example.com"
+ assert attrs["network.peer.port"] == 2137
+ assert attrs["ydb.node.dc"] == "dc-west"
+
+
+class TestPeerFromEndpointMap:
+ def test_wrapper_create_session_pulls_peer_from_store(self, otel_setup):
+ """wrapper_create_session must resolve peer (host, port, dc) via the driver's
+ connections_by_node_id cache, not via the grpc target string of the rpc call.
+ """
+ from ydb.query.session import wrapper_create_session
+
+ connection = MagicMock()
+ connection.endpoint = "ipv4:10.0.0.1:2136"
+ connection.peer_address = "node-42.dc-west.example"
+ connection.peer_port = 2136
+ connection.peer_location = "dc-west"
+
+ driver = MagicMock()
+ driver._store.connections_by_node_id = {42: connection}
+
+ session = MagicMock()
+ session._driver = driver
+
+ rpc_state = MagicMock()
+ rpc_state.endpoint = "ipv4:10.0.0.1:2136" # grpc-target string — should be ignored
+
+ proto = MagicMock()
+ with patch("ydb.query.session._ydb_query.CreateSessionResponse.from_proto") as from_proto:
+ from_proto.return_value = MagicMock(session_id="s-1", node_id=42, status=MagicMock())
+ with patch("ydb.issues._process_response"):
+ wrapper_create_session(rpc_state, proto, session)
+
+ assert session._peer == ("node-42.dc-west.example", 2136, "dc-west")
+
+
+class TestRetryPolicySpans:
+ def test_success_on_first_try_emits_single_try(self, otel_setup):
+ from ydb.retries import retry_operation_sync
+
+ exporter = otel_setup
+
+ def callee():
+ return 42
+
+ assert retry_operation_sync(callee) == 42
+
+ run = _get_single_span(exporter, SpanName.RUN_WITH_RETRY)
+ assert run.kind == SpanKind.INTERNAL
+ assert run.status.status_code == StatusCode.UNSET
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 1
+ assert tries[0].kind == SpanKind.INTERNAL
+ assert "ydb.retry.backoff_ms" not in dict(tries[0].attributes)
+ assert tries[0].parent.span_id == run.context.span_id
+
+ def test_retry_backoff_ms_on_each_try(self, otel_setup):
+ from ydb import issues
+ from ydb.retries import retry_operation_sync
+ from ydb.retries import RetrySettings, BackoffSettings
+
+ exporter = otel_setup
+ counter = {"n": 0}
+
+ def flaky():
+ counter["n"] += 1
+ if counter["n"] < 3:
+ raise issues.Unavailable("transient")
+ return "ok"
+
+ retry_settings = RetrySettings(
+ max_retries=5,
+ fast_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.05),
+ slow_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.05),
+ )
+
+ assert retry_operation_sync(flaky, retry_settings) == "ok"
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 3
+ # First attempt has no preceding backoff, so no attribute at all; later ones
+ # carry a positive integer ms.
+ attrs0 = dict(tries[0].attributes)
+ assert "ydb.retry.backoff_ms" not in attrs0
+ later_values = [dict(s.attributes).get("ydb.retry.backoff_ms") for s in tries[1:]]
+ assert all(isinstance(v, int) and v > 0 for v in later_values)
+ # failed Try spans record the exception
+ assert tries[0].status.status_code == StatusCode.ERROR
+ assert tries[1].status.status_code == StatusCode.ERROR
+ assert tries[2].status.status_code == StatusCode.UNSET
+
+ def test_backoff_ms_attribute_matches_actual_sleep(self, otel_setup, monkeypatch):
+ """Pin the closure: ``ydb.retry.backoff_ms`` on the n-th ``ydb.Try`` equals
+ the sleep that preceded it, regardless of which retry attempt triggered it.
+
+ Both ``random.random`` and ``time.sleep`` are mocked so the math is fully
+ deterministic and the test does not actually wait. With
+ ``ceiling=0, slot_duration=0.1, uncertain_ratio=0.5`` and ``random()=0.5``::
+
+ slots_count = 1
+ max_duration = 1 * 0.1 * 1000 = 100 ms
+ duration = 100 * (0.5*0.5 + 0.5) = 75 ms
+ """
+ from ydb import issues
+ from ydb.retries import retry_operation_sync, RetrySettings, BackoffSettings
+
+ monkeypatch.setattr("random.random", lambda: 0.5)
+ sleeps = []
+ monkeypatch.setattr("time.sleep", sleeps.append)
+
+ exporter = otel_setup
+ counter = {"n": 0}
+
+ def flaky():
+ counter["n"] += 1
+ if counter["n"] < 3:
+ raise issues.Unavailable("transient")
+ return "ok"
+
+ settings = RetrySettings(
+ max_retries=5,
+ fast_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.1, uncertain_ratio=0.5),
+ slow_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.1, uncertain_ratio=0.5),
+ )
+ assert retry_operation_sync(flaky, settings) == "ok"
+
+ expected_ms = 75
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 3
+ assert "ydb.retry.backoff_ms" not in dict(tries[0].attributes)
+ assert dict(tries[1].attributes)["ydb.retry.backoff_ms"] == expected_ms
+ assert dict(tries[2].attributes)["ydb.retry.backoff_ms"] == expected_ms
+ assert sleeps == [expected_ms / 1000.0, expected_ms / 1000.0]
+
+ def test_skip_backoff_errors_still_emit_one_try_per_attempt(self, otel_setup):
+ """Aborted/BadSession path skips the inter-attempt sleep but must still rotate ydb.Try spans."""
+ from ydb import issues
+ from ydb.retries import RetrySettings, retry_operation_sync
+
+ exporter = otel_setup
+ counter = {"n": 0}
+
+ def flaky():
+ counter["n"] += 1
+ if counter["n"] < 3:
+ raise issues.Aborted("retry me")
+ return "ok"
+
+ assert retry_operation_sync(flaky, RetrySettings(max_retries=5)) == "ok"
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 3
+ assert tries[0].status.status_code == StatusCode.ERROR
+ assert tries[1].status.status_code == StatusCode.ERROR
+ assert tries[2].status.status_code == StatusCode.UNSET
+ # First Try has no preceding sleep -> attribute is absent.
+ # Skip-yield path means the inter-attempt sleep was zero, so backoff_ms = 0
+ # is recorded on retries to make "we did go through a retry boundary" explicit.
+ assert "ydb.retry.backoff_ms" not in dict(tries[0].attributes)
+ assert dict(tries[1].attributes)["ydb.retry.backoff_ms"] == 0
+ assert dict(tries[2].attributes)["ydb.retry.backoff_ms"] == 0
+
+ def test_non_retryable_error_propagates_to_run_span(self, otel_setup):
+ from ydb import issues
+ from ydb.retries import retry_operation_sync
+
+ exporter = otel_setup
+
+ def broken():
+ raise issues.SchemeError("boom")
+
+ with pytest.raises(issues.SchemeError):
+ retry_operation_sync(broken)
+
+ run = _get_single_span(exporter, SpanName.RUN_WITH_RETRY)
+ assert run.status.status_code == StatusCode.ERROR
+
+ tries = _get_spans(exporter, SpanName.TRY)
+ assert len(tries) == 1
+ assert tries[0].status.status_code == StatusCode.ERROR
+ attrs = dict(tries[0].attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+
+ def test_execute_query_is_child_of_try_under_run_with_retry(self, otel_setup):
+ """``ydb.RunWithRetry`` -> ``ydb.Try`` -> ``ydb.ExecuteQuery`` (sync path)."""
+ from ydb.query.session import QuerySession
+ from ydb.retries import retry_operation_sync
+
+ exporter = otel_setup
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._peer = ("n1", 2136, "dc-a")
+ qs._closed = False
+
+ def callee():
+ fake_stream = iter([])
+ with patch.object(QuerySession, "_execute_call", return_value=fake_stream):
+ result = qs.execute("SELECT 1;")
+ list(result)
+ return "ok"
+
+ assert retry_operation_sync(callee) == "ok"
+
+ run = _get_single_span(exporter, SpanName.RUN_WITH_RETRY)
+ try_span = _get_single_span(exporter, SpanName.TRY)
+ exec_span = _get_single_span(exporter, SpanName.EXECUTE_QUERY)
+
+ assert try_span.parent.span_id == run.context.span_id
+ assert exec_span.parent.span_id == try_span.context.span_id
+ assert exec_span.context.trace_id == run.context.trace_id
diff --git a/ydb/aio/connection.py b/ydb/aio/connection.py
index 9e03450d..e5e57e3b 100644
--- a/ydb/aio/connection.py
+++ b/ydb/aio/connection.py
@@ -26,6 +26,7 @@
from ydb.driver import DriverConfig
from ydb.settings import BaseRequestSettings
from ydb import issues
+from ydb.opentelemetry.tracing import get_trace_metadata
# Workaround for good IDE and universal for runtime
if TYPE_CHECKING:
@@ -71,6 +72,9 @@ async def _construct_metadata(
metadata.append((YDB_REQUEST_TYPE_HEADER, settings.request_type))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -149,6 +153,9 @@ class Connection:
"closing",
"endpoint_key",
"node_id",
+ "peer_address",
+ "peer_port",
+ "peer_location",
)
def __init__(
@@ -157,10 +164,12 @@ def __init__(
driver_config: Optional[DriverConfig] = None,
endpoint_options: Optional[EndpointOptions] = None,
) -> None:
- global _stubs_list
self.endpoint = endpoint
self.endpoint_key = EndpointKey(self.endpoint, getattr(endpoint_options, "node_id", None))
self.node_id = getattr(endpoint_options, "node_id", None)
+ self.peer_address = getattr(endpoint_options, "address", None)
+ self.peer_port = getattr(endpoint_options, "port", None)
+ self.peer_location = getattr(endpoint_options, "location", None)
self._channel = channel_factory(self.endpoint, driver_config, grpc.aio, endpoint_options=endpoint_options)
self._driver_config = driver_config
diff --git a/ydb/aio/driver.py b/ydb/aio/driver.py
index 405e5fcb..88221e94 100644
--- a/ydb/aio/driver.py
+++ b/ydb/aio/driver.py
@@ -69,6 +69,7 @@ def __init__(
root_certificates,
credentials,
config_class=DriverConfig,
+ **kwargs,
)
super(Driver, self).__init__(config)
diff --git a/ydb/aio/pool.py b/ydb/aio/pool.py
index fe709133..5eb51b5c 100644
--- a/ydb/aio/pool.py
+++ b/ydb/aio/pool.py
@@ -6,6 +6,7 @@
from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING
from ydb import issues
+from ydb.opentelemetry.tracing import SpanName, create_ydb_span
from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool
from .connection import Connection, EndpointKey
@@ -285,7 +286,8 @@ async def __wrapper__() -> None:
return __wrapper__
async def wait(self, timeout: Optional[float] = 7.0, fail_fast: bool = False) -> None: # type: ignore[override] # async override of sync method
- await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
+ with create_ydb_span(SpanName.DRIVER_INITIALIZE, self._driver_config, kind="internal").attach_context():
+ await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
def discovery_debug_details(self) -> str:
if self._discovery:
diff --git a/ydb/aio/query/base.py b/ydb/aio/query/base.py
index a1d6e5d7..6c13dfd4 100644
--- a/ydb/aio/query/base.py
+++ b/ydb/aio/query/base.py
@@ -2,9 +2,12 @@
class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ """Async ExecuteQuery result stream."""
+
+ def __init__(self, it, wrapper, on_error=None, on_finish=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._on_finish = on_finish
async def __aenter__(self) -> "AsyncResponseContextIterator":
return self
@@ -15,6 +18,7 @@ async def _next(self):
except StopAsyncIteration:
# Normal stream termination is not an error and must not invalidate
# the session.
+ self._call_on_finish()
raise
except BaseException as e:
# BaseException (not Exception) because asyncio.CancelledError
@@ -25,8 +29,17 @@ async def _next(self):
# reply with SessionBusy.
if self._on_error:
self._on_error(e)
+ self._call_on_finish(e)
raise
+ def _call_on_finish(self, exception=None):
+ if self._on_finish is not None:
+ self._on_finish(exception)
+ self._on_finish = None
+
+ def __del__(self):
+ self._call_on_finish()
+
async def __aexit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end.
# Errors that happen during the cleanup drain have already been reported
@@ -39,3 +52,4 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
except BaseException:
pass
+ self._call_on_finish()
diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py
index a565b266..b776b638 100644
--- a/ydb/aio/query/session.py
+++ b/ydb/aio/query/session.py
@@ -19,6 +19,7 @@
from ...query import base
from ...query.session import BaseQuerySession
+from ...opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback
from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
@@ -105,8 +106,10 @@ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "Query
if self._closed:
raise RuntimeError("Session is already closed")
- await self._create_call(settings=settings)
- await self._attach()
+ with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span:
+ await self._create_call(settings=settings)
+ set_peer_attributes(span, self._peer)
+ await self._attach()
return self
@@ -159,20 +162,27 @@ async def execute(
"""
self._check_session_ready_to_use()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self._node_id,
+ peer=self._peer,
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
return AsyncResponseContextIterator(
it=stream_it,
wrapper=lambda resp: base.wrap_execute_query_response(
@@ -182,6 +192,7 @@ async def execute(
settings=self._settings,
),
on_error=self._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
async def explain(
diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py
index c31d79fb..a05d91f2 100644
--- a/ydb/aio/query/transaction.py
+++ b/ydb/aio/query/transaction.py
@@ -12,6 +12,7 @@
BaseQueryTxContext,
QueryTxStateEnum,
)
+from ...opentelemetry.tracing import SpanName, create_ydb_span, span_finish_callback
if TYPE_CHECKING:
from .session import QuerySession
@@ -87,7 +88,13 @@ async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryT
:return: None or exception if begin is failed
"""
- await self._begin_call(settings)
+ with create_ydb_span(
+ SpanName.BEGIN_TRANSACTION,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ await self._begin_call(settings)
return self
async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
@@ -109,13 +116,19 @@ async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
- await self._commit_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.COMMIT,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
+ await self._commit_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -136,13 +149,19 @@ async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
- await self._rollback_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.ROLLBACK,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
+ await self._rollback_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
async def execute(
self,
@@ -190,20 +209,27 @@ async def execute(
"""
await self._ensure_prev_stream_finished()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=commit_tx,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
self._prev_stream = AsyncResponseContextIterator(
it=stream_it,
wrapper=lambda resp: base.wrap_execute_query_response(
@@ -215,5 +241,6 @@ async def execute(
settings=self.session._settings,
),
on_error=self.session._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
return self._prev_stream
diff --git a/ydb/aio/table.py b/ydb/aio/table.py
index 0d14ba2f..8d5e02c1 100644
--- a/ydb/aio/table.py
+++ b/ydb/aio/table.py
@@ -462,7 +462,8 @@ async def retry_operation(callee, retry_settings=None, *args, **kwargs): # pyli
opt_generator = ydb.retry_operation_impl(callee, retry_settings, *args, **kwargs)
for next_opt in opt_generator:
if isinstance(next_opt, ydb.YdbRetryOperationSleepOpt):
- await asyncio.sleep(next_opt.timeout)
+ if next_opt.timeout > 0:
+ await asyncio.sleep(next_opt.timeout)
else:
try:
return await next_opt.result
diff --git a/ydb/connection.py b/ydb/connection.py
index 98fbd5aa..d64438ef 100644
--- a/ydb/connection.py
+++ b/ydb/connection.py
@@ -24,6 +24,7 @@
import grpc
from . import issues, _apis, _utilities
from . import default_pem
+from .opentelemetry.tracing import get_trace_metadata
_stubs_list = (
_apis.TableService.Stub,
@@ -179,6 +180,9 @@ def _construct_metadata(driver_config, settings):
metadata.extend(getattr(settings, "headers", []))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -194,11 +198,14 @@ def _get_request_timeout(settings):
class EndpointOptions(object):
- __slots__ = ("ssl_target_name_override", "node_id")
+ __slots__ = ("ssl_target_name_override", "node_id", "address", "port", "location")
- def __init__(self, ssl_target_name_override=None, node_id=None):
+ def __init__(self, ssl_target_name_override=None, node_id=None, address=None, port=None, location=None):
self.ssl_target_name_override = ssl_target_name_override
self.node_id = node_id
+ self.address = address
+ self.port = port
+ self.location = location
def _construct_channel_options(driver_config, endpoint_options=None):
@@ -405,6 +412,9 @@ class Connection(object):
"closing",
"endpoint_key",
"node_id",
+ "peer_address",
+ "peer_port",
+ "peer_location",
)
def __init__(
@@ -419,9 +429,11 @@ def __init__(
discovered by the YDB endpoint discovery mechanism
:param driver_config: A driver config instance to be used for RPC call interception
"""
- global _stubs_list
self.endpoint = endpoint
self.node_id = getattr(endpoint_options, "node_id", None)
+ self.peer_address = getattr(endpoint_options, "address", None)
+ self.peer_port = getattr(endpoint_options, "port", None)
+ self.peer_location = getattr(endpoint_options, "location", None)
self.endpoint_key = EndpointKey(endpoint, getattr(endpoint_options, "node_id", None))
self._channel = channel_factory(self.endpoint, driver_config, endpoint_options=endpoint_options)
self._driver_config = driver_config
diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py
new file mode 100644
index 00000000..fc058d0d
--- /dev/null
+++ b/ydb/opentelemetry/__init__.py
@@ -0,0 +1,36 @@
+"""Public OpenTelemetry entrypoints for YDB."""
+
+
+def enable_tracing(tracer=None):
+ """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls.
+
+ This call is **idempotent**: if tracing is already enabled, later calls do nothing
+ (including passing a different ``tracer``). Call :func:`disable_tracing` first to
+ reconfigure or turn instrumentation off.
+
+ Args:
+ tracer: Optional OTel tracer to use. If not provided, the default tracer named
+ ``ydb.sdk`` from the global tracer provider will be used.
+ """
+ try:
+ from ydb.opentelemetry.plugin import _enable_tracing
+ except ImportError:
+ raise ImportError(
+ "OpenTelemetry packages are required for tracing support. "
+ "Install them with: pip install ydb[opentelemetry]"
+ ) from None
+
+ _enable_tracing(tracer)
+
+
+def disable_tracing():
+ """Disable YDB OpenTelemetry hooks and allow :func:`enable_tracing` to run again."""
+ try:
+ from ydb.opentelemetry.plugin import _disable_tracing
+ except ImportError:
+ return
+
+ _disable_tracing()
+
+
+__all__ = ["disable_tracing", "enable_tracing"]
diff --git a/ydb/opentelemetry/plugin.py b/ydb/opentelemetry/plugin.py
new file mode 100644
index 00000000..76942789
--- /dev/null
+++ b/ydb/opentelemetry/plugin.py
@@ -0,0 +1,134 @@
+"""OpenTelemetry bridge for YDB."""
+
+from opentelemetry import context as otel_context
+from opentelemetry import trace
+from opentelemetry.propagate import inject
+from opentelemetry.trace import StatusCode
+
+from ydb import issues
+from ydb.issues import StatusCode as YdbStatusCode
+from ydb.opentelemetry.tracing import _registry
+
+# YDB client transport StatusCode values (401xxx band) -> OTel error.type transport_error.
+_TRANSPORT_STATUSES = frozenset(
+ {
+ YdbStatusCode.CONNECTION_LOST,
+ YdbStatusCode.CONNECTION_FAILURE,
+ YdbStatusCode.DEADLINE_EXCEEDED,
+ YdbStatusCode.CLIENT_INTERNAL_ERROR,
+ YdbStatusCode.UNIMPLEMENTED,
+ }
+)
+
+_tracer = None
+_enabled = False
+
+_KIND_MAP = {
+ "client": trace.SpanKind.CLIENT,
+ "internal": trace.SpanKind.INTERNAL,
+}
+
+
+def _otel_metadata_hook():
+ """Inject W3C Trace Context into outgoing gRPC metadata using the active OTel context."""
+ headers = {}
+ inject(headers)
+ return list(headers.items())
+
+
+def _set_error_on_span(span, exception):
+ if isinstance(exception, issues.Error) and exception.status is not None:
+ span.set_attribute("db.response.status_code", exception.status.name)
+ error_type = "transport_error" if exception.status in _TRANSPORT_STATUSES else "ydb_error"
+ else:
+ error_type = type(exception).__qualname__
+
+ span.set_attribute("error.type", error_type)
+ span.set_status(StatusCode.ERROR, str(exception))
+ span.record_exception(exception)
+
+
+class _AttachContext:
+ """Make a span the active OTel context for a ``with`` block.
+
+ When ``end_on_exit=True`` (default) the span is ended on exit — used for
+ single-shot RPCs. When ``end_on_exit=False`` the span is only ended on
+ exception — used for streaming RPCs where the result iterator owns ``end()``.
+ """
+
+ def __init__(self, span, end_on_exit):
+ self._span = span
+ self._end_on_exit = end_on_exit
+ self._token = None
+
+ def __enter__(self):
+ ctx = trace.set_span_in_context(self._span._span)
+ self._token = otel_context.attach(ctx)
+ return self._span
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self._token is not None:
+ otel_context.detach(self._token)
+ self._token = None
+ if exc_val is not None:
+ self._span.set_error(exc_val)
+ self._span.end()
+ elif self._end_on_exit:
+ self._span.end()
+ return False
+
+
+class TracingSpan:
+ """Wrapper around an OTel span.
+
+ Use :meth:`attach_context` as a context manager around any RPC call.
+ The default (``end_on_exit=True``) is for single-shot operations; pass
+ ``end_on_exit=False`` for streaming RPCs where the result iterator owns
+ ``end()``.
+ """
+
+ def __init__(self, span):
+ self._span = span
+
+ def set_error(self, exception):
+ _set_error_on_span(self._span, exception)
+
+ def set_attribute(self, key, value):
+ self._span.set_attribute(key, value)
+
+ def end(self):
+ self._span.end()
+
+ def attach_context(self, end_on_exit=True):
+ return _AttachContext(self, end_on_exit)
+
+
+def _create_span(name, attributes=None, kind=None):
+ span = _tracer.start_span(
+ name,
+ kind=_KIND_MAP.get(kind, trace.SpanKind.CLIENT),
+ attributes=attributes or {},
+ )
+ return TracingSpan(span)
+
+
+def _enable_tracing(tracer=None):
+ global _enabled, _tracer
+
+ if _enabled:
+ return
+
+ _tracer = tracer if tracer is not None else trace.get_tracer("ydb.sdk")
+ _enabled = True
+ _registry.set_metadata_hook(_otel_metadata_hook)
+ _registry.set_create_span(_create_span)
+
+
+def _disable_tracing():
+ """Clear hooks and tracer; after this, :func:`~ydb.opentelemetry.enable_tracing` may be called again."""
+ global _enabled, _tracer
+
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _enabled = False
+ _tracer = None
diff --git a/ydb/opentelemetry/tracing.py b/ydb/opentelemetry/tracing.py
new file mode 100644
index 00000000..1d0995df
--- /dev/null
+++ b/ydb/opentelemetry/tracing.py
@@ -0,0 +1,165 @@
+"""Internal SDK tracing helpers and registry."""
+
+import enum
+from typing import Optional, Tuple
+
+
+class SpanName(str, enum.Enum):
+ """Canonical span names used across the YDB SDK."""
+
+ CREATE_SESSION = "ydb.CreateSession"
+ EXECUTE_QUERY = "ydb.ExecuteQuery"
+ BEGIN_TRANSACTION = "ydb.BeginTransaction"
+ COMMIT = "ydb.Commit"
+ ROLLBACK = "ydb.Rollback"
+ DRIVER_INITIALIZE = "ydb.Driver.Initialize"
+ RUN_WITH_RETRY = "ydb.RunWithRetry"
+ TRY = "ydb.Try"
+
+
+class _NoopCtx:
+ __slots__ = ("_span",)
+
+ def __init__(self, span):
+ self._span = span
+
+ def __enter__(self):
+ return self._span
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+
+class _NoopSpan:
+ """Returned by create_ydb_span when tracing is disabled."""
+
+ def set_error(self, exception):
+ pass
+
+ def set_attribute(self, key, value):
+ pass
+
+ def end(self):
+ pass
+
+ def attach_context(self, end_on_exit=True):
+ return _NoopCtx(self)
+
+
+_NOOP_SPAN = _NoopSpan()
+
+
+class OtelTracingRegistry:
+ """Singleton registry for OpenTelemetry tracing.
+
+ By default everything is no-op until :func:`~ydb.opentelemetry.enable_tracing` is called.
+ """
+
+ def __init__(self):
+ self._metadata_hook = None
+ self._create_span_func = None
+
+ def is_active(self) -> bool:
+ return self._create_span_func is not None
+
+ def create_span(self, name, attributes=None, kind=None):
+ if self._create_span_func is None:
+ return _NOOP_SPAN
+ return self._create_span_func(name, attributes, kind=kind)
+
+ def get_trace_metadata(self):
+ if self._metadata_hook is not None:
+ return self._metadata_hook()
+ return []
+
+ def set_metadata_hook(self, hook):
+ self._metadata_hook = hook
+
+ def set_create_span(self, func):
+ self._create_span_func = func
+
+
+_registry = OtelTracingRegistry()
+
+
+def get_trace_metadata():
+ """Return tracing metadata for gRPC calls."""
+ return _registry.get_trace_metadata()
+
+
+def _split_endpoint(endpoint: Optional[str]) -> Tuple[str, int]:
+ ep = endpoint or ""
+ if ep.startswith("grpcs://"):
+ ep = ep[len("grpcs://") :]
+ elif ep.startswith("grpc://"):
+ ep = ep[len("grpc://") :]
+
+ if ep.startswith("["):
+ close = ep.find("]")
+ if close != -1 and len(ep) > close + 1 and ep[close + 1] == ":":
+ host = ep[: close + 1]
+ port_s = ep[close + 2 :]
+ return host, int(port_s) if port_s.isdigit() else 0
+
+ host, sep, port_s = ep.rpartition(":")
+ if not sep:
+ return ep, 0
+ return host, int(port_s) if port_s.isdigit() else 0
+
+
+def _build_ydb_attrs(driver_config, node_id=None, peer=None):
+ host, port = _split_endpoint(getattr(driver_config, "endpoint", None))
+ attrs = {
+ "db.system.name": "ydb",
+ "db.namespace": getattr(driver_config, "database", None) or "",
+ "server.address": host,
+ "server.port": port,
+ }
+ if peer is not None:
+ address, port_, location = peer
+ if address is not None:
+ attrs["network.peer.address"] = address
+ if port_ is not None:
+ attrs["network.peer.port"] = int(port_)
+ if location:
+ attrs["ydb.node.dc"] = location
+ if node_id is not None:
+ attrs["ydb.node.id"] = node_id
+ return attrs
+
+
+def create_span(name, attributes=None, kind="internal"):
+    """Create and attach a span with no YDB-specific attributes (SDK-internal ops); returns the attach-context manager, not the span."""
+ return _registry.create_span(name, attributes=attributes, kind=kind).attach_context()
+
+
+def create_ydb_span(name, driver_config, node_id=None, kind=None, peer=None):
+ """Create a span pre-filled with standard YDB attributes."""
+ if not _registry.is_active():
+ return _NOOP_SPAN
+ attrs = _build_ydb_attrs(driver_config, node_id, peer)
+ return _registry.create_span(name, attributes=attrs, kind=kind)
+
+
+def set_peer_attributes(span, peer):
+ """Fill in network.peer.* and ydb.node.dc on an existing span once the peer is known."""
+ if peer is None:
+ return
+ address, port, location = peer
+ if address is not None:
+ span.set_attribute("network.peer.address", address)
+ if port is not None:
+ span.set_attribute("network.peer.port", int(port))
+ if location:
+ span.set_attribute("ydb.node.dc", location)
+
+
+def span_finish_callback(span):
+ """Return an on_finish callable that ends *span* when a streaming result iterator completes."""
+
+ def _finish(exception=None):
+ if exception is not None:
+ span.set_error(exception)
+ span.end()
+
+ return _finish
diff --git a/ydb/pool.py b/ydb/pool.py
index 2901c573..31bfe8ba 100644
--- a/ydb/pool.py
+++ b/ydb/pool.py
@@ -10,6 +10,7 @@
from typing import Any, Callable, ContextManager, List, Optional, Set, Tuple, TYPE_CHECKING
from . import connection as connection_impl, issues, resolver, _utilities, tracing
+from .opentelemetry.tracing import SpanName, create_ydb_span
from abc import abstractmethod
from .connection import Connection, EndpointKey
@@ -453,10 +454,11 @@ def wait(self, timeout: Optional[float] = None, fail_fast: bool = False) -> None
:param timeout: A timeout to wait in seconds
:return: None
"""
- if fail_fast:
- self._store.add_fast_fail().result(timeout)
- else:
- self._store.subscribe().result(timeout)
+ with create_ydb_span(SpanName.DRIVER_INITIALIZE, self._driver_config, kind="internal").attach_context():
+ if fail_fast:
+ self._store.add_fast_fail().result(timeout)
+ else:
+ self._store.subscribe().result(timeout)
def _on_disconnected(self, connection: Connection) -> None:
"""
diff --git a/ydb/query/base.py b/ydb/query/base.py
index 7fea6cd0..093c7d55 100644
--- a/ydb/query/base.py
+++ b/ydb/query/base.py
@@ -27,7 +27,6 @@
from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop
from ydb._grpc.grpcwrapper.common_utils import to_thread
-
if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
from .session import BaseQuerySession
@@ -73,9 +72,12 @@ class QueryResultSetFormat(enum.IntEnum):
class SyncResponseContextIterator(_utilities.SyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ """Streams ExecuteQuery results."""
+
+ def __init__(self, it, wrapper, on_error=None, on_finish=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._on_finish = on_finish
def __enter__(self) -> "SyncResponseContextIterator":
return self
@@ -86,6 +88,7 @@ def _next(self):
except StopIteration:
# Normal stream termination is not an error and must not invalidate
# the session.
+ self._call_on_finish()
raise
except BaseException as e:
# BaseException (not Exception) for parity with the async iterator:
@@ -95,8 +98,17 @@ def _next(self):
# SessionBusy.
if self._on_error:
self._on_error(e)
+ self._call_on_finish(e)
raise
+ def _call_on_finish(self, exception=None):
+ if self._on_finish is not None:
+ self._on_finish(exception)
+ self._on_finish = None
+
+ def __del__(self):
+ self._call_on_finish()
+
def __exit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end.
# Errors during the cleanup drain have already been reported to _on_error
@@ -107,6 +119,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
pass
except BaseException:
pass
+ self._call_on_finish()
class QueryClientSettings:
diff --git a/ydb/query/session.py b/ydb/query/session.py
index f2099f8c..a9c1b4a5 100644
--- a/ydb/query/session.py
+++ b/ydb/query/session.py
@@ -18,6 +18,7 @@
from .base import QueryExplainResultFormat
from .. import _apis, issues, _utilities
+from ..opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback
from ..settings import BaseRequestSettings
from ..connection import _RpcState as RpcState, EndpointKey
from .._grpc.grpcwrapper import common_utils
@@ -30,7 +31,7 @@
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT
if TYPE_CHECKING:
- from ..driver import Driver as SyncDriver
+ from ..driver import Driver as SyncDriver, DriverConfig
from ..aio.driver import Driver as AsyncDriver
@@ -46,9 +47,30 @@ def wrapper_create_session(
issues._process_response(message.status)
session._session_id = message.session_id
session._node_id = message.node_id
+ session._peer = _resolve_peer(session._driver, message.node_id)
return session
+def _resolve_peer(driver, node_id):
+ """Look up network.peer.* / ydb.node.dc for a node in the driver's endpoint map."""
+ if node_id is None:
+ return None
+ store = getattr(driver, "_store", None)
+ if store is None:
+ return None
+ by_node = getattr(store, "connections_by_node_id", None)
+ if not by_node:
+ return None
+ connection = by_node.get(node_id)
+ if connection is None:
+ return None
+ return (
+ getattr(connection, "peer_address", None),
+ getattr(connection, "peer_port", None),
+ getattr(connection, "peer_location", None),
+ )
+
+
def wrapper_delete_session(
rpc_state: RpcState,
response_pb: _apis.ydb_query.DeleteSessionResponse,
@@ -69,6 +91,7 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]):
# Session data
_session_id: Optional[str] = None
_node_id: Optional[int] = None
+ _peer: Optional[tuple] = None
_closed: bool = False
_invalidated: bool = False
@@ -84,6 +107,10 @@ def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings]
self._last_query_stats = None
+ @property
+ def _driver_config(self) -> Optional["DriverConfig"]:
+ return getattr(self._driver, "_driver_config", None)
+
@property
def session_id(self) -> Optional[str]:
return self._session_id
@@ -391,8 +418,10 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio
if self._closed:
raise RuntimeError("Session is already closed.")
- self._create_call(settings=settings)
- self._attach()
+ with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span:
+ self._create_call(settings=settings)
+ set_peer_attributes(span, self._peer)
+ self._attach()
return self
@@ -458,20 +487,27 @@ def execute(
"""
self._check_session_ready_to_use()
- stream_it = self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self._node_id,
+ peer=self._peer,
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
return base.SyncResponseContextIterator(
stream_it,
lambda resp: base.wrap_execute_query_response(
@@ -481,6 +517,7 @@ def execute(
settings=self._settings,
),
on_error=self._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
def explain(
diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py
index fdcefb0b..1d278ac2 100644
--- a/ydb/query/transaction.py
+++ b/ydb/query/transaction.py
@@ -17,6 +17,7 @@
_apis,
issues,
)
+from ..opentelemetry.tracing import SpanName, create_ydb_span, span_finish_callback
from .._grpc.grpcwrapper import ydb_topic as _ydb_topic
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from ..connection import _RpcState as RpcState
@@ -244,6 +245,10 @@ def __init__(self, driver: DriverT, session: "BaseQuerySession", tx_mode: base.B
self._external_error = None
self._last_query_stats = None
+ @property
+ def _driver_config(self):
+ return getattr(self._driver, "_driver_config", None)
+
@property
def session_id(self) -> Optional[str]:
"""
@@ -523,7 +528,13 @@ def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxConte
:return: Transaction object or exception if begin is failed
"""
- self._begin_call(settings)
+ with create_ydb_span(
+ SpanName.BEGIN_TRANSACTION,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ self._begin_call(settings)
return self
@@ -545,13 +556,19 @@ def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
- self._commit_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.COMMIT,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
+ self._commit_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -571,13 +588,19 @@ def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
- self._rollback_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.ROLLBACK,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
+ self._rollback_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
def execute(
self,
@@ -626,20 +649,27 @@ def execute(
"""
self._ensure_prev_stream_finished()
- stream_it = self._execute_call(
- query=query,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- parameters=parameters,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = self._execute_call(
+ query=query,
+ commit_tx=commit_tx,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
self._prev_stream = base.SyncResponseContextIterator(
stream_it,
lambda resp: base.wrap_execute_query_response(
@@ -651,5 +681,6 @@ def execute(
settings=self.session._settings,
),
on_error=self.session._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
return self._prev_stream
diff --git a/ydb/resolver.py b/ydb/resolver.py
index 5047f4e5..d55de389 100644
--- a/ydb/resolver.py
+++ b/ydb/resolver.py
@@ -54,7 +54,11 @@ def endpoints_with_options(self) -> typing.Generator[typing.Tuple[str, conn_impl
ssl_target_name_override = self.address
endpoint_options = conn_impl.EndpointOptions(
- ssl_target_name_override=ssl_target_name_override, node_id=self.node_id
+ ssl_target_name_override=ssl_target_name_override,
+ node_id=self.node_id,
+ address=self.address,
+ port=self.port,
+ location=self.location,
)
if self.ipv6_addrs or self.ipv4_addrs:
diff --git a/ydb/retries.py b/ydb/retries.py
index c151e3d2..4b7c137f 100644
--- a/ydb/retries.py
+++ b/ydb/retries.py
@@ -7,6 +7,11 @@
from . import issues
from ._errors import check_retriable_error
+from .opentelemetry.tracing import SpanName, create_span as _create_span
+
+
+def _try_span_attrs(backoff_ms: Optional[int]):
+ return {"ydb.retry.backoff_ms": backoff_ms} if backoff_ms is not None else None
class BackoffSettings:
@@ -129,19 +134,18 @@ def retry_operation_impl(
if not retriable_info.is_retriable:
raise
- skip_yield_error_types = [
+ skip_yield_error_types = (
issues.Aborted,
issues.BadSession,
issues.NotFound,
issues.InternalError,
- ]
-
- yield_sleep = True
- for t in skip_yield_error_types:
- if isinstance(e, t):
- yield_sleep = False
+ )
- if yield_sleep:
+ if isinstance(e, skip_yield_error_types):
+ # Skip the inter-attempt sleep but still emit a marker so consumers
+ # advance per-attempt bookkeeping (e.g. ``ydb.Try`` spans get backoff_ms=0).
+ yield YdbRetryOperationSleepOpt(0.0)
+ else:
yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
except Exception as e:
@@ -159,12 +163,21 @@ def retry_operation_sync(
*args: Any,
**kwargs: Any,
) -> Any:
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- time.sleep(next_opt.timeout)
- else:
- return next_opt.result
+ backoff_ms: Optional[int] = None
+
+ @functools.wraps(callee)
+ def traced_callee(*a: Any, **kw: Any) -> Any:
+ with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)):
+ return callee(*a, **kw)
+
+ with _create_span(SpanName.RUN_WITH_RETRY):
+ for next_opt in retry_operation_impl(traced_callee, retry_settings, *args, **kwargs):
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ backoff_ms = int(next_opt.timeout * 1000)
+ if next_opt.timeout > 0:
+ time.sleep(next_opt.timeout)
+ else:
+ return next_opt.result
return None
@@ -186,15 +199,20 @@ async def retry_operation_async( # pylint: disable=W1113
Returns awaitable result of coroutine. If retries are not succussful exception is raised.
"""
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- await asyncio.sleep(next_opt.timeout)
- else:
- try:
- return await next_opt.result
- except BaseException as e: # pylint: disable=W0703
- next_opt.set_exception(e)
+ backoff_ms: Optional[int] = None
+ with _create_span(SpanName.RUN_WITH_RETRY):
+ for next_opt in retry_operation_impl(callee, retry_settings, *args, **kwargs):
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ backoff_ms = int(next_opt.timeout * 1000)
+ if next_opt.timeout > 0:
+ await asyncio.sleep(next_opt.timeout)
+ else:
+ with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)) as try_span:
+ try:
+ return await next_opt.result
+ except BaseException as e: # pylint: disable=W0703
+ try_span.set_error(e)
+ next_opt.set_exception(e)
return None
diff --git a/ydb/table_test.py b/ydb/table_test.py
index b365fda1..03973ef5 100644
--- a/ydb/table_test.py
+++ b/ydb/table_test.py
@@ -80,7 +80,10 @@ def check_retriable_error(err_type, backoff):
YdbRetryOperationSleepOpt(backoff.calc_timeout(1)),
] == yields
else:
- assert [] == yields
+ # Skip-yield error types (Aborted/BadSession/NotFound/InternalError): impl emits
+ # SleepOpt(0.0) markers so consumers can rotate per-attempt bookkeeping
+ # (e.g. ``ydb.Try`` spans get backoff_ms=0).
+ assert [YdbRetryOperationSleepOpt(0.0), YdbRetryOperationSleepOpt(0.0)] == yields
assert exc == err_type("test2")