AppManager: Support exposing cross-cluster ports
Change-Id: I4bdb3573209935f6777656ec2f3481e79d84a9c9
diff --git a/core/installer/cluster.go b/core/installer/cluster.go
index 19220af..ac7d970 100644
--- a/core/installer/cluster.go
+++ b/core/installer/cluster.go
@@ -16,21 +16,26 @@
"github.com/giolekva/pcloud/core/installer/soft"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/yaml"
)
type ClusterNetworkConfigurator interface {
AddCluster(name string, ingressIP net.IP) error
RemoveCluster(name string, ingressIP net.IP) error
- AddProxy(src, dst string) error
- RemoveProxy(src, dst string) error
+ AddProxy(src int, dst string, protocol Protocol) (string, error) // proxies port src to dst over protocol; NginxProxyConfigurator returns the namespace read from its proxy config
+ AddIngressProxy(src, dst string) error // maps src to dst in the ingress table; NginxProxyConfigurator rejects src when it is already mapped to a different dst
+ RemoveProxy(src int, dst string, protocol Protocol) error // removes the port proxy registered for src under protocol
+ RemoveIngressProxy(src, dst string) error // removes the ingress mapping; NginxProxyConfigurator fails unless src is currently mapped to dst
}
type NginxProxyConfigurator struct {
PrivateSubdomain string
DNSAPIAddr string
Repo soft.RepoIO
- NginxConfigPath string
+ ConfigPath string // path inside Repo of the nginx proxy config; ingress-nginx.yaml is looked up in the same directory
+ ServicePath string // path inside Repo of the YAML Service manifest whose ports AddProxy and RemoveProxy edit
}
type createARecordReq struct {
@@ -80,10 +85,47 @@
return nil
}
-func (c *NginxProxyConfigurator) AddProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
+ var namespace string
_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := func() error {
+ r, err := fs.Reader(c.ServicePath)
+ if err != nil {
+ return err
+ }
+ defer r.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, r); err != nil {
+ return err
+ }
+ var svc corev1.Service
+ if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+ return err
+ }
+ svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+ Name: fmt.Sprintf("p%d", src),
+ Protocol: corev1.Protocol(ProtocolToString(protocol)),
+ Port: int32(src),
+ TargetPort: intstr.FromInt(src),
+ })
+ w, err := fs.Writer(c.ServicePath)
+ if err != nil {
+ return err
+ }
+ defer w.Close()
+ tmp, err := yaml.Marshal(svc)
+ if err != nil {
+ return err
+ }
+ if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+ return err
+ }
+ return nil
+ }(); err != nil {
+ return "", err
+ }
cfg, err := func() (NginxProxyConfig, error) {
- r, err := fs.Reader(c.NginxConfigPath)
+ r, err := fs.Reader(c.ConfigPath)
if err != nil {
return NginxProxyConfig{}, err
}
@@ -93,11 +135,19 @@
if err != nil {
return "", err
}
- if v, ok := cfg.Proxies[src]; ok && v != dst {
- return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
+ namespace = cfg.Namespace
+ var proxyMap map[int]string
+ switch protocol {
+ case ProtocolTCP:
+ proxyMap = cfg.TCP
+ case ProtocolUDP:
+ proxyMap = cfg.UDP
+ default:
+ return "", fmt.Errorf("invalid protocol: %v", protocol)
}
- cfg.Proxies[src] = dst
- w, err := fs.Writer(c.NginxConfigPath)
+ // TODO(gio): check for already existing mapping
+ proxyMap[src] = dst
+ w, err := fs.Writer(c.ConfigPath)
if err != nil {
return "", err
}
@@ -108,7 +158,7 @@
return "", err
}
hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
- nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
nginx, err := func() (map[string]any, error) {
r, err := fs.Reader(nginxPath)
if err != nil {
@@ -149,15 +199,18 @@
if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
return "", err
}
- return fmt.Sprintf("add proxy mapping: %s %s", src, dst), nil
+ return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
})
- return err
+ if err != nil {
+ return "", err
+ }
+ return namespace, nil
}
-func (c *NginxProxyConfigurator) RemoveProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
cfg, err := func() (NginxProxyConfig, error) {
- r, err := fs.Reader(c.NginxConfigPath)
+ r, err := fs.Reader(c.ConfigPath)
if err != nil {
return NginxProxyConfig{}, err
}
@@ -167,11 +220,11 @@
if err != nil {
return "", err
}
- if v, ok := cfg.Proxies[src]; !ok || v != dst {
- return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+ if v, ok := cfg.Ingress[src]; ok && v != dst {
+ return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
}
- delete(cfg.Proxies, src)
- w, err := fs.Writer(c.NginxConfigPath)
+ cfg.Ingress[src] = dst
+ w, err := fs.Writer(c.ConfigPath)
if err != nil {
return "", err
}
@@ -182,7 +235,198 @@
return "", err
}
hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
- nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+ nginx, err := func() (map[string]any, error) {
+ r, err := fs.Reader(nginxPath)
+ if err != nil {
+ return nil, err
+ }
+ defer r.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, r); err != nil {
+ return nil, err
+ }
+ ret := map[string]any{}
+ if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+ return nil, err
+ }
+ return ret, nil
+ }()
+ if err != nil {
+ return "", err
+ }
+ cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+ var annotations map[string]any
+ if a, ok := cv["podAnnotations"]; ok {
+ annotations = a.(map[string]any)
+ } else {
+ annotations = map[string]any{}
+ cv["podAnnotations"] = annotations
+ }
+ annotations["dodo.cloud/hash"] = string(hash)
+ buf, err := yaml.Marshal(nginx)
+ if err != nil {
+ return "", err
+ }
+ w, err = fs.Writer(nginxPath)
+ if err != nil {
+ return "", err
+ }
+ defer w.Close()
+ if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
+ })
+ return err
+}
+
+// RemoveProxy removes the port proxy registered for port src under the given
+// protocol. All work happens inside a single c.Repo.Do call:
+//
+//  1. the matching port entry is dropped from the Service manifest stored at
+//     c.ServicePath,
+//  2. src is deleted from the TCP or UDP table of the proxy config stored at
+//     c.ConfigPath and the config is rendered back to that path,
+//  3. the SHA-256 of the rendered config is stored in the "dodo.cloud/hash"
+//     pod annotation of ingress-nginx.yaml, which lives next to c.ConfigPath.
+//
+// dst is only used in the message string handed back to Repo.Do. Removing a
+// port that is not present is not an error. An unknown protocol is rejected
+// before any file is modified.
+func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
+	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+		// Validate first: ProtocolToString maps every non-TCP value to "UDP",
+		// so an invalid protocol must never reach the Service rewrite below.
+		if protocol != ProtocolTCP && protocol != ProtocolUDP {
+			return "", fmt.Errorf("invalid protocol: %v", protocol)
+		}
+		if err := func() error {
+			r, err := fs.Reader(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+			raw, err := io.ReadAll(r)
+			if err != nil {
+				return err
+			}
+			var svc corev1.Service
+			if err := yaml.Unmarshal(raw, &svc); err != nil {
+				return err
+			}
+			// The same port number may be exposed over both TCP and UDP, so
+			// the protocol has to match as well. An empty protocol on a
+			// ServicePort means TCP.
+			want := corev1.Protocol(ProtocolToString(protocol))
+			for i, p := range svc.Spec.Ports {
+				got := p.Protocol
+				if got == "" {
+					got = corev1.ProtocolTCP
+				}
+				if p.Port == int32(src) && got == want {
+					svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
+					break
+				}
+			}
+			// Marshal before opening the writer so a marshalling failure
+			// cannot leave a truncated manifest behind.
+			out, err := yaml.Marshal(svc)
+			if err != nil {
+				return err
+			}
+			w, err := fs.Writer(c.ServicePath)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			if _, err := io.Copy(w, bytes.NewReader(out)); err != nil {
+				return err
+			}
+			return nil
+		}(); err != nil {
+			return "", err
+		}
+		cfg, err := func() (NginxProxyConfig, error) {
+			r, err := fs.Reader(c.ConfigPath)
+			if err != nil {
+				return NginxProxyConfig{}, err
+			}
+			defer r.Close()
+			return ParseNginxProxyConfig(r)
+		}()
+		if err != nil {
+			return "", err
+		}
+		proxyMap := cfg.TCP
+		if protocol == ProtocolUDP {
+			proxyMap = cfg.UDP
+		}
+		// TODO(gio): verify that the existing mapping for src points at dst
+		delete(proxyMap, src)
+		w, err := fs.Writer(c.ConfigPath)
+		if err != nil {
+			return "", err
+		}
+		defer w.Close()
+		// Hash the rendered config while it is being written.
+		h := sha256.New()
+		if err := cfg.Render(io.MultiWriter(w, h)); err != nil {
+			return "", err
+		}
+		hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+		nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+		nginx, err := func() (map[string]any, error) {
+			r, err := fs.Reader(nginxPath)
+			if err != nil {
+				return nil, err
+			}
+			defer r.Close()
+			raw, err := io.ReadAll(r)
+			if err != nil {
+				return nil, err
+			}
+			ret := map[string]any{}
+			if err := yaml.Unmarshal(raw, &ret); err != nil {
+				return nil, err
+			}
+			return ret, nil
+		}()
+		if err != nil {
+			return "", err
+		}
+		// Walk spec.values.controller with checked assertions so that an
+		// unexpected document shape yields an error instead of a panic.
+		cv := nginx
+		for _, key := range []string{"spec", "values", "controller"} {
+			next, ok := cv[key].(map[string]any)
+			if !ok {
+				return "", fmt.Errorf("%s: missing or malformed %q section", nginxPath, key)
+			}
+			cv = next
+		}
+		annotations, ok := cv["podAnnotations"].(map[string]any)
+		if !ok {
+			if cv["podAnnotations"] != nil {
+				return "", fmt.Errorf("%s: podAnnotations is not a mapping", nginxPath)
+			}
+			annotations = map[string]any{}
+			cv["podAnnotations"] = annotations
+		}
+		annotations["dodo.cloud/hash"] = hash
+		buf, err := yaml.Marshal(nginx)
+		if err != nil {
+			return "", err
+		}
+		nw, err := fs.Writer(nginxPath)
+		if err != nil {
+			return "", err
+		}
+		defer nw.Close()
+		if _, err := io.Copy(nw, bytes.NewReader(buf)); err != nil {
+			return "", err
+		}
+		return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
+	})
+	return err
+}
+
+func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
+ _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+ cfg, err := func() (NginxProxyConfig, error) {
+ r, err := fs.Reader(c.ConfigPath)
+ if err != nil {
+ return NginxProxyConfig{}, err
+ }
+ defer r.Close()
+ return ParseNginxProxyConfig(r)
+ }()
+ if err != nil {
+ return "", err
+ }
+ if v, ok := cfg.Ingress[src]; !ok || v != dst {
+ return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+ }
+ delete(cfg.Ingress, src)
+ w, err := fs.Writer(c.ConfigPath)
+ if err != nil {
+ return "", err
+ }
+ defer w.Close()
+ h := sha256.New()
+ o := io.MultiWriter(w, h)
+ if err := cfg.Render(o); err != nil {
+ return "", err
+ }
+ hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
nginx, err := func() (map[string]any, error) {
r, err := fs.Reader(nginxPath)
if err != nil {
@@ -228,11 +472,40 @@
return err
}
+type Protocol int // Protocol selects the transport used by a stream (port) proxy.
+
+const (
+ ProtocolTCP Protocol = iota // rendered as a plain "listen <port>;" stream server
+ ProtocolUDP // rendered as a "listen <port> udp;" stream server
+)
+
+// ProtocolToString returns the upper-case name of p: "TCP" for ProtocolTCP
+// and "UDP" for every other value, including values that are not a declared
+// Protocol constant.
+func ProtocolToString(p Protocol) string {
+	switch p {
+	case ProtocolTCP:
+		return "TCP"
+	default:
+		return "UDP"
+	}
+}
+
type NginxProxyConfig struct {
- Port int
- Resolvers []net.IP
- Proxies map[string]string
- PreConf []string
+ Namespace string // value of the "namespace:" line found in the preamble, if any
+ IngressPort int // port of the listen directive in the http section; -1 until one is parsed
+ Resolvers []net.IP // resolver addresses parsed from the http section
+ Ingress map[string]string // entries of the http map block (rendered inside "map $http_host $backend")
+ TCP map[int]string // TCP stream servers: listen port -> proxy_pass target
+ UDP map[int]string // UDP stream servers: listen port -> proxy_pass target
+ PreConf []string // lines kept verbatim up to and including the one containing "nginx.conf"
+}
+
+// parseProtocol converts the case-insensitive names "tcp" and "udp" into the
+// matching Protocol constant. Any other input yields an "invalid protocol"
+// error; the Protocol returned alongside that error is ProtocolUDP and must
+// not be used.
+func parseProtocol(s string) (Protocol, error) {
+	lowered := strings.ToLower(s)
+	if lowered == "tcp" {
+		return ProtocolTCP, nil
+	}
+	if lowered == "udp" {
+		return ProtocolUDP, nil
+	}
+	return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
}
func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
@@ -241,13 +514,19 @@
return NginxProxyConfig{}, err
}
ret := NginxProxyConfig{
- Port: -1,
- Resolvers: nil,
- Proxies: make(map[string]string),
+ IngressPort: -1,
+ Resolvers: nil,
+ Ingress: make(map[string]string),
+ TCP: make(map[int]string),
+ UDP: make(map[int]string),
}
lines := strings.Split(buf.String(), "\n")
- insideConf := true
+ insidePreConf := true
+ insideHttp := false
insideMap := false
+ insideStream := false
+ streamPort := -1
+ streamPortProtocol := ProtocolTCP
for _, l := range lines {
items := strings.Fields(strings.TrimSuffix(l, ";"))
if len(items) == 0 {
@@ -255,19 +534,49 @@
}
if strings.Contains(l, "nginx.conf") {
ret.PreConf = append(ret.PreConf, l)
- insideConf = false
- } else if insideConf {
+ insidePreConf = false
+ } else if insidePreConf {
ret.PreConf = append(ret.PreConf, l)
+ items := strings.Fields(l)
+ if items[0] == "namespace:" {
+ ret.Namespace = items[1]
+ }
+ } else if items[0] == "http" {
+ insideHttp = true
+ } else if insideHttp && items[0] == "map" {
+ insideMap = true
+ } else if items[0] == "stream" {
+ insideHttp = false
+ insideMap = false
+ insideStream = true
} else if strings.Contains(l, "listen") {
if len(items) < 2 {
- return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s\n", l)
+ return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
}
port, err := strconv.Atoi(items[1])
if err != nil {
return NginxProxyConfig{}, err
}
- ret.Port = port
- } else if strings.Contains(l, "resolver") {
+ if insideHttp {
+ if len(items) > 2 {
+ return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
+ }
+ ret.IngressPort = port
+ } else {
+ if !insideStream {
+ return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
+ }
+ streamPort = port
+ if len(items) == 3 {
+ streamPortProtocol, err = parseProtocol(items[2])
+ if err != nil {
+ return NginxProxyConfig{}, err
+ }
+ } else {
+ streamPortProtocol = ProtocolTCP
+ }
+ }
+ } else if insideHttp && strings.Contains(l, "resolver") {
if len(items) < 2 {
return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
}
@@ -276,7 +585,7 @@
return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
}
ret.Resolvers = append(ret.Resolvers, ip)
- } else if insideMap {
+ } else if insideHttp && insideMap {
if items[0] == "}" {
insideMap = false
continue
@@ -284,9 +593,19 @@
if len(items) < 2 {
return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
}
- ret.Proxies[items[0]] = items[1]
- } else if items[0] == "map" {
- insideMap = true
+ ret.Ingress[items[0]] = items[1]
+ } else if insideStream && strings.Contains(l, "proxy_pass") {
+ if streamPort == -1 {
+ return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
+ }
+ if len(items) < 2 {
+ return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
+ }
+ if streamPortProtocol == ProtocolTCP {
+ ret.TCP[streamPort] = items[1]
+ } else {
+ ret.UDP[streamPort] = items[1]
+ }
}
}
return ret, nil
@@ -310,12 +629,12 @@
}
http {
map $http_host $backend {
- {{- range $from, $to := .Proxies }}
+ {{- range $from, $to := .Ingress }}
{{ $from }} {{ $to }};
{{- end }}
}
server {
- listen {{ .Port }};
+ listen {{ .IngressPort }};
location / {
{{- range .Resolvers }}
resolver {{ . }};
@@ -323,4 +642,22 @@
proxy_pass http://$backend;
}
}
- }`
+ }
+ {{- if or .TCP .UDP }}
+ stream {
+ {{- range $port, $upstream := .TCP }}
+ server {
+ listen {{ $port }};
+ resolver 100.100.100.100;
+ proxy_pass {{ $upstream }};
+ }
+ {{- end }}
+ {{- range $port, $upstream := .UDP }}
+ server {
+ listen {{ $port }} udp;
+ resolver 100.100.100.100;
+ proxy_pass {{ $upstream }};
+ }
+ {{- end }}
+ }
+ {{- end }}`