Installer: dynamically generate open port requests
App config can mark any of its input (int) fields as having a role.
For such fields, the installer first sends a port reservation request to
the Port Allocator, which dynamically allocates and reserves one of the
available ports for the application. Once the application is committed to
the config repository, the installer makes another request to the Port
Allocator to actually open the dynamically reserved port in the ingress service.
Added port reservation logic to the Port Allocator. A reservation lasts 30
minutes.
Change-Id: Ic8caa0d04459b1a6e8a351e2ca6964ac15c7253d
diff --git a/core/port-allocator/go.mod b/core/port-allocator/go.mod
index c7c5103..4e1db75 100644
--- a/core/port-allocator/go.mod
+++ b/core/port-allocator/go.mod
@@ -2,7 +2,7 @@
replace github.com/giolekva/pcloud/core/installer => /Users/lekva/dev/src/pcloud/core/installer
-go 1.21.5
+go 1.22.0
require (
github.com/giolekva/pcloud/core/installer v0.0.0-20240403111418-e9c05499ec80
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index ac063da..861b1dd 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -1,15 +1,19 @@
package main
import (
+ "crypto/rand"
"encoding/json"
"flag"
"fmt"
"io"
"log"
+ "math/big"
"net/http"
"os"
"strconv"
"strings"
+ "sync"
+ "time"
"github.com/giolekva/pcloud/core/installer/soft"
@@ -50,9 +54,11 @@
}
+// server is the port allocator's HTTP front end: it serves the allocate and
+// reserve endpoints and keeps track of pending port reservations.
type server struct {
- s *http.Server
- r *http.ServeMux
- client client
+ // l guards reserve; handleAllocate, handleReserve and the reservation
+ // expiry callback all take it before touching the map.
+ l sync.Locker
+ s *http.Server
+ // r is the mux on which Start registers the /api/ handlers.
+ r *http.ServeMux
+ client client
+ // reserve maps a reserved port to the secret issued for that reservation.
+ reserve map[int]string
}
func newServer(port int, client client) *server {
@@ -61,11 +67,12 @@
Addr: fmt.Sprintf(":%d", port),
Handler: r,
}
- return &server{s, r, client}
+ return &server{&sync.Mutex{}, s, r, client, make(map[int]string)}
}
func (s *server) Start() error {
s.r.HandleFunc("/api/allocate", s.handleAllocate)
+ s.r.HandleFunc("/api/reserve", s.handleReserve)
if err := s.s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}
@@ -81,6 +88,7 @@
SourcePort int `json:"sourcePort"`
TargetService string `json:"targetService"`
TargetPort int `json:"targetPort"`
+ Secret string `json:"secret,omitempty"`
}
func extractAllocateReq(r io.Reader) (allocateReq, error) {
@@ -95,6 +103,11 @@
return req, nil
}
+// reserveResp is the JSON body written by handleReserve: the reserved port
+// and the secret that handleAllocate later requires for that port.
+type reserveResp struct {
+ Port int `json:"port"`
+ Secret string `json:"secret"`
+}
+
func extractPorts(rel map[string]any) (map[string]any, map[string]any, error) {
spec, ok := rel["spec"]
if !ok {
@@ -142,6 +155,24 @@
return nil
}
+// Inclusive bounds of the port range reservePort picks from.
+const (
+	start = 49152
+	end   = 65535
+)
+
+// reservePort picks a random port in [start, end] that is free, i.e. neither
+// a key of pm (ports already in use, keyed by their decimal string) nor a key
+// of reserve (ports with a pending reservation).
+//
+// It makes at most three attempts. On failure it returns -1 together with
+// either the random source's error or a "could not generate random port"
+// error. The function does not modify either map; recording the reservation
+// is left to the caller.
+func reservePort(pm map[string]struct{}, reserve map[int]string) (int, error) {
+	// rand.Int draws from [0, max), so max has to be the size of the
+	// inclusive range; otherwise the last port could never be chosen.
+	span := big.NewInt(end - start + 1)
+	for i := 0; i < 3; i++ {
+		r, err := rand.Int(rand.Reader, span)
+		if err != nil {
+			return -1, err
+		}
+		p := start + int(r.Int64())
+		if _, used := pm[strconv.Itoa(p)]; used {
+			continue
+		}
+		// A port that is already reserved must not be handed out again:
+		// doing so would overwrite the first reservation's secret.
+		if _, held := reserve[p]; held {
+			continue
+		}
+		return p, nil
+	}
+	return -1, fmt.Errorf("could not generate random port")
+}
+
func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "only post method is supported", http.StatusBadRequest)
@@ -152,31 +183,39 @@
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- fmt.Printf("%+v\n", req)
+ // Opening a port requires the secret handed out by /api/reserve; reject
+ // requests that do not carry one before taking the lock.
+ if req.Secret == "" {
+ http.Error(w, "secret missing", http.StatusBadRequest)
+ return
+ }
+ s.l.Lock()
+ defer s.l.Unlock()
ingressRel, err := s.client.ReadRelease()
if err != nil {
- fmt.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- fmt.Printf("%+v\n", ingressRel)
tcp, udp, err := extractPorts(ingressRel)
if err != nil {
- fmt.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- fmt.Printf("%+v %+v\n", tcp, udp)
+ // The secret must match the pending reservation for this source port.
+ if val, ok := s.reserve[req.SourcePort]; !ok || val != req.Secret {
+ http.Error(w, "invalid secret", http.StatusBadRequest)
+ return
+ }
+ // A reservation is single-use: consume it before opening the port.
+ delete(s.reserve, req.SourcePort)
switch req.Protocol {
case "tcp":
if err := addPort(tcp, req); err != nil {
- fmt.Println(err)
http.Error(w, err.Error(), http.StatusConflict)
return
}
case "udp":
if err := addPort(udp, req); err != nil {
- fmt.Println(err)
http.Error(w, err.Error(), http.StatusConflict)
return
}
@@ -190,6 +226,50 @@
}
}
+// handleReserve serves /api/reserve. It accepts POST only, picks a port that
+// is neither a key of the ingress release's tcp/udp maps nor already
+// reserved, records it in s.reserve together with a secret, and writes both
+// back as a JSON reserveResp. The reservation is dropped again after 30
+// minutes unless it has been consumed or replaced in the meantime.
+//
+// Failures to read the release, extract its ports, pick a port or encode the
+// response are reported as 500; a non-POST request as 400.
+func (s *server) handleReserve(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodPost {
+		http.Error(w, "only post method is supported", http.StatusBadRequest)
+		return
+	}
+	s.l.Lock()
+	defer s.l.Unlock()
+	ingressRel, err := s.client.ReadRelease()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	tcp, udp, err := extractPorts(ingressRel)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	// A port is considered taken if it appears under either protocol.
+	used := make(map[string]struct{}, len(tcp)+len(udp))
+	for p := range tcp {
+		used[p] = struct{}{}
+	}
+	for p := range udp {
+		used[p] = struct{}{}
+	}
+	port, err := reservePort(used, s.reserve)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	secret := generateSecret()
+	s.reserve[port] = secret
+	// Expire the reservation after 30 minutes. Only remove the entry if it
+	// still carries this reservation's secret, so a stale timer cannot wipe
+	// out a newer reservation of the same port.
+	time.AfterFunc(30*time.Minute, func() {
+		s.l.Lock()
+		defer s.l.Unlock()
+		if cur, ok := s.reserve[port]; ok && cur == secret {
+			delete(s.reserve, port)
+		}
+	})
+	resp := reserveResp{port, secret}
+	if err := json.NewEncoder(w).Encode(resp); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
+
// TODO(gio): deduplicate
func createRepoClient(addr string, keyPath string) (soft.RepoIO, error) {
sshKey, err := os.ReadFile(keyPath)
@@ -211,6 +291,11 @@
return soft.NewRepoIO(repo, signer)
}
+// generateSecret returns a fresh, unpredictable reservation secret: 16 bytes
+// read from crypto/rand, rendered as 32 lowercase hex characters.
+//
+// The signature has no error result, so a failure of the random source
+// panics rather than returning a guessable value.
+func generateSecret() string {
+	b := make([]byte, 16)
+	if _, err := rand.Read(b); err != nil {
+		panic(fmt.Sprintf("generating reservation secret: %v", err))
+	}
+	return fmt.Sprintf("%x", b)
+}
+
func main() {
flag.Parse()
repo, err := createRepoClient(*repoAddr, *sshKey)