前言:
今天给你们分享一个有趣的小项目,话说在好久之前我须要作一个报告,原来的屏幕共享软件出了点问题,因而临时写了这个程序来应急,效果还不错,今天分享给你们,若是遇到某些特殊状况,能够用来应急html

1、使用环境

Pillow 6.0.0
Opencv-Python 4.1.1.26
flask 1.1.1

2、项目介绍
其实也不是很难,以前我使用 UDP广播的共享屏幕, 如今咱们用 Python 有一个库加 Pillow 能够获取屏幕, 能够实现截图效果, 将图片转换成字节流, 可是 Pillow 这个库有一个缺点,若是屏幕使用了缩放他是按比例截取的,等于说你获取的屏幕图片不是完整的图片,接下来我介绍一个如何使用 Python 截取屏幕并转换为字节流 :python

Pillow

from PIL import ImageGrab
img = ImageGrab.grab()
img.save(“a.png”)

pillow 在屏幕缩放状况下没法截取全屏web

pyautogui

import pyautogui

img = pyautogui.screenshot()
img.save(“a.png”)

pyautogui 在屏幕缩放的状况下也能够获取全屏flask

图片转换为字节流

import pyautogui from io
import BytesIO

img = pyautogui.screenshot()

output_buffer = BytesIO() # 建立二进制对象

img.save(output_buffer, format=‘JPEG’, quality=100) # quality提高图片分辨率

frame = output_buffer.getvalue() # 获取二进制数据

print(frame)

2、动手实践
建立一个新的文件夹 (跟我一块儿作吧)
在文件夹里再建立一个 templates 文件夹
在 templates 建立一个 index.html
建立 app.py camera_pil.py base_camera.py 三个空的Python 文件
在这里插入图片描述
建立完成后将以下代码粘贴到 index.html

<html>
<head>
    <title>屏幕共享</title>
</head>
<body>
<h4>屏幕共享</h4>
<img src="{{ url_for('video_feed') }}">
</body>
</html>

而后再将以下代码粘贴到: base_camera.py

import time
import threading
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident


class CameraEvent(object):
    def __init__(self):
        self.events = {}

    def wait(self):
        ident = get_ident()
        if ident not in self.events:
            self.events[ident] = [threading.Event(), time.time()]
        return self.events[ident][0].wait()

    def set(self):
        now = time.time()
        remove = None
        for ident, event in self.events.items():
            if not event[0].isSet():
                event[0].set()
                event[1] = now
            else:
                if now - event[1] > 5:
                    remove = ident
        if remove:
            del self.events[remove]

    def clear(self):
        self.events[get_ident()][0].clear()


class BaseCamera(object):
    thread = None
    frame = None
    last_access = 0
    event = CameraEvent()

    def __init__(self):
        if BaseCamera.thread is None:
            BaseCamera.last_access = time.time()

            BaseCamera.thread = threading.Thread(target=self._thread)
            BaseCamera.thread.start()

            while self.get_frame() is None:
                time.sleep(0)

    def get_frame(self):
        BaseCamera.last_access = time.time()

        BaseCamera.event.wait()
        BaseCamera.event.clear()

        return BaseCamera.frame

    @staticmethod
    def frames():
        raise RuntimeError('Must be implemented by subclasses.')

    @classmethod
    def _thread(cls):
        print('Starting camera thread.')
        frames_iterator = cls.frames()
        for frame in frames_iterator:
            BaseCamera.frame = frame
            BaseCamera.event.set()
            time.sleep(0)
            if time.time() - BaseCamera.last_access > 10:
                frames_iterator.close()
                print('Stopping camera thread due to inactivity.')
                break
        BaseCamera.thread = None

再将以下代码粘贴到: camera_pil.py

from io import BytesIO
import cv2
from PIL import ImageGrab, Image
from base_camera import BaseCamera


class Camera(BaseCamera):
    video_source = 0

    @staticmethod
    def set_video_source(source):
        Camera.video_source = source

    @staticmethod
    def frames():
        camera = cv2.VideoCapture(Camera.video_source)
        if not camera.isOpened():
            raise RuntimeError('Error')

        while True:
            image = ImageGrab.grab()  # 获取屏幕数据
            # w, h = image.size
            image = image.resize((1366, 750), Image.ANTIALIAS)  # 图片缩放
            output_buffer = BytesIO()  # 建立二进制对象
            image.save(output_buffer, format='JPEG', quality=100)  # quality提高图片分辨率
            frame = output_buffer.getvalue()  # 获取二进制数据
            yield frame  # 生成器返回一张图片的二进制数据

再将以下代码粘贴到: app.py

import os
from importlib import import_module
from flask import Flask, render_template, Response

if os.environ.get('CAMERA'):
    Camera = import_module('camera_' + os.environ['CAMERA']).Camera
else:
    from camera_pil import Camera

app = Flask(__name__)


@app.route('/')
def index():
    """
    视图函数
    :return:
    """
    return render_template('index.html')


def gen(camera):
    """
    流媒体发生器
    """
    while True:
        frame = camera.get_frame()

        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.route('/video_feed')
def video_feed():
    """流媒体数据"""
    return Response(gen(Camera()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
    app.run(threaded=True, host="127.0.0.1", port=80)

运行 app.py ,而后打开你的浏览器输入: http://127.0.0.1/ 试试看吧!
若是要使用的话, 请把 host=“127.0.0.1” 改成 host=“0.0.0.0” 而后运行屏幕共享服务器, 查看本身以太网的 ip 告诉给接收者,让他们使用浏览器访问你的 ip 地址,就能够实现屏幕共享了。

原文地址:https://blog.csdn.net/qq_42768234/article/details/104301603

Logo

致力于链接即构和开发者,提供实时互动和元宇宙领域的前沿洞察、技术分享和丰富的开发者活动,共建实时互动世界。

更多推荐