blob: a51435f2340e0f0ca2ecd02bbbb46a02830a4b0f [file] [log] [blame]
giof6ad2982024-08-23 17:42:49 +04001package installer
2
3import (
4 "bytes"
giof6ad2982024-08-23 17:42:49 +04005 "encoding/json"
6 "fmt"
7 "io"
8 "net"
9 "net/http"
10 "path/filepath"
11 "strconv"
12 "strings"
13 "text/template"
14
15 "github.com/giolekva/pcloud/core/installer/soft"
16
gio721c0042025-04-03 11:56:36 +040017 corev1 "k8s.io/api/core/v1"
18 "k8s.io/apimachinery/pkg/util/intstr"
giof6ad2982024-08-23 17:42:49 +040019 "sigs.k8s.io/yaml"
20)
21
// ClusterNetworkConfigurator is the set of operations used to register
// clusters and to maintain port-level and ingress-level proxy mappings.
type ClusterNetworkConfigurator interface {
	// AddCluster registers the cluster called name together with its ingress IP.
	AddCluster(name string, ingressIP net.IP) error
	// RemoveCluster unregisters the cluster called name with the given ingress IP.
	RemoveCluster(name string, ingressIP net.IP) error
	// AddProxy maps the port src to the upstream dst for the given protocol.
	// The meaning of the returned string is implementation-defined;
	// NginxProxyConfigurator returns the namespace found in its config file.
	AddProxy(src int, dst string, protocol Protocol) (string, error)
	// AddIngressProxy maps the ingress source src to the backend dst.
	AddIngressProxy(src, dst string) error
	// RemoveProxy removes the mapping of port src for the given protocol.
	RemoveProxy(src int, dst string, protocol Protocol) error
	// RemoveIngressProxy removes the ingress mapping from src to dst.
	RemoveIngressProxy(src, dst string) error
}
30
// NginxProxyConfigurator implements the cluster and proxy operations on top
// of a DNS HTTP API and a set of files stored in a repository.
type NginxProxyConfigurator struct {
	// PrivateSubdomain is the suffix of the wildcard DNS entries, which have
	// the form "*.<name>.cluster.<PrivateSubdomain>".
	PrivateSubdomain string
	// DNSAPIAddr is the base URL to which "/create-a-record" and
	// "/delete-a-record" are appended.
	DNSAPIAddr string
	// Repo provides access to the files below; all edits go through Repo.Do.
	Repo soft.RepoIO
	// ConfigPath is the file holding the nginx proxy configuration. The file
	// "ingress-nginx.yaml" is looked up in the same directory.
	ConfigPath string
	// ServicePath is a YAML file that is decoded as a Kubernetes core/v1
	// Service and whose ports are edited by AddProxy and RemoveProxy.
	ServicePath string
}
38
// createARecordReq is the JSON body posted to the DNS API by both AddCluster
// (create-a-record) and RemoveCluster (delete-a-record).
type createARecordReq struct {
	// Entry is the record name, sent under the "entry" key.
	Entry string `json:"entry"`
	// IP is the record address; note that it is sent under the "text" key.
	IP net.IP `json:"text"`
}
43
44func (c *NginxProxyConfigurator) AddCluster(name string, ingressIP net.IP) error {
45 req := createARecordReq{
46 Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
47 IP: ingressIP,
48 }
49 var buf bytes.Buffer
50 if err := json.NewEncoder(&buf).Encode(req); err != nil {
51 return err
52 }
53 resp, err := http.Post(fmt.Sprintf("%s/create-a-record", c.DNSAPIAddr), "application/json", &buf)
54 if err != nil {
55 return err
56 }
57 if resp.StatusCode != http.StatusOK {
58 var buf bytes.Buffer
59 io.Copy(&buf, resp.Body)
60 return fmt.Errorf(buf.String())
61 }
62 return nil
63}
64
65func (c *NginxProxyConfigurator) RemoveCluster(name string, ingressIP net.IP) error {
66 req := createARecordReq{
67 Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
68 IP: ingressIP,
69 }
70 var buf bytes.Buffer
71 if err := json.NewEncoder(&buf).Encode(req); err != nil {
72 return err
73 }
74 resp, err := http.Post(fmt.Sprintf("%s/delete-a-record", c.DNSAPIAddr), "application/json", &buf)
75 if err != nil {
76 return err
77 }
78 if resp.StatusCode != http.StatusOK {
79 var buf bytes.Buffer
80 io.Copy(&buf, resp.Body)
81 return fmt.Errorf(buf.String())
82 }
83 return nil
84}
85
gio721c0042025-04-03 11:56:36 +040086func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
87 var namespace string
giof6ad2982024-08-23 17:42:49 +040088 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
gio721c0042025-04-03 11:56:36 +040089 if err := func() error {
90 r, err := fs.Reader(c.ServicePath)
91 if err != nil {
92 return err
93 }
94 defer r.Close()
95 var buf bytes.Buffer
96 if _, err := io.Copy(&buf, r); err != nil {
97 return err
98 }
99 var svc corev1.Service
100 if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
101 return err
102 }
103 svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
104 Name: fmt.Sprintf("p%d", src),
105 Protocol: corev1.Protocol(ProtocolToString(protocol)),
106 Port: int32(src),
107 TargetPort: intstr.FromInt(src),
108 })
109 w, err := fs.Writer(c.ServicePath)
110 if err != nil {
111 return err
112 }
113 defer w.Close()
114 tmp, err := yaml.Marshal(svc)
115 if err != nil {
116 return err
117 }
118 if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
119 return err
120 }
121 return nil
122 }(); err != nil {
123 return "", err
124 }
giof6ad2982024-08-23 17:42:49 +0400125 cfg, err := func() (NginxProxyConfig, error) {
gio721c0042025-04-03 11:56:36 +0400126 r, err := fs.Reader(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400127 if err != nil {
128 return NginxProxyConfig{}, err
129 }
130 defer r.Close()
131 return ParseNginxProxyConfig(r)
132 }()
133 if err != nil {
134 return "", err
135 }
gio721c0042025-04-03 11:56:36 +0400136 namespace = cfg.Namespace
137 var proxyMap map[int]string
138 switch protocol {
139 case ProtocolTCP:
140 proxyMap = cfg.TCP
141 case ProtocolUDP:
142 proxyMap = cfg.UDP
143 default:
144 return "", fmt.Errorf("invalid protocol: %v", protocol)
giof6ad2982024-08-23 17:42:49 +0400145 }
gio721c0042025-04-03 11:56:36 +0400146 // TODO(gio): check for already existing mapping
147 proxyMap[src] = dst
148 w, err := fs.Writer(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400149 if err != nil {
150 return "", err
151 }
152 defer w.Close()
giof55ab362025-04-11 17:48:17 +0400153 if err := cfg.Render(w); err != nil {
giof6ad2982024-08-23 17:42:49 +0400154 return "", err
155 }
gio721c0042025-04-03 11:56:36 +0400156 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
giof6ad2982024-08-23 17:42:49 +0400157 nginx, err := func() (map[string]any, error) {
158 r, err := fs.Reader(nginxPath)
159 if err != nil {
160 return nil, err
161 }
162 defer r.Close()
163 var buf bytes.Buffer
164 if _, err := io.Copy(&buf, r); err != nil {
165 return nil, err
166 }
167 ret := map[string]any{}
168 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
169 return nil, err
170 }
171 return ret, nil
172 }()
173 if err != nil {
174 return "", err
175 }
giof6ad2982024-08-23 17:42:49 +0400176 buf, err := yaml.Marshal(nginx)
177 if err != nil {
178 return "", err
179 }
180 w, err = fs.Writer(nginxPath)
181 if err != nil {
182 return "", err
183 }
184 defer w.Close()
185 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
186 return "", err
187 }
gio721c0042025-04-03 11:56:36 +0400188 return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
giof6ad2982024-08-23 17:42:49 +0400189 })
gio721c0042025-04-03 11:56:36 +0400190 if err != nil {
191 return "", err
192 }
193 return namespace, nil
giof6ad2982024-08-23 17:42:49 +0400194}
195
gio721c0042025-04-03 11:56:36 +0400196func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
giof6ad2982024-08-23 17:42:49 +0400197 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
198 cfg, err := func() (NginxProxyConfig, error) {
gio721c0042025-04-03 11:56:36 +0400199 r, err := fs.Reader(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400200 if err != nil {
201 return NginxProxyConfig{}, err
202 }
203 defer r.Close()
204 return ParseNginxProxyConfig(r)
205 }()
206 if err != nil {
207 return "", err
208 }
gio721c0042025-04-03 11:56:36 +0400209 if v, ok := cfg.Ingress[src]; ok && v != dst {
210 return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
giof6ad2982024-08-23 17:42:49 +0400211 }
gio721c0042025-04-03 11:56:36 +0400212 cfg.Ingress[src] = dst
213 w, err := fs.Writer(c.ConfigPath)
giof6ad2982024-08-23 17:42:49 +0400214 if err != nil {
215 return "", err
216 }
217 defer w.Close()
giof55ab362025-04-11 17:48:17 +0400218 if err := cfg.Render(w); err != nil {
giof6ad2982024-08-23 17:42:49 +0400219 return "", err
220 }
gio721c0042025-04-03 11:56:36 +0400221 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
222 nginx, err := func() (map[string]any, error) {
223 r, err := fs.Reader(nginxPath)
224 if err != nil {
225 return nil, err
226 }
227 defer r.Close()
228 var buf bytes.Buffer
229 if _, err := io.Copy(&buf, r); err != nil {
230 return nil, err
231 }
232 ret := map[string]any{}
233 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
234 return nil, err
235 }
236 return ret, nil
237 }()
238 if err != nil {
239 return "", err
240 }
gio721c0042025-04-03 11:56:36 +0400241 buf, err := yaml.Marshal(nginx)
242 if err != nil {
243 return "", err
244 }
245 w, err = fs.Writer(nginxPath)
246 if err != nil {
247 return "", err
248 }
249 defer w.Close()
250 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
251 return "", err
252 }
253 return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
254 })
255 return err
256}
257
258func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
259 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
260 if err := func() error {
261 r, err := fs.Reader(c.ServicePath)
262 if err != nil {
263 return err
264 }
265 defer r.Close()
266 var buf bytes.Buffer
267 if _, err := io.Copy(&buf, r); err != nil {
268 return err
269 }
270 var svc corev1.Service
271 if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
272 return err
273 }
274 for i, p := range svc.Spec.Ports {
275 if p.Port == int32(src) {
276 svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
277 break
278 }
279 }
280 w, err := fs.Writer(c.ServicePath)
281 if err != nil {
282 return err
283 }
284 defer w.Close()
285 tmp, err := yaml.Marshal(svc)
286 if err != nil {
287 return err
288 }
289 if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
290 return err
291 }
292 return nil
293 }(); err != nil {
294 return "", err
295 }
296 cfg, err := func() (NginxProxyConfig, error) {
297 r, err := fs.Reader(c.ConfigPath)
298 if err != nil {
299 return NginxProxyConfig{}, err
300 }
301 defer r.Close()
302 return ParseNginxProxyConfig(r)
303 }()
304 if err != nil {
305 return "", err
306 }
307 var proxyMap map[int]string
308 switch protocol {
309 case ProtocolTCP:
310 proxyMap = cfg.TCP
311 case ProtocolUDP:
312 proxyMap = cfg.UDP
313 default:
314 return "", fmt.Errorf("invalid protocol: %v", protocol)
315 }
316 // TODO(gio): check for already existing mapping
317 delete(proxyMap, src)
318 w, err := fs.Writer(c.ConfigPath)
319 if err != nil {
320 return "", err
321 }
322 defer w.Close()
giof55ab362025-04-11 17:48:17 +0400323 if err := cfg.Render(w); err != nil {
gio721c0042025-04-03 11:56:36 +0400324 return "", err
325 }
gio721c0042025-04-03 11:56:36 +0400326 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
327 nginx, err := func() (map[string]any, error) {
328 r, err := fs.Reader(nginxPath)
329 if err != nil {
330 return nil, err
331 }
332 defer r.Close()
333 var buf bytes.Buffer
334 if _, err := io.Copy(&buf, r); err != nil {
335 return nil, err
336 }
337 ret := map[string]any{}
338 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
339 return nil, err
340 }
341 return ret, nil
342 }()
343 if err != nil {
344 return "", err
345 }
gio721c0042025-04-03 11:56:36 +0400346 buf, err := yaml.Marshal(nginx)
347 if err != nil {
348 return "", err
349 }
350 w, err = fs.Writer(nginxPath)
351 if err != nil {
352 return "", err
353 }
354 defer w.Close()
355 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
356 return "", err
357 }
358 return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
359 })
360 return err
361}
362
363func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
364 _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
365 cfg, err := func() (NginxProxyConfig, error) {
366 r, err := fs.Reader(c.ConfigPath)
367 if err != nil {
368 return NginxProxyConfig{}, err
369 }
370 defer r.Close()
371 return ParseNginxProxyConfig(r)
372 }()
373 if err != nil {
374 return "", err
375 }
376 if v, ok := cfg.Ingress[src]; !ok || v != dst {
377 return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
378 }
379 delete(cfg.Ingress, src)
380 w, err := fs.Writer(c.ConfigPath)
381 if err != nil {
382 return "", err
383 }
384 defer w.Close()
giof55ab362025-04-11 17:48:17 +0400385 if err := cfg.Render(w); err != nil {
gio721c0042025-04-03 11:56:36 +0400386 return "", err
387 }
gio721c0042025-04-03 11:56:36 +0400388 nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
giof6ad2982024-08-23 17:42:49 +0400389 nginx, err := func() (map[string]any, error) {
390 r, err := fs.Reader(nginxPath)
391 if err != nil {
392 return nil, err
393 }
394 defer r.Close()
395 var buf bytes.Buffer
396 if _, err := io.Copy(&buf, r); err != nil {
397 return nil, err
398 }
399 ret := map[string]any{}
400 if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
401 return nil, err
402 }
403 return ret, nil
404 }()
405 if err != nil {
406 return "", err
407 }
giof6ad2982024-08-23 17:42:49 +0400408 buf, err := yaml.Marshal(nginx)
409 if err != nil {
410 return "", err
411 }
412 w, err = fs.Writer(nginxPath)
413 if err != nil {
414 return "", err
415 }
416 defer w.Close()
417 if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
418 return "", err
419 }
420 return fmt.Sprintf("remove proxy mapping: %s %s", src, dst), nil
421 })
422 return err
423}
424
// Protocol identifies the transport protocol of a port proxy mapping.
type Protocol int

