From 49a8717ce857f6aeb76925501f55c64a1f29689c Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Thu, 24 Sep 2020 11:29:54 -0400 Subject: [PATCH 1/8] AL-131: Add streaming read operations - Adds TransportFile and TransportFile children objects --- assemblyline/filestore/transport/azure.py | 25 ++++++++++++++++--- assemblyline/filestore/transport/base.py | 29 +++++++++++++++++++++++ assemblyline/filestore/transport/ftp.py | 17 +++++++++++-- assemblyline/filestore/transport/http.py | 15 +++++++++++- assemblyline/filestore/transport/local.py | 17 ++++++++++++- assemblyline/filestore/transport/s3.py | 23 ++++++++++++++++-- assemblyline/filestore/transport/sftp.py | 21 +++++++++++++--- 7 files changed, 135 insertions(+), 12 deletions(-) diff --git a/assemblyline/filestore/transport/azure.py b/assemblyline/filestore/transport/azure.py index 46f720d58..d1db7c824 100644 --- a/assemblyline/filestore/transport/azure.py +++ b/assemblyline/filestore/transport/azure.py @@ -1,15 +1,13 @@ import logging import os import time - # noinspection PyProtectedMember from azure.core.exceptions import * from azure.storage.blob import BlobServiceClient from io import BytesIO from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException - +from assemblyline.filestore.transport.base import Transport, TransportException, TransportFile """ This class assumes a flat file structure in the Azure storage blob. @@ -143,6 +141,15 @@ def get(self, path): blob_data.readinto(my_blob) return my_blob.getvalue() + def read(self, path): + key = self.normalize(path) + my_blob = BytesIO() + + blob_client = self.service_client.get_blob_client(self.blob_container, key) + blob_data = self.with_retries(blob_client.download_blob) + + file = TransportFileAzure(blob_data) + def put(self, dst_path, content): if self.read_only: raise TransportException("READ ONLY TRANSPORT: Method not allowed") @@ -158,3 +165,15 @@ def put(self, dst_path, content): except TransportException as error: if not isinstance(error.cause, ResourceExistsError): raise + + +# TODO: Create an extension of the base class TransportFile +class TransportFileAzure(TransportFile): + def __init__(self, streamFile): + self.file = streamFile + + def iterator(self): + pass + + def read(self): + pass diff --git a/assemblyline/filestore/transport/base.py b/assemblyline/filestore/transport/base.py index 396718286..cd13991e5 100644 --- a/assemblyline/filestore/transport/base.py +++ b/assemblyline/filestore/transport/base.py @@ -97,3 +97,32 @@ def put(self, dst_path: str, content: AnyStr): Put the content of the file in memory directly to the filestore dst_path """ raise TransportException("Not Implemented") + +class TransportFile(object): + """ + TransportFile base class. + + - Subclasses should override all methods. + - Except as noted, TransportFile methods do not return value and raise + - TransportException on failure. + - Methods should only raise TransportExceptions. (The decorators + Chain and ChainAll can be applied to a function/method and class, + respectively, to ensure that any exceptions raised are converted to + TransportExceptions. + """ + + def __init__(self, file): + self.file = file + + + def iterator(self): + """ + Returns the iterator associated with this TransportFile + """ + raise TransportException("Not Implemented") + + def read(self): + """ + Returns the next chunk of a streamed file + """ + raise TransportException("Not Implemented") \ No newline at end of file diff --git a/assemblyline/filestore/transport/ftp.py b/assemblyline/filestore/transport/ftp.py index 9d906e0de..f3eae2019 100644 --- a/assemblyline/filestore/transport/ftp.py +++ b/assemblyline/filestore/transport/ftp.py @@ -1,11 +1,11 @@ from __future__ import annotations +import errno import ftplib import logging import os import posixpath import threading import time -import errno import weakref from io import BytesIO @@ -14,7 +14,7 @@ from assemblyline.common.exceptions import ChainAll from assemblyline.common.path import splitpath from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile def reconnect_retry_on_fail(func): @@ -227,3 +227,16 @@ def put(self, dst_path: str, content: AnyStr): self.log.debug("Rename: %s -> %s", temppath, finalpath) self.ftp.rename(temppath, finalpath) assert (self.exists(dst_path)) + + +# TODO: Create an extension of the base class TransportFile + +class TransportFileFTP(TransportFile): + def __init__(self, iterator): + self.iterator = iterator + + def iterator(self): + pass + + def read(self): + pass \ No newline at end of file diff --git a/assemblyline/filestore/transport/http.py b/assemblyline/filestore/transport/http.py index 4128515ea..4c9f58323 100644 --- a/assemblyline/filestore/transport/http.py +++ b/assemblyline/filestore/transport/http.py @@ -4,7 +4,7 @@ import requests from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile @ChainAll(TransportException) @@ -108,3 +108,16 @@ def get(self, path): def put(self, dst_path, content): raise TransportException("READ ONLY TRANSPORT: Method not implemented") + + +# TODO: Create an extension of the base class TransportFile + +class TransportFileHTTP(TransportFile): + def __init__(self, iterator): + self.iterator = iterator + + def iterator(self): + pass + + def read(self): + pass \ No newline at end of file diff --git a/assemblyline/filestore/transport/local.py b/assemblyline/filestore/transport/local.py index 27b624696..d37a79d0b 100644 --- a/assemblyline/filestore/transport/local.py +++ b/assemblyline/filestore/transport/local.py @@ -4,7 +4,7 @@ from assemblyline.common.exceptions import ChainAll from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile @ChainAll(TransportException) @@ -139,3 +139,18 @@ def _join(base, path): if base is None: return path return os.path.join(base, path.lstrip("/")).replace("\\", "/") + + +# TODO: Create an extension of the base class TransportFile + +class TransportFileLocal(TransportFile): + def __init__(self, file, chunk_size = 1024): + super().__init__(file) + self.chunk_size = chunk_size + self.iterator = iter(partial(self.file.read, self.chunk_size), b'') + + def iterator(self): + return self.iterator + + def read(self): + return self.file.read(self.chunk_size) \ No newline at end of file diff --git a/assemblyline/filestore/transport/s3.py b/assemblyline/filestore/transport/s3.py index de3683479..ab6a7720f 100644 --- a/assemblyline/filestore/transport/s3.py +++ b/assemblyline/filestore/transport/s3.py @@ -2,12 +2,11 @@ import logging import os import tempfile - from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError from io import BytesIO from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException +from assemblyline.filestore.transport.base import Transport, TransportException, TransportFile try: from botocore.vendored.requests.packages.urllib3 import disable_warnings @@ -166,6 +165,12 @@ def get(self, path): if os.path.exists(dst_path): os.remove(dst_path) + def read(self, path): + key = self.normalize(path) + file = self.with_retries(self.client.get_object, Key = key, Bucket = self.bucket) + tranFile = TransportFileS3(file.StreamingBody) + return tranFile + def put(self, dst_path, content): dst_path = self.normalize(dst_path) if isinstance(content, str): @@ -173,3 +178,17 @@ def put(self, dst_path, content): with BytesIO(content) as file_io: self.with_retries(self.client.upload_fileobj, file_io, self.bucket, dst_path) + + +# TODO: Create an extension of the base class TransportFile + +class TransportFileS3(TransportFile): + def __init__(self, streamFile): + super().__init__(streamFile) + self.iterator = streamFile.iter_chunks() + + def iterator(self): + return self.iterator + + def read(self): + return self.file.next() \ No newline at end of file diff --git a/assemblyline/filestore/transport/sftp.py b/assemblyline/filestore/transport/sftp.py index c9d700fba..71852daec 100644 --- a/assemblyline/filestore/transport/sftp.py +++ b/assemblyline/filestore/transport/sftp.py @@ -4,14 +4,12 @@ import pysftp import tempfile import warnings - from io import BytesIO - from paramiko import SSHException from assemblyline.common.exceptions import ChainAll from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile def reconnect_retry_on_fail(func): @@ -147,6 +145,9 @@ def get(self, path): bio.write(sftp_handle.read()) return bio.getvalue() + def read(self, path): + pass + @reconnect_retry_on_fail def put(self, dst_path, content): dst_path = self.normalize(dst_path) @@ -170,3 +171,17 @@ def put(self, dst_path, content): # Cleanup os.unlink(src_path) + + +# TODO: Create an extension of the base class TransportFile + +class TransportFileSFTP(TransportFile): + def __init__(self, iterator): + self.iterator = iterator + + def iterator(self): + return self.iterator + + def read(self): + #TODO: actually get a chunk + pass \ No newline at end of file From ef86f6d8077caee3b5571a475ab57904a54b41a2 Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Tue, 29 Sep 2020 12:26:05 -0400 Subject: [PATCH 2/8] AL-131: Add streaming read operations to transports - Adds TransportReadStreamS3 object --- assemblyline/filestore/transport/s3.py | 37 +++++++++++++++----------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/assemblyline/filestore/transport/s3.py b/assemblyline/filestore/transport/s3.py index ab6a7720f..da0883cd4 100644 --- a/assemblyline/filestore/transport/s3.py +++ b/assemblyline/filestore/transport/s3.py @@ -6,7 +6,7 @@ from io import BytesIO from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException, TransportFile +from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream try: from botocore.vendored.requests.packages.urllib3 import disable_warnings @@ -165,12 +165,6 @@ def get(self, path): if os.path.exists(dst_path): os.remove(dst_path) - def read(self, path): - key = self.normalize(path) - file = self.with_retries(self.client.get_object, Key = key, Bucket = self.bucket) - tranFile = TransportFileS3(file.StreamingBody) - return tranFile - def put(self, dst_path, content): dst_path = self.normalize(dst_path) if isinstance(content, str): @@ -179,16 +173,27 @@ def put(self, dst_path, content): with BytesIO(content) as file_io: self.with_retries(self.client.upload_fileobj, file_io, self.bucket, dst_path) + def read(self, path): + key = self.normalize(path) + file = self.with_retries(self.client.get_object, Key = key, Bucket = self.bucket) + transportFile = TransportReadStreamS3(file.StreamingBody) + return transportFile + +class TransportReadStreamS3(TransportReadStream): + def __init__(self, file): + super().__init__(file) -# TODO: Create an extension of the base class TransportFile + def enter(self): + pass -class TransportFileS3(TransportFile): - def __init__(self, streamFile): - super().__init__(streamFile) - self.iterator = streamFile.iter_chunks() + def exit(self): + pass - def iterator(self): - return self.iterator + def close(self): + pass - def read(self): - return self.file.next() \ No newline at end of file + def read(self, chunk_size = -1): + if chunk_size > 0: + return self.file.read(chunk_size) + else: + return self.file.read() \ No newline at end of file From dd6fe3aa82ad58706badd26fd13e7be670af699b Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Wed, 30 Sep 2020 21:34:38 -0400 Subject: [PATCH 3/8] AL-131: Add streaming read options - Adds FTP implementation --- assemblyline/filestore/transport/ftp.py | 31 ++++++++++++++++++------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/assemblyline/filestore/transport/ftp.py b/assemblyline/filestore/transport/ftp.py index f3eae2019..439c39fd4 100644 --- a/assemblyline/filestore/transport/ftp.py +++ b/assemblyline/filestore/transport/ftp.py @@ -14,7 +14,7 @@ from assemblyline.common.exceptions import ChainAll from assemblyline.common.path import splitpath from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream def reconnect_retry_on_fail(func): @@ -228,15 +228,30 @@ def put(self, dst_path: str, content: AnyStr): self.ftp.rename(temppath, finalpath) assert (self.exists(dst_path)) + def read(self, path): + path = self.normalize(path) + callbackqueue = queue.Queue() + def retrbinary(chunk_size = 8192): + self.ftp.retrbinary('RETR ' + path, callback = callbackqueue.put, blocksize = chunk_size) + callbackqueue.put(None) + return TransportReadStreamFTP(path, callbackqueue, retrbinary) -# TODO: Create an extension of the base class TransportFile -class TransportFileFTP(TransportFile): - def __init__(self, iterator): - self.iterator = iterator +class TransportReadStreamFTP(TransportReadStream): + def __init__(self, filepath, filequeue, retrMethod): + self.filepath = filepath + self.filequeue = filequeue + self.retrMethod = retrMethod - def iterator(self): + def close(self): pass - def read(self): - pass \ No newline at end of file + def read(self, chunk_size = 8192): + if self.readthread is None: + self.readthread = threading.Thread(target=self.retrMethod(chunk_size), daemon=True) + self.readthread.start() + chunk = self.filequeue.get() + if chunk is not None: + yield chunk + else: + return From e278147c8c1443c87feffe9a3b494453f64d61f1 Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Wed, 7 Oct 2020 10:50:26 -0400 Subject: [PATCH 4/8] AL-131: Add streaming read options - Changes TransportFile to TransportReadStream - Adds some tests - Fixes s3 Transport's read function - Adds sftp Transport's read function --- assemblyline/filestore/__init__.py | 10 +++++++ assemblyline/filestore/transport/azure.py | 12 ++++---- assemblyline/filestore/transport/base.py | 27 +++++++++++------ assemblyline/filestore/transport/http.py | 11 +++---- assemblyline/filestore/transport/local.py | 35 ++++++++++++++--------- assemblyline/filestore/transport/s3.py | 19 +++++------- assemblyline/filestore/transport/sftp.py | 33 +++++++++++---------- test/docker-compose.yml | 9 ++++++ test/test_filestore.py | 22 ++++++++++++-- 9 files changed, 116 insertions(+), 62 deletions(-) diff --git a/assemblyline/filestore/__init__.py b/assemblyline/filestore/__init__.py index 1423734ba..5ed4ccb03 100644 --- a/assemblyline/filestore/__init__.py +++ b/assemblyline/filestore/__init__.py @@ -226,6 +226,16 @@ def put(self, dst_path, content, location='all', force=False): 'exist for %s on %s (%s)' % (dst_path, location, t)) return transports + @elasticapm.capture_span(span_type='filestore') + def read(self, path, location='any'): + for t in self.slice(location): + try: + if t.exists(path): + return t.read(path) + except Exception as ex: + trace = get_stacktrace_info(ex) + self.log.warning('Transport problem: %s', trace) + def slice(self, location): start, end = { 'all': (0, len(self.transports)), diff --git a/assemblyline/filestore/transport/azure.py b/assemblyline/filestore/transport/azure.py index d1db7c824..ba00d11ac 100644 --- a/assemblyline/filestore/transport/azure.py +++ b/assemblyline/filestore/transport/azure.py @@ -1,13 +1,14 @@ import logging import os import time +from io import BytesIO + # noinspection PyProtectedMember from azure.core.exceptions import * from azure.storage.blob import BlobServiceClient -from io import BytesIO from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException, TransportFile +from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream """ This class assumes a flat file structure in the Azure storage blob. @@ -148,7 +149,7 @@ def read(self, path): blob_client = self.service_client.get_blob_client(self.blob_container, key) blob_data = self.with_retries(blob_client.download_blob) - file = TransportFileAzure(blob_data) + return TransportReadStreamAzure(blob_data) def put(self, dst_path, content): if self.read_only: @@ -167,12 +168,11 @@ def put(self, dst_path, content): raise -# TODO: Create an extension of the base class TransportFile -class TransportFileAzure(TransportFile): +class TransportReadStreamAzure(TransportReadStream): def __init__(self, streamFile): self.file = streamFile - def iterator(self): + def close(self): pass def read(self): diff --git a/assemblyline/filestore/transport/base.py b/assemblyline/filestore/transport/base.py index cd13991e5..7ffa10196 100644 --- a/assemblyline/filestore/transport/base.py +++ b/assemblyline/filestore/transport/base.py @@ -98,7 +98,13 @@ def put(self, dst_path: str, content: AnyStr): """ raise TransportException("Not Implemented") -class TransportFile(object): + def read(self, path): + """ + Returns a file-like object for the file. + """ + raise TransportException("Not Implemented") + +class TransportReadStream(object): """ TransportFile base class. @@ -111,18 +117,21 @@ class TransportFile(object): TransportExceptions. """ - def __init__(self, file): - self.file = file - - - def iterator(self): + def read(self, chunk_size = -1): """ - Returns the iterator associated with this TransportFile + Returns the next chunk of a streamed file, to the maximum that is currently available + should never return an empty string, always > 0 """ raise TransportException("Not Implemented") - def read(self): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): """ - Returns the next chunk of a streamed file + Ends the use of the file, requiring the user to create another read stream to access the same file """ raise TransportException("Not Implemented") \ No newline at end of file diff --git a/assemblyline/filestore/transport/http.py b/assemblyline/filestore/transport/http.py index 4c9f58323..759bf1fc0 100644 --- a/assemblyline/filestore/transport/http.py +++ b/assemblyline/filestore/transport/http.py @@ -1,10 +1,11 @@ import logging import os import posixpath + import requests from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream @ChainAll(TransportException) @@ -112,11 +113,11 @@ def put(self, dst_path, content): # TODO: Create an extension of the base class TransportFile -class TransportFileHTTP(TransportFile): - def __init__(self, iterator): - self.iterator = iterator +class TransportReadStreamHTTP(TransportReadStream): + def __init__(self, file): + self.file = file - def iterator(self): + def close(self): pass def read(self): diff --git a/assemblyline/filestore/transport/local.py b/assemblyline/filestore/transport/local.py index d37a79d0b..b3e30f544 100644 --- a/assemblyline/filestore/transport/local.py +++ b/assemblyline/filestore/transport/local.py @@ -4,7 +4,7 @@ from assemblyline.common.exceptions import ChainAll from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream @ChainAll(TransportException) @@ -126,6 +126,17 @@ def put(self, path, content): pass assert(self.exists(path)) + def read(self, path): + path = self.normalize(path) + fh = None + try: + fh = open(path, "rb") + return TransportReadStreamLocal(fh) + finally: + if fh: + fh.close() + + def __str__(self): return 'file://{}'.format(self.base) @@ -140,17 +151,15 @@ def _join(base, path): return path return os.path.join(base, path.lstrip("/")).replace("\\", "/") +class TransportReadStreamLocal(TransportReadStream): + def __init__(self, file): + self.file = file -# TODO: Create an extension of the base class TransportFile - -class TransportFileLocal(TransportFile): - def __init__(self, file, chunk_size = 1024): - super().__init__(file) - self.chunk_size = chunk_size - self.iterator = iter(partial(self.file.read, self.chunk_size), b'') - - def iterator(self): - return self.iterator + def close(self): + self.file.close() - def read(self): - return self.file.read(self.chunk_size) \ No newline at end of file + def read(self, chunk_size = -1): + if chunk_size > 0: + return self.file.read(chunk_size) + else: + return self.file.read() \ No newline at end of file diff --git a/assemblyline/filestore/transport/s3.py b/assemblyline/filestore/transport/s3.py index da0883cd4..d9bd1f325 100644 --- a/assemblyline/filestore/transport/s3.py +++ b/assemblyline/filestore/transport/s3.py @@ -1,10 +1,11 @@ -import boto3 import logging import os import tempfile -from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError from io import BytesIO +import boto3 +from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError + from assemblyline.common.exceptions import ChainAll from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream @@ -176,21 +177,15 @@ def put(self, dst_path, content): def read(self, path): key = self.normalize(path) file = self.with_retries(self.client.get_object, Key = key, Bucket = self.bucket) - transportFile = TransportReadStreamS3(file.StreamingBody) - return transportFile + return TransportReadStreamS3(file['Body']) + class TransportReadStreamS3(TransportReadStream): def __init__(self, file): - super().__init__(file) - - def enter(self): - pass - - def exit(self): - pass + self.file = file def close(self): - pass + self.file.close() def read(self, chunk_size = -1): if chunk_size > 0: diff --git a/assemblyline/filestore/transport/sftp.py b/assemblyline/filestore/transport/sftp.py index 71852daec..67fb59f63 100644 --- a/assemblyline/filestore/transport/sftp.py +++ b/assemblyline/filestore/transport/sftp.py @@ -1,15 +1,16 @@ import logging import os import posixpath -import pysftp import tempfile import warnings from io import BytesIO + +import pysftp from paramiko import SSHException from assemblyline.common.exceptions import ChainAll from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportFile +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream def reconnect_retry_on_fail(func): @@ -145,9 +146,6 @@ def get(self, path): bio.write(sftp_handle.read()) return bio.getvalue() - def read(self, path): - pass - @reconnect_retry_on_fail def put(self, dst_path, content): dst_path = self.normalize(dst_path) @@ -172,16 +170,21 @@ def put(self, dst_path, content): # Cleanup os.unlink(src_path) + @reconnect_retry_on_fail + def read(self, path): + path = self.normalize(path) + with self.sftp.open(path) as sftp_handle: + return TransportReadStreamSFTP(sftp_handle) -# TODO: Create an extension of the base class TransportFile - -class TransportFileSFTP(TransportFile): - def __init__(self, iterator): - self.iterator = iterator +class TransportReadStreamSFTP(TransportReadStream): + def __init__(self, file): + self.file = file - def iterator(self): - return self.iterator + def close(self): + self.file.close() - def read(self): - #TODO: actually get a chunk - pass \ No newline at end of file + def read(self, chunk_size): + if chunk_size > 0: + return self.file.read(chunk_size) + else: + return self.file.read() \ No newline at end of file diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 9891484ca..e5eb3098b 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -24,3 +24,12 @@ services: image: redis ports: - '6379:6379' + + ftpServer: + image: fauria/vsftpd + environment: + - FTP_USER=al_test_user + - FTP_PASS=Th!sP@33w0rd15Temp0rary! + ports: + - '21110:21110' + command: --rm \ No newline at end of file diff --git a/test/test_filestore.py b/test/test_filestore.py index 87b49102d..e35b39964 100644 --- a/test/test_filestore.py +++ b/test/test_filestore.py @@ -1,9 +1,10 @@ import os +import tempfile import pytest -from assemblyline.filestore.transport.base import TransportException from assemblyline.filestore import FileStore +from assemblyline.filestore.transport.base import TransportException def test_azure(): @@ -26,6 +27,13 @@ def test_http(): assert fs.exists('assemblyline-base') != [] assert fs.get('assemblyline-base') is not None + fs = FileStore('http://cyber.gc.ca/en/') + assert fs.exists('assemblyline') != [] + assert fs.get('assemblyline') is not None + tempf = tempfile.NamedTemporaryFile() + fs.download('assembyline', tempf.name) + assert open(tempf.name, 'r').read() is not None + def test_https(): """ @@ -36,6 +44,13 @@ def test_https(): assert fs.exists('assemblyline-base') != [] assert fs.get('assemblyline-base') is not None + fs = FileStore('https://cyber.gc.ca/en/') + assert fs.exists('assemblyline') != [] + assert fs.get('assemblyline') is not None + tempf = tempfile.NamedTemporaryFile() + fs.download('assembyline', tempf.name) + assert open(tempf.name, 'r').read() is not None + # def test_sftp(): # """ @@ -87,8 +102,12 @@ def test_s3(): """ fs = FileStore('s3://AKIAIIESFCKMSXUP6KWQ:Uud08qLQ48Cbo9RB7b+H+M97aA2wdR8OXaHXIKwL@' 's3.amazonaws.com/?s3_bucket=assemblyline-support&aws_region=us-east-1') + tempf = tempfile.NamedTemporaryFile() assert fs.exists('al4_s3_pytest.txt') != [] assert fs.get('al4_s3_pytest.txt') is not None + assert fs.read('al4_s3_pytest.txt') is not None + fs.download('al4_s3_pytest.txt', tempf.name) + assert open(tempf.name, 'r').read() is not None def test_minio(): @@ -103,4 +122,3 @@ def test_minio(): assert fs.exists('al4_minio_pytest.txt') != [] assert fs.get('al4_minio_pytest.txt') == content assert fs.delete('al4_minio_pytest.txt') is None - From ae506a321330e983a9300b1d4908ed6e1239ed40 Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Fri, 9 Oct 2020 15:36:38 -0400 Subject: [PATCH 5/8] AL-131: add streaming read - Adds HTTP read implementation - Adds testing for local, http, https read --- assemblyline/filestore/transport/azure.py | 6 +++--- assemblyline/filestore/transport/http.py | 18 +++++++++++++----- test/docker-compose.yml | 5 +++-- test/test_filestore.py | 4 ++++ 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/assemblyline/filestore/transport/azure.py b/assemblyline/filestore/transport/azure.py index ba00d11ac..b089b24ac 100644 --- a/assemblyline/filestore/transport/azure.py +++ b/assemblyline/filestore/transport/azure.py @@ -149,7 +149,7 @@ def read(self, path): blob_client = self.service_client.get_blob_client(self.blob_container, key) blob_data = self.with_retries(blob_client.download_blob) - return TransportReadStreamAzure(blob_data) + return TransportReadStreamAzure(blob_data.chunks()) def put(self, dst_path, content): if self.read_only: @@ -175,5 +175,5 @@ def __init__(self, streamFile): def close(self): pass - def read(self): - pass + def read(self, chunk_size=-1): + return next(self.file) diff --git a/assemblyline/filestore/transport/http.py b/assemblyline/filestore/transport/http.py index 759bf1fc0..73d0a9a0f 100644 --- a/assemblyline/filestore/transport/http.py +++ b/assemblyline/filestore/transport/http.py @@ -110,15 +110,23 @@ def get(self, path): def put(self, dst_path, content): raise TransportException("READ ONLY TRANSPORT: Method not implemented") + def read(self, path): + path = self.normalize(path) + resp = self.session.get(path, auth=self.auth, cert=self.pki, verify=self.verify, stream=True) + if resp.ok: + return TransportReadStreamHTTP(resp) + else: + raise TransportException("[%s] %s: %s" % (resp.status_code, resp.reason, path)) + # TODO: Create an extension of the base class TransportFile class TransportReadStreamHTTP(TransportReadStream): - def __init__(self, file): - self.file = file + def __init__(self, response): + self.response = response def close(self): - pass + self.response.close() - def read(self): - pass \ No newline at end of file + def read(self, chunk_size=1024): + return next(self.response.iter_content(chunk_size=chunk_size)) \ No newline at end of file diff --git a/test/docker-compose.yml b/test/docker-compose.yml index e5eb3098b..e6f10de86 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -31,5 +31,6 @@ services: - FTP_USER=al_test_user - FTP_PASS=Th!sP@33w0rd15Temp0rary! ports: - - '21110:21110' - command: --rm \ No newline at end of file + - '20:20' + - '21:21' + - '21110:21110' \ No newline at end of file diff --git a/test/test_filestore.py b/test/test_filestore.py index e35b39964..f76ede20d 100644 --- a/test/test_filestore.py +++ b/test/test_filestore.py @@ -14,6 +14,7 @@ def test_azure(): fs = FileStore("azure://alpytest.blob.core.windows.net/pytest/", connection_attempts=2) assert fs.exists('test') != [] assert fs.get('test') is not None + assert fs.read('test').read() is not None with pytest.raises(TransportException): fs.put('bob', 'bob') @@ -33,6 +34,7 @@ def test_http(): tempf = tempfile.NamedTemporaryFile() fs.download('assembyline', tempf.name) assert open(tempf.name, 'r').read() is not None + assert fs.read('assemblyline').read(chunk_size=24) is not None def test_https(): @@ -50,6 +52,7 @@ def test_https(): tempf = tempfile.NamedTemporaryFile() fs.download('assembyline', tempf.name) assert open(tempf.name, 'r').read() is not None + assert fs.read('assemblyline').read(chunk_size=24) is not None # def test_sftp(): @@ -93,6 +96,7 @@ def test_file(): fs = FileStore('file://%s' % os.path.dirname(__file__)) assert fs.exists(os.path.basename(__file__)) != [] assert fs.get(os.path.basename(__file__)) is not None + assert fs.read(os.path.basename(__file__)) is not None def test_s3(): From c40090cc0a682f30eb461cf48fb9768fe3d71203 Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Thu, 5 Nov 2020 12:42:27 -0500 Subject: [PATCH 6/8] AL 131 Add read operation to filestores - creates infrastructure for FTP test with docker container --- test/docker-compose.yml | 6 +++--- test/readme.txt | 2 ++ test/test_filestore.py | 20 ++++++++++++-------- 3 files changed, 17 insertions(+), 11 deletions(-) create mode 100644 test/readme.txt diff --git a/test/docker-compose.yml b/test/docker-compose.yml index e6f10de86..d502fe0d4 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -29,8 +29,8 @@ services: image: fauria/vsftpd environment: - FTP_USER=al_test_user - - FTP_PASS=Th!sP@33w0rd15Temp0rary! + - FTP_PASS=password ports: - - '20:20' - '21:21' - - '21110:21110' \ No newline at end of file + - '20:20' + - '21100-21110:21100-21110' \ No newline at end of file diff --git a/test/readme.txt b/test/readme.txt new file mode 100644 index 000000000..c27b75a29 --- /dev/null +++ b/test/readme.txt @@ -0,0 +1,2 @@ +this is a readme file with two lines of text +this is the second line of text \ No newline at end of file diff --git a/test/test_filestore.py b/test/test_filestore.py index f76ede20d..33dac83de 100644 --- a/test/test_filestore.py +++ b/test/test_filestore.py @@ -65,14 +65,18 @@ def test_https(): # assert fs.get('readme.txt') is not None -# def test_ftp(): -# """ -# Test FTP FileStore by fetching the readme.txt file from -# Rebex test server. -# """ -# fs = FileStore('ftp://demo:password@test.rebex.net') -# assert fs.exists('readme.txt') != [] -# assert fs.get('readme.txt') is not None +def test_ftp(): + """ + Test FTP FileStore by fetching the readme.txt file from + Rebex test server. + """ + fs = FileStore('ftp://al_test_user:password@localhost') + + # fs = FileStore('ftp://demo:password@test.rebex.net') + assert fs.exists('readme.txt') != [] + assert fs.get('readme.txt') is not None + asdf = fs.read('readme.txt') + assert asdf.read() is not None # def test_ftps(): From 7de506dca2987bc5ed56a69d7963fdc5b041092b Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Tue, 15 Dec 2020 18:30:08 -0500 Subject: [PATCH 7/8] AL-131: Add streaming read operations - Adds ftp filestore tests - Adds ftpserver docker container --- assemblyline/filestore/transport/ftp.py | 17 ++++++++++++----- assemblyline/filestore/transport/http.py | 5 ++++- assemblyline/filestore/transport/local.py | 17 ++--------------- assemblyline/filestore/transport/sftp.py | 2 +- pipelines/azure-build.yaml | 10 ++++++++++ pipelines/azure-tests.yaml | 10 ++++++++++ test/test_filestore.py | 7 ++++--- 7 files changed, 43 insertions(+), 25 deletions(-) diff --git a/assemblyline/filestore/transport/ftp.py b/assemblyline/filestore/transport/ftp.py index 439c39fd4..bbc5f7280 100644 --- a/assemblyline/filestore/transport/ftp.py +++ b/assemblyline/filestore/transport/ftp.py @@ -4,6 +4,7 @@ import logging import os import posixpath +import queue import threading import time import weakref @@ -242,16 +243,22 @@ def __init__(self, filepath, filequeue, retrMethod): self.filepath = filepath self.filequeue = filequeue self.retrMethod = retrMethod + self.readThread = None + self.generator = None def close(self): pass - def read(self, chunk_size = 8192): - if self.readthread is None: - self.readthread = threading.Thread(target=self.retrMethod(chunk_size), daemon=True) - self.readthread.start() + def read(self, chunk_size=8192): + if self.generator is None: + self.generator = self.getGen(chunk_size=chunk_size) + return next(self.generator) + + def getGen(self, chunk_size=8192): + self.readThread = threading.Thread(target=self.retrMethod(chunk_size), daemon=True) + self.readThread.start() chunk = self.filequeue.get() if chunk is not None: yield chunk else: - return + raise StopIteration() diff --git a/assemblyline/filestore/transport/http.py b/assemblyline/filestore/transport/http.py index 73d0a9a0f..14c6c9104 100644 --- a/assemblyline/filestore/transport/http.py +++ b/assemblyline/filestore/transport/http.py @@ -124,9 +124,12 @@ def read(self, path): class TransportReadStreamHTTP(TransportReadStream): def __init__(self, response): self.response = response + self.iter = None def close(self): self.response.close() def read(self, chunk_size=1024): - return next(self.response.iter_content(chunk_size=chunk_size)) \ No newline at end of file + if self.iter is None: + self.iter = self.response.iter_content(chunk_size=chunk_size) + return next(self.iter) diff --git a/assemblyline/filestore/transport/local.py b/assemblyline/filestore/transport/local.py index b3e30f544..7731881d2 100644 --- a/assemblyline/filestore/transport/local.py +++ b/assemblyline/filestore/transport/local.py @@ -4,7 +4,7 @@ from assemblyline.common.exceptions import ChainAll from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path @ChainAll(TransportException) @@ -131,7 +131,7 @@ def read(self, path): fh = None try: fh = open(path, "rb") - return TransportReadStreamLocal(fh) + return fh finally: if fh: fh.close() @@ -150,16 +150,3 @@ def _join(base, path): if base is None: return path return os.path.join(base, path.lstrip("/")).replace("\\", "/") - -class TransportReadStreamLocal(TransportReadStream): - def __init__(self, file): - self.file = file - - def close(self): - self.file.close() - - def read(self, chunk_size = -1): - if chunk_size > 0: - return self.file.read(chunk_size) - else: - return self.file.read() \ No newline at end of file diff --git a/assemblyline/filestore/transport/sftp.py b/assemblyline/filestore/transport/sftp.py index 67fb59f63..87ede0abb 100644 --- a/assemblyline/filestore/transport/sftp.py +++ b/assemblyline/filestore/transport/sftp.py @@ -187,4 +187,4 @@ def read(self, chunk_size): if chunk_size > 0: return self.file.read(chunk_size) else: - return self.file.read() \ No newline at end of file + return self.file.read() diff --git a/pipelines/azure-build.yaml b/pipelines/azure-build.yaml index d44039e39..20e550635 100644 --- a/pipelines/azure-build.yaml +++ b/pipelines/azure-build.yaml @@ -28,6 +28,15 @@ resources: MINIO_SECRET_KEY: Ch@ngeTh!sPa33w0rd ports: - 9000:9000 + - container: ftpserver + image: fauria/vsftpd + environment: + - FTP_USER=al_test_user + - FTP_PASS=password + ports: + - '21:21' + - '20:20' + - '21100-21110:21100-21110' stages: - stage: build @@ -71,6 +80,7 @@ stages: elasticsearch: elasticsearch redis: redis minio: minio + ftpserver: ftpserver steps: - checkout: none - task: UsePythonVersion@0 diff --git a/pipelines/azure-tests.yaml b/pipelines/azure-tests.yaml index 9ae79b093..a1b4f1520 100644 --- a/pipelines/azure-tests.yaml +++ b/pipelines/azure-tests.yaml @@ -26,6 +26,15 @@ resources: MINIO_SECRET_KEY: Ch@ngeTh!sPa33w0rd ports: - 9000:9000 + - container: ftpserver + image: fauria/vsftpd + environment: + - FTP_USER=al_test_user + - FTP_PASS=password + ports: + - '21:21' + - '20:20' + - '21100-21110:21100-21110' jobs: - job: run_test @@ -43,6 +52,7 @@ jobs: elasticsearch: elasticsearch redis: redis minio: minio + ftpserver: ftpserver steps: - task: UsePythonVersion@0 diff --git a/test/test_filestore.py b/test/test_filestore.py index 33dac83de..69e380111 100644 --- a/test/test_filestore.py +++ b/test/test_filestore.py @@ -34,7 +34,8 @@ def test_http(): tempf = tempfile.NamedTemporaryFile() fs.download('assembyline', tempf.name) assert open(tempf.name, 'r').read() is not None - assert fs.read('assemblyline').read(chunk_size=24) is not None + httpObject = fs.read('assemblyline') + assert httpObject.read(chunk_size=32) is not None def test_https(): @@ -75,8 +76,8 @@ def test_ftp(): # fs = FileStore('ftp://demo:password@test.rebex.net') assert fs.exists('readme.txt') != [] assert fs.get('readme.txt') is not None - asdf = fs.read('readme.txt') - assert asdf.read() is not None + ftpfile = fs.read('readme.txt') + assert ftpfile.read() is not None # def test_ftps(): From c805dd51e18927acd8140c1f6ab34d99383bde93 Mon Sep 17 00:00:00 2001 From: Sam Perreault Date: Fri, 18 Dec 2020 18:11:13 -0500 Subject: [PATCH 8/8] AL-131 Add streaming read operation - Adds SFTP server container to docker compose - Adds lines for testing read() operations --- test/docker-compose.yml | 8 +++++++- test/test_filestore.py | 12 ++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/test/docker-compose.yml b/test/docker-compose.yml index d502fe0d4..e5133792b 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -33,4 +33,10 @@ services: ports: - '21:21' - '20:20' - - '21100-21110:21100-21110' \ No newline at end of file + - '21100-21110:21100-21110' + + sftpServer: + image: atmoz/sftp + ports: + - '2222:22' + command: sftp_test_user:password:1001::test diff --git a/test/test_filestore.py b/test/test_filestore.py index 69e380111..6a1cb4a11 100644 --- a/test/test_filestore.py +++ b/test/test_filestore.py @@ -62,22 +62,28 @@ def test_https(): # Rebex test server. # """ # fs = FileStore('sftp://demo:password@test.rebex.net') +# # fs = FileStore('sftp://sftp_test_user:password@localhost:2222') +# # fs.upload('readme.txt', 'readme.txt') # assert fs.exists('readme.txt') != [] # assert fs.get('readme.txt') is not None +# sftpfile = fs.read('readme.txt') +# assert sftpfile.read() is not None def test_ftp(): """ Test FTP FileStore by fetching the readme.txt file from - Rebex test server. + containerized server. """ fs = FileStore('ftp://al_test_user:password@localhost') - + fs.upload('readme.txt', 'readme.txt') # fs = FileStore('ftp://demo:password@test.rebex.net') assert fs.exists('readme.txt') != [] assert fs.get('readme.txt') is not None ftpfile = fs.read('readme.txt') assert ftpfile.read() is not None + fs.delete('readme.txt') + assert fs.exists('readme.txt') == [] # def test_ftps(): @@ -88,6 +94,8 @@ def test_ftp(): # fs = FileStore('ftps://demo:password@test.rebex.net') # assert fs.exists('readme.txt') != [] # assert fs.get('readme.txt') is not None +# ftpsfile = fs.read('readme.txt') +# assert ftpsfile.read() is not None def test_file():