blob: fa234cc1fb9d27ba592a09e62b15f9111bd9e707 [file] [log] [blame]
package events
import (
"bytes"
"context"
"text/template"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/golang/glog"
// "github.com/itaysk/regogo"
)
type Processor interface {
Start()
}
// Implements processor
type singleEventAtATimeProcessor struct {
store EventStore
appManager AppManager
kube *kubernetes.Clientset
pcloudApi string
// TODO(giolekva): Nodes themselves should be associated with object store
objectStoreApi string
}
func NewSingleEventAtATimeProcessor(
store EventStore,
appManager AppManager,
kube *kubernetes.Clientset,
pcloudApi, objectStoreApi string) Processor {
return &singleEventAtATimeProcessor{store, appManager, kube, pcloudApi, objectStoreApi}
}
func (p *singleEventAtATimeProcessor) Start() {
for {
select {
case <-time.After(30 * time.Second):
events, err := p.store.GetEventsInState(EventStateNew)
if err != nil {
glog.Error(err)
continue
}
if len(events) == 0 {
continue
}
event := events[0]
triggers, err := p.appManager.QueryTriggers("Image", string(EventStateNew))
if err != nil {
glog.Error(err)
continue
}
for _, t := range triggers {
pod, err := renderTriggerTemplate(t, event.NodeId, p.pcloudApi, p.objectStoreApi)
if err != nil {
glog.Errorf("Could not render trigger: %v %v", err, t)
continue
}
glog.Info("Creating pod: %v", pod)
pods := p.kube.CoreV1().Pods(t.Namespace)
resp, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
glog.Error(err)
continue
}
glog.Infof("Pod created: %s", resp)
// TODO(giolekva): do not ignore error
_ = monitorPod(resp, pods)
}
p.store.MarkEventDone(event)
}
}
}
func isInTerminalState(pod *apiv1.Pod) bool {
return pod.Status.Phase == apiv1.PodSucceeded ||
pod.Status.Phase == apiv1.PodFailed
}
func monitorPod(pod *apiv1.Pod, pods corev1.PodInterface) error {
w, err := pods.Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
if err != nil {
return err
}
for {
select {
case events, ok := <-w.ResultChan():
if !ok {
return nil
}
p := events.Object.(*apiv1.Pod)
glog.Infof("Pod status: %s", pod.Status.Phase)
if isInTerminalState(p) {
glog.Info("Pod is DONE")
w.Stop()
}
}
}
return nil
}
type args struct {
Id string
PCloudApiAddr string
ObjectStoreAddr string
}
func renderTriggerTemplate(t Trigger, id string, pcloudApi string, objectStoreApi string) (*apiv1.Pod, error) {
tmpl, err := template.New("trigger").Parse(t.Template)
if err != nil {
return nil, err
}
var b bytes.Buffer
if err := tmpl.Execute(&b, args{id, pcloudApi, objectStoreApi}); err != nil {
return nil, err
}
var pod apiv1.Pod
dec := yaml.NewYAMLOrJSONDecoder(&b, 100)
if err := dec.Decode(&pod); err != nil {
return nil, err
}
return &pod, nil
}