본문 바로가기
MLOps

데이터 수집(1)

by Nowkeeh Ahc

 기본 TFX 설정고 ML 메타데이터스토어를 사용하여 데이터셋을 다양한 컴포넌트에서 활용할 수 있도록 파이프라인으로 데이터를 수집하는 방법을 설명하겠다.

 

[그림1] 파이프라인의 일부인 데이터 수집

 

 TFX는 파일이나 서비스에서 데이터를 수집하는 컴포넌트를 제공한다. 

필자는 1.2.0 버전을 사용하였다. 환경은 Windows에서 Jupyternotebook을 사용한다.

 

!pip install tfx==1.2.0

 

 

데이터 수집의 개념

 데이터를 읽거나 요청하여 수집한 후 수집된 데이터셋을 다음 컴포넌트로 전달하기 전에 가용 데이터를 별도의 데이터셋 (ex. 학습 및 검증 데이터셋)으로 나눈다. 그런 다음 데이터셋을 tf.Example로 표시된 데이터가 포함된 TFRecord 파일로 변환한다.

 

TFRecord

 대용량 데이터셋 스트리밍에 최적화된 경량 형식이다. 실제로 텐서플로 사용자는 대부분 직렬화된 프로토콜 버퍼를 TFRecord 파일에 저장하지만, TFRecord 파일 형식은 실제로 다음과 같이 모든 바이너리 데이터를 지원한다.

import tensorflow as tf

with tf.io.TFRecordWriter("test.tfrecord") as w:
    w.write(b"First record")
    w.write(b"Second record")

for record in tf.data.TFRecordDataset("test.tfrecord"):
    print(record)

# tf.Tensor(b'First record', shape=(), dtype=string)
# tf.Tensor(b'Second record', shape=(), dtype=string)

 

 만약 TFRecord 파일에 tf.Example 레코드들이 있다면, 각 레코드에는 데이터의 피처들이 포함된다. 그런 다음 데이터가 바이너리 파일에 저장되므로 효율적으로 소화할 수 있다. 

https://www.tensorflow.org/tutorials/load_data/tfrecord

 

TFRecord 및 tf.train.Example  |  TensorFlow Core

TensorFlow.js의 새로운 온라인 과정에서 웹 ML을 통해 0에서 영웅으로 거듭나십시오. 지금 등록하세요 이 페이지는 Cloud Translation API를 통해 번역되었습니다. Switch to English TFRecord 및 tf.train.Example TFRec

www.tensorflow.org

 

 TFRecord의 장점

  1. 데이터 구조는 교차 플랫폼이자 교차 언어 라이브러리인 프로토콜 버퍼에 의존하여 데이터를 직렬 화하므로 시스템 독립적
  2. 대량의 데이터를 빠르게 다운로드하거나 쓰도록 최적화
  3. 텐서플로 생태계의 기본 데이터 구조이므로 모든 TFX 컴포넌트에서 사용

 

로컬 데이터 파일 수집

 ExampleGen 컴포넌트는 CSV, TFRecord, Avro, Parquet 등 몇 가지 데이터 구조를 수집할 수 있다.

 

csv데이터를 TFRecord로 변환하기

import os
from pathlib import Path
from tfx.components import CsvExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

dir_path = Path().parent.absolute()

# 데이터 경로를 정의합니다.
input_base = os.path.join(dir_path, "..", "..", "data", "taxi")
# 파이프라인 컴포넌트를 인스턴스화합니다.
example_gen = CsvExampleGen(input_base=input_base)
# 대화식으로 컴포넌트를 실행합니다.
context.run(example_gen)

 

TFRecord 파일 가져오기

import os
from pathlib import Path
from tfx.components import ImportExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

dir_path = Path().parent.absolute()
tfrecord_dir = os.path.join(dir_path, "..", "..", "data", "tfrecord_data")

example_gen = ImportExampleGen(input_base= tfrecord_dir)

context.run(example_gen)

 

