** 현재 작성중입니다 **

 

종가 매매?

  • 유튜브 등을 참고하여 종가 매매의 패턴은 아래와 같습니다. (장 마감전에 아래와 같은 종목을 빨리 발견해야 합니다)
    • 오늘 기준 09:00~15:10 시간대의 주가가 우상향 할 것
    • 최근 5일 평균 거래액이 10억 이상, 거래량이 5배 이상 증가
    • 최근 3~6개월을 기준으로 전고점을 강하게 돌파할 것

Demo

 

기술적으로는?

  • pFP와 ZIO를 공부한다는 느낌으로 진행합니다. 물론 구현의 난이도는 python이 훨씬 낮습니다.
    • Fiber를 활용하여 대량의 데이터 분석을 병렬로 처리할 수 있는 등 ZIO가 가지는 장점을 활용해 보려고 합니다.
  • 현재 운용중인 https://www.heyes.live 에서 15:00부터 대상 종목 제공하는 것을 목표로 합니다.
    • Web기반의 API를 제공하는 모듈로 배포됩니다.

 

 

크롤링

 

  • 주의할 점은 네이버와 같이 공개된 페이지를 크롤링할 경우 과도하게 요청 트래픽이 발생하지 않도록 1tps 이상의 저속크롤링을 유지해야 된다는 점입니다.
  • 일단은 해당 페이지는 10분 주기로 데이터 수집이 이루어져야 하기 때문에 별도의 스케쥴링 전략이 필요합니다.
    • 우선순위 큐를 만들어 큐에 등록된 seed는 1초 간격으로 하나씩 처리하고 과도하게 pending 상태의 seed job이 발생할 경우 신규 큐를 생성하여 별도의 worker thread가 처리 속도를 높이는 식으로 일단은 구현을 합니다.
    • 아래는 ZIOf로 해당 큐기반의 worker scheduler를 구현한 코드입니다.
abstract class UnitJobScheduler (
  queueRef: Ref[List[Queue[String]]], 
  queueSize: Int = 10) extends JobProducer[String] {

  private def inputDataAndExtendQueue(queue: List[Queue[String]])(effect : => String): ZIO[Any, Nothing, Boolean] = 
    for {
      result <- ZIO.suspendSucceed {
      queue match {
        case Nil => ZIO.succeed(false)
        case x :: xs => 
          for {
            result <- x.offer(effect)
            res <- result match {
              case true => ZIO.succeed(true)
              case false => 
                inputDataAndExtendQueue(xs)(effect)
            }
          } yield res
        }
      }
    } yield result
  
  def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit]

  // add new job  
  def addJob(job: String) = 
    for {
      queue <- queueRef.get
      addRes <- inputDataAndExtendQueue(queue)(job)
      _ <- addRes match {
        case true => ZIO.unit
        case false => for {
          _ <- Console.printLine("--> create new worker queue ... " + queue.size)
          q <- Queue.dropping[String](queueSize)
          _ <- queueRef.update(_ :+ q)
          _ <- processJob(q, queue.size).fork
        } yield ()
      }
    } yield ()
  
  // produce continouse jobs
  def startProduce() = {
    (for {
      //TODO check this time is whether holiday or not
      data <- unitProduce // ----------------> Producing Job
      _ <- Console.printLine("produce job to crawl: " + data)
      _ <- ZIO.foreach(data){r => addJob(r)}
    } yield ()).repeat(Schedule.spaced(1.seconds)).unit  
  } 
}

 

  • 아래는 위의 코드를 기반으로 Crawler를 구현한 코드입니다.
trait JobProducer[T] {
  def unitProduce: ZIO[Any, Nothing, List[T]]
}

