feat: initial support for NATS as Datastore (#442)

This commit is contained in:
TheCodeAssassin
2024-04-22 15:31:35 +02:00
committed by GitHub
parent a849a84fd0
commit 28a098af21
26 changed files with 565 additions and 27 deletions

5
.gitignore vendored
View File

@@ -32,5 +32,8 @@ bin
**/*.key
**/*.pem
**/*.csr
**/server-csr.json
.DS_Store
**/server-csr.json
!deploy/kine/mysql/server-csr.json
!deploy/kine/nats/server-csr.json

View File

@@ -134,6 +134,10 @@ datastore-postgres:
_datastore-etcd:
$(HELM) upgrade --install etcd-$(NAME) clastix/kamaji-etcd --create-namespace -n etcd-system --set datastore.enabled=true
_datastore-nats:
$(MAKE) NAME=$(NAME) NAMESPACE=nats-system -C deploy/kine/nats nats
kubectl apply -f $(shell pwd)/config/samples/kamaji_v1alpha1_datastore_nats_$(NAME).yaml
datastore-etcd: helm
$(HELM) repo add clastix https://clastix.github.io/charts
$(HELM) repo update
@@ -141,7 +145,14 @@ datastore-etcd: helm
$(MAKE) NAME=silver _datastore-etcd
$(MAKE) NAME=gold _datastore-etcd
datastores: datastore-mysql datastore-etcd datastore-postgres ## Install all Kamaji DataStores with multiple drivers, and different tiers.
datastore-nats: helm
$(HELM) repo add nats https://nats-io.github.io/k8s/helm/charts/
$(HELM) repo update
$(MAKE) NAME=bronze _datastore-nats
$(MAKE) NAME=silver _datastore-nats
$(MAKE) NAME=gold _datastore-nats
datastores: datastore-mysql datastore-etcd datastore-postgres datastore-nats ## Install all Kamaji DataStores with multiple drivers, and different tiers.
##@ Build

View File

@@ -8,7 +8,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +kubebuilder:validation:Enum=etcd;MySQL;PostgreSQL
// +kubebuilder:validation:Enum=etcd;MySQL;PostgreSQL;NATS
type Driver string
@@ -16,6 +16,7 @@ var (
EtcdDriver Driver = "etcd"
KineMySQLDriver Driver = "MySQL"
KinePostgreSQLDriver Driver = "PostgreSQL"
KineNatsDriver Driver = "NATS"
)
// +kubebuilder:validation:MinItems=1

View File

@@ -118,6 +118,7 @@ spec:
- etcd
- MySQL
- PostgreSQL
- NATS
type: string
endpoints:
description: |-

View File

@@ -121,6 +121,7 @@ spec:
- etcd
- MySQL
- PostgreSQL
- NATS
type: string
endpoints:
description: |-

View File

@@ -127,6 +127,7 @@ spec:
- etcd
- MySQL
- PostgreSQL
- NATS
type: string
endpoints:
description: |-

View File

@@ -0,0 +1,34 @@
apiVersion: kamaji.clastix.io/v1alpha1
kind: DataStore
metadata:
name: nats-bronze
spec:
driver: NATS
endpoints:
- bronze-nats.nats-system.svc:4222
basicAuth:
username:
content: YWRtaW4=
password:
secretReference:
name: nats-bronze-config
namespace: nats-system
keyPath: password
tlsConfig:
certificateAuthority:
certificate:
secretReference:
name: nats-bronze-config
namespace: nats-system
keyPath: ca.crt
clientCertificate:
certificate:
secretReference:
name: nats-bronze-config
namespace: nats-system
keyPath: server.crt
privateKey:
secretReference:
name: nats-bronze-config
namespace: nats-system
keyPath: server.key

View File

@@ -0,0 +1,34 @@
apiVersion: kamaji.clastix.io/v1alpha1
kind: DataStore
metadata:
name: nats-gold
spec:
driver: NATS
endpoints:
- nats-gold.nats-system.svc:4222
basicAuth:
username:
content: YWRtaW4=
password:
secretReference:
name: nats-gold-config
namespace: nats-system
keyPath: password
tlsConfig:
certificateAuthority:
certificate:
secretReference:
name: nats-gold-config
namespace: nats-system
keyPath: ca.crt
clientCertificate:
certificate:
secretReference:
name: nats-gold-config
namespace: nats-system
keyPath: server.crt
privateKey:
secretReference:
name: nats-gold-config
namespace: nats-system
keyPath: server.key

View File

@@ -0,0 +1,34 @@
apiVersion: kamaji.clastix.io/v1alpha1
kind: DataStore
metadata:
name: nats-silver
spec:
driver: NATS
endpoints:
- nats-silver.nats-system.svc:4222
basicAuth:
username:
content: YWRtaW4=
password:
secretReference:
name: nats-silver-config
namespace: nats-system
keyPath: password
tlsConfig:
certificateAuthority:
certificate:
secretReference:
name: nats-silver-config
namespace: nats-system
keyPath: ca.crt
clientCertificate:
certificate:
secretReference:
name: nats-silver-config
namespace: nats-system
keyPath: server.crt
privateKey:
secretReference:
name: nats-silver-config
namespace: nats-system
keyPath: server.key

View File

@@ -26,7 +26,7 @@ nodes:
## expose port 31132 of the node to port 31132 on the host for konnectivity
- containerPort: 31132
hostPort: 31132
protocol: TCP
protocol: TCP
## expose port 31443 of the node to port 31443 on the host
- containerPort: 31443
hostPort: 31443

View File

@@ -1,14 +0,0 @@
{
"CN": "bronze.mysql-system.svc.cluster.local",
"key": {
"algo": "rsa",
"size": 2048
},
"hosts": [
"127.0.0.1",
"localhost",
"bronze",
"bronze.mysql-system.svc",
"bronze.mysql-system.svc.cluster.local"
]
}

38
deploy/kine/nats/Makefile Normal file
View File

@@ -0,0 +1,38 @@
ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))
NAME:=default
NAMESPACE:=nats-system
.PHONY: helm
HELM = $(shell pwd)/../../../bin/helm
helm: ## Download helm locally if necessary.
$(call go-install-tool,$(HELM),helm.sh/helm/v3/cmd/helm@v3.9.0)
nats: nats-certificates nats-secret nats-deployment
nats-certificates:
rm -rf $(ROOT_DIR)/certs/$(NAME) && mkdir -p $(ROOT_DIR)/certs/$(NAME)
cfssl gencert -initca $(ROOT_DIR)/ca-csr.json | cfssljson -bare $(ROOT_DIR)/certs/$(NAME)/ca
@mv $(ROOT_DIR)/certs/$(NAME)/ca.pem $(ROOT_DIR)/certs/$(NAME)/ca.crt
@mv $(ROOT_DIR)/certs/$(NAME)/ca-key.pem $(ROOT_DIR)/certs/$(NAME)/ca.key
@NAME=$(NAME) NAMESPACE=$(NAMESPACE) envsubst < server-csr.json > $(ROOT_DIR)/certs/$(NAME)/server-csr.json
cfssl gencert -ca=$(ROOT_DIR)/certs/$(NAME)/ca.crt -ca-key=$(ROOT_DIR)/certs/$(NAME)/ca.key \
-config=$(ROOT_DIR)/config.json -profile=server \
$(ROOT_DIR)/certs/$(NAME)/server-csr.json | cfssljson -bare $(ROOT_DIR)/certs/$(NAME)/server
@mv $(ROOT_DIR)/certs/$(NAME)/server.pem $(ROOT_DIR)/certs/$(NAME)/server.crt
@mv $(ROOT_DIR)/certs/$(NAME)/server-key.pem $(ROOT_DIR)/certs/$(NAME)/server.key
chmod 644 $(ROOT_DIR)/certs/$(NAME)/*
nats-secret:
@kubectl create namespace $(NAMESPACE) || true
@kubectl -n $(NAMESPACE) create secret generic nats-$(NAME)-config \
--from-file=$(ROOT_DIR)/certs/$(NAME)/ca.crt --from-file=$(ROOT_DIR)/certs/$(NAME)/ca.key \
--from-file=$(ROOT_DIR)/certs/$(NAME)/server.key --from-file=$(ROOT_DIR)/certs/$(NAME)/server.crt \
--from-literal=password=password \
--dry-run=client -o yaml | kubectl apply -f -
nats-deployment:
@NAME=$(NAME) envsubst < $(ROOT_DIR)/values.yaml | $(HELM) upgrade --install $(NAME) nats/nats --create-namespace -n nats-system -f -
nats-destroy:
@NAME=$(NAME) envsubst < $(ROOT_DIR)/nats.yaml | kubectl -n $(NAMESPACE) delete --ignore-not-found -f -
@kubectl delete -n $(NAMESPACE) secret mysql-$(NAME)config --ignore-not-found

View File

@@ -0,0 +1,18 @@
{
"CN": "Clastix CA",
"key": {
"algo": "rsa",
"size": 2048
},
"names": [
{
"C": "IT",
"ST": "Italy",
"L": "Milan"
}
],
"hosts": [
"127.0.0.1",
"localhost"
]
}

View File

@@ -0,0 +1,18 @@
{
"signing": {
"default": {
"expiry": "8760h"
},
"profiles": {
"server": {
"expiry": "8760h",
"usages": [
"signing",
"key encipherment",
"server auth",
"client auth"
]
}
}
}
}

View File

@@ -0,0 +1,14 @@
{
"CN": "$NAME.$NAMESPACE.svc.cluster.local",
"key": {
"algo": "rsa",
"size": 2048
},
"hosts": [
"127.0.0.1",
"localhost",
"$NAME-nats",
"$NAME-nats.$NAMESPACE.svc",
"$NAME-nats.$NAMESPACE.svc.cluster.local"
]
}

View File

@@ -0,0 +1,20 @@
config:
merge:
accounts:
private:
jetstream: enabled
users:
- {user: admin, password: "password", permissions: {subscribe: [">"], publish: [">"]}}
cluster:
enabled: no
nats:
tls:
enabled: true
secretName: nats-$NAME-config
cert: server.crt
key: server.key
jetstream:
enabled: true
fileStore:
pvc:
size: 32Mi

View File

@@ -14,6 +14,48 @@ On the Management Cluster, install one of the alternative supported datastore:
`$ make -C deploy/kine/postgresql postgresql`
- **NATS**
*Note: NATS SUPPORT IS EXPERIMENTAL: Currently multi-tenancy is NOT supported when using NATS as an alternative datastore*
Currently, only username/password auth is supported.
```bash
cat << EOF > values-nats.yaml
config:
merge:
accounts:
private:
jetstream: enabled
users:
- {user: admin, password: "password", permissions: {subscribe: [">"], publish: [">"]}}
cluster:
enabled: no
nats:
tls:
enabled: true
secretName: nats-config
cert: server.crt
key: server.key
jetstream:
enabled: true
fileStore:
pvc:
size: 32Mi
EOF
```
```bash
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats/nats \
-f values-nats.yaml
--namespace nats-system \
--create-namespace
```
## Install Cert Manager
As prerequisite for Kamaji, install the Cert Manager
@@ -31,7 +73,7 @@ helm install \
## Install Kamaji
Use Helm to install the Kamaji Operator and make sure it uses a datastore with the proper driver `datastore.driver=<MySQL|PostgreSQL>`.
Use Helm to install the Kamaji Operator and make sure it uses a datastore with the proper driver `datastore.driver=<MySQL|PostgreSQL|NATS>`.
For example, with a PostreSQL datastore installed:

View File

@@ -89,7 +89,7 @@ DataStoreSpec defines the desired state of DataStore.
<td>
The driver to use to connect to the shared datastore.<br/>
<br/>
<i>Enum</i>: etcd, MySQL, PostgreSQL<br/>
<i>Enum</i>: etcd, MySQL, PostgreSQL, NATS<br/>
</td>
<td>true</td>
</tr><tr>

View File

@@ -0,0 +1,54 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package e2e
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pointer "k8s.io/utils/ptr"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
var _ = Describe("Deploy a TenantControlPlane resource with the NATS driver", func() {
// Fill TenantControlPlane object
tcp := &kamajiv1alpha1.TenantControlPlane{
ObjectMeta: metav1.ObjectMeta{
Name: "nats",
Namespace: "default",
},
Spec: kamajiv1alpha1.TenantControlPlaneSpec{
DataStore: "nats-bronze",
ControlPlane: kamajiv1alpha1.ControlPlane{
Deployment: kamajiv1alpha1.DeploymentSpec{
Replicas: pointer.To(int32(1)),
},
Service: kamajiv1alpha1.ServiceSpec{
ServiceType: "ClusterIP",
},
},
Kubernetes: kamajiv1alpha1.KubernetesSpec{
Version: "v1.23.6",
Kubelet: kamajiv1alpha1.KubeletSpec{
CGroupFS: "cgroupfs",
},
},
},
}
// Create a TenantControlPlane resource into the cluster
JustBeforeEach(func() {
Expect(k8sClient.Create(context.Background(), tcp)).NotTo(HaveOccurred())
})
// Delete the TenantControlPlane resource after test is finished
JustAfterEach(func() {
Expect(k8sClient.Delete(context.Background(), tcp)).Should(Succeed())
})
// Check if TenantControlPlane resource has been created
It("Should be Ready", func() {
StatusMustEqualTo(tcp, kamajiv1alpha1.VersionReady)
})
})

10
go.mod
View File

@@ -84,6 +84,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/lithammer/dedent v1.1.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
@@ -98,6 +99,9 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
@@ -124,14 +128,14 @@ require (
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.18.0 // indirect

14
go.sum
View File

@@ -1378,6 +1378,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -1469,6 +1471,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
@@ -1875,6 +1883,8 @@ golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -2224,6 +2234,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240208230135-b75ee8823808/go.mod h1:KG1lNk5ZFNssSZLrpVb4sMXKMpGwGXOxSG3rnu2gZQQ=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -2245,6 +2257,8 @@ golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -708,7 +708,7 @@ func (d Deployment) buildKubeAPIServerCommand(tenantControlPlane kamajiv1alpha1.
}
switch d.DataStore.Spec.Driver {
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver:
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver, kamajiv1alpha1.KineNatsDriver:
desiredArgs["--etcd-servers"] = "http://127.0.0.1:2379"
case kamajiv1alpha1.EtcdDriver:
httpsEndpoints := make([]string, 0, len(d.DataStore.Spec.Endpoints))
@@ -867,6 +867,8 @@ func (d Deployment) buildKine(podSpec *corev1.PodSpec, tcp kamajiv1alpha1.Tenant
args["--endpoint"] = "mysql://$(DB_USER):$(DB_PASSWORD)@tcp($(DB_CONNECTION_STRING))/$(DB_SCHEMA)"
case kamajiv1alpha1.KinePostgreSQLDriver:
args["--endpoint"] = "postgres://$(DB_USER):$(DB_PASSWORD)@$(DB_CONNECTION_STRING)/$(DB_SCHEMA)"
case kamajiv1alpha1.KineNatsDriver:
args["--endpoint"] = "nats://$(DB_USER):$(DB_PASSWORD)@$(DB_CONNECTION_STRING)?bucket=$(DB_SCHEMA)"
}
args["--ca-file"] = "/certs/ca.crt"

View File

@@ -33,6 +33,8 @@ func NewStorageConnection(ctx context.Context, client client.Client, ds kamajiv1
return NewPostgreSQLConnection(*cc)
case kamajiv1alpha1.EtcdDriver:
return NewETCDConnection(*cc)
case kamajiv1alpha1.KineNatsDriver:
return NewNATSConnection(*cc)
default:
return nil, fmt.Errorf("%s is not a valid driver", ds.Spec.Driver)
}

184
internal/datastore/nats.go Normal file
View File

@@ -0,0 +1,184 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package datastore
import (
"context"
"fmt"
"strings"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
// NATSConnection represents a connection to a NATS KV store.
type NATSConnection struct {
js nats.JetStreamContext
conn *nats.Conn
config ConnectionConfig
}
// NewNATSConnection initializes a connection to NATS and sets up the KV store.
func NewNATSConnection(config ConnectionConfig) (*NATSConnection, error) {
var endpoints string
if len(config.Endpoints) > 1 {
// comma separated list of endpoints
var ep []string
for _, e := range config.Endpoints {
ep = append(ep, fmt.Sprintf("nats://%s", e.String()))
}
endpoints = strings.Join(ep, ",")
} else {
endpoints = fmt.Sprintf("nats://%s", config.Endpoints[0].String())
}
var conn *nats.Conn
var err error
var natsOpts []nats.Option
if config.TLSConfig != nil {
natsOpts = append(natsOpts, nats.Secure(config.TLSConfig))
}
if config.User != "" && config.Password != "" {
natsOpts = append(natsOpts, nats.UserInfo(config.User, config.Password))
}
conn, err = nats.Connect(endpoints, natsOpts...)
if err != nil {
return nil, err
}
js, err := conn.JetStream()
if err != nil {
return nil, err
}
return &NATSConnection{
js: js,
conn: conn,
config: config,
}, nil
}
func (nc *NATSConnection) CreateUser(_ context.Context, _, _ string) error {
return nil
}
func (nc *NATSConnection) CreateDB(_ context.Context, dbName string) error {
_, err := nc.js.CreateKeyValue(&nats.KeyValueConfig{Bucket: dbName})
if err != nil {
return errors.Wrap(err, "unable to create the datastore")
}
return nil
}
func (nc *NATSConnection) GrantPrivileges(_ context.Context, _, _ string) error {
return nil
}
func (nc *NATSConnection) UserExists(_ context.Context, _ string) (bool, error) {
return true, nil
}
func (nc *NATSConnection) DBExists(_ context.Context, dbName string) (bool, error) {
_, err := nc.js.KeyValue(dbName)
if err != nil {
if errors.Is(err, nats.ErrBucketNotFound) {
return false, nil
}
return false, err
}
return true, nil
}
func (nc *NATSConnection) GrantPrivilegesExists(_ context.Context, _, _ string) (bool, error) {
return true, nil
}
func (nc *NATSConnection) DeleteUser(_ context.Context, _ string) error {
return nil
}
func (nc *NATSConnection) DeleteDB(_ context.Context, dbName string) error {
err := nc.js.DeleteKeyValue(dbName)
return err
}
func (nc *NATSConnection) RevokePrivileges(_ context.Context, _, _ string) error {
return nil
}
func (nc *NATSConnection) GetConnectionString() string {
return nc.config.Endpoints[0].String()
}
func (nc *NATSConnection) Close() error {
return nc.conn.Drain()
}
func (nc *NATSConnection) Check(_ context.Context) error {
status := nc.conn.Status()
if status != nats.CONNECTED {
return errors.New("connection to NATS is not established")
}
return nil
}
func (nc *NATSConnection) Driver() string {
return string(kamajiv1alpha1.KineNatsDriver)
}
func (nc *NATSConnection) GetConfig() ConnectionConfig {
return nc.config
}
func (nc *NATSConnection) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error {
targetClient := target.(*NATSConnection) //nolint:forcetypeassert
dbName := tcp.Status.Storage.Setup.Schema
targetKv, err := targetClient.js.KeyValue(dbName)
if err != nil {
return err
}
sourceKv, err := nc.js.KeyValue(dbName)
if err != nil {
return err
}
if err := target.Check(ctx); err != nil {
return err
}
// copy all keys from source to target
keys, err := sourceKv.Keys()
if err != nil {
return err
}
for _, key := range keys {
entry, err := sourceKv.Get(key)
if err != nil {
return err
}
_, err = targetKv.Put(key, entry.Value())
if err != nil {
return err
}
}
return nil
}

View File

@@ -128,7 +128,7 @@ func (r *Certificate) mutate(ctx context.Context, tenantControlPlane *kamajiv1al
return err
}
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver:
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver, kamajiv1alpha1.KineNatsDriver:
var crtBytes, keyBytes []byte
// For the SQL drivers we just need to copy the certificate, since the basic authentication is used
// to connect to the desired schema and database.

View File

@@ -104,9 +104,10 @@ func (r *Config) UpdateTenantControlPlaneStatus(_ context.Context, tenantControl
return nil
}
func (r *Config) mutate(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
func (r *Config) mutate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
var password []byte
var username []byte
hash := utilities.GetObjectChecksum(r.resource)
switch {
@@ -133,10 +134,31 @@ func (r *Config) mutate(_ context.Context, tenantControlPlane *kamajiv1alpha1.Te
finalizersList.Insert(finalizers.DatastoreSecretFinalizer)
r.resource.SetFinalizers(finalizersList.UnsortedList())
// TODO(thecodeassassin): remove this after multi-tenancy is implemented for NATS.
// Due to NATS is missing a programmatic approach to create users and password,
// we're using the Datastore root password.
if r.DataStore.Spec.Driver == kamajiv1alpha1.KineNatsDriver {
// set username and password to the basicAuth values of the NATS datastore
u, err := r.DataStore.Spec.BasicAuth.Username.GetContent(ctx, r.Client)
if err != nil {
return errors.Wrap(err, "failed to retrieve the username for the NATS datastore")
}
p, err := r.DataStore.Spec.BasicAuth.Password.GetContent(ctx, r.Client)
if err != nil {
return errors.Wrap(err, "failed to retrieve the password for the NATS datastore")
}
username = u
password = p
} else {
username = coalesceFn(tenantControlPlane.Status.Storage.Setup.User)
}
r.resource.Data = map[string][]byte{
"DB_CONNECTION_STRING": []byte(r.ConnString),
"DB_SCHEMA": coalesceFn(tenantControlPlane.Status.Storage.Setup.Schema),
"DB_USER": coalesceFn(tenantControlPlane.Status.Storage.Setup.User),
"DB_USER": username,
"DB_PASSWORD": password,
}