parquet데이터를 TFRecord로 변환하기

from tfx.components.base import executor_spec
# 일반 파일 로더 컴포넌트를 가져옵니다.
from tfx.components import FileBasedExampleGen
# 파케이 관련 실행자를 가져옵니다.
from tfx.components.example_gen.custom_executors import parquet_executor

parquet_dir_path = "parquet_data"

# Executor를 재정의합니다.
example_gen = FileBasedExampleGen(
    input_base=parquet_dir_path,
    custom_executor_spec=executor_spec.ExecutorClassSpec(
        parquet_executor.Executor))

 

avro데이터를 TFRecord로 변환하기

from tfx.components.base import executor_spec
# 일반 파일 로더 컴포넌트를 가져옵니다.
from tfx.components import FileBasedExampleGen
# 아브로 관련 실행자를 가져옵니다.
from tfx.components.example_gen.custom_executors import avro_executor

avro_dir_path = "avro_data"

# Executor를 재정의합니다.
example_gen = FileBasedExampleGen(
    input_base=avro_dir_path,
    custom_executor_spec=executor_spec.ExecutorClassSpec(
        avro_executor.Executor))

 

사용자 지정 데이터를 TFRecord로 변환하기

우선 웹 상에서 샘플 데이터를 다운로드한 후 용도에 맞게 전처리 진행

 

import os
import tensorflow as tf
import pandas as pd
import numpy as np
import shutil
from pathlib import Path

# 웹 상에서 원시 데이터를 다운 받습니다.
filepath = tf.keras.utils.get_file(
    "complaints.csv.zip",
    "http://files.consumerfinance.gov/ccdb/complaints.csv.zip")

dir_path = Path(__file__).parent.absolute()
data_dir = os.path.join(dir_path, "..", "..", "data")
processed_dir = os.path.join(dir_path, "..", "..", "data", "processed")
Path(processed_dir).mkdir(parents=True, exist_ok=True)

# 압축을 해제합니다.
shutil.unpack_archive(filepath, data_dir)
# pandas로 csv 파일을 읽어 들입니다.
df = pd.read_csv(os.path.join(data_dir, "complaints.csv"))

df.columns = [
    "date_received", "product", "sub_product", "issue", "sub_issue",
    "consumer_complaint_narrative", "company_public_response",
    "company", "state", "zip_code", "tags",
    "consumer_consent_provided", "submitted_via",
    "date_sent_to_company", "company_response",
    "timely_response", "consumer_disputed", "complaint_id"]

df.loc[df["consumer_disputed"] == "", "consumer_disputed"] = np.nan

# 주요한 필드가 비어있는 경우 레코드를 제외합니다.
df = df.dropna(subset=["consumer_complaint_narrative", "consumer_disputed"])

# Label 필드인 consumer_disputed를 Yes, No에서 1, 0 으로 변경합니다.
df.loc[df["consumer_disputed"] == "Yes", "consumer_disputed"] = 1
df.loc[df["consumer_disputed"] == "No", "consumer_disputed"] = 0

df.loc[df["zip_code"] == "", "zip_code"] = "000000"
df.loc[pd.isna(df["zip_code"]), "zip_code"] = "000000"

df = df[df['zip_code'].str.len() == 5]
df["zip_code"] = df['zip_code'].str.replace('XX', '00')
df = df.reset_index(drop=True)
df["zip_code"] = pd.to_numeric(df["zip_code"], errors='coerce')

# 판다스 DataFrame을 csv 파일로 다시 저장합니다.
df.to_csv(os.path.join(processed_dir, "processed-complaints.csv"), index=False)

 

 기존 TFRecord파일 가져오는 부분에서 설명한 대로, 기존 데이터셋을 TFRecord 데이터 구조로 변환한 다음 ImportExampleGen 컴포넌트로 수집하는 방법이 더 간단할 수도 있다. 이 접근 방식은 효율적인 데이터 스트리밍이 가능한 데이터 플랫폼을 통해 데이터를 사용할 수 없을 때 유용하다.

 모든 유형의 데이터를 TFRecord파일로 변환하려면 데이터셋의 모든 레코드에 대해 tf.Example 구조를 만들어야 한다. tf.Example은 key-value 매핑으로 간단하지만 유연성이 높은 key-value 매핑 데이터 구조이다.

 

