blob: e5279da8d413d0f15ab11586c4aceee22a87ba44 [file] [log] [blame]
giolekva96755fa2021-10-06 21:00:00 +04001package controllers
2
3import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "os"
8 "os/exec"
9 "path/filepath"
10 "time"
11
12 corev1 "k8s.io/api/core/v1"
13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14 "k8s.io/apimachinery/pkg/labels"
15 "k8s.io/apimachinery/pkg/selection"
16 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17 "k8s.io/apimachinery/pkg/util/wait"
18 "k8s.io/client-go/kubernetes"
19 "k8s.io/client-go/tools/cache"
20 "k8s.io/client-go/util/workqueue"
21 "k8s.io/klog/v2"
22
23 nebulav1 "github.com/giolekva/pcloud/core/nebula/apis/nebula/v1"
24 clientset "github.com/giolekva/pcloud/core/nebula/generated/clientset/versioned"
25 informers "github.com/giolekva/pcloud/core/nebula/generated/informers/externalversions"
26 listers "github.com/giolekva/pcloud/core/nebula/generated/listers/nebula/v1"
27)
28
29var secretImmutable = true
30
31type CAController struct {
32 kubeClient kubernetes.Interface
33 nebulaClient clientset.Interface
34 caLister listers.NebulaCALister
35 caSynced cache.InformerSynced
36 workqueue workqueue.RateLimitingInterface
37
38 nebulaCert string
39}
40
41func NewCAController(kubeClient kubernetes.Interface, nebulaClient clientset.Interface, nebulaInformerFactory informers.SharedInformerFactory, nebulaCert string) *CAController {
42 nebulaInformer := nebulaInformerFactory.Lekva().V1().NebulaCAs().Informer()
43 c := &CAController{
44 kubeClient: kubeClient,
45 nebulaClient: nebulaClient,
46 caLister: nebulaInformerFactory.Lekva().V1().NebulaCAs().Lister(),
47 caSynced: nebulaInformer.HasSynced,
48 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NebulaCAs"),
49 nebulaCert: nebulaCert,
50 }
51
52 nebulaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
53 AddFunc: c.enqueue,
54 UpdateFunc: func(_, o interface{}) {
55 c.enqueue(o)
56 },
57 DeleteFunc: func(o interface{}) {
58 },
59 })
60
61 return c
62}
63
64func (c *CAController) enqueue(o interface{}) {
65 var key string
66 var err error
67 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
68 utilruntime.HandleError(err)
69 return
70 }
71 c.workqueue.Add(key)
72}
73
74func (c *CAController) Run(workers int, stopCh <-chan struct{}) error {
75 defer utilruntime.HandleCrash()
76 defer c.workqueue.ShutDown()
77 klog.Info("Starting NebulaCA controller")
78 klog.Info("Waiting for informer caches to sync")
79 if ok := cache.WaitForCacheSync(stopCh, c.caSynced); !ok {
80 return fmt.Errorf("Failed to wait for caches to sync")
81 }
82 fmt.Println("Starting workers")
83 for i := 0; i < workers; i++ {
84 go wait.Until(c.runWorker, time.Second, stopCh)
85 }
86 fmt.Println("Started workers")
87 <-stopCh
88 fmt.Println("Shutting down workers")
89 return nil
90}
91
92func (c *CAController) runWorker() {
93 for c.processNextWorkItem() {
94 }
95}
96
97func (c *CAController) processNextWorkItem() bool {
98 o, shutdown := c.workqueue.Get()
99 if shutdown {
100 return false
101 }
102 err := func(o interface{}) error {
103 defer c.workqueue.Done(o)
104 var key string
105 var ok bool
106 if key, ok = o.(string); !ok {
107 c.workqueue.Forget(o)
108 utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", o))
109 return nil
110 }
111 if err := c.processCAWithKey(key); err != nil {
112 c.workqueue.AddRateLimited(key)
113 return fmt.Errorf("Rrror syncing '%s': %s, requeuing", key, err.Error())
114 }
115 c.workqueue.Forget(o)
116 fmt.Printf("Successfully synced '%s'\n", key)
117 return nil
118 }(o)
119 if err != nil {
120 utilruntime.HandleError(err)
121 return true
122 }
123 return true
124}
125
126func (c *CAController) processCAWithKey(key string) error {
127 namespace, name, err := cache.SplitMetaNamespaceKey(key)
128 if err != nil {
129 return nil
130 }
131 ca, err := c.getCA(namespace, name)
132 if err != nil {
133 panic(err)
134 }
135 if ca.Status.State == nebulav1.NebulaCAStateReady {
136 fmt.Printf("%s CA is already in Ready state\n", ca.Name)
137 return nil
138 }
139 keyDir, err := generateCAKey(ca.Spec.CAName, c.nebulaCert)
140 if err != nil {
141 panic(err)
142 }
143 defer os.RemoveAll(keyDir)
144 secret, err := createSecretFromDir(keyDir)
145 if err != nil {
146 panic(err)
147 }
148 secret.Immutable = &secretImmutable
149 secret.Name = ca.Spec.SecretName
150 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
151 if err != nil {
152 panic(err)
153 }
154 err = c.updateStatus(ca, nebulav1.NebulaCAStateReady, "Generated credentials")
155 if err != nil {
156 panic(err)
157 }
158 return nil
159}
160
161func (c *CAController) updateStatus(ca *nebulav1.NebulaCA, state nebulav1.NebulaCAState, msg string) error {
162 cp := ca.DeepCopy()
163 cp.Status.State = state
164 cp.Status.Message = msg
165 _, err := c.nebulaClient.LekvaV1().NebulaCAs(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
166 return err
167}
168
169func createSecretFromDir(path string) (*corev1.Secret, error) {
170 all, err := ioutil.ReadDir(path)
171 if err != nil {
172 return nil, err
173 }
174 secret := &corev1.Secret{
175 Data: make(map[string][]byte),
176 }
177 for _, f := range all {
178 if f.IsDir() {
179 continue
180 }
181 d, err := ioutil.ReadFile(filepath.Join(path, f.Name()))
182 if err != nil {
183 return nil, err
184 }
185 secret.Data[f.Name()] = d
186 }
187 return secret, nil
188}
189
190func generateCAKey(name, nebulaCert string) (string, error) {
191 tmp, err := os.MkdirTemp("", name)
192 if err != nil {
193 return "", err
194 }
195 fmt.Println(tmp)
196 cmd := exec.Command(nebulaCert, "ca",
197 "-name", name,
198 "-out-key", filepath.Join(tmp, "ca.key"),
199 "-out-crt", filepath.Join(tmp, "ca.crt"),
200 "-out-qr", filepath.Join(tmp, "ca.png"))
201 if err := cmd.Run(); err != nil {
202 return "", err
203 }
204 return tmp, nil
205}
206
207func (c *CAController) getCA(namespace, name string) (*nebulav1.NebulaCA, error) {
208 s := labels.NewSelector()
209 r, err := labels.NewRequirement("metadata.namespace", selection.Equals, []string{namespace})
210 if err != nil {
211 panic(err)
212 }
213 r1, err := labels.NewRequirement("metadata.name", selection.Equals, []string{name})
214 if err != nil {
215 panic(err)
216 }
217 s.Add(*r, *r1)
218 ncas, err := c.caLister.List(s)
219 if err != nil {
220 panic(err)
221 }
222 if len(ncas) != 1 {
223 panic("err")
224 }
225 return ncas[0], nil
226}