前一段时间用树莓派做了一个语音识别控制的智能垃圾箱,通过麦克风获取语音然后送到百度进行识别,根据返回的结果控制四个舵机(开启、关闭垃圾箱)或发出语音提示。

声明:语音识别的方法、流程是根据百度的例程改的!

整个工程的架构如下:

其中主程序voiceToControl:

# coding=utf-8

import sys
import json
import time
import os
import RPi.GPIO as GPIO
reload(sys)
sys.setdefaultencoding('utf8')

IS_PY3 = sys.version_info.major == 3

if IS_PY3:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.error import URLError
    from urllib.parse import urlencode

    timer = time.perf_counter
else:
    import urllib2
    from urllib2 import urlopen
    from urllib2 import Request
    from urllib2 import URLError
    from urllib import urlencode

    if sys.platform == "win32":
        timer = time.clock
    else:
        # On most other platforms the best timer is time.time()
        timer = time.time

#API_KEY  SECRET_KEY是在百度智能平台申请的
API_KEY = '****************************'
SECRET_KEY = '*************************'

# 需要识别的文件
#AUDIO_FILE = './audio/16k.pcm'  # 只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式
AUDIO_FILE = './test.pcm'
# 文件格式
FORMAT = AUDIO_FILE[-3:];  # 文件后缀只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式

CUID = '123456PYTHON';
# 采样率
RATE = 16000;  # 固定值

# 普通版

DEV_PID = 1537  # 1537 表示识别普通话,使用输入法模型。根据文档填写PID,选择语言及识别模型
ASR_URL = 'http://vop.baidu.com/server_api'
SCOPE = 'audio_voice_assistant_get'  # 有此scope表示有asr能力,没有请在网页里勾选,非常旧的应用可能没有

#测试自训练平台需要打开以下信息, 自训练平台模型上线后,您会看见 第二步:“”获取专属模型参数pid:8001,modelid:1234”,按照这个信息获取 dev_pid=8001,lm_id=1234
# DEV_PID = 8001 ;   
# LM_ID = 1234 ;

# 极速版 打开注释的话请填写自己申请的appkey appSecret ,并在网页中开通极速版(开通后可能会收费)

#DEV_PID = 80001
#ASR_URL = 'http://vop.baidu.com/pro_api'
#SCOPE = 'brain_enhanced_asr'  # 有此scope表示有asr能力,没有请在网页里开通极速版

# 忽略scope检查,非常旧的应用可能没有
# SCOPE = False


# 极速版

class DemoError(Exception):
    pass


"""  TOKEN start """

TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'


def fetch_token():
    params = {'grant_type': 'client_credentials',
              'client_id': API_KEY,
              'client_secret': SECRET_KEY}
    post_data = urlencode(params)
    if (IS_PY3):
        post_data = post_data.encode('utf-8')
    req = Request(TOKEN_URL, post_data)
    try:
        f = urlopen(req)
        result_str = f.read()
    except URLError as err:
        print('token http response http code : ' + str(err.code))
        result_str = err.read()
    if (IS_PY3):
        result_str = result_str.decode()
    #print("---------result_str--------")
    #print(result_str)
    
    result = json.loads(result_str)
    #print("hello baidu voice")
    #print(result)
    if ('access_token' in result.keys() and 'scope' in result.keys()):
        if SCOPE and (not SCOPE in result['scope'].split(' ')):  # SCOPE = False 忽略检查
            raise DemoError('scope is not correct')
        #print('SUCCESS WITH TOKEN: %s ; EXPIRES IN SECONDS: %s' % (result['access_token'], result['expires_in']))
        return result['access_token']
    else:
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')


"""  TOKEN end """
def get_audio():
    os.system('arecord -Dplughw:1,0 -fS16_LE -c1 -r16000 -traw test.pcm')
#判断是否有人
def hasPeople():
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(12,GPIO.IN)
    return GPIO.input(12)
def get_result():
    token = fetch_token()

    """
    httpHandler = urllib2.HTTPHandler(debuglevel=1)
    opener = urllib2.build_opener(httpHandler)
    urllib2.install_opener(opener)
    """

    speech_data = []
    with open(AUDIO_FILE, 'rb') as speech_file:
        speech_data = speech_file.read()
    length = len(speech_data)
    if length == 0:
        raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

    params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID}
    #测试自训练平台需要打开以下信息
    #params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID, 'lm_id' : LM_ID}
    params_query = urlencode(params);

    headers = {
        'Content-Type': 'audio/' + FORMAT + '; rate=' + str(RATE),
        'Content-Length': length
    }

    url = ASR_URL + "?" + params_query
    #print("url is", url);
    #print("header is", headers)
    # print post_data
    req = Request(ASR_URL + "?" + params_query, speech_data, headers)
    try:
        begin = timer()
        f = urlopen(req)
        result_str = f.read()
        print("Request time cost %f" % (timer() - begin))
    except  URLError as err:
        print('asr http response http code : ' + str(err.code))
        result_str = err.read()

    if (IS_PY3):
        result_str = str(result_str, 'utf-8')
   

    result = json.loads(result_str)
   
    result=result['result'][0]
    return result
