반응형

금융 데이터를 이용해서 이런저런 실험을 해보다보면

큰 데이터를 가지고 작업을 할 일이 매우 많다.

주가 데이터가 큰 것도 있고,

많은 테스트셋을 만들려면 당연히 데이터가 커질 수 밖에 없다.

100개 단위의 데이터에서는 큰 차이를 느낄 수 없지만,

데이터의 양이 10만, 100만 단위로 간다면 반복문을 사용해보면 시간 소모가 커짐을 체감할 수 있다.

그래서 이럴때 멀티프로세싱을 사용하면 고속으로 데이터 처리를 할 수 있다.

어떻게 사용할 수 있는지, 속도는 얼마나 체감이 되는지 예제를 통해 알아보자.


(1) 멀티프로세싱이란?

네이버 사전에 멀티프로세싱을 검색해 보았다.(한국말인 다중처리로 검색하였음)

1. 여러 개의 처리 장치를 갖춘 하나의 컴퓨터 시스템에서, 하나는 시스템을 제어하고 다른 것들은 그것을 보조하는 기능을 수행하게 하는 처리 방식.

2. 하나의 작업이나 프로세스를 논리적 또는 기능적으로 분할하여 여러 개의 컴퓨터에서 비동기적으로 동시에 수행하게 하는 처리 방식

3. 시분할 방식에 의하여 여러 프로그램이나 그 부분들을 동시에 수행하는 것.

 

여기서 내가 설명할 멀티프로세싱의 예제는 2번에 가깝다.

작업(한개의 함수를 여러번 수행)을 분할하여 동시에 수행하게 할 것이기 때문이다.


(2) 파이썬에서의 멀티프로세싱

파이썬에서는 multiprocessing 이라는 모듈을 이용해 멀티프로세싱을 수행할 수 있다.

multiprocessingthreading 모듈과 유사한 API를 사용하여 프로세스 스포닝(spawning)을 지원하는 패키지입니다. multiprocessing 패키지는 지역과 원격 동시성을 모두 제공하며 스레드 대신 서브 프로세스를 사용하여 전역 인터프리터 록 을 효과적으로 피합니다. 이것 때문에, multiprocessing 모듈은 프로그래머가 주어진 기계에서 다중 프로세서를 최대한 활용할 수 있게 합니다. 유닉스와 윈도우에서 모두 실행됩니다.

- Python 공식 문서에서 multiprocessing 모듈 검색 -

 

프로그래머가 다중 프로세서를 활용할 수 있는 모듈이라고 설명하고 있다.

그러면 유저는 어떻게 이 모듈을 사용할 수 있는가?

1. 파이썬에서 multiprocessing 모둘을 불러온다.

멀티프로세싱 모듈은 pip를 통해 따로 설치해 줄 필요가 없다.

2. pool 객체를 생성한다.

이 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시키는 편리한 방법을 제공합니다(데이터 병렬 처리)

- pool 객체를 공식 문서에서 검색하면 다음과 같이 설명한다. -

 

pool 객체를 이용해 멀티프로세싱을 할 수 있게 객체를 생성해준다.

pool(코어 갯수) 로 만들 수 있다.

3. pool 객체에 작업을 매핑해준다.

pool.map(함수, (인풋 데이터)) 형식으로 작업을 매핑해준다.

코어 갯수로 나누어 작업을 처리하게 된다.

4. 실행

객체를 생성한 뒤, 실제로 속도가 빨라지는지 한번 확인해본다.


(3) 예제를 통한 속도 개선 수준의 확인

직접 만든 예제를 이용하여 속도 개선 수준을 한번 확인해보도록 한다.

다음 코드는 2017년부터 특정 가격(전일종가 -1%) 에 해당되는 주식 종목을 검색하여, 당일 종가기준 수익률이 얼마나 되는지 검색하는 코드이다.

작동 플로우는 일봉데이터DB에서 쿼리 → 조건에 맞는지 확인 → 조건 충족하는 데이터를 수합하여 히스토그램으로 시현 순으로 작동된다.

import numpy  as np
import pandas as pd
import matplotlib.pyplot as plt
import sqlite3
import multiprocessing as mp
import time

