blob: ac7d970608865736bfaea6fe8bd184969eb02926 [file] [log] [blame]
giof6ad2982024-08-23 17:42:49 +04001package installer
2
3import (
4 "bytes"
5 "crypto/sha256"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "net"
11 "net/http"
12 "path/filepath"
13 "strconv"
14 "strings"
15 "text/template"
16
17 "github.com/giolekva/pcloud/core/installer/soft"
18
gio721c0042025-04-03 11:56:36 +040019 corev1 "k8s.io/api/core/v1"
20 "k8s.io/apimachinery/pkg/util/intstr"
giof6ad2982024-08-23 17:42:49 +040021 "sigs.k8s.io/yaml"
22)
23
24type ClusterNetworkConfigurator interface {
25 AddCluster(name string, ingressIP net.IP) error
26 RemoveCluster(name string, ingressIP net.IP) error
gio721c0042025-04-03 11:56:36 +040027 AddProxy(src int, dst string, protocol Protocol) (string, error)
28 AddIngressProxy(src, dst string) error
29 RemoveProxy(src int, dst string, protocol Protocol) error
30 RemoveIngressProxy(src, dst string) error
giof6ad2982024-08-23 17:42:49 +040031}
32
33type NginxProxyConfigurator struct {
34 PrivateSubdomain string
35 DNSAPIAddr string
36 Repo soft.RepoIO
gio721c0042025-04-03 11:56:36 +040037 ConfigPath string
38 ServicePath string
giof6ad2982024-08-23 17:42:49 +040039}
40
41type createARecordReq struct {
42 Entry string `json:"entry"`
43 IP net.IP `json:"text"`
44}
45
46func (c *NginxProxyConfigurator) AddCluster(name string, ingressIP net.IP) error {
47 req := createARecordReq{
48 Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
49 IP: ingressIP,
50 }
51 var buf bytes.Buffer
52 if err := json.NewEncoder(&buf).Encode(req); err != nil {
53 return err
54 }
55 resp, err := http.Post(fmt.Sprintf("%s/create-a-record", c.DNSAPIAddr), "application/json", &buf)
56 if err != nil {
57 return err
58 }
59 if resp.StatusCode != http.StatusOK {
60 var buf bytes.Buffer
61 io.Copy(&buf, resp.Body)
62 return fmt.Errorf(buf.String())
63 }
64 return nil
65}
66
67func (c *NginxProxyConfigurator) RemoveCluster(name string, ingressIP net.IP) error {
68 req := createARecordReq{
69 Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
70 IP: ingressIP,
71 }
72 var buf bytes.Buffer
73 if err := json.NewEncoder(&buf).Encode(req); err != nil {
74 return err
75 }
76 resp, err := http.Post(fmt.Sprintf("%s/delete-a-record", c.DNSAPIAddr), "application/json", &buf)
77 if err != nil {
78 return err
79 }
80 if resp.StatusCode != http.StatusOK {
81 var buf bytes.Buffer
82 io.Copy(&buf, resp.Body)
83 return fmt.Errorf(buf.String())
84 }
85 return nil
86}
87
gio721c0042025-04-03 11:56:36 +040088func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
89 var namespace string
giof6ad2982024-08-23 17:42:49 +040090 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
gio721c0042025-04-03 11:56:36 +040091 if err := func() error {
92 r, err := fs.Reader(c.ServicePath)
93 if err != nil {
94 return err
95 }
96 defer r.Close()
97 var buf bytes.Buffer
98 if _, err := io.Copy(&buf, r); err != nil {
99 return err
100 }
101 var svc corev1.Service
102 if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
103 return err
104 }
105 svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
106 Name: fmt.Sprintf("p%d", src),
107 Protocol: corev1.Protocol(ProtocolToString(protocol)),
108 Port: int32(src),
109 TargetPort: intstr.FromInt(src),
110 })
111 w, err := fs.Writer(c.ServicePath)
112 if err != nil {
113 return err
114 }
115 defer w.Close()
116 tmp, err := yaml.Marshal(svc)
117 if err != nil {
118 return err
119 }
120 if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
121 return err
122 }
123 return nil
124 }(); err != nil {
125 return "", err
126 }
giof6ad2982024-08-23 17:42:49 +0400127 cfg, err := func() (NginxProxyConfig, error) {
gio721c0042025-04-03 11:56:36 +0400128 r, err := fs.Reader(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400129 if err != nil {
130 return NginxProxyConfig{}, err
131 }
132 defer r.Close()
133 return ParseNginxProxyConfig(r)
134 }()
135 if err != nil {
136 return "", err
137 }
gio721c0042025-04-03 11:56:36 +0400138 namespace = cfg.Namespace
139 var proxyMap map[int]string
140 switch protocol {
141 case ProtocolTCP:
142 proxyMap = cfg.TCP
143 case ProtocolUDP:
144 proxyMap = cfg.UDP
145 default:
146 return "", fmt.Errorf("invalid protocol: %v", protocol)
giof6ad2982024-08-23 17:42:49 +0400147 }
gio721c0042025-04-03 11:56:36 +0400148 // TODO(gio): check for already existing mapping
149 proxyMap[src] = dst
150 w, err := fs.Writer(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400151 if err != nil {
152 return "", err
153 }
154 defer w.Close()
155 h := sha256.New()
156 o := io.MultiWriter(w, h)
157 if err := cfg.Render(o); err != nil {
158 return "", err
159 }
160 hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
gio721c0042025-04-03 11:56:36 +0400161 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
giof6ad2982024-08-23 17:42:49 +0400162 nginx, err := func() (map[string]any, error) {
163 r, err := fs.Reader(nginxPath)
164 if err != nil {
165 return nil, err
166 }
167 defer r.Close()
168 var buf bytes.Buffer
169 if _, err := io.Copy(&buf, r); err != nil {
170 return nil, err
171 }
172 ret := map[string]any{}
173 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
174 return nil, err
175 }
176 return ret, nil
177 }()
178 if err != nil {
179 return "", err
180 }
181 cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
182 var annotations map[string]any
183 if a, ok := cv["podAnnotations"]; ok {
184 annotations = a.(map[string]any)
185 } else {
186 annotations = map[string]any{}
187 cv["podAnnotations"] = annotations
188 }
189 annotations["dodo.cloud/hash"] = string(hash)
190 buf, err := yaml.Marshal(nginx)
191 if err != nil {
192 return "", err
193 }
194 w, err = fs.Writer(nginxPath)
195 if err != nil {
196 return "", err
197 }
198 defer w.Close()
199 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
200 return "", err
201 }
gio721c0042025-04-03 11:56:36 +0400202 return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
giof6ad2982024-08-23 17:42:49 +0400203 })
gio721c0042025-04-03 11:56:36 +0400204 if err != nil {
205 return "", err
206 }
207 return namespace, nil
giof6ad2982024-08-23 17:42:49 +0400208}
209
gio721c0042025-04-03 11:56:36 +0400210func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
giof6ad2982024-08-23 17:42:49 +0400211 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
212 cfg, err := func() (NginxProxyConfig, error) {
gio721c0042025-04-03 11:56:36 +0400213 r, err := fs.Reader(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400214 if err != nil {
215 return NginxProxyConfig{}, err
216 }
217 defer r.Close()
218 return ParseNginxProxyConfig(r)
219 }()
220 if err != nil {
221 return "", err
222 }
gio721c0042025-04-03 11:56:36 +0400223 if v, ok := cfg.Ingress[src]; ok && v != dst {
224 return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
giof6ad2982024-08-23 17:42:49 +0400225 }
gio721c0042025-04-03 11:56:36 +0400226 cfg.Ingress[src] = dst
227 w, err := fs.Writer(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400228 if err != nil {
229 return "", err
230 }
231 defer w.Close()
232 h := sha256.New()
233 o := io.MultiWriter(w, h)
234 if err := cfg.Render(o); err != nil {
235 return "", err
236 }
237 hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
gio721c0042025-04-03 11:56:36 +0400238 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
239 nginx, err := func() (map[string]any, error) {
240 r, err := fs.Reader(nginxPath)
241 if err != nil {
242 return nil, err
243 }
244 defer r.Close()
245 var buf bytes.Buffer
246 if _, err := io.Copy(&buf, r); err != nil {
247 return nil, err
248 }
249 ret := map[string]any{}
250 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
251 return nil, err
252 }
253 return ret, nil
254 }()
255 if err != nil {
256 return "", err
257 }
258 cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
259 var annotations map[string]any
260 if a, ok := cv["podAnnotations"]; ok {
261 annotations = a.(map[string]any)
262 } else {
263 annotations = map[string]any{}
264 cv["podAnnotations"] = annotations
265 }
266 annotations["dodo.cloud/hash"] = string(hash)
267 buf, err := yaml.Marshal(nginx)
268 if err != nil {
269 return "", err
270 }
271 w, err = fs.Writer(nginxPath)
272 if err != nil {
273 return "", err
274 }
275 defer w.Close()
276 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
277 return "", err
278 }
279 return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
280 })
281 return err
282}
283
284func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
285 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
286 if err := func() error {
287 r, err := fs.Reader(c.ServicePath)
288 if err != nil {
289 return err
290 }
291 defer r.Close()
292 var buf bytes.Buffer
293 if _, err := io.Copy(&buf, r); err != nil {
294 return err
295 }
296 var svc corev1.Service
297 if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
298 return err
299 }
300 for i, p := range svc.Spec.Ports {
301 if p.Port == int32(src) {
302 svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
303 break
304 }
305 }
306 w, err := fs.Writer(c.ServicePath)
307 if err != nil {
308 return err
309 }
310 defer w.Close()
311 tmp, err := yaml.Marshal(svc)
312 if err != nil {
313 return err
314 }
315 if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
316 return err
317 }
318 return nil
319 }(); err != nil {
320 return "", err
321 }
322 cfg, err := func() (NginxProxyConfig, error) {
323 r, err := fs.Reader(c.ConfigPath)
324 if err != nil {
325 return NginxProxyConfig{}, err
326 }
327 defer r.Close()
328 return ParseNginxProxyConfig(r)
329 }()
330 if err != nil {
331 return "", err
332 }
333 var proxyMap map[int]string
334 switch protocol {
335 case ProtocolTCP:
336 proxyMap = cfg.TCP
337 case ProtocolUDP:
338 proxyMap = cfg.UDP
339 default:
340 return "", fmt.Errorf("invalid protocol: %v", protocol)
341 }
342 // TODO(gio): check for already existing mapping
343 delete(proxyMap, src)
344 w, err := fs.Writer(c.ConfigPath)
345 if err != nil {
346 return "", err
347 }
348 defer w.Close()
349 h := sha256.New()
350 o := io.MultiWriter(w, h)
351 if err := cfg.Render(o); err != nil {
352 return "", err
353 }
354 hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
355 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
356 nginx, err := func() (map[string]any, error) {
357 r, err := fs.Reader(nginxPath)
358 if err != nil {
359 return nil, err
360 }
361 defer r.Close()
362 var buf bytes.Buffer
363 if _, err := io.Copy(&buf, r); err != nil {
364 return nil, err
365 }
366 ret := map[string]any{}
367 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
368 return nil, err
369 }
370 return ret, nil
371 }()
372 if err != nil {
373 return "", err
374 }
375 cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
376 var annotations map[string]any
377 if a, ok := cv["podAnnotations"]; ok {
378 annotations = a.(map[string]any)
379 } else {
380 annotations = map[string]any{}
381 cv["podAnnotations"] = annotations
382 }
383 annotations["dodo.cloud/hash"] = string(hash)
384 buf, err := yaml.Marshal(nginx)
385 if err != nil {
386 return "", err
387 }
388 w, err = fs.Writer(nginxPath)
389 if err != nil {
390 return "", err
391 }
392 defer w.Close()
393 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
394 return "", err
395 }
396 return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
397 })
398 return err
399}
400
401func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
402 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
403 cfg, err := func() (NginxProxyConfig, error) {
404 r, err := fs.Reader(c.ConfigPath)
405 if err != nil {
406 return NginxProxyConfig{}, err
407 }
408 defer r.Close()
409 return ParseNginxProxyConfig(r)
410 }()
411 if err != nil {
412 return "", err
413 }
414 if v, ok := cfg.Ingress[src]; !ok || v != dst {
415 return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
416 }
417 delete(cfg.Ingress, src)
418 w, err := fs.Writer(c.ConfigPath)
419 if err != nil {
420 return "", err
421 }
422 defer w.Close()
423 h := sha256.New()
424 o := io.MultiWriter(w, h)
425 if err := cfg.Render(o); err != nil {
426 return "", err
427 }
428 hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
429 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
giof6ad2982024-08-23 17:42:49 +0400430 nginx, err := func() (map[string]any, error) {
431 r, err := fs.Reader(nginxPath)
432 if err != nil {
433 return nil, err
434 }
435 defer r.Close()
436 var buf bytes.Buffer
437 if _, err := io.Copy(&buf, r); err != nil {
438 return nil, err
439 }
440 ret := map[string]any{}
441 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
442 return nil, err
443 }
444 return ret, nil
445 }()
446 if err != nil {
447 return "", err
448 }
449 cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
450 var annotations map[string]any
451 if a, ok := cv["podAnnotations"]; ok {
452 annotations = a.(map[string]any)
453 } else {
454 annotations = map[string]any{}
455 cv["podAnnotations"] = annotations
456 }
457 annotations["dodo.cloud/hash"] = string(hash)
458 buf, err := yaml.Marshal(nginx)
459 if err != nil {
460 return "", err
461 }
462 w, err = fs.Writer(nginxPath)
463 if err != nil {
464 return "", err
465 }
466 defer w.Close()
467 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
468 return "", err
469 }
470 return fmt.Sprintf("remove proxy mapping: %s %s", src, dst), nil
471 })
472 return err
473}
474
// Protocol identifies the transport protocol of a port-level proxy mapping.
type Protocol int

