** 현재 작성중입니다 **

 

종가 매매?

  • 유튜브 등을 참고하여 종가 매매의 패턴은 아래와 같습니다. (장 마감전에 아래와 같은 종목을 빨리 발견해야 합니다)
    • 오늘 기준 09:00~15:10 시간대의 주가가 우상향 할 것
    • 최근 5일 평균 거래액이 10억 이상, 거래량이 5배 이상 증가
    • 최근 3~6개월을 기준으로 전고점을 강하게 돌파할 것

 

기술적으로는?

  • pFP와 ZIO를 공부한다는 느낌으로 진행합니다. 물론 구현의 난이도는 python이 훨씬 낮습니다.
    • Fiber를 활용하여 대량의 데이터 분석을 병렬로 처리할 수 있는 등 ZIO가 가지는 장점을 활용해 보려고 합니다.
  • 현재 운용중인 https://www.heyes.live 에서 15:00부터 대상 종목 제공하는 것을 목표로 합니다.
    • Web기반의 API를 제공하는 모듈로 배포됩니다.

 

 

크롤링

 

  • 주의할 점은 네이버와 같이 공개된 페이지를 크롤링할 경우 과도하게 요청 트래픽이 발생하지 않도록 1tps 이상의 저속크롤링을 유지해야 된다는 점입니다.
  • 일단은 해당 페이지는 10분 주기로 데이터 수집이 이루어져야 하기 때문에 별도의 스케쥴링 전략이 필요합니다.
    • 우선순위 큐를 만들어 큐에 등록된 seed는 1초 간격으로 하나씩 처리하고 과도하게 pending 상태의 seed job이 발생할 경우 신규 큐를 생성하여 별도의 worker thread가 처리 속도를 높이는 식으로 일단은 구현을 합니다.
    • 아래는 ZIOf로 해당 큐기반의 worker scheduler를 구현한 코드입니다.
abstract class UnitJobScheduler (
  queueRef: Ref[List[Queue[String]]], 
  queueSize: Int = 10) extends JobProducer[String] {

  private def inputDataAndExtendQueue(queue: List[Queue[String]])(effect : => String): ZIO[Any, Nothing, Boolean] = 
    for {
      result <- ZIO.suspendSucceed {
      queue match {
        case Nil => ZIO.succeed(false)
        case x :: xs => 
          for {
            result <- x.offer(effect)
            res <- result match {
              case true => ZIO.succeed(true)
              case false => 
                inputDataAndExtendQueue(xs)(effect)
            }
          } yield res
        }
      }
    } yield result
  
  def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit]

  // add new job  
  def addJob(job: String) = 
    for {
      queue <- queueRef.get
      addRes <- inputDataAndExtendQueue(queue)(job)
      _ <- addRes match {
        case true => ZIO.unit
        case false => for {
          _ <- Console.printLine("--> create new worker queue ... " + queue.size)
          q <- Queue.dropping[String](queueSize)
          _ <- queueRef.update(_ :+ q)
          _ <- processJob(q, queue.size).fork
        } yield ()
      }
    } yield ()
  
  // produce continouse jobs
  def startProduce() = {
    (for {
      //TODO check this time is whether holiday or not
      data <- unitProduce // ----------------> Producing Job
      _ <- Console.printLine("produce job to crawl: " + data)
      _ <- ZIO.foreach(data){r => addJob(r)}
    } yield ()).repeat(Schedule.spaced(1.seconds)).unit  
  } 
}

 

  • 아래는 위의 코드를 기반으로 Crawler를 구현한 코드입니다.
trait JobProducer[T] {
  def unitProduce: ZIO[Any, Nothing, List[T]]
}

