diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/canary.go b/pkg/app/pipedv1/plugin/ecs/deployment/canary.go index 77b897e746..1ed45036c1 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/canary.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/canary.go @@ -71,6 +71,11 @@ func (p *ECSPlugin) executeECSCanaryRolloutStage( return sdk.StageStatusFailure } + if isECSControllerType(serviceDef) { + lp.Error("ECS_CANARY_ROLLOUT is not supported with ECS deployment controller type; canary deployments require EXTERNAL deployment controller type") + return sdk.StageStatusFailure + } + var canary *types.LoadBalancer if cfg.Spec.Input.AccessType == "ELB" { _, canary, err = provider.LoadTargetGroups(cfg.Spec.Input.TargetGroups) @@ -129,7 +134,7 @@ func canaryRollout( } lp.Info("Start applying the ECS service definition") - service, err := applyServiceDefinition(ctx, lp, client, serviceDef) + service, _, err := applyServiceDefinition(ctx, lp, client, serviceDef) if err != nil { return nil, fmt.Errorf("failed to apply service definition: %w", err) } diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/controller.go b/pkg/app/pipedv1/plugin/ecs/deployment/controller.go new file mode 100644 index 0000000000..c9c453ba14 --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/deployment/controller.go @@ -0,0 +1,318 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" +) + +// deploymentController is the strategy interface for deploying ECS services. +// +// Each ECS deployment controller type (EXTERNAL, ECS) has its own implementation. +type deploymentController interface { + // Sync performs a full sync of the ECS service (used by ECS_SYNC stage). + Sync(ctx context.Context, lp sdk.StageLogPersister, client provider.Client, + taskDef types.TaskDefinition, serviceDef types.Service, + primary *types.LoadBalancer, recreate bool) error + + // PrimaryRollout rolls out the new task definition as the primary (used by ECS_PRIMARY_ROLLOUT stage). + PrimaryRollout(ctx context.Context, lp sdk.StageLogPersister, client provider.Client, + taskDef types.TaskDefinition, serviceDef types.Service, + primary *types.LoadBalancer) error + + // Rollback restores the service to the state of the running deployment source (used by ECS_ROLLBACK stage). + Rollback(ctx context.Context, lp sdk.StageLogPersister, client provider.Client, + taskDef types.TaskDefinition, serviceDef types.Service, + primary *types.LoadBalancer) error +} + +// newDeploymentController returns the appropriate deploymentController based on the deployment controller type declared in the service definition. +// +// Defaults to externalController when the type is EXTERNAL or unset. +func newDeploymentController(serviceDef types.Service) deploymentController { + if serviceDef.DeploymentController != nil && + serviceDef.DeploymentController.Type == types.DeploymentControllerTypeEcs { + return &ecsController{} + } + return &externalController{} +} + +// isECSControllerType returns true when the service definition uses the native ECS deployment controller. +func isECSControllerType(serviceDef types.Service) bool { + return serviceDef.DeploymentController != nil && + serviceDef.DeploymentController.Type == types.DeploymentControllerTypeEcs +} + +// externalController implements deploymentController for EXTERNAL deployment controller type. +type externalController struct{} + +func (e *externalController) Sync( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, + primary *types.LoadBalancer, + recreate bool, +) error { + lp.Info("Start applying the ECS task definition") + td, err := applyTaskDefinition(ctx, client, taskDef) + if err != nil { + lp.Errorf("Failed to apply task definition: %v", err) + return fmt.Errorf("failed to apply task definition: %w", err) + } + + lp.Info("Start applying the ECS service definition") + service, _, err := applyServiceDefinition(ctx, lp, client, serviceDef) + if err != nil { + lp.Errorf("Failed to apply service definition: %v", err) + return fmt.Errorf("failed to apply service definition: %w", err) + } + + if recreate { + cnt := service.DesiredCount + lp.Info("Recreate option is enabled, stop all running tasks before creating new task set") + if err := client.PruneServiceTasks(ctx, *service); err != nil { + lp.Errorf("Failed to prune service tasks: %v", err) + return fmt.Errorf("failed to prune service tasks: %w", err) + } + + lp.Info("Start rolling out ECS TaskSet for the new task definition") + if err = createPrimaryTaskSet(ctx, lp, client, *service, *td, primary); err != nil { + lp.Errorf("Failed to rollout ECS TaskSet for service %s: %v", *service.ServiceName, err) + return fmt.Errorf("failed to create primary task set: %w", err) + } + + lp.Info("Deleting old ECS TaskSets") + if err = deleteOldTaskSets(ctx, client, *service); err != nil { + lp.Errorf("Failed to delete old Tasksets of service %s: %v", *service.ServiceName, err) + return fmt.Errorf("failed to delete old tasksets: %w", err) + } + + lp.Infof("Scale up ECS desired tasks count back to %d", cnt) + service.DesiredCount = cnt + if _, err = client.UpdateService(ctx, *service); err != nil { + lp.Errorf("Failed to revive service tasks: %v", err) + return fmt.Errorf("failed to revive service tasks: %w", err) + } + } else { + lp.Info("Start rolling out ECS TaskSet for the new task definition") + if err = createPrimaryTaskSet(ctx, lp, client, *service, *td, primary); err != nil { + lp.Errorf("Failed to rollout ECS TaskSet for service %s: %v", *service.ServiceName, err) + return fmt.Errorf("failed to create primary task set: %w", err) + } + + lp.Info("Deleting old ECS TaskSets") + if err = deleteOldTaskSets(ctx, client, *service); err != nil { + lp.Errorf("Failed to delete old Tasksets of service %s: %v", *service.ServiceName, err) + return fmt.Errorf("failed to delete old tasksets: %w", err) + } + } + + lp.Infof("Wait service %s to reach stable state", *service.ServiceName) + if err := client.WaitServiceStable(ctx, *service.ClusterArn, *service.ServiceName); err != nil { + lp.Errorf("Failed to wait for service to be stable: %v", err) + return err + } + + return nil +} + +func (e *externalController) PrimaryRollout( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, + primary *types.LoadBalancer, +) error { + lp.Info("Start applying the ECS task definition") + td, err := applyTaskDefinition(ctx, client, taskDef) + if err != nil { + return fmt.Errorf("failed to apply task definition: %w", err) + } + + lp.Info("Start applying the ECS service definition") + service, _, err := applyServiceDefinition(ctx, lp, client, serviceDef) + if err != nil { + return fmt.Errorf("failed to apply service definition: %w", err) + } + + lp.Infof("Get current PRIMARY taskset") + currPrimaryTs, err := client.GetPrimaryTaskSet(ctx, *service) + if err != nil { + return fmt.Errorf("failed to get current primary taskset: %w", err) + } + + lp.Infof("Rolling out new PRIMARY taskset for service %s", *service.ServiceName) + if err = createPrimaryTaskSet(ctx, lp, client, *service, *td, primary); err != nil { + return fmt.Errorf("failed to create primary taskset for service %s: %w", *service.ServiceName, err) + } + + lp.Infof("Deleting old PRIMARY taskset") + if currPrimaryTs != nil { + if err = client.DeleteTaskSet(ctx, *currPrimaryTs); err != nil { + return fmt.Errorf("failed to delete old primary taskset: %w", err) + } + } + + lp.Infof("Waiting for service %s to reach stable state", *service.ServiceName) + if err := client.WaitServiceStable(ctx, *service.ClusterArn, *service.ServiceName); err != nil { + return fmt.Errorf("service %s did not reach stable state: %w", *service.ServiceName, err) + } + + lp.Successf("Successfully rolled out PRIMARY task set for service %s", *service.ServiceName) + return nil +} + +func (e *externalController) Rollback( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, + primary *types.LoadBalancer, +) error { + lp.Infof("Registering task definition family %s", *taskDef.Family) + td, err := client.RegisterTaskDefinition(ctx, taskDef) + if err != nil { + return fmt.Errorf("failed to register task definition %s: %w", *taskDef.Family, err) + } + + lp.Infof("Applying service definition for service %s", *serviceDef.ServiceName) + service, _, err := applyServiceDefinition(ctx, lp, client, serviceDef) + if err != nil { + return fmt.Errorf("failed to apply service definition for service %s: %w", *serviceDef.ServiceName, err) + } + + lp.Infof("Getting current task sets for service %s", *service.ServiceName) + prevTaskSets, err := client.GetServiceTaskSets(ctx, *service) + if err != nil { + return fmt.Errorf("failed to get task sets for service %s: %w", *service.ServiceName, err) + } + + lp.Infof("Creating rollback task set for service %s", *service.ServiceName) + taskSet, err := client.CreateTaskSet(ctx, *service, *td, primary, 100) + if err != nil { + return fmt.Errorf("failed to create task set for service %s: %w", *service.ServiceName, err) + } + + // Promote the new task set to PRIMARY + lp.Infof("Promoting rollback task set to PRIMARY for service %s", *service.ServiceName) + if _, err = client.UpdateServicePrimaryTaskSet(ctx, *service, *taskSet); err != nil { + return fmt.Errorf("failed to update primary task set for service %s: %w", *service.ServiceName, err) + } + + // Delete all previous task sets including any remaining canary tasksets + lp.Info("Deleting previous task sets") + for _, ts := range prevTaskSets { + lp.Infof("Deleting task set %s", *ts.TaskSetArn) + if err := client.DeleteTaskSet(ctx, ts); err != nil { + return fmt.Errorf("failed to delete task set %s: %w", *ts.TaskSetArn, err) + } + } + + return nil +} + +// ecsController implements deploymentController for the native ECS deployment controller type. +// +// Deployments are triggered by calling UpdateService with a new task definition and ForceNewDeployment=true +type ecsController struct{} + +func (e *ecsController) Sync( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, + _ *types.LoadBalancer, + _ bool, +) error { + return e.deploy(ctx, lp, client, taskDef, serviceDef) +} + +func (e *ecsController) PrimaryRollout( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, + _ *types.LoadBalancer, +) error { + return e.deploy(ctx, lp, client, taskDef, serviceDef) +} + +func (e *ecsController) Rollback( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, + _ *types.LoadBalancer, +) error { + return e.deploy(ctx, lp, client, taskDef, serviceDef) +} + +// deploy is the shared deployment flow for all ECS controller stages: +// +// register task definition -> apply service -> force new deployment -> wait stable. +func (e *ecsController) deploy( + ctx context.Context, + lp sdk.StageLogPersister, + client provider.Client, + taskDef types.TaskDefinition, + serviceDef types.Service, +) error { + lp.Info("Start applying the ECS task definition") + td, err := applyTaskDefinition(ctx, client, taskDef) + if err != nil { + return fmt.Errorf("failed to apply task definition: %w", err) + } + + // Inject the registered task definition ARN so CreateService can include it. + // ECS deployment controller requires a task definition at service creation time, + // unlike EXTERNAL controller which sets it per-task-set via CreateTaskSet. + serviceDef.TaskDefinition = td.TaskDefinitionArn + + lp.Info("Start applying the ECS service definition") + service, newlyCreated, err := applyServiceDefinition(ctx, lp, client, serviceDef) + if err != nil { + return fmt.Errorf("failed to apply service definition: %w", err) + } + + if !newlyCreated { + // For existing services, trigger a new deployment with the updated task definition. + // When the service was just created, CreateService already starts the first deployment automatically + // (calling ForceNewDeployment would trigger a second redundant deployment). + lp.Infof("Triggering new deployment for service %s with task definition %s", *service.ServiceName, *td.TaskDefinitionArn) + if _, err := client.ForceNewDeployment(ctx, *service, *td); err != nil { + return fmt.Errorf("failed to force new deployment for service %s: %w", *service.ServiceName, err) + } + } + + lp.Infof("Waiting for service %s to reach stable state", *service.ServiceName) + if err := client.WaitServiceStable(ctx, *service.ClusterArn, *service.ServiceName); err != nil { + return fmt.Errorf("service %s did not reach stable state: %w", *service.ServiceName, err) + } + + return nil +} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/determine.go b/pkg/app/pipedv1/plugin/ecs/deployment/determine.go new file mode 100644 index 0000000000..5248e4693d --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/deployment/determine.go @@ -0,0 +1,155 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "fmt" + "sort" + "strings" + + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + + sdk "github.com/pipe-cd/piped-plugin-sdk-go" +) + +type containerImage struct { + name string + tag string + digest string +} + +// parseContainerImage parses an ECS container image reference into its components. +// +// Supported formats: [registry/]name[:tag|@digest] +func parseContainerImage(image string) (img containerImage) { + ref := image + + if idx := strings.Index(ref, "@"); idx != -1 { + img.digest = ref[idx+1:] + ref = ref[:idx] + } + + parts := strings.Split(ref, "/") + last := parts[len(parts)-1] + + // Extract tag from the last segment only when there is no digest + if img.digest == "" { + if idx := strings.LastIndex(last, ":"); idx != -1 { + img.tag = last[idx+1:] + last = last[:idx] + } + } + + img.name = last + return +} + +// containerImages returns a map of container names to their images in the task definition that have both a name and an image set. +func containerImages(taskDef types.TaskDefinition) map[string]string { + m := make(map[string]string, len(taskDef.ContainerDefinitions)) + for _, c := range taskDef.ContainerDefinitions { + if c.Name == nil || c.Image == nil || *c.Image == "" { + continue + } + m[*c.Name] = *c.Image + } + return m +} + +// determineStrategy compares the running and target task definitions and returns the appropriate sync strategy: +// +// Use PipelineSync if any container image added, removed, or changed. +// +// Use QuickSync if no image difference. +func determineStrategy(running, target types.TaskDefinition) *sdk.DetermineStrategyResponse { + runningImages := containerImages(running) + targetImages := containerImages(target) + + var changes []string + + for name, targetImage := range targetImages { + runningImage, exists := runningImages[name] + if !exists { + changes = append(changes, fmt.Sprintf("added container %s with image %s", name, targetImage)) + continue + } + if runningImage != targetImage { + ri := parseContainerImage(runningImage) + ti := parseContainerImage(targetImage) + if ri.name == ti.name { + riVer := ri.tag + if riVer == "" { + riVer = ri.digest + } + tiVer := ti.tag + if tiVer == "" { + tiVer = ti.digest + } + changes = append(changes, fmt.Sprintf("image %s from %s to %s", ri.name, riVer, tiVer)) + } else { + changes = append(changes, fmt.Sprintf("image %s to %s", runningImage, targetImage)) + } + } + } + + for name := range runningImages { + if _, exists := targetImages[name]; !exists { + changes = append(changes, fmt.Sprintf("removed container %s", name)) + } + } + + if len(changes) > 0 { + sort.Strings(changes) + return &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: fmt.Sprintf("Sync progressively because of updating %s", strings.Join(changes, ", ")), + } + } + + return &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyQuickSync, + Summary: "Quick sync because no container image change was detected", + } +} + +// determineVersions extracts artifact versions from an ECS task definition. +// +// It finds all container images defined in the task definition's ContainerDefinitions and returns their names and tags. +// +// Duplicate image references are deduplicated. +func determineVersions(taskDef types.TaskDefinition) []sdk.ArtifactVersion { + imageMap := map[string]struct{}{} + for _, c := range taskDef.ContainerDefinitions { + if c.Image == nil || *c.Image == "" { + continue + } + imageMap[*c.Image] = struct{}{} + } + + versions := make([]sdk.ArtifactVersion, 0, len(imageMap)) + for i := range imageMap { + image := parseContainerImage(i) + version := image.tag + if version == "" { + version = image.digest + } + versions = append(versions, sdk.ArtifactVersion{ + Version: version, + Name: image.name, + URL: i, + }) + } + return versions +} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/determine_test.go b/pkg/app/pipedv1/plugin/ecs/deployment/determine_test.go new file mode 100644 index 0000000000..10758a9d49 --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/deployment/determine_test.go @@ -0,0 +1,373 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "sort" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + "github.com/stretchr/testify/assert" + + sdk "github.com/pipe-cd/piped-plugin-sdk-go" +) + +func TestParseContainerImage(t *testing.T) { + tests := []struct { + name string + image string + want containerImage + }{ + // No registry, just name and optional tag/digest + { + name: "name and tag only", + image: "nginx:1.21", + want: containerImage{name: "nginx", tag: "1.21"}, + }, + { + name: "name only, no tag", + image: "nginx", + want: containerImage{name: "nginx"}, + }, + // Registry with domain + { + name: "registry with domain and tag", + image: "gcr.io/myproject/myapp:v1.0", + want: containerImage{name: "myapp", tag: "v1.0"}, + }, + { + name: "ECR registry with tag", + image: "123456789.dkr.ecr.us-east-1.amazonaws.com/myapp:latest", + want: containerImage{name: "myapp", tag: "latest"}, + }, + // Registry with port: the colon in "host:port" must not be parsed as a tag separator + { + name: "registry with port and tag", + image: "my-registry:5000/app:latest", + want: containerImage{name: "app", tag: "latest"}, + }, + { + name: "registry with port, no tag", + image: "my-registry:5000/app", + want: containerImage{name: "app"}, + }, + // Digest + { + name: "digest only, no tag", + image: "nginx@sha256:abcdef1234567890", + want: containerImage{name: "nginx", digest: "sha256:abcdef1234567890"}, + }, + { + name: "registry with digest", + image: "gcr.io/myproject/myapp@sha256:abcdef1234567890", + want: containerImage{name: "myapp", digest: "sha256:abcdef1234567890"}, + }, + // Multi-level path + { + name: "multi-level path with tag", + image: "gcr.io/project-id/subpath/app:1.0", + want: containerImage{name: "app", tag: "1.0"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseContainerImage(tt.image) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestDetermineStrategy(t *testing.T) { + tests := []struct { + name string + running types.TaskDefinition + target types.TaskDefinition + want *sdk.DetermineStrategyResponse + }{ + { + name: "no change -> QuickSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyQuickSync, + Summary: "Quick sync because no container image change was detected", + }, + }, + { + name: "image tag updated -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.25")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating image nginx from 1.21 to 1.25", + }, + }, + { + name: "image replaced with different name -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("apache:2.4")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating image nginx:1.21 to apache:2.4", + }, + }, + { + name: "container added -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + {Name: aws.String("sidecar"), Image: aws.String("redis:7.0")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating added container sidecar with image redis:7.0", + }, + }, + { + name: "container removed -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + {Name: aws.String("sidecar"), Image: aws.String("redis:7.0")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating removed container sidecar", + }, + }, + { + name: "multiple containers, only one image changed -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + {Name: aws.String("sidecar"), Image: aws.String("redis:7.0")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.25")}, + {Name: aws.String("sidecar"), Image: aws.String("redis:7.0")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating image nginx from 1.21 to 1.25", + }, + }, + { + name: "empty running task definition -> all containers treated as added", + running: types.TaskDefinition{}, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx:1.21")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating added container app with image nginx:1.21", + }, + }, + { + name: "both empty -> QuickSync", + running: types.TaskDefinition{}, + target: types.TaskDefinition{}, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyQuickSync, + Summary: "Quick sync because no container image change was detected", + }, + }, + // Digest image cases + { + name: "added container with digest image -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{}, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx@sha256:abcdef1234567890")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating added container app with image nginx@sha256:abcdef1234567890", + }, + }, + { + name: "digest updated -> PipelineSync", + running: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx@sha256:aaaa")}, + }, + }, + target: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Name: aws.String("app"), Image: aws.String("nginx@sha256:bbbb")}, + }, + }, + want: &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync progressively because of updating image nginx from sha256:aaaa to sha256:bbbb", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := determineStrategy(tt.running, tt.target) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestDetermineVersions(t *testing.T) { + tests := []struct { + name string + taskDef types.TaskDefinition + want []sdk.ArtifactVersion + }{ + { + name: "single container with tag", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: aws.String("nginx:1.21")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "nginx", Version: "1.21", URL: "nginx:1.21"}, + }, + }, + { + name: "two containers with different images", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: aws.String("nginx:1.21")}, + {Image: aws.String("redis:7.0")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "nginx", Version: "1.21", URL: "nginx:1.21"}, + {Name: "redis", Version: "7.0", URL: "redis:7.0"}, + }, + }, + { + name: "two containers with same image are deduplicated", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: aws.String("nginx:1.21")}, + {Image: aws.String("nginx:1.21")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "nginx", Version: "1.21", URL: "nginx:1.21"}, + }, + }, + { + name: "container with nil image is skipped", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: nil}, + {Image: aws.String("nginx:1.21")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "nginx", Version: "1.21", URL: "nginx:1.21"}, + }, + }, + { + name: "container with empty image is skipped", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: aws.String("")}, + {Image: aws.String("nginx:1.21")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "nginx", Version: "1.21", URL: "nginx:1.21"}, + }, + }, + { + name: "registry with port", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: aws.String("my-registry:5000/app:v2.0")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "app", Version: "v2.0", URL: "my-registry:5000/app:v2.0"}, + }, + }, + { + name: "digest image uses digest as version", + taskDef: types.TaskDefinition{ + ContainerDefinitions: []types.ContainerDefinition{ + {Image: aws.String("nginx@sha256:abcdef1234567890")}, + }, + }, + want: []sdk.ArtifactVersion{ + {Name: "nginx", Version: "sha256:abcdef1234567890", URL: "nginx@sha256:abcdef1234567890"}, + }, + }, + { + name: "empty container definitions", + taskDef: types.TaskDefinition{}, + want: []sdk.ArtifactVersion{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := determineVersions(tt.taskDef) + // Sort both slices by URL for deterministic comparison since map iteration is unordered + sort.Slice(got, func(i, j int) bool { return got[i].URL < got[j].URL }) + sort.Slice(tt.want, func(i, j int) bool { return tt.want[i].URL < tt.want[j].URL }) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go b/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go index d739c3f5ff..3c27f1ea61 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go @@ -18,9 +18,12 @@ import ( "context" "errors" + "go.uber.org/zap" + sdk "github.com/pipe-cd/piped-plugin-sdk-go" ecsconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/config" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" ) var _ sdk.DeploymentPlugin[ecsconfig.ECSPluginConfig, ecsconfig.ECSDeployTargetConfig, ecsconfig.ECSApplicationSpec] = (*ECSPlugin)(nil) @@ -100,28 +103,71 @@ func (p *ECSPlugin) DetermineVersions( cfg *ecsconfig.ECSPluginConfig, input *sdk.DetermineVersionsInput[ecsconfig.ECSApplicationSpec], ) (*sdk.DetermineVersionsResponse, error) { + appCfg, err := input.Request.DeploymentSource.AppConfig() + if err != nil { + input.Logger.Error("failed to load application config", zap.Error(err)) + return nil, err + } + + taskDef, err := provider.LoadTaskDefinition( + input.Request.DeploymentSource.ApplicationDirectory, + appCfg.Spec.Input.TaskDefinitionFile, + ) + if err != nil { + input.Logger.Error("failed to load task definition", zap.Error(err)) + return nil, err + } + return &sdk.DetermineVersionsResponse{ - // TODO: Implement the logic to determine the versions of the resources that will be deployed. - // This is just a placeholder - Versions: []sdk.ArtifactVersion{ - { - Version: "latest", - Name: "ecs-task", - URL: "", - }, - }, + Versions: determineVersions(taskDef), }, nil } // DetermineStrategy determines the strategy to deploy the resources. +// +// Use PipelineSync if any container image added, removed, or changed. +// +// Use QuickSync if no image difference. func (p *ECSPlugin) DetermineStrategy( ctx context.Context, cfg *ecsconfig.ECSPluginConfig, input *sdk.DetermineStrategyInput[ecsconfig.ECSApplicationSpec], ) (*sdk.DetermineStrategyResponse, error) { - // Use quick sync as the default strategy for ECS deployment. - return &sdk.DetermineStrategyResponse{ - Strategy: sdk.SyncStrategyQuickSync, - Summary: "Use quick sync strategy for ECS deployment (work as ECS_SYNC stage)", - }, nil + targetAppCfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + input.Logger.Error("failed to load target application config", zap.Error(err)) + return nil, err + } + + taskDefFile := targetAppCfg.Spec.Input.TaskDefinitionFile + + targetTaskDef, err := provider.LoadTaskDefinition( + input.Request.TargetDeploymentSource.ApplicationDirectory, + taskDefFile, + ) + if err != nil { + input.Logger.Error("failed to load target task definition", zap.Error(err)) + return nil, err + } + + if input.Request.RunningDeploymentSource.ApplicationDirectory == "" { + return &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync with the specified pipeline (no running deployment source)", + }, nil + } + + runningTaskDef, err := provider.LoadTaskDefinition( + input.Request.RunningDeploymentSource.ApplicationDirectory, + taskDefFile, + ) + if err != nil { + input.Logger.Warn("failed to load running task definition, falling back to pipeline sync", zap.Error(err)) + return &sdk.DetermineStrategyResponse{ + Strategy: sdk.SyncStrategyPipelineSync, + Summary: "Sync with the specified pipeline (unable to load running task definition)", + }, nil + } + + return determineStrategy(runningTaskDef, targetTaskDef), nil } diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/primary.go b/pkg/app/pipedv1/plugin/ecs/deployment/primary.go index 53e9a20ecb..548b2c2263 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/primary.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/primary.go @@ -16,7 +16,6 @@ package deployment import ( "context" - "fmt" "github.com/aws/aws-sdk-go-v2/service/ecs/types" sdk "github.com/pipe-cd/piped-plugin-sdk-go" @@ -70,68 +69,11 @@ func (p *ECSPlugin) executeECSPrimaryRolloutStage( } } - if err := primaryRollout(ctx, lp, client, taskDef, serviceDef, primary); err != nil { + ctrl := newDeploymentController(serviceDef) + if err := ctrl.PrimaryRollout(ctx, lp, client, taskDef, serviceDef, primary); err != nil { lp.Errorf("Failed to roll out ECS primary task set: %v", err) return sdk.StageStatusFailure } return sdk.StageStatusSuccess } - -// primaryRollout performs the primary rollout workflow: -// -// 1. Registers the task definition -// -// 2. Applies the service definition (creates or updates the service) -// -// 3. Creates a new PRIMARY task set at 100% scale -// -// 4. Delete old PRIMARY task set -// -// 5. Waits for the service to reach stable state -func primaryRollout( - ctx context.Context, - lp sdk.StageLogPersister, - client provider.Client, - taskDef types.TaskDefinition, - serviceDef types.Service, - primary *types.LoadBalancer, -) error { - lp.Info("Start applying the ECS task definition") - td, err := applyTaskDefinition(ctx, client, taskDef) - if err != nil { - return fmt.Errorf("failed to apply task definition: %w", err) - } - - lp.Info("Start applying the ECS service definition") - service, err := applyServiceDefinition(ctx, lp, client, serviceDef) - if err != nil { - return fmt.Errorf("failed to apply service definition: %w", err) - } - - lp.Infof("Get current PRIMARY taskset") - currPrimaryTs, err := client.GetPrimaryTaskSet(ctx, *service) - if err != nil { - return fmt.Errorf("failed to get current primary taskset: %w", err) - } - - lp.Infof("Rolling out new PRIMARY taskset for service %s", *service.ServiceName) - if err = createPrimaryTaskSet(ctx, lp, client, *service, *td, primary); err != nil { - return fmt.Errorf("failed to create primary taskset for service %s: %w", *service.ServiceName, err) - } - - lp.Infof("Deleting old PRIMARY taskset") - if currPrimaryTs != nil { - if err = client.DeleteTaskSet(ctx, *currPrimaryTs); err != nil { - return fmt.Errorf("failed to delete old primary taskset: %w", err) - } - } - - lp.Infof("Waiting for service %s to reach stable state", *service.ServiceName) - if err := client.WaitServiceStable(ctx, *service.ClusterArn, *service.ServiceName); err != nil { - return fmt.Errorf("service %s did not reach stable state: %w", *service.ServiceName, err) - } - - lp.Successf("Successfully rolled out PRIMARY task set for service %s", *service.ServiceName) - return nil -} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/primary_test.go b/pkg/app/pipedv1/plugin/ecs/deployment/primary_test.go index 0db880d256..be680fe931 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/primary_test.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/primary_test.go @@ -248,7 +248,8 @@ func TestPrimaryRollout(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - err := primaryRollout(context.Background(), &fakeLogPersister{}, tc.client, tc.taskDef, tc.serviceDef, tc.primary) + ctrl := &externalController{} + err := ctrl.PrimaryRollout(context.Background(), &fakeLogPersister{}, tc.client, tc.taskDef, tc.serviceDef, tc.primary) if tc.wantErr { require.Error(t, err) diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/rollback.go b/pkg/app/pipedv1/plugin/ecs/deployment/rollback.go index 242210a268..d40a82694f 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/rollback.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/rollback.go @@ -84,7 +84,8 @@ func (p *ECSPlugin) executeECSRollbackStage( // Restore ELB weights before touching task sets to avoid sending traffic to a // canary target group with no healthy targets during the rollback window. - if cfg.Spec.Input.AccessType == "ELB" && primary != nil { + // This is only applicable for EXTERNAL deployment controller which uses task sets and PipeCD-managed ELB traffic routing. + if cfg.Spec.Input.AccessType == "ELB" && primary != nil && !isECSControllerType(serviceDef) { lp.Info("Restoring ELB listener weights to 100% primary / 0% canary") if err := restoreELBWeights(ctx, lp, input.Client, client, primary); err != nil { lp.Errorf("Failed to restore ELB listener weights: %v", err) @@ -93,7 +94,8 @@ func (p *ECSPlugin) executeECSRollbackStage( } lp.Infof("Rolling back ECS service %s and task definition family %s", *serviceDef.ServiceName, *taskDef.Family) - if err := rollback(ctx, lp, client, taskDef, serviceDef, primary); err != nil { + ctrl := newDeploymentController(serviceDef) + if err := ctrl.Rollback(ctx, lp, client, taskDef, serviceDef, primary); err != nil { lp.Errorf("Failed to rollback ECS service: %v", err) return sdk.StageStatusFailure } @@ -142,56 +144,3 @@ func restoreELBWeights( lp.Infof("Restored ELB listener weights to 100%% primary / 0%% canary, modified %d rules", len(modifiedRules)) return nil } - -// rollback restores the ECS service and task set to the state defined in the running deployment source. -func rollback( - ctx context.Context, - lp sdk.StageLogPersister, - client provider.Client, - taskDef types.TaskDefinition, - serviceDef types.Service, - primary *types.LoadBalancer, -) error { - lp.Infof("Registering task definition family %s", *taskDef.Family) - td, err := client.RegisterTaskDefinition(ctx, taskDef) - if err != nil { - return fmt.Errorf("failed to register task definition %s: %w", *taskDef.Family, err) - } - - lp.Infof("Applying service definition for service %s", *serviceDef.ServiceName) - service, err := applyServiceDefinition(ctx, lp, client, serviceDef) - if err != nil { - return fmt.Errorf("failed to apply service definition for service %s: %w", *serviceDef.ServiceName, err) - } - - // Capture existing task sets before creating the rollback task set - lp.Infof("Getting current task sets for service %s", *service.ServiceName) - prevTaskSets, err := client.GetServiceTaskSets(ctx, *service) - if err != nil { - return fmt.Errorf("failed to get task sets for service %s: %w", *service.ServiceName, err) - } - - // Create a new task set at 100% scale to restore the original state - lp.Infof("Creating rollback task set for service %s", *service.ServiceName) - taskSet, err := client.CreateTaskSet(ctx, *service, *td, primary, 100) - if err != nil { - return fmt.Errorf("failed to create task set for service %s: %w", *service.ServiceName, err) - } - - // Promote the new task set to PRIMARY - lp.Infof("Promoting rollback task set to PRIMARY for service %s", *service.ServiceName) - if _, err = client.UpdateServicePrimaryTaskSet(ctx, *service, *taskSet); err != nil { - return fmt.Errorf("failed to update primary task set for service %s: %w", *service.ServiceName, err) - } - - // Delete all previous task sets including any remaining canary tasksets - lp.Info("Deleting previous task sets") - for _, ts := range prevTaskSets { - lp.Infof("Deleting task set %s", *ts.TaskSetArn) - if err := client.DeleteTaskSet(ctx, ts); err != nil { - return fmt.Errorf("failed to delete task set %s: %w", *ts.TaskSetArn, err) - } - } - - return nil -} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/rollback_test.go b/pkg/app/pipedv1/plugin/ecs/deployment/rollback_test.go index 612d52121f..1e5147399e 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/rollback_test.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/rollback_test.go @@ -319,7 +319,8 @@ func TestRollBack(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - err := rollback(context.Background(), &fakeLogPersister{}, tc.client, tc.taskDef, tc.serviceDef, tc.primary) + ctrl := &externalController{} + err := ctrl.Rollback(context.Background(), &fakeLogPersister{}, tc.client, tc.taskDef, tc.serviceDef, tc.primary) if tc.wantErr { require.Error(t, err) assert.Contains(t, err.Error(), tc.wantErrMsg) diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/sync.go b/pkg/app/pipedv1/plugin/ecs/deployment/sync.go index 26f820a686..ca3f9e41be 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/sync.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/sync.go @@ -83,7 +83,8 @@ func (p *ECSPlugin) executeECSSyncStage( } } - if err := sync(ctx, lp, client, taskDef, serviceDef, primary, cfg.Spec.QuickSyncOptions.Recreate); err != nil { + ctrl := newDeploymentController(serviceDef) + if err := ctrl.Sync(ctx, lp, client, taskDef, serviceDef, primary, cfg.Spec.QuickSyncOptions.Recreate); err != nil { lp.Errorf("Failed to sync ECS service: %v", err) return sdk.StageStatusFailure } @@ -91,79 +92,6 @@ func (p *ECSPlugin) executeECSSyncStage( return sdk.StageStatusSuccess } -func sync( - ctx context.Context, - lp sdk.StageLogPersister, - client provider.Client, - taskDef types.TaskDefinition, - serviceDef types.Service, - primary *types.LoadBalancer, - recreate bool, -) error { - lp.Info("Start applying the ECS task definition") - td, err := applyTaskDefinition(ctx, client, taskDef) - if err != nil { - lp.Errorf("Failed to apply task definition: %v", err) - return fmt.Errorf("failed to apply task definition: %w", err) - } - - lp.Info("Start applying the ECS service definition") - service, err := applyServiceDefinition(ctx, lp, client, serviceDef) - if err != nil { - lp.Errorf("Failed to apply service definition: %v", err) - return fmt.Errorf("failed to apply service definition: %w", err) - } - - if recreate { - cnt := service.DesiredCount - lp.Info("Recreate option is enabled, stop all running tasks before creating new task set") - if err := client.PruneServiceTasks(ctx, *service); err != nil { - lp.Errorf("Failed to prune service tasks: %v", err) - return fmt.Errorf("failed to prune service tasks: %w", err) - } - - lp.Info("Start rolling out ECS TaskSet for the new task definition") - if err = createPrimaryTaskSet(ctx, lp, client, *service, *td, primary); err != nil { - lp.Errorf("Failed to rollout ECS TaskSet for service %s: %v", *service.ServiceName, err) - return fmt.Errorf("failed to create primary task set: %w", err) - } - - lp.Info("Deleting old ECS TaskSets") - if err = deleteOldTaskSets(ctx, client, *service); err != nil { - lp.Errorf("Failed to delete old Tasksets of service %s: %v", *service.ServiceName, err) - return fmt.Errorf("failed to delete old tasksets: %w", err) - } - - // Scale up the service tasks count back to its desired.p - lp.Infof("Scale up ECS desired tasks count back to %d", cnt) - service.DesiredCount = cnt - if _, err = client.UpdateService(ctx, *service); err != nil { - lp.Errorf("Failed to revive service tasks: %v", err) - return fmt.Errorf("failed to revive service tasks: %w", err) - } - } else { - lp.Info("Start rolling out ECS TaskSet for the new task definition") - if err = createPrimaryTaskSet(ctx, lp, client, *service, *td, primary); err != nil { - lp.Errorf("Failed to rollout ECS TaskSet for service %s: %v", *service.ServiceName, err) - return fmt.Errorf("failed to create primary task set: %w", err) - } - - lp.Info("Deleting old ECS TaskSets") - if err = deleteOldTaskSets(ctx, client, *service); err != nil { - lp.Errorf("Failed to delete old Tasksets of service %s: %v", *service.ServiceName, err) - return fmt.Errorf("failed to delete old tasksets: %w", err) - } - } - - lp.Infof("Wait service %s to reach stable state", *service.ServiceName) - if err := client.WaitServiceStable(ctx, *service.ClusterArn, *service.ServiceName); err != nil { - lp.Errorf("Failed to wait for service to be stable: %v", err) - return err - } - - return nil -} - func runStandaloneTask( ctx context.Context, client provider.Client, @@ -206,65 +134,67 @@ func applyTaskDefinition( return td, nil } +// applyServiceDefinition creates or updates the ECS service based on its existence. +// +// newlyCreated is true when the service did not exist and was newly created. func applyServiceDefinition( ctx context.Context, lp sdk.StageLogPersister, client provider.Client, serviceDef types.Service, -) (*types.Service, error) { +) (service *types.Service, newlyCreated bool, err error) { // Check whether the service already exists or not. // If it exists, update the service, otherwise create a new one. found, err := client.ServiceExists(ctx, *serviceDef.ClusterArn, *serviceDef.ServiceName) if err != nil { - return nil, fmt.Errorf("failed to check service %s existence: %w", *serviceDef.ServiceName, err) + return nil, false, fmt.Errorf("failed to check service %s existence: %w", *serviceDef.ServiceName, err) } - var service *types.Service if found { svcStatus, err := client.GetServiceStatus(ctx, *serviceDef.ClusterArn, *serviceDef.ServiceName) if err != nil { - return nil, fmt.Errorf("failed to get service %s status: %w", *serviceDef.ServiceName, err) + return nil, false, fmt.Errorf("failed to get service %s status: %w", *serviceDef.ServiceName, err) } lp.Infof("Service %s already exists with status %s", *serviceDef.ServiceName, svcStatus) // Only update the service when it is in ACTIVE status // Nothing can be performed if the service is in DRAINING or INACTIVE status - if svcStatus == "ACTIVE" { - lp.Infof("Updating service %s", *serviceDef.ServiceName) - service, err = client.UpdateService(ctx, serviceDef) - if err != nil { - return nil, fmt.Errorf("failed to update service %s: %w", *serviceDef.ServiceName, err) - } - } else { - return nil, fmt.Errorf("service %s is in %s status, cannot be updated", *serviceDef.ServiceName, svcStatus) + if svcStatus != "ACTIVE" { + return nil, false, fmt.Errorf("service %s is in %s status, cannot be updated", *serviceDef.ServiceName, svcStatus) + } + + lp.Infof("Updating service %s", *serviceDef.ServiceName) + service, err = client.UpdateService(ctx, serviceDef) + if err != nil { + return nil, false, fmt.Errorf("failed to update service %s: %w", *serviceDef.ServiceName, err) } currentTags, err := client.ListTags(ctx, *service.ServiceArn) if err != nil { - return nil, fmt.Errorf("failed to list tags for ECS service %s: %w", *serviceDef.ServiceName, err) + return nil, false, fmt.Errorf("failed to list tags for ECS service %s: %w", *serviceDef.ServiceName, err) } tagsToRemove := findTagsToRemove(currentTags, serviceDef.Tags) if len(tagsToRemove) > 0 { lp.Infof("Found tags to remove from service %s: %v", *serviceDef.ServiceName, tagsToRemove) if err := client.UntagResource(ctx, *service.ServiceArn, tagsToRemove); err != nil { - return nil, fmt.Errorf("failed to remove tags from ECS service %s: %w", *serviceDef.ServiceName, err) + return nil, false, fmt.Errorf("failed to remove tags from ECS service %s: %w", *serviceDef.ServiceName, err) } } if err := client.TagResource(ctx, *service.ServiceArn, serviceDef.Tags); err != nil { - return nil, fmt.Errorf("failed to update tags of ECS service %s: %w", *serviceDef.ServiceName, err) + return nil, false, fmt.Errorf("failed to update tags of ECS service %s: %w", *serviceDef.ServiceName, err) } // Re-assign tags to service object because UpdateService API doesn't return tags. service.Tags = serviceDef.Tags - } else { - lp.Infof("Service %s does not exist, creating a new service", *serviceDef.ServiceName) - service, err = client.CreateService(ctx, serviceDef) - if err != nil { - return nil, fmt.Errorf("failed to create service %s: %w", *serviceDef.ServiceName, err) - } + return service, false, nil } - return service, nil + lp.Infof("Service %s does not exist, creating a new service", *serviceDef.ServiceName) + service, err = client.CreateService(ctx, serviceDef) + if err != nil { + return nil, false, fmt.Errorf("failed to create service %s: %w", *serviceDef.ServiceName, err) + } + return service, true, nil } func findTagsToRemove(currentTags, desiredTags []types.Tag) []string { diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/sync_test.go b/pkg/app/pipedv1/plugin/ecs/deployment/sync_test.go index 64026bf153..0a548a8b6a 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/sync_test.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/sync_test.go @@ -313,7 +313,8 @@ func TestSync(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - err := sync(context.Background(), &fakeLogPersister{}, tc.client, tc.taskDef, tc.serviceDef, tc.primary, tc.recreate) + ctrl := &externalController{} + err := ctrl.Sync(context.Background(), &fakeLogPersister{}, tc.client, tc.taskDef, tc.serviceDef, tc.primary, tc.recreate) if tc.wantErr { require.Error(t, err) diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go b/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go index 497e9309a4..3fea00ec9c 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go @@ -51,6 +51,7 @@ type mockECSClient struct { ServiceExistsFunc func(ctx context.Context, cluster, serviceName string) (bool, error) GetServiceStatusFunc func(ctx context.Context, cluster, serviceName string) (string, error) WaitServiceStableFunc func(ctx context.Context, cluster, serviceName string) error + ForceNewDeploymentFunc func(ctx context.Context, service types.Service, taskDef types.TaskDefinition) (*types.Service, error) RegisterTaskDefinitionFunc func(ctx context.Context, taskDef types.TaskDefinition) (*types.TaskDefinition, error) RunTaskFunc func(ctx context.Context, taskDefinition types.TaskDefinition, clusterArn string, launchType string, awsVpcConfiguration *appconfig.ECSVpcConfiguration, tags []types.Tag) error PruneServiceTasksFunc func(ctx context.Context, service types.Service) error @@ -99,6 +100,9 @@ func (m *mockECSClient) GetServiceStatus(ctx context.Context, cluster, serviceNa func (m *mockECSClient) WaitServiceStable(ctx context.Context, cluster, serviceName string) error { return m.WaitServiceStableFunc(ctx, cluster, serviceName) } +func (m *mockECSClient) ForceNewDeployment(ctx context.Context, service types.Service, taskDef types.TaskDefinition) (*types.Service, error) { + return m.ForceNewDeploymentFunc(ctx, service, taskDef) +} func (m *mockECSClient) RegisterTaskDefinition(ctx context.Context, taskDef types.TaskDefinition) (*types.TaskDefinition, error) { return m.RegisterTaskDefinitionFunc(ctx, taskDef) } @@ -130,6 +134,10 @@ func happyPathClient(registeredTD *types.TaskDefinition, updatedSvc *types.Servi td := *registeredTD return &td, nil }, + ForceNewDeploymentFunc: func(_ context.Context, _ types.Service, _ types.TaskDefinition) (*types.Service, error) { + svc := *updatedSvc + return &svc, nil + }, ServiceExistsFunc: func(_ context.Context, _, _ string) (bool, error) { return true, nil }, diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go b/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go index fa6ba26e89..1b2b86d335 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go @@ -48,6 +48,21 @@ func (p *ECSPlugin) executeECSTrafficRouting( return sdk.StageStatusFailure } + serviceDef, err := provider.LoadServiceDefinition( + input.Request.TargetDeploymentSource.ApplicationDirectory, + cfg.Spec.Input.ServiceDefinitionFile, + input, + ) + if err != nil { + lp.Errorf("Failed to load service definition: %v", err) + return sdk.StageStatusFailure + } + + if isECSControllerType(serviceDef) { + lp.Error("ECS_TRAFFIC_ROUTING is not supported with ECS deployment controller type; traffic routing requires EXTERNAL deployment controller type") + return sdk.StageStatusFailure + } + accessType := cfg.Spec.Input.AccessType if accessType != "ELB" { lp.Errorf("Unsupported access type %s in stage Traffic Routing for ECS application", accessType) diff --git a/pkg/app/pipedv1/plugin/ecs/go.mod b/pkg/app/pipedv1/plugin/ecs/go.mod index 733528143b..d06231b9c8 100644 --- a/pkg/app/pipedv1/plugin/ecs/go.mod +++ b/pkg/app/pipedv1/plugin/ecs/go.mod @@ -3,10 +3,10 @@ module github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs go 1.25.0 require ( - github.com/aws/aws-sdk-go-v2 v1.31.0 + github.com/aws/aws-sdk-go-v2 v1.41.5 github.com/aws/aws-sdk-go-v2/config v1.27.38 github.com/aws/aws-sdk-go-v2/credentials v1.17.36 - github.com/aws/aws-sdk-go-v2/service/ecs v1.46.2 + github.com/aws/aws-sdk-go-v2/service/ecs v1.78.0 github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.38.2 github.com/creasty/defaults v1.6.0 github.com/go-playground/assert/v2 v2.2.0 @@ -24,15 +24,15 @@ require ( cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/profiler v0.3.1 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.23.2 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.2 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.31.2 // indirect - github.com/aws/smithy-go v1.21.0 // indirect + github.com/aws/smithy-go v1.24.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/coreos/go-oidc/v3 v3.11.0 // indirect diff --git a/pkg/app/pipedv1/plugin/ecs/go.sum b/pkg/app/pipedv1/plugin/ecs/go.sum index 75ed8f3ff7..f2ff1e9b67 100644 --- a/pkg/app/pipedv1/plugin/ecs/go.sum +++ b/pkg/app/pipedv1/plugin/ecs/go.sum @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U= github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA= +github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY= +github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= github.com/aws/aws-sdk-go-v2/config v1.27.38 h1:mMVyJJuSUdbD4zKXoxDgWrgM60QwlFEg+JhihCq6wCw= github.com/aws/aws-sdk-go-v2/config v1.27.38/go.mod h1:6xOiNEn58bj/64MPKx89r6G/el9JZn8pvVbquSqTKK4= github.com/aws/aws-sdk-go-v2/credentials v1.17.36 h1:zwI5WrT+oWWfzSKoTNmSyeBKQhsFRJRv+PGW/UZW+Yk= @@ -58,12 +60,18 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 h1:Rgg6wvjjtX8bNHcvi9OnXWwcE0a2vGpbwmtICOsvcf4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21/go.mod h1:A/kJFst/nm//cyqonihbdpQZwiUhhzpqTsdbhDdRF9c= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18/go.mod h1:DkKMmksZVVyat+Y+r1dEOgJEfUeA7UngIHWeKsi0yNc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgqSE5hE/o47Ij9qk/SEZFbUOe9A= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/service/ecs v1.46.2 h1:mC8vCpzGYi87z5Ot+LcIU7rpabkX88os9ZvtelIhHu0= github.com/aws/aws-sdk-go-v2/service/ecs v1.46.2/go.mod h1:/IMvyX4u5s4Ed0kzD+vWdPK92zm/q4CN1afJeDCsdhE= +github.com/aws/aws-sdk-go-v2/service/ecs v1.78.0 h1:P8s4jrrYr9CUPhoYXS0dI4Zi5oKXa6DWHUkeJ9m/gDQ= +github.com/aws/aws-sdk-go-v2/service/ecs v1.78.0/go.mod h1:QkWmubOYmjj3cHn7A4CoUU7BKJhVeo39Gp6NH7IyhZw= github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.38.2 h1:0pVeGkp7MqM3k3Il75hA6xI2USdkjaUv58SXJwvFIGY= github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.38.2/go.mod h1:V/sx2Ja18AlrvTGQsilx8CAH0CPm+hpKdT9RbSpceik= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w= @@ -78,6 +86,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.31.2 h1:O6tyji8mXmBGsHvTCB0VIhrDw19l github.com/aws/aws-sdk-go-v2/service/sts v1.31.2/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI= github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA= github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= +github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= diff --git a/pkg/app/pipedv1/plugin/ecs/provider/client.go b/pkg/app/pipedv1/plugin/ecs/provider/client.go index 1ad0da5ca2..ef1d44e11e 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/client.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/client.go @@ -187,9 +187,9 @@ func (c *client) ModifyListeners(ctx context.Context, listenerArns []string, rou } func (c *client) CreateService(ctx context.Context, service types.Service) (*types.Service, error) { - if service.DeploymentController == nil || service.DeploymentController.Type != types.DeploymentControllerTypeExternal { - return nil, fmt.Errorf("failed to create ECS service %s: deployment controller of type EXTERNAL is required", *service.ServiceName) - } + isExternal := service.DeploymentController == nil || + service.DeploymentController.Type == types.DeploymentControllerTypeExternal + input := &ecs.CreateServiceInput{ Cluster: service.ClusterArn, ServiceName: service.ServiceName, @@ -207,23 +207,41 @@ func (c *client) CreateService(ctx context.Context, service types.Service) (*typ SchedulingStrategy: service.SchedulingStrategy, Tags: service.Tags, } + + if !isExternal { + // ECS controller allows passing all fields directly in CreateService. + + // TaskDefinition is also required for ECS controller + // (unlike EXTERNAL where it is set per-task-set via CreateTaskSet). + input.TaskDefinition = service.TaskDefinition + input.LaunchType = service.LaunchType + input.NetworkConfiguration = service.NetworkConfiguration + input.ServiceRegistries = service.ServiceRegistries + input.LoadBalancers = service.LoadBalancers + } + output, err := c.ecsClient.CreateService(ctx, input) if err != nil { return nil, fmt.Errorf("failed to create ECS service %s: %w", *service.ServiceName, err) } - // Hack: Since we use EXTERNAL deployment controller, the below configurations are not allowed to be passed - // in CreateService step, but it required in further step (CreateTaskSet step). We reassign those values - // as part of service definition for that purpose. - // ref: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CreateService.html - output.Service.LaunchType = service.LaunchType - output.Service.NetworkConfiguration = service.NetworkConfiguration - output.Service.ServiceRegistries = service.ServiceRegistries + if isExternal { + // Hack: Since we use EXTERNAL deployment controller, the below configurations are not allowed to be passed + // in CreateService step, but it required in further step (CreateTaskSet step). + // We reassign those values as part of service definition for that purpose. + // ref: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CreateService.html + output.Service.LaunchType = service.LaunchType + output.Service.NetworkConfiguration = service.NetworkConfiguration + output.Service.ServiceRegistries = service.ServiceRegistries + } return output.Service, nil } func (c *client) UpdateService(ctx context.Context, service types.Service) (*types.Service, error) { + isExternal := service.DeploymentController == nil || + service.DeploymentController.Type == types.DeploymentControllerTypeExternal + // TODO: Support other properties (current only support the properties that v0 supports) // This should be delegated to user to decide which properties to update by defining in service definition file. input := &ecs.UpdateServiceInput{ @@ -235,6 +253,16 @@ func (c *client) UpdateService(ctx context.Context, service types.Service) (*typ EnableECSManagedTags: aws.Bool(service.EnableECSManagedTags), } + if !isExternal { + // ECS controller allows updating network configuration, service registries, and + // deployment configuration (including DeploymentStrategy for ROLLING, BLUE_GREEN, + // CANARY, LINEAR strategies with their associated CanaryConfiguration, LinearConfiguration, and BakeTimeInMinutes). + input.NetworkConfiguration = service.NetworkConfiguration + input.ServiceRegistries = service.ServiceRegistries + input.DeploymentConfiguration = service.DeploymentConfiguration + input.LoadBalancers = service.LoadBalancers + } + // If desiredCount is 0 or not set, keep current desiredCount because a user might use AutoScaling. if service.DesiredCount != 0 { input.DesiredCount = aws.Int32(service.DesiredCount) @@ -245,17 +273,37 @@ func (c *client) UpdateService(ctx context.Context, service types.Service) (*typ return nil, fmt.Errorf("failed to update ECS service %s: %w", *service.ServiceName, err) } - // Hack: Since we use EXTERNAL deployment controller, the below configurations are not allowed to be passed - // in UpdateService step, but it required in further step (CreateTaskSet step). We reassign those values - // as part of service definition for that purpose. - // ref: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CreateService.html - output.Service.LaunchType = service.LaunchType - output.Service.NetworkConfiguration = service.NetworkConfiguration - output.Service.ServiceRegistries = service.ServiceRegistries + if isExternal { + // Hack: Since we use EXTERNAL deployment controller, the below configurations are not allowed to be passed + // in UpdateService step, but it required in further step (CreateTaskSet step). + // We reassign those values as part of service definition for that purpose. + // ref: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CreateService.html + output.Service.LaunchType = service.LaunchType + output.Service.NetworkConfiguration = service.NetworkConfiguration + output.Service.ServiceRegistries = service.ServiceRegistries + } return output.Service, nil } +func (c *client) ForceNewDeployment(ctx context.Context, service types.Service, taskDef types.TaskDefinition) (*types.Service, error) { + input := &ecs.UpdateServiceInput{ + Cluster: service.ClusterArn, + Service: service.ServiceName, + TaskDefinition: taskDef.TaskDefinitionArn, + ForceNewDeployment: true, + // Include DeploymentConfiguration so the deployment strategy (ROLLING, BLUE_GREEN, + // CANARY, LINEAR) and its associated configuration (CanaryConfiguration, + // LinearConfiguration, BakeTimeInMinutes) are applied for this deployment. + DeploymentConfiguration: service.DeploymentConfiguration, + } + output, err := c.ecsClient.UpdateService(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to force new deployment for service %s: %w", *service.ServiceName, err) + } + return output.Service, nil +} + func (c *client) DescribeService(ctx context.Context, service types.Service) (*types.Service, error) { input := &ecs.DescribeServicesInput{ Cluster: service.ClusterArn, diff --git a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go index e991ffcbb1..9e75276fa5 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go @@ -54,6 +54,7 @@ type ECS interface { ServiceExists(ctx context.Context, cluster, serviceName string) (bool, error) GetServiceStatus(ctx context.Context, cluster, serviceName string) (string, error) WaitServiceStable(ctx context.Context, cluster, serviceName string) error + ForceNewDeployment(ctx context.Context, service types.Service, taskDef types.TaskDefinition) (*types.Service, error) RegisterTaskDefinition(ctx context.Context, taskDef types.TaskDefinition) (*types.TaskDefinition, error) RunTask(ctx context.Context, taskDefinition types.TaskDefinition, clusterArn string, launchType string, awsVpcConfiguration *config.ECSVpcConfiguration, tags []types.Tag) error PruneServiceTasks(ctx context.Context, service types.Service) error