PortAllocator: Persist reservations

Speed up and improve random port generation.
Update unit tests and add them to CI/CD.

Change-Id: Ia77f0b4cbccfdce86e6c3cedc68afe6796ca8bf4
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index fb5a151..6bc89e4 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -1,14 +1,15 @@
 package main
 
 import (
-	"crypto/rand"
 	"encoding/base64"
 	"encoding/json"
+	"errors"
 	"flag"
 	"fmt"
 	"io"
+	"io/fs"
 	"log"
-	"math/big"
+	"math/rand"
 	"net/http"
 	"os"
 	"strconv"
@@ -23,6 +24,8 @@
 
 const (
 	secretLength = 20
+	start        = 49152
+	end          = 65535
 )
 
 var port = flag.Int("port", 8080, "Port to listen on")
@@ -34,7 +37,7 @@
 
 type client interface {
 	ReservePort() (int, string, error)
-	ReleaseReservedPort(port int)
+	ReleaseReservedPort(port ...int)
 	AddPortForwarding(protocol string, port int, secret, dest string) error
 	RemovePortForwarding(protocol string, port int) error
 }
@@ -43,11 +46,13 @@
 	l                     sync.Locker
 	repo                  soft.RepoIO
 	path                  string
+	secretGenerator       SecretGenerator
 	minPreOpenPorts       int
 	preOpenPortsBatchSize int
 	preOpenPorts          []int
 	blocklist             map[int]struct{}
 	reserve               map[int]string
+	availablePorts        []int
 }
 
 func newRepoClient(
@@ -55,32 +60,46 @@
 	path string,
 	minPreOpenPorts int,
 	preOpenPortsBatchSize int,
+	secretGenerator SecretGenerator,
 ) (client, error) {
 	ret := &repoClient{
 		l:                     &sync.Mutex{},
 		repo:                  repo,
 		path:                  path,
+		secretGenerator:       secretGenerator,
 		minPreOpenPorts:       minPreOpenPorts,
 		preOpenPortsBatchSize: preOpenPortsBatchSize,
+		preOpenPorts:          []int{},
+		blocklist:             map[int]struct{}{},
+		reserve:               map[int]string{},
+		availablePorts:        []int{},
 	}
-	r, err := repo.Reader(fmt.Sprintf("%s-state.json", path))
+	st, err := ret.readState(repo)
 	if err != nil {
-		// TODO(gio): create empty file on init
-		return nil, err
-	}
-	defer r.Close()
-	var st state
-	if err := json.NewDecoder(r).Decode(&st); err != nil {
-		return nil, err
-	}
-	ret.preOpenPorts = st.PreOpenPorts
-	ret.blocklist = st.Blocklist
-	ret.reserve = map[int]string{}
-	if len(ret.preOpenPorts) < minPreOpenPorts {
-		if err := ret.preOpenNewPorts(); err != nil {
+		if !errors.Is(err, fs.ErrNotExist) {
 			return nil, err
 		}
+	} else {
+		ret.preOpenPorts = st.PreOpenPorts
+		ret.blocklist = st.Blocklist
+		ret.reserve = st.Reserve
 	}
+	for i := start; i < end; i++ {
+		if _, ok := ret.blocklist[i]; !ok {
+			ret.availablePorts = append(ret.availablePorts, i)
+		}
+	}
+	if err := ret.preOpenNewPorts(); err != nil {
+		return nil, err
+	}
+	var reservedPorts []int
+	for k := range ret.reserve {
+		reservedPorts = append(reservedPorts, k)
+	}
+	go func() {
+		time.Sleep(30 * time.Minute)
+		ret.ReleaseReservedPort(reservedPorts...)
+	}()
 	return ret, nil
 }
 
@@ -92,7 +111,7 @@
 	}
 	port := c.preOpenPorts[0]
 	c.preOpenPorts = c.preOpenPorts[1:]
-	secret, err := generateSecret()
+	secret, err := c.secretGenerator()
 	if err != nil {
 		return -1, "", err
 	}
@@ -100,16 +119,30 @@
 	return port, secret, nil
 }
 
