From 3b8273286f2c37076c615f27e9953d9916a1bb99 Mon Sep 17 00:00:00 2001 From: Richard Noble Date: Thu, 26 Nov 2020 16:42:51 +0000 Subject: [PATCH 1/7] Retain the response message in case of an error. --- common/util.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/common/util.go b/common/util.go index 0b3641ba..4bf47e8c 100644 --- a/common/util.go +++ b/common/util.go @@ -2,6 +2,7 @@ package common import ( "fmt" + "io/ioutil" "net/http" "os" "strconv" @@ -72,10 +73,17 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada // Make the request resp, err = http.DefaultClient.Do(req) if err != nil { + body := "" + if resp != nil { + defer resp.Body.Close() + b, _ := ioutil.ReadAll(resp.Body) + body = string(b) + } logger.Error("sending function invocation request failed", zap.Error(err), zap.String("http_endpoint", data.HTTPEndpoint), - zap.String("source", data.SourceName)) + zap.String("source", data.SourceName), + zap.String("response", body)) continue } if resp == nil { From beb4ea007fda22fbc9fa708c269b33c4602f8c47 Mon Sep 17 00:00:00 2001 From: Richard Noble Date: Tue, 1 Dec 2020 13:48:02 +0000 Subject: [PATCH 2/7] Adds support for Kafka keys, and handles nil values and keys as expected by Kafka --- kafka-http-connector/main.go | 58 +++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/kafka-http-connector/main.go b/kafka-http-connector/main.go index d0b64e5d..f83d1ff5 100644 --- a/kafka-http-connector/main.go +++ b/kafka-http-connector/main.go @@ -182,6 +182,28 @@ func getConfig(metadata kafkaMetadata) (*sarama.Config, error) { return config, nil } +func extractControlHeaders(headers []sarama.RecordHeader) ([]byte, bool, []sarama.RecordHeader) { + var ( + key []byte + ) + tombstone := false + var cleaned []sarama.RecordHeader + for _, header := range headers { + if strings.ToLower(string(header.Key)) == "keda-message-key" { + key = header.Value + continue + } + + if strings.ToLower(string(header.Key)) == "keda-message-tombstone" { + tombstone = true + continue + } + cleaned = append(cleaned, header) + } + + return key, tombstone, cleaned +} + // kafkaConnector represents a Sarama consumer group consumer type kafkaConnector struct { ready chan bool @@ -209,7 +231,7 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 for message := range claim.Messages() { - conn.logger.Info(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)) + conn.logger.Info(fmt.Sprintf("Message claimed: key = %s, value = %s, timestamp = %v, topic = %s", string(message.Key), string(message.Value), message.Timestamp, message.Topic)) msg := string(message.Value) headers := http.Header{ @@ -220,6 +242,17 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl "KEDA-Source-Name": {conn.connectorData.SourceName}, } + // Add the message key, if it's been set. + if message.Key != nil { + headers.Add("KEDA-Message-Key", string(message.Key)) + } + + // Indicate that this is a tombstone, not a empty message. + // Normally indicative of a deletion request + if message.Value == nil { + headers.Add("KEDA-Message-Tombstone", "true") + } + // Set the headers came from Kafka record for _, h := range message.Headers { headers.Add(string(h.Key), string(h.Value)) @@ -275,12 +308,29 @@ func (conn *kafkaConnector) errorHandler(err error) { } func (conn *kafkaConnector) responseHandler(msg string, headers []sarama.RecordHeader) bool { + + // extract the key and tombstone should they exist. + key, tombstone, headers := extractControlHeaders(headers) + if len(conn.connectorData.ResponseTopic) > 0 { - _, _, err := conn.producer.SendMessage(&sarama.ProducerMessage{ + message := &sarama.ProducerMessage{ Topic: conn.connectorData.ResponseTopic, - Value: sarama.StringEncoder(msg), Headers: headers, - }) + } + + if key != nil { + message.Key = sarama.StringEncoder(key) + } + + if len(msg) > 0 || !tombstone { + message.Value = sarama.StringEncoder(msg) + } + + if tombstone { + conn.logger.Warn("Sending a Tombstone") + } + + _, _, err := conn.producer.SendMessage(message) if err != nil { conn.logger.Warn("failed to publish response body from http request to topic", zap.Error(err), From 4f5e8f0ca3d5fddd6c53ff030f04edca13cae307 Mon Sep 17 00:00:00 2001 From: Richard Noble Date: Tue, 1 Dec 2020 14:01:46 +0000 Subject: [PATCH 3/7] Adds support for Kafka Keys, and handles nil for both keys and values as kafka would expect --- kafka-http-connector/main.go | 58 +++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/kafka-http-connector/main.go b/kafka-http-connector/main.go index d0b64e5d..f83d1ff5 100644 --- a/kafka-http-connector/main.go +++ b/kafka-http-connector/main.go @@ -182,6 +182,28 @@ func getConfig(metadata kafkaMetadata) (*sarama.Config, error) { return config, nil } +func extractControlHeaders(headers []sarama.RecordHeader) ([]byte, bool, []sarama.RecordHeader) { + var ( + key []byte + ) + tombstone := false + var cleaned []sarama.RecordHeader + for _, header := range headers { + if strings.ToLower(string(header.Key)) == "keda-message-key" { + key = header.Value + continue + } + + if strings.ToLower(string(header.Key)) == "keda-message-tombstone" { + tombstone = true + continue + } + cleaned = append(cleaned, header) + } + + return key, tombstone, cleaned +} + // kafkaConnector represents a Sarama consumer group consumer type kafkaConnector struct { ready chan bool @@ -209,7 +231,7 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 for message := range claim.Messages() { - conn.logger.Info(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)) + conn.logger.Info(fmt.Sprintf("Message claimed: key = %s, value = %s, timestamp = %v, topic = %s", string(message.Key), string(message.Value), message.Timestamp, message.Topic)) msg := string(message.Value) headers := http.Header{ @@ -220,6 +242,17 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl "KEDA-Source-Name": {conn.connectorData.SourceName}, } + // Add the message key, if it's been set. + if message.Key != nil { + headers.Add("KEDA-Message-Key", string(message.Key)) + } + + // Indicate that this is a tombstone, not a empty message. + // Normally indicative of a deletion request + if message.Value == nil { + headers.Add("KEDA-Message-Tombstone", "true") + } + // Set the headers came from Kafka record for _, h := range message.Headers { headers.Add(string(h.Key), string(h.Value)) @@ -275,12 +308,29 @@ func (conn *kafkaConnector) errorHandler(err error) { } func (conn *kafkaConnector) responseHandler(msg string, headers []sarama.RecordHeader) bool { + + // extract the key and tombstone should they exist. + key, tombstone, headers := extractControlHeaders(headers) + if len(conn.connectorData.ResponseTopic) > 0 { - _, _, err := conn.producer.SendMessage(&sarama.ProducerMessage{ + message := &sarama.ProducerMessage{ Topic: conn.connectorData.ResponseTopic, - Value: sarama.StringEncoder(msg), Headers: headers, - }) + } + + if key != nil { + message.Key = sarama.StringEncoder(key) + } + + if len(msg) > 0 || !tombstone { + message.Value = sarama.StringEncoder(msg) + } + + if tombstone { + conn.logger.Warn("Sending a Tombstone") + } + + _, _, err := conn.producer.SendMessage(message) if err != nil { conn.logger.Warn("failed to publish response body from http request to topic", zap.Error(err), From 6efff8e7918a8a061535498860797cdb6f024c83 Mon Sep 17 00:00:00 2001 From: Richard Noble Date: Wed, 2 Dec 2020 09:52:11 +0000 Subject: [PATCH 4/7] Change Github Actions to use Keda 1.5.0 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b4f7511f..62580479 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,7 +39,7 @@ jobs: helm repo add kedacore https://kedacore.github.io/charts helm repo update kubectl create namespace keda - helm install keda kedacore/keda --namespace keda + helm install keda kedacore/keda --namespace keda --version 1.5.0 - name: Create Docker Image for HTTP server run: | cd test/server/ From f7d19633691f34a0d0e8d1e4aae883aa4db19dfd Mon Sep 17 00:00:00 2001 From: Richard Noble Date: Fri, 11 Dec 2020 11:16:00 +0000 Subject: [PATCH 5/7] Kafka keys are now supported in error topic as well as in response topic. This also allows a function to return headers to provide more information about an error, and these will be included in the Kafka message headers --- common/util.go | 2 +- kafka-http-connector/main.go | 44 ++++++++++++++++++++++-------------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/common/util.go b/common/util.go index 4bf47e8c..93689e42 100644 --- a/common/util.go +++ b/common/util.go @@ -100,7 +100,7 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada } if resp.StatusCode < 200 && resp.StatusCode > 300 { - return nil, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) + return resp, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) } return resp, nil } diff --git a/kafka-http-connector/main.go b/kafka-http-connector/main.go index f83d1ff5..e197ccc5 100644 --- a/kafka-http-connector/main.go +++ b/kafka-http-connector/main.go @@ -260,22 +260,15 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl resp, err := common.HandleHTTPRequest(msg, headers, conn.connectorData, conn.logger) if err != nil { - conn.errorHandler(err) + conn.errorHandler(resp, err) } else { defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - conn.errorHandler(err) + conn.errorHandler(nil, err) } else { // Generate Kafka record headers - var kafkaRecordHeaders []sarama.RecordHeader - - for k, v := range resp.Header { - // One key may have multiple values - for _, v := range v { - kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)}) - } - } + kafkaRecordHeaders := mapHeaders(resp) if success := conn.responseHandler(string(body), kafkaRecordHeaders); success { session.MarkMessage(message, "") } @@ -285,12 +278,33 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl return nil } -func (conn *kafkaConnector) errorHandler(err error) { +func mapHeaders(resp *http.Response) []sarama.RecordHeader { + var kafkaRecordHeaders []sarama.RecordHeader + + for k, v := range resp.Header { + // One key may have multiple values + for _, v := range v { + kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)}) + } + } + return kafkaRecordHeaders +} + +func (conn *kafkaConnector) errorHandler(resp *http.Response, err error) { if len(conn.connectorData.ErrorTopic) > 0 { - _, _, e := conn.producer.SendMessage(&sarama.ProducerMessage{ + message := &sarama.ProducerMessage{ Topic: conn.connectorData.ErrorTopic, Value: sarama.StringEncoder(err.Error()), - }) + } + if resp != nil { + kafkaRecordHeaders := mapHeaders(resp) + key, _, headers := extractControlHeaders(kafkaRecordHeaders) + message.Headers = headers + if key != nil { + message.Key = sarama.StringEncoder(key) + } + } + _, _, e := conn.producer.SendMessage(message) if e != nil { conn.logger.Error("failed to publish message to error topic", zap.Error(e), @@ -326,10 +340,6 @@ func (conn *kafkaConnector) responseHandler(msg string, headers []sarama.RecordH message.Value = sarama.StringEncoder(msg) } - if tombstone { - conn.logger.Warn("Sending a Tombstone") - } - _, _, err := conn.producer.SendMessage(message) if err != nil { conn.logger.Warn("failed to publish response body from http request to topic", From 5bd3e3b410525bd32285b49004d91be7c6a41db7 Mon Sep 17 00:00:00 2001 From: Richard Noble Date: Fri, 11 Dec 2020 12:32:32 +0000 Subject: [PATCH 6/7] Revert adding body to error log. This is a decision outside of this change --- common/util.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/common/util.go b/common/util.go index 93689e42..f2df3c6f 100644 --- a/common/util.go +++ b/common/util.go @@ -2,7 +2,6 @@ package common import ( "fmt" - "io/ioutil" "net/http" "os" "strconv" @@ -73,17 +72,10 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada // Make the request resp, err = http.DefaultClient.Do(req) if err != nil { - body := "" - if resp != nil { - defer resp.Body.Close() - b, _ := ioutil.ReadAll(resp.Body) - body = string(b) - } logger.Error("sending function invocation request failed", zap.Error(err), zap.String("http_endpoint", data.HTTPEndpoint), - zap.String("source", data.SourceName), - zap.String("response", body)) + zap.String("source", data.SourceName)) continue } if resp == nil { From aa19e4bf2000ab9cac2e310b5457d86d957d4c3a Mon Sep 17 00:00:00 2001 From: Nikhil Sharma Date: Wed, 12 Apr 2023 14:55:35 +0530 Subject: [PATCH 7/7] Lint fix --- kafka-http-connector/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-http-connector/main.go b/kafka-http-connector/main.go index 60924e4a..e7dd839d 100644 --- a/kafka-http-connector/main.go +++ b/kafka-http-connector/main.go @@ -184,7 +184,7 @@ func extractControlHeaders(headers []sarama.RecordHeader) ([]byte, bool, []saram key []byte ) tombstone := false - var cleaned []sarama.RecordHeader + cleaned := make([]sarama.RecordHeader, 0, len(headers)) for _, header := range headers { if strings.ToLower(string(header.Key)) == "keda-message-key" { key = header.Value