需求

生产环境的 Java 服务需要监控 JVM,但暂时还没有使用 APM,因此结合现有的 Zabbix 写了一个数据收集脚本,通过 JMX 采集数据后发送给 Zabbix 展示:

#!/usr/bin/python
#coding: utf-8
import os
import sys
import json
import jpype
from jpype import java
from jpype import javax
from multiprocessing.dummy import Pool as ThreadPool


# 参考:
# https://blog.nobugware.com/post/2010/11/08/jmx-query-python-cpython/
# 官网:http://jpype.sourceforge.net/

# Directory that holds the services; each symbolic link directly inside it
# is treated as one service by discovery() and SendData().
service_prefix_path = "/data/apps/soa/"


def discovery(service_prefix_path):
    """Print a Zabbix discovery JSON document listing the services.

    Every symbolic link directly inside ``service_prefix_path`` is reported
    as one service through the ``{#SERVICE}`` macro, for example
    ``{"data": [{"{#SERVICE}": "soa-gateway"}]}``.

    Args:
        service_prefix_path: Directory to scan for service symlinks.

    Returns:
        None. The JSON document is written to stdout.
    """
    try:
        # os.listdir replaces the former `ls` subprocess: no shell is spawned,
        # the path is never interpreted by a shell, and no pipe is left open.
        names = sorted(os.listdir(service_prefix_path))
    except OSError:
        # Missing or unreadable directory: report an empty discovery, which
        # is what the `ls`-based version produced as well.
        names = []

    services = [
        {'{#SERVICE}': name}
        for name in names
        # `ls` hides dotfiles; keep that behaviour.
        if not name.startswith('.')
        and os.path.islink(os.path.join(service_prefix_path, name))
    ]
    print(json.dumps({'data': services}))


def _jmx_read(connection, object_name, attribute):
    """Read one MBean attribute; return None when it is absent or unreadable."""
    try:
        return connection.getAttribute(
            javax.management.ObjectName(object_name), attribute)
    except Exception:
        # Deliberate best effort: which MBeans exist depends on the JVM
        # version and the garbage collector in use, so a failed lookup just
        # means "no data for this key".
        return None


def _jmx_close(connector):
    """Close a JMX connector, ignoring errors; ``connector`` may be None."""
    if connector is None:
        return
    try:
        connector.close()
    except Exception:
        pass  # nothing useful can be done if closing fails


def _jmx_collect(connection, service):
    """Query the java.lang MBeans and return the ``key value`` lines as one string."""
    lines = []

    def emit(key, raw, convert):
        # Skip attributes that could not be read or converted instead of
        # aborting the whole collection for this service.
        if raw is None:
            return
        try:
            value = convert(raw)
        except (TypeError, ValueError, AttributeError) as e:
            sys.stderr.write("jmx.{0}.[{1}] skipped: {2}\n".format(key, service, e))
            return
        lines.append("jmx.{0}.[{1}] {2}\n".format(key, service, str(value)))

    def emit_usage(key, usage, branches):
        # `usage` is a memory-usage composite; each branch is one numeric field.
        for branch in branches:
            emit("{0}.{1}".format(key, branch), usage,
                 lambda u, b=branch: int(u.contents.get(b)))

    def cpu_load(value):
        return round(float(value), 4)

    # (MBean type, attribute, converter), emitted in this order.
    scalar_metrics = (
        ("Threading", "ThreadCount", int),
        ("Threading", "TotalStartedThreadCount", int),
        ("Threading", "PeakThreadCount", int),
        ("Threading", "DaemonThreadCount", int),
        ("OperatingSystem", "MaxFileDescriptorCount", int),
        ("OperatingSystem", "OpenFileDescriptorCount", int),
        ("OperatingSystem", "ProcessCpuLoad", cpu_load),
        ("ClassLoading", "LoadedClassCount", int),
        ("ClassLoading", "TotalLoadedClassCount", int),
        ("ClassLoading", "UnloadedClassCount", int),
        ("Runtime", "VmName", str),
        ("Runtime", "Uptime", str),
        ("Runtime", "VmVersion", str),
    )
    for mbean, attribute, convert in scalar_metrics:
        raw = _jmx_read(connection, "java.lang:type={0}".format(mbean), attribute)
        emit("{0}.{1}".format(mbean, attribute), raw, convert)

    # Memory: two composite usages followed by one plain counter.
    memory_mbean = "java.lang:type=Memory"
    for attribute in ("HeapMemoryUsage", "NonHeapMemoryUsage"):
        usage = _jmx_read(connection, memory_mbean, attribute)
        emit_usage("Memory.{0}".format(attribute), usage,
                   ("committed", "max", "used"))
    emit("Memory.ObjectPendingFinalizationCount",
         _jmx_read(connection, memory_mbean, "ObjectPendingFinalizationCount"),
         int)

    # GarbageCollector: only the collectors the JVM actually runs exist; the
    # others are skipped. CollectionTime is an accumulated time in
    # milliseconds (per the GarbageCollectorMXBean documentation) and is
    # passed through unmodified.
    for name in ("Copy", "MarkSweepCompact", "PS Scavenge",
                 "ConcurrentMarkSweep", "ParNew", "PS MarkSweep"):
        mbean = "java.lang:type=GarbageCollector,name={0}".format(name)
        for attribute in ("CollectionTime", "CollectionCount"):
            emit("GarbageCollector.{0}.{1}".format(name.replace(" ", "_"), attribute),
                 _jmx_read(connection, mbean, attribute), int)

    # MemoryPool: pool names differ per collector / JVM version; absent pools
    # produce no lines.
    for name in ("Code Cache", "Metaspace", "Compressed Class Space",
                 "PAR Eden Space", "PAR Survivor Space", "PS Eden Space",
                 "PS Old Gen", "PS Perm Gen", "PS Survivor Space",
                 "CMS Old Gen", "CMS Perm Gen", "Perm Gen"):
        mbean = "java.lang:type=MemoryPool,name={0}".format(name)
        emit_usage("MemoryPool.{0}.Usage".format(name.replace(" ", "_")),
                   _jmx_read(connection, mbean, "Usage"),
                   ("committed", "used", "max"))

    return "".join(lines)


