본문 바로가기
MLOps

데이터 전처리

by Nowkeeh Ahc

 이번에는 머신러닝 모델을 피처의 수치 표현으로 학습할 수 있도록 피처들을 일관된 수치 표현으로 변환하는 방법, 전처리에 대해 알아보겠다. 데이터 전처리를 위한 TFX 컴포넌트인 TensorFlow Transform(TFT)을 사용하여 전처리 단계를 텐서플로 그래프로 구성할 수 있다.

 

우선 데이터 전처리를 표준화해야 하는 이유 부터 아래 설명하겠다.

  • 전체 데이터셋의 콘텍스트에서 데이터를 효율적으로 전처리
  • 전처리 단계를 효과적으로 확장
  • 잠재적인 *'학습-서빙 왜곡'을 방지

 

*'학습-서빙 왜곡' 이란???

모델 학습 중에 사용한 전처리 단계가 추론 중에 사용한 단계와 일치하지 않으면 학습-서빙 왜곡이 발생한다. 모델을 학습하는 데 사용하는 데이터는 Pandas와 함께 파이썬 노트북이나 스파크 작업에서 처리할 때가 많다. 모델이 프로덕션 설정으로 배포되면 예측을 위해 데이터가 모델에 도달하기 전에 API에서 전처리 단계가 구현된다. 아래 그림에서 볼 수 있듯이 이 두 프로세스는 단계가 항상 일치하도록 조정해야 한다.

TFT를 사용하면 전처리 단계가 올바르게 진행되도록 할 수 있다. 아래 그림과 같이 예측 요청 클라이언트는 이제 원시 데이터를 제출할 수 있으며, 배포된 모델 그래프에서 전처리가 수행된다.

위와 같은 설정은 필요한 조정량을 줄이고 배포를 단순화 한다.

 

이전에 설명한 TFDV와 마찬가지로 TFX 프로젝트의 일부인 TFT는 이전에 생성한 데이터셋 스키마를 사용하여 파이프라인에 수집된 데이터를 처리하고 다음 두 가지 아티팩트를 출력한다.

  • 전처리한 TFRecord 형식의 학습 및 평가 데이터셋, 생성된 데이터셋은 파이프라인의 Trainer 컴포넌트에서 사용된다.
  • 전처리 그래프 (에셋 파일 포함), 머신러닝 모델을 내보낼 때 사용

 

TFT의 핵심은 preprocessing_fn() 함수이다. 이는 raw 데이터에 적용할 모든 변환을 정의한다. Transform 컴포넌트를 실행하면 preprocessing_fn 함수가 원시 데이터를 수신하고 변환을 적용하며 처리된 데이터를 반환한다. 데이터느 ㄴ피처에 따라 텐서플로 Tensor 또는 SparseTensor로 제공된다. Tensor에 적용되는 변환은 모두 텐서플로 작업이어야 한다. 이를 통하여 TFT는 전처리 단계를 효과적으로 배포할 수있다.

 

이제 한 번 TFT를 사용해보겠다.

 

설치

이미 처음에 TFX 패키지를 설치할 때 TFT가 설치되었다. TFT를 독립 실행 패키지로 사용하려면 다음 코드를 사용하여 설치하면 된다.

pip install tensorflow-transform

 

전처리 전략

앞서 설명했던거와 같이 적용된 변환 방식은 preprocessing_fn()이라는 함수에서 정의한다. 그런 다음 Transform 파이프라인 구성 요소나 standalone TFT 설정에서 해당 피처를 사용한다. 아래 코드가 이제 자세히 설명할 전처리 피처의 예시이다.

import tensorflow_transform as tft 
def preprocessing_fn(inputs):
	x = inputs['x']
	x_normalized = tft.scale_to_0_l(x) 
	return {
		'x_xf': x_normalized 
	}

 위 함수는 입력 배치를 python dictionary로 수신한다. key는 피처의 이름이며 전처리를 적요하기 전의 raw 데이터를 나타내는 값이다. 

TFT는 먼저 위 그림과 같이 분석 단계를 수행한다.

아파치 빔에서 전처리 단계를 실행하면 이 단계를 분산 방식으로도 수행할 수 있다. 두번째 데이터 전달 과정에서 결정된 값 (여기서는 피처 열의 최솟값과 최댓값)을 사용하여 아래 그림과 같이 피처 x를 0과 1 사이에서 스케일링 한다.

 

 

TFT 실행

 

 예제로 다음 소스 코드의 소규모 raw 데이터셋에 전처리를 적용해보겠다.

