#!/usr/bin/python
# test.py -- forked from hanxiangduo/bacula-console-python
import base64
import hashlib
import hmac
import os
import random
import socket
import struct
import time
# Greeting sent to the director by authenticate_director(); %s is replaced
# with the calling client's name (default "*UserAgent*").
DIRhello = "Hello %s calling\n"
# Not referenced anywhere else in this file; presumably the prefix of the
# director's successful hello reply -- TODO confirm before relying on it.
DIROKhello = "1000 OK:"
# Console password; the __main__ block hashes it with process_password()
# before authenticating. Set BACULA_CONSOLE_PASSWORD in the environment to
# override the built-in default without editing this file.
# NOTE(review): a credential committed to source control should be rotated.
PASSWORD = os.environ.get(
    "BACULA_CONSOLE_PASSWORD",
    "D+vRJQdfAWJN4LHS6vPZvFYq5bmvRr1Id8b9Jn/T/UUX",
)
class Bsocket(object):
    '''Length-prefixed message channel used to talk to the director.

    Every message on the wire is a 4-byte signed integer in network byte
    order holding the payload length, followed by the payload itself.

    Attributes:
        msg: payload of the last received message (reset to "" after send).
        msg_len: length header of the last received message.
        src_host, src_port: address that connect() dials.
        my_name: name embedded in the challenge built by cram_md5_challenge.
        socket: the connected socket, or None before connect()/after close().
    '''

    def __init__(self, src_host=None, src_port=None):
        self.msg = None
        self.msg_len = 0
        self.src_host = src_host
        self.src_port = src_port
        self.my_name = 'bconsole'
        # Populated by connect().
        self.socket = None

    def connect(self,):
        '''Open a TCP connection to (src_host, src_port).'''
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.src_host, self.src_port))

    def close(self):
        '''Close the connection if one is open; safe to call repeatedly.'''
        if self.socket is not None:
            self.socket.close()
            self.socket = None

    def send(self, msg=None):
        '''Send one length-prefixed message to the director.

        Raises:
            RuntimeError: if connect() has not been called yet.
        '''
        if self.socket is None:
            raise RuntimeError("should connect to director first before send data")
        self.msg = msg
        self.msg_len = len(self.msg)
        # sendall() keeps writing until the whole frame is out; a plain
        # send() may transmit only part of it.
        self.socket.sendall(struct.pack("!i", self.msg_len) + self.msg)
        print("send message: %s" % (self.msg,))
        self.msg = ""
        self.msg_len = 0

    def recv(self,):
        '''Receive one length-prefixed message from the director.

        On success the payload is stored in self.msg and its length in
        self.msg_len.

        Returns:
            True when a complete message was read.  False when the peer
            closed the connection (self.msg is left untouched) or when the
            length header is negative; in the latter case self.msg is
            emptied and self.msg_len holds the negative value.

        Raises:
            RuntimeError: if connect() has not been called yet.
        '''
        if self.socket is None:
            raise RuntimeError("should connect to director first before recv data")
        header = self._recv_exact(4)
        if header is None:
            return False
        nbyte = struct.unpack("!i", header)[0]
        if nbyte < 0:
            # A negative length has no payload to read.
            # NOTE(review): presumably a protocol signal -- confirm.
            self.msg = b""
            self.msg_len = nbyte
            return False
        payload = self._recv_exact(nbyte)
        if payload is None:
            return False
        self.msg = payload
        self.msg_len = nbyte
        print("get the message:%s" % (self.msg,))
        return True

    def _recv_exact(self, count):
        '''Read exactly count bytes; return None if the peer closes early.'''
        chunks = []
        remaining = count
        while remaining > 0:
            # recv() may legitimately return fewer bytes than requested.
            chunk = self.socket.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)
def cram_md5_respond(dir, password, tls_remote_need=0, compatiable=True):
    '''Answer the director's CRAM-MD5 challenge.

    Reads a challenge of the form "auth cram-md5 <challenge> ssl=<n>",
    replies with the base64 HMAC-MD5 of the challenge keyed with password
    (trailing "=" padding removed), then reads the director's verdict.

    Args:
        dir: connection object providing send(), recv() and msg.
        password: key for the HMAC (a byte string).
        tls_remote_need: accepted for interface compatibility; not used.
        compatiable: accepted for interface compatibility; not used.

    Returns:
        A tuple (ssl, compatiable, result): ssl is the integer taken from
        the challenge's "ssl=" field, compatiable is always True, and
        result is True only when the director answered "1000 OK auth\\n".
        (0, True, False) is returned when no challenge could be read or
        the challenge is malformed.
    '''
    failure = (0, True, False)
    if not dir.recv():
        return failure

    # Expected shape: auth cram-md5 <challenge> ssl=<n>
    fields = dir.msg.split()
    if (len(fields) < 4 or fields[0] != b"auth"
            or not fields[1].startswith(b"cram-md5")):
        return failure
    chal = fields[2]
    key, sep, value = fields[3].partition(b"=")
    if key != b"ssl" or not sep:
        return failure
    try:
        ssl = int(value)
    except ValueError:
        return failure

    # Name MD5 explicitly instead of relying on hmac's implicit default.
    digest = hmac.new(password, chal, hashlib.md5).digest()
    # The padding is stripped because, per the original author's note,
    # Bacula's base64 encoding differs from the standard one.
    reply = base64.b64encode(digest).rstrip(b'=')
    dir.send(reply)

    # recv() blocks until the verdict arrives, so no fixed delay is needed.
    result = bool(dir.recv()) and dir.msg == b"1000 OK auth\n"
    return (ssl, True, result)
