// blob: ac7d970608865736bfaea6fe8bd184969eb02926 [file] [log] [blame]
package installer
import (
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"path/filepath"
"strconv"
"strings"
"text/template"
"github.com/giolekva/pcloud/core/installer/soft"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/yaml"
)
// ClusterNetworkConfigurator abstracts the network wiring done for clusters:
// registering and unregistering a cluster's ingress IP, and adding or removing
// port-level and ingress (host-level) proxy mappings.
type ClusterNetworkConfigurator interface {
// AddCluster registers the cluster called name together with its ingress IP.
AddCluster(name string, ingressIP net.IP) error
// RemoveCluster undoes AddCluster for the same name and ingress IP.
RemoveCluster(name string, ingressIP net.IP) error
// AddProxy forwards port src to the destination dst over protocol.
// The meaning of the returned string is up to the implementation;
// NginxProxyConfigurator returns the namespace found in its proxy config.
AddProxy(src int, dst string, protocol Protocol) (string, error)
// AddIngressProxy maps the source src to the backend dst.
AddIngressProxy(src, dst string) error
// RemoveProxy removes the port mapping previously added with AddProxy.
RemoveProxy(src int, dst string, protocol Protocol) error
// RemoveIngressProxy removes the mapping previously added with AddIngressProxy.
RemoveIngressProxy(src, dst string) error
}
// NginxProxyConfigurator is a ClusterNetworkConfigurator that edits an nginx
// proxy configuration and a Kubernetes Service manifest stored in a
// repository, and manages DNS records through an HTTP API.
type NginxProxyConfigurator struct {
// PrivateSubdomain is the domain suffix of the per-cluster wildcard DNS
// entries, which have the form "*.<cluster>.cluster.<PrivateSubdomain>".
PrivateSubdomain string
// DNSAPIAddr is the base URL of the DNS API; "/create-a-record" and
// "/delete-a-record" are appended to it.
DNSAPIAddr string
// Repo is the repository whose files are modified through Repo.Do.
Repo soft.RepoIO
// ConfigPath is the path, inside Repo, of the nginx proxy config understood
// by ParseNginxProxyConfig. An "ingress-nginx.yaml" file is expected in the
// same directory.
ConfigPath string
// ServicePath is the path, inside Repo, of the Service manifest whose port
// list is kept in sync with the TCP/UDP proxy mappings.
ServicePath string
}
// createARecordReq is the JSON body sent to the DNS API by AddCluster and
// RemoveCluster.
type createARecordReq struct {
// Entry is the DNS name of the record.
Entry string `json:"entry"`
// IP is the address the record points to. It is serialized under the key
// "text". NOTE(review): presumably this matches the DNS API schema - confirm.
IP net.IP `json:"text"`
}
// AddCluster registers the wildcard DNS A record
// "*.<name>.cluster.<PrivateSubdomain>" pointing at ingressIP by POSTing a
// JSON request to <DNSAPIAddr>/create-a-record.
//
// Any response status other than 200 OK is reported as an error whose message
// is the response body.
//
// NOTE(review): http.Post uses the default client, which has no timeout.
func (c *NginxProxyConfigurator) AddCluster(name string, ingressIP net.IP) error {
	req := createARecordReq{
		Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
		IP:    ingressIP,
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(req); err != nil {
		return err
	}
	resp, err := http.Post(fmt.Sprintf("%s/create-a-record", c.DNSAPIAddr), "application/json", &buf)
	if err != nil {
		return err
	}
	// The body must be closed on every path, otherwise the connection leaks.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("reading create-a-record response (status %d): %w", resp.StatusCode, err)
		}
		// The body is data, not a format string: a literal '%' must survive.
		return fmt.Errorf("%s", body)
	}
	return nil
}
// RemoveCluster deletes the wildcard DNS A record
// "*.<name>.cluster.<PrivateSubdomain>" pointing at ingressIP by POSTing a
// JSON request to <DNSAPIAddr>/delete-a-record.
//
// Any response status other than 200 OK is reported as an error whose message
// is the response body.
//
// NOTE(review): http.Post uses the default client, which has no timeout.
func (c *NginxProxyConfigurator) RemoveCluster(name string, ingressIP net.IP) error {
	req := createARecordReq{
		Entry: fmt.Sprintf("*.%s.cluster.%s", name, c.PrivateSubdomain),
		IP:    ingressIP,
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(req); err != nil {
		return err
	}
	resp, err := http.Post(fmt.Sprintf("%s/delete-a-record", c.DNSAPIAddr), "application/json", &buf)
	if err != nil {
		return err
	}
	// The body must be closed on every path, otherwise the connection leaks.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("reading delete-a-record response (status %d): %w", resp.StatusCode, err)
		}
		// The body is data, not a format string: a literal '%' must survive.
		return fmt.Errorf("%s", body)
	}
	return nil
}
// AddProxy exposes port src on the proxy and forwards it to dst over protocol.
//
// Within one c.Repo.Do call it appends a port named "p<src>" to the Service
// manifest at c.ServicePath, records the src -> dst mapping in the nginx config
// at c.ConfigPath and refreshes the config hash annotation stored in
// ingress-nginx.yaml. On success it returns the namespace read from the nginx
// config. An unsupported protocol is rejected before any file is modified.
func (c *NginxProxyConfigurator) AddProxy(src int, dst string, protocol Protocol) (string, error) {
	var namespace string
	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
		cfg, err := c.loadProxyConfig(fs)
		if err != nil {
			return "", err
		}
		// Pick the target table first so that an invalid protocol fails
		// before the Service manifest is touched.
		proxies, err := cfg.proxiesFor(protocol)
		if err != nil {
			return "", err
		}
		if err := c.updateService(fs, func(svc *corev1.Service) {
			svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
				Name:       fmt.Sprintf("p%d", src),
				Protocol:   corev1.Protocol(ProtocolToString(protocol)),
				Port:       int32(src),
				TargetPort: intstr.FromInt(src),
			})
		}); err != nil {
			return "", err
		}
		// TODO(gio): check for already existing mapping
		proxies[src] = dst
		if err := c.storeProxyConfig(fs, cfg); err != nil {
			return "", err
		}
		namespace = cfg.Namespace
		return fmt.Sprintf("add proxy mapping: %d %s", src, dst), nil
	})
	if err != nil {
		return "", err
	}
	return namespace, nil
}

