Zabbix监控CDH搭建的Hadoop集群

参考来源:
https://github.com/jbfavre/python-zabbix
https://github.com/jbfavre/python-protobix/

1. 先在Cloudera Manager创建只读用户monitor

2. 将CDH的API地址、monitor账号密码保存在/etc/zabbix/config/cdh.conf
ServerActive=10.33.83.242   # 同Zabbix_agent的配置,指向zabbix-proxy/zabbix-server
ServerPort=10051            # zabbix-server默认端口
LogType=file                # 监控脚本日志类型
LogFile=/tmp/zabbix_agentd_cdh.log  # 监控脚本日志输出路径
DebugLevel=2                # 日志级别,2-只显示错误,3-显示INFO级别,5-调试用
Hostname=10.33.99.23        # 同Zabbix_agent配置的主机名
CDHApiHost=10.33.99.23      # Cloudera Manager的http地址
CDHApiPort=7180             # Cloudera Manager的http端口
CDHApiUser=monitor          # 只读角色账号
CDHApiPass=xxxxxxxxxxxxxxxx # 只读角色密码
3. Zabbix配置/etc/zabbix/zabbix_agentd.d/cdh.conf
UserParameter=hadoop.cm.check,/usr/local/zabbix-ztc/bin/zabbix_cdh_monitor.py -c /etc/zabbix/config/cdh.conf --update-items
UserParameter=hadoop.cm.discovery,/usr/local/zabbix-ztc/bin/zabbix_cdh_monitor.py -c /etc/zabbix/config/cdh.conf --discovery
4. 监控脚本/usr/local/zabbix-ztc/bin/zabbix_cdh_monitor.py
#!/usr/local/bin/python2.7
# -*- coding: utf-8 -*-

import sys
import socket
import configobj
import protobix
import json
from datetime import datetime
from cm_api.api_client import ApiResource


