blob: 3391e3e2086a02fce3b66a86f73567ee65328c6d [file] [log] [blame]
package cluster
import (
"context"
"fmt"
"golang.org/x/crypto/ssh"
"net"
"os"
"sync"
"time"
"github.com/giolekva/pcloud/core/installer/kube"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubectl/pkg/drain"
)
// KubeManager provisions and tracks a K3s cluster whose nodes are reached
// over SSH and joined to a Tailscale network. All exported methods take the
// internal lock; State can be persisted and later fed to RestoreKubeManager.
type KubeManager struct {
	l                sync.Locker // guards every field below
	name             string      // cluster name, passed to the ingress setup callback
	ingressClassName string      // ingress class created during Init (currently "default")
	ingressIP        net.IP      // IP returned by the ingress setup callback
	kubeCfg          string      // kubeconfig fetched from the first controller; "" means not initialized
	serverAddr       string      // "<tailscale-ip>:6443" of the first controller, used by joiners
	serverToken      string      // K3s join token shared with controllers and workers
	controllers      []Server    // control-plane nodes, first element is the Init node
	workers          []Server    // agent-only nodes
	storageEnabled   bool
}
// NewKubeManager returns an empty, uninitialized manager guarded by a
// fresh mutex. Call Init before joining or removing servers.
func NewKubeManager() *KubeManager {
	m := KubeManager{l: &sync.Mutex{}}
	return &m
}
// RestoreKubeManager rebuilds a manager from previously persisted State,
// attaching a fresh mutex. It never fails today; the error return is kept
// for interface stability.
func RestoreKubeManager(st State) (*KubeManager, error) {
	m := &KubeManager{l: &sync.Mutex{}}
	m.name = st.Name
	m.ingressClassName = st.IngressClassName
	m.ingressIP = st.IngressIP
	m.kubeCfg = st.Kubeconfig
	m.serverAddr = st.ServerAddr
	m.serverToken = st.ServerToken
	m.controllers = st.Controllers
	m.workers = st.Workers
	m.storageEnabled = st.StorageEnabled
	return m, nil
}
// State returns a snapshot of the manager suitable for persistence and a
// later RestoreKubeManager call.
//
// Uses keyed fields rather than a positional literal: the unkeyed form
// silently mis-assigns values if the State struct's field order ever
// changes (field names mirror what RestoreKubeManager reads).
func (m *KubeManager) State() State {
	m.l.Lock()
	defer m.l.Unlock()
	return State{
		Name:             m.name,
		IngressClassName: m.ingressClassName,
		IngressIP:        m.ingressIP,
		ServerAddr:       m.serverAddr,
		ServerToken:      m.serverToken,
		Kubeconfig:       m.kubeCfg,
		Controllers:      m.controllers,
		Workers:          m.workers,
		StorageEnabled:   m.storageEnabled,
	}
}
// EnableStorage marks storage as enabled for this cluster.
//
// Takes the manager lock like every other mutator; the original wrote
// storageEnabled without it, which races with State/Init/Join/Remove.
func (m *KubeManager) EnableStorage() {
	m.l.Lock()
	defer m.l.Unlock()
	m.storageEnabled = true
}
// Init bootstraps the cluster on server s: installs Tailscale, brings it up
// against the hard-coded Headscale login server, installs K3s, then runs
// setupFn to configure ingress. On success it records the kubeconfig, join
// address/token, ingress class/IP and s as the first controller, and
// returns the ingress IP.
//
// All fetched values are staged in locals and committed to m only after
// every step succeeds; the original assigned them as it went, so a failure
// in e.g. setupFn left kubeCfg set (subsequent Init => "already
// initialized") with no token or controllers recorded.
func (m *KubeManager) Init(s Server, setupFn ClusterIngressSetupFunc) (net.IP, error) {
	m.l.Lock()
	defer m.l.Unlock()
	// A non-empty kubeconfig is the "initialized" marker throughout this type.
	if m.kubeCfg != "" {
		return nil, fmt.Errorf("already initialized")
	}
	c, err := m.connect(&s)
	if err != nil {
		return nil, err
	}
	defer c.Close()
	if err := InstallTailscale(c); err != nil {
		return nil, err
	}
	const loginServer = "https://headscale.v1.dodo.cloud"
	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
		return nil, err
	}
	if err := InstallK3s(c); err != nil {
		return nil, err
	}
	kubeCfg, err := GetKubeconfig(c)
	if err != nil {
		return nil, err
	}
	serverIP, err := GetTailscaleIP(c)
	if err != nil {
		return nil, err
	}
	serverToken, err := GetServerToken(c)
	if err != nil {
		return nil, err
	}
	const ingressClassName = "default"
	ingressIP, err := setupFn(m.name, kubeCfg, ingressClassName)
	if err != nil {
		return nil, err
	}
	// Commit atomically: every step above succeeded.
	m.kubeCfg = kubeCfg
	m.serverAddr = fmt.Sprintf("%s:6443", serverIP) // K3s API server port
	m.serverToken = serverToken
	m.controllers = []Server{s}
	m.ingressClassName = ingressClassName
	m.ingressIP = ingressIP
	return ingressIP, nil
}
// JoinController adds s to the cluster as an additional control-plane
// node: installs Tailscale, joins the tailnet, then joins K3s in server
// mode using the address and token captured during Init.
func (m *KubeManager) JoinController(s Server) error {
	m.l.Lock()
	defer m.l.Unlock()
	// Guard clauses: the cluster must exist and s must be new to it.
	if m.kubeCfg == "" {
		return fmt.Errorf("not initialized")
	}
	if existing := m.findServerByIP(s.IP); existing != nil {
		return fmt.Errorf("already exists")
	}
	conn, err := m.connect(&s)
	if err != nil {
		return err
	}
	defer conn.Close()
	if err := InstallTailscale(conn); err != nil {
		return err
	}
	const loginServer = "https://headscale.v1.dodo.cloud"
	if err := TailscaleUp(conn, loginServer, s.Name, s.AuthKey); err != nil {
		return err
	}
	if err := InstallK3sJoinServer(conn, m.serverAddr, m.serverToken); err != nil {
		return err
	}
	m.controllers = append(m.controllers, s)
	return nil
}
// JoinWorker adds s to the cluster as an agent-only node: installs
// Tailscale, joins the tailnet, then joins K3s in agent mode using the
// address and token captured during Init.
func (m *KubeManager) JoinWorker(s Server) error {
	m.l.Lock()
	defer m.l.Unlock()
	// Guard clauses: the cluster must exist and s must be new to it.
	if m.kubeCfg == "" {
		return fmt.Errorf("not initialized")
	}
	if existing := m.findServerByIP(s.IP); existing != nil {
		return fmt.Errorf("already exists")
	}
	conn, err := m.connect(&s)
	if err != nil {
		return err
	}
	defer conn.Close()
	if err := InstallTailscale(conn); err != nil {
		return err
	}
	const loginServer = "https://headscale.v1.dodo.cloud"
	if err := TailscaleUp(conn, loginServer, s.Name, s.AuthKey); err != nil {
		return err
	}
	if err := InstallK3sJoinAgent(conn, m.serverAddr, m.serverToken); err != nil {
		return err
	}
	m.workers = append(m.workers, s)
	return nil
}
// RemoveServer removes the named node from the cluster: cordons, drains
// and deletes its Kubernetes node object (unless it is the only node
// left), uninstalls K3s on the host over SSH, and drops it from the
// tracked controller/worker lists.
//
// The tracked server is located BEFORE any Kubernetes-side mutation; the
// original drained and deleted the node first, so an unknown name could
// drain a live node and then fail with "not found".
func (m *KubeManager) RemoveServer(name string) error {
	m.l.Lock()
	defer m.l.Unlock()
	ctrlIdx, workerIdx := -1, -1
	for i, s := range m.controllers {
		if s.Name == name {
			ctrlIdx = i
			break
		}
	}
	if ctrlIdx == -1 {
		for i, s := range m.workers {
			if s.Name == name {
				workerIdx = i
				break
			}
		}
		if workerIdx == -1 {
			return fmt.Errorf("not found")
		}
	}
	// Skip drain/delete when removing the last node: there is nowhere for
	// its pods to go, and the cluster is being torn down anyway.
	if len(m.controllers)+len(m.workers) > 1 {
		if err := m.drainAndDeleteNode(name); err != nil {
			return err
		}
	}
	if ctrlIdx != -1 {
		s := m.controllers[ctrlIdx]
		c, err := m.connect(&s)
		if err != nil {
			return err
		}
		defer c.Close()
		if err := UninstallK3sServer(c); err != nil {
			return err
		}
		m.controllers = append(m.controllers[:ctrlIdx], m.controllers[ctrlIdx+1:]...)
		return nil
	}
	s := m.workers[workerIdx]
	c, err := m.connect(&s)
	if err != nil {
		return err
	}
	defer c.Close()
	if err := UninstallK3sAgent(c); err != nil {
		return err
	}
	m.workers = append(m.workers[:workerIdx], m.workers[workerIdx+1:]...)
	return nil
}

