FLARE File Streaming

File streaming is a function that allows a file to be shared with one or more receivers. The file owner could be the FL Server or any FL Client. File streaming can be an effective alternative to sending large amounts of data with messages.

There are two main issues with sending large messages: - A large memory space is required to serialize the message into bytes before sending it. Once memory is saturated, everything becomes very slow. - A large byte array sent as a single message could saturate the network, slowing down overall processing.

File streaming, on the other hand, sends the big file with many small messages, each containing a chunk of file data. The big file is never loaded into memory completely. Since only small messages are sent over the network, it is less likely to bog down the network.

Push vs. Pull

There are two ways to get the file sent from one place to another: push and pull.

  • Push: The file owner sends the file to recipient(s). The push process is somewhat strict; if the file is sent to multiple recipients, all must process the same chunks simultaneously. If any one of them fails, the whole sending process fails. Hence, it is most useful when sending the file to a single recipient. The “push” method is implemented with the FileStreamer class.

  • Pull: The file owner first prepares the file and gets the Reference ID (RID) for the file. It then sends the RID to all recipients in whatever way it wants (e.g., broadcast). Once the RID is received, each recipient pulls the file chunk by chunk until the whole file is received. Pulling is more relaxed as recipients are not synchronized. Each recipient can pull the file at its own pace, which is useful when sharing a file with multiple recipients. The “pull” method is implemented with the FileDownloader class.

File Life Cycle Management

Though file download (pull) is more robust for multiple recipients, there is the issue of file management. Ultimately, it’s the file owner’s responsibility to remove the file (if necessary) when it is no longer needed.

Since each recipient can download the file at its own pace, there is no definitive time that the file is no longer needed at the file owner’s side. One effective way is activity timeout: if there has been no downloading activity from any recipient for a specified period, we can assume the file is no longer needed.

FileStreamer

Since FileStreamer (push) sends a file to recipients unannounced, the recipients must be set up in advance to process the received file. This is done by calling FileStreamer.register_stream_processing.

class FileStreamer(StreamerBase):
   @staticmethod
   def register_stream_processing(
       fl_ctx: FLContext,
       channel: str,
       topic: str,
       dest_dir: str = None,
       stream_done_cb=None,
       chunk_consumed_cb=None,
       **cb_kwargs,
   ):
       """Register for stream processing on the receiving side.

       Args:
           fl_ctx: the FLContext object
           channel: the app channel
           topic: the app topic
           dest_dir: the destination dir for received file. If not specified, system temp dir is used
           stream_done_cb: if specified, the callback to be called when the file is completely received
           chunk_consumed_cb: if specified, the callback to be called when a chunk is processed
           **cb_kwargs: the kwargs for the stream_done_cb

       Returns: None

       Notes: the stream_done_cb must follow stream_done_cb_signature as defined in apis.streaming.
       """

A channel and topic must be arranged between the sender and all receivers for them to share files. All recipients must call this method once for each channel and topic expected to receive files. Typically, this call is made at the beginning of the application in an event handler that handles the START_RUN event.

The stream_done_cb is called to notify the application when the file is completely received. It must follow the following signature:

def stream_done_cb_signature(stream_ctx: StreamContext, fl_ctx: FLContext, **kwargs):
   """This is the signature of stream_done_cb.

   Args:
       stream_ctx: context of the stream
       fl_ctx: FLContext object
       **kwargs: the kwargs specified when registering the stream_done_cb.

   Returns: None
   """

The stream_ctx contains information about the stream, including the information from the file owner when it calls stream_file to send the file.

The received data is saved in a temporary file. Use the following methods to get file information from the stream_ctx:

@staticmethod
def get_file_name(stream_ctx: StreamContext):
   """Get the file base name property from stream context.
   This method is intended to be used by the stream_done_cb() function of the receiving side.

   Args:
       stream_ctx: the stream context

   Returns: file base name
   """
@staticmethod
def get_file_location(stream_ctx: StreamContext):
   """Get the file location property from stream context.
   This method is intended to be used by the stream_done_cb() function of the receiving side.

   Args:
       stream_ctx: the stream context

   Returns: location (full file path) of the received file
   """
@staticmethod
def get_file_size(stream_ctx: StreamContext):
   """Get the file size property from stream context.
   This method is intended to be used by the stream_done_cb() function of the receiving side.

   Args:
       stream_ctx: the stream context

   Returns: size (in bytes) of the received file
   """

Note that it’s your responsibility to decide what to do with the received file and whether/when to delete the file.

Sending File

The file owner sends a file to one or more recipients by calling the stream_file function, as defined in the FileStreamer module.

