/*
|
Copyright 2019 The Kubernetes Authors.
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
you may not use this file except in compliance with the License.
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
distributed under the License is distributed on an "AS IS" BASIS,
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
See the License for the specific language governing permissions and
|
limitations under the License.
|
*/
|
|
package events
|
|
import (
|
"strconv"
|
"testing"
|
"time"
|
|
"os"
|
"strings"
|
|
v1 "k8s.io/api/core/v1"
|
eventsv1 "k8s.io/api/events/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/client-go/kubernetes/scheme"
|
restclient "k8s.io/client-go/rest"
|
ref "k8s.io/client-go/tools/reference"
|
)
|
|
type testEventSeriesSink struct {
|
OnCreate func(e *eventsv1.Event) (*eventsv1.Event, error)
|
OnUpdate func(e *eventsv1.Event) (*eventsv1.Event, error)
|
OnPatch func(e *eventsv1.Event, p []byte) (*eventsv1.Event, error)
|
}
|
|
// Create records the event for testing.
|
func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error) {
|
if t.OnCreate != nil {
|
return t.OnCreate(e)
|
}
|
return e, nil
|
}
|
|
// Update records the event for testing.
|
func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error) {
|
if t.OnUpdate != nil {
|
return t.OnUpdate(e)
|
}
|
return e, nil
|
}
|
|
// Patch records the event for testing.
|
func (t *testEventSeriesSink) Patch(e *eventsv1.Event, p []byte) (*eventsv1.Event, error) {
|
if t.OnPatch != nil {
|
return t.OnPatch(e, p)
|
}
|
return e, nil
|
}
|
|
func TestEventSeriesf(t *testing.T) {
|
hostname, _ := os.Hostname()
|
|
testPod := &v1.Pod{
|
ObjectMeta: metav1.ObjectMeta{
|
Name: "foo",
|
Namespace: "baz",
|
UID: "bar",
|
},
|
}
|
|
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
if err != nil {
|
t.Fatal(err)
|
}
|
|
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
if err != nil {
|
t.Fatal(err)
|
}
|
|
expectedEvent := &eventsv1.Event{
|
ObjectMeta: metav1.ObjectMeta{
|
Name: "foo",
|
Namespace: "baz",
|
},
|
EventTime: metav1.MicroTime{time.Now()},
|
ReportingController: "eventTest",
|
ReportingInstance: "eventTest-" + hostname,
|
Action: "started",
|
Reason: "test",
|
Regarding: *regarding,
|
Related: related,
|
Note: "some verbose message: 1",
|
Type: v1.EventTypeNormal,
|
}
|
|
isomorphicEvent := expectedEvent.DeepCopy()
|
|
nonIsomorphicEvent := expectedEvent.DeepCopy()
|
nonIsomorphicEvent.Action = "stopped"
|
|
expectedEvent.Series = &eventsv1.EventSeries{Count: 1}
|
table := []struct {
|
regarding k8sruntime.Object
|
related k8sruntime.Object
|
actual *eventsv1.Event
|
elements []interface{}
|
expect *eventsv1.Event
|
expectUpdate bool
|
}{
|
{
|
regarding: regarding,
|
related: related,
|
actual: isomorphicEvent,
|
elements: []interface{}{1},
|
expect: expectedEvent,
|
expectUpdate: true,
|
},
|
{
|
regarding: regarding,
|
related: related,
|
actual: nonIsomorphicEvent,
|
elements: []interface{}{1},
|
expect: nonIsomorphicEvent,
|
expectUpdate: false,
|
},
|
}
|
|
stopCh := make(chan struct{})
|
|
createEvent := make(chan *eventsv1.Event)
|
updateEvent := make(chan *eventsv1.Event)
|
patchEvent := make(chan *eventsv1.Event)
|
|
testEvents := testEventSeriesSink{
|
OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
|
createEvent <- event
|
return event, nil
|
},
|
OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
|
updateEvent <- event
|
return event, nil
|
},
|
OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
|
// event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here
|
// we'll use it directly
|
patchEvent <- event
|
return event, nil
|
},
|
}
|
eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*eventsv1.Event{})
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
|
broadcaster := eventBroadcaster.(*eventBroadcasterImpl)
|
// Don't call StartRecordingToSink, as we don't need neither refreshing event
|
// series nor finishing them in this tests and additional events updated would
|
// race with our expected ones.
|
broadcaster.startRecordingEvents(stopCh)
|
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
|
// read from the chan as this was needed only to populate the cache
|
<-createEvent
|
for index, item := range table {
|
actual := item.actual
|
recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
|
// validate event
|
if item.expectUpdate {
|
actualEvent := <-patchEvent
|
t.Logf("%v - validating event affected by patch request", index)
|
validateEvent(strconv.Itoa(index), true, actualEvent, item.expect, t)
|
} else {
|
actualEvent := <-createEvent
|
t.Logf("%v - validating event affected by a create request", index)
|
validateEvent(strconv.Itoa(index), false, actualEvent, item.expect, t)
|
}
|
}
|
close(stopCh)
|
}
|
|
func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
|
recvEvent := *actualEvent
|
|
// Just check that the timestamp was set.
|
if recvEvent.EventTime.IsZero() {
|
t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
|
}
|
|
if expectedUpdate {
|
if recvEvent.Series == nil {
|
t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
|
|
} else {
|
if recvEvent.Series.Count != expectedEvent.Series.Count {
|
t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
|
}
|
}
|
|
// Check that name has the right prefix.
|
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
|
}
|
} else {
|
if recvEvent.Series != nil {
|
t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
|
}
|
}
|
|
}
|
|
func TestFinishSeries(t *testing.T) {
|
hostname, _ := os.Hostname()
|
testPod := &v1.Pod{
|
ObjectMeta: metav1.ObjectMeta{
|
SelfLink: "/api/v1/namespaces/baz/pods/foo",
|
Name: "foo",
|
Namespace: "baz",
|
UID: "bar",
|
},
|
}
|
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
if err != nil {
|
t.Fatal(err)
|
}
|
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
if err != nil {
|
t.Fatal(err)
|
}
|
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
|
createEvent := make(chan *eventsv1.Event, 10)
|
updateEvent := make(chan *eventsv1.Event, 10)
|
patchEvent := make(chan *eventsv1.Event, 10)
|
testEvents := testEventSeriesSink{
|
OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
|
createEvent <- event
|
return event, nil
|
},
|
OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
|
updateEvent <- event
|
return event, nil
|
},
|
OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
|
// event we receive is already patched, usually the sink uses it
|
// only to retrieve the name and namespace, here we'll use it directly
|
patchEvent <- event
|
return event, nil
|
},
|
}
|
cache := map[eventKey]*eventsv1.Event{}
|
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
nonFinishedEvent := cachedEvent.DeepCopy()
|
nonFinishedEvent.ReportingController = "nonFinished-controller"
|
cachedEvent.Series = &eventsv1.EventSeries{
|
Count: 10,
|
LastObservedTime: LastObservedTime,
|
}
|
cache[getKey(cachedEvent)] = cachedEvent
|
cache[getKey(nonFinishedEvent)] = nonFinishedEvent
|
eventBroadcaster.finishSeries()
|
select {
|
case actualEvent := <-patchEvent:
|
t.Logf("validating event affected by patch request")
|
eventBroadcaster.mu.Lock()
|
defer eventBroadcaster.mu.Unlock()
|
if len(cache) != 1 {
|
t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
|
}
|
if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
|
t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
|
}
|
// check that we emitted only one event
|
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
}
|
case <-time.After(wait.ForeverTestTimeout):
|
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
}
|
}
|
|
func TestRefreshExistingEventSeries(t *testing.T) {
|
hostname, _ := os.Hostname()
|
testPod := &v1.Pod{
|
ObjectMeta: metav1.ObjectMeta{
|
SelfLink: "/api/v1/namespaces/baz/pods/foo",
|
Name: "foo",
|
Namespace: "baz",
|
UID: "bar",
|
},
|
}
|
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
if err != nil {
|
t.Fatal(err)
|
}
|
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
if err != nil {
|
t.Fatal(err)
|
}
|
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
createEvent := make(chan *eventsv1.Event, 10)
|
updateEvent := make(chan *eventsv1.Event, 10)
|
patchEvent := make(chan *eventsv1.Event, 10)
|
|
table := []struct {
|
patchFunc func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error)
|
}{
|
{
|
patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
|
// event we receive is already patched, usually the sink uses it
|
//only to retrieve the name and namespace, here we'll use it directly.
|
patchEvent <- event
|
return event, nil
|
},
|
},
|
{
|
patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
|
// we simulate an apiserver error here
|
patchEvent <- nil
|
return nil, &restclient.RequestConstructionError{}
|
},
|
},
|
}
|
for _, item := range table {
|
testEvents := testEventSeriesSink{
|
OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
|
createEvent <- event
|
return event, nil
|
},
|
OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
|
updateEvent <- event
|
return event, nil
|
},
|
OnPatch: item.patchFunc,
|
}
|
cache := map[eventKey]*eventsv1.Event{}
|
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
cachedEvent.Series = &eventsv1.EventSeries{
|
Count: 10,
|
LastObservedTime: LastObservedTime,
|
}
|
cacheKey := getKey(cachedEvent)
|
cache[cacheKey] = cachedEvent
|
|
eventBroadcaster.refreshExistingEventSeries()
|
select {
|
case <-patchEvent:
|
t.Logf("validating event affected by patch request")
|
eventBroadcaster.mu.Lock()
|
defer eventBroadcaster.mu.Unlock()
|
if len(cache) != 1 {
|
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
|
}
|
// check that we emitted only one event
|
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
}
|
cacheEvent, exists := cache[cacheKey]
|
|
if cacheEvent == nil || !exists {
|
t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists)
|
}
|
case <-time.After(wait.ForeverTestTimeout):
|
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
}
|
}
|
}
|