diff --git a/tests/functional/test_cli.py b/tests/functional/test_cli.py index 73430cd54..a46e993ac 100644 --- a/tests/functional/test_cli.py +++ b/tests/functional/test_cli.py @@ -449,6 +449,164 @@ def get_registry_no_auto_pyramid(_container): result = mocked_sub_requests(self.app, self.client.deploy, test_id, cwl=package) assert result.success + def test_deploy_multi_cwl(self): + """ + Test deploying multiple CWL files (workflow with tools) using the CLI. + """ + # Create tool CWL definitions + tool1_id = f"{self.test_process_prefix}tool-1" + tool1_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": tool1_id, + "baseCommand": ["echo"], + "requirements": { + "DockerRequirement": { + "dockerPull": "debian:stretch-slim" + } + }, + "inputs": {"input": {"type": "string", "inputBinding": {"position": 1}}}, + "outputs": {"output": {"type": "stdout"}}, + "stdout": "output.txt" + } + + tool2_id = f"{self.test_process_prefix}tool-2" + tool2_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": tool2_id, + "baseCommand": ["cat"], + "requirements": { + "DockerRequirement": { + "dockerPull": "debian:stretch-slim" + } + }, + "inputs": {"file": {"type": "File", "inputBinding": {"position": 1}}}, + "outputs": {"output": {"type": "stdout"}}, + "stdout": "output.txt" + } + + # Create workflow that uses the tools + test_id = f"{self.test_process_prefix}multi-cwl-workflow" + workflow_cwl = { + "cwlVersion": "v1.2", + "class": "Workflow", + "id": test_id, + "inputs": { + "message": {"type": "string"} + }, + "outputs": { + "result": { + "type": "File", + "outputSource": "step2/output" + } + }, + "steps": { + "step1": { + "run": tool1_id, + "in": {"input": "message"}, + "out": ["output"] + }, + "step2": { + "run": tool2_id, + "in": {"file": "step1/output"}, + "out": ["output"] + } + } + } + + with tempfile.TemporaryDirectory() as tmp_dir: + tool1_path = os.path.join(tmp_dir, "tool-1.cwl") + tool2_path = os.path.join(tmp_dir, "tool-2.cwl") + 
workflow_path = os.path.join(tmp_dir, "workflow.cwl") + + with open(tool1_path, "w", encoding="utf-8") as f: + json.dump(tool1_cwl, f) + with open(tool2_path, "w", encoding="utf-8") as f: + json.dump(tool2_cwl, f) + with open(workflow_path, "w", encoding="utf-8") as f: + json.dump(workflow_cwl, f) + + # Deploy using list of CWL files + result = mocked_sub_requests( + self.app, + self.client.deploy, + test_id, + cwl=[tool1_path, tool2_path, workflow_path] + ) + + assert result.success + assert "processSummary" in result.body + assert result.body["processSummary"]["id"] == test_id + assert "deploymentDone" in result.body + assert result.body["deploymentDone"] is True + + # Verify the workflow was deployed + result = mocked_sub_requests(self.app, self.client.describe, test_id) + assert result.success + assert result.body["id"] == test_id + assert result.body["processDescriptionURL"] + + def test_deploy_multi_cwl_tools_only(self): + """ + Test deploying multiple CWL tool files using the CLI. 
+ """ + tool1_id = f"{self.test_process_prefix}multi-tool-1" + tool1_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": tool1_id, + "baseCommand": ["echo"], + "requirements": { + "DockerRequirement": { + "dockerPull": "debian:stretch-slim" + } + }, + "inputs": {"input": {"type": "string", "inputBinding": {"position": 1}}}, + "outputs": {"output": {"type": "stdout"}}, + "stdout": "output.txt" + } + + tool2_id = f"{self.test_process_prefix}multi-tool-2" + tool2_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": tool2_id, + "baseCommand": ["cat"], + "requirements": { + "DockerRequirement": { + "dockerPull": "debian:stretch-slim" + } + }, + "inputs": {"file": {"type": "File", "inputBinding": {"position": 1}}}, + "outputs": {"output": {"type": "stdout"}}, + "stdout": "output.txt" + } + + with tempfile.TemporaryDirectory() as tmp_dir: + tool1_path = os.path.join(tmp_dir, "tool-1.cwl") + tool2_path = os.path.join(tmp_dir, "tool-2.cwl") + + with open(tool1_path, "w", encoding="utf-8") as f: + json.dump(tool1_cwl, f) + with open(tool2_path, "w", encoding="utf-8") as f: + json.dump(tool2_cwl, f) + + # Deploy using list of CWL files - should deploy the first tool as main + result = mocked_sub_requests( + self.app, + self.client.deploy, + tool1_id, + cwl=[tool1_path, tool2_path] + ) + + assert result.success + assert "processSummary" in result.body + # When deploying multiple tools without workflow, first tool becomes main + assert result.body["processSummary"]["id"] == tool1_id + assert "deploymentDone" in result.body + assert result.body["deploymentDone"] is True + def test_undeploy(self): # deploy a new process to leave the test one available other_payload = copy.deepcopy(self.test_payload["Echo"]) diff --git a/tests/processes/test_utils.py b/tests/processes/test_utils.py index b4f52f377..b341fc80d 100644 --- a/tests/processes/test_utils.py +++ b/tests/processes/test_utils.py @@ -7,6 +7,7 @@ import mock import pytest import yaml +from 
pyramid.httpexceptions import HTTPBadRequest, HTTPNotImplemented from tests import resources from tests.utils import ( @@ -16,9 +17,19 @@ setup_mongodb_servicestore ) from weaver.exceptions import PackageRegistrationError +from weaver.formats import ContentType from weaver.processes.constants import CWL_NAMESPACE_WEAVER_ID, CWL_REQUIREMENT_APP_WPS1, CWL_RequirementWeaverWPS1Type -from weaver.processes.utils import _check_package_file # noqa: W0212 -from weaver.processes.utils import register_cwl_processes_from_config, register_wps_processes_from_config +from weaver.processes.utils import ( # noqa: W0212 + _check_package_file, + _classify_multipart_part, + _get_multipart_content, + create_multipart_deploy, + parse_multipart_deploy, + register_cwl_processes_from_config, + register_wps_processes_from_config, + resolve_cwl_graph, + resolve_deployment_order +) WPS1_URL1 = resources.TEST_REMOTE_SERVER_URL WPS1_URL2 = "http://yet-another-server.com" @@ -397,3 +408,927 @@ def raise_deploy(*_, **__): assert mock_deploy.call_count == 0, "Deploy should not be reached due to failed CWL pre-validation." assert mock_load.call_count == 1 assert result is None # not returned + + +# ============================================================================= +# Multipart Deployment Parsing Tests +# ============================================================================= +# These tests directly test the internal multipart parsing functions to cover +# branches that are difficult to reach through integration tests. + + +def test_get_multipart_content_with_bytes(): + """ + Test _get_multipart_content with bytes input. + """ + content = b"test content" + result = _get_multipart_content(content, request=None) + assert result == b"test content" + assert isinstance(result, bytes) + + +def test_get_multipart_content_with_string(): + """ + Test _get_multipart_content with string input. 
+ """ + content = "test content" + result = _get_multipart_content(content, request=None) + assert result == b"test content" + assert isinstance(result, bytes) + + +def test_get_multipart_content_with_request_body(): + """ + Test _get_multipart_content with request object. + """ + + class MockRequest: + body = b"request body content" + + result = _get_multipart_content(None, request=MockRequest()) + assert result == b"request body content" + + +def test_get_multipart_content_invalid_type(): + """ + Test _get_multipart_content with invalid input type raises error. + """ + with pytest.raises(HTTPBadRequest) as exc_info: + _get_multipart_content(12345, request=None) # type: ignore + assert "Invalid multipart content format" in str(exc_info.value) + + +def test_classify_multipart_part_with_cwl_class(): + """ + Test classification of part with explicit CWL class. + """ + part_data = { + "class": "CommandLineTool", + "id": "test-tool", + "inputs": {}, + "outputs": {} + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id="tool-1", process_description=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0] == part_data + assert result is None # No process description returned + + +def test_classify_multipart_part_with_graph(): + """ + Test classification of part with cwlVersion and $graph. + """ + part_data = { + "cwlVersion": "v1.2", + "$graph": [ + {"class": "CommandLineTool", "id": "tool-1"} + ] + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id="graph-1", process_description=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0] == part_data + assert result is None + + +def test_classify_multipart_part_with_process_description(): + """ + Test classification of part with processDescription. 
+ """ + part_data = { + "processDescription": { + "id": "test-process", + "title": "Test Process" + } + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id=None, process_description=None + ) + + assert len(cwl_packages) == 0 # Not added to CWL packages + assert result == part_data # Returned as process description + + +def test_classify_multipart_part_multiple_process_descriptions(caplog): + """ + Test warning when multiple process descriptions are found. + """ + first_desc = {"processDescription": {"id": "first"}} + second_desc = {"processDescription": {"id": "second"}} + + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + # First one is accepted + result1 = _classify_multipart_part( + first_desc, cwl_packages, parts_order, parts_by_cid, + content_id=None, process_description=None + ) + assert result1 == first_desc + + # Second one should trigger warning and be ignored + result2 = _classify_multipart_part( + second_desc, cwl_packages, parts_order, parts_by_cid, + content_id=None, process_description=first_desc + ) + assert result2 == first_desc # Still returns the first one + assert "Multiple process descriptions found" in caplog.text + + +def test_classify_multipart_part_with_implicit_cwl(): + """ + Test classification of part with implicit CWL characteristics (inputs/outputs/baseCommand). 
+ """ + part_data = { + "id": "implicit-tool", + "inputs": {}, + "outputs": {"output": {"type": "File"}}, + "baseCommand": ["echo", "hello"] + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id="implicit-1", process_description=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0] == part_data + assert result is None + + +def test_classify_multipart_part_with_steps(): + """ + Test classification with 'steps' field (workflow-like). + """ + part_data = { + "id": "workflow", + "steps": { + "step1": {"run": "tool1"} + } + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id=None, process_description=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0] == part_data + + +def test_classify_multipart_part_generic_fallback(): + """ + Test fallback classification for generic JSON. + """ + part_data = { + "id": "test-id", + "title": "Some title", + "custom_field": "value" + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + # No process description yet, so this becomes the process description + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id=None, process_description=None + ) + + assert len(cwl_packages) == 0 + assert result == part_data # Returned as process description + + +def test_classify_multipart_part_non_dict(): + """ + Test classification of non-dict data (should be ignored). 
+ """ + part_data = "just a string" + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id=None, process_description=None + ) + + assert len(cwl_packages) == 0 + assert result is None + + +def test_parse_multipart_deploy_simple(): + """ + Test parsing a simple multipart deployment. + """ + tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "test-tool", + "inputs": {}, + "outputs": {"output": {"type": "File"}} + }) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"\r\n" + f"{tool_cwl}\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/mixed; boundary={boundary}" + + cwl_packages, process_desc = parse_multipart_deploy( + multipart_body, content_type, request=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "test-tool" + assert process_desc is None + + +def test_parse_multipart_deploy_missing_boundary(): + """ + Test that missing boundary in Content-Type raises error. + """ + content = b"some content" + content_type = "multipart/mixed" # No boundary parameter + + with pytest.raises(HTTPBadRequest) as exc_info: + parse_multipart_deploy(content, content_type, request=None) + assert exc_info.value.json is not None + assert "boundary" in exc_info.value.json.get("title", "").lower() + + +def test_parse_multipart_deploy_no_cwl_parts(): + """ + Test that multipart with no CWL parts raises error. 
+ """ + generic_json = json.dumps({"some": "data"}) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_JSON}\r\n" + f"\r\n" + f"{generic_json}\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/mixed; boundary={boundary}" + + with pytest.raises(HTTPBadRequest) as exc_info: + parse_multipart_deploy(multipart_body, content_type, request=None) + assert exc_info.value.json is not None + assert "No CWL packages found" in exc_info.value.json.get("title", "") + + +def test_parse_multipart_deploy_content_location_static_file(monkeypatch): + """ + Test Content-Location header successfully fetches remote static CWL file. + """ + # CWL to be "fetched" from Content-Location + remote_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "remote-tool", + "inputs": {}, + "outputs": {"output": {"type": "File"}} + }) + + # Mock load_file to return the CWL content + def mock_load_file(path, text=False): + assert path == "https://example.com/tool.cwl" + return remote_cwl + + monkeypatch.setattr("weaver.processes.utils.load_file", mock_load_file) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"Content-Location: https://example.com/tool.cwl\r\n" + f"\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/mixed; boundary={boundary}" + + cwl_packages, process_desc = parse_multipart_deploy( + multipart_body, content_type, request=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "remote-tool" + assert process_desc is None + + +def test_parse_multipart_deploy_content_location_weaver_package(monkeypatch): + """ + Test Content-Location header successfully fetches from Weaver package endpoint. 
+ """ + # Mock CWL package + cwl_package = { + "cwlVersion": "v1.2", + "class": "Workflow", + "id": "weaver-workflow", + "inputs": {"input": {"type": "string"}}, + "outputs": {"output": {"type": "File"}}, + "steps": {} + } + + # Mock _generate_process_with_cwl_from_reference + def mock_generate_process(reference, process_hint=None): + assert "/processes/test-process/package" in reference + return (cwl_package, {"identifier": "test-process"}) + + monkeypatch.setattr("weaver.processes.wps_package._generate_process_with_cwl_from_reference", + mock_generate_process) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"Content-Location: https://weaver.example.com/processes/test-process/package\r\n" + f"\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/mixed; boundary={boundary}" + + cwl_packages, process_desc = parse_multipart_deploy( + multipart_body, content_type, request=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "weaver-workflow" + assert process_desc is None + + +def test_parse_multipart_deploy_content_location_wps_endpoint(monkeypatch): + """ + Test Content-Location header successfully fetches from WPS endpoint. 
+ """ + # Mock CWL package generated from WPS DescribeProcess + wps_cwl_package = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "wps-process", + "inputs": {"input": {"type": "string"}}, + "outputs": {"output": {"type": "File"}} + } + + # Mock _generate_process_with_cwl_from_reference + def mock_generate_process(reference, process_hint=None): + assert "wps" in reference.lower() or "describeprocess" in reference.lower() + return (wps_cwl_package, {"identifier": "wps-process"}) + + monkeypatch.setattr("weaver.processes.wps_package._generate_process_with_cwl_from_reference", + mock_generate_process) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"Content-Location: https://wps.example.com/wps?service=WPS&request=DescribeProcess&identifier=test\r\n" + f"\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/mixed; boundary={boundary}" + + cwl_packages, process_desc = parse_multipart_deploy( + multipart_body, content_type, request=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "wps-process" + assert process_desc is None + + +def test_parse_multipart_deploy_content_location_failure(monkeypatch): + """ + Test Content-Location header raises error when fetch fails. 
+ """ + # Mock load_file to raise an exception + def mock_load_file(path, text=False): + raise RuntimeError("Failed to fetch file") + + monkeypatch.setattr("weaver.processes.utils.load_file", mock_load_file) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"Content-Location: https://example.com/nonexistent.cwl\r\n" + f"\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/mixed; boundary={boundary}" + + with pytest.raises(HTTPBadRequest) as exc_info: + parse_multipart_deploy(multipart_body, content_type, request=None) + assert exc_info.value.json is not None + assert "Failed to fetch Content-Location" in exc_info.value.json.get("title", "") + assert "https://example.com/nonexistent.cwl" in exc_info.value.json.get("cause", {}).get("location", "") + + +def test_create_multipart_deploy_single_tool(): + """ + Test creating multipart content from a single CommandLineTool. + """ + tool_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "test-tool", + "baseCommand": ["echo"], + "inputs": { + "message": {"type": "string"} + }, + "outputs": { + "output": {"type": "stdout"} + } + } + + content, content_type = create_multipart_deploy([tool_cwl]) + + # Verify content type + assert "multipart/related" in content_type + assert "boundary=" in content_type + assert isinstance(content, bytes) + + # Parse the generated multipart content + cwl_packages, _ = parse_multipart_deploy(content, content_type, request=None) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["class"] == "CommandLineTool" + assert cwl_packages[0]["id"] == "test-tool" + + +def test_create_multipart_deploy_workflow_and_tools(): + """ + Test creating multipart content from a workflow and multiple tools. 
+ """ + tool1_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "tool-1", + "baseCommand": ["echo"], + "inputs": {"input": {"type": "string"}}, + "outputs": {"output": {"type": "stdout"}} + } + + tool2_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "tool-2", + "baseCommand": ["cat"], + "inputs": {"file": {"type": "File"}}, + "outputs": {"output": {"type": "stdout"}} + } + + workflow_cwl = { + "cwlVersion": "v1.2", + "class": "Workflow", + "id": "main-workflow", + "inputs": {"input": {"type": "string"}}, + "outputs": {"result": {"type": "File", "outputSource": "step2/output"}}, + "steps": { + "step1": {"run": "#tool-1", "in": {"input": "input"}, "out": ["output"]}, + "step2": {"run": "#tool-2", "in": {"file": "step1/output"}, "out": ["output"]} + } + } + + content, content_type = create_multipart_deploy([tool1_cwl, tool2_cwl, workflow_cwl]) + + # Verify content type includes start parameter for main workflow + assert "multipart/related" in content_type + assert "boundary=" in content_type + assert "start=workflow-" in content_type # Should reference the workflow + assert isinstance(content, bytes) + + # Parse the generated multipart content + cwl_packages, _ = parse_multipart_deploy(content, content_type, request=None) + + assert len(cwl_packages) == 3 + # Verify all packages are present + ids = [pkg.get("id") for pkg in cwl_packages] + assert "tool-1" in ids + assert "tool-2" in ids + assert "main-workflow" in ids + + +def test_create_multipart_deploy_with_process_description(): + """ + Test creating multipart content with an optional process description. 
+ """ + tool_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "test-tool", + "baseCommand": ["echo"], + "inputs": {}, + "outputs": {} + } + + process_desc = { + "processDescription": { + "id": "test-tool", + "title": "Test Tool", + "description": "A test tool for multipart deployment" + } + } + + content, content_type = create_multipart_deploy([tool_cwl], process_description=process_desc) + + # Verify content type + assert "multipart/related" in content_type + assert isinstance(content, bytes) + + # Parse the generated multipart content + cwl_packages, parsed_desc = parse_multipart_deploy(content, content_type, request=None) + + assert len(cwl_packages) == 1 + assert parsed_desc is not None + assert parsed_desc.get("processDescription", {}).get("id") == "test-tool" + + +def test_create_multipart_deploy_empty_list(): + """ + Test that creating multipart from empty list raises error. + """ + with pytest.raises(ValueError, match="At least one CWL file must be provided"): + create_multipart_deploy([]) + + +def test_create_multipart_deploy_custom_boundary(): + """ + Test creating multipart with a custom boundary. + """ + tool_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "test-tool", + "baseCommand": ["echo"], + "inputs": {}, + "outputs": {} + } + + custom_boundary = "CustomBoundary123" + content, content_type = create_multipart_deploy([tool_cwl], boundary=custom_boundary) + + # Verify custom boundary is used + assert f"boundary={custom_boundary}" in content_type + assert custom_boundary.encode() in content + + +def test_create_multipart_deploy_with_file_paths(tmp_path): + """ + Test creating multipart from CWL file paths. 
+ """ + # Create a temporary CWL file + tool_cwl = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "file-tool", + "baseCommand": ["echo"], + "inputs": {}, + "outputs": {} + } + + cwl_file = tmp_path / "tool.cwl" + with open(cwl_file, "w", encoding="utf-8") as f: + json.dump(tool_cwl, f) + + # Create multipart from file path + content, content_type = create_multipart_deploy([str(cwl_file)]) + + # Verify content + assert isinstance(content, bytes) + assert "multipart/related" in content_type + + # Parse and verify + cwl_packages, _ = parse_multipart_deploy(content, content_type, request=None) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "file-tool" + + +def test_resolve_cwl_graph_no_graph(): + """ + Test resolve_cwl_graph returns package as-is when no $graph is present. + """ + package = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "simple-tool" + } + result = resolve_cwl_graph(package) + assert result == package + + +def test_resolve_cwl_graph_with_non_list_graph(): + """ + Test resolve_cwl_graph returns package as-is when $graph is not a list. + """ + package = { + "cwlVersion": "v1.2", + "$graph": "not-a-list", # Invalid but should be returned as-is + "id": "test-package" + } + result = resolve_cwl_graph(package) + assert result == package + + +def test_resolve_cwl_graph_single_item(): + """ + Test resolve_cwl_graph unpacks single-item $graph. + """ + package = { + "cwlVersion": "v1.2", + "$graph": [ + { + "class": "CommandLineTool", + "id": "single-tool" + } + ] + } + result = resolve_cwl_graph(package) + assert isinstance(result, dict) + assert result["class"] == "CommandLineTool" + assert result["id"] == "single-tool" + assert result["cwlVersion"] == "v1.2" + assert "$graph" not in result + + +def test_resolve_cwl_graph_multiple_items(): + """ + Test resolve_cwl_graph returns list for multiple items in $graph. 
+ """ + package = { + "cwlVersion": "v1.2", + "$graph": [ + {"class": "CommandLineTool", "id": "tool-1"}, + {"class": "Workflow", "id": "workflow-1"} + ] + } + result = resolve_cwl_graph(package) + assert isinstance(result, list) + assert len(result) == 2 + assert result[0]["id"] == "tool-1" + assert result[0]["cwlVersion"] == "v1.2" + assert result[1]["id"] == "workflow-1" + assert result[1]["cwlVersion"] == "v1.2" + + +def test_resolve_deployment_order_single_workflow(): + """ + Test resolve_deployment_order with a single workflow. + """ + workflow = {"class": "Workflow", "id": "test-workflow"} + tools, main_workflow = resolve_deployment_order([workflow]) + + assert not tools + assert main_workflow == workflow + + +def test_resolve_deployment_order_command_line_tool(): + """ + Test resolve_deployment_order with CommandLineTool. + """ + tool = {"class": "CommandLineTool", "id": "test-tool"} + tools, main_workflow = resolve_deployment_order([tool]) + + assert len(tools) == 1 + assert tools[0] == tool + assert main_workflow is None + + +def test_resolve_deployment_order_expression_tool(): + """ + Test resolve_deployment_order with ExpressionTool. + """ + expr_tool = {"class": "ExpressionTool", "id": "test-expr"} + tools, main_workflow = resolve_deployment_order([expr_tool]) + + assert len(tools) == 1 + assert tools[0] == expr_tool + assert main_workflow is None + + +def test_resolve_deployment_order_workflow_and_tools(): + """ + Test resolve_deployment_order with workflow and multiple tools. 
+ """ + tool1 = {"class": "CommandLineTool", "id": "tool-1"} + tool2 = {"class": "ExpressionTool", "id": "tool-2"} + workflow = {"class": "Workflow", "id": "main-workflow"} + + tools, main_workflow = resolve_deployment_order([tool1, tool2, workflow]) + + assert len(tools) == 2 + assert tool1 in tools + assert tool2 in tools + assert main_workflow == workflow + + +def test_resolve_deployment_order_multiple_workflows_error(): + """ + Test resolve_deployment_order raises error for multiple workflows. + """ + workflow1 = {"class": "Workflow", "id": "workflow-1"} + workflow2 = {"class": "Workflow", "id": "workflow-2"} + + with pytest.raises(HTTPNotImplemented) as exc_info: + resolve_deployment_order([workflow1, workflow2]) + + assert exc_info.value.json is not None + assert "Multiple Workflow definitions" in exc_info.value.json.get("title", "") + + +def test_resolve_deployment_order_no_packages(): + """ + Test resolve_deployment_order with empty list. + """ + tools, main_workflow = resolve_deployment_order([]) + assert not tools + assert main_workflow is None + + +def test_classify_multipart_part_without_content_id(): + """ + Test _classify_multipart_part with no content_id (empty string). + """ + part_data = { + "class": "CommandLineTool", + "id": "test-tool", + "inputs": {}, + "outputs": {} + } + cwl_packages = [] + parts_order = [] + parts_by_cid = {} + + result = _classify_multipart_part( + part_data, cwl_packages, parts_order, parts_by_cid, + content_id="", process_description=None # Empty content_id + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0] == part_data + assert len(parts_by_cid) == 0 # Should not be added when content_id is empty + assert result is None + + +def test_parse_multipart_deploy_with_bytes_content(): + """ + Test parse_multipart_deploy properly decodes bytes content. 
+ """ + tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "test-tool", + "inputs": {}, + "outputs": {"output": {"type": "File"}} + }) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"\r\n" + f"{tool_cwl}\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') # Already bytes + + content_type = f"multipart/mixed; boundary={boundary}" + + cwl_packages, process_desc = parse_multipart_deploy( + multipart_body, content_type, request=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "test-tool" + assert process_desc is None + + +def test_parse_multipart_deploy_no_start_parameter_non_workflow(caplog): + """ + Test parse_multipart_deploy warning when no start parameter and first element is not a Workflow. + """ + tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": "test-tool", + "inputs": {}, + "outputs": {"output": {"type": "File"}} + }) + + boundary = "----Boundary123" + multipart_body = ( + f"------Boundary123\r\n" + f"Content-Type: {ContentType.APP_CWL_JSON}\r\n" + f"\r\n" + f"{tool_cwl}\r\n" + f"------Boundary123--\r\n" + ).encode('utf-8') + + content_type = f"multipart/related; boundary={boundary}" # multipart/related without start parameter + + cwl_packages, _ = parse_multipart_deploy( + multipart_body, content_type, request=None + ) + + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "test-tool" + # Check that a warning was logged + assert any("No 'start' parameter provided" in record.message for record in caplog.records) + + +def test_create_multipart_deploy_single_workflow(): + """ + Test create_multipart_deploy with a single workflow (workflow_count == 1). 
+ """ + workflow_cwl = { + "cwlVersion": "v1.2", + "class": "Workflow", + "id": "test-workflow", + "inputs": {"input": {"type": "string"}}, + "outputs": {"output": {"type": "File"}}, + "steps": {} + } + + content, content_type = create_multipart_deploy([workflow_cwl]) + + # Verify content was created + assert isinstance(content, bytes) + assert "multipart/related" in content_type + + # Verify that the start parameter was added (workflow_count == 1 branch) + assert "start=workflow-0" in content_type + + # Parse and verify + cwl_packages, _ = parse_multipart_deploy(content, content_type, request=None) + assert len(cwl_packages) == 1 + assert cwl_packages[0]["id"] == "test-workflow" + + +def test_create_multipart_deploy_multiple_workflows(): + """ + Test create_multipart_deploy with multiple workflows. + """ + workflow1 = { + "cwlVersion": "v1.2", + "class": "Workflow", + "id": "workflow-1", + "inputs": {}, + "outputs": {}, + "steps": {} + } + workflow2 = { + "cwlVersion": "v1.2", + "class": "Workflow", + "id": "workflow-2", + "inputs": {}, + "outputs": {}, + "steps": {} + } + + content, content_type = create_multipart_deploy([workflow1, workflow2]) + + # Verify content was created + assert isinstance(content, bytes) + assert "multipart/related" in content_type + + # First workflow should be the main one + assert "start=workflow-0" in content_type diff --git a/tests/utils.py b/tests/utils.py index 8d86788ae..b8b6684ad 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -416,6 +416,18 @@ class MockedRequest(DummyRequest): def text(self): return bytes2str(self.body) if self.body else json.dumps(self.json, ensure_ascii=False) + @property + def content_type(self): + """ + Return the Content-Type from headers, defaulting to application/json if JSON body is present. 
def test_deploy_process_CWL_direct_graph_multi_simple(self):
    """
    Deploy a ``$graph`` holding two independent tools (no Workflow).

    Verifies that a multi-item ``$graph`` is accepted, that the deployment is
    reported as done, and that the reported main process is one of the tools.
    """
    test_id = self.fully_qualified_test_name()
    tool_ids = tuple(f"{test_id}-tool-{index}" for index in (1, 2))

    def make_tool(tool_id, docker_image, command):
        # Both tools share the same shape; only ID, image and command differ.
        return {
            "class": "CommandLineTool",
            "id": tool_id,
            "inputs": {},
            "outputs": {
                "output": {
                    "type": "File",
                    "outputBinding": {"glob": "stdout.log"}
                }
            },
            "requirements": {
                "DockerRequirement": {"dockerPull": docker_image}
            },
            "baseCommand": command,
            "stdout": "stdout.log"
        }

    cwl = {
        "cwlVersion": "v1.2",
        "$graph": [
            make_tool(tool_ids[0], "python:3.12-alpine", ["python3", "-V"]),
            make_tool(tool_ids[1], "alpine:latest", ["echo", "hello"]),
        ]
    }

    resp = mocked_sub_requests(
        self.app, "post", "/processes",
        data=cwl,
        headers={"Content-Type": ContentType.APP_CWL_JSON},
        only_local=True,  # mock in case of TestApp self-reference URLs
    )
    assert resp.status_code == 201, f"Expected 201, got {resp.status_code}: {resp.json}"

    result = resp.json
    assert "processSummary" in result
    assert result["deploymentDone"] is True

    # Without a Workflow in the graph, one of the tools is the main process.
    main_id = result["processSummary"]["id"]
    assert main_id in tool_ids

    # The main process must be retrievable once deployed.
    resp = self.app.get(f"/processes/{main_id}")
    assert resp.status_code == 200
