nodejs 链接 redis/mysql 连接池

redis是单线程作业,所以不管查询任务是由一个链接发来的还是多个链接发来的,redis是串行的执行。并通过当前的链接返回客户端。nodejs接受redis的返回后,不管是不是并行,都要等主线程空闲下来才能一个个处理服务器返回的数据。

再看mysql~

mysql不是单线程服务的,可以并行处理多个查询请求。

mysql会为每一个链接创建一个单独的线程查询。redis数据基本在内存中,mysql会大量的读取磁盘的I/O,多线程比较快。

但是nodejs是单线程的。但是它调用的I/O指令等是通过另外的线程做的,I/O指令完成后就给主线程小任务,回调函数。

nodejs主线程一个,但是IO线程会有多个。

nodejs 使用多个连接来连接mysql。多连接是需要连接池的,有连接池就避免了每次连接都要去创建销毁的消耗了。

综上:nodejs + mysql用线程池是没什么问题的。nodejs + redis只用单个连接就够。

所以有了连接管理模块,egg-redis。

redis性能

错误原因:redis client的业务代码以及redis client的I/O性能。

redis client采用的是单链接模式,底层采用的非阻塞网络I/O,

调优:pipeline,script

无依赖批量请求采用pipeline。

redis高性能体现在服务端处理能力,但瓶颈往往出现在客户端,因此增强客户端I/O能力与并发并行多客户端才是高并发解决方案。

ioredis

性能为中心,功能齐全的,支持Redis >= 2.6.12 and (Node.js >= 6).

npm install ioredis

basic usage

var Redis = require('ioredis');
var redis = new Redis();
 
redis.set('foo', 'bar');
redis.get('foo', function (err, result) {
  console.log(result);
});
 
// Or using a promise if the last argument isn't a function
redis.get('foo').then(function (result) {
  console.log(result);
});
 
// Arguments to commands are flattened, so the following are the same:
redis.sadd('set', 1, 3, 5, 7);
redis.sadd('set', [1, 3, 5, 7]);
 
// All arguments are passed directly to the redis server:
redis.set('key', 100, 'EX', 10);

Connect to Redis

new Redis()       // Connect to 127.0.0.1:6379
new Redis(6380)   // 127.0.0.1:6380
new Redis(6379, '192.168.1.1')        // 192.168.1.1:6379
new Redis('/tmp/redis.sock')
new Redis({
  port: 6379,          // Redis port
  host: '127.0.0.1',   // Redis host
  family: 4,           // 4 (IPv4) or 6 (IPv6)
  password: 'auth',
  db: 0
})

一个特别的连接方法:

// Connect to 127.0.0.1:6380, db 4, using password "authpassword":
new Redis('redis://:authpassword@127.0.0.1:6380/4')

pub/sub 发布/订阅

publish/subscribe

redis.on进行事件的监听

var Redis = require('ioredis');
var redis = new Redis();
var pub = new Redis();
redis.subscribe('news', 'music', function (err, count) {
  // 现在我们订阅了news和music两个频道
  // `count` 代表我们当前订阅的序号
 
  pub.publish('news', 'Hello world!');
  pub.publish('music', 'Hello again!');
});
 
redis.on('message', function (channel, message) {
  // 收到消息 Hello world! from channel news
  // 收到消息 Hello again! from channel music
  console.log('Receive message %s from channel %s', message, channel);
});

// 还有一个是事件叫做`messageBuffer`,和message是一样的
// 返回buffer,替代字符串
redis.on('messageBuffer', function (channel, message) {
  // Both `channel` and `message` are buffers.
});

PSUBSCRIBE发布

redis.psubscribe('pattern', function (err, count) {});
redis.on('pmessage', function (pattern, channel, message) {});
redis.on('pmessageBuffer', function (pattern, channel, message) {});

当客户端发出订阅或PSUBSCRIBE时,该连接将被放入“订阅”模式。此时,只有修改订阅集的命令才是有效的。当订阅集为空时,连接将返回常规模式。

如果在订阅模式下发送正常的命令,只会打开一个新的连接。

处理二进制数据:

redis.set('foo', Buffer.from('bar'));

拿到缓存中的数据:

redis.getBuffer('foo', function (err, result) {
  // result is a buffer.
});

管道 Pipelining

发送5条以上的命令,就可以使用管道的将命令放进内存的队列中,然后一起发送至redis,这个表现能提升百分之50-300.

这些命令在内存的队列李阿敏,并配合exec方法进行执行。

var pipeline = redis.pipeline();
pipeline.set('foo', 'bar');
pipeline.del('cc');
pipeline.exec(function (err, results) {
  // `err` 总是null, and `results` 响应的数组
  // corresponding to the sequence of queued commands.
  // Each response follows the format `[err, result]`.
});
 
// 甚至能够链式调用命令
redis.pipeline().set('foo', 'bar').del('cc').exec(function (err, results) {
});
 
// `exec` 也是返回的promise
var promise = redis.pipeline().set('foo', 'bar').get('foo').exec();
promise.then(function (result) {
  // result === [[null, 'OK'], [null, 'bar']]
});

每个链式命令也可以有一个回调,当命令得到回复时将调用它:

redis.pipeline().set('foo', 'bar').get('foo', function (err, result) {
  // result === 'bar'
}).exec(function (err, result) {
  // result[1][1] === 'bar'
});

除了单独向管道队列添加命令外,还可以将一组命令和参数传递给构造函数:

redis.pipeline([
  ['set', 'foo', 'bar'],
  ['get', 'foo']
]).exec(function () { /* ... */ });