class CrawlJobScheduler(
  queueRef: Ref[List[Queue[String]]], 
  stockRepo: StockRepo, 
  crawler: MinStockCrawler,
  crawlStatus: CrawlStatusRepo) extends UnitJobScheduler(queueRef) {

  override def unitProduce: ZIO[Any, Nothing, List[String]] = (for {
    _ <- ZIO.when(DataUtil.getCurrentTimestamp() != DataUtil.stockTimestamp(0))(
      ZIO.log("Out of stockTime") *> ZIO.fail(new Exception("---> Out of stock time of Today")))
    queue <- queueRef.get
    cntEndItemCodes <- queue.lastOption match {
      case Some(q) => q.size
      case None => ZIO.succeed(0)
    } 
    expiredItemCodes <- crawlStatus.getExpiredItemCode(10 * 60 * 1000 - cntEndItemCodes * 1000)
    _ <- ZIO.foreachPar(expiredItemCodes){itemCode => crawlStatus.syncCrawlStatus(itemCode, "PEND")}
    _ <- ZIO.log("Create target itemCodes : " + expiredItemCodes)
  } yield expiredItemCodes)
  .catchAll(e => ZIO.log(e.getMessage()) *> ZIO.succeed(List.empty[String]))

  override def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit] = 
    (for {
      itemCode <- queue.take
      cd <- crawler.crawl(itemCode)
      res <- stockRepo.insertStockMinVolumeSerialBulk(cd)
      _ <- crawlStatus.syncCrawlStatus(itemCode, "SUSP")
      _ <- ZIO.foreach(cd){r => Console.printLine(r.toString())}
      _ <- Console.printLine(s"Inserted data ${cd.size} for $itemCode by worker #$workerId").ignore // log
    } yield ()).repeat(Schedule.spaced(1.seconds))
    .provide(
      Client.customized,
      NettyClientDriver.live,
      ZLayer.succeed(NettyConfig.default),
      DnsResolver.default,
      DataDownloader.live,
    )
    .unit.catchAll(e => ZIO.log(e.getMessage()).ignore)
}

 

 

종가패턴 발견

  • 우상향 여부를 판별하고 기울기 값과 오차(Error)범위내의 가격을 count하여 패턴의 일치여부를 찾습니다.
class EndPriceAnalyzerImpl extends EndPriceAnalyzer {

  def analyzeUpstream(data: List[StockMinVolumeTable],  allowedError: Double = 0.25, innerErrorPer: Double = 0.80) = {
        
    val streamedData = ZStream.fromIterable(data).filter(r => {
        val tokens = r.tsCode.split("_")
        tokens.length == 2 && tokens(1) <= "15:30"
      })
    
    (for {
      firstElem <- streamedData.runHead
      lastElem <- streamedData.runLast
      _ <- Console.printLine(s"First : ${firstElem}, end : ${lastElem}")
      delta <- ZIO.attempt(
        lastElem.flatMap(l => firstElem.map(f => l.fixedPrice - f.fixedPrice))
        .getOrElse(0.0)
        )
      _ <- Console.printLine(s"Delta : ${delta}")
      _ <- ZIO.when(delta < 0)(ZIO.fail(new Exception("---> Stock price is not trending upward <--")))
      slope <- ZIO.attempt(delta / (data.length - 1))
      _ <- Console.printLine(s"Slope : ${slope}")

      result <- streamedData.zipWithIndex.runFold(0) { case (inCount, (element, idx)) =>
        val expectedY = firstElem.map(a => a.fixedPrice + slope * idx) 
        val error = expectedY.map(exp => Math.abs(element.fixedPrice - exp) / delta)
        error match {
          case Some(e) if e <= allowedError => 
            inCount + 1
          case x => 
            inCount
        }
      }
      _ <- Console.printLine(s"Result : ${result} / ${data.length}")
      finResult <- ZIO.succeed(result > data.length * innerErrorPer)
    } yield finResult).catchAll(e => Console.printError(e.getLocalizedMessage()) *> ZIO.succeed(false))
    
  }

  override def analyze(stockCode: String, targetDt: String) = for {
    repo <- ZIO.service[StockRepo]
    targetData <- repo.selectStockDataByItemCode(stockCode, targetDt)
    result <- analyzeUpstream(targetData)
  } yield result
  
}

 

'Tech > ZIO' 카테고리의 다른 글

ZIO의 Error 처리  (0) 2024.08.30
Scala ZIO의 error handling - (1)  (0) 2024.05.05

본 문서에 관하여

  • python을 활용하여 crawler와 고전 ml로 주가 회귀 분석 진행
  • 아직 작성중

----

 

KODEX 코스닥150레버리지 상품의 다음날 상승, 하락, 가격을 예측하는 것을 목표로 하였다.

 

아래의 사이트에서 '외국인/기관 순매매 거래량' 2016년 부터의 데이터를 Crawling 하였다.

 

코스닥 전체의 투자자별 매매 동향 또한 동일 기간 Crawling을 하였다.

 

