diff --git a/misp/client/src/main/scala/org/thp/misp/client/Base64Flow.scala b/misp/client/src/main/scala/org/thp/misp/client/Base64Flow.scala index 99e0dc386b..7840d72535 100644 --- a/misp/client/src/main/scala/org/thp/misp/client/Base64Flow.scala +++ b/misp/client/src/main/scala/org/thp/misp/client/Base64Flow.scala @@ -19,7 +19,8 @@ class Base64EncoderFlow extends GraphStage[FlowShape[ByteString, ByteString]] { val encoder: Base64.Encoder = Base64.getEncoder override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - var remainingBytes: ByteString = ByteString.empty + var remainingBytes: ByteString = ByteString.empty + var upstreamIsFinished: Boolean = false setHandler( in, @@ -35,15 +36,21 @@ class Base64EncoderFlow extends GraphStage[FlowShape[ByteString, ByteString]] { } } - override def onUpstreamFinish(): Unit = { - push(out, ByteString(encoder.encode(remainingBytes.toArray))) - completeStage - } + override def onUpstreamFinish(): Unit = + upstreamIsFinished = true + } + ) + setHandler( + out, + new OutHandler { + override def onPull(): Unit = + if (!upstreamIsFinished) + pull(in) + else { + push(out, ByteString(encoder.encode(remainingBytes.toArray))) + completeStage + } } ) - setHandler(out, new OutHandler { - override def onPull(): Unit = - pull(in) - }) } }