https://youtu.be/N6ystK4XoXU

현재 일어나고 있는 이슈를 어떻게 빠르게 파악할 수 있을까?

포털이나 언론사의 뉴스를 하루종일 보고 있기는 힘든 일이다.

특히 뉴스의 빠른 파악이 중요한 주식, 코인 투자자들에겐 더더욱..

 

그래서 그들을 위해서 뉴스를 수집하여 이슈를 요약하여 시각화하는 서비스를 직접 만들어 보기로 하였다.

 

Tech Stack은 Spark, Scala, Slick, Scalatra, Twirl, Infux 등을 사용하였다.

Crawl가 수집한 데이터를 임시로 Mysql에 저장하고 있고,

수집 데이터를 SparkStreaming으로 실시간으로 분석해야 하는 경우를 가정하자.

 

[Custom Receiver의 구현]

주기 시간에 따라 Table의 데이터를 Select하는 구현이 필요하다.

SparkStreaming 상에서 처리가 끝난 데이터가 다시 조회되어야 할 일은 없어야 하기에 가장 최근에 조회된 max(id)값을 저장/갱신한다.

class MySqlSourceReceiver(val seedNo : Long) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)
  with Logging {

  var latestCrawlNo = 0L

  override def onStart(): Unit = {
    new Thread("MysqlSt") {
      override def run(): Unit = {
        createGetData
      }
    }.start()
  }

  override def onStop(): Unit = synchronized {
//    this.db.close()
  }

  private def createGetData(): Unit = {
    while(!isStopped) {
      try {
        val res = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo)
        println(s"Count of crawled data : ${res.size} for ${seedNo}")

        val mergedRes = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo).mkString("\n\n")
        println(mergedRes)
        store(mergedRes)

        latestCrawlNo = DbUtil.latestCrawlNo(seedNo)
        println(s"Update Point ${latestCrawlNo}")

        Thread.sleep(5000)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}

[DBUtil의 구현]

아래와 같이 DB 조회부분을 위의 Receiver에서 분리해야 한다.

Receiver상에서 구현할 경우 대부분의 경우 Serializable Exception이 발생한다.

object DbUtil {
  protected implicit def executor = scala.concurrent.ExecutionContext.Implicits.global
  val db : Database = Database.forURL(url ="jdbc:mysql://localhost:3306/horus?useSSL=false",
    user="root", password="18651865", driver = "com.mysql.jdbc.Driver")

  val getLatestAnchorWithLimit = (seedNo: Long, startCrawlNo: Long, limit : Int) =>
    Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, limit).result), 10.seconds)

  val getMaxCrawlNo = (seedNo: Long) => Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)

  def latestCrawlNo(seedNo: Long) : Long = {
    Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)
  }

  def getLatestAnchorFrom(seedNo: Long, startCrawlNo: Long) = {
    Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, 100).result), 10.seconds)
  }

  def getLatestContextFrom(seedNo: Long, startCrawlNo: Long) = {
    Await.result(db.run(CrawledRepo.findLatestContent(seedNo, startCrawlNo, 100).result), 10.seconds)
  }
}

[Slick 구현]

일반적인 Slick의 구현과 동일하게 하면 된다.

적절한 구간을 분리만 잘 해주면 SparkStreaming 상에서 별 무리 없이 잘 동작한다.

object CrawledRepo {
  case class CrawlUnit(
                        crawlNo: Long = -1L,
                        url: Option[String] = None,
                        anchorText: Option[String] = None,
                        anchorImg: Option[String] = None,
                        // ...
                        pageTitle: Option[String] = None,
                        parsedPageDate: Option[Timestamp] = None
                      )

  class CrawlUnitTableSchema(tag: Tag) extends Table[CrawlUnit](tag, None,"CRAWL_UNIT1") {
    def crawlNo = column[Long]("CRAWL_NO", O.PrimaryKey, O.AutoInc)
    def url = column[Option[String]]("URL")
    def anchorText = column[Option[String]]("ANCHOR_TEXT")
	// ...
    def pageTitle = column[Option[String]]("PAGE_TITLE")
    def parsedPageDate = column[Option[Timestamp]]("PARSED_PAGE_DATE")

    def * = (crawlNo, url, anchorText, anchorImg, status, seedNo, pageDate, regDate,
      updDate, pageText, pageTitle, parsedPageDate) <> (CrawlUnit.tupled, CrawlUnit.unapply)
  }

  val crawlUnitsQuery = TableQuery[CrawlUnitTableSchema]
  val createSchemaAction = (crawlUnitsQuery.schema).create

  def findLatestAnchor(seedNo: Long) = crawlUnitsQuery
    .filter(_.seedNo === seedNo).filter(_.status === "SUCC")
    .sortBy(_.crawlNo.desc).drop(0).take(20).map(_.anchorText)

  def findLatestCrawlNo(seedNo1: Long) = crawlUnitsQuery.filter(_.seedNo === seedNo1)
    .filter(_.status === "SUCC").map(_.crawlNo).max

  def getCrawledData(crawlNo: Long) = crawlUnitsQuery.filter(_.crawlNo === crawlNo)
}

[SparkStreaming App 구현]

아래와 같이 sss.receiverStream 을 통해 직접 구현한 Receiver를 설정해 주면 된다.

object CrawledProcessing extends SparkStreamingInit {

  def processCrawled(seedId : Long) = {
    val anchors = ssc.receiverStream(new MySqlSourceReceiver(seedId))
    val words = anchors.flatMap(anchor => {
      HangleTokenizer().arrayNouns(anchor)
    })

    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.foreachRDD(rdd => {
      rdd.foreach(tf => {
        InfluxClient.writeTf(seedId, tf._1, tf._2)
      })
    })
	
    // ...
  }

긁어온 신문기사의 단어를 Count하는 모듈이 완성되었다.

https://www.slideshare.net/jeffykim/ver-2-249475816

 

댓글 감성 분석 상용화 개발기(Ver. 2)

모 앱스토어에서 개발된 댓글의 긍정,부정,VOC 분류 자동화를 위한 연구 개발기를 정리하였습니다. 상용화 오픈 이후 1년이 진행된 시점에 작성된 문서입니다. Java 기반의 ML 툴인 DL4J가 적용되었

www.slideshare.net

 

모 앱스토어의 댓글의 긍정, 부정, VOC를 실시간으로 분류할 수 있는 서비스 개발, 상용화하면서,

공유할 만한 내용들을 정리하였습니다.

 

Java 기반의 머신러닝 툴인 DL4J를 사용하였고, 분류에는 CNN을 주기술로  Word2vec 등의 활용되었습니다.

상용서비스를 위해  Springboot 기반의  서비스 배포환경이 적용되었습니다.

+ Recent posts