// Supported transport protocols. ProtocolTCP is the zero value.
const (
	ProtocolTCP Protocol = iota
	ProtocolUDP
)

// ProtocolToString returns "TCP" for ProtocolTCP and "UDP" for every other
// value, including values that are not one of the declared constants.
func ProtocolToString(p Protocol) string {
	switch p {
	case ProtocolTCP:
		return "TCP"
	default:
		return "UDP"
	}
}
489
// NginxProxyConfig is the in-memory form of the proxy configuration file, as
// produced by ParseNginxProxyConfig and written back by Render.
//
//   - Namespace is the value of the "namespace:" line found in the preamble.
//   - IngressPort is the port of the http server's listen directive.
//   - Resolvers are the resolver addresses found in the http section.
//   - Ingress holds the entries of the http map block (source -> backend).
//   - TCP and UDP map a listening stream port to its proxy_pass upstream.
//   - PreConf holds, verbatim, the non-blank lines that precede the nginx
//     configuration, up to and including the line mentioning "nginx.conf".
type NginxProxyConfig struct {
	Namespace   string
	IngressPort int
	Resolvers   []net.IP
	Ingress     map[string]string
	TCP, UDP    map[int]string
	PreConf     []string
}
499
500func parseProtocol(s string) (Protocol, error) {
501 switch strings.ToLower(s) {
502 case "tcp":
503 return ProtocolTCP, nil
504 case "udp":
505 return ProtocolUDP, nil
506 default:
507 return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
508 }
giof6ad2982024-08-23 17:42:49 +0400509}
510
511func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
512 var buf strings.Builder
513 if _, err := io.Copy(&buf, r); err != nil {
514 return NginxProxyConfig{}, err
515 }
516 ret := NginxProxyConfig{
gio721c0042025-04-03 11:56:36 +0400517 IngressPort: -1,
518 Resolvers: nil,
519 Ingress: make(map[string]string),
520 TCP: make(map[int]string),
521 UDP: make(map[int]string),
giof6ad2982024-08-23 17:42:49 +0400522 }
523 lines := strings.Split(buf.String(), "\n")
gio721c0042025-04-03 11:56:36 +0400524 insidePreConf := true
525 insideHttp := false
giof6ad2982024-08-23 17:42:49 +0400526 insideMap := false
gio721c0042025-04-03 11:56:36 +0400527 insideStream := false
528 streamPort := -1
529 streamPortProtocol := ProtocolTCP
giof6ad2982024-08-23 17:42:49 +0400530 for _, l := range lines {
531 items := strings.Fields(strings.TrimSuffix(l, ";"))
532 if len(items) == 0 {
533 continue
534 }
535 if strings.Contains(l, "nginx.conf") {
536 ret.PreConf = append(ret.PreConf, l)
gio721c0042025-04-03 11:56:36 +0400537 insidePreConf = false
538 } else if insidePreConf {
giof6ad2982024-08-23 17:42:49 +0400539 ret.PreConf = append(ret.PreConf, l)
gio721c0042025-04-03 11:56:36 +0400540 items := strings.Fields(l)
541 if items[0] == "namespace:" {
542 ret.Namespace = items[1]
543 }
544 } else if items[0] == "http" {
545 insideHttp = true
546 } else if insideHttp && items[0] == "map" {
547 insideMap = true
548 } else if items[0] == "stream" {
549 insideHttp = false
550 insideMap = false
551 insideStream = true
giof6ad2982024-08-23 17:42:49 +0400552 } else if strings.Contains(l, "listen") {
553 if len(items) < 2 {
gio721c0042025-04-03 11:56:36 +0400554 return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
giof6ad2982024-08-23 17:42:49 +0400555 }
556 port, err := strconv.Atoi(items[1])
557 if err != nil {
558 return NginxProxyConfig{}, err
559 }
gio721c0042025-04-03 11:56:36 +0400560 if insideHttp {
561 if len(items) > 2 {
562 return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
563 }
564 ret.IngressPort = port
565 } else {
566 if !insideStream {
567 return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
568 }
569 streamPort = port
570 if len(items) == 3 {
571 streamPortProtocol, err = parseProtocol(items[2])
572 if err != nil {
573 return NginxProxyConfig{}, err
574 }
575 } else {
576 streamPortProtocol = ProtocolTCP
577 }
578 }
579 } else if insideHttp && strings.Contains(l, "resolver") {
giof6ad2982024-08-23 17:42:49 +0400580 if len(items) < 2 {
581 return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
582 }
583 ip := net.ParseIP(items[1])
584 if ip == nil {
585 return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
586 }
587 ret.Resolvers = append(ret.Resolvers, ip)
gio721c0042025-04-03 11:56:36 +0400588 } else if insideHttp && insideMap {
giof6ad2982024-08-23 17:42:49 +0400589 if items[0] == "}" {
590 insideMap = false
591 continue
592 }
593 if len(items) < 2 {
594 return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
595 }
gio721c0042025-04-03 11:56:36 +0400596 ret.Ingress[items[0]] = items[1]
597 } else if insideStream && strings.Contains(l, "proxy_pass") {
598 if streamPort == -1 {
599 return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
600 }
601 if len(items) < 2 {
602 return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
603 }
604 if streamPortProtocol == ProtocolTCP {
605 ret.TCP[streamPort] = items[1]
606 } else {
607 ret.UDP[streamPort] = items[1]
608 }
giof6ad2982024-08-23 17:42:49 +0400609 }
610 }
611 return ret, nil
612}
613
614func (c NginxProxyConfig) Render(w io.Writer) error {
615 for _, l := range c.PreConf {
616 fmt.Fprintln(w, l)
617 }
618 tmpl, err := template.New("nginx.conf").Parse(nginxConfigTmpl)
619 if err != nil {
620 return err
621 }
622 return tmpl.Execute(w, c)
623}
624
// nginxConfigTmpl is the text/template source of the nginx configuration that
// Render emits right after the PreConf lines. It is executed with an
// NginxProxyConfig and reads the following fields:
//
//   - .Ingress: one "<from> <to>;" entry per element inside the
//     "map $http_host $backend" block.
//   - .IngressPort: the port of the http server's listen directive.
//   - .Resolvers: one resolver directive per element inside "location /".
//   - .TCP and .UDP: one stream server per element; UDP servers get the
//     "udp" listen flag. The whole stream section is omitted when both maps
//     are empty. The resolver of stream servers is fixed to 100.100.100.100.
//
// NOTE(review): the whitespace inside this literal is emitted verbatim, and
// the text is presumably embedded in the YAML document formed by PreConf —
// confirm that the leading indentation of every line matches what that
// document expects before changing or re-formatting it.
const nginxConfigTmpl = ` worker_processes 1;
 worker_rlimit_nofile 8192;
 events {
 worker_connections 1024;
 }
 http {
 map $http_host $backend {
 {{- range $from, $to := .Ingress }}
 {{ $from }} {{ $to }};
 {{- end }}
 }
 server {
 listen {{ .IngressPort }};
 location / {
 {{- range .Resolvers }}
 resolver {{ . }};
 {{- end }}
 proxy_pass http://$backend;
 }
 }
 }
 {{- if or .TCP .UDP }}
 stream {
 {{- range $port, $upstream := .TCP }}
 server {
 listen {{ $port }};
 resolver 100.100.100.100;
 proxy_pass {{ $upstream }};
 }
 {{- end }}
 {{- range $port, $upstream := .UDP }}
 server {
 listen {{ $port }} udp;
 resolver 100.100.100.100;
 proxy_pass {{ $upstream }};
 }
 {{- end }}
 }
 {{- end }}`