From 629cafb0f8446ac74460099513d840508dd3cf0d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 28 Apr 2026 22:27:28 -0400 Subject: [PATCH 1/4] Add some more logging into boot.go to facilitate debugging --- sdks/python/container/boot.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 7c0f22675daf..5d3cfc13e95a 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -183,12 +183,17 @@ func launchSDKProcess() error { if err != nil { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + logger.Printf(ctx, "PipelineOptions=%v", options) experiments := getExperiments(options) + logger.Printf(ctx, "Experiments=%v", experiments) + pipNoBuildIsolation = false if slices.Contains(experiments, "pip_no_build_isolation") { pipNoBuildIsolation = true - logger.Printf(ctx, "Disabled build isolation when installing packages with pip") + logger.Printf(ctx, "Build isolation disabled when installing packages with pip") + } else { + logger.Printf(ctx, "Build isolation enabled when installing packages with pip") } // (2) Retrieve and install the staged packages. @@ -408,6 +413,10 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str bufLogger := tools.NewBufferedLogger(logger) bufLogger.Printf(ctx, "Installing setup packages ...") + if err := logRuntimeDependencies(ctx, bufLogger, "pre-installation"); err != nil { + bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err) + } + // Install the Dataflow Python SDK if one was staged. In released // container images, SDK is already installed, but can be overriden // using the --sdk_location pipeline option. @@ -432,7 +441,7 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil { return fmt.Errorf("failed to install workflow: %v", err) } - if err := logRuntimeDependencies(ctx, bufLogger); err != nil { + if err := logRuntimeDependencies(ctx, bufLogger, "post-installation"); err != nil { bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err) } if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil { @@ -485,20 +494,20 @@ func processArtifactsInSetupOnlyMode() { // logRuntimeDependencies logs the python dependencies // installed in the runtime environment. -func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger) error { +func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, phase string) error { pythonVersion, err := expansionx.GetPythonVersion() if err != nil { return err } - bufLogger.Printf(ctx, "Using Python version:") + bufLogger.Printf(ctx, "Using Python version (%s):", phase) args := []string{"--version"} if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) } else { bufLogger.FlushAtDebug(ctx) } - bufLogger.Printf(ctx, "Logging runtime dependencies:") - args = []string{"-m", "pip", "freeze"} + bufLogger.Printf(ctx, "Logging runtime dependencies (%s):", phase) + args = []string{"-m", "pip", "freeze", "--all"} if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) } else { From 53d2c7ff703330b3e32d212043c32e18afcd9b31 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 30 Apr 2026 11:14:31 -0400 Subject: [PATCH 2/4] Remove logging pipeline options. --- sdks/python/container/boot.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 5d3cfc13e95a..143e3e6bdedc 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -183,7 +183,6 @@ func launchSDKProcess() error { if err != nil { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } - logger.Printf(ctx, "PipelineOptions=%v", options) experiments := getExperiments(options) logger.Printf(ctx, "Experiments=%v", experiments) From 8febd75a2fda42799423fa2ce920dc82b8e4f40a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 1 May 2026 21:09:51 -0400 Subject: [PATCH 3/4] Improve readability and consistency for logging messages. --- sdks/python/container/boot.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 143e3e6bdedc..934253baec63 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -410,10 +410,10 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri // installSetupPackages installs Beam SDK and user dependencies. func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error { bufLogger := tools.NewBufferedLogger(logger) - bufLogger.Printf(ctx, "Installing setup packages ...") + bufLogger.Printf(ctx, "Installing Beam SDK and user dependencies ...") - if err := logRuntimeDependencies(ctx, bufLogger, "pre-installation"); err != nil { - bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err) + if err := logRuntimeDependencies(ctx, bufLogger, "initial runtime environment"); err != nil { + bufLogger.Printf(ctx, "Failed to fetch the runtime python dependencies: %v", err) } // Install the Dataflow Python SDK if one was staged. In released @@ -440,11 +440,11 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil { return fmt.Errorf("failed to install workflow: %v", err) } - if err := logRuntimeDependencies(ctx, bufLogger, "post-installation"); err != nil { - bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err) + if err := logRuntimeDependencies(ctx, bufLogger, "final runtime environment"); err != nil { + bufLogger.Printf(ctx, "Failed to fetch the runtime python dependencies: %v", err) } if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil { - bufLogger.Printf(ctx, "couldn't fetch the submission environment dependencies: %v", err) + bufLogger.Printf(ctx, "Failed to fetch the submission environment dependencies: %v", err) } return nil @@ -498,14 +498,14 @@ func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger if err != nil { return err } - bufLogger.Printf(ctx, "Using Python version (%s):", phase) + bufLogger.Printf(ctx, "Python version in %s:", phase) args := []string{"--version"} if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) } else { bufLogger.FlushAtDebug(ctx) } - bufLogger.Printf(ctx, "Logging runtime dependencies (%s):", phase) + bufLogger.Printf(ctx, "Dependencies in %s:", phase) args = []string{"-m", "pip", "freeze", "--all"} if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.FlushAtError(ctx) @@ -518,7 +518,7 @@ func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger // logSubmissionEnvDependencies logs the python dependencies // installed in the submission environment. func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, dir string) error { - bufLogger.Printf(ctx, "Logging submission environment dependencies:") + bufLogger.Printf(ctx, "Dependencies in submission environment:") // path for submission environment dependencies should match with the // one defined in apache_beam/runners/portability/stager.py. filename := filepath.Join(dir, "submission_environment_dependencies.txt") From a56f6ee4fd84de904c4d2817499d9fe95b27f7b4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 1 May 2026 21:53:29 -0400 Subject: [PATCH 4/4] Minor change to a log message. --- sdks/python/container/boot.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 934253baec63..a2655903a4b1 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -407,10 +407,10 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri return dir, nil } -// installSetupPackages installs Beam SDK and user dependencies. +// installSetupPackages installs user dependencies. func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error { bufLogger := tools.NewBufferedLogger(logger) - bufLogger.Printf(ctx, "Installing Beam SDK and user dependencies ...") + bufLogger.Printf(ctx, "Installing user dependencies ...") if err := logRuntimeDependencies(ctx, bufLogger, "initial runtime environment"); err != nil { bufLogger.Printf(ctx, "Failed to fetch the runtime python dependencies: %v", err)