본 문서에 관하여

  • python을 활용하여 crawler와 고전 ml로 주가 회귀 분석 진행
  • 아직 작성중

----

 

KODEX 코스닥150레버리지 상품의 다음날 상승, 하락, 가격을 예측하는 것을 목표로 하였다.

 

아래의 사이트에서 '외국인/기관 순매매 거래량' 2016년 부터의 데이터를 Crawling 하였다.

 

코스닥 전체의 투자자별 매매 동향 또한 동일 기간 Crawling을 하였다.

 

Regression 방식의 머신러닝일 적용하므로 과거의 많은 일 수의 특성의 반영이 필요하여 주요 지표에 대해서 아래와 같이 1, 2, 4주 기준으로 합산, 평균, 표준편차 Feature를 추가하였다.

def createColumns(df , name: str) :
    # print(name + ' newly added ..')
    df[name + '_SUM_D5'] = df[name].rolling(5).sum().shift(1)
    df[name + '_AVG_D5'] = df[name].rolling(5).mean().shift(1)
    df[name + '_STD_D5'] = df[name].rolling(5).std().shift(1)
    df[name + '_SUM_D10'] = df[name].rolling(10).sum().shift(1)
    df[name + '_AVG_D10'] = df[name].rolling(10).mean().shift(1)
    df[name + '_STD_D10'] = df[name].rolling(10).std().shift(1)
    df[name + '_SUM_D20'] = df[name].rolling(20).sum().shift(1)
    df[name + '_AVG_D20'] = df[name].rolling(20).mean().shift(1)
    df[name + '_STD_D20'] = df[name].rolling(20).std().shift(1)


feature_cols = ['VOLUME',
                'COMP_BUY', 'FOR_BUY', 'FOR_CONT', 'FOR_PER', 'PERSONAL',
                'FOREIGNER', 'COMPANY', 'FINANCE', 'INSURANCE', 'TOOSIN', 'BANK',
                'ETC_FIN', 'GOV_FUND', 'ETC_FUND']

for col in feature_cols:
    createColumns(df3, col)

 

다음날 종가를 예측하는게 목표이므로 output은 다음날의 종가로 설정을 하였다.

df3['END_VALUE_PRED_OUT']=df3.END_VALUE.shift(-1)

 

pycaret을 활용하여 모델을 생성하였다.

from pycaret.regression import *

s = setup(data = train, target='DELTA_OUT', session_id=123, normalize=True, normalize_method='zscore')

 

베이스라인 테스트를 진행해 보았다. 예측 정확도가 굉장히 높게 나왔다.

best = compare_models(exclude=['lar'])

 

Feature Importance를 보면 대체로 한달 정도의 기간의 통계 데이터가 결과에 영향을 많이 줌을 확인할 수 있다.

SHAP value로 각 Feature별 공헌 특성을 확인해보자

 

대략적인 해석은,

  • COMP_BUY_STD_D20 : 기관투자자 순매수 20일치 표본표준편차, 음의 기여도를 가진다. 기관투자자 매도하는 시점은 상승으로 이어진다.
  • 은행, 국민연금, 보험, 펀드 등 : 양의 기여도, 즉 주가가 상승할때 이들의 양의 매수도 증가한다 볼 수 있다.
  • 투신 : 대체로 양의 기여도이나 다른 주체와는 달리 선형적이지 않다. 미리 치고 빠지는 느낌
  • 외국인 : 대체로 양의 관계를 보이나 결과 영향도는 미미함.
  • COMP_BUIY_STD_D10 : 20일 값과 유사하게 음의 기여도이나 기여도 자체는 20일과 비교하여 줄어드는 양상, 즉 기관 투자자들은 상승 3~4주 전에 움직인다.
 

LSTM은,

  • 동일 데이터로 실험시 매우 저조한 예측 정확도를 출력
    • Feature Engineering에 추가적인 시간 투자가 필요해 보임
    • 일간 데이터이므로 딥러닝을 의미있는 정확도를 추출하기에는 데이터가 과부족로 판단됨

내일의 예측은 어떻게 할 것인가?

  • 아직은 신의 영역이나 기관투자자들은 최대 한달정도 전부터 움직임, 이것을 Catch 하는 방향으로 접근이 요구됨

https://youtu.be/N6ystK4XoXU

현재 일어나고 있는 이슈를 어떻게 빠르게 파악할 수 있을까?

포털이나 언론사의 뉴스를 하루종일 보고 있기는 힘든 일이다.

특히 뉴스의 빠른 파악이 중요한 주식, 코인 투자자들에겐 더더욱..

 

그래서 그들을 위해서 뉴스를 수집하여 이슈를 요약하여 시각화하는 서비스를 직접 만들어 보기로 하였다.

 

Tech Stack은 Spark, Scala, Slick, Scalatra, Twirl, Infux 등을 사용하였다.

Crawl가 수집한 데이터를 임시로 Mysql에 저장하고 있고,

수집 데이터를 SparkStreaming으로 실시간으로 분석해야 하는 경우를 가정하자.

 

[Custom Receiver의 구현]

주기 시간에 따라 Table의 데이터를 Select하는 구현이 필요하다.

SparkStreaming 상에서 처리가 끝난 데이터가 다시 조회되어야 할 일은 없어야 하기에 가장 최근에 조회된 max(id)값을 저장/갱신한다.

