blob: 11fe746d2900d97fd852b0317bff7b6e369881df [file] [log] [blame]
giof6ad2982024-08-23 17:42:49 +04001package cluster
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/crypto/ssh"
7 "net"
8 "os"
9 "sync"
10 "time"
11
12 "github.com/giolekva/pcloud/core/installer/kube"
13
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/kubectl/pkg/drain"
16)
17
18type KubeManager struct {
19 l sync.Locker
20 name string
21 ingressClassName string
22 ingressIP net.IP
23 kubeCfg string
24 serverAddr string
25 serverToken string
26 controllers []Server
27 workers []Server
gio8f290322024-09-21 15:37:45 +040028 storageEnabled bool
giof6ad2982024-08-23 17:42:49 +040029}
30
31func NewKubeManager() *KubeManager {
32 return &KubeManager{l: &sync.Mutex{}}
33}
34
35func RestoreKubeManager(st State) (*KubeManager, error) {
36 return &KubeManager{
37 l: &sync.Mutex{},
38 name: st.Name,
39 ingressClassName: st.IngressClassName,
40 ingressIP: st.IngressIP,
41 kubeCfg: st.Kubeconfig,
42 serverAddr: st.ServerAddr,
43 serverToken: st.ServerToken,
44 controllers: st.Controllers,
45 workers: st.Workers,
gio8f290322024-09-21 15:37:45 +040046 storageEnabled: st.StorageEnabled,
giof6ad2982024-08-23 17:42:49 +040047 }, nil
48}
49
50func (m *KubeManager) State() State {
51 m.l.Lock()
52 defer m.l.Unlock()
53 return State{
54 m.name,
55 m.ingressClassName,
56 m.ingressIP,
57 m.serverAddr,
58 m.serverToken,
59 m.kubeCfg,
60 m.controllers,
61 m.workers,
gio8f290322024-09-21 15:37:45 +040062 m.storageEnabled,
giof6ad2982024-08-23 17:42:49 +040063 }
64}
65
gio8f290322024-09-21 15:37:45 +040066func (m *KubeManager) EnableStorage() {
67 m.storageEnabled = true
68}
69
70func (m *KubeManager) Init(s Server, setupFn ClusterIngressSetupFunc) (net.IP, error) {
giof6ad2982024-08-23 17:42:49 +040071 m.l.Lock()
72 defer m.l.Unlock()
73 if m.kubeCfg != "" {
74 return nil, fmt.Errorf("already initialized")
75 }
76 c, err := m.connect(&s)
77 if err != nil {
78 return nil, err
79 }
80 defer c.Close()
81 if err := InstallTailscale(c); err != nil {
82 return nil, err
83 }
84 const loginServer = "https://headscale.v1.dodo.cloud"
85 if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
86 return nil, err
87 }
88 if err := InstallK3s(c); err != nil {
89 return nil, err
90 }
91 kubeCfg, err := GetKubeconfig(c)
92 if err != nil {
93 return nil, err
94 }
95 m.kubeCfg = kubeCfg
96 serverIP, err := GetTailscaleIP(c)
97 if err != nil {
98 return nil, err
99 }
100 m.serverAddr = fmt.Sprintf("%s:6443", serverIP)
101 serverToken, err := GetServerToken(c)
102 if err != nil {
103 return nil, err
104 }
105 m.serverToken = serverToken
106 m.controllers = []Server{s}
107 m.ingressClassName = "default"
108 ingressIP, err := setupFn(m.name, m.kubeCfg, m.ingressClassName)
109 if err != nil {
110 return nil, err
111 }
112 m.ingressIP = ingressIP
113 return ingressIP, nil
114}
115
116func (m *KubeManager) JoinController(s Server) error {
117 m.l.Lock()
118 defer m.l.Unlock()
119 if m.kubeCfg == "" {
120 return fmt.Errorf("not initialized")
121 }
122 if i := m.findServerByIP(s.IP); i != nil {
123 return fmt.Errorf("already exists")
124 }
125 c, err := m.connect(&s)
126 if err != nil {
127 return err
128 }
129 defer c.Close()
130 if err := InstallTailscale(c); err != nil {
131 return err
132 }
133 const loginServer = "https://headscale.v1.dodo.cloud"
134 if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
135 return err
136 }
137 if err := InstallK3sJoinServer(c, m.serverAddr, m.serverToken); err != nil {
138 return err
139 }
140 m.controllers = append(m.controllers, s)
141 return nil
142}
143
144func (m *KubeManager) JoinWorker(s Server) error {
145 m.l.Lock()
146 defer m.l.Unlock()
147 if m.kubeCfg == "" {
148 return fmt.Errorf("not initialized")
149 }
150 if i := m.findServerByIP(s.IP); i != nil {
151 return fmt.Errorf("already exists")
152 }
153 c, err := m.connect(&s)
154 if err != nil {
155 return err
156 }
157 defer c.Close()
158 if err := InstallTailscale(c); err != nil {
159 return err
160 }
161 const loginServer = "https://headscale.v1.dodo.cloud"
162 if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
163 return err
164 }
165 if err := InstallK3sJoinAgent(c, m.serverAddr, m.serverToken); err != nil {
166 return err
167 }
168 m.workers = append(m.workers, s)
169 return nil
170}
171
172func (m *KubeManager) RemoveServer(name string) error {
173 m.l.Lock()
174 defer m.l.Unlock()
175 client, err := kube.NewKubeClient(kube.KubeConfigOpts{
176 KubeConfig: m.kubeCfg,
177 })
178 if err != nil {
179 return err
180 }
181 helper := &drain.Helper{
182 Ctx: context.Background(),
183 Client: client,
184 Force: true,
185 GracePeriodSeconds: -1,
186 IgnoreAllDaemonSets: true,
187 Out: os.Stdout,
188 ErrOut: os.Stdout,
189 // We want to proceed even when pods are using emptyDir volumes
190 DeleteEmptyDirData: true,
191 Timeout: 10 * time.Minute,
192 }
193 node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
194 if err != nil {
195 return err
196 }
197 if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
198 return err
199 }
200 if err := drain.RunNodeDrain(helper, name); err != nil {
201 return err
202 }
203 if err := client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
204 return err
205 }
206 for i, s := range m.controllers {
207 if s.Name == name {
208 c, err := m.connect(&s)
209 if err != nil {
210 return err
211 }
212 defer c.Close()
213 if err := UninstallK3sServer(c); err != nil {
214 return err
215 }
216 m.controllers = append(m.controllers[:i], m.controllers[i+1:]...)
217 return nil
218 }
219 }
220 for i, s := range m.workers {
221 if s.Name == name {
222 c, err := m.connect(&s)
223 if err != nil {
224 return err
225 }
226 defer c.Close()
227 if err := UninstallK3sAgent(c); err != nil {
228 return err
229 }
230 m.workers = append(m.workers[:i], m.workers[i+1:]...)
231 return nil
232 }
233 }
234 return fmt.Errorf("not found")
235}
236
237// Expects manager state to be locked by caller.
238func (m *KubeManager) findServerByIP(ip net.IP) *Server {
239 for _, s := range m.controllers {
240 if s.IP.Equal(ip) {
241 return &s
242 }
243 }
244 for _, s := range m.workers {
245 if ip.Equal(s.IP) {
246 return &s
247 }
248 }
249 return nil
250}
251
252func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
253 cfg := &ssh.ClientConfig{
254 User: s.User,
255 Auth: []ssh.AuthMethod{},
256 Timeout: 10 * time.Second,
257 }
258 if s.ClientKey != "" {
259 clientKey, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
260 if err != nil {
261 return nil, err
262 }
263 cfg.Auth = append(cfg.Auth, ssh.PublicKeys(clientKey))
264 }
265 if s.Password != "" {
266 cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
267 }
268 if s.HostKey != "" {
269 hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
270 if err != nil {
271 return nil, err
272 }
273 cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
274 } else {
275 cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
276 }
277 client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.IP.String(), s.Port), cfg)
278 if err != nil {
279 return nil, err
280 }
281 ret := &SSHClient{client}
282 s.Name, err = GetHostname(ret)
283 if err != nil {
284 return nil, err
285 }
286 return ret, nil
287}