blob: 4ece5e6da60da527e45444a5785495499b9d2737 [file] [log] [blame]
giolekvaf89e0462020-05-02 17:47:06 +04001package events
2
3import (
4 "context"
5 "fmt"
6
7 apiv1 "k8s.io/api/core/v1"
8 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
10
11 "github.com/golang/glog"
12 // "github.com/itaysk/regogo"
13)
14
15type Processor interface {
16 Start()
17}
18
19// Implements processor
20type singleEventAtATimeProcessor struct {
21 store EventStore
22 pods corev1.PodInterface
23 pcloudApi string
24 objectStoreApi string
25}
26
27func NewSingleEventAtATimeProcessor(
28 store EventStore,
29 pods corev1.PodInterface,
30 pcloudApi, objectStoreApi string) Processor {
31 return &singleEventAtATimeProcessor{store, pods, pcloudApi, objectStoreApi}
32}
33
34func (p *singleEventAtATimeProcessor) Start() {
35 for {
36 events, err := p.store.GetEventsInState(EventStateNew)
37 if err != nil {
38 glog.Error(err)
39 continue
40 }
41 pod := createPod(events[0].NodeId, p.pcloudApi, p.objectStoreApi)
42 glog.Info("Creating pod...")
43 resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
44 if err != nil {
45 glog.Error(resp)
46 continue
47 }
48 glog.Infof("Pod created: %s", resp)
49 // TODO(giolekva): do not ignore error
50 _ = monitorPod(resp, p.pods)
giolekvad2618252020-05-02 19:39:02 +040051 break
giolekvaf89e0462020-05-02 17:47:06 +040052 }
53}
54
55func isInTerminalState(pod *apiv1.Pod) bool {
56 return pod.Status.Phase == apiv1.PodSucceeded ||
57 pod.Status.Phase == apiv1.PodFailed
58}
59
60func monitorPod(pod *apiv1.Pod, pods corev1.PodInterface) error {
61 w, err := pods.Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
62 if err != nil {
63 return err
64 }
65 for {
66 select {
67 case events, ok := <-w.ResultChan():
68 if !ok {
69 return nil
70 }
71 p := events.Object.(*apiv1.Pod)
72 fmt.Println("Pod status:", pod.Status.Phase)
73 if isInTerminalState(p) {
74 w.Stop()
75 }
76 }
77 }
78 return nil
79}
80
81func createPod(id string, pcloudApi string, objectStoreApi string) *apiv1.Pod {
82 return &apiv1.Pod{
83 ObjectMeta: metav1.ObjectMeta{
84 Name: fmt.Sprintf("event-%s", id)},
85 Spec: apiv1.PodSpec{
giolekva3bd20e12020-05-04 14:14:03 +040086 RestartPolicy: apiv1.RestartPolicyIfNotPresent,
87 NodeSelector: map[string]string{"kubernetes.io/arch", "amd64"},
giolekvaf89e0462020-05-02 17:47:06 +040088 Containers: []apiv1.Container{{
89 Name: "event",
90 Image: "giolekva/face-detector:latest",
91 ImagePullPolicy: apiv1.PullNever,
92 Command: []string{"python", "main.py"},
93 Args: []string{pcloudApi, objectStoreApi, id}}}}}
94 // "http://api.pcloud.svc:1111/graphql", "http://minio.minio.svc:9000"
95}