PortAllocator: Persist reservations

Speed up improve random port generation.
Update unit tests and add them to CI/CD.

Change-Id: Ia77f0b4cbccfdce86e6c3cedc68afe6796ca8bf4
diff --git a/Jenkinsfile b/Jenkinsfile
index 375983c..b800f63 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -13,7 +13,7 @@
 		}
     }
     stages {
-        stage('installer auth') {
+        stage('build/test') {
             steps {
 				container('golang') {
                 	dir('core/installer') {
@@ -26,6 +26,11 @@
                 		sh 'go build *.go'
                 		sh 'go test ./...'
 					}
+                    dir('core/port-allocator') {
+                        sh 'go mod tidy'
+                		sh 'go build *.go'
+                		sh 'go test ./...'
+					}
 				}
             }
         }
diff --git a/core/installer/soft/repoio_mem.go b/core/installer/soft/repoio_mem.go
new file mode 100644
index 0000000..f0ac1b6
--- /dev/null
+++ b/core/installer/soft/repoio_mem.go
@@ -0,0 +1,46 @@
+package soft
+
+import (
+	"sync"
+	"testing"
+)
+
+type mockRepoIO struct {
+	RepoFS
+	addr string
+	t    *testing.T
+	l    sync.Locker
+}
+
+func NewMockRepoIO(fs RepoFS, addr string, t *testing.T) RepoIO {
+	return &mockRepoIO{
+		RepoFS: fs,
+		addr:   addr,
+		t:      t,
+		l:      &sync.Mutex{},
+	}
+}
+
+func (r mockRepoIO) FullAddress() string {
+	return r.addr
+}
+
+func (r mockRepoIO) Pull() error {
+	r.t.Logf("Pull: %s", r.addr)
+	return nil
+}
+
+func (r mockRepoIO) CommitAndPush(message string, opts ...PushOption) (string, error) {
+	r.t.Logf("Commit and push: %s", message)
+	return "", nil
+}
+
+func (r mockRepoIO) Do(op DoFn, _ ...DoOption) (string, error) {
+	r.l.Lock()
+	defer r.l.Unlock()
+	msg, err := op(r)
+	if err != nil {
+		return "", err
+	}
+	return r.CommitAndPush(msg)
+}
diff --git a/core/installer/welcome/env_test.go b/core/installer/welcome/env_test.go
index e7e75a8..f41110d 100644
--- a/core/installer/welcome/env_test.go
+++ b/core/installer/welcome/env_test.go
@@ -260,7 +260,7 @@
 	jc := fakeJobCreator{t}
 	hf := fakeHelmFetcher{t}
 	lg := installer.GitRepositoryLocalChartGenerator{"foo", "bar"}
-	infraRepo := mockRepoIO{soft.NewBillyRepoFS(infraFS), "foo.bar", t, &sync.Mutex{}}
+	infraRepo := soft.NewMockRepoIO(soft.NewBillyRepoFS(infraFS), "foo.bar", t)
 	infraMgr, err := installer.NewInfraAppManager(infraRepo, nsCreator, hf, lg)
 	if err != nil {
 		t.Fatal(err)
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index fb5a151..6bc89e4 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -1,14 +1,15 @@
 package main
 
 import (
-	"crypto/rand"
 	"encoding/base64"
 	"encoding/json"
+	"errors"
 	"flag"
 	"fmt"
 	"io"
+	"io/fs"
 	"log"
-	"math/big"
+	"math/rand"
 	"net/http"
 	"os"
 	"strconv"
@@ -23,6 +24,8 @@
 
 const (
 	secretLength = 20
+	start        = 49152
+	end          = 65535
 )
 
 var port = flag.Int("port", 8080, "Port to listen on")
@@ -34,7 +37,7 @@
 
 type client interface {
 	ReservePort() (int, string, error)
-	ReleaseReservedPort(port int)
+	ReleaseReservedPort(port ...int)
 	AddPortForwarding(protocol string, port int, secret, dest string) error
 	RemovePortForwarding(protocol string, port int) error
 }
@@ -43,11 +46,13 @@
 	l                     sync.Locker
 	repo                  soft.RepoIO
 	path                  string
+	secretGenerator       SecretGenerator
 	minPreOpenPorts       int
 	preOpenPortsBatchSize int
 	preOpenPorts          []int
 	blocklist             map[int]struct{}
 	reserve               map[int]string
+	availablePorts        []int
 }
 
 func newRepoClient(
@@ -55,32 +60,46 @@
 	path string,
 	minPreOpenPorts int,
 	preOpenPortsBatchSize int,
+	secretGenerator SecretGenerator,
 ) (client, error) {
 	ret := &repoClient{
 		l:                     &sync.Mutex{},
 		repo:                  repo,
 		path:                  path,
+		secretGenerator:       secretGenerator,
 		minPreOpenPorts:       minPreOpenPorts,
 		preOpenPortsBatchSize: preOpenPortsBatchSize,
+		preOpenPorts:          []int{},
+		blocklist:             map[int]struct{}{},
+		reserve:               map[int]string{},
+		availablePorts:        []int{},
 	}
-	r, err := repo.Reader(fmt.Sprintf("%s-state.json", path))
+	st, err := ret.readState(repo)
 	if err != nil {
-		// TODO(gio): create empty file on init
-		return nil, err
-	}
-	defer r.Close()
-	var st state
-	if err := json.NewDecoder(r).Decode(&st); err != nil {
-		return nil, err
-	}
-	ret.preOpenPorts = st.PreOpenPorts
-	ret.blocklist = st.Blocklist
-	ret.reserve = map[int]string{}
-	if len(ret.preOpenPorts) < minPreOpenPorts {
-		if err := ret.preOpenNewPorts(); err != nil {
+		if !errors.Is(err, fs.ErrNotExist) {
 			return nil, err
 		}
+	} else {
+		ret.preOpenPorts = st.PreOpenPorts
+		ret.blocklist = st.Blocklist
+		ret.reserve = st.Reserve
 	}
+	for i := start; i < end; i++ {
+		if _, ok := ret.blocklist[i]; !ok {
+			ret.availablePorts = append(ret.availablePorts, i)
+		}
+	}
+	if err := ret.preOpenNewPorts(); err != nil {
+		return nil, err
+	}
+	var reservedPorts []int
+	for k := range ret.reserve {
+		reservedPorts = append(reservedPorts, k)
+	}
+	go func() {
+		time.Sleep(30 * time.Minute)
+		ret.ReleaseReservedPort(reservedPorts...)
+	}()
 	return ret, nil
 }
 
@@ -92,7 +111,7 @@
 	}
 	port := c.preOpenPorts[0]
 	c.preOpenPorts = c.preOpenPorts[1:]
-	secret, err := generateSecret()
+	secret, err := c.secretGenerator()
 	if err != nil {
 		return -1, "", err
 	}
@@ -100,16 +119,30 @@
 	return port, secret, nil
 }
 
-func (c *repoClient) ReleaseReservedPort(port int) {
+func (c *repoClient) ReleaseReservedPort(port ...int) {
+	if len(port) == 0 {
+		return
+	}
 	c.l.Lock()
 	defer c.l.Unlock()
-	delete(c.reserve, port)
-	c.preOpenPorts = append(c.preOpenPorts, port)
+	if _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
+		for _, p := range port {
+			delete(c.reserve, p)
+			c.preOpenPorts = append(c.preOpenPorts, p)
+		}
+		if err := c.writeState(fs); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("Released port reservations: %+v", port), nil
+	}); err != nil {
+		panic(err)
+	}
 }
 
 type state struct {
 	PreOpenPorts []int            `json:"preOpenPorts"`
 	Blocklist    map[int]struct{} `json:"blocklist"`
+	Reserve      map[int]string   `json:"reserve"`
 }
 
 func (c *repoClient) preOpenNewPorts() error {
@@ -120,26 +153,18 @@
 	}
 	var ports []int
 	for count := c.preOpenPortsBatchSize; count > 0; count-- {
-		generated := false
-		for i := 0; i < 3; i++ {
-			r, err := rand.Int(rand.Reader, big.NewInt(end-start))
-			if err != nil {
-				return err
-			}
-			p := start + int(r.Int64())
-			if _, ok := c.blocklist[p]; !ok {
-				generated = true
-				ports = append(ports, p)
-				c.preOpenPorts = append(c.preOpenPorts, p)
-				c.blocklist[p] = struct{}{}
-				break
-			}
-		}
-		if !generated {
+		if len(c.availablePorts) == 0 {
 			return fmt.Errorf("could not open new port")
 		}
+		r := rand.Intn(len(c.availablePorts))
+		p := c.availablePorts[r]
+		c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+		c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+		ports = append(ports, p)
+		c.preOpenPorts = append(c.preOpenPorts, p)
+		c.blocklist[p] = struct{}{}
 	}
-	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		if err := c.writeState(fs); err != nil {
 			return "", err
 		}
@@ -160,11 +185,15 @@
 			if err != nil {
 				return "", err
 			}
+			fmt.Printf("%+v\n", tcp)
+			fmt.Printf("%+v\n", udp)
 			for _, p := range ports {
 				ps := strconv.Itoa(p)
 				tcp[ps] = p
 				udp[ps] = p
 			}
+			fmt.Printf("%+v\n", tcp)
+			fmt.Printf("%+v\n", udp)
 			if err := c.writeRelease(fs, rel); err != nil {
 				return "", err
 			}
@@ -172,15 +201,15 @@
 		fmt.Printf("Pre opened new ports: %+v\n", ports)
 		return "preopen new ports", nil
 	})