+ """ + test_id = self.fully_qualified_test_name() + # Create a complete workflow with multiple steps + cwl = { + "cwlVersion": "v1.2", + "$graph": [ + { + "class": "CommandLineTool", + "id": f"{test_id}-echo-tool", + "inputs": { + "message": "string" + }, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "output.txt" + } + } + }, + "requirements": { + "DockerRequirement": { + "dockerPull": "alpine:latest" + } + }, + "baseCommand": ["sh", "-c"], + "arguments": [ + "echo $(inputs.message) > output.txt" + ] + }, + { + "class": "CommandLineTool", + "id": f"{test_id}-cat-tool", + "inputs": { + "file": "File" + }, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "cat_output.txt" + } + } + }, + "requirements": { + "DockerRequirement": { + "dockerPull": "alpine:latest" + } + }, + "baseCommand": ["cat"], + "stdin": "$(inputs.file.path)", + "stdout": "cat_output.txt" + }, + { + "class": "Workflow", + "id": f"{test_id}-workflow", + "inputs": { + "message": "string" + }, + "outputs": { + "result": { + "type": "File", + "outputSource": "cat_step/output" + } + }, + "steps": { + "echo_step": { + "run": f"{test_id}-echo-tool", + "in": { + "message": "message" + }, + "out": ["output"] + }, + "cat_step": { + "run": f"{test_id}-cat-tool", + "in": { + "file": "echo_step/output" + }, + "out": ["output"] + } + } + } + ] + } + + headers = {"Content-Type": ContentType.APP_CWL_JSON} + resp = mocked_sub_requests(self.app, "post", "/processes", data=cwl, headers=headers, + only_local=True) # mock in case of TestApp self-reference URLs + assert resp.status_code == 201, f"Expected 201, got {resp.status_code}: {resp.json}" + + result = resp.json + assert "processSummary" in result + assert result["deploymentDone"] is True + + # The main workflow should be deployed + main_id = result["processSummary"]["id"] + assert main_id == f"{test_id}-workflow" + + # Verify main workflow was deployed + desc = self.get_process_description(main_id, 
schema=ProcessSchema.OLD) + assert desc["process"]["id"] == main_id + pkg = self.get_application_package(main_id) + assert pkg["class"] == "Workflow" + + # Verify child tools were deployed + if "deployedProcesses" in result: + assert f"{test_id}-echo-tool" in result["deployedProcesses"] + assert f"{test_id}-cat-tool" in result["deployedProcesses"] + + # Verify we can retrieve the child tools + desc = self.get_process_description(f"{test_id}-echo-tool", schema=ProcessSchema.OLD) + assert desc["process"]["id"] == f"{test_id}-echo-tool" + pkg = self.get_application_package(f"{test_id}-echo-tool") + assert pkg["class"] == "CommandLineTool" + + desc = self.get_process_description(f"{test_id}-cat-tool", schema=ProcessSchema.OLD) + assert desc["process"]["id"] == f"{test_id}-cat-tool" + pkg = self.get_application_package(f"{test_id}-cat-tool") + assert pkg["class"] == "CommandLineTool" + + def test_deploy_process_CWL_multipart(self): + """ + Test deployment of multiple CWL files via multipart/mixed request. + + This validates that multipart deployment can be used to deploy a workflow + with its dependent tools in a single request. 
+ """ + test_id = self.fully_qualified_test_name() + + # Create individual CWL definitions + echo_tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": f"{test_id}-echo-tool", + "inputs": { + "message": "string" + }, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "output.txt" + } + } + }, + "requirements": { + "DockerRequirement": { + "dockerPull": "alpine:latest" + } + }, + "baseCommand": ["sh", "-c"], + "arguments": [ + "echo $(inputs.message) > output.txt" + ] + }) + + cat_tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": f"{test_id}-cat-tool", + "inputs": { + "file": "File" + }, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "cat_output.txt" + } + } + }, + "requirements": { + "DockerRequirement": { + "dockerPull": "alpine:latest" + } + }, + "baseCommand": ["cat"], + "stdin": "$(inputs.file.path)", + "stdout": "cat_output.txt" + }) + + workflow_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "Workflow", + "id": f"{test_id}-workflow", + "inputs": { + "message": "string" + }, + "outputs": { + "result": { + "type": "File", + "outputSource": "cat_step/output" + } + }, + "steps": { + "echo_step": { + "run": f"{test_id}-echo-tool", + "in": { + "message": "message" + }, + "out": ["output"] + }, + "cat_step": { + "run": f"{test_id}-cat-tool", + "in": { + "file": "echo_step/output" + }, + "out": ["output"] + } + } + }) + + # Create multipart content + boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW" + multipart_body = ( + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{echo_tool_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{cat_tool_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" 
+ f"{workflow_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW--\r\n" + ).encode('utf-8') + + content_type_header = f"multipart/mixed; boundary={boundary}" + + resp = mocked_sub_requests( + self.app, "post", "/processes", + data=multipart_body, + headers={"Content-Type": content_type_header}, + only_local=True + ) + + assert resp.status_code == 201, ( + f"Expected 201, got {resp.status_code}. " + f"Content-Type: {resp.content_type}. " + f"Body: {resp.text[:500] if hasattr(resp, 'text') else resp.body[:500]}" + ) + + result = resp.json + assert "processSummary" in result + assert result["deploymentDone"] is True + + # The main workflow should be deployed + main_id = result["processSummary"]["id"] + assert main_id == f"{test_id}-workflow" + + # Verify main workflow was deployed + desc = self.get_process_description(main_id, schema=ProcessSchema.OLD) + assert desc["process"]["id"] == main_id + pkg = self.get_application_package(main_id) + assert pkg["class"] == "Workflow" + + # Verify child tools were deployed + if "deployedProcesses" in result: + assert f"{test_id}-echo-tool" in result["deployedProcesses"] + assert f"{test_id}-cat-tool" in result["deployedProcesses"] + + def test_deploy_process_CWL_multipart_multiple_workflows_invalid(self): + """ + Test that deployment of multiple Workflows via multipart is rejected. + + This test validates that attempting to deploy multiple Workflow definitions + in a single multipart request fails with an appropriate error, since only one + Workflow is allowed per deployment. 
+ """ + test_id = "test_multipart_multi_workflow_invalid" + + # Create two workflow definitions + workflow1_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "Workflow", + "id": f"{test_id}-workflow-1", + "inputs": { + "message": "string" + }, + "outputs": { + "result": { + "type": "string", + "outputSource": "step1/output" + } + }, + "steps": { + "step1": { + "run": { + "class": "CommandLineTool", + "baseCommand": ["echo"], + "inputs": { + "message": "string" + }, + "outputs": { + "output": { + "type": "stdout" + } + } + }, + "in": { + "message": "message" + }, + "out": ["output"] + } + } + }) + + workflow2_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "Workflow", + "id": f"{test_id}-workflow-2", + "inputs": { + "text": "string" + }, + "outputs": { + "result": { + "type": "string", + "outputSource": "step1/output" + } + }, + "steps": { + "step1": { + "run": { + "class": "CommandLineTool", + "baseCommand": ["cat"], + "inputs": { + "text": "string" + }, + "outputs": { + "output": { + "type": "stdout" + } + } + }, + "in": { + "text": "text" + }, + "out": ["output"] + } + } + }) + + # Create multipart content with two workflows + boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW" + multipart_body = ( + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{workflow1_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{workflow2_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW--\r\n" + ).encode('utf-8') + + content_type_header = f"multipart/mixed; boundary={boundary}" + + resp = mocked_sub_requests( + self.app, "post", "/processes", + data=multipart_body, + headers={"Content-Type": content_type_header}, + only_local=True + ) + + # Should fail because multiple Workflows are not allowed (HTTP 501 Not Implemented) + assert resp.status_code == 501, ( + f"Expected 501 Not Implemented for multiple 
def test_deploy_process_CWL_direct_graph_multi_invalid(self):
    """
    Ensure a ``$graph`` that contains more than one Workflow is refused.

    Only a single Workflow is allowed per deployment, so posting two of them
    in the same ``$graph`` must yield HTTP 501 with an explanatory message.
    """
    def make_workflow(workflow_id, input_name, command):
        # Single-step workflow wrapping an inline tool; both workflows only
        # differ by their ID, their input name and the wrapped command.
        return {
            "class": "Workflow",
            "id": workflow_id,
            "inputs": {
                input_name: "string"
            },
            "outputs": {
                "result": {
                    "type": "string",
                    "outputSource": "step1/output"
                }
            },
            "steps": {
                "step1": {
                    "run": {
                        "class": "CommandLineTool",
                        "baseCommand": [command],
                        "inputs": {
                            input_name: "string"
                        },
                        "outputs": {
                            "output": {
                                "type": "stdout"
                            }
                        }
                    },
                    "in": {
                        input_name: input_name
                    },
                    "out": ["output"]
                }
            }
        }

    # Invalid on purpose: two Workflow definitions in the same $graph.
    cwl = {
        "cwlVersion": "v1.2",
        "$graph": [
            make_workflow("workflow-1", "message", "echo"),
            make_workflow("workflow-2", "text", "cat"),
        ]
    }

    resp = self.app.post_json(
        "/processes",
        params=cwl,
        headers={"Content-Type": ContentType.APP_CWL_JSON},
        expect_errors=True,
    )

    # Multiple Workflows are not supported (HTTP 501 Not Implemented).
    assert resp.status_code == 501, (
        f"Expected 501 Not Implemented for multiple Workflows in $graph, got {resp.status_code}: {resp.text[:200]}"
    )
    # The error payload should hint at the multiple-workflow restriction.
    error_text = json.dumps(resp.json).lower()
    assert any(word in error_text for word in ["workflow", "multiple", "one"]), (
        f"Error message should mention workflow constraint: {resp.json}"
    )
+ """ + test_id = "test_multipart_related_invalid_root" + + # Create a CommandLineTool (not a Workflow) + tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": f"{test_id}-tool", + "inputs": {}, + "outputs": { + "output": { + "type": "File", + "outputBinding": {"glob": "output.txt"} + } + }, + "requirements": { + "DockerRequirement": {"dockerPull": "alpine:latest"} + }, + "baseCommand": ["echo", "hello"], + "stdout": "output.txt" + }) + + workflow_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "Workflow", + "id": f"{test_id}-workflow", + "inputs": {"message": "string"}, + "outputs": { + "result": {"type": "File", "outputSource": "step1/output"} + }, + "steps": { + "step1": { + "run": f"{test_id}-tool", + "in": {"message": "message"}, + "out": ["output"] + } + } + }) + + # Create multipart/related with start= pointing to the tool (invalid) + boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW" + multipart_body = ( + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{tool_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{workflow_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW--\r\n" + ).encode('utf-8') + + # start= points to the tool, which is NOT a Workflow + content_type_header = f"multipart/related; boundary={boundary}; start=\"\"" + + resp = mocked_sub_requests( + self.app, "post", "/processes", + data=multipart_body, + headers={"Content-Type": content_type_header}, + only_local=True, + expect_errors=True + ) + + # Should fail with 400 because root must be a Workflow + assert resp.status_code == 400, ( + f"Expected 400 Bad Request for non-Workflow root, got {resp.status_code}. 
" + f"Body: {resp.text[:500] if hasattr(resp, 'text') else resp.body[:500]}" + ) + error_json = resp.json + error_text = json.dumps(error_json).lower() + assert any(word in error_text for word in ["workflow", "root", "class"]), ( + f"Error message should mention workflow/root/class constraint: {error_json}" + ) + + def test_deploy_process_CWL_multipart_related_valid_root_workflow(self): + """ + Test that multipart/related with 'start' parameter pointing to a Workflow succeeds. + + This validates that when the root document is correctly identified as a Workflow, + the deployment proceeds successfully. + """ + test_id = self.fully_qualified_test_name() + + # Create a CommandLineTool + tool_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "id": f"{test_id}-tool", + "inputs": {"message": "string"}, + "outputs": { + "output": { + "type": "File", + "outputBinding": {"glob": "output.txt"} + } + }, + "requirements": { + "DockerRequirement": {"dockerPull": "alpine:latest"} + }, + "baseCommand": ["sh", "-c"], + "arguments": ["echo $(inputs.message) > output.txt"] + }) + + # Create a Workflow + workflow_cwl = json.dumps({ + "cwlVersion": "v1.2", + "class": "Workflow", + "id": f"{test_id}-workflow", + "inputs": {"message": "string"}, + "outputs": { + "result": {"type": "File", "outputSource": "step1/output"} + }, + "steps": { + "step1": { + "run": f"{test_id}-tool", + "in": {"message": "message"}, + "out": ["output"] + } + } + }) + + # Create multipart/related with start= pointing to the workflow (valid) + boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW" + multipart_body = ( + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{tool_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + f"Content-Type: application/cwl+json\r\n" + f"Content-ID: \r\n" + f"\r\n" + f"{workflow_cwl}\r\n" + f"------WebKitFormBoundary7MA4YWxkTrZu0gW--\r\n" + ).encode('utf-8') + + # start= 
points to the workflow (correct) + content_type_header = f"multipart/related; boundary={boundary}; start=\"\"" + + resp = mocked_sub_requests( + self.app, "post", "/processes", + data=multipart_body, + headers={"Content-Type": content_type_header}, + only_local=True + ) + + # Should succeed + assert resp.status_code == 201, ( + f"Expected 201, got {resp.status_code}. " + f"Body: {resp.text[:500] if hasattr(resp, 'text') else resp.body[:500]}" + ) + + result = resp.json + assert "processSummary" in result + assert result["deploymentDone"] is True + + # The main workflow should be deployed + main_id = result["processSummary"]["id"] + assert main_id == f"{test_id}-workflow" + + # Verify main workflow was deployed + desc = self.get_process_description(main_id, schema=ProcessSchema.OLD) + assert desc["process"]["id"] == main_id + pkg = self.get_application_package(main_id) + assert pkg["class"] == "Workflow" @staticmethod def get_cwl_docker_python_version(cwl_version="v1.0", process_id=None): diff --git a/weaver/cli.py b/weaver/cli.py index 9aa123df1..b957c2994 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -46,7 +46,7 @@ get_field, repr2json_input_values ) -from weaver.processes.utils import get_process_information +from weaver.processes.utils import create_multipart_deploy, get_process_information from weaver.processes.wps_package import get_process_definition from weaver.provenance import ProvenanceFormat, ProvenancePathType from weaver.sort import Sort, SortMethods @@ -702,13 +702,50 @@ def _resolve_deploy_package(body, cwl, headers): def _parse_deploy_package( self, body, # type: JSON - cwl, # type: Optional[Union[CWL, str]] + cwl, # type: Optional[Union[CWL, str, List[Union[CWL, str]]]] wps, # type: Optional[str] process_id, # type: Optional[str] headers, # type: HeadersType settings, # type: SettingsType ): # type: (...) 
-> OperationResult try: + # Handle multiple CWL files for multi-deployment + if isinstance(cwl, list) and len(cwl) > 1: + LOGGER.debug("Processing multi-CWL deployment with %d files", len(cwl)) + + # Load and parse each CWL file + cwl_packages = [] + for cwl_item in cwl: + if isinstance(cwl_item, str) and not cwl_item.startswith("{"): + # File reference or URL + cwl_data = load_file(cwl_item) + elif isinstance(cwl_item, str) and cwl_item.startswith("{") and cwl_item.endswith("}"): + # Literal JSON string + cwl_data = yaml.safe_load(cwl_item) + elif isinstance(cwl_item, dict): + # Already parsed CWL + cwl_data = cwl_item + else: + raise PackageRegistrationError(f"Invalid CWL item: {cwl_item}") + + if not isinstance(cwl_data, dict) or cwl_data.get("cwlVersion") is None: + raise PackageRegistrationError("Invalid CWL structure in multi-CWL deployment.") + + cwl_packages.append(cwl_data) + + multipart_content, content_type = create_multipart_deploy( + cwl_files=cwl_packages, + process_description=body if body else None + ) + + # Return the multipart content as body + # The body will be sent as raw bytes with the appropriate Content-Type + headers["Content-Type"] = content_type + return OperationResult(True, process_id or "multi-cwl", multipart_content, headers=headers) + + if isinstance(cwl, list) and len(cwl) == 1: + cwl = cwl[0] # Unwrap single-item list + if not body and cwl and isinstance(cwl, dict): p_id = process_id or cwl.get("id") cwl["id"] = p_id # override if provided by processID @@ -986,7 +1023,7 @@ def deploy( self, process_id=None, # type: Optional[str] body=None, # type: Optional[Union[JSON, str]] - cwl=None, # type: Optional[Union[CWL, str]] + cwl=None, # type: Optional[Union[CWL, str, List[Union[CWL, str]]]] wps=None, # type: Optional[str] token=None, # type: Optional[str] username=None, # type: Optional[str] @@ -1006,6 +1043,7 @@ def deploy( The referenced :term:`Application Package` must be one of: - :term:`CWL` body, local file or URL in :term:`JSON` 
or :term:`YAML` format + - List of :term:`CWL` bodies/files for multi-process deployment (workflow with tools or multiple tools) - :term:`WPS` process URL with :term:`XML` response - :term:`WPS-REST` process URL with :term:`JSON` response - :term:`OGC API - Processes` process URL with :term:`JSON` response @@ -1035,6 +1073,9 @@ def deploy( inserted into the body (i.e.: ``executionUnit``). If provided without additional :paramref:`body`, it instead be used as-is for the request body with the corresponding :term:`CWL` :term:`Media-Type`. If an embedded ``executionUnit`` containing the :term:`CWL` is desired, provide ``body={}`` explicitly. + Can also be a list of :term:`CWL` definitions (dicts, strings, or file paths) for multi-process + deployment, which will be combined into a multipart request. When deploying multiple CWL files, + the workflow (if present) should be last in the list, and all tools should come before it. :param wps: URL to an existing :term:`WPS` process (WPS-1/2 or WPS-REST/OGC-API) to represent as equivalent :term:`OGC API - Processes` representation. 
Note that it is up to the server to perform the @@ -1072,17 +1113,30 @@ def deploy( return result p_id = result.message data = result.body + # Update headers if provided by the package parsing (e.g., for multipart content) + if result.headers: + req_headers.update(result.headers) if undeploy: LOGGER.debug("Performing requested undeploy of process: [%s]", p_id) result = self.undeploy(process_id=p_id, url=base) if result.code not in [200, 204, 404]: return OperationResult(False, "Failed requested undeployment prior deployment.", body=result.body, text=result.text, code=result.code, headers=result.headers) - LOGGER.debug("Deployment Body:\n%s", OutputFormat.convert(data, OutputFormat.JSON_STR)) + + # Handle multipart content (bytes) vs JSON content path = f"{base}/processes" - resp = self._request("POST", path, json=data, - headers=req_headers, x_headers=headers, settings=self._settings, auth=auth, - request_timeout=request_timeout, request_retries=request_retries) + if isinstance(data, bytes): + # For multipart content, send as raw data + LOGGER.debug("Deployment with multipart content (%d bytes)", len(data)) + resp = self._request("POST", path, data=data, + headers=req_headers, x_headers=headers, settings=self._settings, auth=auth, + request_timeout=request_timeout, request_retries=request_retries) + else: + # For JSON content, use json parameter + LOGGER.debug("Deployment Body:\n%s", OutputFormat.convert(data, OutputFormat.JSON_STR)) + resp = self._request("POST", path, json=data, + headers=req_headers, x_headers=headers, settings=self._settings, auth=auth, + request_timeout=request_timeout, request_retries=request_retries) return self._parse_result(resp, with_links=with_links, nested_links="processSummary", with_headers=with_headers, output_format=output_format) @@ -3574,10 +3628,13 @@ def make_parser(): ) op_deploy_app_pkg = op_deploy.add_mutually_exclusive_group() op_deploy_app_pkg.add_argument( - "--cwl", dest="cwl", + "--cwl", dest="cwl", nargs="+", 
metavar="CWL_FILE", help="Application Package of the process defined using Common Workflow Language (CWL) as JSON or YAML " "format when provided by file reference. File reference can be a local file or URL location. " "Can also be provided as literal string contents formatted as JSON. " + "Multiple CWL files can be provided (space-separated) to create a multi-CWL deployment with " + "nested workflows and tools. When multiple files are provided, they will be packaged as multipart " + "content for deployment. " "Provided contents will be inserted into an automatically generated request deploy body if none was " "specified with ``--body`` option (note: ``--process`` must be specified instead in that case). " "Otherwise, it will override the appropriate execution unit section within the provided deploy body." diff --git a/weaver/processes/utils.py b/weaver/processes/utils.py index dae7dbfa6..7fc51173f 100644 --- a/weaver/processes/utils.py +++ b/weaver/processes/utils.py @@ -1,9 +1,14 @@ import copy +import json import logging import os import pathlib +import uuid as uuid_lib import warnings from copy import deepcopy +from email import message_from_bytes +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText from typing import TYPE_CHECKING from urllib.parse import parse_qs, urlparse @@ -19,6 +24,7 @@ HTTPForbidden, HTTPInternalServerError, HTTPNotFound, + HTTPNotImplemented, HTTPOk, HTTPUnprocessableEntity, HTTPUnsupportedMediaType @@ -76,7 +82,7 @@ LOGGER = logging.getLogger(__name__) if TYPE_CHECKING: - from typing import Any, List, Optional, Tuple, Union + from typing import Dict, List, Optional, Tuple, Union from docker.client import DockerClient @@ -317,15 +323,381 @@ def _validate_deploy_process_info(process_info, reference, package, settings, he raise HTTPUnprocessableEntity(detail=msg) -# FIXME: supported nested process and $graph multi-deployment (https://github.com/crim-ca/weaver/issues/56) +# Multi-deployment support for 
def resolve_cwl_graph(package):
    # type: (CWL) -> Union[CWL, List[CWL]]
    """
    Resolve a packed :term:`CWL` ``$graph`` into deployable package definitions.

    Top-level fields of the package (e.g.: ``cwlVersion``) are propagated to
    every resolved item, and fields of the graph item take precedence.

    :param package: CWL definition, optionally using the ``$graph`` packed representation.
    :returns:
        - The package itself (unmodified) if it has no ``$graph``, if ``$graph`` is not a list,
          or if the ``$graph`` list is empty (nothing to resolve, left for schema validation).
        - A single merged CWL dictionary if ``$graph`` has exactly one item (backward compatible).
        - A list of merged CWL dictionaries if ``$graph`` contains multiple items.
    """
    graph_items = package.get("$graph")
    # An empty graph previously resolved to an empty list, silently dropping the package.
    # Return the package unchanged instead, as done for any other unresolvable graph.
    if not isinstance(graph_items, list) or not graph_items:
        return package

    cwl_base = {key: value for key, value in package.items() if key != "$graph"}

    if len(graph_items) == 1:
        # Single item: consider the package as if provided in non-graph representation.
        cwl_base.update(graph_items[0])
        return cwl_base

    # Multiple items: each one gets its own copy of the shared top-level fields.
    resolved_items = []
    for item in graph_items:
        cwl_item = deepcopy(cwl_base)
        cwl_item.update(item)
        resolved_items.append(cwl_item)
    return resolved_items
+ """ + workflows = [] + tools = [] + + for pkg in cwl_packages: + cwl_class = pkg.get("class", "") + if cwl_class == "Workflow": + workflows.append(pkg) + elif cwl_class in ["CommandLineTool", "ExpressionTool"]: + tools.append(pkg) + + if len(workflows) > 1: + raise HTTPNotImplemented(json={ + "title": "Multiple Workflow definitions in $graph.", + "description": "Only one top-level Workflow is supported per deployment.", + "cause": {"workflow_count": len(workflows)}, + "value": [wf.get("id") for wf in workflows] + }) + + main_workflow = workflows[0] if workflows else None + + # TODO: Implement topological sort based on workflow step dependencies + # For now, deploy tools in order provided + + return tools, main_workflow + + +def _extract_multipart_boundary(content_type): + # type: (str) -> str + """ + Extract boundary parameter from Content-Type header. + """ + if "boundary=" not in content_type: + raise HTTPBadRequest(json={ + "title": "Missing multipart boundary", + "description": "Content-Type must include boundary parameter for multipart content.", + "cause": {"Content-Type": content_type} + }) + boundary_part = content_type.split("boundary=")[1].split(";")[0].strip() + return boundary_part.strip('"') + + +def _get_multipart_content(content, request): + # type: (Union[str, bytes], Optional[AnyRequestType]) -> bytes + """ + Get raw multipart content as bytes. + """ + if request is not None and hasattr(request, 'body'): + return request.body + if isinstance(content, bytes): + return content + if isinstance(content, str): + return content.encode('utf-8') + raise HTTPBadRequest("Invalid multipart content format") + + +def _parse_multipart_part(part_content, part_content_type): + # type: (str, str) -> Optional[JSON] + """ + Parse content from a multipart part. 
def create_multipart_deploy(cwl_files, process_description=None, boundary=None):
    # type: (List[Union[str, CWL]], Optional[JSON], Optional[str]) -> Tuple[bytes, str]
    """
    Create ``multipart/related`` deployment content from a list of CWL definitions.

    Each CWL definition becomes one ``application/cwl+json`` part, identified by a
    ``Content-ID`` of the form ``<workflow-{index}>`` or ``<tool-{index}>`` and by a
    ``Content-Location`` set to the CWL ``id`` (or ``cwl-{index}`` when it has none).
    The first Workflow encountered is advertised as the root part through the
    ``start`` parameter of the returned Content-Type.

    :param cwl_files:
        List of CWL files. Each item can be:
        - A file path (string) to a CWL file (will be loaded)
        - A CWL dictionary (already parsed)
    :param process_description: Optional process description metadata to include as an additional JSON part.
    :param boundary: Optional custom boundary string (auto-generated if not provided).
    :returns: Tuple of (multipart content bytes, full Content-Type header with boundary).
    :raises ValueError: If no CWL file is provided.
    """
    if not cwl_files:
        raise ValueError("At least one CWL file must be provided")

    if not boundary:
        boundary = f"----WeaverMultipartBoundary{uuid_lib.uuid4().hex}"

    msg = MIMEMultipart("related", boundary=boundary)
    main_workflow_cid = None

    # Add CWL parts
    for idx, cwl_item in enumerate(cwl_files):
        cwl_data = load_file(cwl_item) if isinstance(cwl_item, str) else cwl_item

        is_workflow = cwl_data.get("class", "") == "Workflow"
        content_id = f"workflow-{idx}" if is_workflow else f"tool-{idx}"
        if is_workflow and main_workflow_cid is None:
            # First workflow becomes the main one
            main_workflow_cid = content_id

        part = MIMEText(json.dumps(cwl_data, indent=2), _subtype="json", _charset="utf-8")
        # Replace the default Content-Type (text/json) with the CWL-specific one
        part.replace_header("Content-Type", ContentType.APP_CWL_JSON)
        part.add_header("Content-ID", f"<{content_id}>")
        part.add_header("Content-Location", cwl_data.get("id", f"cwl-{idx}"))
        msg.attach(part)

    # Add process description if provided
    if process_description:
        desc_part = MIMEText(json.dumps(process_description, indent=2), _subtype="json", _charset="utf-8")
        # Replace the default Content-Type (text/json) with application/json
        desc_part.replace_header("Content-Type", ContentType.APP_JSON)
        # A Content-ID must be a non-empty '<...>' identifier; an empty header value is invalid.
        desc_part.add_header("Content-ID", "<process-description>")
        msg.attach(desc_part)

    multipart_content = msg.as_bytes()

    # Build the Content-Type header with start parameter if we have a main workflow
    content_type = f"multipart/related; boundary={boundary}"
    if main_workflow_cid:
        content_type += f"; start={main_workflow_cid}"

    return multipart_content, content_type
