IngressPublic: use Deployment with NodePort service

Spread replicas on different hosts.
Now PortAllocator allocates node ports as well.

Change-Id: Ia16cb24673fc6c61185f21ab30fde8964157aa4c
diff --git a/core/installer/values-tmpl/ingress-public.cue b/core/installer/values-tmpl/ingress-public.cue
index 619f15a..e675fc9 100644
--- a/core/installer/values-tmpl/ingress-public.cue
+++ b/core/installer/values-tmpl/ingress-public.cue
@@ -46,10 +46,41 @@
 		values: {
 			fullnameOverride: "\(global.pcloudEnvName)-ingress-public"
 			controller: {
-				kind: "DaemonSet"
-				hostNetwork: true
-				hostPort: enabled: true
-				service: enabled: false
+				kind: "Deployment"
+				replicaCount: 1 // TODO(gio): configurable
+				topologySpreadConstraints: [{
+					labelSelector: {
+						matchLabels: {
+							"app.kubernetes.io/instance": "ingress-public"
+						}
+					}
+					maxSkew: 1
+					topologyKey: "kubernetes.io/hostname"
+					whenUnsatisfiable: "DoNotSchedule"
+				}]
+				hostNetwork: false
+				hostPort: enabled: false
+				updateStrategy: {
+					type: "RollingUpdate"
+					rollingUpdate: {
+						maxSurge: "100%"
+						maxUnavailable: "30%"
+					}
+				}
+				service: {
+					enabled: true
+					type: "NodePort"
+					nodePorts: {
+						http: 80
+						https: 443
+						tcp: {
+							"53": 53
+						}
+						udp: {
+							"53": 53
+						}
+					}
+				}
 				ingressClassByName: true
 				ingressClassResource: {
 					name: networks.public.ingressClass
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index 6c7d361..8c14183 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -29,41 +29,268 @@
 var repoAddr = flag.String("repo-addr", "", "Git repository address where Helm releases are stored")
 var sshKey = flag.String("ssh-key", "", "Path to SHH key used to connect with Git repository")
 var ingressNginxPath = flag.String("ingress-nginx-path", "", "Path to the ingress-nginx Helm release")
+var minPreOpenPorts = flag.Int("min-pre-open-ports", 5, "Minimum number of pre-open ports to keep in reserve")
+var preOpenPortsBatchSize = flag.Int("pre-open-ports-batch-size", 10, "Number of new ports to open at a time")
 
 type client interface {
-	ReadRelease() (map[string]any, error)
-	WriteRelease(rel map[string]any, meta string) error
+	ReservePort() (int, string, error)
+	ReleaseReservedPort(port int)
+	AddPortForwarding(protocol string, port int, secret, dest string) error
+	RemovePortForwarding(protocol string, port int) error
 }
 
 type repoClient struct {
-	repo soft.RepoIO
-	path string
+	l                     sync.Locker
+	repo                  soft.RepoIO
+	path                  string
+	minPreOpenPorts       int
+	preOpenPortsBatchSize int
+	preOpenPorts          []int
+	blocklist             map[int]struct{}
+	reserve               map[int]string
 }
 
-func (c *repoClient) ReadRelease() (map[string]any, error) {
-	if err := c.repo.Pull(); err != nil {
+func newRepoClient(
+	repo soft.RepoIO,
+	path string,
+	minPreOpenPorts int,
+	preOpenPortsBatchSize int,
+) (client, error) {
+	ret := &repoClient{
+		l:                     &sync.Mutex{},
+		repo:                  repo,
+		path:                  path,
+		minPreOpenPorts:       minPreOpenPorts,
+		preOpenPortsBatchSize: preOpenPortsBatchSize,
+	}
+	r, err := repo.Reader(fmt.Sprintf("%s-state.json", path))
+	if err != nil {
+		// TODO(gio): create empty file on init
 		return nil, err
 	}
-	ingressRel := map[string]any{}
-	if err := soft.ReadYaml(c.repo, c.path, &ingressRel); err != nil {
+	defer r.Close()
+	var st state
+	if err := json.NewDecoder(r).Decode(&st); err != nil {
 		return nil, err
 	}
-	return ingressRel, nil
+	ret.preOpenPorts = st.PreOpenPorts
+	ret.blocklist = st.Blocklist
+	ret.reserve = map[int]string{}
+	if len(ret.preOpenPorts) < minPreOpenPorts {
+		if err := ret.preOpenNewPorts(); err != nil {
+			return nil, err
+		}
+	}
+	return ret, nil
 }
 
-func (c *repoClient) WriteRelease(rel map[string]any, meta string) error {
-	if err := soft.WriteYaml(c.repo, c.path, rel); err != nil {
+func (c *repoClient) ReservePort() (int, string, error) {
+	c.l.Lock()
+	defer c.l.Unlock()
+	if len(c.preOpenPorts) == 0 {
+		return -1, "", fmt.Errorf("no pre-open ports are available")
+	}
+	port := c.preOpenPorts[0]
+	c.preOpenPorts = c.preOpenPorts[1:]
+	secret, err := generateSecret()
+	if err != nil {
+		return -1, "", err
+	}
+	c.reserve[port] = secret
+	return port, secret, nil
+}
+
+func (c *repoClient) ReleaseReservedPort(port int) {
+	c.l.Lock()
+	defer c.l.Unlock()
+	delete(c.reserve, port)
+	c.preOpenPorts = append(c.preOpenPorts, port)
+}
+
+type state struct {
+	PreOpenPorts []int            `json:"preOpenPorts"`
+	Blocklist    map[int]struct{} `json:"blocklist"`
+}
+
+func (c *repoClient) preOpenNewPorts() error {
+	c.l.Lock()
+	defer c.l.Unlock()
+	if len(c.preOpenPorts) >= c.minPreOpenPorts {
+		return nil
+	}
+	var ports []int
+	for count := c.preOpenPortsBatchSize; count > 0; count-- {
+		generated := false
+		for i := 0; i < 3; i++ {
+			r, err := rand.Int(rand.Reader, big.NewInt(end-start))
+			if err != nil {
+				return err
+			}
+			p := start + int(r.Int64())
+			if _, ok := c.blocklist[p]; !ok {
+				generated = true
+				ports = append(ports, p)
+				c.preOpenPorts = append(c.preOpenPorts, p)
+				c.blocklist[p] = struct{}{}
+				break
+			}
+		}
+		if !generated {
+			return fmt.Errorf("could not open new port")
+		}
+	}
+	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := c.writeState(fs); err != nil {
+			return "", err
+		}
+		rel, err := c.readRelease(fs)
+		if err != nil {
+			return "", err
+		}
+		tcp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
+		if err != nil {
+			return "", err
+		}
+		udp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.udp")
+		if err != nil {
+			return "", err
+		}
+		for _, p := range ports {
+			ps := strconv.Itoa(p)
+			tcp[ps] = p
+			udp[ps] = p
+		}
+		if err := c.writeRelease(fs, rel); err != nil {
+			return "", err
+		}
+		fmt.Printf("Pre opened new ports: %s\n", ports)
+		return "preopen new ports", nil
+	})
+}
+
+func (c *repoClient) AddPortForwarding(protocol string, port int, secret, dest string) error {
+	defer func() {
+		go func() {
+			if err := c.preOpenNewPorts(); err != nil {
+				panic(err)
+			}
+		}()
+	}()
+	c.l.Lock()
+	defer c.l.Unlock()
+	if sec, ok := c.reserve[port]; !ok || sec != secret {
+		return fmt.Errorf("wrong secret")
+	}
+	delete(c.reserve, port)
+	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+		if err := c.writeState(fs); err != nil {
+			return "", err
+		}
+		rel, err := c.readRelease(fs)
+		if err != nil {
+			return "", err
+		}
+		portStr := strconv.Itoa(port)
+		switch protocol {
+		case "tcp":
+			tcp, err := extractPorts(rel, "spec.values.tcp")
+			if err != nil {
+				return "", err
+			}
+			tcp[portStr] = dest
+		case "udp":
+			udp, err := extractPorts(rel, "spec.values.udp")
+			if err != nil {
+				return "", err
+			}
+			udp[portStr] = dest
+		default:
+			panic("MUST NOT REACH")
+		}
+		if err := c.writeRelease(fs, rel); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("ingress: port %s map %d %s", protocol, port, dest), nil
+	})
+}
+
+func (c *repoClient) RemovePortForwarding(protocol string, port int) error {
+	c.l.Lock()
+	defer c.l.Unlock()
+	return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+		rel, err := c.readRelease(fs)
+		if err != nil {
+			return "", err
+		}
+		switch protocol {
+		case "tcp":
+			tcp, err := extractPorts(rel, "spec.values.tcp")
+			if err != nil {
+				return "", err
+			}
+			if err := removePort(tcp, port); err != nil {
+				return "", err
+			}
+		case "udp":
+			udp, err := extractPorts(rel, "spec.values.udp")
+			if err != nil {
+				return "", err
+			}
+			if err := removePort(udp, port); err != nil {
+				return "", err
+			}
+		default:
+			panic("MUST NOT REACH")
+		}
+		svcTCP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
+		if err != nil {
+			return "", err
+		}
+		svcUDP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.udp")
+		if err != nil {
+			return "", err
+		}
+		if err := removePort(svcTCP, port); err != nil {
+			return "", err
+		}
+		if err := removePort(svcUDP, port); err != nil {
+			return "", err
+		}
+		if err := c.writeRelease(fs, rel); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("ingress: remove %s port map %d", protocol, port), nil
+	})
+}
+
+func (c *repoClient) writeState(fs soft.RepoFS) error {
+	w, err := fs.Writer(fmt.Sprintf("%s-state.json", c.path))
+	if err != nil {
 		return err
 	}
-	return c.repo.CommitAndPush(meta)
+	defer w.Close()
+	if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist}); err != nil {
+		return err
+	}
+	return err
+}
+
+func (c *repoClient) readRelease(fs soft.RepoFS) (map[string]any, error) {
+	ret := map[string]any{}
+	if err := soft.ReadYaml(fs, c.path, &ret); err != nil {
+		return nil, err
+	}
+	return ret, nil
+}
+
+func (c *repoClient) writeRelease(fs soft.RepoFS, rel map[string]any) error {
+	return soft.WriteYaml(fs, c.path, rel)
 }
 
 type server struct {
-	l       sync.Locker
-	s       *http.Server
-	r       *http.ServeMux
-	client  client
-	reserve map[int]string
+	s      *http.Server
+	r      *http.ServeMux
+	client client
 }
 
 func newServer(port int, client client) *server {
@@ -72,7 +299,7 @@
 		Addr:    fmt.Sprintf(":%d", port),
 		Handler: r,
 	}
