Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/guides/multicluster/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ tee $TILT_OVERRIDES_PATH <<EOF
global:
conf:
apiServerOverrides:
- gvk: cortex.cloud/v1alpha1/DecisionList
host: https://host.docker.internal:8444
caCert: |
$(cat /tmp/root-ca-remote.pem | sed 's/^/ /')
- gvk: cortex.cloud/v1alpha1/Decision
host: https://host.docker.internal:8444
caCert: |
Expand Down
19 changes: 8 additions & 11 deletions internal/scheduling/explanation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,15 @@ func (c *Controller) StartupCallback(ctx context.Context) error {
// This function sets up the controller with the provided manager.
func (c *Controller) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
if !c.SkipIndexFields {
gvk, err := mcl.GVKFromHomeScheme(&v1alpha1.Decision{})
if err != nil {
return err
ctx := context.Background()
obj := &v1alpha1.Decision{}
lst := &v1alpha1.DecisionList{}
idx := "spec.resourceID"
fnc := func(obj client.Object) []string {
decision := obj.(*v1alpha1.Decision)
return []string{decision.Spec.ResourceID}
}
cluster := mcl.ClusterForResource(gvk)
if err := cluster.GetCache().IndexField(
context.Background(), &v1alpha1.Decision{}, "spec.resourceID",
func(obj client.Object) []string {
decision := obj.(*v1alpha1.Decision)
return []string{decision.Spec.ResourceID}
},
); err != nil {
if err := mcl.IndexField(ctx, obj, lst, idx, fnc); err != nil {
return err
}
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/multicluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,29 @@ func (c *subResourceClient) Patch(ctx context.Context, obj client.Object, patch
func (c *subResourceClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error {
return errors.New("apply operation is not supported in multicluster subresource client")
}

// Index a field for a resource in the matching cluster's cache.
// Usually, you want to index the same field in both the object and list type,
// as both would be mapped to individual clients based on their GVK.
func (c *Client) IndexField(ctx context.Context, obj client.Object, list client.ObjectList, field string, extractValue client.IndexerFunc) error {
gvkObj, err := c.GVKFromHomeScheme(obj)
if err != nil {
return err
}
if err := c.ClusterForResource(gvkObj).
GetCache().
IndexField(ctx, obj, field, extractValue); err != nil {
return err
}
// Index the object in the list cluster as well.
gvkList, err := c.GVKFromHomeScheme(list)
if err != nil {
return err
}
if err := c.ClusterForResource(gvkList).
GetCache().
IndexField(ctx, obj, field, extractValue); err != nil {
return err
}
return nil
}
110 changes: 110 additions & 0 deletions pkg/multicluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand All @@ -38,19 +39,62 @@ func (u *unknownType) DeepCopyObject() runtime.Object {
return &unknownType{TypeMeta: u.TypeMeta, ObjectMeta: u.ObjectMeta}
}

// fakeCache implements cache.Cache interface for testing IndexField.
type fakeCache struct {
cache.Cache
indexFieldFunc func(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error
// Track calls to IndexField for verification
indexFieldCalls []indexFieldCall
mu sync.Mutex
}

type indexFieldCall struct {
obj client.Object
field string
}

func (f *fakeCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
f.mu.Lock()
f.indexFieldCalls = append(f.indexFieldCalls, indexFieldCall{obj: obj, field: field})
f.mu.Unlock()
if f.indexFieldFunc != nil {
return f.indexFieldFunc(ctx, obj, field, extractValue)
}
return nil
}

func (f *fakeCache) getIndexFieldCalls() []indexFieldCall {
f.mu.Lock()
defer f.mu.Unlock()
return f.indexFieldCalls
}

// fakeCluster implements cluster.Cluster interface for testing.
type fakeCluster struct {
cluster.Cluster
fakeClient client.Client
fakeCache *fakeCache
}

func (f *fakeCluster) GetClient() client.Client {
return f.fakeClient
}

func (f *fakeCluster) GetCache() cache.Cache {
return f.fakeCache
}

func newFakeCluster(scheme *runtime.Scheme, objs ...client.Object) *fakeCluster {
return &fakeCluster{
fakeClient: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(),
fakeCache: &fakeCache{},
}
}

func newFakeClusterWithCache(scheme *runtime.Scheme, fakeCache *fakeCache, objs ...client.Object) *fakeCluster {
return &fakeCluster{
fakeClient: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(),
fakeCache: fakeCache,
}
}

Expand Down Expand Up @@ -1449,3 +1493,69 @@ func TestClient_StatusAndSubResource_ErrorOnUnknownType(t *testing.T) {
t.Error("expected error for unknown type in subresource Patch")
}
}

// TestClient_IndexField_WithRemoteClusters tests IndexField with remote clusters configured.
func TestClient_IndexField_WithRemoteClusters(t *testing.T) {
scheme := newTestScheme(t)

homeCache := &fakeCache{}
homeCluster := newFakeClusterWithCache(scheme, homeCache)

remoteObjCache := &fakeCache{}
remoteObjCluster := newFakeClusterWithCache(scheme, remoteObjCache)

remoteListCache := &fakeCache{}
remoteListCluster := newFakeClusterWithCache(scheme, remoteListCache)

objGVK := schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "ConfigMap",
}
listGVK := schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "ConfigMapList",
}

c := &Client{
HomeCluster: homeCluster,
HomeScheme: scheme,
remoteClusters: map[schema.GroupVersionKind]cluster.Cluster{
objGVK: remoteObjCluster,
listGVK: remoteListCluster,
},
}

ctx := context.Background()

obj := &corev1.ConfigMap{}
list := &corev1.ConfigMapList{}
field := "metadata.name"
extractValue := func(obj client.Object) []string {
return []string{obj.GetName()}
}

err := c.IndexField(ctx, obj, list, field, extractValue)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// Verify IndexField was called on the remote object cluster's cache
objCalls := remoteObjCache.getIndexFieldCalls()
if len(objCalls) != 1 {
t.Errorf("expected 1 IndexField call on remote object cluster, got %d", len(objCalls))
}

// Verify IndexField was called on the remote list cluster's cache
listCalls := remoteListCache.getIndexFieldCalls()
if len(listCalls) != 1 {
t.Errorf("expected 1 IndexField call on remote list cluster, got %d", len(listCalls))
}

// Verify home cluster cache was NOT called
homeCalls := homeCache.getIndexFieldCalls()
if len(homeCalls) != 0 {
t.Errorf("expected 0 IndexField calls on home cluster, got %d", len(homeCalls))
}
}