AppManager: App installation status monitoring
Change-Id: I64f4ae0d27892b74f8827a275907cb75da09a758
diff --git a/core/installer/status/background.go b/core/installer/status/background.go
new file mode 100644
index 0000000..bebffde
--- /dev/null
+++ b/core/installer/status/background.go
@@ -0,0 +1,47 @@
+package status
+
+import (
+ "sync"
+ "time"
+)
+
+type ste struct {
+ status Status
+ error error
+}
+
+type backgroundMonitor struct {
+ l sync.Locker
+ m ResourceMonitor
+ interval time.Duration
+ cache map[ResourceRef]ste
+}
+
+func NewBackgroundMonitor(m ResourceMonitor, interval time.Duration) ResourceMonitor {
+ return &backgroundMonitor{
+ &sync.Mutex{},
+ m,
+ interval,
+ map[ResourceRef]ste{},
+ }
+}
+
+func (m *backgroundMonitor) Get(ref ResourceRef) (Status, error) {
+ m.l.Lock()
+ defer m.l.Unlock()
+ if ret, ok := m.cache[ref]; ok {
+ return ret.status, ret.error
+ }
+ m.cache[ref] = ste{StatusNotFound, nil}
+ go func() {
+ st, err := m.m.Get(ref)
+ m.l.Lock()
+ m.cache[ref] = ste{st, err}
+ m.l.Unlock()
+ if IsStatusTerminal(st) {
+ return
+ }
+ time.Sleep(m.interval)
+ }()
+ return StatusNotFound, nil
+}
diff --git a/core/installer/status/helm.go b/core/installer/status/helm.go
new file mode 100644
index 0000000..b2db1d4
--- /dev/null
+++ b/core/installer/status/helm.go
@@ -0,0 +1,67 @@
+package status
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/giolekva/pcloud/core/installer/kube"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/dynamic"
+)
+
+type helmRelease struct {
+ Status struct {
+ Conditions []struct {
+ Type string `json:"type"`
+ Status string `json:"status"`
+ } `json:"conditions"`
+ } `json:"status,omitempty"`
+}
+
+type helmReleaseMonitor struct {
+ d dynamic.Interface
+}
+
+func (m *helmReleaseMonitor) Get(ref ResourceRef) (Status, error) {
+ ctx := context.Background()
+ res, err := m.d.Resource(
+ schema.GroupVersionResource{
+ Group: "helm.toolkit.fluxcd.io",
+ Version: "v2beta1",
+ Resource: "helmreleases",
+ },
+ ).Namespace(ref.Namespace).Get(ctx, ref.Name, metav1.GetOptions{})
+ if err != nil {
+ if errors.IsNotFound(err) {
+ return StatusNotFound, nil
+ }
+ return StatusNoStatus, err
+ }
+ b, err := res.MarshalJSON()
+ if err != nil {
+ return StatusNoStatus, err
+ }
+ var hr helmRelease
+ if err := json.Unmarshal(b, &hr); err != nil {
+ return StatusNoStatus, err
+ }
+ // TODO(gio): check more thoroughly
+ for _, c := range hr.Status.Conditions {
+ if c.Type == "Ready" && c.Status == "True" {
+ return StatusSuccess, nil
+ }
+ }
+ return StatusProcessing, nil
+}
+
+func NewHelmReleaseMonitor(kubeconfig string) (ResourceMonitor, error) {
+ c, err := kube.NewKubeClient(kube.KubeConfigOpts{KubeConfigPath: kubeconfig})
+ if err != nil {
+ return nil, err
+ }
+ d := dynamic.New(c.RESTClient())
+ return &helmReleaseMonitor{d}, nil
+}
diff --git a/core/installer/status/instance.go b/core/installer/status/instance.go
new file mode 100644
index 0000000..285f68c
--- /dev/null
+++ b/core/installer/status/instance.go
@@ -0,0 +1,28 @@
+package status
+
+type InstanceMonitor struct {
+ m Monitor
+ instances map[string][]Resource
+}
+
+func NewInstanceMonitor(m Monitor) *InstanceMonitor {
+ return &InstanceMonitor{
+ m: m,
+ instances: map[string][]Resource{},
+ }
+}
+
+func (m *InstanceMonitor) Monitor(id string, resources []Resource) {
+ m.instances[id] = resources
+}
+
+func (m *InstanceMonitor) Get(id string) (ret map[Resource]Status, err error) {
+ ret = map[Resource]Status{}
+ for _, r := range m.instances[id] {
+ ret[r], err = m.m.Get(r)
+ if err != nil {
+ break
+ }
+ }
+ return
+}
diff --git a/core/installer/status/status.go b/core/installer/status/status.go
new file mode 100644
index 0000000..ebec72f
--- /dev/null
+++ b/core/installer/status/status.go
@@ -0,0 +1,87 @@
+package status
+
+import (
+ "fmt"
+)
+
+type Status int
+
+const (
+ StatusNoStatus Status = iota
+ StatusNotFound
+ StatusPending
+ StatusProcessing
+ StatusSuccess
+ StatusFailure
+)
+
+func IsStatusTerminal(s Status) bool {
+ return s == StatusSuccess || s == StatusFailure
+}
+
+func StatusString(s Status) string {
+ switch s {
+ case StatusNoStatus:
+ return "no_status"
+ case StatusNotFound:
+ return "not_found"
+ case StatusPending:
+ return "pending"
+ case StatusProcessing:
+ return "processing"
+ case StatusSuccess:
+ return "success"
+ case StatusFailure:
+ return "failure"
+ default:
+ panic("MUST NOT REACH!")
+ }
+}
+
+type ResourceType int
+
+const (
+ ResourceHelmRelease ResourceType = iota
+)
+
+type ResourceRef struct {
+ Name string
+ Namespace string
+}
+
+type Resource struct {
+ ResourceRef
+ Type ResourceType
+}
+
+type Monitor interface {
+ Get(r Resource) (Status, error)
+}
+
+type ResourceMonitor interface {
+ Get(r ResourceRef) (Status, error)
+}
+
+type delegatingMonitor struct {
+ m map[ResourceType]ResourceMonitor
+}
+
+func NewDelegatingMonitor(kubeconfig string) (Monitor, error) {
+ helm, err := NewHelmReleaseMonitor(kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return delegatingMonitor{
+ map[ResourceType]ResourceMonitor{
+ ResourceHelmRelease: helm,
+ },
+ }, nil
+}
+
+func (m delegatingMonitor) Get(r Resource) (Status, error) {
+ if rm, ok := m.m[r.Type]; !ok {
+ return StatusNoStatus, fmt.Errorf("unknown resource type: %d", r.Type)
+ } else {
+ return rm.Get(r.ResourceRef)
+ }
+}