pyiter.parallel_mapping
from __future__ import annotations
from typing import Callable, Iterable, Iterator, Literal, Optional
from .transform import Transform, T, U


class ParallelMappingTransform(Transform[T, U]):
    """A transform that applies a function to each element of an iterable in parallel.

    The source is consumed in batches of ``max_workers * chunksize`` elements.
    Each batch is mapped through a ``concurrent.futures`` executor and its
    results are yielded in input order (the ordering guarantee of
    ``Executor.map``).
    """

    # Accepted executor kinds. Any value other than "Process" selects the
    # thread-based executor (see ``__do_iter__``).
    Executor = Literal["Thread", "Process"]

    def __init__(
        self,
        iter: Iterable[T],
        transform: Callable[[T], U],
        max_workers: Optional[int] = None,
        chunksize: int = 1,
        executor: ParallelMappingTransform.Executor = "Thread",
    ):
        """Store the mapping configuration; no work is started here.

        Args:
            iter: Source iterable, handed to the ``Transform`` base class.
            transform: Function applied to every element. With the "Process"
                executor it is sent to worker processes, so it presumably has
                to be picklable -- confirm against ``ProcessPoolExecutor``.
            max_workers: Pool size. ``None`` (or ``0``) selects
                ``min(32, cpu_count + 4)`` at iteration time.
            chunksize: Forwarded to ``Executor.map`` and also used as the
                multiplier for the batch size.
            executor: ``"Process"`` for a process pool, otherwise a thread pool.
        """
        super().__init__(iter)
        self.transform = transform
        self.max_workers = max_workers
        self.executor = executor
        self.chunksize = chunksize

    def __do_iter__(self) -> Iterator[U]:
        """Yield ``transform(element)`` for every source element, in order.

        A single executor is created for the whole iteration and is shut down
        when the generator finishes, is closed, or propagates an exception.
        """
        import os
        from .sequence import it

        def create_executor(max_workers: int):
            # Imported lazily, only for the backend that is actually selected.
            if self.executor == "Process":
                from concurrent.futures import ProcessPoolExecutor

                return ProcessPoolExecutor(max_workers=max_workers)
            else:
                from concurrent.futures import ThreadPoolExecutor

                return ThreadPoolExecutor(
                    max_workers=max_workers, thread_name_prefix="PyIter worker"
                )

        chunksize = self.chunksize
        max_workers = self.max_workers or min(32, (os.cpu_count() or 1) + 4)
        batch_size = max_workers * chunksize

        # One pool serves every batch: building a pool per batch would pay the
        # worker start-up and shutdown cost again for each batch.
        with create_executor(max_workers) as executor:
            # Executor.map collects its input eagerly, so feeding it one batch
            # at a time keeps consumption of the source incremental.
            # NOTE(review): assumes chunked(n) yields successive groups of up
            # to n elements -- confirm in .sequence.
            for batch in it(self.iter).chunked(batch_size):
                yield from executor.map(self.transform, batch, chunksize=chunksize)
class ParallelMappingTransform(Transform[T, U]):
    """A transform that applies a function to each element of an iterable in parallel.

    The source is consumed in batches of ``max_workers * chunksize`` elements.
    Each batch is mapped through a ``concurrent.futures`` executor and its
    results are yielded in input order (the ordering guarantee of
    ``Executor.map``).
    """

    # Accepted executor kinds. Any value other than "Process" selects the
    # thread-based executor (see ``__do_iter__``).
    Executor = Literal["Thread", "Process"]

    def __init__(
        self,
        iter: Iterable[T],
        transform: Callable[[T], U],
        max_workers: Optional[int] = None,
        chunksize: int = 1,
        executor: ParallelMappingTransform.Executor = "Thread",
    ):
        """Store the mapping configuration; no work is started here.

        Args:
            iter: Source iterable, handed to the ``Transform`` base class.
            transform: Function applied to every element. With the "Process"
                executor it is sent to worker processes, so it presumably has
                to be picklable -- confirm against ``ProcessPoolExecutor``.
            max_workers: Pool size. ``None`` (or ``0``) selects
                ``min(32, cpu_count + 4)`` at iteration time.
            chunksize: Forwarded to ``Executor.map`` and also used as the
                multiplier for the batch size.
            executor: ``"Process"`` for a process pool, otherwise a thread pool.
        """
        super().__init__(iter)
        self.transform = transform
        self.max_workers = max_workers
        self.executor = executor
        self.chunksize = chunksize

    def __do_iter__(self) -> Iterator[U]:
        """Yield ``transform(element)`` for every source element, in order.

        A single executor is created for the whole iteration and is shut down
        when the generator finishes, is closed, or propagates an exception.
        """
        import os
        from .sequence import it

        def create_executor(max_workers: int):
            # Imported lazily, only for the backend that is actually selected.
            if self.executor == "Process":
                from concurrent.futures import ProcessPoolExecutor

                return ProcessPoolExecutor(max_workers=max_workers)
            else:
                from concurrent.futures import ThreadPoolExecutor

                return ThreadPoolExecutor(
                    max_workers=max_workers, thread_name_prefix="PyIter worker"
                )

        chunksize = self.chunksize
        max_workers = self.max_workers or min(32, (os.cpu_count() or 1) + 4)
        batch_size = max_workers * chunksize

        # One pool serves every batch: building a pool per batch would pay the
        # worker start-up and shutdown cost again for each batch.
        with create_executor(max_workers) as executor:
            # Executor.map collects its input eagerly, so feeding it one batch
            # at a time keeps consumption of the source incremental.
            # NOTE(review): assumes chunked(n) yields successive groups of up
            # to n elements -- confirm in .sequence.
            for batch in it(self.iter).chunked(batch_size):
                yield from executor.map(self.transform, batch, chunksize=chunksize)
A transform that applies a function to each element of an iterable in parallel.