提问者:小点点

使用Akka Http转换Slick Streaming数据并发送分块响应


目的是从数据库中流式传输数据,对这块数据执行一些计算(此计算返回某个案例类的Future),并将此数据作为分块响应发送给用户。目前,我能够在不执行任何计算的情况下流式传输数据并发送响应。但是,我无法执行此计算,然后流式传输结果。

这是我执行的路线。

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}

函数getRds返回映射到案例类的表的行(使用slick)。现在考虑函数计算,它将每一行作为输入并返回另一个案例类的Future。类似

def compute(x: Tweet) : Future[TweetNew] = ?

如何在变量src上实现此函数并将此计算的分块响应(作为流)发送给用户。


共2个答案

匿名用户

您可以使用scala. concurrent.Future[T])转换源代码:FlowOps.this.Repr[T]"rel="no117 rer">mapAsync

val src =
  Source.fromPublisher(db.stream(getRds))
        .mapAsync(parallelism = 3)(compute)

complete(src)

根据需要调整并行级别。

请注意,您可能需要配置一些设置,如Slick留档中所述:

注意:某些数据库系统可能需要以某种方式设置会话参数以支持流式传输,而无需在客户端将所有数据一次缓存在内存中。例如,PostgreSQL需要. withStatementParameters(rsType=ResultSetType.ForwardOnly,rsConcurrency=ResultSetConcurrency.ReadOnly,fetchSize=n)(具有所需的页面大小n)和.transactily才能进行正确的流式传输。

因此,例如,如果您使用的是PostgreSQL,那么您的Source可能如下所示:

val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)

匿名用户

您需要有一种方法来编组TweetNew,并且如果您发送长度为0的块,客户端可能会关闭连接。

此代码适用于curl:

case class TweetNew(str: String)

def compute(string: String) : Future[TweetNew] = Future {
  TweetNew(string)
}

val route = path("hello") {
  get {
    val byteString: Source[ByteString, NotUsed] = Source.apply(List("t1", "t2", "t3"))
      .mapAsync(2)(compute)
      .map(tweet => ByteString(tweet.str + "\n"))
    complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
  }
}