diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java index c7bc33402b312..748342d191dd7 100644 --- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java @@ -65,6 +65,7 @@ public OpenTelemetryEventReporter() { @Override public void open(MetricConfig metricConfig) { LOG.info("Starting OpenTelemetryEventReporter"); + super.open(metricConfig); final String protocol = Optional.ofNullable( metricConfig.getProperty( diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryEventReporterTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryEventReporterTest.java index dc49d9fdf82a2..545b76e6591af 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryEventReporterTest.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryEventReporterTest.java @@ -26,6 +26,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -166,4 +168,79 @@ public void testReportLogRecord() throws Exception { }); }); } + + @Test + public void testServiceNameAndVersionAppliedToResource() { + InspectableEventReporter inspectable = new InspectableEventReporter(); + MetricConfig config = new MetricConfig(); + config.setProperty( + OpenTelemetryReporterOptions.EXPORTER_ENDPOINT.key(), "http://localhost:4317"); + config.setProperty(OpenTelemetryReporterOptions.SERVICE_NAME.key(), "my-flink-job"); + config.setProperty(OpenTelemetryReporterOptions.SERVICE_VERSION.key(), "1.2.3"); + inspectable.open(config); + inspectable.close(); + + Resource resource = inspectable.getResource(); + assertThat(resource.getAttribute(ResourceAttributes.SERVICE_NAME)) + .as(" service.name must be set from config; was null -super.open() not called") + .isEqualTo("my-flink-job"); + assertThat(resource.getAttribute(ResourceAttributes.SERVICE_VERSION)) + .as(" service.version must be set from config; was null -super.open() not called") + .isEqualTo("1.2.3"); + } + + @Test + public void testServiceNameAppearsInExportedResourceAttributes() throws Exception { + MetricConfig config = createMetricConfig(); + + config.setProperty( + OpenTelemetryReporterOptions.SERVICE_NAME.key(), "exported-service-name"); + + reporter.open(config); + try { + reporter.notifyOfAddedEvent( + Event.builder(this.getClass(), "service-name-verification-event") + .setObservedTsMillis(1L) + .build()); + } finally { + reporter.close(); + } + + eventuallyConsumeJson( + (json) -> { + JsonNode resourceAttributes = + json.findPath("resourceLogs") + .findPath("resource") + .findPath("attributes"); + + assertThat(resourceAttributes.isArray()) + .as("resource attributes should be a JSON array") + .isTrue(); + + boolean found = false; + + for (JsonNode attr : resourceAttributes) { + if ("service.name".equals(attr.findPath("key").asText())) { + assertThat(attr.at("/value/stringValue").asText()) + .as("service.name attribute value") + .isEqualTo("exported-service-name"); + found = true; + break; + } + } + + assertThat(found) + .as( + "service.name was absent from exported resource attributes - " + + "super.open() was not called in OpenTelemetryEventReporter.open()") + .isTrue(); + }); + } + + private static class InspectableEventReporter extends OpenTelemetryEventReporter { + + Resource getResource() { + return resource; + } + } }