class CDH(protobix.SampleProbe):
    """Probe that turns Cloudera Manager API state into Zabbix item data.

    Connection settings (``CDHApiHost``, ``CDHApiPort``, ``CDHApiUser`` and
    ``CDHApiPass``) are read from the file named by
    ``self.options.config_file``.

    Both ``_get_discovery`` and ``_get_metrics`` return a dictionary keyed by
    hostname.  Cluster and service data is filed under the Cloudera Manager
    host (``self.hostname``); host and role data is filed under the hostname
    the API reports for the machine concerned.
    """

    __version__ = '1.0.0'
    # NOTE(review): defined but never handed to ApiResource in this class.
    CM_API_VERSION = 5
    # Numeric codes sent to Zabbix for each state string of the API.
    CM_COMMISSION_MAPPING = { 'UNKNOWN': -1,
                              'COMMISSIONED': 0,
                              'DECOMMISSIONING': 1,
                              'DECOMMISSIONED': 2 }
    CM_HEALTH_MAPPING = { 'HISTORY_NOT_AVAILABLE': -1,
                          'NOT_AVAILABLE': -1,
                          'DISABLED': -1,
                          'GOOD': 0,
                          'CONCERNING': 1,
                          'BAD': 2 }
    CM_SERVICE_MAPPING = { 'HISTORY_NOT_AVAILABLE': -1,
                           'UNKNOWN': -1,
                           'STARTING': 0,
                           'STARTED': 0,
                           'STOPPING': 1,
                           'STOPPED': 1,
                           'NA': 0 }
    CM_BOOLEAN_MAPPING = { 'False': 0,
                           'True': 1,
                           'FRESH': 0,
                           'STALE_REFRESHABLE': 0,
                           'STALE': 1
                           }

    # Code reported for a state string that has no entry in a mapping above;
    # -1 is the value those mappings already use for "unknown" states.
    _UNMAPPED_STATE = -1

    # Discovery rule keys; every discovered host starts with one empty list
    # per key.
    _DISCOVERY_KEYS = (
        'hadoop.cm.cluster.discovery',
        'hadoop.cm.cluster.check.discovery',
        'hadoop.cm.host.discovery',
        'hadoop.cm.host.check.discovery',
        'hadoop.cm.role.discovery',
        'hadoop.cm.role.check.discovery',
        'hadoop.cm.service.discovery',
        'hadoop.cm.service.check.discovery',
    )

    def _parse_probe_args(self, parser):
        """Return ``parser`` unchanged: this probe adds no CLI options.

        Host, port and credentials come from the configuration file instead
        (see ``_init_probe``).
        """
        return parser

    def _init_probe(self):
        """Read the API settings and create the Cloudera Manager client.

        Returns:
            dict with the ``username`` and ``password`` that were read.

        Raises:
            ValueError: if ``CDHApiUser`` or ``CDHApiPass`` is missing from
                the configuration file.
        """
        tmp_config = configobj.ConfigObj(self.options.config_file, list_values=False)

        # Fail with a readable message instead of an UnboundLocalError when
        # the credentials are not configured.
        missing = [name for name in ('CDHApiUser', 'CDHApiPass')
                   if name not in tmp_config]
        if missing:
            raise ValueError(
                "Missing %s in %s" % (', '.join(missing), self.options.config_file)
            )
        username = tmp_config['CDHApiUser']
        password = tmp_config['CDHApiPass']

        # No command line option defines cdh_host/cdh_port, so provide
        # defaults when the configuration file does not set them either.
        if 'CDHApiHost' in tmp_config:
            self.options.cdh_host = tmp_config['CDHApiHost']
        elif not hasattr(self.options, 'cdh_host'):
            self.options.cdh_host = 'localhost'
        if 'CDHApiPort' in tmp_config:
            self.options.cdh_port = int(tmp_config['CDHApiPort'])
        elif not hasattr(self.options, 'cdh_port'):
            self.options.cdh_port = 7180

        if self.options.cdh_host == 'localhost':
            self.options.cdh_host = socket.getfqdn()
        self.hostname = self.options.cdh_host
        self.cdh_api = ApiResource(
            self.options.cdh_host,
            self.options.cdh_port,
            username,
            password
        )
        return {'username': username, 'password': password}

    @classmethod
    def _new_discovery_entry(cls):
        """Return a fresh ``{discovery_key: []}`` dictionary for one host."""
        return dict((key, []) for key in cls._DISCOVERY_KEYS)

    @classmethod
    def _map_state(cls, mapping, value):
        """Translate an API state into its numeric code.

        A value without an entry in ``mapping`` yields ``_UNMAPPED_STATE``
        rather than a KeyError, so one unexpected state does not abort the
        collection of every other item.
        """
        return mapping.get(str(value), cls._UNMAPPED_STATE)

    def _fetch_host(self, host_id, cache):
        """Return the API host object for ``host_id``, fetching it only once.

        ``cache`` is a dictionary owned by the caller and shared for the
        duration of a single discovery or metrics run.
        """
        if host_id not in cache:
            cache[host_id] = self.cdh_api.get_host(host_id)
        return cache[host_id]

    @staticmethod
    def _discover_service(entry, cluster_name, service):
        """Append ``service`` and its health checks to a discovery entry."""
        service_type = "%s" % service.type.lower()
        entry['hadoop.cm.service.discovery'].append({
            '{#HDPCLUSTERNAME}': cluster_name,
            '{#HDPSERVICENAME}': service_type
        })
        for check in service.healthChecks:
            entry['hadoop.cm.service.check.discovery'].append({
                '{#HDPCLUSTERNAME}': cluster_name,
                '{#HDPSERVICENAME}': service_type,
                '{#HDPSERVICECHECKNAME}': ("%s" % check['name'].lower())
            })

    def _collect_service_metrics(self, items, cluster_name, service):
        """Store the state items of ``service`` and of its health checks."""
        service_type = service.type.lower()
        prefix = (cluster_name, service_type)

        items["hadoop.cm.service[%s,%s,serviceState]" % prefix] = \
            self._map_state(self.CM_SERVICE_MAPPING, service.serviceState)
        items["hadoop.cm.service[%s,%s,healthSummary]" % prefix] = \
            self._map_state(self.CM_HEALTH_MAPPING, service.healthSummary)
        items["hadoop.cm.service[%s,%s,configStalenessStatus]" % prefix] = \
            self._map_state(self.CM_BOOLEAN_MAPPING, service.configStalenessStatus)
        items["hadoop.cm.service[%s,%s,maintenanceMode]" % prefix] = \
            self._map_state(self.CM_BOOLEAN_MAPPING, service.maintenanceMode)

        for check in service.healthChecks:
            key = "hadoop.cm.service.check[%s,%s,%s,checkSummary]"
            items[key % (cluster_name, service_type, check['name'].lower())] = \
                self._map_state(self.CM_HEALTH_MAPPING, check['summary'])

    def _get_discovery(self):
        """Build the discovery data for clusters, hosts, services and roles.

        Returns:
            dict mapping each hostname to ``{discovery_key: [macro dicts]}``.
            The Cloudera Manager host is always present.  The management
            service is labelled with the name of the last cluster returned by
            the API and is omitted when the API returns no cluster at all.
        """
        mgmt_hostname = self.hostname
        data = {mgmt_hostname: self._new_discovery_entry()}
        hosts_by_id = {}
        last_cluster_name = None

        for cluster in self.cdh_api.get_all_clusters():
            last_cluster_name = cluster.name
            cluster_name = "%s" % cluster.name

            for instance in cluster.list_hosts():
                host = self._fetch_host(instance.hostId, hosts_by_id)
                # setdefault keeps anything already collected for this
                # hostname (it may be the manager host itself).
                entry = data.setdefault(host.hostname, self._new_discovery_entry())
                entry['hadoop.cm.host.discovery'].append(
                    {'{#HDPCLUSTERNAME}': cluster_name}
                )
                for check in host.healthChecks:
                    entry['hadoop.cm.host.check.discovery'].append({
                        '{#HDPCLUSTERNAME}': cluster_name,
                        '{#HDPHOSTCHECKNAME}': ("%s" % check['name'].lower())
                    })

            for service in cluster.get_all_services(view="full"):
                self._discover_service(data[mgmt_hostname], cluster_name, service)
                service_type = "%s" % service.type.lower()

                for role in service.get_all_roles(view="full"):
                    # File the role under the host it actually runs on.
                    host = self._fetch_host(role.hostRef.hostId, hosts_by_id)
                    entry = data.setdefault(host.hostname, self._new_discovery_entry())
                    role_type = "%s" % role.type.lower()
                    entry['hadoop.cm.role.discovery'].append({
                        '{#HDPCLUSTERNAME}': cluster_name,
                        '{#HDPSERVICENAME}': service_type,
                        '{#HDPROLENAME}': role_type
                    })
                    for check in role.healthChecks:
                        entry['hadoop.cm.role.check.discovery'].append({
                            '{#HDPCLUSTERNAME}': cluster_name,
                            '{#HDPSERVICENAME}': service_type,
                            '{#HDPROLENAME}': role_type,
                            '{#HDPROLECHECKNAME}': ("%s" % check['name'].lower())
                        })

            data[mgmt_hostname]['hadoop.cm.cluster.discovery'].append(
                {"{#HDPCLUSTERNAME}": cluster_name}
            )

        if last_cluster_name is not None:
            mgmt_service = self.cdh_api.get_cloudera_manager().get_service()
            self._discover_service(
                data[mgmt_hostname], "%s" % last_cluster_name, mgmt_service
            )
        return data

    def _get_metrics(self):
        """Collect the current value of every monitored item.

        Returns:
            dict mapping each hostname to ``{item_key: value}``.  The
            Cloudera Manager host is always present and always carries
            ``hadoop.cm.zbx_version``.  The management service is keyed with
            the name of the last cluster returned by the API and is omitted
            when the API returns no cluster at all.
        """
        mgmt_hostname = self.hostname
        data = {mgmt_hostname: {}}
        mgmt_items = data[mgmt_hostname]
        hosts_by_id = {}
        last_cluster_name = None

        for cluster in self.cdh_api.get_all_clusters():
            last_cluster_name = cluster.name

            mgmt_items["hadoop.cm.cluster[%s,version]" % cluster.name] = cluster.version
            mgmt_items["hadoop.cm.cluster[%s,maintenanceMode]" % cluster.name] = \
                self._map_state(self.CM_BOOLEAN_MAPPING, cluster.maintenanceMode)

            for instance in cluster.list_hosts():
                host = self._fetch_host(instance.hostId, hosts_by_id)
                items = data.setdefault(host.hostname, {})

                items["hadoop.cm.host[%s,maintenanceMode]" % cluster.name] = \
                    self._map_state(self.CM_BOOLEAN_MAPPING, host.maintenanceMode)
                items["hadoop.cm.host[%s,healthSummary]" % cluster.name] = \
                    self._map_state(self.CM_HEALTH_MAPPING, host.healthSummary)
                items["hadoop.cm.host[%s,commissionState]" % cluster.name] = \
                    self._map_state(self.CM_COMMISSION_MAPPING, host.commissionState)

                # Seconds elapsed since the host last reported in.
                # NOTE(review): datetime.now() is local time; confirm that
                # lastHeartbeat is expressed in the same timezone.
                heartbeat_age = (datetime.now() - host.lastHeartbeat).total_seconds()
                items["hadoop.cm.host[%s,lastHeartbeat]" % cluster.name] = heartbeat_age

                for check in host.healthChecks:
                    key = "hadoop.cm.host.check[%s,%s]"
                    items[key % (cluster.name, check['name'].lower())] = \
                        self._map_state(self.CM_HEALTH_MAPPING, check['summary'])

            for service in cluster.get_all_services(view="full"):
                self._collect_service_metrics(mgmt_items, cluster.name, service)
                service_type = service.type.lower()

                for role in service.get_all_roles(view="full"):
                    host = self._fetch_host(role.hostRef.hostId, hosts_by_id)
                    items = data.setdefault(host.hostname, {})
                    prefix = (cluster.name, service_type, role.type.lower())

                    items["hadoop.cm.role[%s,%s,%s,commissionState]" % prefix] = \
                        self._map_state(self.CM_COMMISSION_MAPPING, role.commissionState)
                    items["hadoop.cm.role[%s,%s,%s,configStalenessStatus]" % prefix] = \
                        self._map_state(self.CM_BOOLEAN_MAPPING, role.configStalenessStatus)
                    items["hadoop.cm.role[%s,%s,%s,healthSummary]" % prefix] = \
                        self._map_state(self.CM_HEALTH_MAPPING, role.healthSummary)
                    items["hadoop.cm.role[%s,%s,%s,maintenanceMode]" % prefix] = \
                        self._map_state(self.CM_BOOLEAN_MAPPING, role.maintenanceMode)
                    items["hadoop.cm.role[%s,%s,%s,roleState]" % prefix] = \
                        self._map_state(self.CM_SERVICE_MAPPING, role.roleState)

                    for check in role.healthChecks:
                        key = "hadoop.cm.role.check[%s,%s,%s,%s,checkSummary]"
                        items[key % (prefix + (check['name'].lower(),))] = \
                            self._map_state(self.CM_HEALTH_MAPPING, check['summary'])

        if last_cluster_name is not None:
            mgmt_service = self.cdh_api.get_cloudera_manager().get_service()
            self._collect_service_metrics(mgmt_items, last_cluster_name, mgmt_service)

        mgmt_items['hadoop.cm.zbx_version'] = self.__version__
        return data


if __name__ == '__main__':
    # The value returned by run() is echoed on stdout and is also used as
    # the process exit status.
    ret = CDH().run()
    # Function-call form prints the same text on Python 2 and also parses
    # on Python 3.
    print(ret)
    sys.exit(ret)
5. Zabbix Template模板下载

下载: zbx_cdh_hadoop_templates.zip

订阅
提醒
4 评论
在线反馈
查看全部评论