haive.agents.common.models.task_analysis.parallelization
========================================================

.. py:module:: haive.agents.common.models.task_analysis.parallelization

.. autoapi-nested-parse::

   Parallelization analysis for task execution planning.

   This module analyzes task dependencies to identify parallelization
   opportunities, execution phases, join points, and optimal execution
   strategies.

Classes
-------

.. autoapisummary::

   haive.agents.common.models.task_analysis.parallelization.ExecutionPhase
   haive.agents.common.models.task_analysis.parallelization.ExecutionStrategy
   haive.agents.common.models.task_analysis.parallelization.JoinPoint
   haive.agents.common.models.task_analysis.parallelization.ParallelGroup
   haive.agents.common.models.task_analysis.parallelization.ParallelizationAnalysis
   haive.agents.common.models.task_analysis.parallelization.ParallelizationAnalyzer

Module Contents
---------------

.. py:class:: ExecutionPhase(/, **data)

   Bases: :py:obj:`pydantic.BaseModel`

   Represents a phase in the overall task execution plan.

   Execution phases organize task execution into sequential stages: each
   phase must complete before the next phase can begin.

   .. attribute:: phase_number

      Sequential phase number

   .. attribute:: name

      Descriptive name for this phase

   .. attribute:: parallel_groups

      Groups of tasks that can run in parallel within this phase

   .. attribute:: dependencies

      What this phase depends on

   .. attribute:: estimated_duration_minutes

      Total time for this phase

   .. attribute:: critical_path_tasks

      Tasks on the critical path within this phase

   .. rubric:: Example

   .. code-block:: python

      phase = ExecutionPhase(
          phase_number=1,
          name="Data Collection Phase",
          parallel_groups=[research_group, survey_group],
          estimated_duration_minutes=180,
      )

   Create a new model by parsing and validating input data from keyword
   arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input
   data cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

   .. py:method:: calculate_sequential_duration()

      Calculate the duration if all tasks ran sequentially.

      :returns: Sequential execution duration in minutes

   .. py:method:: get_max_parallelism()

      Get the maximum number of tasks that can run simultaneously.

      :returns: Maximum parallelism level

   .. py:method:: get_parallelization_benefit()

      Calculate the benefit from parallelization as a ratio.

      :returns: Parallelization benefit (sequential_time / parallel_time)

   .. py:method:: get_total_task_count()

      Get the total number of tasks across all parallel groups.

      :returns: Total task count

   .. py:attribute:: model_config

      Configuration for the model; should be a dictionary conforming to
      [`ConfigDict`][pydantic.config.ConfigDict].

.. py:class:: ExecutionStrategy

   Bases: :py:obj:`str`, :py:obj:`enum.Enum`

   Strategies for executing parallel tasks.

   .. attribute:: SEQUENTIAL

      Execute all tasks one after another

   .. attribute:: MAX_PARALLEL

      Execute as many tasks in parallel as possible

   .. attribute:: RESOURCE_CONSTRAINED

      Parallel execution limited by resource availability

   .. attribute:: PRIORITY_BASED

      Execute high-priority tasks first, parallelizing when possible

   .. attribute:: BALANCED

      Balance parallelization against resource usage

   Initialize self. See help(type(self)) for accurate signature.

.. py:class:: JoinPoint(/, **data)

   Bases: :py:obj:`pydantic.BaseModel`

   Represents a point where multiple parallel tasks must synchronize.

   Join points are critical for understanding where parallel execution
   must wait for all dependencies to complete before proceeding.

   .. attribute:: id

      Unique identifier for this join point

   .. attribute:: name

      Descriptive name for the join point

   .. attribute:: input_task_ids

      IDs of tasks that must complete before this join

   .. attribute:: output_task_ids

      IDs of tasks that can start after this join

   .. attribute:: join_type

      Type of join operation

   .. attribute:: estimated_wait_time

      Expected time to wait for all inputs

   .. attribute:: is_critical_path

      Whether this join point is on the critical path

   .. rubric:: Example

   .. code-block:: python

      join_point = JoinPoint(
          id="analysis_join",
          name="Combine Analysis Results",
          input_task_ids=["data_collection", "background_research"],
          output_task_ids=["final_report"],
          join_type="synchronous",
      )

   Create a new model by parsing and validating input data from keyword
   arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input
   data cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

   .. py:method:: get_input_count()

      Get the number of input tasks for this join point.

      :returns: Number of input tasks

   .. py:method:: get_output_count()

      Get the number of output tasks from this join point.

      :returns: Number of output tasks

   .. py:method:: is_merge_point()

      Check if this is a merge point (multiple inputs, single output).

      :returns: True if multiple inputs merge to a single output

   .. py:method:: is_split_point()

      Check if this is a split point (single input, multiple outputs).

      :returns: True if a single input splits to multiple outputs

   .. py:attribute:: model_config

      Configuration for the model; should be a dictionary conforming to
      [`ConfigDict`][pydantic.config.ConfigDict].

.. py:class:: ParallelGroup(/, **data)

   Bases: :py:obj:`pydantic.BaseModel`

   Represents a group of tasks that can execute in parallel.

   Parallel groups identify sets of tasks that have no blocking
   dependencies between them and can therefore run simultaneously.

   .. attribute:: group_id

      Unique identifier for this parallel group

   .. attribute:: task_ids

      IDs of tasks in this parallel group

   .. attribute:: estimated_duration_minutes

      Time for the longest task in the group

   .. attribute:: resource_requirements

      Combined resource requirements

   .. attribute:: can_be_interleaved

      Whether tasks can be interleaved or must run fully parallel

   .. attribute:: priority

      Priority level for this group

   .. attribute:: phase

      Execution phase this group belongs to

   .. rubric:: Example

   .. code-block:: python

      parallel_group = ParallelGroup(
          group_id="research_phase",
          task_ids=["web_research", "library_research", "expert_interviews"],
          estimated_duration_minutes=120,
          resource_requirements={"researchers": 3, "internet": True},
      )

   Create a new model by parsing and validating input data from keyword
   arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input
   data cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

   .. py:method:: calculate_actual_duration(sequential_duration)

      Calculate the actual duration, accounting for parallelization.

      :param sequential_duration: Duration if tasks ran sequentially
      :returns: Actual duration with parallelization

   .. py:method:: get_task_count()

      Get the number of tasks in this parallel group.

      :returns: Number of tasks in the group

   .. py:method:: get_theoretical_speedup()

      Calculate the theoretical speedup from parallelization.

      :returns: Theoretical speedup factor

   .. py:attribute:: model_config

      Configuration for the model; should be a dictionary conforming to
      [`ConfigDict`][pydantic.config.ConfigDict].

.. py:class:: ParallelizationAnalysis(/, **data)

   Bases: :py:obj:`pydantic.BaseModel`

   Complete analysis of parallelization opportunities for a task.

   This is the main result of parallelization analysis, containing all
   the information needed to optimize task execution.

   .. attribute:: execution_phases

      Sequential phases of execution

   .. attribute:: parallel_groups

      All identified parallel groups

   .. attribute:: join_points

      Critical synchronization points

   .. attribute:: critical_path

      Tasks on the critical path

   .. attribute:: execution_strategy

      Recommended execution strategy

   .. attribute:: estimated_speedup

      Expected speedup from parallelization

   .. attribute:: resource_requirements

      Peak resource requirements

   .. attribute:: bottlenecks

      Identified bottlenecks and constraints

   .. rubric:: Example

   .. code-block:: python

      analysis = ParallelizationAnalysis(
          execution_phases=[phase1, phase2, phase3],
          parallel_groups=[group1, group2],
          join_points=[join1, join2],
          critical_path=["task_1", "task_3", "task_5"],
          estimated_speedup=2.5,
      )

   Create a new model by parsing and validating input data from keyword
   arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input
   data cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

   .. py:method:: calculate_time_savings()

      Calculate the time saved by parallelization.

      :returns: Time savings in minutes

   .. py:method:: get_critical_path_duration()

      Get the duration of the critical path.

      :returns: Critical path duration in minutes

   .. py:method:: get_efficiency_percentage()

      Get parallelization efficiency as a percentage.

      :returns: Efficiency percentage (0-100)

   .. py:method:: get_max_parallelism()

      Get the maximum parallelism across all phases.

      :returns: Maximum number of tasks that can run simultaneously

   .. py:method:: get_total_phases()

      Get the total number of execution phases.

      :returns: Number of phases

   .. py:method:: is_worth_parallelizing(min_speedup=1.2)

      Determine whether parallelization is worthwhile.

      :param min_speedup: Minimum speedup required to justify parallelization
      :returns: True if parallelization provides sufficient benefit

   .. py:attribute:: model_config

      Configuration for the model; should be a dictionary conforming to
      [`ConfigDict`][pydantic.config.ConfigDict].

.. py:class:: ParallelizationAnalyzer(/, **data)

   Bases: :py:obj:`pydantic.BaseModel`

   Analyzer for identifying parallelization opportunities in tasks.

   This class analyzes task dependencies to identify optimal
   parallelization strategies, execution phases, and resource
   requirements.

   .. attribute:: max_parallel_tasks

      Maximum number of tasks to run in parallel

   .. attribute:: resource_constraints

      Resource limitations that affect parallelization

   .. attribute:: prefer_balanced_groups

      Whether to prefer balanced parallel groups

   .. attribute:: include_coordination_overhead

      Whether to include coordination overhead

   .. rubric:: Example

   .. code-block:: python

      analyzer = ParallelizationAnalyzer(
          max_parallel_tasks=8,
          resource_constraints={"cpu_cores": 4, "memory_gb": 16},
      )
      analysis = analyzer.analyze_task(complex_task)
      print(f"Recommended speedup: {analysis.estimated_speedup:.1f}x")

   Create a new model by parsing and validating input data from keyword
   arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input
   data cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

   .. py:method:: analyze_task(task)

      Analyze a task for parallelization opportunities.

      :param task: Task to analyze
      :returns: Complete parallelization analysis

   .. py:attribute:: model_config

      Configuration for the model; should be a dictionary conforming to
      [`ConfigDict`][pydantic.config.ConfigDict].
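The group-level timing methods documented above describe speedup as the ratio of sequential to parallel time, where a parallel group finishes when its longest task finishes. A minimal sketch of that arithmetic, using a hypothetical ``SimpleGroup`` stand-in (not part of the haive API; field and method names are illustrative), might look like:

```python
from dataclasses import dataclass


@dataclass
class SimpleGroup:
    """Hypothetical stand-in for a parallel group of tasks."""

    task_durations: list  # per-task durations in minutes

    def sequential_duration(self):
        # Total time if every task ran one after another.
        return sum(self.task_durations)

    def parallel_duration(self):
        # The group finishes when its longest task finishes.
        return max(self.task_durations)

    def theoretical_speedup(self):
        # Ratio of sequential time to parallel time.
        return self.sequential_duration() / self.parallel_duration()


group = SimpleGroup(task_durations=[120, 90, 60])
group.parallel_duration()    # 120 (the longest task dominates)
group.theoretical_speedup()  # 270 / 120 = 2.25
```

Note that the real models may additionally account for coordination overhead (see ``include_coordination_overhead`` on the analyzer), which would reduce the speedup below this theoretical ratio.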
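The documentation does not show how the analyzer derives execution phases from task dependencies. One common approach consistent with the descriptions above (each phase starts only after earlier phases complete, and tasks within a phase are mutually independent) is topological layering of the dependency graph. A self-contained sketch, with an illustrative dependency map, assuming no knowledge of the actual haive implementation:

```python
def execution_phases(dependencies):
    """Group tasks into sequential phases of mutually independent tasks.

    dependencies maps task_id -> set of task_ids it depends on.
    Every task in a phase depends only on tasks in earlier phases,
    so tasks within one phase can run in parallel.
    """
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    phases = []
    done = set()
    while remaining:
        # Tasks whose dependencies are all satisfied form the next phase.
        ready = {task for task, deps in remaining.items() if deps <= done}
        if not ready:
            raise ValueError("dependency cycle detected")
        phases.append(ready)
        done |= ready
        for task in ready:
            del remaining[task]
    return phases


deps = {
    "web_research": set(),
    "library_research": set(),
    "analysis": {"web_research", "library_research"},
    "final_report": {"analysis"},
}
# Three phases: the two research tasks in parallel, then analysis,
# then the final report. The "analysis" step acts as a join point.
phases = execution_phases(deps)
```

In this framing, a phase with more than one task corresponds to a parallel group, and a task with multiple satisfied dependencies (like ``analysis`` above) corresponds to a join point.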