def _get_multipart_start_cid(content_type):
    # type: (str) -> Optional[str]
    """
    Extract the Content-ID referenced by the ``start`` parameter of a multipart/related Content-Type.

    :returns: Content-ID without quotes or angle brackets, or ``None`` if not applicable or not provided.
    """
    if "multipart/related" not in content_type.lower():
        return None

    # function-scope imports (standard library only) so this helper does not rely on other module imports
    from email.message import Message
    from email.utils import collapse_rfc2231_value

    # Let the header parser handle parameter names case-insensitively and unquote the value,
    # rather than matching the raw 'start=' substring (which also matches e.g. 'restart=').
    header = Message()
    header["Content-Type"] = content_type
    start = header.get_param("start")
    if not start:
        return None
    if isinstance(start, tuple):  # RFC 2231 encoded parameter
        start = collapse_rfc2231_value(start)
    return start.strip().strip("<>") or None


def _decode_multipart_part_content(part):
    # type: (Any) -> Optional[str]
    """
    Obtain the transfer-decoded body of a multipart part, decoded to text when it is returned as bytes.
    """
    part_content = part.get_payload(decode=True)
    if isinstance(part_content, bytes):
        # Try to decode with the charset specified in the Content-Type, default to utf-8
        charset = part.get_content_charset() or "utf-8"
        try:
            part_content = part_content.decode(charset)
        except (UnicodeDecodeError, LookupError):  # pragma: no cover
            # Fallback to utf-8 with error handling if charset is invalid/corrupted
            part_content = part_content.decode("utf-8", errors="replace")
    return part_content


