Shortcuts

PrototypeMultiProcessingReadingService

class torchdata.dataloader2.PrototypeMultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: str | None = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Callable[[IterDataPipe | MapDataPipe, WorkerInfo], IterDataPipe | MapDataPipe] | None = None, worker_reset_fn: Callable[[IterDataPipe | MapDataPipe, WorkerInfo, SeedGenerator], IterDataPipe | MapDataPipe] | None = None)

Spawns multiple worker processes to load data from the DataPipe graph. If any non-replicable DataPipe (sharding_round_robin_dispatch) is presented in the graph, a separate dispatching process will be created to load data from the lowest common ancestor of all non-replicable DataPipes and distributes data to each worker process in the round-robin manner Then, the subsequent DataPipe graph in each worker process will process the data from the dispatching process and eventually return the result to the main process.

Parameters:
  • num_workers (int, optional) – How many subprocesses to use for data loading. 0 will be replaced by InProcessReadingService in the future.

  • multiprocessing_context (str, optional) – Multiprocessing starting method. If method is None then the default context is returned. Otherwise, method should be ‘fork’, ‘spawn’.

  • worker_prefetch_cnt – (int, 10 by default): Number of data will be prefetched at the end of each worker process.

  • main_prefetch_cnt – (int, 10 by default): Number of data will be prefetched at the end of the whole pipeline in the main process.

  • worker_init_fn – (Callable, optional): Function to be called when each worker process launches with DataPipe and WorkerInfo as the expected arguments.

  • worker_reset_fn – (Callable, optional): Function to be called at the beginning of each epoch in each worker process with DataPipe, WorkerInfo and SeedGenerator as the expected arguments.

Note

  • This ReadingService is still in prototype mode and will replace MultiProcessingReadingService.

  • It currently does both distributed and multiprocessing sharding over the pipeline. The distributed-related code is going to be removed when SequentialReadingService is provided to combine the DistributedReadingService and this ReadingService.

finalize() None

PrototypeMultiProcessingReadingService invalidate states & properly exits all subprocesses.

initialize(datapipe: IterDataPipe | MapDataPipe) IterDataPipe | MapDataPipe

PrototypeMultiProcessingReadingService finds information about sharding, separates graph by multiple pieces and reconnects it using queues. creates subprocesses.

initialize_iteration(seed_generator: SeedGenerator) None

ReadingService spins up service for an epoch. Called at the beginning of every time getting DataLoader2 iterator.

Parameters:

seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it will governs the determinism for all of random operations with the graph of DataPipes.

Example

MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources