先看一下效果:

毕竟是免费接口,效果可想而知  :(

之前写过一个单线程的,代码也扔在这了,需要的自取。

单线程版本代码下载:https://github.com/leizhang9527/speech_recognition/tree/master/Single%20thread

多线程版本代码下载:https://github.com/leizhang9527/speech_recognition/tree/master/Multi-thread

直接上代码:

一、ui界面

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui file 'main.ui'
#
# Created by: PyQt4 UI code generator 4.11.4
#
# WARNING! All changes made in this file will be lost!

from PyQt4 import QtCore, QtGui

try:
    _fromUtf8 = QtCore.QString.fromUtf8
except AttributeError:
    def _fromUtf8(s):
        return s

try:
    _encoding = QtGui.QApplication.UnicodeUTF8
    def _translate(context, text, disambig):
        return QtGui.QApplication.translate(context, text, disambig, _encoding)
except AttributeError:
    def _translate(context, text, disambig):
        return QtGui.QApplication.translate(context, text, disambig)

class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName(_fromUtf8("MainWindow"))
        MainWindow.setEnabled(True)
        MainWindow.resize(556, 439)
        icon = QtGui.QIcon()
        icon.addPixmap(QtGui.QPixmap(_fromUtf8("1.ico")), QtGui.QIcon.Normal, QtGui.QIcon.Off)
        MainWindow.setWindowIcon(icon)
        MainWindow.setAutoFillBackground(True)
        MainWindow.setDocumentMode(True)
        MainWindow.setDockNestingEnabled(True)
        MainWindow.setUnifiedTitleAndToolBarOnMac(True)
        self.centralWidget = QtGui.QWidget(MainWindow)
        self.centralWidget.setObjectName(_fromUtf8("centralWidget"))
        self.pushButton = QtGui.QPushButton(self.centralWidget)
        self.pushButton.setGeometry(QtCore.QRect(80, 360, 93, 28))
        self.pushButton.setObjectName(_fromUtf8("pushButton"))
        self.textEdit = QtGui.QTextEdit(self.centralWidget)
        self.textEdit.setGeometry(QtCore.QRect(50, 40, 451, 281))
        self.textEdit.setMouseTracking(True)
        self.textEdit.setAutoFillBackground(True)
        self.textEdit.setStyleSheet(_fromUtf8("background-color: rgb(246, 255, 221);"))
        self.textEdit.setLineWidth(0)
        self.textEdit.setReadOnly(True)
        self.textEdit.setObjectName(_fromUtf8("textEdit"))
        self.pushButton_2 = QtGui.QPushButton(self.centralWidget)
        self.pushButton_2.setGeometry(QtCore.QRect(360, 360, 93, 28))
        self.pushButton_2.setObjectName(_fromUtf8("pushButton_2"))
        self.pushButton_3 = QtGui.QPushButton(self.centralWidget)
        self.pushButton_3.setGeometry(QtCore.QRect(220, 360, 93, 28))
        self.pushButton_3.setObjectName(_fromUtf8("pushButton_3"))
        self.textEdit.raise_()
        self.pushButton.raise_()
        self.pushButton_2.raise_()
        self.pushButton_3.raise_()
        MainWindow.setCentralWidget(self.centralWidget)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        MainWindow.setWindowTitle(_translate("MainWindow", "       语音识别", None))
        self.pushButton.setText(_translate("MainWindow", "开始录音", None))
        self.textEdit.setHtml(_translate("MainWindow", "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0//EN\" \"http://www.w3.org/TR/REC-html40/strict.dtd\">\n"
"<html><head><meta name=\"qrichtext\" content=\"1\" /><style type=\"text/css\">\n"
"p, li { white-space: pre-wrap; }\n"
"</style></head><body style=\" font-family:\'SimSun\'; font-size:9pt; font-weight:400; font-style:normal;\">\n"
"<p style=\"-qt-paragraph-type:empty; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;\"><br /></p></body></html>", None))
        self.pushButton_2.setText(_translate("MainWindow", "停止录音", None))
        self.pushButton_3.setText(_translate("MainWindow", "清空内容", None))

二、核心代码:

# -*- coding: utf-8 -*-

"""
多线程版本的语音识别系统
Author:Decadence
Email:leizhang9527@163.com
"""

import Queue
import base64
import json
import threading
import time
import wave
import sys
from time import ctime

from urllib2 import urlopen
from urllib2 import Request
from urllib2 import URLError
from urllib import urlencode


import numpy as np

from PyQt4 import QtGui
from PyQt4.QtCore import pyqtSignature
from PyQt4.QtGui import QMainWindow


from main import Ui_MainWindow
from pyaudio import PyAudio,paInt16


class DemoError(Exception):
    pass


class MyThread (threading.Thread):
    def __init__(self,func, args, name = ''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        print self.name + "开始线程:" + ctime()
        self.res = apply(self.func)
        print self.name + "退出线程:" + ctime()

class MainWindow(QMainWindow, Ui_MainWindow,threading.Thread):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        reload(sys)
        sys.setdefaultencoding('utf8')
        threading.Thread.__init__(self)
        self.name = self.voise_tts.__name__
        self.setupUi(self)

        self.CUID = '123456PYTHON'
        self.RATE = 16000  # 采样率固定值
        self.DEV_PID = 1537  # 1537 表示识别普通话,使用输入法模型。1536表示识别普通话,使用搜索模型。
        self.ASR_URL = 'http://vop.baidu.com/server_api'
        self.SCOPE = 'audio_voice_assistant_get'  # 有此scope表示有asr能力
        self.TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'
        self.chunk = 1024
        self.NUM_SAMPLES = 2000  # 采样点
        self.channels = 1  # 一个声道
        self.sampwidth = 2  # 两个字节十六位

        self.API_KEY = 'GkegucnW3b559fCPZ8eQOkjK'
        self.SECRET_KEY = 'mhinZuFI05jx2K3dFv8yYWnsPxU93tTT'

        self.TIME = 5  # 条件变量,可以设置定义录音的时间
        self.Level = 600
        self.mute_count_limit = 50
        self.mute_begin = 0
        self.mute_end = 1
        self.not_mute = 0
        self.voice_queue = Queue.Queue(1024)
        self.wav_queue = Queue.Queue(1024)
        self.file_name_index = 1
        self.thread_flag = 0
        self.start_flag = 1


    def getResult(self):
        return self.res

    def run(self):
        t = threading.currentThread()  # 获取当前子线程对象
        print self.name + "开始线程:" + ctime()
        self.res = apply(self.voise_tts)
        print self.name + "退出线程:" + ctime()

    def writeQ(self, queue, data):
        queue.put(data, 1)

    def readQ(self, queue):
        val = queue.get(1)
        return val

    def save_wave_file(self, filename, data):
        wf = wave.open(filename, 'wb')  # 二进制写入模式
        wf.setnchannels(self.channels)
        wf.setsampwidth(self.sampwidth)  # 两个字节16位
        wf.setframerate(self.RATE)  # 帧速率
        wf.writeframes(b"".join(data))  # 把数据加进去,就会存到硬盘上去wf.writeframes(b"".join(data))
        wf.close()

    # 录音模块
    def record_wave(self):
        while self.start_flag == 1:
            pa = PyAudio()
            stream = pa.open(format = paInt16, channels = 1,
                             rate = self.RATE, input = True,
                             frames_per_buffer = self.NUM_SAMPLES)
            save_buffer = []
            count = 0
            print '当前正在录音(同时录制系统内部和麦克风的声音)……'
            while count < self.TIME * 30:
                string_audio_data = stream.read(self.NUM_SAMPLES)
                audio_data = np.fromstring(string_audio_data, dtype = np.short)
                large_sample_count = np.sum(audio_data > self.Level)
                print large_sample_count
                if large_sample_count < self.mute_count_limit:  # 静音
                    self.mute_begin = 1
                else:
                    save_buffer.append(string_audio_data)
                    self.mute_begin = 0
                    self.mute_end = 1
                count += 1
                if (self.mute_end - self.mute_begin) > 9:  # 留白时间过长
                    self.mute_begin = 0
                    self.mute_end = 1
                    break
                if self.mute_begin:
                    self.mute_end += 1
                print '...'

            save_buffer = save_buffer[:]
            if save_buffer:
                if self.file_name_index < 11:
                    pass
                else:
                    self.file_name_index = 1
                filename = str(self.file_name_index) + '.wav'
                self.save_wave_file(filename = filename, data = save_buffer)
                self.writeQ(queue = self.wav_queue, data = filename)
                self.file_name_index += 1
                print filename + "已经保存"
            else:
                print "文件未保存"
            # self.save_wave_file(filename, my_buf)
            save_buffer = []
            stream.close()

    # 获取token
    def fetch_token(self):
        params = {'grant_type': 'client_credentials',
                  'client_id': self.API_KEY,
                  'client_secret': self.SECRET_KEY}
        post_data = urlencode(params)
        post_data = post_data.encode('utf-8')
        req = Request(self.TOKEN_URL, post_data)
        try:
            f = urlopen(req)
            result_str = f.read()
        except URLError as err:
            print 'token http response http code : ' + str(err.code)
            result_str = err.read()
        result_str = result_str.decode()

        print result_str
        result = json.loads(result_str)
        print result

        if ('access_token' in result.keys() and 'scope' in result.keys()):
            print self.SCOPE
            if self.SCOPE and (not self.SCOPE in result['scope'].split(' ')):  # SCOPE = False 忽略检查
                raise DemoError('scope is not correct')
            print 'SUCCESS WITH TOKEN: %s  EXPIRES IN SECONDS: %s' % (result['access_token'], result['expires_in'])
            return result['access_token']
        else:
            raise DemoError(
                'API_KEY或者SECRET_KEY不正确,请重新设置')

    # 识别模块
    def speech_recognition(self):
        # qRegisterMetaType < s_MSGBoxInfo > ("QTextCursor");
        while True:
            if self.wav_queue.qsize():
                filename = self.readQ(queue = self.wav_queue)
            else:
                continue
            token = self.fetch_token()
            speech_data = []
            with open(filename, 'rb') as speech_file:
                speech_data = speech_file.read()

            length = len(speech_data)
            if length == 0:
                raise DemoError('file %s length read 0 bytes' % filename)
            speech = base64.b64encode(speech_data)
            # speech = str(speech, 'utf-8')
            # speech = str(speech,'')
            params = {'dev_pid': self.DEV_PID,
                      'format': filename[-3:] ,
                      'rate': self.RATE,
                      'token': token,
                      'cuid': self.CUID,
                      'channel': 1,
                      'speech': speech,
                      'len': length
                      }
            post_data = json.dumps(params, sort_keys=False)
            print post_data
            req = Request(self.ASR_URL, post_data.encode('utf-8'))
            req.add_header('Content-Type', 'application/json')
            timer = time.clock
            try:
                begin = timer()
                f = urlopen(req)
                result_str = f.read()
                print "Request time cost %f" % (timer() - begin)
            except URLError as err:
                print 'asr http response http code :' + str(err.code)
                result_str = err.read()

            # resultstr = str(result_str)
            # result_str = resultstr.decode( 'utf-8')
            # print result_str
            # print type(result_str)
            my_temp = json.loads(result_str)
            err = my_temp['err_no']
            print my_temp['err_no']
            if my_temp['err_no']:
                if my_temp['err_no'] == 3300:
                    text = '输入参数不正确'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3301:
                    text = '音频质量过差'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3302:
                    text = '鉴权失败'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3303:
                    text = '语音服务器后端问题'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3304:
                    text = '用户的请求QPS超限'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3305:
                    text = '用户的日pv(日请求量)超限'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3307:
                    text = '语音服务器后端识别出错问题'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3308:
                    text = '音频过长'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3309:
                    text = '音频数据问题'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3310:
                    text = '输入的音频文件过大'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3311:
                    text = '采样率rate参数不在选项里'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3312:
                    text = '音频格式format参数不在选项里'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3313:
                    text = '语音服务器解析超时'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3314:
                    text = '音频长度过短'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3315:
                    text = '语音服务器处理超时'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
                elif my_temp['err_no'] == 3316:
                    text = '音频转为pcm失败'
                    text1 = str(text)
                    text = text1.decode('utf-8')
                    print text
                    self.textEdit.append("语音识别出现错误,错误码是: %f,原因是:" % my_temp['err_no'] + text)
            else:
                word = my_temp['result']
                print word[0]
                with open("result.txt", "a") as of:
                    if of is None:
                        of.write(word[0])
                    else:
                        of.write('\n')
                        of.write(word[0])
                    self.textEdit.append(word[0])
            time.sleep(0.1)

    def voise_tts(self):
        self.speech_recognition()

    @pyqtSignature("",)
    def on_pushButton_clicked(self):
        if self.thread_flag == 0:
            self.start_flag = 1
            record = MyThread(self.record_wave,(self,),self.record_wave.__name__)
            record.setDaemon(True)
            record.start()
            self.thread_flag =1
        print '开始识别'

    @pyqtSignature("",)
    def on_pushButton_2_clicked(self):
        if self.thread_flag == 1:
            self.start_flag = 0
            self.thread_flag = 0
        print'停止识别'

    @pyqtSignature("", )
    def on_pushButton_3_clicked(self):
        self.textEdit.clear()
        print'清空内容'



if __name__ == "__main__":
    import sys
    app = QtGui.QApplication(sys.argv)
    ui = MainWindow()
    ui.setDaemon(True)
    ui.start()
    ui.show()
    sys.exit(app.exec_())

 

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