Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Al 131 add streaming read #33

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions assemblyline/filestore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ def put(self, dst_path, content, location='all', force=False):
'exist for %s on %s (%s)' % (dst_path, location, t))
return transports

@elasticapm.capture_span(span_type='filestore')
def read(self, path, location='any'):
    """
    Return whatever the first usable transport's ``read(path)`` gives back.

    Transports are taken from ``self.slice(location)`` in order. A transport
    that does not have the path is skipped. Any exception raised by a
    transport is logged as a warning and the next transport is tried.
    Falls through (returning None) when no transport produced a result.
    """
    for transport in self.slice(location):
        try:
            if not transport.exists(path):
                continue
            return transport.read(path)
        except Exception as error:
            stack_info = get_stacktrace_info(error)
            self.log.warning('Transport problem: %s', stack_info)

def slice(self, location):
start, end = {
'all': (0, len(self.transports)),
Expand Down
25 changes: 22 additions & 3 deletions assemblyline/filestore/transport/azure.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import logging
import os
import time
from io import BytesIO

# noinspection PyProtectedMember
from azure.core.exceptions import *
from azure.storage.blob import BlobServiceClient
from io import BytesIO

from assemblyline.common.exceptions import ChainAll
from assemblyline.filestore.transport.base import Transport, TransportException

from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream

"""
This class assumes a flat file structure in the Azure storage blob.
Expand Down Expand Up @@ -143,6 +142,15 @@ def get(self, path):
blob_data.readinto(my_blob)
return my_blob.getvalue()

def read(self, path):
    """
    Start a blob download and return a stream over its chunks.

    The path is normalized into a blob key, the download is started through
    ``self.with_retries`` and the iterator returned by ``chunks()`` is wrapped
    in a TransportReadStreamAzure.
    """
    key = self.normalize(path)

    blob_client = self.service_client.get_blob_client(self.blob_container, key)
    blob_data = self.with_retries(blob_client.download_blob)

    # No intermediate BytesIO buffer is needed: data is handed out chunk by chunk.
    return TransportReadStreamAzure(blob_data.chunks())

def put(self, dst_path, content):
if self.read_only:
raise TransportException("READ ONLY TRANSPORT: Method not allowed")
Expand All @@ -158,3 +166,14 @@ def put(self, dst_path, content):
except TransportException as error:
if not isinstance(error.cause, ResourceExistsError):
raise


class TransportReadStreamAzure(TransportReadStream):
    """Read stream that hands out items from the iterator it was built with."""

    def __init__(self, streamFile):
        # Iterator of chunks; every read() call pulls exactly one item from it.
        self.file = streamFile

    def read(self, chunk_size=-1):
        """
        Return the next item of the wrapped iterator.

        ``chunk_size`` is accepted but not used here. When the iterator is
        exhausted, the StopIteration raised by ``next`` propagates to the caller.
        """
        chunk_source = self.file
        return next(chunk_source)

    def close(self):
        """Do nothing; this stream holds nothing that it closes itself."""
        return None
38 changes: 38 additions & 0 deletions assemblyline/filestore/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,41 @@ def put(self, dst_path: str, content: AnyStr):
Put the content of the file in memory directly to the filestore dst_path
"""
raise TransportException("Not Implemented")

def read(self, path):
    """
    Return a file-like object for the file at ``path``.

    This base implementation always raises TransportException("Not Implemented").
    """
    reason = "Not Implemented"
    raise TransportException(reason)

class TransportReadStream(object):
    """
    TransportReadStream base class.

    - Subclasses should override ``read`` and ``close``.
    - Methods should only raise TransportExceptions. (The decorators
      Chain and ChainAll can be applied to a function/method and class,
      respectively, to ensure that any exceptions raised are converted to
      TransportExceptions.)
    - Instances can be used as context managers: ``close`` is called when
      the ``with`` block exits.
    """

    def read(self, chunk_size=-1):
        """
        Return the next chunk of a streamed file, up to the maximum that is
        currently available. Should never return an empty chunk.
        """
        raise TransportException("Not Implemented")

    def __enter__(self):
        """Enter a ``with`` block; the stream itself is the managed object."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the stream on leaving a ``with`` block (exceptions are not suppressed)."""
        self.close()

    def close(self):
        """
        End the use of the file. Another read stream must be created to
        access the same file again.
        """
        raise TransportException("Not Implemented")
39 changes: 37 additions & 2 deletions assemblyline/filestore/transport/ftp.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from __future__ import annotations
import errno
import ftplib
import logging
import os
import posixpath
import queue
import threading
import time
import errno
import weakref

from io import BytesIO
Expand All @@ -14,7 +15,7 @@
from assemblyline.common.exceptions import ChainAll
from assemblyline.common.path import splitpath
from assemblyline.common.uid import get_random_id
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream


def reconnect_retry_on_fail(func):
Expand Down Expand Up @@ -227,3 +228,37 @@ def put(self, dst_path: str, content: AnyStr):
self.log.debug("Rename: %s -> %s", temppath, finalpath)
self.ftp.rename(temppath, finalpath)
assert (self.exists(dst_path))

def read(self, path):
    """
    Build a TransportReadStreamFTP for ``path``.

    Nothing is downloaded here. The returned stream receives the normalized
    path, a fresh queue, and a callable that runs the RETR command, pushing
    every received block onto the queue followed by a final ``None``.
    """
    path = self.normalize(path)
    chunk_queue = queue.Queue()

    def download(chunk_size=8192):
        # Each block received from the server goes straight onto the queue.
        self.ftp.retrbinary('RETR ' + path, callback=chunk_queue.put, blocksize=chunk_size)
        # None marks the end of the transfer for whoever drains the queue.
        chunk_queue.put(None)

    return TransportReadStreamFTP(path, chunk_queue, download)


class TransportReadStreamFTP(TransportReadStream):
    """
    Read stream fed by a background FTP download.

    ``retrMethod(chunk_size)`` is run in a daemon thread. It is expected to put
    every received block on ``filequeue`` and finish by putting ``None``.
    ``read`` returns the queued blocks one at a time.
    """

    def __init__(self, filepath, filequeue, retrMethod):
        self.filepath = filepath
        self.filequeue = filequeue
        self.retrMethod = retrMethod
        self.readThread = None
        self.generator = None
        # Exception raised by the download thread, if any; re-raised by read().
        self._error = None

    def close(self):
        """Do nothing; the download thread is a daemon and is not stopped here."""
        pass

    def read(self, chunk_size=8192):
        """
        Return the next block of the file.

        The download is started by the first call, so only that call's
        ``chunk_size`` is used. Raises StopIteration once the transfer is
        finished, or the error that aborted the download.
        """
        if self.generator is None:
            self.generator = self.getGen(chunk_size=chunk_size)
        return next(self.generator)

    def _download(self, chunk_size):
        """Thread body: run the retrieval and always leave an end marker behind."""
        try:
            self.retrMethod(chunk_size)
        except Exception as error:
            # Record the failure first, then wake the reader, so it never
            # waits forever on a transfer that died.
            self._error = error
            self.filequeue.put(None)

    def getGen(self, chunk_size=8192):
        """Start the download thread and yield blocks until the end marker arrives."""
        # Pass the callable and its argument separately so the transfer really
        # runs in the thread instead of being executed right here.
        self.readThread = threading.Thread(target=self._download, args=(chunk_size,), daemon=True)
        self.readThread.start()

        while True:
            chunk = self.filequeue.get()
            if chunk is None:
                break
            yield chunk

        if self._error is not None:
            raise self._error
        # Review note: a read() after the last one must not block forever.
        # Returning here finishes the generator, so every later next() raises
        # StopIteration immediately without touching the now-empty queue.
        # (Raising StopIteration explicitly would become RuntimeError.)
        return
27 changes: 26 additions & 1 deletion assemblyline/filestore/transport/http.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import os
import posixpath

import requests

from assemblyline.common.exceptions import ChainAll
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream


@ChainAll(TransportException)
Expand Down Expand Up @@ -108,3 +109,27 @@ def get(self, path):

def put(self, dst_path, content):
    """Always raise TransportException; nothing is written."""
    message = "READ ONLY TRANSPORT: Method not implemented"
    raise TransportException(message)

def read(self, path):
    """
    Issue a streaming GET for ``path`` and return a TransportReadStreamHTTP.

    Raises TransportException carrying the status code, reason and path when
    the response is not OK.
    """
    path = self.normalize(path)
    resp = self.session.get(path, auth=self.auth, cert=self.pki, verify=self.verify, stream=True)
    if not resp.ok:
        message = "[%s] %s: %s" % (resp.status_code, resp.reason, path)
        # With stream=True the connection is held until the body is consumed
        # or the response is closed, so release it before bailing out.
        resp.close()
        raise TransportException(message)
    return TransportReadStreamHTTP(resp)


# TODO: Create an extension of the base class TransportFile

class TransportReadStreamHTTP(TransportReadStream):
    """Read stream over the body of a response object."""

    def __init__(self, response):
        self.response = response
        # Created on the first read(), because iter_content needs a chunk size.
        self.iter = None

    def read(self, chunk_size=1024):
        """
        Return the next item from the response's content iterator.

        The iterator is created on the first call with that call's
        ``chunk_size``; later calls reuse it. StopIteration from ``next``
        propagates when the iterator is exhausted.
        """
        content_iter = self.iter
        if content_iter is None:
            content_iter = self.response.iter_content(chunk_size=chunk_size)
            self.iter = content_iter
        return next(content_iter)

    def close(self):
        """Close the underlying response."""
        self.response.close()
11 changes: 11 additions & 0 deletions assemblyline/filestore/transport/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ def put(self, path, content):
pass
assert(self.exists(path))

def read(self, path):
    """
    Open the file at ``path`` for binary reading and return the open handle.

    The caller owns the returned file object and is responsible for closing
    it (it can be used in a ``with`` statement).
    """
    path = self.normalize(path)
    # Do not close the handle here: a try/finally that closes it would run
    # before the caller ever sees the file, handing back an unusable object.
    return open(path, "rb")


def __str__(self):
    """Render this transport as a ``file://`` string built from ``self.base``."""
    return f'file://{self.base}'

Expand Down
25 changes: 22 additions & 3 deletions assemblyline/filestore/transport/s3.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import boto3
import logging
import os
import tempfile
from io import BytesIO

import boto3
from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError
from io import BytesIO

from assemblyline.common.exceptions import ChainAll
from assemblyline.filestore.transport.base import Transport, TransportException
from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream

try:
from botocore.vendored.requests.packages.urllib3 import disable_warnings
Expand Down Expand Up @@ -173,3 +173,22 @@ def put(self, dst_path, content):

with BytesIO(content) as file_io:
self.with_retries(self.client.upload_fileobj, file_io, self.bucket, dst_path)

def read(self, path):
    """
    Fetch the object for ``path`` and return a stream over its body.

    The path is normalized into a key, ``get_object`` is called through
    ``self.with_retries``, and the ``'Body'`` entry of the result is wrapped
    in a TransportReadStreamS3.
    """
    response = self.with_retries(
        self.client.get_object,
        Key=self.normalize(path),
        Bucket=self.bucket,
    )
    body = response['Body']
    return TransportReadStreamS3(body)


class TransportReadStreamS3(TransportReadStream):
    """Read stream delegating to the file-like object it wraps."""

    def __init__(self, file):
        self.file = file

    def read(self, chunk_size=-1):
        """
        Read from the wrapped object.

        A positive ``chunk_size`` is passed on as the read size; any other
        value results in a ``read()`` call without a size argument.
        """
        if chunk_size <= 0:
            return self.file.read()
        return self.file.read(chunk_size)

    def close(self):
        """Close the wrapped object."""
        self.file.close()
24 changes: 21 additions & 3 deletions assemblyline/filestore/transport/sftp.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import logging
import os
import posixpath
import pysftp
import tempfile
import warnings

from io import BytesIO

import pysftp
from paramiko import SSHException

from assemblyline.common.exceptions import ChainAll
from assemblyline.common.uid import get_random_id
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream


def reconnect_retry_on_fail(func):
Expand Down Expand Up @@ -170,3 +169,22 @@ def put(self, dst_path, content):

# Cleanup
os.unlink(src_path)

@reconnect_retry_on_fail
def read(self, path):
    """
    Open the remote file at ``path`` and return a TransportReadStreamSFTP.

    The remote handle is left open and owned by the returned stream. It is
    released by calling the stream's ``close`` method.
    """
    path = self.normalize(path)
    # Deliberately no ``with`` block: it would close the handle on return,
    # and the stream handed to the caller would wrap a closed file.
    sftp_handle = self.sftp.open(path)
    return TransportReadStreamSFTP(sftp_handle)

class TransportReadStreamSFTP(TransportReadStream):
    """Read stream delegating to the file-like object it wraps."""

    def __init__(self, file):
        self.file = file

    def close(self):
        """Close the wrapped file object."""
        self.file.close()

    def read(self, chunk_size=-1):
        """
        Read from the wrapped file object.

        A positive ``chunk_size`` is passed on as the read size; any other
        value (including the default of -1, matching the base class signature)
        results in a ``read()`` call without a size argument.
        """
        if chunk_size > 0:
            return self.file.read(chunk_size)
        else:
            return self.file.read()
10 changes: 10 additions & 0 deletions pipelines/azure-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ resources:
MINIO_SECRET_KEY: Ch@ngeTh!sPa33w0rd
ports:
- 9000:9000
- container: ftpserver
image: fauria/vsftpd
environment:
- FTP_USER=al_test_user
- FTP_PASS=password
ports:
- '21:21'
- '20:20'
- '21100-21110:21100-21110'

stages:
- stage: build
Expand Down Expand Up @@ -71,6 +80,7 @@ stages:
elasticsearch: elasticsearch
redis: redis
minio: minio
ftpserver: ftpserver
steps:
- checkout: none
- task: UsePythonVersion@0
Expand Down
10 changes: 10 additions & 0 deletions pipelines/azure-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ resources:
MINIO_SECRET_KEY: Ch@ngeTh!sPa33w0rd
ports:
- 9000:9000
- container: ftpserver
image: fauria/vsftpd
environment:
- FTP_USER=al_test_user
- FTP_PASS=password
ports:
- '21:21'
- '20:20'
- '21100-21110:21100-21110'

jobs:
- job: run_test
Expand All @@ -43,6 +52,7 @@ jobs:
elasticsearch: elasticsearch
redis: redis
minio: minio
ftpserver: ftpserver

steps:
- task: UsePythonVersion@0
Expand Down
16 changes: 16 additions & 0 deletions test/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,19 @@ services:
image: redis
ports:
- '6379:6379'

ftpServer:
image: fauria/vsftpd
environment:
- FTP_USER=al_test_user
- FTP_PASS=password
ports:
- '21:21'
- '20:20'
- '21100-21110:21100-21110'

sftpServer:
image: atmoz/sftp
ports:
- '2222:22'
command: sftp_test_user:password:1001::test
2 changes: 2 additions & 0 deletions test/readme.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
this is a readme file with two lines of text
this is the second line of text
Loading