diff --git a/docs/source/generating_workflow_benchmarks.rst b/docs/source/generating_workflow_benchmarks.rst index 13cf0bf6..a160f806 100644 --- a/docs/source/generating_workflow_benchmarks.rst +++ b/docs/source/generating_workflow_benchmarks.rst @@ -132,8 +132,24 @@ workflow benchmark for running with Nextflow:: benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) # generate a Nextflow workflow - translator = NextflowTranslator(benchmark.workflow) - translator.translate(output_folder=pathlib.Path("./nextflow-wf/"")) + translator = NextflowTranslator( + benchmark.workflow, + use_subworkflows=False, + max_tasks_per_subworkflow=1000, + ) + translator.translate(output_folder=pathlib.Path("./nextflow-wf/")) + +If you want to split large workflows across multiple Nextflow module files, enable +subworkflows and set the maximum number of tasks per module. This produces a +``modules/`` directory plus a top-level ``workflow.nf`` that includes and runs +the modules sequentially:: + + translator = NextflowTranslator( + benchmark.workflow, + use_subworkflows=True, + max_tasks_per_subworkflow=250, + ) + translator.translate(output_folder=pathlib.Path("./nextflow-wf/")) .. warning:: diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 00000000..939ca143 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,3 @@ +pytest +pytest-cov +docker diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 400a45a8..337b2a06 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright (c) 2025 The WfCommons Team. +# Copyright (c) 2025-2026 The WfCommons Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -105,6 +105,7 @@ def _additional_setup_swiftt(container): "dask": noop, "parsl": noop, "nextflow": noop, + "nextflow_subworkflow": noop, "airflow": noop, "bash": noop, "taskvine": _additional_setup_taskvine, @@ -203,6 +204,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath): "dask": run_workflow_dask, "parsl": run_workflow_parsl, "nextflow": run_workflow_nextflow, + "nextflow_subworkflow": run_workflow_nextflow, "airflow": run_workflow_airflow, "bash": run_workflow_bash, "taskvine": run_workflow_taskvine, @@ -216,6 +218,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath): "dask": DaskTranslator, "parsl": ParslTranslator, "nextflow": NextflowTranslator, + "nextflow_subworkflow": NextflowTranslator, "airflow": AirflowTranslator, "bash": BashTranslator, "taskvine": TaskVineTranslator, @@ -235,6 +238,7 @@ class TestTranslators: "dask", "parsl", "nextflow", + "nextflow_subworkflow", "airflow", "bash", "taskvine", @@ -256,7 +260,10 @@ def test_translator(self, backend) -> None: # Perform the translation sys.stderr.write(f"\n[{backend}] Translating workflow...\n") - translator = translator_classes[backend](benchmark.workflow) + if backend == "nextflow_subworkflow": + translator = translator_classes[backend](benchmark.workflow, use_subworkflows=True, max_tasks_per_subworkflow=10) + else: + translator = translator_classes[backend](benchmark.workflow) translator.translate(output_folder=dirpath) # # Make the directory that holds the translation world-writable, @@ -266,7 +273,7 @@ def test_translator(self, backend) -> None: # os.chmod(dirpath, 0o777) # Start the Docker container - container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/") + container = _start_docker_container(backend if backend != "nextflow_subworkflow" else "nextflow", str_dirpath, str_dirpath, str_dirpath + "bin/") # Do whatever necessary setup additional_setup_methods[backend](container) diff --git a/wfcommons/wfbench/translator/nextflow.py b/wfcommons/wfbench/translator/nextflow.py index 3864cf06..f8023dff 100644 --- a/wfcommons/wfbench/translator/nextflow.py +++ b/wfcommons/wfbench/translator/nextflow.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright (c) 2021-2025 The WfCommons Team. +# Copyright (c) 2021-2026 The WfCommons Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,8 +25,17 @@ class NextflowTranslator(Translator): """ A WfFormat parser for creating Nextflow workflow applications. + This translator can generate either a single-file workflow or split the workflow + across multiple module files for better scalability with large workflows. + :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance. :type workflow: Union[Workflow, pathlib.Path] + :param use_subworkflows: Whether to split the workflow into multiple module files. + :type use_subworkflows: bool + :param max_tasks_per_subworkflow: Maximum number of tasks per module file when using subworkflows. + :type max_tasks_per_subworkflow: int + :param max_parents_threshold: Tasks with more parents than this get their own module (default: 100). + :type max_parents_threshold: int :param slurm: Whether to generate a Slurm template script for workflow submission using :code:`sbatch`. :type slurm: bool :param logger: The logger where to log information/warning or errors (optional). @@ -34,17 +43,24 @@ class NextflowTranslator(Translator): """ def __init__(self, workflow: Union[Workflow, pathlib.Path], + use_subworkflows: bool = False, + max_tasks_per_subworkflow: int = 1000, + max_parents_threshold: Optional[int] = 100, slurm: Optional[bool] = False, logger: Optional[Logger] = None) -> None: """Create an object of the translator.""" super().__init__(workflow, logger) + self.use_subworkflows = use_subworkflows + self.max_tasks_per_subworkflow = max_tasks_per_subworkflow + self.max_parents_threshold = max_parents_threshold self.slurm = slurm self.script = "" self.out_files = set() + self.subworkflows: List[List[Task]] = [] def translate(self, output_folder: pathlib.Path) -> None: """ - Translate a workflow benchmark description(WfFormat) into a Nextflow workflow application. + Translate a workflow benchmark description (WfFormat) into a Nextflow workflow application. :param output_folder: The path to the folder in which the workflow benchmark will be generated. :type output_folder: pathlib.Path @@ -56,7 +72,7 @@ def translate(self, output_folder: pathlib.Path) -> None: # Create benchmark files self._copy_binary_files(output_folder) self._generate_input_files(output_folder) - + if self.slurm: shutil.copy(this_dir.joinpath("templates/nextflow/nextflow_hyperqueue_job.sh"), output_folder) @@ -71,59 +87,22 @@ def translate(self, output_folder: pathlib.Path) -> None: for task in sorted_tasks: self._create_task_script(task) - # Create the Nextflow workflow script and file - self._create_workflow_script(sorted_tasks) - run_workflow_code = self._merge_codelines("templates/nextflow/workflow.nf", self.script) - self._write_output_file(run_workflow_code, output_folder.joinpath("workflow.nf")) + if self.use_subworkflows: + self._translate_with_subworkflows(output_folder, sorted_tasks) + else: + self._translate_single_file(output_folder, sorted_tasks) # Create the README file - self._write_readme_file(output_folder) - - return - - def _create_workflow_script(self, tasks: list[Task]): - """ - Create the Nextflow script. - - :param tasks: The (sorted) list of tasks. - :type tasks: list[Task] - """ - # Add Flowcept code if enabled - if self.workflow.workflow_id: - self.script += self._generate_flowcept_code() - - # Output the code for each task - for task in tasks: - self.script += self._generate_task_code(task) - - # Output the code for the workflow - self.script += self._generate_workflow_code(tasks) + self._write_readme_file(output_folder, self.use_subworkflows) - return - - def _generate_flowcept_code(self) -> str: - """ - - :return: The code. - :rtype: str - """ - out_files = ", ".join(f"\"{item}\"" for item in self.out_files) - return "process flowcept(){\n" \ - " input:\n" \ - " output:\n" \ - " script:\n" \ - " \"\"\"\n" \ - " ${pwd}/bin/flowcept_agent.py " \ - f"{self.workflow.name} {self.workflow.workflow_id} '[{out_files}]' \n" \ - " \"\"\"\n" \ - "}\n\n" + # ========================================================================= + # Common methods + # ========================================================================= def _get_tasks_in_topological_order(self) -> List[Task]: """ Sort the workflow tasks in topological order. - :param output_folder: The path to the output folder. - :type output_folder: pathlib.Path :return: A sorted list of tasks. :rtype: List[Task] """ @@ -148,31 +127,28 @@ def _get_tasks_in_topological_order(self) -> List[Task]: current_level += 1 return sorted_tasks - def _create_task_script(self, task: Task): + def _create_task_script(self, task: Task) -> None: """ Generate the bash script for invoking a task. :param task: The task. :type task: Task - :return: The code. - :rtype: str """ - code = "#!/bin/bash\n\n" # Generate input spec input_spec = "'\\[" for f in task.input_files: - input_spec += f"\"{self.output_folder}/data/{f.file_id}\"," + input_spec += f"\"{self.output_folder.absolute()}/data/{f.file_id}\"," input_spec = input_spec[:-1] + "\\]'" # Generate output spec output_spec = "'\\{" for f in task.output_files: - output_spec += f"\"{self.output_folder}/data/{f.file_id}\":{str(f.size)}," + output_spec += f"\"{self.output_folder.absolute()}/data/{f.file_id}\":{str(f.size)}," output_spec = output_spec[:-1] + "\\}'" - code += f"{self.output_folder}/bin/{task.program} " + code += f"{self.output_folder.absolute()}/bin/{task.program} " for a in task.args: if "--output-files" in a: @@ -187,7 +163,25 @@ def _create_task_script(self, task: Task): with open(script_file_path, "w") as out: out.write(code) - def _generate_task_code(self, task: Task) -> str: + def _generate_flowcept_code(self) -> str: + """ + Generate the Flowcept process code. + + :return: The code. + :rtype: str + """ + out_files = ", ".join(f"\"{item}\"" for item in self.out_files) + return "process flowcept(){\n" \ + " input:\n" \ + " output:\n" \ + " script:\n" \ + " \"\"\"\n" \ + " ${pwd}/bin/flowcept_agent.py " \ + f"{self.workflow.name} {self.workflow.workflow_id} '[{out_files}]' \n" \ + " \"\"\"\n" \ + "}\n\n" + + def _generate_task_process(self, task: Task) -> str: """ Generate the code for a task, as a Nextflow process. @@ -233,23 +227,94 @@ def _generate_task_code(self, task: Task) -> str: for f in task.output_files: code += "\t\t" + f.file_id + " = \"${pwd}/data/" + f.file_id + "\"\n" - code += "\t\t\"\"\"\n" code += "\t\t${params.simulate ? 'sleep 1' : \"bash ${pwd}/bin/script_" + task.task_id + ".sh\"}\n" code += "\t\t\"\"\"\n" code += "}\n\n" return code - def _generate_workflow_code(self, sorted_tasks: List[Task]) -> str: + def _generate_task_function(self, task: Task) -> str: """ - Generate the code for the workflow. + Generate the code for starting a task's Nextflow process, as a Nextflow function. - :param sorted_tasks: The task. - :type sorted_tasks: List[Task] + :param task: The task. + :type task: Task :return: The code. :rtype: str """ + code = f"// Function to call task {task.task_id}\n" + function_name = task.task_id.replace(".", "_") + code += f"def function_{function_name}(Map inputs) " + "{\n" + code += "\tdef outputs = inputs.clone()\n" + code += self._generate_task_call( + task=task, + function_name=function_name, + inputs_var="inputs", + results_var="outputs", + include_comment=False, + ) + code += "\treturn outputs\n" + code += "}\n\n" + + return code + + def _write_readme_file(self, output_folder: pathlib.Path, use_subworkflows: bool) -> None: + """ + Write the README file. + + :param output_folder: The path of the output folder. + :type output_folder: pathlib.Path + :param use_subworkflows: Whether subworkflows were used. + :type use_subworkflows: bool + """ + readme_file_path = output_folder.joinpath("README") + with open(readme_file_path, "w") as out: + out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n") + out.write(f"\tnextflow run ./workflow.nf --pwd `pwd`\n\n") + if use_subworkflows: + out.write(f"This workflow has been split into {len(self.subworkflows)} module file(s), ") + out.write(f"each containing a maximum of {self.max_tasks_per_subworkflow} tasks.\n") + out.write(f"\nModule files are located in the 'modules/' directory.\n") + + # ========================================================================= + # Single-file mode methods + # ========================================================================= + + def _translate_single_file(self, output_folder: pathlib.Path, sorted_tasks: List[Task]) -> None: + """ + Generate a single-file Nextflow workflow. + + :param output_folder: The output folder path. + :type output_folder: pathlib.Path + :param sorted_tasks: Tasks in topological order. + :type sorted_tasks: List[Task] + """ + self.script = "" + + # Add Flowcept code if enabled + if self.workflow.workflow_id: + self.script += self._generate_flowcept_code() + + # Output the code for each task + for task in sorted_tasks: + self.script += self._generate_task_process(task) + + # Output the code for the workflow + self.script += self._generate_single_file_workflow_code(sorted_tasks) + + # Merge with template and write + run_workflow_code = self._merge_codelines("templates/nextflow/workflow.nf", self.script) + self._write_output_file(run_workflow_code, output_folder.joinpath("workflow.nf")) + + def _generate_single_file_workflow_code(self, sorted_tasks: List[Task]) -> str: + """ + Generate the workflow code for single-file mode. + :param sorted_tasks: The tasks in topological order. + :type sorted_tasks: List[Task] + :return: The code. + :rtype: str + """ code = "" # Generate bootstrap function @@ -274,53 +339,223 @@ def _generate_workflow_code(self, sorted_tasks: List[Task]) -> str: code += "}\n" return code - def _generate_task_function(self, task: Task) -> str: + # ========================================================================= + # Subworkflow mode methods + # ========================================================================= + + def _translate_with_subworkflows(self, output_folder: pathlib.Path, sorted_tasks: List[Task]) -> None: + """ + Generate a multi-file Nextflow workflow with module files. + + :param output_folder: The output folder path. + :type output_folder: pathlib.Path + :param sorted_tasks: Tasks in topological order. + :type sorted_tasks: List[Task] + """ + # Split tasks into chunks for separate files + self._split_into_subworkflows(sorted_tasks) + self.logger.info(f"Split workflow into {len(self.subworkflows)} module file(s)") + + # Create modules directory + modules_dir = output_folder.joinpath("modules") + modules_dir.mkdir(exist_ok=True) + + # Create each module file (processes + orchestrating function) + for idx, subworkflow_tasks in enumerate(self.subworkflows): + module_code = self._generate_module_file(idx, subworkflow_tasks) + self._write_output_file( + module_code, + modules_dir.joinpath(f"tasks_{idx}.nf") + ) + + # Create the main Nextflow workflow script + main_workflow_code = self._generate_main_workflow() + self._write_output_file(main_workflow_code, output_folder.joinpath("workflow.nf")) + + def _split_into_subworkflows(self, sorted_tasks: List[Task]) -> None: + """ + Split the sorted tasks into chunks for separate files. + + Tasks with more than max_parents_threshold parents are placed in their own + subworkflow to avoid generating overly long channel mixing code. + + :param sorted_tasks: The topologically sorted list of tasks. + :type sorted_tasks: List[Task] + """ + self.subworkflows = [] + current_chunk = [] + + for task in sorted_tasks: + num_parents = len(self._find_parents(task.task_id)) + + # If task has many parents, put it in its own subworkflow + if num_parents > self.max_parents_threshold: + # First, save the current chunk if not empty + if current_chunk: + self.subworkflows.append(current_chunk) + current_chunk = [] + # Add this task as its own subworkflow + self.subworkflows.append([task]) + else: + current_chunk.append(task) + # If chunk is full, start a new one + if len(current_chunk) >= self.max_tasks_per_subworkflow: + self.subworkflows.append(current_chunk) + current_chunk = [] + + # Don't forget the last chunk + if current_chunk: + self.subworkflows.append(current_chunk) + + def _generate_module_file(self, module_idx: int, tasks: List[Task]) -> str: + """ + Generate a module .nf file containing processes and an orchestrating function. + + Each module file contains: + - Process definitions (private to the file) + - A single exported function that orchestrates those processes + + Using a function instead of a workflow block allows passing Maps of channels + between modules without Nextflow channel type casting issues. + + :param module_idx: The index of this module. + :type module_idx: int + :param tasks: The tasks in this module. + :type tasks: List[Task] + :return: The complete module file content. + :rtype: str + """ + code = f"// Module {module_idx} - Tasks file\n" + code += "// Auto-generated by WfCommons NextflowTranslator\n\n" + + # Define pwd variable from params (modules can't access main workflow variables) + code += "// Resolve working directory from params\n" + code += "def pwd = params.pwd ? file(params.pwd).toAbsolutePath().toString() : null\n\n" + + # Generate process definitions for each task (private to this file) + for task in tasks: + code += self._generate_task_process(task) + + # Generate the function that orchestrates these tasks + code += self._generate_module_function(module_idx, tasks) + + return code + + def _generate_module_function(self, module_idx: int, tasks: List[Task]) -> str: + """ + Generate a Groovy function that orchestrates a set of tasks. + + Using a function instead of a workflow block allows passing Maps of channels + between modules without Nextflow channel type casting issues. + + :param module_idx: The index of this module. + :type module_idx: int + :param tasks: The tasks in this module. + :type tasks: List[Task] + :return: The function code. + :rtype: str + """ + code = f"// Function to execute tasks in module {module_idx}\n" + code += f"def run_module_{module_idx}(Map inputs) {{\n" + code += "\tdef results = inputs.clone()\n\n" + + # Call each task's process + for task in tasks: + function_name = task.task_id.replace(".", "_") + code += self._generate_task_call( + task=task, + function_name=function_name, + inputs_var="results", + results_var="results", + include_comment=True, + ) + + code += "\treturn results\n" + code += "}\n\n" + + return code + + def _generate_task_call(self, + task: Task, + function_name: str, + inputs_var: str, + results_var: str, + include_comment: bool) -> str: """ - Generate the code for a starting a task's Nextflow process, as a Nextflow function. + Generate the code to call a task's process and map outputs into a results map. :param task: The task. :type task: Task + :param function_name: The sanitized function name. + :type function_name: str + :param inputs_var: The variable name containing input channels. + :type inputs_var: str + :param results_var: The variable name to update with outputs. + :type results_var: str + :param include_comment: Whether to include a task comment. + :type include_comment: bool :return: The code. :rtype: str """ - code = f"// Function to call task {task.task_id}\n" - function_name = task.task_id.replace(".", "_") - code += f"def function_{function_name}(Map inputs) " + "{\n" + code = "" + has_parents = self._find_parents(task.task_id) - if self._find_parents(task.task_id): - # Input channel mixing and then call - code += f"\tdef {function_name}_necessary_input = Channel.empty()\n" + if include_comment: + root_suffix = " (root)" if not has_parents else "" + code += f"\t// Task: {task.task_id}{root_suffix}\n" + + if has_parents: + code += f"\tdef {function_name}_input = Channel.empty()\n" for f in task.input_files: - code += f"\t{function_name}_necessary_input = {function_name}_necessary_input.mix(inputs.{f.file_id})\n" - code += f"\tdef {function_name}_necessary_input_future = {function_name}_necessary_input.collect()\n" - code += f"\tdef {function_name}_produced_output = {function_name}({function_name}_necessary_input_future)\n" + code += f"\t{function_name}_input = {function_name}_input.mix({inputs_var}.{f.file_id})\n" + code += f"\tdef {function_name}_input_future = {function_name}_input.collect()\n" + code += f"\tdef {function_name}_output = {function_name}({function_name}_input_future)\n" else: - # Simple call - code += f"\tdef {function_name}_produced_output = {function_name}()\n" + code += f"\tdef {function_name}_output = {function_name}()\n" - # Pass on the outputs - code += "\n" - code += "\tdef outputs = inputs.clone()\n" if self._find_children(task.task_id): counter = 0 for f in task.output_files: - code += f"\toutputs.{f.file_id} = {function_name}_produced_output.map" + "{it[" + str(counter) + "]}\n" + code += f"\t{results_var}.{f.file_id} = {function_name}_output.map{{it[{counter}]}}\n" counter += 1 - code += "\treturn outputs\n" - code += "}\n\n" + code += "\n" return code - - def _write_readme_file(self, output_folder: pathlib.Path) -> None: + def _generate_main_workflow(self) -> str: """ - Write the README file. + Generate the main workflow file that orchestrates all module functions. - :param output_folder: The path of the output folder. - :type output_folder: pathlib.Path + :return: The main workflow file content. + :rtype: str """ - readme_file_path = output_folder.joinpath("README") - with open(readme_file_path, "w") as out: - out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n") + code = "" + + # Include module functions (one per line to avoid long strings) + code += "// Include module functions\n" + for idx in range(len(self.subworkflows)): + code += f"include {{ run_module_{idx} }} from './modules/tasks_{idx}.nf'\n" + code += "\n" + + # Add Flowcept process if enabled + if self.workflow.workflow_id: + code += self._generate_flowcept_code() + + # Generate the main workflow + code += "workflow {\n" + + if self.workflow.workflow_id: + code += "\tflowcept()\n" + + # Initialize empty channel map + code += "\t// Initialize empty channel map\n" + code += "\tdef ch_data = [:]\n\n" + + # Call each module function in sequence, passing the channel map + code += "\t// Execute modules in sequence\n" + for idx in range(len(self.subworkflows)): + code += f"\tch_data = run_module_{idx}(ch_data)\n" + + code += "}\n" - out.write(f"\tnextflow run ./workflow.nf --pwd `pwd`\n") + return self._merge_codelines("templates/nextflow/workflow.nf", code) diff --git a/wfcommons/wfbench/translator/templates/nextflow/workflow.nf b/wfcommons/wfbench/translator/templates/nextflow/workflow.nf index 7f400dbe..49871c31 100644 --- a/wfcommons/wfbench/translator/templates/nextflow/workflow.nf +++ b/wfcommons/wfbench/translator/templates/nextflow/workflow.nf @@ -33,10 +33,10 @@ def validateParams() { pwd = file(params.pwd).toAbsolutePath().toString() if (!file(pwd).exists()) { printUsage(msg = "Directory not found: ${pwd}", exit_code=1) - } + } } // Call validation at the start validateParams() -# Generated code goes here \ No newline at end of file +# Generated code goes here