TopazScraper.py
from tqdm import tqdm
from lxml import html
import requests
import pandas  # used by get_pricing (work in progress)
import xml.etree.ElementTree as ET
import csv
import re
import json


class ProductScrape:
    """Scrapes product data from product pages and writes the results to a CSV file."""

    def __init__(self, result_file, error_file):
        self.results_file = result_file
        self.error_file = error_file
        self.encoding = 'Windows-1252'
    def get_spider_urls(self, zap_output):
        """
        Parses the result file from an OWASP ZAP spider run.
        Returns a set of URLs to be used by other functions.
        """
        urls = set()
        with open(zap_output, encoding=self.encoding) as file:
            for line in file:
                if "true" in line and "sorting" not in line.lower():
                    url = line.split(",")[2]
                    url = url.split("?")[0]
                    if not url.endswith("/"):  # skip URLs ending in "/", which are category pages rather than items
                        urls.add(url)
        return urls
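
    # A minimal sketch of the spider CSV layout this parser assumes (the line below is
    # a hypothetical example, not taken from a real ZAP export):
    #   Processed,true,https://www.example.com/catalog/widgets/ABC123?sort=asc,...
    # "true" marks a processed entry, column index 2 carries the URI, and the query
    # string is stripped, leaving https://www.example.com/catalog/widgets/ABC123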
    def get_sitemap_urls(self, sitemap):
        """
        Pulls a provided sitemap URL.
        Parses the result and returns a set of URLs to be used by other functions.
        """
        urls = set()
        r = requests.get(sitemap)
        root = ET.fromstring(r.content)
        for child in root:
            url = child[4].attrib.get('href')  # the fifth child of each entry carries the link's href
            if not url.endswith("/"):  # skip URLs ending in "/", which are category pages rather than items
                urls.add(url)
        return urls
        ## Probably the better way to do it, but need to understand the XML better:
        # for child in root.iter("link"):
        #     print(child.attrib)
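
    # A hypothetical sketch of the sitemap entry shape the index-based lookup above assumes:
    #   <url>
    #     <loc>...</loc> <lastmod>...</lastmod> <changefreq>...</changefreq> <priority>...</priority>
    #     <xhtml:link rel="alternate" href="https://www.example.com/catalog/widgets/ABC123"/>
    #   </url>
    # child[4] reaches that fifth element, so the lookup breaks if the feed reorders its fields.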
    def get_sku(self, url, content):
        try:
            tree = html.fromstring(content)
            # The SKU is used in other comparisons (see get_image), which lower-case it there.
            sku = tree.xpath('//span[@class="value"]/text()')[0]
            return sku
        except Exception as e:
            with open(self.error_file, 'a', encoding=self.encoding) as file:
                file.write(f"URL {url} failed to parse the SKU. {e}\n")
            return None
    def get_title(self, url, content):
        try:
            tree = html.fromstring(content)
            title = tree.xpath('//h1[@class="font-product-title"]/text()')[0]
            return title
        except Exception as e:
            with open(self.error_file, 'a', encoding=self.encoding) as file:
                file.write(f"URL {url} failed to parse the TITLE. {e}\n")
            return None
    def get_description(self, url, content):
        try:
            # The description can be broken into multiple tags; the fallbacks below handle those cases.
            # Descriptions start as None so the conditional checks below work.
            description = None
            description2 = None
            tree = html.fromstring(content)
            description = tree.xpath('//*[@id="productPage"]/div[1]/div/div[2]/div[2]/div[2]/p/text()')[0]
            # description = tree.xpath('//*[@id="productPage"]/div[1]/div/div[2]/div[2]/div[2]/text()')  # Tried pulling everything returned, but it doesn't return the bolded text
            return description
        except Exception:
            try:
                description = tree.xpath('//*[@id="productPage"]/div[1]/div/div[2]/div[2]/div[2]/text()')[0]
                # Check for a second description fragment; this occurred when the text contained bold words.
                try:
                    bold = tree.xpath('//*[@id="productPage"]/div[1]/div/div[2]/div[2]/div[2]/strong/text()')[0]
                    description2 = tree.xpath('//*[@id="productPage"]/div[1]/div/div[2]/div[2]/div[2]/text()')[1]
                except IndexError:
                    pass
                if description2:
                    description = description + bold + description2
                return description
            except Exception as e:
                with open(self.error_file, 'a', encoding=self.encoding) as file:
                    file.write(f"URL {url} failed to parse the DESCRIPTION. {e}\n")
                return None
    def get_upc(self, url, content):
        try:
            tree = html.fromstring(content)
            td_list = tree.xpath('//td[@class="value"]/text()')
            # The location of the UPC can move, so search all of the cells for a 12-digit value.
            upc = None  # stays None if no cell matches, instead of raising an UnboundLocalError
            for i in td_list:
                if re.search(r"\d{12}", i):
                    upc = i
                    break
            return upc
        except Exception as e:
            with open(self.error_file, 'a', encoding=self.encoding) as file:
                file.write(f"URL {url} failed to parse the UPC. {e}\n")
            return None
    def get_image(self, url, content, sku):
        # Tries to grab the largest product image available, falling back to medium, then small.
        try:
            base_url = "/".join(url.split("/")[0:3])  # base URL like https://site.com
            image_page_list = []
            tree = html.fromstring(content)
            site_images = tree.xpath('//img')
            sku = sku.lower()  # make sure comparisons don't fail due to case
            for size in ("large", "medium", "small"):
                for image in site_images:
                    src = image.attrib.get('src')
                    if not src:  # skip <img> tags without a src attribute
                        continue
                    image_name = base_url + src.lower()
                    if "product" in image_name and size in image_name and sku in image_name:
                        if image_name not in image_page_list:
                            image_page_list.append(image_name)
                if image_page_list:  # stop at the largest size that produced matches
                    break
            all_images = ",".join(sorted(image_page_list))
            return all_images
        except Exception as e:
            with open(self.error_file, 'a', encoding=self.encoding) as file:
                file.write(f"URL {url} failed to parse the Image Path(s). {e}\n")
            return None
    def get_pricing(self, pricing_sheet):
        """
        Work in progress: loads the pricing spreadsheet and prints its rows.
        """
        try:
            excel_data_df = pandas.read_excel(pricing_sheet)
            # t = excel_data_df.to_dict()
            for row in excel_data_df.to_csv(index=False).splitlines():
                print(row)
        except Exception:
            pass
    def get_category(self, url):
        try:
            category_url_list = url.split("/")[:-1]  # all but the last element of the URL
            category_url = "/".join(category_url_list)  # remake the URL
            r = requests.get(category_url)  # pull page content for the category page
            tree = html.fromstring(r.content)
            category = tree.xpath('//div[@class="h1-holder"]/h1/text()')[0]
            return category
        except Exception as e:
            with open(self.error_file, 'a', encoding=self.encoding) as file:
                file.write(f"URL {url} failed to parse the CATEGORY. {e}\n")
            return None
    def get_page_info(self, urls, price_sheet=None):
        """
        Requests content from each URL and isolates various product information on the page.
        Product information is written to a CSV file.
        """
        # Create column names for the result file
        with open(self.results_file, 'w', newline='', encoding=self.encoding) as file:
            fields = ['URL', 'SKU', 'TITLE', 'CATEGORY', 'DESCRIPTION', 'UPC', 'IMAGE_PATH(s)']
            writer = csv.writer(file)
            writer.writerow(fields)
        for url in tqdm(urls, desc="Parsing Product Data from URLs"):
            try:
                r = requests.get(url)
                # Isolate page info
                sku = self.get_sku(url, r.content)
                title = "Topaz " + sku + " - " + self.get_title(url, r.content)
                description = self.get_description(url, r.content)
                upc = self.get_upc(url, r.content)
                image_path = self.get_image(url, r.content, sku)
                category = self.get_category(url)
                # Write parsed data to the results file
                # (price_sheet support is a work in progress; the row is the same either way for now)
                result = [url, sku, title, category, description, upc, image_path]
                with open(self.results_file, 'a', newline='', encoding=self.encoding) as file:
                    writer = csv.writer(file)
                    writer.writerow(result)
            except Exception as e:
                with open(self.error_file, 'a', encoding=self.encoding) as file:
                    file.write(f"URL {url} failed, no additional processing. {e}\n")
                result = [url, "FAILURE - SEE LOGS"]
                with open(self.results_file, 'a', newline='', encoding=self.encoding) as file:
                    writer = csv.writer(file)
                    writer.writerow(result)
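
    # A hypothetical example of a resulting CSV row, given the columns above
    # (every value below is invented for illustration):
    #   https://www.example.com/catalog/widgets/ABC123,ABC123,Topaz ABC123 - Widget,Widgets,"A durable widget.",012345678905,https://www.example.com/images/product/large/abc123.jpg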

def main():
    config_file = "config.json"
    with open(config_file) as file:
        config = json.load(file)
    sitemap = config.get("sitemap")
    zap_output = config.get("zap_output")
    result_file = config.get("result_file")
    error_file = config.get("error_file")
    ps = ProductScrape(result_file, error_file)
    # Pull URLs via the sitemap
    # urls = ps.get_sitemap_urls(sitemap)
    # Pull URLs from the OWASP ZAP spider output file
    urls = ps.get_spider_urls(zap_output)
    ps.get_page_info(urls)


if __name__ == "__main__":
    main()
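
# A minimal sketch of the expected config.json; the keys match what main() reads,
# while the values here are hypothetical placeholders:
# {
#     "sitemap": "https://www.example.com/sitemap.xml",
#     "zap_output": "zap_spider_results.csv",
#     "result_file": "results.csv",
#     "error_file": "errors.log"
# }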