使用python和opencv实现简单的网络摄像头


这个网络摄像头需要一个客户端和服务端,客户端使用opencv捕捉摄像头,通过socket将图片流传输到服务端,服务端接受图片流,使用HTTPServer渲染生成一个简单的网页,将图片流用mjpeg的方式传输到浏览器中,这样就可以直接用浏览器来查看摄像头视频了。主要的实现方式就是这样的,下面详细介绍下.

安装依赖

首先需要在客户端安装opencv,安装方式可以Google下,这里就不再赘述了,服务端需要安装pillownumpy这两个模块。

实现代码

首先是客户端的代码:

import io
import socket
import struct
import time
import datetime
import cv2

client_socket = socket.socket()
client_socket.connect(('服务端ip地址或域名', 8002))

connection = client_socket.makefile('wb')
try:
    #打开摄像头
    cap = cv2.VideoCapture(0)
    while (1):
        #读取图片
        ret, frame = cap.read()
        # cv2.imshow("capture", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        #转换为jpg格式
        img_str = cv2.imencode('.jpg', frame)[1].tostring()
        #获得图片长度
        s = struct.pack('<L', len(img_str))
        # print(s)
        #将图片长度传输到服务端
        connection.write(s)
        connection.flush()
        # 传输图片流
        connection.write(img_str)
        connection.flush()

except Exception as e:
    print(e)
finally:
    connection.close()
    client_socket.close()

简单说明下,将代码中的服务端ip地址或域名修改为你自己的,或者本地地址也可以。 下面是服务端的代码:

import socket
import struct
from PIL import Image
import numpy
import io
import logging
import socketserver
from threading import Condition
from http import server

server_socket = socket.socket()
# 绑定socket通信端口
server_socket.bind(('0.0.0.0', 8002))
server_socket.listen(0)

connection = server_socket.accept()[0].makefile('rb')

PAGE = """\
<html>
<head>
<title>camera MJPEG streaming demo</title>
</head>
<body>
<h1>PiCamera MJPEG Streaming Demo</h1>
<img src="stream.mjpg" width="640" height="480" />
</body>
</html>
"""


class StreamingOutput(object):
    def __init__(self):
        self.frame = None
        self.buffer = io.BytesIO()
        self.condition = Condition()

    def write(self, buf):
        if buf.startswith(b'\xff\xd8'):
            self.buffer.truncate()
            with self.condition:
                self.frame = self.buffer.getvalue()
                self.condition.notify_all()
            self.buffer.seek(0)
        return self.buffer.write(buf)


class StreamingHandler(server.BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            content = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(content))
            self.end_headers()
            self.wfile.write(content)
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            try:
                while True:
                    #获得图片长度
                    image_len = struct.unpack('<L', connection.read(struct.calcsize('<L')))[0]
                    print(image_len)
                    if not image_len:
                        break

                    image_stream = io.BytesIO()
                    #读取图片
                    image_stream.write(connection.read(image_len))

                    image_stream.seek(0)
                    image = Image.open(image_stream)
                    cv2img = numpy.array(image, dtype=numpy.uint8)
                    imgbuffer = image_stream.getvalue()
                    #写入http响应
                    self.wfile.write(b'--FRAME\r\n')
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', len(imgbuffer))
                    self.end_headers()
                    self.wfile.write(imgbuffer)
                    self.wfile.write(b'\r\n')


            except Exception as e:
                logging.warning(
                    'errror streaming client %s: %s',
                    self.client_address, str(e))

        else:
            self.send_error(404)
            self.end_headers()


class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    allow_reuse_address = True
    daemon_threads = True


output = StreamingOutput()

try:
    address = ('127.0.0.1', 8000)
    server = StreamingServer(address, StreamingHandler)
    server.serve_forever()
except Exception as e:
    print(e)

下面是我本地跑的时候的demo:

如果网络ok的话即时性还是蛮不错的,这样以后就可以随时看到不在家的时候家里的猫主子在干嘛了,只要在家里的树莓派上跑着客户端的程序,在我的vps上跑服务端的程序就可以了。