blob: 452b9e5930563df2fc05cc5fe5d6447edf67c3a3 [file] [log] [blame]
giolekva96755fa2021-10-06 21:00:00 +04001package controllers
2
3import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "os"
8 "os/exec"
9 "path/filepath"
10 "time"
11
12 corev1 "k8s.io/api/core/v1"
13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14 "k8s.io/apimachinery/pkg/labels"
15 "k8s.io/apimachinery/pkg/selection"
16 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17 "k8s.io/apimachinery/pkg/util/wait"
giolekva5ebab802021-10-07 21:50:34 +040018 corev1informers "k8s.io/client-go/informers/core/v1"
giolekva96755fa2021-10-06 21:00:00 +040019 "k8s.io/client-go/kubernetes"
giolekva5ebab802021-10-07 21:50:34 +040020 corev1listers "k8s.io/client-go/listers/core/v1"
giolekva96755fa2021-10-06 21:00:00 +040021 "k8s.io/client-go/tools/cache"
22 "k8s.io/client-go/util/workqueue"
23 "k8s.io/klog/v2"
24
25 nebulav1 "github.com/giolekva/pcloud/core/nebula/apis/nebula/v1"
26 clientset "github.com/giolekva/pcloud/core/nebula/generated/clientset/versioned"
giolekva5ebab802021-10-07 21:50:34 +040027 informers "github.com/giolekva/pcloud/core/nebula/generated/informers/externalversions/nebula/v1"
giolekva96755fa2021-10-06 21:00:00 +040028 listers "github.com/giolekva/pcloud/core/nebula/generated/listers/nebula/v1"
29)
30
// secretImmutable is the value pointed to by the Immutable field of every
// Secret this controller creates. It is a package-level variable only so that
// its address can be taken.
var secretImmutable = true
32
// caRef is the work-queue item type for NebulaCA objects. Wrapping the key in
// a distinct type lets one queue carry both CA and node keys.
type caRef struct {
	// key is the "namespace/name" cache key of the NebulaCA.
	key string
}
36
// nodeRef is the work-queue item type for NebulaNode objects. Wrapping the key
// in a distinct type lets one queue carry both CA and node keys.
type nodeRef struct {
	// key is the "namespace/name" cache key of the NebulaNode.
	key string
}
40
giolekva96755fa2021-10-06 21:00:00 +040041type CAController struct {
42 kubeClient kubernetes.Interface
43 nebulaClient clientset.Interface
44 caLister listers.NebulaCALister
45 caSynced cache.InformerSynced
giolekva5ebab802021-10-07 21:50:34 +040046 nodeLister listers.NebulaNodeLister
47 nodeSynced cache.InformerSynced
48 secretLister corev1listers.SecretLister
49 secretSynced cache.InformerSynced
giolekva96755fa2021-10-06 21:00:00 +040050 workqueue workqueue.RateLimitingInterface
51
52 nebulaCert string
53}
54
giolekva5ebab802021-10-07 21:50:34 +040055func NewCAController(kubeClient kubernetes.Interface,
56 nebulaClient clientset.Interface,
57 caInformer informers.NebulaCAInformer,
58 nodeInformer informers.NebulaNodeInformer,
59 secretInformer corev1informers.SecretInformer,
60 nebulaCert string) *CAController {
giolekva96755fa2021-10-06 21:00:00 +040061 c := &CAController{
62 kubeClient: kubeClient,
63 nebulaClient: nebulaClient,
giolekva5ebab802021-10-07 21:50:34 +040064 caLister: caInformer.Lister(),
65 caSynced: caInformer.Informer().HasSynced,
66 nodeLister: nodeInformer.Lister(),
67 nodeSynced: nodeInformer.Informer().HasSynced,
68 secretLister: secretInformer.Lister(),
69 secretSynced: secretInformer.Informer().HasSynced,
70 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nebula"),
giolekva96755fa2021-10-06 21:00:00 +040071 nebulaCert: nebulaCert,
72 }
73
giolekva5ebab802021-10-07 21:50:34 +040074 caInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
75 AddFunc: c.enqueueCA,
giolekva96755fa2021-10-06 21:00:00 +040076 UpdateFunc: func(_, o interface{}) {
giolekva5ebab802021-10-07 21:50:34 +040077 c.enqueueCA(o)
78 },
79 DeleteFunc: func(o interface{}) {
80 },
81 })
82 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
83 AddFunc: c.enqueueNode,
84 UpdateFunc: func(_, o interface{}) {
85 c.enqueueNode(o)
giolekva96755fa2021-10-06 21:00:00 +040086 },
87 DeleteFunc: func(o interface{}) {
88 },
89 })
90
91 return c
92}
93
giolekva5ebab802021-10-07 21:50:34 +040094func (c *CAController) enqueueCA(o interface{}) {
giolekva96755fa2021-10-06 21:00:00 +040095 var key string
96 var err error
97 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
98 utilruntime.HandleError(err)
99 return
100 }
giolekva5ebab802021-10-07 21:50:34 +0400101 c.workqueue.Add(caRef{key})
102}
103
104func (c *CAController) enqueueNode(o interface{}) {
105 var key string
106 var err error
107 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
108 utilruntime.HandleError(err)
109 return
110 }
111 c.workqueue.Add(nodeRef{key})
giolekva96755fa2021-10-06 21:00:00 +0400112}
113
114func (c *CAController) Run(workers int, stopCh <-chan struct{}) error {
115 defer utilruntime.HandleCrash()
116 defer c.workqueue.ShutDown()
117 klog.Info("Starting NebulaCA controller")
118 klog.Info("Waiting for informer caches to sync")
giolekva5ebab802021-10-07 21:50:34 +0400119 if ok := cache.WaitForCacheSync(stopCh, c.caSynced, c.nodeSynced, c.secretSynced); !ok {
giolekva96755fa2021-10-06 21:00:00 +0400120 return fmt.Errorf("Failed to wait for caches to sync")
121 }
122 fmt.Println("Starting workers")
123 for i := 0; i < workers; i++ {
124 go wait.Until(c.runWorker, time.Second, stopCh)
125 }
126 fmt.Println("Started workers")
127 <-stopCh
128 fmt.Println("Shutting down workers")
129 return nil
130}
131
132func (c *CAController) runWorker() {
133 for c.processNextWorkItem() {
134 }
135}
136
137func (c *CAController) processNextWorkItem() bool {
138 o, shutdown := c.workqueue.Get()
139 if shutdown {
140 return false
141 }
142 err := func(o interface{}) error {
143 defer c.workqueue.Done(o)
giolekva5ebab802021-10-07 21:50:34 +0400144 if ref, ok := o.(caRef); ok {
145 if err := c.processCAWithKey(ref.key); err != nil {
146 c.workqueue.AddRateLimited(ref)
147 return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error())
148 }
149 fmt.Printf("Successfully synced CA '%s'\n", ref.key)
150 } else if ref, ok := o.(nodeRef); ok {
151 if err := c.processNodeWithKey(ref.key); err != nil {
152 c.workqueue.AddRateLimited(ref)
153 return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error())
154 }
155 fmt.Printf("Successfully synced Node '%s'\n", ref.key)
156 } else {
giolekva96755fa2021-10-06 21:00:00 +0400157 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400158 utilruntime.HandleError(fmt.Errorf("expected reference in workqueue but got %#v", o))
giolekva96755fa2021-10-06 21:00:00 +0400159 return nil
160 }
giolekva96755fa2021-10-06 21:00:00 +0400161 c.workqueue.Forget(o)
giolekva96755fa2021-10-06 21:00:00 +0400162 return nil
163 }(o)
164 if err != nil {
165 utilruntime.HandleError(err)
166 return true
167 }
168 return true
169}
170
171func (c *CAController) processCAWithKey(key string) error {
172 namespace, name, err := cache.SplitMetaNamespaceKey(key)
173 if err != nil {
174 return nil
175 }
176 ca, err := c.getCA(namespace, name)
177 if err != nil {
178 panic(err)
179 }
180 if ca.Status.State == nebulav1.NebulaCAStateReady {
181 fmt.Printf("%s CA is already in Ready state\n", ca.Name)
182 return nil
183 }
184 keyDir, err := generateCAKey(ca.Spec.CAName, c.nebulaCert)
185 if err != nil {
186 panic(err)
187 }
188 defer os.RemoveAll(keyDir)
189 secret, err := createSecretFromDir(keyDir)
190 if err != nil {
191 panic(err)
192 }
193 secret.Immutable = &secretImmutable
194 secret.Name = ca.Spec.SecretName
195 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
196 if err != nil {
197 panic(err)
198 }
giolekva5ebab802021-10-07 21:50:34 +0400199 err = c.updateCAStatus(ca, nebulav1.NebulaCAStateReady, "Generated credentials")
giolekva96755fa2021-10-06 21:00:00 +0400200 if err != nil {
201 panic(err)
202 }
203 return nil
204}
205
giolekva5ebab802021-10-07 21:50:34 +0400206func (c *CAController) processNodeWithKey(key string) error {
207 namespace, name, err := cache.SplitMetaNamespaceKey(key)
208 if err != nil {
209 return nil
210 }
211 node, err := c.getNode(namespace, name)
212 if err != nil {
213 panic(err)
214 }
215 if node.Status.State == nebulav1.NebulaNodeStateReady {
216 fmt.Printf("%s Node is already in Ready state\n", node.Name)
217 return nil
218 }
219 ca, err := c.getCA(namespace, node.Spec.CAName)
220 if ca.Status.State != nebulav1.NebulaCAStateReady {
221 return fmt.Errorf("Referenced CA %s is not ready yet.", node.Spec.CAName)
222 }
223 caSecret, err := c.getSecret(ca.Namespace, ca.Spec.SecretName)
224 if err != nil {
225 panic(err)
226 }
227 dir, err := extractSecret(caSecret)
228 if err != nil {
229 panic(err)
230 }
231 if err := generateNodeKey(node.Spec.NodeName, node.Spec.IPCidr, dir, c.nebulaCert); err != nil {
232 panic(err)
233 }
234 defer os.RemoveAll(dir)
235 if err := os.Remove(filepath.Join(dir, "ca.key")); err != nil {
236 panic(err)
237 }
238 if err := os.Remove(filepath.Join(dir, "ca.png")); err != nil {
239 panic(err)
240 }
241 secret, err := createSecretFromDir(dir)
242 if err != nil {
243 panic(err)
244 }
245 secret.Immutable = &secretImmutable
246 secret.Name = node.Spec.SecretName
247 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
248 if err != nil {
249 panic(err)
250 }
251 err = c.updateNodeStatus(node, nebulav1.NebulaNodeStateReady, "Generated credentials")
252 if err != nil {
253 panic(err)
254 }
255 return nil
256}
257
258func (c *CAController) updateCAStatus(ca *nebulav1.NebulaCA, state nebulav1.NebulaCAState, msg string) error {
giolekva96755fa2021-10-06 21:00:00 +0400259 cp := ca.DeepCopy()
260 cp.Status.State = state
261 cp.Status.Message = msg
262 _, err := c.nebulaClient.LekvaV1().NebulaCAs(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
263 return err
264}
265
giolekva5ebab802021-10-07 21:50:34 +0400266func (c *CAController) updateNodeStatus(node *nebulav1.NebulaNode, state nebulav1.NebulaNodeState, msg string) error {
267 cp := node.DeepCopy()
268 cp.Status.State = state
269 cp.Status.Message = msg
270 _, err := c.nebulaClient.LekvaV1().NebulaNodes(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
271 return err
272}
273
giolekva96755fa2021-10-06 21:00:00 +0400274func createSecretFromDir(path string) (*corev1.Secret, error) {
275 all, err := ioutil.ReadDir(path)
276 if err != nil {
277 return nil, err
278 }
279 secret := &corev1.Secret{
280 Data: make(map[string][]byte),
281 }
282 for _, f := range all {
283 if f.IsDir() {
284 continue
285 }
286 d, err := ioutil.ReadFile(filepath.Join(path, f.Name()))
287 if err != nil {
288 return nil, err
289 }
290 secret.Data[f.Name()] = d
291 }
292 return secret, nil
293}
294
giolekva5ebab802021-10-07 21:50:34 +0400295func extractSecret(secret *corev1.Secret) (string, error) {
296 tmp, err := os.MkdirTemp("", secret.Name)
297 if err != nil {
298 return "", err
299 }
300 for name, data := range secret.Data {
301 if err := ioutil.WriteFile(filepath.Join(tmp, name), data, 0644); err != nil {
302 defer os.RemoveAll(tmp)
303 return "", nil
304 }
305 }
306 return tmp, nil
307}
308
// generateCAKey runs `<nebulaCert> ca` to create a new certificate authority
// called name. The key, certificate and QR image are written as ca.key, ca.crt
// and ca.png into a freshly created temporary directory, whose path is
// returned. The caller owns the directory and must remove it.
//
// On failure the temporary directory is removed, the returned path is empty,
// and the error wraps the exec error together with the command's combined
// output.
func generateCAKey(name, nebulaCert string) (string, error) {
	tmp, err := os.MkdirTemp("", name)
	if err != nil {
		return "", fmt.Errorf("creating temporary directory for CA %q: %w", name, err)
	}
	cmd := exec.Command(nebulaCert, "ca",
		"-name", name,
		"-out-key", filepath.Join(tmp, "ca.key"),
		"-out-crt", filepath.Join(tmp, "ca.crt"),
		"-out-qr", filepath.Join(tmp, "ca.png"))
	if out, err := cmd.CombinedOutput(); err != nil {
		// Best-effort cleanup so a failed run does not leave partial key
		// material behind; the command error is the one worth reporting.
		os.RemoveAll(tmp)
		return "", fmt.Errorf("running %s ca for %q: %w (output: %q)", nebulaCert, name, err, out)
	}
	return tmp, nil
}
324
// generateNodeKey runs `<nebulaCert> sign` to issue a host certificate for
// name with address ip, signed by the CA whose ca.crt and ca.key are in dir.
// The results are written to dir as host.key, host.crt and host.png.
//
// On failure the returned error wraps the exec error and includes the
// command's combined output. The output is passed as an argument, never as the
// format string, so '%' characters in it are reported verbatim.
func generateNodeKey(name, ip, dir, nebulaCert string) error {
	cmd := exec.Command(nebulaCert, "sign",
		"-ca-crt", filepath.Join(dir, "ca.crt"),
		"-ca-key", filepath.Join(dir, "ca.key"),
		"-name", name,
		"-ip", ip,
		"-out-key", filepath.Join(dir, "host.key"),
		"-out-crt", filepath.Join(dir, "host.crt"),
		"-out-qr", filepath.Join(dir, "host.png"))
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("running %s sign for %q: %w (output: %q)", nebulaCert, name, err, out)
	}
	return nil
}
339
giolekva96755fa2021-10-06 21:00:00 +0400340func (c *CAController) getCA(namespace, name string) (*nebulav1.NebulaCA, error) {
341 s := labels.NewSelector()
342 r, err := labels.NewRequirement("metadata.namespace", selection.Equals, []string{namespace})
343 if err != nil {
344 panic(err)
345 }
346 r1, err := labels.NewRequirement("metadata.name", selection.Equals, []string{name})
347 if err != nil {
348 panic(err)
349 }
350 s.Add(*r, *r1)
351 ncas, err := c.caLister.List(s)
352 if err != nil {
353 panic(err)
354 }
355 if len(ncas) != 1 {
356 panic("err")
357 }
358 return ncas[0], nil
359}
giolekva5ebab802021-10-07 21:50:34 +0400360
361func (c *CAController) getNode(namespace, name string) (*nebulav1.NebulaNode, error) {
362 s := labels.NewSelector()
363 r, err := labels.NewRequirement("metadata.namespace", selection.Equals, []string{namespace})
364 if err != nil {
365 panic(err)
366 }
367 r1, err := labels.NewRequirement("metadata.name", selection.Equals, []string{name})
368 if err != nil {
369 panic(err)
370 }
371 s.Add(*r, *r1)
372 nodes, err := c.nodeLister.List(s)
373 if err != nil {
374 panic(err)
375 }
376 if len(nodes) != 1 {
377 panic("err")
378 }
379 return nodes[0], nil
380}
381
382func (c *CAController) getSecret(namespace, name string) (*corev1.Secret, error) {
383 return c.secretLister.Secrets(namespace).Get(name)
384}