def _load_multipart_content_location(content_location):
    # type: (str) -> str
    """
    Retrieve as text the CWL referenced by a URL provided in a part ``Content-Location`` header.

    :raises HTTPBadRequest: If the referenced content cannot be retrieved.
    """
    # SECURITY NOTE(review): the location is supplied by the request. Resolving 'file://' or internal URLs
    # lets a client make the server read resources on its behalf - confirm this is restricted upstream.
    LOGGER.debug("Fetching CWL from Content-Location: %s", content_location)
    try:
        # Content-Location can be:
        # 1. A static CWL file URL (http/https/s3/file)
        # 2. A Weaver process package endpoint (/processes/{pid}/package)
        # 3. A WPS process endpoint (/wps?request=DescribeProcess&identifier=...)
        # 4. An OGC API Processes endpoint
        _, ext = os.path.splitext(content_location.split("?")[0])  # strip query params
        if ext.replace(".", "") in PACKAGE_EXTENSIONS:
            return load_file(content_location, text=True)

        # Could be an API endpoint (Weaver, WPS, OGC API)
        # Use the process definition resolver that handles all reference types
        from weaver.processes.wps_package import _generate_process_with_cwl_from_reference
        cwl_pkg, _ = _generate_process_with_cwl_from_reference(content_location)
        return json.dumps(cwl_pkg) if isinstance(cwl_pkg, dict) else str(cwl_pkg)
    except Exception as exc:
        LOGGER.error("Failed to fetch content from Content-Location %s: %s", content_location, exc)
        raise HTTPBadRequest(json={
            "title": "Failed to fetch Content-Location",
            "description": f"Could not retrieve CWL from Content-Location: {content_location}",
            "cause": {"error": str(exc), "location": content_location}
        }) from exc


