| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 1 | package controllers |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 6 | "time" |
| 7 | |
| 8 | corev1 "k8s.io/api/core/v1" |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 9 | "k8s.io/apimachinery/pkg/api/errors" |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 10 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 11 | utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| 12 | "k8s.io/apimachinery/pkg/util/wait" |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 13 | corev1informers "k8s.io/client-go/informers/core/v1" |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 14 | "k8s.io/client-go/kubernetes" |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 15 | corev1listers "k8s.io/client-go/listers/core/v1" |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 16 | "k8s.io/client-go/tools/cache" |
| 17 | "k8s.io/client-go/util/workqueue" |
| 18 | "k8s.io/klog/v2" |
| 19 | |
| giolekva | c6859b0 | 2021-12-09 18:40:51 +0400 | [diff] [blame] | 20 | nebulav1 "github.com/giolekva/pcloud/core/nebula/controller/apis/nebula/v1" |
| 21 | clientset "github.com/giolekva/pcloud/core/nebula/controller/generated/clientset/versioned" |
| 22 | informers "github.com/giolekva/pcloud/core/nebula/controller/generated/informers/externalversions/nebula/v1" |
| 23 | listers "github.com/giolekva/pcloud/core/nebula/controller/generated/listers/nebula/v1" |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 24 | ) |
| 25 | |
// secretImmutable backs the Immutable field of the Secrets built by this
// controller. The field is populated with &secretImmutable, so an addressable
// variable is required rather than a constant.
var secretImmutable = true
| 27 | |
// caRef is the work queue item for a NebulaCA. key is the object's
// namespace/name key as produced by cache.MetaNamespaceKeyFunc.
type caRef struct{ key string }
| 31 | |
// nodeRef is the work queue item for a NebulaNode. key is the object's
// namespace/name key as produced by cache.MetaNamespaceKeyFunc.
type nodeRef struct{ key string }
| 35 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 36 | type NebulaController struct { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 37 | kubeClient kubernetes.Interface |
| 38 | nebulaClient clientset.Interface |
| 39 | caLister listers.NebulaCALister |
| 40 | caSynced cache.InformerSynced |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 41 | nodeLister listers.NebulaNodeLister |
| 42 | nodeSynced cache.InformerSynced |
| 43 | secretLister corev1listers.SecretLister |
| 44 | secretSynced cache.InformerSynced |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 45 | workqueue workqueue.RateLimitingInterface |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 46 | } |
| 47 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 48 | func NewNebulaController(kubeClient kubernetes.Interface, |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 49 | nebulaClient clientset.Interface, |
| 50 | caInformer informers.NebulaCAInformer, |
| 51 | nodeInformer informers.NebulaNodeInformer, |
| giolekva | 6bb21c2 | 2021-12-29 21:31:08 +0400 | [diff] [blame^] | 52 | secretInformer corev1informers.SecretInformer) *NebulaController { |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 53 | c := &NebulaController{ |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 54 | kubeClient: kubeClient, |
| 55 | nebulaClient: nebulaClient, |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 56 | caLister: caInformer.Lister(), |
| 57 | caSynced: caInformer.Informer().HasSynced, |
| 58 | nodeLister: nodeInformer.Lister(), |
| 59 | nodeSynced: nodeInformer.Informer().HasSynced, |
| 60 | secretLister: secretInformer.Lister(), |
| 61 | secretSynced: secretInformer.Informer().HasSynced, |
| 62 | workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nebula"), |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 63 | } |
| 64 | |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 65 | caInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| 66 | AddFunc: c.enqueueCA, |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 67 | UpdateFunc: func(_, o interface{}) { |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 68 | c.enqueueCA(o) |
| 69 | }, |
| 70 | DeleteFunc: func(o interface{}) { |
| 71 | }, |
| 72 | }) |
| 73 | nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| 74 | AddFunc: c.enqueueNode, |
| 75 | UpdateFunc: func(_, o interface{}) { |
| 76 | c.enqueueNode(o) |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 77 | }, |
| 78 | DeleteFunc: func(o interface{}) { |
| 79 | }, |
| 80 | }) |
| 81 | |
| 82 | return c |
| 83 | } |
| 84 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 85 | func (c *NebulaController) enqueueCA(o interface{}) { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 86 | var key string |
| 87 | var err error |
| 88 | if key, err = cache.MetaNamespaceKeyFunc(o); err != nil { |
| 89 | utilruntime.HandleError(err) |
| 90 | return |
| 91 | } |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 92 | c.workqueue.Add(caRef{key}) |
| 93 | } |
| 94 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 95 | func (c *NebulaController) enqueueNode(o interface{}) { |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 96 | var key string |
| 97 | var err error |
| 98 | if key, err = cache.MetaNamespaceKeyFunc(o); err != nil { |
| 99 | utilruntime.HandleError(err) |
| 100 | return |
| 101 | } |
| 102 | c.workqueue.Add(nodeRef{key}) |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 103 | } |
| 104 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 105 | func (c *NebulaController) Run(workers int, stopCh <-chan struct{}) error { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 106 | defer utilruntime.HandleCrash() |
| 107 | defer c.workqueue.ShutDown() |
| 108 | klog.Info("Starting NebulaCA controller") |
| 109 | klog.Info("Waiting for informer caches to sync") |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 110 | if ok := cache.WaitForCacheSync(stopCh, c.caSynced, c.nodeSynced, c.secretSynced); !ok { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 111 | return fmt.Errorf("Failed to wait for caches to sync") |
| 112 | } |
| 113 | fmt.Println("Starting workers") |
| 114 | for i := 0; i < workers; i++ { |
| 115 | go wait.Until(c.runWorker, time.Second, stopCh) |
| 116 | } |
| 117 | fmt.Println("Started workers") |
| 118 | <-stopCh |
| 119 | fmt.Println("Shutting down workers") |
| 120 | return nil |
| 121 | } |
| 122 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 123 | func (c *NebulaController) runWorker() { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 124 | for c.processNextWorkItem() { |
| 125 | } |
| 126 | } |
| 127 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 128 | func (c *NebulaController) processNextWorkItem() bool { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 129 | o, shutdown := c.workqueue.Get() |
| 130 | if shutdown { |
| 131 | return false |
| 132 | } |
| 133 | err := func(o interface{}) error { |
| 134 | defer c.workqueue.Done(o) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 135 | if ref, ok := o.(caRef); ok { |
| 136 | if err := c.processCAWithKey(ref.key); err != nil { |
| 137 | c.workqueue.AddRateLimited(ref) |
| 138 | return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error()) |
| 139 | } |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 140 | c.workqueue.Forget(o) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 141 | fmt.Printf("Successfully synced CA '%s'\n", ref.key) |
| 142 | } else if ref, ok := o.(nodeRef); ok { |
| 143 | if err := c.processNodeWithKey(ref.key); err != nil { |
| 144 | c.workqueue.AddRateLimited(ref) |
| 145 | return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error()) |
| 146 | } |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 147 | c.workqueue.Forget(o) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 148 | fmt.Printf("Successfully synced Node '%s'\n", ref.key) |
| 149 | } else { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 150 | c.workqueue.Forget(o) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 151 | utilruntime.HandleError(fmt.Errorf("expected reference in workqueue but got %#v", o)) |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 152 | return nil |
| 153 | } |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 154 | c.workqueue.Forget(o) |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 155 | return nil |
| 156 | }(o) |
| 157 | if err != nil { |
| 158 | utilruntime.HandleError(err) |
| 159 | return true |
| 160 | } |
| 161 | return true |
| 162 | } |
| 163 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 164 | func (c *NebulaController) processCAWithKey(key string) error { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 165 | namespace, name, err := cache.SplitMetaNamespaceKey(key) |
| 166 | if err != nil { |
| 167 | return nil |
| 168 | } |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 169 | ca, err := c.caLister.NebulaCAs(namespace).Get(name) |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 170 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 171 | if errors.IsNotFound(err) { |
| 172 | utilruntime.HandleError(fmt.Errorf("CA '%s' in work queue no longer exists", key)) |
| 173 | return nil |
| 174 | } |
| 175 | return err |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 176 | } |
| 177 | if ca.Status.State == nebulav1.NebulaCAStateReady { |
| 178 | fmt.Printf("%s CA is already in Ready state\n", ca.Name) |
| 179 | return nil |
| 180 | } |
| giolekva | 6bb21c2 | 2021-12-29 21:31:08 +0400 | [diff] [blame^] | 181 | privKey, cert, err := CreateCertificateAuthority(apiAddr(ca.Name), ca.Name) |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 182 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 183 | return err |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 184 | } |
| giolekva | 6bb21c2 | 2021-12-29 21:31:08 +0400 | [diff] [blame^] | 185 | secret := &corev1.Secret{ |
| 186 | ObjectMeta: metav1.ObjectMeta{ |
| 187 | Name: ca.Spec.SecretName, |
| 188 | }, |
| 189 | Immutable: &secretImmutable, |
| 190 | Data: map[string][]byte{ |
| 191 | "ca.key": privKey, |
| 192 | "ca.crt": cert, |
| 193 | }, |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 194 | } |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 195 | _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) |
| 196 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 197 | return err |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 198 | } |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 199 | err = c.updateCAStatus(ca, nebulav1.NebulaCAStateReady, "Generated credentials") |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 200 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 201 | return err |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 202 | } |
| 203 | return nil |
| 204 | } |
| 205 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 206 | func (c *NebulaController) processNodeWithKey(key string) error { |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 207 | namespace, name, err := cache.SplitMetaNamespaceKey(key) |
| 208 | if err != nil { |
| 209 | return nil |
| 210 | } |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 211 | node, err := c.nodeLister.NebulaNodes(namespace).Get(name) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 212 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 213 | if errors.IsNotFound(err) { |
| 214 | utilruntime.HandleError(fmt.Errorf("NebulaNode '%s' in work queue no longer exists", key)) |
| 215 | return nil |
| 216 | } |
| 217 | return err |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 218 | } |
| 219 | if node.Status.State == nebulav1.NebulaNodeStateReady { |
| 220 | fmt.Printf("%s Node is already in Ready state\n", node.Name) |
| 221 | return nil |
| 222 | } |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 223 | ca, err := c.caLister.NebulaCAs(node.Spec.CANamespace).Get(node.Spec.CAName) |
| 224 | if err != nil { |
| 225 | return err |
| 226 | } |
| 227 | if ca.Status.State != nebulav1.NebulaCAStateReady { |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 228 | return fmt.Errorf("Referenced CA %s is not ready yet.", node.Spec.CAName) |
| 229 | } |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 230 | caSecret, err := c.secretLister.Secrets(ca.Namespace).Get(ca.Spec.SecretName) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 231 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 232 | if errors.IsNotFound(err) { |
| 233 | c.updateNodeStatus(node, nebulav1.NebulaNodeStateError, "Could not find CA secret") |
| 234 | } |
| 235 | return err |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 236 | } |
| giolekva | 6bb21c2 | 2021-12-29 21:31:08 +0400 | [diff] [blame^] | 237 | var pubKey []byte |
| 238 | if node.Spec.PubKey != "" { |
| 239 | pubKey = []byte(node.Spec.PubKey) |
| 240 | } |
| 241 | privKey, nodeCert, err := SignNebulaNode(apiAddr(ca.Name), caSecret.Data["ca.key"], caSecret.Data["ca.crt"], node.Name, pubKey, node.Spec.IPCidr) |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 242 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 243 | return err |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 244 | } |
| giolekva | 6bb21c2 | 2021-12-29 21:31:08 +0400 | [diff] [blame^] | 245 | secret := &corev1.Secret{ |
| 246 | ObjectMeta: metav1.ObjectMeta{ |
| 247 | Name: node.Spec.SecretName, |
| 248 | }, |
| 249 | Immutable: &secretImmutable, |
| 250 | Data: map[string][]byte{ |
| 251 | "ca.crt": caSecret.Data["ca.crt"], |
| 252 | "host.crt": nodeCert, |
| 253 | "host.key": privKey, |
| 254 | }, |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 255 | } |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 256 | _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) |
| 257 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 258 | return err |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 259 | } |
| 260 | err = c.updateNodeStatus(node, nebulav1.NebulaNodeStateReady, "Generated credentials") |
| 261 | if err != nil { |
| giolekva | 7e73ba7 | 2021-12-03 13:14:20 +0400 | [diff] [blame] | 262 | return err |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 263 | } |
| 264 | return nil |
| 265 | } |
| 266 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 267 | func (c *NebulaController) updateCAStatus(ca *nebulav1.NebulaCA, state nebulav1.NebulaCAState, msg string) error { |
| giolekva | 96755fa | 2021-10-06 21:00:00 +0400 | [diff] [blame] | 268 | cp := ca.DeepCopy() |
| 269 | cp.Status.State = state |
| 270 | cp.Status.Message = msg |
| 271 | _, err := c.nebulaClient.LekvaV1().NebulaCAs(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{}) |
| 272 | return err |
| 273 | } |
| 274 | |
| giolekva | 695960b | 2021-10-07 22:00:29 +0400 | [diff] [blame] | 275 | func (c *NebulaController) updateNodeStatus(node *nebulav1.NebulaNode, state nebulav1.NebulaNodeState, msg string) error { |
| giolekva | 5ebab80 | 2021-10-07 21:50:34 +0400 | [diff] [blame] | 276 | cp := node.DeepCopy() |
| 277 | cp.Status.State = state |
| 278 | cp.Status.Message = msg |
| 279 | _, err := c.nebulaClient.LekvaV1().NebulaNodes(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{}) |
| 280 | return err |
| 281 | } |
| 282 | |
// apiAddr returns the in-cluster HTTP address of the nebula-api service for
// the given CA name, i.e. the host nebula-api.<ca>-ingress-private.svc.cluster.local.
//
// TODO(giolekva): maybe pass by flag?
func apiAddr(ca string) string {
	const addrFormat = "http://nebula-api.%s-ingress-private.svc.cluster.local"
	return fmt.Sprintf(addrFormat, ca)
}