-func (c *repoClient) ReleaseReservedPort(port int) {
+func (c *repoClient) ReleaseReservedPort(port ...int) {
+	if len(port) == 0 {
+		return
+	}
 	c.l.Lock()
 	defer c.l.Unlock()
-	delete(c.reserve, port)
-	c.preOpenPorts = append(c.preOpenPorts, port)
+	if _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
+		for _, p := range port {
+			delete(c.reserve, p)
+			c.preOpenPorts = append(c.preOpenPorts, p)
+		}
+		if err := c.writeState(fs); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("Released port reservations: %+v", port), nil
+	}); err != nil {
+		panic(err)
+	}
 }
 
 type state struct {
 	PreOpenPorts []int            `json:"preOpenPorts"`
 	Blocklist    map[int]struct{} `json:"blocklist"`
+	Reserve      map[int]string   `json:"reserve"`
 }
 
 func (c *repoClient) preOpenNewPorts() error {
@@ -120,26 +153,18 @@
 	}
 	var ports []int
 	for count := c.preOpenPortsBatchSize; count > 0; count-- {
-		generated := false
-		for i := 0; i < 3; i++ {
-			r, err := rand.Int(rand.Reader, big.NewInt(end-start))
-			if err != nil {
-				return err
-			}
-			p := start + int(r.Int64())
-			if _, ok := c.blocklist[p]; !ok {
-				generated = true
-				ports = append(ports, p)
-				c.preOpenPorts = append(c.preOpenPorts, p)
-				c.blocklist[p] = struct{}{}
-				break
-			}
-		}
-		if !generated {
+		if len(c.availablePorts) == 0 {
 			return fmt.Errorf("could not open new port")
 		}
+		r := rand.Intn(len(c.availablePorts))
+		p := c.availablePorts[r]
+		c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+		c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+		ports = append(ports, p)
+		c.preOpenPorts = append(c.preOpenPorts, p)
+		c.blocklist[p] = struct{}{}
 	}
-	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		if err := c.writeState(fs); err != nil {
 			return "", err
 		}
@@ -160,11 +185,15 @@
 			if err != nil {
 				return "", err
 			}
+			fmt.Printf("%+v\n", tcp)
+			fmt.Printf("%+v\n", udp)
 			for _, p := range ports {
 				ps := strconv.Itoa(p)
 				tcp[ps] = p
 				udp[ps] = p
 			}
+			fmt.Printf("%+v\n", tcp)
+			fmt.Printf("%+v\n", udp)
 			if err := c.writeRelease(fs, rel); err != nil {
 				return "", err
 			}
@@ -172,15 +201,15 @@
 		fmt.Printf("Pre opened new ports: %+v\n", ports)
 		return "preopen new ports", nil
 	})
+	return err
 }
 
 func (c *repoClient) AddPortForwarding(protocol string, port int, secret, dest string) error {
+	protocol = strings.ToLower(protocol)
 	defer func() {
-		go func() {
-			if err := c.preOpenNewPorts(); err != nil {
-				panic(err)
-			}
-		}()
+		if err := c.preOpenNewPorts(); err != nil {
+			panic(err)
+		}
 	}()
 	c.l.Lock()
 	defer c.l.Unlock()
@@ -188,7 +217,7 @@
 		return fmt.Errorf("wrong secret")
 	}
 	delete(c.reserve, port)
-	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		if err := c.writeState(fs); err != nil {
 			return "", err
 		}
@@ -218,12 +247,14 @@
 		}
 		return fmt.Sprintf("ingress: port %s map %d %s", protocol, port, dest), nil
 	})
+	return err
 }
 
 func (c *repoClient) RemovePortForwarding(protocol string, port int) error {
+	protocol = strings.ToLower(protocol)
 	c.l.Lock()
 	defer c.l.Unlock()
-	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		rel, err := c.readRelease(fs)
 		if err != nil {
 			return "", err
@@ -273,6 +304,20 @@
 		}
 		return fmt.Sprintf("ingress: remove %s port map %d", protocol, port), nil
 	})
+	return err
+}
+
+func (c *repoClient) readState(fs soft.RepoFS) (state, error) {
+	r, err := fs.Reader(fmt.Sprintf("%s-state.json", c.path))
+	if err != nil {
+		return state{}, err
+	}
+	defer r.Close()
+	var ret state
+	if err := json.NewDecoder(r).Decode(&ret); err != nil {
+		return state{}, err
+	}
+	return ret, err
 }
 
 func (c *repoClient) writeState(fs soft.RepoFS) error {
@@ -281,7 +326,7 @@
 		return err
 	}
 	defer w.Close()
-	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist}); err != nil {
+	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist, c.reserve}); err != nil {
 		return err
 	}
 	return err
@@ -429,62 +474,6 @@
 	return nil
 }
 
-const start = 49152
-const end = 65535
-
-func updateNodePorts(rel map[string]any, protocol string, pm map[string]any) error {
-	spec, ok := rel["spec"]
-	if !ok {
-		return fmt.Errorf("spec not found")
-	}
-	specM, ok := spec.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec is not a map")
-	}
-	values, ok := specM["values"]
-	if !ok {
-		return fmt.Errorf("spec.values not found")
-	}
-	valuesM, ok := values.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values is not a map")
-	}
-	controller, ok := valuesM["controller"]
-	if !ok {
-		return fmt.Errorf("spec.values.controller not found")
-	}
-	controllerM, ok := controller.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values.controller is not a map")
-	}
-	service, ok := controllerM["service"]
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service not found")
-	}
-	serviceM, ok := service.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service is not a map")
-	}
-	nodePorts, ok := serviceM["nodePorts"]
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service.nodePorts not found")
-	}
-	nodePortsM, ok := nodePorts.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service.nodePorts is not a map")
-	}
-	npm := map[string]any{}
-	for p, _ := range pm {
-		if v, err := strconv.Atoi(p); err != nil {
-			return err
-		} else {
-			npm[p] = v
-		}
-	}
-	nodePortsM[protocol] = npm
-	return nil
-}
-
 func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodPost {
 		http.Error(w, "only post method is supported", http.StatusBadRequest)
@@ -570,6 +559,8 @@
 	return soft.NewRepoIO(repo, signer)
 }
 
+type SecretGenerator func() (string, error)
+
 func generateSecret() (string, error) {
 	b := make([]byte, secretLength)
 	_, err := rand.Read(b)
@@ -590,6 +581,7 @@
 		*ingressNginxPath,
 		*minPreOpenPorts,
 		*preOpenPortsBatchSize,
+		generateSecret,
 	)
 	if err != nil {
 		log.Fatal(err)