// AddIngressProxy maps the ingress source src to the backend dst in the nginx
// config at c.ConfigPath and refreshes the config hash annotation stored in
// ingress-nginx.yaml. It fails if src is already mapped to a different
// backend; re-adding an identical mapping is allowed.
func (c *NginxProxyConfigurator) AddIngressProxy(src, dst string) error {
	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
		cfg, err := c.loadProxyConfig(fs)
		if err != nil {
			return "", err
		}
		if v, ok := cfg.Ingress[src]; ok && v != dst {
			return "", fmt.Errorf("wrong mapping %s already exists (%s)", src, v)
		}
		cfg.Ingress[src] = dst
		if err := c.storeProxyConfig(fs, cfg); err != nil {
			return "", err
		}
		return fmt.Sprintf("add ingress proxy mapping: %s %s", src, dst), nil
	})
	return err
}

// RemoveProxy undoes AddProxy: it drops the first Service port whose number is
// src from the manifest at c.ServicePath, deletes the src entry from the
// protocol's table in the nginx config and refreshes the config hash
// annotation. dst is only used in the message returned to c.Repo.Do. An
// unsupported protocol is rejected before any file is modified.
func (c *NginxProxyConfigurator) RemoveProxy(src int, dst string, protocol Protocol) error {
	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
		cfg, err := c.loadProxyConfig(fs)
		if err != nil {
			return "", err
		}
		proxies, err := cfg.proxiesFor(protocol)
		if err != nil {
			return "", err
		}
		if err := c.updateService(fs, func(svc *corev1.Service) {
			for i, p := range svc.Spec.Ports {
				if p.Port == int32(src) {
					svc.Spec.Ports = append(svc.Spec.Ports[:i], svc.Spec.Ports[i+1:]...)
					break
				}
			}
		}); err != nil {
			return "", err
		}
		// TODO(gio): check that the mapping being removed actually exists
		delete(proxies, src)
		if err := c.storeProxyConfig(fs, cfg); err != nil {
			return "", err
		}
		return fmt.Sprintf("remove proxy mapping: %d %s", src, dst), nil
	})
	return err
}

// RemoveIngressProxy undoes AddIngressProxy: it deletes the src entry from the
// ingress table of the nginx config and refreshes the config hash annotation.
// It fails if src is not currently mapped to dst.
func (c *NginxProxyConfigurator) RemoveIngressProxy(src, dst string) error {
	_, err := c.Repo.Do(func(fs soft.RepoFS) (string, error) {
		cfg, err := c.loadProxyConfig(fs)
		if err != nil {
			return "", err
		}
		if v, ok := cfg.Ingress[src]; !ok || v != dst {
			return "", fmt.Errorf("wrong mapping from source: %s actual: %s expected: %s", src, v, dst)
		}
		delete(cfg.Ingress, src)
		if err := c.storeProxyConfig(fs, cfg); err != nil {
			return "", err
		}
		return fmt.Sprintf("remove proxy mapping: %s %s", src, dst), nil
	})
	return err
}

