AppManager: Support exposing cross-cluster ports

Change-Id: I4bdb3573209935f6777656ec2f3481e79d84a9c9
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index 6bc89e4..8b3ab80 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"encoding/base64"
 	"encoding/json"
 	"errors"
@@ -12,11 +13,13 @@
 	"math/rand"
 	"net/http"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/giolekva/pcloud/core/installer"
 	"github.com/giolekva/pcloud/core/installer/soft"
 
 	"golang.org/x/crypto/ssh"
@@ -36,25 +39,42 @@
 var preOpenPortsBatchSize = flag.Int("pre-open-ports-batch-size", 10, "Number of new ports to open at a time")
 
 type client interface {
-	ReservePort() (int, string, error)
+	ReservePort(remoteProxy bool) (int, string, error)
 	ReleaseReservedPort(port ...int)
 	AddPortForwarding(protocol string, port int, secret, dest string) error
 	RemovePortForwarding(protocol string, port int) error
 }
 
+type Reservation struct {
+	Secret        string `json:"secret"`
+	IsRemoteProxy bool   `json:"isRemoteProxy"`
+}
+
 type repoClient struct {
 	l                     sync.Locker
 	repo                  soft.RepoIO
 	path                  string
 	secretGenerator       SecretGenerator
+	proxyCfg              *installer.NginxProxyConfigurator
 	minPreOpenPorts       int
 	preOpenPortsBatchSize int
 	preOpenPorts          []int
+	proxyPreOpenPorts     []int
 	blocklist             map[int]struct{}
-	reserve               map[int]string
+	reserve               map[int]Reservation
 	availablePorts        []int
 }
 
+func getProxyBackendConfigPath(repo soft.RepoIO, path string) (string, error) {
+	cfgPath := filepath.Join(filepath.Dir(path), "proxy-backend-config.yaml")
+	inp, err := repo.Reader(cfgPath)
+	if err != nil {
+		return "", nil
+	}
+	defer inp.Close()
+	return cfgPath, nil
+}
+
 func newRepoClient(
 	repo soft.RepoIO,
 	path string,
@@ -62,16 +82,29 @@
 	preOpenPortsBatchSize int,
 	secretGenerator SecretGenerator,
 ) (client, error) {
+	proxyCfg, err := getProxyBackendConfigPath(repo, path)
+	if err != nil {
+		return nil, err
+	}
+	var cnc *installer.NginxProxyConfigurator
+	if proxyCfg != "" {
+		cnc = &installer.NginxProxyConfigurator{
+			Repo:        repo,
+			ConfigPath:  proxyCfg,
+			ServicePath: filepath.Join(filepath.Dir(proxyCfg), "proxy-backend-service.yaml"),
+		}
+	}
 	ret := &repoClient{
 		l:                     &sync.Mutex{},
 		repo:                  repo,
 		path:                  path,
 		secretGenerator:       secretGenerator,
+		proxyCfg:              cnc,
 		minPreOpenPorts:       minPreOpenPorts,
 		preOpenPortsBatchSize: preOpenPortsBatchSize,
 		preOpenPorts:          []int{},
 		blocklist:             map[int]struct{}{},
-		reserve:               map[int]string{},
+		reserve:               map[int]Reservation{},
 		availablePorts:        []int{},
 	}
 	st, err := ret.readState(repo)
@@ -103,19 +136,31 @@
 	return ret, nil
 }
 
-func (c *repoClient) ReservePort() (int, string, error) {
+func (c *repoClient) ReservePort(remoteProxy bool) (int, string, error) {
 	c.l.Lock()
 	defer c.l.Unlock()
-	if len(c.preOpenPorts) == 0 {
-		return -1, "", fmt.Errorf("no pre-open ports are available")
+	var port int
+	if !remoteProxy {
+		if len(c.preOpenPorts) == 0 {
+			return -1, "", fmt.Errorf("no pre-open ports are available")
+		}
+		port = c.preOpenPorts[0]
+		c.preOpenPorts = c.preOpenPorts[1:]
+	} else {
+		if c.proxyCfg == nil {
+			return -1, "", fmt.Errorf("does not support TCP/UDP proxy")
+		}
+		if len(c.proxyPreOpenPorts) == 0 {
+			return -1, "", fmt.Errorf("no proxy pre-open ports are available")
+		}
+		port = c.proxyPreOpenPorts[0]
+		c.proxyPreOpenPorts = c.proxyPreOpenPorts[1:]
 	}
-	port := c.preOpenPorts[0]
-	c.preOpenPorts = c.preOpenPorts[1:]
 	secret, err := c.secretGenerator()
 	if err != nil {
 		return -1, "", err
 	}
-	c.reserve[port] = secret
+	c.reserve[port] = Reservation{secret, remoteProxy}
 	return port, secret, nil
 }
 
@@ -127,8 +172,16 @@
 	defer c.l.Unlock()
 	if _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		for _, p := range port {
+			r, ok := c.reserve[p]
+			if !ok {
+				continue
+			}
 			delete(c.reserve, p)
-			c.preOpenPorts = append(c.preOpenPorts, p)
+			if r.IsRemoteProxy {
+				c.proxyPreOpenPorts = append(c.proxyPreOpenPorts, p)
+			} else {
+				c.preOpenPorts = append(c.preOpenPorts, p)
+			}
 		}
 		if err := c.writeState(fs); err != nil {
 			return "", err
@@ -139,30 +192,56 @@
 	}
 }
 
+type oldState struct {
+	PreOpenPorts      []int            `json:"preOpenPorts"`
+	ProxyPreOpenPorts []int            `json:"proxyPreOpenPorts"`
+	Blocklist         map[int]struct{} `json:"blocklist"`
+	Reserve           map[int]string   `json:"reserve"`
+}
+
 type state struct {
-	PreOpenPorts []int            `json:"preOpenPorts"`
-	Blocklist    map[int]struct{} `json:"blocklist"`
-	Reserve      map[int]string   `json:"reserve"`
+	PreOpenPorts      []int               `json:"preOpenPorts"`
+	ProxyPreOpenPorts []int               `json:"proxyPreOpenPorts"`
+	Blocklist         map[int]struct{}    `json:"blocklist"`
+	Reserve           map[int]Reservation `json:"reserve"`
 }
 
 func (c *repoClient) preOpenNewPorts() error {
 	c.l.Lock()
 	defer c.l.Unlock()
-	if len(c.preOpenPorts) >= c.minPreOpenPorts {
+	var ports []int
+	if len(c.preOpenPorts) < c.minPreOpenPorts {
+		for count := c.preOpenPortsBatchSize; count > 0; count-- {
+			if len(c.availablePorts) == 0 {
+				return fmt.Errorf("could not open new port")
+			}
+			r := rand.Intn(len(c.availablePorts))
+			p := c.availablePorts[r]
+			c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+			c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+			ports = append(ports, p)
+			c.preOpenPorts = append(c.preOpenPorts, p)
+			c.blocklist[p] = struct{}{}
+		}
 		return nil
 	}
-	var ports []int
-	for count := c.preOpenPortsBatchSize; count > 0; count-- {
-		if len(c.availablePorts) == 0 {
-			return fmt.Errorf("could not open new port")
+	if c.proxyCfg != nil && len(c.proxyPreOpenPorts) < c.minPreOpenPorts {
+		for count := c.preOpenPortsBatchSize; count > 0; count-- {
+			if len(c.availablePorts) == 0 {
+				return fmt.Errorf("could not open new port")
+			}
+			r := rand.Intn(len(c.availablePorts))
+			p := c.availablePorts[r]
+			c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+			c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+			ports = append(ports, p)
+			c.proxyPreOpenPorts = append(c.proxyPreOpenPorts, p)
+			c.blocklist[p] = struct{}{}
 		}
-		r := rand.Intn(len(c.availablePorts))
-		p := c.availablePorts[r]
-		c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
-		c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
-		ports = append(ports, p)
-		c.preOpenPorts = append(c.preOpenPorts, p)
-		c.blocklist[p] = struct{}{}
+		return nil
+	}
+	if len(ports) == 0 {
+		return nil
 	}
 	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		if err := c.writeState(fs); err != nil {
@@ -172,10 +251,17 @@
 		if err != nil {
 			return "", err
 		}
-		svcType, err := extractString(rel, "spec.values.controller.service.type")
+		svcType := ""
+		svcEnabled, err := extractBool(rel, "spec.values.controller.service.enabled")
 		if err != nil {
 			return "", err
 		}
+		if svcEnabled {
+			svcType, err = extractString(rel, "spec.values.controller.service.type")
+			if err != nil {
+				return "", err
+			}
+		}
 		if svcType == "NodePort" {
 			tcp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
 			if err != nil {
@@ -213,10 +299,31 @@
 	}()
 	c.l.Lock()
 	defer c.l.Unlock()
-	if sec, ok := c.reserve[port]; !ok || sec != secret {
+	r, ok := c.reserve[port]
+	if !ok || r.Secret != secret {
 		return fmt.Errorf("wrong secret")
 	}
 	delete(c.reserve, port)
+	if r.IsRemoteProxy {
+		if c.proxyCfg == nil {
+			return fmt.Errorf("does not support TCP/UDP proxy")
+		}
+		var namespace string
+		var err error
+		switch strings.ToLower(protocol) {
+		case "tcp":
+			if namespace, err = c.proxyCfg.AddProxy(port, dest, installer.ProtocolTCP); err != nil {
+				return err
+			}
+		case "udp":
+			if namespace, err = c.proxyCfg.AddProxy(port, dest, installer.ProtocolUDP); err != nil {
+				return err
+			}
+		default:
+			return fmt.Errorf("unknown protocol: %s", protocol)
+		}
+		dest = fmt.Sprintf("%s/proxy-backend-service:%d", namespace, port)
+	}
 	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		if err := c.writeState(fs); err != nil {
 			return "", err
@@ -279,10 +386,17 @@
 		default:
 			panic("MUST NOT REACH")
 		}
-		svcType, err := extractString(rel, "spec.values.controller.service.type")
+		svcType := ""
+		svcEnabled, err := extractBool(rel, "spec.values.controller.service.enabled")
 		if err != nil {
 			return "", err
 		}
+		if svcEnabled {
+			svcType, err = extractString(rel, "spec.values.controller.service.type")
+			if err != nil {
+				return "", err
+			}
+		}
 		if svcType == "NodePort" {
 			svcTCP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
 			if err != nil {
@@ -313,10 +427,27 @@
 		return state{}, err
 	}
 	defer r.Close()
-	var ret state
-	if err := json.NewDecoder(r).Decode(&ret); err != nil {
+	buf, err := io.ReadAll(r)
+	if err != nil {
 		return state{}, err
 	}
+	var ret state
+	if err := json.NewDecoder(bytes.NewReader(buf)).Decode(&ret); err == nil {
+		return ret, nil
+	}
+	var old oldState
+	if err := json.NewDecoder(bytes.NewReader(buf)).Decode(&old); err != nil {
+		return state{}, err
+	}
+	ret = state{
+		PreOpenPorts:      old.PreOpenPorts,
+		ProxyPreOpenPorts: []int{},
+		Blocklist:         old.Blocklist,
+		Reserve:           map[int]Reservation{},
+	}
+	for port, secret := range old.Reserve {
+		ret.Reserve[port] = Reservation{secret, false}
+	}
 	return ret, err
 }
 
@@ -326,7 +457,7 @@
 		return err
 	}
 	defer w.Close()
-	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist, c.reserve}); err != nil {
+	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.proxyPreOpenPorts, c.blocklist, c.reserve}); err != nil {
 		return err
 	}
 	return err
@@ -422,7 +553,7 @@
 	for _, i := range strings.Split(path, ".") {
 		valM, ok := val.(map[string]any)
 		if !ok {
-			return nil, fmt.Errorf("expected map")
+			return nil, fmt.Errorf("expected map, %s", i)
 		}
 		val, ok = valM[i]
 		if !ok {
@@ -451,7 +582,19 @@
 	}
 	retS, ok := ret.(string)
 	if !ok {
-		return "", fmt.Errorf("expected map")
+		return "", fmt.Errorf("expected string")
+	}
+	return retS, nil
+}
+
+func extractBool(data map[string]any, path string) (bool, error) {
+	ret, err := extractField(data, path)
+	if err != nil {
+		return false, err
+	}
+	retS, ok := ret.(bool)
+	if !ok {
+		return false, fmt.Errorf("expected boolean")
 	}
 	return retS, nil
 }
@@ -496,15 +639,26 @@
 	}
 }
 
+type reserveReq struct {
+	RemoteProxy bool `json:"remoteProxy"`
+}
+
 func (s *server) handleReserve(w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodPost {
 		http.Error(w, "only post method is supported", http.StatusBadRequest)
 		return
 	}
+	var req reserveReq
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		fmt.Println(err.Error())
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	fmt.Printf("%+v\n", req)
 	var port int
 	var secret string
 	var err error
-	if port, secret, err = s.client.ReservePort(); err != nil {
+	if port, secret, err = s.client.ReservePort(req.RemoteProxy); err != nil {
 		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return