AppManager: Support exposing cross-cluster ports

Change-Id: I4bdb3573209935f6777656ec2f3481e79d84a9c9
diff --git a/core/installer/cluster.go b/core/installer/cluster.go
index 19220af..ac7d970 100644
--- a/core/installer/cluster.go
+++ b/core/installer/cluster.go
@@ -16,21 +16,26 @@
 
 	"github.com/giolekva/pcloud/core/installer/soft"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/yaml"
 )
 
 type ClusterNetworkConfigurator interface {
 	AddCluster(name string, ingressIP net.IP) error
 	RemoveCluster(name string, ingressIP net.IP) error
-	AddProxy(src, dst string) error
-	RemoveProxy(src, dst string) error
+	AddProxy(src int, dst string, protocol Protocol) (string, error)
+	AddIngressProxy(src, dst string) error
+	RemoveProxy(src int, dst string, protocol Protocol) error
+	RemoveIngressProxy(src, dst string) error
 }
 
 type NginxProxyConfigurator struct {
 	PrivateSubdomain string
 	DNSAPIAddr       string
 	Repo             soft.RepoIO
-	NginxConfigPath  string
+	ConfigPath       string
+	ServicePath      string
 }
 
 type createARecordReq struct {
@@ -80,10 +85,47 @@
 	return nil
 }
 
-func (c *NginxProxyConfigurator) AddProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
+	var namespace string
 	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := func() error {
+			r, err := fs.Reader(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return err
+			}
+			var svc corev1.Service
+			if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+				return err
+			}
+			svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+				Name:       fmt.Sprintf("p%d", src),
+				Protocol:   corev1.Protocol(ProtocolToString(protocol)),
+				Port:       int32(src),
+				TargetPort: intstr.FromInt(src),
+			})
+			w, err := fs.Writer(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			tmp, err := yaml.Marshal(svc)
+			if err != nil {
+				return err
+			}
+			if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+				return err
+			}
+			return nil
+		}(); err != nil {
+			return "", err
+		}
 		cfg, err := func() (NginxProxyConfig, error) {
-			r, err := fs.Reader(c.NginxConfigPath)
+			r, err := fs.Reader(c.ConfigPath)
 			if err != nil {
 				return NginxProxyConfig{}, err
 			}
@@ -93,11 +135,19 @@
 		if err != nil {
 			return "", err
 		}
-		if v, ok := cfg.Proxies[src]; ok && v != dst {
-			return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
+		namespace = cfg.Namespace
+		var proxyMap map[int]string
+		switch protocol {
+		case ProtocolTCP:
+			proxyMap = cfg.TCP
+		case ProtocolUDP:
+			proxyMap = cfg.UDP
+		default:
+			return "", fmt.Errorf("invalid protocol: %v", protocol)
 		}
-		cfg.Proxies[src] = dst
-		w, err := fs.Writer(c.NginxConfigPath)
+		// TODO(gio): check for already existing mapping
+		proxyMap[src] = dst
+		w, err := fs.Writer(c.ConfigPath)
 		if err != nil {
 			return "", err
 		}
@@ -108,7 +158,7 @@
 			return "", err
 		}
 		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
-		nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
 		nginx, err := func() (map[string]any, error) {
 			r, err := fs.Reader(nginxPath)
 			if err != nil {
@@ -149,15 +199,18 @@
 		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
 			return "", err
 		}
-		return fmt.Sprintf("add proxy mapping: %s %s", src, dst), nil
+		return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
 	})
-	return err
+	if err != nil {
+		return "", err
+	}
+	return namespace, nil
 }
 