def cram_md5_challenge(dir, password, tls_local_need=0, compatiable=True,
                       verify=False):
    '''Challenge the director so the client can confirm its identity.

    Sends "auth cram-md5 <challenge> ssl=<n>", reads the director's reply
    and answers with "1000 OK auth\\n" or "1999 Authorization failed.\\n".

    Args:
        dir: connection object providing send(), recv(), msg and my_name.
        password: key for the HMAC (a byte string).
        tls_local_need: value placed in the "ssl=" field of the challenge.
        compatiable: accepted for interface compatibility; not used.
        verify: when True, the director's reply must equal the base64
            HMAC-MD5 of the challenge ("=" padding removed).  Defaults to
            False, which accepts any reply exactly as the original code
            did.
            NOTE(review): the original author disabled this comparison and
            the comments in this file say Bacula's base64 encoding is not
            compatible with the standard one, so a real director's reply
            may not match -- confirm against a live director before
            enabling.

    Returns:
        True when the director was accepted, False when its reply was
        rejected or no reply could be read.
    '''
    # SystemRandom draws from the OS entropy source, so the challenge is
    # not predictable from earlier ones.
    nonce = random.SystemRandom().randint(1000000000, 9999999999)
    chal = b"<%u.%u@%s>" % (nonce, int(time.time()), dir.my_name)
    dir.send(b'auth cram-md5 %s ssl=%d\n' % (chal, tls_local_need))
    print(chal)

    digest = hmac.new(password, chal, hashlib.md5).digest()
    hmac_comp = base64.b64encode(digest).rstrip(b'=')

    # recv() blocks until the reply arrives, so no fixed delay is needed.
    if not dir.recv():
        return False

    if verify:
        # NOTE(review): trailing NUL/newline bytes are stripped in case the
        # director terminates its reply -- confirm against a live director.
        is_correct = (dir.msg.rstrip(b"\x00\r\n") == hmac_comp)
    else:
        is_correct = True

    if is_correct:
        dir.send(b"1000 OK auth\n")
    else:
        print("want %s but get %s" % (hmac_comp, dir.msg))
        dir.send(b"1999 Authorization failed.\n")
    return is_correct
def authenticate_director(dir, password, clientname="*UserAgent*"):
    '''Run the full hello / CRAM-MD5 handshake with the director.

    dir: connection object used for every send and receive
    password: shared secret handed to both CRAM-MD5 steps
    clientname: name inserted into the hello line (default *UserAgent*)

    Returns True when both the respond step and the challenge step
    succeed, otherwise False.'''
    # Announce ourselves first; the director then issues its challenge.
    hello_line = DIRhello % (clientname)
    dir.send(hello_line)

    respond_outcome = cram_md5_respond(
        dir=dir, password=password, tls_remote_need=0, compatiable=True)
    if not respond_outcome[2]:
        # The director rejected our answer; skip the counter-challenge.
        return False

    if not cram_md5_challenge(
            dir=dir, password=password, tls_local_need=0, compatiable=True):
        return False
    return True
def process_password(password):
    '''Return the MD5 digest of password as a hexadecimal string.

    password may be a byte string or text; text is encoded as UTF-8
    before hashing so non-ASCII passwords hash consistently instead of
    raising an encoding error.
    '''
    if not isinstance(password, bytes):
        password = password.encode("utf-8")
    return hashlib.md5(password).hexdigest()
if __name__ == '__main__':
    # Minimal interactive console: authenticate, then forward each typed
    # line to the director until the user enters "exit" or ends the input.
    director = Bsocket("192.168.6.231", 9101)
    try:
        director.connect()
        if authenticate_director(director, password=process_password(PASSWORD),):
            print("authenticate success")
            while True:
                try:
                    msg = raw_input(">>")
                except (EOFError, KeyboardInterrupt):
                    # Ctrl-D / Ctrl-C at the prompt ends the session cleanly.
                    break
                if msg == "exit":
                    break
                director.send(msg)
                # Give the director time to produce its output; only one
                # message is read back per command.
                time.sleep(2)
                director.recv()
        else:
            print("authenticate failed")
    finally:
        # Always release the connection, including on errors.
        if director.socket is not None:
            director.socket.close()