def _Get_Jmx(service, port):
    """Collect JVM metrics of one service over JMX.

    Connects to the JMX RMI endpoint on 127.0.0.1:``port`` with empty
    credentials and reads a fixed set of ``java.lang`` MBean attributes
    (Threading, OperatingSystem, ClassLoading, Runtime, Memory,
    GarbageCollector, MemoryPool).

    Args:
        service: Service name; embedded in every key as ``[service]``.
        port: JMX remote port (integer).

    Returns:
        A string of ``jmx.<...>.[<service>] <value>`` lines, each terminated
        by a newline. Attributes that cannot be read are omitted. An empty
        string is returned when the connection cannot be established, so one
        unreachable service does not stop the caller.
    """
    user = ""
    password = ""
    url = "service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (port,)

    connector = None
    try:
        # This function runs in pool worker threads. Older JPype releases
        # require each Python thread to be attached to the JVM before it
        # touches Java objects; newer ones attach automatically.
        # NOTE(review): guarded with hasattr because the helper names are
        # version-dependent - confirm against the installed JPype.
        if (hasattr(jpype, "isThreadAttachedToJVM")
                and not jpype.isThreadAttachedToJVM()):
            jpype.attachThreadToJVM()

        env = java.util.HashMap()
        credentials = jpype.JArray(java.lang.String)([user, password])
        env.put(javax.management.remote.JMXConnector.CREDENTIALS, credentials)
        connector = javax.management.remote.JMXConnectorFactory.connect(
            javax.management.remote.JMXServiceURL(url), env)
        connection = connector.getMBeanServerConnection()
    except Exception as e:
        # Report on stderr: stdout carries the metric lines.
        sys.stderr.write("JMX connection for {0} on port {1} failed: {2}\n".format(
            service, port, e))
        _jmx_close(connector)
        return ""   # connection failed: no data for this service

    try:
        return _jmx_collect(connection, service)
    finally:
        # Release the RMI connection; it was previously left open.
        _jmx_close(connector)

def _Get_Port(service_path, ps_output=None):
    """Find the JMX remote port of the process running from ``service_path``.

    The process list is searched for command lines that contain
    ``service_path``; the port is the first whitespace-delimited token after
    ``jmxremote.port=`` on such a line.

    Known limitation: matching is by substring, so a path that is a prefix of
    another service's path (``soa-gateway`` / ``soa-gateway-hehe``) also
    matches the other service's process.

    Args:
        service_path: Path of the service, as it appears in the process
            command line.
        ps_output: Optional text in ``ps -ef`` format. When omitted,
            ``ps -ef`` is executed. Passing it lets a caller reuse a single
            process snapshot for many services.

    Returns:
        A ``(port, status)`` tuple:
        ``(<int>, 0)`` - process found and a JMX port is configured;
        ``(None, 1)`` - process found but no JMX port is configured;
        ``(None, 2)`` - no matching process (or the process list could not
        be read).
    """
    if not service_path:
        return None, 2

    if ps_output is None:
        try:
            # Plain `ps -ef` with the filtering done in Python: the path is
            # never interpolated into a shell command nor treated as a regex.
            pipe = os.popen("ps -ef")
            try:
                ps_output = pipe.read()
            finally:
                pipe.close()
        except Exception as e:
            sys.stderr.write("Cannot read the process list: {0}\n".format(e))
            return None, 2

    marker = "jmxremote.port="
    running = False
    for line in ps_output.splitlines():
        if service_path not in line:
            continue
        running = True
        tokens = line.partition(marker)[2].split()
        if not tokens:
            # This matching process has no JMX port; another one may.
            continue
        try:
            return int(tokens[0]), 0
        except ValueError:
            continue   # malformed port value: treat as not configured

    if running:
        return None, 1
    return None, 2


