event-processor: query app-manager for triggers instead of hard coding them
diff --git a/events/processor.go b/events/processor.go
index 504e261..fa234cc 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -1,12 +1,15 @@
package events
import (
+ "bytes"
"context"
- "fmt"
+ "text/template"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/yaml"
+ "k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/golang/glog"
@@ -19,18 +22,20 @@
// Implements processor
type singleEventAtATimeProcessor struct {
- store EventStore
- pods corev1.PodInterface
- pcloudApi string
+ store EventStore
+ appManager AppManager
+ kube *kubernetes.Clientset
+ pcloudApi string
// TODO(giolekva): Nodes themselves should be associated with object store
objectStoreApi string
}
func NewSingleEventAtATimeProcessor(
store EventStore,
- pods corev1.PodInterface,
+ appManager AppManager,
+ kube *kubernetes.Clientset,
pcloudApi, objectStoreApi string) Processor {
- return &singleEventAtATimeProcessor{store, pods, pcloudApi, objectStoreApi}
+ return &singleEventAtATimeProcessor{store, appManager, kube, pcloudApi, objectStoreApi}
}
func (p *singleEventAtATimeProcessor) Start() {
@@ -46,16 +51,28 @@
continue
}
event := events[0]
- pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
- glog.Info("Creating pod...")
- resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ triggers, err := p.appManager.QueryTriggers("Image", string(EventStateNew))
if err != nil {
glog.Error(err)
continue
}
- glog.Infof("Pod created: %s", resp)
- // TODO(giolekva): do not ignore error
- _ = monitorPod(resp, p.pods)
+ for _, t := range triggers {
+ pod, err := renderTriggerTemplate(t, event.NodeId, p.pcloudApi, p.objectStoreApi)
+ if err != nil {
+ glog.Errorf("Could not render trigger: %v %v", err, t)
+ continue
+ }
+ glog.Info("Creating pod: %v", pod)
+ pods := p.kube.CoreV1().Pods(t.Namespace)
+ resp, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ if err != nil {
+ glog.Error(err)
+ continue
+ }
+ glog.Infof("Pod created: %s", resp)
+ // TODO(giolekva): do not ignore error
+ _ = monitorPod(resp, pods)
+ }
p.store.MarkEventDone(event)
}
}
@@ -88,16 +105,25 @@
return nil
}
-func createPod(id string, pcloudApi string, objectStoreApi string) *apiv1.Pod {
- return &apiv1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: fmt.Sprintf("event-%s", id)},
- Spec: apiv1.PodSpec{
- RestartPolicy: apiv1.RestartPolicyNever,
- Containers: []apiv1.Container{{
- Name: "event",
- Image: "giolekva/face-detector:latest",
- ImagePullPolicy: apiv1.PullAlways,
- Command: []string{"python3", "main.py"},
- Args: []string{pcloudApi, objectStoreApi, id}}}}}
+type args struct {
+ Id string
+ PCloudApiAddr string
+ ObjectStoreAddr string
+}
+
+func renderTriggerTemplate(t Trigger, id string, pcloudApi string, objectStoreApi string) (*apiv1.Pod, error) {
+ tmpl, err := template.New("trigger").Parse(t.Template)
+ if err != nil {
+ return nil, err
+ }
+ var b bytes.Buffer
+ if err := tmpl.Execute(&b, args{id, pcloudApi, objectStoreApi}); err != nil {
+ return nil, err
+ }
+ var pod apiv1.Pod
+ dec := yaml.NewYAMLOrJSONDecoder(&b, 100)
+ if err := dec.Decode(&pod); err != nil {
+ return nil, err
+ }
+ return &pod, nil
}