AppManager: Support exposing cross-cluster ports
Change-Id: I4bdb3573209935f6777656ec2f3481e79d84a9c9
diff --git a/core/port-allocator/main.go b/core/port-allocator/main.go
index 6bc89e4..8b3ab80 100644
--- a/core/port-allocator/main.go
+++ b/core/port-allocator/main.go
@@ -1,6 +1,7 @@
package main
import (
+ "bytes"
"encoding/base64"
"encoding/json"
"errors"
@@ -12,11 +13,13 @@
"math/rand"
"net/http"
"os"
+ "path/filepath"
"strconv"
"strings"
"sync"
"time"
+ "github.com/giolekva/pcloud/core/installer"
"github.com/giolekva/pcloud/core/installer/soft"
"golang.org/x/crypto/ssh"
@@ -36,25 +39,42 @@
var preOpenPortsBatchSize = flag.Int("pre-open-ports-batch-size", 10, "Number of new ports to open at a time")
type client interface {
- ReservePort() (int, string, error)
+ ReservePort(remoteProxy bool) (int, string, error)
ReleaseReservedPort(port ...int)
AddPortForwarding(protocol string, port int, secret, dest string) error
RemovePortForwarding(protocol string, port int) error
}
+type Reservation struct {
+ Secret string `json:"secret"`
+ IsRemoteProxy bool `json:"isRemoteProxy"`
+}
+
type repoClient struct {
l sync.Locker
repo soft.RepoIO
path string
secretGenerator SecretGenerator
+ proxyCfg *installer.NginxProxyConfigurator
minPreOpenPorts int
preOpenPortsBatchSize int
preOpenPorts []int
+ proxyPreOpenPorts []int
blocklist map[int]struct{}
- reserve map[int]string
+ reserve map[int]Reservation
availablePorts []int
}
+func getProxyBackendConfigPath(repo soft.RepoIO, path string) (string, error) {
+ cfgPath := filepath.Join(filepath.Dir(path), "proxy-backend-config.yaml")
+ inp, err := repo.Reader(cfgPath)
+ if err != nil {
+ return "", nil
+ }
+ defer inp.Close()
+ return cfgPath, nil
+}
+
func newRepoClient(
repo soft.RepoIO,
path string,
@@ -62,16 +82,29 @@
preOpenPortsBatchSize int,
secretGenerator SecretGenerator,
) (client, error) {
+ proxyCfg, err := getProxyBackendConfigPath(repo, path)
+ if err != nil {
+ return nil, err
+ }
+ var cnc *installer.NginxProxyConfigurator
+ if proxyCfg != "" {
+ cnc = &installer.NginxProxyConfigurator{
+ Repo: repo,
+ ConfigPath: proxyCfg,
+ ServicePath: filepath.Join(filepath.Dir(proxyCfg), "proxy-backend-service.yaml"),
+ }
+ }
ret := &repoClient{
l: &sync.Mutex{},
repo: repo,
path: path,
secretGenerator: secretGenerator,
+ proxyCfg: cnc,
minPreOpenPorts: minPreOpenPorts,
preOpenPortsBatchSize: preOpenPortsBatchSize,
preOpenPorts: []int{},
blocklist: map[int]struct{}{},
- reserve: map[int]string{},
+ reserve: map[int]Reservation{},
availablePorts: []int{},
}
st, err := ret.readState(repo)
@@ -103,19 +136,31 @@
return ret, nil
}
-func (c *repoClient) ReservePort() (int, string, error) {
+func (c *repoClient) ReservePort(remoteProxy bool) (int, string, error) {
c.l.Lock()
defer c.l.Unlock()
- if len(c.preOpenPorts) == 0 {
- return -1, "", fmt.Errorf("no pre-open ports are available")
+ var port int
+ if !remoteProxy {
+ if len(c.preOpenPorts) == 0 {
+ return -1, "", fmt.Errorf("no pre-open ports are available")
+ }
+ port = c.preOpenPorts[0]
+ c.preOpenPorts = c.preOpenPorts[1:]
+ } else {
+ if c.proxyCfg == nil {
+ return -1, "", fmt.Errorf("does not support TCP/UDP proxy")
+ }
+ if len(c.proxyPreOpenPorts) == 0 {
+ return -1, "", fmt.Errorf("no proxy pre-open ports are available")
+ }
+ port = c.proxyPreOpenPorts[0]
+ c.proxyPreOpenPorts = c.proxyPreOpenPorts[1:]
}
- port := c.preOpenPorts[0]
- c.preOpenPorts = c.preOpenPorts[1:]
secret, err := c.secretGenerator()
if err != nil {
return -1, "", err
}
- c.reserve[port] = secret
+ c.reserve[port] = Reservation{secret, remoteProxy}
return port, secret, nil
}
@@ -127,8 +172,16 @@
defer c.l.Unlock()
if _, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
for _, p := range port {
+ r, ok := c.reserve[p]
+ if !ok {
+ continue
+ }
delete(c.reserve, p)
- c.preOpenPorts = append(c.preOpenPorts, p)
+ if r.IsRemoteProxy {
+ c.proxyPreOpenPorts = append(c.proxyPreOpenPorts, p)
+ } else {
+ c.preOpenPorts = append(c.preOpenPorts, p)
+ }
}
if err := c.writeState(fs); err != nil {
return "", err
@@ -139,30 +192,56 @@
}
}
+type oldState struct {
+ PreOpenPorts []int `json:"preOpenPorts"`
+ ProxyPreOpenPorts []int `json:"proxyPreOpenPorts"`
+ Blocklist map[int]struct{} `json:"blocklist"`
+ Reserve map[int]string `json:"reserve"`
+}
+
type state struct {
- PreOpenPorts []int `json:"preOpenPorts"`
- Blocklist map[int]struct{} `json:"blocklist"`
- Reserve map[int]string `json:"reserve"`
+ PreOpenPorts []int `json:"preOpenPorts"`
+ ProxyPreOpenPorts []int `json:"proxyPreOpenPorts"`
+ Blocklist map[int]struct{} `json:"blocklist"`
+ Reserve map[int]Reservation `json:"reserve"`
}
func (c *repoClient) preOpenNewPorts() error {
c.l.Lock()
defer c.l.Unlock()
- if len(c.preOpenPorts) >= c.minPreOpenPorts {
+ var ports []int
+ if len(c.preOpenPorts) < c.minPreOpenPorts {
+ for count := c.preOpenPortsBatchSize; count > 0; count-- {
+ if len(c.availablePorts) == 0 {
+ return fmt.Errorf("could not open new port")
+ }
+ r := rand.Intn(len(c.availablePorts))
+ p := c.availablePorts[r]
+ c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+ c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+ ports = append(ports, p)
+ c.preOpenPorts = append(c.preOpenPorts, p)
+ c.blocklist[p] = struct{}{}
+ }
return nil
}
- var ports []int
- for count := c.preOpenPortsBatchSize; count > 0; count-- {
- if len(c.availablePorts) == 0 {
- return fmt.Errorf("could not open new port")
+ if c.proxyCfg != nil && len(c.proxyPreOpenPorts) < c.minPreOpenPorts {
+ for count := c.preOpenPortsBatchSize; count > 0; count-- {
+ if len(c.availablePorts) == 0 {
+ return fmt.Errorf("could not open new port")
+ }
+ r := rand.Intn(len(c.availablePorts))
+ p := c.availablePorts[r]
+ c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
+ c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
+ ports = append(ports, p)
+ c.proxyPreOpenPorts = append(c.proxyPreOpenPorts, p)
+ c.blocklist[p] = struct{}{}
}
- r := rand.Intn(len(c.availablePorts))
- p := c.availablePorts[r]
- c.availablePorts[r] = c.availablePorts[len(c.availablePorts)-1]
- c.availablePorts = c.availablePorts[:len(c.availablePorts)-1]
- ports = append(ports, p)
- c.preOpenPorts = append(c.preOpenPorts, p)
- c.blocklist[p] = struct{}{}
+ return nil
+ }
+ if len(ports) == 0 {
+ return nil
}
_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
if err := c.writeState(fs); err != nil {
@@ -172,10 +251,17 @@
if err != nil {
return "", err
}
- svcType, err := extractString(rel, "spec.values.controller.service.type")
+ svcType := ""
+ svcEnabled, err := extractBool(rel, "spec.values.controller.service.enabled")
if err != nil {
return "", err
}
+ if svcEnabled {
+ svcType, err = extractString(rel, "spec.values.controller.service.type")
+ if err != nil {
+ return "", err
+ }
+ }
if svcType == "NodePort" {
tcp, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
if err != nil {
@@ -213,10 +299,31 @@
}()
c.l.Lock()
defer c.l.Unlock()
- if sec, ok := c.reserve[port]; !ok || sec != secret {
+ r, ok := c.reserve[port]
+ if !ok || r.Secret != secret {
return fmt.Errorf("wrong secret")
}
delete(c.reserve, port)
+ if r.IsRemoteProxy {
+ if c.proxyCfg == nil {
+ return fmt.Errorf("does not support TCP/UDP proxy")
+ }
+ var namespace string
+ var err error
+ switch strings.ToLower(protocol) {
+ case "tcp":
+ if namespace, err = c.proxyCfg.AddProxy(port, dest, installer.ProtocolTCP); err != nil {
+ return err
+ }
+ case "udp":
+ if namespace, err = c.proxyCfg.AddProxy(port, dest, installer.ProtocolUDP); err != nil {
+ return err
+ }
+ default:
+ return fmt.Errorf("unknown protocol: %s", protocol)
+ }
+ dest = fmt.Sprintf("%s/proxy-backend-service:%d", namespace, port)
+ }
_, err := c.repo.Do(func(fs soft.RepoFS) (string, error) {
if err := c.writeState(fs); err != nil {
return "", err
@@ -279,10 +386,17 @@
default:
panic("MUST NOT REACH")
}
- svcType, err := extractString(rel, "spec.values.controller.service.type")
+ svcType := ""
+ svcEnabled, err := extractBool(rel, "spec.values.controller.service.enabled")
if err != nil {
return "", err
}
+ if svcEnabled {
+ svcType, err = extractString(rel, "spec.values.controller.service.type")
+ if err != nil {
+ return "", err
+ }
+ }
if svcType == "NodePort" {
svcTCP, err := extractPorts(rel, "spec.values.controller.service.nodePorts.tcp")
if err != nil {
@@ -313,10 +427,27 @@
return state{}, err
}
defer r.Close()
- var ret state
- if err := json.NewDecoder(r).Decode(&ret); err != nil {
+ buf, err := io.ReadAll(r)
+ if err != nil {
return state{}, err
}
+ var ret state
+ if err := json.NewDecoder(bytes.NewReader(buf)).Decode(&ret); err == nil {
+ return ret, nil
+ }
+ var old oldState
+ if err := json.NewDecoder(bytes.NewReader(buf)).Decode(&old); err != nil {
+ return state{}, err
+ }
+ ret = state{
+ PreOpenPorts: old.PreOpenPorts,
+ ProxyPreOpenPorts: []int{},
+ Blocklist: old.Blocklist,
+ Reserve: map[int]Reservation{},
+ }
+ for port, secret := range old.Reserve {
+ ret.Reserve[port] = Reservation{secret, false}
+ }
return ret, err
}
@@ -326,7 +457,7 @@
return err
}
defer w.Close()
- if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.blocklist, c.reserve}); err != nil {
+ if err := json.NewEncoder(w).Encode(state{c.preOpenPorts, c.proxyPreOpenPorts, c.blocklist, c.reserve}); err != nil {
return err
}
return err
@@ -422,7 +553,7 @@
for _, i := range strings.Split(path, ".") {
valM, ok := val.(map[string]any)
if !ok {
- return nil, fmt.Errorf("expected map")
+ return nil, fmt.Errorf("expected map, %s", i)
}
val, ok = valM[i]
if !ok {
@@ -451,7 +582,19 @@
}
retS, ok := ret.(string)
if !ok {
- return "", fmt.Errorf("expected map")
+ return "", fmt.Errorf("expected string")
+ }
+ return retS, nil
+}
+
+func extractBool(data map[string]any, path string) (bool, error) {
+ ret, err := extractField(data, path)
+ if err != nil {
+ return false, err
+ }
+ retS, ok := ret.(bool)
+ if !ok {
+ return false, fmt.Errorf("expected boolean")
}
return retS, nil
}
@@ -496,15 +639,26 @@
}
}
+type reserveReq struct {
+ RemoteProxy bool `json:"remoteProxy"`
+}
+
func (s *server) handleReserve(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "only post method is supported", http.StatusBadRequest)
return
}
+ var req reserveReq
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ fmt.Println(err.Error())
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ fmt.Printf("%+v\n", req)
var port int
var secret string
var err error
- if port, secret, err = s.client.ReservePort(); err != nil {
+ if port, secret, err = s.client.ReservePort(req.RemoteProxy); err != nil {
fmt.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return