From 68d46e86f69cf1f0d64223f59604076879ed4f6f Mon Sep 17 00:00:00 2001 From: Adam Douglass Date: Mon, 28 Jun 2021 10:39:03 -0400 Subject: [PATCH 1/3] Make ftp transport threadsafe, add type hints --- assemblyline/filestore/transport/ftp.py | 36 ++++++++++++++++--------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/assemblyline/filestore/transport/ftp.py b/assemblyline/filestore/transport/ftp.py index 3cb3f5481..0f0267b21 100644 --- a/assemblyline/filestore/transport/ftp.py +++ b/assemblyline/filestore/transport/ftp.py @@ -1,11 +1,15 @@ +from __future__ import annotations import ftplib import logging import os import posixpath +import threading import time import errno +import weakref from io import BytesIO +from typing import Union from assemblyline.common.exceptions import ChainAll from assemblyline.common.path import splitpath @@ -14,7 +18,7 @@ def reconnect_retry_on_fail(func): - def new_func(self, *args, **kwargs): + def new_func(self: TransportFTP, *args, **kwargs): max_retry = 3 try_count = 0 @@ -94,13 +98,13 @@ class TransportFTP(Transport): """ def __init__(self, base=None, host=None, password=None, user=None, port=None, use_tls=None): self.log = logging.getLogger('assemblyline.transport.ftp') - self.base = base - self.ftp = None - self.host = host - self.port = int(port or 21) - self.password = password - self.user = user - self.use_tls = use_tls + self.base: str = base + self.ftp_objects: weakref.WeakKeyDictionary[threading.Thread, ftplib.FTP] = weakref.WeakKeyDictionary() + self.host: str = host + self.port: int = int(port or 21) + self.password: str = password + self.user: str = user + self.use_tls: bool = use_tls def ftp_normalize(path): # If they've provided an absolute path. Leave it a is. @@ -116,6 +120,14 @@ def ftp_normalize(path): super(TransportFTP, self).__init__(normalize=ftp_normalize) + @property + def ftp(self) -> Union[ftplib.FTP, ftplib.FTP_TLS]: + return self.ftp_objects.get(threading.current_thread(), None) + + @ftp.setter + def ftp(self, value: Union[ftplib.FTP, ftplib.FTP_TLS]): + self.ftp_objects[threading.current_thread()] = value + def __str__(self): out = 'ftp://{}@{}'.format(self.user, self.host) if self.base: @@ -123,8 +135,8 @@ def __str__(self): return out def close(self): - if self.ftp: - self.ftp.close() + for con in self.ftp_objects.values(): + con.close() @reconnect_retry_on_fail def delete(self, path): @@ -188,14 +200,14 @@ def upload_batch(self, local_remote_tuples): # Buffer based functions @reconnect_retry_on_fail - def get(self, path): + def get(self, path) -> bytes: path = self.normalize(path) bio = BytesIO() self.ftp.retrbinary('RETR ' + path, bio.write) return bio.getvalue() @reconnect_retry_on_fail - def put(self, dst_path, content): + def put(self, dst_path, content: Union[bytes, str]): dst_path = self.normalize(dst_path) dirname = posixpath.dirname(dst_path) filename = posixpath.basename(dst_path) From b6759b762c87012ce6bf4fd7efd5d04983af57d6 Mon Sep 17 00:00:00 2001 From: Adam Douglass Date: Mon, 28 Jun 2021 10:50:27 -0400 Subject: [PATCH 2/3] More FTP type annotations --- assemblyline/filestore/transport/base.py | 16 +++++++++------- assemblyline/filestore/transport/ftp.py | 10 +++++----- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/assemblyline/filestore/transport/base.py b/assemblyline/filestore/transport/base.py index 24443245c..396718286 100644 --- a/assemblyline/filestore/transport/base.py +++ b/assemblyline/filestore/transport/base.py @@ -1,3 +1,5 @@ +from typing import AnyStr + from assemblyline.common.exceptions import ChainException @@ -37,20 +39,20 @@ def __init__(self, normalize=normalize_srl_path): def close(self): pass - def delete(self, path): + def delete(self, path: str): """ Deletes the file. """ raise TransportException("Not Implemented") - def exists(self, path): + def exists(self, path: str) -> bool: """ Returns True if the path exists, False otherwise. Should work with both files and directories. """ raise TransportException("Not Implemented") - def makedirs(self, path): + def makedirs(self, path: str): """ Like os.makedirs the super-mkdir, create the leaf directory path and any intermediate path segments. @@ -58,13 +60,13 @@ def makedirs(self, path): raise TransportException("Not Implemented") # File based functions - def download(self, src_path, dst_path): + def download(self, src_path: str, dst_path: str): """ Copies the content of the filestore src_path to the local dst_path. """ raise TransportException("Not Implemented") - def upload(self, src_path, dst_path): + def upload(self, src_path: str, dst_path: str): """ Save upload source file src_path to to the filesotre dst_path, overwriting dst_path if it already exists. """ @@ -84,13 +86,13 @@ def upload_batch(self, local_remote_tuples): return failed_tuples # Buffer based functions - def get(self, path): + def get(self, path: str) -> bytes: """ Returns the content of the file. """ raise TransportException("Not Implemented") - def put(self, dst_path, content): + def put(self, dst_path: str, content: AnyStr): """ Put the content of the file in memory directly to the filestore dst_path """ diff --git a/assemblyline/filestore/transport/ftp.py b/assemblyline/filestore/transport/ftp.py index 0f0267b21..9d906e0de 100644 --- a/assemblyline/filestore/transport/ftp.py +++ b/assemblyline/filestore/transport/ftp.py @@ -9,7 +9,7 @@ import weakref from io import BytesIO -from typing import Union +from typing import Union, AnyStr from assemblyline.common.exceptions import ChainAll from assemblyline.common.path import splitpath @@ -97,7 +97,7 @@ class TransportFTP(Transport): FTP Transport class. """ def __init__(self, base=None, host=None, password=None, user=None, port=None, use_tls=None): - self.log = logging.getLogger('assemblyline.transport.ftp') + self.log: logging.Logger = logging.getLogger('assemblyline.transport.ftp') self.base: str = base self.ftp_objects: weakref.WeakKeyDictionary[threading.Thread, ftplib.FTP] = weakref.WeakKeyDictionary() self.host: str = host @@ -144,7 +144,7 @@ def delete(self, path): self.ftp.delete(path) @reconnect_retry_on_fail - def exists(self, path): + def exists(self, path) -> bool: path = self.normalize(path) self.log.debug('Checking for existence of %s', path) size = None @@ -178,7 +178,7 @@ def download(self, src_path, dst_path): self.ftp.retrbinary('RETR ' + src_path, localfile.write) @reconnect_retry_on_fail - def upload(self, src_path, dst_path): + def upload(self, src_path: str, dst_path: str): dst_path = self.normalize(dst_path) dirname = posixpath.dirname(dst_path) filename = posixpath.basename(dst_path) @@ -207,7 +207,7 @@ def get(self, path) -> bytes: return bio.getvalue() @reconnect_retry_on_fail - def put(self, dst_path, content: Union[bytes, str]): + def put(self, dst_path: str, content: AnyStr): dst_path = self.normalize(dst_path) dirname = posixpath.dirname(dst_path) filename = posixpath.basename(dst_path) From 682b52a9de7306584e677f3832aa9810f2b57a31 Mon Sep 17 00:00:00 2001 From: Adam Douglass Date: Mon, 28 Jun 2021 10:52:33 -0400 Subject: [PATCH 3/3] Add python 3.9 to the base tests --- pipelines/azure-tests.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelines/azure-tests.yaml b/pipelines/azure-tests.yaml index 5af718cb5..9ae79b093 100644 --- a/pipelines/azure-tests.yaml +++ b/pipelines/azure-tests.yaml @@ -33,8 +33,10 @@ jobs: matrix: python3_7: python.version: '3.7' - Python3_8: + python3_8: python.version: '3.8' + python3_9: + python.version: '3.9' timeoutInMinutes: 10 services: