122 lines
2.8 KiB
Go
122 lines
2.8 KiB
Go
package hostdiscovery
|
|
|
|
import (
|
|
"net"
|
|
"sync"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
mapset "github.com/deckarep/golang-set"
|
|
"github.com/docker/docker/pkg/discovery"
|
|
// Including KV
|
|
_ "github.com/docker/docker/pkg/discovery/kv"
|
|
"github.com/docker/libkv/store/consul"
|
|
"github.com/docker/libkv/store/etcd"
|
|
"github.com/docker/libkv/store/zookeeper"
|
|
"github.com/docker/libnetwork/types"
|
|
)
|
|
|
|
type hostDiscovery struct {
|
|
watcher discovery.Watcher
|
|
nodes mapset.Set
|
|
stopChan chan struct{}
|
|
sync.Mutex
|
|
}
|
|
|
|
func init() {
|
|
consul.Register()
|
|
etcd.Register()
|
|
zookeeper.Register()
|
|
}
|
|
|
|
// NewHostDiscovery function creates a host discovery object
|
|
func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
|
|
return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
|
|
}
|
|
|
|
func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
|
|
h.Lock()
|
|
d := h.watcher
|
|
h.Unlock()
|
|
if d == nil {
|
|
return types.BadRequestErrorf("invalid discovery watcher")
|
|
}
|
|
discoveryCh, errCh := d.Watch(h.stopChan)
|
|
go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
|
|
return nil
|
|
}
|
|
|
|
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
|
|
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
|
|
for {
|
|
select {
|
|
case entries := <-ch:
|
|
h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
logrus.Errorf("discovery error: %v", err)
|
|
}
|
|
case <-h.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *hostDiscovery) StopDiscovery() error {
|
|
h.Lock()
|
|
stopChan := h.stopChan
|
|
h.watcher = nil
|
|
h.Unlock()
|
|
|
|
close(stopChan)
|
|
return nil
|
|
}
|
|
|
|
func (h *hostDiscovery) processCallback(entries discovery.Entries,
|
|
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
|
|
updated := hosts(entries)
|
|
h.Lock()
|
|
existing := h.nodes
|
|
added, removed := diff(existing, updated)
|
|
h.nodes = updated
|
|
h.Unlock()
|
|
|
|
activeCallback()
|
|
if len(added) > 0 {
|
|
joinCallback(added)
|
|
}
|
|
if len(removed) > 0 {
|
|
leaveCallback(removed)
|
|
}
|
|
}
|
|
|
|
func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
|
|
addSlice := updated.Difference(existing).ToSlice()
|
|
removeSlice := existing.Difference(updated).ToSlice()
|
|
for _, ip := range addSlice {
|
|
added = append(added, net.ParseIP(ip.(string)))
|
|
}
|
|
for _, ip := range removeSlice {
|
|
removed = append(removed, net.ParseIP(ip.(string)))
|
|
}
|
|
return
|
|
}
|
|
|
|
func (h *hostDiscovery) Fetch() []net.IP {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
ips := []net.IP{}
|
|
for _, ipstr := range h.nodes.ToSlice() {
|
|
ips = append(ips, net.ParseIP(ipstr.(string)))
|
|
}
|
|
return ips
|
|
}
|
|
|
|
func hosts(entries discovery.Entries) mapset.Set {
|
|
hosts := mapset.NewSet()
|
|
for _, entry := range entries {
|
|
hosts.Add(entry.Host)
|
|
}
|
|
return hosts
|
|
}
|