// drainAndDeleteNode cordons the named Kubernetes node, evicts its pods
// and deletes the node object, using the manager's stored kubeconfig.
// Expects manager state to be locked by caller.
func (m *KubeManager) drainAndDeleteNode(name string) error {
	client, err := kube.NewKubeClient(kube.KubeConfigOpts{
		KubeConfig: m.kubeCfg,
	})
	if err != nil {
		return err
	}
	helper := &drain.Helper{
		Ctx:                 context.Background(),
		Client:              client,
		Force:               true,
		GracePeriodSeconds:  -1, // use each pod's own termination grace period
		IgnoreAllDaemonSets: true,
		Out:                 os.Stdout,
		ErrOut:              os.Stdout,
		// We want to proceed even when pods are using emptyDir volumes
		DeleteEmptyDirData: true,
		Timeout:            10 * time.Minute,
	}
	node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
		return err
	}
	if err := drain.RunNodeDrain(helper, name); err != nil {
		return err
	}
	return client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{})
}
// findServerByIP returns a pointer to a copy of the tracked server
// (controller or worker) whose IP equals ip, or nil when none matches.
// Expects manager state to be locked by caller.
func (m *KubeManager) findServerByIP(ip net.IP) *Server {
	for _, group := range [][]Server{m.controllers, m.workers} {
		for i := range group {
			if group[i].IP.Equal(ip) {
				srv := group[i]
				return &srv
			}
		}
	}
	return nil
}
// connect dials s over SSH using key and/or password auth, verifies the
// host key when one is provided (otherwise accepts any host key), and as
// a side effect overwrites s.Name with the host's reported hostname.
//
// Fix over the original: the SSH connection was leaked when GetHostname
// failed — it is now closed before returning that error.
func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
	cfg := &ssh.ClientConfig{
		User:    s.User,
		Auth:    []ssh.AuthMethod{},
		Timeout: 10 * time.Second,
	}
	if s.ClientKey != "" {
		signer, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
		if err != nil {
			return nil, err
		}
		cfg.Auth = append(cfg.Auth, ssh.PublicKeys(signer))
	}
	if s.Password != "" {
		cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
	}
	if s.HostKey != "" {
		// NOTE(review): ssh.ParsePublicKey expects the binary SSH wire
		// format; if HostKey is stored in authorized_keys text form this
		// should be ssh.ParseAuthorizedKey — confirm how HostKey is populated.
		hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
		if err != nil {
			return nil, err
		}
		cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
	} else {
		// No pinned host key: fall back to accepting whatever the host presents.
		cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
	}
	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.IP.String(), s.Port), cfg)
	if err != nil {
		return nil, err
	}
	ret := &SSHClient{client}
	name, err := GetHostname(ret)
	if err != nil {
		client.Close() // don't leak the connection on hostname failure
		return nil, err
	}
	s.Name = name
	return ret, nil
}