게임 회사에 입사했을때 처음으로 주어진 업무는

아이온이라는 MMORPG 게임에서 캐릭터를 하나 생성해서 만랩으로 만드는 것이었다.

업무시간에 점심시간에 야근으로 게임을 하더라도 아무도 뭐라 그러는 사람이 없으니 한편으론 문화충격이기도 했고..

게임에 관심이 별로 없었던 나로써는 이건 왜 해야하지라는 의문만 들었다.

 

물론 만랩에 도달하고서야 알게되었는데,

캐릭터를 일단 만랩으로 만들고 나서 시작되는 퀘스트와 할 수 있는 것들이 너무나도 많았다.

 

결국 게임 시작의 필요조건은 본인 캐릭터를 만렙으로 만드는 것이라는 것을 알았다.

----

 

Scala를 공부하면서 드는 생각은 파도파도 끝이 없는데 도대체 어디까지 가야 만랩의 시작점일까?

물론 Spark 정도를 돌려보는 수준이라면 Scala의 5% 정도만 알아도 대부분의 기능을 원활하게 쓸 수 있다.

 

그 이외의 것들을 하기위한 95%의 노력이 들어가는 영역. 외로운 싸움인 것 같다.

Scala를 시작하고 2년 정도 흘러간 현시점.

ZIO, Cats를 조금씩 쓰고 있는데 결국은 Monad 구조를 코드에 얼마나 잘 녹여 넣는가로 정리가 되어 가는 느낌이다.

Scala의 Purely Functional을 지향하는 대부분의 툴들은 Free Monad 구조가 녹아 있는 부분이 많다.

Java에 Spring Framework이 있다면 현 시점 Scala에는 ZIO가 있고 Scala에 대표 툴들이 ZIO로 통합되어가고 있는 분위기다.

 

Free Monad를 몰라도 ZIO 등의 Free Monad 친화적 구조의 툴 들을 쓰는데 큰 문제는 없지만 쓰면 쓸수록 뭔가 마음속에 답답한 짐이 커짐을 느낀다.

Free Monad의 구현방식을 정확하게 이해하지 않으면 Scala와의 여정이 시작부터 쉽지 않을 것 같다.

일단 Free Monad 구현 코드를 꼼꼼히 살펴보자.

----

 

구현 코드는 스칼라 공부하는 사람의 천사, 다니엘님의 영상과 블로그를 참고하였다.

https://blog.rockthejvm.com/free-monad/

 

Free Monad in Scala

A tutorial on the Free monad in Scala, how it works and what it’s good for.

blog.rockthejvm.com

매우 잘 정리된 글에 영상이지만 그저 놀라운 모습을 바라만 보고 있기를 반복하는데 뭔가 막히는 부분을 찾아내고 정리하는 과정이 필요하다 생각되었다.

 

우선 Free Monad로 쓸 수 있는 유용한 좋은 점중 하나가 비지니스로직과 구현로직의 분리이다.

Java나 객체지향 언어에서 사용한 Tempate Method Pattern은 상위계층에서 Function의 flow를 정의하고 구현 Class를 분리하는데, 

Scala역시 Free Monad를 이용하여 함수적으로 로직과 구현을 분리한다.

 

def myLittleProgram = for { 
  _ <- create[String]("123-456", "Daniel")
  name <- get[String]("123-456")
  _ <- create[String]("567", name.toUpperCase())
  _ <- delete("123-456")
} yield ()

위의 코드는 비지니스 로직, 즉 로직에 의한 output 데이터의 flow라고도 볼 수 있다.

데이터를 생성(create)하고, 생성된 데이터의 내부 값(get)을 변환 저장(create)하고, 기존 데이터는 삭제(delete)하는 흐름이다.

 

for comprehension 상에서 코드가 정의되었기에 create, get, delete는 개별적으로 Monad 객체라는 것을 짐작할 수 있다.

trait DBOps[A]
case class Create[A](key: String, value: A) extends DBOps[Unit]
case class Read[A](key: String) extends DBOps[A]
case class Update[A](key: String, value: A) extends DBOps[A]
case class Delete(key: String) extends DBOps[Unit]

def create[A](key: String, value: A): Free[DBOps, Unit] =
Free.liftM[DBOps, Unit](Create(key, value))

def get[A](key: String): Free[DBOps, A] =
Free.liftM[DBOps, A](Read[A](key))

def update[A](key: String, value: A): Free[DBOps, A] =
Free.liftM[DBOps, A](Update[A](key, value))

