** 현재 작성중입니다 **

 

종가 매매?

  • 유튜브 등을 참고하여 종가 매매의 패턴은 아래와 같습니다. (장 마감전에 아래와 같은 종목을 빨리 발견해야 합니다)
    • 오늘 기준 09:00~15:10 시간대의 주가가 우상향 할 것
    • 최근 5일 평균 거래액이 10억 이상, 거래량이 5배 이상 증가
    • 최근 3~6개월을 기준으로 전고점을 강하게 돌파할 것

 

기술적으로는?

  • pFP와 ZIO를 공부한다는 느낌으로 진행합니다. 물론 구현의 난이도는 python이 훨씬 낮습니다.
    • Fiber를 활용하여 대량의 데이터 분석을 병렬로 처리할 수 있는 등 ZIO가 가지는 장점을 활용해 보려고 합니다.
  • 현재 운용중인 https://www.heyes.live 에서 15:00부터 대상 종목 제공하는 것을 목표로 합니다.
    • Web기반의 API를 제공하는 모듈로 배포됩니다.

 

 

크롤링

 

  • 주의할 점은 네이버와 같이 공개된 페이지를 크롤링할 경우 과도하게 요청 트래픽이 발생하지 않도록 1tps 이상의 저속크롤링을 유지해야 된다는 점입니다.
  • 일단은 해당 페이지는 10분 주기로 데이터 수집이 이루어져야 하기 때문에 별도의 스케쥴링 전략이 필요합니다.
    • 우선순위 큐를 만들어 큐에 등록된 seed는 1초 간격으로 하나씩 처리하고 과도하게 pending 상태의 seed job이 발생할 경우 신규 큐를 생성하여 별도의 worker thread가 처리 속도를 높이는 식으로 일단은 구현을 합니다.
    • 아래는 ZIOf로 해당 큐기반의 worker scheduler를 구현한 코드입니다.
abstract class UnitJobScheduler (
  queueRef: Ref[List[Queue[String]]], 
  queueSize: Int = 10) extends JobProducer[String] {

  private def inputDataAndExtendQueue(queue: List[Queue[String]])(effect : => String): ZIO[Any, Nothing, Boolean] = 
    for {
      result <- ZIO.suspendSucceed {
      queue match {
        case Nil => ZIO.succeed(false)
        case x :: xs => 
          for {
            result <- x.offer(effect)
            res <- result match {
              case true => ZIO.succeed(true)
              case false => 
                inputDataAndExtendQueue(xs)(effect)
            }
          } yield res
        }
      }
    } yield result
  
  def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit]

  // add new job  
  def addJob(job: String) = 
    for {
      queue <- queueRef.get
      addRes <- inputDataAndExtendQueue(queue)(job)
      _ <- addRes match {
        case true => ZIO.unit
        case false => for {
          _ <- Console.printLine("--> create new worker queue ... " + queue.size)
          q <- Queue.dropping[String](queueSize)
          _ <- queueRef.update(_ :+ q)
          _ <- processJob(q, queue.size).fork
        } yield ()
      }
    } yield ()
  
  // produce continouse jobs
  def startProduce() = {
    (for {
      //TODO check this time is whether holiday or not
      data <- unitProduce // ----------------> Producing Job
      _ <- Console.printLine("produce job to crawl: " + data)
      _ <- ZIO.foreach(data){r => addJob(r)}
    } yield ()).repeat(Schedule.spaced(1.seconds)).unit  
  } 
}

 

  • 아래는 위의 코드를 기반으로 Crawler를 구현한 코드입니다.
trait JobProducer[T] {
  def unitProduce: ZIO[Any, Nothing, List[T]]
}

