본문 바로가기

ML & AI/kubeflow

MiniKF - Tutorial 따라하기_Taxi Cab #2

(작성중)

 

Kubeflow Pipeline을 작성하는 두가지 방법

  • kfp.dsl 패키지의 ContainerOP 사용 : ML Workflow의 각 Step을 독립적으로 수행가능한 Component(container 형태)로 패키징하고, ContainerOP의 매개변수로 container Image를 전달하여 사용(Tips: GCP환경에서는 Pipeline SDK를 사용하여 각 Step을 Container로 바로 빌드할 수 있는 container build 기능을 제공) 
  • kfp.dsl 패키지를 사용하여 Python 함수를 Pipeline으로 변환 : Docker Image가 필요없이 DSL SDK를 사용하여 Python 코드를 직접 YAML 형식으로 컴파일하여 사용 

Taxi Cab 튜토리얼의 Pipeline은 다음과 같이 8개의 Step Component로 Workflow를 정의하고 있으며, ContainerOP를 위한 docker image를 제공하고 있다.

  • dataflow_tf_data_validation_op
  • dataflow_tf_transform_op
  • tf_train_op
  • dataflow_tf_model_analyze_op
  • dataflow_tf_predict_op
  • confusion_matrix_op
  • roc_op
  • kubeflow_deploy_op
Taxi Cap Pipeline 기본 구조 (taxi-cap-pipeline-snap.py)
import kfp.dsl as dsl --------------------------------------------------------------------------- ①

@dsl.pipeline( -----------------------------------------------------------------------------------

    name='Taxi Cab on-prem',
    description='Example pipeline that does classification with model analysis based on a public BigQuery dataset for on-prem cluster.'
)

def taxi_cab_classification( ---------------------------------------------------------------------
        rok_url,
        rok_register_url,
        pvc_size='1Gi',
        project='tfx-taxi-pipeline-on-prem',
        column_names='taxi-cab-classification/column-names.json',
        key_columns='trip_start_timestamp',
        train='taxi-cab-classification/train.csv',
        evaluation='taxi-cab-classification/eval.csv',
        ...
):

if __name__ == '__main__':
    import kfp.compiler as compiler -------------------------------------------------------------
    compiler.Compiler().compile(taxi_cab_classification, __file__ + '.tar.gz') -------------------
  1. kfp.dsl : DSL 컴파일 패키지 임포트
  2. kfp.compiler : kubeflow pipeline 을 컴파일하기 위한 기능을 제공하는 패키지
  3. Compiler().compile(함수명, 파이프라인 리소스명) : Python DSL 코드를 Pipeline YAML 파일로 변환
  4. taxi_cab_classification : Pipeline 실행 함수명
  5. @dsl.pipeline : Kubernetes의 리소스 메타정보 데코레이션(필수)
kfp.dsl.VolumeOp

Kubernetes PV(Persistent Volume)을 생성한다.

vop = dsl.VolumeOp(
    name='create-volume',
    resource_name='taxi-cab-data',
    annotations={"rok/origin": rok_url},
    size=pvc_size
)
  • name : k8s의 볼륨 이름 설정
  • resource_name(필수) : k8s의 볼륨 컨트롤러 이름 설정
  • annotations : 이건 모지?
  • size(필수) : 보륨 크기 설정(예제에서는 1Gi 로 설정하였다)
dataflow_tf_data_validation_op
def dataflow_tf_data_validation_op(inference_data, validation_data, column_names,
                                             key_columns, project, mode, validation_output, volume, 
                                             step_name='validation'):
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:6ad2601ec7d04e842c212c50d5c78e548e12ddea',
        arguments=[
            '--csv-data-for-inference', inference_data,
            '--csv-data-to-validate', validation_data,
            '--column-names', column_names,
            '--key-columns', key_columns,
            '--project', project,
            '--mode', mode,
            '--output', '%s/{{workflow.name}}/validation' % validation_output,
        ],
        file_outputs={
            'schema': '/schema.txt',
            'validation': '/output_validation_result.txt',
        },
        pvolumes={validation_output: volume}
    )


validation = dataflow_tf_data_validation_op(
    '/mnt/%s' % train,
    '/mnt/%s' % evaluation,
    '/mnt/%s' % column_names,
    key_columns, project,
    mode,
    '/mnt', vop.volume
)

[ContainerOp 인자값]

  • name(필수) : 컴포넌트 명 설정
  • image(필수) : Container 실행을 위한 Image 설정
  • arguments : Container 실행을 위한 내부 인자값(String or StringList) 설정
  • file_outputs : 아웃풋 라벨과 Pipeline 실행 결과를 저장할 로컬파일 설정 (예제에서는 'schema' 와 'validation' 두 개의 라벨을 지정하고, 각각의 파일명으로 저장한다.) 
  • pvolumes : 생성한 볼륨을 컨테이너의 경로에 마운트(예제에서는 '/mnt' 경로에 vop.volume을 마운트하도록 인자를 받았다)
dataflow_tf_transform_op

 

tf_train_op

 

dataflow_tf_model_analyze_op

 

dataflow_tf_predict_op

 

confusion_matrix_op

 

roc_op

 

kubeflow_deploy_op

Pipeline 소스를 모두 분석하고, 이 후 단계를 진행하려니 진행이 너무 더디다. 우선 대략적으로 소스 구조를 이해했으니 다음 단계를 먼저 진행하고 추 후에 나머지 소스를 더 분석해봐야 겠다.