Data Transfer in Filecoin
-
State
stable
-
Theory Audit
n/a
The Data Transfer Protocol is a protocol for transferring all or part of a Piece across the network when a deal is made. The overall goal of the data transfer module is to abstract the underlying transport medium over which data is transferred between parties in the Filecoin network. Currently, the underlying protocol that actually moves the data is GraphSync. The Data Transfer Protocol itself can therefore be thought of as a negotiation protocol: it establishes and validates a transfer, while the transport carries the data.
The Data Transfer Protocol is used for both Storage and Retrieval Deals. In both cases, the data transfer request is initiated by the client. The primary reason is that clients are more often than not behind NATs, so it is more convenient to start any data transfer from their side. For Storage Deals, the client initiates a push request to send data to the storage provider. For Retrieval Deals, the client initiates a pull request to retrieve data from the storage provider.
The request to initiate a data transfer includes a voucher or token (not to be confused with the Payment Channel voucher) that points to a specific deal the two parties have previously agreed to. This lets the storage provider link the request to a deal it has accepted instead of disregarding the request. As described below, retrieval deals differ slightly: the deal proposal and the data transfer request can be sent at once.
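The direction rule and the voucher check described above can be sketched as follows. This is a minimal illustration under stated assumptions: Request, NewClientRequest, Accept and the string VoucherID are hypothetical simplifications (the real module carries a typed IPLD voucher, see TypedVoucher under Data Structures), and the client is always the requestor.

```go
package main

import "errors"

// DealKind distinguishes the two market flows that use data transfer.
type DealKind int

const (
	StorageDeal DealKind = iota
	RetrievalDeal
)

// Request is a hypothetical, simplified data transfer request.
type Request struct {
	IsPull    bool   // true: the client asks the provider to send data
	VoucherID string // hypothetical reference to a previously agreed deal
}

// NewClientRequest builds the request a client sends: a push for storage
// deals, a pull for retrieval deals.
func NewClientRequest(kind DealKind, dealID string) Request {
	return Request{IsPull: kind == RetrievalDeal, VoucherID: dealID}
}

// ErrUnknownDeal is returned when the voucher cannot be linked to a deal.
var ErrUnknownDeal = errors.New("voucher does not reference an agreed deal")

// Accept models the provider linking the voucher to a deal it agreed to;
// requests it cannot link are disregarded.
func Accept(agreed map[string]bool, req Request) error {
	if !agreed[req.VoucherID] {
		return ErrUnknownDeal
	}
	return nil
}
```

Keeping the deal reference inside the request means the provider never has to trust an unsolicited connection: every accepted transfer maps back to a deal it already holds.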
Modules
-
State
stable
-
Theory Audit
n/a
This diagram shows how Data Transfer and its modules fit into the picture with the Storage and Retrieval Markets. In particular, note how the Data Transfer Request Validators from the markets are plugged into the Data Transfer module, but their code belongs in the Markets system.
Terminology
-
State
stable
-
Theory Audit
n/a
- Push Request: A request to send data to the other party - normally initiated by the client and primarily in case of a Storage Deal.
- Pull Request: A request to have the other party send data - normally initiated by the client and primarily in case of a Retrieval Deal.
- Requestor: The party that initiates the data transfer request (whether Push or Pull) - normally the client, at least as currently implemented in Filecoin, to overcome NAT-traversal problems.
- Responder: The party that receives the data transfer request - normally the storage provider.
- Data Transfer Voucher or Token: A wrapper around storage- or retrieval-related data that can identify and validate the transfer request to the other party.
- Request Validator: The data transfer module only initiates a transfer when the responder can validate that the request is tied directly to either an existing storage or retrieval deal. Validation is not performed by the data transfer module itself. Instead, a request validator inspects the data transfer voucher to determine whether to respond to the request or disregard the request.
- Transporter: Once a request is negotiated and validated, the actual transfer is managed by a transporter on both sides. The transporter is part of the data transfer module but is isolated from the negotiation process. It has access to an underlying verifiable transport protocol and uses it to send data and track progress.
- Subscriber: An external component that monitors progress of a data transfer by subscribing to data transfer events, such as progress or completion.
- GraphSync: The default underlying transport protocol used by the Transporter. The full GraphSync specification can be found here
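The division of labour in these terms (the module dispatches, validators decide, subscribers observe) can be sketched in memory. Everything here is a hypothetical simplification: validators are plain functions, events are a name plus a byte count, and the transporter's work is elided. Only the rule that registering a second validator for the same voucher type fails is taken from the RegisterVoucherType comment under Data Structures.

```go
package main

import "fmt"

// TypeIdentifier identifies a registered voucher type.
type TypeIdentifier string

// Voucher is a hypothetical stand-in for a typed voucher.
type Voucher struct {
	Type    TypeIdentifier
	Payload string
}

// RequestValidator decides whether a request carrying v should be answered.
type RequestValidator func(v Voucher) bool

// Event is a simplified progress or completion event.
type Event struct {
	Name  string
	Bytes uint64
}

// Module owns no validation logic: it dispatches to registered validators
// and reports events to subscribers.
type Module struct {
	validators  map[TypeIdentifier]RequestValidator
	subscribers []func(Event)
}

func NewModule() *Module {
	return &Module{validators: map[TypeIdentifier]RequestValidator{}}
}

// RegisterVoucherType fails if a validator is already registered for t.
func (m *Module) RegisterVoucherType(t TypeIdentifier, v RequestValidator) error {
	if _, ok := m.validators[t]; ok {
		return fmt.Errorf("voucher type %q already registered", t)
	}
	m.validators[t] = v
	return nil
}

// Subscribe registers an external observer of transfer events.
func (m *Module) Subscribe(s func(Event)) { m.subscribers = append(m.subscribers, s) }

// HandleRequest validates the voucher; if accepted, it reports progress and
// completion to every subscriber. Unknown or rejected vouchers are ignored.
func (m *Module) HandleRequest(v Voucher, size uint64) bool {
	validate, ok := m.validators[v.Type]
	if !ok || !validate(v) {
		return false
	}
	for _, e := range []Event{{"Progress", size}, {"Complete", size}} {
		for _, s := range m.subscribers {
			s(e)
		}
	}
	return true
}
```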
Request Phases
-
State
stable
-
Theory Audit
n/a
There are two basic phases to any data transfer:
- Negotiation: the requestor and responder agree to the transfer by validating it with the data transfer voucher.
- Transfer: once the negotiation phase is complete, the data is actually transferred. The default protocol used to do the transfer is GraphSync.
Note that the Negotiation and Transfer stages can occur in separate round trips or in the same round trip, where the requesting party implicitly agrees by sending the request and the responding party can agree and immediately send or receive data. Whether the process takes a single round trip or several depends in part on whether the request is a push request (storage deal) or a pull request (retrieval deal), and on whether the data transfer negotiation can piggyback on the underlying transport mechanism.
When GraphSync is the transport mechanism, data transfer requests can piggyback on GraphSync requests as an extension, using GraphSync’s built-in extensibility, so Pull Requests need only a single round trip. However, because GraphSync is a request/response protocol with no direct support for push-type requests, in the Push case negotiation happens in a separate request over data transfer’s own libp2p protocol, /fil/datatransfer/1.0.0. Future transport mechanisms might support single-round-trip negotiation for both Push and Pull, for only one of them, or for neither.
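A minimal sketch of the round-trip accounting above. The Transport struct and its two flags are hypothetical; only GraphSync's behaviour (pull piggybacks, push does not) comes from the text, and counting the push case as two round trips (negotiation over /fil/datatransfer/1.0.0, then the responder's GraphSync request) is a simplification.

```go
package main

// Transport describes, hypothetically, where negotiation can piggyback.
type Transport struct {
	Name             string
	CanPiggybackPull bool // negotiation rides inside the transport's request
	CanPiggybackPush bool // the transport can itself carry a push request
}

// GraphSync supports extensions on its requests but has no push requests.
var GraphSync = Transport{Name: "graphsync", CanPiggybackPull: true, CanPiggybackPush: false}

// NegotiationProtocol is data transfer's own libp2p protocol.
const NegotiationProtocol = "/fil/datatransfer/1.0.0"

// Negotiation returns the round trips used to negotiate and start a
// transfer, and the protocol that carries the negotiation.
func Negotiation(t Transport, isPull bool) (roundTrips int, protocol string) {
	piggyback := t.CanPiggybackPush
	if isPull {
		piggyback = t.CanPiggybackPull
	}
	if piggyback {
		return 1, t.Name // negotiation embedded in the transport request
	}
	return 2, NegotiationProtocol // separate negotiation, then the transfer
}
```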
Upon receiving a data transfer request, the data transfer module decodes the voucher and delivers it to the request validators. In storage deals, the request validator checks whether the deal referenced by the voucher is one the recipient has previously agreed to. For retrieval deals, the request includes the retrieval deal proposal itself, so as long as the request validator accepts the proposal, the deal and the data transfer are agreed in a single round trip.
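The two validation paths can be contrasted in a short sketch. StorageVoucher, RetrievalVoucher, Provider and the minimum-price policy are all hypothetical; the text only says a storage voucher must match a deal the provider already agreed to, while a retrieval voucher carries the proposal to be judged on the spot.

```go
package main

import "errors"

// StorageVoucher references a deal the provider accepted earlier.
type StorageVoucher struct{ ProposalCID string }

// RetrievalVoucher carries the retrieval proposal itself.
type RetrievalVoucher struct {
	PayloadCID   string
	PricePerByte uint64
}

var (
	ErrNoSuchDeal     = errors.New("no agreed storage deal for voucher")
	ErrPriceTooLow    = errors.New("retrieval proposal rejected: price too low")
	ErrUnknownVoucher = errors.New("unknown voucher type")
)

// Provider holds hypothetical provider-side state and policy.
type Provider struct {
	AcceptedDeals map[string]bool // storage deals agreed to earlier
	MinPrice      uint64          // hypothetical retrieval pricing policy
}

// Validate dispatches on the decoded voucher type.
func (p Provider) Validate(voucher any) error {
	switch v := voucher.(type) {
	case StorageVoucher:
		if !p.AcceptedDeals[v.ProposalCID] {
			return ErrNoSuchDeal
		}
		return nil
	case RetrievalVoucher:
		// The proposal is judged now, so accepting the deal and accepting
		// the transfer happen in the same round trip.
		if v.PricePerByte < p.MinPrice {
			return ErrPriceTooLow
		}
		return nil
	default:
		return ErrUnknownVoucher
	}
}
```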
Note that in the case of retrieval, the provider can accept the deal and the data transfer request but then pause the retrieval itself in order to carry out the unsealing process. The storage provider has to unseal all of the requested data before initiating the actual data transfer. Furthermore, the storage provider has the option of pausing the retrieval flow before starting the unsealing process in order to request an unsealing payment. Storage providers can request this payment to cover the computational cost of unsealing and to avoid falling victim to misbehaving clients.
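The provider-side ordering described above, assuming the requested data is sealed, can be written down as a step list. The step names and the RetrievalSteps helper are hypothetical labels, not states defined by the module.

```go
package main

// RetrievalSteps lists, in order, what a provider does after accepting a
// retrieval deal whose data is sealed. requireUnsealPayment models the
// optional unsealing payment.
func RetrievalSteps(requireUnsealPayment bool) []string {
	steps := []string{"accept deal and data transfer request", "pause transfer"}
	if requireUnsealPayment {
		// Optional: be paid for the unsealing work before doing it.
		steps = append(steps, "request unsealing payment", "receive unsealing payment")
	}
	// All requested data must be unsealed before any data is sent.
	steps = append(steps, "unseal all requested data", "resume transfer", "send data")
	return steps
}
```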
Example Flows
-
State
stable
-
Theory Audit
n/a
Push Flow
-
State
stable
-
Theory Audit
n/a
- A requestor initiates a Push transfer when it wants to send data to another party.
- The requestor’s data transfer module sends a push request to the responder along with the data transfer voucher.
- The responder’s data transfer module validates the data transfer request via the Validator provided as a dependency by the responder.
- The responder’s data transfer module initiates the transfer by making a GraphSync request.
- The requestor receives the GraphSync request, verifies that it recognises the data transfer and begins sending data.
- The responder receives data and can produce an indication of progress.
- The responder completes receiving data, and notifies any listeners.
The push flow is ideal for storage deals, where the client initiates the data transfer straightaway once the provider indicates their intent to accept and publish the client’s deal proposal.
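The push flow can be simulated in memory to make the ordering of the steps concrete. PushFlow and its log strings are hypothetical; real implementations exchange these messages over libp2p and GraphSync, and validate stands in for the validator the responder registers.

```go
package main

import "fmt"

// PushFlow runs the push steps for a requestor sending blocks to a
// responder and returns each side's log of what it did.
func PushFlow(blocks [][]byte, voucher string, validate func(string) bool) (requestor, responder []string, ok bool) {
	requestor = append(requestor, "push request sent with voucher "+voucher)
	if !validate(voucher) {
		responder = append(responder, "push request rejected")
		return requestor, responder, false
	}
	// The responder, not the requestor, opens the GraphSync request.
	responder = append(responder, "request validated; GraphSync request sent")
	requestor = append(requestor, "GraphSync request recognised; sending data")
	var received int
	for _, b := range blocks {
		received += len(b)
		responder = append(responder, fmt.Sprintf("progress: %d bytes", received))
	}
	responder = append(responder, "transfer complete; listeners notified")
	return requestor, responder, true
}
```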
Pull Flow - Single Round Trip
-
State
stable
-
Theory Audit
n/a
- A requestor initiates a Pull transfer when it wants to receive data from another party.
- The requestor’s data transfer module initiates the transfer by making a pull request embedded in the GraphSync request to the responder. The request includes the data transfer voucher.
- The responder receives the GraphSync request, and forwards the data transfer request to the data transfer module.
- The responder’s data transfer module validates the data transfer request via a PullValidator provided as a dependency by the responder.
- The responder accepts the GraphSync request and sends the accepted response along with the data-transfer-level acceptance response.
- The requestor receives data and can produce an indication of progress. For retrievals, this step happens later, after the storage provider has finished unsealing the data.
- The requestor completes receiving data, and notifies any listeners.
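The single round trip works because the data transfer request travels inside the GraphSync request and both acceptances travel back in one response. In the sketch below, GraphSyncRequest, Response and the extension name are hypothetical stand-ins, not GraphSync's actual types or extension identifiers.

```go
package main

// GraphSyncRequest is a hypothetical GraphSync request with extensions.
type GraphSyncRequest struct {
	Root       string
	Extensions map[string]string
}

// Response carries both the transport-level and data-transfer-level answer.
type Response struct {
	GraphSyncAccepted    bool
	DataTransferAccepted bool
}

// dtExtension is a hypothetical extension name used only in this sketch.
const dtExtension = "hypothetical/datatransfer-pull"

// NewPullRequest embeds the data transfer request (here just the voucher).
func NewPullRequest(root, voucher string) GraphSyncRequest {
	return GraphSyncRequest{Root: root, Extensions: map[string]string{dtExtension: voucher}}
}

// Respond extracts the embedded request, validates it, and answers both
// layers in the same response.
func Respond(req GraphSyncRequest, validate func(string) bool) Response {
	voucher, ok := req.Extensions[dtExtension]
	if !ok || !validate(voucher) {
		return Response{}
	}
	return Response{GraphSyncAccepted: true, DataTransferAccepted: true}
}
```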
Protocol
-
State
stable
-
Theory Audit
n/a
A data transfer CAN be negotiated over the network via the Data Transfer Protocol, a libp2p protocol type.
Using the Data Transfer Protocol as an independent libp2p communication mechanism is not a hard requirement – as long as both parties have an implementation of the Data Transfer Subsystem that can talk to the other, any transport mechanism (including offline mechanisms) is acceptable.
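Because negotiation only needs some way to carry messages between the two implementations, it can be written against io.Reader and io.Writer so that the same code serves a libp2p stream or an offline medium. In this sketch, Message and its fields are hypothetical, and JSON is a stand-in: the wire encoding used by real implementations is not specified in this section.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
)

// Message is a hypothetical negotiation message.
type Message struct {
	TransferID uint64 `json:"transfer_id"`
	IsPull     bool   `json:"is_pull"`
	Voucher    string `json:"voucher"`
}

// WriteMessage needs only an io.Writer: a network stream, a file, etc.
func WriteMessage(w io.Writer, m Message) error {
	return json.NewEncoder(w).Encode(m)
}

// ReadMessage needs only an io.Reader.
func ReadMessage(r io.Reader) (Message, error) {
	var m Message
	err := json.NewDecoder(r).Decode(&m)
	return m, err
}

// OfflineRoundTrip simulates an offline exchange through an in-memory buffer.
func OfflineRoundTrip(m Message) (Message, error) {
	var buf bytes.Buffer
	if err := WriteMessage(&buf, m); err != nil {
		return Message{}, err
	}
	return ReadMessage(&buf)
}
```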
Data Structures
-
State
stable
-
Theory Audit
n/a
package datatransfer

import (
	"fmt"
	"time"

	"github.com/ipfs/go-cid"
	"github.com/ipld/go-ipld-prime"
	"github.com/ipld/go-ipld-prime/datamodel"
	"github.com/libp2p/go-libp2p/core/peer"
	cbg "github.com/whyrusleeping/cbor-gen"
)

//go:generate cbor-gen-for ChannelID ChannelStages ChannelStage Log

// TypeIdentifier is a unique string identifier for a type of encodable object in a
// registry
type TypeIdentifier string

// EmptyTypeIdentifier means there is no voucher present
const EmptyTypeIdentifier = TypeIdentifier("")

// TypedVoucher is a voucher or voucher result in IPLD form and an associated
// type identifier for that voucher or voucher result
type TypedVoucher struct {
	Voucher datamodel.Node
	Type    TypeIdentifier
}

// Equals is a utility to compare that two TypedVouchers are the same - both type
// and the voucher's IPLD content
func (tv1 TypedVoucher) Equals(tv2 TypedVoucher) bool {
	return tv1.Type == tv2.Type && ipld.DeepEqual(tv1.Voucher, tv2.Voucher)
}
// TransferID is an identifier for a data transfer, shared between
// requester and responder and unique to the requester
type TransferID uint64

// ChannelID is a unique identifier for a channel, distinguished by the
// initiator and responder peer IDs plus the transfer ID
type ChannelID struct {
	Initiator peer.ID
	Responder peer.ID
	ID        TransferID
}

func (c ChannelID) String() string {
	return fmt.Sprintf("%s-%s-%d", c.Initiator, c.Responder, c.ID)
}

// OtherParty returns the peer on the other side of the request, depending
// on whether this peer is the initiator or responder
func (c ChannelID) OtherParty(thisPeer peer.ID) peer.ID {
	if thisPeer == c.Initiator {
		return c.Responder
	}
	return c.Initiator
}
// Channel represents all the parameters for a single data transfer
type Channel interface {
	// TransferID returns the transfer id for this channel
	TransferID() TransferID

	// BaseCID returns the CID that is at the root of this data transfer
	BaseCID() cid.Cid

	// Selector returns the IPLD selector for this data transfer (represented as
	// an IPLD node)
	Selector() datamodel.Node

	// Voucher returns the initial voucher for this data transfer
	Voucher() TypedVoucher

	// Sender returns the peer id for the node that is sending data
	Sender() peer.ID

	// Recipient returns the peer id for the node that is receiving data
	Recipient() peer.ID

	// TotalSize returns the total size for the data being transferred
	TotalSize() uint64

	// IsPull returns whether this is a pull request
	IsPull() bool

	// ChannelID returns the ChannelID for this request
	ChannelID() ChannelID

	// OtherPeer returns the counter party peer for this channel
	OtherPeer() peer.ID
}
// ChannelState is channel parameters plus its current state
type ChannelState interface {
	Channel

	// SelfPeer returns the peer this channel belongs to
	SelfPeer() peer.ID

	// Status is the current status of this channel
	Status() Status

	// Sent returns the number of bytes sent
	Sent() uint64

	// Received returns the number of bytes received
	Received() uint64

	// Message offers additional information about the current status
	Message() string

	// Vouchers returns all vouchers sent on this channel
	Vouchers() []TypedVoucher

	// VoucherResults are results of vouchers sent on the channel
	VoucherResults() []TypedVoucher

	// LastVoucher returns the last voucher sent on the channel
	LastVoucher() TypedVoucher

	// LastVoucherResult returns the last voucher result sent on the channel
	LastVoucherResult() TypedVoucher

	// ReceivedCidsTotal returns the number of (non-unique) cids received so far
	// on the channel - note that a block can exist in more than one place in the DAG
	ReceivedCidsTotal() int64

	// QueuedCidsTotal returns the number of (non-unique) cids queued so far
	// on the channel - note that a block can exist in more than one place in the DAG
	QueuedCidsTotal() int64

	// SentCidsTotal returns the number of (non-unique) cids sent so far
	// on the channel - note that a block can exist in more than one place in the DAG
	SentCidsTotal() int64

	// Queued returns the number of bytes read from the node and queued for sending
	Queued() uint64

	// DataLimit is the maximum data that can be transferred on this channel before
	// revalidation. 0 indicates no limit.
	DataLimit() uint64

	// RequiresFinalization indicates at the end of the transfer, the channel should
	// be left open for a final settlement
	RequiresFinalization() bool

	// InitiatorPaused indicates whether the initiator of this channel is in a paused state
	InitiatorPaused() bool

	// ResponderPaused indicates whether the responder of this channel is in a paused state
	ResponderPaused() bool

	// BothPaused indicates both sides of the transfer have paused the transfer
	BothPaused() bool

	// SelfPaused indicates whether the local peer for this channel is in a paused state
	SelfPaused() bool

	// Stages returns the timeline of events this data transfer has gone through,
	// for observability purposes.
	//
	// It is unsafe for the caller to modify the return value, and changes
	// may not be persisted. It should be treated as immutable.
	Stages() *ChannelStages
}
// ChannelStages captures a timeline of the progress of a data transfer channel,
// grouped by stages.
//
// EXPERIMENTAL; subject to change.
type ChannelStages struct {
	// Stages contains an entry for every stage the channel has gone through.
	// Each stage then contains logs.
	Stages []*ChannelStage
}

// ChannelStage traces the execution of a data transfer channel stage.
//
// EXPERIMENTAL; subject to change.
type ChannelStage struct {
	// Human-readable fields.
	// TODO: these _will_ need to be converted to canonical representations, so
	// they are machine readable.
	Name        string
	Description string

	// Timestamps.
	// TODO: may be worth adding an exit timestamp. It _could_ be inferred from
	// the start of the next stage, or from the timestamp of the last log line
	// if this is a terminal stage. But that's non-deterministic and it relies on
	// assumptions.
	CreatedTime cbg.CborTime
	UpdatedTime cbg.CborTime

	// Logs contains a detailed timeline of events that occurred inside
	// this stage.
	Logs []*Log
}

// Log represents a point-in-time event that occurred inside a channel stage.
//
// EXPERIMENTAL; subject to change.
type Log struct {
	// Log is a human readable message.
	//
	// TODO: this _may_ need to be converted to a canonical data model so it
	// is machine-readable.
	Log string

	UpdatedTime cbg.CborTime
}
// AddLog adds a log to the specified stage, creating the stage if
// it doesn't exist yet.
//
// EXPERIMENTAL; subject to change.
func (cs *ChannelStages) AddLog(stage, msg string) {
	if cs == nil {
		return
	}

	now := curTime()
	st := cs.GetStage(stage)
	if st == nil {
		st = &ChannelStage{
			CreatedTime: now,
		}
		cs.Stages = append(cs.Stages, st)
	}

	st.Name = stage
	st.UpdatedTime = now
	if msg != "" && (len(st.Logs) == 0 || st.Logs[len(st.Logs)-1].Log != msg) {
		// only add the log if it differs from the stage's most recent log entry.
		st.Logs = append(st.Logs, &Log{Log: msg, UpdatedTime: now})
	}
}

// GetStage returns the ChannelStage object for a named stage, or nil if not found.
//
// TODO: the input should be a strongly-typed enum instead of a free-form string.
// TODO: drop Get from GetStage to make this code more idiomatic. Return a
// second ok boolean to make it even more idiomatic.
//
// EXPERIMENTAL; subject to change.
func (cs *ChannelStages) GetStage(stage string) *ChannelStage {
	if cs == nil {
		return nil
	}

	for _, s := range cs.Stages {
		if s.Name == stage {
			return s
		}
	}

	return nil
}

// curTime returns the current time in UTC as a CBOR-encodable timestamp.
func curTime() cbg.CborTime {
	now := time.Now()
	return cbg.CborTime(time.Unix(0, now.UnixNano()).UTC())
}
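The deduplication rule in AddLog is easy to misread as global. The self-contained mirror below uses time.Time in place of cbg.CborTime so it runs without dependencies; it is a simplified copy for illustration, not the package code. It shows that only a message identical to the stage's previous entry is dropped, that an empty message still creates or touches the stage, and that a nil receiver is a safe no-op.

```go
package main

import "time"

// Log mirrors the Log type above with a plain time.Time.
type Log struct {
	Log         string
	UpdatedTime time.Time
}

// ChannelStage mirrors ChannelStage above (Description omitted).
type ChannelStage struct {
	Name                     string
	CreatedTime, UpdatedTime time.Time
	Logs                     []*Log
}

// ChannelStages mirrors ChannelStages above.
type ChannelStages struct{ Stages []*ChannelStage }

// GetStage returns the named stage or nil.
func (cs *ChannelStages) GetStage(name string) *ChannelStage {
	if cs == nil {
		return nil
	}
	for _, s := range cs.Stages {
		if s.Name == name {
			return s
		}
	}
	return nil
}

// AddLog applies the same rules as the original: create the stage on first
// use, always bump UpdatedTime, and skip a message equal to the previous one.
func (cs *ChannelStages) AddLog(stage, msg string) {
	if cs == nil {
		return
	}
	now := time.Now().UTC()
	st := cs.GetStage(stage)
	if st == nil {
		st = &ChannelStage{Name: stage, CreatedTime: now}
		cs.Stages = append(cs.Stages, st)
	}
	st.UpdatedTime = now
	if msg != "" && (len(st.Logs) == 0 || st.Logs[len(st.Logs)-1].Log != msg) {
		st.Logs = append(st.Logs, &Log{Log: msg, UpdatedTime: now})
	}
}
```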
package datatransfer

import "github.com/filecoin-project/go-statemachine/fsm"

// Status is the status of transfer for a given channel
type Status uint64

const (
	// Requested means a data transfer was requested but has not yet been approved
	Requested Status = iota

	// Ongoing means the data transfer is in progress
	Ongoing

	// TransferFinished indicates the initiator is done sending/receiving
	// data but is awaiting confirmation from the responder
	TransferFinished

	// ResponderCompleted indicates the initiator received a message from the
	// responder that it's completed
	ResponderCompleted

	// Finalizing means the responder is awaiting a final message from the initiator to
	// consider the transfer done
	Finalizing

	// Completing just means we have some final cleanup for a completed request
	Completing

	// Completed means the data transfer is completed successfully
	Completed

	// Failing just means we have some final cleanup for a failed request
	Failing

	// Failed means the data transfer failed
	Failed

	// Cancelling just means we have some final cleanup for a cancelled request
	Cancelling

	// Cancelled means the data transfer ended prematurely
	Cancelled

	// DEPRECATED: Use InitiatorPaused() method on ChannelState
	InitiatorPaused

	// DEPRECATED: Use ResponderPaused() method on ChannelState
	ResponderPaused

	// DEPRECATED: Use BothPaused() method on ChannelState
	BothPaused

	// ResponderFinalizing is a unique state where the responder is awaiting a final voucher
	ResponderFinalizing

	// ResponderFinalizingTransferFinished is a unique state where the responder is awaiting a final voucher
	// and we have received all data
	ResponderFinalizingTransferFinished

	// ChannelNotFoundError means the searched for data transfer does not exist
	ChannelNotFoundError

	// Queued indicates a data transfer request has been accepted, but is not actively transferring yet
	Queued

	// AwaitingAcceptance indicates a transfer request is actively being processed by the transport
	// even if the remote has not yet responded that it's accepted the transfer. Such a state can
	// occur, for example, in a requestor-initiated transfer that starts processing prior to receiving
	// acceptance from the server.
	AwaitingAcceptance
)
type statusList []Status

func (sl statusList) Contains(s Status) bool {
	for _, ts := range sl {
		if ts == s {
			return true
		}
	}
	return false
}

func (sl statusList) AsFSMStates() []fsm.StateKey {
	sk := make([]fsm.StateKey, 0, len(sl))
	for _, s := range sl {
		sk = append(sk, s)
	}
	return sk
}

var NotAcceptedStates = statusList{
	Requested,
	AwaitingAcceptance,
	Cancelled,
	Cancelling,
	Failed,
	Failing,
	ChannelNotFoundError}

func (s Status) IsAccepted() bool {
	return !NotAcceptedStates.Contains(s)
}

func (s Status) String() string {
	return Statuses[s]
}

var FinalizationStatuses = statusList{Finalizing, Completed, Completing}

func (s Status) InFinalization() bool {
	return FinalizationStatuses.Contains(s)
}

var TransferCompleteStates = statusList{
	TransferFinished,
	ResponderFinalizingTransferFinished,
	Finalizing,
	Completed,
	Completing,
	Failing,
	Failed,
	Cancelling,
	Cancelled,
	ChannelNotFoundError,
}

func (s Status) TransferComplete() bool {
	return TransferCompleteStates.Contains(s)
}

var TransferringStates = statusList{
	Ongoing,
	ResponderCompleted,
	ResponderFinalizing,
	AwaitingAcceptance,
}

func (s Status) Transferring() bool {
	return TransferringStates.Contains(s)
}
// Statuses are human readable names for data transfer states
var Statuses = map[Status]string{
	// Requested means a data transfer was requested but has not yet been approved
	Requested: "Requested",
	Ongoing: "Ongoing",
	TransferFinished: "TransferFinished",
	ResponderCompleted: "ResponderCompleted",
	Finalizing: "Finalizing",
	Completing: "Completing",
	Completed: "Completed",
	Failing: "Failing",
	Failed: "Failed",
	Cancelling: "Cancelling",
	Cancelled: "Cancelled",
	InitiatorPaused: "InitiatorPaused",
	ResponderPaused: "ResponderPaused",
	BothPaused: "BothPaused",
	ResponderFinalizing: "ResponderFinalizing",
	ResponderFinalizingTransferFinished: "ResponderFinalizingTransferFinished",
	ChannelNotFoundError: "ChannelNotFoundError",
	Queued: "Queued",
	AwaitingAcceptance: "AwaitingAcceptance",
}
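Because the status lists above overlap, the predicates can surprise a caller: AwaitingAcceptance counts as Transferring() even though IsAccepted() is false, and Queued is accepted but neither transferring nor complete. The trimmed copy below keeps only the states needed to show this; its numeric values differ from the real enum, and Phase is a hypothetical helper showing one order in which a subscriber might check the predicates.

```go
package main

// Status is a trimmed copy of the enum above (values differ).
type Status uint64

const (
	Requested Status = iota
	Ongoing
	Completed
	Failed
	Queued
	AwaitingAcceptance
)

func contains(list []Status, s Status) bool {
	for _, x := range list {
		if x == s {
			return true
		}
	}
	return false
}

// Subsets of NotAcceptedStates, TransferCompleteStates and TransferringStates.
var (
	notAccepted      = []Status{Requested, AwaitingAcceptance, Failed}
	transferComplete = []Status{Completed, Failed}
	transferring     = []Status{Ongoing, AwaitingAcceptance}
)

func (s Status) IsAccepted() bool       { return !contains(notAccepted, s) }
func (s Status) TransferComplete() bool { return contains(transferComplete, s) }
func (s Status) Transferring() bool     { return contains(transferring, s) }

// Phase checks completion first, then activity, then acceptance.
func Phase(s Status) string {
	switch {
	case s.TransferComplete():
		return "done"
	case s.Transferring():
		return "transferring"
	case s.IsAccepted():
		return "accepted, waiting to start"
	default:
		return "awaiting approval"
	}
}
```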
package datatransfer

import (
	"context"

	"github.com/ipfs/go-cid"
	"github.com/ipld/go-ipld-prime/datamodel"
	"github.com/libp2p/go-libp2p/core/peer"
)

// Manager is the core interface exposed by the data transfer module.
// ReadyFunc, RequestValidator, TransportConfigurer, TransferOption,
// ValidationResult, Subscriber and Unsubscribe are not shown in this excerpt.
type Manager interface {
	// Start initializes data transfer processing
	Start(ctx context.Context) error

	// OnReady registers a listener for when the data transfer comes on line
	OnReady(ReadyFunc)

	// Stop terminates all data transfers and ends processing
	Stop(ctx context.Context) error

	// RegisterVoucherType registers a validator for the given voucher type
	// will error if voucher type does not implement voucher
	// or if there is a voucher type registered with an identical identifier
	RegisterVoucherType(voucherType TypeIdentifier, validator RequestValidator) error

	// RegisterTransportConfigurer registers the given transport configurer to be run on requests with the given voucher
	// type
	RegisterTransportConfigurer(voucherType TypeIdentifier, configurer TransportConfigurer) error

	// open a data transfer that will send data to the recipient peer and
	// transfer parts of the piece that match the selector
	OpenPushDataChannel(ctx context.Context, to peer.ID, voucher TypedVoucher, baseCid cid.Cid, selector datamodel.Node, options ...TransferOption) (ChannelID, error)

	// open a data transfer that will request data from the sending peer and
	// transfer parts of the piece that match the selector
	OpenPullDataChannel(ctx context.Context, to peer.ID, voucher TypedVoucher, baseCid cid.Cid, selector datamodel.Node, options ...TransferOption) (ChannelID, error)

	// send an intermediate voucher as needed when the receiver sends a request for revalidation
	SendVoucher(ctx context.Context, chid ChannelID, voucher TypedVoucher) error

	// send information from the responder to update the initiator on the state of their voucher
	SendVoucherResult(ctx context.Context, chid ChannelID, voucherResult TypedVoucher) error

	// Update the validation status for a given channel, to change data limits, finalization, accepted status, and pause state
	// and send new voucher results as needed
	UpdateValidationStatus(ctx context.Context, chid ChannelID, validationResult ValidationResult) error

	// close an open channel (effectively a cancel)
	CloseDataTransferChannel(ctx context.Context, chid ChannelID) error

	// pause a data transfer channel (only allowed if transport supports it)
	PauseDataTransferChannel(ctx context.Context, chid ChannelID) error

	// resume a data transfer channel (only allowed if transport supports it)
	ResumeDataTransferChannel(ctx context.Context, chid ChannelID) error

	// get status of a transfer
	TransferChannelStatus(ctx context.Context, x ChannelID) Status

	// get channel state
	ChannelState(ctx context.Context, chid ChannelID) (ChannelState, error)

	// get notified when certain types of events happen
	SubscribeToEvents(subscriber Subscriber) Unsubscribe

	// get all in progress transfers
	InProgressChannels(ctx context.Context) (map[ChannelID]ChannelState, error)

	// RestartDataTransferChannel restarts an existing data transfer channel
	RestartDataTransferChannel(ctx context.Context, chid ChannelID) error
}
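SubscribeToEvents hands back an Unsubscribe function rather than taking a separate unsubscribe call. The sketch below shows one common way to implement that pattern; Event, Subscriber, Unsubscribe and pubSub here are simplified, hypothetical stand-ins, not the types the real module defines (events are reduced to a name and a byte count).

```go
package main

import "sync"

// Event is a simplified stand-in for a data transfer event.
type Event struct {
	Name  string
	Bytes uint64
}

// Subscriber and Unsubscribe are simplified stand-ins.
type Subscriber func(Event)
type Unsubscribe func()

// pubSub keeps subscribers keyed by an id so each can be removed alone.
type pubSub struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]Subscriber
}

func newPubSub() *pubSub { return &pubSub{subs: map[int]Subscriber{}} }

// SubscribeToEvents registers s and returns an idempotent Unsubscribe.
func (p *pubSub) SubscribeToEvents(s Subscriber) Unsubscribe {
	p.mu.Lock()
	defer p.mu.Unlock()
	id := p.nextID
	p.nextID++
	p.subs[id] = s
	var once sync.Once
	return func() {
		once.Do(func() {
			p.mu.Lock()
			delete(p.subs, id)
			p.mu.Unlock()
		})
	}
}

// publish copies the subscriber set under the lock, then calls each one
// without holding it, so subscribers may safely unsubscribe.
func (p *pubSub) publish(e Event) {
	p.mu.Lock()
	subs := make([]Subscriber, 0, len(p.subs))
	for _, s := range p.subs {
		subs = append(subs, s)
	}
	p.mu.Unlock()
	for _, s := range subs {
		s(e)
	}
}
```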