def delete(key: String): Free[DBOps, Unit] =
Free.liftM(Delete(key))

모든 함수는 Free[_,_] 의 인스턴스를 리턴한다. Free[_,_] 자체가 flatMap, pure 함수를 포함하는 Monad라는 것을 짐작할 수 있다.

다만 우리가 알고 있던 모나드의 모습은 M[_]인데 type 인자가 하나 더 있는 더 복잡한 구조라는 것을 짐작할 수 있다.(겁부터 먹는다)

 

일단 Free의 구조를 살펴보자.

trait Free[M[_], A] {
    import Free.*
    def flatMap[B](f: A => Free[M, B]): Free[M, B] = FlatMap(this, f)
    def map[B](f: A => B): Free[M, B] = flatMap(a => pure(f(a)))
    def foldMap[G[_]: Monad](natTrans: M ~> G): G[A] = this match {
      case Pure(a) => Monad[G].pure(a)
      case Suspend(ma) => natTrans.apply(ma)
      case FlatMap(fa, f) => // need a G[B] --> fa = DBMonad = Suspend, f = Next DBMonad = Suspend
        Monad[G].flatMap(fa.foldMap(natTrans))(a => f(a).foldMap(natTrans))
    }
}
  
object Free {
    def pure[M[_], A](a: A): Free[M, A] = Pure(a)
    def liftM[M[_], A](ma: M[A]): Free[M, A] = Suspend(ma)

    case class Pure[M[_], A](a: A) extends Free[M, A]
    case class FlatMap[M[_],A,B](fa: Free[M, A], f: A => Free[M, B]) extends Free[M, B]
    case class Suspend[M[_], A](ma: M[A]) extends Free[M, A]
}

짧은 지식이지만 Monad에는 flatMap, pure 함수가 있다는 걸 알았고 map은 보너스라는 건 알고 있다.

새롭게 보이는 function이 liftM과 foldMap 이다.

 

def liftM[M[_], A](ma: M[A]): Free[M, A] = Suspend(ma)

// 사용 예제
def create[A](key: String, value: A): Free[DBOps, Unit] =
	Free.liftM[DBOps, Unit](Create(key, value))

// 참고
case class Create[A](key: String, value: A) extends DBOps[Unit]

liftM의 정의와 구현은 위와 같은데 뭔가 심오하다. 

일단은 M[A] 를 받아서 Free[M,A]로 변환된 객체를 리턴하는 구조로 보면 되는데, DBOps[Unit] -> Free[DBOps, Unit] 의 형태로 변환하는 pure의 Free Monad 함수로 보면 될 것 같다.

 

그리고 Free Monad의 핵심 foldMap인데 우선 어떻게 사용되는지 살펴보자

val dbOps2IO: DBOps ~> IO = new (DBOps ~> IO) {
  override def apply[A](fa: DBOps[A]): IO[A] = fa match {
    case Create(key, value) => IO.create { // actual code that uses the database
      println(s"insert into people(id, name) values ($key, $value)")
      myDB += (key -> serialize(value))
      ()
    }
    case Read(key) => IO.create {
      println(s"select * from people where id=$key limit 1")
      deserialize(myDB(key))
    }
    case Update(key, value) => IO.create {
      println(s"update people(name=$value) where id=$key")
      val oldValue = myDB(key)
      myDB += (key -> serialize(value))
      deserialize(oldValue)
    }
    case Delete(key) => IO.create {
      println(s"delete from people where id=$key")
      ()
    }
  }
}

val ioProgram: IO[Unit] = myLittleProgram.foldMap(dbOps2IO)

ioProgram.unsafeRun()

우선 사전 정의되었던 myLittlePrograme의 foldMap(dbOps2IO) 함수를 호출하여 DBOps 기반의 데이터를 IO 형태로 변환하고, 

IO에서 제공하는 unsafeRun() 함수를 호출하여 실제 로직의 프로그램을 실행시킨다 (놀랍다)

 

즉 비지니스로직상의 전체 data flow와 세부 action의 로직을 foldMap() 함수가 연결시켜주는 역할을 한다.

처음으로 생각나는 Scala 문법 구조상 이해가 되지 않는 의문은 for comprehesion이다.

def myLittleProgram: Free[DBOps, Unit] = for { // monadic
  _ <- create[String]("123-456", "Daniel")
  name <- get[String]("123-456")
  _ <- create[String]("567", name.toUpperCase())
  _ <- delete("123-456")
} yield ()