raw_data = [
	{'x': 1.20}, 
	{'x':2.99},
	{'x':100.00}
]

 우선 데이터 스키마를 정의해야 한다. 다음 코드와 같이 피처 사양에서 스키마를 생성할 수 있다. 예시 데이터셋에는 x라는 피처 하나만 포함된다. tf.float32 데이터 유형으로 해당 피처를 정의해보겠다.

import tensorflow as tf
from tensorfl.ow_transforin.tf.metadata import dataset_metadata 
from tensorfl.ow_transforin.tf.metadata import schema_utils

raw_data_metadata = dataset.metadata.DatasetMetadata( 
	schema_utils.schema_from_feature_spec({
		'x_: tf.io.FixedLenFeature([], tf.float32), 
	}))

 

데이터셋을 로드하고 데이터 스키마를 생성한 다음에는 앞에서 정의한 전처리 함수 preprocessing_fn을 실행할 수 있다. TFT는 AnalyzeAndTransformDataset 피처를 사용하여 아파치 빔에서 실행하기 위한 바인딩을 제공한다. 이 피처는 앞에서 설명한 2단계 프로세스를 수행한다. 먼저 데이터셋을 분석한 후 변환한다. 실행은 파이썬 콘텍스트 관리자 tft_beam.Context를 통해 수행된다. 이렇게 원하는 설정 (ex. 배치 크기)을 할 수 있다. 하지만 일반적으로는 기본 배치 크기를 사용할 때 성능이 더 좋다.

아래 예제에서 AnalyzeAndTransformDataset 함수의 사용법을 보겠다.

import tempfile
import tensorflow_transform.beam.impl as tft_beam 

