haive.agents.common.models.task_analysis.parallelization
Parallelization analysis for task execution planning.
This module analyzes task dependencies to identify parallelization opportunities, execution phases, join points, and optimal execution strategies.
Classes
- ExecutionPhase: Represents a phase in the overall task execution plan.
- ExecutionStrategy: Strategies for executing parallel tasks.
- JoinPoint: Represents a point where multiple parallel tasks must synchronize.
- ParallelGroup: Represents a group of tasks that can execute in parallel.
- ParallelizationAnalysis: Complete analysis of parallelization opportunities for a task.
- ParallelizationAnalyzer: Analyzer for identifying parallelization opportunities in tasks.
Module Contents
- class haive.agents.common.models.task_analysis.parallelization.ExecutionPhase(/, **data)
Bases: pydantic.BaseModel
Represents a phase in the overall task execution plan.
Execution phases organize the task execution into sequential stages, where each phase must complete before the next phase can begin.
- Parameters:
data (Any)
- phase_number
Sequential phase number
- name
Descriptive name for this phase
- parallel_groups
Groups of tasks that can run in parallel within this phase
- dependencies
What this phase depends on
- estimated_duration_minutes
Total time for this phase, in minutes
- critical_path_tasks
Tasks on the critical path within this phase
Example
    from haive.agents.common.models.task_analysis.parallelization import ExecutionPhase

    # research_group and survey_group are ParallelGroup instances defined elsewhere.
    phase = ExecutionPhase(
        phase_number=1,
        name="Data Collection Phase",
        parallel_groups=[research_group, survey_group],
        estimated_duration_minutes=180,
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- calculate_sequential_duration()
Calculate duration if all tasks ran sequentially.
- Returns:
Sequential execution duration in minutes
- Return type:
- get_max_parallelism()
Get maximum number of tasks that can run simultaneously.
- Returns:
Maximum parallelism level
- Return type:
int
- get_parallelization_benefit()
Calculate benefit from parallelization as a ratio.
- Returns:
Parallelization benefit (sequential_time / parallel_time)
- Return type:
- get_total_task_count()
Get total number of tasks across all parallel groups.
- Returns:
Total task count
- Return type:
int
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
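The duration and benefit methods above can be illustrated with a small stand-in. This is only a sketch: it does not import haive, `GroupSummary` and `PhaseSketch` are hypothetical classes, and it assumes that all groups in a phase run concurrently, so the phase's parallel time is that of its slowest group. The library's actual calculation may differ.

```python
from dataclasses import dataclass, field


@dataclass
class GroupSummary:
    """Hypothetical stand-in for a parallel group (not the haive class)."""
    task_count: int
    sequential_minutes: float  # sum of the task durations
    parallel_minutes: float    # duration of the longest task


@dataclass
class PhaseSketch:
    """Hypothetical stand-in for ExecutionPhase."""
    groups: list[GroupSummary] = field(default_factory=list)

    def total_task_count(self) -> int:
        return sum(g.task_count for g in self.groups)

    def sequential_minutes(self) -> float:
        # Every task in every group runs one after another.
        return sum(g.sequential_minutes for g in self.groups)

    def parallel_minutes(self) -> float:
        # Assumption: groups run concurrently, so the slowest group dominates.
        return max((g.parallel_minutes for g in self.groups), default=0.0)

    def parallelization_benefit(self) -> float:
        # sequential_time / parallel_time, as in the Returns description above.
        parallel = self.parallel_minutes()
        return self.sequential_minutes() / parallel if parallel > 0 else 1.0


research = GroupSummary(task_count=3, sequential_minutes=270, parallel_minutes=120)
survey = GroupSummary(task_count=2, sequential_minutes=210, parallel_minutes=180)
phase = PhaseSketch(groups=[research, survey])

print(phase.total_task_count())                   # 5
print(phase.sequential_minutes())                 # 480
print(round(phase.parallelization_benefit(), 2))  # 2.67
```

A benefit of 1.0 means parallel execution saves nothing; the sketch returns that value for an empty phase to avoid dividing by zero.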
- class haive.agents.common.models.task_analysis.parallelization.ExecutionStrategy
Strategies for executing parallel tasks.
- SEQUENTIAL
Execute all tasks one after another
- MAX_PARALLEL
Execute as many tasks in parallel as possible
- RESOURCE_CONSTRAINED
Parallel execution limited by resource availability
- PRIORITY_BASED
Execute high-priority tasks first, parallelize when possible
- BALANCED
Balance between parallelization and resource usage
- class haive.agents.common.models.task_analysis.parallelization.JoinPoint(/, **data)
Bases: pydantic.BaseModel
Represents a point where multiple parallel tasks must synchronize.
Join points are critical for understanding where parallel execution must wait for all dependencies to complete before proceeding.
- Parameters:
data (Any)
- id
Unique identifier for this join point
- name
Descriptive name for the join point
- input_task_ids
IDs of tasks that must complete before this join
- output_task_ids
IDs of tasks that can start after this join
- join_type
Type of join operation
- estimated_wait_time
Expected time to wait for all inputs
- is_critical_path
Whether this join point is on the critical path
Example
    from haive.agents.common.models.task_analysis.parallelization import JoinPoint

    join_point = JoinPoint(
        id="analysis_join",
        name="Combine Analysis Results",
        input_task_ids=["data_collection", "background_research"],
        output_task_ids=["final_report"],
        join_type="synchronous",
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- get_input_count()
Get the number of input tasks for this join point.
- Returns:
Number of input tasks
- Return type:
int
- get_output_count()
Get the number of output tasks from this join point.
- Returns:
Number of output tasks
- Return type:
int
- is_merge_point()
Check if this is a merge point (multiple inputs, single output).
- Returns:
True if multiple inputs merge to single output
- Return type:
bool
- is_split_point()
Check if this is a split point (single input, multiple outputs).
- Returns:
True if single input splits to multiple outputs
- Return type:
bool
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
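The merge and split checks above follow directly from the sizes of the two ID lists. The sketch below restates them on a hypothetical `JoinPointSketch` dataclass rather than the real pydantic model; the `archive` task ID is invented for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class JoinPointSketch:
    """Hypothetical stand-in for JoinPoint, keeping only the task ID lists."""
    input_task_ids: list[str] = field(default_factory=list)
    output_task_ids: list[str] = field(default_factory=list)

    def is_merge_point(self) -> bool:
        # Multiple inputs, single output.
        return len(self.input_task_ids) > 1 and len(self.output_task_ids) == 1

    def is_split_point(self) -> bool:
        # Single input, multiple outputs.
        return len(self.input_task_ids) == 1 and len(self.output_task_ids) > 1


merge = JoinPointSketch(
    input_task_ids=["data_collection", "background_research"],
    output_task_ids=["final_report"],
)
split = JoinPointSketch(
    input_task_ids=["data_collection"],
    output_task_ids=["analysis", "archive"],
)
print(merge.is_merge_point(), merge.is_split_point())  # True False
print(split.is_merge_point(), split.is_split_point())  # False True
```

Under these definitions a join with several inputs and several outputs is neither a merge point nor a split point.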
- class haive.agents.common.models.task_analysis.parallelization.ParallelGroup(/, **data)
Bases: pydantic.BaseModel
Represents a group of tasks that can execute in parallel.
Parallel groups identify sets of tasks that have no blocking dependencies between them and can therefore run simultaneously.
- Parameters:
data (Any)
- group_id
Unique identifier for this parallel group
- task_ids
IDs of tasks in this parallel group
- estimated_duration_minutes
Time for the longest task in the group
- resource_requirements
Combined resource requirements
- can_be_interleaved
Whether tasks can be interleaved or must run fully parallel
- priority
Priority level for this group
- phase
Execution phase this group belongs to
Example
    from haive.agents.common.models.task_analysis.parallelization import ParallelGroup

    parallel_group = ParallelGroup(
        group_id="research_phase",
        task_ids=["web_research", "library_research", "expert_interviews"],
        estimated_duration_minutes=120,
        resource_requirements={"researchers": 3, "internet": True},
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- calculate_actual_duration(sequential_duration)
Calculate actual duration considering parallelization.
- get_task_count()
Get the number of tasks in this parallel group.
- Returns:
Number of tasks in the group
- Return type:
int
- get_theoretical_speedup()
Calculate theoretical speedup from parallelization.
- Returns:
Theoretical speedup factor
- Return type:
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
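One common reading of "theoretical speedup" is the group's sequential time divided by its parallel time, where the parallel time is the longest task. The sketch below uses that textbook definition with hypothetical per-task estimates; the documented class stores only one duration per group, so `task_minutes` is an assumption and the library's formula may differ.

```python
def group_duration_minutes(task_minutes: dict[str, float]) -> float:
    """Parallel duration: the longest task in the group."""
    return max(task_minutes.values(), default=0.0)


def theoretical_speedup(task_minutes: dict[str, float]) -> float:
    """Sequential time divided by parallel time (1.0 for an empty group)."""
    parallel = group_duration_minutes(task_minutes)
    if parallel <= 0:
        return 1.0
    return sum(task_minutes.values()) / parallel


# Hypothetical per-task estimates for the research group from the example above.
research_minutes = {
    "web_research": 60,
    "library_research": 120,
    "expert_interviews": 90,
}
print(group_duration_minutes(research_minutes))  # 120
print(theoretical_speedup(research_minutes))     # 2.25
```

The speedup reaches the task count only when all tasks take equally long; one slow task pulls it toward 1.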
- class haive.agents.common.models.task_analysis.parallelization.ParallelizationAnalysis(/, **data)
Bases: pydantic.BaseModel
Complete analysis of parallelization opportunities for a task.
This is the main result of parallelization analysis, containing all the information needed to optimize task execution.
- Parameters:
data (Any)
- execution_phases
Sequential phases of execution
- parallel_groups
All identified parallel groups
- join_points
Critical synchronization points
- critical_path
Tasks on the critical path
- execution_strategy
Recommended execution strategy
- estimated_speedup
Expected speedup from parallelization
- resource_requirements
Peak resource requirements
- bottlenecks
Identified bottlenecks and constraints
Example
    from haive.agents.common.models.task_analysis.parallelization import ParallelizationAnalysis

    # phase1..phase3, group1..group2 and join1..join2 are built elsewhere.
    analysis = ParallelizationAnalysis(
        execution_phases=[phase1, phase2, phase3],
        parallel_groups=[group1, group2],
        join_points=[join1, join2],
        critical_path=["task_1", "task_3", "task_5"],
        estimated_speedup=2.5,
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- calculate_time_savings()
Calculate time savings from parallelization.
- Returns:
Time savings in minutes
- Return type:
- get_critical_path_duration()
Get duration of the critical path.
- Returns:
Critical path duration in minutes
- Return type:
- get_efficiency_percentage()
Get parallelization efficiency as a percentage.
- Returns:
Efficiency percentage (0-100)
- Return type:
- get_max_parallelism()
Get maximum parallelism across all phases.
- Returns:
Maximum number of tasks that can run simultaneously
- Return type:
int
- get_total_phases()
Get total number of execution phases.
- Returns:
Number of phases
- Return type:
int
- is_worth_parallelizing(min_speedup=1.2)
Determine if parallelization is worthwhile.
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
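The summary metrics above can be approximated with standard definitions from parallel computing. The functions below are a sketch under assumptions, not the library's code: efficiency is taken as speedup per parallel slot, and the threshold comparison is assumed to be inclusive. Only the parameter `min_speedup=1.2` comes from the documented signature.

```python
def parallel_minutes(sequential_minutes: float, speedup: float) -> float:
    """Estimated wall-clock time after parallelization."""
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    return sequential_minutes / speedup


def time_savings(sequential_minutes: float, speedup: float) -> float:
    """Minutes saved compared with running every task sequentially."""
    return sequential_minutes - parallel_minutes(sequential_minutes, speedup)


def efficiency_percentage(speedup: float, max_parallelism: int) -> float:
    """Speedup per parallel slot, as a percentage clamped to 0-100."""
    if max_parallelism <= 0:
        return 0.0
    return max(0.0, min(100.0, 100.0 * speedup / max_parallelism))


def is_worth_parallelizing(speedup: float, min_speedup: float = 1.2) -> bool:
    # Assumption: a speedup exactly at the threshold counts as worthwhile.
    return speedup >= min_speedup


print(time_savings(300, 2.5))         # 180.0
print(efficiency_percentage(2.5, 4))  # 62.5
print(is_worth_parallelizing(2.5))    # True
print(is_worth_parallelizing(1.1))    # False
```

With a 2.5x speedup on 300 minutes of sequential work, the plan finishes in 120 minutes; spread over four parallel slots, each slot is used at 62.5% of the ideal.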
- class haive.agents.common.models.task_analysis.parallelization.ParallelizationAnalyzer(/, **data)
Bases: pydantic.BaseModel
Analyzer for identifying parallelization opportunities in tasks.
This class analyzes task dependencies to identify parallelization strategies, execution phases, and resource requirements.
- Parameters:
data (Any)
- max_parallel_tasks
Maximum number of tasks to run in parallel
- resource_constraints
Resource limitations that affect parallelization
- prefer_balanced_groups
Whether to prefer balanced parallel groups
- include_coordination_overhead
Whether to include coordination overhead
Example
    from haive.agents.common.models.task_analysis.parallelization import ParallelizationAnalyzer

    # complex_task is a Task instance built elsewhere.
    analyzer = ParallelizationAnalyzer(
        max_parallel_tasks=8,
        resource_constraints={"cpu_cores": 4, "memory_gb": 16},
    )
    analysis = analyzer.analyze_task(complex_task)
    print(f"Recommended speedup: {analysis.estimated_speedup:.1f}x")
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- analyze_task(task)
Analyze a task for parallelization opportunities.
- Parameters:
task (haive.agents.common.models.task_analysis.base.Task) – Task to analyze
- Returns:
Complete parallelization analysis
- Return type:
ParallelizationAnalysis
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
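This page does not describe how `analyze_task` derives phases and the critical path. As a rough illustration of the idea only, the sketch below layers a dependency graph with the standard-library `graphlib.TopologicalSorter` and finds the longest-duration chain. The function names, the `analysis` task, and all durations are hypothetical; the analyzer's real algorithm may differ and also accounts for resource constraints, which this sketch ignores.

```python
from graphlib import TopologicalSorter


def execution_phases(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into sequential phases of mutually independent tasks."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if dependencies form a cycle
    phases = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        phases.append(ready)
        sorter.done(*ready)  # unlocks tasks that depended on this phase
    return phases


def critical_path(
    dependencies: dict[str, set[str]], minutes: dict[str, float]
) -> tuple[list[str], float]:
    """Longest-duration dependency chain and its length in minutes."""
    finish = {}        # earliest finish time per task
    slowest_pred = {}  # predecessor that determines each task's start
    for task in TopologicalSorter(dependencies).static_order():
        preds = dependencies.get(task, set())
        pred = max(preds, key=finish.__getitem__, default=None)
        start = finish[pred] if pred is not None else 0.0
        finish[task] = start + minutes[task]
        slowest_pred[task] = pred
    if not finish:
        return [], 0.0
    task = max(finish, key=finish.__getitem__)
    path = []
    while task is not None:  # walk back from the last task to finish
        path.append(task)
        task = slowest_pred[task]
    return path[::-1], max(finish.values())


# Each task maps to the set of tasks it depends on (hypothetical data).
deps = {
    "data_collection": set(),
    "background_research": set(),
    "analysis": {"data_collection", "background_research"},
    "final_report": {"analysis"},
}
minutes = {"data_collection": 120, "background_research": 60,
           "analysis": 90, "final_report": 30}

phases = execution_phases(deps)
path, path_minutes = critical_path(deps, minutes)
speedup = sum(minutes.values()) / path_minutes

print(phases)        # [['background_research', 'data_collection'], ['analysis'], ['final_report']]
print(path)          # ['data_collection', 'analysis', 'final_report']
print(path_minutes)  # 240.0
print(speedup)       # 1.25
```

In this sketch each phase boundary plays the role of a join point, and the ratio of total work to critical-path length gives an upper bound on the achievable speedup when workers are unlimited.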