Data Transfer

Data Transfer in Filecoin

The Data Transfer Protocol is a protocol for transferring all or part of a Piece across the network when a deal is made. The overall goal for the data transfer module is for it to be an abstraction of the underlying transport medium over which data is transferred between different parties in the Filecoin network. Currently, the underlying medium or protocol used to actually do the data transfer is GraphSync. As such, the Data Transfer Protocol can be thought of as a negotiation protocol.

The Data Transfer Protocol is used for both Storage and Retrieval Deals. In both cases, the data transfer request is initiated by the client. The primary reason is that clients are more often than not behind NATs, so it is more convenient to start any data transfer from their side. In the case of Storage Deals, the data transfer request is initiated as a push request to send data to the storage provider. In the case of Retrieval Deals, the data transfer request is initiated as a pull request to retrieve data from the storage provider.

The request to initiate a data transfer includes a voucher or token (not to be confused with the Payment Channel voucher) that points to a specific deal that the two parties have previously agreed to. This allows the storage provider to identify the request, link it to a deal it has agreed to, and not disregard it. As described below, the case might be slightly different for retrieval deals, where both a deal proposal and a data transfer request can be sent at once.

Modules

This diagram shows how Data Transfer and its modules fit into the picture with the Storage and Retrieval Markets. In particular, note how the Data Transfer Request Validators from the markets are plugged into the Data Transfer module, but their code belongs in the Markets system.

[Figure: Data Transfer]

Terminology

  • Push Request: A request to send data to the other party - normally initiated by the client, primarily in the case of a Storage Deal.
  • Pull Request: A request to have the other party send data - normally initiated by the client, primarily in the case of a Retrieval Deal.
  • Requestor: The party that initiates the data transfer request (whether Push or Pull) - normally the client, at least as currently implemented in Filecoin, to overcome NAT-traversal problems.
  • Responder: The party that receives the data transfer request - normally the storage provider.
  • Data Transfer Voucher or Token: A wrapper around storage- or retrieval-related data that can identify and validate the transfer request to the other party.
  • Request Validator: The data transfer module only initiates a transfer when the responder can validate that the request is tied directly to either an existing storage or retrieval deal. Validation is not performed by the data transfer module itself. Instead, a request validator inspects the data transfer voucher to determine whether to respond to the request or disregard the request.
  • Transporter: Once a request is negotiated and validated, the actual transfer is managed by a transporter on both sides. The transporter is part of the data transfer module but is isolated from the negotiation process. It has access to an underlying verifiable transport protocol and uses it to send data and track progress.
  • Subscriber: An external component that monitors progress of a data transfer by subscribing to data transfer events, such as progress or completion.
  • GraphSync: The default underlying transport protocol used by the Transporter. See the GraphSync specification for full details.
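
The relationship between the voucher, the request validator, and the data transfer module can be sketched in a few lines of Go. This is a minimal illustration, not the actual go-data-transfer API: `Voucher`, `Validator`, `Registry`, `dealValidator`, and the error values are hypothetical names, and the voucher is reduced to a type string plus a deal ID. The point it shows is that the module only routes a voucher to the validator registered for its type, and the validator (owned by a market) decides whether the request is tied to an agreed deal.

```go
package main

import (
	"errors"
	"fmt"
)

// Voucher is a simplified, hypothetical stand-in for a data transfer voucher.
type Voucher struct {
	Type   string // identifies which validator should inspect the voucher
	DealID uint64 // the deal this transfer claims to belong to
}

// Validator is a hypothetical request validator supplied by a market module.
type Validator interface {
	Validate(v Voucher) error
}

var (
	ErrNoValidator = errors.New("no validator registered for voucher type")
	ErrUnknownDeal = errors.New("voucher does not reference an agreed deal")
)

// dealValidator accepts a voucher only if it points at a deal already agreed to.
type dealValidator struct {
	agreed map[uint64]bool
}

func (d dealValidator) Validate(v Voucher) error {
	if !d.agreed[v.DealID] {
		return ErrUnknownDeal
	}
	return nil
}

// Registry plays the role of the data transfer module: it validates nothing
// itself and only routes each voucher to the validator for its type.
type Registry map[string]Validator

func (r Registry) Check(v Voucher) error {
	val, ok := r[v.Type]
	if !ok {
		return ErrNoValidator
	}
	return val.Validate(v)
}

// newExampleRegistry registers one validator that knows about deal 7.
func newExampleRegistry() Registry {
	return Registry{"storage": dealValidator{agreed: map[uint64]bool{7: true}}}
}

func main() {
	reg := newExampleRegistry()
	fmt.Println(reg.Check(Voucher{Type: "storage", DealID: 7}))   // agreed deal
	fmt.Println(reg.Check(Voucher{Type: "storage", DealID: 8}))   // unknown deal
	fmt.Println(reg.Check(Voucher{Type: "retrieval", DealID: 7})) // no validator
}
```

Keeping validation behind an interface is what lets the validator code live in the Markets system while still being plugged into the data transfer module.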

Request Phases

There are two basic phases to any data transfer:

  1. Negotiation: the requestor and responder agree to the transfer by validating it with the data transfer voucher.
  2. Transfer: once the negotiation phase is complete, the data is actually transferred. The default protocol used to do the transfer is GraphSync.

Note that the Negotiation and Transfer stages can occur in separate round trips, or potentially in the same round trip, where the requesting party implicitly agrees by sending the request, and the responding party can agree and immediately send or receive data. Whether the process takes place in a single round trip or in multiple round trips depends in part on whether the request is a push request (storage deal) or a pull request (retrieval deal), and on whether the data transfer negotiation process is able to piggyback on the underlying transport mechanism.

In the case of GraphSync as the transport mechanism, data transfer requests can piggyback as an extension to the GraphSync protocol using GraphSync’s built-in extensibility, so only a single round trip is required for Pull Requests. However, because GraphSync is a request/response protocol with no direct support for push-type requests, in the Push case negotiation happens in a separate request over data transfer’s own libp2p protocol /fil/datatransfer/1.0.0. Other future transport mechanisms might handle both Push and Pull, either, or neither as a single round trip.

Upon receiving a data transfer request, the data transfer module decodes the voucher and delivers it to the request validators. In storage deals, the request validator checks whether the deal included is one that the recipient has agreed to before. For retrieval deals, the request includes the proposal for the retrieval deal itself. As long as the request validator accepts the deal proposal, everything is done at once as a single round trip.
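
The round-trip rule for GraphSync described above can be written down as a small decision function. This is only a sketch under stated assumptions: the `Transport` struct, its two capability flags, and `NeedsSeparateNegotiation` are hypothetical names introduced for illustration, not part of any real library. The `graphSync` value encodes what the text says about GraphSync: negotiation can be embedded in a pull request but not in a push.

```go
package main

import "fmt"

// Transport describes, hypothetically, which request types let the
// negotiation piggyback on the transport's own request.
type Transport struct {
	Name         string
	EmbedsInPull bool // negotiation can ride along with a pull request
	EmbedsInPush bool // negotiation can ride along with a push request
}

// graphSync mirrors the behaviour described in the text: it is a
// request/response protocol, so only pull requests can carry the negotiation.
var graphSync = Transport{Name: "graphsync", EmbedsInPull: true, EmbedsInPush: false}

// NeedsSeparateNegotiation reports whether the negotiation must be sent as its
// own request (an extra round trip) before the transfer can start.
func NeedsSeparateNegotiation(t Transport, isPull bool) bool {
	if isPull {
		return !t.EmbedsInPull
	}
	return !t.EmbedsInPush
}

func main() {
	fmt.Println("pull needs separate negotiation:", NeedsSeparateNegotiation(graphSync, true))
	fmt.Println("push needs separate negotiation:", NeedsSeparateNegotiation(graphSync, false))
}
```

A future transport that supported both directions natively would simply set both flags, which matches the remark that other mechanisms might handle both, either, or neither as a single round trip.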

It is worth noting that in the case of retrieval the provider can accept the deal and the data transfer request, but then pause the retrieval itself in order to carry out the unsealing process. The storage provider has to unseal all of the requested data before initiating the actual data transfer. Furthermore, the storage provider has the option of pausing the retrieval flow before starting the unsealing process in order to request an unsealing payment. Storage providers can request this payment to cover the computation costs of unsealing and to avoid falling victim to misbehaving clients.

Example Flows

Push Flow

[Figure: Data Transfer - Push Flow]

  1. A requestor initiates a Push transfer when it wants to send data to another party.
  2. The requestor’s data transfer module will send a push request to the responder along with the data transfer voucher.
  3. The responder’s data transfer module validates the data transfer request via the Validator provided as a dependency by the responder.
  4. The responder’s data transfer module initiates the transfer by making a GraphSync request.
  5. The requestor receives the GraphSync request, verifies that it recognises the data transfer and begins sending data.
  6. The responder receives data and can produce an indication of progress.
  7. The responder completes receiving data, and notifies any listeners.

The push flow is ideal for storage deals, where the client initiates the data transfer straightaway once the provider indicates their intent to accept and publish the client’s deal proposal.
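
The seven steps of the push flow can be traced with an in-memory sketch. Everything here is an assumption made for illustration: the `requestor` and `responder` types, the `pushRequest` message, and the use of a bare deal ID in place of a voucher are hypothetical, and a direct method call stands in for both the libp2p negotiation message and the GraphSync request. What it preserves is the shape of the exchange: the requestor pushes, the responder validates and then pulls the data back from the requestor.

```go
package main

import (
	"errors"
	"fmt"
)

// pushRequest is the negotiation message; DealID stands in for the voucher.
type pushRequest struct {
	TransferID uint64
	DealID     uint64
}

type requestor struct {
	blocks [][]byte
	open   map[uint64]bool // transfers this requestor has initiated
}

type responder struct {
	agreedDeals map[uint64]bool
	received    int      // bytes received so far
	events      []string // what listeners would be told
}

// Push covers steps 1-2: record the transfer and send the push request.
func (r *requestor) Push(id, dealID uint64, to *responder) error {
	r.open[id] = true
	return to.handlePush(pushRequest{TransferID: id, DealID: dealID}, r)
}

// serve covers step 5: only send data for a transfer the requestor recognises.
func (r *requestor) serve(id uint64) ([][]byte, error) {
	if !r.open[id] {
		return nil, errors.New("unrecognised transfer")
	}
	return r.blocks, nil
}

// handlePush covers steps 3, 4, 6 and 7 on the responder's side.
func (p *responder) handlePush(req pushRequest, from *requestor) error {
	if !p.agreedDeals[req.DealID] { // step 3: validate the voucher
		return errors.New("voucher rejected")
	}
	blocks, err := from.serve(req.TransferID) // step 4: stand-in for the GraphSync request
	if err != nil {
		return err
	}
	for _, b := range blocks { // step 6: receive data and report progress
		p.received += len(b)
		p.events = append(p.events, "progress")
	}
	p.events = append(p.events, "completed") // step 7: notify listeners
	return nil
}

// newRequestor builds a requestor holding the given blocks.
func newRequestor(blocks ...string) *requestor {
	r := &requestor{open: map[uint64]bool{}}
	for _, b := range blocks {
		r.blocks = append(r.blocks, []byte(b))
	}
	return r
}

func main() {
	client := newRequestor("hello", "world")
	provider := &responder{agreedDeals: map[uint64]bool{7: true}}
	err := client.Push(1, 7, provider)
	fmt.Println(err, provider.received, provider.events)
}
```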

Pull Flow - Single Round Trip

[Figure: Data Transfer - Single Round Trip Pull Flow]

  1. A requestor initiates a Pull transfer when it wants to receive data from another party.
  2. The requestor’s data transfer module initiates the transfer by making a pull request embedded in the GraphSync request to the responder. The request includes the data transfer voucher.
  3. The responder receives the GraphSync request, and forwards the data transfer request to the data transfer module.
  4. The responder’s data transfer module validates the data transfer request via a PullValidator provided as a dependency by the responder.
  5. The responder accepts the GraphSync request and sends the accepted response along with the data transfer level acceptance response.
  6. The requestor receives data and can produce an indication of progress. This step happens later, after the storage provider has finished unsealing the data.
  7. The requestor completes receiving data, and notifies any listeners.
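
The single-round-trip pull can be sketched by attaching the data transfer request to the transport request as an extension. This is an illustrative model only: `transportRequest`, `transportResponse`, `provider`, `Pull`, and the extension key `example/data-transfer` are hypothetical and do not reflect GraphSync's real message types or extension names. A deal ID again stands in for the voucher, and unsealing is not modelled.

```go
package main

import (
	"errors"
	"fmt"
)

// extensionName is a made-up key under which the data transfer request is
// attached to the transport request.
const extensionName = "example/data-transfer"

// pullRequest is the data transfer level request; DealID stands in for the voucher.
type pullRequest struct {
	TransferID uint64
	DealID     uint64
}

// transportRequest models a transport request that can carry extensions.
type transportRequest struct {
	Root       string
	Extensions map[string]pullRequest
}

// transportResponse carries the acceptance together with the data.
type transportResponse struct {
	Accepted bool
	Blocks   [][]byte
}

type provider struct {
	agreedDeals map[uint64]bool
	store       map[string][][]byte
}

// Handle covers steps 3-5: extract the embedded request, validate it, and
// answer with acceptance and data in the same response.
func (p *provider) Handle(req transportRequest) (transportResponse, error) {
	dt, ok := req.Extensions[extensionName]
	if !ok {
		return transportResponse{}, errors.New("no data transfer request attached")
	}
	if !p.agreedDeals[dt.DealID] {
		return transportResponse{}, errors.New("voucher rejected")
	}
	return transportResponse{Accepted: true, Blocks: p.store[req.Root]}, nil
}

// Pull covers steps 1-2 and 6-7 on the requestor's side and returns the
// number of bytes received.
func Pull(p *provider, root string, id, dealID uint64) (int, error) {
	req := transportRequest{
		Root:       root,
		Extensions: map[string]pullRequest{extensionName: {TransferID: id, DealID: dealID}},
	}
	resp, err := p.Handle(req)
	if err != nil {
		return 0, err
	}
	if !resp.Accepted {
		return 0, errors.New("request not accepted")
	}
	received := 0
	for _, b := range resp.Blocks {
		received += len(b)
	}
	return received, nil
}

// newExampleProvider knows deal 7 and stores five bytes under "root".
func newExampleProvider() *provider {
	return &provider{
		agreedDeals: map[uint64]bool{7: true},
		store:       map[string][][]byte{"root": {[]byte("abc"), []byte("de")}},
	}
}

func main() {
	fmt.Println(Pull(newExampleProvider(), "root", 1, 7))
}
```

Because negotiation and data share one request/response pair here, no separate negotiation message is needed, which is the difference from the push flow.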

Protocol

A data transfer CAN be negotiated over the network via the Data Transfer Protocol, a libp2p protocol type.

Using the Data Transfer Protocol as an independent libp2p communication mechanism is not a hard requirement – as long as both parties have an implementation of the Data Transfer Subsystem that can talk to the other, any transport mechanism (including offline mechanisms) is acceptable.

Data Structures

package datatransfer

import (
	"fmt"
	"time"

	"github.com/ipfs/go-cid"
	"github.com/ipld/go-ipld-prime"
	"github.com/ipld/go-ipld-prime/datamodel"
	"github.com/libp2p/go-libp2p/core/peer"
	cbg "github.com/whyrusleeping/cbor-gen"
)

//go:generate cbor-gen-for ChannelID ChannelStages ChannelStage Log

// TypeIdentifier is a unique string identifier for a type of encodable object in a
// registry
type TypeIdentifier string

// EmptyTypeIdentifier means there is no voucher present
const EmptyTypeIdentifier = TypeIdentifier("")

// TypedVoucher is a voucher or voucher result in IPLD form and an associated
// type identifier for that voucher or voucher result
type TypedVoucher struct {
	Voucher datamodel.Node
	Type    TypeIdentifier
}

// Equals is a utility to compare that two TypedVouchers are the same - both type
// and the voucher's IPLD content
func (tv1 TypedVoucher) Equals(tv2 TypedVoucher) bool {
	return tv1.Type == tv2.Type && ipld.DeepEqual(tv1.Voucher, tv2.Voucher)
}

// TransferID is an identifier for a data transfer, shared between
// requestor and responder and unique to the requestor
type TransferID uint64

// ChannelID is a unique identifier for a channel, distinct by both the other
// party's peer ID + the transfer ID
type ChannelID struct {
	Initiator peer.ID
	Responder peer.ID
	ID        TransferID
}

func (c ChannelID) String() string {
	return fmt.Sprintf("%s-%s-%d", c.Initiator, c.Responder, c.ID)
}

// OtherParty returns the peer on the other side of the request, depending
// on whether this peer is the initiator or responder
func (c ChannelID) OtherParty(thisPeer peer.ID) peer.ID {
	if thisPeer == c.Initiator {
		return c.Responder
	}
	return c.Initiator
}

// Channel represents all the parameters for a single data transfer
type Channel interface {
	// TransferID returns the transfer id for this channel
	TransferID() TransferID

	// BaseCID returns the CID that is at the root of this data transfer
	BaseCID() cid.Cid

	// Selector returns the IPLD selector for this data transfer (represented as
	// an IPLD node)
	Selector() datamodel.Node

	// Voucher returns the initial voucher for this data transfer
	Voucher() TypedVoucher

	// Sender returns the peer id for the node that is sending data
	Sender() peer.ID

	// Recipient returns the peer id for the node that is receiving data
	Recipient() peer.ID

	// TotalSize returns the total size for the data being transferred
	TotalSize() uint64

	// IsPull returns whether this is a pull request
	IsPull() bool

	// ChannelID returns the ChannelID for this request
	ChannelID() ChannelID

	// OtherPeer returns the counter party peer for this channel
	OtherPeer() peer.ID
}

// ChannelState is channel parameters plus its current state
type ChannelState interface {
	Channel

	// SelfPeer returns the peer this channel belongs to
	SelfPeer() peer.ID

	// Status is the current status of this channel
	Status() Status

	// Sent returns the number of bytes sent
	Sent() uint64

	// Received returns the number of bytes received
	Received() uint64

	// Message offers additional information about the current status
	Message() string

	// Vouchers returns all vouchers sent on this channel
	Vouchers() []TypedVoucher

	// VoucherResults are results of vouchers sent on the channel
	VoucherResults() []TypedVoucher

	// LastVoucher returns the last voucher sent on the channel
	LastVoucher() TypedVoucher

	// LastVoucherResult returns the last voucher result sent on the channel
	LastVoucherResult() TypedVoucher

	// ReceivedCidsTotal returns the number of (non-unique) cids received so far
	// on the channel - note that a block can exist in more than one place in the DAG
	ReceivedCidsTotal() int64

	// QueuedCidsTotal returns the number of (non-unique) cids queued so far
	// on the channel - note that a block can exist in more than one place in the DAG
	QueuedCidsTotal() int64

	// SentCidsTotal returns the number of (non-unique) cids sent so far
	// on the channel - note that a block can exist in more than one place in the DAG
	SentCidsTotal() int64

	// Queued returns the number of bytes read from the node and queued for sending
	Queued() uint64

	// DataLimit is the maximum data that can be transferred on this channel before
	// revalidation. 0 indicates no limit.
	DataLimit() uint64

	// RequiresFinalization indicates at the end of the transfer, the channel should
	// be left open for a final settlement
	RequiresFinalization() bool

	// InitiatorPaused indicates whether the initiator of this channel is in a paused state
	InitiatorPaused() bool

	// ResponderPaused indicates whether the responder of this channel is in a paused state
	ResponderPaused() bool

	// BothPaused indicates both sides of the transfer have paused the transfer
	BothPaused() bool

	// SelfPaused indicates whether the local peer for this channel is in a paused state
	SelfPaused() bool

	// Stages returns the timeline of events this data transfer has gone through,
	// for observability purposes.
	//
	// It is unsafe for the caller to modify the return value, and changes
	// may not be persisted. It should be treated as immutable.
	Stages() *ChannelStages
}

// ChannelStages captures a timeline of the progress of a data transfer channel,
// grouped by stages.
//
// EXPERIMENTAL; subject to change.
type ChannelStages struct {
	// Stages contains an entry for every stage the channel has gone through.
	// Each stage then contains logs.
	Stages []*ChannelStage
}

// ChannelStage traces the execution of a data transfer channel stage.
//
// EXPERIMENTAL; subject to change.
type ChannelStage struct {
	// Human-readable fields.
	// TODO: these _will_ need to be converted to canonical representations, so
	//  they are machine readable.
	Name        string
	Description string

	// Timestamps.
	// TODO: may be worth adding an exit timestamp. It _could_ be inferred from
	//  the start of the next stage, or from the timestamp of the last log line
	//  if this is a terminal stage. But that's non-deterministic and it relies on
	//  assumptions.
	CreatedTime cbg.CborTime
	UpdatedTime cbg.CborTime

	// Logs contains a detailed timeline of events that occurred inside
	// this stage.
	Logs []*Log
}

// Log represents a point-in-time event that occurred inside a channel stage.
//
// EXPERIMENTAL; subject to change.
type Log struct {
	// Log is a human readable message.
	//
	// TODO: this _may_ need to be converted to a canonical data model so it
	//  is machine-readable.
	Log string

	UpdatedTime cbg.CborTime
}

// AddLog adds a log to the specified stage, creating the stage if
// it doesn't exist yet.
//
// EXPERIMENTAL; subject to change.
func (cs *ChannelStages) AddLog(stage, msg string) {
	if cs == nil {
		return
	}

	now := curTime()
	st := cs.GetStage(stage)
	if st == nil {
		st = &ChannelStage{
			CreatedTime: now,
		}
		cs.Stages = append(cs.Stages, st)
	}

	st.Name = stage
	st.UpdatedTime = now
	if msg != "" && (len(st.Logs) == 0 || st.Logs[len(st.Logs)-1].Log != msg) {
		// only add the log if it's not a duplicate.
		st.Logs = append(st.Logs, &Log{msg, now})
	}
}

// GetStage returns the ChannelStage object for a named stage, or nil if not found.
//
// TODO: the input should be a strongly-typed enum instead of a free-form string.
// TODO: drop Get from GetStage to make this code more idiomatic. Return a
// second ok boolean to make it even more idiomatic.
//
// EXPERIMENTAL; subject to change.
func (cs *ChannelStages) GetStage(stage string) *ChannelStage {
	if cs == nil {
		return nil
	}

	for _, s := range cs.Stages {
		if s.Name == stage {
			return s
		}
	}

	return nil
}

func curTime() cbg.CborTime {
	now := time.Now()
	return cbg.CborTime(time.Unix(0, now.UnixNano()).UTC())
}
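
As a usage illustration for `ChannelID`, the sketch below reproduces its `String` and `OtherParty` methods with one simplification: `PeerID` is a plain string type introduced here as a stand-in for libp2p's `peer.ID`, so the example runs without external dependencies. It also shows that the struct is comparable and can therefore be used directly as a map key.

```go
package main

import "fmt"

// PeerID is a plain-string stand-in for libp2p's peer.ID (an assumption made
// so that this example has no external dependencies).
type PeerID string

type TransferID uint64

// ChannelID mirrors the struct above, with PeerID in place of peer.ID.
type ChannelID struct {
	Initiator PeerID
	Responder PeerID
	ID        TransferID
}

func (c ChannelID) String() string {
	return fmt.Sprintf("%s-%s-%d", c.Initiator, c.Responder, c.ID)
}

// OtherParty returns whichever peer on the channel is not thisPeer.
func (c ChannelID) OtherParty(thisPeer PeerID) PeerID {
	if thisPeer == c.Initiator {
		return c.Responder
	}
	return c.Initiator
}

func main() {
	chid := ChannelID{Initiator: "client", Responder: "provider", ID: 1}

	fmt.Println(chid)                        // prints "client-provider-1"
	fmt.Println(chid.OtherParty("client"))   // prints "provider"
	fmt.Println(chid.OtherParty("provider")) // prints "client"

	// All fields are comparable, so a ChannelID works as a map key.
	sent := map[ChannelID]uint64{chid: 1024}
	fmt.Println(sent[ChannelID{Initiator: "client", Responder: "provider", ID: 1}]) // prints 1024
}
```
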
package datatransfer

import "github.com/filecoin-project/go-statemachine/fsm"

// Status is the status of transfer for a given channel
type Status uint64

const (
	// Requested means a data transfer was requested but has not yet been approved
	Requested Status = iota

	// Ongoing means the data transfer is in progress
	Ongoing

	// TransferFinished indicates the initiator is done sending/receiving
	// data but is awaiting confirmation from the responder
	TransferFinished

	// ResponderCompleted indicates the initiator received a message from the
	// responder that it's completed
	ResponderCompleted

	// Finalizing means the responder is awaiting a final message from the initiator to
	// consider the transfer done
	Finalizing

	// Completing just means we have some final cleanup for a completed request
	Completing

	// Completed means the data transfer is completed successfully
	Completed

	// Failing just means we have some final cleanup for a failed request
	Failing

	// Failed means the data transfer failed
	Failed

	// Cancelling just means we have some final cleanup for a cancelled request
	Cancelling

	// Cancelled means the data transfer ended prematurely
	Cancelled

	// DEPRECATED: Use InitiatorPaused() method on ChannelState
	InitiatorPaused

	// DEPRECATED: Use ResponderPaused() method on ChannelState
	ResponderPaused

	// DEPRECATED: Use BothPaused() method on ChannelState
	BothPaused

	// ResponderFinalizing is a unique state where the responder is awaiting a final voucher
	ResponderFinalizing

	// ResponderFinalizingTransferFinished is a unique state where the responder is awaiting a final voucher
	// and we have received all data
	ResponderFinalizingTransferFinished

	// ChannelNotFoundError means the searched for data transfer does not exist
	ChannelNotFoundError

	// Queued indicates a data transfer request has been accepted, but is not actively transferring yet
	Queued

	// AwaitingAcceptance indicates a transfer request is actively being processed by the transport
	// even if the remote has not yet responded that it's accepted the transfer. Such a state can
	// occur, for example, in a requestor-initiated transfer that starts processing prior to receiving
	// acceptance from the server.
	AwaitingAcceptance
)

type statusList []Status

func (sl statusList) Contains(s Status) bool {
	for _, ts := range sl {
		if ts == s {
			return true
		}
	}
	return false
}

func (sl statusList) AsFSMStates() []fsm.StateKey {
	sk := make([]fsm.StateKey, 0, len(sl))
	for _, s := range sl {
		sk = append(sk, s)
	}
	return sk
}

var NotAcceptedStates = statusList{
	Requested,
	AwaitingAcceptance,
	Cancelled,
	Cancelling,
	Failed,
	Failing,
	ChannelNotFoundError}

func (s Status) IsAccepted() bool {
	return !NotAcceptedStates.Contains(s)
}
func (s Status) String() string {
	return Statuses[s]
}

var FinalizationStatuses = statusList{Finalizing, Completed, Completing}

func (s Status) InFinalization() bool {
	return FinalizationStatuses.Contains(s)
}

var TransferCompleteStates = statusList{
	TransferFinished,
	ResponderFinalizingTransferFinished,
	Finalizing,
	Completed,
	Completing,
	Failing,
	Failed,
	Cancelling,
	Cancelled,
	ChannelNotFoundError,
}

func (s Status) TransferComplete() bool {
	return TransferCompleteStates.Contains(s)
}

var TransferringStates = statusList{
	Ongoing,
	ResponderCompleted,
	ResponderFinalizing,
	AwaitingAcceptance,
}

func (s Status) Transferring() bool {
	return TransferringStates.Contains(s)
}

// Statuses are human readable names for data transfer states
var Statuses = map[Status]string{
	// Requested means a data transfer was requested but has not yet been approved
	Requested:                           "Requested",
	Ongoing:                             "Ongoing",
	TransferFinished:                    "TransferFinished",
	ResponderCompleted:                  "ResponderCompleted",
	Finalizing:                          "Finalizing",
	Completing:                          "Completing",
	Completed:                           "Completed",
	Failing:                             "Failing",
	Failed:                              "Failed",
	Cancelling:                          "Cancelling",
	Cancelled:                           "Cancelled",
	InitiatorPaused:                     "InitiatorPaused",
	ResponderPaused:                     "ResponderPaused",
	BothPaused:                          "BothPaused",
	ResponderFinalizing:                 "ResponderFinalizing",
	ResponderFinalizingTransferFinished: "ResponderFinalizingTransferFinished",
	ChannelNotFoundError:                "ChannelNotFoundError",
	Queued:                              "Queued",
	AwaitingAcceptance:                  "AwaitingAcceptance",
}
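
The status helper methods classify a status along independent axes, and one status can fall into more than one list. The sketch below uses a trimmed, hypothetical copy of the enum (only six statuses, so the numeric values do not match the real ones) with the same list memberships as the code above for those six. It highlights that `AwaitingAcceptance` counts as transferring while not yet being accepted.

```go
package main

import "fmt"

// Status is a trimmed copy of the enum above, for illustration only; the
// numeric values here do not match the full list.
type Status uint64

const (
	Requested Status = iota
	Ongoing
	TransferFinished
	Completed
	Failed
	AwaitingAcceptance
)

var names = map[Status]string{
	Requested:          "Requested",
	Ongoing:            "Ongoing",
	TransferFinished:   "TransferFinished",
	Completed:          "Completed",
	Failed:             "Failed",
	AwaitingAcceptance: "AwaitingAcceptance",
}

func (s Status) String() string { return names[s] }

type statusList []Status

func (sl statusList) Contains(s Status) bool {
	for _, ts := range sl {
		if ts == s {
			return true
		}
	}
	return false
}

// Memberships follow the lists in the listing above, restricted to the six
// statuses kept in this sketch.
var (
	notAccepted      = statusList{Requested, AwaitingAcceptance, Failed}
	transferComplete = statusList{TransferFinished, Completed, Failed}
	transferring     = statusList{Ongoing, AwaitingAcceptance}
)

func (s Status) IsAccepted() bool       { return !notAccepted.Contains(s) }
func (s Status) TransferComplete() bool { return transferComplete.Contains(s) }
func (s Status) Transferring() bool     { return transferring.Contains(s) }

func main() {
	all := []Status{Requested, AwaitingAcceptance, Ongoing, TransferFinished, Completed, Failed}
	for _, s := range all {
		fmt.Printf("%-18s accepted=%-5t transferring=%-5t complete=%t\n",
			s, s.IsAccepted(), s.Transferring(), s.TransferComplete())
	}
}
```

Note that `String` returns an empty string for a value that has no entry in the name map, both here and in the original `Statuses` lookup.
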
// Manager is the core interface presented by all implementations of the data transfer subsystem
type Manager interface {

	// Start initializes data transfer processing
	Start(ctx context.Context) error

	// OnReady registers a listener for when the data transfer comes on line
	OnReady(ReadyFunc)

	// Stop terminates all data transfers and ends processing
	Stop(ctx context.Context) error

	// RegisterVoucherType registers a validator for the given voucher type
	// will error if voucher type does not implement voucher
	// or if there is a voucher type registered with an identical identifier
	RegisterVoucherType(voucherType TypeIdentifier, validator RequestValidator) error

	// RegisterTransportConfigurer registers the given transport configurer to be run on requests with the given voucher
	// type
	RegisterTransportConfigurer(voucherType TypeIdentifier, configurer TransportConfigurer) error

	// open a data transfer that will send data to the recipient peer and
	// transfer parts of the piece that match the selector
	OpenPushDataChannel(ctx context.Context, to peer.ID, voucher TypedVoucher, baseCid cid.Cid, selector datamodel.Node, options ...TransferOption) (ChannelID, error)

	// open a data transfer that will request data from the sending peer and
	// transfer parts of the piece that match the selector
	OpenPullDataChannel(ctx context.Context, to peer.ID, voucher TypedVoucher, baseCid cid.Cid, selector datamodel.Node, options ...TransferOption) (ChannelID, error)

	// send an intermediate voucher as needed when the receiver sends a request for revalidation
	SendVoucher(ctx context.Context, chid ChannelID, voucher TypedVoucher) error

	// send information from the responder to update the initiator on the state of their voucher
	SendVoucherResult(ctx context.Context, chid ChannelID, voucherResult TypedVoucher) error

	// Update the validation status for a given channel, to change data limits, finalization, accepted status, and pause state
	// and send new voucher results as
	UpdateValidationStatus(ctx context.Context, chid ChannelID, validationResult ValidationResult) error

	// close an open channel (effectively a cancel)
	CloseDataTransferChannel(ctx context.Context, chid ChannelID) error

	// pause a data transfer channel (only allowed if transport supports it)
	PauseDataTransferChannel(ctx context.Context, chid ChannelID) error

	// resume a data transfer channel (only allowed if transport supports it)
	ResumeDataTransferChannel(ctx context.Context, chid ChannelID) error

	// get status of a transfer
	TransferChannelStatus(ctx context.Context, x ChannelID) Status

	// get channel state
	ChannelState(ctx context.Context, chid ChannelID) (ChannelState, error)

	// get notified when certain types of events happen
	SubscribeToEvents(subscriber Subscriber) Unsubscribe

	// get all in progress transfers
	InProgressChannels(ctx context.Context) (map[ChannelID]ChannelState, error)

	// RestartDataTransferChannel restarts an existing data transfer channel
	RestartDataTransferChannel(ctx context.Context, chid ChannelID) error
}
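
The subscribe-then-open pattern suggested by `SubscribeToEvents` and `OpenPushDataChannel` can be illustrated with a small in-memory fake. This sketch does not use the real interface: `fakeManager`, the simplified `Event`, `Subscriber`, and `Unsubscribe` types, the event codes `"Open"` and `"Completed"`, and the reduced method signatures are all assumptions made so the example stays self-contained. The fake is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a simplified, hypothetical event; the codes used below are made up.
type Event struct {
	Code    string
	Channel uint64
}

// Subscriber and Unsubscribe are simplified stand-ins for the callback types
// used by the Manager interface.
type Subscriber func(Event)
type Unsubscribe func()

// fakeManager is an in-memory stand-in for a Manager implementation.
type fakeManager struct {
	nextChannel uint64
	nextSub     int
	subs        map[int]Subscriber
}

func newFakeManager() *fakeManager {
	return &fakeManager{subs: map[int]Subscriber{}}
}

// SubscribeToEvents registers a callback and returns a function that removes it.
func (m *fakeManager) SubscribeToEvents(s Subscriber) Unsubscribe {
	id := m.nextSub
	m.nextSub++
	m.subs[id] = s
	return func() { delete(m.subs, id) }
}

func (m *fakeManager) publish(e Event) {
	for _, s := range m.subs {
		s(e)
	}
}

// OpenPushDataChannel pretends to run a whole transfer synchronously and
// publishes two events for it.
func (m *fakeManager) OpenPushDataChannel(ctx context.Context, to string) (uint64, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	m.nextChannel++
	m.publish(Event{Code: "Open", Channel: m.nextChannel})
	m.publish(Event{Code: "Completed", Channel: m.nextChannel})
	return m.nextChannel, nil
}

// runExample subscribes before opening the channel so no event is missed,
// then unsubscribes and opens a second channel whose events go unobserved.
func runExample() (uint64, []string) {
	m := newFakeManager()
	var seen []string
	unsub := m.SubscribeToEvents(func(e Event) { seen = append(seen, e.Code) })

	id, err := m.OpenPushDataChannel(context.Background(), "provider")
	if err != nil {
		return 0, nil
	}
	unsub()
	_, _ = m.OpenPushDataChannel(context.Background(), "provider")
	return id, seen
}

func main() {
	id, seen := runExample()
	fmt.Println(id, seen)
}
```

Subscribing before opening the channel matters in this sketch because events are delivered synchronously; a subscriber registered afterwards would see nothing for that transfer.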