AppManager: Support exposing cross-cluster ports
Change-Id: I4bdb3573209935f6777656ec2f3481e79d84a9c9
diff --git a/core/installer/app.go b/core/installer/app.go
index 948e17e..e57fee8 100644
--- a/core/installer/app.go
+++ b/core/installer/app.go
@@ -95,6 +95,7 @@
}
type PortForward struct {
+ Cluster string `json:"cluster,omitempty"`
Allocator string `json:"allocator"`
ReserveAddr string `json:"reservator"`
RemoveAddr string `json:"deallocator"`
diff --git a/core/installer/app_configs/app_base.cue b/core/installer/app_configs/app_base.cue
index 7698553..7330b1e 100644
--- a/core/installer/app_configs/app_base.cue
+++ b/core/installer/app_configs/app_base.cue
@@ -109,9 +109,9 @@
imageRegistry: string | *"docker.io"
}
-// TODO(gio): Support inter-cluster porf forwarding
#PortForward: {
network: #Network
+ cluster?: string | null
port: int
service: close({
name: string
@@ -507,15 +507,44 @@
_volumeClaimName: "\(name)-postgresql"
_name: name
- openPort: [for i, e in expose {
- network: networks[strings.ToLower(e.network)]
- port: input["port_postgresql_\(_name)_\(i)"]
- protocol: "TCP"
- service: {
- name: "postgres-\(_name)"
- port: 5432
+ openPort: list.FlattenN([for i, e in expose {
+ if cluster == _|_ {
+ network: networks[strings.ToLower(e.network)]
+ port: input["port_postgresql_\(_name)_\(i)"]
+ protocol: "TCP"
+ service: {
+ name: "postgres-\(_name)"
+ port: 5432
+ }
}
- }]
+ if cluster != _|_ {
+ [{
+ network: #Network & {
+ name: "cluster_\(cluster.name)"
+ ingressClass: "default"
+ domain: ""
+ allocatePortAddr: "http://port-allocator.\(global.id)-cluster-\(cluster.name)-network.svc.cluster.local/api/allocate"
+ reservePortAddr: "http://port-allocator.\(global.id)-cluster-\(cluster.name)-network.svc.cluster.local/api/reserve"
+ deallocatePortAddr: "http://port-allocator.\(global.id)-cluster-\(cluster.name)-network.svc.cluster.local/api/remmove"
+ }
+ port: input["port_postgresql_\(_name)_\(i)_cluster"]
+ protocol: "TCP"
+ service: {
+ name: "postgres-\(_name)"
+ port: 5432
+ }
+ }, {
+ cluster: _cluster.name
+ network: networks[strings.ToLower(e.network)]
+ port: input["port_postgresql_\(_name)_\(i)"]
+ protocol: "TCP"
+ service: {
+ name: "cluster-\(cluster).private-network-proxy.devices.\(global.privateDomain)"
+ port: input["port_postgresql_\(_name)_\(i)_cluster"]
+ }
+ }]
+ }
+ }], -1)
images: {
postgres: {
diff --git a/core/installer/app_configs/app_global_env.cue b/core/installer/app_configs/app_global_env.cue
index 3841f02..6a02df2 100644
--- a/core/installer/app_configs/app_global_env.cue
+++ b/core/installer/app_configs/app_global_env.cue
@@ -62,8 +62,8 @@
if _cluster != _|_ {
clusterProxy: {
"\(name)": {
- from: _domain
_sanitizedDomain: strings.Replace(_domain, ".", "-", -1)
+ from: _domain
to: "\(_sanitizedDomain).\(_cluster.name).cluster.\(global.privateDomain)"
}
}
diff --git a/core/installer/app_configs/dodo_app.cue b/core/installer/app_configs/dodo_app.cue
index bf5a7f8..6f7001a 100644
--- a/core/installer/app_configs/dodo_app.cue
+++ b/core/installer/app_configs/dodo_app.cue
@@ -24,6 +24,9 @@
for v in _postgresql {
for i, e in v.expose {
"port_postgresql_\(v.name)_\(i)": int @role(port)
+ if input.cluster != _|_ {
+ "port_postgresql_\(v.name)_\(i)_cluster": int @role(port)
+ }
}
}
for v in _mongodb {
diff --git a/core/installer/app_manager.go b/core/installer/app_manager.go
index c5b849f..3285c19 100644
--- a/core/installer/app_manager.go
+++ b/core/installer/app_manager.go
@@ -182,10 +182,19 @@
Secret string `json:"secret"`
}
-func reservePorts(ports map[string]string) (map[string]reservePortResp, error) {
+type reservePortInfo struct {
+ reserveAddr string
+ RemoteProxy bool `json:"remoteProxy"`
+}
+
+func reservePorts(ports map[string]reservePortInfo) (map[string]reservePortResp, error) {
ret := map[string]reservePortResp{}
- for p, reserveAddr := range ports {
- resp, err := http.Post(reserveAddr, "application/json", nil) // TODO(gio): address
+ for p, cfg := range ports {
+ var buf bytes.Buffer
+ if err := json.NewEncoder(&buf).Encode(cfg); err != nil {
+ return nil, err
+ }
+ resp, err := http.Post(cfg.reserveAddr, "application/json", &buf)
if err != nil {
return nil, err
}
@@ -205,11 +214,17 @@
func openPorts(ports []PortForward, reservations map[string]reservePortResp, allocators map[string]string, ns string) error {
for _, p := range ports {
+ var target string
+ if p.Cluster == "" {
+ target = fmt.Sprintf("%s/%s", ns, p.Service.Name)
+ } else {
+ target = p.Service.Name
+ }
var buf bytes.Buffer
req := allocatePortReq{
Protocol: p.Protocol,
SourcePort: p.Port,
- TargetService: fmt.Sprintf("%s/%s", ns, p.Service.Name),
+ TargetService: target,
TargetPort: p.Service.Port,
}
allocator := ""
@@ -475,10 +490,13 @@
if err != nil {
return ReleaseResources{}, err
}
- reservators := map[string]string{}
+ reservators := map[string]reservePortInfo{}
allocators := map[string]string{}
for _, pf := range rendered.Ports {
- reservators[portFields[pf.Port]] = pf.ReserveAddr
+ reservators[portFields[pf.Port]] = reservePortInfo{
+ reserveAddr: pf.ReserveAddr,
+ RemoteProxy: pf.Cluster != "",
+ }
allocators[portFields[pf.Port]] = pf.Allocator
}
portReservations, err := reservePorts(reservators)
@@ -530,7 +548,7 @@
return ReleaseResources{}, err
}
for _, p := range rendered.ClusterProxies {
- if err := m.cnc.AddProxy(p.From, p.To); err != nil {
+ if err := m.cnc.AddIngressProxy(p.From, p.To); err != nil {
return ReleaseResources{}, err
}
}
@@ -651,7 +669,7 @@
}
}
if !found {
- if err := m.cnc.RemoveProxy(ocp.From, ocp.To); err != nil {
+ if err := m.cnc.RemoveIngressProxy(ocp.From, ocp.To); err != nil {
return ReleaseResources{}, err
}
}
@@ -665,7 +683,7 @@
}
}
if !found {
- if err := m.cnc.AddProxy(ncp.From, ncp.To); err != nil {
+ if err := m.cnc.AddIngressProxy(ncp.From, ncp.To); err != nil {
return ReleaseResources{}, err
}
}
@@ -723,7 +741,7 @@
return err
}
for _, cp := range cfg.Out.ClusterProxy {
- if err := m.cnc.RemoveProxy(cp.From, cp.To); err != nil {
+ if err := m.cnc.RemoveIngressProxy(cp.From, cp.To); err != nil {
return err
}
}
diff --git a/core/installer/canvas-app.cue b/core/installer/canvas-app.cue
index a8be305..6843277 100644
--- a/core/installer/canvas-app.cue
+++ b/core/installer/canvas-app.cue
@@ -24,14 +24,14 @@
"enabled": false
}
}],
- "mongodb": [{
+ "postgresql": [{
"name": "pgg",
"size": "2Gi",
"expose": [{
"network": "private",
- "subdomain": "mongoo"
+ "subdomain": "pggp"
}]
}],
- "cluster": "ct"
+ "cluster": "asdc"
}
}
diff --git a/core/installer/cluster.go b/core/installer/cluster.go
index 19220af..ac7d970 100644
--- a/core/installer/cluster.go
+++ b/core/installer/cluster.go
@@ -16,21 +16,26 @@
"github.com/giolekva/pcloud/core/installer/soft"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/yaml"
)
type ClusterNetworkConfigurator interface {
AddCluster(name string, ingressIP net.IP) error
RemoveCluster(name string, ingressIP net.IP) error
- AddProxy(src, dst string) error
- RemoveProxy(src, dst string) error
+ AddProxy(src int, dst string, protocol Protocol) (string, error)
+ AddIngressProxy(src, dst string) error
+ RemoveProxy(src int, dst string, protocol Protocol) error
+ RemoveIngressProxy(src, dst string) error
}
type NginxProxyConfigurator struct {
PrivateSubdomain string
DNSAPIAddr string
Repo soft.RepoIO
- NginxConfigPath string
+ ConfigPath string
+ ServicePath string
}
type createARecordReq struct {
@@ -80,10 +85,47 @@
return nil
}
-func (c *NginxProxyConfigurator) AddProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
+ var namespace string
_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := func() error {
+ r, err := fs.Reader(c.ServicePath)
+ if err != nil {
+ return err
+ }
+ defer r.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, r); err != nil {
+ return err
+ }
+ var svc corev1.Service
+ if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+ return err
+ }
+ svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
+ Name: fmt.Sprintf("p%d", src),
+ Protocol: corev1.Protocol(ProtocolToString(protocol)),
+ Port: int32(src),
+ TargetPort: intstr.FromInt(src),
+ })
+ w, err := fs.Writer(c.ServicePath)
+ if err != nil {
+ return err
+ }
+ defer w.Close()
+ tmp, err := yaml.Marshal(svc)
+ if err != nil {
+ return err
+ }
+ if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+ return err
+ }
+ return nil
+ }(); err != nil {
+ return "", err
+ }
cfg, err := func() (NginxProxyConfig, error) {
- r, err := fs.Reader(c.NginxConfigPath)
+ r, err := fs.Reader(c.ConfigPath)
if err != nil {
return NginxProxyConfig{}, err
}
@@ -93,11 +135,19 @@
if err != nil {
return "", err
}
- if v, ok := cfg.Proxies[src]; ok && v != dst {
- return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
+ namespace = cfg.Namespace
+ var proxyMap map[int]string
+ switch protocol {
+ case ProtocolTCP:
+ proxyMap = cfg.TCP
+ case ProtocolUDP:
+ proxyMap = cfg.UDP
+ default:
+ return "", fmt.Errorf("invalid protocol: %v", protocol)
}
- cfg.Proxies[src] = dst
- w, err := fs.Writer(c.NginxConfigPath)
+ // TODO(gio): check for already existing mapping
+ proxyMap[src] = dst
+ w, err := fs.Writer(c.ConfigPath)
if err != nil {
return "", err
}
@@ -108,7 +158,7 @@
return "", err
}
hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
- nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
nginx, err := func() (map[string]any, error) {
r, err := fs.Reader(nginxPath)
if err != nil {
@@ -149,15 +199,18 @@
if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
return "", err
}
- return fmt.Sprintf("add proxy mapping: %s %s", src, dst), nil
+ return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
})
- return err
+ if err != nil {
+ return "", err
+ }
+ return namespace, nil
}
-func (c *NginxProxyConfigurator) RemoveProxy(src, dst string) error {
+func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
cfg, err := func() (NginxProxyConfig, error) {
- r, err := fs.Reader(c.NginxConfigPath)
+ r, err := fs.Reader(c.ConfigPath)
if err != nil {
return NginxProxyConfig{}, err
}
@@ -167,11 +220,11 @@
if err != nil {
return "", err
}
- if v, ok := cfg.Proxies[src]; !ok || v != dst {
- return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+ if v, ok := cfg.Ingress[src]; ok && v != dst {
+ return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
}
- delete(cfg.Proxies, src)
- w, err := fs.Writer(c.NginxConfigPath)
+ cfg.Ingress[src] = dst
+ w, err := fs.Writer(c.ConfigPath)
if err != nil {
return "", err
}
@@ -182,7 +235,198 @@
return "", err
}
hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
- nginxPath := filepath.Join(filepath.Dir(c.NginxConfigPath), "ingress-nginx.yaml")
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+ nginx, err := func() (map[string]any, error) {
+ r, err := fs.Reader(nginxPath)
+ if err != nil {
+ return nil, err
+ }
+ defer r.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, r); err != nil {
+ return nil, err
+ }
+ ret := map[string]any{}
+ if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+ return nil, err
+ }
+ return ret, nil
+ }()
+ if err != nil {
+ return "", err
+ }
+ cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+ var annotations map[string]any
+ if a, ok := cv["podAnnotations"]; ok {
+ annotations = a.(map[string]any)
+ } else {
+ annotations = map[string]any{}
+ cv["podAnnotations"] = annotations
+ }
+ annotations["dodo.cloud/hash"] = string(hash)
+ buf, err := yaml.Marshal(nginx)
+ if err != nil {
+ return "", err
+ }
+ w, err = fs.Writer(nginxPath)
+ if err != nil {
+ return "", err
+ }
+ defer w.Close()
+ if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
+ })
+ return err
+}
+
+func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
+ _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+ if err := func() error {
+ r, err := fs.Reader(c.ServicePath)
+ if err != nil {
+ return err
+ }
+ defer r.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, r); err != nil {
+ return err
+ }
+ var svc corev1.Service
+ if err := yaml.Unmarshal(buf.Bytes(), &svc); err != nil {
+ return err
+ }
+ for i, p := range svc.Spec.Ports {
+ if p.Port == int32(src) {
+ svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
+ break
+ }
+ }
+ w, err := fs.Writer(c.ServicePath)
+ if err != nil {
+ return err
+ }
+ defer w.Close()
+ tmp, err := yaml.Marshal(svc)
+ if err != nil {
+ return err
+ }
+ if _, err := io.Copy(w, bytes.NewReader(tmp)); err != nil {
+ return err
+ }
+ return nil
+ }(); err != nil {
+ return "", err
+ }
+ cfg, err := func() (NginxProxyConfig, error) {
+ r, err := fs.Reader(c.ConfigPath)
+ if err != nil {
+ return NginxProxyConfig{}, err
+ }
+ defer r.Close()
+ return ParseNginxProxyConfig(r)
+ }()
+ if err != nil {
+ return "", err
+ }
+ var proxyMap map[int]string
+ switch protocol {
+ case ProtocolTCP:
+ proxyMap = cfg.TCP
+ case ProtocolUDP:
+ proxyMap = cfg.UDP
+ default:
+ return "", fmt.Errorf("invalid protocol: %v", protocol)
+ }
+ // TODO(gio): check for already existing mapping
+ delete(proxyMap, src)
+ w, err := fs.Writer(c.ConfigPath)
+ if err != nil {
+ return "", err
+ }
+ defer w.Close()
+ h := sha256.New()
+ o := io.MultiWriter(w, h)
+ if err := cfg.Render(o); err != nil {
+ return "", err
+ }
+ hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
+ nginx, err := func() (map[string]any, error) {
+ r, err := fs.Reader(nginxPath)
+ if err != nil {
+ return nil, err
+ }
+ defer r.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, r); err != nil {
+ return nil, err
+ }
+ ret := map[string]any{}
+ if err := yaml.Unmarshal(buf.Bytes(), &ret); err != nil {
+ return nil, err
+ }
+ return ret, nil
+ }()
+ if err != nil {
+ return "", err
+ }
+ cv := nginx["spec"].(map[string]any)["values"].(map[string]any)["controller"].(map[string]any)
+ var annotations map[string]any
+ if a, ok := cv["podAnnotations"]; ok {
+ annotations = a.(map[string]any)
+ } else {
+ annotations = map[string]any{}
+ cv["podAnnotations"] = annotations
+ }
+ annotations["dodo.cloud/hash"] = string(hash)
+ buf, err := yaml.Marshal(nginx)
+ if err != nil {
+ return "", err
+ }
+ w, err = fs.Writer(nginxPath)
+ if err != nil {
+ return "", err
+ }
+ defer w.Close()
+ if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
+ return "", err
+ }
+ return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
+ })
+ return err
+}
+
+func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
+ _, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
+ cfg, err := func() (NginxProxyConfig, error) {
+ r, err := fs.Reader(c.ConfigPath)
+ if err != nil {
+ return NginxProxyConfig{}, err
+ }
+ defer r.Close()
+ return ParseNginxProxyConfig(r)
+ }()
+ if err != nil {
+ return "", err
+ }
+ if v, ok := cfg.Ingress[src]; !ok || v != dst {
+ return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
+ }
+ delete(cfg.Ingress, src)
+ w, err := fs.Writer(c.ConfigPath)
+ if err != nil {
+ return "", err
+ }
+ defer w.Close()
+ h := sha256.New()
+ o := io.MultiWriter(w, h)
+ if err := cfg.Render(o); err != nil {
+ return "", err
+ }
+ hash := base64.StdEncoding.EncodeToString(h.Sum(nil))
+ nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
nginx, err := func() (map[string]any, error) {
r, err := fs.Reader(nginxPath)
if err != nil {
@@ -228,11 +472,40 @@
return err
}
+type Protocol int
+
+const (
+ ProtocolTCP Protocol = iota
+ ProtocolUDP
+)
+
+func ProtocolToString(p Protocol) string {
+ if p == ProtocolTCP {
+ return "TCP"
+ } else {
+ return "UDP"
+ }
+}
+
type NginxProxyConfig struct {
- Port int
- Resolvers []net.IP
- Proxies map[string]string
- PreConf []string
+ Namespace string
+ IngressPort int
+ Resolvers []net.IP
+ Ingress map[string]string
+ TCP map[int]string
+ UDP map[int]string
+ PreConf []string
+}
+
+func parseProtocol(s string) (Protocol, error) {
+ switch strings.ToLower(s) {
+ case "tcp":
+ return ProtocolTCP, nil
+ case "udp":
+ return ProtocolUDP, nil
+ default:
+ return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
+ }
}
func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
@@ -241,13 +514,19 @@
return NginxProxyConfig{}, err
}
ret := NginxProxyConfig{
- Port: -1,
- Resolvers: nil,
- Proxies: make(map[string]string),
+ IngressPort: -1,
+ Resolvers: nil,
+ Ingress: make(map[string]string),
+ TCP: make(map[int]string),
+ UDP: make(map[int]string),
}
lines := strings.Split(buf.String(), "\n")
- insideConf := true
+ insidePreConf := true
+ insideHttp := false
insideMap := false
+ insideStream := false
+ streamPort := -1
+ streamPortProtocol := ProtocolTCP
for _, l := range lines {
items := strings.Fields(strings.TrimSuffix(l, ";"))
if len(items) == 0 {
@@ -255,19 +534,49 @@
}
if strings.Contains(l, "nginx.conf") {
ret.PreConf = append(ret.PreConf, l)
- insideConf = false
- } else if insideConf {
+ insidePreConf = false
+ } else if insidePreConf {
ret.PreConf = append(ret.PreConf, l)
+ items := strings.Fields(l)
+ if items[0] == "namespace:" {
+ ret.Namespace = items[1]
+ }
+ } else if items[0] == "http" {
+ insideHttp = true
+ } else if insideHttp && items[0] == "map" {
+ insideMap = true
+ } else if items[0] == "stream" {
+ insideHttp = false
+ insideMap = false
+ insideStream = true
} else if strings.Contains(l, "listen") {
if len(items) < 2 {
- return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s\n", l)
+ return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
}
port, err := strconv.Atoi(items[1])
if err != nil {
return NginxProxyConfig{}, err
}
- ret.Port = port
- } else if strings.Contains(l, "resolver") {
+ if insideHttp {
+ if len(items) > 2 {
+ return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
+ }
+ ret.IngressPort = port
+ } else {
+ if !insideStream {
+ return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
+ }
+ streamPort = port
+ if len(items) == 3 {
+ streamPortProtocol, err = parseProtocol(items[2])
+ if err != nil {
+ return NginxProxyConfig{}, err
+ }
+ } else {
+ streamPortProtocol = ProtocolTCP
+ }
+ }
+ } else if insideHttp && strings.Contains(l, "resolver") {
if len(items) < 2 {
return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
}
@@ -276,7 +585,7 @@
return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
}
ret.Resolvers = append(ret.Resolvers, ip)
- } else if insideMap {
+ } else if insideHttp && insideMap {
if items[0] == "}" {
insideMap = false
continue
@@ -284,9 +593,19 @@
if len(items) < 2 {
return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
}
- ret.Proxies[items[0]] = items[1]
- } else if items[0] == "map" {
- insideMap = true
+ ret.Ingress[items[0]] = items[1]
+ } else if insideStream && strings.Contains(l, "proxy_pass") {
+ if streamPort == -1 {
+ return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
+ }
+ if len(items) < 2 {
+ return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
+ }
+ if streamPortProtocol == ProtocolTCP {
+ ret.TCP[streamPort] = items[1]
+ } else {
+ ret.UDP[streamPort] = items[1]
+ }
}
}
return ret, nil
@@ -310,12 +629,12 @@
}
http {
map $http_host $backend {
- {{- range $from, $to := .Proxies }}
+ {{- range $from, $to := .Ingress }}
{{ $from }} {{ $to }};
{{- end }}
}
server {
- listen {{ .Port }};
+ listen {{ .IngressPort }};
location / {
{{- range .Resolvers }}
resolver {{ . }};
@@ -323,4 +642,22 @@
proxy_pass http://$backend;
}
}
- }`
+ }
+ {{- if or .TCP .UDP }}
+ stream {
+ {{- range $port, $upstream := .TCP }}
+ server {
+ listen {{ $port }};
+ resolver 100.100.100.100;
+ proxy_pass {{ $upstream }};
+ }
+ {{- end }}
+ {{- range $port, $upstream := .UDP }}
+ server {
+ listen {{ $port }} udp;
+ resolver 100.100.100.100;
+ proxy_pass {{ $upstream }};
+ }
+ {{- end }}
+ }
+ {{- end }}`
diff --git a/core/installer/cluster_test.go b/core/installer/cluster_test.go
index e315202..b2ecd99 100644
--- a/core/installer/cluster_test.go
+++ b/core/installer/cluster_test.go
@@ -7,7 +7,14 @@
)
func TestParseNginxProxyConfig(t *testing.T) {
- cfg, err := ParseNginxProxyConfig(strings.NewReader(`nginx.conf: |
+ cfg, err := ParseNginxProxyConfig(strings.NewReader(`
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: proxy-backend-config
+ namespace: foo
+data:
+nginx.conf: |
# user www www;
worker_processes 1;
error_log /dev/null crit;
@@ -33,33 +40,66 @@
}
}
}
+stream {
+ server {
+ listen 9091;
+ proxy_pass foo:1;
+ }
+ server {
+ listen 9092 udp;
+ proxy_pass bar:2;
+ }
+ server {
+ listen 9093;
+ proxy_pass dev:3;
+ }
+}
`))
if err != nil {
t.Fatal(err)
}
- if cfg.Port != 9090 {
- t.Errorf("invalid port: expected 9090, got %d", cfg.Port)
+ if cfg.Namespace != "foo" {
+ t.Errorf("invalid namespace: expeced foo, got %s", cfg.Namespace)
+ }
+ if cfg.IngressPort != 9090 {
+ t.Errorf("invalid port: expected 9090, got %d", cfg.IngressPort)
}
if len(cfg.Resolvers) != 2 ||
!cfg.Resolvers[0].Equal(net.ParseIP("1.1.1.1")) ||
!cfg.Resolvers[1].Equal(net.ParseIP("2.2.2.2")) {
t.Errorf("invalid resolvers: expected [1.1.1.1 2.2.2.2], got %s", cfg.Resolvers)
}
- if len(cfg.Proxies) != 2 ||
- cfg.Proxies["a"] != "A" ||
- cfg.Proxies["b"] != "B" {
- t.Errorf("invalid proxies: expected map[a:A, b:B], got %s", cfg.Proxies)
+ if len(cfg.Ingress) != 2 ||
+ cfg.Ingress["a"] != "A" ||
+ cfg.Ingress["b"] != "B" {
+ t.Errorf("invalid ingress proxies: expected map[a:A, b:B], got %s", cfg.Ingress)
+ }
+ if len(cfg.TCP) != 2 ||
+ cfg.TCP[9091] != "foo:1" ||
+ cfg.TCP[9093] != "dev:3" {
+ t.Errorf("invalid TCP proxies: expected map[9091:foo:1, 9093:dev:3], got %v", cfg.TCP)
+ }
+ if len(cfg.UDP) != 1 ||
+ cfg.UDP[9092] != "bar:2" {
+ t.Errorf("invalid UDP proxies: expected map[9092:bar:2], got %v", cfg.UDP)
}
}
func TestRenderNginxProxyConfig(t *testing.T) {
cfg := NginxProxyConfig{
- Port: 8080,
- Resolvers: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")},
- Proxies: map[string]string{
+ IngressPort: 8080,
+ Resolvers: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")},
+ Ingress: map[string]string{
"a": "A",
"b": "B",
},
+ TCP: map[int]string{
+ 1: "foo:1",
+ 3: "dev:3",
+ },
+ UDP: map[int]string{
+ 2: "bar:2",
+ },
PreConf: []string{"line1", "line2"},
}
var buf strings.Builder
diff --git a/core/installer/cmd/app_manager.go b/core/installer/cmd/app_manager.go
index afc7d32..c2547d6 100644
--- a/core/installer/cmd/app_manager.go
+++ b/core/installer/cmd/app_manager.go
@@ -3,8 +3,8 @@
import (
"log"
"os"
-
- "golang.org/x/crypto/ssh"
+ "path/filepath"
+ "strings"
"github.com/giolekva/pcloud/core/installer"
"github.com/giolekva/pcloud/core/installer/server/appmanager"
@@ -80,20 +80,14 @@
if err != nil {
return err
}
- signer, err := ssh.ParsePrivateKey(sshKey)
+ items := strings.Split(appManagerFlags.repoAddr, "/")
+ ipPort := items[len(items)-2]
+ repoName := items[len(items)-1]
+ ssClient, err := soft.NewClient(ipPort, sshKey, log.Default())
if err != nil {
return err
}
- addr, err := soft.ParseRepositoryAddress(appManagerFlags.repoAddr)
- if err != nil {
- return err
- }
- repo, err := soft.CloneRepository(addr, signer)
- if err != nil {
- return err
- }
- log.Println("Cloned repository")
- repoIO, err := soft.NewRepoIO(repo, signer)
+ repoIO, err := ssClient.GetRepo(repoName)
if err != nil {
return err
}
@@ -112,7 +106,8 @@
PrivateSubdomain: "p",
DNSAPIAddr: appManagerFlags.dnsAPIAddr,
Repo: repoIO,
- NginxConfigPath: appManagerFlags.clusterProxyConfigPath,
+ ConfigPath: appManagerFlags.clusterProxyConfigPath,
+ ServicePath: filepath.Join(filepath.Dir(appManagerFlags.clusterProxyConfigPath), "proxy-backend-service.yaml"),
}
m, err := installer.NewAppManager(repoIO, nsc, jc, hf, vpnAPIClient, cnc, "/apps")
if err != nil {
@@ -145,6 +140,7 @@
}
s, err := appmanager.NewServer(
appManagerFlags.port,
+ ssClient,
repoIO,
m,
r,
diff --git a/core/installer/cmd/dodo_app.go b/core/installer/cmd/dodo_app.go
index adc8beb..9b5da30 100644
--- a/core/installer/cmd/dodo_app.go
+++ b/core/installer/cmd/dodo_app.go
@@ -245,6 +245,8 @@
return s.Start()
}
+// TODO(gio): this should be removed, all of app installtions including dodo app
+// must be done directly by main app manager.
type proxyConfigurator struct {
apiAddr string
}
@@ -262,7 +264,7 @@
To string `json:"to"`
}
-func (pc *proxyConfigurator) AddProxy(src, dst string) error {
+func (pc *proxyConfigurator) AddIngressProxy(src, dst string) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(proxyPair{src, dst}); err != nil {
return err
@@ -279,7 +281,17 @@
return nil
}
-func (pc *proxyConfigurator) RemoveProxy(src, dst string) error {
+func (pc *proxyConfigurator) AddProxy(src int, dst string, protocol installer.Protocol) (string, error) {
+ // TODO(gio): implement
+ return "", fmt.Errorf("NOT IMPLEMENTED")
+}
+
+func (pc *proxyConfigurator) RemoveProxy(src int, dst string, protocol installer.Protocol) error {
+ // TODO(gio): implement
+ return fmt.Errorf("NOT IMPLEMENTED")
+}
+
+func (pc *proxyConfigurator) RemoveIngressProxy(src, dst string) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(proxyPair{src, dst}); err != nil {
return err
diff --git a/core/installer/dodo_app_test.go b/core/installer/dodo_app_test.go
index 5b0f3b1..eaad590 100644
--- a/core/installer/dodo_app_test.go
+++ b/core/installer/dodo_app_test.go
@@ -1,6 +1,8 @@
package installer
import (
+ "bytes"
+ "encoding/json"
"testing"
"cuelang.org/go/cue/errors"
@@ -210,3 +212,61 @@
}
t.Log(string(r.Raw))
}
+
+const exposeRemoteCluster = `
+{
+ "cluster": "remote",
+ "postgresql": [{
+ "name": "db",
+ "size": "1Gi",
+ "expose": [{
+ "network": "Private",
+ "subdomain": "pg"
+ }]
+ }],
+}
+`
+
+func TestExposeRemoteCluster(t *testing.T) {
+ var buf bytes.Buffer
+ if _, err := buf.WriteString(exposeRemoteCluster); err != nil {
+ t.Fatal(err)
+ }
+ clusters := []Cluster{{
+ Name: "remote",
+ Kubeconfig: "<KUBECONFIG>",
+ IngressClassName: "<INGRESS_CLASS_NAME>",
+ }}
+ if err := json.NewEncoder(&buf).Encode(struct {
+ Clusters []Cluster `json:"clusters"`
+ }{
+ clusters,
+ }); err != nil {
+ t.Fatal(err)
+ }
+ app, err := NewDodoApp(buf.Bytes())
+ if err != nil {
+ for _, e := range errors.Errors(err) {
+ t.Log(e)
+ }
+ t.Fatal(err)
+ }
+ release := Release{
+ Namespace: "foo",
+ AppInstanceId: "foo-bar",
+ RepoAddr: "ssh://192.168.100.210:22/config",
+ AppDir: "/foo/bar",
+ }
+ keyGen := testKeyGen{}
+ r, err := app.Render(release, env, networks, clusters, map[string]any{
+ "managerAddr": "",
+ "appId": "",
+ "sshPrivateKey": "",
+ "port_postgresql_db_0": 1,
+ "port_postgresql_db_0_cluster": 2,
+ }, nil, keyGen)
+ if err != nil {
+ t.Fatal(err)
+ }
+ t.Log(string(r.Raw))
+}
diff --git a/core/installer/server/appmanager/server.go b/core/installer/server/appmanager/server.go
index b3a0883..3360ff4 100644
--- a/core/installer/server/appmanager/server.go
+++ b/core/installer/server/appmanager/server.go
@@ -41,6 +41,7 @@
type Server struct {
l sync.Locker
port int
+ ssClient soft.Client
repo soft.RepoIO
m *installer.AppManager
r installer.AppRepository
@@ -98,6 +99,7 @@
func NewServer(
port int,
+ ssClient soft.Client,
repo soft.RepoIO,
m *installer.AppManager,
r installer.AppRepository,
@@ -114,6 +116,7 @@
return &Server{
l: &sync.Mutex{},
port: port,
+ ssClient: ssClient,
repo: repo,
m: m,
r: r,
@@ -187,7 +190,7 @@
return
}
appDir := filepath.Join("/dodo-app", req.Id)
- namespace := "dodo-app-test" // TODO(gio)
+ namespace := "dodo-app-testttt" // TODO(gio)
if _, err := s.m.Install(app, req.Id, appDir, namespace, map[string]any{
"managerAddr": "", // TODO(gio)
"appId": req.Id,
@@ -238,7 +241,7 @@
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- if err := s.cnc.AddProxy(req.From, req.To); err != nil {
+ if err := s.cnc.AddIngressProxy(req.From, req.To); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -250,7 +253,7 @@
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
- if err := s.cnc.RemoveProxy(req.From, req.To); err != nil {
+ if err := s.cnc.RemoveIngressProxy(req.From, req.To); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -891,6 +894,17 @@
if err != nil {
return installer.ReleaseResources{}, err
}
+ keys, err := installer.NewSSHKeyPair("port-allocator")
+ if err != nil {
+ return installer.ReleaseResources{}, err
+ }
+ user := fmt.Sprintf("%s-cluster-%s-port-allocator", env.Id, name)
+ if err := s.ssClient.AddUser(user, keys.AuthorizedKey()); err != nil {
+ return installer.ReleaseResources{}, err
+ }
+ if err := s.ssClient.AddReadWriteCollaborator("config", user); err != nil {
+ return installer.ReleaseResources{}, err
+ }
instanceId := fmt.Sprintf("%s-%s", app.Slug(), name)
appDir := fmt.Sprintf("/clusters/%s/ingress", name)
namespace := fmt.Sprintf("%scluster-%s-network", env.NamespacePrefix, name)
@@ -903,6 +917,7 @@
// TODO(gio): remove hardcoded user
"vpnUser": vpnUser,
"vpnProxyHostname": hostname,
+ "sshPrivateKey": string(keys.RawPrivateKey()),
})
if err != nil {
return installer.ReleaseResources{}, err
diff --git a/core/installer/values-tmpl/cluster-network.cue b/core/installer/values-tmpl/cluster-network.cue
index 393ac41..85551e6 100644
--- a/core/installer/values-tmpl/cluster-network.cue
+++ b/core/installer/values-tmpl/cluster-network.cue
@@ -1,5 +1,5 @@
import (
- // "encoding/base64"
+ "encoding/base64"
)
input: {
@@ -7,7 +7,7 @@
vpnUser: string
vpnProxyHostname: string
vpnAuthKey: string @role(VPNAuthKey) @usernameField(vpnUser)
- // TODO(gio): support port allocator
+ sshPrivateKey: string
}
name: "Cluster Network"
@@ -28,12 +28,12 @@
tag: "v1.82.0"
pullPolicy: "IfNotPresent"
}
- // portAllocator: {
- // repository: "giolekva"
- // name: "port-allocator"
- // tag: "latest"
- // pullPolicy: "Always"
- // }
+ portAllocator: {
+ repository: "giolekva"
+ name: "port-allocator"
+ tag: "latest"
+ pullPolicy: "Always"
+ }
}
charts: {
@@ -55,12 +55,12 @@
branch: "main"
path: "charts/tailscale-proxy"
}
- // portAllocator: {
- // kind: "GitRepository"
- // address: "https://code.v1.dodo.cloud/helm-charts"
- // branch: "main"
- // path: "charts/port-allocator"
- // }
+ portAllocator: {
+ kind: "GitRepository"
+ address: "https://code.v1.dodo.cloud/helm-charts"
+ branch: "main"
+ path: "charts/port-allocator"
+ }
}
helm: {
@@ -119,20 +119,23 @@
}]
}]
}
+ tcp: {}
+ udp: {}
}
}
- // "port-allocator": {
- // chart: charts.portAllocator
- // values: {
- // repoAddr: release.repoAddr
- // sshPrivateKey: base64.Encode(null, input.sshPrivateKey)
- // ingressNginxPath: "\(release.appDir)/resources/ingress-nginx.yaml"
- // image: {
- // repository: images.portAllocator.fullName
- // tag: images.portAllocator.tag
- // pullPolicy: images.portAllocator.pullPolicy
- // }
- // }
- // }
+ "port-allocator": {
+ chart: charts.portAllocator
+ cluster: null
+ values: {
+ repoAddr: release.repoAddr
+ sshPrivateKey: base64.Encode(null, input.sshPrivateKey)
+ ingressNginxPath: "\(release.appDir)/resources/ingress-nginx.yaml"
+ image: {
+ repository: images.portAllocator.fullName
+ tag: "amd64" // TODO(gio): images.portAllocator.tag
+ pullPolicy: images.portAllocator.pullPolicy
+ }
+ }
+ }
}
}