PrototypeMultiProcessingReadingService
- class torchdata.dataloader2.PrototypeMultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: str | None = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Callable[[IterDataPipe | MapDataPipe, WorkerInfo], IterDataPipe | MapDataPipe] | None = None, worker_reset_fn: Callable[[IterDataPipe | MapDataPipe, WorkerInfo, SeedGenerator], IterDataPipe | MapDataPipe] | None = None)
Spawns multiple worker processes to load data from the DataPipe graph. If any non-replicable DataPipe (sharding_round_robin_dispatch) is present in the graph, a separate dispatching process is created to load data from the lowest common ancestor of all non-replicable DataPipes and distribute it to the worker processes in a round-robin manner. The subsequent DataPipe graph in each worker process then processes the data from the dispatching process, and the results are eventually returned to the main process.
- Parameters:
num_workers (int, optional) – How many subprocesses to use for data loading. The value 0 will be replaced by InProcessReadingService in the future.
multiprocessing_context (str, optional) – Multiprocessing start method. If None, the default context is used; otherwise, it should be either ‘fork’ or ‘spawn’.
worker_prefetch_cnt (int, 10 by default) – Number of data items to prefetch at the end of each worker process.
main_prefetch_cnt (int, 10 by default) – Number of data items to prefetch at the end of the whole pipeline in the main process.
worker_init_fn (Callable, optional) – Function to be called when each worker process launches, with DataPipe and WorkerInfo as the expected arguments.
worker_reset_fn (Callable, optional) – Function to be called at the beginning of each epoch in each worker process, with DataPipe, WorkerInfo and SeedGenerator as the expected arguments.
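worker_init_fn receives the DataPipe and a WorkerInfo and returns the DataPipe that the worker should use. The sketch below illustrates one common use, giving each worker a disjoint shard of the data. It is written against plain iterables so it runs without torchdata: the WorkerInfo class here is a hypothetical stand-in that models only num_workers and worker_id, and shard_by_worker is an illustrative name, not a torchdata function.

```python
from dataclasses import dataclass
from itertools import islice
from typing import Iterable, Iterator, List


@dataclass(frozen=True)
class WorkerInfo:
    """Hypothetical stand-in for torchdata's WorkerInfo (only two fields modeled)."""

    num_workers: int
    worker_id: int


def shard_by_worker(datapipe: Iterable[int], info: WorkerInfo) -> Iterator[int]:
    """Illustrative worker_init_fn: keep every num_workers-th item for this worker."""
    # Start at this worker's id and step by the worker count, so the
    # shards are disjoint and together cover the whole source exactly once.
    return islice(datapipe, info.worker_id, None, info.num_workers)


shards: List[List[int]] = [
    list(shard_by_worker(range(10), WorkerInfo(num_workers=3, worker_id=i)))
    for i in range(3)
]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

In the real service the function is expected to return a DataPipe and runs once per worker at launch; worker_reset_fn is the per-epoch counterpart and additionally receives a SeedGenerator.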
Note
This ReadingService is still in prototype mode and will replace MultiProcessingReadingService. It currently performs both distributed and multiprocessing sharding over the pipeline. The distributed-related code will be removed once SequentialReadingService is provided to combine DistributedReadingService and this ReadingService.
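The dispatch-then-process flow described for this class can be simulated in plain Python. This is a simplified, single-process sketch under stated assumptions: the names round_robin_dispatch and run_pipeline are hypothetical, in-memory lists stand in for the inter-process queues, worker_fn stands in for the replicated part of the DataPipe graph, and the main process is assumed to read worker outputs in the same round-robin order. It is not torchdata's implementation.

```python
from itertools import cycle
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def round_robin_dispatch(source: Iterable[T], num_workers: int) -> List[List[T]]:
    """Deal items from one non-replicable source to the workers in turn."""
    buckets: List[List[T]] = [[] for _ in range(num_workers)]
    for worker_id, item in zip(cycle(range(num_workers)), source):
        buckets[worker_id].append(item)
    return buckets


def run_pipeline(source: Iterable[T], num_workers: int, worker_fn: Callable[[T], U]) -> List[U]:
    """Simulate dispatcher -> per-worker processing -> collection in the main process."""
    buckets = round_robin_dispatch(source, num_workers)
    # Each simulated worker applies the replicated part of the graph to its own items.
    processed = [[worker_fn(item) for item in bucket] for bucket in buckets]
    # The simulated main process reads one result from each worker in turn.
    results: List[U] = []
    for step in range(max((len(p) for p in processed), default=0)):
        for worker_output in processed:
            if step < len(worker_output):
                results.append(worker_output[step])
    return results


print(round_robin_dispatch(range(7), 3))  # [[0, 3, 6], [1, 4], [2, 5]]
print(run_pipeline(range(7), 3, lambda x: x * 10))  # [0, 10, 20, 30, 40, 50, 60]
```

Reading the workers back in the same rotation used for dispatch is what lets the simulated output preserve the source order.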
- finalize() → None
PrototypeMultiProcessingReadingService invalidates its states and properly exits all subprocesses.
- initialize(datapipe: IterDataPipe | MapDataPipe) → IterDataPipe | MapDataPipe
PrototypeMultiProcessingReadingService finds information about sharding, separates the graph into multiple pieces, reconnects them using queues, and creates subprocesses.
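As a rough illustration of reconnecting separated pieces of a graph with queues, the sketch below runs one piece in a background thread (a stand-in for a worker process) and connects it to the consumer with a bounded queue.Queue, whose maxsize plays the role of a prefetch count such as worker_prefetch_cnt. The sentinel-based shutdown and the names run_with_queue and _worker_loop are assumptions made for this sketch, not torchdata internals.

```python
import queue
import threading
from typing import Callable, Iterable, List

_STOP = object()  # sentinel marking the end of the worker's stream


def _worker_loop(source: Iterable[int], fn: Callable[[int], int], out_q: "queue.Queue") -> None:
    # Run the worker-side piece of the graph and push each result downstream.
    # Error propagation is omitted for brevity.
    for item in source:
        out_q.put(fn(item))  # blocks once `maxsize` results are waiting (prefetch limit)
    out_q.put(_STOP)


def run_with_queue(source: Iterable[int], fn: Callable[[int], int], prefetch_cnt: int = 10) -> List[int]:
    out_q: "queue.Queue" = queue.Queue(maxsize=prefetch_cnt)
    worker = threading.Thread(target=_worker_loop, args=(source, fn, out_q), daemon=True)
    worker.start()
    results: List[int] = []
    while True:
        item = out_q.get()
        if item is _STOP:
            break
        results.append(item)
    worker.join()  # loosely analogous to finalize(): wait for the worker to exit
    return results


print(run_with_queue(range(5), lambda x: x + 1, prefetch_cnt=2))  # [1, 2, 3, 4, 5]
```

A bounded queue lets the producer run ahead of the consumer by at most prefetch_cnt items, which is the basic trade-off a prefetch count controls.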
- initialize_iteration(seed_generator: SeedGenerator) → None
ReadingService spins up the service for an epoch. It is called each time a DataLoader2 iterator is created.
- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs the determinism of all random operations within the graph of DataPipes.
Example
MultiProcessingReadingService sets the worker seeds for each process and starts prefetching items from the graph.
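Per-worker seeding can be pictured as deriving one shared seed per epoch from a single root seed, then a distinct, reproducible seed for each worker. The sketch below shows that idea with Python's random module; the derivation scheme and the names epoch_seed and worker_seeds are assumptions for illustration, and the SeedGenerator used by DataLoader2 may derive seeds differently.

```python
import random
from typing import List


def epoch_seed(root_seed: int, epoch: int) -> int:
    """Shared seed for an epoch: the (epoch + 1)-th draw from a root-seeded RNG."""
    rng = random.Random(root_seed)
    seed = 0
    for _ in range(epoch + 1):
        seed = rng.getrandbits(64)
    return seed


def worker_seeds(root_seed: int, epoch: int, num_workers: int) -> List[int]:
    """One reproducible seed per worker, derived from the epoch's shared seed."""
    shared = epoch_seed(root_seed, epoch)
    # Offsetting the shared seed by the worker id gives each worker its own stream.
    return [random.Random(shared + worker_id).getrandbits(64) for worker_id in range(num_workers)]


print(worker_seeds(root_seed=0, epoch=0, num_workers=4))
```

Because every seed is a pure function of (root_seed, epoch, worker_id), re-running with the same root seed reproduces each worker's random stream, while different epochs and workers still receive different streams.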