Installer: dynamically generate open port requests

App config can mark any of the input (int) fields as having a role.
For such fields installer first will make port reservation request to
Port Allocator, which will dynamically allocate and reserve one of the
available ports for the application. Once application is committed to
config repository, installer makes another request to port allocator
to actually open dynamically reserved port in the ingress service.

Added port reservation logic to Port Allocator. Reservation lasts 30
minutes.

Change-Id: Ic8caa0d04459b1a6e8a351e2ca6964ac15c7253d
diff --git a/core/installer/app_manager.go b/core/installer/app_manager.go
index 612f9a7..4f864e4 100644
--- a/core/installer/app_manager.go
+++ b/core/installer/app_manager.go
@@ -5,13 +5,14 @@
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"io/fs"
 	"net/http"
 	"path"
 	"path/filepath"
 	"strings"
 
-	"github.com/giolekva/pcloud/core/installer/io"
+	gio "github.com/giolekva/pcloud/core/installer/io"
 	"github.com/giolekva/pcloud/core/installer/soft"
 
 	helmv2 "github.com/fluxcd/helm-controller/api/v2"
@@ -151,9 +152,36 @@
 	SourcePort    int    `json:"sourcePort"`
 	TargetService string `json:"targetService"`
 	TargetPort    int    `json:"targetPort"`
+	Secret        string `json:"secret"`
 }
 
-func openPorts(ports []PortForward) error {
+type reservePortResp struct {
+	Port   int    `json:"port"`
+	Secret string `json:"secret"`
+}
+
+func reservePorts(ports map[string]string) (map[string]reservePortResp, error) {
+	ret := map[string]reservePortResp{}
+	for p, reserveAddr := range ports {
+		resp, err := http.Post(reserveAddr, "application/json", nil) // TODO(gio): address
+		if err != nil {
+			return nil, err
+		}
+		if resp.StatusCode != http.StatusOK {
+			var e bytes.Buffer
+			io.Copy(&e, resp.Body)
+			return nil, fmt.Errorf("Could not reserve port: %s", e.String())
+		}
+		var r reservePortResp
+		if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+			return nil, err
+		}
+		ret[p] = r
+	}
+	return ret, nil
+}
+
+func openPorts(ports []PortForward, reservations map[string]reservePortResp, allocators map[string]string) error {
 	for _, p := range ports {
 		var buf bytes.Buffer
 		req := allocatePortReq{
@@ -165,7 +193,18 @@
 		if err := json.NewEncoder(&buf).Encode(req); err != nil {
 			return err
 		}
-		resp, err := http.Post(p.Allocator, "application/json", &buf)
+		allocator := ""
+		for n, r := range reservations {
+			if p.SourcePort == r.Port {
+				allocator = allocators[n]
+				req.Secret = reservations[n].Secret
+				break
+			}
+		}
+		if allocator == "" {
+			return fmt.Errorf("Could not find allocator for: %d", p.SourcePort)
+		}
+		resp, err := http.Post(allocator, "application/json", &buf)
 		if err != nil {
 			return err
 		}
@@ -183,7 +222,7 @@
 		kust, err := soft.ReadKustomization(r, kustPath)
 		if err != nil {
 			if errors.Is(err, fs.ErrNotExist) {
-				k := io.NewKustomization()
+				k := gio.NewKustomization()
 				kust = &k
 			} else {
 				return err
@@ -264,7 +303,7 @@
 			if err := createKustomizationChain(r, resourcesDir); err != nil {
 				return "", err
 			}
-			appKust := io.NewKustomization()
+			appKust := gio.NewKustomization()
 			for name, contents := range resources {
 				appKust.AddResources(name)
 				w, err := r.Writer(path.Join(resourcesDir, name))
@@ -293,6 +332,14 @@
 	values map[string]any,
 	opts ...InstallOption,
 ) (ReleaseResources, error) {
+	portFields := findPortFields(app.Schema())
+	fakeReservations := map[string]reservePortResp{}
+	for i, f := range portFields {
+		fakeReservations[f] = reservePortResp{Port: i}
+	}
+	if err := setPortFields(values, fakeReservations); err != nil {
+		return ReleaseResources{}, err
+	}
 	o := &installOptions{}
 	for _, i := range opts {
 		i(o)
@@ -330,6 +377,19 @@
 	if err != nil {
 		return ReleaseResources{}, err
 	}
+	reservators := map[string]string{}
+	allocators := map[string]string{}
+	for _, pf := range rendered.Ports {
+		reservators[portFields[pf.SourcePort]] = pf.ReserveAddr
+		allocators[portFields[pf.SourcePort]] = pf.Allocator
+	}
+	portReservations, err := reservePorts(reservators)
+	if err != nil {
+		return ReleaseResources{}, err
+	}
+	if err := setPortFields(values, portReservations); err != nil {
+		return ReleaseResources{}, err
+	}
 	imageRegistry := fmt.Sprintf("zot.%s", env.PrivateDomain)
 	if o.FetchContainerImages {
 		if err := pullContainerImages(instanceId, rendered.ContainerImages, imageRegistry, namespace, m.jc); err != nil {
@@ -358,7 +418,7 @@
 		return ReleaseResources{}, err
 	}
 	// TODO(gio): add ingress-nginx to release resources
-	if err := openPorts(rendered.Ports); err != nil {
+	if err := openPorts(rendered.Ports, portReservations, allocators); err != nil {
 		return ReleaseResources{}, err
 	}
 	return ReleaseResources{
@@ -466,12 +526,14 @@
 			CertificateIssuer: fmt.Sprintf("%s-public", env.Id),
 			Domain:            env.Domain,
 			AllocatePortAddr:  fmt.Sprintf("http://port-allocator.%s-ingress-public.svc.cluster.local/api/allocate", env.InfraName),
+			ReservePortAddr:   fmt.Sprintf("http://port-allocator.%s-ingress-public.svc.cluster.local/api/reserve", env.InfraName),
 		},
 		{
 			Name:             "Private",
 			IngressClass:     fmt.Sprintf("%s-ingress-private", env.Id),
 			Domain:           env.PrivateDomain,
 			AllocatePortAddr: fmt.Sprintf("http://port-allocator.%s-ingress-private.svc.cluster.local/api/allocate", env.Id),
+			ReservePortAddr:  fmt.Sprintf("http://port-allocator.%s-ingress-private.svc.cluster.local/api/reserve", env.Id),
 		},
 	}
 }
@@ -706,3 +768,71 @@
 	}
 	return cfg.LocalCharts, nil
 }
+
+func findPortFields(scm Schema) []string {
+	switch scm.Kind() {
+	case KindBoolean:
+		return []string{}
+	case KindInt:
+		return []string{}
+	case KindString:
+		return []string{}
+	case KindStruct:
+		ret := []string{}
+		for _, f := range scm.Fields() {
+			for _, p := range findPortFields(f.Schema) {
+				if p == "" {
+					ret = append(ret, f.Name)
+				} else {
+					ret = append(ret, fmt.Sprintf("%s.%s", f.Name, p))
+				}
+			}
+		}
+		return ret
+	case KindNetwork:
+		return []string{}
+	case KindAuth:
+		return []string{}
+	case KindSSHKey:
+		return []string{}
+	case KindNumber:
+		return []string{}
+	case KindArrayString:
+		return []string{}
+	case KindPort:
+		return []string{""}
+	default:
+		panic("MUST NOT REACH!")
+	}
+}
+
+func setPortFields(values map[string]any, ports map[string]reservePortResp) error {
+	for p, r := range ports {
+		if err := setPortField(values, p, r.Port); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func setPortField(values map[string]any, field string, port int) error {
+	f := strings.SplitN(field, ".", 2)
+	if len(f) == 2 {
+		var sub map[string]any
+		if s, ok := values[f[0]]; ok {
+			sub, ok = s.(map[string]any)
+			if !ok {
+				return fmt.Errorf("expected map")
+			}
+		} else {
+			sub = map[string]any{}
+			values[f[0]] = sub
+		}
+		if err := setPortField(sub, f[1], port); err != nil {
+			return err
+		}
+	} else {
+		values[f[0]] = port
+	}
+	return nil
+}