The pipelines SDK is a Python toolkit for working with Kubeflow Pipelines. This document is a light adaptation of the upstream material; for more detail, read the original documentation:
https://www.bookstack.cn/read/kubeflow-1.0-en/dc127d6c8622b832.md
In addition, the commonly used APIs have been wrapped to make the Kubeflow Pipelines features easier to use.

Overview of commonly used APIs

Compile a workflow function decorated with dsl.pipeline into a compressed package containing the Kubernetes workflow .yaml configuration:
Compiler.compile

class Compiler(object):
  """DSL Compiler.

  It compiles DSL pipeline functions into workflow yaml. Example usage:
 
  @dsl.pipeline(
    name='name',
    description='description'
  )
  def my_pipeline(a: int = 1, b: str = "default value"):
    ...

  Compiler().compile(my_pipeline, 'path/to/workflow.yaml')
 
  """


def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
  """Compile the given pipeline function into workflow yaml.

  Args:
    pipeline_func: pipeline functions with @dsl.pipeline decorator.
    package_path: the output workflow tar.gz file path. For example, "~/a.tar.gz".
    type_check: whether to enable the type check or not, default: True.
    pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
  """

Classes and methods for building the dependencies of pipeline components:
A method that builds a pipeline component's dependency image from the Python working directory:
build_image_from_working_dir

def build_image_from_working_dir(image_name: str = None, working_dir: str = None, file_filter_re: str = r'.*\.py',  timeout: int = 1000, base_image: str = None, builder: ContainerBuilder = None) -> str:
    '''build_image_from_working_dir builds and pushes a new container image that captures the current python working directory.

    This function recursively scans the working directory and captures the following files in the container image context:
    * requirements.txt files
    * all python files (can be overridden by passing a different `file_filter_re` argument)

    The function generates a Dockerfile that starts from a python container image, installs packages from requirements.txt (if present) and copies all the captured python files to the container image.
    The Dockerfile can be overridden by placing a custom Dockerfile in the root of the working directory.

    Args:
        image_name: Optional. The image repo name where the new container image will be pushed. The name will be generated if not set.
        working_dir: Optional. The directory that will be captured. The current directory will be used if omitted.
        file_filter_re: Optional. A regular expression that will be used to decide which files to include in the container building context.
        timeout: Optional. The image building timeout in seconds.
        base_image: Optional. The container image to use as the base for the new image. If not set, the Google Deep Learning Tensorflow CPU image will be used.
        builder: Optional. An instance of ContainerBuilder or compatible class that will be used to build the image.
          The default builder uses "kubeflow-pipelines-container-builder" service account in "kubeflow" namespace. It works with Kubeflow Pipelines clusters installed in "kubeflow" namespace using Google Cloud Marketplace or Standalone with version > 0.4.0.
          If your Kubeflow Pipelines is installed in a different namespace, you should use ContainerBuilder(namespace='<your-kfp-namespace>', ...).
          Depending on how you installed Kubeflow Pipelines, you need to configure your ContainerBuilder instance's namespace and service_account:
          For clusters installed with Kubeflow >= 0.7, use ContainerBuilder(namespace='<your-user-namespace>', service_account='default-editor', ...). You can omit the namespace if you use the kfp sdk from an in-cluster notebook; it uses the notebook namespace by default.
          For clusters installed with Kubeflow < 0.7, use ContainerBuilder(service_account='default', ...).
          For clusters installed using Google Cloud Marketplace or Standalone with version <= 0.4.0, use ContainerBuilder(namespace='<your-kfp-namespace>', service_account='default').
          You may refer to https://www.kubeflow.org/docs/pipelines/installation/overview/ for more details about different installation options.

    Returns:
        The full name of the container image including the hash digest. E.g. gcr.io/my-org/my-image@sha256:86c1...793c.
    '''
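
A minimal usage sketch, assuming the function is exposed as kfp.containers.build_image_from_working_dir and the default container builder described above is available; the image repository and working directory below are hypothetical:

from kfp.containers import build_image_from_working_dir

# Build and push an image that captures the *.py files and requirements.txt
# found in the current working directory (the registry path is hypothetical).
image = build_image_from_working_dir(
    image_name='gcr.io/my-org/my-pipeline-image',
    working_dir='.',
)
print(image)  # e.g. gcr.io/my-org/my-pipeline-image@sha256:...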

Convert a Python function into a pipeline component:
kfp.components.func_to_container_op

def func_to_container_op(func, output_component_file=None, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False):
    '''
    Converts a Python function to a component and returns a task (ContainerOp) factory

    Function docstring is used as component description.
    Argument and return annotations are used as component input/output types.
    To declare a function with multiple return values, use the NamedTuple return annotation syntax:

        from typing import NamedTuple
        def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
            """Returns sum and product of two arguments"""
            return (a + b, a * b)

    Args:
        func: The python function to convert
        base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.13.2-py3
        output_component_file: Optional. Write a component definition to a local file. Can be used for sharing.
        extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
        packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
        modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependency.__module__ is in the modules_to_capture list then it's captured and its dependencies are traversed. Otherwise the dependency is only referenced instead of being captured and its dependencies are not traversed.
        use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image.

    Returns:
        A factory function with a strongly-typed signature taken from the python function.
        Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp) that can run the original function in a container.
    '''
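
A minimal usage sketch showing the conversion call itself (the base image is just an example; any image with Python 3.5+ works):

from typing import NamedTuple
import kfp.components as comp

def divmod_fn(a: int, b: int) -> NamedTuple('Outputs', [('quotient', int), ('remainder', int)]):
    '''Returns the quotient and remainder of two integers'''
    return (a // b, a % b)

# The returned factory creates a ContainerOp when called inside a pipeline;
# its named outputs are available as task.outputs['quotient'] / ['remainder'].
divmod_op = comp.func_to_container_op(divmod_fn, base_image='python:3.7')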

Load a pipeline component definition from a file:
kfp.components.load_component_from_file

def load_component_from_file(filename):
    '''
    Loads component from file and creates a task factory function
    
    Args:
        filename: Path of local file containing the component definition.

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
    '''
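
A minimal usage sketch (component.yaml is a hypothetical component definition file, for example one written out via output_component_file above):

import kfp.components as comp

# Load the component definition and obtain a task factory.
my_op = comp.load_component_from_file('component.yaml')
# Calling the factory inside a pipeline function creates a ContainerOp task,
# e.g. task = my_op(arg1='value')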

Load a pipeline component definition from a URL:
kfp.components.load_component_from_url

def load_component_from_url(url):
    '''
    Loads component from URL and creates a task factory function
    
    Args:
        url: The URL of the component file data

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
    '''

Configuration classes and concepts associated with pipelines:
A pipeline component implemented by a container image:
kfp.dsl.ContainerOp

class ContainerOp(BaseOp):
    """
    Represents an op implemented by a container image.
    
    Example::

        from kfp import dsl
        from kubernetes.client.models import V1EnvVar, V1SecretKeySelector


        @dsl.pipeline(
            name='foo',
            description='hello world')
        def foo_pipeline(tag: str, pull_image_policy: str):

            # any attributes can be parameterized (either a serialized string or an actual PipelineParam)
            op = dsl.ContainerOp(name='foo', 
                                image='busybox:%s' % tag,
                                # pass in init_container list
                                init_containers=[dsl.UserContainer('print', 'busybox:latest', command='echo "hello"')],
                                # pass in sidecars list
                                sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')],
                                # pass in k8s container kwargs
                                container_kwargs={'env': [V1EnvVar('foo', 'bar')]},
            )

            # set `imagePullPolicy` property for `container` with `PipelineParam` 
            op.container.set_image_pull_policy(pull_image_policy)

            # add sidecar with parameterized image tag
            # sidecar follows the argo sidecar swagger spec
            op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always'))
    
    """

    # list of attributes that might have pipeline params - used to generate
    # the input parameters during compilation.
    # Excludes `file_outputs` and `outputs` as they are handled separately
    # in the compilation process to generate the DAGs and task io parameters.

Parameters passed between pipeline components:
kfp.dsl.PipelineParam

class PipelineParam(object):
  """Representing a future value that is passed between pipeline components.

  A PipelineParam object can be used as a pipeline function argument so that it will be a
  pipeline parameter that shows up in ML Pipelines system UI. It can also represent an intermediate
  value passed between components.
  """
  
  def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None):
    """Create a new instance of PipelineParam.
    Args:
      name: name of the pipeline parameter.
      op_name: the name of the operation that produces the PipelineParam. None means
               it is not produced by any operator, so if None, either user constructs it
               directly (for providing an immediate value), or it is a pipeline function
               argument.
      value: The actual value of the PipelineParam. If provided, the PipelineParam is
             "resolved" immediately. For now, we support string only.
      param_type: the type of the PipelineParam.
      pattern: the serialized string regex pattern this pipeline parameter was created from.
    Raises: ValueError if name or op_name contains invalid characters, or if both op_name
            and value are set.
    """

Methods on the Kubeflow Pipelines client for creating jobs:
Create a pipeline experiment, returning the experiment id and name:
kfp.Client.create_experiment

def create_experiment(self, name, description=None, namespace=None):
  """Create a new experiment.
  Args:
    name: the name of the experiment.
    description: description of the experiment.
    namespace: kubernetes namespace where the experiment should be created.
      For single user deployment, leave it as None;
      For multi user, input a namespace where the user is authorized.
  Returns:
    An Experiment object. Most important field is id.
  """

Run a pipeline within an experiment:
kfp.Client.run_pipeline

def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, version_id=None):
  """Run a specified pipeline.

  Args:
    experiment_id: The string id of an experiment.
    job_name: name of the job.
    pipeline_package_path: local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
    params: a dictionary with key (string) as param name and value (string) as param value.
    pipeline_id: the string ID of a pipeline.
    version_id: the string ID of a pipeline version.
      If both pipeline_id and version_id are specified, version_id will take precedence.
      If only pipeline_id is specified, the default version of this pipeline is used to create the run.

  Returns:
    A run object. Most important field is id.
  """

Ways to build pipelines components

The official documentation describes the following ways to build pipelines components:

Building a component from existing business code

The component build structure is illustrated below:
(figure: component build structure diagram)
The component is built in two steps, as shown in the sketch after this section:
Step 1: package the project code into a container image.
Step 2: use the dsl.component decorator on a function that, based on that image, returns a ContainerOp (container operation object); this yields the pipeline component.
This approach requires only minor changes to the original project code and suits collaboration between several people, so it is the recommended one.
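
A minimal sketch of step 2, assuming the project code has already been packaged and pushed as the hypothetical image registry.example.com/my-project:latest:

import kfp.dsl as dsl

@dsl.component
def train_op(data_path: str) -> dsl.ContainerOp:
    # Wrap the pre-built project image as a pipeline component.
    return dsl.ContainerOp(
        name='train',
        image='registry.example.com/my-project:latest',
        command=['python', 'train.py'],
        arguments=['--data-path', data_path],
    )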

Building a component by instrumenting the business code

Unlike building from existing business code, this approach requires modifying the original code. The component build structure is illustrated below:
(figure: component build structure diagram)
The component is again built in two simple steps, as shown in the sketch after this section:
Step 1: use the python_component decorator to declare the target function as a pipeline component function.
Step 2: use the build_python_component method to package the component function into an image and return a ContainerOp (container operation object); this yields the pipeline component.
This approach saves some effort, but it pulls pipelines SDK calls into the project code.
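
A minimal sketch of both steps, assuming a container builder is configured for the cluster; the staging path and target image below are hypothetical:

import kfp
import kfp.dsl as dsl

@dsl.python_component(name='add_op', description='adds two numbers',
                      base_image='python:3.7')
def add(a: float, b: float) -> float:
    return a + b

# Packages `add` into a new container image and returns a ContainerOp factory.
add_op = kfp.compiler.build_python_component(
    component_func=add,
    staging_gcs_path='gs://my-bucket/staging',          # hypothetical bucket
    target_image='registry.example.com/add-op:latest',  # hypothetical image
)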

Building lightweight components

Lightweight components do not require manually packaging the project code into an image, which makes them well suited for fast iteration; the component build structure is illustrated below, followed by a sketch.
(figure: component build structure diagram)
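
A minimal sketch: plain Python functions are converted with func_to_container_op and wired together in a pipeline (all names below are illustrative):

import kfp.components as comp
import kfp.dsl as dsl

def add(a: float, b: float) -> float:
    return a + b

def multiply(a: float, b: float) -> float:
    return a * b

add_op = comp.func_to_container_op(add)
multiply_op = comp.func_to_container_op(multiply)

@dsl.pipeline(name='lightweight-demo', description='lightweight components demo')
def lightweight_pipeline(a: float = 1, b: float = 2):
    add_task = add_op(a, b)
    # The output of one lightweight component feeds the next one.
    multiply_task = multiply_op(add_task.output, b)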

Reusing pipelines components

If some pipeline components in your work are fairly generic or frequently used, consider reusing them
with the kfp.components.load_component_from_url or kfp.components.load_component_from_file API.
For example:
Load the pipeline component

import kfp

my_op = kfp.components.load_component_from_url('https://path/to/component.yaml')

Define the pipeline workflow

from kfp.dsl import PipelineParam

@kfp.dsl.pipeline(
  name='My pipeline',
  description='My machine learning pipeline'
)
def my_pipeline(param_1: PipelineParam, param_2: PipelineParam):
  my_step = my_op(a='a', b='b')

For components that are commonly used in your business, consider building a component library to improve development efficiency.

Compiling pipeline tasks

There are currently two ways to compile a workflow task, as sketched after this list:

  • compile from the command line in a terminal
  • compile from Python code
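
A minimal sketch of both options (the pipeline and file names are illustrative); the dsl-compile command is installed together with the kfp package:

import kfp.dsl as dsl
import kfp.compiler as compiler

@dsl.pipeline(name='demo', description='compile demo')
def my_pipeline():
    dsl.ContainerOp(name='echo', image='busybox:latest', command=['echo', 'hi'])

# Option 1: compile from Python code.
compiler.Compiler().compile(my_pipeline, 'my_pipeline.tar.gz')

# Option 2: compile from a terminal (shown here as a comment):
#   dsl-compile --py my_pipeline.py --output my_pipeline.tar.gz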

API wrappers for Kubeflow platform users