class CrawlJobScheduler(
  queueRef: Ref[List[Queue[String]]], 
  stockRepo: StockRepo, 
  crawler: MinStockCrawler,
  crawlStatus: CrawlStatusRepo) extends UnitJobScheduler(queueRef) {

  override def unitProduce: ZIO[Any, Nothing, List[String]] = (for {
    _ <- ZIO.when(DataUtil.getCurrentTimestamp() != DataUtil.stockTimestamp(0))(
      ZIO.log("Out of stockTime") *> ZIO.fail(new Exception("---> Out of stock time of Today")))
    queue <- queueRef.get
    cntEndItemCodes <- queue.lastOption match {
      case Some(q) => q.size
      case None => ZIO.succeed(0)
    } 
    expiredItemCodes <- crawlStatus.getExpiredItemCode(10 * 60 * 1000 - cntEndItemCodes * 1000)
    _ <- ZIO.foreachPar(expiredItemCodes){itemCode => crawlStatus.syncCrawlStatus(itemCode, "PEND")}
    _ <- ZIO.log("Create target itemCodes : " + expiredItemCodes)
  } yield expiredItemCodes)
  .catchAll(e => ZIO.log(e.getMessage()) *> ZIO.succeed(List.empty[String]))

  override def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit] = 
    (for {
      itemCode <- queue.take
      cd <- crawler.crawl(itemCode)
      res <- stockRepo.insertStockMinVolumeSerialBulk(cd)
      _ <- crawlStatus.syncCrawlStatus(itemCode, "SUSP")
      _ <- ZIO.foreach(cd){r => Console.printLine(r.toString())}
      _ <- Console.printLine(s"Inserted data ${cd.size} for $itemCode by worker #$workerId").ignore // log
    } yield ()).repeat(Schedule.spaced(1.seconds))
    .provide(
      Client.customized,
      NettyClientDriver.live,
      ZLayer.succeed(NettyConfig.default),
      DnsResolver.default,
      DataDownloader.live,
    )
    .unit.catchAll(e => ZIO.log(e.getMessage()).ignore)
}

 

 

종가패턴 발견

  • 우상향 여부를 판별하고 기울기 값과 오차범위내의 가격을 count하여 패턴의 일치여부를 찾습니다.
  •  
class EndPriceAnalyzerImpl extends EndPriceAnalyzer {

  def analyzeUpstream(data: List[StockMinVolumeTable],  allowedError: Double = 0.25, innerErrorPer: Double = 0.80) = {
        
    val streamedData = ZStream.fromIterable(data).filter(r => {
        val tokens = r.tsCode.split("_")
        tokens.length == 2 && tokens(1) <= "15:30"
      })
    
    (for {
      firstElem <- streamedData.runHead
      lastElem <- streamedData.runLast
      _ <- Console.printLine(s"First : ${firstElem}, end : ${lastElem}")
      delta <- ZIO.attempt(
        lastElem.flatMap(l => firstElem.map(f => l.fixedPrice - f.fixedPrice))
        .getOrElse(0.0)
        )
      _ <- Console.printLine(s"Delta : ${delta}")
      _ <- ZIO.when(delta < 0)(ZIO.fail(new Exception("---> Stock price is not trending upward <--")))
      slope <- ZIO.attempt(delta / (data.length - 1))
      _ <- Console.printLine(s"Slope : ${slope}")

      result <- streamedData.zipWithIndex.runFold(0) { case (inCount, (element, idx)) =>
        val expectedY = firstElem.map(a => a.fixedPrice + slope * idx) 
        val error = expectedY.map(exp => Math.abs(element.fixedPrice - exp) / delta)
        error match {
          case Some(e) if e <= allowedError => 
            inCount + 1
          case x => 
            inCount
        }
      }
      _ <- Console.printLine(s"Result : ${result} / ${data.length}")
      finResult <- ZIO.succeed(result > data.length * innerErrorPer)
    } yield finResult).catchAll(e => Console.printError(e.getLocalizedMessage()) *> ZIO.succeed(false))
    
  }

  override def analyze(stockCode: String, targetDt: String) = for {
    repo <- ZIO.service[StockRepo]
    targetData <- repo.selectStockDataByItemCode(stockCode, targetDt)
    result <- analyzeUpstream(targetData)
  } yield result
  
}

 

'Tech > ZIO' 카테고리의 다른 글

ZIO의 Error 처리  (0) 2024.08.30
Scala ZIO의 error handling - (1)  (0) 2024.05.05

