Skip to content

Commit

Permalink
AL-131: Add streaming read operations
Browse files Browse the repository at this point in the history
 - Adds ftp filestore tests
 - Adds ftpserver docker container
  • Loading branch information
cccs-samp committed Dec 15, 2020
1 parent 336b47e commit 9246b4a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 25 deletions.
18 changes: 13 additions & 5 deletions assemblyline/filestore/transport/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import os
import posixpath
import queue
import threading
import time
from io import BytesIO

Expand Down Expand Up @@ -229,16 +231,22 @@ def __init__(self, filepath, filequeue, retrMethod):
self.filepath = filepath
self.filequeue = filequeue
self.retrMethod = retrMethod
self.readThread = None
self.generator = None

def close(self):
pass

def read(self, chunk_size = 8192):
if self.readthread is None:
self.readthread = threading.Thread(target=self.retrMethod(chunk_size), daemon=True)
self.readthread.start()
def read(self, chunk_size=8192):
if self.generator is None:
self.generator = self.getGen(chunk_size=chunk_size)
return next(self.generator)

def getGen(self, chunk_size=8192):
self.readThread = threading.Thread(target=self.retrMethod(chunk_size), daemon=True)
self.readThread.start()
chunk = self.filequeue.get()
if chunk is not None:
yield chunk
else:
return
raise StopIteration()
5 changes: 4 additions & 1 deletion assemblyline/filestore/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ def read(self, path):
class TransportReadStreamHTTP(TransportReadStream):
def __init__(self, response):
self.response = response
self.iter = None

def close(self):
self.response.close()

def read(self, chunk_size=1024):
return next(self.response.iter_content(chunk_size=chunk_size))
if self.iter is None:
self.iter = self.response.iter_content(chunk_size=chunk_size)
return next(self.iter)
17 changes: 2 additions & 15 deletions assemblyline/filestore/transport/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from assemblyline.common.exceptions import ChainAll
from assemblyline.common.uid import get_random_id
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path


@ChainAll(TransportException)
Expand Down Expand Up @@ -131,7 +131,7 @@ def read(self, path):
fh = None
try:
fh = open(path, "rb")
return TransportReadStreamLocal(fh)
return fh
finally:
if fh:
fh.close()
Expand All @@ -150,16 +150,3 @@ def _join(base, path):
if base is None:
return path
return os.path.join(base, path.lstrip("/")).replace("\\", "/")

class TransportReadStreamLocal(TransportReadStream):
def __init__(self, file):
self.file = file

def close(self):
self.file.close()

def read(self, chunk_size = -1):
if chunk_size > 0:
return self.file.read(chunk_size)
else:
return self.file.read()
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ def read(self, chunk_size):
if chunk_size > 0:
return self.file.read(chunk_size)
else:
return self.file.read()
return self.file.read()
10 changes: 10 additions & 0 deletions pipelines/azure-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ resources:
DISCOVERY_TYPE: 'single-node'
ports:
- 9200:9200
- container: ftpserver
image: fauria/vsftpd
environment:
- FTP_USER=al_test_user
- FTP_PASS=password
ports:
- '21:21'
- '20:20'
- '21100-21110:21100-21110'

stages:
- stage: build
Expand Down Expand Up @@ -55,6 +64,7 @@ stages:
services:
elasticsearch: elasticsearch
redis: redis
ftpserver: ftpserver
steps:
- checkout: none
- task: UsePythonVersion@0
Expand Down
10 changes: 10 additions & 0 deletions pipelines/azure-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ resources:
DISCOVERY_TYPE: 'single-node'
ports:
- 9200:9200
- container: ftpserver
image: fauria/vsftpd
environment:
- FTP_USER=al_test_user
- FTP_PASS=password
ports:
- '21:21'
- '20:20'
- '21100-21110:21100-21110'

jobs:
- job: run_test
Expand All @@ -33,6 +42,7 @@ jobs:
services:
elasticsearch: elasticsearch
redis: redis
ftpserver: ftpserver

steps:
- task: UsePythonVersion@0
Expand Down
7 changes: 4 additions & 3 deletions test/test_filestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def test_http():
tempf = tempfile.NamedTemporaryFile()
fs.download('assembyline', tempf.name)
assert open(tempf.name, 'r').read() is not None
assert fs.read('assemblyline').read(chunk_size=24) is not None
httpObject = fs.read('assemblyline')
assert httpObject.read(chunk_size=32) is not None

def test_https():
"""
Expand Down Expand Up @@ -66,8 +67,8 @@ def test_ftp():
# fs = FileStore('ftp://demo:[email protected]')
assert fs.exists('readme.txt') != []
assert fs.get('readme.txt') is not None
asdf = fs.read('readme.txt')
assert asdf.read() is not None
ftpfile = fs.read('readme.txt')
assert ftpfile.read() is not None


# def test_ftps():
Expand Down

0 comments on commit 9246b4a

Please sign in to comment.