ReadingService¶
ReadingService handles in-place modification of a DataPipe graph based on different use cases.
Features¶
Dynamic Sharding¶
Dynamic sharding is achieved by PrototypeMultiProcessingReadingService and DistributedReadingService, which shard the pipeline based on information about the corresponding multiprocessing and distributed workers. TorchData offers two types of DataPipe that let users define where sharding takes place within the pipeline.
- sharding_filter: When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the DataPipe graph and skips the data that does not belong to the corresponding worker at the point of sharding_filter.
- sharding_round_robin_dispatch: When there is any sharding_round_robin_dispatch DataPipe in the pipeline, that branch is treated as non-replicable. A single dispatching process is then created to load data from the non-replicable branch and distribute the data to the subsequent worker processes.
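In code, both strategies can be combined in a single pipeline. Below is a minimal sketch; the range-based sources are hypothetical stand-ins for real data sources:

```python
from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
from torchdata.datapipes.iter import IterableWrapper

# Replicable branch: every worker holds a full copy of this part of the
# graph, and sharding_filter skips elements that belong to other workers.
replicable_dp = IterableWrapper(range(100)).shuffle().sharding_filter()

# Non-replicable branch: a single dispatching process loads the data and
# round-robins elements to the worker processes.
dispatched_dp = (
    IterableWrapper(range(100))
    .shuffle()
    .sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
)

# Joining the branches yields a pipeline that mixes both strategies.
datapipe = replicable_dp.zip(dispatched_dp)
```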
The following figure shows an example of a pipeline with both types of sharding strategies.
![DataPipe graph: a replicable branch (DP1 → shuffle → sharding_filter → DP4) and a non-replicable branch (DP2 and DP3, each followed by shuffle → sharding_round_robin_dispatch, joined at DP5, the lowest common ancestor) merge into DP6, followed by fullsync.](_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
When multiprocessing takes place, the graph becomes:
![The same graph under multiprocessing: Worker 0 and Worker 1 each hold a replica of DP1 → shuffle → sharding_filter → DP4 → DP6, while the non-replicable branch (DP2/DP3 → shuffle → sharding_round_robin_dispatch → DP5, the lowest common ancestor) feeds a round_robin_demux that dispatches to a Client DataPipe in each worker; the workers' outputs are gathered by a Client in the main process, followed by fullsync.](_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
Client in the graph is a DataPipe that sends requests to and receives responses from multiprocessing queues.
Determinism¶
In DataLoader2, a SeedGenerator becomes the single source of randomness, and each ReadingService accesses it via initialize_iteration() to generate the corresponding random seeds for random DataPipe operations.
In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive across multiprocessing processes and distributed nodes, PrototypeMultiProcessingReadingService and DistributedReadingService help DataLoader2 synchronize the random states of any random DataPipe operation prior to sharding_filter or sharding_round_robin_dispatch. For the remaining DataPipe operations after sharding, each ReadingService generates unique random states based on the distributed rank and worker process id, so that different random transformations are performed per worker.
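As a hedged illustration (the multiprocessing service name varies by TorchData release; MultiProcessingReadingService is assumed here), a shuffle placed before sharding_filter uses a synchronized random state, so the shards remain mutually exclusive and collectively exhaustive:

```python
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterableWrapper

# shuffle happens before sharding_filter, so its random state is
# synchronized across workers and every element is emitted exactly once.
datapipe = IterableWrapper(range(100)).shuffle().sharding_filter()
dl = DataLoader2(
    datapipe, reading_service=MultiProcessingReadingService(num_workers=2)
)

for epoch in range(2):
    items = list(dl)
    assert sorted(items) == list(range(100))  # exclusive and exhaustive shards
dl.shutdown()
```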
Graph Mode¶
Graph mode also allows an easier transition of the data-preprocessing pipeline from research to production. After the DataPipe graph is created and validated with the ReadingServices, a different ReadingService that configures and connects to a production service/infrastructure such as AIStore can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, find the DataPipe operations that can be delegated to the production service/infrastructure, and modify the graph accordingly to achieve more performant execution.
Extend ReadingService¶
The following are the interfaces for a custom ReadingService.
- class torchdata.dataloader2.ReadingServiceInterface¶
Interface for ReadingService. Please extend your custom ReadingService based on this interface class.
ReadingService must be picklable prior to initialize being called. This is because a copy of it will be created by DataLoader2 to avoid the situation where the same ReadingService object is used by multiple DataLoader2 instances, with its internal state modifiable by each of them. As a result of this constraint, certain initialization steps may need to take place within the initialize method rather than __init__ of the ReadingService class.
- finalize() → None¶
ReadingService cleans up internal states and fully shuts down the service. Called in DataLoader2's shutdown and __del__.
- finalize_iteration() → None¶
ReadingService ends the service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.
- abstract initialize(datapipe: IterDataPipe | MapDataPipe) → IterDataPipe | MapDataPipe¶
ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on the custom need. Called once when the DataLoader2 iterator is created for the first time. Prior to calling this method, the ReadingService object must be picklable.
- Parameters:
datapipe – Original DataPipe graph.
- Returns:
An adapted or a new DataPipe graph.
- initialize_iteration(seed_generator: SeedGenerator) → None¶
ReadingService spins up the service for an epoch. Called at the beginning of every epoch, when the DataLoader2 iterator is obtained.
- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs determinism for all random operations within the graph of DataPipes.
Example
MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.
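For instance, a minimal custom ReadingService might look like the sketch below; the in-memory-cache rewrite is a made-up example of a graph adaptation, not a recommended production pattern:

```python
from torchdata.dataloader2 import ReadingServiceInterface

class InMemoryCacheReadingService(ReadingServiceInterface):
    def initialize(self, datapipe):
        # Adapt the graph once, when the first DataLoader2 iterator is
        # created; here we simply append an in-memory cache to the graph.
        self._datapipe = datapipe.in_memory_cache(size=1024)
        return self._datapipe

    def initialize_iteration(self, seed_generator):
        # Called at the start of every epoch; a real service would use
        # seed_generator to seed random DataPipe operations here.
        pass

    def finalize(self):
        # Drop references so the service fully shuts down.
        self._datapipe = None
```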
The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
Extend ReadingServiceInterface with two additional methods to save/restore the state of the data-processing graph.
- abstract checkpoint() → bytes¶
ReadingService serializes the internal states. Called in DataLoader2.state_dict.
- abstract restore(datapipe: IterDataPipe | MapDataPipe, serialized_state: bytes) → IterDataPipe | MapDataPipe¶
ReadingService adapts the DataPipe graph based on the serialized state. Called once when the DataLoader2 iterator is created for the first time. Counterpart of initialize, which adapts the DataPipe graph from scratch.
- Parameters:
datapipe – Original DataPipe graph before being adapted by ReadingService.
serialized_state – The serialized internal state used to restore the state of the adapted DataPipe graph.
- Returns:
Adapted DataPipe generated from the serialized state.
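A hedged sketch of how the two methods pair up follows; the pickle-based byte format and the num_yielded counter are illustrative assumptions, not TorchData's actual serialization scheme:

```python
import pickle

from torchdata.dataloader2 import CheckpointableReadingServiceInterface

class ResumableReadingService(CheckpointableReadingServiceInterface):
    def __init__(self):
        self._num_yielded = 0  # hypothetical internal state to snapshot

    def initialize(self, datapipe):
        return datapipe

    def checkpoint(self) -> bytes:
        # Serialize whatever internal state is needed to resume later.
        return pickle.dumps({"num_yielded": self._num_yielded})

    def restore(self, datapipe, serialized_state: bytes):
        # Counterpart of initialize: rebuild the adapted graph, then
        # restore internal state from the serialized bytes.
        state = pickle.loads(serialized_state)
        self._num_yielded = state["num_yielded"]
        return datapipe
```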
Graph Functions¶
Graph utility functions are also provided in torchdata.dataloader2.graph to help users rewrite the DataPipe graph for a custom ReadingService:
| Function | Description |
|---|---|
| traverse_dps | Traverse the DataPipes and their attributes to extract the DataPipe graph. |
| find_dps | Given the graph of DataPipe generated by traverse_dps, return DataPipe instances with the provided DataPipe type. |
| list_dps | Given the graph of DataPipe generated by traverse_dps, return a list of all DataPipe instances without duplication. |
| remove_dp | Given the graph of DataPipe generated by traverse_dps and the DataPipe to be removed, return the new graph of DataPipe. |
| replace_dp | Given the graph of DataPipe generated by traverse_dps, the DataPipe to be replaced, and the new DataPipe, return the new graph of DataPipe. |
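A short usage sketch follows; removing the shuffle is only an illustration of the kind of rewrite a custom ReadingService might perform in initialize:

```python
from torchdata.dataloader2.graph import traverse_dps, find_dps, list_dps, remove_dp
from torchdata.datapipes.iter import IterableWrapper, Shuffler

datapipe = IterableWrapper(range(10)).shuffle().sharding_filter()

graph = traverse_dps(datapipe)          # nested dict describing the graph
print(list_dps(graph))                  # every DataPipe instance, deduplicated
for shuffler in find_dps(graph, Shuffler):
    graph = remove_dp(graph, shuffler)  # reconnect predecessor to consumer
```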