class MySqlSourceReceiver(val seedNo : Long) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)
  with Logging {

  var latestCrawlNo = 0L

  override def onStart(): Unit = {
    new Thread("MysqlSt") {
      override def run(): Unit = {
        createGetData
      }
    }.start()
  }

  override def onStop(): Unit = synchronized {
//    this.db.close()
  }

  private def createGetData(): Unit = {
    while(!isStopped) {
      try {
        val res = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo)
        println(s"Count of crawled data : ${res.size} for ${seedNo}")

        val mergedRes = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo).mkString("\n\n")
        println(mergedRes)
        store(mergedRes)

        latestCrawlNo = DbUtil.latestCrawlNo(seedNo)
        println(s"Update Point ${latestCrawlNo}")

        Thread.sleep(5000)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}

[DBUtil의 구현]

아래와 같이 DB 조회부분을 위의 Receiver에서 분리해야 한다.

Receiver상에서 구현할 경우 대부분의 경우 Serializable Exception이 발생한다.

object DbUtil {
  protected implicit def executor = scala.concurrent.ExecutionContext.Implicits.global
  val db : Database = Database.forURL(url ="jdbc:mysql://localhost:3306/horus?useSSL=false",
    user="root", password="18651865", driver = "com.mysql.jdbc.Driver")

  val getLatestAnchorWithLimit = (seedNo: Long, startCrawlNo: Long, limit : Int) =>
    Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, limit).result), 10.seconds)

  val getMaxCrawlNo = (seedNo: Long) => Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)

  def latestCrawlNo(seedNo: Long) : Long = {
    Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)
  }

  def getLatestAnchorFrom(seedNo: Long, startCrawlNo: Long) = {
    Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, 100).result), 10.seconds)
  }

  def getLatestContextFrom(seedNo: Long, startCrawlNo: Long) = {
    Await.result(db.run(CrawledRepo.findLatestContent(seedNo, startCrawlNo, 100).result), 10.seconds)
  }
}

[Slick 구현]

일반적인 Slick의 구현과 동일하게 하면 된다.

적절한 구간을 분리만 잘 해주면 SparkStreaming 상에서 별 무리 없이 잘 동작한다.

object CrawledRepo {
  case class CrawlUnit(
                        crawlNo: Long = -1L,
                        url: Option[String] = None,
                        anchorText: Option[String] = None,
                        anchorImg: Option[String] = None,
                        // ...
                        pageTitle: Option[String] = None,
                        parsedPageDate: Option[Timestamp] = None
                      )

  class CrawlUnitTableSchema(tag: Tag) extends Table[CrawlUnit](tag, None,"CRAWL_UNIT1") {
    def crawlNo = column[Long]("CRAWL_NO", O.PrimaryKey, O.AutoInc)
    def url = column[Option[String]]("URL")
    def anchorText = column[Option[String]]("ANCHOR_TEXT")
	// ...
    def pageTitle = column[Option[String]]("PAGE_TITLE")
    def parsedPageDate = column[Option[Timestamp]]("PARSED_PAGE_DATE")

    def * = (crawlNo, url, anchorText, anchorImg, status, seedNo, pageDate, regDate,
      updDate, pageText, pageTitle, parsedPageDate) <> (CrawlUnit.tupled, CrawlUnit.unapply)
  }

  val crawlUnitsQuery = TableQuery[CrawlUnitTableSchema]
  val createSchemaAction = (crawlUnitsQuery.schema).create

  def findLatestAnchor(seedNo: Long) = crawlUnitsQuery
    .filter(_.seedNo === seedNo).filter(_.status === "SUCC")
    .sortBy(_.crawlNo.desc).drop(0).take(20).map(_.anchorText)

  def findLatestCrawlNo(seedNo1: Long) = crawlUnitsQuery.filter(_.seedNo === seedNo1)
    .filter(_.status === "SUCC").map(_.crawlNo).max

  def getCrawledData(crawlNo: Long) = crawlUnitsQuery.filter(_.crawlNo === crawlNo)
}

[SparkStreaming App 구현]

아래와 같이 sss.receiverStream 을 통해 직접 구현한 Receiver를 설정해 주면 된다.

object CrawledProcessing extends SparkStreamingInit {

  def processCrawled(seedId : Long) = {
    val anchors = ssc.receiverStream(new MySqlSourceReceiver(seedId))
    val words = anchors.flatMap(anchor => {
      HangleTokenizer().arrayNouns(anchor)
    })

    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.foreachRDD(rdd => {
      rdd.foreach(tf => {
        InfluxClient.writeTf(seedId, tf._1, tf._2)
      })
    })
	
    // ...
  }

긁어온 신문기사의 단어를 Count하는 모듈이 완성되었다.

https://www.slideshare.net/jeffykim/ver-2-249475816

 

댓글 감성 분석 상용화 개발기(Ver. 2)

모 앱스토어에서 개발된 댓글의 긍정,부정,VOC 분류 자동화를 위한 연구 개발기를 정리하였습니다. 상용화 오픈 이후 1년이 진행된 시점에 작성된 문서입니다. Java 기반의 ML 툴인 DL4J가 적용되었

www.slideshare.net

 

모 앱스토어의 댓글의 긍정, 부정, VOC를 실시간으로 분류할 수 있는 서비스 개발, 상용화하면서,

공유할 만한 내용들을 정리하였습니다.

 

Java 기반의 머신러닝 툴인 DL4J를 사용하였고, 분류에는 CNN을 주기술로  Word2vec 등의 활용되었습니다.

상용서비스를 위해  Springboot 기반의  서비스 배포환경이 적용되었습니다.

+ Recent posts