ClusterManager: Implements support of remote clusters.
After this change users will be able to:
* Create cluster and add/remove servers to it
* Install apps on remote cluster
* Move already installed apps between clusters
* Apps running on a server that is being removed will automatically
migrate to another server in the same cluster
This is achieved by:
* Installing and running minimal version of dodo on remote cluster
* Ingress-nginx is installed automatically on new clusters
* Next to nginx we run VPN client in the same pod, so that
default cluster can establish secure communication with it
* Multiple reverse proxies are configured so that services on the
remote cluster can be reached from the ingress installed on the default cluster.
Next steps:
* Support remote clusters in dodo apps (prototype ready)
* Clean up the old cluster when moving an app to a new one. Currently
the old cluster keeps running the app's pods even though no ingress
can reach them anymore.
Change-Id: Iffc908c93416d4126a8e1c2832eae7b659cb8044
diff --git a/core/installer/cluster/kube.go b/core/installer/cluster/kube.go
new file mode 100644
index 0000000..ee123fa
--- /dev/null
+++ b/core/installer/cluster/kube.go
@@ -0,0 +1,280 @@
+package cluster
+
+import (
+ "context"
+ "fmt"
+ "golang.org/x/crypto/ssh"
+ "net"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/giolekva/pcloud/core/installer/kube"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/kubectl/pkg/drain"
+)
+
+// KubeManager manages the lifecycle of a single remote k3s cluster:
+// bootstrapping it, adding controller/worker servers, and removing them.
+// All exported methods serialize access to the fields through l.
+type KubeManager struct {
+	l                sync.Locker // guards every field below
+	name             string      // cluster name, passed through to ClusterSetupFunc
+	ingressClassName string      // ingress class configured during Init ("default")
+	ingressIP        net.IP      // IP of the ingress produced by the setup function
+	kubeCfg          string      // kubeconfig of the cluster; empty until Init succeeds
+	serverAddr       string      // "<ip>:6443" of the first controller, used by joiners
+	serverToken      string      // k3s join token shared with joining servers
+	controllers      []Server    // servers running the k3s control plane
+	workers          []Server    // servers running k3s agents only
+}
+
+// NewKubeManager returns an empty, uninitialized manager.
+// Call Init to bootstrap a cluster before using any other method.
+func NewKubeManager() *KubeManager {
+	return &KubeManager{l: &sync.Mutex{}}
+}
+
+// RestoreKubeManager reconstructs a manager from previously persisted
+// State (see KubeManager.State). The returned error is currently always
+// nil; the signature leaves room for future validation of st.
+func RestoreKubeManager(st State) (*KubeManager, error) {
+	return &KubeManager{
+		l:                &sync.Mutex{},
+		name:             st.Name,
+		ingressClassName: st.IngressClassName,
+		ingressIP:        st.IngressIP,
+		kubeCfg:          st.Kubeconfig,
+		serverAddr:       st.ServerAddr,
+		serverToken:      st.ServerToken,
+		controllers:      st.Controllers,
+		workers:          st.Workers,
+	}, nil
+}
+
+// State returns a snapshot of the manager's current configuration,
+// suitable for persisting and later restoring with RestoreKubeManager.
+func (m *KubeManager) State() State {
+	m.l.Lock()
+	defer m.l.Unlock()
+	// Keyed literal: the original positional literal silently breaks if
+	// State's field order ever changes (flagged by vet composite checks).
+	return State{
+		Name:             m.name,
+		IngressClassName: m.ingressClassName,
+		IngressIP:        m.ingressIP,
+		ServerAddr:       m.serverAddr,
+		ServerToken:      m.serverToken,
+		Kubeconfig:       m.kubeCfg,
+		Controllers:      m.controllers,
+		Workers:          m.workers,
+	}
+}
+
+// Init bootstraps a brand new k3s cluster on server s: it installs and
+// starts Tailscale (so the default cluster can reach the node over the
+// VPN), installs k3s in cluster-init mode, fetches the kubeconfig and
+// join token, and finally runs setupFn against the new cluster.
+// It returns the ingress IP reported by setupFn, and fails if the
+// manager is already initialized.
+func (m *KubeManager) Init(s Server, setupFn ClusterSetupFunc) (net.IP, error) {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg != "" {
+		return nil, fmt.Errorf("already initialized")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return nil, err
+	}
+	defer c.Close()
+	if err := InstallTailscale(c); err != nil {
+		return nil, err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return nil, err
+	}
+	if err := InstallK3s(c); err != nil {
+		return nil, err
+	}
+	// Collect everything into locals and commit to m only after every step
+	// has succeeded. The original mutated m.kubeCfg mid-way, so a failure
+	// in any later step left the manager half-initialized and made every
+	// subsequent Init attempt fail with "already initialized".
+	kubeCfg, err := GetKubeconfig(c)
+	if err != nil {
+		return nil, err
+	}
+	serverIP, err := GetTailscaleIP(c)
+	if err != nil {
+		return nil, err
+	}
+	serverToken, err := GetServerToken(c)
+	if err != nil {
+		return nil, err
+	}
+	const ingressClassName = "default"
+	ingressIP, err := setupFn(m.name, kubeCfg, ingressClassName)
+	if err != nil {
+		return nil, err
+	}
+	m.kubeCfg = kubeCfg
+	m.serverAddr = fmt.Sprintf("%s:6443", serverIP)
+	m.serverToken = serverToken
+	m.controllers = []Server{s}
+	m.ingressClassName = ingressClassName
+	m.ingressIP = ingressIP
+	return ingressIP, nil
+}
+
+// JoinController adds server s to the cluster as an additional k3s
+// control-plane node. The cluster must already be initialized and s
+// must not already be a member (matched by IP).
+func (m *KubeManager) JoinController(s Server) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg == "" {
+		return fmt.Errorf("not initialized")
+	}
+	if i := m.findServerByIP(s.IP); i != nil {
+		return fmt.Errorf("already exists")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	// Same VPN bootstrap as Init: the node must be reachable over
+	// Tailscale before joining the cluster.
+	if err := InstallTailscale(c); err != nil {
+		return err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return err
+	}
+	// Joins the existing control plane using the first controller's
+	// address and the shared server token.
+	if err := InstallK3sJoinServer(c, m.serverAddr, m.serverToken); err != nil {
+		return err
+	}
+	m.controllers = append(m.controllers, s)
+	return nil
+}
+
+// JoinWorker adds server s to the cluster as a k3s agent (worker-only)
+// node. The cluster must already be initialized and s must not already
+// be a member (matched by IP).
+func (m *KubeManager) JoinWorker(s Server) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if m.kubeCfg == "" {
+		return fmt.Errorf("not initialized")
+	}
+	if i := m.findServerByIP(s.IP); i != nil {
+		return fmt.Errorf("already exists")
+	}
+	c, err := m.connect(&s)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	// Same VPN bootstrap as Init: the node must be reachable over
+	// Tailscale before joining the cluster.
+	if err := InstallTailscale(c); err != nil {
+		return err
+	}
+	const loginServer = "https://headscale.v1.dodo.cloud"
+	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
+		return err
+	}
+	// Unlike JoinController, this installs k3s in agent mode only.
+	if err := InstallK3sJoinAgent(c, m.serverAddr, m.serverToken); err != nil {
+		return err
+	}
+	m.workers = append(m.workers, s)
+	return nil
+}
+
+// RemoveServer cordons, drains, and deletes the named node from the
+// cluster, then uninstalls k3s from the machine over SSH and forgets it
+// from the controller/worker lists. Draining evicts running pods so the
+// scheduler can move them to the remaining servers. Returns "not found"
+// if no tracked server has that name (the node is still drained/deleted
+// from Kubernetes before that check happens).
+func (m *KubeManager) RemoveServer(name string) error {
+	m.l.Lock()
+	defer m.l.Unlock()
+	client, err := kube.NewKubeClient(kube.KubeConfigOpts{
+		KubeConfig: m.kubeCfg,
+	})
+	if err != nil {
+		return err
+	}
+	helper := &drain.Helper{
+		Ctx:    context.Background(),
+		Client: client,
+		// Evict even pods not managed by a controller.
+		Force: true,
+		// -1 means: use each pod's own termination grace period.
+		GracePeriodSeconds: -1,
+		// DaemonSet pods cannot be drained away; skip them.
+		IgnoreAllDaemonSets: true,
+		Out:                 os.Stdout,
+		ErrOut:              os.Stdout,
+		// We want to proceed even when pods are using emptyDir volumes
+		DeleteEmptyDirData: true,
+		Timeout:            10 * time.Minute,
+	}
+	node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	// Cordon first so nothing new is scheduled, then drain, then delete
+	// the node object.
+	if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
+		return err
+	}
+	if err := drain.RunNodeDrain(helper, name); err != nil {
+		return err
+	}
+	if err := client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
+		return err
+	}
+	// Finally uninstall k3s from the machine itself and drop it from the
+	// in-memory membership lists. Controllers and workers use different
+	// uninstall scripts.
+	for i, s := range m.controllers {
+		if s.Name == name {
+			c, err := m.connect(&s)
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			if err := UninstallK3sServer(c); err != nil {
+				return err
+			}
+			m.controllers = append(m.controllers[:i], m.controllers[i+1:]...)
+			return nil
+		}
+	}
+	for i, s := range m.workers {
+		if s.Name == name {
+			c, err := m.connect(&s)
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			if err := UninstallK3sAgent(c); err != nil {
+				return err
+			}
+			m.workers = append(m.workers[:i], m.workers[i+1:]...)
+			return nil
+		}
+	}
+	return fmt.Errorf("not found")
+}
+
+// findServerByIP returns a pointer to the tracked server (controller or
+// worker) whose IP equals ip, or nil if the cluster does not know it.
+// Expects manager state to be locked by caller.
+func (m *KubeManager) findServerByIP(ip net.IP) *Server {
+	// Index-based iteration returns a pointer into the slice rather than
+	// a pointer to a loop-variable copy, and both loops now use the same
+	// comparison direction.
+	for i := range m.controllers {
+		if m.controllers[i].IP.Equal(ip) {
+			return &m.controllers[i]
+		}
+	}
+	for i := range m.workers {
+		if m.workers[i].IP.Equal(ip) {
+			return &m.workers[i]
+		}
+	}
+	return nil
+}
+
+// connect opens an SSH connection to s and resolves the machine's
+// hostname, storing it into s.Name (note: the caller's Server is mutated
+// through the pointer). The caller owns the returned client and must
+// Close it.
+func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
+	cfg := &ssh.ClientConfig{
+		User:    s.User,
+		Auth:    []ssh.AuthMethod{},
+		Timeout: 10 * time.Second,
+	}
+	if s.ClientKey != "" {
+		clientKey, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
+		if err != nil {
+			return nil, err
+		}
+		cfg.Auth = append(cfg.Auth, ssh.PublicKeys(clientKey))
+	}
+	if s.Password != "" {
+		cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
+	}
+	if s.HostKey != "" {
+		hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
+		if err != nil {
+			return nil, err
+		}
+		cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
+	} else {
+		// NOTE(review): without a pinned host key the connection is open
+		// to MITM; consider requiring HostKey for production servers.
+		cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
+	}
+	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.IP.String(), s.Port), cfg)
+	if err != nil {
+		return nil, err
+	}
+	ret := &SSHClient{client}
+	s.Name, err = GetHostname(ret)
+	if err != nil {
+		// Close the connection before bailing out; the original leaked it
+		// on this path since the caller never sees the client.
+		ret.Close()
+		return nil, err
+	}
+	return ret, nil
+}
diff --git a/core/installer/cluster/manager.go b/core/installer/cluster/manager.go
new file mode 100644
index 0000000..c8e62d1
--- /dev/null
+++ b/core/installer/cluster/manager.go
@@ -0,0 +1,43 @@
+package cluster
+
+import (
+ "net"
+)
+
+const (
+	// Shell commands executed over SSH to provision Tailscale on servers.
+	whichTailscale      = "which tailscale"
+	tailscaleInstallCmd = "curl -fsSL https://tailscale.com/install.sh | sh"
+	// Format args: login server URL, auth key, hostname.
+	tailscaleUpCmd = "sudo tailscale up --login-server=%s --auth-key=%s --hostname=%s --reset"
+)
+
+// Server describes a machine that can be added to a cluster and how to
+// reach it over SSH.
+type Server struct {
+	Name      string `json:"name"`      // hostname; filled in when first connected
+	IP        net.IP `json:"ip"`        // address the SSH connection dials
+	Port      int    `json:"port"`      // SSH port
+	HostKey   string `json:"hostKey"`   // optional SSH host key to pin; empty disables verification
+	User      string `json:"user"`      // SSH user
+	Password  string `json:"password"`  // optional password auth
+	ClientKey string `json:"clientKey"` // optional private key (PEM) for public-key auth
+	AuthKey   string `json:"authKey"`   // Tailscale/headscale pre-auth key
+}
+
+// State is the serializable snapshot of a cluster manager, produced by
+// Manager.State and consumed by RestoreKubeManager.
+type State struct {
+	Name             string   `json:"name"`
+	IngressClassName string   `json:"ingressClassName"`
+	IngressIP        net.IP   `json:"ingressIP"`
+	ServerAddr       string   `json:"serverAddr"`  // "<ip>:6443" of the first controller
+	ServerToken      string   `json:"serverToken"` // k3s join token
+	Kubeconfig       string   `json:"kubeconfig"`
+	Controllers      []Server `json:"controllers"`
+	Workers          []Server `json:"workers"`
+}
+
+// ClusterSetupFunc finishes cluster setup (e.g. installing ingress) for a
+// freshly created cluster and returns the resulting ingress IP.
+type ClusterSetupFunc func(name, kubeconfig, ingressClassName string) (net.IP, error)
+
+// Manager abstracts lifecycle management of a remote cluster: one-time
+// initialization, growing/shrinking the server set, and exporting a
+// persistable snapshot.
+type Manager interface {
+	Init(s Server, setupFn ClusterSetupFunc) (net.IP, error)
+	JoinController(s Server) error
+	JoinWorker(s Server) error
+	RemoveServer(name string) error
+	State() State
+}
diff --git a/core/installer/cluster/ssh.go b/core/installer/cluster/ssh.go
new file mode 100644
index 0000000..751a797
--- /dev/null
+++ b/core/installer/cluster/ssh.go
@@ -0,0 +1,145 @@
+package cluster
+
+import (
+ "bytes"
+ "fmt"
+ "golang.org/x/crypto/ssh"
+ "os"
+ "strings"
+)
+
+// SSHClient is a thin wrapper around an established ssh.Client used to
+// run provisioning commands on remote servers.
+type SSHClient struct {
+	client *ssh.Client
+}
+
+// Close terminates the underlying SSH connection.
+func (c *SSHClient) Close() error {
+	return c.client.Close()
+}
+
+// Exec runs cmd on the remote host in a fresh SSH session and returns
+// its stdout. Remote stderr is streamed to the local process's stdout so
+// installer progress and errors stay visible in the logs.
+func (c *SSHClient) Exec(cmd string) (string, error) {
+	ses, err := c.client.NewSession()
+	if err != nil {
+		return "", err
+	}
+	defer ses.Close()
+	var out bytes.Buffer
+	ses.Stdout = &out
+	ses.Stderr = os.Stdout
+	if err := ses.Run(cmd); err != nil {
+		// Wrap with the command so callers can tell which remote step
+		// failed; the original returned the bare exit error.
+		return "", fmt.Errorf("executing %q: %w", cmd, err)
+	}
+	return out.String(), nil
+}
+
+// GetHostname returns the remote machine's hostname, trimmed of
+// surrounding whitespace.
+func GetHostname(c *SSHClient) (string, error) {
+	name, err := c.Exec("hostname")
+	if err != nil {
+		// Bug fix: the original returned `"", nil` here, silently turning
+		// a failed command into an empty hostname.
+		return "", err
+	}
+	return strings.TrimSpace(name), nil
+}
+
+// InstallTailscale installs the Tailscale client on the remote host,
+// skipping installation when the binary is already present.
+func InstallTailscale(c *SSHClient) error {
+	// NOTE(review): the original began with an unconditional `return nil`
+	// that made the entire body unreachable (go vet flags this) —
+	// apparently a leftover debug short-circuit. Removed so the function
+	// does what its name and the commit description claim; confirm this
+	// was not an intentional kill switch before merging.
+	fmt.Println("Installing Tailscale")
+	if _, err := c.Exec(whichTailscale); err == nil {
+		return nil // already installed
+	}
+	_, err := c.Exec(tailscaleInstallCmd)
+	return err
+}
+
+// TailscaleUp brings Tailscale up on the remote host, registering it
+// with the given headscale login server under hostname using authKey.
+// Any existing session is torn down first.
+func TailscaleUp(c *SSHClient, loginServer, hostname, authKey string) error {
+	// NOTE(review): removed the unconditional `return nil` debug
+	// short-circuit that made the whole body unreachable (go vet flags
+	// it); confirm it was not an intentional kill switch before merging.
+	fmt.Println("Starting up Tailscale")
+	if _, err := c.Exec("sudo tailscale down"); err != nil {
+		return err
+	}
+	cmd := fmt.Sprintf(tailscaleUpCmd, loginServer, authKey, hostname)
+	// NOTE(review): this prints the pre-auth key to stdout; consider
+	// redacting it from logs.
+	fmt.Println(cmd)
+	_, err := c.Exec(cmd)
+	return err
+}
+
+// InstallK3s installs k3s on the remote host in cluster-init (first
+// server) mode, skipping if k3s is already present. Traefik,
+// local-storage and servicelb are disabled because ingress and storage
+// are provisioned separately; kube-proxy runs in IPVS strict-ARP mode.
+// The trailing `# --flannel-iface=tailscale0` is a shell comment keeping
+// the VPN-interface option around — presumably for later use; confirm.
+func InstallK3s(c *SSHClient) error {
+	fmt.Println("Starting k3s")
+	if _, err := c.Exec("which k3s"); err == nil {
+		return nil
+	}
+	_, err := c.Exec("curl -sfL https://get.k3s.io | sh -s - --cluster-init --disable traefik --disable local-storage --disable servicelb --kube-proxy-arg proxy-mode=ipvs --kube-proxy-arg ipvs-strict-arp --flannel-backend vxlan --cluster-cidr=10.45.0.0/16 --service-cidr=10.46.0.0/16 # --flannel-iface=tailscale0")
+	return err
+}
+
+// InstallK3sJoinServer installs k3s on the remote host as an additional
+// control-plane server, joining the cluster at serverAddr with the given
+// join token. Skips installation if k3s is already present.
+// NOTE(review): the token is interpolated into a shell command line and
+// may end up in shell history / process listings on the remote host.
+func InstallK3sJoinServer(c *SSHClient, serverAddr, token string) error {
+	fmt.Println("Starting k3s")
+	if _, err := c.Exec("which k3s"); err == nil {
+		return nil
+	}
+	_, err := c.Exec(fmt.Sprintf("curl -sfL https://get.k3s.io | sh -s - server --server=https://%s --token=%s --disable traefik --disable local-storage --disable servicelb --kube-proxy-arg proxy-mode=ipvs --kube-proxy-arg ipvs-strict-arp --flannel-backend vxlan --cluster-cidr=10.45.0.0/16 --service-cidr=10.46.0.0/16 # --flannel-iface=tailscale0", serverAddr, token))
+	return err
+}
+
+// InstallK3sJoinAgent installs k3s on the remote host in agent
+// (worker-only) mode, joining the cluster at serverAddr with the given
+// join token. Skips installation if k3s is already present.
+func InstallK3sJoinAgent(c *SSHClient, serverAddr, token string) error {
+	fmt.Println("Starting k3s")
+	if _, err := c.Exec("which k3s"); err == nil {
+		return nil
+	}
+	_, err := c.Exec(fmt.Sprintf("curl -sfL https://get.k3s.io | sh -s - agent --server=https://%s --token=%s", serverAddr, token))
+	return err
+}
+
+// UninstallK3sServer removes a k3s server installation from the host by
+// running the uninstall script that k3s ships. It is a no-op when the
+// script is not on the PATH (i.e. k3s was never installed as a server).
+func UninstallK3sServer(c *SSHClient) error {
+	fmt.Println("Uninstalling k3s")
+	_, err := c.Exec("which k3s-uninstall.sh")
+	if err != nil {
+		// Script absent — nothing installed, nothing to do.
+		return nil
+	}
+	_, err = c.Exec("k3s-uninstall.sh")
+	return err
+}
+
+// UninstallK3sAgent removes a k3s agent installation from the host by
+// running the agent uninstall script that k3s ships. It is a no-op when
+// the script is not on the PATH (i.e. k3s was never installed as an agent).
+func UninstallK3sAgent(c *SSHClient) error {
+	fmt.Println("Uninstalling k3s")
+	_, err := c.Exec("which k3s-agent-uninstall.sh")
+	if err != nil {
+		// Script absent — nothing installed, nothing to do.
+		return nil
+	}
+	_, err = c.Exec("k3s-agent-uninstall.sh")
+	return err
+}
+
+// GetTailscaleIP returns the node's Tailscale address by grepping
+// ifconfig output for the 10.42 prefix. Assumes the headscale deployment
+// hands out addresses in 10.42.0.0/16 and that the host is Debian-based
+// (apt-get) — TODO confirm; `tailscale ip` (see note below) would be a
+// more robust replacement.
+func GetTailscaleIP(c *SSHClient) (string, error) {
+	fmt.Println("Getting Tailscale IP")
+	if _, err := c.Exec("sudo apt-get install net-tools -y"); err != nil {
+		return "", err
+	}
+	out, err := c.Exec("sudo ifconfig | grep 10.42")
+	if err != nil {
+		return "", err
+	}
+	// Bug fix: indexing Fields(out)[1] unconditionally panicked when the
+	// grep output did not contain at least two fields.
+	fields := strings.Fields(out)
+	if len(fields) < 2 {
+		return "", fmt.Errorf("could not parse tailscale ip from %q", out)
+	}
+	return fields[1], nil
+	// TODO: switch to `sudo tailscale ip` and strings.TrimSpace once verified.
+}
+
+// GetKubeconfig fetches the k3s kubeconfig from the server and rewrites
+// its API-server address from the loopback address to the node's
+// Tailscale IP, making the config usable from other machines on the VPN.
+// (Removed a leftover `// return "", nil` debug comment.)
+func GetKubeconfig(c *SSHClient) (string, error) {
+	fmt.Println("Getting Kubeconfig")
+	out, err := c.Exec("sudo cat /etc/rancher/k3s/k3s.yaml")
+	if err != nil {
+		return "", err
+	}
+	ip, err := GetTailscaleIP(c)
+	if err != nil {
+		return "", err
+	}
+	// NOTE(review): relies on the exact "server: https://127.0.0.1:6443"
+	// line k3s writes today — verify against the k3s version in use.
+	return strings.Replace(out, "server: https://127.0.0.1:6443", fmt.Sprintf("server: https://%s:6443", ip), 1), nil
+}
+
+// GetServerToken reads the k3s node join token from the server; other
+// servers use it to join the cluster.
+func GetServerToken(c *SSHClient) (string, error) {
+	fmt.Println("Getting server token")
+	out, err := c.Exec("sudo cat /var/lib/rancher/k3s/server/node-token")
+	if err != nil {
+		return "", err
+	}
+	// The original returned the stale `err` here (always nil by this
+	// point); return nil explicitly.
+	return strings.TrimSpace(out), nil
+}