diff --git a/assemblyline/filestore/__init__.py b/assemblyline/filestore/__init__.py index 1423734ba..5ed4ccb03 100644 --- a/assemblyline/filestore/__init__.py +++ b/assemblyline/filestore/__init__.py @@ -226,6 +226,16 @@ def put(self, dst_path, content, location='all', force=False): 'exist for %s on %s (%s)' % (dst_path, location, t)) return transports + @elasticapm.capture_span(span_type='filestore') + def read(self, path, location='any'): + for t in self.slice(location): + try: + if t.exists(path): + return t.read(path) + except Exception as ex: + trace = get_stacktrace_info(ex) + self.log.warning('Transport problem: %s', trace) + def slice(self, location): start, end = { 'all': (0, len(self.transports)), diff --git a/assemblyline/filestore/transport/azure.py b/assemblyline/filestore/transport/azure.py index 46f720d58..b089b24ac 100644 --- a/assemblyline/filestore/transport/azure.py +++ b/assemblyline/filestore/transport/azure.py @@ -1,15 +1,14 @@ import logging import os import time +from io import BytesIO # noinspection PyProtectedMember from azure.core.exceptions import * from azure.storage.blob import BlobServiceClient -from io import BytesIO from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException - +from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream """ This class assumes a flat file structure in the Azure storage blob. @@ -143,6 +142,15 @@ def get(self, path): blob_data.readinto(my_blob) return my_blob.getvalue() + def read(self, path): + key = self.normalize(path) + my_blob = BytesIO() + + blob_client = self.service_client.get_blob_client(self.blob_container, key) + blob_data = self.with_retries(blob_client.download_blob) + + return TransportReadStreamAzure(blob_data.chunks()) + def put(self, dst_path, content): if self.read_only: raise TransportException("READ ONLY TRANSPORT: Method not allowed") @@ -158,3 +166,14 @@ def put(self, dst_path, content): except TransportException as error: if not isinstance(error.cause, ResourceExistsError): raise + + +class TransportReadStreamAzure(TransportReadStream): + def __init__(self, streamFile): + self.file = streamFile + + def close(self): + pass + + def read(self, chunk_size=-1): + return next(self.file) diff --git a/assemblyline/filestore/transport/base.py b/assemblyline/filestore/transport/base.py index 396718286..7ffa10196 100644 --- a/assemblyline/filestore/transport/base.py +++ b/assemblyline/filestore/transport/base.py @@ -97,3 +97,41 @@ def put(self, dst_path: str, content: AnyStr): Put the content of the file in memory directly to the filestore dst_path """ raise TransportException("Not Implemented") + + def read(self, path): + """ + Returns a file-like object for the file. + """ + raise TransportException("Not Implemented") + +class TransportReadStream(object): + """ + TransportFile base class. + + - Subclasses should override all methods. + - Except as noted, TransportFile methods do not return value and raise + - TransportException on failure. + - Methods should only raise TransportExceptions. (The decorators + Chain and ChainAll can be applied to a function/method and class, + respectively, to ensure that any exceptions raised are converted to + TransportExceptions. + """ + + def read(self, chunk_size = -1): + """ + Returns the next chunk of a streamed file, to the maximum that is currently available + should never return an empty string, always > 0 + """ + raise TransportException("Not Implemented") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + """ + Ends the use of the file, requiring the user to create another read stream to access the same file + """ + raise TransportException("Not Implemented") \ No newline at end of file diff --git a/assemblyline/filestore/transport/ftp.py b/assemblyline/filestore/transport/ftp.py index 9d906e0de..bbc5f7280 100644 --- a/assemblyline/filestore/transport/ftp.py +++ b/assemblyline/filestore/transport/ftp.py @@ -1,11 +1,12 @@ from __future__ import annotations +import errno import ftplib import logging import os import posixpath +import queue import threading import time -import errno import weakref from io import BytesIO @@ -14,7 +15,7 @@ from assemblyline.common.exceptions import ChainAll from assemblyline.common.path import splitpath from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream def reconnect_retry_on_fail(func): @@ -227,3 +228,37 @@ def put(self, dst_path: str, content: AnyStr): self.log.debug("Rename: %s -> %s", temppath, finalpath) self.ftp.rename(temppath, finalpath) assert (self.exists(dst_path)) + + def read(self, path): + path = self.normalize(path) + callbackqueue = queue.Queue() + def retrbinary(chunk_size = 8192): + self.ftp.retrbinary('RETR ' + path, callback = callbackqueue.put, blocksize = chunk_size) + callbackqueue.put(None) + return TransportReadStreamFTP(path, callbackqueue, retrbinary) + + +class TransportReadStreamFTP(TransportReadStream): + def __init__(self, filepath, filequeue, retrMethod): + self.filepath = filepath + self.filequeue = filequeue + self.retrMethod = retrMethod + self.readThread = None + self.generator = None + + def close(self): + pass + + def read(self, chunk_size=8192): + if self.generator is None: + self.generator = self.getGen(chunk_size=chunk_size) + return next(self.generator) + + def getGen(self, chunk_size=8192): + self.readThread = threading.Thread(target=self.retrMethod(chunk_size), daemon=True) + self.readThread.start() + chunk = self.filequeue.get() + if chunk is not None: + yield chunk + else: + raise StopIteration() diff --git a/assemblyline/filestore/transport/http.py b/assemblyline/filestore/transport/http.py index 4128515ea..14c6c9104 100644 --- a/assemblyline/filestore/transport/http.py +++ b/assemblyline/filestore/transport/http.py @@ -1,10 +1,11 @@ import logging import os import posixpath + import requests from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream @ChainAll(TransportException) @@ -108,3 +109,27 @@ def get(self, path): def put(self, dst_path, content): raise TransportException("READ ONLY TRANSPORT: Method not implemented") + + def read(self, path): + path = self.normalize(path) + resp = self.session.get(path, auth=self.auth, cert=self.pki, verify=self.verify, stream=True) + if resp.ok: + return TransportReadStreamHTTP(resp) + else: + raise TransportException("[%s] %s: %s" % (resp.status_code, resp.reason, path)) + + +# TODO: Create an extension of the base class TransportFile + +class TransportReadStreamHTTP(TransportReadStream): + def __init__(self, response): + self.response = response + self.iter = None + + def close(self): + self.response.close() + + def read(self, chunk_size=1024): + if self.iter is None: + self.iter = self.response.iter_content(chunk_size=chunk_size) + return next(self.iter) diff --git a/assemblyline/filestore/transport/local.py b/assemblyline/filestore/transport/local.py index 27b624696..7731881d2 100644 --- a/assemblyline/filestore/transport/local.py +++ b/assemblyline/filestore/transport/local.py @@ -126,6 +126,17 @@ def put(self, path, content): pass assert(self.exists(path)) + def read(self, path): + path = self.normalize(path) + fh = None + try: + fh = open(path, "rb") + return fh + finally: + if fh: + fh.close() + + def __str__(self): return 'file://{}'.format(self.base) diff --git a/assemblyline/filestore/transport/s3.py b/assemblyline/filestore/transport/s3.py index de3683479..d9bd1f325 100644 --- a/assemblyline/filestore/transport/s3.py +++ b/assemblyline/filestore/transport/s3.py @@ -1,13 +1,13 @@ -import boto3 import logging import os import tempfile +from io import BytesIO +import boto3 from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError -from io import BytesIO from assemblyline.common.exceptions import ChainAll -from assemblyline.filestore.transport.base import Transport, TransportException +from assemblyline.filestore.transport.base import Transport, TransportException, TransportReadStream try: from botocore.vendored.requests.packages.urllib3 import disable_warnings @@ -173,3 +173,22 @@ def put(self, dst_path, content): with BytesIO(content) as file_io: self.with_retries(self.client.upload_fileobj, file_io, self.bucket, dst_path) + + def read(self, path): + key = self.normalize(path) + file = self.with_retries(self.client.get_object, Key = key, Bucket = self.bucket) + return TransportReadStreamS3(file['Body']) + + +class TransportReadStreamS3(TransportReadStream): + def __init__(self, file): + self.file = file + + def close(self): + self.file.close() + + def read(self, chunk_size = -1): + if chunk_size > 0: + return self.file.read(chunk_size) + else: + return self.file.read() \ No newline at end of file diff --git a/assemblyline/filestore/transport/sftp.py b/assemblyline/filestore/transport/sftp.py index c9d700fba..87ede0abb 100644 --- a/assemblyline/filestore/transport/sftp.py +++ b/assemblyline/filestore/transport/sftp.py @@ -1,17 +1,16 @@ import logging import os import posixpath -import pysftp import tempfile import warnings - from io import BytesIO +import pysftp from paramiko import SSHException from assemblyline.common.exceptions import ChainAll from assemblyline.common.uid import get_random_id -from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path +from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path, TransportReadStream def reconnect_retry_on_fail(func): @@ -170,3 +169,22 @@ def put(self, dst_path, content): # Cleanup os.unlink(src_path) + + @reconnect_retry_on_fail + def read(self, path): + path = self.normalize(path) + with self.sftp.open(path) as sftp_handle: + return TransportReadStreamSFTP(sftp_handle) + +class TransportReadStreamSFTP(TransportReadStream): + def __init__(self, file): + self.file = file + + def close(self): + self.file.close() + + def read(self, chunk_size): + if chunk_size > 0: + return self.file.read(chunk_size) + else: + return self.file.read() diff --git a/pipelines/azure-build.yaml b/pipelines/azure-build.yaml index d44039e39..20e550635 100644 --- a/pipelines/azure-build.yaml +++ b/pipelines/azure-build.yaml @@ -28,6 +28,15 @@ resources: MINIO_SECRET_KEY: Ch@ngeTh!sPa33w0rd ports: - 9000:9000 + - container: ftpserver + image: fauria/vsftpd + environment: + - FTP_USER=al_test_user + - FTP_PASS=password + ports: + - '21:21' + - '20:20' + - '21100-21110:21100-21110' stages: - stage: build @@ -71,6 +80,7 @@ stages: elasticsearch: elasticsearch redis: redis minio: minio + ftpserver: ftpserver steps: - checkout: none - task: UsePythonVersion@0 diff --git a/pipelines/azure-tests.yaml b/pipelines/azure-tests.yaml index 9ae79b093..a1b4f1520 100644 --- a/pipelines/azure-tests.yaml +++ b/pipelines/azure-tests.yaml @@ -26,6 +26,15 @@ resources: MINIO_SECRET_KEY: Ch@ngeTh!sPa33w0rd ports: - 9000:9000 + - container: ftpserver + image: fauria/vsftpd + environment: + - FTP_USER=al_test_user + - FTP_PASS=password + ports: + - '21:21' + - '20:20' + - '21100-21110:21100-21110' jobs: - job: run_test @@ -43,6 +52,7 @@ jobs: elasticsearch: elasticsearch redis: redis minio: minio + ftpserver: ftpserver steps: - task: UsePythonVersion@0 diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 9891484ca..e5133792b 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -24,3 +24,19 @@ services: image: redis ports: - '6379:6379' + + ftpServer: + image: fauria/vsftpd + environment: + - FTP_USER=al_test_user + - FTP_PASS=password + ports: + - '21:21' + - '20:20' + - '21100-21110:21100-21110' + + sftpServer: + image: atmoz/sftp + ports: + - '2222:22' + command: sftp_test_user:password:1001::test diff --git a/test/readme.txt b/test/readme.txt new file mode 100644 index 000000000..c27b75a29 --- /dev/null +++ b/test/readme.txt @@ -0,0 +1,2 @@ +this is a readme file with two lines of text +this is the second line of text \ No newline at end of file diff --git a/test/test_filestore.py b/test/test_filestore.py index 87b49102d..6a1cb4a11 100644 --- a/test/test_filestore.py +++ b/test/test_filestore.py @@ -1,9 +1,10 @@ import os +import tempfile import pytest -from assemblyline.filestore.transport.base import TransportException from assemblyline.filestore import FileStore +from assemblyline.filestore.transport.base import TransportException def test_azure(): @@ -13,6 +14,7 @@ def test_azure(): fs = FileStore("azure://alpytest.blob.core.windows.net/pytest/", connection_attempts=2) assert fs.exists('test') != [] assert fs.get('test') is not None + assert fs.read('test').read() is not None with pytest.raises(TransportException): fs.put('bob', 'bob') @@ -26,6 +28,15 @@ def test_http(): assert fs.exists('assemblyline-base') != [] assert fs.get('assemblyline-base') is not None + fs = FileStore('http://cyber.gc.ca/en/') + assert fs.exists('assemblyline') != [] + assert fs.get('assemblyline') is not None + tempf = tempfile.NamedTemporaryFile() + fs.download('assembyline', tempf.name) + assert open(tempf.name, 'r').read() is not None + httpObject = fs.read('assemblyline') + assert httpObject.read(chunk_size=32) is not None + def test_https(): """ @@ -36,6 +47,14 @@ def test_https(): assert fs.exists('assemblyline-base') != [] assert fs.get('assemblyline-base') is not None + fs = FileStore('https://cyber.gc.ca/en/') + assert fs.exists('assemblyline') != [] + assert fs.get('assemblyline') is not None + tempf = tempfile.NamedTemporaryFile() + fs.download('assembyline', tempf.name) + assert open(tempf.name, 'r').read() is not None + assert fs.read('assemblyline').read(chunk_size=24) is not None + # def test_sftp(): # """ @@ -43,18 +62,28 @@ def test_https(): # Rebex test server. # """ # fs = FileStore('sftp://demo:password@test.rebex.net') +# # fs = FileStore('sftp://sftp_test_user:password@localhost:2222') +# # fs.upload('readme.txt', 'readme.txt') # assert fs.exists('readme.txt') != [] # assert fs.get('readme.txt') is not None +# sftpfile = fs.read('readme.txt') +# assert sftpfile.read() is not None -# def test_ftp(): -# """ -# Test FTP FileStore by fetching the readme.txt file from -# Rebex test server. -# """ -# fs = FileStore('ftp://demo:password@test.rebex.net') -# assert fs.exists('readme.txt') != [] -# assert fs.get('readme.txt') is not None +def test_ftp(): + """ + Test FTP FileStore by fetching the readme.txt file from + containerized server. + """ + fs = FileStore('ftp://al_test_user:password@localhost') + fs.upload('readme.txt', 'readme.txt') + # fs = FileStore('ftp://demo:password@test.rebex.net') + assert fs.exists('readme.txt') != [] + assert fs.get('readme.txt') is not None + ftpfile = fs.read('readme.txt') + assert ftpfile.read() is not None + fs.delete('readme.txt') + assert fs.exists('readme.txt') == [] # def test_ftps(): @@ -65,6 +94,8 @@ def test_https(): # fs = FileStore('ftps://demo:password@test.rebex.net') # assert fs.exists('readme.txt') != [] # assert fs.get('readme.txt') is not None +# ftpsfile = fs.read('readme.txt') +# assert ftpsfile.read() is not None def test_file(): @@ -78,6 +109,7 @@ def test_file(): fs = FileStore('file://%s' % os.path.dirname(__file__)) assert fs.exists(os.path.basename(__file__)) != [] assert fs.get(os.path.basename(__file__)) is not None + assert fs.read(os.path.basename(__file__)) is not None def test_s3(): @@ -87,8 +119,12 @@ def test_s3(): """ fs = FileStore('s3://AKIAIIESFCKMSXUP6KWQ:Uud08qLQ48Cbo9RB7b+H+M97aA2wdR8OXaHXIKwL@' 's3.amazonaws.com/?s3_bucket=assemblyline-support&aws_region=us-east-1') + tempf = tempfile.NamedTemporaryFile() assert fs.exists('al4_s3_pytest.txt') != [] assert fs.get('al4_s3_pytest.txt') is not None + assert fs.read('al4_s3_pytest.txt') is not None + fs.download('al4_s3_pytest.txt', tempf.name) + assert open(tempf.name, 'r').read() is not None def test_minio(): @@ -103,4 +139,3 @@ def test_minio(): assert fs.exists('al4_minio_pytest.txt') != [] assert fs.get('al4_minio_pytest.txt') == content assert fs.delete('al4_minio_pytest.txt') is None -