diff --git a/reporters/kamon-datadog/src/main/resources/reference.conf b/reporters/kamon-datadog/src/main/resources/reference.conf index c61d2fa5b..7ea8aae04 100644 --- a/reporters/kamon-datadog/src/main/resources/reference.conf +++ b/reporters/kamon-datadog/src/main/resources/reference.conf @@ -55,7 +55,12 @@ kamon { # api { - # API endpoint to which metrics time series data will be posted. + # Version of the API to use to submit metrics. Supported values are "v1" and "v2"; api-url must point to the endpoint of the same version. + # More info: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics + version = "v1" + + # Metrics submission API endpoint to which metrics time series data will be posted. + # More info: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics api-url = "https://app.datadoghq.com/api/v1/series" # Datadog API key to use to send metrics to Datadog directly over HTTPS. The API key will be combined with the diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala index 598ed6cd8..4254e5a22 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala @@ -16,7 +16,6 @@ package kamon.datadog -import java.lang.StringBuilder import java.nio.charset.StandardCharsets import java.text.{DecimalFormat, DecimalFormatSymbols} import java.time.Duration @@ -26,25 +25,23 @@ import kamon.metric.MeasurementUnit.Dimension.{Information, Time} import kamon.metric.{MeasurementUnit, MetricSnapshot, PeriodSnapshot} import kamon.tag.{Tag, TagSet} import kamon.util.{EnvironmentTags, Filter} -import kamon.{Kamon, module} +import kamon.Kamon import kamon.datadog.DatadogAPIReporter.Configuration import kamon.module.{MetricReporter, ModuleFactory} import org.slf4j.LoggerFactory import org.slf4j.event.Level + import scala.collection.JavaConverters._ import scala.util.{Failure, Success} class DatadogAPIReporterFactory extends 
ModuleFactory { override def create(settings: ModuleFactory.Settings): DatadogAPIReporter = { val config = DatadogAPIReporter.readConfiguration(settings.config) - new DatadogAPIReporter(config, new HttpClient(config.httpConfig, usingAgent = false)) + new DatadogAPIReporter(config) } } -class DatadogAPIReporter( - @volatile private var configuration: Configuration, - @volatile private var httpClient: HttpClient -) extends MetricReporter { +class DatadogAPIReporter(@volatile private var configuration: Configuration) extends MetricReporter { import DatadogAPIReporter._ private val logger = LoggerFactory.getLogger(classOf[DatadogAPIReporter]) @@ -60,13 +57,11 @@ class DatadogAPIReporter( } override def reconfigure(config: Config): Unit = { - val newConfiguration = readConfiguration(config) - configuration = newConfiguration - httpClient = new HttpClient(configuration.httpConfig, usingAgent = false) + configuration = readConfiguration(config) } override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { - httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match { + configuration.httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match { case Failure(e) => logger.logAtLevel(configuration.failureLogLevel, e.getMessage) case Success(response) => @@ -79,7 +74,10 @@ class DatadogAPIReporter( val host = Kamon.environment.host val interval = Math.round(Duration.between(snapshot.from, snapshot.to).toMillis() / 1000d) - val seriesBuilder = new StringBuilder() + + val payloadBuilder = new StringBuilder() + + val apiVersion = configuration.apiVersion @inline def doubleToPercentileString(double: Double) = { @@ -87,25 +85,59 @@ class DatadogAPIReporter( else f"$double%s" } + @inline + def metricTypeJsonPart(metricTypeStr: String) = { + if (apiVersion == "v1") { + s""""type":"$metricTypeStr"""" + } + // v2 requires an enum type in the payload instead of the string name + else { + if (metricTypeStr == 
countMetricType) { + """"type":1""" + } else if (metricTypeStr == gaugeMetricType) { + """"type":3""" + } else { + // This reporter currently only supports counters and gauges. + // `0` is the unspecified metric type in the Datadog API. + """"type":0""" + } + } + } + + lazy val hostJsonPart = { + if (apiVersion == "v1") { + s""""host":"$host"""" + } + // v2 has a "resources" array field where "host" should be defined + else { + s""""resources":[{"name":"$host","type":"host"}]""" + } + } + def addDistribution(metric: MetricSnapshot.Distributions): Unit = { val unit = metric.settings.unit metric.instruments.foreach { d => val dist = d.value val average = if (dist.count > 0L) (dist.sum / dist.count) else 0L - addMetric(metric.name + ".avg", valueFormat.format(scale(average, unit)), gauge, d.tags) - addMetric(metric.name + ".count", valueFormat.format(dist.count), count, d.tags) - addMetric(metric.name + ".median", valueFormat.format(scale(dist.percentile(50d).value, unit)), gauge, d.tags) + addMetric(metric.name + ".avg", valueFormat.format(scale(average, unit)), gaugeMetricType, d.tags) + addMetric(metric.name + ".count", valueFormat.format(dist.count), countMetricType, d.tags) + addMetric( + metric.name + ".median", + valueFormat.format(scale(dist.percentile(50d).value, unit)), + gaugeMetricType, + d.tags + ) configuration.percentiles.foreach { p => addMetric( metric.name + s".${doubleToPercentileString(p)}percentile", valueFormat.format(scale(dist.percentile(p).value, unit)), - gauge, + gaugeMetricType, d.tags ) } - addMetric(metric.name + ".max", valueFormat.format(scale(dist.max, unit)), gauge, d.tags) - addMetric(metric.name + ".min", valueFormat.format(scale(dist.min, unit)), gauge, d.tags) + addMetric(metric.name + ".max", valueFormat.format(scale(dist.max, unit)), gaugeMetricType, d.tags) + addMetric(metric.name + ".min", valueFormat.format(scale(dist.min, unit)), gaugeMetricType, d.tags) } } @@ -113,14 +145,21 @@ class DatadogAPIReporter( val customTags = 
(configuration.extraTags ++ tags.iterator(_.toString).map(p => p.key -> p.value).filter(t => configuration.tagFilter.accept(t._1) )).map { case (k, v) ⇒ quote"$k:$v" } + val allTagsString = customTags.mkString("[", ",", "]") - if (seriesBuilder.length() > 0) seriesBuilder.append(",") + if (payloadBuilder.nonEmpty) payloadBuilder.append(",") - seriesBuilder - .append( - s"""{"metric":"$metricName","interval":$interval,"points":[[$timestamp,$value]],"type":"$metricType","host":"$host","tags":$allTagsString}""" - ) + val point = if (apiVersion == "v1") { + s"[$timestamp,$value]" + } else { + s"""{"timestamp":$timestamp,"value":$value}""" + } + + payloadBuilder + .append(s"""{"metric":"$metricName",${metricTypeJsonPart( + metricType + )},"interval":$interval,"points":[$point],"tags":$allTagsString,$hostJsonPart}""".stripMargin) } snapshot.counters.foreach { snap => @@ -128,7 +167,7 @@ class DatadogAPIReporter( addMetric( snap.name, valueFormat.format(scale(instrument.value, snap.settings.unit)), - count, + countMetricType, instrument.tags ) } @@ -138,7 +177,7 @@ class DatadogAPIReporter( addMetric( snap.name, valueFormat.format(scale(instrument.value, snap.settings.unit)), - gauge, + gaugeMetricType, instrument.tags ) } @@ -146,11 +185,10 @@ class DatadogAPIReporter( (snapshot.histograms ++ snapshot.rangeSamplers ++ snapshot.timers).foreach(addDistribution) - seriesBuilder + payloadBuilder .insert(0, "{\"series\":[") .append("]}") - .toString() - .getBytes(StandardCharsets.UTF_8) + .toString.getBytes(StandardCharsets.UTF_8) } @@ -166,11 +204,42 @@ class DatadogAPIReporter( } private object DatadogAPIReporter { - val count = "count" - val gauge = "gauge" + val countMetricType = "count" + val gaugeMetricType = "gauge" + + private def configureHttpClient(config: Config): (String, HttpClient) = { + val baseClient = buildHttpClient(config) + + val apiVersion = { + val v = config.getString("version") + if (v != "v1" && v != "v2") { + sys.error(s"Invalid Datadog API version, 
the possible values are [v1, v2].") + } + v + } + + val apiKey = config.getString("api-key") + val apiUrl = { + val url = config.getString("api-url") + + if (apiVersion == "v1") { + url + "?api_key=" + apiKey + } else { + url + } + } + val headers = { + if (apiVersion == "v2") { + List("DD-API-KEY" -> apiKey) + } else List.empty + } + + (apiVersion, baseClient.copy(endpoint = apiUrl, headers = headers)) + } case class Configuration( - httpConfig: Config, + httpClient: HttpClient, + apiVersion: String, percentiles: Set[Double], timeUnit: MeasurementUnit, informationUnit: MeasurementUnit, @@ -185,6 +254,8 @@ private object DatadogAPIReporter { def readConfiguration(config: Config): Configuration = { val datadogConfig = config.getConfig("kamon.datadog") + val httpConfig = datadogConfig.getConfig("api") + val (apiVersion, httpClient) = configureHttpClient(httpConfig) // Remove the "host" tag since it gets added to the datadog payload separately val extraTags = EnvironmentTags @@ -194,7 +265,8 @@ private object DatadogAPIReporter { .map(p => p.key -> Tag.unwrapValue(p).toString) Configuration( - datadogConfig.getConfig("api"), + httpClient = httpClient, + apiVersion = apiVersion, percentiles = datadogConfig.getDoubleList("percentiles").asScala.toList.map(_.toDouble).toSet, timeUnit = readTimeUnit(datadogConfig.getString("time-unit")), informationUnit = readInformationUnit(datadogConfig.getString("information-unit")), diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala index 63578be3f..3cd21a746 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala @@ -91,7 +91,7 @@ object DatadogSpanReporter { Configuration( getTranslator(config), - new HttpClient(config.getConfig(DatadogSpanReporter.httpConfigPath), usingAgent = true), + 
buildHttpClient(config.getConfig(DatadogSpanReporter.httpConfigPath)), Kamon.filter("kamon.datadog.environment-tags.filter"), EnvironmentTags.from(Kamon.environment, config.getConfig("kamon.datadog.environment-tags")).without("service") ) diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/HttpClient.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/HttpClient.scala new file mode 100644 index 000000000..a995d547a --- /dev/null +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/HttpClient.scala @@ -0,0 +1,95 @@ +package kamon.datadog + +import okhttp3.{MediaType, OkHttpClient, Request, RequestBody, Response} + +import java.nio.charset.StandardCharsets +import java.time.Duration +import java.util.concurrent.TimeUnit +import scala.annotation.tailrec +import scala.util.{Failure, Success, Try} + +private[datadog] case class HttpClient( + endpoint: String, + headers: List[(String, String)], + usingCompression: Boolean, + connectTimeout: Duration, + readTimeout: Duration, + writeTimeout: Duration, + retries: Int, + initRetryDelay: Duration +) { + + private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504) + + private lazy val httpClient: OkHttpClient = { + // Apparently okhttp doesn't require explicit closing of the connection + val builder = new OkHttpClient.Builder() + .connectTimeout(connectTimeout.toMillis, TimeUnit.MILLISECONDS) + .readTimeout(readTimeout.toMillis, TimeUnit.MILLISECONDS) + .writeTimeout(writeTimeout.toMillis, TimeUnit.MILLISECONDS) + .retryOnConnectionFailure(true) + + if (usingCompression) builder.addInterceptor(new DeflateInterceptor).build() + else builder.build() + } + + @tailrec + private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = { + // Try executing the request + val responseAttempt = Try(httpClient.newCall(request).execute()) + + if (attempt >= retries - 1) { + responseAttempt + } else { + responseAttempt match { + // If the request succeeded but with a 
retryable HTTP status code. + case Success(response) if retryableStatusCodes.contains(response.code) => + response.close() + Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong) + doRequestWithRetries(request, attempt + 1) + + // Either the request succeeded with an HTTP status not included in `retryableStatusCodes` + // or we have an unknown failure + case _ => + responseAttempt + } + } + } + + private def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = { + val body = RequestBody.create(contentBody, MediaType.parse(contentType)) + val request = { + val builder = new Request.Builder().url(endpoint).method(method, body) + headers.foreach { + case (name, value) => builder.header(name, value) + } + builder.build() + } + + doRequestWithRetries(request) match { + case Success(response) => + val responseBody = response.body().string() + response.close() + if (response.isSuccessful) { + Success(responseBody) + } else { + Failure(new Exception( + s"Failed to $method metrics to Datadog with status code [${response.code()}], Body: [$responseBody]" + )) + } + case Failure(f) if f.getCause != null => + Failure(f.getCause) + case f @ Failure(_) => + f.asInstanceOf[Try[String]] + } + } + + def doPost(contentType: String, contentBody: Array[Byte]): Try[String] = { + doMethodWithBody("POST", contentType, contentBody) + } + + def doJsonPut(contentBody: String): Try[String] = { + // Datadog Agent does not accept ";charset=UTF-8", using bytes to send Json posts + doMethodWithBody("PUT", "application/json", contentBody.getBytes(StandardCharsets.UTF_8)) + } +} diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala index 5b2289c7b..505769096 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala @@ -16,19 +16,13 @@ package kamon -import 
java.nio.charset.StandardCharsets -import java.time.{Duration, Instant} -import java.util.concurrent.TimeUnit +import java.time.Instant import com.typesafe.config.Config import kamon.metric.MeasurementUnit import kamon.metric.MeasurementUnit.{information, time} -import okhttp3._ import org.slf4j.Logger import org.slf4j.event.Level -import scala.annotation.tailrec -import scala.util.{Failure, Success, Try} - package object datadog { implicit class InstantImprovements(val instant: Instant) { @@ -55,110 +49,26 @@ package object datadog { } } - private[datadog] case class HttpClient( - apiUrl: String, - apiKey: Option[String], - usingCompression: Boolean, - usingAgent: Boolean, - connectTimeout: Duration, - readTimeout: Duration, - writeTimeout: Duration, - retries: Int, - initRetryDelay: Duration - ) { - - private val httpClient: OkHttpClient = createHttpClient() - private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504) - - def this(config: Config, usingAgent: Boolean) = { - this( - config.getString("api-url"), - if (usingAgent) None else Some(config.getString("api-key")), - if (usingAgent) false else config.getBoolean("compression"), - usingAgent, - config.getDuration("connect-timeout"), - config.getDuration("read-timeout"), - config.getDuration("write-timeout"), - config.getInt("retries"), - config.getDuration("init-retry-delay") - ) - } - - @tailrec - private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = { - // Try executing the request - val responseAttempt = Try(httpClient.newCall(request).execute()) - - if (attempt >= retries - 1) { - responseAttempt - } else { - responseAttempt match { - // If the request succeeded but with a retryable HTTP status code. 
- case Success(response) if retryableStatusCodes.contains(response.code) => - response.close() - Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong) - doRequestWithRetries(request, attempt + 1) - - // Either the request succeeded with an HTTP status not included in `retryableStatusCodes` - // or we have an unknown failure - case _ => - responseAttempt - } - } - } - - def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = { - val body = RequestBody.create(MediaType.parse(contentType), contentBody) - val url = apiUrl + apiKey.map(key => "?api_key=" + key).getOrElse("") - val request = new Request.Builder().url(url).method(method, body).build - - doRequestWithRetries(request) match { - case Success(response) => - val responseBody = response.body().string() - response.close() - if (response.isSuccessful) { - Success(responseBody) - } else { - Failure(new Exception( - s"Failed to ${method} metrics to Datadog with status code [${response.code()}], Body: [${responseBody}]" - )) - } - case Failure(f) if f.getCause != null => - Failure(f.getCause) - case f @ Failure(_) => - f.asInstanceOf[Try[String]] - } - } - - def doPost(contentType: String, contentBody: Array[Byte]): Try[String] = { - doMethodWithBody("POST", contentType, contentBody) - } - - def doPut(contentType: String, contentBody: Array[Byte]): Try[String] = { - doMethodWithBody("PUT", contentType, contentBody) - } - - def doJsonPost(contentBody: String): Try[String] = { - // Datadog Agent does not accept ";charset=UTF-8", using bytes to send Json posts - doPost("application/json", contentBody.getBytes(StandardCharsets.UTF_8)) - } - - def doJsonPut(contentBody: String): Try[String] = { - // Datadog Agent does not accept ";charset=UTF-8", using bytes to send Json posts - doPut("application/json", contentBody.getBytes(StandardCharsets.UTF_8)) - } - - // Apparently okhttp doesn't require explicit closing of the connection - private def createHttpClient(): 
OkHttpClient = { - val builder = new OkHttpClient.Builder() - .connectTimeout(connectTimeout.toMillis, TimeUnit.MILLISECONDS) - .readTimeout(readTimeout.toMillis, TimeUnit.MILLISECONDS) - .writeTimeout(writeTimeout.toMillis, TimeUnit.MILLISECONDS) - .retryOnConnectionFailure(true) - - if (usingCompression) builder.addInterceptor(new DeflateInterceptor).build() - else builder.build() - } + def buildHttpClient(config: Config): HttpClient = { + val apiUrl = config.getString("api-url") + + val connectTimeout = config.getDuration("connect-timeout") + val readTimeout = config.getDuration("read-timeout") + val writeTimeout = config.getDuration("write-timeout") + val retries = config.getInt("retries") + val initialRetryDelay = config.getDuration("init-retry-delay") + val useCompression = if (config.hasPath("compression")) config.getBoolean("compression") else false + + HttpClient( + endpoint = apiUrl, + headers = List.empty, + usingCompression = useCompression, + connectTimeout = connectTimeout, + readTimeout = readTimeout, + writeTimeout = writeTimeout, + retries = retries, + initRetryDelay = initialRetryDelay + ) } def readTimeUnit(unit: String): MeasurementUnit = unit match { diff --git a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala index 39ffbd20f..66116f2da 100644 --- a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala +++ b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala @@ -212,8 +212,47 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec ) } - reporter.stop() + "support v1 and v2 of the submission endpoint" in { + val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK")) + applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"") + applyConfig("kamon.datadog.api.api-key = \"dummy\"") + 
applyConfig("kamon.datadog.percentiles = []") + applyConfig("kamon.datadog.api.version = \"v2\"") + reporter.reconfigure(Kamon.config()) + reporter.reportPeriodSnapshot(examplePeriod) + val request = server.takeRequest() + val body = request.getBody.readUtf8() + + // v1 differs from v2 on: + // - the "type" field value, v1 requires a string, v2 an integer + // - the "points" array, v1 was an array of arrays, v2 an array of objects + // - the "host" field does not exist, instead there is a "resources" field where "host" is one of the allowed definitions + Json.parse(body) shouldEqual Json + .parse( + """{"series":[{ + |"metric": "test.counter", + |"interval": 1, + |"points": [{"timestamp": 1523394, "value": 0}], + |"type": 1, + |"tags": ["env:staging","service:kamon-application","tag1:value1"], + |"resources": [{"type": "host", "name": "test"}] + |}]}""".stripMargin + ) + + reporter.reportPeriodSnapshot(examplePeriodWithDistributions) + Json.parse(server.takeRequest().getBody.readUtf8()) shouldEqual Json + .parse( + """{"series":[ + |{"metric":"test.timer.avg","interval":1,"points":[{"timestamp":1523394,"value":20}],"type":3,"resources": [{"type": "host", "name": "test"}],"tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.count","interval":1,"points":[{"timestamp":1523394,"value":5}],"type":1,"resources": [{"type": "host", "name": "test"}],"tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.median","interval":1,"points":[{"timestamp":1523394,"value":0}],"type":3,"resources": [{"type": "host", "name": "test"}],"tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.max","interval":1,"points":[{"timestamp":1523394,"value":10}],"type":3,"resources": [{"type": "host", "name": "test"}],"tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.min","interval":1,"points":[{"timestamp":1523394,"value":0}],"type":3,"resources": [{"type": "host", "name": 
"test"}],"tags":["env:staging","service:kamon-application"]}]}""".stripMargin + ) + } + + reporter.stop() } }