blob: 3391e3e2086a02fce3b66a86f73567ee65328c6d [file] [log] [blame]
giof6ad2982024-08-23 17:42:49 +04001package cluster
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/crypto/ssh"
7 "net"
8 "os"
9 "sync"
10 "time"
11
12 "github.com/giolekva/pcloud/core/installer/kube"
13
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/kubectl/pkg/drain"
16)
17
18type KubeManager struct {
19 l sync.Locker
20 name string
21 ingressClassName string
22 ingressIP net.IP
23 kubeCfg string
24 serverAddr string
25 serverToken string
26 controllers []Server
27 workers []Server
gio8f290322024-09-21 15:37:45 +040028 storageEnabled bool
giof6ad2982024-08-23 17:42:49 +040029}
30
// NewKubeManager returns an empty, uninitialized manager guarded by a fresh
// mutex. Call Init to bootstrap its first controller.
func NewKubeManager() *KubeManager {
	m := new(KubeManager)
	m.l = new(sync.Mutex)
	return m
}
34
35func RestoreKubeManager(st State) (*KubeManager, error) {
36 return &KubeManager{
37 l: &sync.Mutex{},
38 name: st.Name,
39 ingressClassName: st.IngressClassName,
40 ingressIP: st.IngressIP,
41 kubeCfg: st.Kubeconfig,
42 serverAddr: st.ServerAddr,
43 serverToken: st.ServerToken,
44 controllers: st.Controllers,
45 workers: st.Workers,
gio8f290322024-09-21 15:37:45 +040046 storageEnabled: st.StorageEnabled,
giof6ad2982024-08-23 17:42:49 +040047 }, nil
48}
49
50func (m *KubeManager) State() State {
51 m.l.Lock()
52 defer m.l.Unlock()
53 return State{
54 m.name,
55 m.ingressClassName,
56 m.ingressIP,
57 m.serverAddr,
58 m.serverToken,
59 m.kubeCfg,
60 m.controllers,
61 m.workers,
gio8f290322024-09-21 15:37:45 +040062 m.storageEnabled,
giof6ad2982024-08-23 17:42:49 +040063 }
64}
65
gio8f290322024-09-21 15:37:45 +040066func (m *KubeManager) EnableStorage() {
67 m.storageEnabled = true
68}
69
// Init bootstraps the cluster on s, which becomes its sole controller.
//
// Over SSH it performs these steps:
//   - installs Tailscale and brings it up against the headscale login server;
//   - installs k3s;
//   - reads back the kubeconfig, the node's Tailscale IP (recorded as
//     "<ip>:6443") and the k3s server token.
//
// Once those succeed, the manager records them with ingress class "default"
// and calls setupFn. It returns the ingress IP setupFn reports. It fails
// with "already initialized" if a kubeconfig is already recorded.
func (m *KubeManager) Init(s Server, setupFn ClusterIngressSetupFunc) (net.IP, error) {
	m.l.Lock()
	defer m.l.Unlock()
	if m.kubeCfg != "" {
		return nil, fmt.Errorf("already initialized")
	}
	c, err := m.connect(&s)
	if err != nil {
		return nil, err
	}
	defer c.Close()
	if err := InstallTailscale(c); err != nil {
		return nil, err
	}
	const loginServer = "https://headscale.v1.dodo.cloud"
	if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
		return nil, err
	}
	if err := InstallK3s(c); err != nil {
		return nil, err
	}
	kubeCfg, err := GetKubeconfig(c)
	if err != nil {
		return nil, err
	}
	serverIP, err := GetTailscaleIP(c)
	if err != nil {
		return nil, err
	}
	serverToken, err := GetServerToken(c)
	if err != nil {
		return nil, err
	}
	// Commit the cluster access details together. kubeCfg doubles as the
	// "initialized" marker, so setting it before the IP and token were known
	// could leave a half-populated manager that rejects every retry.
	m.kubeCfg = kubeCfg
	m.serverAddr = fmt.Sprintf("%s:6443", serverIP)
	m.serverToken = serverToken
	m.controllers = []Server{s}
	m.ingressClassName = "default"
	ingressIP, err := setupFn(m.name, m.kubeCfg, m.ingressClassName)
	if err != nil {
		return nil, err
	}
	m.ingressIP = ingressIP
	return ingressIP, nil
}
115
// JoinController adds s to an initialized cluster as an additional k3s
// server node. It installs Tailscale on s, brings it up against the headscale
// login server, and joins k3s using the recorded server address and token. On
// success s is appended to the controller list. It fails with
// "not initialized" before Init and with "already exists" when a tracked
// server already has s.IP.
func (m *KubeManager) JoinController(s Server) error {
	const loginServer = "https://headscale.v1.dodo.cloud"
	m.l.Lock()
	defer m.l.Unlock()
	switch {
	case m.kubeCfg == "":
		return fmt.Errorf("not initialized")
	case m.findServerByIP(s.IP) != nil:
		return fmt.Errorf("already exists")
	}
	conn, connErr := m.connect(&s)
	if connErr != nil {
		return connErr
	}
	defer conn.Close()
	if err := InstallTailscale(conn); err != nil {
		return err
	}
	if err := TailscaleUp(conn, loginServer, s.Name, s.AuthKey); err != nil {
		return err
	}
	if err := InstallK3sJoinServer(conn, m.serverAddr, m.serverToken); err != nil {
		return err
	}
	m.controllers = append(m.controllers, s)
	return nil
}
143
// JoinWorker adds s to an initialized cluster as a k3s agent node. It
// installs Tailscale on s, brings it up against the headscale login server,
// and joins k3s as an agent using the recorded server address and token. On
// success s is appended to the worker list. It fails with "not initialized"
// before Init and with "already exists" when a tracked server already has s.IP.
func (m *KubeManager) JoinWorker(s Server) error {
	const loginServer = "https://headscale.v1.dodo.cloud"
	m.l.Lock()
	defer m.l.Unlock()
	switch {
	case m.kubeCfg == "":
		return fmt.Errorf("not initialized")
	case m.findServerByIP(s.IP) != nil:
		return fmt.Errorf("already exists")
	}
	conn, connErr := m.connect(&s)
	if connErr != nil {
		return connErr
	}
	defer conn.Close()
	if err := InstallTailscale(conn); err != nil {
		return err
	}
	if err := TailscaleUp(conn, loginServer, s.Name, s.AuthKey); err != nil {
		return err
	}
	if err := InstallK3sJoinAgent(conn, m.serverAddr, m.serverToken); err != nil {
		return err
	}
	m.workers = append(m.workers, s)
	return nil
}
171
172func (m *KubeManager) RemoveServer(name string) error {
173 m.l.Lock()
174 defer m.l.Unlock()
giobe9ce3f2025-04-17 08:30:28 +0400175 if len(m.controllers)+len(m.workers) > 1 {
176 client, err := kube.NewKubeClient(kube.KubeConfigOpts{
177 KubeConfig: m.kubeCfg,
178 })
179 if err != nil {
180 return err
181 }
182 helper := &drain.Helper{
183 Ctx: context.Background(),
184 Client: client,
185 Force: true,
186 GracePeriodSeconds: -1,
187 IgnoreAllDaemonSets: true,
188 Out: os.Stdout,
189 ErrOut: os.Stdout,
190 // We want to proceed even when pods are using emptyDir volumes
191 DeleteEmptyDirData: true,
192 Timeout: 10 * time.Minute,
193 }
194 node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
195 if err != nil {
196 return err
197 }
198 if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
199 return err
200 }
201 if err := drain.RunNodeDrain(helper, name); err != nil {
202 return err
203 }
204 if err := client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
205 return err
206 }
giof6ad2982024-08-23 17:42:49 +0400207 }
208 for i, s := range m.controllers {
209 if s.Name == name {
210 c, err := m.connect(&s)
211 if err != nil {
212 return err
213 }
214 defer c.Close()
215 if err := UninstallK3sServer(c); err != nil {
216 return err
217 }
218 m.controllers = append(m.controllers[:i], m.controllers[i+1:]...)
219 return nil
220 }
221 }
222 for i, s := range m.workers {
223 if s.Name == name {
224 c, err := m.connect(&s)
225 if err != nil {
226 return err
227 }
228 defer c.Close()
229 if err := UninstallK3sAgent(c); err != nil {
230 return err
231 }
232 m.workers = append(m.workers[:i], m.workers[i+1:]...)
233 return nil
234 }
235 }
236 return fmt.Errorf("not found")
237}
238
// findServerByIP reports the first tracked server, controllers before
// workers, whose IP equals ip. It returns a pointer to a copy of that server,
// not to the stored element, or nil when none matches.
// Expects manager state to be locked by caller.
func (m *KubeManager) findServerByIP(ip net.IP) *Server {
	for _, group := range [][]Server{m.controllers, m.workers} {
		for i := range group {
			if group[i].IP.Equal(ip) {
				found := group[i]
				return &found
			}
		}
	}
	return nil
}
253
// connect opens an SSH connection to s and returns it wrapped in an
// SSHClient. The caller owns the returned client and must Close it.
//
// Authentication offers s.ClientKey (a private key) and/or s.Password,
// whichever are set. When s.HostKey is set, the server's host key must match
// it exactly. Otherwise host-key verification is skipped
// (ssh.InsecureIgnoreHostKey), which leaves the connection open to
// man-in-the-middle attacks. On success s.Name is overwritten with the
// hostname reported by GetHostname.
func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
	cfg := &ssh.ClientConfig{
		User:    s.User,
		Auth:    []ssh.AuthMethod{},
		Timeout: 10 * time.Second,
	}
	if s.ClientKey != "" {
		clientKey, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
		if err != nil {
			return nil, err
		}
		cfg.Auth = append(cfg.Auth, ssh.PublicKeys(clientKey))
	}
	if s.Password != "" {
		cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
	}
	if s.HostKey != "" {
		// NOTE(review): ssh.ParsePublicKey expects the SSH wire encoding, not
		// an authorized_keys-style line — confirm the format stored in HostKey.
		hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
		if err != nil {
			return nil, err
		}
		cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
	} else {
		cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
	}
	// JoinHostPort brackets IPv6 literals; "%s:%d" produced unparsable
	// addresses such as "fd7a::1:22".
	addr := net.JoinHostPort(s.IP.String(), fmt.Sprintf("%d", s.Port))
	client, err := ssh.Dial("tcp", addr, cfg)
	if err != nil {
		return nil, err
	}
	ret := &SSHClient{client}
	hostname, err := GetHostname(ret)
	if err != nil {
		// Don't leak the established connection; the hostname error is the
		// one worth reporting, so the Close error is deliberately dropped.
		_ = client.Close()
		return nil, err
	}
	s.Name = hostname
	return ret, nil
}