pyiter.parallel_mapping

 1from __future__ import annotations
 2from typing import Callable, Iterable, Iterator, Literal, Optional
 3from .transform import Transform, T, U
 4
 5
 6class ParallelMappingTransform(Transform[T, U]):
 7    """A transform that applies a function to each element of an iterable in parallel."""
 8
 9    Executor = Literal["Thread", "Process"]
10
11    def __init__(
12        self,
13        iter: Iterable[T],
14        transform: Callable[[T], U],
15        max_workers: Optional[int] = None,
16        chunksize: int = 1,
17        executor: ParallelMappingTransform.Executor = "Thread",
18    ):
19        super().__init__(iter)
20        self.transform = transform
21        self.max_workers = max_workers
22        self.executor = executor
23        self.chunksize = chunksize
24
25    def __do_iter__(self) -> Iterator[U]:
26        import os
27        from .sequence import it
28
29        def create_executor(max_workers: int):
30            if self.executor == "Process":
31                from concurrent.futures import ProcessPoolExecutor
32
33                return ProcessPoolExecutor(max_workers=max_workers)
34            else:
35                from concurrent.futures import ThreadPoolExecutor
36
37                return ThreadPoolExecutor(
38                    max_workers=max_workers, thread_name_prefix="PyIter worker"
39                )
40
41        chunksize = self.chunksize
42        max_workers = self.max_workers or min(32, (os.cpu_count() or 1) + 4)
43        batch_size = max_workers * chunksize
44
45        for batch in it(self.iter).chunked(batch_size):
46            with create_executor(max_workers) as executor:
47                yield from executor.map(self.transform, batch, chunksize=chunksize)
class ParallelMappingTransform(pyiter.transform.Transform[~T, ~U]):
 7class ParallelMappingTransform(Transform[T, U]):
 8    """A transform that applies a function to each element of an iterable in parallel."""
 9
10    Executor = Literal["Thread", "Process"]
11
12    def __init__(
13        self,
14        iter: Iterable[T],
15        transform: Callable[[T], U],
16        max_workers: Optional[int] = None,
17        chunksize: int = 1,
18        executor: ParallelMappingTransform.Executor = "Thread",
19    ):
20        super().__init__(iter)
21        self.transform = transform
22        self.max_workers = max_workers
23        self.executor = executor
24        self.chunksize = chunksize
25
26    def __do_iter__(self) -> Iterator[U]:
27        import os
28        from .sequence import it
29
30        def create_executor(max_workers: int):
31            if self.executor == "Process":
32                from concurrent.futures import ProcessPoolExecutor
33
34                return ProcessPoolExecutor(max_workers=max_workers)
35            else:
36                from concurrent.futures import ThreadPoolExecutor
37
38                return ThreadPoolExecutor(
39                    max_workers=max_workers, thread_name_prefix="PyIter worker"
40                )
41
42        chunksize = self.chunksize
43        max_workers = self.max_workers or min(32, (os.cpu_count() or 1) + 4)
44        batch_size = max_workers * chunksize
45
46        for batch in it(self.iter).chunked(batch_size):
47            with create_executor(max_workers) as executor:
48                yield from executor.map(self.transform, batch, chunksize=chunksize)

A transform that applies a function to each element of an iterable in parallel.

Executor = typing.Literal['Thread', 'Process']
transform: the function applied to each element of the iterable
max_workers: maximum number of pool workers (None selects a default of min(32, cpu_count + 4))
executor: pool backend, either "Thread" or "Process"
chunksize: number of elements handed to a worker per task (meaningful for the "Process" executor)