기반 개념

  • Local Reasonning = type signature describes the kind of computation that will be performed
  • Referential transparency = ability to replace an expression with the value that it evaluate to

Effect 

  • the type signature describes what kind of computation it will perform
  • the type signature describes the type of VALUE that it will produce
  • if side effect are required, construction must be separate from the EXECUTION
  • Scala Error Handling의 기본 컨셉은 선언적 처리(Declarative Error Handling)으로 기존 try-catch의 명시적인 방식(Imperative Error Handling)과는 방식이 다르다.
  • 선언적 에러 처리의 장점은 아래와 같다.
    • 기존 try-catch 방식은 경우에 따라 error의 발생 원인과 try-catch로 인한 로직의 변경 추적이 어렵다. 반면 선언적 처리는 참조 무결성(referentially transparent)이 보장된다.
    • Type-safety : 함수의 정의시 return type 뿐만 아니라 어떤 type의 error로 실패하는지 알 수 있게 되어 compile time에 type-safety에 대한 점검이 가능하다.
    • Exhaustivity Checking(에러 체크의 완전성) : 컴파일 타임에 반드시 처리해야 하는 error handling 여부에 대한 검사가 가능하다.
    • Error Model : ZIO는 Exit, Cause와 같은 에러의 정보를 담고있는 모델을 자체적으로 제공한다. 에러의 유실을 막을 수 있다.
 try {
    try throw new Error("e1")
    finally throw new Error("e2")
 } catch {
   case e: Error => println(e)
 }

// Output:
// e2

---

ZIO.fail("e1")
  .ensuring(ZIO.succeed(throw new Exception("e2")))
  .catchAll {
    case "e1" => Console.printLine("e1")
    case "e2" => Console.printLine("e2")
  }

// Output:
// e1

 

  • ZIO 전용의 Error를 저장 관리하기 위한 객체인 Cause에 대한 이해가 필수적이다.
    • https://zio.dev/reference/core/cause
    • It allows us to take a base type E that represents the error type and then capture the sequential and parallel composition of errors in a fully lossless fashion.
    • (번역) Cause는 에러(Exception)의 class type을 나타내는 E를 획득할 수 있게 하고, 에러의 누락 방직 차원에서 순차적인 혹은 병렬적인 에러의 포착을 제공한다.
    • Cause의 내부 구조는 다음과 같다
      • Empty : Cause의 초기상태로 ZIO.succeed(5)와 같이 error가 없는 상태를 의미한다.
      • Fail : expected error의 type E를 의미한다.
      • Die : defect(unexpected error)의 type E를 의미한다.
      • Iterrupt : fiber 등의 멀티 쓰레드 환경에서 interruption을 의미한다.
      • Stackless : stack trace 와 excution trace 정보를 담고 있다. (stack trace의 노출 레벨이 Die와는 다르다)
      • Both : 병렬 프로그램밍 상에서 2개 이상의 error가 발생하였을때의 정보를 담고 있다.
      • Then : 순차적 에러가 발생했을때 Error 객체 저장(고전 try-catch 모델 혹은 fail.ensuring 등)
sealed abstract class Cause[+E] extends Product with Serializable { self =>
  import Cause._
  def trace: Trace = ???

  final def ++[E1 >: E](that: Cause[E1]): Cause[E1] = Then(self, that)
  final def &&[E1 >: E](that: Cause[E1]): Cause[E1] = Both(self, that)
}

object Cause extends Serializable {
  case object Empty extends Cause[Nothing]
  final case class Fail[+E](value: E, override val trace: Trace) extends Cause[E]
  final case class Die(value: Throwable, override val trace: Trace) extends Cause[Nothing]
  final case class Interrupt(fiberId: FiberId, override val trace: Trace) extends Cause[Nothing]
  final case class Stackless[+E](cause: Cause[E], stackless: Boolean) extends Cause[E]
  final case class Then[+E](left: Cause[E], right: Cause[E]) extends Cause[E]
  final case class Both[+E](left: Cause[E], right: Cause[E]) extends Cause[E]
}
  • ZIO상에서 에러의 처리를 위해서는 아래의 관련 함수를 숙지해야 한다.
    • .catchAll
    • .catchSome
    • .either / .absolve
      • [R, E, A] --> [R, Either[E, A]]
      • [R, Nothing, Either[E, A]] --> [R, E, A]
    • .absorb  // defect to Failures (recover from both Die and Interruption)
      • [Any, Nothing, Nothing] --> [Any, Throwable, Nothing]
    • .resurrent // defect to Failures (recover from only from Die) 
      • [Any, Nothing, Nothing] --> [Any, Throwable, Nothing]
    •  .orDie
      • [R, Throwable, A] --> [R, Nothing, A]
    • .refineOrDie // defect to Failures (narraw down the type of the error channel from E)
      • [R, Throwable, A] --> [R, IOException, A]
    • .unrefine // defect to Failures (broadens the type of the error channel from E to the E1 and embeds some defects into it)
      • [R, Nothing, A] --> [R, E, A]
    • .sandbox (.unsandbox) // defect to Failures
      • [R, Nothing, A] --> [R, Cause[E], A]
    • .ensuring : catch block과 유사한 역할
      • [R, E, A] --> [R, ???, A]
    • .some
      • [R, E, Option[A]] --> [R, Option[E], A]
    • .cause (.uncause)
      • [R, E, A] --> [R, Nothing, Cause[E]]
    • .merge
      • [R, E, A] --> [R, super(A|E)]
    • .reject : success chanel의 값의 일부를 fail channel로 처리
      • [R, E, A] --> [R, E, A]

 

 

 

'Tech > ZIO' 카테고리의 다른 글

Scala ZIO로 구현해본 종가매매 종목 추출 자동화  (0) 2025.01.15
Scala ZIO의 error handling - (1)  (0) 2024.05.05

본 문서에 관하여

  • python을 활용하여 crawler와 고전 ml로 주가 회귀 분석 진행
  • 아직 작성중

----

 

KODEX 코스닥150레버리지 상품의 다음날 상승, 하락, 가격을 예측하는 것을 목표로 하였다.

 

아래의 사이트에서 '외국인/기관 순매매 거래량' 2016년 부터의 데이터를 Crawling 하였다.

 

코스닥 전체의 투자자별 매매 동향 또한 동일 기간 Crawling을 하였다.

 

Regression 방식의 머신러닝일 적용하므로 과거의 많은 일 수의 특성의 반영이 필요하여 주요 지표에 대해서 아래와 같이 1, 2, 4주 기준으로 합산, 평균, 표준편차 Feature를 추가하였다.

def createColumns(df , name: str) :
    # print(name + ' newly added ..')
    df[name + '_SUM_D5'] = df[name].rolling(5).sum().shift(1)
    df[name + '_AVG_D5'] = df[name].rolling(5).mean().shift(1)
    df[name + '_STD_D5'] = df[name].rolling(5).std().shift(1)
    df[name + '_SUM_D10'] = df[name].rolling(10).sum().shift(1)
    df[name + '_AVG_D10'] = df[name].rolling(10).mean().shift(1)
    df[name + '_STD_D10'] = df[name].rolling(10).std().shift(1)
    df[name + '_SUM_D20'] = df[name].rolling(20).sum().shift(1)
    df[name + '_AVG_D20'] = df[name].rolling(20).mean().shift(1)
    df[name + '_STD_D20'] = df[name].rolling(20).std().shift(1)


feature_cols = ['VOLUME',
                'COMP_BUY', 'FOR_BUY', 'FOR_CONT', 'FOR_PER', 'PERSONAL',
                'FOREIGNER', 'COMPANY', 'FINANCE', 'INSURANCE', 'TOOSIN', 'BANK',
                'ETC_FIN', 'GOV_FUND', 'ETC_FUND']

for col in feature_cols:
    createColumns(df3, col)

 

다음날 종가를 예측하는게 목표이므로 output은 다음날의 종가로 설정을 하였다.

df3['END_VALUE_PRED_OUT']=df3.END_VALUE.shift(-1)

 

pycaret을 활용하여 모델을 생성하였다.

from pycaret.regression import *

