-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathhttp.py
110 lines (90 loc) · 3.57 KB
/
http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import logging
import os
import posixpath
import requests
from assemblyline.common.exceptions import ChainAll
from assemblyline.filestore.transport.base import Transport, TransportException, normalize_srl_path
@ChainAll(TransportException)
class TransportHTTP(Transport):
"""
HTTP Transport class.
"""
def __init__(self, scheme='http', base=None, host=None, password=None, user=None, pki=None, port=None, verify=None):
self.log = logging.getLogger('assemblyline.transport.http')
self.base = base
self.host = host
self.password = password
self.user = user
self.pki = pki
self.scheme = scheme
self.verify = verify
if not port:
if scheme == 'http':
port = 80
else:
port = 443
self.port = port
if user and password:
self.auth = (user, password)
else:
self.auth = None
def http_normalize(path):
if '/' in path or len(path) != 64:
s = posixpath.join(self.base, path)
else:
s = posixpath.join(self.base, normalize_srl_path(path))
return "{scheme}://{host}:{port}{path}".format(scheme=scheme, host=host, port=port, path=s)
self._session = None
super(TransportHTTP, self).__init__(normalize=http_normalize)
@property
def session(self):
if self._session is None:
self._session = requests.Session()
return self._session
def __str__(self):
out = "%s://" % self.scheme
if self.user:
out += "%s@" % self.user
out += "%s:%s" % (self.host, self.port)
if self.base:
out += self.base
return out
def close(self):
if self._session:
self._session.close()
def delete(self, path):
raise TransportException("READ ONLY TRANSPORT: Method not implemented")
def exists(self, path):
path = self.normalize(path)
resp = self.session.head(path, auth=self.auth, cert=self.pki, verify=self.verify)
return resp.ok
def makedirs(self, path):
raise TransportException("READ ONLY TRANSPORT: Method not implemented")
# File based functions
def download(self, src_path, dst_path):
dir_path = os.path.dirname(dst_path)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
with open(dst_path, 'wb') as localfile:
src_path = self.normalize(src_path)
resp = self.session.get(src_path, auth=self.auth, cert=self.pki, verify=self.verify)
if resp.ok:
for chunk in resp.iter_content(chunk_size=1024):
if chunk:
localfile.write(chunk)
else:
raise TransportException("[%s] %s: %s" % (resp.status_code, resp.reason, src_path))
def upload(self, src_path, dst_path):
raise TransportException("READ ONLY TRANSPORT: Method not implemented")
def upload_batch(self, local_remote_tuples):
raise TransportException("READ ONLY TRANSPORT: Method not implemented")
# Buffer based functions
def get(self, path: str) -> bytes:
path = self.normalize(path)
resp = self.session.get(path, auth=self.auth, cert=self.pki, verify=self.verify)
if resp.ok:
return resp.content
else:
raise TransportException("[%s] %s: %s" % (resp.status_code, resp.reason, path))
def put(self, dst_path, content):
raise TransportException("READ ONLY TRANSPORT: Method not implemented")