Python爬虫+Go WebServer+Flutter App(Python篇)
文章目录:1.前言 2.Python爬虫获取内容 2.1安装python。1.前言:对于一个应用来说,需要获取内容、服务端提供内容、客户端展示内容,这三个部分可以分别通过Python、Go、Flutter实现,组合成一个完整的应用。2.Python爬虫获取内容:通过selenium调用浏览器内核,获取对应网页内容,解析出需要的内容,最后保存到MySQL数据库。2.1安装python...
文章目录
一.Python篇
相应 GitHub工程
二.GoLang篇
相应 GitHub工程
1.前言
对于一个应用来说,需要获取内容、服务端提供内容、客户端展示内容,这三个部分可以分别通过Python、Go、Flutter实现,组合成一个完整的应用。
2.Python爬虫获取内容
通过selenium调用浏览器内核,获取对应网页内容,解析出需要的内容,最后保存到MySQL数据库。
2.1安装python
前往官网下载安装包,我选择的是Python 2
选择对应系统环境安装包,下载安装完成,设置环境变量
然后在终端输入"python --version",如果显示python版本则安装完成
zxl@zxl-7060:~$ python --version
Python 2.7.12
2.2安装pip
pip 是 Python 包管理工具,该工具提供了对Python 包的查找、下载、安装、卸载的功能。
如果未安装,先通过命令 curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
下载安装脚本,
再通过命令 sudo python get-pip.py
运行该脚本完成安装。
通过命令pip --version
来判断是否安装成功
出现如下类似信息,则安装成功
zxl@zxl-7060:~$ pip --version
pip 19.1.1 from /home/mi/.local/lib/python2.7/site-packages/pip (python 2.7)
2.3安装selenium
Selenium 是一个用于Web应用程序测试的工具
通过python调用Selenium就像真正的用户在操作浏览器一样,可以很好的解决网页js加载等问题
pip install selenium
2.4选择浏览器驱动
以chrome为例,下载chrome driver,选择电脑中chrome浏览器对应版本的driver
2.4安装开发工具PyCharm
前往官网下载安装包,选择PyCharm下载
安装好开发工具,如下图所示创建好工程,准备开发
2.5安装MySQL
前往官网下载mysql,并进行安装
2.6安装mysql-connector
输入命令
pip install mysql-connector
这样就可以操作mysql数据库了
2.7网页请求
1.设置浏览器驱动位置
2.设置不打开浏览器进行网页请求
3.网页请求
#!/usr/bin/python
# coding=utf-8
import platform
from selenium import webdriver
class BaseRequest:
    """Base class that loads a web page in a headless Chrome browser."""

    # chromedriver binary location, keyed by the value of platform.system().
    # NOTE(review): the Linux entry is a macOS-style path copied from the
    # original code; confirm the real location on Linux hosts.
    CHROMEDRIVER_PATHS = {
        'Darwin': "/Users/zxl/Downloads/chromedriver",
        'Windows': "D:\\my_github_workspace\\chromedriver.exe",
        'Linux': "/Users/zxl/Downloads/chromedriver",
    }
    # Used when platform.system() reports anything not listed above.
    DEFAULT_CHROMEDRIVER = "/Users/zxl/Downloads/chromedriver"

    def get_web_content(self, url):
        """Open ``url`` in a new headless Chrome instance and return the driver.

        Image loading is disabled in the browser profile. The caller owns the
        returned driver and is responsible for shutting it down.

        :param url: address of the page to load.
        :return: the ``webdriver.Chrome`` instance with ``url`` loaded.
        """
        chromedriver = self.CHROMEDRIVER_PATHS.get(
            platform.system(), self.DEFAULT_CHROMEDRIVER)

        opt = webdriver.ChromeOptions()
        # Same effect as the deprecated opt.set_headless(): run without a UI.
        opt.add_argument('--headless')
        # Content-setting value 2 blocks images.
        prefs = {"profile.managed_default_content_settings.images": 2}
        opt.add_experimental_option("prefs", prefs)

        driver = webdriver.Chrome(executable_path=chromedriver, options=opt)
        try:
            driver.get(url)
        except Exception:
            # Do not leave a browser process behind when the page load fails.
            driver.quit()
            raise
        return driver
