Data corruption of Profiles data when batching in the exporter queue #15084

@mdonkers

Description

Component(s)

pdata/pprofile

What happened?

Describe the bug
When using batching in the exporter queue, e.g. by adding config such as:

sending_queue:
  batch:
    flush_timeout: 1s
    min_size: 10000

Profile messages get merged and split. Because of how the Dictionary is updated during the merge, Resource data gets corrupted.
The corruption only surfaces once the data is marshalled and unmarshalled again: the dictionary indices stored in the Resource attributes themselves are never remapped, so after unmarshalling the stale indices resolve to the wrong values in the merged dictionary.
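The mechanism can be sketched with a minimal, self-contained model (hypothetical helper and plain strings, not the actual pdata API): merging concatenates the source string table onto the destination's, but the index a source resource attribute holds is never remapped.

```go
package main

import "fmt"

// mergeTables mimics the dictionary merge: source strings (minus the
// empty-string sentinel at index 0) are appended onto the destination
// table. Hypothetical helper for illustration only.
func mergeTables(dst, src []string) []string {
	merged := append([]string{}, dst...)
	return append(merged, src[1:]...)
}

func main() {
	dstTable := []string{"", "service.name", "deployment"}
	srcTable := []string{"", "host.name", "cluster.name"}

	// A source resource attribute stores only an index into its own table.
	keyStrindex := 1
	fmt.Println("before merge:", srcTable[keyStrindex]) // host.name

	// After the merge the attribute still holds index 1, but the merged
	// table has different content at that position, so a later
	// marshal/unmarshal round-trip resolves it to the wrong key.
	merged := mergeTables(dstTable, srcTable)
	fmt.Println("after merge: ", merged[keyStrindex]) // service.name
}
```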

Additionally, the Dictionary merge itself is very costly, because the SetXXX functions perform linear scans during batch merging:

The exporter queue batcher (partitionBatcher) merges multiple profile requests using MergeSplit before flushing. Each merge triggers:

asyncQueue.Start
  → partitionBatcher.consumeInternal
    → profilesRequest.MergeSplit / mergeTo
      → Profiles.MergeTo
        → Profiles.switchDictionary
          → Stack.switchDictionary (for each stack)
            → SetLocation           ← O(n) linear scan per location
              → LocationSlice.All() + Location.Equal

For example, SetLocation (in go.opentelemetry.io/collector/pdata/pprofile v0.149.0, generated_location.go) does a linear scan through the existing LocationSlice to find duplicates when rebuilding the merged dictionary. For a batch of N profiles with L locations each, this is O(N × L²). The same pattern applies to SetFunction, SetString, SetStack, and SetMapping.
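One possible remedy (a sketch under assumptions, not the pdata implementation) is to keep a side index keyed by the entry, turning each Set call into an amortized O(1) map lookup instead of a full scan. Using plain strings as a stand-in for dictionary entries such as Location:

```go
package main

import "fmt"

// setLinear mimics the current SetXXX pattern: a full scan of the table
// for an equal entry before appending — O(len(table)) per call.
func setLinear(table []string, v string) ([]string, int) {
	for i, e := range table {
		if e == v {
			return table, i
		}
	}
	return append(table, v), len(table)
}

// setIndexed keeps a map from entry to its position, so each call is
// amortized O(1). A real fix would need a hashable key derived from
// Location/Function/etc., since those are structs, not strings.
func setIndexed(table []string, idx map[string]int, v string) ([]string, int) {
	if i, ok := idx[v]; ok {
		return table, i
	}
	idx[v] = len(table)
	return append(table, v), len(table)
}

func main() {
	entries := []string{"main", "runtime.gcBgMarkWorker", "main"}

	var linear []string
	for _, v := range entries {
		var i int
		linear, i = setLinear(linear, v)
		fmt.Println("linear :", v, "->", i)
	}

	indexed, idx := []string{}, map[string]int{}
	for _, v := range entries {
		var i int
		indexed, i = setIndexed(indexed, idx, v)
		fmt.Println("indexed:", v, "->", i)
	}
}
```

Both variants return the same indices; only the per-call cost differs.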

Steps to reproduce
Unit test to show the above corruption bug:

func TestProfilesMergeTo_ResourceAttributeCorruption_MarshalUnmarshal(t *testing.T) {
	marshaler := &pprofile.ProtoMarshaler{}
	unmarshaler := &pprofile.ProtoUnmarshaler{}

	// Create two profiles with DIFFERENT string tables so that after merge the
	// destination string table has different content at the source's original indices.
	//
	// Profile A (destination): string table will start with its own entries.
	// Profile B (source): has a different string table layout. After merge, B's
	// resource attribute KeyStrindex values become stale pointers into the merged table.
	profileA := buildProfileWithDistinctResourceAttrs("service-A", map[string]string{
		"service.name": "service-A",
		"deployment":   "prod",
	})
	profileB := buildProfileWithDistinctResourceAttrs("service-B", map[string]string{
		"host.name":    "host-1",
		"cluster.name": "us-east",
	})

	// Record original resource attributes before any marshaling.
	origAttrsA := extractResourceAttrStrings(profileA)
	origAttrsB := extractResourceAttrStrings(profileB)
	require.NotEmpty(t, origAttrsA)
	require.NotEmpty(t, origAttrsB)

	// Step 1: Marshal → Unmarshal both profiles (simulates Kafka write/read).
	// After unmarshal, resolveProfilesReferences sets kv.Key AND keeps kv.KeyStrindex.
	bytesA, err := marshaler.MarshalProfiles(profileA)
	require.NoError(t, err)
	bytesB, err := marshaler.MarshalProfiles(profileB)
	require.NoError(t, err)

	dstProfiles, err := unmarshaler.UnmarshalProfiles(bytesA)
	require.NoError(t, err)
	srcProfiles, err := unmarshaler.UnmarshalProfiles(bytesB)
	require.NoError(t, err)

	// Verify in-memory attributes are correct after unmarshal.
	assert.Equal(t, origAttrsA, extractResourceAttrStrings(dstProfiles),
		"destination attributes should be correct after unmarshal")
	assert.Equal(t, origAttrsB, extractResourceAttrStrings(srcProfiles),
		"source attributes should be correct after unmarshal")

	// Step 2: MergeTo — merges src into dst. switchDictionary remaps dictionary
	// tables but NOT resource attribute KeyStrindex values.
	err = srcProfiles.MergeTo(dstProfiles)
	require.NoError(t, err)

	// In-memory attributes should still be correct (Map.Get uses kv.Key field).
	mergedAttrs := extractAllResourceAttrStrings(dstProfiles)
	require.Len(t, mergedAttrs, 2, "merged profiles should have 2 resource profiles")
	// First resource profile is from the original destination (profileA).
	assert.Equal(t, origAttrsA, mergedAttrs[0],
		"destination resource attrs should survive merge in-memory")
	// Second resource profile is from the merged source (profileB).
	assert.Equal(t, origAttrsB, mergedAttrs[1],
		"source resource attrs should survive merge in-memory")

	// Step 3: Re-marshal the merged result (simulates writing back to Kafka).
	// convertProfilesToReferences SKIPS keys where KeyStrindex != 0, so stale
	// indices are marshaled as-is.
	mergedBytes, err := marshaler.MarshalProfiles(dstProfiles)
	require.NoError(t, err)

	// Step 4: Re-unmarshal (simulates consumer reading from Kafka).
	// resolveProfilesReferences reads stale key_strindex → gets wrong key.
	finalProfiles, err := unmarshaler.UnmarshalProfiles(mergedBytes)
	require.NoError(t, err)

	// ASSERTION: After the full round-trip, resource attributes must still match.
	// This is where the corruption manifests — stale KeyStrindex values cause
	// keys to resolve to wrong strings from the merged dictionary.
	finalAttrs := extractAllResourceAttrStrings(finalProfiles)
	require.Len(t, finalAttrs, 2, "final profiles should have 2 resource profiles")

	assert.Equal(t, origAttrsA, finalAttrs[0],
		"BUG: destination resource attribute keys/values corrupted after marshal→unmarshal→merge→marshal→unmarshal round-trip")
	assert.Equal(t, origAttrsB, finalAttrs[1],
		"BUG: source resource attribute keys/values corrupted after marshal→unmarshal→merge→marshal→unmarshal round-trip")
}

// buildProfileWithDistinctResourceAttrs creates a Profiles object with the given
// resource attributes and a non-trivial dictionary (so that string table indices
// differ between profiles).
func buildProfileWithDistinctResourceAttrs(serviceName string, attrs map[string]string) pprofile.Profiles {
	profiles := pprofile.NewProfiles()

	rp := profiles.ResourceProfiles().AppendEmpty()
	for k, v := range attrs {
		rp.Resource().Attributes().PutStr(k, v)
	}

	sp := rp.ScopeProfiles().AppendEmpty()
	sp.Scope().SetName("test-profiler")

	profile := sp.Profiles().AppendEmpty()
	profile.SetProfileID([16]byte{0xde, 0xad, 0xbe, 0xef, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
	profile.SetTime(pcommon.Timestamp(1_000_000_000))
	profile.SetDurationNano(uint64(1_000_000_000))

	// Build a non-trivial dictionary to ensure string table positions differ
	// between profiles built with different serviceName values.
	dict := profiles.Dictionary()
	st := dict.StringTable()

	st.Append("")                     // 0
	st.Append("cpu")                  // 1
	st.Append("nanoseconds")          // 2
	st.Append(serviceName + "_func1") // 3
	st.Append(serviceName + "_file1") // 4
	st.Append(serviceName + "_func2") // 5
	st.Append(serviceName + "_file2") // 6
	st.Append(serviceName + "_lib")   // 7

	fn1 := dict.FunctionTable().AppendEmpty()
	fn1.SetNameStrindex(3)
	fn1.SetFilenameStrindex(4)

	fn2 := dict.FunctionTable().AppendEmpty()
	fn2.SetNameStrindex(5)
	fn2.SetFilenameStrindex(6)

	m := dict.MappingTable().AppendEmpty()
	m.SetFilenameStrindex(7)

	loc1 := dict.LocationTable().AppendEmpty()
	loc1.SetMappingIndex(0)
	line1 := loc1.Lines().AppendEmpty()
	line1.SetFunctionIndex(0)
	line1.SetLine(10)

	loc2 := dict.LocationTable().AppendEmpty()
	loc2.SetMappingIndex(0)
	line2 := loc2.Lines().AppendEmpty()
	line2.SetFunctionIndex(1)
	line2.SetLine(20)

	stack := dict.StackTable().AppendEmpty()
	stack.LocationIndices().Append(0)
	stack.LocationIndices().Append(1)

	periodType := profile.PeriodType()
	periodType.SetTypeStrindex(1)
	periodType.SetUnitStrindex(2)
	profile.SetPeriod(10_000_000)

	sample := profile.Samples().AppendEmpty()
	sample.SetStackIndex(0)

	return profiles
}


// extractResourceAttrStrings returns resource attributes of the first ResourceProfiles
// as a map[string]string.
func extractResourceAttrStrings(profiles pprofile.Profiles) map[string]string {
	if profiles.ResourceProfiles().Len() == 0 {
		return nil
	}
	return attrMapToStrings(profiles.ResourceProfiles().At(0).Resource().Attributes())
}

// extractAllResourceAttrStrings returns resource attributes for each ResourceProfiles
// as a slice of map[string]string.
func extractAllResourceAttrStrings(profiles pprofile.Profiles) []map[string]string {
	result := make([]map[string]string, profiles.ResourceProfiles().Len())
	for i := 0; i < profiles.ResourceProfiles().Len(); i++ {
		result[i] = attrMapToStrings(profiles.ResourceProfiles().At(i).Resource().Attributes())
	}
	return result
}

func attrMapToStrings(m pcommon.Map) map[string]string {
	result := make(map[string]string, m.Len())
	m.Range(func(k string, v pcommon.Value) bool {
		result[k] = v.AsString()
		return true
	})
	return result
}

What did you expect to see?
Data survives batching (merge/split) and the marshal/unmarshal round-trip without corruption.

What did you see instead?
Resource attribute keys and values resolve to the wrong strings after the round-trip; data and dictionary references are lost or incorrect.

Collector version

v0.149.0

Environment information

Environment

OS: Debian/Linux
Compiler(if manually compiled): go 1.26.1

OpenTelemetry Collector configuration

Log output

Additional context

No response
