Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wantlist command #719

Merged
merged 4 commits into from
Feb 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions core/commands/bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package commands

import (
"bytes"
"fmt"
cmds "github.com/jbenet/go-ipfs/commands"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
u "github.com/jbenet/go-ipfs/util"
"io"
)

var BitswapCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "A set of commands to manipulate the bitswap agent",
ShortDescription: ``,
},
Subcommands: map[string]*cmds.Command{
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
},
}

var showWantlistCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Show blocks currently on the wantlist",
ShortDescription: `
Print out all blocks currently on the bitswap wantlist for the local peer`,
},
Type: KeyList{},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.Context().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
bs, ok := nd.Exchange.(*bitswap.Bitswap)
if !ok {
res.SetError(u.ErrCast(), cmds.ErrNormal)
return
}

res.SetOutput(&KeyList{bs.GetWantlist()})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: KeyListTextMarshaler,
},
}

var bitswapStatCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "show some diagnostic information on the bitswap agent",
ShortDescription: ``,
},
Type: bitswap.Stat{},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.Context().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

bs, ok := nd.Exchange.(*bitswap.Bitswap)
if !ok {
res.SetError(u.ErrCast(), cmds.ErrNormal)
return
}

st, err := bs.Stat()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

res.SetOutput(st)
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
out, ok := res.Output().(*bitswap.Stat)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
fmt.Fprintln(buf, "bitswap status")
fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize)
fmt.Fprintf(buf, "\twantlist [%d keys]\n", len(out.Wantlist))
for _, k := range out.Wantlist {
fmt.Fprintf(buf, "\t\t%s\n", k.B58String())
}
fmt.Fprintf(buf, "\tpartners [%d]\n", len(out.Peers))
for _, p := range out.Peers {
fmt.Fprintf(buf, "\t\t%s\n", p)
}
return buf, nil
},
},
}
5 changes: 3 additions & 2 deletions core/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"strings"

cmds "github.com/jbenet/go-ipfs/commands"
u "github.com/jbenet/go-ipfs/util"
evlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
)

var log = u.Logger("core/commands")
var log = evlog.Logger("core/commands")

type TestOutput struct {
Foo string
Expand Down Expand Up @@ -98,6 +98,7 @@ var rootSubcommands = map[string]*cmds.Command{
"swarm": SwarmCmd,
"update": UpdateCmd,
"version": VersionCmd,
"bitswap": BitswapCmd,
}

func init() {
Expand Down
46 changes: 27 additions & 19 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32

hasBlockBufferSize = 256
HasBlockBufferSize = 256
provideWorkers = 4
)

Expand Down Expand Up @@ -79,7 +79,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
px.Close()
}()

bs := &bitswap{
bs := &Bitswap{
self: p,
blockstore: bstore,
notifications: notif,
Expand All @@ -88,7 +88,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, hasBlockBufferSize),
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
}
network.SetDelegate(bs)

Expand All @@ -97,8 +97,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
return bs
}

// bitswap instances implement the bitswap protocol.
type bitswap struct {
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {

// the ID of the peer to act on behalf of
self peer.ID
Expand Down Expand Up @@ -133,7 +133,7 @@ type blockRequest struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {

// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
Expand Down Expand Up @@ -179,7 +179,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
select {
case <-bs.process.Closing():
return nil, errors.New("bitswap is closed")
Expand All @@ -201,7 +201,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.

// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Event(ctx, "hasBlock", blk)
select {
case <-bs.process.Closing():
Expand All @@ -221,7 +221,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return nil
}

func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
set := pset.New()
wg := sync.WaitGroup{}
for peerToQuery := range peers {
Expand All @@ -242,7 +242,7 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe
return nil
}

func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
message := bsmsg.New()
message.SetFull(true)
for _, wanted := range bs.wantlist.Entries() {
Expand All @@ -251,7 +251,7 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID
return bs.sendWantlistMsgToPeers(ctx, message, peers)
}

func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -286,7 +286,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
}

// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()

Expand Down Expand Up @@ -325,7 +325,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}

// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerConnected(p peer.ID) {
func (bs *Bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
peers := make(chan peer.ID, 1)
peers <- p
Expand All @@ -337,11 +337,11 @@ func (bs *bitswap) PeerConnected(p peer.ID) {
}

// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerDisconnected(p peer.ID) {
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.engine.PeerDisconnected(p)
}

func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
Expand All @@ -358,7 +358,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
}
}

func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
Expand All @@ -383,22 +383,30 @@ func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
wg.Wait()
}

func (bs *bitswap) ReceiveError(err error) {
func (bs *Bitswap) ReceiveError(err error) {
log.Debugf("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}

// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "sendMessage", p, m).Done()
if err := bs.network.SendMessage(ctx, p, m); err != nil {
return errors.Wrap(err)
}
return bs.engine.MessageSent(p, m)
}

func (bs *bitswap) Close() error {
func (bs *Bitswap) Close() error {
return bs.process.Close()
}

func (bs *Bitswap) GetWantlist() []u.Key {
var out []u.Key
for _, e := range bs.wantlist.Entries() {
out = append(out, e.Key)
}
return out
}
25 changes: 25 additions & 0 deletions exchange/bitswap/stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package bitswap

import (
u "github.com/jbenet/go-ipfs/util"
"sort"
)

type Stat struct {
ProvideBufLen int
Wantlist []u.Key
Peers []string
}

func (bs *Bitswap) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
st.Wantlist = bs.GetWantlist()

for _, p := range bs.engine.Peers() {
st.Peers = append(st.Peers, p.Pretty())
}
sort.Strings(st.Peers)

return st, nil
}
10 changes: 5 additions & 5 deletions exchange/bitswap/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)

func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
Expand All @@ -34,7 +34,7 @@ func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
}
}

func (bs *bitswap) taskWorker(ctx context.Context) {
func (bs *Bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
Expand All @@ -55,7 +55,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
}
}

func (bs *bitswap) provideWorker(ctx context.Context) {
func (bs *Bitswap) provideWorker(ctx context.Context) {
for {
select {
case blk, ok := <-bs.newBlocks:
Expand All @@ -75,7 +75,7 @@ func (bs *bitswap) provideWorker(ctx context.Context) {
}

// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
func (bs *Bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")

for {
Expand Down Expand Up @@ -115,7 +115,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
}
}

func (bs *bitswap) rebroadcastWorker(parent context.Context) {
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()

Expand Down