def parse_multipart_deploy(content, content_type, request=None):
    # type: (Union[str, bytes], str, Optional[AnyRequestType]) -> Tuple[List[CWL], Optional[JSON]]
    """
    Parse multipart/mixed or multipart/related deployment content.

    Extracts CWL packages and optional process description from multipart request.
    When the ``start`` parameter of a multipart/related Content-Type references one of the parts,
    that part must be a ``Workflow`` and is moved to the end of the returned list.

    :param content: Raw multipart content (string or bytes)
    :param content_type: Content-Type header value (must include boundary parameter)
    :param request: Optional request object for extracting body
    :returns: Tuple of (list of CWL packages, optional process description metadata)
    :raises HTTPBadRequest: If multipart content is malformed or invalid
    """
    boundary = _extract_multipart_boundary(content_type)
    raw_content = _get_multipart_content(content, request)

    # Parse using email parser
    content_type_with_boundary = f"{content_type.split(';')[0]}; boundary={boundary}"
    msg_bytes = b"Content-Type: " + content_type_with_boundary.encode("utf-8") + b"\r\n\r\n" + raw_content

    try:
        msg = message_from_bytes(msg_bytes)
    except Exception as exc:  # pragma: no cover
        # Defensive: standard library should handle valid multipart data
        raise HTTPBadRequest(json={
            "title": "Failed to parse multipart content",
            "description": str(exc),
            "cause": {"error": exc.__class__.__name__}
        }) from exc

    if not msg.is_multipart():  # pragma: no cover
        # Defensive: boundary validation should catch this earlier
        raise HTTPBadRequest("Content is not multipart format")

    # Extract root workflow reference for multipart/related
    root_workflow_cid = _get_multipart_start_cid(content_type)

    # Process all parts
    cwl_packages = []
    process_description = None
    parts_by_cid = {}
    parts_order = []
    parts = msg.get_payload()

    for part in parts:
        if not hasattr(part, "get_content_type"):  # pragma: no cover
            # Defensive: email library should return proper Message objects
            continue

        part_content_type = part.get_content_type()
        content_location = part.get("Content-Location", "").strip()
        part_content = _decode_multipart_part_content(part)

        # Check if content should be fetched from Content-Location
        # Only fetch if part body is empty AND Content-Location looks like a URL
        if content_location and (not part_content or not part_content.strip()):
            # Content-Location must be a URL (not just an identifier)
            if content_location.startswith(("http://", "https://", "file://", "s3://")):
                part_content = _load_multipart_content_location(content_location)
            else:
                LOGGER.debug(
                    "Content-Location [%s] is an identifier, not a URL. Using part body content.",
                    content_location
                )

        # surrounding whitespace must be dropped first, otherwise the angle brackets are not removed
        content_id = part.get("Content-ID", "").strip().strip("<>")

        # Parse and classify content
        if (
            part_content_type in ContentType.ANY_CWL or
            part_content_type in [ContentType.APP_JSON, ContentType.APP_YAML]
        ):
            try:
                part_data = _parse_multipart_part(part_content, part_content_type)
                process_description = _classify_multipart_part(
                    part_data, cwl_packages, parts_order, parts_by_cid, content_id, process_description
                )
            except Exception as exc:  # pragma: no cover
                # Defensive: skip malformed parts that fail JSON/YAML parsing
                LOGGER.warning("Failed to parse part with Content-Type %s: %s", part_content_type, exc)
                continue

    if not cwl_packages:
        raise HTTPBadRequest(json={
            "title": "No CWL packages found in multipart content",
            "description": "Multipart request must contain at least one CWL package part.",
            "cause": {"parts_found": len(parts)}
        })

    # Reorder if root workflow specified and validate it
    if root_workflow_cid and root_workflow_cid in parts_by_cid:
        root_pkg = parts_by_cid[root_workflow_cid]
        # Validate that the root is actually a Workflow (root document of multipart/related, RFC 2387 section 3.2)
        root_class = root_pkg.get("class", "")
        if root_class != "Workflow":
            raise HTTPBadRequest(json={
                "title": "Invalid root workflow reference",
                "description": (
                    f"The 'start' parameter references a CWL with class '{root_class}', "
                    "but only 'Workflow' is permitted as root document in multipart/related."
                ),
                "cause": {"Content-ID": root_workflow_cid, "class": root_class}
            })
        # identity comparison: only the referenced part is moved, even if another part has equal content
        cwl_packages = [pkg for pkg in cwl_packages if pkg is not root_pkg]
        cwl_packages.append(root_pkg)
    elif root_workflow_cid:
        # Best-effort: an unresolved reference does not block deployment, but should not go unnoticed
        LOGGER.warning(
            "The 'start' parameter references unknown Content-ID '%s'. Keeping original parts order.",
            root_workflow_cid
        )
    else:
        # No explicit start parameter: the first part is the default root (RFC 2387 section 3.2)
        first_class = cwl_packages[0].get("class", "")
        if first_class and first_class != "Workflow":
            LOGGER.warning(
                "No 'start' parameter provided in multipart/related. First element has class '%s' "
                "but 'Workflow' is recommended for root document. Proceeding with deployment.",
                first_class
            )

    return cwl_packages, process_description