2.8网页内容解析
1.网页请求成功后,获取到该网页对象driver
2.通过xpath进行页面标签解析
3.解析完成关闭浏览器driver
#!/usr/bin/python
# coding=utf-8
import datetime
import hashlib
import re
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from com_zxl_spider_db.JokeDB import JokeDB
from com_zxl_spider_request.BaseRequest import *
from com_zxl_spider_data.JokeBean import *
class RequestQsbkTxt(BaseRequest):
    """Crawl joke listing pages of qiushibaike.com and store new jokes in JokeDB.

    Each page is loaded through ``get_web_content`` (inherited from
    ``BaseRequest``) and parsed with XPath queries.
    """

    BASE_URL = "https://www.qiushibaike.com/"
    PAGINATION_XPATH = '//ul[@class="pagination"]'
    JOKE_ITEM_XPATH = '//div[@class="article block untagged mb15"]'
    # Seconds to wait for the pagination bar and the first joke to appear.
    WAIT_SECONDS = 10

    def __init__(self):
        # Kept on the instance (not in a module-level global) so that each
        # crawler owns, and later closes, its own database handle.
        self.joke_db = JokeDB()

    def parse(self, end_url, index):
        """Crawl listing pages starting at ``index`` until there is nothing new.

        Crawling stops when a page has no link to a higher page number, or as
        soon as a joke that is already stored is encountered.

        :param end_url: path below the site root, e.g. ``"pic/page/"``.
        :param index: number of the first page to load.
        """
        # Iterative rather than recursive: one stack frame no matter how many
        # pages are visited.
        while True:
            print("parse::end_url =  %s ::index =  %s" % (end_url, index))
            driver = self.get_web_content(self.BASE_URL + end_url + str(index))
            try:
                next_index = self._parse_page(driver, end_url, index)
            finally:
                # quit() (not close()) also ends the chromedriver process, and
                # the finally clause covers timeouts and parse errors too.
                driver.quit()
            if next_index is None:
                return
            index = next_index

    def _parse_page(self, driver, end_url, index):
        """Store the jokes of the loaded page; return the next page number or None to stop."""
        elem1 = WebDriverWait(driver, self.WAIT_SECONDS).until(
            expected_conditions.presence_of_element_located(
                (By.XPATH, self.PAGINATION_XPATH)))
        print("elem1 =  %s" % elem1)
        elem2 = WebDriverWait(driver, self.WAIT_SECONDS).until(
            expected_conditions.presence_of_element_located(
                (By.XPATH, self.JOKE_ITEM_XPATH)))
        print("elem2 =  %s" % elem2)

        next_index = self._find_next_page_index(driver, index)
        is_find_next_page = next_index is not None
        print("parse::isFindNextPage =  %s ::index =  %s ::end_url =  %s" % (
            is_find_next_page,
            next_index if is_find_next_page else index,
            end_url))

        joke_items = driver.find_elements(By.XPATH, self.JOKE_ITEM_XPATH)
        print('hotPicJokeItems length =  %s' % len(joke_items))
        for joke_item in joke_items:
            joke_bean = self._extract_joke(joke_item)
            existing = self.joke_db.query_by_md5(joke_bean['md5'])
            print(existing)
            if existing is not None:
                # An already-stored joke ends the whole crawl.
                print("ExistJokeItem")
                return None
            print("not ExistJokeItem")
            self.joke_db.insert_joke(joke_bean)

        print("==============end=================")
        print("\n")
        return next_index

    def _find_next_page_index(self, driver, index):
        """Return the first page number in the pagination bar greater than ``index``, else None."""
        pagination = driver.find_element(By.XPATH, self.PAGINATION_XPATH)
        for page_item in pagination.find_elements(By.XPATH, './/li'):
            page_index_txt = page_item.text
            print("pageItemObject::page_index_txt =  %s" % page_index_txt)
            numbers = re.findall(r".*?(\d+).*?", page_index_txt)
            print("pageItemObject::itemFindResult =  %s" % numbers)
            if numbers and int(numbers[0]) > index:
                return int(numbers[0])
        return None

    def _extract_joke(self, joke_item):
        """Build the joke record (via JokeBean) for one joke element of the page."""
        # The MD5 of the element id is the key used for duplicate detection.
        joke_id = joke_item.get_attribute('id')
        joke_md5 = hashlib.md5(joke_id.encode('utf-8')).hexdigest()

        author = joke_item.find_element(
            By.XPATH, './/div[@class="author clearfix"]')
        author_nick_name = author.find_element(By.XPATH, './/h2').text
        author_img_url = author.find_element(
            By.XPATH, './/img').get_attribute('src')

        # Gender/age block is optional; defaults are kept when it is missing.
        author_gender = ''
        author_age = -1
        try:
            gender = author.find_element(
                By.XPATH, ".//div[starts-with(@class,'articleGender')]")
            author_gender = gender.get_attribute('class')
            author_age = gender.text
        except NoSuchElementException as e:
            print(e)

        content = joke_item.find_element(
            By.XPATH, './/div[@class="content"]').text

        # Thumbnail image is optional.
        thumb_img_url = ''
        try:
            thumb = joke_item.find_element(By.XPATH, './/div[@class="thumb"]')
            thumb_img_url = thumb.find_element(
                By.XPATH, './/img').get_attribute('src')
        except NoSuchElementException as e:
            print(e)

        # Vote / comment statistics are optional, each independently.
        stats_vote_content = ''
        stats_comment_content = ''
        stats_comment_detail_url = ''
        try:
            stats = joke_item.find_element(By.XPATH, './/div[@class="stats"]')
            try:
                stats_vote_content = stats.find_element(
                    By.XPATH, './/span[@class="stats-vote"]').text
            except NoSuchElementException as e:
                print(e)
            try:
                comments = stats.find_element(
                    By.XPATH, './/span[@class="stats-comments"]')
                # Look the link up once; it supplies both text and href.
                comment_link = comments.find_element(
                    By.XPATH, './/a[@class="qiushi_comments"]')
                stats_comment_content = comment_link.text
                stats_comment_detail_url = comment_link.get_attribute('href')
            except NoSuchElementException as e:
                print(e)
        except NoSuchElementException as e:
            print(e)

        return JokeBean().create_joke_bean(
            author_nick_name.encode('utf-8'),
            author_gender,
            author_age,
            author_img_url,
            content.encode('utf-8'),
            thumb_img_url,
            stats_vote_content,
            stats_comment_content,
            stats_comment_detail_url,
            joke_md5)

    def clas_db(self):
        """Close the database handle. (The misspelled name is kept for existing callers.)"""
        if self.joke_db is not None:
            self.joke_db.close_db()
            # Makes a second call a no-op instead of closing twice.
            self.joke_db = None

    def start_task(self):
        """Log the start time, crawl ``pic/page/`` from page 1, then close the database."""
        print("start_task:: Now Time:: %s"
              % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        try:
            self.parse("pic/page/", 1)
        finally:
            # Release the connection even when the crawl raises.
            self.clas_db()
if __name__ == "__main__":
    # Script entry point: crawl the "pic/page/" listing starting at page 1.
    request = RequestQsbkTxt()
    try:
        request.parse("pic/page/", 1)
    finally:
        # Close the database connection even if the crawl raises.
        request.clas_db()
2.9数据集合
1.根据解析内容构造数据集合
#!/usr/bin/python
# coding=utf-8
class JokeBean:
    """Factory for the plain-dict record that describes one scraped joke."""

    def create_joke_bean(self, author_nick_name, author_gender, author_age,
                         author_img_url, content, thumb_img_url,
                         stats_vote_content, stats_comment_content,
                         stats_comment_detail_url, md5):
        """Return a dict mapping each parameter name to the value passed for it."""
        return dict(
            author_nick_name=author_nick_name,
            author_gender=author_gender,
            author_age=author_age,
            author_img_url=author_img_url,
            content=content,
            thumb_img_url=thumb_img_url,
            stats_vote_content=stats_vote_content,
            stats_comment_content=stats_comment_content,
            stats_comment_detail_url=stats_comment_detail_url,
            md5=md5,
        )
2.10保存数据库
1.设置数据库ip地址,端口号
2.设置连接数据库的用户名、密码
3.设置连接的数据库名称
4.如果数据库不存在,则创建数据库,并创建相应的表
5.设置增删改查操作
#!/usr/bin/python
# coding=utf-8
import mysql.connector
from mysql.connector import errorcode
class BaseDB:
    """Thin wrapper around one MySQL connection and one cursor.

    Subclasses override ``CREATE_TABLE_SQL`` with the DDL of their table. On
    construction the database is created if it does not exist, and the table
    DDL is executed.
    """

    # Connection settings. NOTE(review): these are hard-coded in source;
    # consider loading them from configuration instead.
    host = 'zxltest.zicp.vip'
    port = '42278'
    urser_name = "***"  # (sic) misspelled attribute name kept for compatibility
    pass_word = "***"
    db_name = 'joke'
    CREATE_TABLE_SQL = ("")

    # Per-instance connection state, assigned in __init__. Kept on the instance
    # rather than in module globals so that two instances never share (or close)
    # each other's connection.
    cnx = None
    cursor = None

    def __init__(self):
        """Connect to MySQL, creating the database and table when needed.

        Raises SystemExit(1) when access is denied or on any other connection
        error, as the original code did through ``exit(1)``.
        """
        try:
            self.cnx = mysql.connector.connect(
                user=self.urser_name, password=self.pass_word,
                host=self.host, port=self.port, database=self.db_name)
            self.cursor = self.cnx.cursor()
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
                raise SystemExit(1)
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
                # Reconnect without selecting a database, then create it.
                self.cnx = mysql.connector.connect(
                    user=self.urser_name, password=self.pass_word,
                    host=self.host, port=self.port)
                self.cursor = self.cnx.cursor()
                self.__create_database()
                self.__create_table()
            else:
                print(err)
                raise SystemExit(1)
        else:
            self.__create_table()
        print("DBUtil init finish")

    def __create_database(self):
        """Create ``db_name`` with utf8 as default charset and select it."""
        try:
            self.cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name))
            self.cnx.database = self.db_name
            print("Create database finish")
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            raise SystemExit(1)

    def __create_table(self):
        """Execute ``CREATE_TABLE_SQL``; an already-existing table is not an error."""
        print("create table:: %s" % self.CREATE_TABLE_SQL)
        print("Creating table {}: ".format(self.CREATE_TABLE_SQL))
        try:
            self.cursor.execute(self.CREATE_TABLE_SQL)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                print("already exists.")
            else:
                print(err.msg)
                raise SystemExit(1)
        else:
            print("OK")

    def query(self, sql_str):
        """Execute ``sql_str`` and return the cursor so the caller can read rows."""
        print("query:: %s" % sql_str)
        self.cursor.execute(sql_str)
        return self.cursor

    def insert(self, sql_str, param):
        """Execute ``sql_str`` with the bound parameters ``param`` and commit."""
        self.cursor.execute(sql_str, param)
        self.cnx.commit()

    def update(self, sql_str):
        """Execute ``sql_str`` and commit."""
        self.cursor.execute(sql_str)
        # The original had ``cnx.commit`` without parentheses, so updates were
        # never committed.
        self.cnx.commit()

    def delete(self, sql_str):
        """Execute ``sql_str`` and commit."""
        self.cursor.execute(sql_str)
        self.cnx.commit()

    def close_db(self):
        """Close the cursor and then the connection; safe to call more than once."""
        try:
            if self.cursor is not None:
                self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            if self.cnx is not None:
                self.cnx.close()
            self.cursor = None
            self.cnx = None
