ReadingService¶
ReadingService handles in-place modification of the DataPipe graph based on different use cases.
Features¶
Dynamic Sharding¶
Dynamic sharding is achieved by PrototypeMultiProcessingReadingService and DistributedReadingService, which shard the pipeline based on the information of the corresponding multiprocessing and distributed workers. TorchData offers two types of DataPipe that let users define where sharding takes place within the pipeline.
- sharding_filter: When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the DataPipe graph and skips the data that does not belong to that worker at the position of sharding_filter.
- sharding_round_robin_dispatch: When there is any sharding_round_robin_dispatch DataPipe in the pipeline, that branch is treated as a non-replicable branch. A single dispatching process is then created to load data from the non-replicable branch and distribute it to the subsequent worker processes.
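The difference between the two strategies can be illustrated with a small, library-free sketch. It does not use TorchData: the helper names `sharding_filter_worker` and `round_robin_dispatch` are hypothetical and only mimic the behavior described above, under the assumption that items are assigned to workers by index.

```python
def sharding_filter_worker(source, worker_id, num_workers):
    """Replicable branch: every worker iterates its own copy of the full
    source and keeps only the items whose index maps to this worker."""
    for index, item in enumerate(source):
        if index % num_workers == worker_id:
            yield item


def round_robin_dispatch(source, num_workers):
    """Non-replicable branch: one dispatcher reads the source exactly once
    and hands items to the workers in turn."""
    buckets = [[] for _ in range(num_workers)]
    for index, item in enumerate(source):
        buckets[index % num_workers].append(item)
    return buckets


num_workers = 2
data = list(range(7))

# Each worker reads the whole (replicated) source and filters it.
filtered = [
    list(sharding_filter_worker(data, worker_id, num_workers))
    for worker_id in range(num_workers)
]

# A single pass over a one-shot iterator, as a dispatching process would do.
dispatched = round_robin_dispatch(iter(data), num_workers)
```

Both approaches yield the same split in this sketch. What differs is how often the source is read: once per worker with filtering, and exactly once with dispatching, which is why the latter suits sources that cannot be replicated.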
The following is an example of having two types of sharding strategies in the pipeline.
![digraph Example {
subgraph cluster_replicable {
label="Replicable"
a -> b -> c -> d -> l;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
}
k -> l -> fullsync -> end;
a [label="DP1"];
b [label="shuffle"];
c [label="sharding_filter", color=blue];
d [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
l [label="DP6"];
fullsync;
end [shape=box];
}](_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
When multiprocessing takes place, the graph becomes:
![digraph Example {
subgraph cluster_worker_0 {
label="Worker 0"
a0 -> b0 -> c0 -> d0 -> l0;
m0 -> l0;
color=blue;
}
subgraph cluster_worker_1 {
label="Worker 1"
a1 -> b1 -> c1 -> d1 -> l1;
m1 -> l1;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
k -> round_robin_demux;
}
round_robin_demux -> m0;
round_robin_demux -> m1;
l0 -> n;
l1 -> n;
n -> fullsync -> end;
a0 [label="DP1"];
b0 [label="shuffle"];
c0 [label="sharding_filter", color=blue];
d0 [label="DP4"];
a1 [label="DP1"];
b1 [label="shuffle"];
c1 [label="sharding_filter", color=blue];
d1 [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
fullsync;
l0 [label="DP6"];
l1 [label="DP6"];
m0 [label="Client"]
m1 [label="Client"]
n [label="Client"]
end [shape=box];
}](_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
Client in the graph is a DataPipe that sends requests to and receives responses from multiprocessing queues.
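The request/response pattern behind such a client can be sketched as follows. This is an illustration only, not TorchData's protocol: it uses threads and `queue.Queue` instead of processes, and the names `serve` and `QueueClient` as well as the `"next"`/`"stop"` messages are assumptions made for this example.

```python
import queue
import threading


def serve(request_queue, response_queue, items):
    """Stand-in for the process that owns the non-replicable branch."""
    iterator = iter(items)
    while True:
        request = request_queue.get()
        if request == "stop":
            break
        # Reply with the next item, or None when the source is exhausted.
        response_queue.put(next(iterator, None))


class QueueClient:
    """Iterable that requests one item at a time over a pair of queues."""

    def __init__(self, request_queue, response_queue):
        self.request_queue = request_queue
        self.response_queue = response_queue

    def __iter__(self):
        while True:
            self.request_queue.put("next")
            item = self.response_queue.get()
            if item is None:
                # Tell the serving side to shut down, then stop iterating.
                self.request_queue.put("stop")
                return
            yield item


requests, responses = queue.Queue(), queue.Queue()
server = threading.Thread(target=serve, args=(requests, responses, ["a", "b", "c"]))
server.start()
received = list(QueueClient(requests, responses))
server.join()
```

Using `None` as the end-of-data marker keeps the sketch short but means the source itself must not contain `None`.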
Determinism¶
In DataLoader2, a SeedGenerator becomes the single source of randomness. Each ReadingService accesses it via initialize_iteration() and generates the corresponding random seeds for random DataPipe operations.
To make sure that the dataset shards are mutually exclusive and collectively exhaustive across multiprocessing processes and distributed nodes, PrototypeMultiProcessingReadingService and DistributedReadingService help DataLoader2 synchronize random states for any random DataPipe operation prior to sharding_filter or sharding_round_robin_dispatch. For the remaining DataPipe operations after sharding, each ReadingService generates unique random states based on the distributed rank and worker process ID, so that different workers perform different random transformations.
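The reasoning above can be made concrete with a library-free sketch. It assumes a hypothetical helper `worker_items` and an illustrative seed-derivation scheme. TorchData's actual seed generation is not shown here.

```python
import random


def worker_items(data, shared_seed, rank, worker_id, world_size, num_workers):
    # Before sharding: every worker shuffles with the SAME seed, so all
    # replicas see the same order and the shards stay disjoint.
    order = list(data)
    random.Random(shared_seed).shuffle(order)
    shard_index = rank * num_workers + worker_id
    num_shards = world_size * num_workers
    shard = order[shard_index::num_shards]

    # After sharding: a seed derived from rank and worker id gives each
    # worker its own random stream (illustrative derivation only).
    worker_rng = random.Random(f"{shared_seed}-{rank}-{worker_id}")
    return shard, worker_rng.random()


data = range(12)
results = [
    worker_items(data, 42, rank, worker_id, world_size=2, num_workers=2)
    for rank in range(2)
    for worker_id in range(2)
]
shards = [shard for shard, _ in results]
draws = [draw for _, draw in results]
```

If each worker shuffled with a different seed before sharding, the slices would be taken from different orderings, and some items could be seen twice while others would be dropped.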
Graph Mode¶
Graph mode also allows an easier transition of a data-preprocessing pipeline from research to production. After the DataPipe graph is created and validated with the ReadingServices, a different ReadingService that configures and connects to the production service/infra, such as AIStore, can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, find DataPipe operations that can be delegated to the production service/infra, and then modify the graph accordingly to achieve higher-performance execution.
Extend ReadingService¶
The following are the interfaces for a custom ReadingService.
- class torchdata.dataloader2.ReadingServiceInterface¶
Interface for ReadingService. Please extend custom ReadingService based on this interface class.
ReadingService must be picklable prior to initialize being called. This is because a copy of it will be created by DataLoader2 to avoid the situation where the same ReadingService object is used by multiple DataLoader2 instances and its internal state is modifiable by each of them.
As a result of this constraint, certain initialization steps may need to take place within the initialize method rather than __init__ of the ReadingService class.
- finalize() → None ¶
ReadingService cleans up internal states and fully shuts down the service. Called in DataLoader2’s shutdown and __del__.
- finalize_iteration() → None ¶
ReadingService ends service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.
- abstract initialize(datapipe: IterDataPipe | MapDataPipe) → IterDataPipe | MapDataPipe ¶
ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on the custom need. Called once, when the DataLoader2 iterator is created for the first time. Prior to calling this method, the ReadingService object must be picklable.
- Parameters:
datapipe – Original DataPipe graph.
- Returns:
An adapted or a new DataPipe graph.
- initialize_iteration(seed_generator: SeedGenerator) → None ¶
ReadingService spins up service for an epoch. Called every time a DataLoader2 iterator is obtained, before iteration begins.
- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs the determinism of all random operations within the graph of DataPipes.
Example
MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.
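The picklability constraint and the call order of the interface methods can be sketched without importing torchdata. The class below is a hypothetical stand-in that only mirrors the method names of ReadingServiceInterface. `copy.deepcopy` is used as a stand-in for the copy step, on the assumption that it fails for the same objects pickling would reject (it relies on the same reduction protocol).

```python
import copy
import threading


class SketchReadingService:
    """Hypothetical stand-in mirroring the ReadingServiceInterface methods."""

    def __init__(self, prefetch=2):
        # Only plain, copyable configuration lives here.
        self.prefetch = prefetch
        self._lock = None
        self.epochs_started = 0

    def initialize(self, datapipe):
        # Resources that cannot be pickled are created here, after the
        # loader would already have made its copy of the service.
        self._lock = threading.Lock()
        return datapipe  # a real service could return an adapted graph

    def initialize_iteration(self, seed_generator):
        self.epochs_started += 1

    def finalize_iteration(self):
        pass

    def finalize(self):
        self._lock = None


service = SketchReadingService()
copied = copy.deepcopy(service)  # works before initialize

graph = copied.initialize(["sample-0", "sample-1"])
try:
    copy.deepcopy(copied)
    copyable_after_initialize = True
except TypeError:
    # Locks cannot be pickled or deep-copied.
    copyable_after_initialize = False

copied.initialize_iteration(seed_generator=None)  # start of an epoch
copied.finalize_iteration()                       # iterator depleted
copied.finalize()                                 # shutdown
```

Had the lock been created in `__init__`, the very first copy would already have failed, which is the situation the constraint above is meant to prevent.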
The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
Extend ReadingServiceInterface with two additional methods to save/restore the state of the data-processing graph.
- abstract checkpoint() → bytes ¶
ReadingService serializes the internal states. Called in DataLoader2.state_dict.
- abstract restore(datapipe: IterDataPipe | MapDataPipe, serialized_state: bytes) → IterDataPipe | MapDataPipe ¶
ReadingService adapts the DataPipe graph based on the serialized state. Called once, when the DataLoader2 iterator is created for the first time. Counterpart of initialize, which adapts the DataPipe graph from scratch.
- Parameters:
datapipe – Original DataPipe graph before it is adapted by ReadingService.
serialized_state – The serialized internal state used to restore the state of the adapted DataPipe graph.
- Returns:
Adapted DataPipe generated from the serialized state.
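How checkpoint and restore relate to initialize can be sketched with a hypothetical stand-in class. It assumes a very simple resume strategy (skipping the items that were already yielded) and uses `pickle` to produce the bytes. The real interface is preliminary and may differ.

```python
import pickle
from itertools import islice


class SketchCheckpointableService:
    """Hypothetical stand-in mirroring checkpoint() and restore()."""

    def __init__(self):
        self.items_yielded = 0

    def initialize(self, datapipe):
        # Adapt the graph from scratch: start at the first item.
        self.items_yielded = 0
        return self._track(iter(datapipe))

    def checkpoint(self) -> bytes:
        # Serialize only the internal state needed to resume.
        return pickle.dumps({"items_yielded": self.items_yielded})

    def restore(self, datapipe, serialized_state: bytes):
        # Adapt the original graph so that it resumes where the state left off.
        state = pickle.loads(serialized_state)
        self.items_yielded = state["items_yielded"]
        return self._track(islice(iter(datapipe), self.items_yielded, None))

    def _track(self, iterator):
        # Count items as they are handed out.
        for item in iterator:
            self.items_yielded += 1
            yield item


source = ["a", "b", "c", "d", "e"]

first = SketchCheckpointableService()
stream = first.initialize(source)
consumed = [next(stream), next(stream)]
state = first.checkpoint()

# A fresh service receives the ORIGINAL graph plus the serialized state.
second = SketchCheckpointableService()
remaining = list(second.restore(source, state))
```

Note that `restore` receives the unadapted graph, just as `initialize` does, so the serialized state has to carry everything needed to rebuild the adapted graph.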
Graph Functions¶
Graph utility functions are provided in torchdata.dataloader2.graph to help users rewrite the DataPipe graph for a custom ReadingService:
Traverse the DataPipes and their attributes to extract the DataPipe graph. |
|
Given the graph of DataPipe generated by |
|
Given the graph of DataPipe generated by |
|
Given the graph of DataPipe generated by |
|
Given the graph of DataPipe generated by |
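The idea of extracting a graph by traversing objects and their attributes can be sketched in plain Python. The `Pipe` class and the `traverse` and `list_names` helpers are hypothetical and do not reproduce the library's functions or their signatures. The nested-dictionary shape is an assumption of this sketch.

```python
class Pipe:
    """Hypothetical minimal node; `sources` holds the upstream Pipes."""

    def __init__(self, name, *sources):
        self.name = name
        self.sources = list(sources)


def traverse(pipe):
    """Return {id(pipe): (pipe, upstream_graph)} by following attributes."""
    upstream = {}
    for value in vars(pipe).values():
        candidates = value if isinstance(value, list) else [value]
        for candidate in candidates:
            if isinstance(candidate, Pipe):
                upstream.update(traverse(candidate))
    return {id(pipe): (pipe, upstream)}


def list_names(graph):
    """Flatten the nested graph into a sorted list of unique node names."""
    names = set()
    for pipe, upstream in graph.values():
        names.add(pipe.name)
        names.update(list_names(upstream))
    return sorted(names)


dp1 = Pipe("DP1")
shuffle = Pipe("shuffle", dp1)
shard = Pipe("sharding_filter", shuffle)

graph = traverse(shard)
names = list_names(graph)
```

A custom ReadingService could walk such a structure to locate a particular node, for example the sharding point, before replacing or removing it.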