Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions cwltool/factory.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from __future__ import absolute_import

# move to a regular typing import when Python 3.3-3.6 is no longer supported
import functools
import os
import sys
from typing import Callable as tCallable # pylint: disable=unused-import
from typing import Any, Dict, Tuple, Union
Comment thread
mr-c marked this conversation as resolved.

from typing_extensions import Text # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from . import load_tool
from .context import LoadingContext, RuntimeContext
from .argparser import arg_parser
from .context import LoadingContext, RuntimeContext, getdefault
from .executors import SingleJobExecutor
from .process import Process
Comment thread
mr-c marked this conversation as resolved.
from .main import find_default_container
from .resolver import tool_resolver
from .secrets import SecretStore
from .utils import DEFAULT_TMP_PREFIX


class WorkflowStatus(Exception):
Expand All @@ -28,34 +35,74 @@ def __init__(self, t, factory): # type: (Process, Factory) -> None

def __call__(self, **kwargs):
# type: (**Any) -> Union[Text, Dict[Text, Text]]
runtime_context = self.factory.runtime_context.copy()
runtime_context.basedir = os.getcwd()
out, status = self.factory.executor(self.t, kwargs, runtime_context)
out, status = self.factory.executor(
self.t, kwargs, self.factory.runtimeContext)
if status != "success":
raise WorkflowStatus(out, status)
else:
return out


class Factory(object):
def __init__(self,
executor=None, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
loading_context=None, # type: LoadingContext
runtime_context=None # type: RuntimeContext
): # type: (...) -> None
argsl=None, # type: List[str]
args=None, # type: argparse.Namespace
executor=None, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
Comment thread
mr-c marked this conversation as resolved.
loadingContext=None, # type: LoadingContext
Comment thread
mr-c marked this conversation as resolved.
runtimeContext=None # type: RuntimeContext
): # type: (...) -> None
if argsl is not None:
args = arg_parser().parse_args(argsl)
if executor is None:
executor = SingleJobExecutor()
self.executor = executor
self.loading_context = loading_context
if loading_context is None:
self.loading_context = LoadingContext()
if runtime_context is None:
self.runtime_context = RuntimeContext()
self.executor = SingleJobExecutor()
else:
self.executor = executor
if loadingContext is None:
self.loadingContext = LoadingContext(vars(args))
Comment thread
mr-c marked this conversation as resolved.
self._fix_loadingContext()
else:
self.runtime_context = runtime_context
self.loadingContext = loadingContext
if runtimeContext is None:
self.runtimeContext = RuntimeContext(vars(args))
self._fix_runtimeContext()
else:
self.runtimeContext = runtimeContext

def make(self, cwl):
"""Instantiate a CWL object from a CWl document."""
load = load_tool.load_tool(cwl, self.loading_context)
load = load_tool.load_tool(cwl, self.loadingContext)
if isinstance(load, int):
raise Exception("Error loading tool")
return Callable(load, self)

def _fix_loadingContext(self):
Comment thread
mr-c marked this conversation as resolved.
self.loadingContext.resolver = getdefault(
Comment thread
mr-c marked this conversation as resolved.
self.loadingContext.resolver, tool_resolver)

def _fix_runtimeContext(self):
Comment thread
mr-c marked this conversation as resolved.
self.runtimeContext.basedir = os.getcwd()
self.runtimeContext.find_default_container = functools.partial(
find_default_container,
default_container=None,
use_biocontainers=None)

if sys.platform == "darwin":
default_mac_path = "/private/tmp/docker_tmp"
if self.runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
self.runtimeContext.tmp_outdir_prefix = default_mac_path

for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
if getattr(self.runtimeContext, dirprefix) and getattr(self.runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX:
sl = "/" if getattr(self.runtimeContext, dirprefix).endswith("/") or dirprefix == "cachedir" \
else ""
setattr(self.runtimeContext, dirprefix,
os.path.abspath(getattr(self.runtimeContext, dirprefix)) + sl)
if not os.path.exists(os.path.dirname(getattr(self.runtimeContext, dirprefix))):
try:
os.makedirs(os.path.dirname(
getattr(self.runtimeContext, dirprefix)))
except Exception as e:
print("Failed to create directory: %s", e)

self.runtimeContext.secret_store = getdefault(
self.runtimeContext.secret_store, SecretStore())