s = setup(data = train, target='DELTA_OUT', session_id=123, normalize=True, normalize_method='zscore')

 

베이스라인 테스트를 진행해 보았다. 예측 정확도가 굉장히 높게 나왔다.

best = compare_models(exclude=['lar'])

 

Feature Importance를 보면 대체로 한달 정도의 기간의 통계 데이터가 결과에 영향을 많이 줌을 확인할 수 있다.

SHAP value로 각 Feature별 공헌 특성을 확인해보자

 

대략적인 해석은,

  • COMP_BUY_STD_D20 : 기관투자자 순매수 20일치 표본표준편차, 음의 기여도를 가진다. 기관투자자 매도하는 시점은 상승으로 이어진다.
  • 은행, 국민연금, 보험, 펀드 등 : 양의 기여도, 즉 주가가 상승할때 이들의 양의 매수도 증가한다 볼 수 있다.
  • 투신 : 대체로 양의 기여도이나 다른 주체와는 달리 선형적이지 않다. 미리 치고 빠지는 느낌
  • 외국인 : 대체로 양의 관계를 보이나 결과 영향도는 미미함.
  • COMP_BUIY_STD_D10 : 20일 값과 유사하게 음의 기여도이나 기여도 자체는 20일과 비교하여 줄어드는 양상, 즉 기관 투자자들은 상승 3~4주 전에 움직인다.
 

LSTM은,

  • 동일 데이터로 실험시 매우 저조한 예측 정확도를 출력
    • Feature Engineering에 추가적인 시간 투자가 필요해 보임
    • 일간 데이터이므로 딥러닝을 의미있는 정확도를 추출하기에는 데이터가 과부족로 판단됨

내일의 예측은 어떻게 할 것인가?

  • 아직은 신의 영역이나 기관투자자들은 최대 한달정도 전부터 움직임, 이것을 Catch 하는 방향으로 접근이 요구됨

본 문서에 관하여

  • ZIO의 error handling 방식을 함수 위주로 정리하여 실제 Scala에서 error handling 전략을 살펴본다

ZIO는 기본적으로 아래와 같이 Input, Error, Output Type을 정의해야 한다.

ZIO[R, E, A]

Java와 다른 점은 실질적인 Return type이 Error와 실제 Value 를 정의한다는 것이다.

 

아래는 Error를 명시적으로 구현한 코드라 볼 수 있다.

val aFailedZIO: IO[String, Nothing] = ZIO.fail("Something went wrong")
val failedWithThroable: IO[RuntimeException, Nothing] = ZIO.fail(new RuntimeException("Bumb!"))
// RuntimeException의 Error Type을 String으로 변환
val failWithDescription: ZIO[Any, String, Nothing] = failedWithThroable.mapError(_.getMessage)

 

그러면 Java의 try-catch 처럼 명시적 혹은 비명시적으로 발생한 Error를 어떻게 처리할 것인가?

// 잘못된 사용 예시
val badZIO: ZIO[Any, Nothing, RuntimeFlags] = ZIO.succeed {
    println("Trying something")
    val string: String = null
    string.length
  }

// attempt 함수를 사용하여 Throwable을 error를 처리할 수 있다.
val anAttempt: ZIO[Any, Throwable, Int] = ZIO.attempt {
    println("Trying something")
    val string: String = null
    string.length
}

// catchAll과 catchSome을 활용한 catch error
val catchError: ZIO[Any, Throwable, Any] = 
	anAttempt.catchAll(a => ZIO.attempt(s"Returning a different value because $a"))

val catchServiceErrors: ZIO[Any, Throwable, Any] = anAttempt.catchSome {
    case e: RuntimeException => ZIO.succeed(s"Ignoring runtime excep[tion: $e")
    case _ => ZIO.succeed("Ignoring everything else")
}

 

 위의 코드와 같이 성공인지 실패인지 알 수 없는 코드를 실행하는 경우 ZIO.attempt 를 활용하고,

catchAll과 catchSome을 활용하여 발생한 exception에 전체에 대해 처리가 가능하다.

 

