Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mytests
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
songxinkai
mytests
Commits
62e14426
Commit
62e14426
authored
Oct 31, 2024
by
songxinkai
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
70bd660e
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
161 additions
and
0 deletions
+161
-0
python/a.py
+161
-0
No files found.
python/a.py
0 → 100644
View file @
62e14426
import
requests
from
bs4
import
BeautifulSoup
import
os
from
urllib.parse
import
urljoin
,
urlparse
import
time
import
random
import
re
class
WebCrawler
:
def
__init__
(
self
,
start_urls
,
base_output_dir
=
"output"
,
delay_min
=
1
,
delay_max
=
3
,
url_prefix
=
None
):
self
.
start_urls
=
start_urls
if
isinstance
(
start_urls
,
list
)
else
[
start_urls
]
self
.
domains
=
{
urlparse
(
url
)
.
netloc
for
url
in
self
.
start_urls
}
self
.
base_output_dir
=
base_output_dir
self
.
visited_urls
=
set
()
self
.
delay_min
=
delay_min
self
.
delay_max
=
delay_max
self
.
url_prefix
=
url_prefix
if
not
os
.
path
.
exists
(
base_output_dir
):
os
.
makedirs
(
base_output_dir
)
def
get_random_delay
(
self
):
return
random
.
uniform
(
self
.
delay_min
,
self
.
delay_max
)
def
sanitize_filename
(
self
,
filename
):
"""清理文件名,移除非法字符"""
if
not
filename
:
return
"index"
illegal_chars
=
r'[<>:"/\\|?*]'
filename
=
re
.
sub
(
illegal_chars
,
'_'
,
filename
)
return
filename
.
strip
()
or
"index"
def
download_page
(
self
,
url
):
try
:
headers
=
{
'User-Agent'
:
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response
=
requests
.
get
(
url
,
headers
=
headers
,
timeout
=
10
)
response
.
encoding
=
response
.
apparent_encoding
return
response
.
text
except
Exception
as
e
:
print
(
f
"Error downloading {url}: {str(e)}"
)
return
None
def
save_page
(
self
,
html_content
,
filepath
):
"""直接使用给定的文件路径保存页面"""
try
:
with
open
(
filepath
,
'w'
,
encoding
=
'utf-8'
)
as
f
:
f
.
write
(
html_content
)
print
(
f
"Saved: {filepath}"
)
except
Exception
as
e
:
print
(
f
"Error saving file {filepath}: {str(e)}"
)
def
should_crawl_url
(
self
,
url
):
if
any
(
url
.
startswith
(
start_url
)
or
url
==
start_url
.
rstrip
(
'/'
)
for
start_url
in
self
.
start_urls
):
return
True
if
self
.
url_prefix
:
return
url
.
startswith
(
self
.
url_prefix
)
return
True
def
normalize_url
(
self
,
url
):
return
url
.
split
(
'#'
)[
0
]
def
crawl
(
self
,
url
,
path_str
,
depth
=
3
):
url
=
self
.
normalize_url
(
url
)
if
url
in
self
.
visited_urls
or
depth
<=
0
:
return
should_crawl
=
self
.
should_crawl_url
(
url
)
self
.
visited_urls
.
add
(
url
)
if
not
should_crawl
:
print
(
f
"Skipping {url} - does not match prefix"
)
return
# 如果depth大于1,创建目录
if
depth
>
1
:
if
not
os
.
path
.
isdir
(
path_str
):
os
.
makedirs
(
path_str
,
exist_ok
=
True
)
print
(
f
"Crawling: {url} (Depth: {depth})"
)
html_content
=
self
.
download_page
(
url
)
if
not
html_content
:
return
soup
=
BeautifulSoup
(
html_content
,
'html.parser'
)
# 保存当前页面
current_filename
=
path_str
.
rstrip
(
"/"
)
+
".html"
self
.
save_page
(
html_content
,
current_filename
)
if
depth
<=
1
:
return
# 处理页面中的链接
links
=
soup
.
find_all
(
'a'
,
href
=
True
)
for
link
in
links
:
href
=
link
[
'href'
]
if
href
.
startswith
(
'#'
):
continue
absolute_url
=
urljoin
(
url
,
href
)
absolute_url
=
self
.
normalize_url
(
absolute_url
)
if
urlparse
(
absolute_url
)
.
netloc
in
self
.
domains
:
if
absolute_url
not
in
self
.
visited_urls
:
if
self
.
should_crawl_url
(
absolute_url
):
# 获取链接文本作为目录名
link_text
=
link
.
get_text
(
strip
=
True
)
if
link_text
:
new_path
=
os
.
path
.
join
(
path_str
,
self
.
sanitize_filename
(
link_text
))
delay
=
self
.
get_random_delay
()
print
(
f
"Next: {absolute_url} ({link_text}), depth: {depth - 1}. Waiting for {delay:.2f} seconds..."
)
time
.
sleep
(
delay
)
self
.
crawl
(
absolute_url
,
new_path
,
depth
-
1
)
def
start
(
self
,
depth
=
3
):
for
url
in
self
.
start_urls
:
print
(
"-"
*
50
)
print
(
f
"
\n
Starting crawl for: {url}"
)
self
.
crawl
(
url
,
os
.
path
.
join
(
self
.
base_output_dir
,
url
.
split
(
'/'
)[
-
2
]),
depth
)
print
(
f
"Completed crawl for: {url}"
)
def
main
():
start_urls
=
[
"https://www.guoxuemi.com/shuku/
%
d/"
%
i
for
i
in
range
(
1
,
12
)]
url_prefix
=
"https://www.guoxuemi.com/a/"
delay_min
=
0.0
delay_max
=
0.0
max_depth
=
3
base_output_directory
=
"downloaded_pages"
random
.
seed
(
time
.
time
())
crawler
=
WebCrawler
(
start_urls
=
start_urls
,
base_output_dir
=
base_output_directory
,
delay_min
=
delay_min
,
delay_max
=
delay_max
,
url_prefix
=
url_prefix
)
print
(
f
"Starting crawler with following settings:"
)
print
(
f
"Start URLs: {start_urls}"
)
print
(
f
"URL Prefix Filter: {url_prefix} (only for sub-links)"
)
print
(
f
"Random delay range: {delay_min} - {delay_max} seconds"
)
print
(
f
"Max depth: {max_depth}"
)
print
(
f
"Base output directory: {base_output_directory}"
)
print
(
"-"
*
50
)
crawler
.
start
(
depth
=
max_depth
)
print
(
"-"
*
50
)
print
(
f
"Crawling completed. Total pages visited: {len(crawler.visited_urls)}"
)
if
__name__
==
"__main__"
:
main
()
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment