diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index 4bdaea55e..eae76ec76 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -588,7 +588,11 @@ func (p *Plugin) reportESErrors(data []byte) error { for _, node := range items { indexNode := node.Dig("index") if indexNode == nil { - p.logger.Error("unknown elasticsearch response, 'index' field in the response is empty", + // When batch_op_type is "create", ES returns "create": {...} instead of "index": {...} + indexNode = node.Dig("create") + } + if indexNode == nil { + p.logger.Error("unknown elasticsearch response, neither 'index' nor 'create' field found in the response", zap.String("response", node.EncodeToString()), ) continue diff --git a/plugin/output/elasticsearch/elasticsearch_test.go b/plugin/output/elasticsearch/elasticsearch_test.go index b3f6f30c8..ad27f5c3b 100644 --- a/plugin/output/elasticsearch/elasticsearch_test.go +++ b/plugin/output/elasticsearch/elasticsearch_test.go @@ -80,6 +80,52 @@ func TestAppendEventWithCreateOpType(t *testing.T) { assert.Equal(t, expected, string(result), "wrong request content") } +func TestReportESErrorsWithCreateResponseType(t *testing.T) { + p := &Plugin{} + config := &Config{ + Endpoints: []string{"test"}, + IndexFormat: "test-%", + BatchSize: "1", + } + test.NewConfig(config, map[string]int{"gomaxprocs": 1}) + p.Start(config, test.NewEmptyOutputPluginParams()) + + // Response with "create" instead of "index" (batch_op_type: create) + response := []byte(`{ + "errors": true, + "items": [ + {"create": {"_index": ".ds-test-000001", "_id": "1", "status": 201, "result": "created"}}, + {"create": {"_index": ".ds-test-000001", "_id": "2", "status": 201, "result": "created"}} + ] + }`) + + err := p.reportESErrors(response) + assert.NoError(t, err) +} + +func TestReportESErrorsWithMixedResponses(t *testing.T) { + p := &Plugin{} + config := &Config{ + Endpoints: []string{"test"}, + IndexFormat: "test-%", + BatchSize: "1", + } + test.NewConfig(config, map[string]int{"gomaxprocs": 1}) + p.Start(config, test.NewEmptyOutputPluginParams()) + + // Response with mixed "index" and "create" items + response := []byte(`{ + "errors": true, + "items": [ + {"index": {"_index": "test-000001", "_id": "1", "status": 200}}, + {"create": {"_index": ".ds-test-000001", "_id": "2", "status": 201, "result": "created"}} + ] + }`) + + err := p.reportESErrors(response) + assert.NoError(t, err) +} + func TestPrepareEndpoints(t *testing.T) { testCases := []struct { in []string