#!/usr/bin/python
# coding=utf-8
import mysql
from mysql.connector import errorcode
from com_zxl_spider_data.JokeBean import JokeBean
from com_zxl_spider_db.BaseDB import BaseDB
class JokeDB(BaseDB):
    """Persistence of joke records in the ``joke`` table."""

    TABLE_NAME = 'joke'
    # Column names. The "COLUME" spelling and the double underscore in
    # COLUME__STATS_VOTE_CONTENT are kept because they are public attributes.
    COLUME_ID = 'id'
    COLUME_AUTHOR_NICK_NAME = 'author_nick_name'
    COLUME_AUTHOR_GENDER = 'author_gender'
    COLUME_AUTHOR_AGE = 'author_age'
    COLUME_AUTHOR_IMG_URL = 'author_img_url'
    COLUME_CONTENT = 'content'
    COLUME_THUMB_IMG_URL = 'thumb_img_url'
    COLUME__STATS_VOTE_CONTENT = 'stats_vote_content'
    COLUME_STATS_COMMENT_CONTENT = 'stats_comment_content'
    COLUME_STATS_COMMENT_DETAIL_URL = 'stats_comment_detail_url'
    COLUME_MD5 = 'md5'

    # Data columns (everything except the auto-increment id), in the order
    # used by INSERT, SELECT and JokeBean.create_joke_bean's parameters. The
    # column names are also the keys of the joke bean dict.
    _DATA_COLUMNS = (
        COLUME_AUTHOR_NICK_NAME,
        COLUME_AUTHOR_GENDER,
        COLUME_AUTHOR_AGE,
        COLUME_AUTHOR_IMG_URL,
        COLUME_CONTENT,
        COLUME_THUMB_IMG_URL,
        COLUME__STATS_VOTE_CONTENT,
        COLUME_STATS_COMMENT_CONTENT,
        COLUME_STATS_COMMENT_DETAIL_URL,
        COLUME_MD5,
    )

    CREATE_TABLE_SQL = (
        "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " ("
        " " + COLUME_ID + " bigint(20) NOT NULL AUTO_INCREMENT,"
        " " + COLUME_AUTHOR_NICK_NAME + " varchar(16),"
        " " + COLUME_AUTHOR_GENDER + " text,"
        " " + COLUME_AUTHOR_AGE + " text,"
        " " + COLUME_AUTHOR_IMG_URL + " text,"
        " " + COLUME_CONTENT + " text,"
        " " + COLUME_THUMB_IMG_URL + " text,"
        " " + COLUME__STATS_VOTE_CONTENT + " text,"
        " " + COLUME_STATS_COMMENT_CONTENT + " text,"
        " " + COLUME_STATS_COMMENT_DETAIL_URL + " text,"
        " " + COLUME_MD5 + " text,"
        " PRIMARY KEY (" + COLUME_ID + ")"
        ") ENGINE=InnoDB")

    INSERT_JOKE_SQL = ("INSERT INTO " + TABLE_NAME + " ("
                       + ",".join(_DATA_COLUMNS)
                       + ") "
                       + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")

    QUERY_JOKE_BY_MD5 = ("SELECT "
                         + ",".join(_DATA_COLUMNS)
                         + " FROM " + TABLE_NAME
                         + " WHERE " + COLUME_MD5 + " = '%s'")

    # Characters accepted in an md5 lookup key.
    _HEX_DIGITS = '0123456789abcdefABCDEF'

    def create_insert_data(self, joke_bean):
        """Return the bean's values as a tuple ordered like INSERT_JOKE_SQL's columns."""
        return tuple(joke_bean[column] for column in self._DATA_COLUMNS)

    def insert_joke(self, joke_bean):
        """Insert one joke bean (dict) as a new row and commit."""
        self.insert(self.INSERT_JOKE_SQL, self.create_insert_data(joke_bean))

    def query_by_md5(self, md5):
        """Return the first stored joke whose md5 column equals ``md5``, or None.

        :param md5: hexadecimal digest string identifying the joke.
        :raises ValueError: if ``md5`` contains non-hexadecimal characters.
        """
        # The value is interpolated into the SQL text (BaseDB.query takes no
        # bound parameters), so refuse anything that is not a plain hex digest.
        if not all(ch in self._HEX_DIGITS for ch in md5):
            raise ValueError("md5 must be a hexadecimal string")

        cursor = self.query(self.QUERY_JOKE_BY_MD5 % (md5,))
        # Read the whole result set: returning from the middle of an iteration
        # could leave unread rows on the shared cursor (presumably rejected by
        # the connector on the next execute -- see MySQL Connector docs).
        rows = cursor.fetchall()
        if not rows:
            return None
        # SELECT column order matches create_joke_bean's parameter order.
        return JokeBean().create_joke_bean(*rows[0])
更多推荐
所有评论(0)