// proxiesFor returns the stream proxy table (listen port -> upstream) that
// belongs to protocol, or an error for an unknown protocol.
func (c NginxProxyConfig) proxiesFor(protocol Protocol) (map[int]string, error) {
	switch protocol {
	case ProtocolTCP:
		return c.TCP, nil
	case ProtocolUDP:
		return c.UDP, nil
	default:
		return nil, fmt.Errorf("invalid protocol: %v", protocol)
	}
}

// readFile returns the full contents of path in fs.
func (c *NginxProxyConfigurator) readFile(fs soft.RepoFS, path string) ([]byte, error) {
	r, err := fs.Reader(path)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// writeFile replaces the contents of path in fs with data.
func (c *NginxProxyConfigurator) writeFile(fs soft.RepoFS, path string, data []byte) error {
	w, err := fs.Writer(path)
	if err != nil {
		return err
	}
	if _, err := w.Write(data); err != nil {
		w.Close()
		return err
	}
	// Close may be where buffered data is flushed, so its error is reported.
	return w.Close()
}

// loadProxyConfig parses the nginx proxy config stored at c.ConfigPath.
func (c *NginxProxyConfigurator) loadProxyConfig(fs soft.RepoFS) (NginxProxyConfig, error) {
	r, err := fs.Reader(c.ConfigPath)
	if err != nil {
		return NginxProxyConfig{}, err
	}
	defer r.Close()
	return ParseNginxProxyConfig(r)
}

// storeProxyConfig renders cfg, writes it to c.ConfigPath and stamps the
// base64-encoded SHA-256 of the rendered bytes onto ingress-nginx.yaml.
func (c *NginxProxyConfigurator) storeProxyConfig(fs soft.RepoFS, cfg NginxProxyConfig) error {
	// Render into memory first so a rendering failure cannot leave a
	// half-written config file behind.
	var rendered bytes.Buffer
	if err := cfg.Render(&rendered); err != nil {
		return err
	}
	if err := c.writeFile(fs, c.ConfigPath, rendered.Bytes()); err != nil {
		return err
	}
	sum := sha256.Sum256(rendered.Bytes())
	return c.stampConfigHash(fs, base64.StdEncoding.EncodeToString(sum[:]))
}

// stampConfigHash sets spec.values.controller.podAnnotations["dodo.cloud/hash"]
// to hash in the ingress-nginx.yaml file that sits next to c.ConfigPath.
// NOTE(review): presumably the annotation makes the controller pods roll when
// the proxy config changes - confirm.
func (c *NginxProxyConfigurator) stampConfigHash(fs soft.RepoFS, hash string) error {
	nginxPath := filepath.Join(filepath.Dir(c.ConfigPath), "ingress-nginx.yaml")
	raw, err := c.readFile(fs, nginxPath)
	if err != nil {
		return err
	}
	nginx := map[string]any{}
	if err := yaml.Unmarshal(raw, &nginx); err != nil {
		return err
	}
	// Walk down to spec.values.controller, turning an unexpected document
	// shape into an error instead of a failed type assertion panic.
	controller := nginx
	for _, key := range []string{"spec", "values", "controller"} {
		next, ok := controller[key].(map[string]any)
		if !ok {
			return fmt.Errorf("%s: %q is missing or is not a mapping", nginxPath, key)
		}
		controller = next
	}
	annotations, ok := controller["podAnnotations"].(map[string]any)
	if !ok {
		// An absent or null podAnnotations is created; anything else is
		// refused rather than silently overwritten.
		if v, exists := controller["podAnnotations"]; exists && v != nil {
			return fmt.Errorf("%s: %q is not a mapping", nginxPath, "podAnnotations")
		}
		annotations = map[string]any{}
		controller["podAnnotations"] = annotations
	}
	annotations["dodo.cloud/hash"] = hash
	out, err := yaml.Marshal(nginx)
	if err != nil {
		return err
	}
	return c.writeFile(fs, nginxPath, out)
}

// updateService loads the Service manifest at c.ServicePath, applies update to
// it and writes the result back.
func (c *NginxProxyConfigurator) updateService(fs soft.RepoFS, update func(svc *corev1.Service)) error {
	raw, err := c.readFile(fs, c.ServicePath)
	if err != nil {
		return err
	}
	var svc corev1.Service
	if err := yaml.Unmarshal(raw, &svc); err != nil {
		return err
	}
	update(&svc)
	// Marshal before opening the writer so a marshalling failure cannot
	// leave a half-written manifest behind.
	out, err := yaml.Marshal(svc)
	if err != nil {
		return err
	}
	return c.writeFile(fs, c.ServicePath, out)
}
// Protocol identifies the transport protocol of a stream (port-level) proxy.
type Protocol int
// Supported protocols. ProtocolTCP is the zero value of Protocol.
const (
ProtocolTCP Protocol = iota
ProtocolUDP
)
// ProtocolToString returns the upper-case name of p: "TCP" for ProtocolTCP and
// "UDP" for every other value.
func ProtocolToString(p Protocol) string {
	switch p {
	case ProtocolTCP:
		return "TCP"
	default:
		return "UDP"
	}
}
// NginxProxyConfig is the in-memory form of the nginx proxy configuration
// produced by ParseNginxProxyConfig and written back by Render.
type NginxProxyConfig struct {
// Namespace is the value of the "namespace:" line found in the preamble.
Namespace string
// IngressPort is the port of the "listen" directive inside the http section;
// ParseNginxProxyConfig leaves it at -1 when no such directive is found.
IngressPort int
// Resolvers holds the IPs of the "resolver" directives of the http section.
Resolvers []net.IP
// Ingress holds the entries of the http "map" block (first field -> second field).
Ingress map[string]string
// TCP maps a stream listen port to its proxy_pass target for TCP servers.
TCP map[int]string
// UDP maps a stream listen port to its proxy_pass target for UDP servers.
UDP map[int]string
// PreConf holds the raw lines that precede the nginx configuration, up to
// and including the line that contains "nginx.conf". Render emits them
// verbatim before the generated configuration.
PreConf []string
}
// parseProtocol converts a case-insensitive protocol name ("tcp" or "udp")
// into a Protocol. For any other name it returns an error; the accompanying
// Protocol value is ProtocolUDP and carries no meaning.
func parseProtocol(s string) (Protocol, error) {
	name := strings.ToLower(s)
	if name == "tcp" {
		return ProtocolTCP, nil
	}
	if name == "udp" {
		return ProtocolUDP, nil
	}
	return ProtocolUDP, fmt.Errorf("invalid protocol: %s", s)
}
// ParseNginxProxyConfig reads a proxy configuration from r and extracts the
// parts this package manages.
//
// The input is processed line by line (blank lines are skipped):
//   - Everything up to and including the first line containing "nginx.conf"
//     is kept verbatim in PreConf; a "namespace: <value>" line in that
//     preamble sets Namespace.
//   - Inside the "http" section, "listen <port>" sets IngressPort,
//     "resolver <ip>" appends to Resolvers and the lines of the "map" block
//     become Ingress entries.
//   - Inside the "stream" section, each "listen <port> [tcp|udp]" followed by
//     "proxy_pass <upstream>" adds an entry to TCP or UDP (TCP when the
//     protocol is omitted).
//
// Directives are recognized by the first field of a line, so host names or
// upstreams that merely contain a directive name are treated as data.
// All other lines are ignored. IngressPort is -1 if no http listen is found.
func ParseNginxProxyConfig(r io.Reader) (NginxProxyConfig, error) {
	var buf strings.Builder
	if _, err := io.Copy(&buf, r); err != nil {
		return NginxProxyConfig{}, err
	}
	ret := NginxProxyConfig{
		IngressPort: -1,
		Resolvers:   nil,
		Ingress:     make(map[string]string),
		TCP:         make(map[int]string),
		UDP:         make(map[int]string),
	}
	insidePreConf := true
	insideHttp := false
	insideMap := false
	insideStream := false
	streamPort := -1
	streamPortProtocol := ProtocolTCP
	for _, l := range strings.Split(buf.String(), "\n") {
		items := strings.Fields(strings.TrimSuffix(l, ";"))
		if len(items) == 0 {
			continue
		}
		switch {
		case insidePreConf:
			ret.PreConf = append(ret.PreConf, l)
			if strings.Contains(l, "nginx.conf") {
				// The nginx configuration proper starts on the next line.
				insidePreConf = false
			} else if raw := strings.Fields(l); raw[0] == "namespace:" {
				if len(raw) < 2 {
					return NginxProxyConfig{}, fmt.Errorf("invalid namespace: %s", l)
				}
				ret.Namespace = raw[1]
			}
		case items[0] == "http":
			insideHttp = true
		case insideHttp && items[0] == "map":
			insideMap = true
		case items[0] == "stream":
			insideHttp = false
			insideMap = false
			insideStream = true
		case items[0] == "listen":
			if len(items) < 2 {
				return NginxProxyConfig{}, fmt.Errorf("invalid listen: %s", l)
			}
			port, err := strconv.Atoi(items[1])
			if err != nil {
				return NginxProxyConfig{}, err
			}
			if insideHttp {
				if len(items) > 2 {
					return NginxProxyConfig{}, fmt.Errorf("invalid http listen: %s", l)
				}
				ret.IngressPort = port
				continue
			}
			if !insideStream {
				return NginxProxyConfig{}, fmt.Errorf("invalid state, expected to be inside stream section")
			}
			// Remember port and protocol for the proxy_pass that follows.
			streamPort = port
			streamPortProtocol = ProtocolTCP
			if len(items) == 3 {
				if streamPortProtocol, err = parseProtocol(items[2]); err != nil {
					return NginxProxyConfig{}, err
				}
			}
		case insideHttp && items[0] == "resolver":
			if len(items) < 2 {
				return NginxProxyConfig{}, fmt.Errorf("invalid resolver: %s", l)
			}
			ip := net.ParseIP(items[1])
			if ip == nil {
				return NginxProxyConfig{}, fmt.Errorf("invalid resolver ip: %s", l)
			}
			ret.Resolvers = append(ret.Resolvers, ip)
		case insideHttp && insideMap:
			if items[0] == "}" {
				insideMap = false
				continue
			}
			if len(items) < 2 {
				return NginxProxyConfig{}, fmt.Errorf("invalid map: %s", l)
			}
			ret.Ingress[items[0]] = items[1]
		case insideStream && items[0] == "proxy_pass":
			if streamPort == -1 {
				return NginxProxyConfig{}, fmt.Errorf("invalid state, expected server port to be defined")
			}
			if len(items) < 2 {
				return NginxProxyConfig{}, fmt.Errorf("invalid proxy_pass: %s", l)
			}
			if streamPortProtocol == ProtocolTCP {
				ret.TCP[streamPort] = items[1]
			} else {
				ret.UDP[streamPort] = items[1]
			}
		}
	}
	return ret, nil
}
// Render writes the configuration to w: first the PreConf lines verbatim, one
// per line, then the nginx configuration generated from nginxConfigTmpl.
// It returns the first write, template parse or template execution error.
func (c NginxProxyConfig) Render(w io.Writer) error {
	for _, l := range c.PreConf {
		// A failed preamble write would otherwise produce a truncated file
		// that is still reported as a success.
		if _, err := fmt.Fprintln(w, l); err != nil {
			return err
		}
	}
	tmpl, err := template.New("nginx.conf").Parse(nginxConfigTmpl)
	if err != nil {
		return err
	}
	return tmpl.Execute(w, c)
}
// nginxConfigTmpl is the text/template source of the generated nginx
// configuration. Render executes it with an NginxProxyConfig: .Ingress fills
// the http map block, .IngressPort and .Resolvers configure the http server,
// and .TCP / .UDP produce one stream server per entry (the stream section is
// omitted when both are empty).
// NOTE(review): stream servers use the hard-coded resolver 100.100.100.100
// instead of .Resolvers - confirm this is intended.
const nginxConfigTmpl = ` worker_processes 1;
worker_rlimit_nofile 8192;
events {
worker_connections 1024;
}
http {
map $http_host $backend {
{{- range $from, $to := .Ingress }}
{{ $from }} {{ $to }};
{{- end }}
}
server {
listen {{ .IngressPort }};
location / {
{{- range .Resolvers }}
resolver {{ . }};
{{- end }}
proxy_pass http://$backend;
}
}
}
{{- if or .TCP .UDP }}
stream {
{{- range $port, $upstream := .TCP }}
server {
listen {{ $port }};
resolver 100.100.100.100;
proxy_pass {{ $upstream }};
}
{{- end }}
{{- range $port, $upstream := .UDP }}
server {
listen {{ $port }} udp;
resolver 100.100.100.100;
proxy_pass {{ $upstream }};
}
{{- end }}
}
{{- end }}`