| Giorgi Lekveishvili | 285ab62 | 2023-11-22 13:50:45 +0400 | [diff] [blame] | 1 | # pylint: disable=W0613 |
| 2 | |
| 3 | # Copyright (C) 2022 The Android Open Source Project |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
| 17 | import base64 |
| 18 | import json |
| 19 | import warnings |
| 20 | |
| 21 | from kubernetes import client, config |
| 22 | |
| 23 | import pytest |
| 24 | |
| 25 | from .helm.client import HelmClient |
| 26 | |
| 27 | |
class Cluster:
    """Handle to the Kubernetes cluster selected by a kubeconfig file.

    The object remembers which namespaces were created through it and which
    image pull secrets have to be created in each of those namespaces.
    """

    def __init__(self, kube_config):
        """Load the kubeconfig and create the Helm client.

        Args:
            kube_config: Kubeconfig reference; it is handed to the Kubernetes
                config loader as ``config_file`` and to ``HelmClient``.
        """
        # Bookkeeping lists start out empty and are filled by
        # create_namespace() / add_container_registry().
        self.namespaces = []
        self.image_pull_secrets = []

        self.kube_config = kube_config
        active_context_name = self._load_kube_config()
        self.helm = HelmClient(self.kube_config, active_context_name)

    def _load_kube_config(self):
        """Load ``self.kube_config`` and return the active context's name."""
        config.load_kube_config(config_file=self.kube_config)
        # The call yields a pair; only its second element is needed here.
        _all_contexts, active_context = config.list_kube_config_contexts(
            config_file=self.kube_config
        )
        return active_context["name"]

    def _apply_image_pull_secrets(self, namespace):
        """Create every registered image pull secret in ``namespace``.

        Args:
            namespace: Name of the namespace receiving the secrets.

        Raises:
            client.rest.ApiException: For any API error other than a
                409 "Conflict", which is only reported as a warning.
        """
        for pull_secret in self.image_pull_secrets:
            try:
                client.CoreV1Api().create_namespaced_secret(namespace, pull_secret)
            except client.rest.ApiException as exc:
                already_present = exc.status == 409 and exc.reason == "Conflict"
                if not already_present:
                    raise exc
                # A leftover secret is tolerated; just tell the user.
                warnings.warn(
                    "Kubernetes Cluster not empty. Image pull secret already exists."
                )

    def add_container_registry(self, secret_name, url, user, pwd):
        """Register a ``kubernetes.io/dockerconfigjson`` secret for a registry.

        The secret is only stored on this object; it is sent to the cluster
        when a namespace is created via ``create_namespace``.

        Args:
            secret_name: Name given to the secret's metadata.
            url: Registry URL, used as the key below ``auths``.
            user: Registry user name.
            pwd: Registry password.
        """
        # "user:pwd", base64-encoded, is the value of the "auth" entry.
        credentials = f"{user}:{pwd}".encode()
        auth_token = base64.b64encode(credentials).decode("utf-8")

        # The whole JSON document is base64-encoded once more for the
        # secret's ``data`` field.
        docker_config = json.dumps({"auths": {url: {"auth": auth_token}}})
        encoded_config = base64.b64encode(docker_config.encode()).decode("utf-8")

        pull_secret = client.V1Secret(
            api_version="v1",
            kind="Secret",
            metadata=client.V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
            data={".dockerconfigjson": encoded_config},
        )
        self.image_pull_secrets.append(pull_secret)

    def create_namespace(self, name):
        """Create namespace ``name``, track it and add the pull secrets to it."""
        body = client.V1Namespace(
            kind="Namespace",
            api_version="v1",
            metadata=client.V1ObjectMeta(name=name),
        )
        client.CoreV1Api().create_namespace(body=body)

        # Track the namespace before applying secrets, so it is still known
        # to cleanup() if creating a secret fails.
        self.namespaces.append(name)
        self._apply_image_pull_secrets(name)

    def delete_namespace(self, name):
        """Delete namespace ``name`` if it was created through this object.

        Namespaces that are not tracked in ``self.namespaces`` are ignored.
        """
        if name in self.namespaces:
            client.CoreV1Api().delete_namespace(name, body=client.V1DeleteOptions())
            self.namespaces.remove(name)

    def cleanup(self):
        """Tear down every tracked namespace, oldest first.

        For each namespace ``self.helm.delete_all`` is called first
        (presumably removing its Helm releases -- confirm in HelmClient),
        then the namespace itself is deleted.
        """
        # delete_namespace() removes the entry, so the list shrinks by one
        # on every pass until it is empty.
        while self.namespaces:
            current = self.namespaces[0]
            self.helm.delete_all(namespace=current)
            self.delete_namespace(current)
| 102 | |
| 103 | |
@pytest.fixture(scope="session")
def test_cluster(request):
    """Session-scoped fixture yielding a ``Cluster`` with a registry secret.

    The cluster is built from the ``--kubeconfig`` option and gets an image
    pull secret named ``image-pull-secret`` built from the ``--registry``,
    ``--registry-user`` and ``--registry-pwd`` options. After the session,
    ``Cluster.cleanup()`` is called.
    """
    get_option = request.config.getoption

    cluster = Cluster(get_option("--kubeconfig"))
    cluster.add_container_registry(
        "image-pull-secret",
        get_option("--registry"),
        get_option("--registry-user"),
        get_option("--registry-pwd"),
    )

    yield cluster

    # Teardown: runs once the session no longer needs the fixture.
    cluster.cleanup()
| 119 | |
| 120 | |
@pytest.fixture(scope="session")
def ldap_credentials(test_cluster):
    """Session-scoped fixture yielding a ``{user: password}`` dict.

    Users and passwords are read from the ``users`` and ``passwords`` entries
    of the ``openldap-users`` secret in the ``openldap`` namespace. Each
    entry is a base64-encoded, comma-separated list, and the two lists are
    matched by position.

    ``test_cluster`` is not used directly; presumably it is requested so the
    kubeconfig is loaded before the API call -- confirm.
    """
    secret_data = (
        client.CoreV1Api()
        .read_namespaced_secret("openldap-users", namespace="openldap")
        .data
    )

    def _decode_list(key):
        # base64 -> UTF-8 text -> list of comma-separated items
        return base64.b64decode(secret_data[key]).decode("utf-8").split(",")

    users = _decode_list("users")
    passwords = _decode_list("passwords")

    # Indexing (rather than zip) keeps the IndexError when there are fewer
    # passwords than users.
    yield {user: passwords[idx] for idx, user in enumerate(users)}
| 135 | |
| 136 | |
@pytest.fixture(scope="session")
def ldap_admin_credentials(test_cluster):
    """Session-scoped fixture yielding the ``("admin", password)`` tuple.

    The password is the base64-decoded ``adminpassword`` entry of the
    ``openldap-admin`` secret in the ``openldap`` namespace.

    ``test_cluster`` is not used directly; presumably it is requested so the
    kubeconfig is loaded before the API call -- confirm.
    """
    admin_secret = client.CoreV1Api().read_namespaced_secret(
        "openldap-admin", namespace="openldap"
    )
    encoded_password = admin_secret.data["adminpassword"]
    admin_password = base64.b64decode(encoded_password).decode("utf-8")

    yield ("admin", admin_password)