blob: eb949689b93bd4014802886fcaf16cd2e99b0b63 [file] [log] [blame]
Giorgi Lekveishvili285ab622023-11-22 13:50:45 +04001# pylint: disable=W0613
2
3# Copyright (C) 2022 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import base64
18import json
19import warnings
20
21from kubernetes import client, config
22
23import pytest
24
25from .helm.client import HelmClient
26
27
28class Cluster:
29 def __init__(self, kube_config):
30 self.kube_config = kube_config
31
32 self.image_pull_secrets = []
33 self.namespaces = []
34
35 context = self._load_kube_config()
36 self.helm = HelmClient(self.kube_config, context)
37
38 def _load_kube_config(self):
39 config.load_kube_config(config_file=self.kube_config)
40 _, context = config.list_kube_config_contexts(config_file=self.kube_config)
41 return context["name"]
42
43 def _apply_image_pull_secrets(self, namespace):
44 for ips in self.image_pull_secrets:
45 try:
46 client.CoreV1Api().create_namespaced_secret(namespace, ips)
47 except client.rest.ApiException as exc:
48 if exc.status == 409 and exc.reason == "Conflict":
49 warnings.warn(
50 "Kubernetes Cluster not empty. Image pull secret already exists."
51 )
52 else:
53 raise exc
54
55 def add_container_registry(self, secret_name, url, user, pwd):
56 data = {
57 "auths": {
58 url: {
59 "auth": base64.b64encode(str.encode(f"{user}:{pwd}")).decode(
60 "utf-8"
61 )
62 }
63 }
64 }
65 metadata = client.V1ObjectMeta(name=secret_name)
66 self.image_pull_secrets.append(
67 client.V1Secret(
68 api_version="v1",
69 kind="Secret",
70 metadata=metadata,
71 type="kubernetes.io/dockerconfigjson",
72 data={
73 ".dockerconfigjson": base64.b64encode(
74 json.dumps(data).encode()
75 ).decode("utf-8")
76 },
77 )
78 )
79
80 def create_namespace(self, name):
81 namespace_metadata = client.V1ObjectMeta(name=name)
82 namespace_body = client.V1Namespace(
83 kind="Namespace", api_version="v1", metadata=namespace_metadata
84 )
85 client.CoreV1Api().create_namespace(body=namespace_body)
86 self.namespaces.append(name)
87 self._apply_image_pull_secrets(name)
88
89 def delete_namespace(self, name):
90 if name not in self.namespaces:
91 return
92
93 client.CoreV1Api().delete_namespace(name, body=client.V1DeleteOptions())
94 self.namespaces.remove(name)
95
96 def cleanup(self):
97 while self.namespaces:
98 self.helm.delete_all(
99 namespace=self.namespaces[0],
100 )
101 self.delete_namespace(self.namespaces[0])
102
103
104@pytest.fixture(scope="session")
105def test_cluster(request):
106 kube_config = request.config.getoption("--kubeconfig")
107
108 test_cluster = Cluster(kube_config)
109 test_cluster.add_container_registry(
110 "image-pull-secret",
111 request.config.getoption("--registry"),
112 request.config.getoption("--registry-user"),
113 request.config.getoption("--registry-pwd"),
114 )
115
116 yield test_cluster
117
118 test_cluster.cleanup()
119
120
121@pytest.fixture(scope="session")
122def ldap_credentials(test_cluster):
123 ldap_secret = client.CoreV1Api().read_namespaced_secret(
124 "openldap-users", namespace="openldap"
125 )
126 users = base64.b64decode(ldap_secret.data["users"]).decode("utf-8").split(",")
127 passwords = (
128 base64.b64decode(ldap_secret.data["passwords"]).decode("utf-8").split(",")
129 )
130 credentials = {}
131 for i, user in enumerate(users):
132 credentials[user] = passwords[i]
133
134 yield credentials
135
136
137@pytest.fixture(scope="session")
138def ldap_admin_credentials(test_cluster):
139 ldap_secret = client.CoreV1Api().read_namespaced_secret(
140 "openldap-admin", namespace="openldap"
141 )
142 password = base64.b64decode(ldap_secret.data["adminpassword"]).decode("utf-8")
143
144 yield ("admin", password)