event-processor: query app-manager for triggers instead of hard coding them
diff --git a/events/appmanager.go b/events/appmanager.go
new file mode 100644
index 0000000..91b6791
--- /dev/null
+++ b/events/appmanager.go
@@ -0,0 +1,43 @@
+package events
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+)
+
+type Trigger struct {
+ Namespace string `json:"namespace"`
+ Template string `json:"template"`
+}
+
+type AppManager interface {
+ QueryTriggers(triggerOnType string, triggerOnEvent string) ([]Trigger, error)
+}
+
+type appManagerClient struct {
+ addr string
+}
+
+func NewAppManagerClient(addr string) AppManager {
+ return &appManagerClient{addr}
+}
+
+func (c *appManagerClient) QueryTriggers(triggerOnType string, triggerOnEvent string) ([]Trigger, error) {
+ triggerUrl := fmt.Sprintf("%s/triggers?trigger_on_type=%s&trigger_on_event=%s",
+ c.addr, triggerOnType, triggerOnEvent)
+ resp, err := http.Get(triggerUrl)
+ if err != nil {
+ return nil, err
+ }
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ triggers := make([]Trigger, 0)
+ if err := json.Unmarshal(respBody, &triggers); err != nil {
+ return nil, err
+ }
+ return triggers, nil
+}
diff --git a/events/cmd/main.go b/events/cmd/main.go
index 5a20c26..c272a4e 100644
--- a/events/cmd/main.go
+++ b/events/cmd/main.go
@@ -14,6 +14,7 @@
var kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file.")
var apiAddr = flag.String("api_addr", "", "PCloud API server address.")
+var appManagerAddr = flag.String("app_manager_addr", "", "PCloud AppManager address.")
var objectStoreAddr = flag.String("object_store_addr", "", "S3 compatible object store address.")
func getKubeConfig() (*rest.Config, error) {
@@ -26,16 +27,16 @@
func main() {
flag.Parse()
- config, err := getKubeConfig()
+ kubeconfig, err := getKubeConfig()
if err != nil {
glog.Fatalf("Could not initialize Kubeconfig: %v", err)
}
- clientset, err := kubernetes.NewForConfig(config)
+ kube, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
glog.Fatalf("Could not create Kubernetes API client: %v", err)
}
- pods := clientset.CoreV1().Pods("default")
eventStore := events.NewGraphQLClient(*apiAddr)
+ appManager := events.NewAppManagerClient(*appManagerAddr)
events.NewSingleEventAtATimeProcessor(
- eventStore, pods, *apiAddr, *objectStoreAddr).Start()
+ eventStore, appManager, kube, *apiAddr, *objectStoreAddr).Start()
}
diff --git a/events/install.yaml b/events/install.yaml
index cf53f76..9f2b66c 100644
--- a/events/install.yaml
+++ b/events/install.yaml
@@ -52,4 +52,4 @@
- name: event-processor
image: giolekva/pcloud-event-processor:latest
imagePullPolicy: Always
- command: ["event-processor", "--logtostderr", "--api_addr=http://api.pcloud.svc:1111/graphql", "--object_store_addr=http://minio.minio.svc:9000"]
+ command: ["event-processor", "--logtostderr", "--api_addr=http://api.pcloud.svc:1111/graphql", "--app_manager_addr=http://app-manager.pcloud-app-manager.svc:80", "--object_store_addr=http://minio.minio.svc:9000"]
diff --git a/events/processor.go b/events/processor.go
index 504e261..fa234cc 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -1,12 +1,15 @@
package events
import (
+ "bytes"
"context"
- "fmt"
+ "text/template"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/yaml"
+ "k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/golang/glog"
@@ -19,18 +22,20 @@
// Implements processor
type singleEventAtATimeProcessor struct {
- store EventStore
- pods corev1.PodInterface
- pcloudApi string
+ store EventStore
+ appManager AppManager
+ kube *kubernetes.Clientset
+ pcloudApi string
// TODO(giolekva): Nodes themselves should be associated with object store
objectStoreApi string
}
func NewSingleEventAtATimeProcessor(
store EventStore,
- pods corev1.PodInterface,
+ appManager AppManager,
+ kube *kubernetes.Clientset,
pcloudApi, objectStoreApi string) Processor {
- return &singleEventAtATimeProcessor{store, pods, pcloudApi, objectStoreApi}
+ return &singleEventAtATimeProcessor{store, appManager, kube, pcloudApi, objectStoreApi}
}
func (p *singleEventAtATimeProcessor) Start() {
@@ -46,16 +51,28 @@
continue
}
event := events[0]
- pod := createPod(event.NodeId, p.pcloudApi, p.objectStoreApi)
- glog.Info("Creating pod...")
- resp, err := p.pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ triggers, err := p.appManager.QueryTriggers("Image", string(EventStateNew))
if err != nil {
glog.Error(err)
continue
}
- glog.Infof("Pod created: %s", resp)
- // TODO(giolekva): do not ignore error
- _ = monitorPod(resp, p.pods)
+ for _, t := range triggers {
+ pod, err := renderTriggerTemplate(t, event.NodeId, p.pcloudApi, p.objectStoreApi)
+ if err != nil {
+ glog.Errorf("Could not render trigger: %v %v", err, t)
+ continue
+ }
+ glog.Info("Creating pod: %v", pod)
+ pods := p.kube.CoreV1().Pods(t.Namespace)
+ resp, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
+ if err != nil {
+ glog.Error(err)
+ continue
+ }
+ glog.Infof("Pod created: %s", resp)
+ // TODO(giolekva): do not ignore error
+ _ = monitorPod(resp, pods)
+ }
p.store.MarkEventDone(event)
}
}
@@ -88,16 +105,25 @@
return nil
}
-func createPod(id string, pcloudApi string, objectStoreApi string) *apiv1.Pod {
- return &apiv1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: fmt.Sprintf("event-%s", id)},
- Spec: apiv1.PodSpec{
- RestartPolicy: apiv1.RestartPolicyNever,
- Containers: []apiv1.Container{{
- Name: "event",
- Image: "giolekva/face-detector:latest",
- ImagePullPolicy: apiv1.PullAlways,
- Command: []string{"python3", "main.py"},
- Args: []string{pcloudApi, objectStoreApi, id}}}}}
+type args struct {
+ Id string
+ PCloudApiAddr string
+ ObjectStoreAddr string
+}
+
+func renderTriggerTemplate(t Trigger, id string, pcloudApi string, objectStoreApi string) (*apiv1.Pod, error) {
+ tmpl, err := template.New("trigger").Parse(t.Template)
+ if err != nil {
+ return nil, err
+ }
+ var b bytes.Buffer
+ if err := tmpl.Execute(&b, args{id, pcloudApi, objectStoreApi}); err != nil {
+ return nil, err
+ }
+ var pod apiv1.Pod
+ dec := yaml.NewYAMLOrJSONDecoder(&b, 100)
+ if err := dec.Decode(&pod); err != nil {
+ return nil, err
+ }
+ return &pod, nil
}