+	return err
 }
 
 func (c *repoClient) AddPortForwarding(protocol string, port int, secret, dest string) error {
+	protocol = strings.ToLower(protocol)
 	defer func() {
-		go func() {
-			if err := c.preOpenNewPorts(); err != nil {
-				panic(err)
-			}
-		}()
+		if err := c.preOpenNewPorts(); err != nil {
+			panic(err)
+		}
 	}()
 	c.l.Lock()
 	defer c.l.Unlock()
@@ -188,7 +217,7 @@
 		return fmt.Errorf("wrong secret")
 	}
 	delete(c.reserve, port)
-	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		if err := c.writeState(fs); err != nil {
 			return "", err
 		}
@@ -218,12 +247,14 @@
 		}
 		return fmt.Sprintf("ingress: port %s map %d %s", protocol, port, dest), nil
 	})
+	return err
 }
 
 func (c *repoClient) RemovePortForwarding(protocol string, port int) error {
+	protocol = strings.ToLower(protocol)
 	c.l.Lock()
 	defer c.l.Unlock()
-	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+	_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
 		rel, err := c.readRelease(fs)
 		if err != nil {
 			return "", err
@@ -273,6 +304,20 @@
 		}
 		return fmt.Sprintf("ingress: remove %s port map %d", protocol, port), nil
 	})