-func (c *NginxProxyConfigurator) RemoveProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
 	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
 		cfg, err := func() (NginxProxyConfig, error) {
-			r, err := fs.Reader(c.NginxConfigPath)
+			r, err := fs.Reader(c.ConfigPath)
 			if err != nil {
 				return NginxProxyConfig{}, err
 			}
@@ -167,11 +220,11 @@
 		if err != nil {
 			return "", err
 		}
-		if v, ok := cfg.Proxies[src]; !ok || v != dst {
-			return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+		if v, ok := cfg.Ingress[src]; ok && v != dst {
+			return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
 		}
-		delete(cfg.Proxies, src)
-		w, err := fs.Writer(c.NginxConfigPath)
+		cfg.Ingress[src] = dst
+		w, err := fs.Writer(c.ConfigPath)
 		if err != nil {
 			return "", err
 		}
@@ -182,7 +235,198 @@
 			return "", err
 		}
 		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
-		nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+		var annotations map[string]any
+		if a, ok := cv["podAnnotations"]; ok {
+			annotations = a.(map[string]any)
+		} else {
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		annotations["dodo.cloud/hash"] = string(hash)
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		w, err = fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
+	})
+	return err
+}
+
+func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := func() error {
+			r, err := fs.Reader(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return err
+			}
+			var svc corev1.Service
+			if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+				return err
+			}
+			for i, p := range svc.Spec.Ports {
+				if p.Port == int32(src) {
+					svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
+					break
+				}
+			}
+			w, err := fs.Writer(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			tmp, err := yaml.Marshal(svc)
+			if err != nil {
+				return err
+			}
+			if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+				return err
+			}
+			return nil
+		}(); err != nil {
+			return "", err
+		}
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.ConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		var proxyMap map[int]string
+		switch protocol {
+		case ProtocolTCP:
+			proxyMap = cfg.TCP
+		case ProtocolUDP:
+			proxyMap = cfg.UDP
+		default:
+			return "", fmt.Errorf("invalid protocol: %v", protocol)
+		}
+		// TODO(gio): check for already existing mapping
+		delete(proxyMap, src)
+		w, err := fs.Writer(c.ConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		h := sha256.New()
+		o := io.MultiWriter(w, h)
+		if err := cfg.Render(o); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+		var annotations map[string]any
+		if a, ok := cv["podAnnotations"]; ok {
+			annotations = a.(map[string]any)
+		} else {
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		annotations["dodo.cloud/hash"] = string(hash)
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		w, err = fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
+	})
+	return err
+}
+
+func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.ConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		if v, ok := cfg.Ingress[src]; !ok || v != dst {
+			return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+		}
+		delete(cfg.Ingress, src)
+		w, err := fs.Writer(c.ConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		h := sha256.New()
+		o := io.MultiWriter(w, h)
+		if err := cfg.Render(o); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
 		nginx, err := func() (map[string]any, error) {
 			r, err := fs.Reader(nginxPath)
 			if err != nil {
@@ -228,11 +472,40 @@
 	return err
 }
 
+type Protocol int
+
+const (
+	ProtocolTCP Protocol = iota
+	ProtocolUDP
+)
+
+func ProtocolToString(p Protocol) string {
+	if p == ProtocolTCP {
+		return "TCP"
+	} else {
+		return "UDP"
+	}
+}
+
 type NginxProxyConfig struct {
-	Port      int
-	Resolvers []net.IP
-	Proxies   map[string]string
-	PreConf   []string
+	Namespace   string
+	IngressPort int
+	Resolvers   []net.IP
+	Ingress     map[string]string
+	TCP         map[int]string
+	UDP         map[int]string
+	PreConf     []string
+}
+
+func parseProtocol(s string) (Protocol, error) {
+	switch strings.ToLower(s) {
+	case "tcp":
+		return ProtocolTCP, nil
+	case "udp":
+		return ProtocolUDP, nil
+	default:
+		return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
+	}
 }
 
 func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
@@ -241,13 +514,19 @@
 		return NginxProxyConfig{}, err
 	}
 	ret := NginxProxyConfig{
-		Port:      -1,
-		Resolvers: nil,
-		Proxies:   make(map[string]string),
+		IngressPort: -1,
+		Resolvers:   nil,
+		Ingress:     make(map[string]string),
+		TCP:         make(map[int]string),
+		UDP:         make(map[int]string),
 	}
 	lines := strings.Split(buf.String(), "\n")
-	insideConf := true
+	insidePreConf := true
+	insideHttp := false
 	insideMap := false
+	insideStream := false
+	streamPort := -1
+	streamPortProtocol := ProtocolTCP
 	for _, l := range lines {
 		items := strings.Fields(strings.TrimSuffix(l, ";"))
 		if len(items) == 0 {
@@ -255,19 +534,49 @@
 		}
 		if strings.Contains(l, "nginx.conf") {
 			ret.PreConf = append(ret.PreConf, l)
-			insideConf = false
-		} else if insideConf {
+			insidePreConf = false
+		} else if insidePreConf {
 			ret.PreConf = append(ret.PreConf, l)
+			items := strings.Fields(l)
+			if items[0] == "namespace:" {
+				ret.Namespace = items[1]
+			}
+		} else if items[0] == "http" {
+			insideHttp = true
+		} else if insideHttp && items[0] == "map" {
+			insideMap = true
+		} else if items[0] == "stream" {
+			insideHttp = false
+			insideMap = false
+			insideStream = true
 		} else if strings.Contains(l, "listen") {
 			if len(items) < 2 {
-				return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s\n", l)
+				return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
 			}
 			port, err := strconv.Atoi(items[1])
 			if err != nil {
 				return NginxProxyConfig{}, err
 			}
-			ret.Port = port
-		} else if strings.Contains(l, "resolver") {
+			if insideHttp {
+				if len(items) > 2 {
+					return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
+				}
+				ret.IngressPort = port
+			} else {
+				if !insideStream {
+					return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
+				}
+				streamPort = port
+				if len(items) == 3 {
+					streamPortProtocol, err = parseProtocol(items[2])
+					if err != nil {
+						return NginxProxyConfig{}, err
+					}
+				} else {
+					streamPortProtocol = ProtocolTCP
+				}
+			}
+		} else if insideHttp && strings.Contains(l, "resolver") {
 			if len(items) < 2 {
 				return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
 			}
@@ -276,7 +585,7 @@
 				return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
 			}
 			ret.Resolvers = append(ret.Resolvers, ip)
-		} else if insideMap {
+		} else if insideHttp && insideMap {
 			if items[0] == "}" {
 				insideMap = false
 				continue
@@ -284,9 +593,19 @@
 			if len(items) < 2 {
 				return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
 			}
-			ret.Proxies[items[0]] = items[1]
-		} else if items[0] == "map" {
-			insideMap = true
+			ret.Ingress[items[0]] = items[1]
+		} else if insideStream && strings.Contains(l, "proxy_pass") {
+			if streamPort == -1 {
+				return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
+			}
+			if len(items) < 2 {
+				return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
+			}
+			if streamPortProtocol == ProtocolTCP {
+				ret.TCP[streamPort] = items[1]
+			} else {
+				ret.UDP[streamPort] = items[1]
+			}
 		}
 	}
 	return ret, nil
@@ -310,12 +629,12 @@
     }
     http {
         map $http_host $backend {
-            {{- range $from, $to := .Proxies }}
+            {{- range $from, $to := .Ingress }}
             {{ $from }} {{ $to }};
             {{- end }}
         }
         server {
-            listen {{ .Port }};
+            listen {{ .IngressPort }};
             location / {
                 {{- range .Resolvers }}
                 resolver {{ . }};
@@ -323,4 +642,22 @@
                 proxy_pass http://$backend;
             }
         }
-    }`
+    }
+    {{- if or .TCP .UDP }}
+    stream {
+        {{- range $port, $upstream := .TCP }}
+        server {
+            listen {{ $port }};
+            resolver 100.100.100.100;
+            proxy_pass {{ $upstream }};
+        }
+        {{- end  }}
+        {{- range $port, $upstream := .UDP }}
+        server {
+            listen {{ $port }} udp;
+            resolver 100.100.100.100;
+            proxy_pass {{ $upstream }};
+        }
+        {{- end  }}
+    }
+    {{- end }}`