Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Config struct {
Import Import
Version Version

OnDemandPinning OnDemandPinning

Internal Internal // experimental/unstable options

Bitswap Bitswap
Expand Down
1 change: 1 addition & 0 deletions config/experiments.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Experiments struct {
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
GatewayOverLibp2p bool `json:",omitempty"`
OnDemandPinningEnabled bool

GraphsyncEnabled graphsyncEnabled `json:",omitempty"`
AcceleratedDHTClient experimentalAcceleratedDHTClient `json:",omitempty"`
Expand Down
20 changes: 20 additions & 0 deletions config/ondemandpin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package config

import "time"

const (
DefaultOnDemandPinReplicationTarget = 5
DefaultOnDemandPinCheckInterval = 10 * time.Minute
DefaultOnDemandPinUnpinGracePeriod = 24 * time.Hour
)

type OnDemandPinning struct {
// Minimum providers desired in the DHT (excluding self).
ReplicationTarget OptionalInteger

// How often the checker evaluates all registered CIDs.
CheckInterval OptionalDuration

// How long replication must stay above target before unpinning.
UnpinGracePeriod OptionalDuration
}
4 changes: 4 additions & 0 deletions core/commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func TestCommands(t *testing.T) {
"/pin/remote/service/add",
"/pin/remote/service/ls",
"/pin/remote/service/rm",
"/pin/ondemand",
"/pin/ondemand/add",
"/pin/ondemand/rm",
"/pin/ondemand/ls",
"/pin/rm",
"/pin/update",
"/pin/verify",
Expand Down
280 changes: 280 additions & 0 deletions core/commands/pin/ondemandpin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package pin

import (
"fmt"
"io"
"time"

cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/config"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
"github.com/ipfs/kubo/ondemandpin"
)

const onDemandLiveOptionName = "live"
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ondemand ls command shows the locally stored pin records (CID, pinned status, timestamps). With ---live, it additionally queries the DHT in real-time to report how many providers currently exist

Copy link
Copy Markdown
Author

@ihlec ihlec Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DHT queries are sequential. Doing ipfs pin ondemand ls --live on hundreds of CIDs will take a long time. Still handy for debugging and development.


var onDemandPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Manage on-demand pins.",
ShortDescription: `
On-demand pins when few DHT providers exist in the routing table; unpins after
replication stays above target for a grace period. Requires config
Experimental.OnDemandPinningEnabled.
`,
},
Subcommands: map[string]*cmds.Command{
"add": addOnDemandPinCmd,
"rm": rmOnDemandPinCmd,
"ls": listOnDemandPinCmd,
},
}

type OnDemandPinOutput struct {
Cid string `json:"Cid"`
}

var addOnDemandPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Register CIDs for on-demand pinning.",
ShortDescription: `Registers CID(s) for on-demand pinning; checker pins when needed.`,
},
Arguments: []cmds.Argument{
cmds.StringArg("cid", true, true, "CID(s) to register."),
},
Type: OnDemandPinOutput{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
return err
}

cfg, err := n.Repo.Config()
if err != nil {
return err
}
if !cfg.Experimental.OnDemandPinningEnabled {
return fmt.Errorf("on-demand pinning is not enabled; set Experimental.OnDemandPinningEnabled = true in config")
}

store := n.OnDemandPinStore

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}

for _, arg := range req.Arguments {
p, err := cmdutils.PathOrCidPath(arg)
if err != nil {
return fmt.Errorf("invalid CID or path %q: %w", arg, err)
}

rp, _, err := api.ResolvePath(req.Context, p)
if err != nil {
return fmt.Errorf("resolving %q: %w", arg, err)
}
c := rp.RootCid()

if err := store.Add(req.Context, c); err != nil {
return err
}

if checker := n.OnDemandPinChecker; checker != nil {
checker.Enqueue(c)
}

if err := res.Emit(&OnDemandPinOutput{
Cid: c.String(),
}); err != nil {
return err
}
}
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandPinOutput) error {
fmt.Fprintf(w, "registered %s for on-demand pinning\n", out.Cid)
return nil
}),
},
}

var rmOnDemandPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Remove CIDs from on-demand pinning.",
ShortDescription: `
Removes CID(s) from the registry. Checker-pinned content is unpinned.

Works when on-demand pinning is disabled, to clear old registrations.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("cid", true, true, "CID(s) to remove."),
},
Type: OnDemandPinOutput{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
return err
}

store := n.OnDemandPinStore

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}

for _, arg := range req.Arguments {
p, err := cmdutils.PathOrCidPath(arg)
if err != nil {
return fmt.Errorf("invalid CID or path %q: %w", arg, err)
}

rp, _, err := api.ResolvePath(req.Context, p)
if err != nil {
return fmt.Errorf("resolving %q: %w", arg, err)
}
c := rp.RootCid()

isOurs, err := ondemandpin.PinHasName(req.Context, n.Pinning, c, ondemandpin.OnDemandPinName)
if err != nil {
return fmt.Errorf("checking pin state for %s: %w", c, err)
}
if isOurs {
if err := api.Pin().Rm(req.Context, rp); err != nil {
return fmt.Errorf("unpinning %s: %w", c, err)
}
}

if err := store.Remove(req.Context, c); err != nil {
return err
}

if err := res.Emit(&OnDemandPinOutput{
Cid: c.String(),
}); err != nil {
return err
}
}
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandPinOutput) error {
fmt.Fprintf(w, "removed %s from on-demand pinning\n", out.Cid)
return nil
}),
},
}

type OnDemandLsOutput struct {
Cid string `json:"Cid"`
PinnedByUs bool `json:"PinnedByUs"`
Providers *int `json:"Providers,omitempty"`
LastAboveTarget string `json:"LastAboveTarget,omitempty"`
CreatedAt string `json:"CreatedAt"`
}

var listOnDemandPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List on-demand pins.",
ShortDescription: `
Lists CIDs registered for on-demand pinning with their current state.
Use --live to include real-time provider counts from the DHT.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("cid", false, true, "Optional CID(s) to filter."),
},
Options: []cmds.Option{
cmds.BoolOption(onDemandLiveOptionName, "l", "Perform live provider lookup."),
},
Type: OnDemandLsOutput{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
return err
}

store := n.OnDemandPinStore

live, _ := req.Options[onDemandLiveOptionName].(bool)

var globalTarget int
if live {
cfg, err := n.Repo.Config()
if err != nil {
return err
}
globalTarget = int(cfg.OnDemandPinning.ReplicationTarget.WithDefault(config.DefaultOnDemandPinReplicationTarget))
}

var records []ondemandpin.Record
if len(req.Arguments) > 0 {
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}
for _, arg := range req.Arguments {
p, err := cmdutils.PathOrCidPath(arg)
if err != nil {
return fmt.Errorf("invalid CID or path %q: %w", arg, err)
}
rp, _, err := api.ResolvePath(req.Context, p)
if err != nil {
return fmt.Errorf("resolving %q: %w", arg, err)
}
rec, err := store.Get(req.Context, rp.RootCid())
if err != nil {
return err
}
records = append(records, *rec)
}
} else {
records, err = store.List(req.Context)
if err != nil {
return err
}
}

for _, rec := range records {
out := OnDemandLsOutput{
Cid: rec.Cid.String(),
PinnedByUs: rec.PinnedByUs,
CreatedAt: rec.CreatedAt.Format(time.RFC3339),
}
if !rec.LastAboveTarget.IsZero() {
out.LastAboveTarget = rec.LastAboveTarget.Format(time.RFC3339)
}

if live && n.Routing != nil {
count := ondemandpin.CountProviders(req.Context, n.Routing, n.Identity, rec.Cid, globalTarget)
out.Providers = &count
}

if err := res.Emit(&out); err != nil {
return err
}
}
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandLsOutput) error {
pinState := "not-pinned"
if out.PinnedByUs {
pinState = "pinned"
}
fmt.Fprintf(w, "%s", out.Cid)
if out.Providers != nil {
fmt.Fprintf(w, " providers=%d", *out.Providers)
}
fmt.Fprintf(w, " %s created=%s", pinState, out.CreatedAt)
if out.LastAboveTarget != "" {
fmt.Fprintf(w, " above-target-since=%s", out.LastAboveTarget)
}
fmt.Fprintln(w)
return nil
}),
},
}
2 changes: 2 additions & 0 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var PinCmd = &cmds.Command{
"verify": verifyPinCmd,
"update": updatePinCmd,
"remote": remotePinCmd,

"ondemand": onDemandPinCmd,
},
}

Expand Down
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/ipfs/kubo/core/node"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/fuse/mount"
"github.com/ipfs/kubo/ondemandpin"
"github.com/ipfs/kubo/p2p"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
Expand All @@ -76,6 +77,9 @@ type IpfsNode struct {
PrivateKey ic.PrivKey `optional:"true"` // the local node's private Key
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network

OnDemandPinStore *ondemandpin.Store
OnDemandPinChecker *ondemandpin.Checker `optional:"true"`

// Services
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Expand Down
5 changes: 5 additions & 0 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
// Disabling is controlled by Provide.Enabled=false or setting Interval to 0.
isProviderEnabled := cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) && cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) != 0

isOnDemandPinEnabled := cfg.Experimental.OnDemandPinningEnabled

return fx.Options(
fx.Provide(BitswapOptions(cfg)),
fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)),
Expand All @@ -365,6 +367,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part

LibP2P(bcfg, cfg, userResourceOverrides),
OnlineProviders(isProviderEnabled, cfg),

maybeProvide(OnDemandPinChecker(cfg.OnDemandPinning), isOnDemandPinEnabled),
)
}

Expand Down Expand Up @@ -458,6 +462,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
fx.Provide(BlockService(cfg)),
fx.Provide(Pinning(providerStrategy)),
fx.Provide(Files(providerStrategy)),
fx.Provide(OnDemandPinStore),
Core,
)
}
Loading
Loading