ClusterManager: Implements support of remote clusters.

After this change users will be able to:
* Create cluster and add/remove servers to it
* Install apps on remote cluster
* Move already installed apps between clusters
* Apps running on server being removed will auto-migrate
  to another server from that same cluster

This is achieved by:
* Installing and running minimal version of dodo on remote cluster
* Ingress-nginx is installed automatically on new clusters
* Next to nginx we run VPN client in the same pod, so that
  default cluster can establish secure communication with it
* Multiple reverse proxies are configured to get to the
  remote cluster service from ingress installed on default cluster.

Next steps:
* Support remote clusters in dodo apps (prototype ready)
* Clean up old cluster when moving app to the new one. Currently
  old cluster keeps running app pods even though no ingress can
  reach it anymore.

Change-Id: Iffc908c93416d4126a8e1c2832eae7b659cb8044
diff --git a/core/installer/cluster/kube.go b/core/installer/cluster/kube.go
new file mode 100644
index 0000000..ee123fa
--- /dev/null
+++ b/core/installer/cluster/kube.go
@@ -0,0 +1,280 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+	"golang.org/x/crypto/ssh"
+	"net"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/giolekva/pcloud/core/installer/kube"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubectl/pkg/drain"
+)
+
+type KubeManager struct {
+	l                sync.Locker
+	name             string
+	ingressClassName string
+	ingressIP        net.IP
+	kubeCfg          string
+	serverAddr       string
+	serverToken      string
+	controllers      []Server
+	workers          []Server
+}
+
+func NewKubeManager() *KubeManager {
+	return &KubeManager{l: &sync.Mutex{}}
+}
+
+func RestoreKubeManager(st State) (*KubeManager, error) {
+	return &KubeManager{
+		l:                &sync.Mutex{},
+		name:             st.Name,
+		ingressClassName: st.IngressClassName,
+		ingressIP:        st.IngressIP,
+		kubeCfg:          st.Kubeconfig,
+		serverAddr:       st.ServerAddr,
+		serverToken:      st.ServerToken,
+		controllers:      st.Controllers,
+		workers:          st.Workers,
+	}, nil
+}
+
+func (m *KubeManager) State() State {
+	m.l.Lock()
+	defer m.l.Unlock()
+	return State{
+		m.name,
+		m.ingressClassName,
+		m.ingressIP,
+		m.serverAddr,
+		m.serverToken,
+		m.kubeCfg,
+		m.controllers,
+		m.workers,
+	}
+}
+
+func (m *KubeManager) Init(s Server, setupFn ClusterSetupFunc) (net.IP, error) {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg != "" {
+		return nil, fmt.Errorf("already initialized")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return nil, err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return nil, err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return nil, err
+	}
+	if err := InstallK3s(c); err != nil {
+		return nil, err
+	}
+	kubeCfg, err := GetKubeconfig(c)
+	if err != nil {
+		return nil, err
+	}
+	m.kubeCfg = kubeCfg
+	serverIP, err := GetTailscaleIP(c)
+	if err != nil {
+		return nil, err
+	}
+	m.serverAddr = fmt.Sprintf("%s:6443", serverIP)
+	serverToken, err := GetServerToken(c)
+	if err != nil {
+		return nil, err
+	}
+	m.serverToken = serverToken
+	m.controllers = []Server{s}
+	m.ingressClassName = "default"
+	ingressIP, err := setupFn(m.name, m.kubeCfg, m.ingressClassName)
+	if err != nil {
+		return nil, err
+	}
+	m.ingressIP = ingressIP
+	return ingressIP, nil
+}
+
+func (m *KubeManager) JoinController(s Server) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg == "" {
+		return fmt.Errorf("not initialized")
+	}
+	if i := m.findServerByIP(s.IP); i != nil {
+		return fmt.Errorf("already exists")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return err
+	}
+	if err := InstallK3sJoinServer(c, m.serverAddr, m.serverToken); err != nil {
+		return err
+	}
+	m.controllers = append(m.controllers, s)
+	return nil
+}
+
+func (m *KubeManager) JoinWorker(s Server) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg == "" {
+		return fmt.Errorf("not initialized")
+	}
+	if i := m.findServerByIP(s.IP); i != nil {
+		return fmt.Errorf("already exists")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return err
+	}
+	if err := InstallK3sJoinAgent(c, m.serverAddr, m.serverToken); err != nil {
+		return err
+	}
+	m.workers = append(m.workers, s)
+	return nil
+}
+
+func (m *KubeManager) RemoveServer(name string) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	client, err := kube.NewKubeClient(kube.KubeConfigOpts{
+		KubeConfig: m.kubeCfg,
+	})
+	if err != nil {
+		return err
+	}
+	helper := &drain.Helper{
+		Ctx:                 context.Background(),
+		Client:              client,
+		Force:               true,
+		GracePeriodSeconds:  -1,
+		IgnoreAllDaemonSets: true,
+		Out:                 os.Stdout,
+		ErrOut:              os.Stdout,
+		// We want to proceed even when pods are using emptyDir volumes
+		DeleteEmptyDirData: true,
+		Timeout:            10 * time.Minute,
+	}
+	node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
+		return err
+	}
+	if err := drain.RunNodeDrain(helper, name); err != nil {
+		return err
+	}
+	if err := client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
+		return err
+	}
+	for i, s := range m.controllers {
+		if s.Name == name {
+			c, err := m.connect(&s)
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			if err := UninstallK3sServer(c); err != nil {
+				return err
+			}
+			m.controllers = append(m.controllers[:i], m.controllers[i+1:]...)
+			return nil
+		}
+	}
+	for i, s := range m.workers {
+		if s.Name == name {
+			c, err := m.connect(&s)
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			if err := UninstallK3sAgent(c); err != nil {
+				return err
+			}
+			m.workers = append(m.workers[:i], m.workers[i+1:]...)
+			return nil
+		}
+	}
+	return fmt.Errorf("not found")
+}
+
+// Expects manager state to be locked by caller.
+func (m *KubeManager) findServerByIP(ip net.IP) *Server {
+	for _, s := range m.controllers {
+		if s.IP.Equal(ip) {
+			return &s
+		}
+	}
+	for _, s := range m.workers {
+		if ip.Equal(s.IP) {
+			return &s
+		}
+	}
+	return nil
+}
+
+func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
+	cfg := &ssh.ClientConfig{
+		User:    s.User,
+		Auth:    []ssh.AuthMethod{},
+		Timeout: 10 * time.Second,
+	}
+	if s.ClientKey != "" {
+		clientKey, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
+		if err != nil {
+			return nil, err
+		}
+		cfg.Auth = append(cfg.Auth, ssh.PublicKeys(clientKey))
+	}
+	if s.Password != "" {
+		cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
+	}
+	if s.HostKey != "" {
+		hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
+		if err != nil {
+			return nil, err
+		}
+		cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
+	} else {
+		cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
+	}
+	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.IP.String(), s.Port), cfg)
+	if err != nil {
+		return nil, err
+	}
+	ret := &SSHClient{client}
+	s.Name, err = GetHostname(ret)
+	if err != nil {
+		return nil, err
+	}
+	return ret, nil
+}