blob: 89db4d52c00356529ce3049d6f734eb2c4076b3f [file] [log] [blame]
package events
import (
"context"
"fmt"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/golang/glog"
// "github.com/itaysk/regogo"
)
// Processor is the interface returned by NewSingleEventAtATimeProcessor.
// It exposes a single method, Start, which begins event processing.
type Processor interface {
// Start begins processing events. The singleEventAtATimeProcessor
// implementation in this file loops forever and never returns, so callers
// that need to continue should invoke it on its own goroutine.
Start()
}
// singleEventAtATimeProcessor implements Processor. On each polling pass its
// Start method handles at most one event in state EventStateNew by creating a
// pod for it and waiting on that pod before marking the event done.
type singleEventAtATimeProcessor struct {
// store is queried for new events and told when an event is done.
store EventStore
// pods is the client used to create and watch the per-event pods.
pods corev1.PodInterface
// pcloudApi is passed as the first argument to the event container
// (presumably the address of the PCloud API; confirm against the image).
pcloudApi string
// TODO(giolekva): Nodes themselves should be associated with object store
// objectStoreApi is passed as the second argument to the event container
// (presumably the address of the object store API; confirm against the image).
objectStoreApi string
}
// NewSingleEventAtATimeProcessor returns a Processor that reads events from
// store and runs a pod for each one through the pods client. pcloudApi and
// objectStoreApi are forwarded unchanged as arguments to the created pods.
func NewSingleEventAtATimeProcessor(
	store EventStore,
	pods corev1.PodInterface,
	pcloudApi, objectStoreApi string) Processor {
	proc := &singleEventAtATimeProcessor{
		store:          store,
		pods:           pods,
		pcloudApi:      pcloudApi,
		objectStoreApi: objectStoreApi,
	}
	return proc
}
// Start runs the processing loop and never returns.
//
// Every 30 seconds it asks the store for events in state EventStateNew. If
// there are any, it takes the first one, creates a pod for the event's node,
// waits on that pod via monitorPod, and finally marks the event as done.
// Failures to list events or to create the pod are logged and the event is
// retried on a later pass.
func (p *singleEventAtATimeProcessor) Start() {
	for {
		time.Sleep(30 * time.Second)

		events, err := p.store.GetEventsInState(EventStateNew)
		if err != nil {
			glog.Error(err)
			continue
		}
		if len(events) == 0 {
			continue
		}
		// Only the first pending event is handled per pass; the remaining
		// ones are picked up by subsequent iterations.
		event := events[0]
		pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
		glog.Info("Creating pod...")
		resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
		if err != nil {
			// Log the error itself: resp carries no useful information
			// when the create call fails.
			glog.Error(err)
			continue
		}
		glog.Infof("Pod created: %s", resp)
		// TODO(giolekva): decide how a monitoring failure should affect the
		// event. For now it is logged and the event is still marked done, as
		// before.
		if err := monitorPod(resp, p.pods); err != nil {
			glog.Error(err)
		}
		p.store.MarkEventDone(event)
	}
}
// isInTerminalState reports whether the pod's phase is PodSucceeded or
// PodFailed.
func isInTerminalState(pod *apiv1.Pod) bool {
	switch pod.Status.Phase {
	case apiv1.PodSucceeded, apiv1.PodFailed:
		return true
	default:
		return false
	}
}
// monitorPod watches the given pod through the pods client and blocks until
// the pod is observed in a terminal phase (see isInTerminalState).
//
// It returns nil once a terminal phase has been seen. It returns an error if
// the watch cannot be established, if a watch event carries something other
// than a *Pod (for example an error status), or if the watch channel is
// closed before a terminal phase was observed.
func monitorPod(pod *apiv1.Pod, pods corev1.PodInterface) error {
	w, err := pods.Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
	if err != nil {
		return fmt.Errorf("watching pod %s: %w", pod.Name, err)
	}
	// Release the watch on every return path.
	defer w.Stop()

	for ev := range w.ResultChan() {
		// Checked assertion: not every event carries a *Pod, and an
		// unchecked assertion would panic on those.
		current, ok := ev.Object.(*apiv1.Pod)
		if !ok {
			return fmt.Errorf("watching pod %s: unexpected %s event carrying %T",
				pod.Name, ev.Type, ev.Object)
		}
		// Report the phase from the event, not from the pod passed in,
		// which is a snapshot taken at creation time.
		glog.Infof("Pod status: %s", current.Status.Phase)
		if isInTerminalState(current) {
			glog.Info("Pod is DONE")
			return nil
		}
	}
	return fmt.Errorf("watching pod %s: watch closed before pod reached a terminal state", pod.Name)
}
// createPod builds the pod definition for a single event. The pod is named
// "event-<id>", is never restarted, and has one container that runs
// "python3 main.py" with pcloudApi, objectStoreApi and id as its arguments.
func createPod(id string, pcloudApi string, objectStoreApi string) *apiv1.Pod {
	worker := apiv1.Container{
		Name:            "event",
		Image:           "giolekva/face-detector:latest",
		ImagePullPolicy: apiv1.PullAlways,
		Command:         []string{"python3", "main.py"},
		Args:            []string{pcloudApi, objectStoreApi, id},
	}

	pod := &apiv1.Pod{}
	pod.ObjectMeta = metav1.ObjectMeta{Name: fmt.Sprintf("event-%s", id)}
	pod.Spec = apiv1.PodSpec{
		RestartPolicy: apiv1.RestartPolicyNever,
		Containers:    []apiv1.Container{worker},
	}
	return pod
}