Часть 10. Akka Streams и реактивные потоки

Страничка курса: https://maxcom.github.io/scala-course-2020/

План

  1. Реактивные потоки
  2. Akka Streams: базовые элементы
  3. Обзор готовых операторов
  4. Обработка ошибок
  5. Интеграция

Реактивные потоки

Преобразования потоков данных.

  • ETL - Extract, Transform, Load
  • Поточные запросы и ответы HTTP, прокси
  • Выборки, статистики, health check

Асинхронная обработка

  • Возможности сложных схем
  • Работа со сбоями и отзывчивости
  • Эффективность

Возникает проблемы:

  • Регулировка скорости producer
  • API для интеграции

При синхронной обработке скорость
регулируется автоматически.

Back pressure можно делать на акторах, но сложно.

Reactive streams - API для интеграции, не для конечного пользователя.

https://www.reactive-streams.org/

  • Akka Streams и Akka HTTP (+Play)
  • Netty - http клиенты, драйверы СУБД и очередей
  • Spring 5, ряд сервлетных веб-серверов
  • RxJava
  • Vert.x

В идеальном мире нам нужно писать только преобразование.

Позже посмотрим интеграцию с другим асинхронным кодом.

Akka Streams: базовые элементы

  • Source - источник (producer)
  • Flow - цепочка преобразования
  • Sink - назначение (consumer)

https://doc.akka.io/docs/akka/current/stream/stream-composition.html

Два API для сборки цепочек:

  • Обычный, похожий на API коллекций
  • GraphDSL для сложных схем
    (не рассматриваем сегодня)

Пример - замена Future.sequence


def func(x: Int): Future[Int] = ???
val input: Seq[Int] = Seq.range(1, 10000)

val output: Future[Seq[Int]] = Future.sequence(input.map(func))
					

проблема - неограниченный параллелизм, перегрузка ExecutionContext

Используем Akka Streams

libraryDependencies += 
  "com.typesafe.akka" %% "akka-stream" % "2.6.4"
					

версия должна соответствовать версии Akka


// классическая ActorSystem (можно и typed)
implicit val actorSystem: ActorSystem = ActorSystem()

def func(x: Int): Future[Int] = ???
val input: List[Int] = List.range(1, 10000)

val output: Future[Seq[Int]] = 
  Source(input).mapAsync(10)(func).runWith(Sink.seq)
                      // 10 потоков
					

Materialization - запуск интерпретации.

Например runWith, но есть и другие варианты.


val source: Source[Int, NotUsed] = Source(input)
// final class Source[+Out, +Mat]
					

Mat - materialized value

Пример Source с materialized value


val source: Source[Int, ActorRef] = 
  Source.actorRef[Int](1000, OverflowStrategy.dropTail)
					

ActorRef возникает при материализации.


val flow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsync(10)(func)
val sink: Sink[Int, Future[Seq[Int]]] = Sink.seq

val output: Future[Seq[Int]] = source.via(flow).runWith(sink)
					

Обзор готовых операторов

Более 100 операторов

  • map[T](f: Out => T)
  • mapConcat[T](f: Out => immutable.Iterable[T])
  • filter(p: Out => Boolean)
  • mapAsync[T](parallelism: Int)(f: Out => Future[T])
  • mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T])

flatMap?

  • flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M])
  • flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M])

Группировки

  • grouped(n: Int)
  • groupedWithin(n: Int, d: FiniteDuration)

есть еще аналоги с "весом" элемента

Группировка:


def processBatch(v: Seq[Int]): Future[Seq[Int]] = ???

source
  .groupedWithin(1000, 1 minute) // до 1000, в течении минуты
  .mapAsync(16)(processBatch)
  .mapConcat(identity) // поток Seq[Int] в поток Int
  .runWith(Sink.ignore)
					

обработка "пачками" часто эффективнее

Ограничение скорости

throttle(elements: Int, per: FiniteDuration)

Несколько источников:


val source1 = Source(Seq.range(1, 1000))
val source2 = Source(Seq.range(1, 1000))

// один за другим
source1.concat(source2)

// по 10 из каждого по порядку
source1.interleave(source2, 10)

// в порядке готовности
source1.intersperse(source2)
					

Sink

  • ignore: Sink[Any, Future[Done]]
  • seq[T]: Sink[T, Future[immutable.Seq[T]]]
  • foreach[T](f: T => Unit): Sink[T, Future[Done]]
  • fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]

Обработка ошибок

Stream завершается при возникновении ошибки.

Можно работать с Try в потоке.

Рестарт при сбоях


RestartSource.onFailuresWithBackoff(
  minBackoff = 100 millis, 
  maxBackoff = 10 minutes, 
  randomFactor = 0.2)(() => source)
					

бывают еще RestartFlow и RestartSink

Интеграция

Интеграция через Source.queue

val source: Source[Int, SourceQueueWithComplete[Int]] =
  Source.queue[Int](bufferSize = 1000, 
                    OverflowStrategy.backpressure)
// backpressure или выбрасываем элементы 
val queue: SourceQueueWithComplete[Int] = source
                  .to(Sink.foreach(println))
                  .run() 
// очередь - "материализованное" значение
val result: Future[QueueOfferResult] = queue.offer(1000)
// при backpressure нужно ждать вычисления Future

Source.unfold


// числа Фибоначчи
val fib: Source[Int, NotUsed] = Source.unfold(0 -> 1) {
  case (a, _) if a > 10000000 => None
  case (a, b) => Some((b -> (a + b)) -> a)
}
					

есть асинхронный аналог - unfoldAsync

Интеграция с акторами


libraryDependencies += 
  "com.typesafe.akka" %% "akka-stream-typed" % "2.6.4"
					
  • ActorSource.actorRef - без backpressure, аналог очереди
  • ActorSource.actorRefWithBackpressure
  • ActorSink.actorRef
  • ActorSink.actorRefWithBackpressure

// T - тип значения
// M - протокол актора
// A - тип Ack
def actorRefWithBackpressure[T, M, A](
    ref: ActorRef[M],
    messageAdapter: (ActorRef[A], T) => M,
    onInitMessage: ActorRef[A] => M,
    ackMessage: A,
    onCompleteMessage: M,
    onFailureMessage: Throwable => M): Sink[T, NotUsed]
					

Напоминаю: