105 lines
3.0 KiB
Go
105 lines
3.0 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/docker/swarmkit/api"
|
|
"github.com/docker/swarmkit/log"
|
|
"github.com/docker/swarmkit/manager/state/store"
|
|
)
|
|
|
|
// IsReplicatedService checks if a service is a replicated service.
|
|
func IsReplicatedService(service *api.Service) bool {
|
|
// service nil validation is required as there are scenarios
|
|
// where service is removed from store
|
|
if service == nil {
|
|
return false
|
|
}
|
|
_, ok := service.Spec.GetMode().(*api.ServiceSpec_Replicated)
|
|
return ok
|
|
}
|
|
|
|
// IsGlobalService checks if the service is a global service.
|
|
func IsGlobalService(service *api.Service) bool {
|
|
if service == nil {
|
|
return false
|
|
}
|
|
_, ok := service.Spec.GetMode().(*api.ServiceSpec_Global)
|
|
return ok
|
|
}
|
|
|
|
// IsReplicatedJob returns true if the service is a replicated job.
|
|
func IsReplicatedJob(service *api.Service) bool {
|
|
if service == nil {
|
|
return false
|
|
}
|
|
|
|
_, ok := service.Spec.GetMode().(*api.ServiceSpec_ReplicatedJob)
|
|
return ok
|
|
}
|
|
|
|
// IsGlobalJob returns true if the service is a global job.
|
|
func IsGlobalJob(service *api.Service) bool {
|
|
if service == nil {
|
|
return false
|
|
}
|
|
|
|
_, ok := service.Spec.GetMode().(*api.ServiceSpec_GlobalJob)
|
|
return ok
|
|
}
|
|
|
|
// SetServiceTasksRemove sets the desired state of tasks associated with a service
|
|
// to REMOVE, so that they can be properly shut down by the agent and later removed
|
|
// by the task reaper.
|
|
func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *api.Service) {
|
|
var (
|
|
tasks []*api.Task
|
|
err error
|
|
)
|
|
s.View(func(tx store.ReadTx) {
|
|
tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
|
|
})
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Errorf("failed to list tasks")
|
|
return
|
|
}
|
|
|
|
err = s.Batch(func(batch *store.Batch) error {
|
|
for _, t := range tasks {
|
|
err := batch.Update(func(tx store.Tx) error {
|
|
// the task may have changed for some reason in the meantime
|
|
// since we read it out, so we need to get from the store again
|
|
// within the boundaries of a transaction
|
|
latestTask := store.GetTask(tx, t.ID)
|
|
|
|
// time travel is not allowed. if the current desired state is
|
|
// above the one we're trying to go to we can't go backwards.
|
|
// we have nothing to do and we should skip to the next task
|
|
if latestTask.DesiredState > api.TaskStateRemove {
|
|
// log a warning, though. we shouln't be trying to rewrite
|
|
// a state to an earlier state
|
|
log.G(ctx).Warnf(
|
|
"cannot update task %v in desired state %v to an earlier desired state %v",
|
|
latestTask.ID, latestTask.DesiredState, api.TaskStateRemove,
|
|
)
|
|
return nil
|
|
}
|
|
// update desired state to REMOVE
|
|
latestTask.DesiredState = api.TaskStateRemove
|
|
|
|
if err := store.UpdateTask(tx, latestTask); err != nil {
|
|
log.G(ctx).WithError(err).Errorf("failed transaction: update task desired state to REMOVE")
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Errorf("task search transaction failed")
|
|
}
|
|
}
|