Regression 방식의 머신러닝일 적용하므로 과거의 많은 일 수의 특성의 반영이 필요하여 주요 지표에 대해서 아래와 같이 1, 2, 4주 기준으로 합산, 평균, 표준편차 Feature를 추가하였다.

def createColumns(df , name: str) :
    # print(name + ' newly added ..')
    df[name + '_SUM_D5'] = df[name].rolling(5).sum().shift(1)
    df[name + '_AVG_D5'] = df[name].rolling(5).mean().shift(1)
    df[name + '_STD_D5'] = df[name].rolling(5).std().shift(1)
    df[name + '_SUM_D10'] = df[name].rolling(10).sum().shift(1)
    df[name + '_AVG_D10'] = df[name].rolling(10).mean().shift(1)
    df[name + '_STD_D10'] = df[name].rolling(10).std().shift(1)
    df[name + '_SUM_D20'] = df[name].rolling(20).sum().shift(1)
    df[name + '_AVG_D20'] = df[name].rolling(20).mean().shift(1)
    df[name + '_STD_D20'] = df[name].rolling(20).std().shift(1)


feature_cols = ['VOLUME',
                'COMP_BUY', 'FOR_BUY', 'FOR_CONT', 'FOR_PER', 'PERSONAL',
                'FOREIGNER', 'COMPANY', 'FINANCE', 'INSURANCE', 'TOOSIN', 'BANK',
                'ETC_FIN', 'GOV_FUND', 'ETC_FUND']

for col in feature_cols:
    createColumns(df3, col)

 

다음날 종가를 예측하는게 목표이므로 output은 다음날의 종가로 설정을 하였다.

df3['END_VALUE_PRED_OUT']=df3.END_VALUE.shift(-1)

 

pycaret을 활용하여 모델을 생성하였다.

from pycaret.regression import *

s = setup(data = train, target='DELTA_OUT', session_id=123, normalize=True, normalize_method='zscore')

 

베이스라인 테스트를 진행해 보았다. 예측 정확도가 굉장히 높게 나왔다.

best = compare_models(exclude=['lar'])

 

Feature Importance를 보면 대체로 한달 정도의 기간의 통계 데이터가 결과에 영향을 많이 줌을 확인할 수 있다.

SHAP value로 각 Feature별 공헌 특성을 확인해보자

 

대략적인 해석은,

  • COMP_BUY_STD_D20 : 기관투자자 순매수 20일치 표본표준편차, 음의 기여도를 가진다. 기관투자자 매도하는 시점은 상승으로 이어진다.
  • 은행, 국민연금, 보험, 펀드 등 : 양의 기여도, 즉 주가가 상승할때 이들의 양의 매수도 증가한다 볼 수 있다.
  • 투신 : 대체로 양의 기여도이나 다른 주체와는 달리 선형적이지 않다. 미리 치고 빠지는 느낌
  • 외국인 : 대체로 양의 관계를 보이나 결과 영향도는 미미함.
  • COMP_BUIY_STD_D10 : 20일 값과 유사하게 음의 기여도이나 기여도 자체는 20일과 비교하여 줄어드는 양상, 즉 기관 투자자들은 상승 3~4주 전에 움직인다.
 

LSTM은,

  • 동일 데이터로 실험시 매우 저조한 예측 정확도를 출력
    • Feature Engineering에 추가적인 시간 투자가 필요해 보임
    • 일간 데이터이므로 딥러닝을 의미있는 정확도를 추출하기에는 데이터가 과부족로 판단됨

내일의 예측은 어떻게 할 것인가?

  • 아직은 신의 영역이나 기관투자자들은 최대 한달정도 전부터 움직임, 이것을 Catch 하는 방향으로 접근이 요구됨

https://youtu.be/N6ystK4XoXU

현재 일어나고 있는 이슈를 어떻게 빠르게 파악할 수 있을까?

포털이나 언론사의 뉴스를 하루종일 보고 있기는 힘든 일이다.

특히 뉴스의 빠른 파악이 중요한 주식, 코인 투자자들에겐 더더욱..

 

그래서 그들을 위해서 뉴스를 수집하여 이슈를 요약하여 시각화하는 서비스를 직접 만들어 보기로 하였다.

 

Tech Stack은 Spark, Scala, Slick, Scalatra, Twirl, Infux 등을 사용하였다.

+ Recent posts