class get_pattern():
    def __init__(self, stock_code_list):
        self.db = sqlite3.connect("stock_price(day).db")
        self.stock_code_list = stock_code_list

    ### Query data
    def query_data(self, code):
        ## Pass ETN
        if code[0] == 'Q':
            pass

        cursor = self.db.cursor()
        
        data = cursor.execute("SELECT * FROM %s WHERE date > 20170101" %code)
        cols = [column[0] for column in data.description]
        
        query_data = pd.DataFrame.from_records(data=data.fetchall(), columns=cols)

        return query_data


    ### Search Pattern
    def get_pattern(self, queried_data):
        queried_data['close_D-1'] = queried_data['close'].shift(1) * 0.99
        queried_data['Pattern_Today'] = (queried_data['low'] < queried_data['close_D-1']) & (queried_data['volume'] * queried_data['close'] > 10000000000)
        queried_data['Profit'] = (queried_data['close'] / queried_data['close_D-1'] - 1) * 100

        return queried_data['Profit'][queried_data['Pattern_Today'] == True]
 

    ## run
    def run(self):
        filtered_profit = pd.Series()

        for code in self.stock_code_list:
            history_data = self.query_data(code)
            filtered_data = self.get_pattern(history_data)

            filtered_profit = pd.concat([filtered_profit, filtered_data])

        return filtered_profit
    

# Multiprocessing
def multi_run(code_list):
    get_p = get_pattern(code_list)
    profit = get_p.run()

    return profit


# parallelize dataframe
def parallelize_dataframe(df, func, cores):

    list_split = np.array_split(df, cores)
    pool = mp.Pool(num_cores) ## pool 객체 생성
    parallelized_df = pd.concat(pool.map(func, list_split)) ## 멀티프로세싱 후 데이터프레임 합체

    pool.close()
    pool.join()

    return parallelized_df

if __name__ == '__main__':   
    
    # import stock_code_list
    stock_code_list = pd.read_hdf('code_list.h5')
    
    # 내 PC의 cpu 코어 갯수를 확인한다.
    print(mp.cpu_count())

    ### (1) 멀티프로세싱으로 작동
    ### 멀티프로세싱 (코어 갯수 : 8)
    ###

    start = time.time()  # 시작 시간
    total_profit = parallelize_dataframe(stock_code_list, multi_run, mp.cpu_count()) # storing result
    print("time :", time.time() - start)  # 현재시각 - 시작시간 = 실행 시간

    plt.hist(total_profit, bins=60, range=(-30, 30))
    plt.show()


    ### (2) 시간 비교를 위해 단일 프로세싱으로 작동
    ### 단일 프로세스로 작동
    ###

    start = time.time()  # 시작 시간

    get_p = get_pattern(code_list)
    total_profit = get_p.run()

    print("time :", time.time() - start)  # 현재시각 - 시작시간 = 실행 시간

    plt.hist(total_profit, bins=60, range=(-30, 30))
    plt.show()

* 다음 예제에서는 parallelize_dataframe 함수에서 멀티프로세싱을 수행한다.

* 결과 데이터의 사이즈는 88460행이다.

* 코드를 작동해서 확인해 볼 수는 없으나, 실질적으로 쿼리되는 데이터의 양은 약 25만 행으로 추정한다.

(추출 근거는 17년 1월 1일부터 영업일(약 900일) * 검색종목수(테이블 수 2981개)를 근사치로 설정하여 25만행으로 산출하였다.)

실행을 해보니 결과는 다음과 같았다.

단일 프로세싱 time : 30.910

2코어 멀티 프로세싱 time : 15.925

4코어 멀티 프로세싱 time : 11.130

8코어 멀티 프로세싱 time : 10.087

* 속도의 변화만 보기 위해 평균치는 내지 않았다. 시간 차이가 작동때마다 차이가 발생할 수 있음.

* 8코어는 실제코어가 8코어가 아니고, 하이퍼쓰레딩이 적용되어있어 8개로 인식한다. 실제 코어 수는 4개이다.

 

우리는 이 테스트 코드를 통해 다음과 같은 결과를 도출해 낼 수 있다.

* 2개의 코어로 분산만 해줘도 속도 개선이 확연히 개선되었다.

* 코어를 n배한다고 해서 정확히 속도가 1/n배로는 개선되지 않는다.

따라서 적당한 코어 갯수 설정을 통한 멀티프로세싱을 하면,

데이터 분석 속도를 가시적으로 개선해 볼 수 있을 것이다.

반응형
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기