docker.io/swarmkit/manager/orchestrator/replicated/slot.go

116 lines
3.1 KiB
Go

package replicated
import (
"context"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/orchestrator"
"github.com/docker/swarmkit/manager/state/store"
)
type slotsByRunningState []orchestrator.Slot
func (is slotsByRunningState) Len() int { return len(is) }
func (is slotsByRunningState) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
// Less returns true if the first task should be preferred over the second task,
// all other things being equal in terms of node balance.
func (is slotsByRunningState) Less(i, j int) bool {
iRunning := false
jRunning := false
for _, ii := range is[i] {
if ii.Status.State == api.TaskStateRunning {
iRunning = true
break
}
}
for _, ij := range is[j] {
if ij.Status.State == api.TaskStateRunning {
jRunning = true
break
}
}
if iRunning && !jRunning {
return true
}
if !iRunning && jRunning {
return false
}
// Use Slot number as a tie-breaker to prefer to remove tasks in reverse
// order of Slot number. This would help us avoid unnecessary master
// migration when scaling down a stateful service because the master
// task of a stateful service is usually in a low numbered Slot.
return is[i][0].Slot < is[j][0].Slot
}
type slotWithIndex struct {
slot orchestrator.Slot
// index is a counter that counts this task as the nth instance of
// the service on its node. This is used for sorting the tasks so that
// when scaling down we leave tasks more evenly balanced.
index int
}
type slotsByIndex []slotWithIndex
func (is slotsByIndex) Len() int { return len(is) }
func (is slotsByIndex) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
func (is slotsByIndex) Less(i, j int) bool {
if is[i].index < 0 && is[j].index >= 0 {
return false
}
if is[j].index < 0 && is[i].index >= 0 {
return true
}
return is[i].index < is[j].index
}
// updatableAndDeadSlots returns two maps of slots. The first contains slots
// that have at least one task with a desired state above NEW and lesser or
// equal to RUNNING, or a task that shouldn't be restarted. The second contains
// all other slots with at least one task.
func (r *Orchestrator) updatableAndDeadSlots(ctx context.Context, service *api.Service) (map[uint64]orchestrator.Slot, map[uint64]orchestrator.Slot, error) {
var (
tasks []*api.Task
err error
)
r.store.View(func(tx store.ReadTx) {
tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
})
if err != nil {
return nil, nil, err
}
updatableSlots := make(map[uint64]orchestrator.Slot)
for _, t := range tasks {
updatableSlots[t.Slot] = append(updatableSlots[t.Slot], t)
}
deadSlots := make(map[uint64]orchestrator.Slot)
for slotID, slot := range updatableSlots {
updatable := r.restarts.UpdatableTasksInSlot(ctx, slot, service)
if len(updatable) != 0 {
updatableSlots[slotID] = updatable
} else {
delete(updatableSlots, slotID)
deadSlots[slotID] = slot
}
}
return updatableSlots, deadSlots, nil
}
// SlotTuple returns a slot tuple for the replicated service task.
func (r *Orchestrator) SlotTuple(t *api.Task) orchestrator.SlotTuple {
return orchestrator.SlotTuple{
ServiceID: t.ServiceID,
Slot: t.Slot,
}
}