// Source: docker/swarmkit — manager/scheduler/scheduler_test.go
// (Scraped page metadata — "3732 lines", "86 KiB", "Go" — converted to this
// comment; the raw text would not compile as Go.)

package scheduler
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
"time"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/genericresource"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestScheduler exercises the core scheduling loop end to end: pending
// tasks are placed on the least-loaded READY node, and node lifecycle
// events (create, update, delete, readiness transitions) are honored by
// subsequent placements.
func TestScheduler(t *testing.T) {
	ctx := context.Background()
	initialNodeSet := []*api.Node{
		{
			ID: "id1",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name1",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "id2",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name2",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "id3",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name2",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
	}

	initialTaskSet := []*api.Task{
		{
			ID:           "id1",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name1",
			},
			Status: api.TaskStatus{
				State: api.TaskStateAssigned,
			},
			// Already assigned to node id1, so only id2/id3 are empty.
			NodeID: initialNodeSet[0].ID,
		},
		{
			ID:           "id2",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name2",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		},
		{
			ID:           "id3",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name2",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		},
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	err := s.Update(func(tx store.Tx) error {
		// Prepopulate nodes
		for _, n := range initialNodeSet {
			assert.NoError(t, store.CreateNode(tx, n))
		}

		// Prepopulate tasks
		for _, task := range initialTaskSet {
			assert.NoError(t, store.CreateTask(tx, task))
		}
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	assignment1 := watchAssignment(t, watch)
	// must assign to id2 or id3 since id1 already has a task
	// BUGFIX: testify's assert.Regexp takes the expected pattern first and
	// the value under test second; the arguments were previously swapped,
	// which made this assertion nearly vacuous.
	assert.Regexp(t, "(id2|id3)", assignment1.NodeID)

	assignment2 := watchAssignment(t, watch)
	// the second pending task must land on whichever of id2/id3 is left
	if assignment1.NodeID == "id2" {
		assert.Equal(t, "id3", assignment2.NodeID)
	} else {
		assert.Equal(t, "id2", assignment2.NodeID)
	}

	err = s.Update(func(tx store.Tx) error {
		// Update each node to make sure this doesn't mess up the
		// scheduler's state.
		for _, n := range initialNodeSet {
			assert.NoError(t, store.UpdateNode(tx, n))
		}
		return nil
	})
	assert.NoError(t, err)

	err = s.Update(func(tx store.Tx) error {
		// Delete the task associated with node 1 so it's now the most lightly
		// loaded node.
		assert.NoError(t, store.DeleteTask(tx, "id1"))

		// Create a new task. It should get assigned to id1.
		t4 := &api.Task{
			ID:           "id4",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name4",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.CreateTask(tx, t4))
		return nil
	})
	assert.NoError(t, err)

	assignment3 := watchAssignment(t, watch)
	assert.Equal(t, "id1", assignment3.NodeID)

	// Update a task to make it unassigned. It should get assigned by the
	// scheduler.
	err = s.Update(func(tx store.Tx) error {
		// Remove assignment from task id4. It should get assigned
		// to node id1.
		t4 := &api.Task{
			ID:           "id4",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name4",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.UpdateTask(tx, t4))
		return nil
	})
	assert.NoError(t, err)

	assignment4 := watchAssignment(t, watch)
	assert.Equal(t, "id1", assignment4.NodeID)

	err = s.Update(func(tx store.Tx) error {
		// Create a ready node, then remove it. No tasks should ever
		// be assigned to it.
		node := &api.Node{
			ID: "removednode",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "removednode",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_DOWN,
			},
		}
		assert.NoError(t, store.CreateNode(tx, node))
		assert.NoError(t, store.DeleteNode(tx, node.ID))

		// Create an unassigned task.
		task := &api.Task{
			ID:           "removednode",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "removednode",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.CreateTask(tx, task))
		return nil
	})
	assert.NoError(t, err)

	assignmentRemovedNode := watchAssignment(t, watch)
	assert.NotEqual(t, "removednode", assignmentRemovedNode.NodeID)

	err = s.Update(func(tx store.Tx) error {
		// Create a ready node. It should be used for the next
		// assignment.
		n4 := &api.Node{
			ID: "id4",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name4",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		}
		assert.NoError(t, store.CreateNode(tx, n4))

		// Create an unassigned task.
		t5 := &api.Task{
			ID:           "id5",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name5",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.CreateTask(tx, t5))
		return nil
	})
	assert.NoError(t, err)

	assignment5 := watchAssignment(t, watch)
	assert.Equal(t, "id4", assignment5.NodeID)

	err = s.Update(func(tx store.Tx) error {
		// Create a non-ready node. It should NOT be used for the next
		// assignment.
		n5 := &api.Node{
			ID: "id5",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name5",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_DOWN,
			},
		}
		assert.NoError(t, store.CreateNode(tx, n5))

		// Create an unassigned task.
		t6 := &api.Task{
			ID:           "id6",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name6",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.CreateTask(tx, t6))
		return nil
	})
	assert.NoError(t, err)

	assignment6 := watchAssignment(t, watch)
	assert.NotEqual(t, "id5", assignment6.NodeID)

	err = s.Update(func(tx store.Tx) error {
		// Update node id5 to put it in the READY state.
		n5 := &api.Node{
			ID: "id5",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name5",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		}
		assert.NoError(t, store.UpdateNode(tx, n5))

		// Create an unassigned task. Should be assigned to the
		// now-ready node.
		t7 := &api.Task{
			ID:           "id7",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name7",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.CreateTask(tx, t7))
		return nil
	})
	assert.NoError(t, err)

	assignment7 := watchAssignment(t, watch)
	assert.Equal(t, "id5", assignment7.NodeID)

	err = s.Update(func(tx store.Tx) error {
		// Create a ready node, then immediately take it down. The next
		// unassigned task should NOT be assigned to it.
		n6 := &api.Node{
			ID: "id6",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name6",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		}
		assert.NoError(t, store.CreateNode(tx, n6))
		n6.Status.State = api.NodeStatus_DOWN
		assert.NoError(t, store.UpdateNode(tx, n6))

		// Create an unassigned task.
		t8 := &api.Task{
			ID:           "id8",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name8",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		}
		assert.NoError(t, store.CreateTask(tx, t8))
		return nil
	})
	assert.NoError(t, err)

	assignment8 := watchAssignment(t, watch)
	assert.NotEqual(t, "id6", assignment8.NodeID)
}
// testHA verifies that tasks of the same service are spread as evenly as
// possible across the READY nodes (HA scheduling), and that the spread is
// maintained as services are scaled up and tasks are added or removed.
// When useSpecVersion is true the task templates carry a SpecVersion, so
// the scheduler's per-service bookkeeping is keyed by (service, version).
func testHA(t *testing.T, useSpecVersion bool) {
	ctx := context.Background()
	initialNodeSet := []*api.Node{
		{
			ID: "id1",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "id2",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "id3",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "id4",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "id5",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
	}

	taskTemplate1 := &api.Task{
		DesiredState: api.TaskStateRunning,
		ServiceID:    "service1",
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{
					Image: "v:1",
				},
			},
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	taskTemplate2 := &api.Task{
		DesiredState: api.TaskStateRunning,
		ServiceID:    "service2",
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{
					Image: "v:2",
				},
			},
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	if useSpecVersion {
		taskTemplate1.SpecVersion = &api.Version{Index: 1}
		taskTemplate2.SpecVersion = &api.Version{Index: 1}
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	t1Instances := 18

	err := s.Update(func(tx store.Tx) error {
		// Prepopulate nodes
		for _, n := range initialNodeSet {
			assert.NoError(t, store.CreateNode(tx, n))
		}

		// Prepopulate tasks from template 1
		for i := 0; i != t1Instances; i++ {
			taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
			assert.NoError(t, store.CreateTask(tx, taskTemplate1))
		}
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	// Collect the 18 initial assignments, counting tasks per node.
	t1Assignments := make(map[string]int)
	for i := 0; i != t1Instances; i++ {
		assignment := watchAssignment(t, watch)
		if !strings.HasPrefix(assignment.ID, "t1") {
			t.Fatal("got assignment for different kind of task")
		}
		t1Assignments[assignment.NodeID]++
	}

	assert.Len(t, t1Assignments, 5)

	// 18 tasks over 5 nodes: three nodes get 4 tasks, two nodes get 3.
	nodesWith3T1Tasks := 0
	nodesWith4T1Tasks := 0
	for nodeID, taskCount := range t1Assignments {
		if taskCount == 3 {
			nodesWith3T1Tasks++
		} else if taskCount == 4 {
			nodesWith4T1Tasks++
		} else {
			t.Fatalf("unexpected number of tasks %d on node %s", taskCount, nodeID)
		}
	}

	assert.Equal(t, 3, nodesWith4T1Tasks)
	assert.Equal(t, 2, nodesWith3T1Tasks)

	t2Instances := 2

	// Add a new service with two instances. They should fill the two nodes
	// that only have three tasks.
	err = s.Update(func(tx store.Tx) error {
		for i := 0; i != t2Instances; i++ {
			taskTemplate2.ID = fmt.Sprintf("t2id%d", i)
			assert.NoError(t, store.CreateTask(tx, taskTemplate2))
		}
		return nil
	})
	assert.NoError(t, err)

	t2Assignments := make(map[string]int)
	for i := 0; i != t2Instances; i++ {
		assignment := watchAssignment(t, watch)
		if !strings.HasPrefix(assignment.ID, "t2") {
			t.Fatal("got assignment for different kind of task")
		}
		t2Assignments[assignment.NodeID]++
	}

	assert.Len(t, t2Assignments, 2)

	// Each service2 task must have landed on one of the least-loaded nodes
	// (the ones with only 3 service1 tasks).
	for nodeID := range t2Assignments {
		assert.Equal(t, 3, t1Assignments[nodeID])
	}

	// Scale up service 1 to 21 tasks. It should cover the two nodes that
	// service 2 was assigned to, and also one other node.
	err = s.Update(func(tx store.Tx) error {
		for i := t1Instances; i != t1Instances+3; i++ {
			taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
			assert.NoError(t, store.CreateTask(tx, taskTemplate1))
		}
		return nil
	})
	assert.NoError(t, err)

	var sharedNodes [2]string

	for i := 0; i != 3; i++ {
		assignment := watchAssignment(t, watch)
		if !strings.HasPrefix(assignment.ID, "t1") {
			t.Fatal("got assignment for different kind of task")
		}
		// No node should receive two of the three new tasks.
		if t1Assignments[assignment.NodeID] == 5 {
			t.Fatal("more than one new task assigned to the same node")
		}
		t1Assignments[assignment.NodeID]++

		// Record which of the new assignments went to nodes that also
		// carry service2 tasks; exactly two should.
		if t2Assignments[assignment.NodeID] != 0 {
			if sharedNodes[0] == "" {
				sharedNodes[0] = assignment.NodeID
			} else if sharedNodes[1] == "" {
				sharedNodes[1] = assignment.NodeID
			} else {
				t.Fatal("all three assignments went to nodes with service2 tasks")
			}
		}
	}

	assert.NotEmpty(t, sharedNodes[0])
	assert.NotEmpty(t, sharedNodes[1])
	assert.NotEqual(t, sharedNodes[0], sharedNodes[1])

	// After scaling: four nodes with 4 service1 tasks, one with 5.
	nodesWith4T1Tasks = 0
	nodesWith5T1Tasks := 0
	for nodeID, taskCount := range t1Assignments {
		if taskCount == 4 {
			nodesWith4T1Tasks++
		} else if taskCount == 5 {
			nodesWith5T1Tasks++
		} else {
			t.Fatalf("unexpected number of tasks %d on node %s", taskCount, nodeID)
		}
	}

	assert.Equal(t, 4, nodesWith4T1Tasks)
	assert.Equal(t, 1, nodesWith5T1Tasks)

	// Add another task from service2. It must not land on the node that
	// has 5 service1 tasks.
	err = s.Update(func(tx store.Tx) error {
		taskTemplate2.ID = "t2id4"
		assert.NoError(t, store.CreateTask(tx, taskTemplate2))
		return nil
	})
	assert.NoError(t, err)

	assignment := watchAssignment(t, watch)
	if assignment.ID != "t2id4" {
		t.Fatal("got assignment for different task")
	}
	if t2Assignments[assignment.NodeID] != 0 {
		t.Fatal("was scheduled on a node that already has a service2 task")
	}
	if t1Assignments[assignment.NodeID] == 5 {
		t.Fatal("was scheduled on the node that has the most service1 tasks")
	}
	t2Assignments[assignment.NodeID]++

	// Remove all tasks on node id1.
	err = s.Update(func(tx store.Tx) error {
		tasks, err := store.FindTasks(tx, store.ByNodeID("id1"))
		assert.NoError(t, err)
		for _, task := range tasks {
			assert.NoError(t, store.DeleteTask(tx, task.ID))
		}
		return nil
	})
	assert.NoError(t, err)

	t1Assignments["id1"] = 0
	t2Assignments["id1"] = 0

	// Add four instances of service1 and two instances of service2.
	// All instances of service1 should land on node "id1", and one
	// of the two service2 instances should as well.
	// Put these in a map to randomize the order in which they are
	// created.
	err = s.Update(func(tx store.Tx) error {
		tasksMap := make(map[string]*api.Task)
		for i := 22; i <= 25; i++ {
			taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
			tasksMap[taskTemplate1.ID] = taskTemplate1.Copy()
		}
		for i := 5; i <= 6; i++ {
			taskTemplate2.ID = fmt.Sprintf("t2id%d", i)
			tasksMap[taskTemplate2.ID] = taskTemplate2.Copy()
		}
		for _, task := range tasksMap {
			assert.NoError(t, store.CreateTask(tx, task))
		}
		return nil
	})
	assert.NoError(t, err)

	for i := 0; i != 4+2; i++ {
		assignment := watchAssignment(t, watch)
		if strings.HasPrefix(assignment.ID, "t1") {
			t1Assignments[assignment.NodeID]++
		} else if strings.HasPrefix(assignment.ID, "t2") {
			t2Assignments[assignment.NodeID]++
		}
	}

	assert.Equal(t, 4, t1Assignments["id1"])
	assert.Equal(t, 1, t2Assignments["id1"])
}
// TestHA runs the HA spread-scheduling scenario twice: once with task
// SpecVersions unset and once with them set.
func TestHA(t *testing.T) {
	for _, useSpecVersion := range []bool{false, true} {
		useSpecVersion := useSpecVersion
		name := "useSpecVersion=false"
		if useSpecVersion {
			name = "useSpecVersion=true"
		}
		t.Run(name, func(t *testing.T) { testHA(t, useSpecVersion) })
	}
}
// testPreferences verifies spread placement preferences: with a single
// spread-over-"az" preference and one node in az1 vs four in az2, the 8
// tasks are split evenly between the two zones, so the lone az1 node
// receives 4 tasks while each az2 node receives 1.
func testPreferences(t *testing.T, useSpecVersion bool) {
	ctx := context.Background()
	initialNodeSet := []*api.Node{
		{
			ID: "id1",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az": "az1",
					},
				},
			},
		},
		{
			ID: "id2",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az": "az2",
					},
				},
			},
		},
		{
			ID: "id3",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az": "az2",
					},
				},
			},
		},
		{
			ID: "id4",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az": "az2",
					},
				},
			},
		},
		{
			ID: "id5",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az": "az2",
					},
				},
			},
		},
	}

	taskTemplate1 := &api.Task{
		DesiredState: api.TaskStateRunning,
		ServiceID:    "service1",
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{
					Image: "v:1",
				},
			},
			// Spread tasks evenly over the values of the "az" node label.
			Placement: &api.Placement{
				Preferences: []*api.PlacementPreference{
					{
						Preference: &api.PlacementPreference_Spread{
							Spread: &api.SpreadOver{
								SpreadDescriptor: "node.labels.az",
							},
						},
					},
				},
			},
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	if useSpecVersion {
		taskTemplate1.SpecVersion = &api.Version{Index: 1}
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	t1Instances := 8

	err := s.Update(func(tx store.Tx) error {
		// Prepopulate nodes
		for _, n := range initialNodeSet {
			assert.NoError(t, store.CreateNode(tx, n))
		}

		// Prepopulate tasks from template 1
		for i := 0; i != t1Instances; i++ {
			taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
			assert.NoError(t, store.CreateTask(tx, taskTemplate1))
		}
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	t1Assignments := make(map[string]int)
	for i := 0; i != t1Instances; i++ {
		assignment := watchAssignment(t, watch)
		if !strings.HasPrefix(assignment.ID, "t1") {
			t.Fatal("got assignment for different kind of task")
		}
		t1Assignments[assignment.NodeID]++
	}

	assert.Len(t, t1Assignments, 5)
	// az1 (id1 alone) gets half the tasks; az2's half is spread over
	// its four nodes.
	assert.Equal(t, 4, t1Assignments["id1"])
	assert.Equal(t, 1, t1Assignments["id2"])
	assert.Equal(t, 1, t1Assignments["id3"])
	assert.Equal(t, 1, t1Assignments["id4"])
	assert.Equal(t, 1, t1Assignments["id5"])
}
// TestPreferences runs the single-preference spread scenario twice: once
// with task SpecVersions unset and once with them set.
func TestPreferences(t *testing.T) {
	for _, useSpecVersion := range []bool{false, true} {
		useSpecVersion := useSpecVersion
		name := "useSpecVersion=false"
		if useSpecVersion {
			name = "useSpecVersion=true"
		}
		t.Run(name, func(t *testing.T) { testPreferences(t, useSpecVersion) })
	}
}
// testMultiplePreferences verifies hierarchical spread preferences
// (spread over "az", then over "rack" within each az) combined with
// resource reservations: a node without enough memory/generic resources
// (id0) gets nothing, a resource-constrained zone absorbs as much as it
// can (id1), and the remainder is spread across the racks of az2.
func testMultiplePreferences(t *testing.T, useSpecVersion bool) {
	ctx := context.Background()
	initialNodeSet := []*api.Node{
		{
			ID: "id0",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az1",
						"rack": "rack1",
					},
				},
			},
			// Too little memory (1e8 < the 2e8 reservation) and only one
			// "apple" — this node can never host a task.
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e8,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 1),
					},
				},
			},
		},
		{
			ID: "id1",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az1",
						"rack": "rack1",
					},
				},
			},
			// 1e9 memory / 2e8 per task => at most 5 tasks fit here.
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 10),
					},
				},
			},
		},
		{
			ID: "id2",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az2",
						"rack": "rack1",
					},
				},
			},
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 6),
					},
				},
			},
		},
		{
			ID: "id3",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az2",
						"rack": "rack1",
					},
				},
			},
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 6),
					},
				},
			},
		},
		{
			ID: "id4",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az2",
						"rack": "rack1",
					},
				},
			},
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 6),
					},
				},
			},
		},
		{
			ID: "id5",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az2",
						"rack": "rack2",
					},
				},
			},
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 6),
					},
				},
			},
		},
		{
			ID: "id6",
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Labels: map[string]string{
						"az":   "az2",
						"rack": "rack2",
					},
				},
			},
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    1e9,
					MemoryBytes: 1e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 6),
					},
				},
			},
		},
	}

	taskTemplate1 := &api.Task{
		DesiredState: api.TaskStateRunning,
		ServiceID:    "service1",
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{
					Image: "v:1",
				},
			},
			// Two-level spread: first over "az", then over "rack" within
			// each az.
			Placement: &api.Placement{
				Preferences: []*api.PlacementPreference{
					{
						Preference: &api.PlacementPreference_Spread{
							Spread: &api.SpreadOver{
								SpreadDescriptor: "node.labels.az",
							},
						},
					},
					{
						Preference: &api.PlacementPreference_Spread{
							Spread: &api.SpreadOver{
								SpreadDescriptor: "node.labels.rack",
							},
						},
					},
				},
			},
			// Each task reserves 2e8 memory and 2 "apple"s.
			Resources: &api.ResourceRequirements{
				Reservations: &api.Resources{
					MemoryBytes: 2e8,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 2),
					},
				},
			},
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	if useSpecVersion {
		taskTemplate1.SpecVersion = &api.Version{Index: 1}
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	t1Instances := 12

	err := s.Update(func(tx store.Tx) error {
		// Prepopulate nodes
		for _, n := range initialNodeSet {
			assert.NoError(t, store.CreateNode(tx, n))
		}

		// Prepopulate tasks from template 1
		for i := 0; i != t1Instances; i++ {
			taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
			assert.NoError(t, store.CreateTask(tx, taskTemplate1))
		}
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	t1Assignments := make(map[string]int)
	for i := 0; i != t1Instances; i++ {
		assignment := watchAssignment(t, watch)
		if !strings.HasPrefix(assignment.ID, "t1") {
			t.Fatal("got assignment for different kind of task")
		}
		t1Assignments[assignment.NodeID]++
	}

	assert.Len(t, t1Assignments, 6)

	// There should be no tasks assigned to id0 because it doesn't meet the
	// resource requirements.
	assert.Equal(t, 0, t1Assignments["id0"])

	// There should be 5 tasks assigned to id1 because half of the 12 tasks
	// should ideally end up in az1, but id1 can only accommodate 5 due to
	// resource requirements.
	assert.Equal(t, 5, t1Assignments["id1"])

	// The remaining 7 tasks should be spread across rack1 and rack2 of
	// az2. Which rack gets 4 and which gets 3 is nondeterministic, so
	// both layouts are accepted below.
	if t1Assignments["id2"]+t1Assignments["id3"]+t1Assignments["id4"] == 4 {
		// If rack1 gets 4 and rack2 gets 3, then one of id[2-4] will have two
		// tasks and the others will have one.
		if t1Assignments["id2"] == 2 {
			assert.Equal(t, 1, t1Assignments["id3"])
			assert.Equal(t, 1, t1Assignments["id4"])
		} else if t1Assignments["id3"] == 2 {
			assert.Equal(t, 1, t1Assignments["id2"])
			assert.Equal(t, 1, t1Assignments["id4"])
		} else {
			assert.Equal(t, 1, t1Assignments["id2"])
			assert.Equal(t, 1, t1Assignments["id3"])
			assert.Equal(t, 2, t1Assignments["id4"])
		}

		// either id5 or id6 should end up with 2 tasks
		if t1Assignments["id5"] == 1 {
			assert.Equal(t, 2, t1Assignments["id6"])
		} else {
			assert.Equal(t, 2, t1Assignments["id5"])
			assert.Equal(t, 1, t1Assignments["id6"])
		}
	} else if t1Assignments["id2"]+t1Assignments["id3"]+t1Assignments["id4"] == 3 {
		// If rack2 gets 4 and rack1 gets 3, then id[2-4] will each get
		// 1 task and id[5-6] will each get 2 tasks.
		assert.Equal(t, 1, t1Assignments["id2"])
		assert.Equal(t, 1, t1Assignments["id3"])
		assert.Equal(t, 1, t1Assignments["id4"])
		assert.Equal(t, 2, t1Assignments["id5"])
		assert.Equal(t, 2, t1Assignments["id6"])
	} else {
		t.Fatal("unexpected task layout")
	}
}
// TestMultiplePreferences runs the hierarchical-spread scenario twice:
// once with task SpecVersions unset and once with them set.
func TestMultiplePreferences(t *testing.T) {
	for _, useSpecVersion := range []bool{false, true} {
		useSpecVersion := useSpecVersion
		name := "useSpecVersion=false"
		if useSpecVersion {
			name = "useSpecVersion=true"
		}
		t.Run(name, func(t *testing.T) { testMultiplePreferences(t, useSpecVersion) })
	}
}
// TestSchedulerNoReadyNodes verifies that a pending task cannot be placed
// while no READY nodes exist (the scheduler records a "no suitable node"
// failure on it), and that it is assigned as soon as a READY node joins.
func TestSchedulerNoReadyNodes(t *testing.T) {
	ctx := context.Background()

	pendingTask := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	// Seed the store with the service and its (currently unplaceable) task.
	setupErr := s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateService(tx, &api.Service{ID: "serviceID1"}))
		assert.NoError(t, store.CreateTask(tx, pendingTask))
		return nil
	})
	assert.NoError(t, setupErr)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	// With zero READY nodes the task must fail placement.
	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node", failure.Status.Err)

	// Bring up a READY node; the scheduler should immediately place the
	// task on it.
	addNodeErr := s.Update(func(tx store.Tx) error {
		readyNode := &api.Node{
			ID: "newnode",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "newnode",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		}
		assert.NoError(t, store.CreateNode(tx, readyNode))
		return nil
	})
	assert.NoError(t, addNodeErr)

	assignment := watchAssignment(t, watch)
	assert.Equal(t, "newnode", assignment.NodeID)
}
// TestSchedulerFaultyNode checks the scheduler's failure tracking: after a
// service's tasks repeatedly fail on a node, that node accumulates recent
// failures for the service (capped at 5) and new replicas are steered to
// the other node instead. Preassigned tasks (created with a NodeID) keep
// going to their node and never feed the failure bookkeeping.
func TestSchedulerFaultyNode(t *testing.T) {
	ctx := context.Background()

	replicatedTaskTemplate := &api.Task{
		ServiceID:    "service1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	preassignedTaskTemplate := &api.Task{
		ServiceID:    "service2",
		NodeID:       "id1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name2",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	node1 := &api.Node{
		ID: "id1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "id1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	node2 := &api.Node{
		ID: "id2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "id2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	err := s.Update(func(tx store.Tx) error {
		// Add initial nodes, and one task of each type assigned to node id1
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))

		task1 := replicatedTaskTemplate.Copy()
		task1.ID = "id1"
		task1.NodeID = "id1"
		task1.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task1))

		task2 := preassignedTaskTemplate.Copy()
		task2.ID = "id2"
		task2.NodeID = "id1"
		task2.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	// Each iteration creates a replicated and a preassigned task, checks
	// where they land and what the per-node failure counts look like, then
	// marks both tasks failed to feed the next iteration.
	for i := 0; i != 8; i++ {
		// Simulate a task failure cycle
		newReplicatedTask := replicatedTaskTemplate.Copy()
		newReplicatedTask.ID = identity.NewID()

		err = s.Update(func(tx store.Tx) error {
			assert.NoError(t, store.CreateTask(tx, newReplicatedTask))
			return nil
		})
		assert.NoError(t, err)

		assignment := watchAssignment(t, watch)
		assert.Equal(t, newReplicatedTask.ID, assignment.ID)

		if i < 5 {
			// The first 5 attempts should be assigned to node id2 because
			// it has no replicas of the service.
			assert.Equal(t, "id2", assignment.NodeID)
		} else {
			// The next ones should be assigned to id1, since we'll
			// flag id2 as potentially faulty.
			assert.Equal(t, "id1", assignment.NodeID)
		}

		// Failure counts: id2 accumulates one per early iteration, capped
		// at 5; id1 starts accumulating once the traffic shifts to it.
		node2Info, err := scheduler.nodeSet.nodeInfo("id2")
		assert.NoError(t, err)
		expectedNode2Failures := i
		if i > 5 {
			expectedNode2Failures = 5
		}
		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode2Failures)

		node1Info, err := scheduler.nodeSet.nodeInfo("id1")
		assert.NoError(t, err)
		expectedNode1Failures := i - 5
		if i < 5 {
			expectedNode1Failures = 0
		}
		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1"}], expectedNode1Failures)

		newPreassignedTask := preassignedTaskTemplate.Copy()
		newPreassignedTask.ID = identity.NewID()

		err = s.Update(func(tx store.Tx) error {
			assert.NoError(t, store.CreateTask(tx, newPreassignedTask))
			return nil
		})
		assert.NoError(t, err)

		assignment = watchAssignment(t, watch)
		assert.Equal(t, newPreassignedTask.ID, assignment.ID)

		// The preassigned task is always assigned to node id1
		assert.Equal(t, "id1", assignment.NodeID)

		// The service associated with the preassigned task will not be
		// marked as failing on node id1: preassigned tasks bypass the
		// scheduler's failure tracking, so its recentFailures entry
		// stays empty.
		nodeInfo, err := scheduler.nodeSet.nodeInfo("id1")
		assert.NoError(t, err)
		assert.Len(t, nodeInfo.recentFailures[versionedService{serviceID: "service2"}], 0)

		err = s.Update(func(tx store.Tx) error {
			newReplicatedTask := store.GetTask(tx, newReplicatedTask.ID)
			require.NotNil(t, newReplicatedTask)
			newReplicatedTask.Status.State = api.TaskStateFailed
			assert.NoError(t, store.UpdateTask(tx, newReplicatedTask))

			newPreassignedTask := store.GetTask(tx, newPreassignedTask.ID)
			require.NotNil(t, newPreassignedTask)
			newPreassignedTask.Status.State = api.TaskStateFailed
			assert.NoError(t, store.UpdateTask(tx, newPreassignedTask))
			return nil
		})
		assert.NoError(t, err)
	}
}
// TestSchedulerFaultyNodeSpecVersion checks that the scheduler's
// per-node failure tracking is keyed by (service, SpecVersion): once the
// service's spec version is bumped, the faultiness accumulated under the
// old version does not count against the new one, and the whole
// fail-5-times-then-avoid cycle repeats from scratch.
func TestSchedulerFaultyNodeSpecVersion(t *testing.T) {
	ctx := context.Background()

	taskTemplate := &api.Task{
		ServiceID:    "service1",
		SpecVersion:  &api.Version{Index: 1},
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}

	node1 := &api.Node{
		ID: "id1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "id1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	node2 := &api.Node{
		ID: "id2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "id2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	err := s.Update(func(tx store.Tx) error {
		// Add initial nodes, and one task assigned to node id1
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))

		task1 := taskTemplate.Copy()
		task1.ID = "id1"
		task1.NodeID = "id1"
		task1.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task1))
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	// Each iteration creates a task, checks its placement and the failure
	// counts per (node, spec version), then marks the task failed.
	for i := 0; i != 15; i++ {
		// Simulate a task failure cycle
		newTask := taskTemplate.Copy()
		newTask.ID = identity.NewID()

		// After the condition for node faultiness has been reached,
		// bump the spec version to simulate a service update.
		if i > 5 {
			newTask.SpecVersion.Index++
		}

		err = s.Update(func(tx store.Tx) error {
			assert.NoError(t, store.CreateTask(tx, newTask))
			return nil
		})
		assert.NoError(t, err)

		assignment := watchAssignment(t, watch)
		assert.Equal(t, newTask.ID, assignment.ID)

		if i < 5 || (i > 5 && i < 11) {
			// The first 5 attempts should be assigned to node id2 because
			// it has no replicas of the service.
			// Same with i=6 to i=10 inclusive, which is repeating the
			// same behavior with a different SpecVersion.
			assert.Equal(t, "id2", assignment.NodeID)
		} else {
			// The next ones should be assigned to id1, since we'll
			// flag id2 as potentially faulty.
			assert.Equal(t, "id1", assignment.NodeID)
		}

		node1Info, err := scheduler.nodeSet.nodeInfo("id1")
		assert.NoError(t, err)
		node2Info, err := scheduler.nodeSet.nodeInfo("id2")
		assert.NoError(t, err)

		// Expected failure counts per (node, spec version), mirroring the
		// placement phases above: version 1 failures accumulate on id2 up
		// to the cap of 5, then version 2 repeats the pattern.
		expectedNode1Spec1Failures := 0
		expectedNode1Spec2Failures := 0
		expectedNode2Spec1Failures := i
		expectedNode2Spec2Failures := 0
		if i > 5 {
			expectedNode1Spec1Failures = 1
			expectedNode2Spec1Failures = 5
			expectedNode2Spec2Failures = i - 6
		}
		if i > 11 {
			expectedNode1Spec2Failures = i - 11
			expectedNode2Spec2Failures = 5
		}
		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode1Spec1Failures)
		assert.Len(t, node1Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode1Spec2Failures)
		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 1}}], expectedNode2Spec1Failures)
		assert.Len(t, node2Info.recentFailures[versionedService{serviceID: "service1", specVersion: api.Version{Index: 2}}], expectedNode2Spec2Failures)

		err = s.Update(func(tx store.Tx) error {
			newTask := store.GetTask(tx, newTask.ID)
			require.NotNil(t, newTask)
			newTask.Status.State = api.TaskStateFailed
			assert.NoError(t, store.UpdateTask(tx, newTask))
			return nil
		})
		assert.NoError(t, err)
	}
}
// TestSchedulerResourceConstraint verifies that a task reserving memory and
// generic resources stays pending while the only ready node cannot satisfy
// the reservations (non-ready nodes that could are ignored), and is assigned
// as soon as a sufficiently provisioned ready node joins the cluster.
func TestSchedulerResourceConstraint(t *testing.T) {
	ctx := context.Background()
	// Create a ready node without enough memory to run the task.
	underprovisionedNode := &api.Node{
		ID: "underprovisioned",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "underprovisioned",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				NanoCPUs:    1e9,
				MemoryBytes: 1e9,
				Generic: append(
					genericresource.NewSet("orange", "blue"),
					genericresource.NewDiscrete("apple", 1),
				),
			},
		},
	}
	// Non-ready nodes that satisfy the constraints but shouldn't be used
	nonready1 := &api.Node{
		ID: "nonready1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "nonready1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_UNKNOWN,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				NanoCPUs:    2e9,
				MemoryBytes: 2e9,
				Generic: append(
					genericresource.NewSet("orange", "blue", "red"),
					genericresource.NewDiscrete("apple", 2),
				),
			},
		},
	}
	nonready2 := &api.Node{
		ID: "nonready2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "nonready2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_UNKNOWN,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				NanoCPUs:    2e9,
				MemoryBytes: 2e9,
				Generic: append(
					genericresource.NewSet("orange", "blue", "red"),
					genericresource.NewDiscrete("apple", 2),
				),
			},
		},
	}
	// The reservations (2e9 memory, 2 oranges, 2 apples) exceed what the
	// underprovisioned node offers.
	initialTask := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Resources: &api.ResourceRequirements{
				Reservations: &api.Resources{
					MemoryBytes: 2e9,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("orange", 2),
						genericresource.NewDiscrete("apple", 2),
					},
				},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	initialService := &api.Service{
		ID: "serviceID1",
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial node, service and task
		assert.NoError(t, store.CreateService(tx, initialService))
		assert.NoError(t, store.CreateTask(tx, initialTask))
		assert.NoError(t, store.CreateNode(tx, underprovisionedNode))
		assert.NoError(t, store.CreateNode(tx, nonready1))
		assert.NoError(t, store.CreateNode(tx, nonready2))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// No node qualifies, so the scheduler records a scheduling failure on
	// the task, mentioning both the non-ready nodes and the resource gap.
	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (2 nodes not available for new tasks; insufficient resources on 1 node)", failure.Status.Err)
	err = s.Update(func(tx store.Tx) error {
		// Create a node with enough memory. The task should get
		// assigned to this node.
		node := &api.Node{
			ID: "bignode",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "bignode",
				},
			},
			Description: &api.NodeDescription{
				Resources: &api.Resources{
					NanoCPUs:    4e9,
					MemoryBytes: 8e9,
					Generic: append(
						genericresource.NewSet("orange", "blue", "red", "green"),
						genericresource.NewDiscrete("apple", 4),
					),
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		}
		assert.NoError(t, store.CreateNode(tx, node))
		return nil
	})
	assert.NoError(t, err)
	assignment := watchAssignment(t, watch)
	assert.Equal(t, "bignode", assignment.NodeID)
}
// TestSchedulerResourceConstraintHA verifies that the scheduler balances new
// tasks across nodes (HA spread) while still honoring resource reservations:
// with node1 nearly full, the two new tasks must land on different nodes.
func TestSchedulerResourceConstraintHA(t *testing.T) {
	// node 1 starts with 1 task, node 2 starts with 3 tasks.
	// however, node 1 only has enough memory to schedule one more task.
	ctx := context.Background()
	node1 := &api.Node{
		ID: "id1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "id1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				MemoryBytes: 1e9,
				Generic: []*api.GenericResource{
					genericresource.NewDiscrete("apple", 2),
				},
			},
		},
	}
	node2 := &api.Node{
		ID: "id2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "id2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				MemoryBytes: 1e11,
				Generic: []*api.GenericResource{
					genericresource.NewDiscrete("apple", 5),
				},
			},
		},
	}
	// Every task reserves half of node1's memory and one apple.
	taskTemplate := &api.Task{
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Resources: &api.ResourceRequirements{
				Reservations: &api.Resources{
					MemoryBytes: 5e8,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 1),
					},
				},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial node and task
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))
		// preassigned tasks
		task1 := taskTemplate.Copy()
		task1.ID = "id1"
		task1.NodeID = "id1"
		task1.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task1))
		task2 := taskTemplate.Copy()
		task2.ID = "id2"
		task2.NodeID = "id2"
		task2.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task2))
		task3 := taskTemplate.Copy()
		task3.ID = "id3"
		task3.NodeID = "id2"
		task3.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task3))
		task4 := taskTemplate.Copy()
		task4.ID = "id4"
		task4.NodeID = "id2"
		task4.Status.State = api.TaskStateRunning
		assert.NoError(t, store.CreateTask(tx, task4))
		// tasks to assign
		task5 := taskTemplate.Copy()
		task5.ID = "id5"
		assert.NoError(t, store.CreateTask(tx, task5))
		task6 := taskTemplate.Copy()
		task6.ID = "id6"
		assert.NoError(t, store.CreateTask(tx, task6))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// Both pending tasks must be assigned, in either order.
	assignment1 := watchAssignment(t, watch)
	if assignment1.ID != "id5" && assignment1.ID != "id6" {
		t.Fatal("assignment for unexpected task")
	}
	assignment2 := watchAssignment(t, watch)
	if assignment1.ID == "id5" {
		assert.Equal(t, "id6", assignment2.ID)
	} else {
		assert.Equal(t, "id5", assignment2.ID)
	}
	// node1 only has room for one of the two tasks, so they must end up on
	// different nodes.
	if assignment1.NodeID == "id1" {
		assert.Equal(t, "id2", assignment2.NodeID)
	} else {
		assert.Equal(t, "id1", assignment2.NodeID)
	}
}
// TestSchedulerResourceConstraintDeadTask verifies that resources held by a
// running task block further scheduling on a node, and are released as soon
// as the task moves past the running state (here: Shutdown), letting a
// previously unschedulable task proceed.
func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
	ctx := context.Background()
	// Create a ready node without enough memory to run the task.
	node := &api.Node{
		ID: "id1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				NanoCPUs:    1e9,
				MemoryBytes: 1e9,
				Generic: []*api.GenericResource{
					genericresource.NewDiscrete("apple", 4),
				},
			},
		},
	}
	// Each "big" task reserves more than half of the node's memory and
	// apples, so only one can run at a time.
	bigTask1 := &api.Task{
		DesiredState: api.TaskStateRunning,
		ID:           "id1",
		ServiceID:    "serviceID1",
		Spec: api.TaskSpec{
			Resources: &api.ResourceRequirements{
				Reservations: &api.Resources{
					MemoryBytes: 8e8,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 3),
					},
				},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "big",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	bigTask2 := bigTask1.Copy()
	bigTask2.ID = "id2"
	bigService := &api.Service{
		ID: "serviceID1",
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial node, service and task
		assert.NoError(t, store.CreateService(tx, bigService))
		assert.NoError(t, store.CreateNode(tx, node))
		assert.NoError(t, store.CreateTask(tx, bigTask1))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// The task fits, so it should get assigned
	assignment := watchAssignment(t, watch)
	assert.Equal(t, "id1", assignment.ID)
	assert.Equal(t, "id1", assignment.NodeID)
	err = s.Update(func(tx store.Tx) error {
		// Add a second task. It shouldn't get assigned because of
		// resource constraints.
		return store.CreateTask(tx, bigTask2)
	})
	assert.NoError(t, err)
	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (insufficient resources on 1 node)", failure.Status.Err)
	err = s.Update(func(tx store.Tx) error {
		// The task becomes dead
		updatedTask := store.GetTask(tx, bigTask1.ID)
		updatedTask.Status.State = api.TaskStateShutdown
		return store.UpdateTask(tx, updatedTask)
	})
	assert.NoError(t, err)
	// With the first task no longer consuming resources, the second
	// one can be scheduled.
	assignment = watchAssignment(t, watch)
	assert.Equal(t, "id2", assignment.ID)
	assert.Equal(t, "id1", assignment.NodeID)
}
// TestSchedulerPreexistingDeadTask verifies that a task already in a
// terminal state (Shutdown) when the scheduler starts does not count against
// the node's resources, so a new task with the same reservations can still
// be assigned to that node.
func TestSchedulerPreexistingDeadTask(t *testing.T) {
	ctx := context.Background()
	// Create a ready node without enough memory to run two tasks at once.
	node := &api.Node{
		ID: "id1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Resources: &api.Resources{
				NanoCPUs:    1e9,
				MemoryBytes: 1e9,
				Generic: []*api.GenericResource{
					genericresource.NewDiscrete("apple", 1),
				},
			},
		},
	}
	// A task that already reached Shutdown, preassigned to the node.
	deadTask := &api.Task{
		DesiredState: api.TaskStateRunning,
		ID:           "id1",
		NodeID:       "id1",
		Spec: api.TaskSpec{
			Resources: &api.ResourceRequirements{
				Reservations: &api.Resources{
					MemoryBytes: 8e8,
					Generic: []*api.GenericResource{
						genericresource.NewDiscrete("apple", 1),
					},
				},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "big",
		},
		Status: api.TaskStatus{
			State: api.TaskStateShutdown,
		},
	}
	bigTask2 := deadTask.Copy()
	bigTask2.ID = "id2"
	bigTask2.Status.State = api.TaskStatePending
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial node and task
		assert.NoError(t, store.CreateNode(tx, node))
		assert.NoError(t, store.CreateTask(tx, deadTask))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	err = s.Update(func(tx store.Tx) error {
		// Add a second task. It should get assigned because the task
		// using the resources is past the running state.
		return store.CreateTask(tx, bigTask2)
	})
	assert.NoError(t, err)
	assignment := watchAssignment(t, watch)
	assert.Equal(t, "id2", assignment.ID)
	assert.Equal(t, "id1", assignment.NodeID)
}
// TestSchedulerCompatiblePlatform verifies platform-constraint filtering:
// tasks are only placed on nodes whose platform matches one of the task's
// declared platforms, tasks without constraints go anywhere, an empty
// architecture matches any architecture, and nodes with a nil platform
// description are excluded from constrained placement.
//
// Fix over the previous version: assert.Regexp takes the pattern FIRST and
// the value SECOND. The old calls passed them reversed
// (assert.Regexp(t, assignment2.NodeID, "(node2|node3)")), which used the
// node ID as the regexp and matched it as a substring of the pattern — a
// vacuous assertion that could not fail for any node ID appearing in the
// pattern text.
func TestSchedulerCompatiblePlatform(t *testing.T) {
	ctx := context.Background()
	// create tasks
	// task1 - has a node it can run on
	task1 := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Spec: api.TaskSpec{
			Placement: &api.Placement{
				Platforms: []*api.Platform{
					{
						Architecture: "amd64",
						OS:           "linux",
					},
				},
			},
		},
	}
	// task2 - has no node it can run on
	task2 := &api.Task{
		ID:           "id2",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name2",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Spec: api.TaskSpec{
			Placement: &api.Placement{
				Platforms: []*api.Platform{
					{
						Architecture: "arm",
						OS:           "linux",
					},
				},
			},
		},
	}
	// task3 - no platform constraints, should run on any node
	task3 := &api.Task{
		ID:           "id3",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name3",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	// task4 - only OS constraint, is runnable on any linux node
	task4 := &api.Task{
		ID:           "id4",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name4",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Spec: api.TaskSpec{
			Placement: &api.Placement{
				Platforms: []*api.Platform{
					{
						Architecture: "",
						OS:           "linux",
					},
				},
			},
		},
	}
	// task5 - supported on multiple platforms
	task5 := &api.Task{
		ID:           "id5",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name5",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Spec: api.TaskSpec{
			Placement: &api.Placement{
				Platforms: []*api.Platform{
					{
						Architecture: "amd64",
						OS:           "linux",
					},
					{
						Architecture: "x86_64",
						OS:           "windows",
					},
				},
			},
		},
	}
	node1 := &api.Node{
		ID: "node1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Platform: &api.Platform{
				Architecture: "x86_64",
				OS:           "linux",
			},
		},
	}
	node2 := &api.Node{
		ID: "node2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Platform: &api.Platform{
				Architecture: "amd64",
				OS:           "windows",
			},
		},
	}
	// node with nil platform description, cannot schedule anything
	// with a platform constraint
	node3 := &api.Node{
		ID: "node3",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node3",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{},
	}
	service1 := &api.Service{
		ID: "serviceID1",
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial task, service and nodes to the store
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))
		assert.NoError(t, store.CreateNode(tx, node3))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// task1 should get assigned
	assignment1 := watchAssignment(t, watch)
	assert.Equal(t, "node1", assignment1.NodeID)
	// add task2
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)
	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (unsupported platform on 3 nodes)", failure.Status.Err)
	// add task3
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task3))
		return nil
	})
	assert.NoError(t, err)
	// task3 has no constraints; node1 already runs task1, so the spread
	// policy places it on one of the emptier nodes.
	assignment2 := watchAssignment(t, watch)
	assert.Regexp(t, "(node2|node3)", assignment2.NodeID)
	// add task4
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task4))
		return nil
	})
	assert.NoError(t, err)
	assignment3 := watchAssignment(t, watch)
	assert.Equal(t, "node1", assignment3.NodeID)
	// add task5
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task5))
		return nil
	})
	assert.NoError(t, err)
	// task5 is runnable on both platform-described nodes, never on node3.
	assignment4 := watchAssignment(t, watch)
	assert.Regexp(t, "(node1|node2)", assignment4.NodeID)
}
// TestSchedulerUnassignedMap tests that unassigned tasks are deleted from
// unassignedTasks when the service is removed.
//
// Unlike the other tests in this file, this one drives the scheduler
// synchronously via the unexported tick method instead of running the event
// loop, so it can inspect the unassignedTasks map between ticks.
func TestSchedulerUnassignedMap(t *testing.T) {
	ctx := context.Background()
	// create a service and a task with OS constraint that is not met
	task1 := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Spec: api.TaskSpec{
			Placement: &api.Placement{
				Platforms: []*api.Platform{
					{
						Architecture: "amd64",
						OS:           "windows",
					},
				},
			},
		},
	}
	node1 := &api.Node{
		ID: "node1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{
			Platform: &api.Platform{
				Architecture: "x86_64",
				OS:           "linux",
			},
		},
	}
	service1 := &api.Service{
		ID: "serviceID1",
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial task, service and nodes to the store
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateNode(tx, node1))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	// Seed the unassigned map directly and run one scheduling pass; the
	// task cannot be placed (windows task, linux node), so it must remain.
	scheduler.unassignedTasks["id1"] = task1
	scheduler.tick(ctx)
	// task1 is in the unassigned map
	assert.Contains(t, scheduler.unassignedTasks, task1.ID)
	// delete the service of an unassigned task
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.DeleteService(tx, service1.ID))
		return nil
	})
	assert.NoError(t, err)
	scheduler.tick(ctx)
	// task1 is removed from the unassigned map
	assert.NotContains(t, scheduler.unassignedTasks, task1.ID)
}
// TestPreassignedTasks verifies that tasks which already carry a NodeID
// (preassigned) are processed before unassigned tasks, and that the
// remaining unassigned task is spread onto the less-loaded node.
//
// Fix over the previous version: assert.Regexp takes the pattern FIRST and
// the value SECOND; the old call passed them reversed, turning the check
// into a vacuous substring match of the task ID inside the pattern string.
// assert.Equal calls are also normalized to the conventional
// (t, expected, actual) order so failure messages label the values
// correctly.
func TestPreassignedTasks(t *testing.T) {
	ctx := context.Background()
	initialNodeSet := []*api.Node{
		{
			ID: "node1",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name1",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
		{
			ID: "node2",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name2",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
	}
	initialTaskSet := []*api.Task{
		{
			ID:           "task1",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name1",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		},
		{
			ID:           "task2",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name2",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
			NodeID: initialNodeSet[0].ID,
		},
		{
			ID:           "task3",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name2",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
			NodeID: initialNodeSet[0].ID,
		},
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Prepopulate nodes
		for _, n := range initialNodeSet {
			assert.NoError(t, store.CreateNode(tx, n))
		}
		// Prepopulate tasks
		for _, task := range initialTaskSet {
			assert.NoError(t, store.CreateTask(tx, task))
		}
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	// preassigned tasks would be processed first
	assignment1 := watchAssignment(t, watch)
	// task2 and task3 are preassigned to node1
	assert.Equal(t, "node1", assignment1.NodeID)
	assert.Regexp(t, "(task2|task3)", assignment1.ID)
	assignment2 := watchAssignment(t, watch)
	if assignment1.ID == "task2" {
		assert.Equal(t, "task3", assignment2.ID)
	} else {
		assert.Equal(t, "task2", assignment2.ID)
	}
	// task1 would be assigned to node2 because node1 has 2 tasks already
	assignment3 := watchAssignment(t, watch)
	assert.Equal(t, "task1", assignment3.ID)
	assert.Equal(t, "node2", assignment3.NodeID)
}
// TestIgnoreTasks checks that the scheduler skips tasks whose desired state
// is past Running (Shutdown, Remove): although task2 and task3 are
// preassigned to node1 and would normally be handled first, only task1 is
// ever assigned.
func TestIgnoreTasks(t *testing.T) {
	ctx := context.Background()
	nodes := []*api.Node{
		{
			ID: "node1",
			Spec: api.NodeSpec{
				Annotations: api.Annotations{
					Name: "name1",
				},
			},
			Status: api.NodeStatus{
				State: api.NodeStatus_READY,
			},
		},
	}
	// Tasks with desired state running, shutdown, remove.
	tasks := []*api.Task{
		{
			ID:           "task1",
			DesiredState: api.TaskStateRunning,
			ServiceAnnotations: api.Annotations{
				Name: "name1",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
		},
		{
			ID:           "task2",
			DesiredState: api.TaskStateShutdown,
			ServiceAnnotations: api.Annotations{
				Name: "name2",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
			NodeID: nodes[0].ID,
		},
		{
			ID:           "task3",
			DesiredState: api.TaskStateRemove,
			ServiceAnnotations: api.Annotations{
				Name: "name2",
			},
			Status: api.TaskStatus{
				State: api.TaskStatePending,
			},
			NodeID: nodes[0].ID,
		},
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	// Seed the store with the node and the three tasks.
	setupErr := s.Update(func(tx store.Tx) error {
		for _, node := range nodes {
			assert.NoError(t, store.CreateNode(tx, node))
		}
		for _, task := range tasks {
			assert.NoError(t, store.CreateTask(tx, task))
		}
		return nil
	})
	assert.NoError(t, setupErr)
	sched := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, sched.Run(ctx))
	}()
	// Only task1 should ever be assigned; the other two are ignored.
	assigned := watchAssignment(t, watch)
	assert.Equal(t, assigned.ID, "task1")
	assert.Equal(t, assigned.NodeID, "node1")
}
// TestUnscheduleableTask tests that a task which cannot be scheduled (because
// of MaxReplicas or otherwise) does not remain stuck in the Pending state
// forever if the service is updated.
//
// Before the change which introduced this test, if a task got stuck in
// Pending, it could stay there forever, because it could not progress through
// the scheduler, and could likewise not be shut down.
//
// After the change which introduced this test, if the desired state of a task
// is terminal, and the task is in pending, and there is no suitable node, then
// the task is shut down.
func TestUnscheduleableTask(t *testing.T) {
	ctx := context.Background()
	node := &api.Node{
		ID: "nodeid1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
		Description: &api.NodeDescription{},
	}
	// Two tasks of the same service with MaxReplicas=1: only one can be
	// placed on the single node; the other must fail to schedule.
	task1 := &api.Task{
		ID:           "taskid1",
		ServiceID:    "serviceid1",
		DesiredState: api.TaskStateRunning,
		SpecVersion: &api.Version{
			Index: 0,
		},
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "servicename1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task2 := &api.Task{
		ID:           "taskid2",
		ServiceID:    "serviceid1",
		DesiredState: api.TaskStateRunning,
		SpecVersion: &api.Version{
			Index: 0,
		},
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "servicename1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	service1 := &api.Service{
		ID: "serviceid1",
		SpecVersion: &api.Version{
			Index: 0,
		},
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateTask(tx, task2))
		assert.NoError(t, store.CreateNode(tx, node))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// Wait until we have observed both one assigned task and one task that
	// failed to schedule; event order is not deterministic.
	var assigned, failed *api.Task
watchAttempt:
	for {
		select {
		case event := <-watch:
			if task, ok := event.(api.EventUpdateTask); ok {
				if task.Task.Status.State < api.TaskStateAssigned {
					failed = task.Task.Copy()
				} else if task.Task.Status.State >= api.TaskStateAssigned &&
					task.Task.Status.State <= api.TaskStateRunning &&
					task.Task.NodeID != "" {
					assigned = task.Task.Copy()
				}
			}
		case <-time.After(time.Second):
			assignedID := "none"
			failedID := "none"
			if assigned != nil {
				assignedID = assigned.ID
			}
			if failed != nil {
				failedID = failed.ID
			}
			t.Fatalf(
				"did not get assignment and failure. Assigned: %v, Failed: %v",
				assignedID, failedID,
			)
		}
		if assigned != nil && failed != nil {
			break watchAttempt
		}
	}
	assert.Equal(t, "no suitable node (max replicas per node limit exceed)", failed.Status.Err)
	// this is a case where the service is scaled down. in practice, scaling
	// down a service does not work like this, but for this test, it can.
	task1Update := &api.Task{
		ID:           "taskid1update",
		ServiceID:    "serviceid1",
		DesiredState: api.TaskStateRunning,
		SpecVersion: &api.Version{
			Index: 1,
		},
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "servicename1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	service1.SpecVersion.Index = 1
	// now, update the tasks.
	err = s.Update(func(tx store.Tx) error {
		assigned.Status.State = api.TaskStateRunning
		// simulate Start First ordering, where we'll start the new task then
		// stop the old one. this is worst-case scenario, because it means that
		// the other task (the one that succeeded) cannot be freed yet.
		//
		// if we set the old task to a terminal state, there will be a race
		// in the test where the old task might be marked freed, allowing the
		// failed task to progress. We want to handle the case where this does
		// not happen
		assert.NoError(t, store.UpdateTask(tx, assigned))
		failed.DesiredState = api.TaskStateShutdown
		assert.NoError(t, store.UpdateTask(tx, failed))
		assert.NoError(t, store.CreateTask(tx, task1Update))
		assert.NoError(t, store.UpdateService(tx, service1))
		return nil
	})
	assert.NoError(t, err)
	// because the failed task is still currently under the purview of the
	// scheduler, the scheduler should shut it down.
watchShutdown:
	for {
		select {
		case event := <-watch:
			if task, ok := event.(api.EventUpdateTask); ok {
				if task.Task.ID == failed.ID {
					if task.Task.Status.State >= api.TaskStateShutdown {
						break watchShutdown
					}
				}
				if task.Task.ID == task1Update.ID {
					if task.Task.Status.State == api.TaskStateAssigned {
						t.Logf("updated task assigned")
					}
				}
			}
		case <-time.After(time.Second):
			t.Fatalf("old task %s never shut down", failed.ID)
		}
	}
}
// watchAssignmentFailure blocks until a task-update event reports a task
// whose state is still below Assigned (i.e. the scheduler failed to place
// it) and returns that task. The one-second timeout restarts on every
// received event; on timeout the test is failed.
func watchAssignmentFailure(t *testing.T, watch chan events.Event) *api.Task {
	for {
		select {
		case ev := <-watch:
			update, ok := ev.(api.EventUpdateTask)
			if !ok {
				continue
			}
			if update.Task.Status.State < api.TaskStateAssigned {
				return update.Task
			}
		case <-time.After(time.Second):
			t.Fatal("no task assignment failure")
		}
	}
}
// watchAssignment blocks until a task-update event reports a task that has
// been placed on a node (state between Assigned and Running inclusive, with
// a non-empty NodeID) and returns that task. The one-second timeout restarts
// on every received event; on timeout the test is failed.
func watchAssignment(t *testing.T, watch chan events.Event) *api.Task {
	for {
		select {
		case ev := <-watch:
			update, ok := ev.(api.EventUpdateTask)
			if !ok {
				continue
			}
			state := update.Task.Status.State
			if state >= api.TaskStateAssigned &&
				state <= api.TaskStateRunning &&
				update.Task.NodeID != "" {
				return update.Task
			}
		case <-time.After(time.Second):
			t.Fatal("no task assignment")
		}
	}
}
func TestSchedulerPluginConstraint(t *testing.T) {
ctx := context.Background()
// Node1: vol plugin1
n1 := &api.Node{
ID: "node1_ID",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Description: &api.NodeDescription{
Engine: &api.EngineDescription{
Plugins: []api.PluginDescription{
{
Type: "Volume",
Name: "plugin1",
},
{
Type: "Log",
Name: "default",
},
},
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
// Node2: vol plugin1, vol plugin2
n2 := &api.Node{
ID: "node2_ID",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node2",
},
},
Description: &api.NodeDescription{
Engine: &api.EngineDescription{
Plugins: []api.PluginDescription{
{
Type: "Volume",
Name: "plugin1",
},
{
Type: "Volume",
Name: "plugin2",
},
{
Type: "Log",
Name: "default",
},
},
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
// Node3: vol plugin1, network plugin1
n3 := &api.Node{
ID: "node3_ID",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node3",
},
},
Description: &api.NodeDescription{
Engine: &api.EngineDescription{
Plugins: []api.PluginDescription{
{
Type: "Volume",
Name: "plugin1",
},
{
Type: "Network",
Name: "plugin1",
},
{
Type: "Log",
Name: "default",
},
},
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
// Node4: log plugin1
n4 := &api.Node{
ID: "node4_ID",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node4",
},
},
Description: &api.NodeDescription{
Engine: &api.EngineDescription{
Plugins: []api.PluginDescription{
{
Type: "Log",
Name: "plugin1",
},
},
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
volumeOptionsDriver := func(driver string) *api.Mount_VolumeOptions {
return &api.Mount_VolumeOptions{
DriverConfig: &api.Driver{
Name: driver,
},
}
}
// Task0: bind mount
t0 := &api.Task{
ID: "task0_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Mounts: []api.Mount{
{
Source: "/src",
Target: "/foo",
Type: api.MountTypeBind,
},
},
},
},
},
ServiceAnnotations: api.Annotations{
Name: "task0",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// Task1: vol plugin1
t1 := &api.Task{
ID: "task1_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Mounts: []api.Mount{
{
Source: "testVol1",
Target: "/foo",
Type: api.MountTypeVolume,
VolumeOptions: volumeOptionsDriver("plugin1"),
},
},
},
},
},
ServiceAnnotations: api.Annotations{
Name: "task1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// Task2: vol plugin1, vol plugin2
t2 := &api.Task{
ID: "task2_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Mounts: []api.Mount{
{
Source: "testVol1",
Target: "/foo",
Type: api.MountTypeVolume,
VolumeOptions: volumeOptionsDriver("plugin1"),
},
{
Source: "testVol2",
Target: "/foo",
Type: api.MountTypeVolume,
VolumeOptions: volumeOptionsDriver("plugin2"),
},
},
},
},
},
ServiceAnnotations: api.Annotations{
Name: "task2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// Task3: vol plugin1, network plugin1
t3 := &api.Task{
ID: "task3_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Networks: []*api.NetworkAttachment{
{
Network: &api.Network{
ID: "testNwID1",
Spec: api.NetworkSpec{
Annotations: api.Annotations{
Name: "testVol1",
},
},
DriverState: &api.Driver{
Name: "plugin1",
},
},
},
},
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Mounts: []api.Mount{
{
Source: "testVol1",
Target: "/foo",
Type: api.MountTypeVolume,
VolumeOptions: volumeOptionsDriver("plugin1"),
},
},
},
},
},
ServiceAnnotations: api.Annotations{
Name: "task2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// Task4: log plugin1
t4 := &api.Task{
ID: "task4_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
LogDriver: &api.Driver{Name: "plugin1"},
},
ServiceAnnotations: api.Annotations{
Name: "task4",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// Task5: log plugin1
t5 := &api.Task{
ID: "task5_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
LogDriver: &api.Driver{Name: "plugin1"},
},
ServiceAnnotations: api.Annotations{
Name: "task5",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// no logging
t6 := &api.Task{
ID: "task6_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
LogDriver: &api.Driver{Name: "none"},
},
ServiceAnnotations: api.Annotations{
Name: "task6",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
// log driver with no name
t7 := &api.Task{
ID: "task7_ID",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
LogDriver: &api.Driver{
Options: map[string]string{
"max-size": "50k",
},
},
},
ServiceAnnotations: api.Annotations{
Name: "task7",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
s1 := &api.Service{
ID: "serviceID1",
}
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()
// Add initial node, service and task
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, s1))
assert.NoError(t, store.CreateTask(tx, t1))
assert.NoError(t, store.CreateNode(tx, n1))
return nil
})
assert.NoError(t, err)
scheduler := New(s)
watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()
go func() {
assert.NoError(t, scheduler.Run(ctx))
}()
defer scheduler.Stop()
// t1 should get assigned
assignment := watchAssignment(t, watch)
assert.Equal(t, assignment.NodeID, "node1_ID")
// Create t0; it should get assigned because the plugin filter shouldn't
// be enabled for tasks that have bind mounts
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t0))
return nil
})
assert.NoError(t, err)
assignment0 := watchAssignment(t, watch)
assert.Equal(t, assignment0.ID, "task0_ID")
assert.Equal(t, assignment0.NodeID, "node1_ID")
// Create t2; it should stay in the pending state because there is
// no node that with volume plugin `plugin2`
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t2))
return nil
})
assert.NoError(t, err)
failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (missing plugin on 1 node)", failure.Status.Err)
// Now add the second node
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNode(tx, n2))
return nil
})
assert.NoError(t, err)
// Check that t2 has been assigned
assignment1 := watchAssignment(t, watch)
assert.Equal(t, assignment1.ID, "task2_ID")
assert.Equal(t, assignment1.NodeID, "node2_ID")
// Create t3; it should stay in the pending state because there is
// no node that with network plugin `plugin1`
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t3))
return nil
})
assert.NoError(t, err)
failure = watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (missing plugin on 2 nodes)", failure.Status.Err)
// Now add the node3
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNode(tx, n3))
return nil
})
assert.NoError(t, err)
// Check that t3 has been assigned
assignment2 := watchAssignment(t, watch)
assert.Equal(t, assignment2.ID, "task3_ID")
assert.Equal(t, assignment2.NodeID, "node3_ID")
// Create t4; it should stay in the pending state because there is
// no node that with log plugin `plugin1`
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t4))
return nil
})
assert.NoError(t, err)
// check that t4 has been assigned
failure2 := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (missing plugin on 3 nodes)", failure2.Status.Err)
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNode(tx, n4))
return nil
})
assert.NoError(t, err)
// Check that t4 has been assigned
assignment3 := watchAssignment(t, watch)
assert.Equal(t, assignment3.ID, "task4_ID")
assert.Equal(t, assignment3.NodeID, "node4_ID")
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t5))
return nil
})
assert.NoError(t, err)
assignment4 := watchAssignment(t, watch)
assert.Equal(t, assignment4.ID, "task5_ID")
assert.Equal(t, assignment4.NodeID, "node4_ID")
// check that t6 gets assigned to some node
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t6))
return nil
})
assert.NoError(t, err)
assignment5 := watchAssignment(t, watch)
assert.Equal(t, assignment5.ID, "task6_ID")
assert.NotEqual(t, assignment5.NodeID, "")
// check that t7 gets assigned to some node
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, t7))
return nil
})
assert.NoError(t, err)
assignment6 := watchAssignment(t, watch)
assert.Equal(t, assignment6.ID, "task7_ID")
assert.NotEqual(t, assignment6.NodeID, "")
}
// BenchmarkScheduler1kNodes1kTasks benchmarks scheduling 1,000 tasks
// across 1,000 nodes with no network constraints.
func BenchmarkScheduler1kNodes1kTasks(b *testing.B) {
	benchScheduler(b, 1e3, 1e3, false)
}
// BenchmarkScheduler1kNodes10kTasks benchmarks scheduling 10,000 tasks
// across 1,000 nodes with no network constraints.
func BenchmarkScheduler1kNodes10kTasks(b *testing.B) {
	benchScheduler(b, 1e3, 1e4, false)
}
// BenchmarkScheduler1kNodes100kTasks benchmarks scheduling 100,000 tasks
// across 1,000 nodes with no network constraints.
func BenchmarkScheduler1kNodes100kTasks(b *testing.B) {
	benchScheduler(b, 1e3, 1e5, false)
}
// BenchmarkScheduler100kNodes100kTasks benchmarks scheduling 100,000 tasks
// across 100,000 nodes with no network constraints.
func BenchmarkScheduler100kNodes100kTasks(b *testing.B) {
	benchScheduler(b, 1e5, 1e5, false)
}
// BenchmarkScheduler100kNodes1kTasks benchmarks scheduling 1,000 tasks
// across 100,000 nodes with no network constraints.
func BenchmarkScheduler100kNodes1kTasks(b *testing.B) {
	benchScheduler(b, 1e5, 1e3, false)
}
// BenchmarkScheduler100kNodes1MTasks benchmarks scheduling 1,000,000 tasks
// across 100,000 nodes with no network constraints.
func BenchmarkScheduler100kNodes1MTasks(b *testing.B) {
	benchScheduler(b, 1e5, 1e6, false)
}
// BenchmarkSchedulerConstraints1kNodes1kTasks benchmarks scheduling 1,000
// network-constrained tasks across 1,000 nodes (only every third node
// satisfies the constraint; see benchScheduler).
func BenchmarkSchedulerConstraints1kNodes1kTasks(b *testing.B) {
	benchScheduler(b, 1e3, 1e3, true)
}
// BenchmarkSchedulerConstraints1kNodes10kTasks benchmarks scheduling 10,000
// network-constrained tasks across 1,000 nodes.
func BenchmarkSchedulerConstraints1kNodes10kTasks(b *testing.B) {
	benchScheduler(b, 1e3, 1e4, true)
}
// BenchmarkSchedulerConstraints1kNodes100kTasks benchmarks scheduling 100,000
// network-constrained tasks across 1,000 nodes.
func BenchmarkSchedulerConstraints1kNodes100kTasks(b *testing.B) {
	benchScheduler(b, 1e3, 1e5, true)
}
// BenchmarkSchedulerConstraints5kNodes100kTasks benchmarks scheduling 100,000
// network-constrained tasks across 5,000 nodes.
func BenchmarkSchedulerConstraints5kNodes100kTasks(b *testing.B) {
	benchScheduler(b, 5e3, 1e5, true)
}
// benchScheduler measures how long the scheduler takes to act on `tasks`
// pending tasks spread across `nodes` ready nodes. When networkConstraints
// is true, every task requires a "network"-named driver that only every
// third node advertises, forcing the scheduler through its node filters.
//
// Timer discipline: the timer is stopped while the store is populated and
// restarted just before the populating transaction commits, so the timed
// region is the scheduler observing the new objects and assigning the
// tasks. Do not reorder the StopTimer/StartTimer calls relative to the
// store.Update transaction.
func benchScheduler(b *testing.B, nodes, tasks int, networkConstraints bool) {
	ctx := context.Background()
	for iters := 0; iters < b.N; iters++ {
		// Setup (store creation and population) is excluded from timing.
		b.StopTimer()
		s := store.NewMemoryStore(nil)
		scheduler := New(s)
		watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
		go func() {
			// Run's error is irrelevant in a benchmark; failures would
			// surface as a hang in the drain loop below.
			_ = scheduler.Run(ctx)
		}()
		// Let the scheduler get started
		runtime.Gosched()
		_ = s.Update(func(tx store.Tx) error {
			// Create initial nodes and tasks
			for i := 0; i < nodes; i++ {
				n := &api.Node{
					ID: identity.NewID(),
					Spec: api.NodeSpec{
						Annotations: api.Annotations{
							Name:   "name" + strconv.Itoa(i),
							Labels: make(map[string]string),
						},
					},
					Status: api.NodeStatus{
						State: api.NodeStatus_READY,
					},
					Description: &api.NodeDescription{
						Engine: &api.EngineDescription{},
					},
				}
				// Give every third node a special network
				if i%3 == 0 {
					n.Description.Engine.Plugins = []api.PluginDescription{
						{
							Name: "network",
							Type: "Network",
						},
					}
				}
				err := store.CreateNode(tx, n)
				if err != nil {
					panic(err)
				}
			}
			for i := 0; i < tasks; i++ {
				id := "task" + strconv.Itoa(i)
				t := &api.Task{
					ID:           id,
					DesiredState: api.TaskStateRunning,
					ServiceAnnotations: api.Annotations{
						Name: id,
					},
					Status: api.TaskStatus{
						State: api.TaskStatePending,
					},
				}
				if networkConstraints {
					// Attach a network whose driver name matches the
					// plugin only a third of the nodes advertise.
					t.Networks = []*api.NetworkAttachment{
						{
							Network: &api.Network{
								DriverState: &api.Driver{
									Name: "network",
								},
							},
						},
					}
				}
				err := store.CreateTask(tx, t)
				if err != nil {
					panic(err)
				}
			}
			// Restart the timer before the transaction commits so the
			// measured work is the scheduler processing the commit.
			b.StartTimer()
			return nil
		})
		// Wait for one task-update event per task, i.e. until the
		// scheduler has acted on every pending task.
		for i := 0; i != tasks; i++ {
			<-watch
		}
		scheduler.Stop()
		cancel()
		s.Close()
	}
}
// TestSchedulerHostPort exercises the host-mode port filter. Tasks 1 and 2
// publish host port 58 on different protocols (TCP vs. UDP) and are spread
// across the two nodes; task 3 requires port 58 on BOTH protocols, so once
// each node holds one of the two it cannot be placed anywhere.
func TestSchedulerHostPort(t *testing.T) {
	ctx := context.Background()
	node1 := &api.Node{
		ID: "nodeid1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	node2 := &api.Node{
		ID: "nodeid2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	// Task 1 publishes host port 58/tcp.
	task1 := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Endpoint: &api.Endpoint{
			Ports: []*api.PortConfig{
				{
					PublishMode:   api.PublishModeHost,
					PublishedPort: 58,
					Protocol:      api.ProtocolTCP,
				},
			},
		},
	}
	// Task 2 publishes host port 58/udp — no conflict with task 1.
	task2 := &api.Task{
		ID:           "id2",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name2",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Endpoint: &api.Endpoint{
			Ports: []*api.PortConfig{
				{
					PublishMode:   api.PublishModeHost,
					PublishedPort: 58,
					Protocol:      api.ProtocolUDP,
				},
			},
		},
	}
	// Task 3 needs host port 58 on both protocols, which conflicts with
	// task 1 on one node and task 2 on the other.
	task3 := &api.Task{
		ID:           "id3",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name3",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
		Endpoint: &api.Endpoint{
			Ports: []*api.PortConfig{
				{
					PublishMode:   api.PublishModeHost,
					PublishedPort: 58,
					Protocol:      api.ProtocolUDP,
				},
				{
					PublishMode:   api.PublishModeHost,
					PublishedPort: 58,
					Protocol:      api.ProtocolTCP,
				},
			},
		},
	}
	service1 := &api.Service{
		ID: "serviceID1",
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial service and tasks (no nodes yet).
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// Tasks shouldn't be scheduled because there are no nodes.
	watchAssignmentFailure(t, watch)
	watchAssignmentFailure(t, watch)
	err = s.Update(func(tx store.Tx) error {
		// Add the two nodes.
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))
		return nil
	})
	assert.NoError(t, err)
	// Tasks 1 and 2 should be assigned to different nodes.
	assignment1 := watchAssignment(t, watch)
	assignment2 := watchAssignment(t, watch)
	// BUGFIX: the original asserted `assignment1 != assignment2`, a pointer
	// comparison between two distinct task objects that is always true.
	// Assert the actual intent: the two tasks landed on different nodes.
	assert.NotEqual(t, assignment1.ID, assignment2.ID)
	assert.NotEqual(t, assignment1.NodeID, assignment2.NodeID)
	// Task 3 should not be schedulable: each node already uses one of the
	// two port/protocol pairs it needs.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task3))
		return nil
	})
	assert.NoError(t, err)
	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (host-mode port already in use on 2 nodes)", failure.Status.Err)
}
// TestSchedulerMaxReplicas exercises the max-replicas-per-node placement
// limit. With MaxReplicas: 1, two tasks are spread across two nodes and a
// third stays pending until a third node appears. It then pins tasks with
// MaxReplicas: 3 to node1 via a constraint and verifies the fourth such
// task is rejected.
func TestSchedulerMaxReplicas(t *testing.T) {
	ctx := context.Background()
	node1 := &api.Node{
		ID: "nodeid1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	node2 := &api.Node{
		ID: "nodeid2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	// Tasks 1-3 allow at most one replica of the service per node.
	task1 := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task2 := &api.Task{
		ID:           "id2",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name2",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task3 := &api.Task{
		ID:           "id3",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name3",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	service1 := &api.Service{
		ID: "serviceID1",
	}
	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()
	err := s.Update(func(tx store.Tx) error {
		// Add initial service and tasks (no nodes yet).
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)
	scheduler := New(s)
	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()
	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()
	// Tasks shouldn't be scheduled because there are no nodes.
	watchAssignmentFailure(t, watch)
	watchAssignmentFailure(t, watch)
	err = s.Update(func(tx store.Tx) error {
		// Add the two nodes.
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))
		return nil
	})
	assert.NoError(t, err)
	// Tasks 1 and 2 should be assigned to different nodes.
	assignment1 := watchAssignment(t, watch)
	assignment2 := watchAssignment(t, watch)
	// BUGFIX: the original asserted `assignment1 != assignment2`, a pointer
	// comparison between two distinct task objects that is always true.
	// Assert the actual intent: the two tasks landed on different nodes.
	assert.NotEqual(t, assignment1.ID, assignment2.ID)
	assert.NotEqual(t, assignment1.NodeID, assignment2.NodeID)
	// Task 3 should not be schedulable: both nodes already hold one
	// replica of serviceID1.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task3))
		return nil
	})
	assert.NoError(t, err)
	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (max replicas per node limit exceed)", failure.Status.Err)
	// Add third node to get task 3 scheduled
	node3 := &api.Node{
		ID: "nodeid3",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node3",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateNode(tx, node3))
		return nil
	})
	assert.NoError(t, err)
	// Create four more tasks constrained to node1 with MaxReplicas: 3;
	// only the first three can be placed there.
	task4 := &api.Task{
		ID:           "id4",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name4",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task5 := &api.Task{
		ID:           "id5",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name5",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task6 := &api.Task{
		ID:           "id6",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name6",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task7 := &api.Task{
		ID:           "id7",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name7",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task4))
		assert.NoError(t, store.CreateTask(tx, task5))
		assert.NoError(t, store.CreateTask(tx, task6))
		return nil
	})
	assert.NoError(t, err)
	// Task 7 should not be schedulable: node1 is full and the constraint
	// rules out the other two nodes.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task7))
		return nil
	})
	assert.NoError(t, err)
	failure = watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (scheduling constraints not satisfied on 3 nodes)", failure.Status.Err)
}