cwl_packages[0] + else: + # Multiple packages - return as list to be wrapped in $graph + return cwl_packages + try: if request is not None: content = request.text @@ -409,17 +809,42 @@ def deploy_process_from_payload(payload, container, overwrite=False): # pylint: :raises HTTPException: for any invalid process deployment step. """ headers = getattr(container, "headers", {}) # container is any request (as when called from API Deploy request) - c_type = ContentType.get(get_header("Content-Type", headers), default=ContentType.APP_OGC_PKG_JSON) + c_type_full = get_header("Content-Type", headers) or ContentType.APP_OGC_PKG_JSON + # Extract base media type (without parameters) for content type checks + c_type = c_type_full.split(';')[0].strip() if isinstance(c_type_full, str) else c_type_full # use deepcopy of to remove any circular dependencies before writing to mongodb or any updates to the payload + # For multipart requests, we need to pass the request object to access the raw body payload = parse_process_deploy_content( - request=None, + request=container if hasattr(container, 'body') else None, content=payload, - content_type=c_type, + content_type=c_type_full, # Pass full Content-Type with parameters (e.g., boundary) content_type_schema=sd.DeployContentType, ) + + # For multipart/list payload, wrap it so it can flow through normal deployment logic + # This will be handled by the existing multi-CWL deployment code later + if isinstance(payload, list): + LOGGER.info("Detected multi-CWL deployment (multipart or $graph) with %d packages", len(payload)) + # Wrap list in minimal structure to pass through validation + payload = { + "cwlVersion": "v1.2", + "$graph": payload + } + c_type = ContentType.APP_CWL_JSON + payload_copy = deepcopy(payload) - payload = _check_deploy(payload) + + # Skip deployment schema validation for pure CWL payloads (CWL has its own validation) + is_cwl_only = ( + c_type in list(ContentType.ANY_CWL) + [ContentType.APP_JSON] and + "cwlVersion" in 
payload and + "processDescription" not in payload and + "executionUnit" not in payload + ) + if not is_cwl_only: + payload = _check_deploy(payload) + payload.pop("$schema", None) payload.pop("$id", None) @@ -457,8 +882,23 @@ def deploy_process_from_payload(payload, container, overwrite=False): # pylint: reference = content.get("href") found = isinstance(reference, str) elif c_type in (list(ContentType.ANY_CWL) + [ContentType.APP_JSON]) and "cwlVersion" in payload: + # For direct CWL deployment without $graph, validate that id is present + if "$graph" not in payload and "id" not in payload: + raise HTTPBadRequest(json={ + "type": "InvalidParameterValue", + "title": "Failed schema validation.", + "status": HTTPBadRequest.code, + "error": colander.Invalid.__name__, + "cause": {"DeployCWL.id": "Missing required field."}, + "value": repr_json(payload, force_string=False), + }) process_info = {"version": payload.pop("version", None)} + # Keep original payload with $graph for workflow validation + original_payload = deepcopy(payload) if "$graph" in payload else None package = resolve_cwl_graph(payload) + # Pass original payload alongside resolved packages for workflow reference resolution + if isinstance(package, list) and original_payload: + package = (package, original_payload) found = True else: # ogc-apppkg type, but no explicit check since used by default (backward compat) if deployment_profile_name: # optional hint @@ -470,20 +910,38 @@ def deploy_process_from_payload(payload, container, overwrite=False): # pylint: if "unit" not in execution_units and "href" not in execution_units: execution_units = {"unit": execution_units} execution_units = [execution_units] - if not isinstance(execution_units, list) or not len(execution_units) == 1: + if not isinstance(execution_units, list) or len(execution_units) < 1: raise HTTPUnprocessableEntity("Invalid parameter 'executionUnit'.") + + # Support multiple execution units for multi-deployment + execution_units_to_deploy = [] 
for execution_unit in execution_units: if not isinstance(execution_unit, dict): raise HTTPUnprocessableEntity("Invalid parameter 'executionUnit'.") - package = execution_unit.get("unit") - reference = execution_unit.get("href") - # stop on first package/reference found, simultaneous usage will raise during package retrieval - if package: - found = isinstance(package, dict) and package - elif reference: - found = isinstance(reference, str) - if found: - break + unit_package = execution_unit.get("unit") + unit_reference = execution_unit.get("href") + # Collect all valid packages/references + if unit_package: + execution_units_to_deploy.append({"package": unit_package, "reference": None}) + elif unit_reference: + execution_units_to_deploy.append({"package": None, "reference": unit_reference}) + + if not execution_units_to_deploy: + raise HTTPBadRequest("No valid execution units found.") + + # For now, use first execution unit (backward compatible) + # Multi-execution unit deployment can be expanded later + package = execution_units_to_deploy[0]["package"] + reference = execution_units_to_deploy[0]["reference"] + found = package or reference + + # Log if multiple units provided (for future expansion) + if len(execution_units_to_deploy) > 1: + LOGGER.info( + "Multiple execution units provided (%d), currently only first unit is deployed. " + "Full multi-unit deployment support coming soon.", + len(execution_units_to_deploy) + ) if not found: params = [ "process (href)", @@ -504,8 +962,20 @@ def deploy_process_from_payload(payload, container, overwrite=False): # pylint: f"Deployment of {ProcessType.BUILTIN} process is not allowed." 
def _strip_local_step_references(workflow):
    # type: (CWL) -> CWL
    """
    Copy a workflow with its ``#name`` step ``run`` references rewritten as ``name``.
    """
    standalone = deepcopy(workflow)
    steps = standalone.get("steps") or {}
    # CWL allows 'steps' both as mapping of step definitions and as list of step definitions
    step_defs = steps.values() if isinstance(steps, dict) else steps
    for step in step_defs:
        if not isinstance(step, dict):
            continue
        run_ref = step.get("run")
        if isinstance(run_ref, str) and run_ref.startswith("#"):
            # Remove # prefix for deployed process lookup
            step["run"] = run_ref[1:]
    return standalone


