멍청멍청기록/에러 일기

파이썬] 키움증권 Open API OPT10079 주식차트조회

  • -
728x90
반응형

내용

키움증권 Open API를 이욯해서 틱 차트를 조회하는 로직을 알아보고 있었다

구글 검색 시 나오는 코드를 복붙 후 이용해서 종목 테스트를 진행하는 중에 뭔가 이상함을 발견했다

 

테스트 환경

윈도우 11

vscode & python 3 & anaconda 32bit

종목명 : 대한화섬 = "003830"

 

테스트 결과

파이썬 로직 : 25개 데이터

키움증권 Open API : 27개 데이터

 

2개의 데이터가 누락이 되서 나오고 있었다

처음에는 다량의 종목을 한번에 조회하고 있어서 데이터가 누락되는지 몰랐는데 거래량이 적은 종목으로 테스트를 해보니 확연히 알 수 있었다.

 

원인은 복붙한 코드 중 for문에서 체결시간을 가져와서 index로 사용하고 있는데 이 부분이 문제가 되는 것 같았다.

 

아래 코드는 정상적인 데이터가 나오게 수정을 한 코드이다.

 

import sys
from PyQt5.QtWidgets import *
from PyQt5.QAxContainer import *
from PyQt5.QtCore import *
import pandas as pd
from datetime import *
import time
from ItemList import ItemList
import pprint

# 초당 조회 횟수를 회피하기 위한 대기시간 지정
TIME_TERM = 0.5

class Main(QAxWidget):
    def __init__(self):
        super().__init__()
        self.scrno = '1000'
        self._create_kiwoom_instance()
        self._set_signal_slots()
        self.login_event_loop = None
        self.tr_event_loop = None
        self.sPrevNext = None
        self.end_date = None
        self.start_time = None
        self.tr = False
        self.dataframes = []

    def gen_scrno(self):
        self.scrno = str(int(self.scrno) + 1)
        return self.scrno

    def _create_kiwoom_instance(self):
        self.setControl("KHOPENAPI.KHOpenAPICtrl.1")

    def _set_signal_slots(self):
        self.OnEventConnect.connect(self._event_connect)
        self.OnReceiveTrData.connect(self._receive_tr_data)

    def comm_connect(self):
        self.dynamicCall("CommConnect()")
        self.login_event_loop = QEventLoop()
        self.login_event_loop.exec_()

    def _event_connect(self, nErrCode):
        if nErrCode == 0:
            print('로그인완료')
            print('=' * 50)
        self.login_event_loop.exit()

    def set_input_value(self, id, value):
        self.dynamicCall("SetInputValue(QString, QString)", id, value)

    def comm_rq_data(self, rqname, trcode, next, screen_no):
        self.dynamicCall("CommRqData(QString, QString, int, QString", rqname, trcode, next, screen_no)
        self.tr_event_loop = QEventLoop()
        self.tr_event_loop.exec_()

    def _comm_get_data(self, code, real_type, field_name, index, item_name):
        ret = self.dynamicCall("CommGetData(QString, QString, QString, int, QString", code,
                               real_type, field_name, index, item_name)
        return ret.strip()
    
    def _comm_get_big_data(self, sTrCode, sRQName):
        ret = self.dynamicCall(
                "GetCommDataEx(QString, QString)", sTrCode, sRQName)
        return ret
        
    def _get_repeat_cnt(self, trcode, rqname):
        ret = self.dynamicCall("GetRepeatCnt(QString, QString)", trcode, rqname)
        return ret

    def _receive_tr_data(self, sScrNo, sRQName, sTrCode, sRecordName, sPrevNext, nDataLength, sErrorCode, sMessage, sSplmMsg):
        print('TR_Message:', sScrNo, sRQName, sTrCode, sRecordName, sPrevNext, nDataLength, sErrorCode, sMessage, sSplmMsg)
        self.sPrevNext = sPrevNext

        if sRQName == "opt10079_req":
            self._opt10080(sRQName, sTrCode)

        try:
            self.tr_event_loop.exit()
        except AttributeError:
            pass

    def req_tick_chart(self, code):
        # 최초조회
        time.sleep(TIME_TERM)
        # self.tr = True
        self.set_input_value("종목코드", code)
        self.set_input_value("틱범위", "1")
        self.set_input_value("수정주가구분", "0")
        self.start_time = time.time()  # 시작 시간 기록
        self.comm_rq_data(f'opt10079_req', "opt10079", '0', self.gen_scrno())

        
    def _opt10080(self, rqname, trcode):
        # 조회된 데이터에서 종목코드를 가져옴 (싱글데이터)
        code = self._comm_get_data(trcode, "", rqname, 0, "종목코드")

        # 전체 데이터 개수 조회
        data_cnt = self._get_repeat_cnt(trcode, rqname)
        print('전체 데이터 갯수 : ', data_cnt)
        print('-' * 50)

        # 조회된 데이터 갯수 만큼 반복해서 데이터를 가져온 후 딕셔너리에 저장 (멀티데이터)
        total_ret = []
        for i in range(data_cnt):
            ret = {key: self._comm_get_data(trcode, "", rqname, i, key) for key in ['현재가', '거래량', '체결시간']}
            total_ret.append(ret)
                       
        # pprint.pprint(total_ret)

        # 딕셔너리를 DataFrame으로 변환 / 컬럼명 변경 / datetime 컬럼 생성
        df = pd.DataFrame(total_ret)
        df.columns = ['현재가', '거래량', '체결시간']
        
        df['체결시간'] = pd.to_datetime(df['체결시간'], format="%Y%m%d%H%M%S")

        # 최종 조회일자 : 해당 일자이전 기간이 조회데이터에 포함되면 조회를 멈춤
        if df['체결시간'].iat[-1] < self.end_date:
            df = df[df['체결시간'] >= self.end_date]
            self.sPrevNext = None   
       
        df['code'] = code
        df = df.reset_index(drop=True)
        self.dataframes.append(df)
        
        print(df)
        self.getTime()  # 데이터 가져오는 데 걸린 시간 출력
        
    def getTime(self):
        end_time = time.time()
        elapsed_time = end_time - self.start_time
        result = timedelta(seconds=elapsed_time)
        print('-' * 50)
        print('데이터 조회 시간 : ', result)
        print('-' * 50)

    # 데이터 파일로 저장
    def save_data(self):
        combined_df = pd.concat(self.dataframes, ignore_index=True)
        combined_df.to_csv('C:/Users/최하준/Documents/all_data.csv', index=False)

        
if __name__ == "__main__":
    app = QApplication(sys.argv)

    main = Main()
    main.comm_connect()
    main.end_date = datetime(2024, 3, 11)  # 현재부터 해당 날짜까지 조회

    start_time_total = time.time()

    code_list = [item.value for item in ItemList]

    # with ThreadPoolExecutor(max_workers=MAX_THREAD) as executor:
    for code in code_list:
        main.req_tick_chart(code)
    
    end_time_total = time.time()

    elapsed_time_total = end_time_total - start_time_total
    result_total = timedelta(seconds=elapsed_time_total)
    print("전체 데이터 조회 시간 : ", result_total)

    main.save_data()
반응형
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.