Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,23 +314,47 @@ func launchSDKProcess() error {

var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
go func(workerId string) {
defer wg.Done()

bufLogger := tools.NewBufferedLogger(logger)
errorCount := 0
for {
childPids.mu.Lock()
if childPids.canceled {
childPids.mu.Unlock()
return
}
logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...)
childPids.v = append(childPids.v, cmd.Process.Pid)
childPids.mu.Unlock()

for _, workerId := range workerIds {
go func(workerId string) {
defer wg.Done()

workerCtx := grpcx.WriteWorkerID(context.Background(), workerId)

// Create a separate logger per worker so that each worker initializes
// its own Fn logging stream with the correct worker_id metadata.
// Shared loggers would reuse the first stream and cause incorrect
// portability_worker_id attribution across workers.
workerLogger := &tools.Logger{
Endpoint: *loggingEndpoint,
}

bufLogger := tools.NewBufferedLoggerWithFlushInterval(workerCtx, workerLogger, 100*time.Millisecond)

errorCount := 0
for {
childPids.mu.Lock()
if childPids.canceled {
childPids.mu.Unlock()
return
}

workerLogger.Printf(workerCtx,
"Executing Python (worker %v): python %v",
workerId,
strings.Join(args, " "),
)

cmd := StartCommandEnv(
map[string]string{"WORKER_ID": workerId},
os.Stdin,
bufLogger,
bufLogger,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tvalentyn I just noticed that the process stderr/stdout is output as debug logs via FlushAtDebug in

b.FlushAtDebug(b.periodicFlushContext)

Is it possible to either

  1. Make this default FlushAtInfo
  2. Or let users configure this to the log level they desire?

"python",
args...,
)

childPids.v = append(childPids.v, cmd.Process.Pid)
childPids.mu.Unlock()
if err := cmd.Wait(); err != nil {
// Retry on fatal errors, like OOMs and segfaults, not just
// DoFns throwing exceptions.
Expand Down
Binary file added sdks/python/container/container
Binary file not shown.
Loading