10 changes: 8 additions & 2 deletions CLAUDE.md
@@ -231,8 +231,14 @@ All settings use the `PYLON_` prefix. Example: `PYLON_BITTENSOR_NETWORK=finney`
- By default, data is cached for all subnets configured in identities

- **Monitoring**:
-  - `sentry_dsn`: Sentry DSN for error tracking (optional)
-  - `sentry_environment`: Environment name for Sentry (default: "development")
+  - `environment` (in `pylon_commons/settings.py`, env `PYLON_ENVIRONMENT`): Deployment environment name; single source of truth for both Sentry and OTEL (default: "production")
+  - Sentry (`SentrySettings` in `pylon_service/settings.py`, env prefix `PYLON_SENTRY_`):
+    - `dsn` (`PYLON_SENTRY_DSN`): Sentry DSN for error tracking; empty disables Sentry (default: "")
+    - `environment` (`PYLON_SENTRY_ENVIRONMENT`): Sentry environment name; falls back to `PYLON_ENVIRONMENT` when left unset
+  - OpenTelemetry traces (`OtelSettings` in `pylon_service/settings.py`, env prefix `PYLON_OTEL_`):
+    - `collector_endpoint` (`PYLON_OTEL_COLLECTOR_ENDPOINT`): Base URL of an OTLP collector; empty disables traces (default: "")
+    - `deployment_environment` (`PYLON_OTEL_DEPLOYMENT_ENVIRONMENT`): OTEL `deployment.environment.name` attribute; falls back to `PYLON_ENVIRONMENT` when left unset
+    - `service_version` (`PYLON_OTEL_SERVICE_VERSION`): OTEL `service.version` attribute; injected automatically at Docker build time (default: "")

- **Development**:
- `docker_image_name`: Docker image name (default: "bittensor_pylon")
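
The fallback rule in the Monitoring settings above (a per-integration environment name that falls back to `PYLON_ENVIRONMENT` when left unset) can be sketched as follows. This is an illustration under stated assumptions, not the service's code, which resolves these values through pydantic settings. `resolve_environment` is a hypothetical helper, and the handling of a set-but-empty variable follows the note in `test_env.template` that an empty value disables the fallback.

```python
from collections.abc import Mapping


# Hypothetical helper for illustration only; not part of the PR.
def resolve_environment(env: Mapping[str, str], specific_key: str) -> str:
    """Return the integration-specific value if the variable is set, else PYLON_ENVIRONMENT."""
    if specific_key in env:
        # Set, even to an empty string: the override wins and the fallback is skipped.
        return env[specific_key]
    # Unset: use the shared value, assuming the documented default of "production".
    return env.get("PYLON_ENVIRONMENT", "production")


shared = {"PYLON_ENVIRONMENT": "staging"}
sentry_env = resolve_environment(shared, "PYLON_SENTRY_ENVIRONMENT")
otel_env = resolve_environment(
    {**shared, "PYLON_OTEL_DEPLOYMENT_ENVIRONMENT": "canary"},
    "PYLON_OTEL_DEPLOYMENT_ENVIRONMENT",
)
```

Here `sentry_env` takes the shared value because its own variable is unset, while `otel_env` keeps its explicit override.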
3 changes: 3 additions & 0 deletions pylon_commons/pylon_commons/settings.py
@@ -54,6 +54,9 @@ class Settings(BaseSettings):
max_request_timeout_seconds: float = 300.0

# evm
# WARNING: these URLs are recorded as-is in telemetry (OpenTelemetry spans, debug logs, and the
# Prometheus `rpc_url` metric label), so do NOT embed credentials in them (no `user:pass@` and no
# path/query API keys like `/v2/<key>`). If a provider requires authentication, pass it out of band.
evm_rpc_url: evm_types.RpcUrl = evm_types.RpcUrl("https://lite.chain.opentensor.ai")
evm_archive_rpc_url: evm_types.RpcUrl = evm_types.RpcUrl("https://archive.chain.opentensor.ai")
evm_archive_blocks_cutoff: ArchiveBlocksCutoff = ArchiveBlocksCutoff(300)
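
The warning above names two leak shapes: `user:pass@` userinfo and API keys carried in the path or query. A minimal check that a deployer could run against a configured RPC URL might look like the sketch below. `rpc_url_risks` is a hypothetical helper and is not part of this PR. It detects userinfo and flags any query string conservatively; path-embedded keys such as `/v2/<key>` cannot be recognised reliably and are deliberately not covered.

```python
from urllib.parse import urlsplit


# Hypothetical helper for illustration only; not part of the PR.
def rpc_url_risks(url: str) -> list[str]:
    """Return the credential-leak shapes that can be detected mechanically in a URL."""
    parts = urlsplit(url)
    risks: list[str] = []
    if parts.username is not None or parts.password is not None:
        # `user:pass@host` style userinfo would be recorded verbatim in telemetry.
        risks.append("userinfo")
    if parts.query:
        # Any query string is flagged, since it may carry an API key.
        risks.append("query")
    return risks


default_url_risks = rpc_url_risks("https://lite.chain.opentensor.ai")
```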
35 changes: 35 additions & 0 deletions pylon_service/README.md
@@ -466,6 +466,41 @@ That means:
This keeps one execution path for public behavior.


## Observability

The service ships optional, disabled-by-default integrations for error tracking and tracing. Both are enabled purely
by setting an endpoint/DSN — no separate feature flag.

### Sentry

- Enabled iff `PYLON_SENTRY_DSN` is set. Reports errors through the Litestar and asyncio integrations.

### OpenTelemetry traces

- Enabled iff `PYLON_OTEL_COLLECTOR_ENDPOINT` is set to the base URL of an OTLP collector (e.g. `http://alloy:4318`).
Traces are exported via OTLP HTTP/protobuf to `<endpoint>/v1/traces`.
- When enabled, auto-instrumentation covers: the Litestar HTTP server (incoming requests), `httpx` and `aiohttp`
(outgoing HTTP — chain RPC over `httpx`, web3/EVM RPC over `aiohttp`), and `SQLAlchemy` (database). The active span's
`trace_id` / `span_id` are injected into structured logs for log↔trace correlation.
- Outgoing HTTP URLs are recorded on spans verbatim, so do **not** embed credentials in the configured RPC URLs
(`PYLON_EVM_RPC_URL`, `PYLON_EVM_ARCHIVE_RPC_URL`, or an `http(s)://` `PYLON_BITTENSOR_NETWORK`) — see the warning in
`pylon_commons/settings.py`. The same URLs also appear in debug logs and the Prometheus `rpc_url` metric label.
- **Not traced:** the default Bittensor chain transport in `turbobt` is websockets, for which no OpenTelemetry
instrumentation exists — so chain RPC calls are not auto-traced. They are covered only when
`PYLON_BITTENSOR_NETWORK` points at an `http(s)://` URI, where `turbobt` falls back to `httpx`.
- The service does not ship a collector. Running and configuring Alloy (or any OTLP collector) at the configured
endpoint, including any tail-sampling or endpoint filtering, is the deployer's responsibility.
- **Long-lived on-chain submission spans:** background submission tasks (`apply_weights`, `set_commitment`,
`set_revealed_commitment`) emit one short, self-contained span per retry attempt, each with its own `trace_id` and
span links back to the originating request and the previous attempt — this keeps traces short across the (up to 200)
retries. A *single* attempt can still take up to 120s while waiting for extrinsic finalization (~12s per block, longer
under congestion). Backends that bound trace lifetime — notably Tempo's `max_trace_live` (default 30s) — will split
such an attempt's trace. If you use Tempo, set `max_trace_live` to at least 180s (a margin above the 120s submission
timeout) and `max_trace_idle` to at least 30s.
- Traces require the service to run as a single uvicorn process; both `--workers` and `WEB_CONCURRENCY` (other than
`1`) are rejected (see `uvicorn_entrypoint.py`) because the SDK is initialised once at import and would not survive
`fork()`.
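
As a rough illustration of how the export URL is derived from `PYLON_OTEL_COLLECTOR_ENDPOINT`, the sketch below mirrors the normalization described in this PR (strip surrounding whitespace and trailing slashes, then append `/v1/traces`). `traces_url` is a hypothetical standalone function written for this example; the service itself exposes the same logic through properties on `OtelSettings`.

```python
from typing import Optional


# Hypothetical standalone version of the endpoint handling; for illustration only.
def traces_url(collector_endpoint: str) -> Optional[str]:
    """Return the OTLP traces URL, or None when tracing is disabled (empty endpoint)."""
    base = collector_endpoint.strip().rstrip("/")
    if not base:
        # An empty or whitespace-only endpoint means traces stay disabled.
        return None
    # Trailing slashes were removed above, so no double slash is produced here.
    return f"{base}/v1/traces"


alloy_url = traces_url("http://alloy:4318/")
disabled = traces_url("   ")
```

With the example endpoint from the list above, `alloy_url` resolves to the collector's `/v1/traces` path, and a blank value leaves tracing off.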

## Change checklist

Before merging changes in `pylon_service`, verify:
63 changes: 59 additions & 4 deletions pylon_service/pylon_service/api/_unstable/tasks.py
@@ -1,8 +1,14 @@
import asyncio
import logging
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, ClassVar, TypeVar

from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanContext
from opentelemetry.util.types import AttributeValue
from prometheus_client import Histogram
from pylon_commons.models import Block, CommitReveal
from pylon_commons.types import (
@@ -44,8 +50,10 @@
track_operation,
)
from pylon_service.settings import settings
from pylon_service.tracing import TraceLinkType, get_current_valid_span_context

logger = logging.getLogger(__name__)
_tracer = trace.get_tracer(__name__)


class StopRetrying(Exception):
@@ -79,10 +87,14 @@ def __init_subclass__(

def __init__(self) -> None:
self._running_task: asyncio.Task[ReturnT] | None = None
self._request_span_context: SpanContext | None = None
self._previous_attempt_context: SpanContext | None = None

async def schedule(self) -> asyncio.Task[ReturnT]:
await self._on_task_scheduled()

self._request_span_context = get_current_valid_span_context()

self._running_task = asyncio.create_task(self(), name=self.JOB_NAME)
type(self).tasks_running.add(self)
self._running_task.add_done_callback(self._on_task_done)
@@ -91,15 +103,46 @@ async def schedule(self) -> asyncio.Task[ReturnT]:
async def __call__(self) -> ReturnT:
return await self._submit_with_retries()

@contextmanager
def _attempt_span(self, attempt_number: int) -> Iterator[None]:
links: list[Link] = []
if self._request_span_context is not None:
links.append(
Link(
self._request_span_context,
attributes={TraceLinkType.ATTRIBUTE_KEY: TraceLinkType.ORIGINATING_REQUEST},
)
)
if self._previous_attempt_context is not None:
links.append(
Link(
self._previous_attempt_context,
attributes={TraceLinkType.ATTRIBUTE_KEY: TraceLinkType.PREVIOUS_ATTEMPT},
)
)
with _tracer.start_as_current_span(
f"{self.JOB_NAME}.attempt",
context=Context(),
links=links,
attributes={
"attempt_number": attempt_number,
**self._attempt_span_attributes(),
},
):
self._previous_attempt_context = get_current_valid_span_context()
yield

async def _submit_with_retries(self) -> ReturnT:
prepared = False
self._previous_attempt_context = None

async def attempt() -> ReturnT:
nonlocal prepared
-            if not prepared:
-                await self._prepare()
-                prepared = True
-            return await self._single_attempt()
+            with self._attempt_span(retrying.statistics["attempt_number"]):
+                if not prepared:
+                    await self._prepare()
+                    prepared = True
+                return await self._single_attempt()

retrying = AsyncRetrying(
stop=stop_after_attempt(self._retry_attempts + 1),
@@ -150,6 +193,12 @@ def _on_task_done(self, task: asyncio.Task[ReturnT]) -> None:
else:
logger.info("Task %s (%s) finished successfully.", task, self.JOB_NAME)

def _attempt_span_attributes(self) -> dict[str, AttributeValue]:
"""
Per-task attributes attached to each attempt span; overridden by subclasses that have them.
"""
return {}

async def _on_task_scheduled(self) -> None:
pass

@@ -214,6 +263,12 @@ def _retry_attempts(self) -> int:
def _retry_delay_seconds(self) -> int:
return settings.weights_retry_delay_seconds

def _attempt_span_attributes(self) -> dict[str, AttributeValue]:
"""
Attach netuid and hotkey to each attempt span for trace filtering.
"""
return {"netuid": self._netuid, "hotkey": self._hotkey}

async def _on_task_scheduled(self) -> None:
if self._is_rescheduled:
return
6 changes: 6 additions & 0 deletions pylon_service/pylon_service/envs/test_env.template
@@ -38,6 +38,12 @@ PYLON_OTEL_SERVICE_VERSION=
# To override, uncomment the line below and set a non-empty value (an empty value would disable the fallback).
# PYLON_OTEL_DEPLOYMENT_ENVIRONMENT=

# OTLP traces exporter endpoint - base URL of the collector, e.g. http://alloy:4318.
# Leave empty to disable OpenTelemetry traces (disabled by default). When set, traces are exported via
# OTLP HTTP/protobuf to <endpoint>/v1/traces, and the Litestar HTTP server plus the httpx, aiohttp
# and SQLAlchemy libraries are auto-instrumented. You must run your own OTLP collector at this address.
PYLON_OTEL_COLLECTOR_ENDPOINT=

# Sentry Configuration
# DSN address for pylon to use - leave empty to disable sentry integration.
PYLON_SENTRY_DSN=
28 changes: 25 additions & 3 deletions pylon_service/pylon_service/logging.py
@@ -9,6 +9,7 @@

from pylon_service.middleware.request_id import current_request_id
from pylon_service.settings import otel_settings, settings
from pylon_service.tracing import get_current_valid_span_context

if TYPE_CHECKING:
from structlog.typing import EventDict, WrappedLogger
@@ -52,6 +53,19 @@ def add_request_id_to_structlog(
return event_dict


def add_otel_context_to_structlog(
logger: WrappedLogger,
method_name: str,
event_dict: EventDict,
) -> EventDict:
"""Structlog processor injecting the active span's trace_id and span_id into the log event."""
ctx = get_current_valid_span_context()
if ctx is not None:
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
return event_dict


def add_coro_name_to_structlog(
logger: WrappedLogger,
method_name: str,
@@ -75,6 +89,9 @@ def add_coro_name_to_structlog(
# coro_name and OTEL attributes are layered on top per formatter: the raw formatter omits coro_name to stay
# recursion-safe (see _get_current_coroutine_name), and OTEL resource attributes are only injected into the
# json (production) output to keep dev console clean.
# The OTEL trace context (trace_id/span_id) is injected into every formatter, including the dev console,
# for log<->trace correlation; it reads only OTEL contextvars (no env lookup, no logging) so it stays
# recursion-safe, and adds nothing when tracing is disabled (the active span context is then invalid).
_RAW_FOREIGN_PRE_CHAIN = (
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_logger_name,
@@ -85,9 +102,14 @@ def add_coro_name_to_structlog(

# raw json output (production) carries OTEL resource attributes for parity with the json formatter, but
# still omits coro_name to stay recursion-safe.
-_RAW_JSON_FOREIGN_PRE_CHAIN = (*_RAW_FOREIGN_PRE_CHAIN, add_otel_resource_to_structlog)
-_CONSOLE_FOREIGN_PRE_CHAIN = (*_RAW_FOREIGN_PRE_CHAIN, add_coro_name_to_structlog)
-_JSON_FOREIGN_PRE_CHAIN = (*_RAW_FOREIGN_PRE_CHAIN, add_coro_name_to_structlog, add_otel_resource_to_structlog)
+_RAW_JSON_FOREIGN_PRE_CHAIN = (*_RAW_FOREIGN_PRE_CHAIN, add_otel_resource_to_structlog, add_otel_context_to_structlog)
+_CONSOLE_FOREIGN_PRE_CHAIN = (*_RAW_FOREIGN_PRE_CHAIN, add_coro_name_to_structlog, add_otel_context_to_structlog)
+_JSON_FOREIGN_PRE_CHAIN = (
+    *_RAW_FOREIGN_PRE_CHAIN,
+    add_coro_name_to_structlog,
+    add_otel_resource_to_structlog,
+    add_otel_context_to_structlog,
+)

_JSON_RENDER_PROCESSORS = [
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
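
For readers unfamiliar with the log-to-trace correlation added in this file, the sketch below shows the same formatting step in isolation. It is a simplified stand-in, not the PR's processor: `SpanContextStub` is a hypothetical replacement for OpenTelemetry's `SpanContext` (which carries a 128-bit integer `trace_id` and a 64-bit integer `span_id`), and `add_trace_ids` omits the `logger` and `method_name` arguments that a structlog processor receives.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class SpanContextStub:
    """Hypothetical stand-in for opentelemetry.trace.SpanContext; illustration only."""

    trace_id: int  # 128-bit integer in OpenTelemetry
    span_id: int  # 64-bit integer in OpenTelemetry


def add_trace_ids(event_dict: dict[str, Any], ctx: Optional[SpanContextStub]) -> dict[str, Any]:
    """Attach zero-padded lowercase hex IDs when a span context is available."""
    if ctx is not None:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")  # 32 hex characters
        event_dict["span_id"] = format(ctx.span_id, "016x")  # 16 hex characters
    return event_dict


with_span = add_trace_ids({"event": "submitted"}, SpanContextStub(trace_id=0x1, span_id=0xFF))
without_span = add_trace_ids({"event": "submitted"}, None)
```

The fixed-width hex form matches how trace backends usually display these IDs, which is what makes the log fields searchable against a trace. When no valid span context exists, the event is returned unchanged.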
12 changes: 10 additions & 2 deletions pylon_service/pylon_service/main.py
@@ -1,6 +1,8 @@
from litestar import Litestar
from litestar.contrib.opentelemetry import OpenTelemetryConfig, OpenTelemetryPlugin
from litestar.di import Provide
from litestar.openapi.config import OpenAPIConfig
from litestar.plugins import PluginProtocol
from litestar.plugins.prometheus import PrometheusConfig

from pylon_service import dependencies, lifecycle
@@ -11,10 +13,11 @@
from pylon_service.logging import litestar_logging_config
from pylon_service.middleware.request_id import RequestIdMiddleware
from pylon_service.middleware.request_timeout import RequestTimeoutMiddleware
from pylon_service.otel_config import init_otel
from pylon_service.prometheus_controller import AuthenticatedPrometheusController
from pylon_service.schema import PylonSchemaPlugin
from pylon_service.sentry_config import init_sentry
-from pylon_service.settings import response_cache_config, settings
+from pylon_service.settings import otel_settings, response_cache_config, settings
from pylon_service.stores import stores


@@ -27,6 +30,10 @@ def create_app() -> Litestar:
group_path=True, # Group metrics by path template to avoid cardinality explosion
)

plugins: list[PluginProtocol] = [PylonSchemaPlugin()]
if otel_settings.traces_enabled:
plugins.append(OpenTelemetryPlugin(OpenTelemetryConfig()))

return Litestar(
route_handlers=[
v1_router,
@@ -49,7 +56,7 @@ def create_app() -> Litestar:
lifecycle.reschedule_weight_tasks_on_startup,
],
dependencies={"bt_contact_pool": Provide(dependencies.bt_contact_pool_dep, use_cache=True)},
-        plugins=[PylonSchemaPlugin()],
+        plugins=plugins,
exception_handlers={ArchiveFallbackException: archive_fallback_handler},
stores=stores,
response_cache_config=response_cache_config,
@@ -58,5 +65,6 @@ def create_app() -> Litestar:
)


init_otel()
init_sentry()
app = create_app()
33 changes: 33 additions & 0 deletions pylon_service/pylon_service/otel_config.py
@@ -0,0 +1,33 @@
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from pylon_service.db.database import engine as db_engine
from pylon_service.settings import otel_settings


def init_otel() -> None:
"""
Initialize OpenTelemetry tracing if a traces endpoint is configured.

Sets up a TracerProvider exporting via OTLP HTTP/protobuf and auto-instruments the httpx,
aiohttp, and SQLAlchemy libraries. A no-op when no endpoint is set.
"""
if not otel_settings.traces_enabled:
return

resource = Resource.create(otel_settings.resource_attributes())
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=f"{otel_settings.normalized_collector_endpoint}/v1/traces"))
)
trace.set_tracer_provider(provider)

HTTPXClientInstrumentor().instrument()
AioHttpClientInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument(engine=db_engine.sync_engine)
28 changes: 23 additions & 5 deletions pylon_service/pylon_service/settings.py
@@ -2,6 +2,8 @@
from typing import Self

from litestar.config.response_cache import ResponseCacheConfig
from opentelemetry.semconv._incubating.attributes.deployment_attributes import DEPLOYMENT_ENVIRONMENT_NAME
from opentelemetry.semconv.resource import ResourceAttributes
from pydantic import Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from pylon_commons.settings import ENV_FILE, Settings
@@ -82,6 +84,7 @@ class OtelSettings(BaseSettings):
deployment_environment: str = Field(default_factory=lambda: settings.environment)
service_instance_id: str = _DEFAULT_SERVICE_INSTANCE_ID
service_version: str = ""
collector_endpoint: str = ""

model_config = SettingsConfigDict(
env_file=ENV_FILE,
@@ -90,16 +93,31 @@ class OtelSettings(BaseSettings):
extra="ignore",
)

@property
def normalized_collector_endpoint(self) -> str:
"""
Return the collector endpoint with surrounding whitespace and trailing slashes removed,
so signal paths can be appended without producing a double slash.
"""
return self.collector_endpoint.strip().rstrip("/")

@property
def traces_enabled(self) -> bool:
"""
Return whether traces export is enabled (a non-empty endpoint is configured).
"""
return bool(self.normalized_collector_endpoint)

def resource_attributes(self) -> dict[str, str]:
"""Return OTEL resource attributes as dotted-key fields for log injection."""
attrs = {
-            "service.namespace": self.service_namespace,
-            "service.name": self.service_name,
-            "deployment.environment.name": self.deployment_environment,
-            "service.instance.id": self.service_instance_id,
+            ResourceAttributes.SERVICE_NAMESPACE: self.service_namespace,
+            ResourceAttributes.SERVICE_NAME: self.service_name,
+            DEPLOYMENT_ENVIRONMENT_NAME: self.deployment_environment,
+            ResourceAttributes.SERVICE_INSTANCE_ID: self.service_instance_id,
         }
         if self.service_version:
-            attrs["service.version"] = self.service_version
+            attrs[ResourceAttributes.SERVICE_VERSION] = self.service_version
return attrs

