Crawl가 수집한 데이터를 임시로 Mysql에 저장하고 있고,
수집 데이터를 SparkStreaming으로 실시간으로 분석해야 하는 경우를 가정하자.
[Custom Receiver의 구현]
주기 시간에 따라 Table의 데이터를 Select하는 구현이 필요하다.
SparkStreaming 상에서 처리가 끝난 데이터가 다시 조회되어야 할 일은 없어야 하기에 가장 최근에 조회된 max(id)값을 저장/갱신한다.
class MySqlSourceReceiver(val seedNo : Long) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)
with Logging {
var latestCrawlNo = 0L
override def onStart(): Unit = {
new Thread("MysqlSt") {
override def run(): Unit = {
createGetData
}
}.start()
}
override def onStop(): Unit = synchronized {
// this.db.close()
}
private def createGetData(): Unit = {
while(!isStopped) {
try {
val res = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo)
println(s"Count of crawled data : ${res.size} for ${seedNo}")
val mergedRes = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo).mkString("\n\n")
println(mergedRes)
store(mergedRes)
latestCrawlNo = DbUtil.latestCrawlNo(seedNo)
println(s"Update Point ${latestCrawlNo}")
Thread.sleep(5000)
} catch {
case e: Exception => e.printStackTrace()
}
}
}
}
[DBUtil의 구현]
아래와 같이 DB 조회부분을 위의 Receiver에서 분리해야 한다.
Receiver상에서 구현할 경우 대부분의 경우 Serializable Exception이 발생한다.
object DbUtil {
protected implicit def executor = scala.concurrent.ExecutionContext.Implicits.global
val db : Database = Database.forURL(url ="jdbc:mysql://localhost:3306/horus?useSSL=false",
user="root", password="18651865", driver = "com.mysql.jdbc.Driver")
val getLatestAnchorWithLimit = (seedNo: Long, startCrawlNo: Long, limit : Int) =>
Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, limit).result), 10.seconds)
val getMaxCrawlNo = (seedNo: Long) => Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)
def latestCrawlNo(seedNo: Long) : Long = {
Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)
}
def getLatestAnchorFrom(seedNo: Long, startCrawlNo: Long) = {
Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, 100).result), 10.seconds)
}
def getLatestContextFrom(seedNo: Long, startCrawlNo: Long) = {
Await.result(db.run(CrawledRepo.findLatestContent(seedNo, startCrawlNo, 100).result), 10.seconds)
}
}
[Slick 구현]
일반적인 Slick의 구현과 동일하게 하면 된다.
적절한 구간을 분리만 잘 해주면 SparkStreaming 상에서 별 무리 없이 잘 동작한다.
object CrawledRepo {
case class CrawlUnit(
crawlNo: Long = -1L,
url: Option[String] = None,
anchorText: Option[String] = None,
anchorImg: Option[String] = None,
// ...
pageTitle: Option[String] = None,
parsedPageDate: Option[Timestamp] = None
)
class CrawlUnitTableSchema(tag: Tag) extends Table[CrawlUnit](tag, None,"CRAWL_UNIT1") {
def crawlNo = column[Long]("CRAWL_NO", O.PrimaryKey, O.AutoInc)
def url = column[Option[String]]("URL")
def anchorText = column[Option[String]]("ANCHOR_TEXT")
// ...
def pageTitle = column[Option[String]]("PAGE_TITLE")
def parsedPageDate = column[Option[Timestamp]]("PARSED_PAGE_DATE")
def * = (crawlNo, url, anchorText, anchorImg, status, seedNo, pageDate, regDate,
updDate, pageText, pageTitle, parsedPageDate) <> (CrawlUnit.tupled, CrawlUnit.unapply)
}
val crawlUnitsQuery = TableQuery[CrawlUnitTableSchema]
val createSchemaAction = (crawlUnitsQuery.schema).create
def findLatestAnchor(seedNo: Long) = crawlUnitsQuery
.filter(_.seedNo === seedNo).filter(_.status === "SUCC")
.sortBy(_.crawlNo.desc).drop(0).take(20).map(_.anchorText)
def findLatestCrawlNo(seedNo1: Long) = crawlUnitsQuery.filter(_.seedNo === seedNo1)
.filter(_.status === "SUCC").map(_.crawlNo).max
def getCrawledData(crawlNo: Long) = crawlUnitsQuery.filter(_.crawlNo === crawlNo)
}
[SparkStreaming App 구현]
아래와 같이 sss.receiverStream 을 통해 직접 구현한 Receiver를 설정해 주면 된다.
object CrawledProcessing extends SparkStreamingInit {
def processCrawled(seedId : Long) = {
val anchors = ssc.receiverStream(new MySqlSourceReceiver(seedId))
val words = anchors.flatMap(anchor => {
HangleTokenizer().arrayNouns(anchor)
})
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.foreachRDD(rdd => {
rdd.foreach(tf => {
InfluxClient.writeTf(seedId, tf._1, tf._2)
})
})
// ...
}
긁어온 신문기사의 단어를 Count하는 모듈이 완성되었다.
'Tech > DATA & AI' 카테고리의 다른 글
고전 머신러닝으로 주가 예측 해보기 (0) | 2024.08.04 |
---|---|
실시간 검색어를 대체하는 이슈 추적기 (0) | 2022.12.08 |
딥러닝을 활용한 댓글 감성분석 개발 및 상용화 (0) | 2021.06.28 |