myLittleProgram의 최종 return type은 위의 코드에서 보듯이 Free[DBOps, Unit] 이다. 

 

여기서 foldMap의 동작 원리를 이해하기 위해서 for comprehension의 동작 방식에 대한 의문이 든다.

 

우리가 알고 있는 for comprehension은 각 Monad 객체 flatMap 연산을 chain형태로 순차적으로 호출하고 최종 결과값을 return하는 구조로 알고 있다.

 

 chain과 같은 정방향의 실행구조 (a.flatMap(a1 => b.flatMap(b1 => c.flatMap(c1 => c1.map(.. ))) 를 가지고 최종 결과값에 대한 접근만 myLittleProgram 함수명을 통하여 가능하다. 즉 myLittleProgram은 아래와 같이 풀어 쓸 수 있다.

val myLittleProgram2: Free[DBOps, Unit] = create[String]("123-456", "Daniel").flatMap(a1 => {
  get[String]("123-456").flatMap(name => {
    create[String]("567", name.toUpperCase()).flatMap(a3 => {
      delete("123-456").map(a4 => ())
    })
  })
})

for comprehension의 표현을 단순히 풀어 쓴 코드라 myLittleProgram2의 return type 역시 Free[DBOps, Unit]으로 동일하다. 

foldMap 구현시 첫번째로 해야될 일은 DBOps[String], 즉 Free[DBOps, String]을 IO[String]으로 변환하는 로직일 것이다.

 

그런데 다시 create[String]에 어떻게 접근 혹은 진입을 할 수 있을까?

 

결론부터 얘기를 하면 우리가 알고 있던 flatMap과 map은 뭔가 연산을 하고 결과값을 던져주는 함수인데,

여기서 사용되는 flatMap은 단순히 원본의 데이터를 IO 형태로 변환시켜주는 역할만을 한다.

println("MyLittleProgram => " + myLittleProgram)

// output
MyLittleProgram => FlatMap(Suspend(Create(123-456,Daniel)),c.y.study.FreeMonadSample$$$Lambda$17/0x0000000800096040@319b92f3)

위의 코드는 놀랍게도 for comprehension의 실행 결과가 값이 아닌 변환된 데이터 타입을 보여준다. 변환된 데이터 타입을 풀어쓰면 아래와 같다. 

def myLittleProgram = FlatMap(Suspend(Create(123-456,Daniel)),
	FlatMap(Suspend(Read("123-456")), 
		FlatMap(Suspend(Create(Read(..)), 
        		FlatMap(Suspend(Delete("123-456")), Pure(Unit))))

최상 root 객체가 FlatMap이고 chain 혹은 Linked List와 같은 형태의 구조적인 데이터 구조를 이루고 있다.

즉, myLittleProgram의 참조 객체는 FlatMap이 된다.

 

실제로 foldMap() 함수를 호출하였을때 실행되는 순서는 아래와 같다.

[FM] FlatMap: Suspend(Create(123-456,Daniel)), f =>c.y.study.FreeMonadSample$$$Lambda$17/0x0000000800096040@77846d2c
[FM] Suspend: Create(123-456,Daniel)
insert into people(id, name) values (123-456, Daniel)
[FM] FlatMap: Suspend(Read(123-456)), f =>c.y.study.FreeMonadSample$$$Lambda$23/0x0000000800091840@3b94d659
[FM] Suspend: Read(123-456)
select * from people where id=123-456 limit 1
[FM] FlatMap: Suspend(Create(567,DANIEL)), f =>c.y.study.FreeMonadSample$$$Lambda$25/0x0000000800118040@103f852
[FM] Suspend: Create(567,DANIEL)
insert into people(id, name) values (567, DANIEL)
[FM] FlatMap: Suspend(Delete(123-456)), f =>c.y.study.FreeMonadSample$Free$$Lambda$27/0x000000080011a040@71623278
[FM] Suspend: Delete(123-456)
delete from people where id=123-456
[FM] Pure :()

데이터의 비지니스로직을 정의한 순서에 따라 순회하여 데이터를 참조 변환이 가능한 구조가 된다. 아래의 코드가 실질적인 그 역할을 하고 있다.

trait Free[M[_], A] {
// ...

    def foldMap[G[_]: Monad](natTrans: M ~> G): G[A] = this match {
      case Pure(a) => Monad[G].pure(a)
      case Suspend(ma) => natTrans.apply(ma)
      case FlatMap(fa, f) => // need a G[B] --> fa = DBMonad = Suspend, f = Next DBMonad = Suspend
        Monad[G].flatMap(fa.foldMap(natTrans))(a => f(a).foldMap(natTrans) )
    }
}

// 참고1
def flatMap[B](f: A => Free[M, B]): Free[M, B] = FlatMap(this, f)

// 참고2
case class FlatMap[M[_],A,B](fa: Free[M, A], f: A => Free[M, B]) extends Free[M, B]

// 참고3
given ioMonad: Monad[IO] with {
  override def pure[A](a: A) = IO(() => a)

  override def flatMap[A, B](ma: IO[A])(f: A => IO[B]) =
    IO(() => f(ma.unsafeRun()).unsafeRun())
}

데이터 구조(리스트라 하자)를 순회하며 기존 데이터를 Free[DBOps, A] --> IO[A] 형태로 변환하며, 비지니스로직의 실제 구현체가 호출(unsafeRun())되는 구조를 가진다.

 

foldMap은 FlatMap을 발견하면 내부의 본래 데이터로직(Suspend)을 실행하고, 실행 결과를 다음 로직(FlatMap)이 참조하여 실행될 수 있도록 실행하는 구조를 가진다.

 

즉, FlatMap은 내부 데이터를 추출 검증(실행)하고, 다음 로직을 실행하는 역할, Suspend는 실제 로직을 실행(() => a)하는 역할을  한다 보면된다.

 

---

굉장히 복잡해 보일 수 있는데  FreeMonad는 단순하게 비지니스로직을 정의하고 실제 구현로직은 IO로 정의하여 순차적으로 구현로직이 비지니스 로직의 정의에 따라 실행될 수 있도록 내부적으로 복잡한 변환 로직을 가지고 있다.

물론 로직의 실행 흐름은 Monad의 for comprension에 정의된 흐름의 틀에 어느정도 의존성을 가진다 볼 수 있다.

 

ZIO, Cats 등 Purely Fuctional을 지향하는 모든 툴에서 위와 유사한 구조의 코드를 볼 수 있다.

 

---

즉, 위의 Free Monad 코드와 개념, 활용을 정확히 이해하는 것이 Scala 시작을 위한 준비라 볼 수 있겠다 -_-;;

 

/*

Monad는 FP의 진입 관문이면서 정확히 이해하기 쉽지않은 개념이다. 

어떤 글을 보아도 100% 충족되지 않아 의식의 흐름대로 개념을 정리해 보았다.

*/

 

Functor, Monoid의 이해

 

Monad 진입전 Functor, Monoid 등의 이해가 필요하다.

 

Functor는 내부의 데이터를 감싸고 있는 일종의 Wrapper이다.

내부의 data는 map function을 통해 접근 및 추가 연산의 처리가 가능하다.

대수적으로 map(x)(a => a) == x 을 만족해야 한다.

즉 오직 자료구조의 요소를 수정할 수 있으나, 구조 자체의 형태(순서, length, size 등)는 변함이 없어야 한다.

trait Functor[F[_]] {
  def map[A,B](da: F[A])(f: A=> B): F[B]
}

val listFunctor = new Functor[List] {
  override def map[A, B](da: List[A])(f: A => B): List[B] = da map f
}

listFunctor.map(List[Int](1,2,3))(_ + 3).foreach(println)

/* 
output:
4
5
6
*/

Monoid(모노이드)는 함수 합성의 대수적 접근(항등원, 결합법칙)을 함수의 합성에 활용하는 개념이다.

(이 포스트에서는 일단 자세한 설명은 생략한다. 파면 팔수록 나오는데 그것들을 하나하나 확인하고 가다 보면 이 글을 맺음하는데 수백년은 걸릴 것 같다.)

 

Monad란 무엇인가

 

Monad는 자료를 감싸고 있는 일종의 Wrapper이다.

 

Monad의 Wrapper가 '선물의 포장같은..' 이라는 표현을 쓰는 게시글도 보았는데, 선물의 내용물을 가공하여 새로운 내용물을 창조한다는 관점에서 뭔가 적절치 않은 것 같다.

 

Monad는 모호한 부분이 있다.

단순히 배워왔던 Design Pattern적 개념으로 접근해선 안된다.

그 보다 상위의 추상적 원칙에 가깝고, function의 명을 강제하지 않지만 signature에 의거한 최소한의 fuction을 제공해야 한다.

 

Monad는 기본적으로 아래의 연산을 function으로 제공한다.

- identity : unit(x) or pure

- bind : flatMap(d: data)(f: function) or bind

def unit[A](x: A): M[A]

trait M[A] {
  def flatMap[B](f: A => M[B]): M[B]
}

실제 Scala에서는 위의 Monad 관련 trait을 API적으로 제공하지 않는다. 

위에서 설명했듯이 디자인패턴적인 접근보다는 개념, 법칙, 원칙적 정의에 의한 접근이 필요하다.

 

unit은 특정 값을 특정 Class로 감싸는 Wrapping 함수라 할 수 있다.

실제로 Scala에서는 'unit 이라는 이름으로 이것은 Monad의 unit 함수입니다'와 같이 제공하지는 않는다. 

(Monad인 Option, List 등 어디에도 unit 함수는 없다)

 

아래와 같이 apply(..)를 활용하거나 class의 생성자 등 해당 funtion의 표현상의 제약은 없다.

// case class 생성자 활용
case class WrappedValue[+T](private val internalValue: T) {
	def get: T = synchronized {
    	internalValue
    }
    
    def flatMap[S](transformer: T => WrappedValue[S]): SafeValue[S] = syncroized {
    	trainsformer(internalValue)
    }
}

val wrappedInt = WrappedValue(1)	// unit
val wrappedString = WrappedValue("One")		//unit

// Type이 List이고 value가 1:Int 인 경우
val listInt = List(1)
listInt.flatMap(a => a) // == List(1)

다음은 flatMap인데 일단 아래의 의문에서 시작하는 것이 좋은 접근으로 보인다.

 

'flatMap이 왜 필요한가? map으로는 안되는 것인가?'

 

결론은 Type 중첩 제거와 데이터 변환 등의 로직 함수 적용시 코드 효율에 있다.

다양한 Type으로 확장시 구조적으로 모든 Type을 수용하면서 적용 가능한 코드 중복을 줄일려면..

map function으로는 어렵다.

def flatMap[B](f: (A) => U[B]): U[B]

val listInt2Str = (i: Int) => List(i, i+1)

println(List(1,2).map(listInt2Str))
//List(List(1, 2), List(2, 3))

println(List(1,2).flatMap(listInt2Str))
//List(1, 2, 2, 3)

Functor의 map 연산시 A => M[B] 로 가정하면 Input A의 length값과 M[B]의 length값은 동일하지만,

flatMap을 사용할 경우 input에 대한 output의 형태(Type, Length 등)적인 제약이 사라진다.

 

만약 monad 로직상에서 map이 필요한 경우 flatMap과 unit의 조합으로 map의 구현이 가능하다.

m map g = m flatMap (x => unit(g(x)))

즉, 기능적으로 flatMap은 map의 확장이며 이를 활용하여 자유도가 더 높은 코드의 구현이 가능해진다.

 

Functor와 Monad의 차이는 flatten(join) 연산이 있고 없고의 차이로 정리가 된다.

다시 해석하면 Monad도 Functor이다. (더 많은 기능을 제공하는..)  

 

Monad의 대수적 조건(3원칙)

Monad는 함수 합성시 연산의 대수적 특성으로 항등법칙, 결합법칙이 성립해야된다.

 

- left-identity law:

//표현1 :
unit(x).flatMap(f)==f(x)

//표현2 :
Monad(x).flatMap(f) == f(x)

//예제1 (unit을 List로 가정) :
def twoConsecutive(x: int) = List(x, x+1)
twoConsecutive(3)	// == List(3,4)
List(3).flatMap(twoConsecutive)	// == List(3,4)

- right-identiy law:

//표현1 :
m.flatMap(unit) == m

//표현2 :
Monad(x).flatMap(x => Monad(x)) == Monad(x)

//예제1 (Monad를 List로) :
List(1,2,3).flatMap(x => List(x)) // == List(1,2,3)

- associativity law:

//정의1 :
m.flatMap(f).flatMap(g) == m.flatMap(x ⇒ f(x).flatMap(g))

//정의2 :
MyMonad(x).flatMap(f).flatMap(g) == MyMonad(x).flatMap(x => f(x).flatMap(g))

//예제1 :
val numbers = List(1,2,3)
val incrementer = (x: Int) => List(x, x + 1)
val doubler = (x: Int) => List(x, 2 * x)

numbers.flatMap(incrementer).flatMap(doubler) == List(1,2,2,4,2,4,3,6,3,6,4,8)
//output: true

//연산순서
[1,2,3].flatMap(incremeter) == List(1,2, 2,3, 3,4)
[1,2,2,3,3,4].flatMap(doubler) == List(1,2,2,4,2,4,3,6,3,6,4,8)

numbers.flatMap(x => incrementer(x).flatMap(doubler)) == List(1,2,2,4,2,4,3,6,3,6,4,8)
//output: true 

//연산순서
[
  incrementer(1).flatMap(doubler),	
  // 1 => List(1,2) => (List(1,2), List(2,4)) => List(1,2,2,4)
  incrementer(2).flatMap(doubler),
  // ...
  incrementer(3).flatMap(doubler)
  // ...
]


numbers.flatMap(incrementer).flatMap(doubler) 
== numbers.flatMap(x => incrementer(x).flatMap(doubler))
//output: true

앞서 언급하였듯 이러한 대수적 특징은 모노이드(Monoid)의 특징이기도 하다. 

 

Monad 정리

 

요약 : 함수 합성을 위한 추상 개념, 대수적인 인터페이스

첨언 : 공통점이 없어 보이는 객체(List, Option, Future ..)에 동일한 로직 flow를 추상적으로 정의할 수 있고, 완전히 다른 내부 자료형을 처리하더라도 동일 코드로 처리될 수 있다.

 

1) Monad는 아래의 최소 2개의 function 조합을 제공한다.

- unit, flatMap

- unit, compose

- unit, map, join

 

2) 함수 합성시 결합법칙과 항등법칙의 성립

 

Scala에서는 별도로 Monad trait을 API적으로 제공하지는 않는다.

디자인패턴 등에 비하면 추상적인 개념원칙에 가깝고 실제 활용될 class 특성에 따라 구현 코드의 표현상의 자유도가 높아 질 수 있기 때문이다.

위에서 언급한 unit(...) 이라는 이름으로 사용되는 경우는 아주 드물다.

trait Monad[F[_]] {
  def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B]
}

그래도 trait으로 정의를 하자면 위와 같은 모양이 된다. 물론 답은 여러가지 형태가 될 수 있다.

 

Monad의 활용

- Option[T]의 Monad적 활용

Option은 get, getOrElse, flatMap 등의 코드 지옥에 빠질 수 있으나,

For comprehension 적용시 코드의 가독성을 증가 시키며 코드 지옥에서 해방될 수 있다.

object UserService {
  def loadUser(name: String) =  Option(User("A", Some(User("A-1", Some(User("A-1-1", None))))))
  def loadUserOnlyChild(name: String) =  Option(User("A", Some(User("A-1", None))))
  def loadUserAlone(name: String) =  Option(User("A", None))
}

val noChild = UserService.loadUserAlone("A").flatMap(_.child).flatMap(_.child)
println(noChild) //output: None

//for comprehension (syntax sugar)
val res = for {
  user <- UserService.loadUser("A")
  userChild <- user.child
  grandChild <- userChild.child
} yield grandChild

println(res) //output: Some(User(A-1-1,None))

참고로 Option.flatMap의 구현은 아래와 같다.

  /** Returns the result of applying $f to this $option's value if
   * this $option is nonempty.
   * Returns $none if this $option is empty.
   * Slightly different from `map` in that $f is expected to
   * return an $option (which could be $none).
   *
   * This is equivalent to:
   * {{{
   * option match {
   *   case Some(x) => f(x)
   *   case None    => None
   * }
   * }}}
   *  @param  f   the function to apply
   *  @see map
   *  @see foreach
   */
  @inline final def flatMap[B](f: A => Option[B]): Option[B] =
    if (isEmpty) None else f(this.get)

 

 

- Future[T]의 Monad적 활용

DB에서 조회 쿼리를 호출했으나 해당 요청이 비동기로 처리되는 경우를 가정하자

(

실제 Scala Slick(DB util)의 모든 호출이 이러한 비동기 구조이다.

대부분의 예제 코드에서 결과값을 받아 와서 처리하는 부분은 Await.result(TableQuery.select(...)) 와 같은 비동기 호출을 동기적으로 바꾸는 식으로 많이 작성되어 있으나,

아래 Future 예제는 비동기 호출에 대한 답을 Monad로 찾을 수 있다는 답을 제시하고 있다.

)

flatMap을 활용하면 코드적으로 onComplete() 구현을 통한 callback 처리를 생략할 수 있고,

데이터 처리를 위한 operation은 future가 complete 되었을때 1회 실행된다.

즉, 아래의 for comprehension나 flatMan chain은 순차적으로 실행된다.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
trait Order
trait Item
trait PurchaseResult
trait LogResult
object OrderService {  
  def loadOrder(username: String): Future[Order] 
}
object ItemService {  
  def loadItem(order: Order): Future[Item] 
}
object PurchasingService { 
  def purchaseItem(item: Item): Future[PurchaseResult]
  def logPurchase(purchaseResult: PurchaseResult): Future[LogResult] 
}

// ---
val loadItem: Order => Future[Item] = {
  order => ItemService.loadItem(order)
}
val purchaseItem: Item => Future[PurchaseResult] = {
  item => PurchasingService.purchaseItem(item)
}

val logPurchase: PurchaseResult => Future[LogResult] = {
  purchaseResult => PurchasingService.logPurchase(purchaseResult)
}

// ---
val result = 
  OrderService.loadOrder("customerUsername")
  .flatMap(loadItem)
  .flatMap(purchaseItem)
  .flatMap(logPurchase)
  
val result2 =
  for {
    loadedOrder    <- orderService.loadOrder(“customerUsername”)
    loadedItem     <- itemService.loadItem(loadedOrder)
    purchaseResult <- purchasingService.purchaseItem(loadedItem)
    logResult      <- purchasingService.logPurchase(purchaseResult)
  } yield logResult
  
println(result == result2)	//output: true

참고로 Future.flatMap의 정의는 아래와 같이 생겼다.

/* Creates a new future by applying a function to the successful result of the function
as the new future. If this future is completed with an exception then the new future 
will also contain this exception.

Example:

  val f = Future { 5 }
  val g = Future { 3 }
  val h = for {
    x: Int <- f // returns Future(5)
    y: Int <- g // returns Future(3)
  } yield x + y
  
is translated to:

  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
  
Params:
f – the function which will be applied to the successful result of this Future
Type parameters:
S – the type of the returned Future
Returns:
a Future which will be completed with the result of the application of the function
*/

def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = 
transformWith {
t =>
  if(t.isInstanceOf[Success[T]]) f(t.asInstanceOf[Success[T]].value)
  else this.asInstanceOf[Future[S]] // Safe cast
}

 

ZIO http 를 활용하여 db의 내용을 조회하여 응답하는 API를 개발하게 될 경우 Monad적으로 Controller의 코드를 작성하면 아래와 같은 형태가 된다.

    (for {
        // validate user
        _    <- MyAuthService.doAuth(request)
        // log request
        _    <- logRequest(request)
        // core business logic
        user <- dbService.lookupUsersById(id).map(Response.json(_.json))
        resp <- Response.json(user.toJson)
        // log response
        _    <- logResponse(resp)                
    } yield resp)
            .timeout(2.seconds)
            .retryN(5)

 

더 봐야 될 것들 (TBD, 지친다)

Free Monad

IO Monad

 

 

[좋았던 참고 자료]

https://medium.com/free-code-camp/demystifying-the-monad-in-scala-cc716bb6f534

https://www.youtube.com/watch?v=d-dy1x33moA 

https://www.youtube.com/watch?v=a0C-RrncrYA 

 

Crawl가 수집한 데이터를 임시로 Mysql에 저장하고 있고,

수집 데이터를 SparkStreaming으로 실시간으로 분석해야 하는 경우를 가정하자.

 

[Custom Receiver의 구현]

주기 시간에 따라 Table의 데이터를 Select하는 구현이 필요하다.

SparkStreaming 상에서 처리가 끝난 데이터가 다시 조회되어야 할 일은 없어야 하기에 가장 최근에 조회된 max(id)값을 저장/갱신한다.

class MySqlSourceReceiver(val seedNo : Long) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)
  with Logging {

  var latestCrawlNo = 0L

  override def onStart(): Unit = {
    new Thread("MysqlSt") {
      override def run(): Unit = {
        createGetData
      }
    }.start()
  }

  override def onStop(): Unit = synchronized {
//    this.db.close()
  }

  private def createGetData(): Unit = {
    while(!isStopped) {
      try {
        val res = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo)
        println(s"Count of crawled data : ${res.size} for ${seedNo}")

        val mergedRes = DbUtil.getLatestContextFrom(seedNo, latestCrawlNo).mkString("\n\n")
        println(mergedRes)
        store(mergedRes)

        latestCrawlNo = DbUtil.latestCrawlNo(seedNo)
        println(s"Update Point ${latestCrawlNo}")

        Thread.sleep(5000)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}

[DBUtil의 구현]

아래와 같이 DB 조회부분을 위의 Receiver에서 분리해야 한다.

Receiver상에서 구현할 경우 대부분의 경우 Serializable Exception이 발생한다.

object DbUtil {
  protected implicit def executor = scala.concurrent.ExecutionContext.Implicits.global
  val db : Database = Database.forURL(url ="jdbc:mysql://localhost:3306/horus?useSSL=false",
    user="root", password="18651865", driver = "com.mysql.jdbc.Driver")

  val getLatestAnchorWithLimit = (seedNo: Long, startCrawlNo: Long, limit : Int) =>
    Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, limit).result), 10.seconds)

  val getMaxCrawlNo = (seedNo: Long) => Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)

  def latestCrawlNo(seedNo: Long) : Long = {
    Await.result(db.run(CrawledRepo.findLatestCrawlNo(seedNo).result), 10.seconds).getOrElse(0L)
  }

  def getLatestAnchorFrom(seedNo: Long, startCrawlNo: Long) = {
    Await.result(db.run(CrawledRepo.findLatestAnchor(seedNo, startCrawlNo, 100).result), 10.seconds)
  }

  def getLatestContextFrom(seedNo: Long, startCrawlNo: Long) = {
    Await.result(db.run(CrawledRepo.findLatestContent(seedNo, startCrawlNo, 100).result), 10.seconds)
  }
}

