cleanup
diff --git "a/argo/\043argo-events-crds-install.yaml\043" "b/argo/\043argo-events-crds-install.yaml\043"
deleted file mode 100644
index 92b812a..0000000
--- "a/argo/\043argo-events-crds-install.yaml\043"
+++ /dev/null
@@ -1,221 +0,0 @@
-oapiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: gateways.argoproj.io
-spec:
- group: argoproj.io
- names:
- kind: Gateway
- listKind: GatewayList
- plural: gateways
- singular: gateway
- shortNames:
- - gw
- scope: Namespaced
- version: "v1alpha1"
----
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: sensors.argoproj.io
-spec:
- group: argoproj.io
- names:
- kind: Sensor
- listKind: SensorList
- plural: sensors
- singular: sensor
- shortNames:
- - sn
- scope: Namespaced
- version: "v1alpha1"
----
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: eventsources.argoproj.io
-spec:
- group: argoproj.io
- scope: Namespaced
- names:
- kind: EventSource
- plural: eventsources
- singular: eventsource
- listKind: EventSourceList
- shortNames:
- - es
- version: "v1alpha1"
----
-# apiVersion: v1
-# kind: Namespace
-# metadata:
-# name: argo-events
----
-apiVersion: v1
-kind: ServiceAccount
-metadata:
- name: argo-events-sa
- namespace: kube-system
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: RoleBinding
-metadata:
- name: argo-events-role-binding
-roleRef:
- apiGroup: rbac.authorization.k8s.io
- kind: Role
- name: argo-events-role
-subjects:
- - kind: ServiceAccount
- name: argo-events-sa
- namespace: kube-system
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: Role
-metadata:
- name: argo-events-role
-rules:
- - apiGroups:
- - argoproj.io
- verbs:
- - create
- - delete
- - deletecollection
- - get
- - list
- - patch
- - update
- - watch
- resources:
- - workflows
- - workflows/finalizers
- - workflowtemplates
- - workflowtemplates/finalizers
- - gateways
- - gateways/finalizers
- - sensors
- - sensors/finalizers
- - eventsources
- - eventsources/finalizers
- - apiGroups:
- - ""
- resources:
- - pods
- - pods/exec
- - configmaps
- - secrets
- - services
- - events
- - persistentvolumeclaims
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
- - apiGroups:
- - "batch"
- resources:
- - jobs
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
- - apiGroups:
- - "apps"
- resources:
- - deployments
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: gateway-controller-configmap
- namespace: kube-system
-data:
- config: |
- instanceID: argo-events
- namespace: kube-system
----
-# The gateway-controller listens for changes on the gateway CRD and creates gateway
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: gateway-controller
- namespace: kube-system
-spec:
- replicas: 1
- selector:
- matchLabels:
- app: gateway-controller
- template:
- metadata:
- labels:
- app: gateway-controller
- spec:
- serviceAccountName: argo-events-sa
- containers:
- - name: gateway-controller
- image: argoproj/gateway-controller:v0.13.0
- imagePullPolicy: Always
- env:
- - name: NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
- - name: CONTROLLER_CONFIG_MAP
- value: gateway-controller-configmap
----
-# The sensor-controller configmap includes configuration information for the sensor-controller
-# To watch sensors created in different namespace than the controller is deployed in, remove the namespace: kube-system.
-# Similarly to watch sensors created in specific namespace, change to namespace: <your_namespace>
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: sensor-controller-configmap
- namespace: kube-system
-data:
- config: |
- instanceID: argo-events
- namespace: kube-system
----
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: sensor-controller
- namespace: kube-system
-spec:
- replicas: 1
- selector:
- matchLabels:
- app: sensor-controller
- template:
- metadata:
- labels:
- app: sensor-controller
- spec:
- serviceAccountName: argo-events-sa
- containers:
- - name: sensor-controller
- image: argoproj/sensor-controller:v0.13.0
- imagePullPolicy: Always
- env:
- - name: NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
- - name: CONTROLLER_CONFIG_MAP
- value: sensor-controller-configmap
diff --git a/argo/argo-events-crds-install.yaml b/argo/argo-events-crds-install.yaml
deleted file mode 100644
index c746479..0000000
--- a/argo/argo-events-crds-install.yaml
+++ /dev/null
@@ -1,221 +0,0 @@
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: gateways.argoproj.io
-spec:
- group: argoproj.io
- names:
- kind: Gateway
- listKind: GatewayList
- plural: gateways
- singular: gateway
- shortNames:
- - gw
- scope: Namespaced
- version: "v1alpha1"
----
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: sensors.argoproj.io
-spec:
- group: argoproj.io
- names:
- kind: Sensor
- listKind: SensorList
- plural: sensors
- singular: sensor
- shortNames:
- - sn
- scope: Namespaced
- version: "v1alpha1"
----
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: eventsources.argoproj.io
-spec:
- group: argoproj.io
- scope: Namespaced
- names:
- kind: EventSource
- plural: eventsources
- singular: eventsource
- listKind: EventSourceList
- shortNames:
- - es
- version: "v1alpha1"
----
-# apiVersion: v1
-# kind: Namespace
-# metadata:
-# name: argo-events
----
-apiVersion: v1
-kind: ServiceAccount
-metadata:
- name: argo-events-sa
- namespace: kube-system
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: RoleBinding
-metadata:
- name: argo-events-role-binding
-roleRef:
- apiGroup: rbac.authorization.k8s.io
- kind: Role
- name: argo-events-role
-subjects:
- - kind: ServiceAccount
- name: argo-events-sa
- namespace: kube-system
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: Role
-metadata:
- name: argo-events-role
-rules:
- - apiGroups:
- - argoproj.io
- verbs:
- - create
- - delete
- - deletecollection
- - get
- - list
- - patch
- - update
- - watch
- resources:
- - workflows
- - workflows/finalizers
- - workflowtemplates
- - workflowtemplates/finalizers
- - gateways
- - gateways/finalizers
- - sensors
- - sensors/finalizers
- - eventsources
- - eventsources/finalizers
- - apiGroups:
- - ""
- resources:
- - pods
- - pods/exec
- - configmaps
- - secrets
- - services
- - events
- - persistentvolumeclaims
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
- - apiGroups:
- - "batch"
- resources:
- - jobs
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
- - apiGroups:
- - "apps"
- resources:
- - deployments
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: gateway-controller-configmap
- namespace: kube-system
-data:
- config: |
- instanceID: argo-events
- namespace: kube-system
----
-# The gateway-controller listens for changes on the gateway CRD and creates gateway
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: gateway-controller
- namespace: kube-system
-spec:
- replicas: 1
- selector:
- matchLabels:
- app: gateway-controller
- template:
- metadata:
- labels:
- app: gateway-controller
- spec:
- serviceAccountName: argo-events-sa
- containers:
- - name: gateway-controller
- image: argoproj/gateway-controller:v0.13.0
- imagePullPolicy: Always
- env:
- - name: NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
- - name: CONTROLLER_CONFIG_MAP
- value: gateway-controller-configmap
----
-# The sensor-controller configmap includes configuration information for the sensor-controller
-# To watch sensors created in different namespace than the controller is deployed in, remove the namespace: kube-system.
-# Similarly to watch sensors created in specific namespace, change to namespace: <your_namespace>
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: sensor-controller-configmap
- namespace: kube-system
-data:
- config: |
- instanceID: argo-events
- namespace: kube-system
----
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: sensor-controller
- namespace: kube-system
-spec:
- replicas: 1
- selector:
- matchLabels:
- app: sensor-controller
- template:
- metadata:
- labels:
- app: sensor-controller
- spec:
- serviceAccountName: argo-events-sa
- containers:
- - name: sensor-controller
- image: argoproj/sensor-controller:v0.13.0
- imagePullPolicy: Always
- env:
- - name: NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
- - name: CONTROLLER_CONFIG_MAP
- value: sensor-controller-configmap
diff --git a/argo/event-source.yaml b/argo/event-source.yaml
deleted file mode 100644
index 23ec5ed..0000000
--- a/argo/event-source.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-apiVersion: argoproj.io/v1alpha1
-kind: EventSource
-metadata:
- name: nats-event-source
-spec:
- type: nats
- nats:
- example:
- # url of the nats service
- url: nats://nats.svc:4222
- # jsonBody specifies that all event body payload coming from this
- # source will be JSON
- jsonBody: true
- # subject name
- subject: input-objects
- # optional backoff time for connection retries.
- # if not provided, default connection backoff time will be used.
- connectionBackoff:
- # duration in nanoseconds. following value is 10 seconds
- duration: 10000000000
- # how many backoffs
- steps: 5
- # factor to increase on each step.
- # setting factor > 1 makes backoff exponential.
- factor: 2
- jitter: 0.2
-
-# apiVersion: argoproj.io/v1alpha1
-# kind: EventSource
-# metadata:
-# name: minio-event-source
-# spec:
-# type: "minio"
-# minio:
-# example:
-# bucket:
-# name: input
-# endpoint: mio-minio.default.svc:9000
-# events:
-# - s3:ObjectCreated:Put
-# - s3:ObjectRemoved:Delete
-# insecure: true
-# accessKey:
-# key: accesskey
-# name: artifacts-minio
-# secretKey:
-# key: secretkey
-# name: artifacts-minio
\ No newline at end of file
diff --git a/argo/gateway.yaml b/argo/gateway.yaml
deleted file mode 100644
index b98baf4..0000000
--- a/argo/gateway.yaml
+++ /dev/null
@@ -1,61 +0,0 @@
-apiVersion: argoproj.io/v1alpha1
-kind: Gateway
-metadata:
- name: nats-gateway
- labels:
- # gateway controller with instanceId "argo-events" will process this gateway
- gateways.argoproj.io/gateway-controller-instanceid: argo-events
-spec:
- type: nats
- eventSourceRef:
- name: nats-event-source
- template:
- metadata:
- name: nats-gateway
- labels:
- gateway-name: nats-gateway
- spec:
- containers:
- - name: gateway-client
- image: argoproj/gateway-client:v0.14.0
- imagePullPolicy: IfNotPresent
- command: ["/bin/gateway-client"]
- - name: nats-events
- image: argoproj/nats-gateway:v0.14.0
- imagePullPolicy: IfNotPresent
- command: ["/bin/nats-gateway"]
- serviceAccountName: argo-events-sa
- subscribers:
- http:
- - "http://nats-sensor.svc:9300/"
-
-
-# apiVersion: argoproj.io/v1alpha1
-# kind: Gateway
-# metadata:
-# name: minio-gateway
-# labels:
-# # gateway controller with instanceId "argo-events" will process this gateway
-# gateways.argoproj.io/gateway-controller-instanceid: argo-events
-# spec:
-# type: minio
-# eventSourceRef:
-# name: minio-event-source
-# template:
-# metadata:
-# name: minio-gateway
-# labels:
-# gateway-name: minio-gateway
-# spec:
-# containers:
-# - name: gateway-client
-# image: argoproj/gateway-client:v0.13.0
-# imagePullPolicy: Always
-# command: ["/bin/gateway-client"]
-# - name: minio-events
-# image: argoproj/artifact-gateway:v0.13.0
-# imagePullPolicy: Always
-# serviceAccountName: argo-events-sa
-# subscribers:
-# http:
-# - "http://minio-sensor.kube-system.svc:9300/"
diff --git a/argo/install.yaml b/argo/install.yaml
deleted file mode 100644
index c800f6a..0000000
--- a/argo/install.yaml
+++ /dev/null
@@ -1,408 +0,0 @@
-# This is an auto-generated file. DO NOT EDIT
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: cronworkflows.argoproj.io
-spec:
- group: argoproj.io
- names:
- kind: CronWorkflow
- plural: cronworkflows
- shortNames:
- - cronwf
- - cwf
- scope: Namespaced
- version: v1alpha1
----
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: workflows.argoproj.io
-spec:
- additionalPrinterColumns:
- - JSONPath: .status.phase
- description: Status of the workflow
- name: Status
- type: string
- - JSONPath: .status.startedAt
- description: When the workflow was started
- format: date-time
- name: Age
- type: date
- group: argoproj.io
- names:
- kind: Workflow
- plural: workflows
- shortNames:
- - wf
- scope: Namespaced
- version: v1alpha1
----
-apiVersion: apiextensions.k8s.io/v1beta1
-kind: CustomResourceDefinition
-metadata:
- name: workflowtemplates.argoproj.io
-spec:
- group: argoproj.io
- names:
- kind: WorkflowTemplate
- plural: workflowtemplates
- shortNames:
- - wftmpl
- scope: Namespaced
- version: v1alpha1
-# ---
-# apiVersion: v1
-# kind: ServiceAccount
-# metadata:
-# name: argo
-# ---
-# apiVersion: v1
-# kind: ServiceAccount
-# metadata:
-# name: argo-server
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: Role
-metadata:
- name: argo-role
-rules:
-- apiGroups:
- - ""
- resources:
- - secrets
- verbs:
- - get
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- labels:
- rbac.authorization.k8s.io/aggregate-to-admin: "true"
- name: argo-aggregate-to-admin
-rules:
-- apiGroups:
- - argoproj.io
- resources:
- - workflows
- - workflows/finalizers
- - workflowtemplates
- - workflowtemplates/finalizers
- - cronworkflows
- - cronworkflows/finalizers
- verbs:
- - create
- - delete
- - deletecollection
- - get
- - list
- - patch
- - update
- - watch
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- labels:
- rbac.authorization.k8s.io/aggregate-to-edit: "true"
- name: argo-aggregate-to-edit
-rules:
-- apiGroups:
- - argoproj.io
- resources:
- - workflows
- - workflows/finalizers
- - workflowtemplates
- - workflowtemplates/finalizers
- - cronworkflows
- - cronworkflows/finalizers
- verbs:
- - create
- - delete
- - deletecollection
- - get
- - list
- - patch
- - update
- - watch
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- labels:
- rbac.authorization.k8s.io/aggregate-to-view: "true"
- name: argo-aggregate-to-view
-rules:
-- apiGroups:
- - argoproj.io
- resources:
- - workflows
- - workflows/finalizers
- - workflowtemplates
- - workflowtemplates/finalizers
- - cronworkflows
- - cronworkflows/finalizers
- verbs:
- - get
- - list
- - watch
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- name: argo-cluster-role
-rules:
-- apiGroups:
- - ""
- resources:
- - pods
- - pods/exec
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
-- apiGroups:
- - ""
- resources:
- - configmaps
- verbs:
- - get
- - watch
- - list
-- apiGroups:
- - ""
- resources:
- - persistentvolumeclaims
- verbs:
- - create
- - delete
-- apiGroups:
- - argoproj.io
- resources:
- - workflows
- - workflows/finalizers
- verbs:
- - get
- - list
- - watch
- - update
- - patch
- - delete
- - create
-- apiGroups:
- - argoproj.io
- resources:
- - workflowtemplates
- - workflowtemplates/finalizers
- verbs:
- - get
- - list
- - watch
-- apiGroups:
- - ""
- resources:
- - serviceaccounts
- verbs:
- - get
- - list
-- apiGroups:
- - argoproj.io
- resources:
- - cronworkflows
- - cronworkflows/finalizers
- verbs:
- - get
- - list
- - watch
- - update
- - patch
- - delete
-- apiGroups:
- - ""
- resources:
- - events
- verbs:
- - create
-- apiGroups:
- - policy
- resources:
- - poddisruptionbudgets
- verbs:
- - create
- - get
- - delete
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- name: argo-server-cluster-role
-rules:
-- apiGroups:
- - ""
- resources:
- - configmaps
- verbs:
- - get
- - watch
- - list
-- apiGroups:
- - ""
- resources:
- - secrets
- verbs:
- - get
-- apiGroups:
- - ""
- resources:
- - pods
- - pods/exec
- - pods/log
- verbs:
- - get
- - list
- - watch
- - delete
-- apiGroups:
- - ""
- resources:
- - secrets
- verbs:
- - get
-- apiGroups:
- - argoproj.io
- resources:
- - workflows
- - workflowtemplates
- - cronworkflows
- verbs:
- - create
- - get
- - list
- - watch
- - update
- - patch
- - delete
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: RoleBinding
-metadata:
- name: argo-binding
-roleRef:
- apiGroup: rbac.authorization.k8s.io
- kind: Role
- name: argo-role
-subjects:
-- kind: ServiceAccount
- name: default
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRoleBinding
-metadata:
- name: argo-binding
-roleRef:
- apiGroup: rbac.authorization.k8s.io
- kind: ClusterRole
- name: argo-cluster-role
-subjects:
-- kind: ServiceAccount
- name: default
- namespace: pcloud
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRoleBinding
-metadata:
- name: argo-server-binding
-roleRef:
- apiGroup: rbac.authorization.k8s.io
- kind: ClusterRole
- name: argo-server-cluster-role
-subjects:
-- kind: ServiceAccount
- name: default
- namespace: pcloud
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: workflow-controller-configmap
----
-apiVersion: v1
-kind: Service
-metadata:
- name: argo-server
-spec:
- ports:
- - port: 2746
- targetPort: 2746
- selector:
- app: argo-server
----
-apiVersion: v1
-kind: Service
-metadata:
- name: workflow-controller-metrics
-spec:
- ports:
- - port: 9090
- protocol: TCP
- targetPort: 9090
- selector:
- app: workflow-controller
----
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: argo-server
-spec:
- selector:
- matchLabels:
- app: argo-server
- template:
- metadata:
- labels:
- app: argo-server
- spec:
- containers:
- - args:
- - server
- image: argoproj/argocli:v2.7.4
- name: argo-server
- ports:
- - containerPort: 2746
- readinessProbe:
- httpGet:
- path: /
- port: 2746
- scheme: HTTP
- initialDelaySeconds: 10
- periodSeconds: 20
- serviceAccountName: default
----
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: workflow-controller
-spec:
- selector:
- matchLabels:
- app: workflow-controller
- template:
- metadata:
- labels:
- app: workflow-controller
- spec:
- containers:
- - args:
- - --configmap
- - workflow-controller-configmap
- - --executor-image
- - argoproj/argoexec:v2.7.4
- command:
- - workflow-controller
- image: argoproj/workflow-controller:v2.7.4
- name: workflow-controller
- serviceAccountName: default
\ No newline at end of file
diff --git a/argo/mio-minio-secrets.yaml b/argo/mio-minio-secrets.yaml
deleted file mode 100644
index c48d5ef..0000000
--- a/argo/mio-minio-secrets.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-apiVersion: v1
-kind: Secret
-metadata:
- name: minio-credentials
-data:
- # base64 of minio
- accesskey: bWluaW8K
- # base64 of minio123
- secretkey: bWluaW8xMjMK
diff --git a/argo/sensor.yaml b/argo/sensor.yaml
deleted file mode 100644
index 6918bd5..0000000
--- a/argo/sensor.yaml
+++ /dev/null
@@ -1,138 +0,0 @@
-apiVersion: argoproj.io/v1alpha1
-kind: Sensor
-metadata:
- name: nats-sensor
- labels:
- sensors.argoproj.io/sensor-controller-instanceid: argo-events
-spec:
- template:
- spec:
- containers:
- - name: sensor
- image: argoproj/sensor:v0.14.0
- imagePullPolicy: IfNotPresent
- serviceAccountName: argo-events-sa
- subscription:
- http:
- port: 9300
- dependencies:
- - name: test-dep
- gatewayName: nats-gateway
- eventName: example
- triggers:
- - template:
- name: nats-workflow-trigger
- k8s:
- group: argoproj.io
- version: v1alpha1
- resource: workflows
- operation: create
- source:
- resource:
- apiVersion: argoproj.io/v1alpha1
- kind: Workflow
- metadata:
- generateName: nats-workflow-
- spec:
- entrypoint: whalesay
- arguments:
- parameters:
- - name: message
- value: WILL_BE_REPLACED
- templates:
- - name: whalesay
- inputs:
- parameters:
- - name: message
- container:
- image: docker/whalesay:latest
- imagePyllPolicy: IfNotPresent
- command: [cowsay]
- args: ["{{inputs.parameters.message}}"]
- parameters:
- - src:
- dependencyName: test-dep
- dest: spec.arguments.parameters.0.value
-
-
-# apiVersion: argoproj.io/v1alpha1
-# kind: Sensor
-# metadata:
-# name: minio-sensor
-# labels:
-# # sensor controller with instanceId "argo-events" will process this sensor
-# sensors.argoproj.io/sensor-controller-instanceid: argo-events
-# spec:
-# template:
-# spec:
-# containers:
-# - name: sensor
-# image: argoproj/sensor:v0.13.0
-# imagePullPolicy: Always
-# serviceAccountName: argo-events-sa
-# subscription:
-# http:
-# port: 9300
-# dependencies:
-# - name: test-dep
-# gatewayName: minio-gateway
-# eventName: example
-# triggers:
-# - template:
-# name: minio-workflow-trigger
-# k8s:
-# group: argoproj.io
-# version: v1alpha1
-# resource: workflows
-# operation: create
-# source:
-# resource:
-# apiVersion: argoproj.io/v1alpha1
-# kind: Workflow
-# metadata:
-# generateName: artifact-workflow-2-
-# spec:
-# entrypoint: detect
-# templates:
-# - name: detect
-# inputs:
-# artifacts:
-# - name: input-image
-# path: /input
-# s3:
-# # endpoint: mio-minio.default.svc:9000
-# # bucket: input # change
-# key: harry.jpg
-# # insecure: true
-# # accessKeySecret:
-# # key: accessKey
-# # name: artifacts-minio
-# # secretKeySecret:
-# # key: secretKey
-# # name: artifacts-minio
-# # useSDKCreds: true
-# # outputs:
-# # artifacts:
-# # - name: output-image
-# # path: /output
-# # s3:
-# # endpoint: mio-minio.default.svc:9000
-# # bucket: output # change
-# # key: PARAMETER
-# # insecure: true
-# # accessKeySecret:
-# # key: accessKey
-# # name: artifacts-minio
-# # secretKeySecret:
-# # key: secretKey
-# # name: artifacts-minio
-# # useSDKCreds: true
-# container:
-# image: face:latest
-# command: [python face.py]
-# args: ["/input", "/output"]
-# # parameters:
-# # - src:
-# # dependencyName: test-dep
-# # dataKey: notification.0.s3.object.key
-# # dest: spec.templates.0.inputs.artifacts.0.s3.key
diff --git a/argo/setup.sh b/argo/setup.sh
deleted file mode 100644
index 7d6cc53..0000000
--- a/argo/setup.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/sh
-
-# kubectl create namespace argo
-kubectl apply -n pcloud -f install.yaml
-
-# kubectl apply -n kube-system -f mio-minio-secrets.yaml
-
-
-# helm repo add argo https://argoproj.github.io/argo-helm
-# helm install my-argo --namespace kube-system argo/argo
-# read -s
-# kubectl -n kube-system port-forward deployment/my-argo-server 2746 &
-# read -s
-
-#kubectl apply -n kube-system -f argo-events-crds-install.yaml
-#read -s
-
-
-#kubectl apply -n kube-system -f event-source.yaml
-#kubectl apply -n kube-system -f gateway.yaml
-#kubectl apply -n kube-system -f sensor.yaml
diff --git a/controller/tests/query_test.go b/controller/tests/query_test.go
new file mode 100644
index 0000000..b4f9898
--- /dev/null
+++ b/controller/tests/query_test.go
@@ -0,0 +1,33 @@
+package tests
+
+import (
+ "testing"
+
+ "github.com/vektah/gqlparser"
+ "github.com/vektah/gqlparser/ast"
+)
+
+var gqlSchema = `#######################\n# Input Schema\n#######################\n\ntype Image {\n\tid: ID!\n\tobjectPath: String! @search(by: [exact])\n\tsegments(filter: ImageSegmentFilter, order: ImageSegmentOrder, first: Int, offset: Int): [ImageSegment] @hasInverse(field: sourceImage)\n}\n\ntype ImageSegment {\n\tid: ID!\n\tupperLeftX: Float!\n\tupperLeftY: Float!\n\tlowerRightX: Float!\n\tlowerRightY: Float!\n\tsourceImage(filter: ImageFilter): Image! @hasInverse(field: segments)\n\tobjectPath: String\n}\n\n#######################\n# Extended Definitions\n#######################\n\nscalar DateTime\n\nenum DgraphIndex {\n\tint\n\tfloat\n\tbool\n\thash\n\texact\n\tterm\n\tfulltext\n\ttrigram\n\tregexp\n\tyear\n\tmonth\n\tday\n\thour\n}\n\ndirective @hasInverse(field: String!) on FIELD_DEFINITION\ndirective @search(by: [DgraphIndex!]) on FIELD_DEFINITION\ndirective @dgraph(type: String, pred: String) on OBJECT | INTERFACE | FIELD_DEFINITION\ndirective @id on FIELD_DEFINITION\ndirective @secret(field: String!, pred: String) on OBJECT | INTERFACE\n\ninput IntFilter {\n\teq: Int\n\tle: Int\n\tlt: Int\n\tge: Int\n\tgt: Int\n}\n\ninput FloatFilter {\n\teq: Float\n\tle: Float\n\tlt: Float\n\tge: Float\n\tgt: Float\n}\n\ninput DateTimeFilter {\n\teq: DateTime\n\tle: DateTime\n\tlt: DateTime\n\tge: DateTime\n\tgt: DateTime\n}\n\ninput StringTermFilter {\n\tallofterms: String\n\tanyofterms: String\n}\n\ninput StringRegExpFilter {\n\tregexp: String\n}\n\ninput StringFullTextFilter {\n\talloftext: String\n\tanyoftext: String\n}\n\ninput StringExactFilter {\n\teq: String\n\tle: String\n\tlt: String\n\tge: String\n\tgt: String\n}\n\ninput StringHashFilter {\n\teq: String\n}\n\n#######################\n# Generated Types\n#######################\n\ntype AddImagePayload {\n\timage(filter: ImageFilter, order: ImageOrder, first: Int, offset: Int): [Image]\n\tnumUids: Int\n}\n\ntype AddImageSegmentPayload {\n\timagesegment(filter: ImageSegmentFilter, order: ImageSegmentOrder, first: Int, offset: Int): [ImageSegment]\n\tnumUids: Int\n}\n\ntype DeleteImagePayload {\n\tmsg: String\n\tnumUids: Int\n}\n\ntype DeleteImageSegmentPayload {\n\tmsg: String\n\tnumUids: Int\n}\n\ntype UpdateImagePayload {\n\timage(filter: ImageFilter, order: ImageOrder, first: Int, offset: Int): [Image]\n\tnumUids: Int\n}\n\ntype UpdateImageSegmentPayload {\n\timagesegment(filter: ImageSegmentFilter, order: ImageSegmentOrder, first: Int, offset: Int): [ImageSegment]\n\tnumUids: Int\n}\n\n#######################\n# Generated Enums\n#######################\n\nenum ImageOrderable {\n\tobjectPath\n}\n\nenum ImageSegmentOrderable {\n\tupperLeftX\n\tupperLeftY\n\tlowerRightX\n\tlowerRightY\n\tobjectPath\n}\n\n#######################\n# Generated Inputs\n#######################\n\ninput AddImageInput {\n\tobjectPath: String!\n\tsegments: [ImageSegmentRef]\n}\n\ninput AddImageSegmentInput {\n\tupperLeftX: Float!\n\tupperLeftY: Float!\n\tlowerRightX: Float!\n\tlowerRightY: Float!\n\tsourceImage: ImageRef!\n\tobjectPath: String\n}\n\ninput ImageFilter {\n\tid: [ID!]\n\tobjectPath: StringExactFilter\n\tand: ImageFilter\n\tor: ImageFilter\n\tnot: ImageFilter\n}\n\ninput ImageOrder {\n\tasc: ImageOrderable\n\tdesc: ImageOrderable\n\tthen: ImageOrder\n}\n\ninput ImagePatch {\n\tobjectPath: String\n\tsegments: [ImageSegmentRef]\n}\n\ninput ImageRef {\n\tid: ID\n\tobjectPath: String\n\tsegments: [ImageSegmentRef]\n}\n\ninput ImageSegmentFilter {\n\tid: [ID!]\n\tnot: ImageSegmentFilter\n}\n\ninput ImageSegmentOrder {\n\tasc: ImageSegmentOrderable\n\tdesc: ImageSegmentOrderable\n\tthen: ImageSegmentOrder\n}\n\ninput ImageSegmentPatch {\n\tupperLeftX: Float\n\tupperLeftY: Float\n\tlowerRightX: Float\n\tlowerRightY: Float\n\tsourceImage: ImageRef\n\tobjectPath: String\n}\n\ninput ImageSegmentRef {\n\tid: ID\n\tupperLeftX: Float\n\tupperLeftY: Float\n\tlowerRightX: Float\n\tlowerRightY: Float\n\tsourceImage: ImageRef\n\tobjectPath: String\n}\n\ninput UpdateImageInput {\n\tfilter: ImageFilter!\n\tset: ImagePatch\n\tremove: ImagePatch\n}\n\ninput UpdateImageSegmentInput {\n\tfilter: ImageSegmentFilter!\n\tset: ImageSegmentPatch\n\tremove: ImageSegmentPatch\n}\n\n#######################\n# Generated Query\n#######################\n\ntype Query {\n\tgetImage(id: ID!): Image\n\tqueryImage(filter: ImageFilter, order: ImageOrder, first: Int, offset: Int): [Image]\n\tgetImageSegment(id: ID!): ImageSegment\n\tqueryImageSegment(filter: ImageSegmentFilter, order: ImageSegmentOrder, first: Int, offset: Int): [ImageSegment]\n}\n\n#######################\n# Generated Mutations\n#######################\n\ntype Mutation {\n\taddImage(input: [AddImageInput!]!): AddImagePayload\n\tupdateImage(input: UpdateImageInput!): UpdateImagePayload\n\tdeleteImage(filter: ImageFilter!): DeleteImagePayload\n\taddImageSegment(input: [AddImageSegmentInput!]!): AddImageSegmentPayload\n\tupdateImageSegment(input: UpdateImageSegmentInput!): UpdateImageSegmentPayload\n\tdeleteImageSegment(filter: ImageSegmentFilter!): DeleteImageSegmentPayload\n}\n`
+
+func TestParseQuery(t *testing.T) {
+ schema := getSchema()
+ query, err := gqlparser.LoadQuery(schema, `{
+getImage(id: "0x2") {
+ id
+ objectPath
+}
+}`)
+ if err != nil {
+ panic(err)
+ }
+ print(ast.Dump(schema.Mutation))
+ print(ast.Dump(query))
+}
+
+func getSchema() *ast.Schema {
+ schema, err := gqlparser.LoadSchema(&ast.Source{Input: gqlSchema})
+ if err != nil {
+ panic(err)
+ }
+ return schema
+}
diff --git a/goofys.sh b/goofys.sh
deleted file mode 100644
index 598d090..0000000
--- a/goofys.sh
+++ /dev/null
@@ -1 +0,0 @@
-goofys -f --debug_fuse --debug_s3 --profile mio --endpoint http://localhost:9000 my-test tmp/mio-test
diff --git a/k8s/pfs.yamls b/k8s/pfs.yamls
deleted file mode 100644
index ba06bcb..0000000
--- a/k8s/pfs.yamls
+++ /dev/null
@@ -1,78 +0,0 @@
----
-kind: Service
-apiVersion: v1
-metadata:
- name: pfs-controller-service
-spec:
- type: ClusterIP
- selector:
- app: pfs-controller
- ports:
- - nodePort:
- port: 111
- targetPort: 123
----
-kind: Deployment
-apiVersion: apps/v1
-metadata:
- name: pfs-controller
-spec:
- selector:
- matchLabels:
- app: pfs-controller
- replicas: 1
- template:
- metadata:
- labels:
- app: pfs-controller
- spec:
- containers:
- - name: pfs-controller
- image: pcloud:latest
- imagePullPolicy: Never
- ports:
- - containerPort: 123
- volumeMounts:
- - name: code
- mountPath: /src/go/src/github.com/giolekva/pcloud/pfs
- command: ["/bin/sh"]
- args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go run controller_server.go --port=123"]
- volumes:
- - name: code
- hostPath:
- path: "/Users/lekva/dev/go/src/github.com/giolekva/pcloud/pfs"
----
-kind: Deployment
-apiVersion: apps/v1
-metadata:
- name: pfs-chunk
-spec:
- selector:
- matchLabels:
- app: pfs-chunk
- replicas: 3
- template:
- metadata:
- labels:
- app: pfs-chunk
- spec:
- containers:
- - name: pfs-chunk
- image: pcloud:latest
- imagePullPolicy: Never
- ports:
- - containerPort: 234
- env:
- - name: SELF_IP
- valueFrom:
- fieldRef:
- fieldPath: status.podIP
- volumeMounts:
- - name: code
- mountPath: /src/go/src/github.com/giolekva/pcloud/pfs
- command: ["/bin/sh"]
- args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go run chunk_server.go --controller=pfs-controller-service:111 --self=$(SELF_IP):234"]
- volumes:
- - name: code
- hostPath:
- path: "/Users/lekva/dev/go/src/github.com/giolekva/pcloud/pfs"
diff --git a/nats/deployment.yaml b/nats/deployment.yaml
deleted file mode 100644
index 66418cb..0000000
--- a/nats/deployment.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
----
-apiVersion: "nats.io/v1alpha2"
-kind: "NatsCluster"
-metadata:
- name: "nats"
-spec:
- size: 1
----
-apiVersion: "streaming.nats.io/v1alpha1"
-kind: "NatsStreamingCluster"
-metadata:
- name: "nats-streaming"
-spec:
- size: 2
- natsSvc: "nats"
\ No newline at end of file
diff --git a/nats/setup.sh b/nats/setup.sh
deleted file mode 100644
index e988ea3..0000000
--- a/nats/setup.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-# kubectl apply -f https://github.com/nats-io/nats-operator/releases/download/v0.5.0/00-prereqs.yaml
-# kubectl apply -f https://github.com/nats-io/nats-operator/releases/download/v0.5.0/10-deployment.yaml
-
-# # Install NATS Streaming Operator on default namespace
-# kubectl apply -f https://raw.githubusercontent.com/nats-io/nats-streaming-operator/master/deploy/default-rbac.yaml
-
-# kubectl apply -f https://raw.githubusercontent.com/nats-io/nats-streaming-operator/master/deploy/deployment.yaml
-
-# sleep 10
-
-kubectl apply -f deployment.yaml
diff --git a/pfs/Dockerfile b/pfs/Dockerfile
deleted file mode 100644
index 5984a5f..0000000
--- a/pfs/Dockerfile
+++ /dev/null
@@ -1,30 +0,0 @@
-FROM ubuntu:latest
-
-RUN apt-get update --fix-missing
-RUN apt-get -y upgrade
-RUN apt-get -y install wget git bash unzip
-
-WORKDIR /tmp
-RUN wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz
-RUN tar -xvf go1.14.linux-amd64.tar.gz
-RUN mv go /usr/local
-RUN rm go1.14.linux-amd64.tar.gz
-
-ENV GOROOT=/usr/local/go
-ENV GOPATH=/src/go
-ENV GOBIN=$GOPATH/bin
-ENV PATH=$GOBIN:$GOROOT/bin:$PATH
-
-RUN go get -u google.golang.org/grpc
-
-WORKDIR /src/protoc
-RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.4/protoc-3.11.4-linux-x86_64.zip
-RUN unzip protoc-3.11.4-linux-x86_64.zip
-RUN rm protoc-3.11.4-linux-x86_64.zip
-ENV PATH=/src/protoc/bin:$PATH
-
-RUN go get -u github.com/golang/protobuf/protoc-gen-go
-RUN go get -u google.golang.org/protobuf/encoding/prototext
-RUN go get -u github.com/google/uuid
-
-WORKDIR /src/go/src/github.com/giolekva/pcloud/pfs
diff --git a/pfs/api/api.proto b/pfs/api/api.proto
deleted file mode 100644
index 56a8e9e..0000000
--- a/pfs/api/api.proto
+++ /dev/null
@@ -1,132 +0,0 @@
-syntax = "proto3";
-
-package pcloud.api;
-
-option go_package = "api";
-
-enum ChunkStatus {
- NEW = 0;
- CREATED = 1;
- WRITING = 2;
- REPLICATING = 3;
- READY = 4;
-}
-
-enum ReplicaRole {
- SECONDARY = 0;
- PRIMARY = 1;
-}
-
-// ChunkStorage
-
-service ChunkStorage {
- rpc ListChunks(ListChunksRequest) returns (ListChunksResponse) {}
-
- rpc CreateChunk(CreateChunkRequest) returns (CreateChunkResponse) {}
-
- rpc GetChunkStatus(GetChunkStatusRequest) returns (GetChunkStatusResponse) {}
-
- rpc ReadChunk(ReadChunkRequest) returns (ReadChunkResponse) {}
-
- rpc WriteChunk(WriteChunkRequest) returns (WriteChunkResponse) {}
-
- rpc RemoveChunk(RemoveChunkRequest) returns (RemoveChunkResponse) {}
-}
-
-message ListChunksRequest {
-}
-
-message ListChunksResponse {
- repeated string chunk_id = 1;
-}
-
-message CreateChunkRequest {
- string chunk_id = 1;
- int32 size = 2;
- ReplicaRole role = 3;
- string primary_address = 4;
-}
-
-message CreateChunkResponse {
-}
-
-message GetChunkStatusRequest {
- string chunk_id = 1;
-}
-
-message GetChunkStatusResponse {
- ChunkStatus status = 1;
- int32 total_bytes = 2;
- int32 committed_bytes = 3;
-}
-
-message ReadChunkRequest {
- string chunk_id = 1;
- int32 offset = 2;
- int32 num_bytes = 3;
-}
-
-message ReadChunkResponse {
- bytes data = 1;
-}
-
-message WriteChunkRequest {
- string chunk_id = 1;
- int32 offset = 2;
- bytes data = 3;
-}
-
-message WriteChunkResponse {
- int32 bytes_written = 1;
-}
-
-message RemoveChunkRequest {
- string chunk_id = 1;
-}
-
-message RemoveChunkResponse {
-}
-
-// MetadataStorage
-
-message ChunkStorageMetadata {
- string chunk_id = 1;
- int32 size_bytes = 2;
- repeated string server = 3;
-}
-
-service MetadataStorage {
- rpc AddChunkServer(AddChunkServerRequest) returns (AddChunkServerResponse) {}
-
- rpc CreateBlob(CreateBlobRequest) returns (CreateBlobResponse) {}
-
- rpc GetBlobMetadata(GetBlobMetadataRequest) returns (GetBlobMetadataResponse) {}
-}
-
-message AddChunkServerRequest {
- string address = 1;
-}
-
-message AddChunkServerResponse {
-}
-
-message CreateBlobRequest {
- int32 size_bytes = 1;
- int32 chunk_size_bytes = 2;
- int32 num_replicas = 3;
-}
-
-message CreateBlobResponse {
- string blob_id = 1;
- repeated ChunkStorageMetadata chunk = 2;
-}
-
-message GetBlobMetadataRequest {
- string blob_id = 1;
-}
-
-message GetBlobMetadataResponse {
- string blob_id = 1;
- int32 size_bytes = 2;
- repeated ChunkStorageMetadata chunk = 3;
-}
\ No newline at end of file
diff --git a/pfs/api/client.go b/pfs/api/client.go
deleted file mode 100644
index 3e05de6..0000000
--- a/pfs/api/client.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package api
-
-import (
- "google.golang.org/grpc"
-)
-
-func DialConn(address string) (*grpc.ClientConn, error) {
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- return grpc.Dial(address, opts...)
-}
diff --git a/pfs/chunk/chunk.go b/pfs/chunk/chunk.go
deleted file mode 100644
index aaa1bfb..0000000
--- a/pfs/chunk/chunk.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package chunk
-
-import (
- "io"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type ChunkInfo struct {
- Status api.ChunkStatus
- Size int
- Committed int
-}
-
-type Chunk interface {
- Stats() (ChunkInfo, error)
- ReaderAt() io.ReaderAt
- WriterAt() io.WriterAt
-}
-
-type ChunkFactory interface {
- New(size int) Chunk
-}
diff --git a/pfs/chunk/file.go b/pfs/chunk/file.go
deleted file mode 100644
index 3502a50..0000000
--- a/pfs/chunk/file.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package chunk
-
-import (
- "io"
- "os"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type ReadOnlyFileChunk struct {
- f *os.File
- offset int
- size int
-}
-
-func NewReadOnlyFileChunk(f *os.File, offset, size int) Chunk {
- return &ReadOnlyFileChunk{f, offset, size}
-}
-
-func (c *ReadOnlyFileChunk) Stats() (ChunkInfo, error) {
- return ChunkInfo{
- Status: api.ChunkStatus_READY,
- Size: c.size,
- Committed: c.size}, nil
-}
-
-func (c *ReadOnlyFileChunk) ReaderAt() io.ReaderAt {
- return &fileReader{c.f}
-}
-
-func (c *ReadOnlyFileChunk) WriterAt() io.WriterAt {
- return &fileWriter{c.f}
-}
-
-type fileReader struct {
- f *os.File
-}
-
-func (f *fileReader) ReadAt(b []byte, offset int64) (int, error) {
- return f.f.ReadAt(b, offset)
-}
-
-type fileWriter struct {
- f *os.File
-}
-
-func (f *fileWriter) WriteAt(b []byte, offset int64) (int, error) {
- return f.f.WriteAt(b, offset)
-}
diff --git a/pfs/chunk/in_memory.go b/pfs/chunk/in_memory.go
deleted file mode 100644
index b9b55ec..0000000
--- a/pfs/chunk/in_memory.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package chunk
-
-import (
- "bytes"
- "errors"
- "io"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type InMemoryChunk struct {
- status api.ChunkStatus
- payload []byte
- committed int
-}
-
-func (c *InMemoryChunk) Stats() (ChunkInfo, error) {
- return ChunkInfo{c.status, len(c.payload), c.committed}, nil
-}
-
-func (c *InMemoryChunk) ReaderAt() io.ReaderAt {
- return bytes.NewReader(c.payload[:c.committed])
-}
-
-func (c *InMemoryChunk) WriterAt() io.WriterAt {
- return &byteWriter{c}
-}
-
-type byteWriter struct {
- c *InMemoryChunk
-}
-
-func (w *byteWriter) WriteAt(p []byte, offset int64) (n int, err error) {
- if int(offset) > w.c.committed {
- panic(1)
- return 0, errors.New("Gaps are not allowed when writing in chunks")
- }
- if int(offset) < w.c.committed {
- if int(offset)+len(p) <= w.c.committed {
- if bytes.Compare(w.c.payload[int(offset):int(offset)+len(p)], p) != 0 {
- panic(2)
- return 0, errors.New("Can not change contents of allready committed chunk bytes")
- }
- panic(3)
- return len(p), nil
- }
- n = w.c.committed - int(offset)
- p = p[n:]
- offset = int64(w.c.committed)
- }
- if w.c.committed+len(p) > len(w.c.payload) {
- panic(4)
- return 0, errors.New("In memory chunk does not have enough space available")
- }
- n += copy(w.c.payload[w.c.committed:], p)
- w.c.committed += n
- return
-}
-
-type InMemoryChunkFactory struct {
-}
-
-func (f InMemoryChunkFactory) New(size int) Chunk {
- return &InMemoryChunk{
- status: api.ChunkStatus_CREATED,
- payload: make([]byte, size),
- committed: 0}
-}
diff --git a/pfs/chunk/in_memory_test.go b/pfs/chunk/in_memory_test.go
deleted file mode 100644
index b9711ca..0000000
--- a/pfs/chunk/in_memory_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package chunk
-
-import (
- "bytes"
- "testing"
-)
-
-func TestConcurrentReads(t *testing.T) {
- c := InMemoryChunkFactory{}.New(4)
- if _, err := c.WriterAt().WriteAt([]byte("abcd"), 0); err != nil {
- panic(err)
- }
- d1 := make([]byte, 2)
- d2 := make([]byte, 3)
- if _, err := c.ReaderAt().ReadAt(d1, 0); err != nil {
- t.Error(err)
- }
- if bytes.Compare(d1, []byte("ab")) != 0 {
- t.Errorf("Expected: %s\nActual: %s", "ab", d1)
- }
- if _, err := c.ReaderAt().ReadAt(d2, 0); err != nil {
- t.Error(err)
- }
- if bytes.Compare(d2, []byte("abc")) != 0 {
- t.Errorf("Expected: %s\nActual: %s", "abc", d2)
- }
-}
diff --git a/pfs/chunk/remote.go b/pfs/chunk/remote.go
deleted file mode 100644
index 6d84241..0000000
--- a/pfs/chunk/remote.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package chunk
-
-import (
- "context"
- "io"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type RemoteChunk struct {
- chunkId string
- client api.ChunkStorageClient
-}
-
-func (r *RemoteChunk) Stats() (info ChunkInfo, err error) {
- resp, err := r.client.GetChunkStatus(
- context.Background(),
- &api.GetChunkStatusRequest{ChunkId: r.chunkId})
- if err != nil {
- return
- }
- info = ChunkInfo{
- resp.Status,
- int(resp.TotalBytes),
- int(resp.CommittedBytes)}
- return
-}
-
-func (r *RemoteChunk) ReaderAt() io.ReaderAt {
- return &remoteChunkReaderAt{
- chunkId: r.chunkId,
- client: r.client}
-}
-
-func (r *RemoteChunk) WriterAt() io.WriterAt {
- return &remoteChunkWriterAt{
- chunkId: r.chunkId,
- client: r.client}
-}
-
-type remoteChunkReaderAt struct {
- chunkId string
- client api.ChunkStorageClient
-}
-
-func (c *remoteChunkReaderAt) ReadAt(p []byte, offset int64) (n int, err error) {
- req := api.ReadChunkRequest{
- ChunkId: c.chunkId,
- Offset: int32(offset),
- NumBytes: int32(len(p))}
- resp, err := c.client.ReadChunk(context.Background(), &req)
- if err != nil {
- return
- }
- n = copy(p, resp.Data)
- return
-}
-
-type remoteChunkWriterAt struct {
- chunkId string
- client api.ChunkStorageClient
-}
-
-func (c *remoteChunkWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
- req := api.WriteChunkRequest{
- ChunkId: c.chunkId,
- Offset: int32(offset),
- Data: p}
- resp, err := c.client.WriteChunk(context.Background(), &req)
- if resp != nil {
- n = int(resp.BytesWritten)
- }
- return
-}
diff --git a/pfs/chunk/replicator.go b/pfs/chunk/replicator.go
deleted file mode 100644
index d8990a7..0000000
--- a/pfs/chunk/replicator.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package chunk
-
-import (
- "context"
- "io"
-
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type ReplicaAssignmentChangeListener interface {
- Primary(chunkId, currentPrimary string) <-chan string
-}
-
-type PrimaryReplicaChangeListener interface {
- ChunkId() string
- Address() <-chan string
-}
-
-type NonChangingReplicaAssignment struct {
-}
-
-func (l *NonChangingReplicaAssignment) Primary(chunkId, address string) <-chan string {
- ch := make(chan string, 1)
- ch <- address
- return ch
-}
-
-func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
- dstInfo, err := dst.Stats()
- if err != nil {
- panic(err)
- }
- inp := src.ReaderAt()
- replicated := dstInfo.Committed
- out := dst.WriterAt()
- for {
- select {
- default:
- p := make([]byte, 100)
- n, err := inp.ReadAt(p, int64(replicated))
- if n > 0 {
- m, _ := out.WriteAt(p[:n], int64(replicated))
- replicated += m
- }
- if err == io.EOF {
- done <- 1
- return
- }
-
- case <-ctx.Done():
- return
- }
- }
-}
-
-func ReplicateFromPrimary(ctx context.Context, chunkId string, dst Chunk, primaryAddressCh <-chan string) {
- var done chan int
- var cancel context.CancelFunc = nil
- for {
- select {
- case <-done:
- return
- case <-ctx.Done():
- return
- case address := <-primaryAddressCh:
- if cancel != nil {
- cancel()
- }
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial(address, opts...)
- if err == nil {
- continue
- }
- client := api.NewChunkStorageClient(conn)
- src := RemoteChunk{chunkId, client}
- replicatorCtx, cancelFn := context.WithCancel(context.Background())
- cancel = cancelFn
- done = make(chan int, 1)
- go replicate(replicatorCtx, dst, &src, done)
- }
- }
-}
-
-func WriteToPrimary(ctx context.Context, chunkId string, src Chunk, primaryAddressCh <-chan string) {
- var done chan int
- var cancel context.CancelFunc = nil
- for {
- select {
- case <-done:
- return
- case <-ctx.Done():
- return
- case address := <-primaryAddressCh:
- if cancel != nil {
- cancel()
- }
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial(address, opts...)
- if err != nil {
- continue
- }
- client := api.NewChunkStorageClient(conn)
- dst := RemoteChunk{chunkId, client}
- replicatorCtx, cancelFn := context.WithCancel(context.Background())
- cancel = cancelFn
- done = make(chan int, 1)
- go replicate(replicatorCtx, &dst, src, done)
- }
- }
-}
diff --git a/pfs/chunk/server.go b/pfs/chunk/server.go
deleted file mode 100644
index d619f13..0000000
--- a/pfs/chunk/server.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package chunk
-
-import (
- "context"
- "fmt"
- "log"
-
- "sync"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type ChunkServer struct {
- factory ChunkFactory
- assignmentChangeLis ReplicaAssignmentChangeListener
- chunks sync.Map
- replicatorCancel sync.Map
-}
-
-func NewChunkServer(factory ChunkFactory,
- assignmentChangeLis ReplicaAssignmentChangeListener) *ChunkServer {
- return &ChunkServer{
- factory: factory,
- assignmentChangeLis: assignmentChangeLis}
-}
-
-func (s *ChunkServer) ListChunks(
- ctx context.Context,
- req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
- resp = &api.ListChunksResponse{}
- s.chunks.Range(func(k, v interface{}) bool {
- resp.ChunkId = append(resp.ChunkId, k.(string))
- return true
- })
- return
-}
-
-func (s *ChunkServer) CreateChunk(
- ctx context.Context,
- req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
- chunk := s.factory.New(int(req.Size))
- s.chunks.Store(req.ChunkId, chunk)
- switch req.Role {
- case api.ReplicaRole_SECONDARY:
- ctx, cancel := context.WithCancel(context.Background())
- s.replicatorCancel.Store(req.ChunkId, cancel)
- primaryAddressCh := s.assignmentChangeLis.Primary(
- req.ChunkId, req.PrimaryAddress)
- go ReplicateFromPrimary(ctx, req.ChunkId, chunk, primaryAddressCh)
- case api.ReplicaRole_PRIMARY:
- {
- }
- }
- resp = &api.CreateChunkResponse{}
- log.Printf("Created chunk: %s\n", req.ChunkId)
- return
-
-}
-
-func (s *ChunkServer) GetChunkStatus(
- ctx context.Context,
- req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
- if chunk, ok := s.chunks.Load(req.ChunkId); ok {
- c := chunk.(Chunk)
- var info ChunkInfo
- info, err = c.Stats()
- if err != nil {
- return
- }
- resp = &api.GetChunkStatusResponse{
- Status: info.Status,
- TotalBytes: int32(info.Size),
- CommittedBytes: int32(info.Committed)}
- return
- }
- return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
-}
-
-func (s *ChunkServer) ReadChunk(
- ctx context.Context,
- req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
- if value, ok := s.chunks.Load(req.ChunkId); ok {
- chunk := value.(Chunk)
- b := make([]byte, req.NumBytes)
- var n int
- n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
- if n == 0 {
- return
- }
- return &api.ReadChunkResponse{Data: b[:n]}, nil
-
- } else {
- return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
- }
-}
-
-func (s *ChunkServer) WriteChunk(
- ctx context.Context,
- req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
- if value, ok := s.chunks.Load(req.ChunkId); ok {
- chunk := value.(Chunk)
- var n int
- n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
- if n == 0 {
- return
- }
- return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
-
- } else {
- return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
- }
-}
-
-func (s *ChunkServer) RemoveChunk(
- ctx context.Context,
- req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
- if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
- cancel.(context.CancelFunc)()
- s.replicatorCancel.Delete(req.ChunkId)
- }
- s.chunks.Delete(req.ChunkId)
- return &api.RemoveChunkResponse{}, nil
-}
diff --git a/pfs/chunk/server_test.go b/pfs/chunk/server_test.go
deleted file mode 100644
index 9549a06..0000000
--- a/pfs/chunk/server_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-package chunk
-
-import (
- "bytes"
- "context"
- "testing"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-func TestStoreChunk(t *testing.T) {
- s := ChunkServer{factory: &InMemoryChunkFactory{}}
- _, err := s.CreateChunk(context.Background(), &api.CreateChunkRequest{
- ChunkId: "foo",
- Size: 11,
- Role: api.ReplicaRole_PRIMARY})
- if err != nil {
- t.Error(err)
- }
- _, err = s.WriteChunk(context.Background(), &api.WriteChunkRequest{
- ChunkId: "foo",
- Offset: 0,
- Data: []byte("hello world")})
- if err != nil {
- t.Error(err)
- }
-}
-
-func TestStoreAndReadChunk(t *testing.T) {
- s := ChunkServer{factory: &InMemoryChunkFactory{}}
- _, err := s.CreateChunk(context.Background(), &api.CreateChunkRequest{
- ChunkId: "foo",
- Size: 11,
- Role: api.ReplicaRole_PRIMARY})
- if err != nil {
- t.Error(err)
- }
- _, err = s.WriteChunk(context.Background(), &api.WriteChunkRequest{
- ChunkId: "foo",
- Offset: 0,
- Data: []byte("hello world")})
- if err != nil {
- t.Error(err)
- }
- resp, err := s.ReadChunk(context.Background(), &api.ReadChunkRequest{
- ChunkId: "foo",
- NumBytes: 100})
- if err != nil {
- t.Error(err)
- }
- if bytes.Compare(resp.Data, []byte("hello world")) != 0 {
- t.Errorf("Expected: %s\nGot: %s\n", "hello world", resp.Data)
- }
-}
-
-func TestReadWithOffsets(t *testing.T) {
- s := ChunkServer{factory: &InMemoryChunkFactory{}}
- _, err := s.CreateChunk(context.Background(), &api.CreateChunkRequest{
- ChunkId: "foo",
- Size: 11,
- Role: api.ReplicaRole_PRIMARY})
- if err != nil {
- t.Error(err)
- }
- _, err = s.WriteChunk(context.Background(), &api.WriteChunkRequest{
- ChunkId: "foo",
- Offset: 0,
- Data: []byte("hello world")})
- if err != nil {
- t.Error(err)
- }
- resp, err := s.ReadChunk(context.Background(), &api.ReadChunkRequest{
- ChunkId: "foo",
- Offset: 0,
- NumBytes: 2})
- if err != nil {
- t.Error(err)
- }
- if bytes.Compare(resp.Data, []byte("he")) != 0 {
- t.Errorf("Expected: %s\nGot: %s\n", "he", resp.Data)
- }
- resp, err = s.ReadChunk(context.Background(), &api.ReadChunkRequest{
- ChunkId: "foo",
- Offset: 2,
- NumBytes: 2})
- if err != nil {
- t.Error(err)
- }
- if bytes.Compare(resp.Data, []byte("ll")) != 0 {
- t.Errorf("Expected: %s\nGot: %s\n", "ll", resp.Data)
- }
- resp, err = s.ReadChunk(context.Background(), &api.ReadChunkRequest{
- ChunkId: "foo",
- Offset: 4,
- NumBytes: 100})
- if err != nil {
- t.Error(err)
- }
- if bytes.Compare(resp.Data, []byte("o world")) != 0 {
- t.Errorf("Expected: %s\nGot: %s\n", "o world", resp.Data)
- }
-
-}
diff --git a/pfs/chunk_server.go b/pfs/chunk_server.go
deleted file mode 100644
index 04a266e..0000000
--- a/pfs/chunk_server.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package main
-
-import (
- "context"
- "flag"
- "fmt"
- "log"
- "net"
- "time"
-
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
- "github.com/giolekva/pcloud/pfs/chunk"
-)
-
-var controllerAddress = flag.String("controller", "localhost:123", "Metadata storage address.")
-var selfAddress = flag.String("self", "", "Metadata storage address.")
-
-func main() {
- flag.Parse()
- log.Print("Chunk server starting")
-
- // Create Master server client.
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial(*controllerAddress, opts...)
- if err != nil {
- log.Fatalf("Failed to dial %s: %v", *controllerAddress, err)
- }
- defer conn.Close()
- client := api.NewMetadataStorageClient(conn)
-
- // Register current Chunk server with Master.
- ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
- _, err = client.AddChunkServer(
- ctx,
- &api.AddChunkServerRequest{Address: *selfAddress})
- if err != nil {
- log.Fatalf("failed to register chunk server: %v", err)
- }
- log.Print("Registered myself")
-
- // Start RPC server
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 234))
- if err != nil {
- log.Fatalf("failed to listen: %v", err)
- }
- server := grpc.NewServer()
- api.RegisterChunkStorageServer(server, chunk.NewChunkServer(
- &chunk.InMemoryChunkFactory{},
- &chunk.NonChangingReplicaAssignment{}))
- server.Serve(lis)
-}
diff --git a/pfs/client.go b/pfs/client.go
deleted file mode 100644
index 2abb47f..0000000
--- a/pfs/client.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package main
-
-import (
- "flag"
- "log"
- "os"
-
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
- "github.com/giolekva/pcloud/pfs/client"
-)
-
-var controllerAddress = flag.String("controller", "localhost:123", "Metadata storage address.")
-var fileToUpload = flag.String("file", "", "File path to upload.")
-
-func main() {
- flag.Parse()
-
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial(*controllerAddress, opts...)
- if err != nil {
- log.Fatalf("Failed to dial %s: %v", *controllerAddress, err)
- }
- defer conn.Close()
- uploader := client.NewFileUploader(api.NewMetadataStorageClient(conn))
-
- f, err := os.Open(*fileToUpload)
- if err != nil {
- panic(err)
- }
-
- uploader.Upload(f)
-}
diff --git a/pfs/client/client.go b/pfs/client/client.go
deleted file mode 100644
index a8492a3..0000000
--- a/pfs/client/client.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package client
-
-import (
- "context"
- "os"
-
- "github.com/giolekva/pcloud/pfs/api"
- "github.com/giolekva/pcloud/pfs/chunk"
-)
-
-type FileUploader struct {
- client api.MetadataStorageClient
-}
-
-func NewFileUploader(client api.MetadataStorageClient) *FileUploader {
- return &FileUploader{client}
-}
-
-func (fu *FileUploader) Upload(f *os.File, numReplicas int) {
- info, err := f.Stat()
- if err != nil {
- return
- }
- resp, err := fu.client.CreateBlob(
- context.Background(), &api.CreateBlobRequest{
- SizeBytes: int32(info.Size()),
- NumReplicas: int32(numReplicas)})
- if err != nil {
- panic(err)
- }
- if len(resp.Chunk) != 1 {
- panic(resp)
- }
- lis := &chunk.NonChangingReplicaAssignment{}
- primaryAddressCh := lis.Primary(
- resp.Chunk[0].ChunkId,
- resp.Chunk[0].Server[0])
- chunk.WriteToPrimary(
- context.Background(),
- resp.Chunk[0].ChunkId,
- chunk.NewReadOnlyFileChunk(f, 0, int(info.Size())),
- primaryAddressCh)
-}
diff --git a/pfs/client/client_test.go b/pfs/client/client_test.go
deleted file mode 100644
index b9f3002..0000000
--- a/pfs/client/client_test.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package client
-
-import (
- "os"
- "testing"
-
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
- pt "github.com/giolekva/pcloud/pfs/testing"
-)
-
-func TestUploadSmallFile(t *testing.T) {
- env, err := pt.NewInMemoryEnv(1)
- if err != nil {
- t.Error(err)
- }
- defer env.Stop()
-
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial("unix:///tmp/pcloud/controller", opts...)
- if err != nil {
- t.Error(err)
- }
- defer conn.Close()
- client := api.NewMetadataStorageClient(conn)
-
- uploader := NewFileUploader(client)
- f, err := os.Open("testdata/foo")
- if err != nil {
- t.Error(err)
- }
- uploader.Upload(f, 1)
-
-}
-
-func TestUploadSmallFileWithReplication(t *testing.T) {
- env, err := pt.NewInMemoryEnv(3)
- if err != nil {
- t.Error(err)
- }
- defer env.Stop()
-
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial("unix:///tmp/pcloud/controller", opts...)
- if err != nil {
- t.Error(err)
- }
- defer conn.Close()
- client := api.NewMetadataStorageClient(conn)
-
- uploader := NewFileUploader(client)
- f, err := os.Open("testdata/foo")
- if err != nil {
- t.Error(err)
- }
- uploader.Upload(f, 2)
-}
diff --git a/pfs/client/testdata/foo b/pfs/client/testdata/foo
deleted file mode 100644
index 257cc56..0000000
--- a/pfs/client/testdata/foo
+++ /dev/null
@@ -1 +0,0 @@
-foo
diff --git a/pfs/controller/chunk.go b/pfs/controller/chunk.go
deleted file mode 100644
index 5fb694f..0000000
--- a/pfs/controller/chunk.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package controller
-
-import (
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type chunkServerStatus int
-
-const (
- Healthy chunkServerStatus = iota
- UNREACHABLE
-)
-
-type chunkServer struct {
- address string
- status chunkServerStatus
- chunks map[string]api.ChunkStatus
-}
diff --git a/pfs/controller/server.go b/pfs/controller/server.go
deleted file mode 100644
index 885ad55..0000000
--- a/pfs/controller/server.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package controller
-
-import (
- "context"
- "log"
- "math/rand"
-
- "github.com/google/uuid"
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
-)
-
-type chunkServers struct {
- address string
-}
-
-type BlobStatus int
-
-const (
- NEW BlobStatus = iota
-)
-
-type ChunkStatus int
-
-const (
- ASSIGNED ChunkStatus = iota
- STORED
-)
-
-type chunkReplica struct {
- chunkServer string
- status ChunkStatus
-}
-
-type chunk struct {
- id string
- replica []chunkReplica
-}
-
-type blob struct {
- id string
- status BlobStatus
- chunks []chunk
-}
-
-type MasterServer struct {
- chunkServers []*chunkServer
- blobs []*blob
-}
-
-func NewMasterServer() *MasterServer {
- return &MasterServer{}
-}
-
-func (s *MasterServer) AddChunkServer(
- ctx context.Context,
- req *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
- s.chunkServers = append(s.chunkServers, &chunkServer{
- address: req.Address,
- status: Healthy})
- log.Printf("Registered Chunk server: %s", req.Address)
- return &api.AddChunkServerResponse{}, nil
-}
-
-func (s *MasterServer) CreateBlob(
- ctx context.Context,
- req *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
- if int(req.NumReplicas) > len(s.chunkServers) {
- return nil, nil
- }
- resp := api.CreateBlobResponse{
- BlobId: uuid.New().String(),
- Chunk: []*api.ChunkStorageMetadata{
- {ChunkId: uuid.New().String()},
- }}
- assigned := 0
- chunkId := resp.Chunk[0].ChunkId
- for i := range rand.Perm(len(s.chunkServers)) {
- if assigned == int(req.NumReplicas) {
- break
- }
- address := s.chunkServers[i].address
- log.Printf(address)
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial(address, opts...)
- if err != nil {
- continue
- }
- defer conn.Close()
- client := api.NewChunkStorageClient(conn)
- createChunkReq := api.CreateChunkRequest{
- ChunkId: chunkId,
- Size: req.SizeBytes}
- if assigned == 0 {
- createChunkReq.Role = api.ReplicaRole_PRIMARY
- } else {
- createChunkReq.Role = api.ReplicaRole_SECONDARY
- createChunkReq.PrimaryAddress = resp.Chunk[0].Server[0]
- }
- _, err = client.CreateChunk(ctx, &createChunkReq)
- if err == nil {
- resp.Chunk[0].Server = append(resp.Chunk[0].Server, address)
- assigned++
- }
- }
- return &resp, nil
-}
-
-func (s *MasterServer) GetBlobMetadata(
- ctx context.Context,
- req *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
- return nil, nil
-}
diff --git a/pfs/controller_server.go b/pfs/controller_server.go
deleted file mode 100644
index c905ba3..0000000
--- a/pfs/controller_server.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package main
-
-import (
- "flag"
- "fmt"
- "log"
- "net"
-
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
- "github.com/giolekva/pcloud/pfs/controller"
-)
-
-var port = flag.Int("port", 123, "Port to listen on.")
-
-func main() {
- flag.Parse()
- log.Print("Master server starting")
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
- if err != nil {
- log.Fatalf("Failed to listen on port %d: %v", *port, err)
- }
- log.Printf("Listening on port: %d", *port)
- server := grpc.NewServer()
- api.RegisterMetadataStorageServer(server, controller.NewMasterServer())
- log.Print("Master serving")
- server.Serve(lis)
-}
diff --git a/pfs/testing/in_memory_env.go b/pfs/testing/in_memory_env.go
deleted file mode 100644
index c3410be..0000000
--- a/pfs/testing/in_memory_env.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package testing
-
-import (
- "context"
- "fmt"
- "net"
- "syscall"
- "time"
-
- "google.golang.org/grpc"
-
- "github.com/giolekva/pcloud/pfs/api"
- "github.com/giolekva/pcloud/pfs/chunk"
- "github.com/giolekva/pcloud/pfs/controller"
-)
-
-type InMemoryEnv struct {
- m *grpc.Server
- c []*grpc.Server
- controllerConn *grpc.ClientConn
-}
-
-func NewInMemoryEnv(numChunkServers int) (*InMemoryEnv, error) {
- env := new(InMemoryEnv)
- syscall.Unlink("/tmp/pcloud/controller")
- lis, err := net.Listen("unix", "/tmp/pcloud/controller")
- if err != nil {
- return nil, err
- }
- server := grpc.NewServer()
- api.RegisterMetadataStorageServer(server, controller.NewMasterServer())
- go server.Serve(lis)
-
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial("unix:/tmp/pcloud/controller", opts...)
- if err != nil {
- return nil, err
- }
- env.controllerConn = conn
- client := api.NewMetadataStorageClient(conn)
-
- env.c = make([]*grpc.Server, numChunkServers)
- for i := 0; i < numChunkServers; i++ {
- unixSocket := fmt.Sprintf("/tmp/pcloud/chunk-%d", i)
- syscall.Unlink(unixSocket)
- lis, err := net.Listen("unix", unixSocket)
- if err != nil {
- return nil, err
- }
- server := grpc.NewServer()
- api.RegisterChunkStorageServer(server, chunk.NewChunkServer(
- &chunk.InMemoryChunkFactory{},
- &chunk.NonChangingReplicaAssignment{}))
- go server.Serve(lis)
- env.c[i] = server
- }
-
- for i := 0; i < numChunkServers; i++ {
- ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
- _, err = client.AddChunkServer(
- ctx,
- &api.AddChunkServerRequest{Address: fmt.Sprintf("unix:///tmp/pcloud/chunk-%d", i)})
- if err != nil {
- return nil, err
- }
- }
- return env, nil
-}
-
-func (e *InMemoryEnv) Stop() {
- if e.controllerConn != nil {
- e.controllerConn.Close()
- }
- for _, s := range e.c {
- if s != nil {
- s.GracefulStop()
- }
- }
- if e.m != nil {
- e.m.GracefulStop()
- }
-}
diff --git a/pfs/testing/simple_test.go b/pfs/testing/simple_test.go
deleted file mode 100644
index d8e2510..0000000
--- a/pfs/testing/simple_test.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package testing
-
-import (
- "testing"
-)
-
-func TestSetup(t *testing.T) {
- env, err := NewInMemoryEnv(3)
- if err != nil {
- t.Error(err)
- }
- defer env.Stop()
-}