PortAllocator: Persist reservations
Speed up improve random port generation.
Update unit tests and add them to CI/CD.
Change-Id: Ia77f0b4cbccfdce86e6c3cedc68afe6796ca8bf4
diff --git a/Jenkinsfile b/Jenkinsfile
index 375983c..b800f63 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -13,7 +13,7 @@
}
}
stages {
- stage('installer auth') {
+ stage('build/test') {
steps {
container('golang') {
dir('core/installer') {
@@ -26,6 +26,11 @@
sh 'go build *.go'
sh 'go test ./...'
}
+ dir('core/port-allocator') {
+ sh 'go mod tidy'
+ sh 'go build *.go'
+ sh 'go test ./...'
+ }
}
}
}
diff --git a/core/installer/soft/repoio_mem.go b/core/installer/soft/repoio_mem.go
new file mode 100644
index 0000000..f0ac1b6
--- /dev/null
+++ b/core/installer/soft/repoio_mem.go
@@ -0,0 +1,46 @@
+package soft
+
+import (
+ "sync"
+ "testing"
+)
+
+type mockRepoIO struct {
+ RepoFS
+ addr string
+ t *testing.T
+ l sync.Locker
+}
+
+func NewMockRepoIO(fs RepoFS, addr string, t *testing.T) RepoIO {
+ return &mockRepoIO{
+ RepoFS: fs,
+ addr: addr,
+ t: t,
+ l: &sync.Mutex{},
+ }
+}
+
+func (r mockRepoIO) FullAddress() string {
+ return r.addr
+}
+
+func (r mockRepoIO) Pull() error {
+ r.t.Logf("Pull: %s", r.addr)
+ return nil
+}
+
+func (r mockRepoIO) CommitAndPush(message string, opts ...PushOption) (string, error) {
+ r.t.Logf("Commit and push: %s", message)
+ return "", nil
+}
+
+func (r mockRepoIO) Do(op DoFn, _ ...DoOption) (string, error) {
+ r.l.Lock()
+ defer r.l.Unlock()
+ msg, err := op(r)
+ if err != nil {
+ return "", err
+ }
+ return r.CommitAndPush(msg)
+}
diff --git a/core/installer/welcome/env_test.go b/core/installer/welcome/env_test.go
index e7e75a8..f41110d 100644
--- a/core/installer/welcome/env_test.go
+++ b/core/installer/welcome/env_test.go
@@ -260,7 +260,7 @@
jc := fakeJobCreator{t}
hf := fakeHelmFetcher{t}
lg := installer.GitRepositoryLocalChartGenerator{"foo", "bar"}
- infraRepo := mockRepoIO{soft.NewBillyRepoFS(infraFS), "foo.bar", t, &sync.Mutex{}}
+ infraRepo := soft.NewMockRepoIO(soft.NewBillyRepoFS(infraFS), "foo.bar", t)
infraMgr, err := installer.NewInfraAppManager(infraRepo, nsCreator, hf, lg)
if err != nil {
t.Fatal(err)
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index fb5a151..6bc89e4 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -1,14 +1,15 @@
package main
import (
- "crypto/rand"
"encoding/base64"
"encoding/json"
+ "errors"
"flag"
"fmt"
"io"
+ "io/fs"
"log"
- "math/big"
+ "math/rand"
"net/http"
"os"
"strconv"
@@ -23,6 +24,8 @@
const (
secretLength = 20
+ start = 49152
+ end = 65535
)
var port = flag.Int("port", 8080, "Port to listen on")
@@ -34,7 +37,7 @@
type client interface {
ReservePort() (int, string, error)
- ReleaseReservedPort(port int)
+ ReleaseReservedPort(port ...int)
AddPortForwarding(protocol string, port int, secret, dest string) error
RemovePortForwarding(protocol string, port int) error
}
@@ -43,11 +46,13 @@
l sync.Locker
repo soft.RepoIO
path string
+ secretGenerator SecretGenerator
minPreOpenPorts int
preOpenPortsBatchSize int
preOpenPorts []int
blocklist map[int]struct{}
reserve map[int]string
+ availablePorts []int
}
func newRepoClient(
@@ -55,32 +60,46 @@
path string,
minPreOpenPorts int,
preOpenPortsBatchSize int,
+ secretGenerator SecretGenerator,
) (client, error) {
ret := &repoClient{
l: &sync.Mutex{},
repo: repo,
path: path,
+ secretGenerator: secretGenerator,
minPreOpenPorts: minPreOpenPorts,
preOpenPortsBatchSize: preOpenPortsBatchSize,
+ preOpenPorts: []int{},
+ blocklist: map[int]struct{}{},
+ reserve: map[int]string{},
+ availablePorts: []int{},
}
- r, err := repo.Reader(fmt.Sprintf("%s-state.json", path))
+ st, err := ret.readState(repo)
if err != nil {
- // TODO(gio): create empty file on init
- return nil, err
- }
- defer r.Close()
- var st state
- if err := json.NewDecoder(r).Decode(&st); err != nil {
- return nil, err
- }
- ret.preOpenPorts = st.PreOpenPorts
- ret.blocklist = st.Blocklist
- ret.reserve = map[int]string{}
- if len(ret.preOpenPorts) < minPreOpenPorts {
- if err := ret.preOpenNewPorts(); err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
+ } else {
+ ret.preOpenPorts = st.PreOpenPorts
+ ret.blocklist = st.Blocklist
+ ret.reserve = st.Reserve
}
+ for i := start; i < end; i++ {
+ if _, ok := ret.blocklist[i]; !ok {
+ ret.availablePorts = append(ret.availablePorts, i)
+ }
+ }
+ if err := ret.preOpenNewPorts(); err != nil {
+ return nil, err
+ }
+ var reservedPorts []int
+ for k := range ret.reserve {
+ reservedPorts = append(reservedPorts, k)
+ }
+ go func() {
+ time.Sleep(30 * time.Minute)
+ ret.ReleaseReservedPort(reservedPorts...)
+ }()
return ret, nil
}
@@ -92,7 +111,7 @@
}
port := c.preOpenPorts[0]
c.preOpenPorts = c.preOpenPorts[1:]
- secret, err := generateSecret()
+ secret, err := c.secretGenerator()
if err != nil {
return -1, "", err
}
@@ -100,16 +119,30 @@
return port, secret, nil
}
-func (c *repoClient) ReleaseReservedPort(port int) {
+func (c *repoClient) ReleaseReservedPort(port ...int) {
+ if len(port) == 0 {
+ return
+ }
c.l.Lock()
defer c.l.Unlock()
- delete(c.reserve, port)
- c.preOpenPorts = append(c.preOpenPorts, port)
+ if _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ for _, p := range port {
+ delete(c.reserve, p)
+ c.preOpenPorts = append(c.preOpenPorts, p)
+ }
+ if err := c.writeState(fs); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("Released port reservations: %+v", port), nil
+ }); err != nil {
+ panic(err)
+ }
}
type state struct {
PreOpenPorts []int `json:"preOpenPorts"`
Blocklist map[int]struct{} `json:"blocklist"`
+ Reserve map[int]string `json:"reserve"`
}
func (c *repoClient) preOpenNewPorts() error {
@@ -120,26 +153,18 @@
}
var ports []int
for count := c.preOpenPortsBatchSize; count > 0; count-- {
- generated := false
- for i := 0; i < 3; i++ {
- r, err := rand.Int(rand.Reader, big.NewInt(end-start))
- if err != nil {
- return err
- }
- p := start + int(r.Int64())
- if _, ok := c.blocklist[p]; !ok {
- generated = true
- ports = append(ports, p)
- c.preOpenPorts = append(c.preOpenPorts, p)
- c.blocklist[p] = struct{}{}
- break
- }
- }
- if !generated {
+ if len(c.availablePorts) == 0 {
return fmt.Errorf("could not open new port")
}
+ r := rand.Intn(len(c.availablePorts))
+ p := c.availablePorts[r]
+ c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+ c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+ ports = append(ports, p)
+ c.preOpenPorts = append(c.preOpenPorts, p)
+ c.blocklist[p] = struct{}{}
}
- return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
if err := c.writeState(fs); err != nil {
return "", err
}
@@ -160,11 +185,15 @@
if err != nil {
return "", err
}
+ fmt.Printf("%+v\n", tcp)
+ fmt.Printf("%+v\n", udp)
for _, p := range ports {
ps := strconv.Itoa(p)
tcp[ps] = p
udp[ps] = p
}
+ fmt.Printf("%+v\n", tcp)
+ fmt.Printf("%+v\n", udp)
if err := c.writeRelease(fs, rel); err != nil {
return "", err
}
@@ -172,15 +201,15 @@
fmt.Printf("Pre opened new ports: %+v\n", ports)
return "preopen new ports", nil
})
+ return err
}
func (c *repoClient) AddPortForwarding(protocol string, port int, secret, dest string) error {
+ protocol = strings.ToLower(protocol)
defer func() {
- go func() {
- if err := c.preOpenNewPorts(); err != nil {
- panic(err)
- }
- }()
+ if err := c.preOpenNewPorts(); err != nil {
+ panic(err)
+ }
}()
c.l.Lock()
defer c.l.Unlock()
@@ -188,7 +217,7 @@
return fmt.Errorf("wrong secret")
}
delete(c.reserve, port)
- return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
if err := c.writeState(fs); err != nil {
return "", err
}
@@ -218,12 +247,14 @@
}
return fmt.Sprintf("ingress: port %s map %d %s", protocol, port, dest), nil
})
+ return err
}
func (c *repoClient) RemovePortForwarding(protocol string, port int) error {
+ protocol = strings.ToLower(protocol)
c.l.Lock()
defer c.l.Unlock()
- return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
rel, err := c.readRelease(fs)
if err != nil {
return "", err
@@ -273,6 +304,20 @@
}
return fmt.Sprintf("ingress: remove %s port map %d", protocol, port), nil
})
+ return err
+}
+
+func (c *repoClient) readState(fs soft.RepoFS) (state, error) {
+ r, err := fs.Reader(fmt.Sprintf("%s-state.json", c.path))
+ if err != nil {
+ return state{}, err
+ }
+ defer r.Close()
+ var ret state
+ if err := json.NewDecoder(r).Decode(&ret); err != nil {
+ return state{}, err
+ }
+ return ret, err
}
func (c *repoClient) writeState(fs soft.RepoFS) error {
@@ -281,7 +326,7 @@
return err
}
defer w.Close()
- if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist}); err != nil {
+ if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist, c.reserve}); err != nil {
return err
}
return err
@@ -429,62 +474,6 @@
return nil
}
-const start = 49152
-const end = 65535
-
-func updateNodePorts(rel map[string]any, protocol string, pm map[string]any) error {
- spec, ok := rel["spec"]
- if !ok {
- return fmt.Errorf("spec not found")
- }
- specM, ok := spec.(map[string]any)
- if !ok {
- return fmt.Errorf("spec is not a map")
- }
- values, ok := specM["values"]
- if !ok {
- return fmt.Errorf("spec.values not found")
- }
- valuesM, ok := values.(map[string]any)
- if !ok {
- return fmt.Errorf("spec.values is not a map")
- }
- controller, ok := valuesM["controller"]
- if !ok {
- return fmt.Errorf("spec.values.controller not found")
- }
- controllerM, ok := controller.(map[string]any)
- if !ok {
- return fmt.Errorf("spec.values.controller is not a map")
- }
- service, ok := controllerM["service"]
- if !ok {
- return fmt.Errorf("spec.values.controller.service not found")
- }
- serviceM, ok := service.(map[string]any)
- if !ok {
- return fmt.Errorf("spec.values.controller.service is not a map")
- }
- nodePorts, ok := serviceM["nodePorts"]
- if !ok {
- return fmt.Errorf("spec.values.controller.service.nodePorts not found")
- }
- nodePortsM, ok := nodePorts.(map[string]any)
- if !ok {
- return fmt.Errorf("spec.values.controller.service.nodePorts is not a map")
- }
- npm := map[string]any{}
- for p, _ := range pm {
- if v, err := strconv.Atoi(p); err != nil {
- return err
- } else {
- npm[p] = v
- }
- }
- nodePortsM[protocol] = npm
- return nil
-}
-
func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "only post method is supported", http.StatusBadRequest)
@@ -570,6 +559,8 @@
return soft.NewRepoIO(repo, signer)
}
+type SecretGenerator func() (string, error)
+
func generateSecret() (string, error) {
b := make([]byte, secretLength)
_, err := rand.Read(b)
@@ -590,6 +581,7 @@
*ingressNginxPath,
*minPreOpenPorts,
*preOpenPortsBatchSize,
+ generateSecret,
)
if err != nil {
log.Fatal(err)
diff --git a/core/port-allocator/main_test.go b/core/port-allocator/main_test.go
index caf2d51..93ae574 100644
--- a/core/port-allocator/main_test.go
+++ b/core/port-allocator/main_test.go
@@ -1,166 +1,79 @@
package main
import (
- "bytes"
- "encoding/json"
- "io"
- "net/http"
- "os"
+ "fmt"
"reflect"
+ "strconv"
"testing"
+
+ "github.com/giolekva/pcloud/core/installer/soft"
+
+ "github.com/go-git/go-billy/v5/memfs"
)
-type fakeClient struct {
- contents map[string]any
-}
-
-func (c *fakeClient) ReadRelease() (map[string]any, error) {
- return c.contents, nil
-}
-
-func (c *fakeClient) WriteRelease(rel map[string]any, meta string) error {
- c.contents = rel
- return nil
+func fakeSecretGenerator(secret string) SecretGenerator {
+ return func() (string, error) {
+ return secret, nil
+ }
}
func TestAllocateSucceeds(t *testing.T) {
- c := &fakeClient{map[string]any{
- "spec": map[string]any{
- "values": map[string]any{},
- },
- }}
- s := newServer(8080, c) // TODO(gio): run using unix socket
- go func() {
- s.Start()
- }()
- defer s.Close()
- var buf bytes.Buffer
- req := allocateReq{
- Protocol: "TCP",
- SourcePort: 22,
- TargetService: "foo",
- TargetPort: 2222,
- }
- if err := json.NewEncoder(&buf).Encode(req); err != nil {
- t.Fatal(err)
- }
- resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
- if err != nil {
- t.Fatal(err)
- }
- if resp.StatusCode != http.StatusOK {
- io.Copy(os.Stdout, resp.Body)
- t.Fatalf("Expected %d, got %d", http.StatusOK, resp.StatusCode)
- }
- expected := map[string]any{
+ ingressPath := "/ingress.yaml"
+ fs := memfs.New()
+ repo := soft.NewMockRepoIO(soft.NewBillyRepoFS(fs), "foo.bar", t)
+ if err := soft.WriteYaml(repo, ingressPath, map[string]any{
"spec": map[string]any{
"values": map[string]any{
- "tcp": map[string]any{
- "22": "foo:2222",
+ "controller": map[string]any{
+ "service": map[string]any{
+ "type": "ClusterIP",
+ },
},
+ "tcp": map[string]any{},
"udp": map[string]any{},
},
},
+ }); err != nil {
+ t.Fatal(err)
}
- if !reflect.DeepEqual(expected, c.contents) {
- t.Fatalf("Expected %v, got %v", expected, c.contents)
+ c, err := newRepoClient(repo, ingressPath, 5, 10, fakeSecretGenerator("test"))
+ if err != nil {
+ t.Fatal(err)
}
-}
-
-func TestAllocateConflicts(t *testing.T) {
- c := &fakeClient{map[string]any{
+ tcp := map[string]any{}
+ udp := map[string]any{}
+ expected := map[string]any{
"spec": map[string]any{
"values": map[string]any{
- "tcp": map[string]any{
- "22": "foo:2222",
+ "controller": map[string]any{
+ "service": map[string]any{
+ "type": "ClusterIP",
+ },
},
+ "tcp": tcp,
+ "udp": udp,
},
},
- }}
- s := newServer(8080, c) // TODO(gio): run using unix socket
- go func() {
- s.Start()
- }()
- defer s.Close()
- var buf bytes.Buffer
- req := allocateReq{
- Protocol: "TCP",
- SourcePort: 22,
- TargetService: "foo",
- TargetPort: 2222,
}
- if err := json.NewEncoder(&buf).Encode(req); err != nil {
+ for i := 0; i < 500; i++ {
+ for _, protocol := range []string{"tcp", "udp"} {
+ port, secret, err := c.ReservePort()
+ if err != nil {
+ t.Fatal(err)
+ }
+ target := fmt.Sprintf("%s/bar:%d", protocol, port)
+ if err := c.AddPortForwarding("tcp", port, secret, target); err != nil {
+ t.Fatal(err)
+ }
+ tcp[strconv.Itoa(port)] = target
+ }
+ }
+ var actual map[string]any
+ if err := soft.ReadYaml(repo, ingressPath, &actual); err != nil {
t.Fatal(err)
}
- resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
- if err != nil {
- t.Fatal(err)
- }
- if resp.StatusCode != http.StatusConflict {
- io.Copy(os.Stdout, resp.Body)
- t.Fatalf("Expected %d, got %d", http.StatusConflict, resp.StatusCode)
- }
-}
-
-func TestAllocate80Taken(t *testing.T) {
- c := &fakeClient{map[string]any{
- "spec": map[string]any{
- "values": map[string]any{},
- },
- }}
- s := newServer(8080, c) // TODO(gio): run using unix socket
- go func() {
- s.Start()
- }()
- defer s.Close()
- var buf bytes.Buffer
- req := allocateReq{
- Protocol: "TCP",
- SourcePort: 80,
- TargetService: "foo",
- TargetPort: 2222,
- }
- if err := json.NewEncoder(&buf).Encode(req); err != nil {
- t.Fatal(err)
- }
- resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
- if err != nil {
- t.Fatal(err)
- }
- if resp.StatusCode != http.StatusConflict {
- io.Copy(os.Stdout, resp.Body)
- t.Fatalf("Expected %d, got %d", http.StatusConflict, resp.StatusCode)
- }
-}
-
-func TestAllocate443Taken(t *testing.T) {
- c := &fakeClient{map[string]any{
- "spec": map[string]any{
- "values": map[string]any{},
- },
- }}
- s := newServer(8080, c) // TODO(gio): run using unix socket
- go func() {
- s.Start()
- }()
- defer s.Close()
- var buf bytes.Buffer
- req := allocateReq{
- Protocol: "TCP",
- SourcePort: 443,
- TargetService: "foo",
- TargetPort: 2222,
- }
- if err := json.NewEncoder(&buf).Encode(req); err != nil {
- t.Fatal(err)
- }
- resp, err := http.Post("http://localhost:8080/api/allocate", "application/json", &buf)
- if err != nil {
- t.Fatal(err)
- }
- if resp.StatusCode != http.StatusConflict {
- io.Copy(os.Stdout, resp.Body)
- t.Fatalf("Expected %d, got %d", http.StatusConflict, resp.StatusCode)
+ if !reflect.DeepEqual(expected, actual) {
+ t.Fatalf("Expected %v, got %v", expected, actual)
}
}