Files
kubernetes/pkg/proxy/conntrack/cleanup_test.go
Dan Winship f5969adb14 Clean up NewServiceChangeTracker/NewEndpointsChangeTracker args
Remove the now-unused event recorders, and put the remaining args into
a sensible order, and consistent between the two.
2024-12-14 12:12:42 -05:00

376 lines
12 KiB
Go

//go:build linux
// +build linux
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package conntrack
import (
"fmt"
"sort"
"testing"
"github.com/stretchr/testify/require"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/proxy"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
// Fixture values shared by the tests in this file.
const (
	// Identity and IP family of the Service built in TestCleanStaleEntries.
	testServiceName      = "cleanup-test"
	testServiceNamespace = "test"
	testIPFamily         = v1.IPv4Protocol

	// IPs assigned to the Service (cluster IP, external IP, load-balancer ingress IP).
	testClusterIP      = "172.30.1.1"
	testExternalIP     = "192.168.99.100"
	testLoadBalancerIP = "1.2.3.4"

	// Endpoint IPs. The serving and non-serving endpoints are listed in the
	// test EndpointSlice; the deleted endpoint appears only in mocked
	// conntrack flows.
	testServingEndpointIP    = "10.240.0.4"
	testNonServingEndpointIP = "10.240.1.5"
	testDeletedEndpointIP    = "10.240.2.6"

	// Service port and node port used by the test Service.
	testPort     = 8000
	testNodePort = 32000
)
// TestCleanStaleEntries seeds a fake conntrack table with flows for every
// combination of endpoint, protocol and service address (plus NodePort), runs
// CleanStaleEntries, and asserts that only the expected flows remain: UDP
// flows whose reply source is the non-serving or the deleted endpoint, and UDP
// flows that were never DNATed, must be gone; everything else must be kept.
func TestCleanStaleEntries(t *testing.T) {
	// We need to construct proxy.ServicePortMap and proxy.EndpointsMap to pass to
	// CleanStaleEntries. ServicePortMap and EndpointsMap are just maps, but there are
	// no public constructors for any implementation of proxy.ServicePort and
	// proxy.Endpoint, so we have to either provide our own implementation of those
	// interfaces, or else use a proxy.ServiceChangeTracker and a
	// proxy.EndpointsChangeTracker to construct them and fill in the maps for us.

	sct := proxy.NewServiceChangeTracker(testIPFamily, nil, nil)
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      testServiceName,
			Namespace: testServiceNamespace,
		},
		Spec: v1.ServiceSpec{
			ClusterIP:   testClusterIP,
			ExternalIPs: []string{testExternalIP},
			Ports: []v1.ServicePort{
				{
					Name:     "test-tcp",
					Port:     testPort,
					Protocol: v1.ProtocolTCP,
				},
				{
					Name:     "test-udp",
					Port:     testPort,
					NodePort: testNodePort,
					Protocol: v1.ProtocolUDP,
				},
				{
					Name:     "test-sctp",
					Port:     testPort,
					NodePort: testNodePort,
					Protocol: v1.ProtocolSCTP,
				},
			},
		},
		Status: v1.ServiceStatus{
			LoadBalancer: v1.LoadBalancerStatus{
				Ingress: []v1.LoadBalancerIngress{{
					IP: testLoadBalancerIP,
				}},
			},
		},
	}
	sct.Update(nil, svc)
	svcPortMap := make(proxy.ServicePortMap)
	// The update result is not needed; only the populated map matters here.
	_ = svcPortMap.Update(sct)

	ect := proxy.NewEndpointsChangeTracker(testIPFamily, "test-worker", nil, nil)
	eps := &discovery.EndpointSlice{
		TypeMeta:    metav1.TypeMeta{},
		AddressType: discovery.AddressTypeIPv4,
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-0", testServiceName),
			Namespace: testServiceNamespace,
			Labels:    map[string]string{discovery.LabelServiceName: testServiceName},
		},
		Endpoints: []discovery.Endpoint{
			{
				Addresses:  []string{testServingEndpointIP},
				Conditions: discovery.EndpointConditions{Serving: ptr.To(true)},
			},
			{
				Addresses:  []string{testNonServingEndpointIP},
				Conditions: discovery.EndpointConditions{Serving: ptr.To(false)},
			},
		},
		Ports: []discovery.EndpointPort{
			{
				Name:     ptr.To("test-tcp"),
				Port:     ptr.To(int32(testPort)),
				Protocol: ptr.To(v1.ProtocolTCP),
			},
			{
				Name:     ptr.To("test-udp"),
				Port:     ptr.To(int32(testPort)),
				Protocol: ptr.To(v1.ProtocolUDP),
			},
			{
				Name:     ptr.To("test-sctp"),
				Port:     ptr.To(int32(testPort)),
				Protocol: ptr.To(v1.ProtocolSCTP),
			},
		},
	}
	ect.EndpointSliceUpdate(eps, false)
	endpointsMap := make(proxy.EndpointsMap)
	// The update result is not needed; only the populated map matters here.
	_ = endpointsMap.Update(ect)

	// portNameFor builds the map key for one of the Service's ports.
	portNameFor := func(port v1.ServicePort) proxy.ServicePortName {
		return proxy.ServicePortName{
			NamespacedName: types.NamespacedName{
				Namespace: svc.Namespace,
				Name:      svc.Name,
			},
			Port:     port.Name,
			Protocol: port.Protocol,
		}
	}
	tcpPortName := portNameFor(svc.Spec.Ports[0])
	udpPortName := portNameFor(svc.Spec.Ports[1])
	sctpPortName := portNameFor(svc.Spec.Ports[2])

	// Sanity-check to make sure we constructed the ServicePortMap correctly
	if len(svcPortMap) != 3 {
		t.Fatalf("expected svcPortMap to have 3 entries, got %+v", svcPortMap)
	}
	for _, check := range []struct {
		portName proxy.ServicePortName
		expected string
	}{
		{portName: tcpPortName, expected: "172.30.1.1:8000/TCP"},
		{portName: udpPortName, expected: "172.30.1.1:8000/UDP"},
		{portName: sctpPortName, expected: "172.30.1.1:8000/SCTP"},
	} {
		servicePort := svcPortMap[check.portName]
		// Report a missing entry separately: calling String() on a nil
		// ServicePort would panic instead of producing a useful failure.
		if servicePort == nil {
			t.Fatalf("expected svcPortMap[%q] to be %q, got nil", check.portName.String(), check.expected)
		}
		if servicePort.String() != check.expected {
			t.Fatalf("expected svcPortMap[%q] to be %q, got %q", check.portName.String(), check.expected, servicePort.String())
		}
	}

	// Sanity-check to make sure we constructed the EndpointsMap correctly
	if len(endpointsMap) != 3 {
		t.Fatalf("expected endpointsMap to have 3 entries, got %+v", endpointsMap)
	}
	for _, svcPortName := range []proxy.ServicePortName{tcpPortName, udpPortName, sctpPortName} {
		endpoints := endpointsMap[svcPortName]
		if len(endpoints) != 2 {
			t.Fatalf("expected endpointsMap[%q] to have 2 entries, got %+v", svcPortName.String(), endpoints)
		}
		if endpoints[0].IP() != testServingEndpointIP {
			t.Fatalf("expected endpointsMap[%q][0] IP to be %q, got %q", svcPortName.String(), testServingEndpointIP, endpoints[0].IP())
		}
		if endpoints[1].IP() != testNonServingEndpointIP {
			t.Fatalf("expected endpointsMap[%q][1] IP to be %q, got %q", svcPortName.String(), testNonServingEndpointIP, endpoints[1].IP())
		}
		if !endpoints[0].IsServing() {
			t.Fatalf("expected endpointsMap[%q][0] to be serving", svcPortName.String())
		}
		if endpoints[1].IsServing() {
			t.Fatalf("expected endpointsMap[%q][1] to be not serving", svcPortName.String())
		}
	}

	// mock existing entries before cleanup
	// we create 36 fake flow entries (3 Endpoints * 3 Protocols * (3 ServiceIPs + 1 NodePort))
	var mockEntries []*netlink.ConntrackFlow
	// expectedEntries are the entries on which we will assert the cleanup logic
	var expectedEntries []*netlink.ConntrackFlow
	for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} {
		for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} {
			// we do not expect UDP flows to deleted or non-serving endpoints
			// to be present after cleanup
			stale := proto == unix.IPPROTO_UDP &&
				(dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)

			// flows addressed to each of the service IPs
			for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
				entry := &netlink.ConntrackFlow{
					FamilyType: unix.AF_INET,
					Forward: netlink.IPTuple{
						DstIP:    netutils.ParseIPSloppy(origDest),
						Protocol: proto,
					},
					Reverse: netlink.IPTuple{
						Protocol: proto,
						SrcIP:    netutils.ParseIPSloppy(dnatDest),
					},
				}
				mockEntries = append(mockEntries, entry)
				if !stale {
					expectedEntries = append(expectedEntries, entry)
				}
			}

			// flow addressed to the NodePort
			entry := &netlink.ConntrackFlow{
				FamilyType: unix.AF_INET,
				Forward: netlink.IPTuple{
					DstPort:  testNodePort,
					Protocol: proto,
				},
				Reverse: netlink.IPTuple{
					Protocol: proto,
					SrcIP:    netutils.ParseIPSloppy(dnatDest),
				},
			}
			mockEntries = append(mockEntries, entry)
			if !stale {
				expectedEntries = append(expectedEntries, entry)
			}
		}
	}

	// add some non-DNATed mock entries which should be cleared up by the reconciler.
	// These will exist if the proxy doesn't have a DROP/REJECT rule for a service with
	// no endpoints; --orig-dst and --reply-src will be the same for these entries.
	for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
		entry := &netlink.ConntrackFlow{
			FamilyType: unix.AF_INET,
			Forward: netlink.IPTuple{
				DstIP:    netutils.ParseIPSloppy(ip),
				Protocol: unix.IPPROTO_UDP,
			},
			Reverse: netlink.IPTuple{
				Protocol: unix.IPPROTO_UDP,
				SrcIP:    netutils.ParseIPSloppy(ip),
			},
		}
		mockEntries = append(mockEntries, entry)
	}

	fake := NewFake()
	fake.entries = mockEntries
	CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)

	actualEntries, err := fake.ListEntries(ipFamilyMap[testIPFamily])
	require.NoError(t, err)
	require.Equal(t, len(expectedEntries), len(actualEntries))

	// sort the actual flows before comparison
	sort.Slice(actualEntries, func(i, j int) bool {
		return actualEntries[i].String() < actualEntries[j].String()
	})
	// sort the expected flows before comparison
	sort.Slice(expectedEntries, func(i, j int) bool {
		return expectedEntries[i].String() < expectedEntries[j].String()
	})

	for i := range expectedEntries {
		require.Equal(t, expectedEntries[i], actualEntries[i])
	}
}
// TestFilterForNAT checks the conntrackFilter built by filterForNAT: the
// protocol number must match the given protocol, the original tuple must
// carry the "orig" address as destination IP, and the reply tuple must carry
// the "dest" address as source IP.
func TestFilterForNAT(t *testing.T) {
	type natCase struct {
		name           string
		orig           string
		dest           string
		protocol       v1.Protocol
		expectedFilter *conntrackFilter
	}

	cases := []natCase{
		{
			name:     "ipv4 + SCTP",
			orig:     "10.96.0.10",
			dest:     "10.244.0.3",
			protocol: v1.ProtocolSCTP,
			expectedFilter: &conntrackFilter{
				protocol: 132,
				original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10")},
				reply:    &connectionTuple{srcIP: netutils.ParseIPSloppy("10.244.0.3")},
			},
		},
		{
			name:     "ipv6 + UDP",
			orig:     "2001:db8:1::2",
			dest:     "4001:ab8::2",
			protocol: v1.ProtocolUDP,
			expectedFilter: &conntrackFilter{
				protocol: 17,
				original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2")},
				reply:    &connectionTuple{srcIP: netutils.ParseIPSloppy("4001:ab8::2")},
			},
		},
	}

	for idx := range cases {
		c := cases[idx]
		t.Run(c.name, func(t *testing.T) {
			got := filterForNAT(c.orig, c.dest, c.protocol)
			require.Equal(t, c.expectedFilter, got)
		})
	}
}
// TestFilterForPortNAT checks the conntrackFilter built by filterForPortNAT:
// the protocol number must match the given protocol, the original tuple must
// carry the given port as destination port, and the reply tuple must carry
// the "dest" address as source IP.
func TestFilterForPortNAT(t *testing.T) {
	testCases := []struct {
		name           string
		dest           string
		port           int
		protocol       v1.Protocol
		expectedFilter *conntrackFilter
	}{
		{
			name:     "ipv4 + TCP",
			dest:     "10.96.0.10",
			port:     80,
			protocol: v1.ProtocolTCP,
			expectedFilter: &conntrackFilter{
				protocol: 6,
				original: &connectionTuple{dstPort: 80},
				reply:    &connectionTuple{srcIP: netutils.ParseIPSloppy("10.96.0.10")},
			},
		},
		{
			name:     "ipv6 + UDP",
			dest:     "2001:db8:1::2",
			port:     8000,
			protocol: v1.ProtocolUDP,
			expectedFilter: &conntrackFilter{
				protocol: 17,
				original: &connectionTuple{dstPort: 8000},
				reply:    &connectionTuple{srcIP: netutils.ParseIPSloppy("2001:db8:1::2")},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.expectedFilter, filterForPortNAT(tc.dest, tc.port, tc.protocol))
		})
	}
}