### Initialize a Pipeline

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Demonstrates the basic initialization of a `Pipeline` object from the `thecodecrate_pipeline` library. This is the starting point for creating any pipeline.

```python
from thecodecrate_pipeline import Pipeline

pipeline = Pipeline()
```

--------------------------------

### Simple Pipeline Usage with Lambda Functions

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Shows a basic pipeline setup using lambda functions for transformations. The pipeline takes an integer, performs two addition operations, and then formats the result as a string. It requires the `Pipeline` class from `thecodecrate_pipeline`.

```python
# simple usage
from thecodecrate_pipeline import (
    Pipeline,
)

pipeline = (
    Pipeline[int, str]()
    .pipe(lambda x: x + 1)
    .pipe(lambda x: x + 1)
    .pipe(lambda x: f"result is {x}")
)

# 'result is 3'
await pipeline.process(1)
```

--------------------------------

### Install TheCodeCrate Pipeline Package

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Installs the `thecodecrate-pipeline` package into your Python environment with pip. This is the first step to using the pipeline pattern.

```bash
pip install thecodecrate-pipeline
```

--------------------------------

### Pipeline with Varying Type Hints (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Shows a pipeline configured to handle different input and output types. This example takes an integer and converts it into a formatted string.
```python
pipeline = (
    Pipeline[int, str]()
    .pipe(lambda payload: f"Number: {payload}")
)

# Returns "Number: 10"
await pipeline.process(10)
```

--------------------------------

### Pipeline with Nested Pipelines as Stages

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Demonstrates composing pipelines by using one pipeline as a stage within another. This example places a previously defined pipeline (`my_pipeline`, which doubles and adds one) between a stage that multiplies by three and a stage that adds five. Requires `StageInterface` from `thecodecrate_pipeline`.

```python
# with pipelines as stages
class TimesThreeStage(StageInterface[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 3


class AddFiveStage(StageInterface[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 5


pipeline = (
    (Pipeline[int]()).pipe(TimesThreeStage()).pipe(my_pipeline).pipe(AddFiveStage())
)

# returns 36
# 5 * 3 = 15; 15 * 2 + 1 = 31; 31 + 5 = 36
result = await pipeline.process(5)
assert result == 36
result
```

--------------------------------

### Type Transformation Pipeline in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Illustrates a pipeline that transforms data types across stages. This example starts with an integer, adds 1 to it, and then converts the result into a formatted string.

```python
from thecodecrate_pipeline import Pipeline

# Pipeline that transforms int to string
pipeline = (
    Pipeline[int, str]()
    .pipe(lambda x: x + 1)
    .pipe(lambda payload: f"total is {payload}!")
)

# result = await pipeline.process(10)
# Returns: "total is 11!"
```

--------------------------------

### Create Pipeline with Custom Processor

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/03-pipeline-factory.ipynb

Illustrates creating a pipeline factory that includes a custom processor.
The example defines a `MyProcessor` class inheriting from `ChainedProcessor`, adds it to the factory, and then verifies its presence in the created pipeline instance. The pipeline processes data similarly to previous examples.

```python
from thecodecrate_pipeline import PipelineFactory
from thecodecrate_pipeline.processors import ChainedProcessor
from thecodecrate_pipeline.types import CallableCollection


# create factory with a processor
class MyProcessor(ChainedProcessor):
    pass


some_stages: CallableCollection = (
    (lambda x: x + 1),
    (lambda x: x + 1),
    (lambda x: f"result is {x}"),
)

pipeline_factory = (
    (PipelineFactory[int, str]()).with_stages(some_stages).with_processor(MyProcessor())
)

# create and process
pipeline = pipeline_factory.make()
result = await pipeline.process(10)
print(result)

# check the processor
processor_instance = pipeline.get_processor_instance()
assert processor_instance.__class__ == MyProcessor
print(processor_instance)
```

--------------------------------

### Configure PipelineFactory with Custom Processor

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Demonstrates how to configure a PipelineFactory with a custom processor. This allows for specific pre-processing or post-processing logic to be applied. The example defines a `MyProcessor` and integrates it into the pipeline factory.
```python
from thecodecrate_pipeline import PipelineFactory, Stage
from thecodecrate_pipeline.processors import ChainedProcessor


class MyProcessor(ChainedProcessor):
    """Custom processor implementation."""

    pass


class AddOneStage(Stage[int, int]):
    async def __call__(self, x: int) -> int:
        return x + 1


# Create factory with processor
pipeline_factory = (
    PipelineFactory[int, str]()
    .with_stages((
        AddOneStage,
        lambda x: x + 1,
        lambda x: f"result is {x}",
    ))
    .with_processor(MyProcessor())
)

# Build and verify processor type
pipeline = pipeline_factory.make()
result = await pipeline.process(10)
# Returns: "result is 12"

# Verify processor instance
assert pipeline.get_processor_instance().__class__ == MyProcessor
```

--------------------------------

### Class-Based Stage Implementation (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Shows how to define and use class-based stages that implement the StageInterface. This example uses `TimesTwoStage` and `AddOneStage` to process an integer payload.

```python
class TimesTwoStage(StageInterface[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2


class AddOneStage(StageInterface[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1


pipeline = (
    Pipeline[int, int]()
    .pipe(TimesTwoStage())
    .pipe(AddOneStage())
)

# Returns 21
await pipeline.process(10)
```

--------------------------------

### Indexed Stages with Custom Arguments

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Explains and demonstrates stages that accept custom arguments during processing, specifically the index of the stage in the processing queue. This involves custom `IndexedStage`, `IndexedProcessor`, and `IndexedPipeline` classes. Requires `Processor`, `Stage`, and `StageDefinitionCollection` from `thecodecrate_pipeline`.
```python
###
## Stages with Custom Arguments
##
## In this example, the stages receive the current index
## of the processing queue
###
from abc import abstractmethod
from typing import Any, Awaitable, Callable, Concatenate, cast

from thecodecrate_pipeline import Pipeline, Processor, Stage
from thecodecrate_pipeline.types import T_in, T_out, StageDefinitionCollection


class IndexedStage(Stage[T_in, T_out]):
    # the keyword name must match what IndexedProcessor passes (`index`)
    @abstractmethod
    async def __call__(
        self,
        payload: T_in,
        /,
        index: int,
    ) -> T_out:
        pass


IndexedPipelineCallable = (
    IndexedStage[T_in, T_out]
    | Callable[Concatenate[T_in, ...], Awaitable[T_out]]
    | Callable[Concatenate[T_in, ...], T_out]
)


class IndexedProcessor(Processor[T_in, T_out]):
    async def process(
        self,
        payload: T_in,
        stages: StageDefinitionCollection,
    ) -> T_out:
        index = 0
        payload_out: Any = payload

        for stage in stages:
            # forward the stage position as an extra keyword argument
            payload_out = await self._call(
                callable=stage,
                payload=payload_out,
                index=index,
            )

            index += 1

        return cast(T_out, payload_out)


class IndexedPipeline(Pipeline[T_in, T_out]):
    processor = IndexedProcessor


###
## Usage
###
class MyIndexedStage(IndexedStage[str]):
    # type-hinting: change `index` type to see an error
    async def __call__(self, payload: str, index: int) -> str:
        return f"{payload}: {index}"


indexed_pipeline = (
    (IndexedPipeline[str]()).pipe(MyIndexedStage()).pipe(MyIndexedStage())
)

assert await indexed_pipeline.process("test") == "test: 0: 1"

# returns "hello: 0: 1"
result = await indexed_pipeline.process("hello")
assert result == "hello: 0: 1"
result
```

--------------------------------

### Composable Pipeline Usage (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Demonstrates the composability of pipelines, where one pipeline can be used as a stage within another. This example composes pipelines for API request processing.
```python
process_api_request = (
    Pipeline()
    .pipe(ExecuteHttpRequest())
    .pipe(ParseJsonResponse())
)

pipeline = (
    Pipeline()
    .pipe(ConvertToPsr7Request())
    .pipe(process_api_request)
    .pipe(ConvertToResponseDto())
)

await pipeline.process(DeleteBlogPost(post_id))
```

--------------------------------

### Pipeline with Class-Based Stages

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Illustrates creating a pipeline using custom class-based stages that inherit from `StageInterface`. Each stage performs a specific operation (multiplying by two, adding one). Requires `StageInterface` from `thecodecrate_pipeline`.

```python
# with class-based stages
from thecodecrate_pipeline import (
    StageInterface,
)


class TimesTwoStage(StageInterface[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2


class AddOneStage(StageInterface[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1


my_pipeline = (Pipeline[int]()).pipe(TimesTwoStage()).pipe(AddOneStage())

# returns 21
result = await my_pipeline.process(10)
assert result == 21
result
```

--------------------------------

### Combine Mixed Stage Types in a Pipeline

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Illustrates the flexibility of the pipeline system by combining various types of stages, including stage instances, stage classes, regular functions, async functions, and even other pipelines. This allows for complex and modular pipeline construction. The example defines `MyPipeline` with a mix of these types.
```python
from thecodecrate_pipeline import Pipeline, Stage


class TimesThreeStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 3


class AddOneStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1


class TimesTwoStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2


# Create sub-pipeline
sub_pipeline = (
    Pipeline[int]()
    .pipe(TimesTwoStage())
    .pipe(AddOneStage())
)


# Define functions
def add_seven(payload: int) -> int:
    return payload + 7


async def sub_three_async(payload: int) -> int:
    return payload - 3


# Combine all stage types
class MyPipeline(Pipeline[int]):
    stages = (
        TimesThreeStage(),  # Stage instance
        sub_pipeline,  # Pipeline as stage
        AddOneStage,  # Stage class
        add_seven,  # Regular function
        sub_three_async,  # Async function
    )


result = await MyPipeline().process(5)
# Returns: 36
# Breakdown: 5 * 3 = 15
#            (15 * 2) + 1 = 31
#            31 + 1 = 32
#            32 + 7 = 39
#            39 - 3 = 36
```

--------------------------------

### Declarative Pipeline Stages (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Illustrates defining pipeline stages declaratively as class attributes within a Pipeline subclass. This example defines stages using both class references and instances.

```python
class MyPipeline(Pipeline[int, int]):
    stages = (
        TimesTwoStage,  # class
        TimesThreeStage(),  # object
    )


# Process the payload through the pipeline with the declared stages
result = await MyPipeline().process(5)
```

--------------------------------

### Pipeline Immutability Example in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Demonstrates the immutability feature of the Pipeline class. Adding a new stage to an existing pipeline returns a new pipeline instance, leaving the original unchanged.
```python
from thecodecrate_pipeline import Pipeline

# Create initial pipeline
pipeline = (
    Pipeline[int]()
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload + 1)
)

# result1 = await pipeline.process(1)
# Returns: 3

# Adding a stage returns a NEW pipeline
pipeline2 = pipeline.pipe(lambda payload: payload + 1)

# result2 = await pipeline.process(1)
# Returns: 3 (Original pipeline unchanged)

# result3 = await pipeline2.process(1)
# Returns: 4 (New pipeline has additional stage)
```

--------------------------------

### Async Pipeline Processing in Python

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/02-streams.ipynb

This Python code defines an asynchronous pipeline with two stages: `stage1` doubles the input integers and introduces a delay, while `stage2` formats the integers into strings. The pipeline is then processed with an input stream, and the results are printed.

```python
import asyncio
from typing import AsyncIterator

from thecodecrate_pipeline import Pipeline


async def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    async for item in stream:
        yield item * 2
        # non-blocking delay; time.sleep() here would block the event loop
        await asyncio.sleep(1)


async def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]:
    async for item in stream:
        yield f"Number: {item}"


pipeline = Pipeline[AsyncIterator[int], AsyncIterator[str]]().pipe(stage1).pipe(stage2)


async def input_stream() -> AsyncIterator[int]:
    for i in range(5):
        yield i


async def main():
    stream = await pipeline.process(input_stream())

    async for result in stream:
        print(result)


await main()
```

--------------------------------

### Handle Exceptions in Pipelines and Stages

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Illustrates how to handle exceptions that occur during pipeline execution, either at the pipeline level using a try-except block or within individual stages. The example creates a pipeline that intentionally causes a `ZeroDivisionError` and demonstrates its handling.
```python
from thecodecrate_pipeline import Pipeline

# Pipeline that may raise an exception
pipeline = (
    Pipeline[int]()
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload / 0)  # Division by zero
    .pipe(lambda payload: payload * 2)
)

# Handle exception when processing
try:
    result = await pipeline.process(10)
except ZeroDivisionError:
    # Handle the exception appropriately
    print("Error: Division by zero occurred")
    # Error: Division by zero occurred
```

--------------------------------

### Declarative Processor Setup in Python

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Sets a custom processor for a pipeline class declaratively by assigning it to the `processor` attribute. This approach enhances readability and maintainability of pipeline configurations. Dependencies include the Pipeline class and a custom processor implementation.

```python
from typing import TypeVar

# Assuming Pipeline and T_in, T_out are defined elsewhere
# class Pipeline[T_in, T_out]:
#     pass
# T_in = TypeVar('T_in')
# T_out = TypeVar('T_out')


class MyCustomProcessor:
    def process(self, data):
        # Custom processing logic
        return data


class MyPipeline(Pipeline[T_in, T_out]):
    processor = MyCustomProcessor()
```

--------------------------------

### Interruptible Pipeline Usage

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Demonstrates the use of `InterruptiblePipeline`, which allows processing to stop based on a condition. The provided condition function `continues_when_less_than_ten` stops the pipeline if the payload exceeds 10. Requires `InterruptiblePipeline` from `thecodecrate_pipeline.processors`.
```python
from thecodecrate_pipeline.processors import InterruptiblePipeline


def continues_when_less_than_ten(payload: int) -> bool:
    # True means "keep going", matching the function name
    return payload < 10


pipeline = (
    InterruptiblePipeline[int](continues_when_less_than_ten)
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload * 2)
    .pipe(lambda payload: payload * 3)
)

# returns 12
result = await pipeline.process(5)
assert result == 12
result
```

--------------------------------

### Process Async Iterators with Pipelines

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Demonstrates processing asynchronous iterators (streams) through pipeline stages. This is useful for handling data flows that are generated or consumed asynchronously. The example defines an input stream and two transformation stages, then processes the stream.

```python
from typing import AsyncIterator
from thecodecrate_pipeline import Pipeline
import asyncio


# Define async stream generator
async def input_stream() -> AsyncIterator[int]:
    for i in range(5):
        yield i
        await asyncio.sleep(0.1)


# Define stream transformation stages
async def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    async for item in stream:
        yield item * 2


async def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]:
    async for item in stream:
        yield f"Number: {item}"


# Create streaming pipeline
pipeline = (
    Pipeline[AsyncIterator[int], AsyncIterator[str]]()
    .pipe(stage1)
    .pipe(stage2)
)

# Process stream
stream = await pipeline.process(input_stream())
async for result in stream:
    print(result)

# Output:
# Number: 0
# Number: 2
# Number: 4
# Number: 6
# Number: 8
```

--------------------------------

### Exception Handling in Python Pipelines

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Demonstrates exception handling within a pipeline execution. The package does not catch exceptions; they must be handled either within a stage or around the pipeline processing call. This example shows handling a ZeroDivisionError.
```python
from thecodecrate_pipeline import Pipeline

pipeline = (
    Pipeline()
    .pipe(lambda payload: payload / 0)  # Stage that causes ZeroDivisionError
)

try:
    await pipeline.process(10)
except ZeroDivisionError as e:
    # Handle the exception.
    print(f"Caught expected exception: {e}")
```

--------------------------------

### Pipeline with Stages Declared on the Class

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/01-playground.ipynb

Shows how to define pipeline stages directly within a custom pipeline class by overriding the `stages` attribute. This allows for a more declarative pipeline definition. Requires `StageInterface` from `thecodecrate_pipeline`.

```python
# with stages declared on the class
class MyPipeline(Pipeline[int]):
    stages = (
        TimesThreeStage(),
        my_pipeline,
        AddFiveStage,
    )


# returns 36
result = await MyPipeline().process(5)
assert result == 36
result
```

--------------------------------

### Override Pipeline Stages Using Constructor Parameters

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Shows how to override the default stages defined in a Pipeline class using constructor parameters. This provides flexibility in customizing pipeline behavior for different use cases without modifying the class definition. The example defines `MyPipeline` with default stages and then overrides them.
```python
from thecodecrate_pipeline import Pipeline, Stage


class TimesTwoStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2


class TimesThreeStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 3


class AddOneStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1


# Define pipeline with default stages
class MyPipeline(Pipeline[int]):
    stages = (
        TimesTwoStage,
        TimesThreeStage(),
    )


# Use default stages
result1 = await MyPipeline().process(5)
# Returns: 30
# (5 * 2) * 3 = 30

# Override stages in constructor
custom_stages = (
    AddOneStage,
    TimesTwoStage(),
)
result2 = await MyPipeline(stages=custom_stages).process(5)
# Returns: 12
# (5 + 1) * 2 = 12
```

--------------------------------

### Pipeline Using a Custom Processor (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Demonstrates how to instantiate a pipeline with a custom processor. This pipeline uses `MyCustomProcessor` and a lambda function to double the input.

```python
pipeline = (
    Pipeline[int, int](
        processor=MyCustomProcessor(),
    )
    .pipe(lambda x: x * 2)
)
```

--------------------------------

### Basic Pipeline with Lambda Stages in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Demonstrates creating a simple pipeline using lambda functions for sequential data transformations. The pipeline takes an integer, adds 1, multiplies by 2, and adds 1 again. It supports asynchronous processing.
```python
from thecodecrate_pipeline import Pipeline

# Create a pipeline with lambda stages
pipeline = (
    Pipeline[int]()
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload * 2)
    .pipe(lambda payload: payload + 1)
)

# Process data through the pipeline
# result = await pipeline.process(1)
# Returns: 5

# result = await pipeline.process(2)
# Returns: 7
```

--------------------------------

### Pipeline with Default Type Hinting (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Illustrates using type hinting with a pipeline where the input type is specified, but the output type defaults to the input type. This pipeline doubles an integer payload.

```python
pipeline = (
    Pipeline[int]()
    .pipe(lambda payload: payload * 2)
)

# Returns 20
await pipeline.process(10)
```

--------------------------------

### Basic Pipeline Execution Logic (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Illustrates the fundamental execution flow of a pipeline using a simple for loop. It shows how a payload is sequentially passed through each stage.

```python
result = payload

for stage in stages:
    result = stage(result)

return result
```

--------------------------------

### Functional Stage Pipeline Usage (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Demonstrates how to create and use a pipeline with functional stages (lambda functions). The pipeline processes a payload, multiplying it by 10.

```python
pipeline = (
    Pipeline()
    .pipe(lambda payload: payload * 10)
)

# Returns 100
await pipeline.process(10)
```

--------------------------------

### PipelineFactory for Predefined Pipeline Configurations in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Use PipelineFactory to create pipelines with predefined stages and configurations. Factories are mutable and allow adding stages dynamically.
The `make` method instantiates a pipeline based on the factory's configuration.

```python
from thecodecrate_pipeline import PipelineFactory, Stage


class AddOneStage(Stage[int, int]):
    async def __call__(self, x: int) -> int:
        return x + 1


# Create factory with stages
pipeline_factory = (
    PipelineFactory[int, str]()
    .with_stages((
        AddOneStage,  # Stage class
        lambda x: x + 1,  # Lambda function
        lambda x: f"result is {x}",  # Type transformation
    ))
)

# Build pipeline from factory
pipeline = pipeline_factory.make()
result = await pipeline.process(10)
# Returns: "result is 12"
# 10 + 1 + 1 = 12, then format

# Factory is mutable - add more stages
pipeline_factory.add_stage(lambda x: f"{x}!!!")

# Create new pipeline with additional stage
pipeline2 = pipeline_factory.make()
result2 = await pipeline2.process(10)
# Returns: "result is 12!!!"

# Original pipeline unchanged
result3 = await pipeline.process(10)
# Returns: "result is 12"
```

--------------------------------

### Nested Pipelines as Stages in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Shows how to compose pipelines by using one pipeline as a stage within another. This allows for modular and reusable pipeline structures for complex workflows.
```python
from thecodecrate_pipeline import Pipeline

# Create sub-pipelines
sub_pipeline1 = (
    Pipeline[int]()
    .pipe(lambda payload: payload * 2)
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload * 3)
)

sub_pipeline2 = (
    Pipeline[int]()
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload * 2)
)

# Use pipelines as stages
pipeline = (
    Pipeline[int]()
    .pipe(lambda payload: payload * 3)
    .pipe(sub_pipeline1)  # Pipeline as a stage
    .pipe(sub_pipeline2)  # Another pipeline as a stage
    .pipe(lambda payload: payload * 2)
)

# result = await pipeline.process(10)
# Returns: 736
```

--------------------------------

### Custom Processor Implementation (Python)

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Defines a custom processor class `MyCustomProcessor` that can be used to customize pipeline execution logic. It iterates through stages and applies them to the payload.

```python
class MyCustomProcessor(Processor[T_in, T_out]):
    async def process(
        self,
        payload: T_in,
        stages: StageInstanceCollection,
    ) -> T_out:
        # Custom processing logic
        for stage in stages:
            payload = await stage(payload)
        return payload
```

--------------------------------

### Declarative Pipeline Definition in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Illustrates defining pipelines with stages declared as class attributes, offering a declarative approach to pipeline construction. This method can lead to more organized and readable pipeline configurations.
```python
from thecodecrate_pipeline import Pipeline, Stage


class TimesTwoStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2


class AddOneStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1


class TimesThreeStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 3


# Example of how this might be used (actual declarative syntax not fully shown in source):
# class MyPipeline(Pipeline[int]):
#     stages = [
#         TimesTwoStage,
#         AddOneStage,
#         TimesThreeStage
#     ]

# pipeline = MyPipeline()
# result = await pipeline.process(5)
```

--------------------------------

### Pipeline Factory for Distributed Composition in Python

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Facilitates distributed composition of immutable pipelines using a PipelineFactory. It collects stages and allows building a pipeline at any time, with the ability to add more stages later. Input stages can be classes or objects.

```python
# Assuming PipelineFactory, LogicalStage, AddOneStage, LastStage are defined elsewhere
# class PipelineFactory:
#     def with_stages(self, stages):
#         # ...
#         return self
#     def add_stage(self, stage):
#         # ...
#         return self
#     def build(self):
#         # ...
#         return Pipeline()

# Example placeholder classes/objects for illustration
# class LogicalStage:
#     pass
# class AddOneStage:
#     pass
# class LastStage:
#     pass

pipeline_factory = (
    PipelineFactory()
    .with_stages([
        LogicalStage,  # class
        AddOneStage(),  # object
    ])
)

# Additional stages can be added later
pipeline_factory.add_stage(LastStage())

# Build the pipeline
pipeline = pipeline_factory.build()
```

--------------------------------

### Class-Based Stages Pipeline in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Shows how to define reusable data transformation stages as classes implementing the Stage interface.
This pipeline multiplies by 2, adds 1, and then multiplies by 3, processing an integer payload asynchronously.

```python
from thecodecrate_pipeline import Pipeline, Stage


# Define custom stage classes
class TimesTwoStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2


class AddOneStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1


class TimesThreeStage(Stage[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 3


# Build pipeline with class-based stages
pipeline = (
    Pipeline[int]()
    .pipe(TimesTwoStage())
    .pipe(AddOneStage())
    .pipe(TimesThreeStage())
)

# result = await pipeline.process(5)
# Returns: 33
```

--------------------------------

### Create and Process Pipeline with Stages

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/03-pipeline-factory.ipynb

Defines a pipeline using a factory with predefined stages and processes data through it. It takes an integer, applies transformations, and returns a formatted string. The pipeline is immutable after creation.

```python
from thecodecrate_pipeline import PipelineFactory
from thecodecrate_pipeline.types import CallableCollection

# define a factory with a few stages
some_stages: CallableCollection = (
    (lambda x: x + 1),
    (lambda x: x + 1),
    (lambda x: f"result is {x}"),
)

pipeline_factory = PipelineFactory[int, str]().with_stages(some_stages)

# create and process
pipeline = pipeline_factory.make()
result = await pipeline.process(10)

# check the result
assert result == "result is 12"
print(result)
```

--------------------------------

### Real-time Stream Processing with Python Pipelines

Source: https://github.com/thecodecrate/python-pipeline/blob/main/README.md

Processes asynchronous data streams in real-time using a pipeline of stages. Each stage can yield results consumed by the next, enabling efficient handling of data as it becomes available. Requires `asyncio` and `typing.AsyncIterator`.
```python
from typing import AsyncIterator
import asyncio

# Assuming Pipeline is defined elsewhere
# class Pipeline[T_in, T_out]:
#     def pipe(self, stage):
#         # ...
#         return self
#     async def process(self, stream):
#         # ...
#         return processed_stream


async def input_stream() -> AsyncIterator[int]:
    for i in range(5):
        yield i


async def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    async for item in stream:
        yield item * 2
        await asyncio.sleep(1)  # Simulate processing delay


async def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]:
    async for item in stream:
        yield f"Number: {item}"


async def main():
    pipeline = (
        Pipeline[AsyncIterator[int], AsyncIterator[str]]()
        .pipe(stage1)
        .pipe(stage2)
    )

    stream = await pipeline.process(input_stream())

    async for result in stream:
        print(result)


# Run the async main function
# await main()  # Uncomment to run
```

--------------------------------

### Declarative PipelineFactory Definition in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Define a PipelineFactory declaratively by setting the `stages` class attribute. This approach simplifies factory creation when stages are fixed. The factory is then instantiated, and its `make` method is used to create pipeline instances.

```python
from thecodecrate_pipeline import PipelineFactory, Stage


class MyStage(Stage[int, str]):
    async def __call__(self, x: int) -> str:
        return f"result is {x}"


# Declarative factory
class MyPipelineFactory(PipelineFactory[int, str]):
    stages = (MyStage,)


# Instantiate and use
pipeline_factory = MyPipelineFactory()
pipeline = pipeline_factory.make()

result = await pipeline.process(10)
# Returns: "result is 10"
```

--------------------------------

### Chained Processor for Sequential Execution in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

The ChainedProcessor executes stages sequentially, either directly or via ChainedPipeline.
Stages are added using the `pipe` method or provided as a tuple to the `process` method. It takes a payload and a collection of stages (lambdas or functions) as input.

```python
from thecodecrate_pipeline import Pipeline
from thecodecrate_pipeline.processors import ChainedPipeline, ChainedProcessor

# Using ChainedPipeline (has ChainedProcessor built-in)
pipeline = (
    ChainedPipeline[int]()
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload * 2)
)

result = await pipeline.process(5)
# Returns: 12
# (5 + 1) * 2 = 12

# Or use ChainedProcessor directly
processor = ChainedProcessor[int]()
result = await processor.process(
    payload=5,
    stages=(
        lambda payload: payload + 1,
        lambda payload: payload * 2,
    ),
)
# Returns: 12
```

--------------------------------

### Declarative Pipeline Definition in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Define pipeline stages declaratively using a tuple of stage instances or classes. The pipeline automatically instantiates stage classes if provided. The `process` method executes the defined stages sequentially.

```python
from thecodecrate_pipeline import Pipeline

# Assume TimesThreeStage, TimesTwoStage, AddOneStage are defined elsewhere


# Define stages declaratively
class MyPipeline(Pipeline[int]):
    stages = (
        TimesThreeStage(),  # Stage instance
        TimesTwoStage,  # Stage class (auto-instantiated)
        AddOneStage,  # Stage class (auto-instantiated)
    )


# Use the declarative pipeline
result = await MyPipeline().process(5)
# Returns: 31
# (5 * 3 * 2) + 1 = 31
```

--------------------------------

### Custom Processor Implementation in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

Implement custom processors by inheriting from the base `Processor` class and overriding the `process` method. This allows for specialized execution strategies, such as modifying the final result after all stages have completed.
```python
from typing import Any, cast

from thecodecrate_pipeline import Pipeline, Processor
from thecodecrate_pipeline.types import CallableCollection


# Define custom processor
class DoubleResultProcessor(Processor[int, int]):
    """Processor that doubles the final result."""

    async def process(
        self,
        payload: int,
        stages: CallableCollection,
        *args: Any,
        **kwds: Any,
    ) -> int:
        # Process through all stages
        result = payload
        for stage in stages:
            result = await self._call(callable=stage, payload=result, *args, **kwds)

        # Double the final result
        return cast(int, result * 2)


# Use custom processor
processor = DoubleResultProcessor()
pipeline = (
    Pipeline[int](processor=processor)
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload + 1)
)

result = await pipeline.process(1)
# Returns: 6
# ((1 + 1 + 1) = 3) * 2 = 6
```

--------------------------------

### Interruptible Processor for Conditional Pipeline Halting in Python

Source: https://context7.com/thecodecrate/python-pipeline/llms.txt

The InterruptibleProcessor allows pipeline execution to halt based on a provided condition. The `should_interrupt` function is checked against the payload after each stage; once it returns `True`, the remaining stages are skipped and the current payload is returned. It can be used with `InterruptiblePipeline` or directly with `InterruptibleProcessor`.
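The halting rule can be sketched in plain Python before looking at the library classes. This is an illustration under stated assumptions: `run_until` is a hypothetical helper, it is synchronous for brevity, and checking the condition after each stage follows the comments in the library example rather than the library source.

```python
from typing import Callable, Iterable


def run_until(
    payload: int,
    stages: Iterable[Callable[[int], int]],
    should_interrupt: Callable[[int], bool],
) -> int:
    """Run stages in order, stopping once the check returns True (hypothetical helper)."""
    result = payload
    for stage in stages:
        result = stage(result)
        # Check the output of the stage that just ran.
        if should_interrupt(result):
            break
    return result


stages = (
    lambda x: x + 2,   # 5 + 2 = 7
    lambda x: x * 10,  # 7 * 10 = 70, the check fires here
    lambda x: x * 10,  # skipped
)

if __name__ == "__main__":
    print(run_until(5, stages, lambda x: x >= 10))  # 70
```

Because the check runs on each stage's output, the stage that crosses the threshold still contributes to the result; only the stages after it are skipped.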
```python
from thecodecrate_pipeline import Pipeline
from thecodecrate_pipeline.processors import InterruptiblePipeline, InterruptibleProcessor


# Define interruption condition
def should_interrupt(payload: int) -> bool:
    return payload >= 10  # Stop when payload reaches 10 or more


# Using InterruptiblePipeline
pipeline = (
    InterruptiblePipeline[int](should_interrupt)
    .pipe(lambda payload: payload + 2)  # 5 + 2 = 7
    .pipe(lambda payload: payload * 10)  # 7 * 10 = 70 (>= 10, interrupts here)
    .pipe(lambda payload: payload * 10)  # This stage never executes
)

result = await pipeline.process(5)
# Returns: 70
# Stops after second stage because 70 >= 10

# Using InterruptibleProcessor directly
processor = InterruptibleProcessor[int](lambda payload: payload >= 20)
result = await processor.process(
    payload=5,
    stages=(
        lambda payload: payload + 2,  # 5 + 2 = 7
        lambda payload: payload * 5,  # 7 * 5 = 35 (>= 20, interrupts here)
        lambda payload: payload * 10,  # Never executes
    ),
)
# Returns: 35
```

--------------------------------

### Add Stage to Pipeline Factory and Process

Source: https://github.com/thecodecrate/python-pipeline/blob/main/jupyter/03-pipeline-factory.ipynb

Demonstrates adding a new stage to an existing pipeline factory after the initial pipeline has been created. Modifying the factory does not affect previously created pipelines, because pipelines are immutable, but new pipelines created from the modified factory include the new stage.

```python
# adding another stage after the pipeline has been created
from typing import Any


def add_excitement(x: Any) -> str:
    return f"{x}!!!"


pipeline_factory.add_stage(add_excitement)

# the original pipeline is not affected
result2 = await pipeline.process(10)
assert result2 == result
print(f"original pipeline: {result}")

# now, let's create a new pipeline, with the excitement stage!!!
pipeline2 = pipeline_factory.make()
result3 = await pipeline2.process(10)

# the new pipeline has the excitement!!!
assert result3 == "result is 12!!!"
print(f"new pipeline: {result3}")

# notice the original pipeline is not affected
result4 = await pipeline.process(10)
assert result4 == result
print(f"original pipeline: {result}")

# what if we try to change the original pipeline, directly?
pipeline.pipe(add_excitement)  # <- adding the excitement stage to the original pipeline

# the original pipeline is still not affected, because pipelines are immutable
result5 = await pipeline.process(10)
assert result5 == result
print(f"original pipeline: {result}")
```
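One way to picture why calling `pipe` on the original pipeline has no visible effect is a pipeline whose `pipe` returns a new object instead of mutating itself. The sketch below is library-free and synchronous for brevity; `TinyPipeline` is a hypothetical class, and the copy-on-`pipe` design is an assumption about how such immutability could be achieved, not a description of the library's internals.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class TinyPipeline:
    """Hypothetical immutable pipeline: stages live in a tuple that never changes."""

    stages: Tuple[Callable[[Any], Any], ...] = ()

    def pipe(self, stage: Callable[[Any], Any]) -> "TinyPipeline":
        # Return a new object instead of mutating self.
        return TinyPipeline(self.stages + (stage,))

    def process(self, payload: Any) -> Any:
        for stage in self.stages:
            payload = stage(payload)
        return payload


base = TinyPipeline().pipe(lambda x: x + 2)
base.pipe(lambda x: f"{x}!!!")            # return value discarded, base unchanged
excited = base.pipe(lambda x: f"{x}!!!")  # keep the returned pipeline instead

print(base.process(10))     # 12
print(excited.process(10))  # 12!!!
```

Under this design, the extra stage only takes effect if the value returned by `pipe` is kept and used.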