본문 바로가기
Python/Data Analysis

Python 대용량 데이터 처리 라이브러리 - Dask

by skwkiix 2024. 2. 4.
728x90

 

 

 

 

지난 포스팅에서는, pandas 라이브러리로 대용량 데이터를 불러올 때 가장 쉽게 사용할 수 있는 파라미터에 대해 알아봤다.

하지만, 데이터 용량이 크거나,  많은 파일의 시계열 데이터를 concat하여 처리해야 할 때는 pandas 라이브러리만 사용해서는 효율적인 처리가 어려우며 병렬처리를 고려해야 하는 상황이 발생할 수 있다.

 


Dask는 가상 데이터프레임을 사용해서 대규모 데이터셋을 병렬로 처리한다.

Apache Spark와 비슷하지만 numpy, pandas와 긴밀하게 연결되어 있고, Pandas 와 유사한 API를 제공하기 때문에 이해가 쉽다.

Dask는 병렬 처리를 통해 대용량 데이터 처리 성능을 향상시키고, 병목 현상을 줄여준다.


Dask 병렬 처리 작동원리

  1. 지연 평가 (Lazy Evaluation)
    • Dask의 가상 데이터프레임은 지연평가 방식을 사용함
    • 지연평가 : 데이터를 메모리에 로드하지 않고 작업을 지연시키는 것(데이터 처리 작업을 그래프로 나타내어 나중에 실행될 수 있도록 함)
  2. 분산 데이터 처리
    • 대용량 데이터를 가상의 데이터 프레임으로 분산하여 병렬로 처리(여러 머신의 여러 CPU 코어에서 작업을 분산하고 병렬로 실행하여 대규모 데이터셋을 처리)
  3. 청크 단위 처리
    • Dask의 가상 데이터프레임은 데이터를 작은 청크(chunk)로 분할하여 처리
    • 각 청크는 메모리에 맞지 않는 대규모 데이터셋의 작은 일부분으로, 메모리에 맞게 조절해서 사용

Dask 병렬 처리 과정 요약

데이터를 작은 청크(chunk)로 분할 > 청크를 처리하는 작업들을 정의하고, 의존성을 그래프로 표현 > 작업 스케줄링(컬 머신의 여러 코어나 분산 컴퓨팅 클러스터의 여러 노드에 분산하여 실행) > 여러개의 작업을 병렬로 실행(의존성 고려)

https://www.sktenterprise.com/bizInsight/blogDetail/dev/4704


 

소스 코드

- 간단한 예시

import dask.dataframe as dd

# CSV 파일을 Dask DataFrame으로 불러오기
df = dd.read_csv('your_file.csv')

# 파일의 첫 5개 행 출력하기
print(df.head())

 

pandas 와 유사하게 데이터 프레임을 불러온다. 여기서 pandas dataframe으로 변환하려면 compute() 함수를 사용하면 된다.

 

- 실전에서 사용하기

상황 : file_list에 있는 file 3개를 모두 resampled_fuction을 적용한 후 concat 해야함(각 파일은 모두 시계열 데이터)

import dask.dataframe as dd
import pandas as pd

# 데이터를 담을 빈 리스트 생성
combined_dfs = []

# file_list는 파일 경로의 리스트
file_list = ['file1.csv', 'file2.csv', 'file3.csv']

# 각 파일에 대해 처리
for file in file_list:
    print(f"Reading and processing file: {file}")
    
    # 파일을 Dask DataFrame으로 읽기
    dask_df = dd.read_csv(file, usecols=['col1', 'col2'], dtype={'col1': float, 'col2': float})
    
    # 변환 함수 적용
    computed_df = dask_df.compute()
    processed_df = resampled_function(computed_df)
    combined_dfs.append(processed_df)

combined_df = pd.concat(combined_dfs, ignore_index=False)

 

1. 모든 파일을 dask로 각각 읽어온 후 pandas dataframe 형식으로 compute 한다. 

2. 각각의 파일에 resampled_fuction(임의로 생성한 함수)를 적용한 후 append한다.

3. 모든 처리과정이 끝난 df를 concat 한다.

 

불러와야 하는 데이터 크기가 매우 크고 커널이 자주 종료된다면,

특히나 리샘플링이 필요한 시계열 데이터라면,

pandas 내에서 chunk를 사용하게 되면, 해당 행 개수만큼 처리하므로 리샘플링 처리가 꼬일 수 있다 > Dask 를 사용하자!