def _set_multi_cwl_process_endpoints(info, process_desc, payload_copy, restapi_url):
    # type: (JSON, JSON, JSON, str) -> None
    """
    Fill in place the endpoint and payload fields shared by every process of a multi-CWL deployment.
    """
    description_url = "/".join([restapi_url, "processes", info["identifier"]])
    info["processEndpointWPS1"] = process_desc.get("processEndpointWPS1")
    info["executeEndpoint"] = "/".join([description_url, "jobs"])
    info["payload"] = payload_copy
    info["jobControlOptions"] = process_desc.get("jobControlOptions", [])
    info["outputTransmission"] = process_desc.get("outputTransmission", [])
    info["processDescriptionURL"] = description_url
    # Remove schema to avoid later deserialization error
    info.pop("$schema", None)
    info.pop("$id", None)


def _deploy_process_multi_cwl(
    cwl_packages,               # type: List[CWL]
    process_info,               # type: JSON
    process_desc,               # type: JSON
    payload_copy,               # type: JSON
    container,                  # type: Union[AnySettingsContainer, AnyRequestType]
    overwrite,                  # type: Union[bool, Process]
    settings,                   # type: SettingsType
    headers,                    # type: AnyHeadersContainer
    original_graph_package,     # type: Optional[CWL]
):                              # type: (...) -> HTTPException
    """
    Deploy multiple CWL packages from a $graph definition.

    Tools are deployed first, without overwrite (an already existing tool is skipped), followed by the main
    process, which is the workflow when there is one, or the first tool otherwise.

    :param cwl_packages: List of resolved CWL package definitions.
    :param process_info: Process information dict.
    :param process_desc: Process description from payload.
    :param payload_copy: Original payload copy.
    :param container: Application container.
    :param overwrite: Whether to overwrite existing processes.
    :param settings: Application settings.
    :param headers: Request headers.
    :param original_graph_package:
        Original payload containing the $graph. When provided along with a main workflow,
        ``#name`` step references of the workflow are rewritten as ``name`` before its validation.
    :returns: HTTP response with deployment result.
    :raises HTTPBadRequest: If no CWL definition is available or if the main process fails schema validation.
    """
    LOGGER.info("Deploying multi-CWL package with %d definitions", len(cwl_packages))

    # Resolve deployment order (tools first, workflow last)
    tools, main_workflow = resolve_deployment_order(cwl_packages)

    restapi_url = get_wps_restapi_base_url(settings)
    deployed_processes = []

    # Determine which process will be the main one
    # If there's a workflow, that's the main; otherwise, use first tool as main
    main_process_pkg = main_workflow if main_workflow else (tools[0] if tools else None)
    if not main_process_pkg:
        raise HTTPBadRequest("No valid CWL definitions found in $graph")

    # Identify main process ID to avoid deploying it as a child
    main_process_id = main_process_pkg.get("id")

    # Deploy CommandLineTools first (but skip the one that will be the main process)
    for tool_pkg in tools:
        tool_id = tool_pkg.get("id")

        # Skip if this tool will be deployed as the main process.
        # A missing 'id' must not match: 'None == None' would otherwise skip every tool without an 'id'.
        if tool_pkg is main_process_pkg or (tool_id is not None and tool_id == main_process_id):
            LOGGER.info("Skipping child deployment of %s (will be deployed as main process)", tool_id)
            continue

        tool_info = _validate_deploy_process_info(deepcopy(process_info), None, tool_pkg, settings, headers)
        tool_id = tool_info["identifier"]

        LOGGER.info("Deploying CWL tool: %s", tool_id)
        _set_multi_cwl_process_endpoints(tool_info, process_desc, payload_copy, restapi_url)

        try:
            tool_process = Process(tool_info)
            # Deploy tools without overwrite to avoid conflicts
            tool_summary = _save_deploy_process(tool_process, False, container)
            deployed_processes.append(tool_summary)
            LOGGER.info("Successfully deployed CWL tool: %s", tool_id)
        except HTTPConflict:
            # Tool already exists, skip but continue
            LOGGER.info("CWL tool already exists: %s, skipping deployment", tool_id)
        except Exception as exc:
            LOGGER.error("Failed to deploy CWL tool %s: %s", tool_id, exc)
            raise

    # Deploy main process (workflow or first tool if no workflow)
    if main_workflow and original_graph_package:
        # The workflow can reference tools by ID (without #) as they're already deployed
        validate_pkg = _strip_local_step_references(main_process_pkg)
    else:
        validate_pkg = main_process_pkg

    workflow_info = _validate_deploy_process_info(deepcopy(process_info), None, validate_pkg, settings, headers)
    workflow_id = workflow_info["identifier"]

    if main_workflow:
        LOGGER.info("Deploying main CWL workflow: %s", workflow_id)
    else:
        LOGGER.info("No workflow found in $graph, deploying first tool as main process: %s", workflow_id)

    _set_multi_cwl_process_endpoints(workflow_info, process_desc, payload_copy, restapi_url)
    if "links" in workflow_info:
        workflow_info["additional_links"] = workflow_info.pop("links")

    try:
        workflow_process = Process(workflow_info)
        if isinstance(overwrite, Process):
            process_summary = _update_deploy_process_version(
                workflow_process, overwrite, VersionLevel.MAJOR, container
            )
        else:
            process_summary = _save_deploy_process(workflow_process, overwrite, container)
    except ValueError as exc:
        LOGGER.error("Failed schema validation of deployed workflow summary:\n%s", exc)
        raise HTTPBadRequest(detail=str(exc)) from exc

    links = workflow_process.links(container)
    loc_url = next(link["href"] for link in links if link["rel"] == "self")
    process_summary["links"] = links

    data = {
        "description": sd.OkPostProcessesResponse.description,
        "processSummary": process_summary,
        "deploymentDone": True,
    }

    # Include information about deployed child processes
    if deployed_processes:
        data["deployedProcesses"] = [proc["id"] for proc in deployed_processes]

    headers_out = {
        "Content-Type": ContentType.APP_JSON,
        "Content-Location": loc_url,
        "Location": loc_url,
    }

    # 'OK' when an existing process was replaced in place, 'Created' when a new process or version was added
    replaced = bool(overwrite) and (
        isinstance(overwrite, bool) or (
            isinstance(overwrite, Process) and
            overwrite.version == process_summary.get("version")
        )
    )
    http_cls = HTTPOk if replaced else HTTPCreated

    LOGGER.info("Successfully deployed multi-CWL package: main process %s with %d child processes",
                workflow_id, len(deployed_processes))

    return http_cls(json=data, headers=headers_out)
