diff --git a/feature/platform/tests/breakout_subscription_test/breakout_subscription_test.go b/feature/platform/tests/breakout_subscription_test/breakout_subscription_test.go index e4c28879de4..87a5214fdfb 100644 --- a/feature/platform/tests/breakout_subscription_test/breakout_subscription_test.go +++ b/feature/platform/tests/breakout_subscription_test/breakout_subscription_test.go @@ -20,12 +20,12 @@ import ( "encoding/binary" "encoding/json" "fmt" + "log" "net" - "regexp" + "slices" "sort" "strconv" "strings" - "sync" "testing" "time" @@ -195,10 +195,6 @@ func incrementMAC(mac string, i int) (string, error) { // configureATE configures the ATE with the testbed topology. func (tc *testCase) configureATE(t *testing.T) { - if len(tc.atePorts) < 2 { - t.Fatalf("Testbed requires at least 2 ports, got: %v", tc.atePorts) - } - p0 := tc.atePorts[2] tc.top.Ports().Add().SetName(p0.ID()) d0 := tc.top.Devices().Add().SetName(ateDst.Name) @@ -222,9 +218,12 @@ func (tc *testCase) configureATE(t *testing.T) { } agg.Protocol().Lacp().SetActorKey(1).SetActorSystemPriority(1).SetActorSystemId("01:01:01:01:01:01") - // Disable FEC for 100G-FR ports because Novus does not support it. + // Gather only the ports actively added to the topology + activePorts := []*ondatra.Port{tc.atePorts[2], tc.atePorts[0]} + + // Disable FEC for active 100G-FR ports because Novus does not support it. var p100gbasefr []string - for _, p := range tc.atePorts { + for _, p := range activePorts { if p.PMD() == ondatra.PMD100GBASEFR { p100gbasefr = append(p100gbasefr, p.ID()) } @@ -326,17 +325,17 @@ func (tc *testCase) configDstDUT(i *oc.Interface, a *attrs.Attributes) { func (tc *testCase) configureDUT(t *testing.T) { t.Logf("dut ports = %v", tc.dutPorts) - if len(tc.dutPorts) < 2 { - t.Fatalf("Testbed requires at least 2 ports, got %d", len(tc.dutPorts)) - } - d := gnmi.OC() if deviations.AggregateAtomicUpdate(tc.dut) { tc.clearAggregateMembers(t) tc.setupAggregateAtomically(t) + + t.Logf("Waiting 15 seconds for the control plane to settle after atomic aggregate setup...") + time.Sleep(15 * time.Second) } + // Configure LACP SECOND if tc.lagType == lagTypeLACP { lacp := &oc.Lacp_Interface{Name: ygot.String(tc.aggID)} lacp.LacpMode = oc.Lacp_LacpActivityType_ACTIVE @@ -346,14 +345,16 @@ func (tc *testCase) configureDUT(t *testing.T) { gnmi.Replace(t, tc.dut, lacpPath.Config(), lacp) } - time.Sleep(5 * time.Second) - + // Create the Aggregate Interface FIRST agg := &oc.Interface{Name: ygot.String(tc.aggID)} tc.configSrcAggregateDUT(agg, &dutSrc) aggPath := d.Interface(tc.aggID) fptest.LogQuery(t, tc.aggID, aggPath.Config(), agg) gnmi.Replace(t, tc.dut, aggPath.Config(), agg) + time.Sleep(5 * time.Second) + + // Configure the destination port dstp := tc.dutPorts[2] dsti := &oc.Interface{Name: ygot.String(dstp.Name())} tc.configDstDUT(dsti, &dutDst) @@ -365,6 +366,8 @@ func (tc *testCase) configureDUT(t *testing.T) { fptest.AssignToNetworkInstance(t, tc.dut, dstp.Name(), deviations.DefaultNetworkInstance(tc.dut), 0) fptest.AssignToNetworkInstance(t, tc.dut, tc.aggID, deviations.DefaultNetworkInstance(tc.dut), 0) } + + // Configure the member ports for _, port := range tc.dutPorts[0:1] { i := &oc.Interface{Name: ygot.String(port.Name())} tc.configSrcMemberDUT(i, port) @@ -372,6 +375,7 @@ func (tc *testCase) configureDUT(t *testing.T) { fptest.LogQuery(t, port.String(), iPath.Config(), i) gnmi.Replace(t, tc.dut, iPath.Config(), i) } + if deviations.ExplicitPortSpeed(tc.dut) { for _, port := range tc.dutPorts { fptest.SetPortSpeed(t, port) @@ -398,7 +402,7 @@ func (tc *testCase) verifyDUT(t *testing.T) { // LinecardReboot performs a linecard reboot. func LinecardReboot(t *testing.T, dut *ondatra.DUTDevice) { - const linecardBoottime = 10 * time.Minute + const linecardBoottime = 20 * time.Minute lcs := components.FindComponentsByType(t, dut, oc.PlatformTypes_OPENCONFIG_HARDWARE_COMPONENT_LINECARD) t.Logf("Found linecard list: %v", lcs) @@ -414,8 +418,8 @@ func LinecardReboot(t *testing.T, dut *ondatra.DUTDevice) { } else { validCards = lcs } - if *args.NumLinecards >= 0 && len(validCards) != *args.NumLinecards { - t.Errorf("Incorrect number of linecards: got %v, want exactly %v (specified by flag)", len(validCards), *args.NumLinecards) + if *args.NumLinecards >= 0 && len(validCards) < *args.NumLinecards { + t.Errorf("Incorrect number of linecards: got %v, want at least %v (specified by flag)", len(validCards), *args.NumLinecards) } if got := len(validCards); got == 0 { @@ -493,7 +497,6 @@ func LinecardReboot(t *testing.T, dut *ondatra.DUTDevice) { gnmi.Await(t, dut, gnmi.OC().Component(removableLinecard).Removable().State(), linecardBoottime, true) helpers.ValidateOperStatusUPIntfs(t, dut, intfsOperStatusUPBeforeReboot, 10*time.Minute) - } // chassisReboot performs a chassis reboot. @@ -575,91 +578,58 @@ func chassisReboot(t *testing.T, dut *ondatra.DUTDevice) { } } -// subscribedUpdates is a function that subscribes to the DUT and returns the updates received. -func subscribedUpdates(t *testing.T, dut *ondatra.DUTDevice, stream gpb.GNMI_SubscribeClient, mu *sync.Mutex, sharedResult *updateResult) ([]*gpb.Notification, error) { - var notifications []*gpb.Notification - startTime := time.Now() - for { - respUpdate, err := stream.Recv() - if err != nil { - return notifications, err - } - if respUpdate == nil { - break - } - if uint64(time.Since(startTime).Seconds()) > 10 { - break - } - if respUpdate.GetUpdate() != nil { - - if n, ok := respUpdate.GetResponse().(*gpb.SubscribeResponse_Update); ok { - notification := n.Update - notifications = append(notifications, notification) - notification.GetUpdate() - // --- Update shared state incrementally --- - mu.Lock() - // Decide: Append or replace? Appending might be safer. - sharedResult.notifications = append(sharedResult.notifications, notification) - // Also update shared error state if applicable - // sharedResult.err = nil // Or potential partial errors - mu.Unlock() - // --- End incremental update --- - - // Now you can work with the 'notification' variable +// startStreamReader creates a single, dedicated goroutine to safely read from the gRPC stream. +// It pipes all incoming updates into a buffered channel to prevent concurrency issues. +func startStreamReader(t testing.TB, stream gpb.GNMI_SubscribeClient) <-chan *gpb.Notification { + notifCh := make(chan *gpb.Notification, 100000) + go func() { + defer close(notifCh) + for { + respUpdate, err := stream.Recv() + if err != nil { + log.Printf("Stream reader terminated (expected at end of test): %v", err) + return + } + if respUpdate != nil && respUpdate.GetUpdate() != nil { + if n, ok := respUpdate.GetResponse().(*gpb.SubscribeResponse_Update); ok { + notifCh <- n.Update + } } - } else { - t.Logf("No updates received ") - break } - } - t.Logf("Notifications received: %v", notifications) - return notifications, nil + }() + return notifCh } -// updateResult is a struct to share the result of the goroutine. -type updateResult struct { - notifications []*gpb.Notification - err error +// drainChannel empties the notification buffer of any background noise +// before we trigger a specific action. +func drainChannel(ch <-chan *gpb.Notification) { + for { + select { + case <-ch: + default: + return + } + } } -type recievedUpdateWithTimeout func(t *testing.T, dut *ondatra.DUTDevice, stream gpb.GNMI_SubscribeClient, mu *sync.Mutex, sharedResult *updateResult) ([]*gpb.Notification, error) -// recieveUpdateWithTimeout is a function that receives updates from the DUT and returns the updates received but with a timeout. -func recieveUpdateWithTimeout(ctx context.Context, t *testing.T, dut *ondatra.DUTDevice, stream gpb.GNMI_SubscribeClient, recieveUpdate recievedUpdateWithTimeout, updateTimeout time.Duration) ([]*gpb.Notification, error) { +// recieveUpdateWithTimeout reads safely from the notification channel until the timeout expires. +func recieveUpdateWithTimeout(ctx context.Context, t *testing.T, notifCh <-chan *gpb.Notification, updateTimeout time.Duration) ([]*gpb.Notification, error) { t.Helper() ctxTimeout, cancelTimeout := context.WithTimeout(ctx, updateTimeout) defer cancelTimeout() - done := make(chan updateResult, 1) - // --- Changes for sharing result --- - var mu sync.Mutex // Mutex to protect sharedResult - var sharedResult updateResult // Variable to hold the latest result from the goroutine - go func() { - _, err := recieveUpdate(t, dut, stream, &mu, &sharedResult) - mu.Lock() - sharedResult.err = err - t.Logf("Goroutine: Updated sharedResult INSIDE LOCK. Update count: %d, Err: %v", len(sharedResult.notifications), sharedResult.err) // <<< ADD THIS LOG - // Unlock mutex after updating - mu.Unlock() - done <- sharedResult - }() - - select { - case result := <-done: + var notifications []*gpb.Notification - return result.notifications, result.err - case <-ctxTimeout.Done(): - if err := stream.CloseSend(); err != nil { - t.Logf("recieveUpdateWithTimeout: Error calling stream.CloseSend() on timeout: %v", err) - } else { - t.Logf("recieveUpdateWithTimeout: Successfully called stream.CloseSend() on timeout.") + for { + select { + case n, ok := <-notifCh: + if !ok { + return notifications, fmt.Errorf("notification channel closed") + } + notifications = append(notifications, n) + case <-ctxTimeout.Done(): + return notifications, ctxTimeout.Err() } - mu.Lock() - lastUpdates := sharedResult.notifications - - t.Logf("Timeout Case: Read sharedResult INSIDE LOCK. Update count: %d, Err: %v", len(lastUpdates), sharedResult.err) // <<< ADD THIS LOG - // Unlock mutex - mu.Unlock() - return lastUpdates, ctxTimeout.Err() // Return timeout error context.DeadlineExceeded } } @@ -712,24 +682,30 @@ func checkSyncResponse(t *testing.T, stream gpb.GNMI_SubscribeClient) { } } -// verifyNotificationPaths is a function that verifies the paths in the notifications. +// verifyNotificationPaths verifies the paths in the notifications, accounting for parent-level grouping. func verifyNotificationPaths(t *testing.T, notifications []*gpb.Notification, expectedUpdatePaths []string) { t.Helper() var paths []string for _, notification := range notifications { - path := "" - for _, elem := range notification.GetPrefix().GetElem() { - path += "/" + elem.GetName() + prefixPath := "" + if notification.GetPrefix() != nil { + for _, elem := range notification.GetPrefix().GetElem() { + prefixPath += "/" + elem.GetName() + } } - t.Logf("Notification path: %v", path) - if len(notification.GetUpdate()) > 0 { - path += "/" + notification.GetUpdate()[0].GetPath().GetElem()[0].GetName() + "/" + notification.GetUpdate()[0].GetPath().GetElem()[1].GetName() - t.Logf("Update path: %v", path) + + for _, update := range notification.GetUpdate() { + fullPath := prefixPath + if update.GetPath() != nil { + for _, elem := range update.GetPath().GetElem() { + fullPath += "/" + elem.GetName() + } + } + paths = append(paths, fullPath) } - paths = append(paths, path) } - t.Logf("Paths in notification: %v", paths) + type pathResult struct { path string found bool @@ -738,49 +714,111 @@ func verifyNotificationPaths(t *testing.T, notifications []*gpb.Notification, ex for _, expectedPath := range expectedUpdatePaths { pathResults = append(pathResults, pathResult{path: expectedPath, found: false}) } - for _, pathResult := range pathResults { - for _, path := range paths { - if path == pathResult.path { - t.Logf("Expected path %v found in received updates: %v", pathResult.path, paths) - pathResult.found = true + + for i, pathResult := range pathResults { + for _, p := range paths { + // Check for exact match OR if the device grouped the update at a parent node + // Example: expected "/.../state/admin-status" matches received "/.../state" + if p == pathResult.path || strings.HasPrefix(pathResult.path, p+"/") { + t.Logf("Expected path %v found (Matched against received path: %v)", pathResult.path, p) + pathResults[i].found = true break } } + } + + // Deduplicate received paths for cleaner error logging + uniquePaths := make(map[string]bool) + for _, p := range paths { + uniquePaths[p] = true + } + var sample []string + for p := range uniquePaths { + sample = append(sample, p) + } + + for _, pathResult := range pathResults { if !pathResult.found { - t.Errorf(" Error: Expected path %v not found in received updates: %v", pathResult.path, paths) + t.Errorf("Error: Expected path %v not found in received updates. Extracted unique paths: %v", pathResult.path, sample) } } } -// verifyUpdatevalue is a function that verifies the value of the leaf in the notifications. -func verifyUpdateValue(t testing.TB, notifications []*gpb.Notification, expectedUpdateValue string) { +// verifyUpdateValue verifies the value of the leaf, extracting it from JSON objects if grouped. +func verifyUpdateValue(t testing.TB, notifications []*gpb.Notification, dut *ondatra.DUTDevice, actionState string) { t.Helper() + var expectedUpdateValue []string + if actionState == "UP" { + expectedUpdateValue = []string{"UP"} + } else if actionState == "DOWN" { + switch dut.Vendor() { + case ondatra.ARISTA, ondatra.JUNIPER: + expectedUpdateValue = []string{"LOWER_LAYER_DOWN", "DOWN"} + default: + expectedUpdateValue = []string{"DOWN"} + } + } + for _, notification := range notifications { for _, update := range notification.GetUpdate() { - for _, elem := range update.GetPath().GetElem() { - if elem.GetName() == "admin-status" { - var updateValueString string - err := json.Unmarshal(update.GetVal().GetJsonVal(), &updateValueString) - if err != nil { - t.Errorf("Error marshalling json value: %v", err) - } else if updateValueString != expectedUpdateValue { - t.Errorf("Error: SubscribedValue stringVal in Update message for admin-status: %v, want: %v", updateValueString, expectedUpdateValue) + var allElems []*gpb.PathElem + if notification.GetPrefix() != nil { + allElems = append(allElems, notification.GetPrefix().GetElem()...) + } + if update.GetPath() != nil { + allElems = append(allElems, update.GetPath().GetElem()...) + } + + leafName := "" + if len(allElems) > 0 { + leafName = allElems[len(allElems)-1].GetName() + } + + jsonBytes := update.GetVal().GetJsonVal() + if jsonBytes == nil { + continue + } + + var val interface{} + if err := json.Unmarshal(jsonBytes, &val); err != nil { + continue + } + + // Case 1: The update is bundled in a parent JSON object + if m, ok := val.(map[string]interface{}); ok { + if adminStatus, exists := m["admin-status"]; exists { + adminStr := fmt.Sprintf("%v", adminStatus) + if slices.Contains(expectedUpdateValue, adminStr) { + t.Logf("Found grouped admin-status: %v", adminStr) } else { - t.Logf("SubscribedValue stringVal in Update message for admin-status: %v, want: %v", updateValueString, expectedUpdateValue) + t.Errorf("Error: Grouped admin-status: %v, want: %v", adminStr, expectedUpdateValue) } } - if elem.GetName() == "oper-status" { - var updateValueString string - err := json.Unmarshal(update.GetVal().GetJsonVal(), &updateValueString) - if err != nil { - t.Errorf("Error marshalling json value: %v", err) - } else if strings.Contains(updateValueString, expectedUpdateValue) { - t.Logf("SubscribedValue stringVal in Update message for oper-status: %v, want: %v", updateValueString, expectedUpdateValue) + if operStatus, exists := m["oper-status"]; exists { + operStr := fmt.Sprintf("%v", operStatus) + if slices.Contains(expectedUpdateValue, operStr) { + t.Logf("Found grouped oper-status: %v", operStr) } else { - t.Errorf("Error: SubscribedValue stringVal in Update message for oper-status: %v, want: %v", updateValueString, expectedUpdateValue) + t.Errorf("Error: Grouped oper-status: %v, want: %v", operStr, expectedUpdateValue) } } } + + // Case 2: The update was sent directly to the exact leaf path + if leafName == "admin-status" || leafName == "oper-status" { + valStr := "" + if s, ok := val.(string); ok { + valStr = s + } else { + valStr = fmt.Sprintf("%v", val) + } + + if slices.Contains(expectedUpdateValue, valStr) { + t.Logf("Found direct %s: %v", leafName, valStr) + } else { + t.Errorf("Error: Direct %s: %v, want: %v", leafName, valStr, expectedUpdateValue) + } + } } } } @@ -798,63 +836,69 @@ func lineCardUp(t testing.TB, dut *ondatra.DUTDevice, fpc string) { t.Logf("Component %s, oper-status after %f minutes: %v", fpc, time.Since(start).Minutes(), oper) } -func findFpcFromPort(t testing.TB, portArray []string, dut *ondatra.DUTDevice) ([]string, error) { +// findLinecardFromPort dynamically finds the parent line card component for a given list of ports +// by traversing the OpenConfig component hierarchy via gNMI. +func findLinecardFromPort(t testing.TB, portArray []string, dut *ondatra.DUTDevice) ([]string, error) { t.Helper() - var fpcArray []string + var linecardArray []string for _, portName := range portArray { - if dut.Vendor() == ondatra.ARISTA { - re := regexp.MustCompile(`^[A-Za-z]+(\d+)/(\d+)/\d+(?::\d+)?$`) - match := re.FindStringSubmatch(portName) - if match == nil { - return nil, fmt.Errorf("invalid port name format: %s", portName) - } - fpcArray = append(fpcArray, fmt.Sprintf("FPC%s", match[1])) + // 1. Determine the starting component for the port. + compName := portName + if hwPort, present := gnmi.Lookup(t, dut, gnmi.OC().Interface(portName).HardwarePort().State()).Val(); present { + compName = hwPort } - if dut.Vendor() == ondatra.CISCO { - re := regexp.MustCompile(`^HundredGigE+(\d+)/(\d+)/\d+/\d+(?::\d+)?$`) - match := re.FindStringSubmatch(portName) - if match == nil { - return nil, fmt.Errorf("invalid port name format: %s", portName) + + parentName := compName + found := false + + // 2. Traverse the component tree upwards until a LINECARD is found. + for parentName != "" { + comp, present := gnmi.Lookup(t, dut, gnmi.OC().Component(parentName).State()).Val() + if !present { + break } - fpcArray = append(fpcArray, fmt.Sprintf("FPC%s", match[2])) - } - if dut.Vendor() == ondatra.JUNIPER { - re := regexp.MustCompile(`^[a-z]+-(\d+)/\d+/\d+(?::\d+)?$`) - match := re.FindStringSubmatch(portName) - if match == nil { - return nil, fmt.Errorf("invalid port name format: %s", portName) + + // Check if the current component is a Linecard + if comp.GetType() == oc.PlatformTypes_OPENCONFIG_HARDWARE_COMPONENT_LINECARD { + linecardArray = append(linecardArray, parentName) + found = true + break } - fpcArray = append(fpcArray, fmt.Sprintf("FPC%s", match[1])) - t.Logf("fpcArray: %v", fpcArray) - t.Logf("portName: %v", portName) - t.Logf("match: %v", match[1]) + + // Move up to the parent component + parentName = comp.GetParent() } - if dut.Vendor() == ondatra.NOKIA { - re := regexp.MustCompile(`ethernet-(\d+)/\d+(?::(\d+))?$`) - match := re.FindStringSubmatch(portName) - if match == nil { - return nil, fmt.Errorf("invalid port name format: %s", portName) - } - fpcArray = append(fpcArray, fmt.Sprintf("FPC%s", match[1])) + if !found { + return nil, fmt.Errorf("could not find parent line card component for port: %s", portName) } } - return fpcArray, nil + return linecardArray, nil } + func verifyNotificationPathsForPortUpdates(t *testing.T, notifications []*gpb.Notification, selectedFpc string) { t.Helper() for _, notification := range notifications { - path := "" + // Check Prefix for _, elem := range notification.GetPrefix().GetElem() { - path += "/" + elem.GetName() for key, value := range elem.GetKey() { if value == selectedFpc { - t.Logf("Notification path: %v has update for part: %v", path, selectedFpc) - t.Logf("Key: %v, Value: %v", key, value) + t.Logf("Notification prefix path has update for part: %v. Key: %v, Value: %v", selectedFpc, key, value) return } } } + // Check Update paths + for _, update := range notification.GetUpdate() { + for _, elem := range update.GetPath().GetElem() { + for key, value := range elem.GetKey() { + if value == selectedFpc { + t.Logf("Notification update path has update for part: %v. Key: %v, Value: %v", selectedFpc, key, value) + return + } + } + } + } } t.Errorf("Notification is missing update for part: %v", selectedFpc) } @@ -864,6 +908,7 @@ func linecardDown(t testing.TB, dut *ondatra.DUTDevice, fpc string, lcs []string // don't consider the empty linecard slots. if len(lcs) > *args.NumLinecards { for _, lc := range lcs { + t.Logf("lc: %v", lc) empty, ok := gnmi.Lookup(t, dut, gnmi.OC().Component(lc).Empty().State()).Val() if !ok || (ok && !empty) { validCards = append(validCards, lc) @@ -872,21 +917,20 @@ func linecardDown(t testing.TB, dut *ondatra.DUTDevice, fpc string, lcs []string } else { validCards = lcs } - if *args.NumLinecards >= 0 && len(validCards) != *args.NumLinecards { - t.Errorf("Incorrect number of linecards: got %v, want exactly %v (specified by flag)", len(validCards), *args.NumLinecards) + if *args.NumLinecards >= 0 && len(validCards) < *args.NumLinecards { + t.Errorf("Incorrect number of linecards: got %v, want at least %v (specified by flag)", len(validCards), *args.NumLinecards) } if got := len(validCards); got == 0 { t.Skipf("Not enough linecards for the test on %v: got %v, want > 0", dut.Model(), got) } - if got := gnmi.Lookup(t, dut, gnmi.OC().Component(fpc).Removable().State()).IsPresent(); !got { - t.Fatalf("Detected non-removable line card: %v", fpc) - } - if got := gnmi.Get(t, dut, gnmi.OC().Component(fpc).Removable().State()); got { - t.Logf("Found removable line card: %v", fpc) - } c := gnmi.OC().Component(fpc) + if deviations.PowerDisableEnableLeafRefValidation(dut) { + gnmi.Update(t, dut, c.Config(), &oc.Component{ + Name: ygot.String(fpc), + }) + } config := c.Linecard().PowerAdminState().Config() t.Logf("Starting %s POWER_DISABLE", fpc) gnmi.Replace(t, dut, config, oc.Platform_ComponentPowerType_POWER_DISABLED) @@ -894,6 +938,7 @@ func linecardDown(t testing.TB, dut *ondatra.DUTDevice, fpc string, lcs []string t.Logf("Wait for 15 seconds to allow the sub component's power down process to complete") time.Sleep(15 * time.Second) } + func uniqueString(input []string) []string { seen := make(map[string]struct{}) var result []string @@ -907,16 +952,24 @@ func uniqueString(input []string) []string { return result } -func selectFpc(t testing.TB, fpcList []string) string { +func selectFpc(t testing.TB, fpcList []string, dut *ondatra.DUTDevice) string { t.Helper() var selectedFpc string uniqueFpcList := uniqueString(fpcList) - if len(uniqueFpcList) > 1 { + + if len(uniqueFpcList) > 0 { sort.Strings(uniqueFpcList) selectedFpc = uniqueFpcList[len(uniqueFpcList)-1] + + // Check if the component is physically removable. If fixed, skip gracefully. + removable, ok := gnmi.Lookup(t, dut, gnmi.OC().Component(selectedFpc).Removable().State()).Val() + if ok && !removable { + t.Skipf("Skipping test: %s is a non-removable component on fixed chassis (%s)", selectedFpc, dut.Model()) + } } else { - t.Fatalf("No FPC found for the test") + t.Fatalf("No Line card found for the test") } + return selectedFpc } @@ -938,14 +991,19 @@ func TestBreakoutSubscription(t *testing.T) { tc.configureDUT(t) ctx := context.Background() t.Run("verifyDUT", tc.verifyDUT) + stream := newSubscribeRequest(ctx, t, dut) checkSyncResponse(t, stream) + notifCh := startStreamReader(t, stream) + t.Run("PLT-1.2.1 Check response after a triggered interface state change", func(t *testing.T) { + drainChannel(notifCh) + setDUTInterfaceWithState(t, dut, tc.dutPorts[0], false) setDUTInterfaceWithState(t, dut, tc.dutPorts[2], false) - time.Sleep(2 * time.Second) - updateTimeout := 10 * time.Second - receivedNotifications, err := recieveUpdateWithTimeout(ctx, t, dut, stream, subscribedUpdates, updateTimeout) + // time.Sleep(2 * time.Second) + updateTimeout := 30 * time.Second + receivedNotifications, err := recieveUpdateWithTimeout(ctx, t, notifCh, updateTimeout) if err != nil { t.Logf("Received error(possibly end of updates): %v", err) } @@ -955,30 +1013,39 @@ func TestBreakoutSubscription(t *testing.T) { "/interfaces/interface/state/oper-status", } verifyNotificationPaths(t, receivedNotifications, expectedUpdatePaths) - verifyUpdateValue(t, receivedNotifications, "DOWN") + verifyUpdateValue(t, receivedNotifications, dut, "DOWN") + + drainChannel(notifCh) + setDUTInterfaceWithState(t, dut, tc.dutPorts[0], true) setDUTInterfaceWithState(t, dut, tc.dutPorts[2], true) - receivedNotifications, _ = recieveUpdateWithTimeout(ctx, t, dut, stream, subscribedUpdates, updateTimeout) - verifyUpdateValue(t, receivedNotifications, "UP") + receivedNotifications, _ = recieveUpdateWithTimeout(ctx, t, notifCh, updateTimeout) + verifyUpdateValue(t, receivedNotifications, dut, "UP") }) + // Check response after a triggered interface flap t.Run("PLT-1.2.2 Check response after a triggered interface flap", func(t *testing.T) { counter := 5 var receivedNotifications []*gpb.Notification var err error for i := 0; i < counter; i++ { + drainChannel(notifCh) + setDUTInterfaceWithState(t, dut, tc.dutPorts[0], false) setDUTInterfaceWithState(t, dut, tc.dutPorts[2], false) - updateTimeout := 10 * time.Second - receivedNotifications, err = recieveUpdateWithTimeout(ctx, t, dut, stream, subscribedUpdates, updateTimeout) + updateTimeout := 45 * time.Second + receivedNotifications, err = recieveUpdateWithTimeout(ctx, t, notifCh, updateTimeout) if err != nil { t.Logf("Received error(possibly end of updates): %v", err) } - verifyUpdateValue(t, receivedNotifications, "DOWN") + verifyUpdateValue(t, receivedNotifications, dut, "DOWN") + + drainChannel(notifCh) + setDUTInterfaceWithState(t, dut, tc.dutPorts[0], true) setDUTInterfaceWithState(t, dut, tc.dutPorts[2], true) - receivedNotifications, _ = recieveUpdateWithTimeout(ctx, t, dut, stream, subscribedUpdates, updateTimeout) - verifyUpdateValue(t, receivedNotifications, "UP") + receivedNotifications, _ = recieveUpdateWithTimeout(ctx, t, notifCh, updateTimeout) + verifyUpdateValue(t, receivedNotifications, dut, "UP") } expectedUpdatePaths := []string{ "/interfaces/interface/state/admin-status", @@ -987,11 +1054,12 @@ func TestBreakoutSubscription(t *testing.T) { } verifyNotificationPaths(t, receivedNotifications, expectedUpdatePaths) }) + // Check response after a triggered LC reboot t.Run("PLT-1.2.3 Check response after a triggered LC reboot", func(t *testing.T) { LinecardReboot(t, dut) updateTimeout := 300 * time.Second - receivedNotifications, err := recieveUpdateWithTimeout(ctx, t, dut, stream, subscribedUpdates, updateTimeout) + receivedNotifications, err := recieveUpdateWithTimeout(ctx, t, notifCh, updateTimeout) if err != nil { t.Logf("Received error:(possibly end of updates) %v", err) t.Logf("Received notifications in main function: %v", receivedNotifications) @@ -1003,39 +1071,45 @@ func TestBreakoutSubscription(t *testing.T) { } verifyNotificationPaths(t, receivedNotifications, expectedUpdatePaths) }) + defer stream.CloseSend() - defer ctx.Done() + // Check response after a triggered chassis reboot t.Run("PLT-1.2.4 Check response after a triggered chassis reboot", func(t *testing.T) { chassisReboot(t, dut) - stream := newSubscribeRequest(ctx, t, dut) - checkSyncResponse(t, stream) - defer stream.CloseSend() - defer ctx.Done() + streamReboot := newSubscribeRequest(ctx, t, dut) + checkSyncResponse(t, streamReboot) + defer streamReboot.CloseSend() }) + // Check response after a triggered breakout module reboot t.Run("PLT-1.2.5 Check response after a triggered breakout module reboot", func(t *testing.T) { + lcs := components.FindComponentsByType(t, dut, oc.PlatformTypes_OPENCONFIG_HARDWARE_COMPONENT_LINECARD) + if len(lcs) == 0 { + t.Skipf("Skipping PLT-1.2.5: No LINECARD components found. Device %s is a Fixed Form Factor (FFF) chassis.", dut.Model()) + } intfsOperStatusUPBeforeReboot := helpers.FetchOperStatusUPIntfs(t, dut, *args.CheckInterfacesInBinding) t.Logf("intfsOperStatusUPBeforeReboot: %v", intfsOperStatusUPBeforeReboot) - fpcList, err := findFpcFromPort(t, intfsOperStatusUPBeforeReboot, dut) + lcList, err := findLinecardFromPort(t, intfsOperStatusUPBeforeReboot, dut) if err != nil { - t.Fatalf("Failed to find FPC from port: %v", err) + t.Fatalf("Failed to find Line Card from port: %v", err) } - t.Logf("fpcList: %v", fpcList) - selectedFpc := selectFpc(t, fpcList) - t.Logf("selectedFpc: %v", selectedFpc) - lcs := components.FindComponentsByType(t, dut, oc.PlatformTypes_OPENCONFIG_HARDWARE_COMPONENT_LINECARD) - linecardDown(t, dut, selectedFpc, lcs) - stream := newSubscribeRequest(ctx, t, dut) - checkSyncResponse(t, stream) - lineCardUp(t, dut, selectedFpc) + t.Logf("LinecardList: %v", lcList) + selectedLC := selectFpc(t, lcList, dut) + t.Logf("selectedFpc: %v", selectedLC) + linecardDown(t, dut, selectedLC, lcs) + + streamLC := newSubscribeRequest(ctx, t, dut) + checkSyncResponse(t, streamLC) + notifChLC := startStreamReader(t, streamLC) + + lineCardUp(t, dut, selectedLC) updateTimeout := 10 * time.Minute - receivedNotifications, err := recieveUpdateWithTimeout(ctx, t, dut, stream, subscribedUpdates, updateTimeout) + receivedNotifications, err := recieveUpdateWithTimeout(ctx, t, notifChLC, updateTimeout) if err != nil { t.Logf("Received error(possibly end of updates): %v", err) } - verifyNotificationPathsForPortUpdates(t, receivedNotifications, selectedFpc) - defer stream.CloseSend() - defer ctx.Done() + verifyNotificationPathsForPortUpdates(t, receivedNotifications, selectedLC) + defer streamLC.CloseSend() }) }