length的属性:

const length = redis.pipeline().set('foo', 'bar').get('foo').length;
// length === 2

事务:

redis.multi().set('foo', 'bar').get('foo').exec(function (err, results) {
  // results === [[null, 'OK'], [null, 'bar']]
});

如果事务的命令链中存在语法错误(例如,参数数量错误,命令名称错误等),则不会执行任何命令,并返回错误:

redis.multi().set('foo').set('foo', 'new value').exec(function (err, results) {
  // err:
  //  { [ReplyError: EXECABORT Transaction discarded because of previous errors.]
  //    name: 'ReplyError',
  //    message: 'EXECABORT Transaction discarded because of previous errors.',
  //    command: { name: 'exec', args: [] },
  //    previousErrors:
  //     [ { [ReplyError: ERR wrong number of arguments for 'set' command]
  //         name: 'ReplyError',
  //         message: 'ERR wrong number of arguments for \'set\' command',
  //         command: [Object] } ] }
});

就接口而言,multi与管道的不同之处在于,在为每个链接命令指定回调时,排队状态将传递给回调而不是命令的结果:

redis.multi({ pipeline: false });
redis.set('foo', 'bar');
redis.get('foo');
redis.exec(function (err, result) {
  // result === [[null, 'OK'], [null, 'bar']]
});
redis.multi([
  ['set', 'foo', 'bar'],
  ['get', 'foo']
]).exec(function () { /* ... */ });

管道支持内联事务,这意味着您可以将管道中的命令子集分组到事务中:
redis.pipeline().get('foo').multi().set('foo', 'bar').get('foo').exec().get('foo').exec();

Lua脚本

ioredis支持所有脚本命令,例如EVAL,EVALSHA和SCRIPT。

ioredis公开了一个defineCommand方法,使脚本编写更容易使用:

var redis = new Redis();
 
// This will define a command echo:
redis.defineCommand('echo', {
  numberOfKeys: 2,
  lua: 'return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}'
});
 
// Now `echo` can be used just like any other ordinary command,
// and ioredis will try to use `EVALSHA` internally when possible for better performance.
redis.echo('k1', 'k2', 'a1', 'a2', function (err, result) {
  // result === ['k1', 'k2', 'a1', 'a2']
});
 
// `echoBuffer` is also defined automatically to return buffers instead of strings:
redis.echoBuffer('k1', 'k2', 'a1', 'a2', function (err, result) {
  // result[0] equals to Buffer.from('k1');
});
 
// And of course it works with pipeline:
redis.pipeline().set('foo', 'bar').echo('k1', 'k2', 'a1', 'a2').exec();

如果在定义命令时无法确定键的数量,则可以省略numberOfKeys属性,并在调用命令时将键数作为第一个参数传递:

redis.defineCommand('echoDynamicKeyNumber', {
  lua: 'return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}'
});
 
// Now you have to pass the number of keys as the first argument every time
// you invoke the `echoDynamicKeyNumber` command:
redis.echoDynamicKeyNumber(2, 'k1', 'k2', 'a1', 'a2', function (err, result) {
  // result === ['k1', 'k2', 'a1', 'a2']
});

Redis规范

cluster构造函数接受两个参数,

  • 第一个参数是连接到的集群节点列表,不需要枚举所有的集群节点,但是如果一个节点无法访问客户端将会尝试下一个节点,并且客户端将至少连接一个节点时自动发现其他节点。

  • 第二个参数是选项

    • clusterRetryStrategy:当没有启动任何节点可访问时,clusterRetryStrategy将调用,返回一个数字,ioredis将尝试在指定的延迟(毫秒为单位)后从头开始重新连接到启动节点,否则将返回无启动节点可用错误。
    // 选项的默认值
    function (times) {
      var delay = Math.min(100 + times * 2, 2000);
      return delay;
    }
    
    • enableOfflineQueue:类似于类的enableOfflineQueue选项Redis
    • enableReadyCheck:启用后,只有在cluster info报告集群准备好处理命令的命令时才会发出“就绪”事件。否则,它将在“connect”发出后立即发出。
    • scaleReads:配置发送读取查询的位置。
    • maxRedirections:当收到与群集相关的错误(例如MOVED,等)时,客户端将命令重定向到另一个节点。此选项限制发送命令时允许的最大重定向。默认值为。ASK``CLUSTERDOWN``16
    • retryDelayOnFailover:如果在发送命令时目标节点断开连接,ioredis将在指定的延迟后重试。默认值为100。您应该确保retryDelayOnFailover * maxRedirections > cluster-node-timeout 在故障转移期间确保没有命令失败。
    • retryDelayOnClusterDown:当群集关闭时,所有命令都将被拒绝,错误为CLUSTERDOWN。如果此选项是数字(默认情况下为100),则客户端将在指定时间(以毫秒为单位)后重新发送命令
    • retryDelayOnTryAgain:如果此选项是一个数字(默认情况下是100),则客户端将TRYAGAIN在指定时间(以毫秒为单位)后重新发送被拒绝的命令。
    • redisOptionsRedis连接到节点时传递给构造函数的默认选项。
    • slotsRefreshTimeout:从群集刷新插槽时发生超时前的毫秒数(默认值1000
    • slotsRefreshInterval:每个自动插槽刷新之间的毫秒数(默认5000

    集群模式下的发布和订阅:(和独立模式下的发布订阅一致)

    sub.on('message', function (channel, message) {
      console.log(channel, message);
    });
     
    sub.subscribe('news', function () {
      pub.publish('news', 'highlights');
    });
    
Logo

更多推荐