def stream_file(
   channel: str,
   topic: str,
   stream_ctx: StreamContext,
   targets: List[str],
   file_name: str,
   fl_ctx: FLContext,
   chunk_size=None,
   chunk_timeout=None,
   optional=False,
   secure=False,
) -> (str, bool):
   """Stream a file to one or more targets.

   Args:
       channel: the app channel
       topic: the app topic
       stream_ctx: context data of the stream
       targets: targets that the file will be sent to
       file_name: full path to the file to be streamed
       fl_ctx: a FLContext object
       chunk_size: size of each chunk to be streamed. If not specified, default to 1M bytes.
       chunk_timeout: timeout for each chunk of data sent to targets.
       optional: whether the file is optional
       secure: whether P2P security is required

   Returns: a tuple of (RC, Result):
       - RC is ReturnCode.OK or ReturnCode.ERROR;
       - Result is whether the streaming completed successfully

   Notes: this is a blocking call - only returns after the streaming is done.
   """

The arguments are self-explanatory. Note that you can send any additional information through the stream_ctx, which is a dict. The information will be available to the recipient’s registered stream_done_cb.

FileDownloader

The file downloading process requires three steps:

  1. The data owner prepares the file(s) to be shared with recipients and obtains one reference id (RID) for each file.

  2. The data owner sends the RID(s) to all recipients. This is usually done with a broadcast message.

  3. Recipients download the files one by one with received RIDs.

Download Preparation

The data owner first prepares files to be shared with other recipients using the FileDownloader’s new_transaction and add_file methods, defined as follows:

class FileDownloader:

   @classmethod
   def new_transaction(
       cls,
       cell: Cell,
       timeout: float,
       timeout_cb,
       **cb_kwargs,
   ):
       """Create a new file download transaction.

       Args:
           cell: the cell for communication with recipients
           timeout: timeout for the transaction
           timeout_cb: CB to be called when the transaction is timed out
           **cb_kwargs: args to be passed to the CB

       Returns: transaction id

       The timeout_cb must follow this signature:

           cb(tx_id, file_names: List[str], **cb_args)
       """
@classmethod
def add_file(
    cls,
    transaction_id: str,
    file_name: str,
    file_downloaded_cb=None,
    **cb_kwargs,
) -> str:
    """Add a file to be downloaded to the specified transaction.

    Args:
        transaction_id: ID of the transaction
        file_name: name of the file to be downloaded
        file_downloaded_cb: CB to be called when the file is done downloading
        **cb_kwargs: args to be passed to the CB

    Returns: reference id for the file.

The file_downloaded_cb must follow this signature:

    cb(ref_id: str, to_receiver: str, status: str, file_name: str, **cb_kwargs)
    """

First, you call the new_transaction method to get a transaction id. A transaction can include one or more files to be downloaded. The arguments are self-explanatory. The cell is for messaging with the recipients. You can get it from a FLContext object as follows:

engine = fl_ctx.get_engine()
cell = engine.get_cell()

The timeout specifies when the transaction should time out: it is the maximum time within which no downloading activity is received from any recipient for any file in the transaction! Due to the distributed nature of recipients, they can download the file(s) at their own pace - some are downloading one file while others are downloading another file. The transaction is considered timed out only if no recipient is downloading any file of the transaction for the specified amount of time. The registered timeout_cb will be called with all the file names of the transaction. You can then decide what to do with these files.

You call the add_file method for each file to be downloaded. You receive a file reference id (RID) for each file added. You then send the RIDs to all recipients with a message.

Download File

Once the recipient receives RID(s), it calls the function to download the referenced file from the data owner.

def download_file(
   from_fqcn: str,
   ref_id: str,
   per_request_timeout: float,
   cell: Cell,
   location: str = None,
   secure=False,
   optional=False,
   abort_signal=None,
) -> (str, Optional[str]):
   """Download the referenced file from the file owner.

   Args:
       from_fqcn: FQCN of the file owner.
       ref_id: reference ID of the file to be downloaded.
       per_request_timeout: timeout for requests sent to the file owner.
       cell: cell to be used for communicating to the file owner.
       location: dir for keeping the received file. If not specified, will use temp dir.
       secure: P2P private mode for communication
       optional: suppress log messages of communication
       abort_signal: signal for aborting download.

   Returns: tuple of (error message if any, full path of the downloaded file).
   """

The arguments are self-explanatory. If the downloading is successful, you will get the full path to the downloaded file. It’s up to you what to do with the file.

Large Object Serialization with File Streaming or Download

please refer to Decomposer for Large Objects