IngressPublic: use Deployment with NodePort service
Spread replicas on different hosts.
Now PortAllocator allocates node ports as well.
Change-Id: Ia16cb24673fc6c61185f21ab30fde8964157aa4c
diff --git a/core/installer/values-tmpl/ingress-public.cue b/core/installer/values-tmpl/ingress-public.cue
index 619f15a..e675fc9 100644
--- a/core/installer/values-tmpl/ingress-public.cue
+++ b/core/installer/values-tmpl/ingress-public.cue
@@ -46,10 +46,41 @@
values: {
fullnameOverride: "\(global.pcloudEnvName)-ingress-public"
controller: {
- kind: "DaemonSet"
- hostNetwork: true
- hostPort: enabled: true
- service: enabled: false
+ kind: "Deployment"
+ replicaCount: 1 // TODO(gio): configurable
+ topologySpreadConstraints: [{
+ labelSelector: {
+ matchLabels: {
+ "app.kubernetes.io/instance": "ingress-public"
+ }
+ }
+ maxSkew: 1
+ topologyKey: "kubernetes.io/hostname"
+ whenUnsatisfiable: "DoNotSchedule"
+ }]
+ hostNetwork: false
+ hostPort: enabled: false
+ updateStrategy: {
+ type: "RollingUpdate"
+ rollingUpdate: {
+ maxSurge: "100%"
+ maxUnavailable: "30%"
+ }
+ }
+ service: {
+ enabled: true
+ type: "NodePort"
+ nodePorts: {
+ http: 80
+ https: 443
+ tcp: {
+ "53": 53
+ }
+ udp: {
+ "53": 53
+ }
+ }
+ }
ingressClassByName: true
ingressClassResource: {
name: networks.public.ingressClass
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index 6c7d361..8c14183 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -29,41 +29,268 @@
var repoAddr = flag.String("repo-addr", "", "Git repository address where Helm releases are stored")
var sshKey = flag.String("ssh-key", "", "Path to SHH key used to connect with Git repository")
var ingressNginxPath = flag.String("ingress-nginx-path", "", "Path to the ingress-nginx Helm release")
+var minPreOpenPorts = flag.Int("min-pre-open-ports", 5, "Minimum number of pre-open ports to keep in reserve")
+var preOpenPortsBatchSize = flag.Int("pre-open-ports-batch-size", 10, "Number of new ports to open at a time")
type client interface {
- ReadRelease() (map[string]any, error)
- WriteRelease(rel map[string]any, meta string) error
+ ReservePort() (int, string, error)
+ ReleaseReservedPort(port int)
+ AddPortForwarding(protocol string, port int, secret, dest string) error
+ RemovePortForwarding(protocol string, port int) error
}
type repoClient struct {
- repo soft.RepoIO
- path string
+ l sync.Locker
+ repo soft.RepoIO
+ path string
+ minPreOpenPorts int
+ preOpenPortsBatchSize int
+ preOpenPorts []int
+ blocklist map[int]struct{}
+ reserve map[int]string
}
-func (c *repoClient) ReadRelease() (map[string]any, error) {
- if err := c.repo.Pull(); err != nil {
+func newRepoClient(
+ repo soft.RepoIO,
+ path string,
+ minPreOpenPorts int,
+ preOpenPortsBatchSize int,
+) (client, error) {
+ ret := &repoClient{
+ l: &sync.Mutex{},
+ repo: repo,
+ path: path,
+ minPreOpenPorts: minPreOpenPorts,
+ preOpenPortsBatchSize: preOpenPortsBatchSize,
+ }
+ r, err := repo.Reader(fmt.Sprintf("%s-state.json", path))
+ if err != nil {
+ // TODO(gio): create empty file on init
return nil, err
}
- ingressRel := map[string]any{}
- if err := soft.ReadYaml(c.repo, c.path, &ingressRel); err != nil {
+ defer r.Close()
+ var st state
+ if err := json.NewDecoder(r).Decode(&st); err != nil {
return nil, err
}
- return ingressRel, nil
+ ret.preOpenPorts = st.PreOpenPorts
+ ret.blocklist = st.Blocklist
+ ret.reserve = map[int]string{}
+ if len(ret.preOpenPorts) < minPreOpenPorts {
+ if err := ret.preOpenNewPorts(); err != nil {
+ return nil, err
+ }
+ }
+ return ret, nil
}
-func (c *repoClient) WriteRelease(rel map[string]any, meta string) error {
- if err := soft.WriteYaml(c.repo, c.path, rel); err != nil {
+func (c *repoClient) ReservePort() (int, string, error) {
+ c.l.Lock()
+ defer c.l.Unlock()
+ if len(c.preOpenPorts) == 0 {
+ return -1, "", fmt.Errorf("no pre-open ports are available")
+ }
+ port := c.preOpenPorts[0]
+ c.preOpenPorts = c.preOpenPorts[1:]
+ secret, err := generateSecret()
+ if err != nil {
+ return -1, "", err
+ }
+ c.reserve[port] = secret
+ return port, secret, nil
+}
+
+func (c *repoClient) ReleaseReservedPort(port int) {
+ c.l.Lock()
+ defer c.l.Unlock()
+ delete(c.reserve, port)
+ c.preOpenPorts = append(c.preOpenPorts, port)
+}
+
+type state struct {
+ PreOpenPorts []int `json:"preOpenPorts"`
+ Blocklist map[int]struct{} `json:"blocklist"`
+}
+
+func (c *repoClient) preOpenNewPorts() error {
+ c.l.Lock()
+ defer c.l.Unlock()
+ if len(c.preOpenPorts) >= c.minPreOpenPorts {
+ return nil
+ }
+ var ports []int
+ for count := c.preOpenPortsBatchSize; count > 0; count-- {
+ generated := false
+ for i := 0; i < 3; i++ {
+ r, err := rand.Int(rand.Reader, big.NewInt(end-start))
+ if err != nil {
+ return err
+ }
+ p := start + int(r.Int64())
+ if _, ok := c.blocklist[p]; !ok {
+ generated = true
+ ports = append(ports, p)
+ c.preOpenPorts = append(c.preOpenPorts, p)
+ c.blocklist[p] = struct{}{}
+ break
+ }
+ }
+ if !generated {
+ return fmt.Errorf("could not open new port")
+ }
+ }
+ return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := c.writeState(fs); err != nil {
+ return "", err
+ }
+ rel, err := c.readRelease(fs)
+ if err != nil {
+ return "", err
+ }
+ tcp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
+ if err != nil {
+ return "", err
+ }
+ udp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.udp")
+ if err != nil {
+ return "", err
+ }
+ for _, p := range ports {
+ ps := strconv.Itoa(p)
+ tcp[ps] = p
+ udp[ps] = p
+ }
+ if err := c.writeRelease(fs, rel); err != nil {
+ return "", err
+ }
+ fmt.Printf("Pre opened new ports: %s\n", ports)
+ return "preopen new ports", nil
+ })
+}
+
+func (c *repoClient) AddPortForwarding(protocol string, port int, secret, dest string) error {
+ defer func() {
+ go func() {
+ if err := c.preOpenNewPorts(); err != nil {
+ panic(err)
+ }
+ }()
+ }()
+ c.l.Lock()
+ defer c.l.Unlock()
+ if sec, ok := c.reserve[port]; !ok || sec != secret {
+ return fmt.Errorf("wrong secret")
+ }
+ delete(c.reserve, port)
+ return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := c.writeState(fs); err != nil {
+ return "", err
+ }
+ rel, err := c.readRelease(fs)
+ if err != nil {
+ return "", err
+ }
+ portStr := strconv.Itoa(port)
+ switch protocol {
+ case "tcp":
+ tcp, err := extractPorts(rel, "spec.values.tcp")
+ if err != nil {
+ return "", err
+ }
+ tcp[portStr] = dest
+ case "udp":
+ udp, err := extractPorts(rel, "spec.values.udp")
+ if err != nil {
+ return "", err
+ }
+ udp[portStr] = dest
+ default:
+ panic("MUST NOT REACH")
+ }
+ if err := c.writeRelease(fs, rel); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("ingress: port %s map %d %s", protocol, port, dest), nil
+ })
+}
+
+func (c *repoClient) RemovePortForwarding(protocol string, port int) error {
+ c.l.Lock()
+ defer c.l.Unlock()
+ return c.repo.Do(func(fs soft.RepoFS) (string, error) {
+ rel, err := c.readRelease(fs)
+ if err != nil {
+ return "", err
+ }
+ switch protocol {
+ case "tcp":
+ tcp, err := extractPorts(rel, "spec.values.tcp")
+ if err != nil {
+ return "", err
+ }
+ if err := removePort(tcp, port); err != nil {
+ return "", err
+ }
+ case "udp":
+ udp, err := extractPorts(rel, "spec.values.udp")
+ if err != nil {
+ return "", err
+ }
+ if err := removePort(udp, port); err != nil {
+ return "", err
+ }
+ default:
+ panic("MUST NOT REACH")
+ }
+ svcTCP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
+ if err != nil {
+ return "", err
+ }
+ svcUDP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.udp")
+ if err != nil {
+ return "", err
+ }
+ if err := removePort(svcTCP, port); err != nil {
+ return "", err
+ }
+ if err := removePort(svcUDP, port); err != nil {
+ return "", err
+ }
+ if err := c.writeRelease(fs, rel); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("ingress: remove %s port map %d", protocol, port), nil
+ })
+}
+
+func (c *repoClient) writeState(fs soft.RepoFS) error {
+ w, err := fs.Writer(fmt.Sprintf("%s-state.json", c.path))
+ if err != nil {
return err
}
- return c.repo.CommitAndPush(meta)
+ defer w.Close()
+ if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist}); err != nil {
+ return err
+ }
+ return err
+}
+
+func (c *repoClient) readRelease(fs soft.RepoFS) (map[string]any, error) {
+ ret := map[string]any{}
+ if err := soft.ReadYaml(fs, c.path, &ret); err != nil {
+ return nil, err
+ }
+ return ret, nil
+}
+
+func (c *repoClient) writeRelease(fs soft.RepoFS, rel map[string]any) error {
+ return soft.WriteYaml(fs, c.path, rel)
}
type server struct {
- l sync.Locker
- s *http.Server
- r *http.ServeMux
- client client
- reserve map[int]string
+ s *http.Server
+ r *http.ServeMux
+ client client
}
func newServer(port int, client client) *server {
@@ -72,7 +299,7 @@
Addr: fmt.Sprintf(":%d", port),
Handler: r,
}
- return &server{&sync.Mutex{}, s, r, client, make(map[int]string)}
+ return &server{s, r, client}
}
func (s *server) Start() error {
@@ -133,57 +360,34 @@
Secret string `json:"secret"`
}
-func extractPorts(rel map[string]any) (map[string]any, map[string]any, error) {
- spec, ok := rel["spec"]
- if !ok {
- return nil, nil, fmt.Errorf("spec not found")
+func extractPorts(data map[string]any, path string) (map[string]any, error) {
+ for _, i := range strings.Split(path, ".") {
+ val, ok := data[i]
+ if !ok {
+ return nil, fmt.Errorf("%s not found", i)
+ }
+ valM, ok := val.(map[string]any)
+ if !ok {
+ return nil, fmt.Errorf("%s is not a map", i)
+ }
+ data = valM
}
- specM, ok := spec.(map[string]any)
- if !ok {
- return nil, nil, fmt.Errorf("spec is not a map")
- }
- values, ok := specM["values"]
- if !ok {
- return nil, nil, fmt.Errorf("spec.values not found")
- }
- valuesM, ok := values.(map[string]any)
- if !ok {
- return nil, nil, fmt.Errorf("spec.values is not a map")
- }
- tcp, ok := valuesM["tcp"]
- if !ok {
- tcp = map[string]any{}
- valuesM["tcp"] = tcp
- }
- udp, ok := valuesM["udp"]
- if !ok {
- udp = map[string]any{}
- valuesM["udp"] = udp
- }
- tcpM, ok := tcp.(map[string]any)
- if !ok {
- return nil, nil, fmt.Errorf("spec.values.tcp is not a map")
- }
- udpM, ok := udp.(map[string]any)
- if !ok {
- return nil, nil, fmt.Errorf("spec.values.udp is not a map")
- }
- return tcpM, udpM, nil
+ return data, nil
}
-func addPort(pm map[string]any, req allocateReq) error {
- sourcePortStr := strconv.Itoa(req.SourcePort)
- if _, ok := pm[sourcePortStr]; ok || req.SourcePort == 80 || req.SourcePort == 443 || req.SourcePort == 22 {
- return fmt.Errorf("port %d is already taken", req.SourcePort)
+func addPort(pm map[string]any, sourcePort int, targetService string, targetPort int) error {
+ sourcePortStr := strconv.Itoa(sourcePort)
+ if _, ok := pm[sourcePortStr]; ok || sourcePort == 80 || sourcePort == 443 || sourcePort == 22 {
+ return fmt.Errorf("port %d is already taken", sourcePort)
}
- pm[sourcePortStr] = fmt.Sprintf("%s:%d", req.TargetService, req.TargetPort)
+ pm[sourcePortStr] = fmt.Sprintf("%s:%d", targetService, targetPort)
return nil
}
-func removePort(pm map[string]any, req removeReq) error {
- sourcePortStr := strconv.Itoa(req.SourcePort)
+func removePort(pm map[string]any, port int) error {
+ sourcePortStr := strconv.Itoa(port)
if _, ok := pm[sourcePortStr]; !ok {
- return fmt.Errorf("port %d is not open to remove", req.SourcePort)
+ return fmt.Errorf("port %d is not open to remove", port)
}
delete(pm, sourcePortStr)
return nil
@@ -192,21 +396,57 @@
const start = 49152
const end = 65535
-func reservePort(pm map[string]struct{}, reserve map[int]string) (int, error) {
- for i := 0; i < 3; i++ {
- r, err := rand.Int(rand.Reader, big.NewInt(end-start))
- if err != nil {
- return -1, err
- }
- p := start + int(r.Int64())
- ps := strconv.Itoa(p)
- if _, ok := pm[ps]; !ok {
- if _, ok := reserve[p]; !ok {
- return p, nil
- }
+func updateNodePorts(rel map[string]any, protocol string, pm map[string]any) error {
+ spec, ok := rel["spec"]
+ if !ok {
+ return fmt.Errorf("spec not found")
+ }
+ specM, ok := spec.(map[string]any)
+ if !ok {
+ return fmt.Errorf("spec is not a map")
+ }
+ values, ok := specM["values"]
+ if !ok {
+ return fmt.Errorf("spec.values not found")
+ }
+ valuesM, ok := values.(map[string]any)
+ if !ok {
+ return fmt.Errorf("spec.values is not a map")
+ }
+ controller, ok := valuesM["controller"]
+ if !ok {
+ return fmt.Errorf("spec.values.controller not found")
+ }
+ controllerM, ok := controller.(map[string]any)
+ if !ok {
+ return fmt.Errorf("spec.values.controller is not a map")
+ }
+ service, ok := controllerM["service"]
+ if !ok {
+ return fmt.Errorf("spec.values.controller.service not found")
+ }
+ serviceM, ok := service.(map[string]any)
+ if !ok {
+ return fmt.Errorf("spec.values.controller.service is not a map")
+ }
+ nodePorts, ok := serviceM["nodePorts"]
+ if !ok {
+ return fmt.Errorf("spec.values.controller.service.nodePorts not found")
+ }
+ nodePortsM, ok := nodePorts.(map[string]any)
+ if !ok {
+ return fmt.Errorf("spec.values.controller.service.nodePorts is not a map")
+ }
+ npm := map[string]any{}
+ for p, _ := range pm {
+ if v, err := strconv.Atoi(p); err != nil {
+ return err
+ } else {
+ npm[p] = v
}
}
- return -1, fmt.Errorf("could not generate random port")
+ nodePortsM[protocol] = npm
+ return nil
}
func (s *server) handleAllocate(w http.ResponseWriter, r *http.Request) {
@@ -216,45 +456,18 @@
}
req, err := extractAllocateReq(r.Body)
if err != nil {
+ fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- s.l.Lock()
- defer s.l.Unlock()
- ingressRel, err := s.client.ReadRelease()
- if err != nil {
+ if err := s.client.AddPortForwarding(
+ req.Protocol,
+ req.SourcePort,
+ req.Secret,
+ fmt.Sprintf("%s:%d", req.TargetService, req.TargetPort),
+ ); err != nil {
+ fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- tcp, udp, err := extractPorts(ingressRel)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- if val, ok := s.reserve[req.SourcePort]; !ok || val != req.Secret {
- http.Error(w, "invalid secret", http.StatusBadRequest)
- return
- } else {
- delete(s.reserve, req.SourcePort)
- }
- switch req.Protocol {
- case "tcp":
- if err := addPort(tcp, req); err != nil {
- http.Error(w, err.Error(), http.StatusConflict)
- return
- }
- case "udp":
- if err := addPort(udp, req); err != nil {
- http.Error(w, err.Error(), http.StatusConflict)
- return
- }
- default:
- panic("MUST NOT REACH")
- }
- commitMsg := fmt.Sprintf("ingress: port map %d %s", req.SourcePort, req.Protocol)
- if err := s.client.WriteRelease(ingressRel, commitMsg); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
}
}
@@ -263,44 +476,20 @@
http.Error(w, "only post method is supported", http.StatusBadRequest)
return
}
- s.l.Lock()
- defer s.l.Unlock()
- ingressRel, err := s.client.ReadRelease()
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- tcp, udp, err := extractPorts(ingressRel)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
var port int
- used := map[string]struct{}{}
- for p, _ := range tcp {
- used[p] = struct{}{}
- }
- for p, _ := range udp {
- used[p] = struct{}{}
- }
- if port, err = reservePort(used, s.reserve); err != nil {
+ var secret string
+ var err error
+ if port, secret, err = s.client.ReservePort(); err != nil {
+ fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- secret, err := generateSecret()
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- s.reserve[port] = secret
go func() {
time.Sleep(30 * time.Minute)
- s.l.Lock()
- defer s.l.Unlock()
- delete(s.reserve, port)
+ s.client.ReleaseReservedPort(port)
}()
- resp := reserveResp{port, secret}
- if err := json.NewEncoder(w).Encode(resp); err != nil {
+ if err := json.NewEncoder(w).Encode(reserveResp{port, secret}); err != nil {
+ fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -313,41 +502,15 @@
}
req, err := extractRemoveReq(r.Body)
if err != nil {
+ fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- s.l.Lock()
- defer s.l.Unlock()
- ingressRel, err := s.client.ReadRelease()
- if err != nil {
+ if err := s.client.RemovePortForwarding(req.Protocol, req.SourcePort); err != nil {
+ fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- tcp, udp, err := extractPorts(ingressRel)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- switch req.Protocol {
- case "tcp":
- if err := removePort(tcp, req); err != nil {
- http.Error(w, err.Error(), http.StatusConflict)
- return
- }
- case "udp":
- if err := removePort(udp, req); err != nil {
- http.Error(w, err.Error(), http.StatusConflict)
- return
- }
- default:
- panic("MUST NOT REACH")
- }
- commitMsg := fmt.Sprintf("ingress: remove port map %d %s", req.SourcePort, req.Protocol)
- if err := s.client.WriteRelease(ingressRel, commitMsg); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- delete(s.reserve, req.SourcePort)
}
// TODO(gio): deduplicate
@@ -386,9 +549,15 @@
if err != nil {
log.Fatal(err)
}
- s := newServer(
- *port,
- &repoClient{repo, *ingressNginxPath},
+ c, err := newRepoClient(
+ repo,
+ *ingressNginxPath,
+ *minPreOpenPorts,
+ *preOpenPortsBatchSize,
)
+ if err != nil {
+ log.Fatal(err)
+ }
+ s := newServer(*port, c)
log.Fatal(s.Start())
}
diff --git a/core/port-allocator/main_test.go b/core/port-allocator/main_test.go
index c56f51c..caf2d51 100644
--- a/core/port-allocator/main_test.go
+++ b/core/port-allocator/main_test.go
@@ -171,17 +171,3 @@
}
t.Logf("Generated secret: %s", secret)
}
-
-func TestReservePort(t *testing.T) {
- pm := map[string]struct{}{
- "10000": {},
- }
- reserve := make(map[int]string)
- for i := start; i <= end; i++ {
- reserve[i] = "reserved"
- }
- _, err := reservePort(pm, reserve)
- if err != nil {
- t.Fatalf("error: %v", err)
- }
-}