Skip to content

Commit

Permalink
Don't fail if creating a venv didn't succeed. Also provide a way to d…
Browse files Browse the repository at this point in the history
…isable creating separate venvs. (#26753)

Co-authored-by: Robert Burke <lostluck@users.noreply.github.com>
  • Loading branch information
tvalentyn and lostluck committed May 19, 2023
1 parent 81fbedb commit f2400c8
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -157,15 +156,20 @@ func launchSDKProcess() error {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
if err != nil {
return errors.New("failed to initialize Python venv")
}
cleanupFunc := func() {
os.RemoveAll(venvDir)
logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id)
// Create a separate virtual environment (with access to globally installed packages), unless disabled by the user.
// This improves usability on runners that persist the execution environment for the boot entrypoint between multiple pipeline executions.
if os.Getenv("RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT") == "" {
venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
if err != nil {
logger.Printf(ctx, "Using default environment, since creating a virtual environment for the SDK harness didn't succeed: %v", err)
} else {
cleanupFunc := func() {
os.RemoveAll(venvDir)
logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id)
}
defer cleanupFunc()
}
}
defer cleanupFunc()

dir := filepath.Join(*semiPersistDir, "staged")
files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
Expand Down Expand Up @@ -308,9 +312,8 @@ func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.C

// setupVenv initializes a local Python venv and sets the corresponding env variables
func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId string) (string, error) {
logger.Printf(ctx, "Initializing temporary Python venv ...")

dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
logger.Printf(ctx, "Initializing temporary Python venv in %v", dir)
if _, err := os.Stat(dir); !os.IsNotExist(err) {
// Probably leftovers from a previous run
logger.Printf(ctx, "Cleaning up previous venv ...")
Expand Down

0 comments on commit f2400c8

Please sign in to comment.