blob: 89db4d52c00356529ce3049d6f734eb2c4076b3f [file] [log] [blame]
giolekvaf89e0462020-05-02 17:47:06 +04001package events
2
3import (
4 "context"
5 "fmt"
giolekvaede6d2b2020-05-05 22:14:16 +04006 "time"
giolekvaf89e0462020-05-02 17:47:06 +04007
8 apiv1 "k8s.io/api/core/v1"
9 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
11
12 "github.com/golang/glog"
13 // "github.com/itaysk/regogo"
14)
15
// Processor is anything that can be started to process events.
type Processor interface {
	// Start begins event processing. The implementation in this file loops
	// forever, so callers should expect the call to block.
	Start()
}
19
20// Implements processor
21type singleEventAtATimeProcessor struct {
giolekvaede6d2b2020-05-05 22:14:16 +040022 store EventStore
23 pods corev1.PodInterface
24 pcloudApi string
25 // TODO(giolekva): Nodes themselves should be associated with object store
giolekvaf89e0462020-05-02 17:47:06 +040026 objectStoreApi string
27}
28
29func NewSingleEventAtATimeProcessor(
30 store EventStore,
31 pods corev1.PodInterface,
32 pcloudApi, objectStoreApi string) Processor {
33 return &singleEventAtATimeProcessor{store, pods, pcloudApi, objectStoreApi}
34}
35
36func (p *singleEventAtATimeProcessor) Start() {
37 for {
giolekvaede6d2b2020-05-05 22:14:16 +040038 select {
39 case <-time.After(30 * time.Second):
40 events, err := p.store.GetEventsInState(EventStateNew)
41 if err != nil {
42 glog.Error(err)
43 continue
44 }
45 if len(events) == 0 {
46 continue
47 }
48 event := events[0]
49 pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
50 glog.Info("Creating pod...")
51 resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
52 if err != nil {
53 glog.Error(resp)
54 continue
55 }
56 glog.Infof("Pod created: %s", resp)
57 // TODO(giolekva): do not ignore error
58 _ = monitorPod(resp, p.pods)
59 p.store.MarkEventDone(event)
giolekvaf89e0462020-05-02 17:47:06 +040060 }
giolekvaf89e0462020-05-02 17:47:06 +040061 }
62}
63
64func isInTerminalState(pod *apiv1.Pod) bool {
65 return pod.Status.Phase == apiv1.PodSucceeded ||
66 pod.Status.Phase == apiv1.PodFailed
67}
68
69func monitorPod(pod *apiv1.Pod, pods corev1.PodInterface) error {
70 w, err := pods.Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
71 if err != nil {
72 return err
73 }
74 for {
75 select {
76 case events, ok := <-w.ResultChan():
77 if !ok {
78 return nil
79 }
80 p := events.Object.(*apiv1.Pod)
giolekvaede6d2b2020-05-05 22:14:16 +040081 glog.Infof("Pod status: %s", pod.Status.Phase)
giolekvaf89e0462020-05-02 17:47:06 +040082 if isInTerminalState(p) {
giolekvaede6d2b2020-05-05 22:14:16 +040083 glog.Info("Pod is DONE")
giolekvaf89e0462020-05-02 17:47:06 +040084 w.Stop()
85 }
86 }
87 }
88 return nil
89}
90
91func createPod(id string, pcloudApi string, objectStoreApi string) *apiv1.Pod {
92 return &apiv1.Pod{
93 ObjectMeta: metav1.ObjectMeta{
94 Name: fmt.Sprintf("event-%s", id)},
95 Spec: apiv1.PodSpec{
giolekva14438032020-05-05 18:23:13 +040096 RestartPolicy: apiv1.RestartPolicyNever,
giolekvaf89e0462020-05-02 17:47:06 +040097 Containers: []apiv1.Container{{
98 Name: "event",
99 Image: "giolekva/face-detector:latest",
giolekva14438032020-05-05 18:23:13 +0400100 ImagePullPolicy: apiv1.PullAlways,
giolekvaede6d2b2020-05-05 22:14:16 +0400101 Command: []string{"python3", "main.py"},
giolekvaf89e0462020-05-02 17:47:06 +0400102 Args: []string{pcloudApi, objectStoreApi, id}}}}}
giolekvaf89e0462020-05-02 17:47:06 +0400103}