class CrawlJobScheduler(
  queueRef: Ref[List[Queue[String]]], 
  stockRepo: StockRepo, 
  crawler: MinStockCrawler,
  crawlStatus: CrawlStatusRepo) extends UnitJobScheduler(queueRef) {

  override def unitProduce: ZIO[Any, Nothing, List[String]] = (for {
    _ <- ZIO.when(DataUtil.getCurrentTimestamp() != DataUtil.stockTimestamp(0))(
      ZIO.log("Out of stockTime") *> ZIO.fail(new Exception("---> Out of stock time of Today")))
    queue <- queueRef.get
    cntEndItemCodes <- queue.lastOption match {
      case Some(q) => q.size
      case None => ZIO.succeed(0)
    } 
    expiredItemCodes <- crawlStatus.getExpiredItemCode(10 * 60 * 1000 - cntEndItemCodes * 1000)
    _ <- ZIO.foreachPar(expiredItemCodes){itemCode => crawlStatus.syncCrawlStatus(itemCode, "PEND")}
    _ <- ZIO.log("Create target itemCodes : " + expiredItemCodes)
  } yield expiredItemCodes)
  .catchAll(e => ZIO.log(e.getMessage()) *> ZIO.succeed(List.empty[String]))

  override def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit] = 
    (for {
      itemCode <- queue.take
      cd <- crawler.crawl(itemCode)
      res <- stockRepo.insertStockMinVolumeSerialBulk(cd)
      _ <- crawlStatus.syncCrawlStatus(itemCode, "SUSP")
      _ <- ZIO.foreach(cd){r => Console.printLine(r.toString())}
      _ <- Console.printLine(s"Inserted data ${cd.size} for $itemCode by worker #$workerId").ignore // log
    } yield ()).repeat(Schedule.spaced(1.seconds))
    .provide(
      Client.customized,
      NettyClientDriver.live,
      ZLayer.succeed(NettyConfig.default),
      DnsResolver.default,
      DataDownloader.live,
    )
    .unit.catchAll(e => ZIO.log(e.getMessage()).ignore)
}

 

 

종가패턴 발견

  • 우상향 여부를 판별하고 기울기 값과 오차범위내의 가격을 count하여 패턴의 일치여부를 찾습니다.
  •  
class EndPriceAnalyzerImpl extends EndPriceAnalyzer {

  def analyzeUpstream(data: List[StockMinVolumeTable],  allowedError: Double = 0.25, innerErrorPer: Double = 0.80) = {
        
    val streamedData = ZStream.fromIterable(data).filter(r => {
        val tokens = r.tsCode.split("_")
        tokens.length == 2 && tokens(1) <= "15:30"
      })
    
    (for {
      firstElem <- streamedData.runHead
      lastElem <- streamedData.runLast
      _ <- Console.printLine(s"First : ${firstElem}, end : ${lastElem}")
      delta <- ZIO.attempt(
        lastElem.flatMap(l => firstElem.map(f => l.fixedPrice - f.fixedPrice))
        .getOrElse(0.0)
        )
      _ <- Console.printLine(s"Delta : ${delta}")
      _ <- ZIO.when(delta < 0)(ZIO.fail(new Exception("---> Stock price is not trending upward <--")))
      slope <- ZIO.attempt(delta / (data.length - 1))
      _ <- Console.printLine(s"Slope : ${slope}")

      result <- streamedData.zipWithIndex.runFold(0) { case (inCount, (element, idx)) =>
        val expectedY = firstElem.map(a => a.fixedPrice + slope * idx) 
        val error = expectedY.map(exp => Math.abs(element.fixedPrice - exp) / delta)
        error match {
          case Some(e) if e <= allowedError => 
            inCount + 1
          case x => 
            inCount
        }
      }
      _ <- Console.printLine(s"Result : ${result} / ${data.length}")
      finResult <- ZIO.succeed(result > data.length * innerErrorPer)
    } yield finResult).catchAll(e => Console.printError(e.getLocalizedMessage()) *> ZIO.succeed(false))
    
  }

  override def analyze(stockCode: String, targetDt: String) = for {
    repo <- ZIO.service[StockRepo]
    targetData <- repo.selectStockDataByItemCode(stockCode, targetDt)
    result <- analyzeUpstream(targetData)
  } yield result
  
}

 

'Tech > ZIO' 카테고리의 다른 글

ZIO의 Error 처리  (0) 2024.08.30
Scala ZIO의 error handling - (1)  (0) 2024.05.05

+ Recent posts