AppManager: Support exposing cross-cluster ports

Change-Id: I4bdb3573209935f6777656ec2f3481e79d84a9c9
diff --git a/core/installer/app.go b/core/installer/app.go
index 948e17e..e57fee8 100644
--- a/core/installer/app.go
+++ b/core/installer/app.go
@@ -95,6 +95,7 @@
 }
 
 type PortForward struct {
+	Cluster     string `json:"cluster,omitempty"`
 	Allocator   string `json:"allocator"`
 	ReserveAddr string `json:"reservator"`
 	RemoveAddr  string `json:"deallocator"`
diff --git a/core/installer/app_configs/app_base.cue b/core/installer/app_configs/app_base.cue
index 7698553..7330b1e 100644
--- a/core/installer/app_configs/app_base.cue
+++ b/core/installer/app_configs/app_base.cue
@@ -109,9 +109,9 @@
 	imageRegistry: string | *"docker.io"
 }
 
-// TODO(gio): Support inter-cluster porf forwarding
 #PortForward: {
 	network: #Network
+	cluster?: string | null
 	port: int
 	service: close({
 		name: string
@@ -507,15 +507,44 @@
 	_volumeClaimName: "\(name)-postgresql"
 	_name: name
 
-	openPort: [for i, e in expose {
-		network: networks[strings.ToLower(e.network)]
-		port: input["port_postgresql_\(_name)_\(i)"]
-		protocol: "TCP"
-		service: {
-			name: "postgres-\(_name)"
-			port: 5432
+	openPort: list.FlattenN([for i, e in expose {
+		if cluster == _|_ {
+			network: networks[strings.ToLower(e.network)]
+			port: input["port_postgresql_\(_name)_\(i)"]
+			protocol: "TCP"
+			service: {
+				name: "postgres-\(_name)"
+				port: 5432
+			}
 		}
-	}]
+		if cluster != _|_ {
+			[{
+				network: #Network & { // endpoints live in the per-cluster "-network" namespace
+					name: "cluster_\(cluster.name)"
+					ingressClass: "default"
+					domain: ""
+					allocatePortAddr: "http://port-allocator.\(global.id)-cluster-\(cluster.name)-network.svc.cluster.local/api/allocate"
+					reservePortAddr: "http://port-allocator.\(global.id)-cluster-\(cluster.name)-network.svc.cluster.local/api/reserve"
+					deallocatePortAddr: "http://port-allocator.\(global.id)-cluster-\(cluster.name)-network.svc.cluster.local/api/remove"
+				}
+				port: input["port_postgresql_\(_name)_\(i)_cluster"]
+				protocol: "TCP"
+				service: {
+					name: "postgres-\(_name)"
+					port: 5432
+				}
+			}, {
+				cluster: _cluster.name
+				network: networks[strings.ToLower(e.network)]
+				port: input["port_postgresql_\(_name)_\(i)"]
+				protocol: "TCP"
+				service: {
+					name: "cluster-\(cluster).private-network-proxy.devices.\(global.privateDomain)"
+					port: input["port_postgresql_\(_name)_\(i)_cluster"]
+				}
+			}]
+		}
+	}], -1)
 
 	images: {
 		postgres: {
diff --git a/core/installer/app_configs/app_global_env.cue b/core/installer/app_configs/app_global_env.cue
index 3841f02..6a02df2 100644
--- a/core/installer/app_configs/app_global_env.cue
+++ b/core/installer/app_configs/app_global_env.cue
@@ -62,8 +62,8 @@
 	if _cluster != _|_ {
 		clusterProxy: {
 			"\(name)": {
-				from: _domain
 				_sanitizedDomain: strings.Replace(_domain, ".", "-", -1)
+				from: _domain
 				to: "\(_sanitizedDomain).\(_cluster.name).cluster.\(global.privateDomain)"
 			}
 		}
diff --git a/core/installer/app_configs/dodo_app.cue b/core/installer/app_configs/dodo_app.cue
index bf5a7f8..6f7001a 100644
--- a/core/installer/app_configs/dodo_app.cue
+++ b/core/installer/app_configs/dodo_app.cue
@@ -24,6 +24,9 @@
 	for v in _postgresql {
 		for i, e in v.expose {
 			"port_postgresql_\(v.name)_\(i)": int @role(port)
+			if input.cluster != _|_ {
+				"port_postgresql_\(v.name)_\(i)_cluster": int @role(port)
+			}
 		}
 	}
 	for v in _mongodb {
diff --git a/core/installer/app_manager.go b/core/installer/app_manager.go
index c5b849f..3285c19 100644
--- a/core/installer/app_manager.go
+++ b/core/installer/app_manager.go
@@ -182,10 +182,19 @@
 	Secret string `json:"secret"`
 }
 
-func reservePorts(ports map[string]string) (map[string]reservePortResp, error) {
+type reservePortInfo struct {
+	reserveAddr string
+	RemoteProxy bool `json:"remoteProxy"`
+}
+
+func reservePorts(ports map[string]reservePortInfo) (map[string]reservePortResp, error) {
 	ret := map[string]reservePortResp{}
-	for p, reserveAddr := range ports {
-		resp, err := http.Post(reserveAddr, "application/json", nil) // TODO(gio): address
+	for p, cfg := range ports {
+		var buf bytes.Buffer
+		if err := json.NewEncoder(&buf).Encode(cfg); err != nil {
+			return nil, err
+		}
+		resp, err := http.Post(cfg.reserveAddr, "application/json", &buf)
 		if err != nil {
 			return nil, err
 		}
@@ -205,11 +214,17 @@
 
 func openPorts(ports []PortForward, reservations map[string]reservePortResp, allocators map[string]string, ns string) error {
 	for _, p := range ports {
+		var target string
+		if p.Cluster == "" {
+			target = fmt.Sprintf("%s/%s", ns, p.Service.Name)
+		} else {
+			target = p.Service.Name
+		}
 		var buf bytes.Buffer
 		req := allocatePortReq{
 			Protocol:      p.Protocol,
 			SourcePort:    p.Port,
-			TargetService: fmt.Sprintf("%s/%s", ns, p.Service.Name),
+			TargetService: target,
 			TargetPort:    p.Service.Port,
 		}
 		allocator := ""
@@ -475,10 +490,13 @@
 	if err != nil {
 		return ReleaseResources{}, err
 	}
-	reservators := map[string]string{}
+	reservators := map[string]reservePortInfo{}
 	allocators := map[string]string{}
 	for _, pf := range rendered.Ports {
-		reservators[portFields[pf.Port]] = pf.ReserveAddr
+		reservators[portFields[pf.Port]] = reservePortInfo{
+			reserveAddr: pf.ReserveAddr,
+			RemoteProxy: pf.Cluster != "",
+		}
 		allocators[portFields[pf.Port]] = pf.Allocator
 	}
 	portReservations, err := reservePorts(reservators)
@@ -530,7 +548,7 @@
 		return ReleaseResources{}, err
 	}
 	for _, p := range rendered.ClusterProxies {
-		if err := m.cnc.AddProxy(p.From, p.To); err != nil {
+		if err := m.cnc.AddIngressProxy(p.From, p.To); err != nil {
 			return ReleaseResources{}, err
 		}
 	}
@@ -651,7 +669,7 @@
 			}
 		}
 		if !found {
-			if err := m.cnc.RemoveProxy(ocp.From, ocp.To); err != nil {
+			if err := m.cnc.RemoveIngressProxy(ocp.From, ocp.To); err != nil {
 				return ReleaseResources{}, err
 			}
 		}
@@ -665,7 +683,7 @@
 			}
 		}
 		if !found {
-			if err := m.cnc.AddProxy(ncp.From, ncp.To); err != nil {
+			if err := m.cnc.AddIngressProxy(ncp.From, ncp.To); err != nil {
 				return ReleaseResources{}, err
 			}
 		}
@@ -723,7 +741,7 @@
 		return err
 	}
 	for _, cp := range cfg.Out.ClusterProxy {
-		if err := m.cnc.RemoveProxy(cp.From, cp.To); err != nil {
+		if err := m.cnc.RemoveIngressProxy(cp.From, cp.To); err != nil {
 			return err
 		}
 	}
diff --git a/core/installer/canvas-app.cue b/core/installer/canvas-app.cue
index a8be305..6843277 100644
--- a/core/installer/canvas-app.cue
+++ b/core/installer/canvas-app.cue
@@ -24,14 +24,14 @@
   		    "enabled": false
 		}
 	}],
-	  "mongodb": [{
+	  "postgresql": [{
 		  "name": "pgg",
 		  "size": "2Gi",
 		  "expose": [{
 		    "network": "private",
-			"subdomain": "mongoo"
+			"subdomain": "pggp"
 		  }]
 	  }],
-	  "cluster": "ct"
+	  "cluster": "asdc"
   }
 }
diff --git a/core/installer/cluster.go b/core/installer/cluster.go
index 19220af..ac7d970 100644
--- a/core/installer/cluster.go
+++ b/core/installer/cluster.go
@@ -16,21 +16,26 @@
 
 	"github.com/giolekva/pcloud/core/installer/soft"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/yaml"
 )
 
 type ClusterNetworkConfigurator interface {
 	AddCluster(name string, ingressIP net.IP) error
 	RemoveCluster(name string, ingressIP net.IP) error
-	AddProxy(src, dst string) error
-	RemoveProxy(src, dst string) error
+	AddProxy(src int, dst string, protocol Protocol) (string, error)
+	AddIngressProxy(src, dst string) error
+	RemoveProxy(src int, dst string, protocol Protocol) error
+	RemoveIngressProxy(src, dst string) error
 }
 
 type NginxProxyConfigurator struct {
 	PrivateSubdomain string
 	DNSAPIAddr       string
 	Repo             soft.RepoIO
-	NginxConfigPath  string
+	ConfigPath       string
+	ServicePath      string
 }
 
 type createARecordReq struct {
@@ -80,10 +85,47 @@
 	return nil
 }
 
-func (c *NginxProxyConfigurator) AddProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
+	var namespace string
 	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := func() error {
+			r, err := fs.Reader(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return err
+			}
+			var svc corev1.Service
+			if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+				return err
+			}
+			svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+				Name:       fmt.Sprintf("p%d", src),
+				Protocol:   corev1.Protocol(ProtocolToString(protocol)),
+				Port:       int32(src),
+				TargetPort: intstr.FromInt(src),
+			})
+			w, err := fs.Writer(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			tmp, err := yaml.Marshal(svc)
+			if err != nil {
+				return err
+			}
+			if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+				return err
+			}
+			return nil
+		}(); err != nil {
+			return "", err
+		}
 		cfg, err := func() (NginxProxyConfig, error) {
-			r, err := fs.Reader(c.NginxConfigPath)
+			r, err := fs.Reader(c.ConfigPath)
 			if err != nil {
 				return NginxProxyConfig{}, err
 			}
@@ -93,11 +135,19 @@
 		if err != nil {
 			return "", err
 		}
-		if v, ok := cfg.Proxies[src]; ok && v != dst {
-			return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
+		namespace = cfg.Namespace
+		var proxyMap map[int]string
+		switch protocol {
+		case ProtocolTCP:
+			proxyMap = cfg.TCP
+		case ProtocolUDP:
+			proxyMap = cfg.UDP
+		default:
+			return "", fmt.Errorf("invalid protocol: %v", protocol)
 		}
-		cfg.Proxies[src] = dst
-		w, err := fs.Writer(c.NginxConfigPath)
+		// TODO(gio): check for already existing mapping
+		proxyMap[src] = dst
+		w, err := fs.Writer(c.ConfigPath)
 		if err != nil {
 			return "", err
 		}
@@ -108,7 +158,7 @@
 			return "", err
 		}
 		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
-		nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
 		nginx, err := func() (map[string]any, error) {
 			r, err := fs.Reader(nginxPath)
 			if err != nil {
@@ -149,15 +199,18 @@
 		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
 			return "", err
 		}
-		return fmt.Sprintf("add proxy mapping: %s %s", src, dst), nil
+		return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
 	})
-	return err
+	if err != nil {
+		return "", err
+	}
+	return namespace, nil
 }
 
-func (c *NginxProxyConfigurator) RemoveProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
 	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
 		cfg, err := func() (NginxProxyConfig, error) {
-			r, err := fs.Reader(c.NginxConfigPath)
+			r, err := fs.Reader(c.ConfigPath)
 			if err != nil {
 				return NginxProxyConfig{}, err
 			}
@@ -167,11 +220,11 @@
 		if err != nil {
 			return "", err
 		}
-		if v, ok := cfg.Proxies[src]; !ok || v != dst {
-			return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+		if v, ok := cfg.Ingress[src]; ok && v != dst {
+			return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
 		}
-		delete(cfg.Proxies, src)
-		w, err := fs.Writer(c.NginxConfigPath)
+		cfg.Ingress[src] = dst
+		w, err := fs.Writer(c.ConfigPath)
 		if err != nil {
 			return "", err
 		}
@@ -182,7 +235,198 @@
 			return "", err
 		}
 		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
-		nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+		var annotations map[string]any
+		if a, ok := cv["podAnnotations"]; ok {
+			annotations = a.(map[string]any)
+		} else {
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		annotations["dodo.cloud/hash"] = string(hash)
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		w, err = fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
+	})
+	return err
+}
+
+func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := func() error {
+			r, err := fs.Reader(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return err
+			}
+			var svc corev1.Service
+			if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+				return err
+			}
+			for i, p := range svc.Spec.Ports {
+				if p.Port == int32(src) {
+					svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
+					break
+				}
+			}
+			w, err := fs.Writer(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			tmp, err := yaml.Marshal(svc)
+			if err != nil {
+				return err
+			}
+			if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+				return err
+			}
+			return nil
+		}(); err != nil {
+			return "", err
+		}
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.ConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		var proxyMap map[int]string
+		switch protocol {
+		case ProtocolTCP:
+			proxyMap = cfg.TCP
+		case ProtocolUDP:
+			proxyMap = cfg.UDP
+		default:
+			return "", fmt.Errorf("invalid protocol: %v", protocol)
+		}
+		// TODO(gio): verify that src is currently mapped to dst before deleting
+		delete(proxyMap, src)
+		w, err := fs.Writer(c.ConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		h := sha256.New()
+		o := io.MultiWriter(w, h)
+		if err := cfg.Render(o); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			var buf bytes.Buffer
+			if _, err := io.Copy(&buf, r); err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+		var annotations map[string]any
+		if a, ok := cv["podAnnotations"]; ok {
+			annotations = a.(map[string]any)
+		} else {
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		annotations["dodo.cloud/hash"] = string(hash)
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		w, err = fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
+	})
+	return err
+}
+
+func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.ConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		if v, ok := cfg.Ingress[src]; !ok || v != dst {
+			return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+		}
+		delete(cfg.Ingress, src)
+		w, err := fs.Writer(c.ConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		h := sha256.New()
+		o := io.MultiWriter(w, h)
+		if err := cfg.Render(o); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
 		nginx, err := func() (map[string]any, error) {
 			r, err := fs.Reader(nginxPath)
 			if err != nil {
@@ -228,11 +472,40 @@
 	return err
 }
 
+type Protocol int
+
+const (
+	ProtocolTCP Protocol = iota
+	ProtocolUDP
+)
+
+// ProtocolToString returns "TCP" for ProtocolTCP and "UDP" for any other value.
+func ProtocolToString(p Protocol) string {
+	if p != ProtocolTCP {
+		return "UDP"
+	}
+	return "TCP"
+}
+
 type NginxProxyConfig struct {
-	Port      int
-	Resolvers []net.IP
-	Proxies   map[string]string
-	PreConf   []string
+	Namespace   string
+	IngressPort int
+	Resolvers   []net.IP
+	Ingress     map[string]string
+	TCP         map[int]string
+	UDP         map[int]string
+	PreConf     []string
+}
+
+// parseProtocol converts a case-insensitive "tcp"/"udp" string to a Protocol.
+func parseProtocol(s string) (Protocol, error) {
+	switch name := strings.ToLower(s); name {
+	case "tcp":
+		return ProtocolTCP, nil
+	case "udp":
+		return ProtocolUDP, nil
+	}
+	return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
+}
 
 func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
@@ -241,13 +514,19 @@
 		return NginxProxyConfig{}, err
 	}
 	ret := NginxProxyConfig{
-		Port:      -1,
-		Resolvers: nil,
-		Proxies:   make(map[string]string),
+		IngressPort: -1,
+		Resolvers:   nil,
+		Ingress:     make(map[string]string),
+		TCP:         make(map[int]string),
+		UDP:         make(map[int]string),
 	}
 	lines := strings.Split(buf.String(), "\n")
-	insideConf := true
+	insidePreConf := true
+	insideHttp := false
 	insideMap := false
+	insideStream := false
+	streamPort := -1
+	streamPortProtocol := ProtocolTCP
 	for _, l := range lines {
 		items := strings.Fields(strings.TrimSuffix(l, ";"))
 		if len(items) == 0 {
@@ -255,19 +534,49 @@
 		}
 		if strings.Contains(l, "nginx.conf") {
 			ret.PreConf = append(ret.PreConf, l)
-			insideConf = false
-		} else if insideConf {
+			insidePreConf = false
+		} else if insidePreConf {
 			ret.PreConf = append(ret.PreConf, l)
+			items := strings.Fields(l)
+			if len(items) > 1 && items[0] == "namespace:" { // a bare "namespace:" key must not panic
+				ret.Namespace = items[1]
+			}
+		} else if items[0] == "http" {
+			insideHttp = true
+		} else if insideHttp && items[0] == "map" {
+			insideMap = true
+		} else if items[0] == "stream" {
+			insideHttp = false
+			insideMap = false
+			insideStream = true
 		} else if strings.Contains(l, "listen") {
 			if len(items) < 2 {
-				return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s\n", l)
+				return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
 			}
 			port, err := strconv.Atoi(items[1])
 			if err != nil {
 				return NginxProxyConfig{}, err
 			}
-			ret.Port = port
-		} else if strings.Contains(l, "resolver") {
+			if insideHttp {
+				if len(items) > 2 {
+					return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
+				}
+				ret.IngressPort = port
+			} else {
+				if !insideStream {
+					return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
+				}
+				streamPort = port
+				if len(items) == 3 {
+					streamPortProtocol, err = parseProtocol(items[2])
+					if err != nil {
+						return NginxProxyConfig{}, err
+					}
+				} else {
+					streamPortProtocol = ProtocolTCP
+				}
+			}
+		} else if insideHttp && strings.Contains(l, "resolver") {
 			if len(items) < 2 {
 				return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
 			}
@@ -276,7 +585,7 @@
 				return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
 			}
 			ret.Resolvers = append(ret.Resolvers, ip)
-		} else if insideMap {
+		} else if insideHttp && insideMap {
 			if items[0] == "}" {
 				insideMap = false
 				continue
@@ -284,9 +593,19 @@
 			if len(items) < 2 {
 				return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
 			}
-			ret.Proxies[items[0]] = items[1]
-		} else if items[0] == "map" {
-			insideMap = true
+			ret.Ingress[items[0]] = items[1]
+		} else if insideStream && strings.Contains(l, "proxy_pass") {
+			if streamPort == -1 {
+				return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
+			}
+			if len(items) < 2 {
+				return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
+			}
+			if streamPortProtocol == ProtocolTCP {
+				ret.TCP[streamPort] = items[1]
+			} else {
+				ret.UDP[streamPort] = items[1]
+			}
 		}
 	}
 	return ret, nil
@@ -310,12 +629,12 @@
     }
     http {
         map $http_host $backend {
-            {{- range $from, $to := .Proxies }}
+            {{- range $from, $to := .Ingress }}
             {{ $from }} {{ $to }};
             {{- end }}
         }
         server {
-            listen {{ .Port }};
+            listen {{ .IngressPort }};
             location / {
                 {{- range .Resolvers }}
                 resolver {{ . }};
@@ -323,4 +642,22 @@
                 proxy_pass http://$backend;
             }
         }
-    }`
+    }
+    {{- if or .TCP .UDP }}
+    stream {
+        {{- range $port, $upstream := .TCP }}
+        server {
+            listen {{ $port }};
+            resolver 100.100.100.100;
+            proxy_pass {{ $upstream }};
+        }
+        {{- end  }}
+        {{- range $port, $upstream := .UDP }}
+        server {
+            listen {{ $port }} udp;
+            resolver 100.100.100.100;
+            proxy_pass {{ $upstream }};
+        }
+        {{- end  }}
+    }
+    {{- end }}`
diff --git a/core/installer/cluster_test.go b/core/installer/cluster_test.go
index e315202..b2ecd99 100644
--- a/core/installer/cluster_test.go
+++ b/core/installer/cluster_test.go
@@ -7,7 +7,14 @@
 )
 
 func TestParseNginxProxyConfig(t *testing.T) {
-	cfg, err := ParseNginxProxyConfig(strings.NewReader(`nginx.conf: |
+	cfg, err := ParseNginxProxyConfig(strings.NewReader(`
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: proxy-backend-config
+  namespace: foo
+data:
+nginx.conf: |
 # user       www www;
 worker_processes  1;
 error_log   /dev/null   crit;
@@ -33,33 +40,66 @@
 		}
 	}
 }
