Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Add CRD-level validation markers (Minimum, MinLength, MinItems, Enum) for ScaledObject, ScaledJob, ScaleTriggers, and TriggerAuthentication API types ([#7533](https://github.com/kedacore/keda/pull/7533))
- **General**: Add `--leader-election-id` flag to allow configuring the leader election Lease name ([#7564](https://github.com/kedacore/keda/issues/7564))
- **General**: Make APIService cert injections optional ([#7559](https://github.com/kedacore/keda/pull/7559))
- **Azure Pipelines Scaler**: Add service principal authentication support with client secret and client certificate ([#4853](https://github.com/kedacore/keda/issues/4853))
- **Elasticsearch Scaler**: Add HTTP status check for Elasticsearch errors ([#7480](https://github.com/kedacore/keda/pull/7480))

### Fixes
Expand Down
113 changes: 93 additions & 20 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ type azurePipelinesMetadata struct {
OrganizationURL string `keda:"name=organizationURL, order=resolvedEnv;authParams;triggerMetadata"`
OrganizationName string
PersonalAccessToken string `keda:"name=personalAccessToken, order=authParams;resolvedEnv, optional"`
TenantID string `keda:"name=tenantId, order=authParams;triggerMetadata;resolvedEnv, optional"`
ClientID string `keda:"name=clientId, order=authParams;triggerMetadata;resolvedEnv, optional"`
ClientSecret string `keda:"name=clientSecret, order=authParams;resolvedEnv, optional"`
ClientCertificate string `keda:"name=clientCertificate, order=authParams;resolvedEnv, optional"`
ClientCertificatePassword string `keda:"name=clientCertificatePassword, order=authParams;resolvedEnv, optional"`
authContext authContext
Parent string `keda:"name=parent, order=triggerMetadata, optional"`
Demands string `keda:"name=demands, order=triggerMetadata, optional"`
Expand All @@ -153,9 +158,30 @@ type azurePipelinesMetadata struct {
}

type authContext struct {
cred *azidentity.ChainedTokenCredential
pat string
token *azcore.AccessToken
cred azcore.TokenCredential
pat string
token *azcore.AccessToken
authType azurePipelinesAuthType
}

type azurePipelinesAuthType string

const (
azurePipelinesAuthTypePAT azurePipelinesAuthType = "pat"
azurePipelinesAuthTypeServicePrincipal azurePipelinesAuthType = "service-principal"
azurePipelinesAuthTypeWorkloadIdentity azurePipelinesAuthType = "workload-identity"
)

var newAzurePipelinesClientSecretCredential = func(tenantID, clientID, clientSecret string) (azcore.TokenCredential, error) {
return azidentity.NewClientSecretCredential(tenantID, clientID, clientSecret, nil)
}

var newAzurePipelinesClientCertificateCredential = func(tenantID, clientID, clientCertificate, clientCertificatePassword string) (azcore.TokenCredential, error) {
certs, key, err := azidentity.ParseCertificates([]byte(clientCertificate), []byte(clientCertificatePassword))
if err != nil {
return nil, err
}
return azidentity.NewClientCertificateCredential(tenantID, clientID, certs, key, nil)
}

func (a *azurePipelinesMetadata) Validate() error {
Expand Down Expand Up @@ -191,20 +217,61 @@ func NewAzurePipelinesScaler(ctx context.Context, config *scalersconfig.ScalerCo
}, nil
}

func getAuthMethod(logger logr.Logger, config *scalersconfig.ScalerConfig, meta *azurePipelinesMetadata) (*azidentity.ChainedTokenCredential, kedav1alpha1.AuthPodIdentity, error) {
func getAuthMethod(logger logr.Logger, config *scalersconfig.ScalerConfig, meta *azurePipelinesMetadata) (azcore.TokenCredential, kedav1alpha1.AuthPodIdentity, error) {
if meta.PersonalAccessToken != "" {
meta.authContext.pat = strings.TrimSuffix(meta.PersonalAccessToken, "\n")
meta.authContext.authType = azurePipelinesAuthTypePAT
return nil, kedav1alpha1.AuthPodIdentity{}, nil
}

if meta.TenantID != "" || meta.ClientID != "" || meta.ClientSecret != "" || meta.ClientCertificate != "" || meta.ClientCertificatePassword != "" {
missing := make([]string, 0, 3)
if meta.TenantID == "" {
missing = append(missing, "tenantId")
}
if meta.ClientID == "" {
missing = append(missing, "clientId")
}
usingClientSecret := meta.ClientSecret != ""
usingClientCertificate := meta.ClientCertificate != ""
if usingClientSecret && usingClientCertificate {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("clientSecret and clientCertificate are mutually exclusive")
}
if !usingClientSecret && !usingClientCertificate {
missing = append(missing, "one of clientSecret or clientCertificate")
}
if meta.ClientCertificatePassword != "" && !usingClientCertificate {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("clientCertificatePassword requires clientCertificate")
}
if len(missing) != 0 {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("incomplete service principal configuration, missing %s", strings.Join(missing, ", "))
}

var (
cred azcore.TokenCredential
err error
)
if usingClientCertificate {
cred, err = newAzurePipelinesClientCertificateCredential(meta.TenantID, meta.ClientID, meta.ClientCertificate, meta.ClientCertificatePassword)
} else {
cred, err = newAzurePipelinesClientSecretCredential(meta.TenantID, meta.ClientID, meta.ClientSecret)
}
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}
meta.authContext.authType = azurePipelinesAuthTypeServicePrincipal
return cred, kedav1alpha1.AuthPodIdentity{}, nil
}

switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no personalAccessToken given or PodIdentity provider configured")
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no personalAccessToken, service principal credentials, or PodIdentity provider configured")
case kedav1alpha1.PodIdentityProviderAzureWorkload:
cred, err := azure.NewChainedCredential(logger, config.PodIdentity)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}
meta.authContext.authType = azurePipelinesAuthTypeWorkloadIdentity
return cred, kedav1alpha1.AuthPodIdentity{Provider: config.PodIdentity.Provider}, nil
default:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure pipelines", config.PodIdentity.Provider)
Expand Down Expand Up @@ -235,13 +302,13 @@ func parseAzurePipelinesMetadata(ctx context.Context, logger logr.Logger, config
meta.authContext.token = nil

if meta.PoolName != "" {
poolID, err := getPoolIDFromName(ctx, logger, meta.PoolName, meta, podIdentity, httpClient)
poolID, err := getPoolIDFromName(ctx, logger, meta.PoolName, meta, httpClient)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}
meta.PoolID = poolID
} else if meta.PoolID != 0 {
_, err := validatePoolID(ctx, logger, meta.PoolID, meta, podIdentity, httpClient)
_, err := validatePoolID(ctx, logger, meta.PoolID, meta, httpClient)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}
Expand All @@ -252,9 +319,9 @@ func parseAzurePipelinesMetadata(ctx context.Context, logger logr.Logger, config
return meta, podIdentity, nil
}

func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) (int, error) {
func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) {
urlString := fmt.Sprintf("%s/_apis/distributedtask/pools?poolName=%s", metadata.OrganizationURL, url.QueryEscape(poolName))
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, podIdentity, httpClient)
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, httpClient)

if err != nil {
return -1, err
Expand All @@ -278,9 +345,9 @@ func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string,
return result.Value[0].ID, nil
}

func validatePoolID(ctx context.Context, logger logr.Logger, poolID int, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) (int, error) {
func validatePoolID(ctx context.Context, logger logr.Logger, poolID int, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) {
urlString := fmt.Sprintf("%s/_apis/distributedtask/pools?poolID=%d", metadata.OrganizationURL, poolID)
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, podIdentity, httpClient)
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, httpClient)

if err != nil {
return -1, fmt.Errorf("agent pool with id `%d` not found: %w", poolID, err)
Expand Down Expand Up @@ -317,26 +384,32 @@ func getToken(ctx context.Context, metadata *azurePipelinesMetadata, scope strin
return metadata.authContext.token.Token, nil
}

func getAzurePipelineRequest(ctx context.Context, logger logr.Logger, urlString string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) ([]byte, error) {
func getAzurePipelineRequest(ctx context.Context, logger logr.Logger, urlString string, metadata *azurePipelinesMetadata, httpClient *http.Client) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", urlString, nil)
if err != nil {
return []byte{}, err
}

switch podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
//PAT
authType := metadata.authContext.authType

switch authType {
case azurePipelinesAuthTypePAT:
logger.V(1).Info("making request to ADO REST API using PAT")
req.SetBasicAuth("", metadata.authContext.pat)
case kedav1alpha1.PodIdentityProviderAzureWorkload:
//ADO Resource token
logger.V(1).Info("making request to ADO REST API using managed identity")
case azurePipelinesAuthTypeServicePrincipal, azurePipelinesAuthTypeWorkloadIdentity:
authMethod := "service principal"
if authType == azurePipelinesAuthTypeWorkloadIdentity {
authMethod = "workload identity"
}
logger.V(1).Info("making request to ADO REST API using bearer token", "authMethod", authMethod)
aadToken, err := getToken(ctx, metadata, devopsResource)
if err != nil {
return []byte{}, fmt.Errorf("cannot create workload identity credentials: %w", err)
return []byte{}, fmt.Errorf("cannot create %s credentials: %w", authMethod, err)
Comment thread
rickbrouwer marked this conversation as resolved.
Outdated
}
logger.V(1).Info("token acquired setting auth header as 'bearer XXXXXX'")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", aadToken))
default:
return []byte{}, fmt.Errorf("no authentication method configured for azure pipelines")
}

r, err := httpClient.Do(req)
Expand Down Expand Up @@ -390,7 +463,7 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
return -1, err
}

body, err := getAzurePipelineRequest(ctx, s.logger, urlString, s.metadata, s.podIdentity, s.httpClient)
body, err := getAzurePipelineRequest(ctx, s.logger, urlString, s.metadata, s.httpClient)
if err != nil {
return -1, err
}
Expand Down
90 changes: 83 additions & 7 deletions pkg/scalers/azure_pipelines_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/go-logr/logr"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -37,10 +39,20 @@ var testAzurePipelinesMetadata = []parseAzurePipelinesMetadataTestData{
{"using triggerAuthentication", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "personalAccessToken": "sample"}},
// using triggerAuthentication with personalAccessToken terminating in newline
{"using triggerAuthentication with personalAccessToken terminating in newline", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "personalAccessToken": "sample\n"}},
// using service principal authentication
{"using service principal authentication", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "tenantId": "tenant", "clientId": "client", "clientSecret": "secret"}},
// using service principal certificate authentication
{"using service principal certificate authentication", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "tenantId": "tenant", "clientId": "client", "clientCertificate": "cert-data", "clientCertificatePassword": "cert-password"}},
// missing organizationURL
{"missing organizationURL", map[string]string{"organizationURLFromEnv": "", "personalAccessTokenFromEnv": "sample", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// missing personalAccessToken
{"missing personalAccessToken", map[string]string{"organizationURLFromEnv": "AZP_URL", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// missing all auth methods
{"missing all auth methods", map[string]string{"organizationURLFromEnv": "AZP_URL", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// incomplete service principal
{"incomplete service principal", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "tenantId": "tenant", "clientId": "client"}},
// mutually exclusive secret and certificate
{"mutually exclusive service principal auth", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "tenantId": "tenant", "clientId": "client", "clientSecret": "secret", "clientCertificate": "cert-data"}},
// certificate password without certificate
{"certificate password without certificate", map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "tenantId": "tenant", "clientId": "client", "clientCertificatePassword": "cert-password"}},
// missing poolID
{"missing poolID", map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// activationTargetPipelinesQueueLength malformed
Expand All @@ -64,13 +76,25 @@ func TestParseAzurePipelinesMetadata(t *testing.T) {
for _, testData := range testAzurePipelinesMetadata {
t.Run(testData.testName, func(t *testing.T) {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
personalAccessToken := strings.Split(r.Header["Authorization"][0], " ")[1]
if personalAccessToken != "" && personalAccessToken[len(personalAccessToken)-1:] == "\n" {
authHeader := r.Header.Get("Authorization")
switch {
case strings.HasPrefix(authHeader, "Basic "):
personalAccessToken := strings.Split(authHeader, " ")[1]
if personalAccessToken != "" && personalAccessToken[len(personalAccessToken)-1:] == "\n" {
w.WriteHeader(http.StatusUnauthorized)
return
}
case strings.HasPrefix(authHeader, "Bearer "):
if authHeader != "Bearer fake-token" {
w.WriteHeader(http.StatusUnauthorized)
return
}
default:
w.WriteHeader(http.StatusUnauthorized)
} else {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"count":1,"value":[{"id":1}]}`))
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"count":1,"value":[{"id":1}]}`))
Comment thread
m-amaresh marked this conversation as resolved.
}))

// set urls into local stub only if they are already defined
Expand All @@ -82,6 +106,24 @@ func TestParseAzurePipelinesMetadata(t *testing.T) {
}

logger := logr.Discard()
if testData.authParams["clientSecret"] != "" {
origNewClientSecretCredential := newAzurePipelinesClientSecretCredential
newAzurePipelinesClientSecretCredential = func(_, _, _ string) (azcore.TokenCredential, error) {
return fakeTokenCredential{token: "fake-token"}, nil
}
defer func() {
newAzurePipelinesClientSecretCredential = origNewClientSecretCredential
}()
}
if testData.authParams["clientCertificate"] != "" {
origNewClientCertificateCredential := newAzurePipelinesClientCertificateCredential
newAzurePipelinesClientCertificateCredential = func(_, _, _, _ string) (azcore.TokenCredential, error) {
return fakeTokenCredential{token: "fake-token"}, nil
}
defer func() {
newAzurePipelinesClientCertificateCredential = origNewClientCertificateCredential
}()
}

_, _, err := parseAzurePipelinesMetadata(context.TODO(), logger, &scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams}, http.DefaultClient)
if err != nil && !testData.isError {
Expand All @@ -94,6 +136,14 @@ func TestParseAzurePipelinesMetadata(t *testing.T) {
}
}

type fakeTokenCredential struct {
token string
}

func (f fakeTokenCredential) GetToken(context.Context, policy.TokenRequestOptions) (azcore.AccessToken, error) {
return azcore.AccessToken{Token: f.token}, nil
}

type validateAzurePipelinesPoolTestData struct {
testName string
metadata map[string]string
Expand Down Expand Up @@ -201,6 +251,7 @@ func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
meta.OrganizationURL = url
meta.Parent = "dotnet60-keda-template"
meta.authContext.pat = "testPAT"
meta.authContext.authType = azurePipelinesAuthTypePAT
meta.PoolID = 1
meta.TargetPipelinesQueueLength = 1

Expand Down Expand Up @@ -505,6 +556,7 @@ func TestAzurePipelinesQueueURLTest(t *testing.T) {
meta.OrganizationName = "testOrg"
meta.OrganizationURL = apiStub.URL
meta.authContext.pat = "testPAT"
meta.authContext.authType = azurePipelinesAuthTypePAT
meta.PoolID = 1
meta.TargetPipelinesQueueLength = 1

Expand All @@ -526,3 +578,27 @@ func TestAzurePipelinesQueueURLTest(t *testing.T) {
})
}
}

func TestAzurePipelinesRequestUsesBearerTokenForServicePrincipal(t *testing.T) {
apiStub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got := r.Header.Get("Authorization"); got != "Bearer fake-token" {
t.Fatalf("expected bearer token auth, got %q", got)
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"count":1,"value":[{"id":1}]}`))
}))
defer apiStub.Close()

meta := &azurePipelinesMetadata{
OrganizationURL: apiStub.URL,
authContext: authContext{
cred: fakeTokenCredential{token: "fake-token"},
authType: azurePipelinesAuthTypeServicePrincipal,
},
}

_, err := getAzurePipelineRequest(context.Background(), logr.Discard(), apiStub.URL, meta, http.DefaultClient)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
Loading
Loading