Fix bug in processing jobs on platforms without Docker (#1834)

* Log incoming jobs.

Log the full contents of the job protobuf to make debugging jobs easier

* Ensure that the parallel executor always uses at least one thread.

The caller may mis-calculate the number of CPUs as zero, in which case
ensure that at least one thread is spawned.

* Use runtime.NumCPU for CPU counts.

For hosts without docker, GetHostInfo() returns a blank struct which
has zero CPUs and causes downstream trouble.

---------

Co-authored-by: Paul Armstrong <psa@users.noreply.gitea.com>
Co-authored-by: Jason Song <i@wolfogre.com>
This commit is contained in:
psa 2023-06-06 03:00:54 +00:00 committed by GitHub
parent c70a6743f6
commit 3ac2b726f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 8 deletions

View File

@ -3,6 +3,8 @@ package common
import ( import (
"context" "context"
"fmt" "fmt"
log "github.com/sirupsen/logrus"
) )
// Warning that implements `error` but safe to ignore // Warning that implements `error` but safe to ignore
@ -94,6 +96,11 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
work := make(chan Executor, len(executors)) work := make(chan Executor, len(executors))
errs := make(chan error, len(executors)) errs := make(chan error, len(executors))
if 1 > parallel {
log.Infof("Parallel tasks (%d) below minimum, setting to 1", parallel)
parallel = 1
}
for i := 0; i < parallel; i++ { for i := 0; i < parallel; i++ {
go func(work <-chan Executor, errs chan<- error) { go func(work <-chan Executor, errs chan<- error) {
for executor := range work { for executor := range work {

View File

@ -100,6 +100,17 @@ func TestNewParallelExecutor(t *testing.T) {
assert.Equal(3, count, "should run all 3 executors") assert.Equal(3, count, "should run all 3 executors")
assert.Equal(2, maxCount, "should run at most 2 executors in parallel") assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
assert.Nil(err) assert.Nil(err)
// Reset to test running the executor with 0 parallelism
count = 0
activeCount = 0
maxCount = 0
errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
assert.Equal(3, count, "should run all 3 executors")
assert.Equal(1, maxCount, "should run at most 1 executors in parallel")
assert.Nil(errSingle)
} }
func TestNewParallelExecutorFailed(t *testing.T) { func TestNewParallelExecutorFailed(t *testing.T) {

View File

@ -417,6 +417,7 @@ func (j *Job) GetMatrixes() ([]map[string]interface{}, error) {
} }
} else { } else {
matrixes = append(matrixes, make(map[string]interface{})) matrixes = append(matrixes, make(map[string]interface{}))
log.Debugf("Empty Strategy, matrixes=%v", matrixes)
} }
return matrixes, nil return matrixes, nil
} }

View File

@ -5,11 +5,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"runtime"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/nektos/act/pkg/common" "github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/container"
"github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/model"
) )
@ -103,15 +103,45 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen := 0 maxJobNameLen := 0
stagePipeline := make([]common.Executor, 0) stagePipeline := make([]common.Executor, 0)
log.Debugf("Plan Stages: %v", plan.Stages)
for i := range plan.Stages { for i := range plan.Stages {
stage := plan.Stages[i] stage := plan.Stages[i]
stagePipeline = append(stagePipeline, func(ctx context.Context) error { stagePipeline = append(stagePipeline, func(ctx context.Context) error {
pipeline := make([]common.Executor, 0) pipeline := make([]common.Executor, 0)
for _, run := range stage.Runs { for _, run := range stage.Runs {
log.Debugf("Stages Runs: %v", stage.Runs)
stageExecutor := make([]common.Executor, 0) stageExecutor := make([]common.Executor, 0)
job := run.Job() job := run.Job()
log.Debugf("Job.Name: %v", job.Name)
log.Debugf("Job.RawNeeds: %v", job.RawNeeds)
log.Debugf("Job.RawRunsOn: %v", job.RawRunsOn)
log.Debugf("Job.Env: %v", job.Env)
log.Debugf("Job.If: %v", job.If)
for step := range job.Steps {
if nil != job.Steps[step] {
log.Debugf("Job.Steps: %v", job.Steps[step].String())
}
}
log.Debugf("Job.TimeoutMinutes: %v", job.TimeoutMinutes)
log.Debugf("Job.Services: %v", job.Services)
log.Debugf("Job.Strategy: %v", job.Strategy)
log.Debugf("Job.RawContainer: %v", job.RawContainer)
log.Debugf("Job.Defaults.Run.Shell: %v", job.Defaults.Run.Shell)
log.Debugf("Job.Defaults.Run.WorkingDirectory: %v", job.Defaults.Run.WorkingDirectory)
log.Debugf("Job.Outputs: %v", job.Outputs)
log.Debugf("Job.Uses: %v", job.Uses)
log.Debugf("Job.With: %v", job.With)
// log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
log.Debugf("Job.Result: %v", job.Result)
if job.Strategy != nil { if job.Strategy != nil {
log.Debugf("Job.Strategy.FailFast: %v", job.Strategy.FailFast)
log.Debugf("Job.Strategy.MaxParallel: %v", job.Strategy.MaxParallel)
log.Debugf("Job.Strategy.FailFastString: %v", job.Strategy.FailFastString)
log.Debugf("Job.Strategy.MaxParallelString: %v", job.Strategy.MaxParallelString)
log.Debugf("Job.Strategy.RawMatrix: %v", job.Strategy.RawMatrix)
strategyRc := runner.newRunContext(ctx, run, nil) strategyRc := runner.newRunContext(ctx, run, nil)
if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil { if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil {
log.Errorf("Error while evaluating matrix: %v", err) log.Errorf("Error while evaluating matrix: %v", err)
@ -122,6 +152,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
if m, err := job.GetMatrixes(); err != nil { if m, err := job.GetMatrixes(); err != nil {
log.Errorf("Error while get job's matrix: %v", err) log.Errorf("Error while get job's matrix: %v", err)
} else { } else {
log.Debugf("Job Matrices: %v", m)
log.Debugf("Runner Matrices: %v", runner.config.Matrix)
matrixes = selectMatrixes(m, runner.config.Matrix) matrixes = selectMatrixes(m, runner.config.Matrix)
} }
log.Debugf("Final matrix after applying user inclusions '%v'", matrixes) log.Debugf("Final matrix after applying user inclusions '%v'", matrixes)
@ -152,14 +184,11 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
} }
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
} }
var ncpu int ncpu := runtime.NumCPU()
info, err := container.GetHostInfo(ctx) if 1 > ncpu {
if err != nil { ncpu = 1
log.Errorf("failed to obtain container engine info: %s", err)
ncpu = 1 // sane default?
} else {
ncpu = info.NCPU
} }
log.Debugf("Detected CPUs: %d", ncpu)
return common.NewParallelExecutor(ncpu, pipeline...)(ctx) return common.NewParallelExecutor(ncpu, pipeline...)(ctx)
}) })
} }