Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions server/common/module_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ var log = logging.MustGetLogger("server_common")

const QUEUE_SIZE = 1 << 16

type IngesterStatus uint8

const (
IngesterOk IngesterStatus = iota
IngesterAbnormal
IngesterInitializing
)

type ControllerIngesterShared struct {
ResourceEventQueue *queue.OverwriteQueue
TraceTreeQueue *queue.OverwriteQueue
Expand Down Expand Up @@ -83,15 +91,16 @@ func ExportersEnabled(configPath string) bool {
return false
}

type OrgHanderInterface interface {
type OrgHandlerInterface interface {
DropOrg(orgId uint16) error
UpdateNativeTag(nativetag.NativeTagOP, uint16, *nativetag.NativeTag) error
IsHealthy() bool
}

var ingesterOrgHanders []OrgHanderInterface
var ingesterOrgHandlers []OrgHandlerInterface

func SetOrgHandler(orgHandler OrgHanderInterface) {
ingesterOrgHanders = append(ingesterOrgHanders, orgHandler)
func SetOrgHandler(orgHandler OrgHandlerInterface) {
ingesterOrgHandlers = append(ingesterOrgHandlers, orgHandler)
}

/*
Expand All @@ -115,11 +124,11 @@ func SetOrgHandler(orgHandler OrgHanderInterface) {
*/
func DropOrg(orgId uint16) error {
log.Info("drop org id:", orgId)
if ingesterOrgHanders == nil {
return fmt.Errorf("ingesterOrgHanders is nil, drop org id %d failed", orgId)
if ingesterOrgHandlers == nil {
return fmt.Errorf("ingesterOrgHandlers is nil, drop org id %d failed", orgId)
}
for _, ingesterOrgHander := range ingesterOrgHanders {
err := ingesterOrgHander.DropOrg(orgId)
for _, orgHandler := range ingesterOrgHandlers {
err := orgHandler.DropOrg(orgId)
if err != nil {
return err
}
Expand All @@ -142,13 +151,13 @@ func PushNativeTags(orgId uint16, nativeTags []nativetag.NativeTag) {
// When adding or removing native_tag, you need to call the interface
func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativetag.NativeTag) error {
log.Infof("orgId %d %s native tag: %+v", orgId, op, nativeTag)
if ingesterOrgHanders == nil {
if ingesterOrgHandlers == nil {
err := fmt.Errorf("ingester is not ready, update native tag failed")
log.Error(err)
return err
}
for _, ingesterOrgHander := range ingesterOrgHanders {
err := ingesterOrgHander.UpdateNativeTag(op, orgId, nativeTag)
for _, orgHandler := range ingesterOrgHandlers {
err := orgHandler.UpdateNativeTag(op, orgId, nativeTag)
if err != nil {
log.Error(err)
return err
Expand All @@ -157,3 +166,18 @@ func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativeta
nativetag.UpdateNativeTag(op, orgId, nativeTag)
return nil
}

func CheckIngesterStatus() IngesterStatus {
if ingesterOrgHandlers == nil {
log.Infof("ingester is initializing")
return IngesterInitializing
}
for _, orgHandler := range ingesterOrgHandlers {
// Treat the ingester as abnormal only after a handler has observed write failures without any successful writes.
if !orgHandler.IsHealthy() {
log.Errorf("ingester is abnormal")
return IngesterAbnormal
}
}
return IngesterOk
}
4 changes: 4 additions & 0 deletions server/ingester/ingester/org_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (o *OrgHandler) DropOrg(orgId uint16) error {
return o.dropOrgDatabase(orgId)
}

func (o *OrgHandler) IsHealthy() bool {
return true
}

// FIXME: After clearing the Org data, if the same Org ID is created again later, data writing will fail. You can restart deepflow-server to solve it.
func (o *OrgHandler) dropOrgDatabase(orgId uint16) error {
if ckdb.IsDefaultOrgID(orgId) {
Expand Down
18 changes: 18 additions & 0 deletions server/ingester/pkg/ckwriter/ckwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,24 @@ func (m *CKWriterManager) EndpointsChange(addrs []string) {
ckwriterManager.Unlock()
}

func (m *CKWriterManager) IsHealthy() bool {
ckwriterManager.Lock()
defer ckwriterManager.Unlock()
hasWriteFailure := false
for _, writer := range m.ckwriters {
for _, queueCtx := range writer.queueContexts {
// Consider the writer healthy once any queue succeeds; before that, only an observed failure makes it abnormal.
if queueCtx.counter.WriteSuccessCount > 0 {
return true
}
if queueCtx.counter.WriteFailedCount > 0 {
hasWriteFailure = true
}
}
}
return !hasWriteFailure
}

type CKWriter struct {
addrs []string
user string
Expand Down
Loading