Node

class nnodes.node.Node(cwd: str, data: dict, parent: nnodes.node.Node | None)

A directory with a task.

async _exec_children()

Execute self._children.

async _exec_task()

Execute self.task.

add(task: Optional[Union[Callable, List[str], Tuple[str, ...], str]] = None, /, cwd: Optional[str] = None, name: Optional[str] = None, *, args: Optional[Union[list, tuple]] = None, concurrent: Optional[bool] = None, prober: Optional[Callable[[...], float | str | None]] = None, **data) -> N

Add a child node or a child task.

add_mpi(cmd: Union[Callable, List[str], Tuple[str, ...], str], /, nprocs: Union[int, Callable[[Directory], int]] = 1, cpus_per_proc: int = 1, gpus_per_proc: int = 0, mps: Optional[int] = None, *, name: Optional[str] = None, fname: Optional[str] = None, args: Optional[Union[list, tuple]] = None, mpiarg: Optional[Union[list, tuple]] = None, group_mpiarg: bool = False, check_output: Optional[Callable[[...], None]] = None, use_multiprocessing: Optional[bool] = None, cwd: Optional[str] = None, data: Optional[dict] = None, timeout: Optional[Union[Literal['auto'], float]] = 'auto', ontimeout: Optional[Union[Literal['raise'], Callable[[], None]]] = 'raise')

Run an MPI task.

property done: bool

Whether the main function and child nodes executed successfully.

property elapsed: float | None

Total walltime.

async execute()

Execute task and child tasks.

property name: str

Node name.

property parent: N

Parent node.

reset()

Reset node (including child nodes).

run()

Wrap self.execute with asyncio.

stat(verbose: bool = False)

Structure and execution status.

update(items: dict)

Update properties from a dict.

nnodes.node.getname(task: Union[Callable, List[str], Tuple[str, ...], str]) -> str | None

Get default task name.

nnodes.node.getnargs(func: Callable) -> int

Get the number of arguments required by a function.

nnodes.node.parse_import(path: Union[List[str], Tuple[str, ...]]) -> Any

Import function from a custom module.