with beam.P丄pelineO as pipeline:
	with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    
		tfrecord_fi1e = "/your/tf_records_file.tfrecord" 
		raw_data = (
			pipeline | beam.io.ReadFromTFRecord(tfrecord_file)) 
            
		transfromed_dataset, transform_fn = (
			(raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset( 
				preprocessing_fn))

 

'Hello World' 를 테스트하고 전처리 단계를 실행한 후 결과를 출력하면 아래와 같이 처리된 데이터셋이 표시된다.

transformed.data, transformed_metadata = transformed_dataset 
print(transformed_data)



[
	{'x_xf1: 0.0},
	{'x_xf': 0.018117407}, 
	{'x_xf': 1.0}
]

 

아래는 완성된 코드이다.

import pprint
import tempfile

import tensortlow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf.metadata import dataset_metadata 
from tensorflow_transform.tf.metadata import schema_utils


def preprocessing_fn(inputs):
	"""입력 열을 변환된 열로 전처리합니다."""
	print(inputs) 
	x = inputs['x']
	x_xf = tft.scale_to_0_l(x) 
	return {
		'x_xf': x_xf,
	}
    
   
raw_data = [ 
	{'x': 1.20}, 
	{'x': 2.99}, 
	{'x': 100.0}
]

raw_data_metadata = dataset_metadata.DatasetMetadata( 
	schema_utils.schema_from_feature_spec({
		'x': tf.io.FixedLenFeature([], tf.float32), 
	}))
    
with tft_beam.Con text(temp_di r=tempfile.mkd temp()): 
	transformed_dataset, transform_fn = (
			(raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransfomDataset( 
        preprocessing_fn))
        
transformed_data, transformed_metadata = transformed_dataset    # pylint:    disable=unused- 
variable

pprint.pprint(transformed_data)

 

끝으로 위 완성된 코드를 머신러닝 파이프라인에 통합해보겠다.

 

TFT를 머신러닝 파이프라인에 통합하기

 

아래 코드에서 피처를 정의한다. 추후에 편하게 처리하기 위해 각 변환 출력 데이터 유형을 나타내는 딕셔너리의 입력 피처 이름을 (원-핫 인코딩, 버킷 처리, raw 문자열 표현으로) 그룹화한다.

import tensorflow as tf
import tensorflow_transform as tft 

LABEL_KEY = "consumer_disputed"

#  피처 이름, 피처 차원
ONE_HOT_FEATURES = { 
	"product": 11,
	"sub_product": 45,
	"company_response": 5, 
	"state": 60,
	"issue": 90 
}

#  피처 이름, 버킷 개수
BUCKET_FEATURES = {
	"zip_code": 10 
}

#  피처 이름,값은 정의되지 않음
TEXT_FEATURES = {
	"consumer_complaint_narrative": None 
}

이런 입력 피처 딕셔너리를 반복하기 전에 데이터를 효율적으로 변환하는 몇 가지 헬퍼 함수를 정의하면 편하다. 피처 이름에 접미사 (ex: _xf)를 추가하여 피처 이름을 변경하느 ㄴ것이 좋다. 접미사를 사용하면 오류가 입력 또는 출력 피처 중 어디에서 발생하는지 구별하는 데 도움이 되고, 실제 모델에서 변환되지 않은 피처를 실수로 사용하지 않도록 방지할 수 있다.

def transformed_name(key):
	return key + '_xf'

일부 피처는 Sparse 하지만 TFT는 변환 출력이 Dense 할 것으로 예상하고 있다. 다음 헬퍼 함수를 사용하여 Sparse를 Dense로 변환하고 결측값을 기본값으로 채울 수 있다.

def fill_in_missing(x):
	default_value = '' if x.dtype == tf.string or to_string else 0 
	if type(x) == tf.SparseTensor:
		x = tf.sparse.to_dense(
			tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
				default_value)
	return tf.squeeze(x, axis=l)

다음 헬퍼 함수는 지정된 인덱스를 원-핫 인코딩된 표현으로 변환하고 벡터를 반환한다.

def convert_num_to_one_hot(label_tensor, num_labels=2): 
	one_hot_tensor = tf.one_hot(label_tensor, num_labels) 
	return tf.reshape(one_hot_tensor', [-1, num_labels])

이거 외에도 모델에 맞게 무수한 헬퍼함수들을 만들어 사용할 수 있다.

 

드디어 완성된 코드이다.

from typing import Union

import tensorflow as tf
import tensorflow_transform as tft

# 피처 이름, 피처 차원
ONE_HOT_FEATURES = {
    "product": 11,
    "sub_product": 45,
    "company_response": 5,
    "state": 60,
    "issue": 90,
}

# 피처 이름, 버킷 개수
BUCKET_FEATURES = {"zip_code": 10}

# 피처 이름, 값은 정의되지 않음
TEXT_FEATURES = {"consumer_complaint_narrative": None}

LABEL_KEY = "consumer_disputed"


def transformed_name(key):
    return key + "_xf"


def fill_in_missing(x: Union[tf.Tensor, tf.SparseTensor]) -> tf.Tensor:
    if isinstance(x, tf.sparse.SparseTensor):
        default_value = "" if x.dtype == tf.string else 0
        x = tf.sparse.to_dense(
            tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
            default_value,
        )
    return tf.squeeze(x, axis=1)


def convert_num_to_one_hot(label_tensor, num_labels=2):
    one_hot_tensor = tf.one_hot(label_tensor, num_labels)
    return tf.reshape(one_hot_tensor, [-1, num_labels])


def convert_zip_code(zipcode):
    if zipcode == "":
        zipcode = "00000"
    zipcode = tf.strings.regex_replace(zipcode, r"X{0,5}", "0")
    zipcode = tf.strings.to_number(zipcode, out_type=tf.float32)
    return zipcode


def preprocessing_fn(inputs):
    outputs = {}

    for key in ONE_HOT_FEATURES.keys():
        dim = ONE_HOT_FEATURES[key]
        int_value = tft.compute_and_apply_vocabulary(
            fill_in_missing(inputs[key]), top_k=dim + 1
        )
        outputs[transformed_name(key)] = convert_num_to_one_hot(
            int_value, num_labels=dim + 1
        )

    for key, bucket_count in BUCKET_FEATURES.items():
        temp_feature = tft.bucketize(
            convert_zip_code(fill_in_missing(inputs[key])),
            bucket_count,
            always_return_num_quantiles=False,
        )
        outputs[transformed_name(key)] = convert_num_to_one_hot(
            temp_feature, num_labels=bucket_count + 1
        )

    for key in TEXT_FEATURES.keys():
        outputs[transformed_name(key)] = fill_in_missing(inputs[key])

    outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])

    return outputs

 

 TFX는 Transform  컴포넌트를 실행하면 로드된 입력 데이터에 위 완성 코드.py 모듈 파일에 정의된 변환을 적용한다. 이 변환은 데이터 수집 단계에서 TFRecord 데이터 구조로 변환된다. 그러면 컴포넌트는 변환된 데이터, 변환 그래프, 필수 메타데이터를 출력한다.

 


This post was written based on what I read and studied the book below. 
https://www.oreilly.com/library/view/building-machine-learning/9781492053187/

댓글