event-processor: query app-manager for triggers instead of hard coding them
diff --git a/events/appmanager.go b/events/appmanager.go
new file mode 100644
index 0000000..91b6791
--- /dev/null
+++ b/events/appmanager.go
@@ -0,0 +1,43 @@
+package events
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+)
+
+type Trigger struct {
+	Namespace string `json:"namespace"`
+	Template  string `json:"template"`
+}
+
+type AppManager interface {
+	QueryTriggers(triggerOnType string, triggerOnEvent string) ([]Trigger, error)
+}
+
+type appManagerClient struct {
+	addr string
+}
+
+func NewAppManagerClient(addr string) AppManager {
+	return &appManagerClient{addr}
+}
+
+func (c *appManagerClient) QueryTriggers(triggerOnType string, triggerOnEvent string) ([]Trigger, error) {
+	triggerUrl := fmt.Sprintf("%s/triggers?trigger_on_type=%s&trigger_on_event=%s",
+		c.addr, triggerOnType, triggerOnEvent)
+	resp, err := http.Get(triggerUrl)
+	if err != nil {
+		return nil, err
+	}
+	respBody, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	triggers := make([]Trigger, 0)
+	if err := json.Unmarshal(respBody, &triggers); err != nil {
+		return nil, err
+	}
+	return triggers, nil
+}
diff --git a/events/cmd/main.go b/events/cmd/main.go
index 5a20c26..c272a4e 100644
--- a/events/cmd/main.go
+++ b/events/cmd/main.go
@@ -14,6 +14,7 @@
 
 var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")
 var apiAddr = flag.String("api_addr", "", "PCloud API server address.")
+var appManagerAddr = flag.String("app_manager_addr", "", "PCloud AppManager address.")
 var objectStoreAddr = flag.String("object_store_addr", "", "S3 compatible object store address.")
 
 func getKubeConfig() (*rest.Config, error) {
@@ -26,16 +27,16 @@
 
 func main() {
 	flag.Parse()
-	config, err := getKubeConfig()
+	kubeconfig, err := getKubeConfig()
 	if err != nil {
 		glog.Fatalf("Could not initialize Kubeconfig: %v", err)
 	}
-	clientset, err := kubernetes.NewForConfig(config)
+	kube, err := kubernetes.NewForConfig(kubeconfig)
 	if err != nil {
 		glog.Fatalf("Could not create Kubernetes API client: %v", err)
 	}
-	pods := clientset.CoreV1().Pods("default")
 	eventStore := events.NewGraphQLClient(*apiAddr)
+	appManager := events.NewAppManagerClient(*appManagerAddr)
 	events.NewSingleEventAtATimeProcessor(
-		eventStore, pods, *apiAddr, *objectStoreAddr).Start()
+		eventStore, appManager, kube, *apiAddr, *objectStoreAddr).Start()
 }
diff --git a/events/install.yaml b/events/install.yaml
index cf53f76..9f2b66c 100644
--- a/events/install.yaml
+++ b/events/install.yaml
@@ -52,4 +52,4 @@
       - name: event-processor
         image: giolekva/pcloud-event-processor:latest
         imagePullPolicy: Always
-        command: ["event-processor", "--logtostderr", "--api_addr=http://api.pcloud.svc:1111/graphql", "--object_store_addr=http://minio.minio.svc:9000"]
+        command: ["event-processor", "--logtostderr", "--api_addr=http://api.pcloud.svc:1111/graphql", "--app_manager_addr=http://app-manager.pcloud-app-manager.svc:80", "--object_store_addr=http://minio.minio.svc:9000"]
diff --git a/events/processor.go b/events/processor.go
index 504e261..fa234cc 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -1,12 +1,15 @@
 package events
 
 import (
+	"bytes"
 	"context"
-	"fmt"
+	"text/template"
 	"time"
 
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/yaml"
+	"k8s.io/client-go/kubernetes"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 
 	"github.com/golang/glog"
@@ -19,18 +22,20 @@
 
 // Implements processor
 type singleEventAtATimeProcessor struct {
-	store     EventStore
-	pods      corev1.PodInterface
-	pcloudApi string
+	store      EventStore
+	appManager AppManager
+	kube       *kubernetes.Clientset
+	pcloudApi  string
 	// TODO(giolekva): Nodes themselves should be associated with object store
 	objectStoreApi string
 }
 
 func NewSingleEventAtATimeProcessor(
 	store EventStore,
-	pods corev1.PodInterface,
+	appManager AppManager,
+	kube *kubernetes.Clientset,
 	pcloudApi, objectStoreApi string) Processor {
-	return &singleEventAtATimeProcessor{store, pods, pcloudApi, objectStoreApi}
+	return &singleEventAtATimeProcessor{store, appManager, kube, pcloudApi, objectStoreApi}
 }
 
 func (p *singleEventAtATimeProcessor) Start() {
@@ -46,16 +51,28 @@
 				continue
 			}
 			event := events[0]
-			pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
-			glog.Info("Creating pod...")
-			resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+			triggers, err := p.appManager.QueryTriggers("Image", string(EventStateNew))
 			if err != nil {
 				glog.Error(err)
 				continue
 			}
-			glog.Infof("Pod created: %s", resp)
-			// TODO(giolekva): do not ignore error
-			_ = monitorPod(resp, p.pods)
+			for _, t := range triggers {
+				pod, err := renderTriggerTemplate(t, event.NodeId, p.pcloudApi, p.objectStoreApi)
+				if err != nil {
+					glog.Errorf("Could not render trigger: %v %v", err, t)
+					continue
+				}
+				glog.Info("Creating pod: %v", pod)
+				pods := p.kube.CoreV1().Pods(t.Namespace)
+				resp, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+				if err != nil {
+					glog.Error(err)
+					continue
+				}
+				glog.Infof("Pod created: %s", resp)
+				// TODO(giolekva): do not ignore error
+				_ = monitorPod(resp, pods)
+			}
 			p.store.MarkEventDone(event)
 		}
 	}
@@ -88,16 +105,25 @@
 	return nil
 }
 
-func createPod(id string, pcloudApi string, objectStoreApi string) *apiv1.Pod {
-	return &apiv1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: fmt.Sprintf("event-%s", id)},
-		Spec: apiv1.PodSpec{
-			RestartPolicy: apiv1.RestartPolicyNever,
-			Containers: []apiv1.Container{{
-				Name:            "event",
-				Image:           "giolekva/face-detector:latest",
-				ImagePullPolicy: apiv1.PullAlways,
-				Command:         []string{"python3", "main.py"},
-				Args:            []string{pcloudApi, objectStoreApi, id}}}}}
+type args struct {
+	Id              string
+	PCloudApiAddr   string
+	ObjectStoreAddr string
+}
+
+func renderTriggerTemplate(t Trigger, id string, pcloudApi string, objectStoreApi string) (*apiv1.Pod, error) {
+	tmpl, err := template.New("trigger").Parse(t.Template)
+	if err != nil {
+		return nil, err
+	}
+	var b bytes.Buffer
+	if err := tmpl.Execute(&b, args{id, pcloudApi, objectStoreApi}); err != nil {
+		return nil, err
+	}
+	var pod apiv1.Pod
+	dec := yaml.NewYAMLOrJSONDecoder(&b, 100)
+	if err := dec.Decode(&pod); err != nil {
+		return nil, err
+	}
+	return &pod, nil
 }