+stream {
+	server {
+		listen 9091;
+		proxy_pass foo:1;
+	}
+	server {
+		listen 9092 udp;
+		proxy_pass bar:2;
+	}
+	server {
+		listen 9093;
+		proxy_pass dev:3;
+	}
+}
 `))
 	if err != nil {
 		t.Fatal(err)
 	}
-	if cfg.Port != 9090 {
-		t.Errorf("invalid port: expected 9090, got %d", cfg.Port)
+	if cfg.Namespace != "foo" {
+		t.Errorf("invalid namespace: expected foo, got %s", cfg.Namespace)
+	}
+	if cfg.IngressPort != 9090 {
+		t.Errorf("invalid port: expected 9090, got %d", cfg.IngressPort)
 	}
 	if len(cfg.Resolvers) != 2 ||
 		!cfg.Resolvers[0].Equal(net.ParseIP("1.1.1.1")) ||
 		!cfg.Resolvers[1].Equal(net.ParseIP("2.2.2.2")) {
 		t.Errorf("invalid resolvers: expected [1.1.1.1 2.2.2.2], got %s", cfg.Resolvers)
 	}
-	if len(cfg.Proxies) != 2 ||
-		cfg.Proxies["a"] != "A" ||
-		cfg.Proxies["b"] != "B" {
-		t.Errorf("invalid proxies: expected map[a:A, b:B], got %s", cfg.Proxies)
+	if len(cfg.Ingress) != 2 ||
+		cfg.Ingress["a"] != "A" ||
+		cfg.Ingress["b"] != "B" {
+		t.Errorf("invalid ingress proxies: expected map[a:A, b:B], got %s", cfg.Ingress)
+	}
+	if len(cfg.TCP) != 2 ||
+		cfg.TCP[9091] != "foo:1" ||
+		cfg.TCP[9093] != "dev:3" {
+		t.Errorf("invalid TCP proxies: expected map[9091:foo:1, 9093:dev:3], got %v", cfg.TCP)
+	}
+	if len(cfg.UDP) != 1 ||
+		cfg.UDP[9092] != "bar:2" {
+		t.Errorf("invalid UDP proxies: expected map[9092:bar:2], got %v", cfg.UDP)
 	}
 }
 
 func TestRenderNginxProxyConfig(t *testing.T) {
 	cfg := NginxProxyConfig{
-		Port:      8080,
-		Resolvers: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")},
-		Proxies: map[string]string{
+		IngressPort: 8080,
+		Resolvers:   []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")},
+		Ingress: map[string]string{
 			"a": "A",
 			"b": "B",
 		},
+		TCP: map[int]string{
+			1: "foo:1",
+			3: "dev:3",
+		},
+		UDP: map[int]string{
+			2: "bar:2",
+		},
 		PreConf: []string{"line1", "line2"},
 	}
 	var buf strings.Builder
diff --git a/core/installer/cmd/app_manager.go b/core/installer/cmd/app_manager.go
index afc7d32..c2547d6 100644
--- a/core/installer/cmd/app_manager.go
+++ b/core/installer/cmd/app_manager.go
@@ -3,8 +3,8 @@
 import (
 	"log"
 	"os"
-
-	"golang.org/x/crypto/ssh"
+	"path/filepath"
+	"strings"
 
 	"github.com/giolekva/pcloud/core/installer"
 	"github.com/giolekva/pcloud/core/installer/server/appmanager"
@@ -80,20 +80,14 @@
 	if err != nil {
 		return err
 	}
-	signer, err := ssh.ParsePrivateKey(sshKey)
+	items := strings.Split(appManagerFlags.repoAddr, "/")
+	ipPort := items[len(items)-2]
+	repoName := items[len(items)-1]
+	ssClient, err := soft.NewClient(ipPort, sshKey, log.Default())
 	if err != nil {
 		return err
 	}
-	addr, err := soft.ParseRepositoryAddress(appManagerFlags.repoAddr)
-	if err != nil {
-		return err
-	}
-	repo, err := soft.CloneRepository(addr, signer)
-	if err != nil {
-		return err
-	}
-	log.Println("Cloned repository")
-	repoIO, err := soft.NewRepoIO(repo, signer)
+	repoIO, err := ssClient.GetRepo(repoName)
 	if err != nil {
 		return err
 	}
@@ -112,7 +106,8 @@
 		PrivateSubdomain: "p",
 		DNSAPIAddr:       appManagerFlags.dnsAPIAddr,
 		Repo:             repoIO,
-		NginxConfigPath:  appManagerFlags.clusterProxyConfigPath,
+		ConfigPath:       appManagerFlags.clusterProxyConfigPath,
+		ServicePath:      filepath.Join(filepath.Dir(appManagerFlags.clusterProxyConfigPath), "proxy-backend-service.yaml"),
 	}
 	m, err := installer.NewAppManager(repoIO, nsc, jc, hf, vpnAPIClient, cnc, "/apps")
 	if err != nil {
@@ -145,6 +140,7 @@
 	}
 	s, err := appmanager.NewServer(
 		appManagerFlags.port,
+		ssClient,
 		repoIO,
 		m,
 		r,
diff --git a/core/installer/cmd/dodo_app.go b/core/installer/cmd/dodo_app.go
index adc8beb..9b5da30 100644
--- a/core/installer/cmd/dodo_app.go
+++ b/core/installer/cmd/dodo_app.go
@@ -245,6 +245,8 @@
 	return s.Start()
 }
 
+// TODO(gio): this should be removed; all app installations, including the dodo
+// app, must be done directly by the main app manager.
 type proxyConfigurator struct {
 	apiAddr string
 }
@@ -262,7 +264,7 @@
 	To   string `json:"to"`
 }
 
-func (pc *proxyConfigurator) AddProxy(src, dst string) error {
+func (pc *proxyConfigurator) AddIngressProxy(src, dst string) error {
 	var buf bytes.Buffer
 	if err := json.NewEncoder(&buf).Encode(proxyPair{src, dst}); err != nil {
 		return err
@@ -279,7 +281,17 @@
 	return nil
 }
 
-func (pc *proxyConfigurator) RemoveProxy(src, dst string) error {
+func (pc *proxyConfigurator) AddProxy(src int, dst string, protocol installer.Protocol) (string, error) {
+	// TODO(gio): implement
+	return "", fmt.Errorf("NOT IMPLEMENTED")
+}
+
+func (pc *proxyConfigurator) RemoveProxy(src int, dst string, protocol installer.Protocol) error {
+	// TODO(gio): implement
+	return fmt.Errorf("NOT IMPLEMENTED")
+}
+
+func (pc *proxyConfigurator) RemoveIngressProxy(src, dst string) error {
 	var buf bytes.Buffer
 	if err := json.NewEncoder(&buf).Encode(proxyPair{src, dst}); err != nil {
 		return err
diff --git a/core/installer/dodo_app_test.go b/core/installer/dodo_app_test.go
index 5b0f3b1..eaad590 100644
--- a/core/installer/dodo_app_test.go
+++ b/core/installer/dodo_app_test.go
@@ -1,6 +1,8 @@
 package installer
 
 import (
+	"bytes"
+	"encoding/json"
 	"testing"
 
 	"cuelang.org/go/cue/errors"
@@ -210,3 +212,61 @@
 	}
 	t.Log(string(r.Raw))
 }
+
+const exposeRemoteCluster = `
+{
+    "cluster": "remote",
+    "postgresql": [{
+		"name": "db",
+		"size": "1Gi",
+		"expose": [{
+			"network": "Private",
+			"subdomain": "pg"
+		}]
+	}],
+}
+`
+
+func TestExposeRemoteCluster(t *testing.T) {
+	var buf bytes.Buffer
+	if _, err := buf.WriteString(exposeRemoteCluster); err != nil {
+		t.Fatal(err)
+	}
+	clusters := []Cluster{{
+		Name:             "remote",
+		Kubeconfig:       "<KUBECONFIG>",
+		IngressClassName: "<INGRESS_CLASS_NAME>",
+	}}
+	if err := json.NewEncoder(&buf).Encode(struct {
+		Clusters []Cluster `json:"clusters"`
+	}{
+		clusters,
+	}); err != nil {
+		t.Fatal(err)
+	}
+	app, err := NewDodoApp(buf.Bytes())
+	if err != nil {
+		for _, e := range errors.Errors(err) {
+			t.Log(e)
+		}
+		t.Fatal(err)
+	}
+	release := Release{
+		Namespace:     "foo",
+		AppInstanceId: "foo-bar",
+		RepoAddr:      "ssh://192.168.100.210:22/config",
+		AppDir:        "/foo/bar",
+	}
+	keyGen := testKeyGen{}
+	r, err := app.Render(release, env, networks, clusters, map[string]any{
+		"managerAddr":                  "",
+		"appId":                        "",
+		"sshPrivateKey":                "",
+		"port_postgresql_db_0":         1,
+		"port_postgresql_db_0_cluster": 2,
+	}, nil, keyGen)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Log(string(r.Raw))
+}
diff --git a/core/installer/server/appmanager/server.go b/core/installer/server/appmanager/server.go
index b3a0883..3360ff4 100644
--- a/core/installer/server/appmanager/server.go
+++ b/core/installer/server/appmanager/server.go
@@ -41,6 +41,7 @@
 type Server struct {
 	l            sync.Locker
 	port         int
+	ssClient     soft.Client
 	repo         soft.RepoIO
 	m            *installer.AppManager
 	r            installer.AppRepository
@@ -98,6 +99,7 @@
 
 func NewServer(
 	port int,
+	ssClient soft.Client,
 	repo soft.RepoIO,
 	m *installer.AppManager,
 	r installer.AppRepository,
@@ -114,6 +116,7 @@
 	return &Server{
 		l:            &sync.Mutex{},
 		port:         port,
+		ssClient:     ssClient,
 		repo:         repo,
 		m:            m,
 		r:            r,
@@ -187,7 +190,7 @@
 		return
 	}
 	appDir := filepath.Join("/dodo-app", req.Id)
-	namespace := "dodo-app-test" // TODO(gio)
+	namespace := "dodo-app-testttt" // TODO(gio)
 	if _, err := s.m.Install(app, req.Id, appDir, namespace, map[string]any{
 		"managerAddr":   "", // TODO(gio)
 		"appId":         req.Id,
@@ -238,7 +241,7 @@
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
-	if err := s.cnc.AddProxy(req.From, req.To); err != nil {
+	if err := s.cnc.AddIngressProxy(req.From, req.To); err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -250,7 +253,7 @@
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
-	if err := s.cnc.RemoveProxy(req.From, req.To); err != nil {
+	if err := s.cnc.RemoveIngressProxy(req.From, req.To); err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -891,6 +894,17 @@
 			if err != nil {
 				return installer.ReleaseResources{}, err
 			}
+			keys, err := installer.NewSSHKeyPair("port-allocator")
+			if err != nil {
+				return installer.ReleaseResources{}, err
+			}
+			user := fmt.Sprintf("%s-cluster-%s-port-allocator", env.Id, name)
+			if err := s.ssClient.AddUser(user, keys.AuthorizedKey()); err != nil {
+				return installer.ReleaseResources{}, err
+			}
+			if err := s.ssClient.AddReadWriteCollaborator("config", user); err != nil {
+				return installer.ReleaseResources{}, err
+			}
 			instanceId := fmt.Sprintf("%s-%s", app.Slug(), name)
 			appDir := fmt.Sprintf("/clusters/%s/ingress", name)
 			namespace := fmt.Sprintf("%scluster-%s-network", env.NamespacePrefix, name)
@@ -903,6 +917,7 @@
 				// TODO(gio): remove hardcoded user
 				"vpnUser":          vpnUser,
 				"vpnProxyHostname": hostname,
+				"sshPrivateKey":    string(keys.RawPrivateKey()),
 			})
 			if err != nil {
 				return installer.ReleaseResources{}, err
diff --git a/core/installer/values-tmpl/cluster-network.cue b/core/installer/values-tmpl/cluster-network.cue
index 393ac41..85551e6 100644
--- a/core/installer/values-tmpl/cluster-network.cue
+++ b/core/installer/values-tmpl/cluster-network.cue
@@ -1,5 +1,5 @@
 import (
-	// "encoding/base64"
+	"encoding/base64"
 )
 
 input: {
@@ -7,7 +7,7 @@
 	vpnUser: string
 	vpnProxyHostname: string
 	vpnAuthKey: string @role(VPNAuthKey) @usernameField(vpnUser)
-	// TODO(gio): support port allocator
+	sshPrivateKey: string
 }
 
 name: "Cluster Network"
@@ -28,12 +28,12 @@
 			tag: "v1.82.0"
 			pullPolicy: "IfNotPresent"
 		}
-		// portAllocator: {
-		// 	repository: "giolekva"
-		// 	name: "port-allocator"
-		// 	tag: "latest"
-		// 	pullPolicy: "Always"
-		// }
+		portAllocator: {
+			repository: "giolekva"
+			name: "port-allocator"
+			tag: "latest"
+			pullPolicy: "Always"
+		}
 	}
 
 	charts: {
@@ -55,12 +55,12 @@
 			branch: "main"
 			path: "charts/tailscale-proxy"
 		}
-		// portAllocator: {
-		// 	kind: "GitRepository"
-		// 	address: "https://code.v1.dodo.cloud/helm-charts"
-		// 	branch: "main"
-		// 	path: "charts/port-allocator"
-		// }
+		portAllocator: {
+			kind: "GitRepository"
+			address: "https://code.v1.dodo.cloud/helm-charts"
+			branch: "main"
+			path: "charts/port-allocator"
+		}
 	}
 
 	helm: {
@@ -119,20 +119,23 @@
 						}]
   				    }]
 				}
+				tcp: {}
+				udp: {}
 			}
 		}
-		// "port-allocator": {
-		// 	chart: charts.portAllocator
-		// 	values: {
-		// 		repoAddr: release.repoAddr
-		// 		sshPrivateKey: base64.Encode(null, input.sshPrivateKey)
-		// 		ingressNginxPath: "\(release.appDir)/resources/ingress-nginx.yaml"
-		// 		image: {
-		// 			repository: images.portAllocator.fullName
-		// 			tag: images.portAllocator.tag
-		// 			pullPolicy: images.portAllocator.pullPolicy
-		// 		}
-		// 	}
-		// }
+		"port-allocator": {
+			chart: charts.portAllocator
+			cluster: null
+			values: {
+				repoAddr: release.repoAddr
+				sshPrivateKey: base64.Encode(null, input.sshPrivateKey)
+				ingressNginxPath: "\(release.appDir)/resources/ingress-nginx.yaml"
+				image: {
+					repository: images.portAllocator.fullName
+					tag: "amd64" // TODO(gio): images.portAllocator.tag
+					pullPolicy: images.portAllocator.pullPolicy
+				}
+			}
+		}
 	}
 }