-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathForwardActor.scala
242 lines (201 loc) · 9.99 KB
/
ForwardActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package service.actors
import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import models.billotei.BasicUser
import models.billotei.BasicUser.VeryBasicUser
import models.billotei.places.Place
import models.billotei.relationships.{BasicPlaceUser, PlaceUser}
import models.billotei.users.{User, UserReach}
import models.intercom.{ConversationInit, IntercomMessage, Tag}
import models.{EventResponse, Message}
import play.Logger
import play.api.libs.json.{JsError, JsSuccess}
import service.actors.IntercomActor._
import service.akkaAskTimeout
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
object ForwardActor {
def props = Props[ForwardActor]
case class Forward[T](msg: Message[T])
case class Answer(result: Try[EventResponse])
}
class ForwardActor extends Actor {
import ForwardActor._
//implicit val timeout = Timeout(10 seconds)
/**
* Main forward method to the clients, so far only intercom, more to be added...
*
* @return
*/
def receive = {
case Forward(msg: Message[_]) =>
msg.event match {
// When a user creates a place
case "placeuser-creation" => (msg.payload \ "user").validate(User.userReads) match {
case u: JsSuccess[User] => (msg.payload \ "place").validate(Place.placeReads(msg.payload)) match {
case p: JsSuccess[Place] => forwardAndAskIntercom(PlaceUserMessage(u.value, p.value), msg.event)
case e: JsError => Logger.error(s"Place invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
case e: JsError => Logger.error(s"User invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
// When owner or centralAppAdmin deletes a place, that is what happens: all place user relationships deleted
case "all-placeusers-deletion" => (msg.payload \ "place" \ "id").validate[Long] match {
case JsSuccess(placeId: Long, _) => forwardAndAskIntercom(DeleteAllPlaceUsersMessage("", placeId), msg.event)
case e: JsError => Logger.error(s"Place invalid ${e.errors.mkString(";")}")
}
case "placeuser-deletion" =>
implicit val contextPayload = msg.payload
msg.payload.validate[PlaceUser] match {
case placeUser: JsSuccess[PlaceUser] => forwardAndAskIntercom(
PlaceUserMessage(placeUser.value.user, placeUser.value.place, removeRelationship = true),
msg.event
)
case e: JsError => Logger.error(s"PlaceUser invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
// On place update
case "place-update" => (msg.payload \ "place").validate(Place.placeReads(msg.payload)) match {
case p: JsSuccess[Place] => forwardAndAskIntercom(PlaceMessage(p.value), msg.event)
case e: JsError => Logger.error(s"Place invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
// When a user reaches us with place data that we don't already have in CentralApp
case "basic-placeuser-creation" =>
// No json payload atm (internal msg)
msg.optPayloadObj match {
case Some(placeUserPayload: BasicPlaceUser) =>
forwardAndAskIntercom(BasicPlaceUserMessage(placeUserPayload), msg.event)
case _ => Logger.error("BasicPlaceUser payload invalid")
}
// On user creation or update
case "user-creation" | "user-update" => (msg.payload \ "user").validate(User.userReads) match {
case u: JsSuccess[User] => forwardAndAskIntercom(
UserMessage(u.value, msg.event == "user-update"),
msg.event
)
case e: JsError => Logger.error(s"User invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
// On user login or when he asks for place verification
case "user-login" | "verification-request" =>
// See if we need a paypload for this event or if it is possible to use only one case "event"
(msg.payload \ "user").validate(User.userReads) match {
case u: JsSuccess[User] =>
forwardAndAskIntercom(
EventMessage(
msg.event,
(msg.payload \ "created_at").asOpt[Long].getOrElse(System.currentTimeMillis / 1000),
u.value,
(msg.payload \ "place").asOpt(Place.placeReads(msg.payload))
),
msg.event
)
case e: JsError => Logger.error(s"User invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
// On user or lead contact
case "user-contact" | "lead-contact" =>
// No json payload atm (internal msg)
msg.optPayloadObj match {
case Some(contactPayload: ConversationInit) =>
// So far only intercom conversation contact
forwardAndAskIntercom(ConversationInitMessage(contactPayload), msg.event)
case _ => Logger.error(s"${msg.event} payload invalid")
}
// On lead creation/contact with or without place data
case "lead-creation" =>
// No json payload atm (internal msg)
msg.optPayloadObj match {
case Some(placeUserPayload: BasicPlaceUser) =>
// So far only intercom leads
forwardAndAskIntercom(LeadMessage(placeUserPayload.user, Some(placeUserPayload)), msg.event)
case Some(userPayload: BasicUser) => forwardAndAskIntercom(LeadMessage(userPayload), msg.event)
case _ => Logger.error("lead-creation payload invalid")
}
// When a user tried to reach us (with whatever data and mainly through contact form), we store this
// with that event, due to intercom lead endpoints design, the lead-reach is done under the hood inside intercom actor atm
case "user-reach" =>
// No json payload atm (internal msg)
msg.optPayloadObj match {
case Some(userPayload: UserReach) =>
// So far we only use intercom tagging system for that
forwardAndAskIntercom(TagMessage(Tag(userPayload.subject, List(userPayload.basicUser))), msg.event)
case _ => Logger.error("user-reach payload invalid")
}
// Intercom dedicated event here that's not the best place for that but it's a single shot event
// implemented here to avoid messing the ctrl upper layers with forwarding events directly to intercom actor
case "intercom-users-update" =>
msg.payload.validate[List[VeryBasicUser]] match {
case userList: JsSuccess[List[VeryBasicUser]] =>
Logger.info("Processing users page " + (msg.payload \ "page").asOpt[Int].map(_.toString).getOrElse("unknown"))
context.actorOf(IntercomActor.props) ! BulkUserIdUpdate(userList.value)
case e: JsError => Logger.error(s"User list invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
case "users-update" =>
val page = (msg.payload \ "page").asOpt[Int].map(_.toString).getOrElse("unknown")
msg.payload.validate[List[User]] match {
case userList: JsSuccess[List[User]] =>
Logger.info("Processing users page " + page)
context.actorOf(IntercomActor.props) ! BulkUserUpdate(userList.value)
case e: JsError => Logger.error(s"For page $page: User list invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
// Update multiple place-users at a time
case "placeusers-update" =>
implicit val contextPayload = msg.payload
Logger.info("Processing placeusers page " + (msg.payload \ "page").asOpt[Int].map(_.toString).getOrElse("unknown"))
(msg.payload \ "placeusers").asOpt[List[PlaceUser]].orElse(
{
Logger.error("placeusers parsing failed")
None
}
).foreach(_.foreach(
pu =>
Logger.debug("place: " + pu.place.placePart1.centralAppId.toString + " - user: " + pu.user.centralAppId.toString)
))
(msg.payload \ "placeusers").validate[List[PlaceUser]] match {
case placeUserList: JsSuccess[List[PlaceUser]] =>
forwardAndAskIntercom(BulkPlaceUserUpdate(placeUserList.value), msg.event)
case e: JsError => Logger.error(s"PlaceUser list invalid ${e.toString}", new Exception(e.errors.mkString(";")))
}
case _ => Logger.warn(s"Service ${msg.event} not implemented yet")
}
case Answer(result) => result match {
case Success(resp) => Logger.info(resp.body)
case Failure(fail) => Logger.error(fail.getMessage)
}
}
/**
* Forwards and ask response to intercom in a non blocking way
* @param message: the intercom message to forward
* @param eventName: the event name to forward
* @tparam T: subtype of intercom message
*/
def forwardAndAskIntercom[T <: IntercomMessage](message: T, eventName: String) = {
Logger.debug(s"Forwarding $eventName to intercom...")
import context.dispatcher
(context.actorOf(IntercomActor.props) ? message).map {
res => Try(res.asInstanceOf[EventResponse]) map {
response =>
Logger.info(response.body)
response.body
} getOrElse {
res match {
case Failure(err) =>
Logger.error(
s"ForwardActor to Intercom did not succeed: ${err.getMessage} for message: $message",
err
)
}
}
} pipeTo sender
/*map {
res => Try(res.asInstanceOf[EventResponse]) map {
response => Logger.info(response.body)
} getOrElse {
res match {
case Failure(err) => Logger.error(
s"ForwardActor to Intercom did not succeed: ${err.getMessage} for message: ${message.toString}",
err
)
case _ => Logger.error(s"ForwardActor to Intercom did not succeed for unknown reason for message: ${message.toString}")
}
}
}*/
}
}