-	return &server{&sync.Mutex{}, s, r, client, make(map[int]string)}
+	return &server{s, r, client}
 }
 
 func (s *server) Start() error {
@@ -133,57 +360,34 @@
 	Secret string `json:"secret"`
 }
 
-func extractPorts(rel map[string]any) (map[string]any, map[string]any, error) {
-	spec, ok := rel["spec"]
-	if !ok {
-		return nil, nil, fmt.Errorf("spec not found")
+func extractPorts(data map[string]any, path string) (map[string]any, error) {
+	for _, i := range strings.Split(path, ".") {
+		val, ok := data[i]
+		if !ok {
+			return nil, fmt.Errorf("%s not found", i)
+		}
+		valM, ok := val.(map[string]any)
+		if !ok {
+			return nil, fmt.Errorf("%s is not a map", i)
+		}
+		data = valM
 	}
-	specM, ok := spec.(map[string]any)
-	if !ok {
-		return nil, nil, fmt.Errorf("spec is not a map")
-	}
-	values, ok := specM["values"]
-	if !ok {
-		return nil, nil, fmt.Errorf("spec.values not found")
-	}
-	valuesM, ok := values.(map[string]any)
-	if !ok {
-		return nil, nil, fmt.Errorf("spec.values is not a map")
-	}
-	tcp, ok := valuesM["tcp"]
-	if !ok {
-		tcp = map[string]any{}
-		valuesM["tcp"] = tcp
-	}
-	udp, ok := valuesM["udp"]
-	if !ok {
-		udp = map[string]any{}
-		valuesM["udp"] = udp
-	}
-	tcpM, ok := tcp.(map[string]any)
-	if !ok {
-		return nil, nil, fmt.Errorf("spec.values.tcp is not a map")
-	}
-	udpM, ok := udp.(map[string]any)
-	if !ok {
-		return nil, nil, fmt.Errorf("spec.values.udp is not a map")
-	}
-	return tcpM, udpM, nil
+	return data, nil
 }
 
-func addPort(pm map[string]any, req allocateReq) error {
-	sourcePortStr := strconv.Itoa(req.SourcePort)
-	if _, ok := pm[sourcePortStr]; ok || req.SourcePort == 80 || req.SourcePort == 443 || req.SourcePort == 22 {
-		return fmt.Errorf("port %d is already taken", req.SourcePort)
+func addPort(pm map[string]any, sourcePort int, targetService string, targetPort int) error {
+	sourcePortStr := strconv.Itoa(sourcePort)
+	if _, ok := pm[sourcePortStr]; ok || sourcePort == 80 || sourcePort == 443 || sourcePort == 22 {
+		return fmt.Errorf("port %d is already taken", sourcePort)
 	}
-	pm[sourcePortStr] = fmt.Sprintf("%s:%d", req.TargetService, req.TargetPort)
+	pm[sourcePortStr] = fmt.Sprintf("%s:%d", targetService, targetPort)
 	return nil
 }
 
-func removePort(pm map[string]any, req removeReq) error {
-	sourcePortStr := strconv.Itoa(req.SourcePort)
+func removePort(pm map[string]any, port int) error {
+	sourcePortStr := strconv.Itoa(port)
 	if _, ok := pm[sourcePortStr]; !ok {
-		return fmt.Errorf("port %d is not open to remove", req.SourcePort)
+		return fmt.Errorf("port %d is not open to remove", port)
 	}
 	delete(pm, sourcePortStr)
 	return nil
@@ -192,21 +396,57 @@
 const start = 49152
 const end = 65535
 
-func reservePort(pm map[string]struct{}, reserve map[int]string) (int, error) {
-	for i := 0; i < 3; i++ {
-		r, err := rand.Int(rand.Reader, big.NewInt(end-start))
-		if err != nil {
-			return -1, err
-		}
-		p := start + int(r.Int64())
-		ps := strconv.Itoa(p)
-		if _, ok := pm[ps]; !ok {
-			if _, ok := reserve[p]; !ok {
-				return p, nil
-			}
+func updateNodePorts(rel map[string]any, protocol string, pm map[string]any) error {
+	spec, ok := rel["spec"]
+	if !ok {
+		return fmt.Errorf("spec not found")
+	}
+	specM, ok := spec.(map[string]any)
+	if !ok {
+		return fmt.Errorf("spec is not a map")
+	}
+	values, ok := specM["values"]
+	if !ok {
+		return fmt.Errorf("spec.values not found")
+	}
+	valuesM, ok := values.(map[string]any)
+	if !ok {
+		return fmt.Errorf("spec.values is not a map")
+	}
+	controller, ok := valuesM["controller"]
+	if !ok {
+		return fmt.Errorf("spec.values.controller not found")
+	}
+	controllerM, ok := controller.(map[string]any)
+	if !ok {
+		return fmt.Errorf("spec.values.controller is not a map")
+	}
+	service, ok := controllerM["service"]
+	if !ok {
+		return fmt.Errorf("spec.values.controller.service not found")
+	}
+	serviceM, ok := service.(map[string]any)
+	if !ok {
+		return fmt.Errorf("spec.values.controller.service is not a map")
+	}
+	nodePorts, ok := serviceM["nodePorts"]
+	if !ok {
+		return fmt.Errorf("spec.values.controller.service.nodePorts not found")
+	}
+	nodePortsM, ok := nodePorts.(map[string]any)
+	if !ok {
+		return fmt.Errorf("spec.values.controller.service.nodePorts is not a map")
+	}
+	npm := map[string]any{}
+	for p, _ := range pm {
+		if v, err := strconv.Atoi(p); err != nil {
+			return err
+		} else {
+			npm[p] = v
 		}
 	}
-	return -1, fmt.Errorf("could not generate random port")
+	nodePortsM[protocol] = npm
+	return nil
 }
 
 func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
@@ -216,45 +456,18 @@
 	}
 	req, err := extractAllocateReq(r.Body)
 	if err != nil {
+		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
-	s.l.Lock()
-	defer s.l.Unlock()
-	ingressRel, err := s.client.ReadRelease()
-	if err != nil {
+	if err := s.client.AddPortForwarding(
+		req.Protocol,
+		req.SourcePort,
+		req.Secret,
+		fmt.Sprintf("%s:%d", req.TargetService, req.TargetPort),
+	); err != nil {
+		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	tcp, udp, err := extractPorts(ingressRel)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	if val, ok := s.reserve[req.SourcePort]; !ok || val != req.Secret {
-		http.Error(w, "invalid secret", http.StatusBadRequest)
-		return
-	} else {
-		delete(s.reserve, req.SourcePort)
-	}
-	switch req.Protocol {
-	case "tcp":
-		if err := addPort(tcp, req); err != nil {
-			http.Error(w, err.Error(), http.StatusConflict)
-			return
-		}
-	case "udp":
-		if err := addPort(udp, req); err != nil {
-			http.Error(w, err.Error(), http.StatusConflict)
-			return
-		}
-	default:
-		panic("MUST NOT REACH")
-	}
-	commitMsg := fmt.Sprintf("ingress: port map %d %s", req.SourcePort, req.Protocol)
-	if err := s.client.WriteRelease(ingressRel, commitMsg); err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
 	}
 }
 
@@ -263,44 +476,20 @@
 		http.Error(w, "only post method is supported", http.StatusBadRequest)
 		return
 	}
-	s.l.Lock()
-	defer s.l.Unlock()
-	ingressRel, err := s.client.ReadRelease()
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	tcp, udp, err := extractPorts(ingressRel)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
 	var port int
-	used := map[string]struct{}{}
-	for p, _ := range tcp {
-		used[p] = struct{}{}
-	}
-	for p, _ := range udp {
-		used[p] = struct{}{}
-	}
-	if port, err = reservePort(used, s.reserve); err != nil {
+	var secret string
+	var err error
+	if port, secret, err = s.client.ReservePort(); err != nil {
+		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
-	secret, err := generateSecret()
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	s.reserve[port] = secret
 	go func() {
 		time.Sleep(30 * time.Minute)
-		s.l.Lock()
-		defer s.l.Unlock()
-		delete(s.reserve, port)
+		s.client.ReleaseReservedPort(port)
 	}()
-	resp := reserveResp{port, secret}
-	if err := json.NewEncoder(w).Encode(resp); err != nil {
+	if err := json.NewEncoder(w).Encode(reserveResp{port, secret}); err != nil {
+		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -313,41 +502,15 @@
 	}
 	req, err := extractRemoveReq(r.Body)
 	if err != nil {
+		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
-	s.l.Lock()
-	defer s.l.Unlock()
-	ingressRel, err := s.client.ReadRelease()
-	if err != nil {
+	if err := s.client.RemovePortForwarding(req.Protocol, req.SourcePort); err != nil {
+		fmt.Println(err.Error())
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
-	tcp, udp, err := extractPorts(ingressRel)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	switch req.Protocol {
-	case "tcp":
-		if err := removePort(tcp, req); err != nil {
-			http.Error(w, err.Error(), http.StatusConflict)
-			return
-		}
-	case "udp":
-		if err := removePort(udp, req); err != nil {
-			http.Error(w, err.Error(), http.StatusConflict)
-			return
-		}
-	default:
-		panic("MUST NOT REACH")
-	}
-	commitMsg := fmt.Sprintf("ingress: remove port map %d %s", req.SourcePort, req.Protocol)
-	if err := s.client.WriteRelease(ingressRel, commitMsg); err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	delete(s.reserve, req.SourcePort)
 }
 
 // TODO(gio): deduplicate
@@ -386,9 +549,15 @@
 	if err != nil {
 		log.Fatal(err)
 	}
-	s := newServer(
-		*port,
-		&repoClient{repo, *ingressNginxPath},
+	c, err := newRepoClient(
+		repo,
+		*ingressNginxPath,
+		*minPreOpenPorts,
+		*preOpenPortsBatchSize,
 	)
+	if err != nil {
+		log.Fatal(err)
+	}
+	s := newServer(*port, c)
 	log.Fatal(s.Start())
 }
diff --git a/core/port-allocator/main_test.go b/core/port-allocator/main_test.go
index c56f51c..caf2d51 100644
--- a/core/port-allocator/main_test.go
+++ b/core/port-allocator/main_test.go
@@ -171,17 +171,3 @@
 	}
 	t.Logf("Generated secret: %s", secret)
 }
-
-func TestReservePort(t *testing.T) {
-	pm := map[string]struct{}{
-		"10000": {},
-	}
-	reserve := make(map[int]string)
-	for i := start; i <= end; i++ {
-		reserve[i] = "reserved"
-	}
-	_, err := reservePort(pm, reserve)
-	if err != nil {
-		t.Fatalf("error: %v", err)
-	}
-}