From 0964234ffac0baf79f0dc6925fde718ca2cb7dbc Mon Sep 17 00:00:00 2001 From: Vitalymt Date: Thu, 7 May 2026 11:40:37 +0000 Subject: [PATCH] fix: parse 'create' responses in elasticsearch bulk API When batch_op_type is set to 'create' (required for ES/OpenSearch data streams), the bulk API returns responses with a 'create' key instead of 'index'. The plugin was only checking for the 'index' key, causing it to report errors and mark events as failed even though documents were successfully created. Now the plugin checks for both 'index' and 'create' keys in the response items, falling back to error only when neither is present. Fixes #898 --- plugin/output/elasticsearch/elasticsearch.go | 6 ++- .../elasticsearch/elasticsearch_test.go | 46 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index 4bdaea55e..eae76ec76 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -588,7 +588,11 @@ func (p *Plugin) reportESErrors(data []byte) error { for _, node := range items { indexNode := node.Dig("index") if indexNode == nil { - p.logger.Error("unknown elasticsearch response, 'index' field in the response is empty", + // When batch_op_type is "create", ES returns "create": {...} instead of "index": {...} + indexNode = node.Dig("create") + } + if indexNode == nil { + p.logger.Error("unknown elasticsearch response, neither 'index' nor 'create' field found in the response", zap.String("response", node.EncodeToString()), ) continue diff --git a/plugin/output/elasticsearch/elasticsearch_test.go b/plugin/output/elasticsearch/elasticsearch_test.go index b3f6f30c8..ad27f5c3b 100644 --- a/plugin/output/elasticsearch/elasticsearch_test.go +++ b/plugin/output/elasticsearch/elasticsearch_test.go @@ -80,6 +80,52 @@ func TestAppendEventWithCreateOpType(t *testing.T) { assert.Equal(t, expected, string(result), "wrong request content") } +func TestReportESErrorsWithCreateResponseType(t *testing.T) { + p := &Plugin{} + config := &Config{ + Endpoints: []string{"test"}, + IndexFormat: "test-%", + BatchSize: "1", + } + test.NewConfig(config, map[string]int{"gomaxprocs": 1}) + p.Start(config, test.NewEmptyOutputPluginParams()) + + // Response with "create" instead of "index" (batch_op_type: create) + response := []byte(`{ + "errors": true, + "items": [ + {"create": {"_index": ".ds-test-000001", "_id": "1", "status": 201, "result": "created"}}, + {"create": {"_index": ".ds-test-000001", "_id": "2", "status": 201, "result": "created"}} + ] + }`) + + err := p.reportESErrors(response) + assert.NoError(t, err) +} + +func TestReportESErrorsWithMixedResponses(t *testing.T) { + p := &Plugin{} + config := &Config{ + Endpoints: []string{"test"}, + IndexFormat: "test-%", + BatchSize: "1", + } + test.NewConfig(config, map[string]int{"gomaxprocs": 1}) + p.Start(config, test.NewEmptyOutputPluginParams()) + + // Response with mixed "index" and "create" items + response := []byte(`{ + "errors": true, + "items": [ + {"index": {"_index": "test-000001", "_id": "1", "status": 200}}, + {"create": {"_index": ".ds-test-000001", "_id": "2", "status": 201, "result": "created"}} + ] + }`) + + err := p.reportESErrors(response) + assert.NoError(t, err) +} + func TestPrepareEndpoints(t *testing.T) { testCases := []struct { in []string