2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -37,7 +37,7 @@ jobs:
pip install .

- name: Run tests
run: python3 -m pytest -s -v -m unit --cov=wfcommons tests/
run: python3 -m pytest -s -v -m unit --cov=wfcommons tests

- name: Upload coverage
if: github.ref == 'refs/heads/main'
18 changes: 13 additions & 5 deletions tests/test_helpers.py
@@ -39,9 +39,10 @@ def _install_WfCommons_on_container(container):
target_path = '/tmp/' # inside container
tar_data = _make_tarfile_of_wfcommons()
container.put_archive(target_path, tar_data)
# Cleanup files from the host
# Cleanup files that came from the host
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
# Clean up and force a rebuild of cpu-benchmark (because it may be compiled for the wrong architecture)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True,
@@ -50,6 +51,7 @@ def _install_WfCommons_on_container(container):
# Install WfCommons on the container (to install wfbench and cpu-benchmark really)
exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
workdir="/tmp/WfCommons", stdout=True, stderr=True)
# print(output.decode())
if exit_code != 0:
raise RuntimeError("Failed to install WfCommons on the container")

@@ -88,13 +90,19 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
stdout=True, stderr=True)
if exit_code != 0:
raise RuntimeError("Failed to copy wfbench script to the bin directory")

exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
stdout=True, stderr=True)
if exit_code != 0:
raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory")
else:
sys.stderr.write(f"[{backend}] Not Copying wfbench and cpu-benchmark...\n")

# Change file permissions
exit_code, output = container.exec_run(["sh", "-c", "sudo chown -R wfcommons:wfcommons "],
stdout=True, stderr=True)


container.backend = backend
return container

@@ -123,16 +131,16 @@ def _get_total_size_of_directory(directory_path: str):
total_size += os.path.getsize(filepath)
return total_size

def _compare_workflows(workflow1: Workflow, workflow_2: Workflow):
def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow):

# Test the number of tasks
assert (len(workflow1.tasks) == len(workflow_2.tasks))
assert (len(workflow_1.tasks) == len(workflow_2.tasks))
# Test the task graph topology
assert (networkx.is_isomorphic(workflow1, workflow_2))
assert (networkx.is_isomorphic(workflow_1, workflow_2))
# Test the total file size sum
workflow1_input_bytes, workflow2_input_bytes = 0, 0
workflow1_output_bytes, workflow2_output_bytes = 0, 0
for workflow1_task, workflow2_task in zip(workflow1.tasks.values(), workflow_2.tasks.values()):
for workflow1_task, workflow2_task in zip(workflow_1.tasks.values(), workflow_2.tasks.values()):
# sys.stderr.write(f"WORKFLOW1: {workflow1_task.task_id} WORKFLOW2 TASK: {workflow2_task.task_id}\n")
for input_file in workflow1_task.input_files:
# sys.stderr.write(f"WORKFLOW1 INPUT FILE: {input_file.file_id} {input_file.size}\n")
12 changes: 11 additions & 1 deletion tests/translators_loggers/test_translators_loggers.py
@@ -15,6 +15,7 @@
import json
import time
import re
import os

from tests.test_helpers import _create_fresh_local_dir
from tests.test_helpers import _remove_local_dir_if_it_exists
@@ -38,6 +39,7 @@

from wfcommons.wfinstances import PegasusLogsParser
from wfcommons.wfinstances.logs import TaskVineLogsParser
from wfcommons.wfinstances.logs import MakeflowLogsParser


def _create_workflow_benchmark() -> (WorkflowBenchmark, int):
@@ -165,7 +167,7 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath):

def run_workflow_makeflow(container, num_tasks, str_dirpath):
# Run the workflow (with full logging)
exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose ./workflow.makeflow"], stdout=True, stderr=True)
exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose --monitor=./monitor_data/ ./workflow.makeflow"], stdout=True, stderr=True)
# Check sanity
assert (exit_code == 0)
num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode()))
@@ -257,6 +259,12 @@ def test_translator(self, backend) -> None:
translator = translator_classes[backend](benchmark.workflow)
translator.translate(output_folder=dirpath)

# # Make the directory that holds the translation world-writable,
# # so that docker commands won't fail
# TODO: Explore whether the chmod below makes the tests runnable on Linux, which
# has different Docker permission schemes, etc.
# os.chmod(dirpath, 0o777)

# Start the Docker container
container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/")

@@ -274,6 +282,8 @@ def test_translator(self, backend) -> None:
parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/")
elif backend == "taskvine":
parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"])
elif backend == "makeflow":
parser = MakeflowLogsParser(execution_dir=dirpath, resource_monitor_logs_dir=dirpath / "monitor_data/")
else:
parser = None

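For context, the Makeflow branch added above mirrors the existing Pegasus and TaskVine cases: the workflow is executed with `--monitor=./monitor_data/` so that Resource Monitor summaries exist, and the run directory is then handed to the new `MakeflowLogsParser` (diffed in wfcommons/wfinstances/logs/makeflow.py below). A minimal standalone sketch of that usage, with a hypothetical execution directory:

from pathlib import Path
from wfcommons.wfinstances.logs import MakeflowLogsParser

# Directory containing workflow.makeflow, the .makeflowlog, and the monitor_data/
# directory produced by `makeflow --log-verbose --monitor=./monitor_data/ ...`
execution_dir = Path("/tmp/makeflow-run")  # hypothetical path
parser = MakeflowLogsParser(execution_dir=execution_dir,
                            resource_monitor_logs_dir=execution_dir / "monitor_data/")
workflow = parser.build_workflow("makeflow-run")  # returns a wfcommons Workflow
print(len(workflow.tasks))  # same Workflow.tasks attribute used by _compare_workflows above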
104 changes: 58 additions & 46 deletions wfcommons/wfinstances/logs/makeflow.py
@@ -28,9 +28,9 @@ class MakeflowLogsParser(LogsParser):
"""
Parse Makeflow submit directory to generate workflow instance.

:param execution_dir: Makeflow workflow execution directory (contains .mf and .makeflowlog files).
:param execution_dir: Makeflow workflow execution directory (contains .mf/.makeflow and .makeflowlog files).
:type execution_dir: pathlib.Path
:param resource_monitor_logs_dir: Resource Monitor log files directory.
:param resource_monitor_logs_dir: Resource Monitor log files directory (created with `makeflow --monitor=... ...`).
:type resource_monitor_logs_dir: pathlib.Path
:param description: Workflow instance description.
:type description: Optional[str]
@@ -46,28 +46,38 @@ def __init__(self,
"""Create an object of the makeflow log parser."""
super().__init__('Makeflow', 'http://ccl.cse.nd.edu/software/makeflow/', description, logger)

# Sanity check
# Sanity checks
if not execution_dir.is_dir():
raise OSError(f'The provided path does not exist or is not a folder: {execution_dir}')
if not resource_monitor_logs_dir.is_dir():
raise OSError(f'The provided path does not exist or is not a folder: {resource_monitor_logs_dir}')

# Makeflow file
files: List[pathlib.Path] = list(execution_dir.glob('*.mf'))
if len(files) > 1:
raise OSError(f'Multiple .mf files in: {execution_dir}')
if len(files) == 0:
raise OSError(f'Unable to find .mf file in: {execution_dir}')
files: List[pathlib.Path] = list(execution_dir.glob('*.makeflow'))
if len(files) > 1:
raise OSError(f'Multiple .makeflow files in: {execution_dir}')
if len(files) == 0:
raise OSError(f'Unable to find a .mf or .makeflow file in: {execution_dir}')
self.mf_file: pathlib.Path = files[0]

# Log file
files = list(execution_dir.glob('*.makeflowlog'))
if len(files) == 0:
raise OSError(f'Unable to find .makeflowlog file in: {execution_dir}')
if len(files) > 1:
raise OSError(f'Multiple .makeflowlog files in: {execution_dir}')
self.mf_log_file: pathlib.Path = files[0]
if self.mf_log_file.read_text().count("# NODE") == 0:
raise OSError(f'Log file {self.mf_log_file} is not sufficiently verbose. Re-run the workflow with `makeflow --log-verbose ...`')

if not resource_monitor_logs_dir.is_dir():
raise OSError(f'The provided path does not exist or is not a folder: {resource_monitor_logs_dir}')

self.execution_dir: pathlib.Path = execution_dir

self.resource_monitor_logs_dir: pathlib.Path = resource_monitor_logs_dir
self.files_map = {}
self.args_map = {}
self._execution_dir: pathlib.Path = execution_dir
self._resource_monitor_logs_dir: pathlib.Path = resource_monitor_logs_dir
self._files_map = {}
self._args_map = {}

def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
"""
@@ -106,46 +116,48 @@ def _parse_workflow_file(self) -> None:
outputs = []
inputs = []
for line in f:
if ':' in line:
# print(f"Processing line: {line}")
if line.lstrip().startswith('#'):
continue
if ':' in line and '\t' not in line:
outputs = line.split(':')[0].split()
inputs = line.split(':')[1].split()

for file in itertools.chain(outputs, inputs):
if file not in self.files_map:
self.files_map[file] = {'task_name': None, 'children': [], 'file': []}
if file not in self._files_map:
self._files_map[file] = {'task_name': None, 'children': [], 'file': []}

elif len(line.strip()) > 0:
# task execution command
prefix = line.replace('./', '').replace('perl', '').strip().split()[1 if 'LOCAL' in line else 0]
task_name = "{}_ID{:07d}".format(prefix, task_id_counter)
elif '\t' in line:
# task execution command (likely olf here)
prefix = line.replace('./', '').strip().split()[1 if 'LOCAL' in line else 0]
task_name = "ID{:07d}".format(task_id_counter)

# create list of task files
list_files = []
# create list of input and output files
output_files = self._create_files(outputs, "output", task_name)
input_files = self._create_files(inputs, "input", task_name)

# create task
args = ' '.join(line.replace('LOCAL', '').replace('perl', '').strip().split())
args = ' '.join(line.split())
task = Task(name=task_name,
task_id="ID{:07d}".format(task_id_counter),
task_id=task_name,
category=prefix,
task_type=TaskType.COMPUTE,
runtime=0,
program=prefix,
args=args.split(),
cores=1,
input_files=input_files,
output_files=output_files,
logger=self.logger)
self.workflow.add_node(task_name, task=task)
self.args_map[args] = task
self.workflow.add_task(task)
args = args.replace('\\\\', '\\')
self._args_map[args] = task
task_id_counter += 1

# adding edges
for file in self.files_map:
for child in self.files_map[file]['children']:
if self.files_map[file]['task_name']:
self.workflow.add_edge(self.files_map[file]['task_name'], child)
for file in self._files_map:
for child in self._files_map[file]['children']:
if self._files_map[file]['task_name']:
self.workflow.add_edge(self._files_map[file]['task_name'], child)
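
As a side note, the rewritten _parse_workflow_file distinguishes rule lines (outputs ':' inputs, no tab) from tab-indented command lines instead of matching any non-empty line. A toy sketch of that logic, on a made-up two-line Makeflow fragment (not taken from the test workflow):

example_mf = (
    "out.txt: in.txt wfbench\n"             # rule line: outputs ':' inputs, no tab
    "\t./wfbench --out out.txt in.txt\n"    # command line: starts with a tab
)
for line in example_mf.splitlines():
    if line.lstrip().startswith('#'):
        continue
    if ':' in line and '\t' not in line:
        outputs = line.split(':')[0].split()
        inputs = line.split(':')[1].split()
        print("rule:", outputs, inputs)
    elif '\t' in line:
        print("command:", ' '.join(line.split()))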

def _create_files(self, files_list: List[str], input_or_output: str, task_name: str) -> List[File]:
"""
@@ -163,16 +175,16 @@ def _create_files(self, files_list: List[str], input_or_output: str, task_name:
"""
list_files = []
for file in files_list:
if self.files_map[file]['file']:
if self._files_map[file]['file']:
list_files.append(
self.files_map[file]['file'][0] if input_or_output == "input" else self.files_map[file]['file'][1])
self._files_map[file]['file'][0] if input_or_output == "input" else self._files_map[file]['file'][1])
else:
size = 0
file_path = self.execution_dir.joinpath(file)
file_path = self._execution_dir.joinpath(file)
if file_path.is_dir():
size = sum(math.ceil(f.stat().st_size / 1000) for f in file_path.glob("*") if f.is_file())
size = sum(f.stat().st_size for f in file_path.glob("*") if f.is_file())
elif file_path.is_file():
size = int(math.ceil(file_path.stat().st_size / 1000)) # B to KB
size = int(file_path.stat().st_size)

file_obj_in = File(file_id=file,
size=size,
@@ -181,13 +193,13 @@ def _create_files(self, files_list: List[str], input_or_output: str, task_name:
size=size,
logger=self.logger)
list_files.append(file_obj_in if input_or_output == "input" else file_obj_out)
self.files_map[file]['file'].extend([file_obj_in, file_obj_out])
self._files_map[file]['file'].extend([file_obj_in, file_obj_out])

# files dependencies
if input_or_output == "input":
self.files_map[file]['children'].append(task_name)
self._files_map[file]['children'].append(task_name)
else:
self.files_map[file]['task_name'] = task_name
self._files_map[file]['task_name'] = task_name

return list_files

@@ -208,24 +220,24 @@ def _parse_makeflow_log_file(self):

elif line.startswith('# FILE') and 'condorlog' not in line:
file_name = line.split()[3]
if file_name in self.files_map:
size = int(math.ceil(int(line.split()[5]) / 1000)) # B to KB
for file_obj in self.files_map[file_name]['file']:
if file_name in self._files_map:
size = int(line.split()[5])
for file_obj in self._files_map[file_name]['file']:
file_obj.size = size

def _parse_resource_monitor_logs(self):
"""Parse the log files produced by resource monitor"""
for file in pathlib.Path.glob(f'{self.resource_monitor_logs_dir}/*.summary'):
for file in self._resource_monitor_logs_dir.glob("*.summary"):
with open(file) as f:
data = json.load(f)

# task
task = self.args_map[data['command'].replace('perl', '').strip()]
task = self._args_map[data['command'].replace('perl', '').strip()]
task.runtime = float(data['wall_time'][0])
task.cores = float(data['cores'][0])
task.memory = int(data['memory'][0]) * 1000 # MB to KB
task.bytes_read = int(data['bytes_read'][0] * 1000) # MB to KB
task.bytes_written = int(data['bytes_written'][0] * 1000) # MB to KB
task.memory = int(data['memory'][0])
task.bytes_read = int(data['bytes_read'][0])
task.bytes_written = int(data['bytes_written'][0])
task.avg_cpu = float('%.4f' % (float(data['cpu_time'][0]) / float(data['wall_time'][0]) * 100))
task.machine = Machine(name=data['host'],
cpu={'coreCount': int(data['machine_cpus'][0]), 'speedInMHz': 0, 'vendor': ''},
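Finally, _parse_resource_monitor_logs assumes one Resource Monitor *.summary JSON file per task, where each field read above is a list whose first element is the value. A minimal sketch of reading one such file directly, with a hypothetical file name and no assumptions about units:

import json
import pathlib

summary_file = pathlib.Path("monitor_data/wfbench.ID0000001.summary")  # hypothetical name
data = json.loads(summary_file.read_text())
# Fields accessed by the parser; each is indexed with [0] to get the value.
runtime = float(data['wall_time'][0])
avg_cpu = float(data['cpu_time'][0]) / runtime * 100  # same formula as task.avg_cpu above
print(data['command'], data['host'], runtime, round(avg_cpu, 4),
      data['cores'][0], data['memory'][0],
      data['bytes_read'][0], data['bytes_written'][0],
      data['machine_cpus'][0])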