-
Notifications
You must be signed in to change notification settings - Fork 640
/
Copy pathStreamCtrl.scala
136 lines (123 loc) · 5.13 KB
/
StreamCtrl.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package controllers
import javax.inject.{Inject, Singleton}
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{DurationLong, FiniteDuration}
import scala.util.Random
import play.api.http.Status
import play.api.libs.json.Json
import play.api.libs.json.Json.toJsFieldJsValueWrapper
import play.api.mvc._
import play.api.{Configuration, Logger}
import akka.actor.{ActorIdentity, ActorSystem, Identify, Props}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Put, Send}
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import models.Roles
import services.StreamActor
import services.StreamActor.StreamMessages
import org.elastic4play.controllers._
import org.elastic4play.services.{AuxSrv, EventSrv, MigrationSrv, UserSrv}
import org.elastic4play.Timed
@Singleton
class StreamCtrl(
cacheExpiration: FiniteDuration,
refresh: FiniteDuration,
authenticated: Authenticated,
renderer: Renderer,
eventSrv: EventSrv,
userSrv: UserSrv,
auxSrv: AuxSrv,
migrationSrv: MigrationSrv,
components: ControllerComponents,
implicit val system: ActorSystem,
implicit val ec: ExecutionContext
) extends AbstractController(components)
with Status {
@Inject() def this(
configuration: Configuration,
authenticated: Authenticated,
renderer: Renderer,
eventSrv: EventSrv,
userSrv: UserSrv,
auxSrv: AuxSrv,
migrationSrv: MigrationSrv,
components: ControllerComponents,
system: ActorSystem,
ec: ExecutionContext
) =
this(
configuration.getMillis("stream.longpolling.cache").millis,
configuration.getMillis("stream.longpolling.refresh").millis,
authenticated,
renderer,
eventSrv,
userSrv,
auxSrv,
migrationSrv,
components,
system,
ec
)
private val streamLength = 20
private lazy val logger = Logger(getClass)
private val mediator = DistributedPubSub(system).mediator
private val alphanumeric: immutable.IndexedSeq[Char] = ('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')
private def generateStreamId() = Seq.fill(streamLength)(alphanumeric(Random.nextInt(alphanumeric.size))).mkString
private def isValidStreamId(streamId: String): Boolean = streamId.length == streamLength && streamId.forall(alphanumeric.contains)
/**
* Create a new stream entry with the event head
*/
@Timed("controllers.StreamCtrl.create")
def create: Action[AnyContent] = authenticated(Roles.read) {
val id = generateStreamId()
val streamActor = system.actorOf(Props(classOf[StreamActor], cacheExpiration, refresh), s"stream-$id")
logger.debug(s"Register stream actor $streamActor")
mediator ! Put(streamActor)
Ok(id)
}
/**
* Get events linked to the identified stream entry
* This call waits up to "refresh", if there is no event, return empty response
*/
@Timed("controllers.StreamCtrl.get")
def get(id: String): Action[AnyContent] = Action.async { implicit request ⇒
implicit val timeout: Timeout = Timeout(refresh + 1.second)
if (!isValidStreamId(id)) {
Future.successful(BadRequest("Invalid stream id"))
} else {
val futureStatus = authenticated.expirationStatus(request) match {
case ExpirationError if !migrationSrv.isMigrating ⇒
userSrv.getInitialUser(request).recoverWith { case _ ⇒ authenticated.getFromApiKey(request) }.map(_ ⇒ OK)
case _: ExpirationWarning ⇒ Future.successful(220)
case _ ⇒ Future.successful(OK)
}
// Check if stream actor exists
mediator
.ask(Send(s"/user/stream-$id", Identify(1), localAffinity = false))(Timeout(2.seconds))
.flatMap {
case ActorIdentity(1, Some(_)) ⇒
futureStatus.flatMap { status ⇒
(mediator ? Send(s"/user/stream-$id", StreamActor.GetOperations, localAffinity = false)) map {
case StreamMessages(operations) ⇒ renderer.toOutput(status, operations)
case m ⇒ InternalServerError(s"Unexpected message : $m (${m.getClass})")
}
}
case _ ⇒ Future.successful(renderer.toOutput(NOT_FOUND, Json.obj("type" → "StreamNotFound", "message" → s"Stream $id doesn't exist")))
}
.recover {
case _: AskTimeoutException ⇒ renderer.toOutput(NOT_FOUND, Json.obj("type" → "StreamNotFound", "message" → s"Stream $id doesn't exist"))
}
}
}
@Timed("controllers.StreamCtrl.status")
def status = Action { implicit request ⇒
val status = authenticated.expirationStatus(request) match {
case ExpirationWarning(duration) ⇒ Json.obj("remaining" → duration.toSeconds, "warning" → true)
case ExpirationError ⇒ Json.obj("remaining" → 0, "warning" → true)
case ExpirationOk(duration) ⇒ Json.obj("remaining" → duration.toSeconds, "warning" → false)
}
Ok(status)
}
}