a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index f34a7970d..04ba0a065 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -7263,7 +7263,12 @@ class ExecutionUnit(OneOfKeywordSchema): class ExecutionUnitList(ExtendedSequenceSchema): item = ExecutionUnit(name="ExecutionUnit") - validator = Length(min=1, max=1) + validator = Length(min=1) + description = ( + "List of execution units to deploy. " + "When multiple units are provided, they should represent workflow steps and/or the main workflow. " + "Deployment order will be determined automatically based on dependencies." + ) class ProcessDeploymentWithContext(ProcessDeployment): @@ -7377,19 +7382,16 @@ class CWLGraphList(ExtendedSequenceSchema): cwl = CWLGraphItem() -# FIXME: supported nested and $graph multi-deployment (https://github.com/crim-ca/weaver/issues/56) +# Multi-deployment support for nested processes and $graph (https://github.com/crim-ca/weaver/issues/56) class CWLGraphBase(ExtendedMappingSchema): graph = CWLGraphList( name="$graph", description=( - "Graph definition that defines *exactly one* CWL Application Package represented as list. " - "Multiple definitions simultaneously deployed is NOT supported currently." - # "Graph definition that combines one or many CWL Application Packages within a single payload. " - # "If a single application is given (list of one item), it will be deployed as normal CWL by itself. " - # "If multiple applications are defined, the first MUST be the top-most Workflow process. " - # "Deployment of other items will be performed, and the full deployment will be persisted only if all are " - # "valid. The resulting Workflow will be registered as a package by itself (i.e: not as a graph)." + "Graph definition that combines one or many CWL Application Packages within a single payload. " + "If a single application is given (list of one item), it will be deployed as a single process. 
" + "If multiple applications are defined, the first SHOULD be a Workflow that references other items. " + "Child processes (CommandLineTool) will be deployed first, then the parent Workflow." ), - validator=Length(min=1, max=1) + validator=Length(min=1) )