import Stomp from 'stompjs'

// tips
// 发送消息
// 客户端向服务端发送消息利用send()方法,此方法有三个参数:第一个参数(string)必需,为发送消息的目的地;第二个参数(object)可选,包含了额外的头部信息;第三个参数(string)可选,为发送的消息。
// client.send(destination, {}, body)

// 订阅消息
// 订阅消息也就是客户端接收服务端发送的消息,订阅消息可以利用subscribe()方法,此方法有三个参数:第一个参数(string)必需,为接收消息的目的地;第二个参数必需为回调函数;第三个参数{object}为可选,包含额外的头部信息。
// client.subscribe(destination, callback, {})

// 取消订阅消息可以利用unsubscribe()方法:
//  let mySubscribe =  client.subscribe
//  mySubscribe.unsubscribe()

// 客户端订阅消息可以订阅广播,如下所示:
// client.subscribe('/topic/msg',function(messages){
//     console.log(messages)
// })

// 也可以进行一对一消息的接收:
// 第一种方式
// const userId = 666;
// client.subscribe('/user/' + userId + '/msg',,function(messages){
//     console.log(messages)
// })
// 第二种方式
// client.subscribe('/msg',function(messages){
//     console.log(messages)
// }, {"userId ": userId  })

class StompClient {
  constructor(rmqServer, rmqVirtualHost, rmqAccount, rmqPassword) {
    this.queueName = '' // 队列名
    this.client = null
    this.frame = null
    this.options = {
      vhost: rmqVirtualHost,
      incoming: 10000, // 接收频率,默认10000ms
      outgoing: 10000, // 发送频率,默认10000ms
      account: rmqAccount,
      pwd: rmqPassword,
      server: `ws://${rmqServer}/ws`,
      message: ''
    }
  }

  // routerKey
  getQueueName() {
    this.queueName = 'xxx'
  }

  connect(options) {
    this.options = { ...this.options, ...options }
    const mqUrl = this.options.server
    const ws = new WebSocket(mqUrl)
    ws.onclose = (close) => {
      console.log('webSocket关闭', close)
    }
    ws.onerror = (error) => {
      console.log('webSocket异常', error)
    }
    ws.onopen = (success) => {
      console.log('webSocket连接成功', success)
    }
    this.client = Stomp.over(ws)
    this.client.heartbeat.incoming = this.options.incoming
    this.client.heartbeat.outgoing = this.options.outgoing
    this.client.debug = null // 关闭控制台调试
    // mq连接
    const onConnect = async () => {
      console.log('stomp 连接成功!')
      // 订阅消息
      this.subscribe()
    }
    // mq错误重新进行连接
    const onError = (errorMsg) => {
      console.error(`stomp 断开连接,正在准备重新连接...`, errorMsg)
      this.connect(this.options)
    }
    // 连接
    this.client.connect(
      this.options.account, // 用户名
      this.options.pwd, // 密码
      onConnect,
      onError,
      this.options.vhost
    )
  }
  // 消息订阅
  subscribe() {
    this.getQueueName()
    if (!this.queueName) {
      return
    }
    this.client.subscribe(
      `/exchange/amq.direct/${this.queueName}`,
      this.onMessage.bind(this),
      this.onSubscribeFailed.bind(this),
      {
        'auto-delete': 'true'
      }
    )
  }
  // 关闭mq连接
  closeConnect() {
    this.client.disconnect(() => {
      console.log('已关闭mq连接')
    })
  }
  onMessage(frame) {
    const data = frame && JSON.parse(frame.body)
    console.log('data :>> ', data.index)
    this.options.message = data
    // console.log('data :>> ', data)
  }

  onSubscribeFailed(frame) {
    console.log('rabbitmq subscribe failed')
  }
}

const RMQ_SERVER = '10.0.0.0:15674' // mq服务地址
const RMQ_VIRTUAL_HOST = 'xx' // 虚拟主机
const RMQ_ACCOUNT = 'xxx' // 用户名
const RMQ_PASSWORD = 'xxx' // 密码

export default new StompClient(RMQ_SERVER, RMQ_VIRTUAL_HOST, RMQ_ACCOUNT, RMQ_PASSWORD)

  watch: {
    opt: {
      deep: true,
      immediate: true,
      handler(newVal) {
        this.fileDetail = (newVal && newVal.message) || {}
        // console.log('fileDetail :>> ', this.fileDetail)
      }
    }
  },
  // 使用
  // mounted() {
  //   StompClient.connect()
  //   this.opt = StompClient.options
  // },
  // beforeDestroy() {
  //   StompClient.closeConnect()
  // },
Logo

前往低代码交流专区

更多推荐