def voiceControlMotor():
    print("please input your voice command")
    os.system('aplay -Dplughw:0,0 ./audio/提示音.wav')
    os.system("arecord -Dplughw:1,0 -fS16_LE -c1 -r16000 -twav test.pcm -d59")
    print("Done!")
    result=get_result()
    print(result)
    if(rubbish.has_key(result)):
        #'result'][0].replace('。','');
        #print(result)
        result=rubbish[result]

        print(result)
        if '其他垃圾'==result:
            os.system('aplay -Dplughw:0,0 ./audio/其他垃圾.wav ')
            os.system('sudo ./controlMotor 20 10 &')

        elif '厨余垃圾'==result:
            os.system('aplay -Dplughw:0,0 ./audio/厨余垃圾.wav ')
            os.system('sudo ./controlMotor 21 10 &')
        elif '有害垃圾'==result:
            os.system('aplay -Dplughw:0,0 ./audio/有害垃圾.wav ')

            os.system('sudo ./controlMotor 22 10 &')
        else:# '可回收垃圾'==result
            os.system('aplay -Dplughw:0,0 ./audio/可回收垃圾.wav ')
            os.system('sudo ./controlMotor 23 10 &')
    else:
        #print(result)
        print('我没听清,请再说一遍!')
        os.system('aplay -Dhw:0,0 ./audio/没听清.wav ');

    return  hasPeople()

def mainLoop():
    #判断是否有人,否则等待
    while hasPeople()==False:
        pass;
    #有人,发出提示音
    os.system('aplay -Dplughw:0,0 ./audio/welcome.wav')
    #识别语音信息,并控制电机
    while voiceControlMotor()==False:
        pass;
    os.system('aplay -Dplughw:0,0 ./audio/bye.wav')  

if __name__ == '__main__':
    rate1 = open('rubbish.txt', 'r')
    rubbish = dict()
    for line in rate1:
        line = line.strip().split(' ')
        data=line[0].decode('utf-8')
        rubbish[data]=line[1]
    while True:
        mainLoop()

这里语音的采集直接用arecord,播放用aplay,并没有采用python的库,因为这样比较方便,也不需要学新的东西(就是懒呗),控制舵机是自己用C实现了一个程序(因为C比较熟悉),用系统功能调用的方式还能实现后台控制(爽)!可以多次识别,不需要等着一个动作执行完再执行下一个动作!另外还采用热释电用于人的检测。

至于舵机的控制直接采用C语言,采用wiringPi控制,代码如下(controlMotor):

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
 
#include <wiringPi.h>
#include<softPwm.h> 
 int pin[40]={
//G0~G9---扩展板名称(BCM)
30,31,8,9,7,21,22,11,10,13,//wiringPi名称
//G10~G19
12,14,26,//10
23,15,16,//13
27,0,1,24,//16
//G20~G29
28,29,3,//20
10,4,6,//23
25,2,0,0,//26
//G30~G39
0,0,0,0,0,0,0,0,0,0
};

//舵机有三条线,棕红黄。棕色线接地,红色接5V正电压,黄色接信号线。
int main(int arg, char **argv)
{

 int pwmPin = 18;
 int time=10;
   if(arg<3){
      printf("\nFormat:./control pwmPin time\n");
      exit(-1);
    }else{
      pwmPin=pin[atoi(argv[1])];
      time=atoi(argv[2]);
    }
    
   printf("port=%d,time=%d\n",pwmPin,time);

   if(wiringPiSetup() == -1){ //when initialize wiring failed, print message to screen
		printf("setup wiringPi failed !");
		return 1; 
   }
   softPwmCreate(pwmPin,  0, 200);

   printf("\nstart motor:%d\n",pwmPin);
 
   softPwmWrite(pwmPin,   20);

  
   delay(1000*time);//延时10S
   printf("\nstop motor:%d\n",pwmPin);
   softPwmWrite(pwmPin,   5);
   delay(1000);//延时10S
    

    return 0;
}

垃圾的信息保存在一个文件(rubbish.txt)中:

剩菜剩饭。 厨余垃圾
过期食品。 厨余垃圾
菜叶。 厨余垃圾
鸡蛋壳。 厨余垃圾
玻璃。 可回收垃圾 
塑料。 可回收垃圾 
饮料瓶。 可回收垃圾 
衣服。 可回收垃圾
废旧报纸。 可回收垃圾
电池。 有害垃圾 
502胶水。 有害垃圾 
药品。 有害垃圾 
电灯泡。 有害垃圾 
油漆桶。 有害垃圾
灯管。 有害垃圾
塑料袋。 其他垃圾
便利贴。 其他垃圾
被污染的餐巾纸。 其他垃圾 
清凉油。 其他垃圾 
陶瓷盆。 其他垃圾 

至于垃圾名称后有一个句号是因为百度每次识别返回的结果带着句号,不想费事编代码了,直接在垃圾信息中加句号吧。

整个系统不复杂,主要碰到两个坑:

1.在X宝上花了几块钱买了一个据说能采集语音也能播放的usb声卡,结果不能采集,全是噪音,整个人感觉不好了,以为自己在哪错了,查了几天才定位问题,当即下单买了个好的,第二天到货,功能立刻调通!真不能过分省钱!

2.树莓派的wiringPi没有仔细看,一开始以为所有的管脚都可以实现硬pwm,浪费了一个下午的时间,后来将说明书拿来一看,只有一个管脚可以,要实现4个舵机的pwm控制,要采用软pwm,赶快改!

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