evet-processor: mark event as done once pod finishes
diff --git a/events/processor.go b/events/processor.go
index 3d4079c..89db4d5 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -3,6 +3,7 @@
import (
"context"
"fmt"
+ "time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -18,9 +19,10 @@
// Implements processor
type singleEventAtATimeProcessor struct {
- store EventStore
- pods corev1.PodInterface
- pcloudApi string
+ store EventStore
+ pods corev1.PodInterface
+ pcloudApi string
+ // TODO(giolekva): Nodes themselves should be associated with object store
objectStoreApi string
}
@@ -33,22 +35,29 @@
func (p *singleEventAtATimeProcessor) Start() {
for {
- events, err := p.store.GetEventsInState(EventStateNew)
- if err != nil {
- glog.Error(err)
- continue
+ select {
+ case <-time.After(30 * time.Second):
+ events, err := p.store.GetEventsInState(EventStateNew)
+ if err != nil {
+ glog.Error(err)
+ continue
+ }
+ if len(events) == 0 {
+ continue
+ }
+ event := events[0]
+ pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
+ glog.Info("Creating pod...")
+ resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ if err != nil {
+ glog.Error(resp)
+ continue
+ }
+ glog.Infof("Pod created: %s", resp)
+ // TODO(giolekva): do not ignore error
+ _ = monitorPod(resp, p.pods)
+ p.store.MarkEventDone(event)
}
- pod := createPod(events[0].NodeId, p.pcloudApi, p.objectStoreApi)
- glog.Info("Creating pod...")
- resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
- if err != nil {
- glog.Error(resp)
- continue
- }
- glog.Infof("Pod created: %s", resp)
- // TODO(giolekva): do not ignore error
- _ = monitorPod(resp, p.pods)
- break
}
}
@@ -69,8 +78,9 @@
return nil
}
p := events.Object.(*apiv1.Pod)
- fmt.Println("Pod status:", pod.Status.Phase)
+ glog.Infof("Pod status: %s", pod.Status.Phase)
if isInTerminalState(p) {
+ glog.Info("Pod is DONE")
w.Stop()
}
}
@@ -88,6 +98,6 @@
Name: "event",
Image: "giolekva/face-detector:latest",
ImagePullPolicy: apiv1.PullAlways,
- Command: []string{"python", "main.py"},
+ Command: []string{"python3", "main.py"},
Args: []string{pcloudApi, objectStoreApi, id}}}}}
}