blob: ee123fa15e74283cfb9e74885f0bc62d4f551ecc [file] [log] [blame]
giof6ad2982024-08-23 17:42:49 +04001package cluster
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/crypto/ssh"
7 "net"
8 "os"
9 "sync"
10 "time"
11
12 "github.com/giolekva/pcloud/core/installer/kube"
13
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/kubectl/pkg/drain"
16)
17
18type KubeManager struct {
19 l sync.Locker
20 name string
21 ingressClassName string
22 ingressIP net.IP
23 kubeCfg string
24 serverAddr string
25 serverToken string
26 controllers []Server
27 workers []Server
28}
29
30func NewKubeManager() *KubeManager {
31 return &KubeManager{l: &sync.Mutex{}}
32}
33
34func RestoreKubeManager(st State) (*KubeManager, error) {
35 return &KubeManager{
36 l: &sync.Mutex{},
37 name: st.Name,
38 ingressClassName: st.IngressClassName,
39 ingressIP: st.IngressIP,
40 kubeCfg: st.Kubeconfig,
41 serverAddr: st.ServerAddr,
42 serverToken: st.ServerToken,
43 controllers: st.Controllers,
44 workers: st.Workers,
45 }, nil
46}
47
48func (m *KubeManager) State() State {
49 m.l.Lock()
50 defer m.l.Unlock()
51 return State{
52 m.name,
53 m.ingressClassName,
54 m.ingressIP,
55 m.serverAddr,
56 m.serverToken,
57 m.kubeCfg,
58 m.controllers,
59 m.workers,
60 }
61}
62
63func (m *KubeManager) Init(s Server, setupFn ClusterSetupFunc) (net.IP, error) {
64 m.l.Lock()
65 defer m.l.Unlock()
66 if m.kubeCfg != "" {
67 return nil, fmt.Errorf("already initialized")
68 }
69 c, err := m.connect(&s)
70 if err != nil {
71 return nil, err
72 }
73 defer c.Close()
74 if err := InstallTailscale(c); err != nil {
75 return nil, err
76 }
77 const loginServer = "https://headscale.v1.dodo.cloud"
78 if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
79 return nil, err
80 }
81 if err := InstallK3s(c); err != nil {
82 return nil, err
83 }
84 kubeCfg, err := GetKubeconfig(c)
85 if err != nil {
86 return nil, err
87 }
88 m.kubeCfg = kubeCfg
89 serverIP, err := GetTailscaleIP(c)
90 if err != nil {
91 return nil, err
92 }
93 m.serverAddr = fmt.Sprintf("%s:6443", serverIP)
94 serverToken, err := GetServerToken(c)
95 if err != nil {
96 return nil, err
97 }
98 m.serverToken = serverToken
99 m.controllers = []Server{s}
100 m.ingressClassName = "default"
101 ingressIP, err := setupFn(m.name, m.kubeCfg, m.ingressClassName)
102 if err != nil {
103 return nil, err
104 }
105 m.ingressIP = ingressIP
106 return ingressIP, nil
107}
108
109func (m *KubeManager) JoinController(s Server) error {
110 m.l.Lock()
111 defer m.l.Unlock()
112 if m.kubeCfg == "" {
113 return fmt.Errorf("not initialized")
114 }
115 if i := m.findServerByIP(s.IP); i != nil {
116 return fmt.Errorf("already exists")
117 }
118 c, err := m.connect(&s)
119 if err != nil {
120 return err
121 }
122 defer c.Close()
123 if err := InstallTailscale(c); err != nil {
124 return err
125 }
126 const loginServer = "https://headscale.v1.dodo.cloud"
127 if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
128 return err
129 }
130 if err := InstallK3sJoinServer(c, m.serverAddr, m.serverToken); err != nil {
131 return err
132 }
133 m.controllers = append(m.controllers, s)
134 return nil
135}
136
137func (m *KubeManager) JoinWorker(s Server) error {
138 m.l.Lock()
139 defer m.l.Unlock()
140 if m.kubeCfg == "" {
141 return fmt.Errorf("not initialized")
142 }
143 if i := m.findServerByIP(s.IP); i != nil {
144 return fmt.Errorf("already exists")
145 }
146 c, err := m.connect(&s)
147 if err != nil {
148 return err
149 }
150 defer c.Close()
151 if err := InstallTailscale(c); err != nil {
152 return err
153 }
154 const loginServer = "https://headscale.v1.dodo.cloud"
155 if err := TailscaleUp(c, loginServer, s.Name, s.AuthKey); err != nil {
156 return err
157 }
158 if err := InstallK3sJoinAgent(c, m.serverAddr, m.serverToken); err != nil {
159 return err
160 }
161 m.workers = append(m.workers, s)
162 return nil
163}
164
165func (m *KubeManager) RemoveServer(name string) error {
166 m.l.Lock()
167 defer m.l.Unlock()
168 client, err := kube.NewKubeClient(kube.KubeConfigOpts{
169 KubeConfig: m.kubeCfg,
170 })
171 if err != nil {
172 return err
173 }
174 helper := &drain.Helper{
175 Ctx: context.Background(),
176 Client: client,
177 Force: true,
178 GracePeriodSeconds: -1,
179 IgnoreAllDaemonSets: true,
180 Out: os.Stdout,
181 ErrOut: os.Stdout,
182 // We want to proceed even when pods are using emptyDir volumes
183 DeleteEmptyDirData: true,
184 Timeout: 10 * time.Minute,
185 }
186 node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
187 if err != nil {
188 return err
189 }
190 if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
191 return err
192 }
193 if err := drain.RunNodeDrain(helper, name); err != nil {
194 return err
195 }
196 if err := client.CoreV1().Nodes().Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
197 return err
198 }
199 for i, s := range m.controllers {
200 if s.Name == name {
201 c, err := m.connect(&s)
202 if err != nil {
203 return err
204 }
205 defer c.Close()
206 if err := UninstallK3sServer(c); err != nil {
207 return err
208 }
209 m.controllers = append(m.controllers[:i], m.controllers[i+1:]...)
210 return nil
211 }
212 }
213 for i, s := range m.workers {
214 if s.Name == name {
215 c, err := m.connect(&s)
216 if err != nil {
217 return err
218 }
219 defer c.Close()
220 if err := UninstallK3sAgent(c); err != nil {
221 return err
222 }
223 m.workers = append(m.workers[:i], m.workers[i+1:]...)
224 return nil
225 }
226 }
227 return fmt.Errorf("not found")
228}
229
230// Expects manager state to be locked by caller.
231func (m *KubeManager) findServerByIP(ip net.IP) *Server {
232 for _, s := range m.controllers {
233 if s.IP.Equal(ip) {
234 return &s
235 }
236 }
237 for _, s := range m.workers {
238 if ip.Equal(s.IP) {
239 return &s
240 }
241 }
242 return nil
243}
244
245func (m *KubeManager) connect(s *Server) (*SSHClient, error) {
246 cfg := &ssh.ClientConfig{
247 User: s.User,
248 Auth: []ssh.AuthMethod{},
249 Timeout: 10 * time.Second,
250 }
251 if s.ClientKey != "" {
252 clientKey, err := ssh.ParsePrivateKey([]byte(s.ClientKey))
253 if err != nil {
254 return nil, err
255 }
256 cfg.Auth = append(cfg.Auth, ssh.PublicKeys(clientKey))
257 }
258 if s.Password != "" {
259 cfg.Auth = append(cfg.Auth, ssh.Password(s.Password))
260 }
261 if s.HostKey != "" {
262 hostKey, err := ssh.ParsePublicKey([]byte(s.HostKey))
263 if err != nil {
264 return nil, err
265 }
266 cfg.HostKeyCallback = ssh.FixedHostKey(hostKey)
267 } else {
268 cfg.HostKeyCallback = ssh.InsecureIgnoreHostKey()
269 }
270 client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.IP.String(), s.Port), cfg)
271 if err != nil {
272 return nil, err
273 }
274 ret := &SSHClient{client}
275 s.Name, err = GetHostname(ret)
276 if err != nil {
277 return nil, err
278 }
279 return ret, nil
280}