AppManager: App installation status monitoring

Change-Id: I64f4ae0d27892b74f8827a275907cb75da09a758
diff --git a/core/installer/status/background.go b/core/installer/status/background.go
new file mode 100644
index 0000000..bebffde
--- /dev/null
+++ b/core/installer/status/background.go
@@ -0,0 +1,47 @@
+package status
+
+import (
+	"sync"
+	"time"
+)
+
+type ste struct {
+	status Status
+	error  error
+}
+
+type backgroundMonitor struct {
+	l        sync.Locker
+	m        ResourceMonitor
+	interval time.Duration
+	cache    map[ResourceRef]ste
+}
+
+func NewBackgroundMonitor(m ResourceMonitor, interval time.Duration) ResourceMonitor {
+	return &backgroundMonitor{
+		&sync.Mutex{},
+		m,
+		interval,
+		map[ResourceRef]ste{},
+	}
+}
+
+func (m *backgroundMonitor) Get(ref ResourceRef) (Status, error) {
+	m.l.Lock()
+	defer m.l.Unlock()
+	if ret, ok := m.cache[ref]; ok {
+		return ret.status, ret.error
+	}
+	m.cache[ref] = ste{StatusNotFound, nil}
+	go func() {
+		st, err := m.m.Get(ref)
+		m.l.Lock()
+		m.cache[ref] = ste{st, err}
+		m.l.Unlock()
+		if IsStatusTerminal(st) {
+			return
+		}
+		time.Sleep(m.interval)
+	}()
+	return StatusNotFound, nil
+}
diff --git a/core/installer/status/helm.go b/core/installer/status/helm.go
new file mode 100644
index 0000000..b2db1d4
--- /dev/null
+++ b/core/installer/status/helm.go
@@ -0,0 +1,67 @@
+package status
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/giolekva/pcloud/core/installer/kube"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+)
+
+type helmRelease struct {
+	Status struct {
+		Conditions []struct {
+			Type   string `json:"type"`
+			Status string `json:"status"`
+		} `json:"conditions"`
+	} `json:"status,omitempty"`
+}
+
+type helmReleaseMonitor struct {
+	d dynamic.Interface
+}
+
+func (m *helmReleaseMonitor) Get(ref ResourceRef) (Status, error) {
+	ctx := context.Background()
+	res, err := m.d.Resource(
+		schema.GroupVersionResource{
+			Group:    "helm.toolkit.fluxcd.io",
+			Version:  "v2beta1",
+			Resource: "helmreleases",
+		},
+	).Namespace(ref.Namespace).Get(ctx, ref.Name, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return StatusNotFound, nil
+		}
+		return StatusNoStatus, err
+	}
+	b, err := res.MarshalJSON()
+	if err != nil {
+		return StatusNoStatus, err
+	}
+	var hr helmRelease
+	if err := json.Unmarshal(b, &hr); err != nil {
+		return StatusNoStatus, err
+	}
+	// TODO(gio): check more thoroughly
+	for _, c := range hr.Status.Conditions {
+		if c.Type == "Ready" && c.Status == "True" {
+			return StatusSuccess, nil
+		}
+	}
+	return StatusProcessing, nil
+}
+
+func NewHelmReleaseMonitor(kubeconfig string) (ResourceMonitor, error) {
+	c, err := kube.NewKubeClient(kube.KubeConfigOpts{KubeConfigPath: kubeconfig})
+	if err != nil {
+		return nil, err
+	}
+	d := dynamic.New(c.RESTClient())
+	return &helmReleaseMonitor{d}, nil
+}
diff --git a/core/installer/status/instance.go b/core/installer/status/instance.go
new file mode 100644
index 0000000..285f68c
--- /dev/null
+++ b/core/installer/status/instance.go
@@ -0,0 +1,28 @@
+package status
+
+type InstanceMonitor struct {
+	m         Monitor
+	instances map[string][]Resource
+}
+
+func NewInstanceMonitor(m Monitor) *InstanceMonitor {
+	return &InstanceMonitor{
+		m:         m,
+		instances: map[string][]Resource{},
+	}
+}
+
+func (m *InstanceMonitor) Monitor(id string, resources []Resource) {
+	m.instances[id] = resources
+}
+
+func (m *InstanceMonitor) Get(id string) (ret map[Resource]Status, err error) {
+	ret = map[Resource]Status{}
+	for _, r := range m.instances[id] {
+		ret[r], err = m.m.Get(r)
+		if err != nil {
+			break
+		}
+	}
+	return
+}
diff --git a/core/installer/status/status.go b/core/installer/status/status.go
new file mode 100644
index 0000000..ebec72f
--- /dev/null
+++ b/core/installer/status/status.go
@@ -0,0 +1,87 @@
+package status
+
+import (
+	"fmt"
+)
+
+type Status int
+
+const (
+	StatusNoStatus Status = iota
+	StatusNotFound
+	StatusPending
+	StatusProcessing
+	StatusSuccess
+	StatusFailure
+)
+
+func IsStatusTerminal(s Status) bool {
+	return s == StatusSuccess || s == StatusFailure
+}
+
+func StatusString(s Status) string {
+	switch s {
+	case StatusNoStatus:
+		return "no_status"
+	case StatusNotFound:
+		return "not_found"
+	case StatusPending:
+		return "pending"
+	case StatusProcessing:
+		return "processing"
+	case StatusSuccess:
+		return "success"
+	case StatusFailure:
+		return "failure"
+	default:
+		panic("MUST NOT REACH!")
+	}
+}
+
+type ResourceType int
+
+const (
+	ResourceHelmRelease ResourceType = iota
+)
+
+type ResourceRef struct {
+	Name      string
+	Namespace string
+}
+
+type Resource struct {
+	ResourceRef
+	Type ResourceType
+}
+
+type Monitor interface {
+	Get(r Resource) (Status, error)
+}
+
+type ResourceMonitor interface {
+	Get(r ResourceRef) (Status, error)
+}
+
+type delegatingMonitor struct {
+	m map[ResourceType]ResourceMonitor
+}
+
+func NewDelegatingMonitor(kubeconfig string) (Monitor, error) {
+	helm, err := NewHelmReleaseMonitor(kubeconfig)
+	if err != nil {
+		return nil, err
+	}
+	return delegatingMonitor{
+		map[ResourceType]ResourceMonitor{
+			ResourceHelmRelease: helm,
+		},
+	}, nil
+}
+
+func (m delegatingMonitor) Get(r Resource) (Status, error) {
+	if rm, ok := m.m[r.Type]; !ok {
+		return StatusNoStatus, fmt.Errorf("unknown resource type: %d", r.Type)
+	} else {
+		return rm.Get(r.ResourceRef)
+	}
+}