+	return err
+}
+
+func (c *repoClient) readState(fs soft.RepoFS) (state, error) {
+	r, err := fs.Reader(fmt.Sprintf("%s-state.json", c.path))
+	if err != nil {
+		return state{}, err
+	}
+	defer r.Close()
+	var ret state
+	if err := json.NewDecoder(r).Decode(&ret); err != nil {
+		return state{}, err
+	}
+	return ret, err
 }
 
 func (c *repoClient) writeState(fs soft.RepoFS) error {
@@ -281,7 +326,7 @@
 		return err
 	}
 	defer w.Close()
-	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist}); err != nil {
+	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist, c.reserve}); err != nil {
 		return err
 	}
 	return err
@@ -429,62 +474,6 @@
 	return nil
 }
 
-const start = 49152
-const end = 65535
-
-func updateNodePorts(rel map[string]any, protocol string, pm map[string]any) error {
-	spec, ok := rel["spec"]
-	if !ok {
-		return fmt.Errorf("spec not found")
-	}
-	specM, ok := spec.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec is not a map")
-	}
-	values, ok := specM["values"]
-	if !ok {
-		return fmt.Errorf("spec.values not found")
-	}
-	valuesM, ok := values.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values is not a map")
-	}
-	controller, ok := valuesM["controller"]
-	if !ok {
-		return fmt.Errorf("spec.values.controller not found")
-	}
-	controllerM, ok := controller.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values.controller is not a map")
-	}
-	service, ok := controllerM["service"]
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service not found")
-	}
-	serviceM, ok := service.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service is not a map")
-	}
-	nodePorts, ok := serviceM["nodePorts"]
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service.nodePorts not found")
-	}
-	nodePortsM, ok := nodePorts.(map[string]any)
-	if !ok {
-		return fmt.Errorf("spec.values.controller.service.nodePorts is not a map")
-	}
-	npm := map[string]any{}
-	for p, _ := range pm {
-		if v, err := strconv.Atoi(p); err != nil {
-			return err
-		} else {
-			npm[p] = v
-		}
-	}
-	nodePortsM[protocol] = npm
-	return nil
-}
-
 func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodPost {
 		http.Error(w, "only post method is supported", http.StatusBadRequest)
@@ -570,6 +559,8 @@
 	return soft.NewRepoIO(repo, signer)
 }
 
+type SecretGenerator func() (string, error)
+
 func generateSecret() (string, error) {
 	b := make([]byte, secretLength)
 	_, err := rand.Read(b)
@@ -590,6 +581,7 @@
 		*ingressNginxPath,
 		*minPreOpenPorts,
 		*preOpenPortsBatchSize,
+		generateSecret,
 	)
 	if err != nil {
 		log.Fatal(err)
diff --git a/core/port-allocator/main_test.go b/core/port-allocator/main_test.go
index caf2d51..93ae574 100644
--- a/core/port-allocator/main_test.go
+++ b/core/port-allocator/main_test.go
@@ -1,166 +1,79 @@
 package main
 
 import (
-	"bytes"
-	"encoding/json"
-	"io"
-	"net/http"
-	"os"
+	"fmt"
 	"reflect"
+	"strconv"
 	"testing"
+
+	"github.com/giolekva/pcloud/core/installer/soft"
+
+	"github.com/go-git/go-billy/v5/memfs"
 )
 
-type fakeClient struct {
-	contents map[string]any
-}
-
-func (c *fakeClient) ReadRelease() (map[string]any, error) {
-	return c.contents, nil
-}
-
-func (c *fakeClient) WriteRelease(rel map[string]any, meta string) error {
-	c.contents = rel
-	return nil
+func fakeSecretGenerator(secret string) SecretGenerator {
+	return func() (string, error) {
+		return secret, nil
+	}
 }
 
 func TestAllocateSucceeds(t *testing.T) {
-	c := &fakeClient{map[string]any{
-		"spec": map[string]any{
-			"values": map[string]any{},
-		},
-	}}
-	s := newServer(8080, c) // TODO(gio): run using unix socket
-	go func() {
-		s.Start()
-	}()
-	defer s.Close()
-	var buf bytes.Buffer
-	req := allocateReq{
-		Protocol:      "TCP",
-		SourcePort:    22,
-		TargetService: "foo",
-		TargetPort:    2222,
-	}
-	if err := json.NewEncoder(&buf).Encode(req); err != nil {
-		t.Fatal(err)
-	}
-	resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if resp.StatusCode != http.StatusOK {
-		io.Copy(os.Stdout, resp.Body)
-		t.Fatalf("Expected %d, got %d", http.StatusOK, resp.StatusCode)
-	}
-	expected := map[string]any{
+	ingressPath := "/ingress.yaml"
+	fs := memfs.New()
+	repo := soft.NewMockRepoIO(soft.NewBillyRepoFS(fs), "foo.bar", t)
+	if err := soft.WriteYaml(repo, ingressPath, map[string]any{
 		"spec": map[string]any{
 			"values": map[string]any{
-				"tcp": map[string]any{
-					"22": "foo:2222",
+				"controller": map[string]any{
+					"service": map[string]any{
+						"type": "ClusterIP",
+					},
 				},
+				"tcp": map[string]any{},
 				"udp": map[string]any{},
 			},
 		},
+	}); err != nil {
+		t.Fatal(err)
 	}
-	if !reflect.DeepEqual(expected, c.contents) {
-		t.Fatalf("Expected %v, got %v", expected, c.contents)
+	c, err := newRepoClient(repo, ingressPath, 5, 10, fakeSecretGenerator("test"))
+	if err != nil {
+		t.Fatal(err)
 	}
-}
-
-func TestAllocateConflicts(t *testing.T) {
-	c := &fakeClient{map[string]any{
+	tcp := map[string]any{}
+	udp := map[string]any{}
+	expected := map[string]any{
 		"spec": map[string]any{
 			"values": map[string]any{
-				"tcp": map[string]any{
-					"22": "foo:2222",
+				"controller": map[string]any{
+					"service": map[string]any{
+						"type": "ClusterIP",
+					},
 				},
+				"tcp": tcp,
+				"udp": udp,
 			},
 		},
-	}}
-	s := newServer(8080, c) // TODO(gio): run using unix socket
-	go func() {
-		s.Start()
-	}()
-	defer s.Close()
-	var buf bytes.Buffer
-	req := allocateReq{
-		Protocol:      "TCP",
-		SourcePort:    22,
-		TargetService: "foo",
-		TargetPort:    2222,
 	}
-	if err := json.NewEncoder(&buf).Encode(req); err != nil {
+	for i := 0; i < 500; i++ {
+		for _, protocol := range []string{"tcp", "udp"} {
+			port, secret, err := c.ReservePort()
+			if err != nil {
+				t.Fatal(err)
+			}
+			target := fmt.Sprintf("%s/bar:%d", protocol, port)
+			if err := c.AddPortForwarding("tcp", port, secret, target); err != nil {
+				t.Fatal(err)
+			}
+			tcp[strconv.Itoa(port)] = target
+		}
+	}
+	var actual map[string]any
+	if err := soft.ReadYaml(repo, ingressPath, &actual); err != nil {
 		t.Fatal(err)
 	}
-	resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if resp.StatusCode != http.StatusConflict {
-		io.Copy(os.Stdout, resp.Body)
-		t.Fatalf("Expected %d, got %d", http.StatusConflict, resp.StatusCode)
-	}
-}
-
-func TestAllocate80Taken(t *testing.T) {
-	c := &fakeClient{map[string]any{
-		"spec": map[string]any{
-			"values": map[string]any{},
-		},
-	}}
-	s := newServer(8080, c) // TODO(gio): run using unix socket
-	go func() {
-		s.Start()
-	}()
-	defer s.Close()
-	var buf bytes.Buffer
-	req := allocateReq{
-		Protocol:      "TCP",
-		SourcePort:    80,
-		TargetService: "foo",
-		TargetPort:    2222,
-	}
-	if err := json.NewEncoder(&buf).Encode(req); err != nil {
-		t.Fatal(err)
-	}
-	resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if resp.StatusCode != http.StatusConflict {
-		io.Copy(os.Stdout, resp.Body)
-		t.Fatalf("Expected %d, got %d", http.StatusConflict, resp.StatusCode)
-	}
-}
-
-func TestAllocate443Taken(t *testing.T) {
-	c := &fakeClient{map[string]any{
-		"spec": map[string]any{
-			"values": map[string]any{},
-		},
-	}}
-	s := newServer(8080, c) // TODO(gio): run using unix socket
-	go func() {
-		s.Start()
-	}()
-	defer s.Close()
-	var buf bytes.Buffer
-	req := allocateReq{
-		Protocol:      "TCP",
-		SourcePort:    443,
-		TargetService: "foo",
-		TargetPort:    2222,
-	}
-	if err := json.NewEncoder(&buf).Encode(req); err != nil {
-		t.Fatal(err)
-	}
-	resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if resp.StatusCode != http.StatusConflict {
-		io.Copy(os.Stdout, resp.Body)
-		t.Fatalf("Expected %d, got %d", http.StatusConflict, resp.StatusCode)
+	if !reflect.DeepEqual(expected, actual) {
+		t.Fatalf("Expected %v, got %v", expected, actual)
 	}
 }