zjb.main.manager.workspace 源代码

import ulid
from traits.api import List

from zjb._traits.types import Instance
from zjb.doj.job_manager import JobManager
from zjb.doj.worker import Worker

from ..dtb.atlas import Atlas
from ..dtb.dynamics_model import DynamicsModel
from .project import Project


[文档] class Workspace(Project): """ 工作空间类,继承自Project类,用于管理和处理与工作空间相关的数据和操作。 Attributes ---------- manager : Instance(JobManager) 与工作空间相关联的作业管理器实例。 name : str 工作空间名称,默认为'Workspace'。 parent : None 工作空间没有父项目,始终为None。 atlases : List(Instance(Atlas)) 工作空间中包含的脑图谱实例的列表。 dynamics : List(Instance(DynamicsModel)) 工作空间中包含的动力学模型列表。 workers : List(Instance(Worker)) 工作空间中的处理器列表。 """ manager = Instance(JobManager, transient=True) name = "Workspace" parent: None = None # type: ignore atlases = List(Instance(Atlas)) dynamics = List(Instance(DynamicsModel)) workers = List(Instance(Worker), transient=True)
[文档] @classmethod def from_manager(cls, manager: JobManager, gid: "ulid.ULID | None" = None): """ 从作业管理器创建工作空间实例。如果提供了全局唯一标识符(gid),则使用它来创建实例;否则从作业管理器获取数据创建工作空间。 Parameters ---------- manager : JobManager 作业管理器实例。 gid : ulid.ULID | None, optional 全局唯一标识符,可选,默认为None。 Returns ------- Workspace 创建的工作空间实例。 """ if gid: return super().from_manager(manager, gid) data = next(manager.iter(), None) if not data: # manager为空 workspace = cls() workspace._gid = ulid.MIN_ULID manager.bind(workspace) workspace.manager = manager return workspace if isinstance(data, cls): data.manager = manager return data raise ValueError("The workspace must be created from an empty JobManager")
[文档] def unbind(self): """解除工作区与作业管理器的绑定关系。由于工作区不支持解绑操作,因此此方法会抛出RuntimeError。""" raise RuntimeError( "The workspace cannot unbind from the manager, delete the data files directly." )
[文档] def start_workers(self, count: int = 1): """启动一些Worker Parameters ---------- count : int, optional 要启动的Worker数量, by default 1 """ new_workers = [Worker(manager=self.manager) for _ in range(count)] for worker in new_workers: worker.start() self.workers += new_workers
[文档] def remove_idle_workers(self, count: int = 0): """移除一些空闲的Worker Parameters ---------- count : int, optional 要移除的空闲Worker数量, 小于等于0时会移除所有空闲Worker, by default 0 """ terminated_workers: list[Worker] = [] for worker in self.workers: result = worker.terminate() if result: terminated_workers.append(worker) count -= 1 if count == 0: break for worker in terminated_workers: self.workers.remove(worker)