Scala의 일반적은 Error 처리 기법인 Try, Either, Option 등이 ZIO에도 동일하게 사용된다.

// Option / Try / Either to ZIO
  val aTryToZIO: ZIO[Any, Throwable, Int] = ZIO.fromTry(Try(42 / 0))

  // either -> ZIO
  val anEither: Either[Int, String] = Right("Success!")
  val anEitherToZIO: ZIO[Any, Int, String] = ZIO.fromEither(anEither)

  // ZIO -> ZIO with Either as the value channel
  val eitherZIO: URIO[Any, Either[Throwable, Int]] = anAttempt.either

  // reserve
  val anAttempt_v2 = eitherZIO.absolve

  // option -> ZIO
  val anOption: ZIO[Any, Option[Nothing], Int] = ZIO.fromOption(Some(42))

  // implements
  def try2ZIO[A](aTry: Try[A]): Task[A] = aTry match {
    case Failure(exception) => ZIO.fail(exception)
    case Success(value) => ZIO.succeed(value)
  }

  def either2ZIO[A, B](anEither: Either[A, B]): ZIO[Any, A, B] = anEither match {
    case Left(value) => ZIO.fail(value)
    case Right(value) => ZIO.succeed(value)
  }

  def option2ZIO[A](anOption: Option[A]): ZIO[Any, Option[Nothing], A] = anOption match {
    case Some(value) => ZIO.succeed(value)
    case None => ZIO.fail(None)
  }

  def zio2zioEither[R, A, B](zio: ZIO[R, A, B]): ZIO[R, Nothing, Either[A, B]] = zio.foldZIO(
    error => ZIO.succeed(Left(error)),
    value => ZIO.succeed(Right(value))
  )

  def absolveZIO[R, A, B](zio: ZIO[R, Nothing, Either[A, B]]): ZIO[R, A, B] = zio.flatMap {
    case Left(e) => ZIO.fail(e)
    case Right(v) => ZIO.succeed(v)
  }
 
 

계속..

'Tech > ZIO' 카테고리의 다른 글

Scala ZIO로 구현해본 종가매매 종목 추출 자동화  (0) 2025.01.15
ZIO의 Error 처리  (0) 2024.08.30

동기

  • Monad를 규명하고 싶은자들에게 필수적인 내용을 담고 있다.

FlatMap의 Signature

  • flatMap 함수하나를 포함하고 있다.
trait CustomFlatMap[M[_]] {
  def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
}

 

활용 For comprehension

  import cats.FlatMap
  import cats.syntax.flatMap._
  import cats.syntax.functor._

  def getPairs[M[_] : FlatMap](numbers: M[Int], chars: M[Char]): M[(Int, Char)] = for {
    n <- numbers
    c <- chars
  } yield (n, c)

  // Generalized
  def getPairs[M[_] : FlatMap, A, B](ma: M[A], mb: M[B]): M[(A, B)] = for {
    n <- ma
    c <- mb
  } yield (n, c)

 

타 Class와의 관계 규명

  • Apply(Applicative)의 특성을 가진다.
  • Apply에서 제공하는 map함수와 flatMap 함수를 조합하여 Apply.ap 함수를 구현할 수 있다.
  trait MyFlatMap[M[_]] extends Apply[M] {
    def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
    def ap[A, B](wf: M[A => B])(wa: M[A]): M[B] =
      flatMap(wa)(a => map(wf)(f => f(a)))
  }

 

Monad의 한 부분

  • 아래와 같은 구조(pure, map, flatMap 조합)로 Monad의 완전체를 구현할 수  있다.
  trait MyMonad[M[_]] extends Applicative[M] with CustomFlatMap[M] {
    override def map[A, B](ma: M[A])(f: A=>B): M[B] =
      flatMap(ma)(x => pure(f(x)))
  }
 

 

Monad를 위한 정리

  • 지금까지의 Type Class와 Monad는 아래와 같은 구조를 이룬다. (하위 Monad가 상위 모든 클래스의 특징을 가진다)
  • StarUML은 Higher Kinded Type 표기를 지원하지 않아 Class Diagram상 함수명만 표기하였다.

+ Recent posts