** 현재 작성중입니다 **
종가 매매?
- 유튜브 등을 참고하여 종가 매매의 패턴은 아래와 같습니다. (장 마감전에 아래와 같은 종목을 빨리 발견해야 합니다)
- 오늘 기준 09:00~15:10 시간대의 주가가 우상향 할 것
- 최근 5일 평균 거래액이 10억 이상, 거래량이 5배 이상 증가
- 최근 3~6개월을 기준으로 전고점을 강하게 돌파할 것
기술적으로는?
- pFP와 ZIO를 공부한다는 느낌으로 진행합니다. 물론 구현의 난이도는 python이 훨씬 낮습니다.
- Fiber를 활용하여 대량의 데이터 분석을 병렬로 처리할 수 있는 등 ZIO가 가지는 장점을 활용해 보려고 합니다.
- 현재 운용중인 https://www.heyes.live 에서 15:00부터 대상 종목 제공하는 것을 목표로 합니다.
- Web기반의 API를 제공하는 모듈로 배포됩니다.
크롤링
- 네이버 Pay증권의 화면을 crawling 하여 필요한 정보를 추출후 db에 저장하는 형태가 됩니다.
- 주어진 url인 https://finance.naver.com/item/sise_time.naver?code=000660&thistime=20250115124308&page=1 는 10분 단위로 분당 거래량, 체결가 등의 정보가 제공됩니다.
- 주의할 점은 네이버와 같이 공개된 페이지를 크롤링할 경우 과도하게 요청 트래픽이 발생하지 않도록 1tps 이상의 저속크롤링을 유지해야 된다는 점입니다.
- 일단은 해당 페이지는 10분 주기로 데이터 수집이 이루어져야 하기 때문에 별도의 스케쥴링 전략이 필요합니다.
- 우선순위 큐를 만들어 큐에 등록된 seed는 1초 간격으로 하나씩 처리하고 과도하게 pending 상태의 seed job이 발생할 경우 신규 큐를 생성하여 별도의 worker thread가 처리 속도를 높이는 식으로 일단은 구현을 합니다.
- 아래는 ZIOf로 해당 큐기반의 worker scheduler를 구현한 코드입니다.
abstract class UnitJobScheduler (
queueRef: Ref[List[Queue[String]]],
queueSize: Int = 10) extends JobProducer[String] {
private def inputDataAndExtendQueue(queue: List[Queue[String]])(effect : => String): ZIO[Any, Nothing, Boolean] =
for {
result <- ZIO.suspendSucceed {
queue match {
case Nil => ZIO.succeed(false)
case x :: xs =>
for {
result <- x.offer(effect)
res <- result match {
case true => ZIO.succeed(true)
case false =>
inputDataAndExtendQueue(xs)(effect)
}
} yield res
}
}
} yield result
def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit]
// add new job
def addJob(job: String) =
for {
queue <- queueRef.get
addRes <- inputDataAndExtendQueue(queue)(job)
_ <- addRes match {
case true => ZIO.unit
case false => for {
_ <- Console.printLine("--> create new worker queue ... " + queue.size)
q <- Queue.dropping[String](queueSize)
_ <- queueRef.update(_ :+ q)
_ <- processJob(q, queue.size).fork
} yield ()
}
} yield ()
// produce continouse jobs
def startProduce() = {
(for {
//TODO check this time is whether holiday or not
data <- unitProduce // ----------------> Producing Job
_ <- Console.printLine("produce job to crawl: " + data)
_ <- ZIO.foreach(data){r => addJob(r)}
} yield ()).repeat(Schedule.spaced(1.seconds)).unit
}
}
- 아래는 위의 코드를 기반으로 Crawler를 구현한 코드입니다.
trait JobProducer[T] {
def unitProduce: ZIO[Any, Nothing, List[T]]
}
class CrawlJobScheduler(
queueRef: Ref[List[Queue[String]]],
stockRepo: StockRepo,
crawler: MinStockCrawler,
crawlStatus: CrawlStatusRepo) extends UnitJobScheduler(queueRef) {
override def unitProduce: ZIO[Any, Nothing, List[String]] = (for {
_ <- ZIO.when(DataUtil.getCurrentTimestamp() != DataUtil.stockTimestamp(0))(
ZIO.log("Out of stockTime") *> ZIO.fail(new Exception("---> Out of stock time of Today")))
queue <- queueRef.get
cntEndItemCodes <- queue.lastOption match {
case Some(q) => q.size
case None => ZIO.succeed(0)
}
expiredItemCodes <- crawlStatus.getExpiredItemCode(10 * 60 * 1000 - cntEndItemCodes * 1000)
_ <- ZIO.foreachPar(expiredItemCodes){itemCode => crawlStatus.syncCrawlStatus(itemCode, "PEND")}
_ <- ZIO.log("Create target itemCodes : " + expiredItemCodes)
} yield expiredItemCodes)
.catchAll(e => ZIO.log(e.getMessage()) *> ZIO.succeed(List.empty[String]))
override def processJob(queue: Queue[String], workerId: Int = -1): ZIO[Any, Nothing, Unit] =
(for {
itemCode <- queue.take
cd <- crawler.crawl(itemCode)
res <- stockRepo.insertStockMinVolumeSerialBulk(cd)
_ <- crawlStatus.syncCrawlStatus(itemCode, "SUSP")
_ <- ZIO.foreach(cd){r => Console.printLine(r.toString())}
_ <- Console.printLine(s"Inserted data ${cd.size} for $itemCode by worker #$workerId").ignore // log
} yield ()).repeat(Schedule.spaced(1.seconds))
.provide(
Client.customized,
NettyClientDriver.live,
ZLayer.succeed(NettyConfig.default),
DnsResolver.default,
DataDownloader.live,
)
.unit.catchAll(e => ZIO.log(e.getMessage()).ignore)
}
종가패턴 발견
- 우상향 여부를 판별하고 기울기 값과 오차범위내의 가격을 count하여 패턴의 일치여부를 찾습니다.
class EndPriceAnalyzerImpl extends EndPriceAnalyzer {
def analyzeUpstream(data: List[StockMinVolumeTable], allowedError: Double = 0.25, innerErrorPer: Double = 0.80) = {
val streamedData = ZStream.fromIterable(data).filter(r => {
val tokens = r.tsCode.split("_")
tokens.length == 2 && tokens(1) <= "15:30"
})
(for {
firstElem <- streamedData.runHead
lastElem <- streamedData.runLast
_ <- Console.printLine(s"First : ${firstElem}, end : ${lastElem}")
delta <- ZIO.attempt(
lastElem.flatMap(l => firstElem.map(f => l.fixedPrice - f.fixedPrice))
.getOrElse(0.0)
)
_ <- Console.printLine(s"Delta : ${delta}")
_ <- ZIO.when(delta < 0)(ZIO.fail(new Exception("---> Stock price is not trending upward <--")))
slope <- ZIO.attempt(delta / (data.length - 1))
_ <- Console.printLine(s"Slope : ${slope}")
result <- streamedData.zipWithIndex.runFold(0) { case (inCount, (element, idx)) =>
val expectedY = firstElem.map(a => a.fixedPrice + slope * idx)
val error = expectedY.map(exp => Math.abs(element.fixedPrice - exp) / delta)
error match {
case Some(e) if e <= allowedError =>
inCount + 1
case x =>
inCount
}
}
_ <- Console.printLine(s"Result : ${result} / ${data.length}")
finResult <- ZIO.succeed(result > data.length * innerErrorPer)
} yield finResult).catchAll(e => Console.printError(e.getLocalizedMessage()) *> ZIO.succeed(false))
}
override def analyze(stockCode: String, targetDt: String) = for {
repo <- ZIO.service[StockRepo]
targetData <- repo.selectStockDataByItemCode(stockCode, targetDt)
result <- analyzeUpstream(targetData)
} yield result
}
'Tech > ZIO' 카테고리의 다른 글
ZIO의 Error 처리 (0) | 2024.08.30 |
---|---|
Scala ZIO의 error handling - (1) (0) | 2024.05.05 |