Python快速进阶知识点【岗前必备技能】
Python 课程方向:Web全栈(前端Vue,后端服务(前端App-小程序、Android/iOS 手机App、 H5 响应式网站)、 自动化测试(UI/功能性测试、 接口测试)、开发运维(Linux、部署、 Docker) ) 、 数据分析+AI+开发(爬虫、Excel文件处理、商业化Power BI、Numpy+Pandas+Notebook+sklearn机器学习算法模型、 Tensorf
Python 课程方向: Web全栈(前端Vue,后端服务(前端App-小程序、 Android/iOS 手机App、 H5 响应式网站)、 自动化测试(UI/功能性测试、 接口测试)、开发运维(Linux、部署、 Docker) ) 、 数据分析+AI+开发(爬虫、Excel文件处理、商业化Power BI、Numpy+Pandas+Notebook+sklearn机器学习算法模型、 Tensorflow/keras/PyTouch 深度机器 学习框架)
【重要提示】感兴趣的同学,可以加Disen老师的QQ(610039018)
一、Python基础
1.1 数据类型
1.1.1 常用的数据类型
- 数值类型: int 整数, float 小数(浮点)
- 字符类型: str, 表示: “文本信息"或"文本信息” , “”“多行文本”""
- 布尔类型: bool , 表示: True 或 False
- 字节类型: bytes, 表示: b""
1.1.2 集合类型
- 列表 list, 表示: [ ], 列表推导式
- 元组tuple 表示: (), 一个元素元素: (1, )
- 字典 dict , 表示 { } , key对象的类型: 一切可被hash对象
- 集合 set, 表示 set(), 特性: 无序、不可重复
- 生成器和迭代器-> 可迭代对象
- 生成器的创建两种方式: ()推导式 和 yield所在的函数
- 迭代器: 将其它集合类型通过iter()函数转换
1.1.3 类型转换函数
-
int(str, base=10), 如 int(“0b1120”, base=16)
-
float(str)
-
str(obj) 将任意对象转成字符串
-
list(可迭代对象)
-
tuple([[1, 2], ])
-
dict() ,传参的格式: [ [key, value], …] , ((k, v), …)
字典对象的所有方法: dir(dict)
"clear", "copy", "fromkeys", "get", "items", "keys", "pop", "popitem", "setdefault", "update", "values"
作业-1: 使用多种方式创建字典对象,并练习字典对象中所有的方法。
import uuid def save(**kwargs): kwargs.setdefault("id", uuid.uuid4().hex) db.save(sql, **kwargs) # 保存-新插入一条记录 save(name="disen", age=20) save(id=1, name="lucy", age=30)
-
问题: 创建字典的方法有哪些?
-
{key: value, }
-
dict(zip(keys, values))
dict([(k, v), (k, v)])
dict([ [k, v], [k, v] ])
-
dict(key=value, key=value)
-
字典推导式 { k:0 for k in range(1, 10) }
- 给定一个字典对象,用尽量少的代码实现key和value互换?
- { v:k for k, v in dict1.items() }
-
dict.fromkeys([k1, k2, k3], None)
-
-
字典对象可迭代吗?迭代的是什么?答: 可以的,迭代的key
作业- 2: 任意创建字典对象,按value中的数值和字符的ASCII值进行排序。
如: {“a”: “18ac”, “b”:“9bb”, “c”:“18ad”} 排序后的结果是:
{“b”:“9bb”, “a”: “18ac”, “c”:“18ad” }
import re d={"a": "18ac", "b":"9bb", "c":"18ad"} def padding(item): value = item[-1] new_digits = re.findall(r"\d+", value)[0].rjust(4, "0") return re.sub(r"\d+", new_digits, value) dict(sorted(d.items(), key=padding))
-
-
bool() 可以将任意对象
- 哪些对象可以表示为bool的False?
- 0 、 ""空字符串、[], None, () , { } , set()
- 0.0、b""
- 哪些对象可以表示为bool的False?
-
set()
- add()
- remove()
- pop()
-
iter()
-
ord(“ASCII字符”) 将ASCII字符转成数值
-
chr(n) 将数值转成ASCII字符
案例: 随机产生n位长度的验证码
import random
def random_code(_len):
"""
生成由大小写字母和数字组成的指定长度的验证码
要求: 不可重复
"""
ret = set()
while len(ret) < _len:
choise = random.randint(0, 2)
start_, end_ = (ord("0"), ord("9")) if choise == 0 else \
(ord("a"), ord("z")) if choise == 1 else \
(ord("A"), ord("Z"))
c_ = chr(random.randint(start_, end_))
ret.add(c_)
return "".join(ret)
- zip(a, b ) 将两个列表转成一个列表,按索引组成一行。
1.1.4 其它类型
主要讲解的是collections模块的集合类型。
import collections
1.1.4.1 namedtuple
有命名的元组类, 作用于常量类。一般的类实例对象可以动态增加属性和修改属性值,但是namedtuple声明的类,不能修改属性及属性值的。
-
help(collections.namedtuple)
-
示例
Person = collections.namedtuple("Person", ["id", "name", "age", "sex"])
p1 = Person("1001", "disen", 20, "男")
# 报错 AttributeError
# p1.id="1002"
p1.id # 显示id属性值,正常的
1.1.4.2 OrderedDict
有序字典类, 与dict区别是: dict是无序的,OrderedDict可以移动key在任意的位置。
# 正常的字典
p1 = {}
p1["name"] ="lucy"
p1["age"] = 20
p1["birthday"] = "2009-10-11"
# p1
# {"age": 20, "birthday": "2009-10-11", "name": "lucy"}
# 以上的结果,输入和输出的顺序是不一致的,因此,称普通的字典是无序的
# 使用有序字典
p2 = collections.OrderedDict()
p2["age"] = 20
p2["name"] = "disen"
p2["birthday"] = "2009-10-11"
# p2
# OrderedDict([("age", 20), ("name", "disen"), ("birthday", "2009-10-11")])
p2.move_to_end("name", last=False) # last=True表示将key移动到最后的位置
作业-3: 参照OrderedDict类,自定义dict类实现有序字典的功能。
#!/usr/bin/python3
# coding: utf-8
class MyOrderedDict(dict):
dict_list = []
def __getitem__(self, key):
print("--getitem-->", key)
# 调用父类的方法: self[key] 获取字典中key值
return super().__getitem__(key)
def __setitem__(self, key, value):
self.dict_list.append((key, value))
# 调用父类的方法,设置key的value值--》正常字典功能
super(MyOrderedDict, self).__setitem__(key, value)
def move_to_end(self, key, last=True):
k_v = (key, self[key])
self.dict_list.remove(k_v)
if last:
self.dict_list.append(k_v)
else:
self.dict_list.insert(0, k_v)
def __str__(self):
return str(f"MyOrderedDict({self.dict_list})")
d1 = MyOrderedDict()
d1["name"] = "disen"
d1["age"] = 20
d1["sex"] = "男"
print(d1)
print(d1["name"])
d1.move_to_end("name")
print(d1)
1.2 函数
面试题:
def f(a):
f.a += a
print(f.a)
f.a = 100
f(100)
【提示】在Python中,一切皆对象。
1.2.1 函数的参数
- 位置参数传值: 从左到右的顺序,依次传值
- 关键参数传值: 根据参数的名称传值, 关键参数必须在位置参数的后面。
import os
# 实现文件(图片、文本、word、excel)的移动或复制
def copy_files(src_path, dst_path, copy=False):
# 作业-4: 参考 cp 和 mv 命令
if copy:
os.system(f"cp {src_path} {dst_path}")
else:
os.system(f"mv {src_path} {dst_path} ")
# 调用函数
copy_files("a.txt", "/files") # 移动a.txt文件到/files目录中
copy_files(dst_path="b.txt", src_path="a.txt", copy=True) # 复制a.txt为b.txt
parmas = {
"src_path": "a.txt",
"dst_path": "b.txt",
"copy": True
}
copy_files(**params) # 解构成 key=value, key=value 的形式, **解析字典的
# 实现多个数值累加计算
def cumsum(*args):
return sum(args)
# 生成多个数的元组
nums = (1, 2, 3) # tuple((i for i in range(1, 11)))
cumsum(*nums) # *解析元组, 将元素按位置顺序向函数传值
# 扩展:
nums = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
a, *_, b = nums
1.2.2 回调函数
当某一项任务完成后,由另一个函数将任务完成的结果回传给调用者,这个函数称之为回调函数 。
同步调用和异步调用的区别在于是否使用回调函数 。
同步调用的示例:
import time
import random
def sum(a, b):
time.sleep(random.uniform(3, 5))
return a+b
c = sum(10, 20) # 同步获取函数返回的结果
c2 = sum(100, 200)
print(c, c2)
异步调用的示例:
import time
import random
import threading
def sum(a, b, callback):
time.sleep(random.uniform(3, 5))
ret = a+b
callback(ret) # 回传数据
def receive_data(data):
"""接收数据"""
print("-->", data)
t1 = threading.Thread(target=sum, args=(10, 20, receive_data))
t2 = threading.Thread(target=sum, args=(100, 200, receive_data))
t1.start() # 异步执行
t2.start()
# 等待子线完成
t1.join()
t2.join()
print("Game Over!")
1.2.3 匿名函数
匿名(lambda)函数是回调函数的一种用法(写法)。
f=2x+5
为例定义匿名函数,代码如下:
f = lambda x: 2*x + 5
print(f(10))
面试题: 是什么是lambda函数?它有什么优点?
- 什么是lambda函数?
- 是匿名函数
- 参数可以任意多个
- 表示式即是函数的返回结果
- 它有什么优点?
- 简洁
- 减少内存使用
# 如果 x < 10 ,则表达式为 x**2 + 5
# 如果 x > 10 , 则表示式为 x*2 + 10
f = lambda x: x**2 + 5 if x < 10 else x*2 + 10
f(20) # 50
f(2) # 9
1.2.4 高阶函数
-
sorted() 排序
# key是自定义排序的函数 sorted(iterable, key=None, reverse=False) 返回一个可迭代对象的副本。
a = [(100, 20, 30), (90, 30, 19), (10, 85, 65)] # 默认按元组中第一个元素进行排序的 # 按元组中第三个数进行排序 sorted(a, key=lambda item: item[-1])
-
filter() 过滤
过滤函数,通过自定义函数规则 ,则返回为True元素进行保留。
files = ["a.txt", "b.png", "_.text_c.jpg", "d.txt", "e.txt"] # 从文件列表中提取文本文件 # list(filter(lambda filename: filename.endswith(".txt"), files)) for file in filter(lambda filename: filename.endswith(".txt"), files): print(file)
-
map() 映射
映射函数,将内容转成相应的数据,并返回。
import random scores = [ round(random.uniform(30, 100),1) for _ in range(10)] scores_labels = list(map(lambda s: "优" if s>90 else "良" if s>80 else "及格" if s>=60 else "不及格", scores))
-
reduce() 递归计算
reduce() 递归计算用于累积计算的,如累加和、阶乘等。
import functools # 计算 n 以内的阶乘 # n*(n-1)*(n-2)*...*1 def reduce_(x, y): print(x, y) return x*y # functools.reduce(lambda x, y: x*y , [n for n in range(1, 7)]) functools.reduce(reduce_, [n for n in range(1, 7)])
1.3 文件操作
1.3.1 open函数
def open(filepath, mode="r", encoding=None, ...)
默认打开模式: "r" 只读-权限, "t" 文本-文件类型
常用模式的组合:
r+ 读写文本,如果文件不存在,则会报错(异常)
r+b 读写字节数据(媒体文件: 图片、音频、视频、office文件), 如果文件不存在,则报错
w+ 读写文本, 如果文件不存在,则会自动创建
a+/a+b 追加文本或字节数据,如果文件不存在,则会创建。
encoding参数: 只针对读取文本数据模式, 即指定文本数据的编码(名词), 如果是"b"模式时,则不需要指定这个参数,如果指定则会抛出异常。
# 任务1: 通过编程方式生成一个helloworld的Python脚本文件,并通过os.system()实现脚本的执行。
import os
file_name = "hello.py"
py_file = open(file_name, "w") # 打开一个文件,并返回一个stream流对象
content = """
print("hi, Disen!")
"""
py_file.write(content)
py_file.close() # 关闭文件流对象
# 执行这个脚本
os.system(f"python { file_name }")
# 任务2: 生成10万8位长度的验证码,并写入到codes.txt文件中。
# 注:每一行是一个验证码。
import random
def random_code(_len):
"""
生成由大小写字母和数字组成的指定长度的验证码
要求: 不可重复
"""
ret = set()
while len(ret) < _len:
choise = random.randint(0, 2)
start_, end_ = (ord("0"), ord("9")) if choise == 0 else \
(ord("a"), ord("z")) if choise == 1 else \
(ord("A"), ord("Z"))
c_ = chr(random.randint(start_, end_))
ret.add(c_)
return "".join(ret)
def new_code(filename, size=100000):
file = open(filename, "a", encoding="utf-8")
n = 0
print("--开始生成验证码--")
while n<size:
file.write(random_code(8)+"\n")
n += 1
print("--生成成功--")
file.close()
# 任务3: 如何以最优的方式读取codes.txt文件中所有的验证码。
# [提示] 文件流对象可以被迭代
def read_code(filename):
file = open(filename)
# 文件流对象可以被迭代
for line in file:
print(line)
file.close()
1.3.2 with上下文
with 是Python中管理上下文环境时的关键字, 当某一个对象在使用with时,则会存在两个节点,即当对象进入上下文时,则调用对象的__enter__()
方法,此方法返回一个对象,在with表达式中,通过as关键字来接收这个对象。当代码执行完,则会退出上下文,此时会调用对象的__exit__()
方法。
class MyClass():
def __enter__(self):
# 当对象进入上下文环境时,则初始化资源,如打开文件,打开网络连接等
print("--进入上下文环境时---")
return self
def __exit__(self, excep_type, excep_val, excep_tb):
print("--准备退出上下文---")
if excep_type:
print("--出现的异常-", excep_val)
return True # 自已处理异常,不会向解释器抛出异常
with MyClass() as obj:
# obj 已进入上下文
print("处理业务")
raise Exception("想找点事")
# obj对象退出上下文
print("--业务处理完成--")
# 使用with的文件操作
with open("codes.txt") as f:
for line in f:
print(line)
1.4 正则表达式
1.4.1 正则模块
import re
re模块中常用的函数:
-
compile(pattern, flags) 生成正则对象,一次生成对象,可以被多次调用。
-
match(pattern, string, flags=0) 如果pattern正则表达式匹配了string内容,则返回match对象,反之返回None。一般用于验证string的完整性(手机号、身份证、邮箱)。
-
search(pattern, string, pos=0, flags=0) 默认从pos的索引下标开始, 匹配第一次的内容
-
findall(pattern, string, flags=0) 查找所有与正则匹配的数据,返回list。
-
sub(pattern, replace_str, string, flags=0) 在string文本中,使用replace_str替换正则匹配的内容
# 验证手机号的合法性
pattern= re.compile(r"1[3-57-9]\d{9}") # 创建手机号正则对象
if pattern.match("17791692095"):
print("正确的")
else:
print("Error", "17791692095")
# 提取文本中的数字或手机号
texts = ["源自4075位住客点评","源自1075位住客点评","源自75位住客点评"]
pattern2 = re.compile(r"\d+")
# s = pattern2.search(texts[0])
# s.group(), s.groups(), s.groupdict(), s.string
nums = [ pattern2.search(text).group()
for text in texts
]
# findall()方法
nums = pattern2.findall("".join(texts))
# 使用sub()方法,将点评的数量替换成10000
texts2 = [
pattern2.sub("10000", text)
for text in texts
]
正则中的flags参数表示,正则对象匹配的标识,常用的如下所示:
re.S 全称是 DOTALL 表示 .代表任意字符
re.I 全称是 IGNORECASE 表示忽略大小写
re.M 全称是 MULTILINE 表示多行匹配
text = "hello, Disen!"
re.findall(r"disen", text, re.I)
text = """
A: 190 \n
B: 200 \n
C: 120 \n
"""
re.findall(r"\d+", text, re.I|re.M)
# [\u4e00-\u9fa5] 中文表达式的范围
# 查找文本内的中文信息
text = "hi, 狄老师, 下午早点结束!go, go, go!"
re.findall(r"[\u4e00-\u9fa5]+", text)
day02: 中午默写题
- 什么是lambda函数
- 是一个匿名函数
- 可以接收多个参数
- 表达的结果即是函数返回的结果
- 尽可能多地写出类型转换函数
int() float() bool() str()
list() set() tuple() dict() iter()
ord() chr()
- 描述*和**的作用
- 运算: * 乘法, ** 幂次方
- 函数参数:
- * 修饰位置参数, 表示tuple类型
- ** 修饰关键参数, 表示dict类型
- 解构数据:
- * 解构元素
- ** 解构字典
1.4.2 正则的转义字符
\d 任意一个数字 , 表示[0-9]
\D 非任意一个数字
\w 任意一个数字、字母、下划线或中文【Python中】 ,表示 [a-zA-Z0-9_]
\W 非任意一个数字、字母、下划线或中文
\s 任意一个空白(<Space>空格、<Tab>制表符、<Enter>换行符等)
\S 非任意一个空白
1.4.3 贪婪模式
贪婪模式是指量词贪婪, 即想要的更多。
以下量词表示是贪婪的:
* 0或多个
+ 至少1个
? 0或1个
{n, m} 至少n个,最多m个
{n, } 至少n个
禁止贪婪的写法是在量词表示的后面 加一个"?"字符,如:
# abcdgood123
(.*?)good ->
.??
# 作业1: 使用正则提取所有图片的src地址
html = """
<ul>
<li><img src="images/1.png" ></li>
<li><img src="images/2.png" ></li>
<li><img src="images/3.png" ></li>
</ul>
"""
1.5 异常处理
为什么要处理异常?
为了保证程序的健壮性和容错性,避免程序崩溃,应该在可能存在异常的位置,来捕获异常并根据实际情况来处理。
1.5.1 常用的四种结构
# 1. try-except
try:
f = open("abc.txt")
print(f.read())
except:
print("abc.txt", "文件不存在")
try:
f = open("abc.txt")
print(f.read())
except Exception as e:
print("abc.txt", "文件不存在")
try:
f = open("abc.txt")
print(f.read())
except OSError as e:
print("abc.txt", "OS ->文件不存在")
except Exception as e:
print("abc.txt", "文件不存在")
# 2. try-except-else
try:
f = open("abc.txt")
except:
print("文件不存在")
else:
# 无异常,文件存在
print(f.read())
# 3. try-except-finally
# try语句是否存在作用域(有效范围): 不存在
try:
f = open("/Users/apple/nxu_pm.sql")
print(f.read())
except:
print("文件不存在")
finally:
if f:
f.close() # 关闭文件
# 4. try-except-else-finally
try:
f = open("/Users/apple/nxu_pm.sql")
except:
# 存在异常
print("文件不存在")
else:
# 不存在异常
print(f.read())
finally:
# 无论是否存在异常,总会执行的
if f:
f.close() # 关闭文件
1.5.2 自定义异常
常见的异常类:
KeyError: 字典不存在key的错误
AttributeError: 对象不存在属性的错误
TypeError: 数据类型错误
OSError: 打开文件流的错误
IndexError: 索引下标错误
如何自定义异常:
# 为了快速定义错误信息,可以自定义异常
# 封装SDK(Software Development Kit),根据业务功能,会自定义异常类。
class DBError(Exception):
pass
class FileError(OSError):
pass
try:
raise DBError("数据库连接失败")
except Exception as e:
print(e)
1.6 日志模块
Python的日志模块是: logging
import logging
1.6.1 日志的四大部分
日志是用于记录(Logger)程序中的一些重要信息的,记录的信息按等级(Level)交给特定的处理器Handler按一定的格式(Formatter)进行处理(打印、文件保存、上传网络和发送邮件等)。
1.6.1.1 记录器Logger
# 如果未指定记录器名称,则是默认root的记录器
logger = logging.getLogger("network")
# DEBUG 调试->INFO 消息->WARNING 警告->ERROR 错误->CRITICAL 严重错误
logger.setLevel(logging.INFO)
# 向日志系统中输入消息
logger.debug(msg)
logger.info(msg)
logger.warning() 或 logger.warn()
logger.error()
logger.fatal() 或 logger.critical()
1.6.1.2 日志处理器 Handler
from logging import StreamHandler, FileHandler
from logging.handlers import HTTPHandler, SMTPHandler
- StreamHandler 流处理器, 控制台打印的
- FileHandler 文件处理器,将日志消息写入到文件中
- HTTPHandler 网络处理器,将日志消息上传到服务器
- SMTPHandler 邮件处理器,将日志消息发送到指定邮件中。
日志记录器和处理器的关系:
一个记录器可以添加多个处理器。
logger.addHandler(handler1)
logger.addHandler(handler2)
logger = logging.getLogger("network")
logger.setLevel(logging.INFO)
handler1 = StreamHandler()
handler1.setLevel(logging.INFO)
handler2 = FileHandler("network.log")
handler1.setLevel(logging.WARNING)
logger.addHandler(handler1)
logger.addHandler(handler2)
1.6.1.3 处理器的格式化Formatter
每一个日志处理器都应用设置它的日志格式,核心如下:
from logging import Formatter
logger_fmt = "[ %(asctime)s %(name)s %(levelname)s ] %(message)s"
date_fmt = "%Y-%m-%d %H:%M:%S"
fmt = Formatter(fmt=logger_fmt, datefmt=date_fmt)
handler1.setFormatter(fmt)
handler2.setFormatter(fmt)
1.6.1.4 处理器的过滤器Filter
自已了解。类似于filter()函数。
from logging import Filter
class MessageFilter(Filter):
def filter(self, record):
print(dir(record))
# 过滤msg中包含‘警告’信息的日志记录
if record.msg.find("警告") > -1:
return False # False表示不要的
return True # True表示需要记录的
# 向日志记录器中添加过滤器
logger.addFilter(MessageFilter())
1.6.2 日志的文件处理器
文件处理器是将日志消息,按照某种格式写入文件中。
from logging import FileHandler
from logging.handlers import TimedRotatingFileHandler
TimedRotatingFileHandler(filename, when="h", interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None)
1.6.3 日志的网络处理器
from logging.handlers import HTTPHandler
# host 表示ip地址和端口号, 如: localhost:8000
# url 表示path请求路径, 如 /upload_log/
HTTPHandler(host, url, method="GET")
# 不需要设置formatter格式化,会将日志记录的所有消息都会上传到服务器。
1.6.4 日志的邮件处理器
from logging.handlers import SMTPHandler
# mailhost: 邮件的服务器地址和端口号 如 (smtp.163.com, 465|25)
# credentials: 包含(username, password)
SMTPHandler(mailhost, fromaddr, toaddrs, subject,credentials=None)
handler4 = SMTPHandler("smtp.163.com",
"disen666888@163.com", "610039018@qq.com",subject="系统异常提示",
credentials=("disen666888@163.com", "EIZNMRJRZUXBDOKB")) # EIZNMRJRZUXBDOKB 是网易的授权码
handler4.setFormatter(fmt)
handler4.setLevel(logging.CRITICAL)
logger.addHandler(handler4)
作业-2: 自我学习 SMTPHandler 发送日志的邮件
作业-3: 练习代码+总结
作业-4: 中午默写的题抄2遍
综合代码:
#!/usr/bin/python3
# coding: utf-8
import logging
from logging import StreamHandler, FileHandler, Formatter
from logging.handlers import HTTPHandler, SMTPHandler, TimedRotatingFileHandler
# 创建日志记录器
logger = logging.getLogger("network")
logger.setLevel(logging.INFO)
# 创建处理器
handler1 = StreamHandler()
handler1.setLevel(logging.INFO)
# 方法参数的提示快捷键: ctrl+p
handler2 = FileHandler("network.log")
handler2.setLevel(logging.WARNING)
# 创建日志格式化对象
fmt = Formatter(fmt="[%(asctime)s %(name)s %(levelname)s ] %(message)s ",
datefmt="%Y-%m-%d %H:%M:%S")
# 设置处理器的日志格式化
handler1.setFormatter(fmt)
handler2.setFormatter(fmt)
# 将处理器添加到日志记录器上
logger.addHandler(handler1)
logger.addHandler(handler2)
# 创建每隔7天分隔的日志文件处理器
handler3 = TimedRotatingFileHandler("nt.log", when="S", interval=10)
handler3.setLevel(logging.ERROR)
handler3.setFormatter(fmt)
logger.addHandler(handler3)
if __name__ == "__main__":
# 开始使用日志记录器记录系统或程序产生消息
logger.debug("这是一个debug")
logger.info("这是一个正常的消息")
logger.warning("这是一个警告的消息")
logger.error("这是一个错误的消息")
logger.fatal("这是一个严重错误的消息")
1.7 单元测试
单元测试是测试中最小测试单位,它主要用于检测我们自己写的代码功能是否完整或健全。
import unittest
from unittest import TestCase
# 定义单元测试类
class LoggerTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("---测试类运行前的初始化----")
@classmethod
def tearDownClass(cls):
print("---测试类运行后的回收资源----")
def setUp(self):
print("---测试方法运行前的初始化----")
def tearDown(self):
print("---测试方法运行后的回收资源----")
def test_sum(self):
print("--test_sum---")
a = 100 + 20
self.assertEqual(a, 120, "Python运算操作存在问题")
def test_handler(self):
print("--test_handler---")
# self.assertFalse(os.path.exists("obj.log"), "文件已存在")
logger_.logger.info("测试日志文件处理器是否正常")
self.assertTrue(os.path.exists("obj.log"), "文件不存在")
with open("obj.log", encoding="utf-8") as f:
last_line = f.readlines()[-1]
self.assertTrue("测试日志文件处理器是否正常" in last_line, "记录未成功")
class DBTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.host = "localhost"
cls.port=3306
cls.conn = None
@classmethod
def tearDownClass(cls):
del cls.host
del cls.port
cls.conn = None
def test_a1_zconn(self):
time.sleep(2)
self.__class__.conn = 123 # 假设数据库连接对象,连接成功则不为None
self.assertIsNotNone(self.__class__.conn, "连接失败")
def test_a2_save(self):
self.assertIsNotNone(self.__class__.conn, "连接失败")
print("---执行保存数据库的操作--")
【注意】测试脚本的文件名必须是test_xxx.py
, 在测试类中每个测试方法是以test_
开头。测试方法名是按ASCII值从小到大排序的。
测试单元套件 TestSuite 后期给大家补上。
二、面向对象
主要概念: 类、 类对象、 类实例对象
类: 描述事物的类型,是多个实例对象共有的属性或方法的集合。
类对象: 类的对象,类本身也是对象,是由元类
创建的。
# type元类声明类示例
# 元类: 是创建类对象的类
Person = type("Person", (), {"name": "disen", "hi": lambda self: "hi, disen" })
类实例对象: 由类通过__new__()
方法来创建的实例,简称为对象。
2.1 对象的三大特征
2.1.1 封装性
抽象多个对象的共同属性特征和行为方法,然后封装进类的描述中。
# 声明水果类 - 实体类(数据类)
class Fruit():
def __init__(self, color, price, summary):
# self 表示调用此方法的实例对象
self.color=color
self.price = price
self.summary=summary
def __new__(cls, *args, **kwargs):
# 创建类实例对象的方法
return super().__new__(cls)
# 声明水果管理类 - 业务类(功能类)
class FruitManager():
def __init__(self, max_size=100):
self.max_size = max_size
self.current_size = 0
self.fruits = []
def add(self, fruit):
if self.current_size < self.max_size:
print("添加水果")
self.current_size += 1
self.fruits.append(fruit)
def delete(self, fruit_id):
del_fruit = None
for fruit in self.fruits:
if fruit.id == fruit_id:
del_fruit = fruit
if del_fruit:
self.current_size -= 1
self.fruits.remove(del_fruit)
print("删除水果")
def list(self):
for f in self.fruits:
print(f.id, f.color, f.price, f.summary)
class TestFruit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.fruitmgr = FruitManager()
def test_fruit_obj(self):
f1 = Fruit("red", 10.0, summary="红苹果")
self.assertIs(f1.color, "red", "对象属性未初始成功")
def test_add(self):
fruitmgr = self.__class__.fruitmgr
fruitmgr.add(Fruit("red", 10.0, summary="红苹果"))
self.assertEqual(fruitmgr.current_size, 1)
def test_list(self):
fruitmgr = self.__class__.fruitmgr
fruitmgr.list()
day03: 中午默写题
- 简述一下with的作用?
作用: 将对象交由上下文管理器来管理, 当对象进入上下文时,会调用对象的__enter__()方法,当对象退出上下文时,会调用对象的__exit__()方法。
两个方法的结构:
__enter__(self) -> 返回对象
__exit__(self, excep_type, excep_value, excep_tb)
返回 True 表示内部处理异常
返回 False 或无返回,表示如果存在异常则会抛出(代码外部或解析器)
- 写出异常处理的四种结构
try-except
try-except-else
try-except-finally
try-except-else-finally
- 写出日志的五个等级Level名称
DEBUG -> INFO -> WARNING -> ERROR -> CRITICAL|FATAL
2.1.2 继承性
继承: 某一个可以继承一个或多个父类。
作用: 基于父类的方式,扩展子类的功能(具备父类一切特征和行为)。
class Apple(Fruit):
def __init__(self, color, price, summary, name, city):
super(Apple, self).__init__(color,price,summary)
self.name = name
self.city = city
def __repr__(self):
return self.__str__()
def __str__(self):
return f"{self.id} {self.name} {self.color} {self.city} {self.price} {self.summary}"
class Banana(Fruit):
pass
class MyOrderedDict(dict):
pass
if __name__ == "__main__":
a1 = Apple("green", 10, "青苹果", "红富士", "延安")
print(str(a1) + ": 消息")
# 多继承
class Person():
def __init__(self, name, age):
self.name = name
self.age = age
def say(self):
print(f"hi, {self.name}, {self.age} old years")
class Worker(Person):
def __init__(self, name, age, salary):
super().__init__(self, name, age)
self.salary = salary
# 重写父类的方法
def say(self):
super().say(self)
print(f"I am is Worker, Salary is {self.salary}")
# 多继承中存在MRO问题: 方法调用的排序算法
class Student(Worker, Person):
def __init__(self, name, age, sid, salary):
# super()调用父类的方法时,遵守MRO的顺序
# 强制使用特定的类来初始化, 但也要遵守MRO的顺序
# super().__init__(self, name, age)
Worker.__init__(self, name, age, salary)
self.sid = sid
# 重写父类的方法
def say(self):
super().say(self)
print(f"I am is Student, SID is {self.sid}")
2.1.3 多态性
多态: 多种形态, 在方法传参时,形参定义是某一种父类, 在调用方法时实参传入是父类或其子类的对象,要求子类重写父类的方法。在方法内部使用时,只限于父类的方法调用,但实际调用子类对应的重写父类的方法。
class Animal():
def hi(self):
# 当前方法即为抽象方法
raise Exception("子类必须实现")
def start(self):
raise Exception("子类必须实现")
def stop(self):
raise Exception("子类必须实现")
class Dog(Animal):
def hi(self):
print("hi, Dog")
def start(self):
print("Dog staring...")
def stop(self):
print("Dog stoped")
def daemon_home(self):
print("Dog 守护家")
class Cat(Animal):
def hi(self):
print("hi, Cat")
def start(self):
print("Cat staring...")
def stop(self):
print("Cat stoped")
def mouse(self):
print("抓老鼠")
def run(animal: Animal) -> None:
# 多态的体现-animal形参的类型是Animal (统一的形态)
animal.hi()
animal.start()
animal.stop()
# 判断对象的类型
if isinstance(animal, Dog):
animal.daemon_home()
elif isinstance(animal, Cat):
animal.mouse()
dog = Dog()
cat = Cat()
# run()方法中可以传入多种形态的对象
run(dog) # 多态的体现, 传入实参是Dog类实例对象
run(cat) # 多态的体现,传入实参是Cat类实例对象
a = Animal()
run(a) # 报异常, 不能传入Animal对象,必须是它的子类实例对象
2.2 类的魔术方法
类的魔术方法即是object类的方法, 以__xx__
形式出现的,部分的魔术方法跟内置的函数有直接的关系,如str(obj) 调用obj对象的__str__
方法。
2.2.1 算术运算方法
- 减法 __sub__
+ 加法 __add__
* 乘法 __mul__
/ 除法 __truediv__
// 整除 __floordiv__
% 取余 __mod__
** 平方 __pow__
# 设计Num类,实现算术运算
class Num():
def __init__(self, value):
self.value = value
def __add__(self, other):
return self.value + (other.value if isinstance(other, Num) else int(other))
def __sub__(self, other):
return self.value - (other.value if isinstance(other, Num) else int(other))
def __mul__(self, other):
return self.value * (other.value if isinstance(other, Num) else int(other))
def __truediv__(self, other):
return self.value / (other.value if isinstance(other, Num) else int(other))
def __floordiv__(self, other):
return self.value // (other.value if isinstance(other, Num) else int(other))
def __mod__(self, other):
return self.value % (other.value if isinstance(other, Num) else int(other))
案例用法
# numpy->ndarray类相似
n1 = Num(20)
print(n1 + 100) # 120
n2 = Num(200)
print(n1*n2, n1+n2, n2-n1, n2/n1)
扩展: 关系运算的魔术
> __gt__
< __lt__
== __eq__
>= __ge__
<= __le__
!= __ne__
2.2.2 自省方法
自省: 自我检查, 常用方法如下:
- dir(obj) 查看对象或模块的方法、类、属性等信息
- help(obj)/hash(obj)/next(obj)/str(obj)/id(obj)/iter(obj)
- type() 查看对象类型的
- isinstance(obj, Dog) 判断obj是否属于XClass类
- issubclass(cls, Animal) 判断cls是否为Animal的子类
- getattr(obj, "属性或方法名") 获取obj中的指定的属性或方法
- hasattr(obj, "属性或方法名") 判断obj是否包含指定的属性或方法
- setattr(obj, "属性或方法名", value) 动态添加对象的属性或方法
class Person():
def __init__(self, id, name):
self.id = id
self.name = name
def play(self):
print(self.id, self.name)
p = Person(1001, "disen")
print(p.id)
print(getattr(p, "id"))
setattr(p, "level", 100)
print(p.level)
print(hasattr(p, "level")) # 返回 True
2.2.3 上下文方法
class DB:
def __enter__(self):
return self
def __exit__(self, excep_type, excep_val, excep_tb):
if excep_type:
pass
return True # 如果有异常,则内部处理
2.2.4 动态属性的方法
# 类实例对象作为其它类的属性使用时,当获取或修改属性值时,会调用以下方法:
__get__(self, instance, owner) 获取属性值
__set__(self, instance, value) 修改属性值
# 实例对象的属性修改或获取时,会调用以下方法:
__getattribute__(self, item) 获取对象属性
__setattr__(self, key, value) 修改对象属性
示例:
#!/usr/bin/python3
# coding: utf-8
class SonyPrinter():
def __init__(self, val):
self.val = val
def __get__(self, instance, owner):
print("-SonyPrinter ___get__--")
# instance -> Printer 实例
# owner -> Printer类
# print(self, instance, owner)
return self.val
def __set__(self, instance, value):
print("-SonyPrinter ___set__--")
# print(instance)
self.val = value
class Printer(object):
sony = SonyPrinter("9800")
def __init__(self, port=9900):
print("--__init__--")
self.port = port
def __getattribute__(self, item):
print("-__getattribute__--", item)
return super(Printer, self).__getattribute__(item)
def __setattr__(self, key, value):
print("-___setattr__--")
super(Printer, self).__setattr__(key, value)
if __name__ == "__main__":
printer = Printer()
print(printer.port) # 触发 __getattribute__()
print("*" * 20)
print(getattr(printer, "port")) # 触发 __getattribute__()
print("*" * 20)
print(printer.sony)
printer.sony = 9700
print(printer.sony)
作业:
1. 设计银行类、银行卡类、用户类
2. 设计以上三个类之间的业务管理类:
1) 用户可以在银行开户
2) 用户存钱、取钱、查询
3) 用户可以注销
4) 对以上功能实现单元测试通过
3. 练习+总结
#!/usr/bin/python3
# coding: utf-8
import uuid
import collections as c
from datetime import datetime
class Bank():
"""
信息: 唯一编号、名称、地址、所在城市、联系电话
关系:
1) 一个银行下存在多个用户
2) 一个用户拥有多个银行卡
3) 一个银行存在多张银行卡
"""
def __init__(self, name, address, city, phone):
self.name = name
self.address = address
self.city = city
self.phone = phone
self.id = uuid.uuid4().hex
self.users = c.OrderedDict()
self.cards = c.OrderedDict()
def add_user(self, user):
self.users[user.id] = user
def add_card(self, card):
self.cards[card.card_id] = card
class Card():
"""
信息: 卡号、开户银行、持卡或所属用户、余额、取款密码、开卡时间
"""
def __init__(self, bank_id, user_id, create_date, balance, password):
self.bank_id = bank_id
self.user_id = user_id
self._create_date = create_date
self._balance = balance
self.__password = password
self.card_id = uuid.uuid4().hex # 银行卡号
def __str__(self):
return f"{self.card_id} {self._create_date}"
def check_password(self, password):
return self.__password == password
def add_money(self, money):
# 存钱
self._balance += money
def sub_money(self, money):
# 取款
self._balance -= money
def get_balance(self):
# 查询余额
return self._balance
def modify_password(self, old_pwd, new_pwd):
if self.__password == old_pwd:
self.__password = new_pwd
return True
return False
class User():
def __init__(self, name, phone, user_card):
self.name = name
self.phone = phone
self.user_card = user_card
self.id = uuid.uuid4().hex
self.cards = {}
def add_card(self, card: Card):
self.cards[card.card_id] = card
def remove_card(self, card):
# 删除字典的key和value
del self.cards[card.card_id]
class BankSystemService():
def __init__(self):
self.message = """
欢迎进入银行综合管理系统
1. 开户
2. 登录
3. 退出
"""
# 默认生产三家银行
self.banks = c.OrderedDict()
self.init_bank()
def init_bank(self):
bank1 = Bank("中国银行", "团结南路", "西安", "029-10019992")
bank2 = Bank("工商银行", "电子一路", "西安", "029-10019392")
bank3 = Bank("招商银行", "科技四路", "西安", "029-10019790")
self.banks[bank1.id] = bank1
self.banks[bank2.id] = bank2
self.banks[bank3.id] = bank3
def start(self):
while True:
opt = int(input(self.message))
if opt == 3:
break
elif opt == 1:
bank_opt = self.select_bank()
if bank_opt:
# 获取选择银行
bank = self.banks[list(self.banks.keys())[bank_opt - 1]]
print(f"欢迎进入 {bank.name}, {bank.address}")
username = input("您的用户名: ")
password = input("您的口令: ")
user_card = input("您的身份证号: ")
phone = input("您的手机号: ")
user = User(username, phone, user_card)
card = Card(bank.id, user.id, datetime.now().strftime("%Y-%m-%d"), 0, password)
user.add_card(card)
bank.add_card(card)
bank.add_user(user)
print("开户成功", card)
elif opt == 2:
bank_opt = self.select_bank()
if bank_opt:
# 获取选择银行
bank = self.banks[list(self.banks.keys())[bank_opt - 1]]
print(f"欢迎进入 {bank.name}, {bank.address}")
card_id = input("您的银行卡号: ")
password = input("银行卡取款密码: ")
if card_id in bank.cards:
card = bank.cards[card_id]
if card.check_password(password):
info = """
登录成功
1. 查询余额
2. 取款
3. 存款
0. 退卡
"""
card_opt = int(input(info))
if card_opt == 0:
break
else:
# 如何设计允许输入三次口令的机会
print("密码不正确,正在退出银行卡。")
print("感谢您的使用,欢迎下次再来")
def select_bank(self):
bank_msg = "\n".join([
f"{index+1} {bank.name}"
for index, bank in enumerate(self.banks.values())
])
bank_msg += "\n 0 返回"
bank_opt = int(input(bank_msg))
return bank_opt
if __name__ == "__main__":
BankSystemService().start()
day04: 中午默写
- 简述正则模块的match、search和findall的区别
match: 验证信息或数据的完整性、合法性, 如果验证成功后,返回非None,反之返回None
search和findall: 用于搜索正则匹配的数据
1) search只匹配第一个, findall全部匹配
2) search匹配的数据,通过group()或groups()返回, findall返回一个list列表。
- 写出常见的内置函数(至少10个)
type() dir() help() hash() id()
int() float() tuple() str() list() set() dict() bool()
isinstance() issubclass() hasattr() getattr() setattr()
iter() chr() ord() zip() eval() exec() repr()
- 写出object类中的魔术方法(至少10个)
__new__
__init__
__str__
__doc__
__repr__
__sub__
__add__
__mul__
__truediv__
__floordiv__
__mod__
__pow__
__lt__
__gt__
__le__
__ge__
__eq__
__ne__
__enter__
__exit__
__get__
__set__
__getattribute__
__setattr__
2.3 类的设计模式
设计模式: 全称Design Pattern, 是一种良好的面向对象编程的习惯,用于解决一些实际的业务问题。在Java的编程思想中,存在23种设计模式,分别解决不同的业务问题。
2.3.1 单例 Singleton
解决问题: 无论类实例化多少次,只有一个实例对象。
class Singleton:
def __init__(self, name, phone):
self.name = name
self.phone = phone
def save(self):
print(self.name, self.phone)
def __new__(cls, *args, **kwargs):
if not hasattr(cls, "instance"):
cls.instance = super().__new__(cls)
return cls.instance
s1 = Singleton("disen", "17791692095")
s2 = Singleton("jack", "17791692094")
s3 = Singleton("lucy", "17791692093")
s1.save()
s2.save()
s3.save()
2.3.2 代理 Proxy
解决问题: 间接性调用目标功能, 要求代理必须具备目标的功能(即目标类和代理类具有相同的父类)。
class Store:
def add(self, goods):
raise Exception("子类必须重写")
def sale(self, goods):
raise Exception("子类必须重写")
class DellStore(Store):
def add(self, goods):
print("Dell add ", goods)
def sale(self, goods):
print("Dell sale ", goods)
# 声明代理类
class StoreProxy(Store):
def __init__(self, store):
# store 是代理的目标
self.store = store
def add(self, goods):
print("--proxy add--")
self.store.add(goods)
def sale(self, goods):
print("---proxy sale---")
self.store.sale(goods)
store = StoreProxy(DellStore())
store.sale("Printer-1900")
2.3.3 工厂 Factory
解决问题: 按需创建类对象, 即根据提供的数据和目标生产出相应的类实例对象。
#!/usr/bin/python3
# coding: utf-8
class Dog():
def __init__(self, name, color):
self.name = name
self.color = color
def hi(self):
print(self.name, self.color, "旺旺旺")
class Cat():
def __init__(self, name, color, food):
self.name = name
self.color = color
self.food = food
def hi(self):
print(self.name, self.color, self.food, "喵喵喵")
class Pig():
def __init__(self, name, color, weight):
self.name = name
self.color = color
self.weight = weight
def hi(self):
print(self.name, self.color, self.weight, "哼哼哼")
class AnimalFactory():
def __init__(self):
self.regist_classes = {}
def regist(self, name, cls):
self.regist_classes[name] = cls
def get_animal(self, name, *args, **kwargs):
if name in self.regist_classes:
return self.regist_classes[name](*args, **kwargs)
raise Exception(f"工厂类未注册{name} 的类")
if __name__ == "__main__":
factory = AnimalFactory()
factory.regist("Dog", Dog)
factory.regist("Cat", Cat)
factory.regist("Pig", Pig)
dog = factory.get_animal("Dog", "大黄", color="黄色")
dog.hi()
cat = factory.get_animal("Cat", "小喵", "白色", "猫粮")
cat.hi()
2.3.4 构建器 Builder
解决问题: 复杂问题最小化,如创建一个复杂对象时,分解成小模块。
实现方式: 【建议】使用内部类, 每一个小模块(构建方法)返回构建器类的对象, 最后一步返回目标类对象。
class CarBuilder():
class Car():
def __init__(self, name):
self.name = name
def run(self):
print(self.name, self.color, self.wheel, "runing")
def create(self, name):
self.car = self.Car(name)
return self
def color(self, color):
self.car.color=color
return self
def wheel(self, wheel):
self.car.wheel = wheel
return self
def build(self):
# 最后一步: 出厂
return self.car
car = CarBuilder().create("宝马").color("red").wheel("570") \
.build()
car.run()
2.3.5 装饰器
解决的问题:不改变现有的代码结构,扩展新的业务功能。
2.3.5.1 装饰器函数
# 实现某一个类的单例的装饰函数
def Singleton(cls):
def wrapper(*args, **kwargs):
# print(cls, args, kwargs)
if not hasattr(cls, "instance"):
cls.instance = cls(*args, **kwargs)
# cls.instance.__init__(*args, **kwargs)
return cls.instance
return wrapper
@Singleton
class Person:
def __init__(self, name, phone):
self.name = name
self.phone = phone
def save(self):
print(self.name, self.phone)
s1 = Person("disen", "17791692095")
s2 = Person("jack", "17791692094")
s3 = Person("lucy", "17791692093")
s1.save()
s2.save()
s3.save()
print(id(s1) , id(s2), id(s3))
2.3.5.2 带参的装饰器函数
#!/usr/bin/python3
# coding: utf-8
# 设计缓存功能的装饰器
import threading
import time
def clear_cache(obj, timeout):
time.sleep(timeout)
del obj.cache_data
def cache(timeout):
print("---cache--", timeout)
def outer_wrapper(func):
def wrapper(*args, **kwargs):
if not hasattr(func, "cache_data"):
ret = func(*args, **kwargs)
# 将ret数据保存
func.cache_data = ret
threading.Thread(target=clear_cache, args=(func, timeout)).start()
return func.cache_data
return wrapper
return outer_wrapper
@cache(timeout=3)
def load_data(filename):
# 从txt中读取文本内容
print("---load data---", filename)
with open(filename) as f:
content = f.read()
return content
if __name__ == "__main__":
for _ in range(50):
content = load_data("nt.log")
print(content)
time.sleep(0.5)
day04作业:
1. 重构银行系统的管理类,将其设计为单例模式
2. 基于构建器设计模式, 重构银行开户的业务流程
3. 基于Redis缓存服务,将cache装饰器函数的缓存功能修改为redis存储。
4. 总结+练习
2.3.5.3 装饰器类
装饰器的四种写法:
@函数名
@函数名(参数)
@类名
@类名(参数)
装饰器类: 重写类的__call__
方法, 表示类实例对象可以作为函数被调用。
class A:
def __call__(self, *args, **kwargs):
# 作用: 类实例对象可以作为函数被调用
pass
a = A()
a()
#!/usr/bin/python3
# coding: utf-8
# 实现计算运行时长的装饰器类
import random
import time
class Runtime():
def __init__(self, func):
print("---初始化装饰器类的实例对象,并接收被装饰的函数--")
self.func = func
def __call__(self, *args, **kwargs):
print("当前装饰器实例对象被作为函数调用")
# 扩展功能: 计算func函数运行的时长
start_time = time.time()
ret = self.func(*args, **kwargs)
delta_seconds = time.time() - start_time
print("运行", self.func.__name__,"时长: ", delta_seconds)
return ret
@Runtime
def read_data(filename):
with open(filename, encoding="utf-8") as f:
for line in f:
print(line, end="")
time.sleep(random.uniform(0.01, 0.05))
print("-over-")
if __name__ == "__main__":
read_data("../day02/codes.txt")
2.3.5.4 带参的装饰器类
作用: 为装饰器指定一些额外的条件,如缓存时间、运行时长限制、调用方法权限等。
class A:
def __init__(self, username):
pass
def __call__(self, func):
def wrapper(*args, **kwargs):
pass
return wrapper
@A("root")
def cp(src_filepath, dst_filepath):
# 复制文件
pass
#!/usr/bin/python3
# coding: utf-8
import random
# 设计带有参数的装饰类: 验证权限
class CheckQx():
def __init__(self, qx_name):
self.qx_name = qx_name
def __call__(self, func):
def wrapper(*args, **kwargs):
qx_v = qxs[self.qx_name]
print("当前拥有的权限值", qx)
if qx & qx_v == qx_v:
ret = func(*args, **kwargs)
return ret
raise Exception(f"当前无{self.qx_name}权限")
return wrapper
qxs = {"r": 4, "w": 2, "x": 1}
# 随机产生权限值
qx = random.randint(1, 7) # r -> 4, w-> 2 x -> 1
@CheckQx("w")
def copy(src_path, dst_path):
with open(src_path, "rb") as src_file:
content = src_file.read() # 读取字节码数据
with open(dst_path, "wb") as dst_file:
dst_file.write(content)
print("copy 成功")
if __name__ == "__main__":
import time
copy("../day02/nt.log", f"nt{time.time()}.log")
三、多任务编程
多任务: 并行运行的多个任务
场景: 运行一个程序A(QQ), 同时运行另一个程序B(微信)…
多进程: 在操作系统中,同时运行多个应用程序(进程)。
多线程: 在一程序(进程)中同时运行多个任务(子进程或子线程)。
协程: 在一个线程中,使用事件模型(select/poll/epoll-> IO多路复用),自我调度方法执行的任务。在Python中独一无二,又称之为“微线程”。
进程: 由操作系统分配程序启动时必要的内存资源、进程描述和程序代码三部分组成的。当进程的资源分配完成后,则会启动线程(Main主线程)来执行程序代码。由于系统安全问题,进程被设计为一个独立空间,避免程序崩溃时,影响其它进程。
3.1 多线程threading
重要常识: 多线程之间是共享进程中的内存资源。
3.1.1 线程的本地变量
解决问题: 线程资源私有化。
from threading import local # 理解为一个dict,key是当前线程对象
import threading
name = "disen"
def set_name(name):
global name
print(threading.current_thread().name, "name->", name)
name = name
print(name)
def main():
t1 = threading.Thread(target=set_name, args=("jack",))
t1.start() # 启动后准备进入就绪
t2 = threading.Thread(target=set_name, args=("lucy",))
t2.start()
t1.join()
t2.join()
print("MainThread", name)
#!/usr/bin/python3
# coding: utf-8
# 线程变量私有化
import threading
from threading import local
local_val = local() # 线程的本地变量
local_val.name = "disen" # 当前线程添加一个name变量
def set_name(uname):
global local_val
# local_val.name -> None
# print(threading.current_thread().name, "name->", local_val.name)
local_val.name = uname
print(threading.current_thread().name, "name->",local_val.name)
def main():
t1 = threading.Thread(target=set_name, args=("jack",))
t1.start() # 启动后准备进入就绪
t2 = threading.Thread(target=set_name, args=("lucy",))
t2.start()
t1.join()
t2.join()
print("MainThread", local_val.name)
if __name__ == "__main__":
main()
day05: 中午默写
1.简述一下设计模式的作用和代理设计模式解决什么问题
设计模式的作用: 为了解决业务中实际问题,根据编程思想和特性来设计类的结构,可以理解为实现某种业务功能的代码设计规范。
代理设计模式的解决问题: 解决间接性调用目标方法, 借助于接口规范(抽象类-代理类和目标类的父类)来实现的。另外,在创建代理类时,需要提供目标类的实例对象。
- 简述一下对象的多态特性,并尝试举例说明
对象的多态特性: 对象具有某一种特性的多种形态(身份或角色)。
举例说明: 一个对象具有人的特性, 但是在不同的场景下具有不同身份,如工人、学生、父亲、儿子等。
- 写出str对象的常用方法
lower() 小写字母转换
upper() 大写字母转换
find() 查找子字符串在字符串中的索引下标,从0开始,未查找到返回-1
rfind() 从右边每一个开始查找
index() 未查到,则抛出ValueError
rindex()
isalpha()
isascii()
istitle()
isdigit()
islower()
isupper()
isnumeric()
encode(encoding="utf-8")
split() 分隔
replace() 替换
strip() 删除两边空白或指定的字符
rjust()/ljust() 右或左填充指定长度的指定的字符
startswith() 以指定的字符开始
endswith() 以指定的字符结尾
title() 首字符大写
capitalize() 单词的首字符大写
swapcase()
join() 以指定的字符连接可迭代对象中的每一项
partition(sep) 通过分隔符将字符内容转成三部分的元组。
# 常用的str方法
join()
find()
index()
replace()
encode()
split()
endswith()
startswith()
rjust()
format() # f"{}"
# 扩展字符串格式化
"hi %(name)s old years: %(age)s " % {"name": "disen", "age": 20}
3.1.2 线程条件变量
作用: 当线程操作的数据不满足条件时,则会停止当前线程的运行,等待其它线程唤醒(数据可以进行操作),常用于生产者消费者的设计模式中。
from threading import Condition
from threading import Thread, Condition
import uuid
# 生产者线程类
class Producer(Thread):
def __init__(self, data: list, cond: Condition):
self.data = data # 数据
self.cond = cond # 条件
super().__init__()
def run(self):
while True:
with self.cond:
if len(self.data)<100:
self.data.append(("面包", uuid.uuid4().hex ))
self.cond.notify()
else:
try:
self.cond.wait(timeout=30)
except:
break
print(f"Producer {threading.current_thread().name} over")
# 消费者线程类
class Consumer(Thread):
def __init__(self, data: list, cond: Condition):
self.data = data
self.cond = cond
super().__init__()
def run(self):
while True:
with self.cond:
if len(self.data)>0:
d = self.data.pop()
print(f"Consumer {threading.current_thread().name}: {d}")
self.cond.notify()
else:
try:
self.cond.wait(timeout=30)
except:
break
print(f"Consumer {threading.current_thread().name} over")
data = []
# Condition支持上下文的使用:
# __enter__ -> _lock.acquire() 加锁, __exit__-> _lock.release()
# 条件变量对象的方法:
# wait(timeout)等待, notify(n=1) 唤醒
cond = Condition() notify_all()
def create(cls,*args, size=3):
return [ cls(*args) for _ in range(size) ]
def start(threads):
for t in threads:
t.start()
def join(threads):
for t in threads:
t.join()
# 开启生产者和消费者系统
print("--runing--")
# 创建三个生产者线程
pthreads = create(Producer, data, cond)
start(pthreads)
# 创建5个消费者线程
cthreads = create(Consumer, data, cond, size=5)
start(cthreads)
join(pthreads)
join(cthreads)
print("---over--")
# 扩展: 数据安全锁(线程锁)
#!/usr/bin/python3
# coding: utf-8
from threading import Thread, Lock, current_thread
import random
import time
lock = Lock()
data = []
def insert(n):
thread_name = current_thread().name
# lock.acquire() # 加锁
with lock:
print(f"{thread_name}-> insert {n} => {data}")
time.sleep(0.5)
data.append(n)
print(f"{thread_name}-> insert {n} => {data}")
# lock.release() # 解锁
if __name__ == "__main__":
t1 = Thread(target=insert, args=(5, ))
t2 = Thread(target=insert, args=(15,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"{current_thread().name} => {data}")
3.1.3 线程队列
queue.Queue队列实现多线程的数据共享。
from queue import Queue
# 基本操作
q = Queue(maxsize=0) # maxsize=0 不限制大小,无限个
q.empty() # 是否为空队列, True即是空的,False非空
q.full() # 是否为满队列, True即是已满
q.get(block=True,timeout=None) # block为True表示队列中存在数据时,则返回,不存在数据时,一直等待,如果存在timeout超时时间,则在超出等待时间范围外,抛出 Empty。如果block为False时,意味着需要立即获取到数据,如果没有数据,则抛出Empty,此时timeout失效。
q.get_nowait()
q.put(item, block=True, timeout=None)
q.put_nowait(item)
q.qsize() # 获取当前队列的大小
#!/usr/bin/python3
# coding: utf-8
from queue import Queue as Q
from threading import Thread as T, current_thread as ct
import time
import hashlib
# 实现下载和解析的两个线程类
# 使用两个队列
download_queue = Q(maxsize=1000) # 存放的url网站地址
parser_queue = Q(maxsize=1000) # 下载url之后content和url及响应状态码
class DownloadThread(T):
def __init__(self, dq: Q, pq: Q):
super(DownloadThread, self).__init__()
self.download_q = dq
self.parser_q = pq
def run(self):
while True:
try:
url = self.download_q.get(timeout=10) # 超时则会抛出Empty异常
# 下载功能
time.sleep(2) # 模拟下载
# 下载成功
self.parser_q.put((f"hi, { hashlib.md5(url.encode("utf-8")).hexdigest()}",url, 200 ))
except:
break
print(f"--{ct().name}--over--")
class ParserThread(T):
def __init__(self, pq: Q):
super(ParserThread, self).__init__()
self.parser_q = pq
def run(self):
while True:
try:
content, url, status_code = self.parser_q.get(timeout=10)
print("Parser", content, url, status_code)
except:
break
print(f"--{ct().name}--over--")
if __name__ == "__main__":
downloader = DownloadThread(download_queue, parser_queue)
parser = ParserThread(parser_queue)
downloader.start()
parser.start()
while True:
url = input("请输入下载的网站: ")
if url == "0":
break
download_queue.put(url)
downloader.join()
parser.join()
print("Bye Bye")
# 作业-1: 将生产者消费者示例的数据类型修改为线程队列
3.2 多进程multiprocessing
多进程: 多个并行运行的进程(父子进程、并级进程)
大Python的程序中,创建的进程属性子进程。
3.2.1 创建进程的方式
在Python中创建进程方式:
1. os.fork创建, 只支持Linux/Unix系统
2. os.system(cmd) 创建子进程来执行cmd描述的操作系统的命令,但无法获取命令执行的结果
3. os.popen(cmd) 创建子进程来执行cmd命令,可以获取命令执行的结果。引用subprocessing.Popen类。
4. multiprocessing.Process类创建子进程, 需要重写run方法,来实现自己的业务。
c:/windows/system32/drivers/etc/hosts
# fork的示例
# process_fork.py
import os
from multiprocessing import current_process as cp
import time
process_id = os.fork() # 返回0是子进程, 非0是父进程中执行(返回是子进程的pid)
for i in range(1, 10):
if process_id == 0:
# 子进程
print("---", cp().pid, i**2)
else:
# 父进程: process_id是子进程的ID
print("*"*20, cp().pid, process_id, i*5)
time.sleep(5)
# process_system.py
import os
cmd = "ip addr"
ret = os.system(cmd)
print("-->", ret)
chmod +x process_system.py
ln -s /root/python_codes/day06/process_system.py /usr/local/bin/ipa
# popen的使用
import os
cmd = "ip addr"
ret = os.popen(cmd) # 返回一个流对象
print("-"*20)
# lines = ret.readlines() # ret.read()
for line in ret:
print(line, end="") # 获取网卡名或IP地址
import os
import re
ret = os.popen("ip addr")
net_name = None
for line in ret:
net_name_search = re.search(r"\d:\s+?(\w+?):", line)
if net_name_search:
net_name, = net_name_search.groups()
ip_search = re.search(r"\s+?inet (.+?)/\d", line)
if ip_search:
ip, = ip_search.groups()
print(net_name+":"+ip)
# print(line, end="")
# scp命令(可以上传文件到服务器, 可以从服务器下载文件)
一、 上传本地文件到服务器
scp [-r] 本地文件或目录 root@lydserver:服务器的文件或目录的路径
> scp -r docs root@lydserver:/usr/src
二、从服务器中下载文件到本地
scp [-r] root@lydserver:服务器的文件或目录的路径 本地文件或目录
> scp root@lydserver:/usr/src/docs .
day06: 中午默写
- 简述
__init__
,__new__
和__call__
三者之间的区别
__init__ 是初始化实例对象的,是__new__返回对象之后调用的。
__new__ 是创建实例对象的,它是一个静态方法。调用父类时,只传入一个cls对象即可。
__call__ 当实例对象被作为函数调用时,对象会调用__call__()用法,这种机制一般会用于装饰器类中。
2.简述线程的本地变量local和条件变量Condition的作用
线程的本地变量local: 私有化当前线程的变量或数据,同一进程中的多个线程不会共享数据。可以看作为一个dict对象,key是当前线程实例对象的id(内存的首地址)方法转换的数值->id(thread),value是local()返回的实例对象动态添加的属性,如:
values = local() # 本地变量的实例化对象
values.name = "disen"
线程的条件变量Condition:
线程间同步数据的一种方式, 主用于生产者消费者的设计模式中。当某数据不满足条件时,可以让当前线程变成等待 wait()方法, 如果条件满足,除了正常的数据操作之外,需要通过notify()或notify_all()方法唤醒其它的等待线程。需要注意是,对同步进行操作时,需要通过条件变量对象进行加锁(with上下文)操作。
3.写出logging模块的常用Handler处理器类
- logging.StreamHandler 输出流
- logging.FileHandler 输出到文件
- logging.handlers.SMTPHandler 通过SMTP邮件协议,向指定的邮箱发送日志信息
- logging.handlers.HTTPHandler 通过HTTP Web协议,向指定WEB服务器上传日志信息
- logging.handlers.TimedRotatingFileHandler 时间分隔的日志文件处理器。
3.2.2 Socket多进程通信
from multiprocessing import Queue, Process
import socket
class DonwloadServer(Process):
def __init__(self, q):
self.q = q # 实现当前进程与父进程之间的通信
super(DonwloadServer, self).__init__()
def run(self):
print("---已启动下载服务器---")
# 获取socket的连接
server = socket.socket(socket.AF_UNIX) # Window: SYSTEM
server.bind("downloader.socket")
server.listen()
# 作业-2: 设计ClientThread线程,管理客户的连接及下载需求
# 作业-3: 基于设计模式的方式,尝试优化代码
client, address = server.accept() # 阻塞方法,等待客户端进程的连接
# client.send("")
while True:
url = client.recv(8*1024)
print(url) # 接收的是字节数据
if url == b"0":
break
self.q.put(url.decode("utf-8"))
client.close()
server.close()
# 删除连接socket文件
os.remove("downloader.socket")
print("Downloader Server Over")
# 启动服务器进程
task_queue = Queue() # 进程间通信的Queue队列
server = DonwloadServer(task_queue)
server.start()
server.join()
# 客户端进程
#!/usr/bin/python3
# coding: utf-8
import socket
client = socket.socket(socket.AF_UNIX)
client.connect("downloader.socket")
while True:
url = input("请输入下载的站点:")
client.send(url.encode("utf-8"))
if url == "0":
break
client.close()
print("--close client--")
# socket实现多进程之间的通信
- 定义*.socket文件,用于进程间通信的接口
- server=socket.socket(socket.AF_UNIX) # 基于Linux的进程间通信机制
- server.bind("xxx.socket")
- server.listen() # 监听
- client = server.accept() 接收其它进程(或网络)连接
- data = client.recv(1024) # 开始接收客户端进程的数据(字节数据)
- client.send(b"") 向客户端发送字节数据
- client.close() # 关闭连接
- client = socket.socket(socket.AF_UNIX)
- client.connect("xxx.socket")
- client.send(b"")
- data = client.recv(1024)
# 【扩展】Socket用于TCP/IP的网络通信时
TCP三次握手:
Client: SYN -> Server
Client: <- SYN+ACK Server
Client: ACK -> Server
TCP四次挥手:
Client: FIN -> Server
Client: <- ACK Server
Client: <- FIN Server
client: ACK -> Server
3.2.3 Queue进程通信队列
from multiprocessing import Queue, Process
#!/usr/bin/python3
# coding: utf-8
# 证明: 进程之间的内存是相互独立的
import multiprocessing as mp
num = 100
def add(n): # 在子进程中执行的目标函数
global num # 子进程中会将主进程的num复制子进程所在的内存块中
num += n
print(mp.current_process().name, num)
def sub(n): # 在子进程中执行的目标函数
global num
num -= n
print(mp.current_process().name, num)
if __name__ == "__main__":
p1 = mp.Process(target=add, args=(10, ))
p1.start()
p1.join()
p2 = mp.Process(target=sub, args=(50, ))
p2.start()
p2.join()
# 每个进程的内存是相互独立的,因此无论子进程如何修改num,但是不会修改主进程的num
print("MainProcess: ", num)
#!/usr/bin/python3
# coding: utf-8
# 通过Queue实现进程间的数据通信
import multiprocessing as mp
def add(n, q): # 在子进程中执行的目标函数
# global num # 子进程中会将主进程的num复制子进程所在的内存块中
num = q.get()
num += n
print(mp.current_process().name, num)
q.put(num)
def sub(n,q): # 在子进程中执行的目标函数
# global num
num = q.get()
num -= n
print(mp.current_process().name, num)
q.put(num)
if __name__ == "__main__":
queue = mp.Queue() # 实现多进程之间的数据通信
queue.put(100)
p1 = mp.Process(target=add, args=(10, queue))
p1.start()
p2 = mp.Process(target=sub, args=(50, queue))
p2.start()
p1.join()
p2.join()
num = queue.get()
print("MainProcess: ", num)
3.2.4 Pipe进程通信管道
#!/usr/bin/python3
# coding: utf-8
import multiprocessing as mp
def receive(conn):
while True:
msg = conn.recv()
if msg == b"0":break
print(mp.current_process().name,"接收", msg)
def send(conn):
for i in range(20):
msg = "msg> %s" % i
conn.send(msg.encode("utf-8"))
print(mp.current_process().name, "发送", msg)
conn.send(b"0")
if __name__ == "__main__":
# duplex=True 全双工管道
# duplex=False 半双工管道, conn1 只收, conn2只发
conn1, conn2 = mp.Pipe(False)
p1 = mp.Process(target=receive, args=(conn1, ))
p2 = mp.Process(target=send, args=(conn2, ))
p1.start()
p2.start()
p1.join()
p2.join()
print("---over---")
#!/usr/bin/python3
# coding: utf-8
import multiprocessing as mp
import time
import random
def download(conn):
while True:
url_bytes = conn.recv() # 接收conn2的下载任务URL数据
if url_bytes == b"0": break
url = url_bytes.decode("utf-8")
time.sleep(3)
data = str((url, "<html><head></head><body>hi,baidu</body></html>"))
conn.send(data) # 向conn2发送下载完成的数据
def parse(conn):
# 先接收下载任务进程中的数据
while True:
data = conn.recv()
print("解析接收下载数据:", data)
time.sleep(2)
flag = random.random()
print(flag)
if flag > 0.3:
params = "?%.5f" % flag
conn.send(b"http://google.com"+params.encode("utf-8"))
else:
conn.send(b"0")
break
if __name__ == "__main__":
# tasks = mp.Queue()
# conn1和conn2 可以发送也可接收数据
conn1, conn2 = mp.Pipe()
p1 = mp.Process(target=download, args=(conn1, ))
p2 = mp.Process(target=parse, args=(conn2,))
p1.start()
p2.start()
# 发布任务
conn2.send(b"http://www.baidu.com")
p1.join()
p2.join()
print("--over--")
3.3 协程 asyncio
线程和进程由CPU调度, 协程由用户(开发者或程序本身)来调度的。
协程的发展阶段:
- 每一阶段: yield和send的生成器的关键字来实现用户自我调度
- 第二阶段: @asyncio.coroutine 和 yield from
- 第三阶段: async 和 await 两个关键
【注】第二和第三阶段使用asynio模块, 协程的对象必须事件循环对象中运行。
def fib(n):
print("---fib---")
index = 0
x, y = 0, 1
while index < n:
print("---index---", index)
x, y = y, x+y
time_ = yield y # 向外部程序提供一个y数值,等待接收time_数值
print("--time--", time_)
time.sleep(time_)
index += 1
def fib_test():
f = fib(10) # 返回一个生成器, 并不行执行其中的代码
print("--main--")
try:
n = next(f) # 第一次执行时,执行到 fib函数中的"yield y"位置
while True:
print("---n---", n)
n = f.send(random.uniform(0.5, 2)) # 向f发送时间之后,会阻塞到f返回一个数据
except StopIteration as e:
print("--Over--")
if __name__ == "__main__":
fib_test()
#!/usr/bin/python3
# coding: utf-8
import asyncio
import os
# 任务: 读取files目录下所有txt文件,将文件中的每行的数值提取后相加,返回文件名和它的数值和
import time
import random
"""
def filter(callback, items):
for item in items:
if callback(item):
yield item
"""
@asyncio.coroutine
def filter_txt(dir_path):
print("scan file path: ", dir_path)
files = filter(lambda filename: filename.endswith(".txt"), os.listdir(dir_path))
for filename in files:
# yield from 其它协程: 当前的协程阻塞,等待其它协程完成任务
n = yield from read_txt_number(os.path.join(dir_path, filename))
print(filename, n)
@asyncio.coroutine
def read_txt_number(filepath):
with open(filepath) as f:
sum_ = 0
for line in f:
if line.isdigit():
sum_ += int(line.strip())
return sum_
if __name__ == "__main__":
# asyncio.run(filter_txt("files")) # Python 3.7可以直接运行
loop = asyncio.get_event_loop() # 获取事件循环器(事件模型-select/poll/epoll)
# loop.run_until_complete(filter_txt("files")) # 单任务
# 多任务协程
loop.run_until_complete(asyncio.wait([filter_txt("files"),
filter_txt("file2")]))
async def read_txt_number(filepath):
with open(filepath) as f:
sum_ = await sum_file(f)
return sum_
async def sum_file(f):
sum_ = 0
for line in f:
if line.isdigit():
sum_ += int(line.strip())
return sum_
总结:
- 使用asyncio模块
- 函数上方使用@asyncio.coroutine或async关键字,使得变成协程对象
- 在协程函数中,使用yield from 或 await关键字,告知事件模型当前协程等待其它协程完成任务
- 协程对象需要在事件模型中运行
- loop = asyncio.get_event_loop() 获取事件模型对象
- loop.run_until_complete(协程对象)
- 协程对象,默认是单任务
- 多个协程对象,可以通过列表方式和asynio.wait()函数封装多任务协程对象
# coding: utf-8
import asyncio
@asyncio.coroutine
def sum_(*args):
print(f"开始计算{args}的和 ")
s = yield from total(*args)
yield from asyncio.sleep(0.5)
print(args, "--->", s)
@asyncio.coroutine
def total(*args):
return sum(args)
if __name__ == "__main__":
# 1. 获取事件模型对象
loop = asyncio.get_event_loop()
# 2. 根据协程的任务数量,可以选择单任务或多任务
# 2.1 单任务
sum_1 = sum_(1, 2, 3)
sum_2 = sum_(5, 6, 7)
sum_3 = sum_(50, 60, 70, 80, 90)
# 2.2 多任务
tasks = asyncio.wait([sum_1, sum_2, sum_3])
# 3. 运行事件模型
loop.run_until_complete(tasks)
作业: 通过popen()方式执行ip addr
命令,将数据解释出来网卡名:IP
,将程序封装为协程。另外尝试基于socket通信,实现客户端进程向发送服务进程发送Linux执行的命令。
四、数据库编程
4.1 数据库的认知
关系型数据库: SQLServer(字符集: GBK/GB2312)、MySQL(版本: 5.5, 5.6、5.7, 8.0)、Oracle(9i/10g/11g, 12c)、DB2、 MariDB(同MySQL是一个开发者)等。
非关系型数据库: redis、mongodb(js)、elasticsearch 全文检索引擎(Lucene)
大数据库: hive、hbase、spark(Hadoop)
常用的字符集:
ASCII : 一个字符一个字节(8位)
各国家语言: 中国 GBK、GB2312 一个汉字占两个字节, 一个ascii占一个字节
万国编码:
unicode 每个字符占2个字节, 不支持ascii码
utf-8 支持ascii码, 即英文字符占1个字节, 中文占三个字节
4.2 MySQL必备知识
4.2.1 root用户
安装完mysql之后,进入mysql数据库进行修改root的权限(口令、远程连接)
# mysql
mysql> use mysql
mysql> select user, host from user;
mysql> update user set host="%" where user="root";
mysql> alert user "root"@"%" with mysql_native_password identified by "root";
mysql> grant all privileges on *.* to "root"@"%" identified by "root";
mysql> flush privileges;
4.2.2 查看数据库的引擎
mysql> show engines;
支持数据库引擎: MEMORY(内存)、InnoDB(默认的,支持事务、行级锁、外键)、MyISAM、CSV等
4.2.3 常见的数据操作
4.2.3.1 数据库管理
创建数据库:
create database db_name default charater set utf8;
create database db_name charset utf8;
删除数据库:
drop database db_name;
drop database if exists db_name; -- 如果db_name存在,则删除,不存在则继续。
修改数据库:
alert database db_name default character set utf8;
show create database db_name; -- 查看创建数据库的语句
打开数据库:
use db_name
4.2.3.2 数据表操作
创建表:
create table table_name(
field_name 字段的类型(长度或大小) 约束, -- 字段级约束
field_name 字段的类型 约束,
constraint 约束类型 字段 -- 表级约束
)
字段类型:
varchar 字符类型
interger 整型,一般用于主键
float 小数类型
enum 枚举类型
date 日期类型, 年月日格式
time 时间类型, 时分秒格式
timestamp 日期时间类型
int 整型
long 长整型
约束:
primary key 主键约束
not null 非空约束
unique 唯一
foreign key 外键, 只有 InnoDB引擎存在
auto_increment 自增
default 默认值
check 检查约束, 目前MySQL不支持, SQLServer和Oracle支持。
练习:
创建以下4张表:
student 表: sn 学号、name 姓名、 age 出生日期、 sex 性别
cource 表: cn 课程号, name 课程名, tn 教师号
tearch 表: tn 教师号, name 教师名
score 表: sn 学号, cn 课程号 , score 成绩
它们之间的关系:
1. cource 表的tn 是外键与 tearch表的tn关联
2. score 表中sn和cn是外键,分别与student表和cource进行关联
3. sn、cn、tn 都是主键
4. cource表的name是唯一的
-- vi /root/sql/1.sql
-- coding: utf8
drop database if exists stu;
create database stu charset utf8;
use stu;
create table student(
sn varchar(20) primary key,
name varchar(20) not null,
age timestamp default current_timestamp,
sex char(2) default "男"
);
create table cource(
cn varchar(20) primary key,
name varchar(20) not null unique,
tn varchar(20)
);
create table teacher(
tn varchar(20) primary key,
name varchar(20) not null
);
-- 修改cource 表增加外键约束
-- on delete cascade 级联删除(主表中的数据删除后,从表的数据也会删除)
-- on delete set null 级联置null(主表中的数据删除后,从表的数据则会设置为null)
-- on update cascade 级联更新
alter table cource
add constraint fk_cource_tn foreign key(tn)
references teacher(tn) on delete set null;
create table score(
sn varchar(20) references student(sn), -- 列级别的外键约束
cn varchar(20) references cource(cn),
score float(10,2) comment "成绩"
) comment "成绩表";
mysql> source /root/sql/1.sql
mysql> show tables;
-- 2.sql
insert into student(sn, name, age, sex) values
("01" , "赵雷" , "1990-01-01" , "男"),
("02" , "钱电" , "1990-12-21" , "男"),
("03" , "孙风" , "1990-05-20" , "男"),
("04" , "李云" , "1990-08-06" , "男"),
("05" , "周梅" , "1991-12-01" , "女"),
("06" , "吴兰" , "1992-03-01" , "女"),
("07" , "郑竹" , "1989-07-01" , "女"),
("08" , "王菊" , "1990-01-20" , "女");
insert into teacher(tn, name) values
("01" , "张三"),
("02" , "李四"),
("03" , "王五");
insert into cource(cn, name, tn) values
("01" , "语文" , "02"),
("02" , "数学" , "01"),
("03" , "英语" , "03");
insert into score(sn, cn, score) values
("01" , "01" , 80),
("01" , "02" , 90),
("01" , "03" , 99),
("02" , "01" , 70),
("02" , "02" , 60),
("02" , "03" , 80),
("03" , "01" , 80),
("03" , "02" , 80),
("03" , "03" , 80),
("04" , "01" , 50),
("04" , "02" , 30),
("04" , "03" , 20),
("05" , "01" , 76),
("05" , "02" , 87),
("06" , "01" , 31),
("06" , "03" , 34);
-- 尝试删除 教师编号为01的教师记录
begin; -- 开启事务
delete from teacher where tn="01";
select * from cource; -- 发现数学的教师的编号为Null
-- 执行创建表或数据库的语句(DDL)后,事务会自动提交
create database a;
rollback; -- 回滚事务,撤消开启事务之后的一切更新操作。
DDL(Database Define Language) 数据库描述或定义语言
数据库、表结构、视图、索引、约束的创建、修改、删除等相关的SQL语句。
DML (Database Manipulation Language) 数据库操纵语言
查询、修改、增加、删除表的数据
DCL (Database Control Language) 数据库控制语言
修改数据库运行环境的语句:set 、 grant、 source
DTL (Database Transaction Language) 数据库事务语言
begin
commit
rollback
-- 添加数学的老师为 张三, 01
-- desc[ribe] teacher 查看表结构
-- order by name desc 排序中的降序,默认是asc升序
insert into teacher(name, tn) values("张三", "01");
update cource set tn="01" where name="数学";
docker命令:
docker pull mysql 下载镜像
docker run -itd --name db1 -v /Users/apple/sql:/usr/src/sql -v /Users/apple/mysql_conf/conf.d:/etc/mysql/conf.d -e MYSQL_ROOT_PASSWORD=root -p 3307:3306 mysql
docker exec -it db1 bash
day07: 中午默写
1.写出线程Queue队列的核心方法
- empty()
- full()
- qsize()
- put(block=True, timeout)
- get(block=True,timeout)
- put_nowait()
- get_nowait()
2.写出socket对象的核心方法
- bind(addr) # "可以是.socket文件"也可以是("", port)
- listen()
- accept() 阻塞方法,等待客户端连接,返回客户端的scoket对象和addr地址
- connect(addr) 连接服务端(addr同bind参数类同,必须指定服务端地址)
- recv(buffer_size) 接收另一端发送过来的字节码数据
- send(byte_data) 向另一端发送字节码数据
- close() 关闭连接
3.写出协程相关的函数
- @asyncio.coroutine
- asyncio.get_event_loop() 返回事件模型对象
- loop.run_until_complete(task)
- asyncio.wait()
- asyncio.sleep(delay_seconds)
- 涉及关键字
- yield from
- async
- await
- IO多路复用的事件模型
- select 事件论询
- poll 事件回调
- epoll 增强事件回调
练习-2:
-- 1、查询"01"课程比"02"课程成绩高的学生的信息及课程分数
-- 多表连接方式
select s.*,
s1.cn as "c01",
s1.score as "c01_score",
s2.cn as "c02",
s2.score as "c02_score"
from score s1
join score s2 on(s1.sn = s2.sn)
join student s on (s1.sn = s.sn)
where s1.cn="01"
and s2.cn="02"
and s1.score > s2.score
-- if(条件,为真的值,为假的值) 函数写法
select s.*, A.c01, A.c02
from student s
join (
select sn,
max(if(cn="01", score, 0)) as c01,
max(if(cn="02", score, 0)) as c02
from score
GROUP BY sn
) A on (A.sn = s.sn)
WHERE A.c01 > A.c02
面试题:
现有表A数据:
year month amout
1991 1 1.1
1991 2 1.2
1992 1 2.1
1992 2 2.2
如何编写SQL脚本,实现如下的查询结果:
year m1 m2
1991 1.1 1.2
1992 2.1 2.2
select year,
max(if(month=1, amout,0)) as m1,
max(if(month=2, amout,0)) as m2
from A
group by year;
-- 8、查询没学过"张三"老师授课的同学的信息
select *
from student s
where s.sn not in (
select distinct sc.sn
from teacher t
join cource c on (c.tn=t.tn)
join score sc on (sc.cn=c.cn)
where t.name = "张三"
);
-- 18、查询各科成绩最高分、最低分和平均分:以如下形式显示:课程ID,课程name,最高分,最低分,平均分,及格率,中等率,优良率,优秀率
-- 及格为>=60,中等为:70-80,优良为:80-90,优秀为:>=90
select sn,cn,score,
case when score >=60 then "及格"
when score >=70 then "中等"
when score >=80 then "优良"
when score >=90 then "优秀"
else "不及格" end status
from score;
select sn,cn,score,
case when score >=90 then 1
else 0 end d1,
case when score >=80 and score <90 then 1
else 0 end d2,
case when score >=70 and score <80 then 1
else 0 end d3,
case when score >=60 then 1
else 0 end d4
from score;
select sn,cn,score,
if(score>=90, 1, 0) d1,
if(score>=80 and score < 90, 1, 0) d2,
if(score >=70 and score < 80, 1, 0) d3,
if(score >=60,1, 0) d4
from score;
-- 最终SQL
select c.cn, c.name,
max(sc.score) "最高分",
min(sc.score) "最低分",
avg(sc.score) "平均分",
round(sum(A.d1)/count(A.d1),2) "及格率",
round(sum(A.d2)/count(A.d2),2) "中等率",
round(sum(A.d3)/count(A.d3),2) "优良率",
round(sum(A.d4)/count(A.d4),2) "优秀率"
from cource c
join score sc on (sc.cn = c.cn)
join (
select cn,
if(score>=60, 1, 0) d1,
if(score>=70 and score<80, 1, 0) d2,
if(score>=80 and score<90, 1, 0) d3,
if(score>=90, 1, 0) d4
from score
) A on (A.cn = sc.cn)
group by c.cn, c.name;
4.2.4 事务特性-ACID
Atomicity(原子性): 事务中的所有操作要么都成功,要么都失败。
Consistency(一致性): 事务开始和结束的数据保持一致的(保持数据的完整性)。
Isolation(隔离性): 事务之间互不影响
Durability(持久性): 数据存储之后,即使系统出现故障,数据也会持久保存。
事务的四个隔离级别 isolation level:
read uncommitted 读未提交: A事务可以读B事务未提交的修改数据。
问题:【脏读】【不可重复读】【幻读】
read committed 读已提交: A事务可以读取B事务已提交的数据。 【不可重复读】【幻读】
repeatable read 可重复读【默认】: A事务修改数据时,会读取其它事务提交后数据,再进一步修改,保持数据的一致性。 【幻读】
serializable 序列化或串行化: 多个事务在修改同一条数据时,必须等待其他事务完成才能执行。
客户端设置事务隔离级别的语句:
set session transaction isolation level 隔离级别;
4.2.3 常见数据库的函数
4.2.3.1 聚合函数
count() 统计行数
sum() 计算总和
max() 最大值
min() 最小值
avg() 平均值
4.2.3.2 字符函数
concat(a, b) 将a和b拼接成一个字符串
length("子符串") 计算字符串的字节长度
char_length() 计算字符个数
lower() 小写转换
upper() 大写转换
replace(str, sub, replace_str) 替换子子符串
trim() 同python的strip()函数,删除两连的空白
instr(str, sub) 返回sub在str的首次出现的位置, 索引位置是从1开始
lpad(str,len,sub) 如果str的长度不足时,在左边填充sub字符, 同Python的str对象的rjust()
rpad(str, len, sub) 如果str的长度不足时,在右边填充sub字符, 同Python的str对象的ljust()
char(N) 将AscII数值转成字符, N可以是一个字符,也可以是数值
ord(str) 将ASCII字符转成数值
format(n, 小数位长) 将n转成带千位符‘#,###.##’格式的字符串
4.2.3.3 数值函数
round(n, 小数位长) 四舍五入
ceil(n) 上行取整,比n大的,最小的整数
floor() 下行取整,比n小的,最大的整数
pow(n, c) 计算n的c次方
mod(n1, n2) 计算两者的余数
rand() 随机产生[0,1)之间数,如 ceil(rand()*15) + 1,生成[1,16]之间的数
abs() 求绝对值
sin()/asin()/cos()/acos()/tan()/atan()/con() 数学相关的三角函数
4.2.3.4 日期函数
current_date()/curdate() 当前日期
current_timestamp()/now() 当前日期时间
curerent_time()/curtime() 当前时间
year() 获取日期中年
month() 获取日期中的月
day() 获取日期中的天
dayofweek() 获取星期几, 1是星期日, 7是星期六
dayofyear() 获取一年中的第几天
weekofyear() 获取一年中第几周
hour()
minute()
second()
datediff(now(), "2019-10-11") 获取两个日期之间的间隔天数
timediff("12:10:09", "15:10:09") 获取两时间之间的问隔时间
str_to_date(日期字符串, "格式") 将字符串转成日期, 格式: %Y-%m-%d %H:%i:%s
date_format(data, "格式") 将时间转成字符串
date_add(date, interval n 时间关键字) 某一时间添加 n的间隔(关键字)时长后的日期时间
时间关键字: year,month, day, hour, minute, second, week
date_sub(data, interval n 时间关键字) 某一时间减少n的间隔(关键字)时长后的日期时间
-- 15天之后是几号?星期几? 是一年中第几天?
select date_add(now(), interval 15 day) "15天后的日期", dayofweek(date_add(now(), interval 15 day))-1 "星期", dayofyear(date_add(now(), interval 15 day)) "年的天";
-- 46、查询各学生的年龄
select sn,name,sex,ceil(datediff(now(), age)/365) from student;
-- 47、查询本周过生日的学生
select *
from student
where weekofyear(now()) = weekofyear(age);
-- 48、查询下周过生日的学生
select *
from student
where weekofyear(now())+1 = weekofyear(age);
-- 49、查询本月过生日的学生
select *
from student
where month(now()) = month(age);
-- 50、查询下月过生日的学生
select *
from student
where month(now())+1 = month(age);
4.2.3.5 条件函数
if(条件, 条件为真的结果, 条件为假的结果)
ifnull(v1, v2) 如果第一个值为null时,返回v2, 反之返回v1。
case 分支语句:
-- MySQL 不支持全外连接
select s.*, ifnull(sc.score, 0) score
from student s
left join score sc on (sc.sn = s.sn);
select sn, name, sex, age,
case month(age) when 1 then "A"
when 2 then "B"
when 3 then "C"
when 4 then "D"
when 5 then "E"
when 6 then "F"
when 7 then "G"
when 8 then "H"
when 9 then "I"
when 10 then "J"
when 11 then "K"
when 12 then "L"
else "Z" end "level"
from student
order by level; -- 只有order by语句可以使用 查询中的新增列名
-- where level = "A"; where 语句只针对原有表字段
select sn, name, sex, age,
case when month(age)>=2 and month(age)<=4 then "A"
when month(age)>=5 and month(age)<=7 then "B"
when month(age)>=8 and month(age)<=10 then "C"
else "D" end "level"
from student
order by level;
4.2.3.6 加密算法函数
password() -- MySQL5存在的, MySQL 8不存在
md5(str) 将str转成md5编码字符,长度32位
sha1() 将str转成sha1编码字符,长度40位
uuid() 获取uuid的字符串, 字符串中带有`-`符号,可以使用replace()函数将其去除。
4.2.4 高级子查询
4.2.4.1 多表连接查询
-- join on 方式
-- 内连接 join on
-- 左外连接 left join on
-- 右外连接 right join on
select s.*, sc.*, c.*
from student s
left join score sc on (sc.sn=s.sn)
right join cource c on (c.cn = sc.cn)
-- 等值连接条件(属于内连接)
select s.*, sc.*, c.*
from student s, score sc , cource c
where s.sn = sc.sn
and sc.cn = c.cn;
-- 联合查询
select * from s1
union all -- all 表示不去重
select * from s1;
4.2.4.2 子查询
作用:
- 可以作为条件使用
- 可以创建视图或表
- 批量插入数据
-- 将student表分成两张表, 以出生的月份作为条件(6月中位线)
drop table if exists s1;
create table s1
select * from student
where month(age) <=6;
drop table if exists s2;
create table s2
select * from student
where month(age) >6;
-- 将s1的数据合并到s2中
insert into s2
select * from s1;
4.2.4.3 视图
视图: 将复杂的查询语句进行封装,对视图查询,即是将视图对应的SQL语句作为子查询语句使用。
-- 查询学生的所有信息(基本信息,成绩、课程名、教师名)
create view v_all_student as
select s.*, sc.score, c.name cource, t.name as teacher
from student s
left join score sc on (sc.sn=s.sn)
join cource c on (c.cn=sc.cn)
join teacher t on (t.tn = c.tn)
删除视图:
drop view view_name;
day08: 中午默写
1.写出dict对象相关的方法
value = d.get(key) 获取key的值
d.setdefault(key, default) 设置默认值
keys() 获取所有的key
values() 获取所有的value
pop(key) 弹出key
k,v = popitem() 弹出key,value的元组
fromkeys([key...]) 基于key列表生成dict对象
update({}) 更新字典
clear() 清空
copy() 浅拷贝
items() 获取(key,value)元组为元素的列表
2.简述并行、串行、同步、异步的认知
并行: 多个程序或线程同一时段运行,由于同一时间点只能运行某一个任务,因此多个程序交替式运行。
串行: 一个任务接一个任务去运行或执行,如A->B->c->D->Over。
同步: 当A调用B时,等待B完成后,再继续执行A。
异步: 当A需要调用B和C时,可以以回调方式同步运行B和C,当其中某一任务完成后,通过回调试将数据回给A即可。这种场景A、B和C处于并行状态,A调用B或C时,不需要等待它们立即完成,只提供回调即可。
A():
def callback(ret):
# 10秒之后继续
pass
B(callback) # B 需要10秒, callback(102)
# 立即执行
3.MySQL数据库支持哪些引擎
可以通过show engines语句查看MySQL支持的引擎
InnoDB: 支持事务、外键、行锁
MyISAM: 普通引擎,不支持事务和外键
Memory: 内存存储,无持久化
CSV: csv格式存储数据, 体积小,文本数据
4.3 Python的MySQL编程
4.3.1 pymysql库
pymysql是属于第三方库, 需要安装:
(py201) d:\codes> pip install pymysql -i https://mirrors.aliyun.com/pypi/simple
# 导包
from pymysql.connections import Connection
from pymysql.cursors import DictCursor
# DB配置
db_params = {
"host": "10.12.153.232",
"port": 3307,
"user": "root",
"passwd": "root",
"charset": "utf8",
"db": "stu",
"autocommit": True, # 自动提交事务
"cursorclass": Cursor
}
# 测试连接
conn = Connection(**db_params)
print("--OK--")
# 执行查询SQL
# 查询没有上"张三"老师课程的学生信息
sql8 = """
select *
from student s
where s.sn not in (
select distinct sc.sn
from teacher t
join cource c on (c.tn=t.tn)
join score sc on (sc.cn=c.cn)
where t.name = %s
)
"""
def query_not_cource(teacher_name):
"""
查询未学习过指定老师所带课程的学生信息
:param teacher_name 教师姓名
:return:
"""
cursor = conn.cursor() # 获取游标对象
cursor.execute(sql8, (teacher_name, )) # 执行SQL脚本
# 获取查询结果
print(cursor.fetchall()) # 返回是可迭代的对象,元素是元组
# 关闭游标
cursor.close()
if __name__ == "__main__":
query_not_cource("张三")
conn.close() # 关闭连接
# 优化函数
def query_not_cource(teacher_name):
"""
查询未学习过指定老师所带课程的学生信息
:param teacher_name 教师姓名
:return:
"""
with conn as c:
c.execute(sql8, (teacher_name, )) # 执行SQL脚本
# 获取查询结果
print(c.fetchall())
# 练习1-尝试获取rowcount属性值
# rowcount 返回查询数据的行数,插入、修改或删除操作后的影响的记录数
def add_teacher(**data):
sql = "insert into teacher values( %(tn)s, %(name)s )"
with conn as c:
ret = c.execute(sql, data)
print("-->>", ret)
print("--rowcount--", c.rowcount)
if c.rowcount > 0:
print("新增 教师成功")
# 练习2-尝试获取stu库中所有的对象(表、视图、索引、函数和存储过程)
# mysql存在一个字典库: information_schema
def show_all_table():
# mysqlclient 是否支持设置事务的isolation level 隔离级别
with conn as c:
sql = """
select table_name,table_type
from information_schema.tables
where table_schema = "stu"
"""
c.execute(sql)
for table_name in c.fetchall():
print("-->", table_name)
# 思考: 批量插入1000条数据,最优的写法是什么?
# 默认情况下,插入一条提交一次事务。最优的做法是先开事务、插入数据、提交事务。
conn.begin()
# 执行相关的语句
conn.commit()
4.3.2 DAO设计
类的类型:
- 数据类、 实体类(Entity),表示业务中与数据表对应的类。
- 业务类、功能类(Service),对数据类进行增删改查操作。
- 工具类,扩展功能(Utils或Helper),在处理业务时,方便调用,如md5加密、日志记录、缓存、短信验证、图片验证码、简化网络操作。
- 数据操作类 DAO类: 只对象数据实体类,简化对象与数据库的操作。
class Singleton:
def __init__(self, cls):
self.cls = cls
def __call__(self, *args, **kwargs):
if not hasattr(self.cls, "instance"):
self.cls.instance = self.cls(*args, **kwargs)
return self.cls.instance
@Singleton
class DB():
def __init__(self):
params = {
"host": "10.12.153.232",
"port": 3307,
"user": "root",
"passwd": "root",
"charset": "utf8",
"db": "stu",
"cursorclass": pymysql.cursors.DictCursor
}
self.conn = pymysql.Connect(**params)
def __enter__(self):
return self.conn.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
return self.conn.__exit__(exc_type, exc_val, exc_tb)
def __del__(self):
self.conn.close()
DictCursor字典游标类,查询结果自动转化列表+字典的结构,这样的结果便于json序列化。
class BaseDao():
def __init__(self, table_name=None, pk_name="id"):
self.table = table_name # 当前Dao操作的表名称
self.pk = pk_name # 当前Dao操作的表的主键名
self.db = DB()
def get_table(self, obj=None):
if self.table is None:
self.table = obj.__class__.__name__.lower()
return self.table
def save(self, t):
# sql = "insert into teacher(tn, name) values( %(tn)s, %(name)s )"
sql = "INSERT INTO %s(%s) VALUES (%s)"
table = self.get_table(t)
colums = ",".join(t.__dict__)
colums_placeholds = ",".join(["%%(%s)s" % name for name in t.__dict__])
with self.db as c:
c.execute(sql % (table, colums, colums_placeholds), t.__dict__)
self.msg("Add", c.rowcount)
def update(self, t):
# update student set name=%(name)s,age=%(age)s,sex=%(sex)s where sn="08"
sql = "UPDATE %s SET %s WHERE %s"
obj_dict = t.__dict__
# 获取主键值
pk = obj_dict.pop(self.pk) # 弹出主键列和它的值, 避免set 语句中出现主键值更新
update_cols = ",".join("%s=%%(%s)s" % (col_name, col_name) for col_name in obj_dict)
where_str = f"{self.pk}={pk}"
with self.db as c:
c.execute(sql % (self.get_table(), update_cols, where_str), obj_dict)
self.msg("Update", c.rowcount)
def remove(self, pk):
sql = f"delete from {self.get_table()} where {self.pk}={pk}"
with self.db as c:
c.execute(sql)
self.msg("Delete", c.rowcount)
def remove_batch(self, *pks):
with self.db as c:
for pk in pks:
sql = f"delete from {self.get_table()} where {self.pk}={pk}"
c.execute(sql)
self.msg("Delete", c.rowcount)
def get(self, cls, pk):
# select * from student where sn=%s
sql = "SELECT * FROM %s WHERE %s"
with self.db as c:
c.execute(sql % (self.table, f"{self.pk}={pk}"))
ret = c.fetchone()
return cls(**ret)
def list(self, cls):
sql = f"select * from {self.get_table()}"
with self.db as c:
c.execute(sql)
ret = c.fetchall()
# 将[{}] 转成 [<对象>]
return [cls(**dict_obj) for dict_obj in ret]
def msg(self, title, rowcount):
if rowcount > 0:
print(f"{title} OK")
else:
print(f"{title} Error")
class TeacherDao(BaseDao):
def __init__(self):
super(TeacherDao, self).__init__("teacher", "tn")
def save(self, t):
super(TeacherDao, self).save(t)
def update(self, t):
super(TeacherDao, self).update(t)
def remove(self, pk):
super(TeacherDao, self).remove(pk)
def get(self,pk):
return super(TeacherDao, self).get(Teacher, pk)
def list(self):
return super(TeacherDao, self).list(Teacher)
def remove_batch(self, *pks):
super(TeacherDao, self).remove_batch(*pks)
if __name__ == "__main__":
dao = TeacherDao()
# t1 = Teacher("08", "Disen")
# dao.save(t1)
#
# t2 = dao.get("08")
# t2.name = "狄老师"
# dao.update(t2)
#
# print(dao.get("08"))
# dao.remove("08")
dao.remove_batch("04", "05", "06")
print(dao.list())
4.3.3 ORM设计
# ORM(Object Relationshap Mapping)对象关系映射
# 元类: 创建类的类, 默认的元类是type
#!/usr/bin/python3
# coding: utf-8
class Field():
pass
class CharField(Field):
def __init__(self, max_length=50):
self.max_length = max_length
class IntegerField(Field):
def __init__(self, default=0):
self.default = default
# 定义模型元类
class ModelMeta(type):
def __new__(cls, cls_name, cls_bases, cls_attrs):
if cls_name == "Model":
return type.__new__(cls, cls_name, cls_bases, cls_attrs)
# 提取实体类的字段名和表名
attrs = {}
for k, v in cls_attrs.items():
if isinstance(v, Field):
attrs[k] = v
table_name = cls_name.lower()
meta = cls_attrs.get("Meta", None)
if meta:
if hasattr(meta, "db_table"):
table_name = meta.db_table
cls_attrs.pop("Meta")
cls_attrs["attrs"] = attrs
cls_attrs["table_name"] = table_name
return type.__new__(cls, cls_name, cls_bases, cls_attrs)
class Model(metaclass=ModelMeta):
def get_db_type(self, field):
if isinstance(field, CharField):
return f"varchar({field.max_length})"
elif isinstance(field, IntegerField):
return f"integer default {field.default}"
return "varchar(50)"
def create(self):
# create table table_name (col_name col_type(length), ..)
cols = ",".join([
"%s %s" % (k, self.get_db_type(v))
for k, v in self.attrs.items()
])
sql = f"create table {self.table_name}({cols})"
print(sql)
def save(self):
pass
def update(self):
pass
class TeacherModel(Model):
tn = CharField(max_length=20)
name = CharField(max_length=20)
level = IntegerField(default=10)
class Meta:
db_table = "teacher"
if __name__ == "__main__":
t = TeacherModel()
t.create()
day09: 中午默写
1.简述一下事务的ACID?
ACID是事务的四个特性,它们的含义如下:
A:Atomicity 原子性, 表示事务中的操作要么都成功,要么都失败。
C:Consistency 一致性: 事务开始到结束的数据保持一致
I: Isolation 隔离性: 多个事务之间互不影响
D: Durability 持久性: 数据提交后,持久保存数据(即使系统崩溃,不会影响数据)
2.简述子查询语句使用位置
- where条件中使用
- join 或 from 作为数据表使用
- create table 创建表时使用
- create view 创建视图时使用
- insert 插入语句中使用
- union 联合查询多表时使用
3.写出MySQL中字符、日期、数值的常用函数(每类至少3个)
字符函数: lower(), upper(), replace(), instr(), length, char_length(), concat(), lpad(), rpad(), trim()< char(), ord(), format(str, n)
日期函数: year()/month()/day()/hour()/minute()/week()
dayofweek() dayofyear() weekofyear()
date_add(,interval n 日期时间关键字) date_sub()
str_to_date() date_format()
now()/current_timestamp
current_date/curdate()
current_time/curtime()
数值函数: abs()/round()/ceil()/floor()/pow()/mod()
sin()/cos()/tan()/con()
rand()
五、Docker的应用
Docker是镜像容器的管理工具, 实现虚拟化技术 替代类传于VMware或VirtualBox虑拟环境。Docker基于硬件虚拟化, 实现多个独立的容器(微型操作系统)在同等的硬件资源下和基础的操作系统(Ubuntu)上构建的虚拟环境。
Docker简化的开发到运维的过程, 要求开发人员具备运维能力。
5.1 Centos7安装Docker
yum update
yum install -y yum-utils device-mapper-persistent-data lvm2
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
yum list docker-ce --showduplicates | sort -r
uname -a
以两个命令搜索适当的docker版本。
yum install docker-ce
安装完成后,检查docker环境
docker version
只有客户端信息,并没有启动docker服务。
运行以下命令启动docker后台服务:
systemctl start docker
如果想开机启动docker服务,则运行以下命令:
systemctl enable docker
systemctl disable docker
5.2 Docker的加速配置
Docker中运行的镜像从Docker Hub远程仓库中下载的,因为在外,需要配置加速器。
编辑/etc/docker/daemon.json文件, 如果文件不存在,则直接创建:
vi /etc/docker/daemon.json
{
"registry-mirrors": ["https://y4tay211.mirror.aliyuncs.com"]
}
修改完成后,执行以下两个命令:
systemctl daemon-reload
systemctl restart docker
5.3 镜像和容器的关系
类似于类和对象的关系, 镜像对应是类, 容器对应是对象, 一个镜像可以运行多次成为容器,同一个类可以创建多个实现的思想一样。当然,在容器中安装或修改相关的内容之后,可以将容器保存为镜像。
一般开发人员,通过容器配置项目开发环境,等开发完成后,将容器保存为镜像,并向本地仓库上传,其他相关人员即可以从本地仓库下载并运行。
5.4 镜像命令
5.4.1 搜索命令
docker search mysql
根据搜索的镜像列表,可以看到完整的镜像名称,以便下载。
5.4.2 下载镜像
docker pull 镜像名
5.4.3 查看本地镜像
docker images
5.4.4 删除镜像
要求: 运行的容器没有一个属于待删除镜像的容器。
docker rmi 镜像全名[:tag]或ID
5.4.5 运行镜像
docker run [-i -t -d] [--name name] [-v local_path:path] [-p local_port:port] [--network network_name] [-e envirment_name=value] 镜像全名[:tag]或ID
-i 表示input,可以进入容器中,
-t 表示打开新的terminal 终端, 一般和-i
组合使用
-d 让容器处于后台运行, 同步返回容器的ID
–name 指定容器的名称
-v 将本地的文件路径和容器的文件绑定,同步文件数据。
-p 将本地的端口与容器中服务端口绑定。
-e 指定容器的环境变量,容器的程序在运行时需要的变量。
5.4.6 部署mysql示例
docker run -tid --name db2 -p 3307:3306 -e MYSQL_ROOT_PASSWORD=root -v /root/sql:/usr/src/sql mysql
5.5 容器命令
5.5.1 查询容器状态
因为容器是Docker守护进程(后台进程)的子进程,执行以下命令可以查看正在运行的容器信息:
docker ps
如果想查看最近一个运行的容器:
docker ps -l
如果想查看所有的容器,包含未运行的:
docker ps -a
5.5.2 启动与停止容器
docker stop 容器名或容器ID
docker start 容器名或容器ID
docker restart 容器名或容器ID
5.5.3 进入容器
当容器启动后,可以进入到容器中,执行Linux相关的命令,完成软件安装或项目部署。
docker exec -it 容器名或ID bash
5.5.4 执行容器命令
如果只想单独执行某一项命令:
docker exec 容器名或ID Linux命令
# 查看db1容器中的/usr/src/sql目录的所有文件
Disen:~ apple$ docker exec db1 ls /usr/src/sql -la
total 32
drwxr-xr-x 9 root root 288 May 28 03:48 .
drwxr-xr-x 1 root root 4096 May 26 07:31 ..
-rw-r--r-- 1 root root 1026 May 26 07:29 1.sql
-rw-r--r-- 1 root root 968 May 26 08:31 2.sql
-rw-r--r-- 1 root root 87 May 27 03:07 3.sql
-rw-r--r-- 1 root root 30 May 27 03:40 4.sql
-rw-r--r-- 1 root root 175 May 27 07:24 5.sql
-rw-r--r-- 1 root root 622 May 27 02:07 w18.sql
-rw-r--r-- 1 root root 178 May 27 01:53 w8.sql
5.5.5 复制文件
通过docker cp命令将本地文件复制到容器中,或者容器中文件复制到本地来。
docker cp 容器名:源文件路径 本地目标路径
docker cp 本地源路径 容器名:目标文件路径
作业:
1. 完成课程表和成绩表的Dao
2. 部署ES搜索引擎服务
5.5.6 容器日志
当容器启动后,可以查看容器中服务的运行日志:
docker logs 容器名或ID
5.5.7 删除容器
docker rm [-f] 容器名或ID
-f 如果容器运行的状态下,强制停止容器并删除。
六、搜索引擎编程
Lucene是apache软件基金会4 jakarta项目组的一个子项目,是一个开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,ElasticSearch(简称ES)是基于Lucene引擎实现的分布式全文检索的应用服务, ES基于RESTful WEB接口方式,便于客户端(使用者-开发者)快速与ES进行交换。
6.1 基于Docker部署ES
下载镜像
docker pull elasticsearch
启动镜像
docker run -itd --name es1 -e discovery.type=single-node -p 9200:9200 elasticsearch
查看容器启动的日志
docker logs es1
如果出现的started信息时,表示es服务已启动。
在浏览器,输入如下地址:
http://localhost:9200
如果是云服务器, localhost即是服务器外网IP,而且在云服务的安全组,将9200端口放开。
http://47.105.137.19:80/
6.2 API接口说明
API(Application Programing Interface)应用程序接口。
ES 基于RESTful 接口规范,规范的四个方面:
- 基于HTTP协议,且请求无状态,即服务器不维护会话连接。
- 每个服务器资源都具有URI(统一资源标识符)
- 每一个资源都具有GET、POST、 PUT、DELETE等四个请求方法
- 交互的数据使用json或xml。
ES-API接口常识:
index_ 索引名,类似于数据库
type_ 文档数据类型,类似于表
doc_ 文档数据, 类似于一条表中的记录
doc_id 文档数据的唯一标识
base_ = "http://10.12.153.232:9200"
6.2.1 索引接口
index_uri = "/{index_}/"
6.2.1.1 创建索引
请求方法: PUT
请求JSON参数:
{
"settings" : {
"number_of_shards" : 5,
"number_of_replicas" : 1
}
}
响应数据:
{
"acknowledged": true,
"shards_acknowledged": true,
"index": "aa_indexs"
}
6.2.1.2 查看索引
请求方法: GET
请求参数: 无
响应数据:
{
"student2": {
"aliases": {
},
"mappings": {
},
"settings": {
"index": {
"creation_date": "1590719367325",
"number_of_shards": "5",
"number_of_replicas": "1",
"uuid": "cJJW2qkdQLCLgXoTqUG8yg",
"version": {
"created": "5061299"
},
"provided_name": "student2"
}
}
}
}
6.2.1.3 删除索引
请求方法: DELETE
请求参数: 无
响应数据:
{"acknowledged": True}
6.2.2 文档接口
uri = '/{index_name}/{type_name}/[{doc_id}/]'
【注意】一个索引库下只能添加一类文档的数据。
doc_id可以不存在,表示自动创建id, 如果存在话,系统使用指定文档ID。
6.2.2.1 添加文档
示例接口: uri = ‘/stu_index/stu/02/’
请求方法: POST
请求json参数:
{
"name": "disen",
"age": "2019-10-10"
}
响应json数据:
{
"_index": "stu_index",
"_type": "stu",
"_id": "02",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
6.2.2.2 更新文档
示例接口: ‘/stu_index/stu/02’
请求方法: PUT
请求json参数: 同添加文档的参数。
6.2.2.3 删除文档
示例接口: ‘/stu_index/stu/03’
请求方法: DELETE
6.2.2.4 查看文档
示例接口: ‘/stu_index/stu/02’
请求方法: GET
正常返回结果:
{
"_index": "stu_index",
"_type": "stu",
"_id": "02",
"_version": 1,
"found": true,
"_source": {
"name": "李五",
"age": "1990-10-11",
"sex": "男"
}
}
6.2.3 搜索接口
检索的请求方法: GET
请求URI: ‘{index_name}/_search’
6.2.3.1 全文检索
URI: /_search
URI: `/_all/cource,teacher/_search
从所有的索引库中找cource和teacher两个文档类型的数据。
URI:/c*,t*/_search
从以c和t开头的索引库搜索。
6.2.3.2 条件检索
URI: /_search?q=关键字
URI: /_search?q=name:关键字[ +|- sex:男]
指定某一姓名搜索关键字。+必须满足,- 必须不满足。
6.2.3.3 分页检索
URI: /_search?size=10&from=1
size 表示当前页面的记录数
from 表示页号,默认是0
6.3 封装SDK
SDK(Software Developement Kit) 软件开发环境
import json
from urllib.request import urlopen, Request
from urllib.parse import quote
from collections import namedtuple
Response = namedtuple('Response', ['code', 'json', 'content', 'text'])
class NotDocIdError(Exception):
def __init__(self):
super(NotDocIdError, self).__init__('文档的_id参数缺少')
6.3.1 索引操作
class ESStorage:
def __init__(self, host, port=9200):
self.base_ = f'http://{host}:{port}'
self.headers = {
'Content-Type': 'application/json;charset=utf-8'
}
def __response__(self, resp):
bytes = resp.read() # content
text = bytes.decode('utf-8')
json_ = json.loads(text)
return Response(resp.code, json_, bytes, text)
def __request__(self, *args, **kwargs):
if kwargs.get('data'):
# 如果请求中带有数据时,设置数据类型
kwargs.setdefault('headers', self.headers)
return urlopen(Request(*args, **kwargs))
def create_index(self, index_name, shared=5, replicas=1):
uri = self.base_ + f'/{index_name}/'
data_ = json.dumps(
{
"settings": {
"number_of_shards": shared,
"number_of_replicas": replicas
}
}
)
return self.__response__(self.__request__(uri, data=data_.encode('utf-8'), method='PUT'))
def delete_index(self, index_name):
uri = self.base_ + f'/{index_name}/'
return self.__response__(urlopen(Request(uri, method="DELETE")))
6.3.2 文档操作
class ESStorage:
# ...
def add_doc(self, index_, type_, **data):
_id = data.pop('_id', None)
uri = self.base_ + f'/{index_}/{type_}/' + (f'{_id}/' if _id else '')
data_ = json.dumps(data).encode('utf-8')
return self.__response__(self.__request__(uri, data=data_, method='POST'))
def update_doc(self, index_, type_, **data):
_id = data.pop('_id', None)
if _id is None:
raise NotDocIdError()
uri = self.base_ + f'/{index_}/{type_}/{_id}/'
data_ = json.dumps(data).encode('utf-8')
return self.__response__(self.__request__(uri, data=data_, method='PUT'))
def get_doc(self, index_, type_, id_):
uri = self.base_ + f'/{index_}/{type_}/{id_}/'
return self.__response__(self.__request__(uri, method='GET'))
def delete_doc(self, index_, type_, id_):
uri = self.base_ + f'/{index_}/{type_}/{id_}/'
return self.__response__(self.__request__(uri, method='DELETE'))
def query(self, kw, page=0, size=5, index_=None):
# quote作用: 将中文转成url编码
uri = self.base_ + (f'/{index_}/' if index_ else '') + f'_search?q={quote(kw)}&from={page}&size={size}'
return self.__response__(self.__request__(uri, method='GET'))
6.3.3 搜索操作
class ESStorage:
# ...
def query(self, kw, page=0, size=5, index_=None):
# quote作用: 将中文转成url编码
uri = self.base_ + (f'/{index_}/' if index_ else '') + f'_search?q={quote(kw)}&from={page}&size={size}'
return self.__response__(self.__request__(uri, method='GET'))
6.3.4测试
#!/usr/bin/python3
# coding: utf-8
from unittest import TestCase
from es_ import ESStorage, Response
class TestES(TestCase):
@classmethod
def setUpClass(cls):
cls.es = ESStorage('47.105.137.19', 80)
@classmethod
def tearDownClass(cls):
cls.es = None
def test_add_index(self):
resp:Response = self.es.create_index('stu_index1')
print(resp.json)
print(resp.code)
print(resp.content)
print(resp.text)
self.assertEqual(resp.code, 200)
def test_del_index(self):
resp = self.es.delete_index('stu_index1')
print(resp.text)
self.assertTrue(resp.json.get('acknowledged'))
6.4 MySQL数据同步ES
#!/usr/bin/python3
# coding: utf-8
# 同步ES和MySQL
import time
from datetime import datetime
from decimal import Decimal
from pymysql import connect
from pymysql.cursors import DictCursor
from es_ import ESStorage
# MySQL 8.0.19 出现连接问题 sha256_password()加密问题
config = {
'host': '59.110.22.243',
'port': 3306,
'user': 'root',
'password': '242634',
'charset': 'utf8',
'db': 'stu',
'cursorclass': DictCursor
}
conn = connect(**config)
print('--OK--')
es = ESStorage('47.105.137.19', 80)
def async_table(table_name, pk='id'):
sql = f'select * from {table_name}'
try:
es.delete_index(f'{table_name}_idx')
except:
pass
es.create_index(f'{table_name}_idx')
with conn as c:
c.execute(sql)
for row in c.fetchall():
# row 是一个dict对象
if pk:
row['_id'] = row.pop(pk)
# 考虑特殊字段类型的问题:1. datatime 2. Decimal 双精度,类似于java和c中的double
row = dict(list(map(lambda item: (item[0],
item[1].strftime('%Y-%m-%d') if isinstance(item[1], datetime) else float(
item[1]) if isinstance(item[1], Decimal) else item[1]), row.items())))
es.add_doc(f'{table_name}_idx', f'{table_name}', **row)
print(f'同步 {table_name} 成功')
if __name__ == '__main__':
for name, pk in [('student', 'sn'), ('tearch', 'tn'), ('cource', 'cn'), ('score', None)]:
async_table(name, pk)
time.sleep(0.2)
6.5 扩展:解决sha256_password问题
由于MySQL5.x和MySQL8.*的加密规则不同,因此在使用pymysql连接数据库时,会存在密码不正确问题,以下是解决办法:
> use mysql
> alter user 'root'@'%' identified by 'root' password expire never;
> alter user 'root'@'%' identified with mysql_native_password by 'root';
day10: 默写
1.写出pymysql.connect()方法的常用参数
- host
- port
- user
- passwd / password
- db/database
- charset
- cursor: pymysql.cursors.Cursor/DictCursor
- autocommit: False
2.写出pymsql的Cursor对象的方法与属性
- execute(sql, args)
args是tuple或list时,填充sql语句中的%s占位符
args是dict时,填充sql语句的%(name)s占位符
- fetchall()
- fetone()
- rowcount
- callproc(name, args) 调用存储过程(procedure)
- close
3.简述哪些对象可以使用with关键字
- stream文件流对象
- Condition 线程的条件变量对象
- Lock线程锁对象
- Connection数据库连接类的对象
- 实现了__enter__和__exit__两方法的类的对象
更多推荐
所有评论(0)