def SendData(service_prefix_path):
    """Collect JMX metrics of every running service and print them.

    For each symbolic link directly inside ``service_prefix_path`` the JMX
    port is looked up with ``_Get_Port``; services with a port are queried
    through ``_Get_Jmx`` in a small thread pool, and the concatenated metric
    lines are printed to stdout.

    Args:
        service_prefix_path: Directory containing the service symlinks.

    Returns:
        None.
    """
    # Settings for the zabbix_sender step, which is currently disabled (see
    # the commented-out code at the end of this function).
    zbx_sender_cmd = "{0} -c {1} -i {2}"
    zbx_conf = "/usr/local/services/zabbix-3.0.0/etc/zabbix_agentd.conf"
    zbx_sender_file = "/tmp/.zbx_jmx_sender.txt"
    zbx_sender = "/usr/local/services/zabbix-3.0.0/bin/zabbix_sender"

    # Start the JVM that JPype uses for the JMX client classes.
    jpype.startJVM("/data/java/jdk1.8.0_161/jre/lib/amd64/server/libjvm.so")

    # Author's note: on a single-CPU test machine the thread pool raised
    # errors (possibly a JPype limitation), and running concurrently was not
    # noticeably faster than running serially.
    cpu_num_cmd = "grep 'processor' /proc/cpuinfo  |wc -l"
    try:
        cpu_num = int(os.popen(cpu_num_cmd).read())
    except ValueError:
        cpu_num = 1
    # Pool size is capped at 5 and must be at least 1: a count of 0 (e.g. no
    # /proc/cpuinfo) would make the pool constructor raise.
    cpu_num = max(1, min(cpu_num, 5))

    pool = ThreadPool(processes=cpu_num)
    pending = []

    try:
        # Same listing `ls` gave (sorted, dotfiles hidden) without a shell.
        names = sorted(os.listdir(service_prefix_path))
    except OSError:
        names = []

    for service in names:
        if service.startswith('.'):
            continue
        service_path = os.path.join(service_prefix_path, service)
        if not os.path.islink(service_path):
            continue
        # _Get_Port returns a (port, status) pair. Using the pair itself as
        # the port made every worker fail, so nothing was ever printed.
        port, _status = _Get_Port(service_path)
        if port is None:
            continue
        pending.append((service, pool.apply_async(_Get_Jmx, (service, port,))))

    pool.close()
    pool.join()

    chunks = []
    for service, result in pending:
        try:
            chunks.append(result.get())
        except Exception as e:
            # Report the failure instead of silently dropping the service.
            sys.stderr.write("Collecting JMX data for {0} failed: {1}\n".format(service, e))

    r_str = "".join(chunks)
    print(r_str)

    # with open(zbx_sender_file,"w") as f:
    #     f.write(r_str)
    #
    # send_ret = os.popen(zbx_sender_cmd.format(zbx_sender, zbx_conf, zbx_sender_file))
    # #print(zbx_sender_cmd.format(zbx_sender, zbx_conf, zbx_sender_file))
    # # This step is triggered by an ordinary item and reports the result:
    # # 1 means the data was sent, 0 means sending failed.
    # if "failed: 0" in send_ret.read():
    #     print(1)
    # else:
    #     print(0)

if __name__ == "__main__":
    # No argument: collect and print metrics. The single argument
    # "discovery": print the service discovery JSON. Anything else is an
    # error reported on stderr.
    cli_args = sys.argv[1:]
    if not cli_args:
        SendData(service_prefix_path)
    elif cli_args == ["discovery"]:
        discovery(service_prefix_path)
    else:
        sys.stderr.write("Args is wrong!")

缺点

  • 不一定能准确定位到应用的进程:当同时存在 soa-gateway 和 soa-gateway-hehe 这类名称互为前缀的服务时,会匹配到多个进程,造成混淆;
  • 使用了多线程,但在生产环境中发现,无论是多线程、多进程还是多协程,效率都不高;
  • 垃圾回收时间没有做任何处理,获取到的数据直接传给了 Zabbix。
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