const (
	// ProtocolTCP is the zero value of Protocol.
	ProtocolTCP Protocol = iota
	// ProtocolUDP selects UDP.
	ProtocolUDP
)

// ProtocolToString returns "TCP" for ProtocolTCP and "UDP" for every other
// value, including values that are not declared constants.
func ProtocolToString(p Protocol) string {
	if p == ProtocolTCP {
		return "TCP"
	}
	return "UDP"
}
439
// NginxProxyConfig is the in-memory form of the proxy configuration file. It
// is produced by ParseNginxProxyConfig and written back with Render.
type NginxProxyConfig struct {
	// Namespace is the value following "namespace:" in the PreConf lines.
	Namespace string
	// PID is the argument of the "pid" directive; empty when there is none.
	PID string
	// IngressPort is the port of the "listen" directive in the http section;
	// the parser leaves it at -1 when no such directive is found.
	IngressPort int
	// Resolvers are the addresses of the "resolver" directives in the http
	// section.
	Resolvers []net.IP
	// Ingress holds the entries of the map block in the http section.
	Ingress map[string]string
	// TCP maps a listening port to its proxy_pass upstream for TCP servers in
	// the stream section.
	TCP map[int]string
	// UDP is the same as TCP for servers whose listen directive is marked udp.
	UDP map[int]string
	// PreConf holds the lines that precede the nginx configuration, up to and
	// including the line containing "nginx.conf". Render writes them back
	// unchanged.
	PreConf []string
}
450
451func parseProtocol(s string) (Protocol, error) {
452 switch strings.ToLower(s) {
453 case "tcp":
454 return ProtocolTCP, nil
455 case "udp":
456 return ProtocolUDP, nil
457 default:
458 return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
459 }
giof6ad2982024-08-23 17:42:49 +0400460}
461
462func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
463 var buf strings.Builder
464 if _, err := io.Copy(&buf, r); err != nil {
465 return NginxProxyConfig{}, err
466 }
467 ret := NginxProxyConfig{
gio721c0042025-04-03 11:56:36 +0400468 IngressPort: -1,
469 Resolvers: nil,
470 Ingress: make(map[string]string),
471 TCP: make(map[int]string),
472 UDP: make(map[int]string),
giof6ad2982024-08-23 17:42:49 +0400473 }
474 lines := strings.Split(buf.String(), "\n")
gio721c0042025-04-03 11:56:36 +0400475 insidePreConf := true
476 insideHttp := false
giof6ad2982024-08-23 17:42:49 +0400477 insideMap := false
gio721c0042025-04-03 11:56:36 +0400478 insideStream := false
479 streamPort := -1
480 streamPortProtocol := ProtocolTCP
giof6ad2982024-08-23 17:42:49 +0400481 for _, l := range lines {
482 items := strings.Fields(strings.TrimSuffix(l, ";"))
483 if len(items) == 0 {
484 continue
485 }
486 if strings.Contains(l, "nginx.conf") {
487 ret.PreConf = append(ret.PreConf, l)
gio721c0042025-04-03 11:56:36 +0400488 insidePreConf = false
489 } else if insidePreConf {
giof6ad2982024-08-23 17:42:49 +0400490 ret.PreConf = append(ret.PreConf, l)
gio721c0042025-04-03 11:56:36 +0400491 items := strings.Fields(l)
492 if items[0] == "namespace:" {
493 ret.Namespace = items[1]
494 }
giof55ab362025-04-11 17:48:17 +0400495 } else if items[0] == "pid" {
496
497 ret.PID = items[1]
gio721c0042025-04-03 11:56:36 +0400498 } else if items[0] == "http" {
499 insideHttp = true
500 } else if insideHttp && items[0] == "map" {
501 insideMap = true
502 } else if items[0] == "stream" {
503 insideHttp = false
504 insideMap = false
505 insideStream = true
giof6ad2982024-08-23 17:42:49 +0400506 } else if strings.Contains(l, "listen") {
507 if len(items) < 2 {
gio721c0042025-04-03 11:56:36 +0400508 return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
giof6ad2982024-08-23 17:42:49 +0400509 }
510 port, err := strconv.Atoi(items[1])
511 if err != nil {
512 return NginxProxyConfig{}, err
513 }
gio721c0042025-04-03 11:56:36 +0400514 if insideHttp {
515 if len(items) > 2 {
516 return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
517 }
518 ret.IngressPort = port
519 } else {
520 if !insideStream {
521 return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
522 }
523 streamPort = port
524 if len(items) == 3 {
525 streamPortProtocol, err = parseProtocol(items[2])
526 if err != nil {
527 return NginxProxyConfig{}, err
528 }
529 } else {
530 streamPortProtocol = ProtocolTCP
531 }
532 }
533 } else if insideHttp && strings.Contains(l, "resolver") {
giof6ad2982024-08-23 17:42:49 +0400534 if len(items) < 2 {
535 return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
536 }
537 ip := net.ParseIP(items[1])
538 if ip == nil {
539 return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
540 }
541 ret.Resolvers = append(ret.Resolvers, ip)
gio721c0042025-04-03 11:56:36 +0400542 } else if insideHttp && insideMap {
giof6ad2982024-08-23 17:42:49 +0400543 if items[0] == "}" {
544 insideMap = false
545 continue
546 }
547 if len(items) < 2 {
548 return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
549 }
gio721c0042025-04-03 11:56:36 +0400550 ret.Ingress[items[0]] = items[1]
551 } else if insideStream && strings.Contains(l, "proxy_pass") {
552 if streamPort == -1 {
553 return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
554 }
555 if len(items) < 2 {
556 return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
557 }
558 if streamPortProtocol == ProtocolTCP {
559 ret.TCP[streamPort] = items[1]
560 } else {
561 ret.UDP[streamPort] = items[1]
562 }
giof6ad2982024-08-23 17:42:49 +0400563 }
564 }
565 return ret, nil
566}
567
568func (c NginxProxyConfig) Render(w io.Writer) error {
569 for _, l := range c.PreConf {
570 fmt.Fprintln(w, l)
571 }
572 tmpl, err := template.New("nginx.conf").Parse(nginxConfigTmpl)
573 if err != nil {
574 return err
575 }
576 return tmpl.Execute(w, c)
577}
578
// nginxConfigTmpl is the text/template source that Render executes to emit
// the nginx configuration. It reads the PID, Ingress, IngressPort, Resolvers,
// TCP and UDP fields of NginxProxyConfig. The pid directive is emitted only
// when PID is non-empty, and the stream section only when TCP or UDP has at
// least one entry. TCP stream servers carry a fixed resolver address; UDP
// stream servers have none.
//
// NOTE(review): leading whitespace inside this literal is part of the
// rendered output. It is reproduced here as it appears in this view of the
// file - confirm it against the repository before relying on it.
const nginxConfigTmpl = ` worker_processes 1;
 worker_rlimit_nofile 8192;
 {{- if .PID }}
 pid {{ .PID }};
 {{- end }}
 events {
 worker_connections 1024;
 }
 http {
 map $http_host $backend {
 {{- range $from, $to := .Ingress }}
 {{ $from }} {{ $to }};
 {{- end }}
 }
 server {
 listen {{ .IngressPort }};
 location / {
 {{- range .Resolvers }}
 resolver {{ . }};
 {{- end }}
 proxy_pass http://$backend;
 }
 }
 }
 {{- if or .TCP .UDP }}
 stream {
 {{- range $port, $upstream := .TCP }}
 server {
 listen {{ $port }};
 resolver 100.100.100.100;
 proxy_pass {{ $upstream }};
 }
 {{- end }}
 {{- range $port, $upstream := .UDP }}
 server {
 listen {{ $port }} udp;
 proxy_pass {{ $upstream }};
 }
 {{- end }}
 }
 {{- end }}`