event-processor: delegate launching action to appmanager
diff --git a/events/appmanager.go b/events/appmanager.go
index 91b6791..783586c 100644
--- a/events/appmanager.go
+++ b/events/appmanager.go
@@ -1,19 +1,24 @@
package events
import (
+ "bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
+
+ "github.com/golang/glog"
)
type Trigger struct {
- Namespace string `json:"namespace"`
- Template string `json:"template"`
+ App string `json:"app"`
+ Action string `json:"action"`
}
type AppManager interface {
- QueryTriggers(triggerOnType string, triggerOnEvent string) ([]Trigger, error)
+ QueryTriggers(triggerOnType, triggerOnEvent string) ([]Trigger, error)
+ // TODO(giolekva): must return launched action id to enable monitoring
+ LaunchAction(app, action string, args interface{}) error
}
type appManagerClient struct {
@@ -41,3 +46,27 @@
}
return triggers, nil
}
+
+type actionArgs struct {
+ App string `json:"app"`
+ Action string `json:"action"`
+ Args interface{} `json:"args"`
+}
+
+func (c *appManagerClient) LaunchAction(app, action string, args interface{}) error {
+ actionUrl := fmt.Sprintf("%s/launch_action", c.addr)
+ reqJson, err := json.Marshal(actionArgs{app, action, args})
+ if err != nil {
+ return err
+ }
+ resp, err := http.Post(actionUrl, "application/json", bytes.NewReader(reqJson))
+ if err != nil {
+ return err
+ }
+ respBody, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ glog.Info("Triggered action: %s", string(respBody))
+ return nil
+}
diff --git a/events/install.yaml b/events/install.yaml
index 9f2b66c..fd17a00 100644
--- a/events/install.yaml
+++ b/events/install.yaml
@@ -4,34 +4,6 @@
metadata:
name: pcloud-events
---
-apiVersion: v1
-kind: ServiceAccount
-metadata:
- name: event-processor
- namespace: pcloud-events
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- name: modify-pods
-rules:
- - apiGroups: [""]
- resources: ["pods"]
- verbs: ["create", "get", "watch", "list", "delete"]
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRoleBinding
-metadata:
- name: modify-pods-to-sa
-subjects:
- - kind: ServiceAccount
- name: event-processor
- namespace: pcloud-events
-roleRef:
- kind: ClusterRole
- name: modify-pods
- apiGroup: rbac.authorization.k8s.io
----
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -47,9 +19,8 @@
labels:
app: event-processor
spec:
- serviceAccountName: event-processor
containers:
- name: event-processor
image: giolekva/pcloud-event-processor:latest
imagePullPolicy: Always
- command: ["event-processor", "--logtostderr", "--api_addr=http://api.pcloud.svc:1111/graphql", "--app_manager_addr=http://app-manager.pcloud-app-manager.svc:80", "--object_store_addr=http://minio.minio.svc:9000"]
+ command: ["event-processor", "--logtostderr", "--api_addr=http://api.pcloud.svc:1111/graphql", "--app_manager_addr=http://app-manager.pcloud-app-manager.svc:80", "--object_store_addr=http://minio.app-object-store.svc:9000"]
diff --git a/events/processor.go b/events/processor.go
index fa234cc..aa9ac5c 100644
--- a/events/processor.go
+++ b/events/processor.go
@@ -1,16 +1,9 @@
package events
import (
- "bytes"
- "context"
- "text/template"
"time"
- apiv1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
- corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/golang/glog"
// "github.com/itaysk/regogo"
@@ -57,73 +50,20 @@
continue
}
for _, t := range triggers {
- pod, err := renderTriggerTemplate(t, event.NodeId, p.pcloudApi, p.objectStoreApi)
+ err := p.appManager.LaunchAction(t.App, t.Action, args{event.NodeId, p.pcloudApi, p.objectStoreApi})
+ // TODO(giolekva): do not simply ignore error and monitor progress
if err != nil {
- glog.Errorf("Could not render trigger: %v %v", err, t)
continue
}
- glog.Info("Creating pod: %v", pod)
- pods := p.kube.CoreV1().Pods(t.Namespace)
- resp, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
- if err != nil {
- glog.Error(err)
- continue
- }
- glog.Infof("Pod created: %s", resp)
- // TODO(giolekva): do not ignore error
- _ = monitorPod(resp, pods)
+ glog.Info("Launched action: %s %s", t.App, t.Action)
}
p.store.MarkEventDone(event)
}
}
}
-func isInTerminalState(pod *apiv1.Pod) bool {
- return pod.Status.Phase == apiv1.PodSucceeded ||
- pod.Status.Phase == apiv1.PodFailed
-}
-
-func monitorPod(pod *apiv1.Pod, pods corev1.PodInterface) error {
- w, err := pods.Watch(context.TODO(), metav1.SingleObject(pod.ObjectMeta))
- if err != nil {
- return err
- }
- for {
- select {
- case events, ok := <-w.ResultChan():
- if !ok {
- return nil
- }
- p := events.Object.(*apiv1.Pod)
- glog.Infof("Pod status: %s", pod.Status.Phase)
- if isInTerminalState(p) {
- glog.Info("Pod is DONE")
- w.Stop()
- }
- }
- }
- return nil
-}
-
type args struct {
Id string
PCloudApiAddr string
ObjectStoreAddr string
}
-
-func renderTriggerTemplate(t Trigger, id string, pcloudApi string, objectStoreApi string) (*apiv1.Pod, error) {
- tmpl, err := template.New("trigger").Parse(t.Template)
- if err != nil {
- return nil, err
- }
- var b bytes.Buffer
- if err := tmpl.Execute(&b, args{id, pcloudApi, objectStoreApi}); err != nil {
- return nil, err
- }
- var pod apiv1.Pod
- dec := yaml.NewYAMLOrJSONDecoder(&b, 100)
- if err := dec.Decode(&pod); err != nil {
- return nil, err
- }
- return &pod, nil
-}