[Slick 구현]

일반적인 Slick의 구현과 동일하게 하면 된다.

적절한 구간을 분리만 잘 해주면 SparkStreaming 상에서 별 무리 없이 잘 동작한다.

object CrawledRepo {
  case class CrawlUnit(
                        crawlNo: Long = -1L,
                        url: Option[String] = None,
                        anchorText: Option[String] = None,
                        anchorImg: Option[String] = None,
                        // ...
                        pageTitle: Option[String] = None,
                        parsedPageDate: Option[Timestamp] = None
                      )

  class CrawlUnitTableSchema(tag: Tag) extends Table[CrawlUnit](tag, None,"CRAWL_UNIT1") {
    def crawlNo = column[Long]("CRAWL_NO", O.PrimaryKey, O.AutoInc)
    def url = column[Option[String]]("URL")
    def anchorText = column[Option[String]]("ANCHOR_TEXT")
	// ...
    def pageTitle = column[Option[String]]("PAGE_TITLE")
    def parsedPageDate = column[Option[Timestamp]]("PARSED_PAGE_DATE")

    def * = (crawlNo, url, anchorText, anchorImg, status, seedNo, pageDate, regDate,
      updDate, pageText, pageTitle, parsedPageDate) <> (CrawlUnit.tupled, CrawlUnit.unapply)
  }

