Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Add CRD-level validation markers (Minimum, MinLength, MinItems, Enum) for ScaledObject, ScaledJob, ScaleTriggers, and TriggerAuthentication API types ([#7533](https://github.com/kedacore/keda/pull/7533))
- **General**: Add `--leader-election-id` flag to allow configuring the leader election Lease name ([#7564](https://github.com/kedacore/keda/issues/7564))
- **General**: Make APIService cert injections optional ([#7559](https://github.com/kedacore/keda/pull/7559))
- **Azure Pipelines Scaler**: Add service principal authentication support with client secret and client certificate ([#4853](https://github.com/kedacore/keda/issues/4853))
- **Elasticsearch Scaler**: Add HTTP status check for Elasticsearch errors ([#7480](https://github.com/kedacore/keda/pull/7480))

### Fixes
Expand Down
161 changes: 116 additions & 45 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,21 @@ type azurePipelinesPoolIDResponse struct {
}

type azurePipelinesScaler struct {
metricType v2.MetricTargetType
metadata *azurePipelinesMetadata
httpClient *http.Client
podIdentity kedav1alpha1.AuthPodIdentity
logger logr.Logger
metricType v2.MetricTargetType
metadata *azurePipelinesMetadata
httpClient *http.Client
logger logr.Logger
}

type azurePipelinesMetadata struct {
OrganizationURL string `keda:"name=organizationURL, order=resolvedEnv;authParams;triggerMetadata"`
OrganizationName string
PersonalAccessToken string `keda:"name=personalAccessToken, order=authParams;resolvedEnv, optional"`
TenantID string `keda:"name=tenantId, order=authParams;triggerMetadata;resolvedEnv, optional"`
ClientID string `keda:"name=clientId, order=authParams;triggerMetadata;resolvedEnv, optional"`
ClientSecret string `keda:"name=clientSecret, order=authParams;resolvedEnv, optional"`
ClientCertificate string `keda:"name=clientCertificate, order=authParams;resolvedEnv, optional"`
ClientCertificatePassword string `keda:"name=clientCertificatePassword, order=authParams;resolvedEnv, optional"`
authContext authContext
Parent string `keda:"name=parent, order=triggerMetadata, optional"`
Demands string `keda:"name=demands, order=triggerMetadata, optional"`
Expand All @@ -153,9 +157,30 @@ type azurePipelinesMetadata struct {
}

type authContext struct {
cred *azidentity.ChainedTokenCredential
pat string
token *azcore.AccessToken
cred azcore.TokenCredential
pat string
token *azcore.AccessToken
authType azurePipelinesAuthType
}

type azurePipelinesAuthType string

const (
azurePipelinesAuthTypePAT azurePipelinesAuthType = "pat"
azurePipelinesAuthTypeServicePrincipal azurePipelinesAuthType = "service-principal"
azurePipelinesAuthTypeWorkloadIdentity azurePipelinesAuthType = "workload-identity"
)

var newAzurePipelinesClientSecretCredential = func(tenantID, clientID, clientSecret string) (azcore.TokenCredential, error) {
return azidentity.NewClientSecretCredential(tenantID, clientID, clientSecret, nil)
}

var newAzurePipelinesClientCertificateCredential = func(tenantID, clientID, clientCertificate, clientCertificatePassword string) (azcore.TokenCredential, error) {
certs, key, err := azidentity.ParseCertificates([]byte(clientCertificate), []byte(clientCertificatePassword))
if err != nil {
return nil, err
}
return azidentity.NewClientCertificateCredential(tenantID, clientID, certs, key, nil)
}

func (a *azurePipelinesMetadata) Validate() error {
Expand All @@ -177,84 +202,124 @@ func NewAzurePipelinesScaler(ctx context.Context, config *scalersconfig.ScalerCo
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

meta, podIdentity, err := parseAzurePipelinesMetadata(ctx, logger, config, httpClient)
meta, err := parseAzurePipelinesMetadata(ctx, logger, config, httpClient)
if err != nil {
return nil, fmt.Errorf("error parsing azure Pipelines metadata: %w", err)
}

return &azurePipelinesScaler{
metricType: metricType,
metadata: meta,
httpClient: httpClient,
podIdentity: podIdentity,
logger: logger,
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
}, nil
}

func getAuthMethod(logger logr.Logger, config *scalersconfig.ScalerConfig, meta *azurePipelinesMetadata) (*azidentity.ChainedTokenCredential, kedav1alpha1.AuthPodIdentity, error) {
func getAuthMethod(logger logr.Logger, config *scalersconfig.ScalerConfig, meta *azurePipelinesMetadata) (azcore.TokenCredential, error) {
if meta.PersonalAccessToken != "" {
meta.authContext.pat = strings.TrimSuffix(meta.PersonalAccessToken, "\n")
return nil, kedav1alpha1.AuthPodIdentity{}, nil
meta.authContext.authType = azurePipelinesAuthTypePAT
return nil, nil
}

if meta.TenantID != "" || meta.ClientID != "" || meta.ClientSecret != "" || meta.ClientCertificate != "" || meta.ClientCertificatePassword != "" {
missing := make([]string, 0, 3)
if meta.TenantID == "" {
missing = append(missing, "tenantId")
}
if meta.ClientID == "" {
missing = append(missing, "clientId")
}
usingClientSecret := meta.ClientSecret != ""
usingClientCertificate := meta.ClientCertificate != ""
if usingClientSecret && usingClientCertificate {
return nil, fmt.Errorf("clientSecret and clientCertificate are mutually exclusive")
}
if !usingClientSecret && !usingClientCertificate {
missing = append(missing, "one of clientSecret or clientCertificate")
}
if meta.ClientCertificatePassword != "" && !usingClientCertificate {
return nil, fmt.Errorf("clientCertificatePassword requires clientCertificate")
}
if len(missing) != 0 {
return nil, fmt.Errorf("incomplete service principal configuration, missing %s", strings.Join(missing, ", "))
}

var (
cred azcore.TokenCredential
err error
)
if usingClientCertificate {
cred, err = newAzurePipelinesClientCertificateCredential(meta.TenantID, meta.ClientID, meta.ClientCertificate, meta.ClientCertificatePassword)
} else {
cred, err = newAzurePipelinesClientSecretCredential(meta.TenantID, meta.ClientID, meta.ClientSecret)
}
if err != nil {
return nil, err
}
meta.authContext.authType = azurePipelinesAuthTypeServicePrincipal
return cred, nil
Copy link
Copy Markdown
Member

@JorTurFer JorTurFer Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, move this code inside podIdentity none, as pod identity takes precedence over PAT

}

switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no personalAccessToken given or PodIdentity provider configured")
return nil, fmt.Errorf("no personalAccessToken, service principal credentials, or PodIdentity provider configured")
case kedav1alpha1.PodIdentityProviderAzureWorkload:
cred, err := azure.NewChainedCredential(logger, config.PodIdentity)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
return nil, err
}
return cred, kedav1alpha1.AuthPodIdentity{Provider: config.PodIdentity.Provider}, nil
meta.authContext.authType = azurePipelinesAuthTypeWorkloadIdentity
return cred, nil
default:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure pipelines", config.PodIdentity.Provider)
return nil, fmt.Errorf("pod identity %s not supported for azure pipelines", config.PodIdentity.Provider)
}
}

func parseAzurePipelinesMetadata(ctx context.Context, logger logr.Logger, config *scalersconfig.ScalerConfig, httpClient *http.Client) (*azurePipelinesMetadata, kedav1alpha1.AuthPodIdentity, error) {
func parseAzurePipelinesMetadata(ctx context.Context, logger logr.Logger, config *scalersconfig.ScalerConfig, httpClient *http.Client) (*azurePipelinesMetadata, error) {
if config.TriggerMetadata["jobsToFetch"] != "" && config.TriggerMetadata["fetchUnfinishedJobsOnly"] != "" {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("cannot specify both jobsToFetch and fetchUnfinishedJobsOnly at the same time")
return nil, fmt.Errorf("cannot specify both jobsToFetch and fetchUnfinishedJobsOnly at the same time")
}
if config.TriggerMetadata["jobsToFetch"] != "" && config.TriggerMetadata["parent"] != "" {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("cannot specify both jobsToFetch and parent at the same time")
return nil, fmt.Errorf("cannot specify both jobsToFetch and parent at the same time")
}

meta := &azurePipelinesMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure pipeline metadata: %w", err)
return nil, fmt.Errorf("error parsing azure pipeline metadata: %w", err)
}

meta.triggerIndex = config.TriggerIndex

cred, podIdentity, err := getAuthMethod(logger, config, meta)
cred, err := getAuthMethod(logger, config, meta)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
return nil, err
}

meta.authContext.cred = cred
meta.authContext.token = nil

if meta.PoolName != "" {
poolID, err := getPoolIDFromName(ctx, logger, meta.PoolName, meta, podIdentity, httpClient)
poolID, err := getPoolIDFromName(ctx, logger, meta.PoolName, meta, httpClient)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
return nil, err
}
meta.PoolID = poolID
} else if meta.PoolID != 0 {
_, err := validatePoolID(ctx, logger, meta.PoolID, meta, podIdentity, httpClient)
_, err := validatePoolID(ctx, logger, meta.PoolID, meta, httpClient)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
return nil, err
}
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no poolName or poolID given")
return nil, fmt.Errorf("no poolName or poolID given")
}

return meta, podIdentity, nil
return meta, nil
}

func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) (int, error) {
func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) {
urlString := fmt.Sprintf("%s/_apis/distributedtask/pools?poolName=%s", metadata.OrganizationURL, url.QueryEscape(poolName))
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, podIdentity, httpClient)
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, httpClient)

if err != nil {
return -1, err
Expand All @@ -278,9 +343,9 @@ func getPoolIDFromName(ctx context.Context, logger logr.Logger, poolName string,
return result.Value[0].ID, nil
}

func validatePoolID(ctx context.Context, logger logr.Logger, poolID int, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) (int, error) {
func validatePoolID(ctx context.Context, logger logr.Logger, poolID int, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) {
urlString := fmt.Sprintf("%s/_apis/distributedtask/pools?poolID=%d", metadata.OrganizationURL, poolID)
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, podIdentity, httpClient)
body, err := getAzurePipelineRequest(ctx, logger, urlString, metadata, httpClient)

if err != nil {
return -1, fmt.Errorf("agent pool with id `%d` not found: %w", poolID, err)
Expand Down Expand Up @@ -317,26 +382,32 @@ func getToken(ctx context.Context, metadata *azurePipelinesMetadata, scope strin
return metadata.authContext.token.Token, nil
}

func getAzurePipelineRequest(ctx context.Context, logger logr.Logger, urlString string, metadata *azurePipelinesMetadata, podIdentity kedav1alpha1.AuthPodIdentity, httpClient *http.Client) ([]byte, error) {
func getAzurePipelineRequest(ctx context.Context, logger logr.Logger, urlString string, metadata *azurePipelinesMetadata, httpClient *http.Client) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", urlString, nil)
if err != nil {
return []byte{}, err
}

switch podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
//PAT
authType := metadata.authContext.authType

switch authType {
case azurePipelinesAuthTypePAT:
logger.V(1).Info("making request to ADO REST API using PAT")
req.SetBasicAuth("", metadata.authContext.pat)
case kedav1alpha1.PodIdentityProviderAzureWorkload:
//ADO Resource token
logger.V(1).Info("making request to ADO REST API using managed identity")
case azurePipelinesAuthTypeServicePrincipal, azurePipelinesAuthTypeWorkloadIdentity:
authMethod := "service principal"
if authType == azurePipelinesAuthTypeWorkloadIdentity {
authMethod = "workload identity"
}
logger.V(1).Info("making request to ADO REST API using bearer token", "authMethod", authMethod)
aadToken, err := getToken(ctx, metadata, devopsResource)
if err != nil {
return []byte{}, fmt.Errorf("cannot create workload identity credentials: %w", err)
return []byte{}, fmt.Errorf("cannot acquire %s token: %w", authMethod, err)
}
logger.V(1).Info("token acquired setting auth header as 'bearer XXXXXX'")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", aadToken))
default:
return []byte{}, fmt.Errorf("no authentication method configured for azure pipelines")
}

r, err := httpClient.Do(req)
Expand Down Expand Up @@ -390,7 +461,7 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
return -1, err
}

body, err := getAzurePipelineRequest(ctx, s.logger, urlString, s.metadata, s.podIdentity, s.httpClient)
body, err := getAzurePipelineRequest(ctx, s.logger, urlString, s.metadata, s.httpClient)
if err != nil {
return -1, err
}
Expand Down
Loading
Loading