blob: 37b38562547d596f8c9ef3e973a90c88242a2e96 [file] [log] [blame]
giolekva96755fa2021-10-06 21:00:00 +04001package controllers
2
3import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "os"
8 "os/exec"
9 "path/filepath"
10 "time"
11
12 corev1 "k8s.io/api/core/v1"
giolekva7e73ba72021-12-03 13:14:20 +040013 "k8s.io/apimachinery/pkg/api/errors"
giolekva96755fa2021-10-06 21:00:00 +040014 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
giolekva96755fa2021-10-06 21:00:00 +040015 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16 "k8s.io/apimachinery/pkg/util/wait"
giolekva5ebab802021-10-07 21:50:34 +040017 corev1informers "k8s.io/client-go/informers/core/v1"
giolekva96755fa2021-10-06 21:00:00 +040018 "k8s.io/client-go/kubernetes"
giolekva5ebab802021-10-07 21:50:34 +040019 corev1listers "k8s.io/client-go/listers/core/v1"
giolekva96755fa2021-10-06 21:00:00 +040020 "k8s.io/client-go/tools/cache"
21 "k8s.io/client-go/util/workqueue"
22 "k8s.io/klog/v2"
23
giolekvac6859b02021-12-09 18:40:51 +040024 nebulav1 "github.com/giolekva/pcloud/core/nebula/controller/apis/nebula/v1"
25 clientset "github.com/giolekva/pcloud/core/nebula/controller/generated/clientset/versioned"
26 informers "github.com/giolekva/pcloud/core/nebula/controller/generated/informers/externalversions/nebula/v1"
27 listers "github.com/giolekva/pcloud/core/nebula/controller/generated/listers/nebula/v1"
giolekva96755fa2021-10-06 21:00:00 +040028)
29
var (
	// secretImmutable is the shared boolean whose address is assigned to the
	// Immutable field of the Secrets built in this file.
	secretImmutable = true
)
31
// caRef is the work-queue item that identifies a NebulaCA by its cache key.
type caRef struct{ key string }
35
// nodeRef is the work-queue item that identifies a NebulaNode by its cache key.
type nodeRef struct{ key string }
39
giolekva695960b2021-10-07 22:00:29 +040040type NebulaController struct {
giolekva96755fa2021-10-06 21:00:00 +040041 kubeClient kubernetes.Interface
42 nebulaClient clientset.Interface
43 caLister listers.NebulaCALister
44 caSynced cache.InformerSynced
giolekva5ebab802021-10-07 21:50:34 +040045 nodeLister listers.NebulaNodeLister
46 nodeSynced cache.InformerSynced
47 secretLister corev1listers.SecretLister
48 secretSynced cache.InformerSynced
giolekva96755fa2021-10-06 21:00:00 +040049 workqueue workqueue.RateLimitingInterface
50
51 nebulaCert string
52}
53
giolekva695960b2021-10-07 22:00:29 +040054func NewNebulaController(kubeClient kubernetes.Interface,
giolekva5ebab802021-10-07 21:50:34 +040055 nebulaClient clientset.Interface,
56 caInformer informers.NebulaCAInformer,
57 nodeInformer informers.NebulaNodeInformer,
58 secretInformer corev1informers.SecretInformer,
giolekva695960b2021-10-07 22:00:29 +040059 nebulaCert string) *NebulaController {
60 c := &NebulaController{
giolekva96755fa2021-10-06 21:00:00 +040061 kubeClient: kubeClient,
62 nebulaClient: nebulaClient,
giolekva5ebab802021-10-07 21:50:34 +040063 caLister: caInformer.Lister(),
64 caSynced: caInformer.Informer().HasSynced,
65 nodeLister: nodeInformer.Lister(),
66 nodeSynced: nodeInformer.Informer().HasSynced,
67 secretLister: secretInformer.Lister(),
68 secretSynced: secretInformer.Informer().HasSynced,
69 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nebula"),
giolekva96755fa2021-10-06 21:00:00 +040070 nebulaCert: nebulaCert,
71 }
72
giolekva5ebab802021-10-07 21:50:34 +040073 caInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
74 AddFunc: c.enqueueCA,
giolekva96755fa2021-10-06 21:00:00 +040075 UpdateFunc: func(_, o interface{}) {
giolekva5ebab802021-10-07 21:50:34 +040076 c.enqueueCA(o)
77 },
78 DeleteFunc: func(o interface{}) {
79 },
80 })
81 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
82 AddFunc: c.enqueueNode,
83 UpdateFunc: func(_, o interface{}) {
84 c.enqueueNode(o)
giolekva96755fa2021-10-06 21:00:00 +040085 },
86 DeleteFunc: func(o interface{}) {
87 },
88 })
89
90 return c
91}
92
giolekva695960b2021-10-07 22:00:29 +040093func (c *NebulaController) enqueueCA(o interface{}) {
giolekva96755fa2021-10-06 21:00:00 +040094 var key string
95 var err error
96 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
97 utilruntime.HandleError(err)
98 return
99 }
giolekva5ebab802021-10-07 21:50:34 +0400100 c.workqueue.Add(caRef{key})
101}
102
giolekva695960b2021-10-07 22:00:29 +0400103func (c *NebulaController) enqueueNode(o interface{}) {
giolekva5ebab802021-10-07 21:50:34 +0400104 var key string
105 var err error
106 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
107 utilruntime.HandleError(err)
108 return
109 }
110 c.workqueue.Add(nodeRef{key})
giolekva96755fa2021-10-06 21:00:00 +0400111}
112
giolekva695960b2021-10-07 22:00:29 +0400113func (c *NebulaController) Run(workers int, stopCh <-chan struct{}) error {
giolekva96755fa2021-10-06 21:00:00 +0400114 defer utilruntime.HandleCrash()
115 defer c.workqueue.ShutDown()
116 klog.Info("Starting NebulaCA controller")
117 klog.Info("Waiting for informer caches to sync")
giolekva5ebab802021-10-07 21:50:34 +0400118 if ok := cache.WaitForCacheSync(stopCh, c.caSynced, c.nodeSynced, c.secretSynced); !ok {
giolekva96755fa2021-10-06 21:00:00 +0400119 return fmt.Errorf("Failed to wait for caches to sync")
120 }
121 fmt.Println("Starting workers")
122 for i := 0; i < workers; i++ {
123 go wait.Until(c.runWorker, time.Second, stopCh)
124 }
125 fmt.Println("Started workers")
126 <-stopCh
127 fmt.Println("Shutting down workers")
128 return nil
129}
130
giolekva695960b2021-10-07 22:00:29 +0400131func (c *NebulaController) runWorker() {
giolekva96755fa2021-10-06 21:00:00 +0400132 for c.processNextWorkItem() {
133 }
134}
135
giolekva695960b2021-10-07 22:00:29 +0400136func (c *NebulaController) processNextWorkItem() bool {
giolekva96755fa2021-10-06 21:00:00 +0400137 o, shutdown := c.workqueue.Get()
138 if shutdown {
139 return false
140 }
141 err := func(o interface{}) error {
142 defer c.workqueue.Done(o)
giolekva5ebab802021-10-07 21:50:34 +0400143 if ref, ok := o.(caRef); ok {
144 if err := c.processCAWithKey(ref.key); err != nil {
145 c.workqueue.AddRateLimited(ref)
146 return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error())
147 }
giolekva7e73ba72021-12-03 13:14:20 +0400148 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400149 fmt.Printf("Successfully synced CA '%s'\n", ref.key)
150 } else if ref, ok := o.(nodeRef); ok {
151 if err := c.processNodeWithKey(ref.key); err != nil {
152 c.workqueue.AddRateLimited(ref)
153 return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error())
154 }
giolekva7e73ba72021-12-03 13:14:20 +0400155 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400156 fmt.Printf("Successfully synced Node '%s'\n", ref.key)
157 } else {
giolekva96755fa2021-10-06 21:00:00 +0400158 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400159 utilruntime.HandleError(fmt.Errorf("expected reference in workqueue but got %#v", o))
giolekva96755fa2021-10-06 21:00:00 +0400160 return nil
161 }
giolekva96755fa2021-10-06 21:00:00 +0400162 c.workqueue.Forget(o)
giolekva96755fa2021-10-06 21:00:00 +0400163 return nil
164 }(o)
165 if err != nil {
166 utilruntime.HandleError(err)
167 return true
168 }
169 return true
170}
171
giolekva695960b2021-10-07 22:00:29 +0400172func (c *NebulaController) processCAWithKey(key string) error {
giolekva96755fa2021-10-06 21:00:00 +0400173 namespace, name, err := cache.SplitMetaNamespaceKey(key)
174 if err != nil {
175 return nil
176 }
giolekva7e73ba72021-12-03 13:14:20 +0400177 ca, err := c.caLister.NebulaCAs(namespace).Get(name)
giolekva96755fa2021-10-06 21:00:00 +0400178 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400179 if errors.IsNotFound(err) {
180 utilruntime.HandleError(fmt.Errorf("CA '%s' in work queue no longer exists", key))
181 return nil
182 }
183 return err
giolekva96755fa2021-10-06 21:00:00 +0400184 }
185 if ca.Status.State == nebulav1.NebulaCAStateReady {
186 fmt.Printf("%s CA is already in Ready state\n", ca.Name)
187 return nil
188 }
giolekva4b2934b2021-10-08 19:37:12 +0400189 keyDir, err := generateCAKey(ca.Name, c.nebulaCert)
giolekva96755fa2021-10-06 21:00:00 +0400190 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400191 return err
giolekva96755fa2021-10-06 21:00:00 +0400192 }
193 defer os.RemoveAll(keyDir)
194 secret, err := createSecretFromDir(keyDir)
195 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400196 return err
giolekva96755fa2021-10-06 21:00:00 +0400197 }
198 secret.Immutable = &secretImmutable
199 secret.Name = ca.Spec.SecretName
200 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
201 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400202 return err
giolekva96755fa2021-10-06 21:00:00 +0400203 }
giolekva5ebab802021-10-07 21:50:34 +0400204 err = c.updateCAStatus(ca, nebulav1.NebulaCAStateReady, "Generated credentials")
giolekva96755fa2021-10-06 21:00:00 +0400205 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400206 return err
giolekva96755fa2021-10-06 21:00:00 +0400207 }
208 return nil
209}
210
giolekva695960b2021-10-07 22:00:29 +0400211func (c *NebulaController) processNodeWithKey(key string) error {
giolekva5ebab802021-10-07 21:50:34 +0400212 namespace, name, err := cache.SplitMetaNamespaceKey(key)
213 if err != nil {
214 return nil
215 }
giolekva7e73ba72021-12-03 13:14:20 +0400216 node, err := c.nodeLister.NebulaNodes(namespace).Get(name)
giolekva5ebab802021-10-07 21:50:34 +0400217 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400218 if errors.IsNotFound(err) {
219 utilruntime.HandleError(fmt.Errorf("NebulaNode '%s' in work queue no longer exists", key))
220 return nil
221 }
222 return err
giolekva5ebab802021-10-07 21:50:34 +0400223 }
224 if node.Status.State == nebulav1.NebulaNodeStateReady {
225 fmt.Printf("%s Node is already in Ready state\n", node.Name)
226 return nil
227 }
giolekva7e73ba72021-12-03 13:14:20 +0400228 ca, err := c.caLister.NebulaCAs(node.Spec.CANamespace).Get(node.Spec.CAName)
229 if err != nil {
230 return err
231 }
232 if ca.Status.State != nebulav1.NebulaCAStateReady {
giolekva5ebab802021-10-07 21:50:34 +0400233 return fmt.Errorf("Referenced CA %s is not ready yet.", node.Spec.CAName)
234 }
giolekva7e73ba72021-12-03 13:14:20 +0400235 caSecret, err := c.secretLister.Secrets(ca.Namespace).Get(ca.Spec.SecretName)
giolekva5ebab802021-10-07 21:50:34 +0400236 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400237 if errors.IsNotFound(err) {
238 c.updateNodeStatus(node, nebulav1.NebulaNodeStateError, "Could not find CA secret")
239 }
240 return err
giolekva5ebab802021-10-07 21:50:34 +0400241 }
242 dir, err := extractSecret(caSecret)
243 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400244 return err
giolekva5ebab802021-10-07 21:50:34 +0400245 }
giolekva4b2934b2021-10-08 19:37:12 +0400246 if node.Spec.PubKey == "" {
247 if err := generateNodeKey(node.Name, node.Spec.IPCidr, dir, c.nebulaCert); err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400248 return err
giolekva4b2934b2021-10-08 19:37:12 +0400249 }
250 } else {
251 if err := generateNodeKeyFromPub(node.Name, node.Spec.IPCidr, node.Spec.PubKey, dir, c.nebulaCert); err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400252 return err
giolekva4b2934b2021-10-08 19:37:12 +0400253 }
giolekva5ebab802021-10-07 21:50:34 +0400254 }
255 defer os.RemoveAll(dir)
256 if err := os.Remove(filepath.Join(dir, "ca.key")); err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400257 return err
giolekva5ebab802021-10-07 21:50:34 +0400258 }
259 if err := os.Remove(filepath.Join(dir, "ca.png")); err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400260 return err
giolekva5ebab802021-10-07 21:50:34 +0400261 }
262 secret, err := createSecretFromDir(dir)
263 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400264 return err
giolekva5ebab802021-10-07 21:50:34 +0400265 }
266 secret.Immutable = &secretImmutable
267 secret.Name = node.Spec.SecretName
268 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
269 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400270 return err
giolekva5ebab802021-10-07 21:50:34 +0400271 }
272 err = c.updateNodeStatus(node, nebulav1.NebulaNodeStateReady, "Generated credentials")
273 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400274 return err
giolekva5ebab802021-10-07 21:50:34 +0400275 }
276 return nil
277}
278
giolekva695960b2021-10-07 22:00:29 +0400279func (c *NebulaController) updateCAStatus(ca *nebulav1.NebulaCA, state nebulav1.NebulaCAState, msg string) error {
giolekva96755fa2021-10-06 21:00:00 +0400280 cp := ca.DeepCopy()
281 cp.Status.State = state
282 cp.Status.Message = msg
283 _, err := c.nebulaClient.LekvaV1().NebulaCAs(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
284 return err
285}
286
giolekva695960b2021-10-07 22:00:29 +0400287func (c *NebulaController) updateNodeStatus(node *nebulav1.NebulaNode, state nebulav1.NebulaNodeState, msg string) error {
giolekva5ebab802021-10-07 21:50:34 +0400288 cp := node.DeepCopy()
289 cp.Status.State = state
290 cp.Status.Message = msg
291 _, err := c.nebulaClient.LekvaV1().NebulaNodes(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
292 return err
293}
294
giolekva96755fa2021-10-06 21:00:00 +0400295func createSecretFromDir(path string) (*corev1.Secret, error) {
296 all, err := ioutil.ReadDir(path)
297 if err != nil {
298 return nil, err
299 }
300 secret := &corev1.Secret{
301 Data: make(map[string][]byte),
302 }
303 for _, f := range all {
304 if f.IsDir() {
305 continue
306 }
307 d, err := ioutil.ReadFile(filepath.Join(path, f.Name()))
308 if err != nil {
309 return nil, err
310 }
311 secret.Data[f.Name()] = d
312 }
313 return secret, nil
314}
315
giolekva5ebab802021-10-07 21:50:34 +0400316func extractSecret(secret *corev1.Secret) (string, error) {
317 tmp, err := os.MkdirTemp("", secret.Name)
318 if err != nil {
319 return "", err
320 }
321 for name, data := range secret.Data {
322 if err := ioutil.WriteFile(filepath.Join(tmp, name), data, 0644); err != nil {
323 defer os.RemoveAll(tmp)
giolekva7e73ba72021-12-03 13:14:20 +0400324 return "", err
giolekva5ebab802021-10-07 21:50:34 +0400325 }
326 }
327 return tmp, nil
328}
329
// generateCAKey runs "<nebulaCert> ca" to create a new certificate authority
// called name. The key, certificate and QR image are written as ca.key,
// ca.crt and ca.png into a new temporary directory whose path is returned;
// the caller owns the directory and must remove it.
//
// On failure the temporary directory is removed and the returned error wraps
// the underlying exec error and includes the command's combined output.
func generateCAKey(name, nebulaCert string) (string, error) {
	tmp, err := os.MkdirTemp("", name)
	if err != nil {
		return "", err
	}
	cmd := exec.Command(nebulaCert, "ca",
		"-name", name,
		"-out-key", filepath.Join(tmp, "ca.key"),
		"-out-crt", filepath.Join(tmp, "ca.crt"),
		"-out-qr", filepath.Join(tmp, "ca.png"))
	if out, err := cmd.CombinedOutput(); err != nil {
		// Best-effort cleanup; the command error is the one worth reporting.
		_ = os.RemoveAll(tmp)
		// The output is passed as an argument, never as the format string,
		// so '%' characters in it cannot garble the message.
		return "", fmt.Errorf("running %s ca for %q: %w: %s", nebulaCert, name, err, out)
	}
	return tmp, nil
}
345
// generateNodeKeyFromPub runs "<nebulaCert> sign" to issue a host certificate
// for an existing public key. dir must contain ca.crt and ca.key; host.crt
// and host.png are written into dir. pubKey is written to dir/host.pub for
// the duration of the call and removed again before returning.
//
// On failure the returned error wraps the underlying exec error and includes
// the command's combined output.
func generateNodeKeyFromPub(name, ip, pubKey, dir, nebulaCert string) error {
	hostPub := filepath.Join(dir, "host.pub")
	if err := ioutil.WriteFile(hostPub, []byte(pubKey), 0644); err != nil {
		return err
	}
	// host.pub is only an input to the signing command.
	defer os.Remove(hostPub)
	cmd := exec.Command(nebulaCert, "sign",
		"-ca-crt", filepath.Join(dir, "ca.crt"),
		"-ca-key", filepath.Join(dir, "ca.key"),
		"-name", name,
		"-ip", ip,
		"-in-pub", hostPub,
		"-out-crt", filepath.Join(dir, "host.crt"),
		"-out-qr", filepath.Join(dir, "host.png"))
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("running %s sign for %q: %w: %s", nebulaCert, name, err, out)
	}
	return nil
}
365
// generateNodeKey runs "<nebulaCert> sign" to create a new host key pair and
// certificate. dir must contain ca.crt and ca.key; host.key, host.crt and
// host.png are written into dir.
//
// On failure the returned error wraps the underlying exec error and includes
// the command's combined output.
func generateNodeKey(name, ip, dir, nebulaCert string) error {
	cmd := exec.Command(nebulaCert, "sign",
		"-ca-crt", filepath.Join(dir, "ca.crt"),
		"-ca-key", filepath.Join(dir, "ca.key"),
		"-name", name,
		"-ip", ip,
		"-out-key", filepath.Join(dir, "host.key"),
		"-out-crt", filepath.Join(dir, "host.crt"),
		"-out-qr", filepath.Join(dir, "host.png"))
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("running %s sign for %q: %w: %s", nebulaCert, name, err, out)
	}
	return nil
}