blob: 9cc284b1b78d5f13f1ea08dfdc832650b4683599 [file] [log] [blame]
giolekva96755fa2021-10-06 21:00:00 +04001package controllers
2
3import (
4 "context"
5 "fmt"
giolekva96755fa2021-10-06 21:00:00 +04006 "time"
7
8 corev1 "k8s.io/api/core/v1"
giolekva7e73ba72021-12-03 13:14:20 +04009 "k8s.io/apimachinery/pkg/api/errors"
giolekva96755fa2021-10-06 21:00:00 +040010 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
giolekva96755fa2021-10-06 21:00:00 +040011 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
12 "k8s.io/apimachinery/pkg/util/wait"
giolekva5ebab802021-10-07 21:50:34 +040013 corev1informers "k8s.io/client-go/informers/core/v1"
giolekva96755fa2021-10-06 21:00:00 +040014 "k8s.io/client-go/kubernetes"
giolekva5ebab802021-10-07 21:50:34 +040015 corev1listers "k8s.io/client-go/listers/core/v1"
giolekva96755fa2021-10-06 21:00:00 +040016 "k8s.io/client-go/tools/cache"
17 "k8s.io/client-go/util/workqueue"
18 "k8s.io/klog/v2"
19
giolekvac6859b02021-12-09 18:40:51 +040020 nebulav1 "github.com/giolekva/pcloud/core/nebula/controller/apis/nebula/v1"
21 clientset "github.com/giolekva/pcloud/core/nebula/controller/generated/clientset/versioned"
22 informers "github.com/giolekva/pcloud/core/nebula/controller/generated/informers/externalversions/nebula/v1"
23 listers "github.com/giolekva/pcloud/core/nebula/controller/generated/listers/nebula/v1"
giolekva96755fa2021-10-06 21:00:00 +040024)
25
// secretImmutable is the value pointed to by the Immutable field of every
// Secret this controller creates; that field is set from &secretImmutable, so
// an addressable variable is needed.
var secretImmutable = true
27
// caRef is the work queue item for a NebulaCA. Wrapping the key in a distinct
// type lets the worker tell CA items apart from node items.
type caRef struct {
	// key is the string produced by cache.MetaNamespaceKeyFunc for the object.
	key string
}
31
// nodeRef is the work queue item for a NebulaNode. Wrapping the key in a
// distinct type lets the worker tell node items apart from CA items.
type nodeRef struct {
	// key is the string produced by cache.MetaNamespaceKeyFunc for the object.
	key string
}
35
giolekva695960b2021-10-07 22:00:29 +040036type NebulaController struct {
giolekva96755fa2021-10-06 21:00:00 +040037 kubeClient kubernetes.Interface
38 nebulaClient clientset.Interface
39 caLister listers.NebulaCALister
40 caSynced cache.InformerSynced
giolekva5ebab802021-10-07 21:50:34 +040041 nodeLister listers.NebulaNodeLister
42 nodeSynced cache.InformerSynced
43 secretLister corev1listers.SecretLister
44 secretSynced cache.InformerSynced
giolekva96755fa2021-10-06 21:00:00 +040045 workqueue workqueue.RateLimitingInterface
giolekva96755fa2021-10-06 21:00:00 +040046}
47
giolekva695960b2021-10-07 22:00:29 +040048func NewNebulaController(kubeClient kubernetes.Interface,
giolekva5ebab802021-10-07 21:50:34 +040049 nebulaClient clientset.Interface,
50 caInformer informers.NebulaCAInformer,
51 nodeInformer informers.NebulaNodeInformer,
giolekva6bb21c22021-12-29 21:31:08 +040052 secretInformer corev1informers.SecretInformer) *NebulaController {
giolekva695960b2021-10-07 22:00:29 +040053 c := &NebulaController{
giolekva96755fa2021-10-06 21:00:00 +040054 kubeClient: kubeClient,
55 nebulaClient: nebulaClient,
giolekva5ebab802021-10-07 21:50:34 +040056 caLister: caInformer.Lister(),
57 caSynced: caInformer.Informer().HasSynced,
58 nodeLister: nodeInformer.Lister(),
59 nodeSynced: nodeInformer.Informer().HasSynced,
60 secretLister: secretInformer.Lister(),
61 secretSynced: secretInformer.Informer().HasSynced,
62 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nebula"),
giolekva96755fa2021-10-06 21:00:00 +040063 }
64
giolekva5ebab802021-10-07 21:50:34 +040065 caInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
66 AddFunc: c.enqueueCA,
giolekva96755fa2021-10-06 21:00:00 +040067 UpdateFunc: func(_, o interface{}) {
giolekva5ebab802021-10-07 21:50:34 +040068 c.enqueueCA(o)
69 },
70 DeleteFunc: func(o interface{}) {
71 },
72 })
73 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
74 AddFunc: c.enqueueNode,
75 UpdateFunc: func(_, o interface{}) {
76 c.enqueueNode(o)
giolekva96755fa2021-10-06 21:00:00 +040077 },
78 DeleteFunc: func(o interface{}) {
79 },
80 })
81
82 return c
83}
84
giolekva695960b2021-10-07 22:00:29 +040085func (c *NebulaController) enqueueCA(o interface{}) {
giolekva96755fa2021-10-06 21:00:00 +040086 var key string
87 var err error
88 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
89 utilruntime.HandleError(err)
90 return
91 }
giolekva5ebab802021-10-07 21:50:34 +040092 c.workqueue.Add(caRef{key})
93}
94
giolekva695960b2021-10-07 22:00:29 +040095func (c *NebulaController) enqueueNode(o interface{}) {
giolekva5ebab802021-10-07 21:50:34 +040096 var key string
97 var err error
98 if key, err = cache.MetaNamespaceKeyFunc(o); err != nil {
99 utilruntime.HandleError(err)
100 return
101 }
102 c.workqueue.Add(nodeRef{key})
giolekva96755fa2021-10-06 21:00:00 +0400103}
104
giolekva695960b2021-10-07 22:00:29 +0400105func (c *NebulaController) Run(workers int, stopCh <-chan struct{}) error {
giolekva96755fa2021-10-06 21:00:00 +0400106 defer utilruntime.HandleCrash()
107 defer c.workqueue.ShutDown()
108 klog.Info("Starting NebulaCA controller")
109 klog.Info("Waiting for informer caches to sync")
giolekva5ebab802021-10-07 21:50:34 +0400110 if ok := cache.WaitForCacheSync(stopCh, c.caSynced, c.nodeSynced, c.secretSynced); !ok {
giolekva96755fa2021-10-06 21:00:00 +0400111 return fmt.Errorf("Failed to wait for caches to sync")
112 }
113 fmt.Println("Starting workers")
114 for i := 0; i < workers; i++ {
115 go wait.Until(c.runWorker, time.Second, stopCh)
116 }
117 fmt.Println("Started workers")
118 <-stopCh
119 fmt.Println("Shutting down workers")
120 return nil
121}
122
giolekva695960b2021-10-07 22:00:29 +0400123func (c *NebulaController) runWorker() {
giolekva96755fa2021-10-06 21:00:00 +0400124 for c.processNextWorkItem() {
125 }
126}
127
giolekva695960b2021-10-07 22:00:29 +0400128func (c *NebulaController) processNextWorkItem() bool {
giolekva96755fa2021-10-06 21:00:00 +0400129 o, shutdown := c.workqueue.Get()
130 if shutdown {
131 return false
132 }
133 err := func(o interface{}) error {
134 defer c.workqueue.Done(o)
giolekva5ebab802021-10-07 21:50:34 +0400135 if ref, ok := o.(caRef); ok {
136 if err := c.processCAWithKey(ref.key); err != nil {
137 c.workqueue.AddRateLimited(ref)
138 return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error())
139 }
giolekva7e73ba72021-12-03 13:14:20 +0400140 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400141 fmt.Printf("Successfully synced CA '%s'\n", ref.key)
142 } else if ref, ok := o.(nodeRef); ok {
143 if err := c.processNodeWithKey(ref.key); err != nil {
144 c.workqueue.AddRateLimited(ref)
145 return fmt.Errorf("Error syncing '%s': %s, requeuing", ref.key, err.Error())
146 }
giolekva7e73ba72021-12-03 13:14:20 +0400147 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400148 fmt.Printf("Successfully synced Node '%s'\n", ref.key)
149 } else {
giolekva96755fa2021-10-06 21:00:00 +0400150 c.workqueue.Forget(o)
giolekva5ebab802021-10-07 21:50:34 +0400151 utilruntime.HandleError(fmt.Errorf("expected reference in workqueue but got %#v", o))
giolekva96755fa2021-10-06 21:00:00 +0400152 return nil
153 }
giolekva96755fa2021-10-06 21:00:00 +0400154 c.workqueue.Forget(o)
giolekva96755fa2021-10-06 21:00:00 +0400155 return nil
156 }(o)
157 if err != nil {
158 utilruntime.HandleError(err)
159 return true
160 }
161 return true
162}
163
giolekva695960b2021-10-07 22:00:29 +0400164func (c *NebulaController) processCAWithKey(key string) error {
giolekva96755fa2021-10-06 21:00:00 +0400165 namespace, name, err := cache.SplitMetaNamespaceKey(key)
166 if err != nil {
167 return nil
168 }
giolekva7e73ba72021-12-03 13:14:20 +0400169 ca, err := c.caLister.NebulaCAs(namespace).Get(name)
giolekva96755fa2021-10-06 21:00:00 +0400170 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400171 if errors.IsNotFound(err) {
172 utilruntime.HandleError(fmt.Errorf("CA '%s' in work queue no longer exists", key))
173 return nil
174 }
175 return err
giolekva96755fa2021-10-06 21:00:00 +0400176 }
177 if ca.Status.State == nebulav1.NebulaCAStateReady {
178 fmt.Printf("%s CA is already in Ready state\n", ca.Name)
179 return nil
180 }
giolekva6bb21c22021-12-29 21:31:08 +0400181 privKey, cert, err := CreateCertificateAuthority(apiAddr(ca.Name), ca.Name)
giolekva96755fa2021-10-06 21:00:00 +0400182 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400183 return err
giolekva96755fa2021-10-06 21:00:00 +0400184 }
giolekva6bb21c22021-12-29 21:31:08 +0400185 secret := &corev1.Secret{
186 ObjectMeta: metav1.ObjectMeta{
187 Name: ca.Spec.SecretName,
188 },
189 Immutable: &secretImmutable,
190 Data: map[string][]byte{
191 "ca.key": privKey,
192 "ca.crt": cert,
193 },
giolekva96755fa2021-10-06 21:00:00 +0400194 }
giolekva96755fa2021-10-06 21:00:00 +0400195 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
196 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400197 return err
giolekva96755fa2021-10-06 21:00:00 +0400198 }
giolekva5ebab802021-10-07 21:50:34 +0400199 err = c.updateCAStatus(ca, nebulav1.NebulaCAStateReady, "Generated credentials")
giolekva96755fa2021-10-06 21:00:00 +0400200 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400201 return err
giolekva96755fa2021-10-06 21:00:00 +0400202 }
203 return nil
204}
205
giolekva695960b2021-10-07 22:00:29 +0400206func (c *NebulaController) processNodeWithKey(key string) error {
giolekva5ebab802021-10-07 21:50:34 +0400207 namespace, name, err := cache.SplitMetaNamespaceKey(key)
208 if err != nil {
209 return nil
210 }
giolekva7e73ba72021-12-03 13:14:20 +0400211 node, err := c.nodeLister.NebulaNodes(namespace).Get(name)
giolekva5ebab802021-10-07 21:50:34 +0400212 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400213 if errors.IsNotFound(err) {
214 utilruntime.HandleError(fmt.Errorf("NebulaNode '%s' in work queue no longer exists", key))
215 return nil
216 }
217 return err
giolekva5ebab802021-10-07 21:50:34 +0400218 }
219 if node.Status.State == nebulav1.NebulaNodeStateReady {
220 fmt.Printf("%s Node is already in Ready state\n", node.Name)
221 return nil
222 }
giolekva7e73ba72021-12-03 13:14:20 +0400223 ca, err := c.caLister.NebulaCAs(node.Spec.CANamespace).Get(node.Spec.CAName)
224 if err != nil {
225 return err
226 }
227 if ca.Status.State != nebulav1.NebulaCAStateReady {
giolekva5ebab802021-10-07 21:50:34 +0400228 return fmt.Errorf("Referenced CA %s is not ready yet.", node.Spec.CAName)
229 }
giolekva7e73ba72021-12-03 13:14:20 +0400230 caSecret, err := c.secretLister.Secrets(ca.Namespace).Get(ca.Spec.SecretName)
giolekva5ebab802021-10-07 21:50:34 +0400231 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400232 if errors.IsNotFound(err) {
233 c.updateNodeStatus(node, nebulav1.NebulaNodeStateError, "Could not find CA secret")
234 }
235 return err
giolekva5ebab802021-10-07 21:50:34 +0400236 }
giolekva6bb21c22021-12-29 21:31:08 +0400237 var pubKey []byte
238 if node.Spec.PubKey != "" {
239 pubKey = []byte(node.Spec.PubKey)
240 }
241 privKey, nodeCert, err := SignNebulaNode(apiAddr(ca.Name), caSecret.Data["ca.key"], caSecret.Data["ca.crt"], node.Name, pubKey, node.Spec.IPCidr)
giolekva5ebab802021-10-07 21:50:34 +0400242 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400243 return err
giolekva5ebab802021-10-07 21:50:34 +0400244 }
giolekva6bb21c22021-12-29 21:31:08 +0400245 secret := &corev1.Secret{
246 ObjectMeta: metav1.ObjectMeta{
247 Name: node.Spec.SecretName,
248 },
249 Immutable: &secretImmutable,
250 Data: map[string][]byte{
251 "ca.crt": caSecret.Data["ca.crt"],
252 "host.crt": nodeCert,
253 "host.key": privKey,
254 },
giolekva5ebab802021-10-07 21:50:34 +0400255 }
giolekva5ebab802021-10-07 21:50:34 +0400256 _, err = c.kubeClient.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
257 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400258 return err
giolekva5ebab802021-10-07 21:50:34 +0400259 }
260 err = c.updateNodeStatus(node, nebulav1.NebulaNodeStateReady, "Generated credentials")
261 if err != nil {
giolekva7e73ba72021-12-03 13:14:20 +0400262 return err
giolekva5ebab802021-10-07 21:50:34 +0400263 }
264 return nil
265}
266
giolekva695960b2021-10-07 22:00:29 +0400267func (c *NebulaController) updateCAStatus(ca *nebulav1.NebulaCA, state nebulav1.NebulaCAState, msg string) error {
giolekva96755fa2021-10-06 21:00:00 +0400268 cp := ca.DeepCopy()
269 cp.Status.State = state
270 cp.Status.Message = msg
271 _, err := c.nebulaClient.LekvaV1().NebulaCAs(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
272 return err
273}
274
giolekva695960b2021-10-07 22:00:29 +0400275func (c *NebulaController) updateNodeStatus(node *nebulav1.NebulaNode, state nebulav1.NebulaNodeState, msg string) error {
giolekva5ebab802021-10-07 21:50:34 +0400276 cp := node.DeepCopy()
277 cp.Status.State = state
278 cp.Status.Message = msg
279 _, err := c.nebulaClient.LekvaV1().NebulaNodes(cp.Namespace).UpdateStatus(context.TODO(), cp, metav1.UpdateOptions{})
280 return err
281}
282
// apiAddr returns the HTTP URL built by substituting the CA name into a fixed
// in-cluster host name pattern.
//
// TODO(giolekva): maybe pass by flag?
func apiAddr(ca string) string {
	const addrFormat = "http://nebula-api.%s-ingress-private.svc.cluster.local"
	return fmt.Sprintf(addrFormat, ca)
}