blob: fa234cc1fb9d27ba592a09e62b15f9111bd9e707 [file] [log] [blame]
giolekvaf89e0462020-05-02 17:47:06 +04001package events
2
3import (
giolekva8d9f1022020-05-13 20:13:36 +04004 "bytes"
giolekvaf89e0462020-05-02 17:47:06 +04005 "context"
giolekva8d9f1022020-05-13 20:13:36 +04006 "text/template"
giolekvaede6d2b2020-05-05 22:14:16 +04007 "time"
giolekvaf89e0462020-05-02 17:47:06 +04008
9 apiv1 "k8s.io/api/core/v1"
10 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
giolekva8d9f1022020-05-13 20:13:36 +040011 "k8s.io/apimachinery/pkg/util/yaml"
12 "k8s.io/client-go/kubernetes"
giolekvaf89e0462020-05-02 17:47:06 +040013 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
14
15 "github.com/golang/glog"
16 // "github.com/itaysk/regogo"
17)
18
// Processor is implemented by types that process events.
type Processor interface {
	// Start begins processing. The implementation in this file loops forever,
	// so callers should not expect Start to return.
	Start()
}
22
23// Implements processor
24type singleEventAtATimeProcessor struct {
giolekva8d9f1022020-05-13 20:13:36 +040025 store EventStore
26 appManager AppManager
27 kube *kubernetes.Clientset
28 pcloudApi string
giolekvaede6d2b2020-05-05 22:14:16 +040029 // TODO(giolekva): Nodes themselves should be associated with object store
giolekvaf89e0462020-05-02 17:47:06 +040030 objectStoreApi string
31}
32
33func NewSingleEventAtATimeProcessor(
34 store EventStore,
giolekva8d9f1022020-05-13 20:13:36 +040035 appManager AppManager,
36 kube *kubernetes.Clientset,
giolekvaf89e0462020-05-02 17:47:06 +040037 pcloudApi, objectStoreApi string) Processor {
giolekva8d9f1022020-05-13 20:13:36 +040038 return &singleEventAtATimeProcessor{store, appManager, kube, pcloudApi, objectStoreApi}
giolekvaf89e0462020-05-02 17:47:06 +040039}
40
41func (p *singleEventAtATimeProcessor) Start() {
42 for {
giolekvaede6d2b2020-05-05 22:14:16 +040043 select {
44 case <-time.After(30 * time.Second):
45 events, err := p.store.GetEventsInState(EventStateNew)
46 if err != nil {
47 glog.Error(err)
48 continue
49 }
50 if len(events) == 0 {
51 continue
52 }
53 event := events[0]
giolekva8d9f1022020-05-13 20:13:36 +040054 triggers, err := p.appManager.QueryTriggers("Image", string(EventStateNew))
giolekvaede6d2b2020-05-05 22:14:16 +040055 if err != nil {
giolekvac17b19c2020-05-06 15:33:49 +040056 glog.Error(err)
giolekvaede6d2b2020-05-05 22:14:16 +040057 continue
58 }
giolekva8d9f1022020-05-13 20:13:36 +040059 for _, t := range triggers {
60 pod, err := renderTriggerTemplate(t, event.NodeId, p.pcloudApi, p.objectStoreApi)
61 if err != nil {
62 glog.Errorf("Could not render trigger: %v %v", err, t)
63 continue
64 }
65 glog.Info("Creating pod: %v", pod)
66 pods := p.kube.CoreV1().Pods(t.Namespace)
67 resp, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
68 if err != nil {
69 glog.Error(err)
70 continue
71 }
72 glog.Infof("Pod created: %s", resp)
73 // TODO(giolekva): do not ignore error
74 _ = monitorPod(resp, pods)
75 }
giolekvaede6d2b2020-05-05 22:14:16 +040076 p.store.MarkEventDone(event)
giolekvaf89e0462020-05-02 17:47:06 +040077 }
giolekvaf89e0462020-05-02 17:47:06 +040078 }
79}
80
81func isInTerminalState(pod *apiv1.Pod) bool {
82 return pod.Status.Phase == apiv1.PodSucceeded ||
83 pod.Status.Phase == apiv1.PodFailed
84}
85
86func monitorPod(pod *apiv1.Pod, pods corev1.PodInterface) error {
87 w, err := pods.Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
88 if err != nil {
89 return err
90 }
91 for {
92 select {
93 case events, ok := <-w.ResultChan():
94 if !ok {
95 return nil
96 }
97 p := events.Object.(*apiv1.Pod)
giolekvaede6d2b2020-05-05 22:14:16 +040098 glog.Infof("Pod status: %s", pod.Status.Phase)
giolekvaf89e0462020-05-02 17:47:06 +040099 if isInTerminalState(p) {
giolekvaede6d2b2020-05-05 22:14:16 +0400100 glog.Info("Pod is DONE")
giolekvaf89e0462020-05-02 17:47:06 +0400101 w.Stop()
102 }
103 }
104 }
105 return nil
106}
107
// args is the data a trigger template is executed with. Templates refer to
// the fields by name, and renderTriggerTemplate fills them positionally, so
// both the names and the order must stay as they are.
type args struct {
	// Id is the identifier passed to renderTriggerTemplate.
	Id string
	// PCloudApiAddr is the pcloud API value made available to the template.
	PCloudApiAddr string
	// ObjectStoreAddr is the object store API value made available to the
	// template.
	ObjectStoreAddr string
}
113
114func renderTriggerTemplate(t Trigger, id string, pcloudApi string, objectStoreApi string) (*apiv1.Pod, error) {
115 tmpl, err := template.New("trigger").Parse(t.Template)
116 if err != nil {
117 return nil, err
118 }
119 var b bytes.Buffer
120 if err := tmpl.Execute(&b, args{id, pcloudApi, objectStoreApi}); err != nil {
121 return nil, err
122 }
123 var pod apiv1.Pod
124 dec := yaml.NewYAMLOrJSONDecoder(&b, 100)
125 if err := dec.Decode(&pod); err != nil {
126 return nil, err
127 }
128 return &pod, nil
giolekvaf89e0462020-05-02 17:47:06 +0400129}