-
Notifications
You must be signed in to change notification settings - Fork 385
/
Copy pathtor_project.py
executable file
·89 lines (80 loc) · 3.21 KB
/
tor_project.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import requests
from datetime import datetime, timedelta
from dateutil.parser import parse
import pytz
from diskcache import Cache
class TorProjectClient:
    """Simple client to query torproject.org for exit nodes.

    The client downloads https://check.torproject.org/exit-addresses and
    checks whether a given IP address is listed in it. When the address is
    found, its last update time is compared with `ttl`, and a description of
    the node is returned only if that update is less than `ttl` seconds old.

    :param ttl: A Tor node is kept only if its last update was less than
                `ttl` seconds ago. Ignored if `ttl` is 0 (or None).
    :param cache_duration: Duration before refreshing the cache (in seconds).
                           Ignored if `cache_duration` is 0 (or None).
    :param cache_root: Path where to store the cached file
                       downloaded from torproject.org
    :param proxies: Proxies to use for the requests session
    :type ttl: int
    :type cache_duration: int
    :type cache_root: str
    :type proxies: dict
    """

    def __init__(
        self,
        ttl=86400,
        cache_duration=3600,
        cache_root="/tmp/cortex/tor_project",
        proxies=None,
    ):
        self.session = requests.Session()
        if proxies:
            self.session.proxies.update(proxies)
        self.url = "https://check.torproject.org/exit-addresses"

        # `delta` stays None when TTL filtering is disabled.
        self.delta = timedelta(seconds=ttl) if ttl and ttl > 0 else None

        # `cache` stays None when caching is disabled; `cache_duration` is
        # always defined so the instance is never partially initialised.
        self.cache_duration = cache_duration
        self.cache = None
        if cache_duration and cache_duration > 0:
            self.cache = Cache(cache_root)

    # NOTE(review): nothing in this class reads this attribute; the list is
    # stored in the cache under the literal key "raw_data". It is kept so
    # the class namespace is unchanged.
    __cache_key = __name__ + ":raw_data"

    def _download(self):
        """Fetch the exit list from `self.url` and return its text.

        :raises requests.HTTPError: if the server answers with an error
            status, so that an error page is never parsed (or cached) as if
            it were an exit list.
        """
        response = self.session.get(self.url)
        response.raise_for_status()
        return response.text

    def _get_raw_data(self):
        """Return the raw exit list, going through the cache when enabled.

        :return: Text content of the exit-addresses file
        :rtype: str
        """
        cache = self.cache
        if cache is None:
            # Caching disabled: always fetch a fresh copy.
            return self._download()

        try:
            return cache["raw_data"]
        except KeyError:
            pass  # missing or expired entry: refresh below

        text = self._download()
        cache.set("raw_data", text, expire=self.cache_duration)
        # Return the text we just fetched instead of reading it back from
        # the cache: the entry could already be evicted or expired, which
        # would raise KeyError.
        return text

    def search_tor_node(self, ip):
        """Lookup an IP address to check if it is a known tor exit node.

        :param ip: The IP address to lookup
        :type ip: str
        :return: Data relative to the tor node. If `ip` is a tor exit node
                 it will contain a `node` key with the hash of the node and
                 a `last_status` key with the last update time of the node.
                 If `ip` is not a tor exit node, the function will return an
                 empty dictionary.
        :rtype: dict
        """
        ttl = self.delta
        # The current time is only needed when TTL filtering is active.
        now = datetime.now(pytz.utc) if ttl is not None else None

        nodes = {}
        current_node = None
        for line in self._get_raw_data().splitlines():
            fields = line.split()
            if not fields:
                continue

            keyword = fields[0]
            if keyword == "ExitNode" and len(fields) >= 2:
                current_node = fields[1]
            elif keyword == "ExitAddress" and len(fields) >= 4:
                address = fields[1]
                last_status = fields[2] + "T" + fields[3] + "+0000"
                if ttl is not None and (now - parse(last_status)) >= ttl:
                    continue  # entry is older than the configured TTL

                # Build a fresh dict per address: a single relay may list
                # several ExitAddress lines and each of them must carry the
                # relay's `node` value.
                entry = {} if current_node is None else {"node": current_node}
                entry["last_status"] = last_status
                nodes[address] = entry

        return nodes.get(ip, {})