{"string": value}

 

TFRecord 데이터 구조의 경우 tf.Example에서는 key-value 매핑이 포함된 피처 Dictionary을 사용할 수 있는 tf.Features 객체를 포함한다. key는 항상 피처 열을 나타내는 문자열 식별자이며 값은 tf.train.Feature 객체이다.

 

Record 1:
tf.Example
	tf.Features
    	'column A': tf.train.Feature
        'column B': tf.train.Feature
        'column C': tf.train.Feature

 

tf.train.Feature은 다음 3가지 데이터 타입을 허용한다.

  • tf.train.BytesList
  • tf.train.FloatList
  • tf.train.Int64List

 

데이터 레코드를 tf.Example에서 사용하는 올바른 데이터 구조로 변환하는 데 도움이 되는 헬퍼 함수를 정의하면 코드 중복을 줄일 수 있다.

 정의한 헬퍼 함수를 사용하여 샘플 데이터셋을 TFRecord 데이터 형식의 파일로 변환해보겠다.

 

import os
import re

import tensorflow as tf
import pandas as pd
from pathlib import Path


def _bytes_feature(value):
    return tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[value.encode()])
    )


def _float_feature(value):
    return tf.train.Feature(
        float_list=tf.train.FloatList(value=[value])
    )


def _int64_feature(value):
    return tf.train.Feature(
        int64_list=tf.train.Int64List(value=[value])
    )


def clean_rows(row):
    if pd.isna(row["zip_code"]):
        row["zip_code"] = "99999"
    return row


def convert_zipcode_to_int(zipcode):
    nums = re.findall(r'\d+', zipcode)
    if len(nums) > 0:
        int_zipcode = int(nums[0])
    else:
        int_zipcode = 99999
    return int_zipcode


dir_path = Path(__file__).parent.absolute()
data_dir = os.path.join(dir_path, "..", "..", "data")
tfrecord_dir = os.path.join(dir_path, "..", "..", "data", "tfrecord")
df = pd.read_csv(os.path.join(data_dir, "processed-complaints.csv"))

tfrecord_filename = "consumer-complaints.tfrecord"
tfrecord_filepath = os.path.join(tfrecord_dir, tfrecord_filename)
# tfrecord_filename에 지정된 경로에 저장하는 TFRecordWriter 객체를 만듭니다.
tf_record_writer = tf.io.TFRecordWriter(tfrecord_filepath)

for index, row in df.iterrows():
    row = clean_rows(row)
    # 모든 데이터 레코드를 tf.train.Example로 변환
    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                "product": _bytes_feature(str(row["product"])),
                "sub_product": _bytes_feature(str(row["sub_product"])),
                "issue": _bytes_feature(str(row["issue"])),
                "sub_issue": _bytes_feature(str(row["sub_issue"])),
                "state": _bytes_feature(str(row["state"])),
                "zip_code": _int64_feature(convert_zipcode_to_int(row["zip_code"])),
                "company": _bytes_feature(str(row["company"])),
                "company_response": _bytes_feature(str(row["company_response"])),
                "timely_response": _bytes_feature(str(row["timely_response"])),
                "consumer_disputed": _float_feature(
                    row["consumer_disputed"]
                ),
            }
        )
    )
    # 데이터 구조를 직렬화
    tf_record_writer.write(example.SerializeToString())
tf_record_writer.close()

 

 이제 생성된 TFRecord 파일을 ImportExampleGen 구성요소로 가져올 수 있다.

 


This post was written based on what I read and studied the book below.
https://www.oreilly.com/library/view/building-machine-learning/9781492053187/

댓글