  val crawlUnitsQuery = TableQuery[CrawlUnitTableSchema]
  val createSchemaAction = (crawlUnitsQuery.schema).create

  def findLatestAnchor(seedNo: Long) = crawlUnitsQuery
    .filter(_.seedNo === seedNo).filter(_.status === "SUCC")
    .sortBy(_.crawlNo.desc).drop(0).take(20).map(_.anchorText)

  def findLatestCrawlNo(seedNo1: Long) = crawlUnitsQuery.filter(_.seedNo === seedNo1)
    .filter(_.status === "SUCC").map(_.crawlNo).max

  def getCrawledData(crawlNo: Long) = crawlUnitsQuery.filter(_.crawlNo === crawlNo)
}

[SparkStreaming App 구현]

아래와 같이 sss.receiverStream 을 통해 직접 구현한 Receiver를 설정해 주면 된다.

object CrawledProcessing extends SparkStreamingInit {

  def processCrawled(seedId : Long) = {
    val anchors = ssc.receiverStream(new MySqlSourceReceiver(seedId))
    val words = anchors.flatMap(anchor => {
      HangleTokenizer().arrayNouns(anchor)
    })

    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.foreachRDD(rdd => {
      rdd.foreach(tf => {
        InfluxClient.writeTf(seedId, tf._1, tf._2)
      })
    })
	
    // ...
  }

긁어온 신문기사의 단어를 Count하는 모듈이 완성되었다.

+ Recent posts