clean up
diff --git "a/archive/argo/\043argo-events-crds-install.yaml\043" "b/archive/argo/\043argo-events-crds-install.yaml\043"
new file mode 100644
index 0000000..92b812a
--- /dev/null
+++ "b/archive/argo/\043argo-events-crds-install.yaml\043"
@@ -0,0 +1,221 @@
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: gateways.argoproj.io
+spec:
+ group: argoproj.io
+ names:
+ kind: Gateway
+ listKind: GatewayList
+ plural: gateways
+ singular: gateway
+ shortNames:
+ - gw
+ scope: Namespaced
+ version: "v1alpha1"
+---
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: sensors.argoproj.io
+spec:
+ group: argoproj.io
+ names:
+ kind: Sensor
+ listKind: SensorList
+ plural: sensors
+ singular: sensor
+ shortNames:
+ - sn
+ scope: Namespaced
+ version: "v1alpha1"
+---
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: eventsources.argoproj.io
+spec:
+ group: argoproj.io
+ scope: Namespaced
+ names:
+ kind: EventSource
+ plural: eventsources
+ singular: eventsource
+ listKind: EventSourceList
+ shortNames:
+ - es
+ version: "v1alpha1"
+---
+# apiVersion: v1
+# kind: Namespace
+# metadata:
+# name: argo-events
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: argo-events-sa
+ namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+ name: argo-events-role-binding
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: Role
+ name: argo-events-role
+subjects:
+ - kind: ServiceAccount
+ name: argo-events-sa
+ namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: argo-events-role
+rules:
+ - apiGroups:
+ - argoproj.io
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+ resources:
+ - workflows
+ - workflows/finalizers
+ - workflowtemplates
+ - workflowtemplates/finalizers
+ - gateways
+ - gateways/finalizers
+ - sensors
+ - sensors/finalizers
+ - eventsources
+ - eventsources/finalizers
+ - apiGroups:
+ - ""
+ resources:
+ - pods
+ - pods/exec
+ - configmaps
+ - secrets
+ - services
+ - events
+ - persistentvolumeclaims
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+ - apiGroups:
+ - "batch"
+ resources:
+ - jobs
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+ - apiGroups:
+ - "apps"
+ resources:
+ - deployments
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: gateway-controller-configmap
+ namespace: kube-system
+data:
+ config: |
+ instanceID: argo-events
+ namespace: kube-system
+---
+# The gateway-controller listens for changes on the gateway CRD and creates gateway
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: gateway-controller
+ namespace: kube-system
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: gateway-controller
+ template:
+ metadata:
+ labels:
+ app: gateway-controller
+ spec:
+ serviceAccountName: argo-events-sa
+ containers:
+ - name: gateway-controller
+ image: argoproj/gateway-controller:v0.13.0
+ imagePullPolicy: Always
+ env:
+ - name: NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: CONTROLLER_CONFIG_MAP
+ value: gateway-controller-configmap
+---
+# The sensor-controller configmap includes configuration information for the sensor-controller
+# To watch sensors created in different namespace than the controller is deployed in, remove the namespace: kube-system.
+# Similarly to watch sensors created in specific namespace, change to namespace: <your_namespace>
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: sensor-controller-configmap
+ namespace: kube-system
+data:
+ config: |
+ instanceID: argo-events
+ namespace: kube-system
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: sensor-controller
+ namespace: kube-system
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: sensor-controller
+ template:
+ metadata:
+ labels:
+ app: sensor-controller
+ spec:
+ serviceAccountName: argo-events-sa
+ containers:
+ - name: sensor-controller
+ image: argoproj/sensor-controller:v0.13.0
+ imagePullPolicy: Always
+ env:
+ - name: NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: CONTROLLER_CONFIG_MAP
+ value: sensor-controller-configmap
diff --git a/archive/argo/argo-events-crds-install.yaml b/archive/argo/argo-events-crds-install.yaml
new file mode 100644
index 0000000..c746479
--- /dev/null
+++ b/archive/argo/argo-events-crds-install.yaml
@@ -0,0 +1,221 @@
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: gateways.argoproj.io
+spec:
+ group: argoproj.io
+ names:
+ kind: Gateway
+ listKind: GatewayList
+ plural: gateways
+ singular: gateway
+ shortNames:
+ - gw
+ scope: Namespaced
+ version: "v1alpha1"
+---
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: sensors.argoproj.io
+spec:
+ group: argoproj.io
+ names:
+ kind: Sensor
+ listKind: SensorList
+ plural: sensors
+ singular: sensor
+ shortNames:
+ - sn
+ scope: Namespaced
+ version: "v1alpha1"
+---
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: eventsources.argoproj.io
+spec:
+ group: argoproj.io
+ scope: Namespaced
+ names:
+ kind: EventSource
+ plural: eventsources
+ singular: eventsource
+ listKind: EventSourceList
+ shortNames:
+ - es
+ version: "v1alpha1"
+---
+# apiVersion: v1
+# kind: Namespace
+# metadata:
+# name: argo-events
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: argo-events-sa
+ namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+ name: argo-events-role-binding
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: Role
+ name: argo-events-role
+subjects:
+ - kind: ServiceAccount
+ name: argo-events-sa
+ namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: argo-events-role
+rules:
+ - apiGroups:
+ - argoproj.io
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+ resources:
+ - workflows
+ - workflows/finalizers
+ - workflowtemplates
+ - workflowtemplates/finalizers
+ - gateways
+ - gateways/finalizers
+ - sensors
+ - sensors/finalizers
+ - eventsources
+ - eventsources/finalizers
+ - apiGroups:
+ - ""
+ resources:
+ - pods
+ - pods/exec
+ - configmaps
+ - secrets
+ - services
+ - events
+ - persistentvolumeclaims
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+ - apiGroups:
+ - "batch"
+ resources:
+ - jobs
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+ - apiGroups:
+ - "apps"
+ resources:
+ - deployments
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: gateway-controller-configmap
+ namespace: kube-system
+data:
+ config: |
+ instanceID: argo-events
+ namespace: kube-system
+---
+# The gateway-controller listens for changes on the gateway CRD and creates gateway
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: gateway-controller
+ namespace: kube-system
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: gateway-controller
+ template:
+ metadata:
+ labels:
+ app: gateway-controller
+ spec:
+ serviceAccountName: argo-events-sa
+ containers:
+ - name: gateway-controller
+ image: argoproj/gateway-controller:v0.13.0
+ imagePullPolicy: Always
+ env:
+ - name: NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: CONTROLLER_CONFIG_MAP
+ value: gateway-controller-configmap
+---
+# The sensor-controller configmap includes configuration information for the sensor-controller
+# To watch sensors created in different namespace than the controller is deployed in, remove the namespace: kube-system.
+# Similarly to watch sensors created in specific namespace, change to namespace: <your_namespace>
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: sensor-controller-configmap
+ namespace: kube-system
+data:
+ config: |
+ instanceID: argo-events
+ namespace: kube-system
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: sensor-controller
+ namespace: kube-system
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: sensor-controller
+ template:
+ metadata:
+ labels:
+ app: sensor-controller
+ spec:
+ serviceAccountName: argo-events-sa
+ containers:
+ - name: sensor-controller
+ image: argoproj/sensor-controller:v0.13.0
+ imagePullPolicy: Always
+ env:
+ - name: NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: CONTROLLER_CONFIG_MAP
+ value: sensor-controller-configmap
diff --git a/archive/argo/event-source.yaml b/archive/argo/event-source.yaml
new file mode 100644
index 0000000..23ec5ed
--- /dev/null
+++ b/archive/argo/event-source.yaml
@@ -0,0 +1,48 @@
+apiVersion: argoproj.io/v1alpha1
+kind: EventSource
+metadata:
+ name: nats-event-source
+spec:
+ type: nats
+ nats:
+ example:
+ # url of the nats service
+ url: nats://nats.svc:4222
+ # jsonBody specifies that all event body payload coming from this
+ # source will be JSON
+ jsonBody: true
+ # subject name
+ subject: input-objects
+ # optional backoff time for connection retries.
+ # if not provided, default connection backoff time will be used.
+ connectionBackoff:
+ # duration in nanoseconds. following value is 10 seconds
+ duration: 10000000000
+ # how many backoffs
+ steps: 5
+ # factor to increase on each step.
+ # setting factor > 1 makes backoff exponential.
+ factor: 2
+ jitter: 0.2
+
+# apiVersion: argoproj.io/v1alpha1
+# kind: EventSource
+# metadata:
+# name: minio-event-source
+# spec:
+# type: "minio"
+# minio:
+# example:
+# bucket:
+# name: input
+# endpoint: mio-minio.default.svc:9000
+# events:
+# - s3:ObjectCreated:Put
+# - s3:ObjectRemoved:Delete
+# insecure: true
+# accessKey:
+# key: accesskey
+# name: artifacts-minio
+# secretKey:
+# key: secretkey
+# name: artifacts-minio
\ No newline at end of file
diff --git a/archive/argo/gateway.yaml b/archive/argo/gateway.yaml
new file mode 100644
index 0000000..b98baf4
--- /dev/null
+++ b/archive/argo/gateway.yaml
@@ -0,0 +1,61 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Gateway
+metadata:
+ name: nats-gateway
+ labels:
+ # gateway controller with instanceId "argo-events" will process this gateway
+ gateways.argoproj.io/gateway-controller-instanceid: argo-events
+spec:
+ type: nats
+ eventSourceRef:
+ name: nats-event-source
+ template:
+ metadata:
+ name: nats-gateway
+ labels:
+ gateway-name: nats-gateway
+ spec:
+ containers:
+ - name: gateway-client
+ image: argoproj/gateway-client:v0.14.0
+ imagePullPolicy: IfNotPresent
+ command: ["/bin/gateway-client"]
+ - name: nats-events
+ image: argoproj/nats-gateway:v0.14.0
+ imagePullPolicy: IfNotPresent
+ command: ["/bin/nats-gateway"]
+ serviceAccountName: argo-events-sa
+ subscribers:
+ http:
+ - "http://nats-sensor.svc:9300/"
+
+
+# apiVersion: argoproj.io/v1alpha1
+# kind: Gateway
+# metadata:
+# name: minio-gateway
+# labels:
+# # gateway controller with instanceId "argo-events" will process this gateway
+# gateways.argoproj.io/gateway-controller-instanceid: argo-events
+# spec:
+# type: minio
+# eventSourceRef:
+# name: minio-event-source
+# template:
+# metadata:
+# name: minio-gateway
+# labels:
+# gateway-name: minio-gateway
+# spec:
+# containers:
+# - name: gateway-client
+# image: argoproj/gateway-client:v0.13.0
+# imagePullPolicy: Always
+# command: ["/bin/gateway-client"]
+# - name: minio-events
+# image: argoproj/artifact-gateway:v0.13.0
+# imagePullPolicy: Always
+# serviceAccountName: argo-events-sa
+# subscribers:
+# http:
+# - "http://minio-sensor.kube-system.svc:9300/"
diff --git a/archive/argo/install.yaml b/archive/argo/install.yaml
new file mode 100644
index 0000000..c800f6a
--- /dev/null
+++ b/archive/argo/install.yaml
@@ -0,0 +1,408 @@
+# This is an auto-generated file. DO NOT EDIT
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: cronworkflows.argoproj.io
+spec:
+ group: argoproj.io
+ names:
+ kind: CronWorkflow
+ plural: cronworkflows
+ shortNames:
+ - cronwf
+ - cwf
+ scope: Namespaced
+ version: v1alpha1
+---
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: workflows.argoproj.io
+spec:
+ additionalPrinterColumns:
+ - JSONPath: .status.phase
+ description: Status of the workflow
+ name: Status
+ type: string
+ - JSONPath: .status.startedAt
+ description: When the workflow was started
+ format: date-time
+ name: Age
+ type: date
+ group: argoproj.io
+ names:
+ kind: Workflow
+ plural: workflows
+ shortNames:
+ - wf
+ scope: Namespaced
+ version: v1alpha1
+---
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+ name: workflowtemplates.argoproj.io
+spec:
+ group: argoproj.io
+ names:
+ kind: WorkflowTemplate
+ plural: workflowtemplates
+ shortNames:
+ - wftmpl
+ scope: Namespaced
+ version: v1alpha1
+# ---
+# apiVersion: v1
+# kind: ServiceAccount
+# metadata:
+# name: argo
+# ---
+# apiVersion: v1
+# kind: ServiceAccount
+# metadata:
+# name: argo-server
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: argo-role
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - secrets
+ verbs:
+ - get
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ rbac.authorization.k8s.io/aggregate-to-admin: "true"
+ name: argo-aggregate-to-admin
+rules:
+- apiGroups:
+ - argoproj.io
+ resources:
+ - workflows
+ - workflows/finalizers
+ - workflowtemplates
+ - workflowtemplates/finalizers
+ - cronworkflows
+ - cronworkflows/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ rbac.authorization.k8s.io/aggregate-to-edit: "true"
+ name: argo-aggregate-to-edit
+rules:
+- apiGroups:
+ - argoproj.io
+ resources:
+ - workflows
+ - workflows/finalizers
+ - workflowtemplates
+ - workflowtemplates/finalizers
+ - cronworkflows
+ - cronworkflows/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ rbac.authorization.k8s.io/aggregate-to-view: "true"
+ name: argo-aggregate-to-view
+rules:
+- apiGroups:
+ - argoproj.io
+ resources:
+ - workflows
+ - workflows/finalizers
+ - workflowtemplates
+ - workflowtemplates/finalizers
+ - cronworkflows
+ - cronworkflows/finalizers
+ verbs:
+ - get
+ - list
+ - watch
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: argo-cluster-role
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - pods/exec
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+- apiGroups:
+ - ""
+ resources:
+ - configmaps
+ verbs:
+ - get
+ - watch
+ - list
+- apiGroups:
+ - ""
+ resources:
+ - persistentvolumeclaims
+ verbs:
+ - create
+ - delete
+- apiGroups:
+ - argoproj.io
+ resources:
+ - workflows
+ - workflows/finalizers
+ verbs:
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+ - create
+- apiGroups:
+ - argoproj.io
+ resources:
+ - workflowtemplates
+ - workflowtemplates/finalizers
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - ""
+ resources:
+ - serviceaccounts
+ verbs:
+ - get
+ - list
+- apiGroups:
+ - argoproj.io
+ resources:
+ - cronworkflows
+ - cronworkflows/finalizers
+ verbs:
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+- apiGroups:
+ - ""
+ resources:
+ - events
+ verbs:
+ - create
+- apiGroups:
+ - policy
+ resources:
+ - poddisruptionbudgets
+ verbs:
+ - create
+ - get
+ - delete
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: argo-server-cluster-role
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - configmaps
+ verbs:
+ - get
+ - watch
+ - list
+- apiGroups:
+ - ""
+ resources:
+ - secrets
+ verbs:
+ - get
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - pods/exec
+ - pods/log
+ verbs:
+ - get
+ - list
+ - watch
+ - delete
+- apiGroups:
+ - ""
+ resources:
+ - secrets
+ verbs:
+ - get
+- apiGroups:
+ - argoproj.io
+ resources:
+ - workflows
+ - workflowtemplates
+ - cronworkflows
+ verbs:
+ - create
+ - get
+ - list
+ - watch
+ - update
+ - patch
+ - delete
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+ name: argo-binding
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: Role
+ name: argo-role
+subjects:
+- kind: ServiceAccount
+ name: default
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+ name: argo-binding
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: argo-cluster-role
+subjects:
+- kind: ServiceAccount
+ name: default
+ namespace: pcloud
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+ name: argo-server-binding
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: argo-server-cluster-role
+subjects:
+- kind: ServiceAccount
+ name: default
+ namespace: pcloud
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: workflow-controller-configmap
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: argo-server
+spec:
+ ports:
+ - port: 2746
+ targetPort: 2746
+ selector:
+ app: argo-server
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: workflow-controller-metrics
+spec:
+ ports:
+ - port: 9090
+ protocol: TCP
+ targetPort: 9090
+ selector:
+ app: workflow-controller
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: argo-server
+spec:
+ selector:
+ matchLabels:
+ app: argo-server
+ template:
+ metadata:
+ labels:
+ app: argo-server
+ spec:
+ containers:
+ - args:
+ - server
+ image: argoproj/argocli:v2.7.4
+ name: argo-server
+ ports:
+ - containerPort: 2746
+ readinessProbe:
+ httpGet:
+ path: /
+ port: 2746
+ scheme: HTTP
+ initialDelaySeconds: 10
+ periodSeconds: 20
+ serviceAccountName: default
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: workflow-controller
+spec:
+ selector:
+ matchLabels:
+ app: workflow-controller
+ template:
+ metadata:
+ labels:
+ app: workflow-controller
+ spec:
+ containers:
+ - args:
+ - --configmap
+ - workflow-controller-configmap
+ - --executor-image
+ - argoproj/argoexec:v2.7.4
+ command:
+ - workflow-controller
+ image: argoproj/workflow-controller:v2.7.4
+ name: workflow-controller
+ serviceAccountName: default
\ No newline at end of file
diff --git a/archive/argo/mio-minio-secrets.yaml b/archive/argo/mio-minio-secrets.yaml
new file mode 100644
index 0000000..c48d5ef
--- /dev/null
+++ b/archive/argo/mio-minio-secrets.yaml
@@ -0,0 +1,9 @@
+apiVersion: v1
+kind: Secret
+metadata:
+ name: minio-credentials
+data:
+ # base64 of minio
+ accesskey: bWluaW8K
+ # base64 of minio123
+ secretkey: bWluaW8xMjMK
diff --git a/archive/argo/sensor.yaml b/archive/argo/sensor.yaml
new file mode 100644
index 0000000..6918bd5
--- /dev/null
+++ b/archive/argo/sensor.yaml
@@ -0,0 +1,138 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Sensor
+metadata:
+ name: nats-sensor
+ labels:
+ sensors.argoproj.io/sensor-controller-instanceid: argo-events
+spec:
+ template:
+ spec:
+ containers:
+ - name: sensor
+ image: argoproj/sensor:v0.14.0
+ imagePullPolicy: IfNotPresent
+ serviceAccountName: argo-events-sa
+ subscription:
+ http:
+ port: 9300
+ dependencies:
+ - name: test-dep
+ gatewayName: nats-gateway
+ eventName: example
+ triggers:
+ - template:
+ name: nats-workflow-trigger
+ k8s:
+ group: argoproj.io
+ version: v1alpha1
+ resource: workflows
+ operation: create
+ source:
+ resource:
+ apiVersion: argoproj.io/v1alpha1
+ kind: Workflow
+ metadata:
+ generateName: nats-workflow-
+ spec:
+ entrypoint: whalesay
+ arguments:
+ parameters:
+ - name: message
+ value: WILL_BE_REPLACED
+ templates:
+ - name: whalesay
+ inputs:
+ parameters:
+ - name: message
+ container:
+ image: docker/whalesay:latest
+                  imagePullPolicy: IfNotPresent
+ command: [cowsay]
+ args: ["{{inputs.parameters.message}}"]
+ parameters:
+ - src:
+ dependencyName: test-dep
+ dest: spec.arguments.parameters.0.value
+
+
+# apiVersion: argoproj.io/v1alpha1
+# kind: Sensor
+# metadata:
+# name: minio-sensor
+# labels:
+# # sensor controller with instanceId "argo-events" will process this sensor
+# sensors.argoproj.io/sensor-controller-instanceid: argo-events
+# spec:
+# template:
+# spec:
+# containers:
+# - name: sensor
+# image: argoproj/sensor:v0.13.0
+# imagePullPolicy: Always
+# serviceAccountName: argo-events-sa
+# subscription:
+# http:
+# port: 9300
+# dependencies:
+# - name: test-dep
+# gatewayName: minio-gateway
+# eventName: example
+# triggers:
+# - template:
+# name: minio-workflow-trigger
+# k8s:
+# group: argoproj.io
+# version: v1alpha1
+# resource: workflows
+# operation: create
+# source:
+# resource:
+# apiVersion: argoproj.io/v1alpha1
+# kind: Workflow
+# metadata:
+# generateName: artifact-workflow-2-
+# spec:
+# entrypoint: detect
+# templates:
+# - name: detect
+# inputs:
+# artifacts:
+# - name: input-image
+# path: /input
+# s3:
+# # endpoint: mio-minio.default.svc:9000
+# # bucket: input # change
+# key: harry.jpg
+# # insecure: true
+# # accessKeySecret:
+# # key: accessKey
+# # name: artifacts-minio
+# # secretKeySecret:
+# # key: secretKey
+# # name: artifacts-minio
+# # useSDKCreds: true
+# # outputs:
+# # artifacts:
+# # - name: output-image
+# # path: /output
+# # s3:
+# # endpoint: mio-minio.default.svc:9000
+# # bucket: output # change
+# # key: PARAMETER
+# # insecure: true
+# # accessKeySecret:
+# # key: accessKey
+# # name: artifacts-minio
+# # secretKeySecret:
+# # key: secretKey
+# # name: artifacts-minio
+# # useSDKCreds: true
+# container:
+# image: face:latest
+# command: [python face.py]
+# args: ["/input", "/output"]
+# # parameters:
+# # - src:
+# # dependencyName: test-dep
+# # dataKey: notification.0.s3.object.key
+# # dest: spec.templates.0.inputs.artifacts.0.s3.key
diff --git a/archive/argo/setup.sh b/archive/argo/setup.sh
new file mode 100644
index 0000000..7d6cc53
--- /dev/null
+++ b/archive/argo/setup.sh
@@ -0,0 +1,21 @@
+#!/bin/sh
+
+# kubectl create namespace argo
+kubectl apply -n pcloud -f install.yaml
+
+# kubectl apply -n kube-system -f mio-minio-secrets.yaml
+
+
+# helm repo add argo https://argoproj.github.io/argo-helm
+# helm install my-argo --namespace kube-system argo/argo
+# read -s
+# kubectl -n kube-system port-forward deployment/my-argo-server 2746 &
+# read -s
+
+#kubectl apply -n kube-system -f argo-events-crds-install.yaml
+#read -s
+
+
+#kubectl apply -n kube-system -f event-source.yaml
+#kubectl apply -n kube-system -f gateway.yaml
+#kubectl apply -n kube-system -f sensor.yaml
diff --git a/archive/nats/deployment.yaml b/archive/nats/deployment.yaml
new file mode 100644
index 0000000..66418cb
--- /dev/null
+++ b/archive/nats/deployment.yaml
@@ -0,0 +1,15 @@
+---
+apiVersion: "nats.io/v1alpha2"
+kind: "NatsCluster"
+metadata:
+ name: "nats"
+spec:
+ size: 1
+---
+apiVersion: "streaming.nats.io/v1alpha1"
+kind: "NatsStreamingCluster"
+metadata:
+ name: "nats-streaming"
+spec:
+ size: 2
+ natsSvc: "nats"
\ No newline at end of file
diff --git a/archive/nats/setup.sh b/archive/nats/setup.sh
new file mode 100644
index 0000000..e988ea3
--- /dev/null
+++ b/archive/nats/setup.sh
@@ -0,0 +1,11 @@
+# kubectl apply -f https://github.com/nats-io/nats-operator/releases/download/v0.5.0/00-prereqs.yaml
+# kubectl apply -f https://github.com/nats-io/nats-operator/releases/download/v0.5.0/10-deployment.yaml
+
+# # Install NATS Streaming Operator on default namespace
+# kubectl apply -f https://raw.githubusercontent.com/nats-io/nats-streaming-operator/master/deploy/default-rbac.yaml
+
+# kubectl apply -f https://raw.githubusercontent.com/nats-io/nats-streaming-operator/master/deploy/deployment.yaml
+
+# sleep 10
+
+kubectl apply -f deployment.yaml
diff --git a/archive/pfs/Dockerfile b/archive/pfs/Dockerfile
new file mode 100644
index 0000000..5984a5f
--- /dev/null
+++ b/archive/pfs/Dockerfile
@@ -0,0 +1,30 @@
+FROM ubuntu:latest
+
+RUN apt-get update --fix-missing
+RUN apt-get -y upgrade
+RUN apt-get -y install wget git bash unzip
+
+WORKDIR /tmp
+RUN wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz
+RUN tar -xvf go1.14.linux-amd64.tar.gz
+RUN mv go /usr/local
+RUN rm go1.14.linux-amd64.tar.gz
+
+ENV GOROOT=/usr/local/go
+ENV GOPATH=/src/go
+ENV GOBIN=$GOPATH/bin
+ENV PATH=$GOBIN:$GOROOT/bin:$PATH
+
+RUN go get -u google.golang.org/grpc
+
+WORKDIR /src/protoc
+RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.4/protoc-3.11.4-linux-x86_64.zip
+RUN unzip protoc-3.11.4-linux-x86_64.zip
+RUN rm protoc-3.11.4-linux-x86_64.zip
+ENV PATH=/src/protoc/bin:$PATH
+
+RUN go get -u github.com/golang/protobuf/protoc-gen-go
+RUN go get -u google.golang.org/protobuf/encoding/prototext
+RUN go get -u github.com/google/uuid
+
+WORKDIR /src/go/src/github.com/giolekva/pcloud/pfs
diff --git a/archive/pfs/api/api.proto b/archive/pfs/api/api.proto
new file mode 100644
index 0000000..56a8e9e
--- /dev/null
+++ b/archive/pfs/api/api.proto
@@ -0,0 +1,132 @@
+syntax = "proto3";
+
+package pcloud.api;
+
+option go_package = "api";
+
+enum ChunkStatus {
+ NEW = 0;
+ CREATED = 1;
+ WRITING = 2;
+ REPLICATING = 3;
+ READY = 4;
+}
+
+enum ReplicaRole {
+ SECONDARY = 0;
+ PRIMARY = 1;
+}
+
+// ChunkStorage
+
+service ChunkStorage {
+ rpc ListChunks(ListChunksRequest) returns (ListChunksResponse) {}
+
+ rpc CreateChunk(CreateChunkRequest) returns (CreateChunkResponse) {}
+
+ rpc GetChunkStatus(GetChunkStatusRequest) returns (GetChunkStatusResponse) {}
+
+ rpc ReadChunk(ReadChunkRequest) returns (ReadChunkResponse) {}
+
+ rpc WriteChunk(WriteChunkRequest) returns (WriteChunkResponse) {}
+
+ rpc RemoveChunk(RemoveChunkRequest) returns (RemoveChunkResponse) {}
+}
+
+message ListChunksRequest {
+}
+
+message ListChunksResponse {
+ repeated string chunk_id = 1;
+}
+
+message CreateChunkRequest {
+ string chunk_id = 1;
+ int32 size = 2;
+ ReplicaRole role = 3;
+ string primary_address = 4;
+}
+
+message CreateChunkResponse {
+}
+
+message GetChunkStatusRequest {
+ string chunk_id = 1;
+}
+
+message GetChunkStatusResponse {
+ ChunkStatus status = 1;
+ int32 total_bytes = 2;
+ int32 committed_bytes = 3;
+}
+
+message ReadChunkRequest {
+ string chunk_id = 1;
+ int32 offset = 2;
+ int32 num_bytes = 3;
+}
+
+message ReadChunkResponse {
+ bytes data = 1;
+}
+
+message WriteChunkRequest {
+ string chunk_id = 1;
+ int32 offset = 2;
+ bytes data = 3;
+}
+
+message WriteChunkResponse {
+ int32 bytes_written = 1;
+}
+
+message RemoveChunkRequest {
+ string chunk_id = 1;
+}
+
+message RemoveChunkResponse {
+}
+
+// MetadataStorage
+
+message ChunkStorageMetadata {
+ string chunk_id = 1;
+ int32 size_bytes = 2;
+ repeated string server = 3;
+}
+
+service MetadataStorage {
+ rpc AddChunkServer(AddChunkServerRequest) returns (AddChunkServerResponse) {}
+
+ rpc CreateBlob(CreateBlobRequest) returns (CreateBlobResponse) {}
+
+ rpc GetBlobMetadata(GetBlobMetadataRequest) returns (GetBlobMetadataResponse) {}
+}
+
+message AddChunkServerRequest {
+ string address = 1;
+}
+
+message AddChunkServerResponse {
+}
+
+message CreateBlobRequest {
+ int32 size_bytes = 1;
+ int32 chunk_size_bytes = 2;
+ int32 num_replicas = 3;
+}
+
+message CreateBlobResponse {
+ string blob_id = 1;
+ repeated ChunkStorageMetadata chunk = 2;
+}
+
+message GetBlobMetadataRequest {
+ string blob_id = 1;
+}
+
+message GetBlobMetadataResponse {
+ string blob_id = 1;
+ int32 size_bytes = 2;
+ repeated ChunkStorageMetadata chunk = 3;
+}
\ No newline at end of file
diff --git a/archive/pfs/api/client.go b/archive/pfs/api/client.go
new file mode 100644
index 0000000..3e05de6
--- /dev/null
+++ b/archive/pfs/api/client.go
@@ -0,0 +1,12 @@
+package api
+
+import (
+ "google.golang.org/grpc"
+)
+
+func DialConn(address string) (*grpc.ClientConn, error) {
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ return grpc.Dial(address, opts...)
+}
diff --git a/archive/pfs/chunk/chunk.go b/archive/pfs/chunk/chunk.go
new file mode 100644
index 0000000..aaa1bfb
--- /dev/null
+++ b/archive/pfs/chunk/chunk.go
@@ -0,0 +1,23 @@
+package chunk
+
+import (
+ "io"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+type ChunkInfo struct {
+ Status api.ChunkStatus
+ Size int
+ Committed int
+}
+
+type Chunk interface {
+ Stats() (ChunkInfo, error)
+ ReaderAt() io.ReaderAt
+ WriterAt() io.WriterAt
+}
+
+type ChunkFactory interface {
+ New(size int) Chunk
+}
diff --git a/archive/pfs/chunk/file.go b/archive/pfs/chunk/file.go
new file mode 100644
index 0000000..3502a50
--- /dev/null
+++ b/archive/pfs/chunk/file.go
@@ -0,0 +1,49 @@
+package chunk
+
+import (
+ "io"
+ "os"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+type ReadOnlyFileChunk struct {
+ f *os.File
+ offset int
+ size int
+}
+
+func NewReadOnlyFileChunk(f *os.File, offset, size int) Chunk {
+ return &ReadOnlyFileChunk{f, offset, size}
+}
+
+func (c *ReadOnlyFileChunk) Stats() (ChunkInfo, error) {
+ return ChunkInfo{
+ Status: api.ChunkStatus_READY,
+ Size: c.size,
+ Committed: c.size}, nil
+}
+
+func (c *ReadOnlyFileChunk) ReaderAt() io.ReaderAt {
+ return &fileReader{c.f}
+}
+
+func (c *ReadOnlyFileChunk) WriterAt() io.WriterAt {
+ return &fileWriter{c.f}
+}
+
+type fileReader struct {
+ f *os.File
+}
+
+func (f *fileReader) ReadAt(b []byte, offset int64) (int, error) {
+ return f.f.ReadAt(b, offset)
+}
+
+type fileWriter struct {
+ f *os.File
+}
+
+func (f *fileWriter) WriteAt(b []byte, offset int64) (int, error) {
+ return f.f.WriteAt(b, offset)
+}
diff --git a/archive/pfs/chunk/in_memory.go b/archive/pfs/chunk/in_memory.go
new file mode 100644
index 0000000..b9b55ec
--- /dev/null
+++ b/archive/pfs/chunk/in_memory.go
@@ -0,0 +1,68 @@
+package chunk
+
+import (
+ "bytes"
+ "errors"
+ "io"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+type InMemoryChunk struct {
+ status api.ChunkStatus
+ payload []byte
+ committed int
+}
+
+func (c *InMemoryChunk) Stats() (ChunkInfo, error) {
+ return ChunkInfo{c.status, len(c.payload), c.committed}, nil
+}
+
+func (c *InMemoryChunk) ReaderAt() io.ReaderAt {
+ return bytes.NewReader(c.payload[:c.committed])
+}
+
+func (c *InMemoryChunk) WriterAt() io.WriterAt {
+ return &byteWriter{c}
+}
+
+type byteWriter struct {
+ c *InMemoryChunk
+}
+
+func (w *byteWriter) WriteAt(p []byte, offset int64) (n int, err error) {
+ if int(offset) > w.c.committed {
+		panic(1) // FIXME(review): debug leftover — makes the error return below unreachable
+ return 0, errors.New("Gaps are not allowed when writing in chunks")
+ }
+ if int(offset) < w.c.committed {
+ if int(offset)+len(p) <= w.c.committed {
+ if bytes.Compare(w.c.payload[int(offset):int(offset)+len(p)], p) != 0 {
+			panic(2) // FIXME(review): debug leftover — makes the error return below unreachable
+			return 0, errors.New("Can not change contents of already committed chunk bytes")
+		}
+		panic(3) // FIXME(review): debug leftover — makes the return below unreachable
+		return len(p), nil
+ }
+ n = w.c.committed - int(offset)
+ p = p[n:]
+ offset = int64(w.c.committed)
+ }
+ if w.c.committed+len(p) > len(w.c.payload) {
+		panic(4) // FIXME(review): debug leftover — makes the error return below unreachable
+ return 0, errors.New("In memory chunk does not have enough space available")
+ }
+ n += copy(w.c.payload[w.c.committed:], p)
+ w.c.committed += n
+ return
+}
+
+type InMemoryChunkFactory struct {
+}
+
+func (f InMemoryChunkFactory) New(size int) Chunk {
+ return &InMemoryChunk{
+ status: api.ChunkStatus_CREATED,
+ payload: make([]byte, size),
+ committed: 0}
+}
diff --git a/archive/pfs/chunk/in_memory_test.go b/archive/pfs/chunk/in_memory_test.go
new file mode 100644
index 0000000..b9711ca
--- /dev/null
+++ b/archive/pfs/chunk/in_memory_test.go
@@ -0,0 +1,27 @@
+package chunk
+
+import (
+ "bytes"
+ "testing"
+)
+
+// TestConcurrentReads verifies that independent readers each observe
+// the committed payload, and that shorter reads at offset 0 return
+// prefixes of it.
+func TestConcurrentReads(t *testing.T) {
+	c := InMemoryChunkFactory{}.New(4)
+	// BUG FIX: the original used panic(err) here, which aborts the
+	// whole test binary instead of failing this test.
+	if _, err := c.WriterAt().WriteAt([]byte("abcd"), 0); err != nil {
+		t.Fatal(err)
+	}
+	d1 := make([]byte, 2)
+	d2 := make([]byte, 3)
+	if _, err := c.ReaderAt().ReadAt(d1, 0); err != nil {
+		t.Error(err)
+	}
+	if !bytes.Equal(d1, []byte("ab")) {
+		t.Errorf("Expected: %s\nActual: %s", "ab", d1)
+	}
+	if _, err := c.ReaderAt().ReadAt(d2, 0); err != nil {
+		t.Error(err)
+	}
+	if !bytes.Equal(d2, []byte("abc")) {
+		t.Errorf("Expected: %s\nActual: %s", "abc", d2)
+	}
+}
diff --git a/archive/pfs/chunk/remote.go b/archive/pfs/chunk/remote.go
new file mode 100644
index 0000000..6d84241
--- /dev/null
+++ b/archive/pfs/chunk/remote.go
@@ -0,0 +1,74 @@
+package chunk
+
+import (
+ "context"
+ "io"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+// RemoteChunk is a Chunk whose data lives on a remote chunk server,
+// accessed through a ChunkStorage gRPC client.
+type RemoteChunk struct {
+ chunkId string
+ client api.ChunkStorageClient
+}
+
+// Stats fetches the chunk's status, total size and committed size from
+// the remote server via the GetChunkStatus RPC.
+func (r *RemoteChunk) Stats() (info ChunkInfo, err error) {
+ resp, err := r.client.GetChunkStatus(
+ context.Background(),
+ &api.GetChunkStatusRequest{ChunkId: r.chunkId})
+ if err != nil {
+ return
+ }
+ info = ChunkInfo{
+ resp.Status,
+ int(resp.TotalBytes),
+ int(resp.CommittedBytes)}
+ return
+}
+
+// ReaderAt returns an io.ReaderAt that reads this chunk's bytes via
+// ReadChunk RPCs.
+func (r *RemoteChunk) ReaderAt() io.ReaderAt {
+ return &remoteChunkReaderAt{
+ chunkId: r.chunkId,
+ client: r.client}
+}
+
+// WriterAt returns an io.WriterAt that writes this chunk's bytes via
+// WriteChunk RPCs.
+func (r *RemoteChunk) WriterAt() io.WriterAt {
+ return &remoteChunkWriterAt{
+ chunkId: r.chunkId,
+ client: r.client}
+}
+
+// remoteChunkReaderAt implements io.ReaderAt on top of the ReadChunk RPC.
+type remoteChunkReaderAt struct {
+ chunkId string
+ client api.ChunkStorageClient
+}
+
+// ReadAt issues a ReadChunk RPC for len(p) bytes at offset and copies
+// whatever the server returned into p.
+//
+// NOTE(review): offset and length are narrowed to int32 by the RPC
+// schema, so chunks beyond 2GiB would truncate — confirm size bounds.
+func (c *remoteChunkReaderAt) ReadAt(p []byte, offset int64) (n int, err error) {
+ req := api.ReadChunkRequest{
+ ChunkId: c.chunkId,
+ Offset: int32(offset),
+ NumBytes: int32(len(p))}
+ resp, err := c.client.ReadChunk(context.Background(), &req)
+ if err != nil {
+ return
+ }
+ n = copy(p, resp.Data)
+ return
+}
+
+// remoteChunkWriterAt implements io.WriterAt on top of the WriteChunk RPC.
+type remoteChunkWriterAt struct {
+ chunkId string
+ client api.ChunkStorageClient
+}
+
+// WriteAt issues a WriteChunk RPC. The response is consulted even when
+// err is non-nil, so a partially applied write still reports its byte
+// count alongside the error.
+func (c *remoteChunkWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
+ req := api.WriteChunkRequest{
+ ChunkId: c.chunkId,
+ Offset: int32(offset),
+ Data: p}
+ resp, err := c.client.WriteChunk(context.Background(), &req)
+ if resp != nil {
+ n = int(resp.BytesWritten)
+ }
+ return
+}
diff --git a/archive/pfs/chunk/replicator.go b/archive/pfs/chunk/replicator.go
new file mode 100644
index 0000000..d8990a7
--- /dev/null
+++ b/archive/pfs/chunk/replicator.go
@@ -0,0 +1,116 @@
+package chunk
+
+import (
+ "context"
+ "io"
+
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+// ReplicaAssignmentChangeListener reports which address serves as the
+// primary replica for a chunk; the returned channel yields the current
+// primary and any subsequent changes.
+type ReplicaAssignmentChangeListener interface {
+ Primary(chunkId, currentPrimary string) <-chan string
+}
+
+// PrimaryReplicaChangeListener exposes a chunk id together with a
+// stream of primary-replica addresses for it.
+// NOTE(review): no implementation is visible in this package — confirm
+// this interface is still needed.
+type PrimaryReplicaChangeListener interface {
+ ChunkId() string
+ Address() <-chan string
+}
+
+// NonChangingReplicaAssignment is a ReplicaAssignmentChangeListener
+// whose primary assignment never changes after the initial address.
+type NonChangingReplicaAssignment struct {
+}
+
+// Primary returns a buffered channel pre-loaded with the given address;
+// no further addresses are ever sent on it.
+func (l *NonChangingReplicaAssignment) Primary(chunkId, address string) <-chan string {
+ ch := make(chan string, 1)
+ ch <- address
+ return ch
+}
+
+// replicate copies committed bytes from src into dst in fixed 100-byte
+// reads, resuming from dst's current committed offset. It signals
+// completion by sending on done when src reports io.EOF, and returns
+// without signalling when ctx is cancelled. Panics if dst's stats
+// cannot be read.
+//
+// NOTE(review): the select's default branch makes this a busy loop,
+// the WriteAt error is discarded, and a non-EOF read error spins
+// forever — confirm whether backoff/error propagation is intended.
+func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
+ dstInfo, err := dst.Stats()
+ if err != nil {
+ panic(err)
+ }
+ inp := src.ReaderAt()
+ replicated := dstInfo.Committed
+ out := dst.WriterAt()
+ for {
+ select {
+ default:
+ p := make([]byte, 100)
+ n, err := inp.ReadAt(p, int64(replicated))
+ if n > 0 {
+ m, _ := out.WriteAt(p[:n], int64(replicated))
+ replicated += m
+ }
+ if err == io.EOF {
+ done <- 1
+ return
+ }
+
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// ReplicateFromPrimary keeps dst in sync with the primary replica of
+// chunkId. Each address received on primaryAddressCh cancels the
+// previous replication stream (if any) and starts a new one against
+// the new primary. Returns when replication completes or ctx is
+// cancelled. Note: receiving from the nil done channel blocks forever,
+// which is what makes the initial select wait for the first address.
+func ReplicateFromPrimary(ctx context.Context, chunkId string, dst Chunk, primaryAddressCh <-chan string) {
+	var done chan int
+	var cancel context.CancelFunc = nil
+	for {
+		select {
+		case <-done:
+			return
+		case <-ctx.Done():
+			return
+		case address := <-primaryAddressCh:
+			if cancel != nil {
+				cancel()
+			}
+			var opts []grpc.DialOption
+			opts = append(opts, grpc.WithInsecure())
+			opts = append(opts, grpc.WithBlock())
+			conn, err := grpc.Dial(address, opts...)
+			// BUG FIX: the original checked `err == nil` here, skipping
+			// every successful dial and proceeding with the nil/failed
+			// connection otherwise. WriteToPrimary has the correct check.
+			if err != nil {
+				continue
+			}
+			client := api.NewChunkStorageClient(conn)
+			src := RemoteChunk{chunkId, client}
+			replicatorCtx, cancelFn := context.WithCancel(context.Background())
+			cancel = cancelFn
+			done = make(chan int, 1)
+			go replicate(replicatorCtx, dst, &src, done)
+		}
+	}
+}
+
+// WriteToPrimary pushes the contents of the local chunk src to the
+// primary replica of chunkId. Each address received on primaryAddressCh
+// cancels the previous push (if any) and starts a new one against the
+// new primary; the function returns once a push completes or ctx is
+// cancelled. Receiving from the nil done channel blocks forever, which
+// is what makes the initial select wait for the first address.
+func WriteToPrimary(ctx context.Context, chunkId string, src Chunk, primaryAddressCh <-chan string) {
+ var done chan int
+ var cancel context.CancelFunc = nil
+ for {
+ select {
+ case <-done:
+ return
+ case <-ctx.Done():
+ return
+ case address := <-primaryAddressCh:
+ if cancel != nil {
+ cancel()
+ }
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
+ if err != nil {
+ continue
+ }
+ client := api.NewChunkStorageClient(conn)
+ dst := RemoteChunk{chunkId, client}
+ replicatorCtx, cancelFn := context.WithCancel(context.Background())
+ cancel = cancelFn
+ done = make(chan int, 1)
+ go replicate(replicatorCtx, &dst, src, done)
+ }
+ }
+}
diff --git a/archive/pfs/chunk/server.go b/archive/pfs/chunk/server.go
new file mode 100644
index 0000000..d619f13
--- /dev/null
+++ b/archive/pfs/chunk/server.go
@@ -0,0 +1,123 @@
+package chunk
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "sync"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+// ChunkServer implements the ChunkStorage gRPC service. Chunks and the
+// cancel functions of their replication loops are kept in sync.Maps
+// keyed by chunk id.
+type ChunkServer struct {
+ factory ChunkFactory
+ assignmentChangeLis ReplicaAssignmentChangeListener
+ chunks sync.Map
+ replicatorCancel sync.Map
+}
+
+// NewChunkServer returns a ChunkServer that allocates chunks with
+// factory and follows primary-replica changes via assignmentChangeLis.
+func NewChunkServer(factory ChunkFactory,
+ assignmentChangeLis ReplicaAssignmentChangeListener) *ChunkServer {
+ return &ChunkServer{
+ factory: factory,
+ assignmentChangeLis: assignmentChangeLis}
+}
+
+// ListChunks returns the ids of all chunks currently held by this
+// server, in unspecified order.
+func (s *ChunkServer) ListChunks(
+ ctx context.Context,
+ req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
+ resp = &api.ListChunksResponse{}
+ s.chunks.Range(func(k, v interface{}) bool {
+ resp.ChunkId = append(resp.ChunkId, k.(string))
+ return true
+ })
+ return
+}
+
+// CreateChunk allocates a new chunk of the requested size and registers
+// it under req.ChunkId. Secondary replicas additionally start a
+// background replication loop following the primary's address as
+// reported by the assignment-change listener; primaries need no extra
+// setup. Re-creating an existing chunk id silently replaces it.
+func (s *ChunkServer) CreateChunk(
+	ctx context.Context,
+	req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
+	chunk := s.factory.New(int(req.Size))
+	s.chunks.Store(req.ChunkId, chunk)
+	// The original switch carried an empty PRIMARY case; a plain if is
+	// clearer since only secondaries need the replication loop.
+	if req.Role == api.ReplicaRole_SECONDARY {
+		// The loop runs until RemoveChunk invokes the stored cancel.
+		ctx, cancel := context.WithCancel(context.Background())
+		s.replicatorCancel.Store(req.ChunkId, cancel)
+		primaryAddressCh := s.assignmentChangeLis.Primary(
+			req.ChunkId, req.PrimaryAddress)
+		go ReplicateFromPrimary(ctx, req.ChunkId, chunk, primaryAddressCh)
+	}
+	resp = &api.CreateChunkResponse{}
+	log.Printf("Created chunk: %s\n", req.ChunkId)
+	return
+}
+
+// GetChunkStatus reports the status, total size and committed size of
+// the chunk identified by req.ChunkId, or an error if it is unknown.
+func (s *ChunkServer) GetChunkStatus(
+	ctx context.Context,
+	req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
+	if chunk, ok := s.chunks.Load(req.ChunkId); ok {
+		c := chunk.(Chunk)
+		var info ChunkInfo
+		info, err = c.Stats()
+		if err != nil {
+			return
+		}
+		resp = &api.GetChunkStatusResponse{
+			Status:         info.Status,
+			TotalBytes:     int32(info.Size),
+			CommittedBytes: int32(info.Committed)}
+		return
+	}
+	// Fixed "Could not fund chunk" typo; the message now matches the
+	// wording used by ReadChunk and WriteChunk.
+	return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
+}
+
+// ReadChunk serves up to req.NumBytes bytes of the chunk starting at
+// req.Offset. Short reads return only the bytes that were available;
+// an unknown chunk id yields an error.
+func (s *ChunkServer) ReadChunk(
+	ctx context.Context,
+	req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
+	value, ok := s.chunks.Load(req.ChunkId)
+	if !ok {
+		return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
+	}
+	chunk := value.(Chunk)
+	b := make([]byte, req.NumBytes)
+	var n int
+	n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
+	if n == 0 {
+		return
+	}
+	return &api.ReadChunkResponse{Data: b[:n]}, nil
+}
+
+// WriteChunk applies req.Data to the chunk at req.Offset and reports
+// how many bytes were accepted; an unknown chunk id yields an error.
+func (s *ChunkServer) WriteChunk(
+	ctx context.Context,
+	req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
+	value, ok := s.chunks.Load(req.ChunkId)
+	if !ok {
+		return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
+	}
+	chunk := value.(Chunk)
+	var n int
+	n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
+	if n == 0 {
+		return
+	}
+	return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
+}
+
+// RemoveChunk cancels any replication loop for the chunk and forgets
+// it. Removing an unknown chunk id is a no-op and still succeeds.
+func (s *ChunkServer) RemoveChunk(
+ ctx context.Context,
+ req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
+ if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
+ cancel.(context.CancelFunc)()
+ s.replicatorCancel.Delete(req.ChunkId)
+ }
+ s.chunks.Delete(req.ChunkId)
+ return &api.RemoveChunkResponse{}, nil
+}
diff --git a/archive/pfs/chunk/server_test.go b/archive/pfs/chunk/server_test.go
new file mode 100644
index 0000000..9549a06
--- /dev/null
+++ b/archive/pfs/chunk/server_test.go
@@ -0,0 +1,103 @@
+package chunk
+
+import (
+ "bytes"
+ "context"
+ "testing"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+// TestStoreChunk exercises chunk creation followed by a full write.
+func TestStoreChunk(t *testing.T) {
+ s := ChunkServer{factory: &InMemoryChunkFactory{}}
+ _, err := s.CreateChunk(context.Background(), &api.CreateChunkRequest{
+ ChunkId: "foo",
+ Size: 11,
+ Role: api.ReplicaRole_PRIMARY})
+ if err != nil {
+ t.Error(err)
+ }
+ _, err = s.WriteChunk(context.Background(), &api.WriteChunkRequest{
+ ChunkId: "foo",
+ Offset: 0,
+ Data: []byte("hello world")})
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+// TestStoreAndReadChunk writes a payload into a fresh chunk and reads
+// it back, verifying round-trip contents.
+func TestStoreAndReadChunk(t *testing.T) {
+	s := ChunkServer{factory: &InMemoryChunkFactory{}}
+	_, err := s.CreateChunk(context.Background(), &api.CreateChunkRequest{
+		ChunkId: "foo",
+		Size:    11,
+		Role:    api.ReplicaRole_PRIMARY})
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = s.WriteChunk(context.Background(), &api.WriteChunkRequest{
+		ChunkId: "foo",
+		Offset:  0,
+		Data:    []byte("hello world")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	resp, err := s.ReadChunk(context.Background(), &api.ReadChunkRequest{
+		ChunkId:  "foo",
+		NumBytes: 100})
+	if err != nil {
+		// Fatal, not Error: resp is nil on error and dereferenced below.
+		t.Fatal(err)
+	}
+	if !bytes.Equal(resp.Data, []byte("hello world")) {
+		t.Errorf("Expected: %s\nGot: %s\n", "hello world", resp.Data)
+	}
+}
+
+// TestReadWithOffsets writes a payload and reads it back in pieces at
+// various offsets, including a short read past the end.
+func TestReadWithOffsets(t *testing.T) {
+	s := ChunkServer{factory: &InMemoryChunkFactory{}}
+	_, err := s.CreateChunk(context.Background(), &api.CreateChunkRequest{
+		ChunkId: "foo",
+		Size:    11,
+		Role:    api.ReplicaRole_PRIMARY})
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = s.WriteChunk(context.Background(), &api.WriteChunkRequest{
+		ChunkId: "foo",
+		Offset:  0,
+		Data:    []byte("hello world")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	resp, err := s.ReadChunk(context.Background(), &api.ReadChunkRequest{
+		ChunkId:  "foo",
+		Offset:   0,
+		NumBytes: 2})
+	if err != nil {
+		// Fatal, not Error: resp is nil on error and dereferenced below.
+		t.Fatal(err)
+	}
+	if !bytes.Equal(resp.Data, []byte("he")) {
+		t.Errorf("Expected: %s\nGot: %s\n", "he", resp.Data)
+	}
+	resp, err = s.ReadChunk(context.Background(), &api.ReadChunkRequest{
+		ChunkId:  "foo",
+		Offset:   2,
+		NumBytes: 2})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(resp.Data, []byte("ll")) {
+		t.Errorf("Expected: %s\nGot: %s\n", "ll", resp.Data)
+	}
+	resp, err = s.ReadChunk(context.Background(), &api.ReadChunkRequest{
+		ChunkId:  "foo",
+		Offset:   4,
+		NumBytes: 100})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(resp.Data, []byte("o world")) {
+		t.Errorf("Expected: %s\nGot: %s\n", "o world", resp.Data)
+	}
+}
diff --git a/archive/pfs/chunk_server.go b/archive/pfs/chunk_server.go
new file mode 100644
index 0000000..04a266e
--- /dev/null
+++ b/archive/pfs/chunk_server.go
@@ -0,0 +1,55 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+ "github.com/giolekva/pcloud/pfs/chunk"
+)
+
+var controllerAddress = flag.String("controller", "localhost:123", "Metadata storage address.")
+var selfAddress = flag.String("self", "", "Metadata storage address.")
+
+// main registers this chunk server with the controller named by
+// --controller and then serves the ChunkStorage gRPC API on port 234.
+func main() {
+	flag.Parse()
+	log.Print("Chunk server starting")
+
+	// Create Master server client.
+	var opts []grpc.DialOption
+	opts = append(opts, grpc.WithInsecure())
+	opts = append(opts, grpc.WithBlock())
+	conn, err := grpc.Dial(*controllerAddress, opts...)
+	if err != nil {
+		log.Fatalf("Failed to dial %s: %v", *controllerAddress, err)
+	}
+	defer conn.Close()
+	client := api.NewMetadataStorageClient(conn)
+
+	// Register current Chunk server with Master.
+	// FIX: call the CancelFunc from WithTimeout (the original dropped
+	// it, which leaks the timer and is flagged by go vet).
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	_, err = client.AddChunkServer(
+		ctx,
+		&api.AddChunkServerRequest{Address: *selfAddress})
+	if err != nil {
+		log.Fatalf("failed to register chunk server: %v", err)
+	}
+	log.Print("Registered myself")
+
+	// Start RPC server.
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 234))
+	if err != nil {
+		log.Fatalf("failed to listen: %v", err)
+	}
+	server := grpc.NewServer()
+	api.RegisterChunkStorageServer(server, chunk.NewChunkServer(
+		&chunk.InMemoryChunkFactory{},
+		&chunk.NonChangingReplicaAssignment{}))
+	// FIX: surface Serve's error instead of silently dropping it.
+	if err := server.Serve(lis); err != nil {
+		log.Fatalf("failed to serve: %v", err)
+	}
+}
diff --git a/archive/pfs/client.go b/archive/pfs/client.go
new file mode 100644
index 0000000..2abb47f
--- /dev/null
+++ b/archive/pfs/client.go
@@ -0,0 +1,36 @@
+package main
+
+import (
+ "flag"
+ "log"
+ "os"
+
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+ "github.com/giolekva/pcloud/pfs/client"
+)
+
+var controllerAddress = flag.String("controller", "localhost:123", "Metadata storage address.")
+var fileToUpload = flag.String("file", "", "File path to upload.")
+
+// main uploads the file named by --file through the controller at
+// --controller, storing a single replica.
+func main() {
+	flag.Parse()
+
+	var opts []grpc.DialOption
+	opts = append(opts, grpc.WithInsecure())
+	opts = append(opts, grpc.WithBlock())
+	conn, err := grpc.Dial(*controllerAddress, opts...)
+	if err != nil {
+		log.Fatalf("Failed to dial %s: %v", *controllerAddress, err)
+	}
+	defer conn.Close()
+	uploader := client.NewFileUploader(api.NewMetadataStorageClient(conn))
+
+	f, err := os.Open(*fileToUpload)
+	if err != nil {
+		// log.Fatalf instead of panic: a missing file is an expected
+		// operator error, not a programmer bug.
+		log.Fatalf("Failed to open %s: %v", *fileToUpload, err)
+	}
+	defer f.Close()
+
+	// BUG FIX: FileUploader.Upload takes (file, numReplicas); the
+	// original call passed only the file and did not compile.
+	uploader.Upload(f, 1)
+}
diff --git a/archive/pfs/client/client.go b/archive/pfs/client/client.go
new file mode 100644
index 0000000..a8492a3
--- /dev/null
+++ b/archive/pfs/client/client.go
@@ -0,0 +1,43 @@
+package client
+
+import (
+ "context"
+ "os"
+
+ "github.com/giolekva/pcloud/pfs/api"
+ "github.com/giolekva/pcloud/pfs/chunk"
+)
+
+// FileUploader uploads local files into pfs through the metadata
+// (master) service.
+type FileUploader struct {
+ client api.MetadataStorageClient
+}
+
+// NewFileUploader returns a FileUploader backed by the given metadata
+// storage client.
+func NewFileUploader(client api.MetadataStorageClient) *FileUploader {
+ return &FileUploader{client}
+}
+
+// Upload creates a blob of the file's size with numReplicas replicas
+// via the metadata service and streams the file's bytes to the primary
+// replica of the blob's single chunk, blocking until the write loop
+// finishes.
+//
+// NOTE(review): a Stat failure returns silently, and CreateBlob errors
+// or an unexpected chunk count panic — returning an error to the
+// caller would be friendlier; left as-is pending an API decision.
+func (fu *FileUploader) Upload(f *os.File, numReplicas int) {
+ info, err := f.Stat()
+ if err != nil {
+ return
+ }
+ resp, err := fu.client.CreateBlob(
+ context.Background(), &api.CreateBlobRequest{
+ SizeBytes: int32(info.Size()),
+ NumReplicas: int32(numReplicas)})
+ if err != nil {
+ panic(err)
+ }
+ if len(resp.Chunk) != 1 {
+ panic(resp)
+ }
+ lis := &chunk.NonChangingReplicaAssignment{}
+ primaryAddressCh := lis.Primary(
+ resp.Chunk[0].ChunkId,
+ resp.Chunk[0].Server[0])
+ chunk.WriteToPrimary(
+ context.Background(),
+ resp.Chunk[0].ChunkId,
+ chunk.NewReadOnlyFileChunk(f, 0, int(info.Size())),
+ primaryAddressCh)
+}
diff --git a/archive/pfs/client/client_test.go b/archive/pfs/client/client_test.go
new file mode 100644
index 0000000..b9f3002
--- /dev/null
+++ b/archive/pfs/client/client_test.go
@@ -0,0 +1,62 @@
+package client
+
+import (
+ "os"
+ "testing"
+
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+ pt "github.com/giolekva/pcloud/pfs/testing"
+)
+
+// TestUploadSmallFile uploads a small fixture file through an
+// in-memory environment with a single chunk server.
+func TestUploadSmallFile(t *testing.T) {
+	env, err := pt.NewInMemoryEnv(1)
+	if err != nil {
+		// Fatal, not Error: env is nil on error, so deferring Stop
+		// (or continuing) would panic.
+		t.Fatal(err)
+	}
+	defer env.Stop()
+
+	var opts []grpc.DialOption
+	opts = append(opts, grpc.WithInsecure())
+	opts = append(opts, grpc.WithBlock())
+	conn, err := grpc.Dial("unix:///tmp/pcloud/controller", opts...)
+	if err != nil {
+		// Fatal: conn is nil on error and used below.
+		t.Fatal(err)
+	}
+	defer conn.Close()
+	client := api.NewMetadataStorageClient(conn)
+
+	uploader := NewFileUploader(client)
+	f, err := os.Open("testdata/foo")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+	uploader.Upload(f, 1)
+}
+
+// TestUploadSmallFileWithReplication uploads a small fixture file into
+// a three-server environment, requesting two replicas.
+func TestUploadSmallFileWithReplication(t *testing.T) {
+	env, err := pt.NewInMemoryEnv(3)
+	if err != nil {
+		// Fatal, not Error: env is nil on error, so deferring Stop
+		// (or continuing) would panic.
+		t.Fatal(err)
+	}
+	defer env.Stop()
+
+	var opts []grpc.DialOption
+	opts = append(opts, grpc.WithInsecure())
+	opts = append(opts, grpc.WithBlock())
+	conn, err := grpc.Dial("unix:///tmp/pcloud/controller", opts...)
+	if err != nil {
+		// Fatal: conn is nil on error and used below.
+		t.Fatal(err)
+	}
+	defer conn.Close()
+	client := api.NewMetadataStorageClient(conn)
+
+	uploader := NewFileUploader(client)
+	f, err := os.Open("testdata/foo")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+	uploader.Upload(f, 2)
+}
diff --git a/archive/pfs/client/testdata/foo b/archive/pfs/client/testdata/foo
new file mode 100644
index 0000000..257cc56
--- /dev/null
+++ b/archive/pfs/client/testdata/foo
@@ -0,0 +1 @@
+foo
diff --git a/archive/pfs/controller/chunk.go b/archive/pfs/controller/chunk.go
new file mode 100644
index 0000000..5fb694f
--- /dev/null
+++ b/archive/pfs/controller/chunk.go
@@ -0,0 +1,18 @@
+package controller
+
+import (
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+// chunkServerStatus tracks the controller's view of a chunk server's
+// health.
+type chunkServerStatus int
+
+const (
+ Healthy chunkServerStatus = iota
+ // NOTE(review): UNREACHABLE breaks Go MixedCaps naming (contrast
+ // Healthy) — consider renaming to Unreachable; not changed here in
+ // case other files reference it.
+ UNREACHABLE
+)
+
+// chunkServer is the controller-side record for one registered chunk
+// server: its dial address, health status, and the status of each
+// chunk it hosts, keyed by chunk id.
+type chunkServer struct {
+ address string
+ status chunkServerStatus
+ chunks map[string]api.ChunkStatus
+}
diff --git a/archive/pfs/controller/server.go b/archive/pfs/controller/server.go
new file mode 100644
index 0000000..885ad55
--- /dev/null
+++ b/archive/pfs/controller/server.go
@@ -0,0 +1,116 @@
+package controller
+
+import (
+ "context"
+ "log"
+ "math/rand"
+
+ "github.com/google/uuid"
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+)
+
+// chunkServers is not referenced anywhere in this file.
+// NOTE(review): looks superseded by chunkServer in chunk.go — consider
+// deleting.
+type chunkServers struct {
+ address string
+}
+
+// BlobStatus is the lifecycle state of a blob as tracked by the master.
+type BlobStatus int
+
+const (
+ NEW BlobStatus = iota
+)
+
+// ChunkStatus is the master's view of a chunk replica's placement state.
+type ChunkStatus int
+
+const (
+ ASSIGNED ChunkStatus = iota
+ STORED
+)
+
+// chunkReplica records where one replica of a chunk lives and whether
+// it has been stored.
+type chunkReplica struct {
+ chunkServer string
+ status ChunkStatus
+}
+
+// chunk is the master's record of a single chunk and its replicas.
+type chunk struct {
+ id string
+ replica []chunkReplica
+}
+
+// blob is the master's record of a blob and the chunks that compose it.
+type blob struct {
+ id string
+ status BlobStatus
+ chunks []chunk
+}
+
+// MasterServer implements the MetadataStorage gRPC service, tracking
+// registered chunk servers and created blobs in memory.
+type MasterServer struct {
+ chunkServers []*chunkServer
+ blobs []*blob
+}
+
+// NewMasterServer returns an empty master with no chunk servers or
+// blobs registered.
+func NewMasterServer() *MasterServer {
+ return &MasterServer{}
+}
+
+// AddChunkServer registers a chunk server address with the master,
+// marking it Healthy.
+//
+// NOTE(review): s.chunkServers is appended without synchronization;
+// concurrent registration RPCs would race — confirm single-threaded
+// use or add a mutex.
+func (s *MasterServer) AddChunkServer(
+ ctx context.Context,
+ req *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
+ s.chunkServers = append(s.chunkServers, &chunkServer{
+ address: req.Address,
+ status: Healthy})
+ log.Printf("Registered Chunk server: %s", req.Address)
+ return &api.AddChunkServerResponse{}, nil
+}
+
+// CreateBlob allocates a new blob backed by a single chunk and places
+// up to req.NumReplicas replicas on randomly chosen chunk servers. The
+// first successfully created replica becomes the primary; later ones
+// are secondaries pointed at it. Returns (nil, nil) when more replicas
+// are requested than servers are registered — callers must check for a
+// nil response.
+func (s *MasterServer) CreateBlob(
+	ctx context.Context,
+	req *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
+	if int(req.NumReplicas) > len(s.chunkServers) {
+		return nil, nil
+	}
+	resp := api.CreateBlobResponse{
+		BlobId: uuid.New().String(),
+		Chunk: []*api.ChunkStorageMetadata{
+			{ChunkId: uuid.New().String()},
+		}}
+	assigned := 0
+	chunkId := resp.Chunk[0].ChunkId
+	// BUG FIX: the original ranged over rand.Perm with the index
+	// variable (`for i := range ...`), which visits servers in order
+	// 0..n-1 and defeats the randomization; use the permuted values.
+	for _, i := range rand.Perm(len(s.chunkServers)) {
+		if assigned == int(req.NumReplicas) {
+			break
+		}
+		address := s.chunkServers[i].address
+		// FIX: log.Printf with a non-constant format string is a vet
+		// error and misbehaves if the address contains '%'.
+		log.Print(address)
+		var opts []grpc.DialOption
+		opts = append(opts, grpc.WithInsecure())
+		opts = append(opts, grpc.WithBlock())
+		conn, err := grpc.Dial(address, opts...)
+		if err != nil {
+			continue
+		}
+		// NOTE(review): defer in a loop keeps every connection open
+		// until CreateBlob returns; acceptable for small clusters.
+		defer conn.Close()
+		client := api.NewChunkStorageClient(conn)
+		createChunkReq := api.CreateChunkRequest{
+			ChunkId: chunkId,
+			Size:    req.SizeBytes}
+		if assigned == 0 {
+			createChunkReq.Role = api.ReplicaRole_PRIMARY
+		} else {
+			createChunkReq.Role = api.ReplicaRole_SECONDARY
+			createChunkReq.PrimaryAddress = resp.Chunk[0].Server[0]
+		}
+		_, err = client.CreateChunk(ctx, &createChunkReq)
+		if err == nil {
+			resp.Chunk[0].Server = append(resp.Chunk[0].Server, address)
+			assigned++
+		}
+	}
+	return &resp, nil
+}
+
+// GetBlobMetadata is an unimplemented stub; it always returns
+// (nil, nil).
+func (s *MasterServer) GetBlobMetadata(
+ ctx context.Context,
+ req *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
+ return nil, nil
+}
diff --git a/archive/pfs/controller_server.go b/archive/pfs/controller_server.go
new file mode 100644
index 0000000..c905ba3
--- /dev/null
+++ b/archive/pfs/controller_server.go
@@ -0,0 +1,29 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net"
+
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+ "github.com/giolekva/pcloud/pfs/controller"
+)
+
+var port = flag.Int("port", 123, "Port to listen on.")
+
+// main starts the metadata (master) gRPC server on --port and serves
+// until the process exits.
+func main() {
+	flag.Parse()
+	log.Print("Master server starting")
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
+	if err != nil {
+		log.Fatalf("Failed to listen on port %d: %v", *port, err)
+	}
+	log.Printf("Listening on port: %d", *port)
+	server := grpc.NewServer()
+	api.RegisterMetadataStorageServer(server, controller.NewMasterServer())
+	log.Print("Master serving")
+	// FIX: surface Serve's error instead of silently dropping it.
+	if err := server.Serve(lis); err != nil {
+		log.Fatalf("Failed to serve: %v", err)
+	}
+}
diff --git a/archive/pfs/pfs.yamls b/archive/pfs/pfs.yamls
new file mode 100644
index 0000000..ba06bcb
--- /dev/null
+++ b/archive/pfs/pfs.yamls
@@ -0,0 +1,78 @@
+---
+kind: Service
+apiVersion: v1
+metadata:
+ name: pfs-controller-service
+spec:
+ type: ClusterIP
+ selector:
+ app: pfs-controller
+ ports:
+ - nodePort:
+ port: 111
+ targetPort: 123
+---
+kind: Deployment
+apiVersion: apps/v1
+metadata:
+ name: pfs-controller
+spec:
+ selector:
+ matchLabels:
+ app: pfs-controller
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: pfs-controller
+ spec:
+ containers:
+ - name: pfs-controller
+ image: pcloud:latest
+ imagePullPolicy: Never
+ ports:
+ - containerPort: 123
+ volumeMounts:
+ - name: code
+ mountPath: /src/go/src/github.com/giolekva/pcloud/pfs
+ command: ["/bin/sh"]
+ args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go run controller_server.go --port=123"]
+ volumes:
+ - name: code
+ hostPath:
+ path: "/Users/lekva/dev/go/src/github.com/giolekva/pcloud/pfs"
+---
+kind: Deployment
+apiVersion: apps/v1
+metadata:
+ name: pfs-chunk
+spec:
+ selector:
+ matchLabels:
+ app: pfs-chunk
+ replicas: 3
+ template:
+ metadata:
+ labels:
+ app: pfs-chunk
+ spec:
+ containers:
+ - name: pfs-chunk
+ image: pcloud:latest
+ imagePullPolicy: Never
+ ports:
+ - containerPort: 234
+ env:
+ - name: SELF_IP
+ valueFrom:
+ fieldRef:
+ fieldPath: status.podIP
+ volumeMounts:
+ - name: code
+ mountPath: /src/go/src/github.com/giolekva/pcloud/pfs
+ command: ["/bin/sh"]
+ args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go run chunk_server.go --controller=pfs-controller-service:111 --self=$(SELF_IP):234"]
+ volumes:
+ - name: code
+ hostPath:
+ path: "/Users/lekva/dev/go/src/github.com/giolekva/pcloud/pfs"
diff --git a/archive/pfs/testing/in_memory_env.go b/archive/pfs/testing/in_memory_env.go
new file mode 100644
index 0000000..c3410be
--- /dev/null
+++ b/archive/pfs/testing/in_memory_env.go
@@ -0,0 +1,84 @@
+package testing
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "syscall"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/giolekva/pcloud/pfs/api"
+ "github.com/giolekva/pcloud/pfs/chunk"
+ "github.com/giolekva/pcloud/pfs/controller"
+)
+
+// InMemoryEnv bundles a master gRPC server (m), its chunk servers (c),
+// and the client connection to the controller, for use in tests.
+type InMemoryEnv struct {
+ m *grpc.Server
+ c []*grpc.Server
+ controllerConn *grpc.ClientConn
+}
+
+// NewInMemoryEnv starts a controller and numChunkServers chunk servers
+// on unix sockets under /tmp/pcloud, registers each chunk server with
+// the controller, and returns a handle that can stop everything.
+// The /tmp/pcloud directory must already exist.
+func NewInMemoryEnv(numChunkServers int) (*InMemoryEnv, error) {
+	env := new(InMemoryEnv)
+	syscall.Unlink("/tmp/pcloud/controller")
+	lis, err := net.Listen("unix", "/tmp/pcloud/controller")
+	if err != nil {
+		return nil, err
+	}
+	server := grpc.NewServer()
+	api.RegisterMetadataStorageServer(server, controller.NewMasterServer())
+	// BUG FIX: keep a reference to the master server so Stop can shut
+	// it down; the original never assigned env.m, leaking the server.
+	env.m = server
+	go server.Serve(lis)
+
+	var opts []grpc.DialOption
+	opts = append(opts, grpc.WithInsecure())
+	opts = append(opts, grpc.WithBlock())
+	conn, err := grpc.Dial("unix:/tmp/pcloud/controller", opts...)
+	if err != nil {
+		return nil, err
+	}
+	env.controllerConn = conn
+	client := api.NewMetadataStorageClient(conn)
+
+	env.c = make([]*grpc.Server, numChunkServers)
+	for i := 0; i < numChunkServers; i++ {
+		unixSocket := fmt.Sprintf("/tmp/pcloud/chunk-%d", i)
+		syscall.Unlink(unixSocket)
+		lis, err := net.Listen("unix", unixSocket)
+		if err != nil {
+			return nil, err
+		}
+		server := grpc.NewServer()
+		api.RegisterChunkStorageServer(server, chunk.NewChunkServer(
+			&chunk.InMemoryChunkFactory{},
+			&chunk.NonChangingReplicaAssignment{}))
+		go server.Serve(lis)
+		env.c[i] = server
+	}
+
+	for i := 0; i < numChunkServers; i++ {
+		// FIX: call the CancelFunc from WithTimeout (the original
+		// discarded it, which go vet flags).
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		_, err = client.AddChunkServer(
+			ctx,
+			&api.AddChunkServerRequest{Address: fmt.Sprintf("unix:///tmp/pcloud/chunk-%d", i)})
+		cancel()
+		if err != nil {
+			return nil, err
+		}
+	}
+	return env, nil
+}
+
+// Stop tears the environment down: closes the controller client
+// connection, then gracefully stops every chunk server and the master
+// server. Nil fields are skipped, so a partially constructed (but
+// non-nil) environment can be stopped safely.
+func (e *InMemoryEnv) Stop() {
+ if e.controllerConn != nil {
+ e.controllerConn.Close()
+ }
+ for _, s := range e.c {
+ if s != nil {
+ s.GracefulStop()
+ }
+ }
+ if e.m != nil {
+ e.m.GracefulStop()
+ }
+}
diff --git a/archive/pfs/testing/simple_test.go b/archive/pfs/testing/simple_test.go
new file mode 100644
index 0000000..d8e2510
--- /dev/null
+++ b/archive/pfs/testing/simple_test.go
@@ -0,0 +1,13 @@
+package testing
+
+import (
+ "testing"
+)
+
+// TestSetup verifies that an in-memory environment with three chunk
+// servers starts and stops cleanly.
+func TestSetup(t *testing.T) {
+	env, err := NewInMemoryEnv(3)
+	if err != nil {
+		// Fatal, not Error: env is nil on error, and the original's
+		// deferred env.Stop() would then panic on a nil receiver.
+		t.Fatal(err)
+	}
+	defer env.Stop()
+}
diff --git a/scripts/goofys.sh b/scripts/goofys.sh
new file mode 100644
index 0000000..598d090
--- /dev/null
+++ b/scripts/goofys.sh
@@ -0,0 +1 @@
+goofys -f --debug_fuse --debug_s3 --profile mio --endpoint http://localhost:9000 my-test tmp/mio-test