zabbix监控CDH搭建的Hadoop集群

参考来源:
https://github.com/jbfavre/python-zabbix
https://github.com/jbfavre/python-protobix/

1. 先在Cloudera Manager创建只读用户monitor

2. 将CDH的API地址、monitor账号密码保存在/etc/zabbix/config/cdh.conf
ServerActive=10.33.83.242   # 同Zabbix_agent的配置,指向zabbix-proxy/zabbix-server
ServerPort=10051            # zabbix-server默认端口
LogType=file                # 监控脚本日志类型
LogFile=/tmp/zabbix_agentd_cdh.log  # 监控脚本日志输出路径
DebugLevel=2                # 日志级别,2-只显示错误,3-显示INFO级别,5-调试用
Hostname=10.33.99.23        # 同Zabbix_agent配置的主机名
CDHApiHost=10.33.99.23      # Cloudera Manager的http地址
CDHApiPort=7180             # Cloudera Manager的http端口
CDHApiUser=monitor          # 只读角色账号
CDHApiPass=xxxxxxxxxxxxxxxx # 只读角色密码
3. Zabbix配置/etc/zabbix/zabbix_agentd.d/cdh.conf
UserParameter=hadoop.cm.check,/usr/local/zabbix-ztc/bin/zabbix_cdh_monitor.py -c /etc/zabbix/config/cdh.conf --update-items
UserParameter=hadoop.cm.discovery,/usr/local/zabbix-ztc/bin/zabbix_cdh_monitor.py -c /etc/zabbix/config/cdh.conf --discovery
4. 监控脚本/usr/local/zabbix-ztc/bin/zabbix_cdh_monitor.py
#!/usr/local/bin/python2.7
# -*- coding: utf-8 -*-
import sys
import socket
import configobj
import protobix
import json
from datetime import datetime
from cm_api.api_client import ApiResource
class CDH(protobix.SampleProbe):
__version__ = '1.0.0'
CM_API_VERSION = 5
CM_COMMISSION_MAPPING = { 'UNKNOWN': -1,
'COMMISSIONED': 0,
'DECOMMISSIONING': 1,
'DECOMMISSIONED': 2 }
CM_HEALTH_MAPPING = { 'HISTORY_NOT_AVAILABLE': -1,
'NOT_AVAILABLE': -1,
'DISABLED': -1,
'GOOD': 0,
'CONCERNING': 1,
'BAD': 2 }
CM_SERVICE_MAPPING = { 'HISTORY_NOT_AVAILABLE': -1,
'UNKNOWN': -1,
'STARTING': 0,
'STARTED': 0,
'STOPPING': 1,
'STOPPED': 1,
'NA': 0 }
CM_BOOLEAN_MAPPING = { 'False': 0,
'True': 1,
'FRESH': 0,
'STALE_REFRESHABLE': 0,
'STALE': 1
}
def _parse_probe_args(self, parser):
# cdh_probe_options = parser.add_argument_group('Hadoop Cloudera Manager API Configuration')
# cdh_probe_options.add_argument(
#     "-H", "--cdh-api-host", default="localhost",
#     help="Hadoop Cloudera Manager API hostname"
# )
# cdh_probe_options.add_argument(
#     "-P", "--cdh-api-port", default=7180,
#     help="Hadoop Cloudera Manager API port"
# )
# cdh_probe_options.add_argument(
#     "-F", "--cdh-auth-file", default=None,
#     help="Hadoop Cloudera Manager API User And Password File"
# )
# cdh_probe_options.add_argument(
#     "-T", "--cdh-use-tls", default=False,
#     help="Hadoop Cloudera Manager API User And Password File"
# )
return parser
def _init_probe(self):
tmp_config = configobj.ConfigObj(self.options.config_file, list_values=False)
if 'CDHApiHost' in tmp_config:
self.options.cdh_host = tmp_config['CDHApiHost']
if 'CDHApiPort' in tmp_config:
self.options.cdh_port = int(tmp_config['CDHApiPort'])
if 'CDHApiUser' in tmp_config:
username = tmp_config['CDHApiUser']
if 'CDHApiPass' in tmp_config:
password = tmp_config['CDHApiPass']
if self.options.cdh_host == 'localhost':
self.options.cdh_host = socket.getfqdn()
self.hostname = self.options.cdh_host
self.cdh_api = ApiResource(
self.options.cdh_host,
self.options.cdh_port,
username,
password
)
return {'username': username, 'password': password}
def _get_discovery(self):
mgmt_hostname = self.hostname
data = {}
for cluster in self.cdh_api.get_all_clusters():
if not mgmt_hostname in data:
data[mgmt_hostname] = {
'hadoop.cm.cluster.discovery': [],
'hadoop.cm.cluster.check.discovery': [],
'hadoop.cm.host.discovery': [],
'hadoop.cm.host.check.discovery': [],
'hadoop.cm.role.discovery': [],
'hadoop.cm.role.check.discovery': [],
'hadoop.cm.service.discovery': [],
'hadoop.cm.service.check.discovery': []
}
for instance in cluster.list_hosts():
host = self.cdh_api.get_host(instance.hostId)
data[host.hostname] = {
'hadoop.cm.cluster.discovery': [],
'hadoop.cm.cluster.check.discovery': [],
'hadoop.cm.host.discovery': [],
'hadoop.cm.host.check.discovery': [],
'hadoop.cm.role.discovery': [],
'hadoop.cm.role.check.discovery': [],
'hadoop.cm.service.discovery': [],
'hadoop.cm.service.check.discovery': []
}
host_list = {'{#HDPCLUSTERNAME}': ("%s" % cluster.name)}
key = 'hadoop.cm.host.discovery'
data[host.hostname][key].append(host_list)
for check in host.healthChecks:
check_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPHOSTCHECKNAME}': ("%s" % check['name'].lower())
}
key = 'hadoop.cm.host.check.discovery'
data[host.hostname][key].append(check_list)
for service in cluster.get_all_services(view="full"):
service_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPSERVICENAME}': ("%s" % service.type.lower())
}
key = 'hadoop.cm.service.discovery'
data[mgmt_hostname][key].append(service_list)
for check in service.healthChecks:
check_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPSERVICENAME}': ("%s" % service.type.lower()),
'{#HDPSERVICECHECKNAME}': ("%s" % check['name'].lower())
}
key = 'hadoop.cm.service.check.discovery'
data[mgmt_hostname][key].append(check_list)
for role in service.get_all_roles(view="full"):
host = self.cdh_api.get_host(instance.hostId)
role_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPSERVICENAME}': ("%s" % service.type.lower()),
'{#HDPROLENAME}': ("%s" % role.type.lower())
}
key = 'hadoop.cm.role.discovery'
data[host.hostname][key].append(role_list)
for check in role.healthChecks:
check_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPSERVICENAME}': ("%s" % service.type.lower()),
'{#HDPROLENAME}': ("%s" % role.type.lower()),
'{#HDPROLECHECKNAME}': ("%s" % check['name'].lower())
}
key = 'hadoop.cm.role.check.discovery'
data[host.hostname][key].append(check_list)
cluster_list = {"{#HDPCLUSTERNAME}": ("%s" % cluster.name)}
key = 'hadoop.cm.cluster.discovery'
data[mgmt_hostname][key].append(cluster_list)
mgmt_service = self.cdh_api.get_cloudera_manager().get_service()
service_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPSERVICENAME}': ("%s" % mgmt_service.type.lower())
}
key = 'hadoop.cm.service.discovery'
data[mgmt_hostname][key].append(service_list)
for check in mgmt_service.healthChecks:
check_list = {
'{#HDPCLUSTERNAME}': ("%s" % cluster.name),
'{#HDPSERVICENAME}': ("%s" % mgmt_service.type.lower()),
'{#HDPSERVICECHECKNAME}': ("%s" % check['name'].lower())
}
key = 'hadoop.cm.service.check.discovery'
data[mgmt_hostname][key].append(check_list)
return data
def _get_metrics(self):
mgmt_hostname = self.hostname
data = {}
for cluster in self.cdh_api.get_all_clusters():
if not mgmt_hostname in data:
data[mgmt_hostname] = {}
key = "hadoop.cm.cluster[%s,version]"
data[mgmt_hostname][(key % cluster.name)] = cluster.version
key = "hadoop.cm.cluster[%s,maintenanceMode]"
data[mgmt_hostname][(key % cluster.name)] = self.CM_BOOLEAN_MAPPING[str(cluster.maintenanceMode)]
for instance in cluster.list_hosts():
host = self.cdh_api.get_host(instance.hostId)
if not host.hostname in data:
data[host.hostname] = {}
key = "hadoop.cm.host[%s,maintenanceMode]"
data[host.hostname][(key % cluster.name)] = self.CM_BOOLEAN_MAPPING[str(host.maintenanceMode)]
key = "hadoop.cm.host[%s,healthSummary]"
data[host.hostname][(key % cluster.name)] = self.CM_HEALTH_MAPPING[str(host.healthSummary)]
key = "hadoop.cm.host[%s,commissionState]"
data[host.hostname][(key % cluster.name)] = self.CM_COMMISSION_MAPPING[str(host.commissionState)]
difference = datetime.now() - host.lastHeartbeat
differenceTotalSeconds = (difference.microseconds + (
difference.seconds + difference.days * 24 * 3600) * 1e6) / 1e6
key = "hadoop.cm.host[%s,lastHeartbeat]"
data[host.hostname][(key % cluster.name)] = differenceTotalSeconds
''' Only works with Python 2.7
differenceTotalSeconds = (datetime.now() - host.lastHeartbeat).total_seconds()
data[host.hostname][("hadoop.cm.host[%s,lastHeartbeat]" % cluster.name)] = differenceTotalSeconds'''
for check in host.healthChecks:
key = "hadoop.cm.host.check[%s,%s]"
data[host.hostname][(key % (cluster.name, check['name'].lower()))] = self.CM_HEALTH_MAPPING[
check['summary']]
for service in cluster.get_all_services(view="full"):
key = "hadoop.cm.service[%s,%s,serviceState]"
data[mgmt_hostname][(key % (cluster.name, service.type.lower()))] = self.CM_SERVICE_MAPPING[
service.serviceState]
key = "hadoop.cm.service[%s,%s,healthSummary]"
data[mgmt_hostname][(key % (cluster.name, service.type.lower()))] = self.CM_HEALTH_MAPPING[
service.healthSummary]
key = "hadoop.cm.service[%s,%s,configStalenessStatus]"
data[mgmt_hostname][(key % (cluster.name, service.type.lower()))] = self.CM_BOOLEAN_MAPPING[
str(service.configStalenessStatus)]
key = "hadoop.cm.service[%s,%s,maintenanceMode]"
data[mgmt_hostname][(key % (cluster.name, service.type.lower()))] = self.CM_BOOLEAN_MAPPING[
str(service.maintenanceMode)]
for check in service.healthChecks:
key = "hadoop.cm.service.check[%s,%s,%s,checkSummary]"
data[mgmt_hostname][(key % (cluster.name, service.type.lower(), check['name'].lower()))] = \
self.CM_HEALTH_MAPPING[check['summary']]
for role in service.get_all_roles(view="full"):
host = self.cdh_api.get_host(role.hostRef.hostId)
key = "hadoop.cm.role[%s,%s,%s,commissionState]"
data[host.hostname][(key % (cluster.name, service.type.lower(), role.type.lower()))] = \
self.CM_COMMISSION_MAPPING[str(role.commissionState)]
key = "hadoop.cm.role[%s,%s,%s,configStalenessStatus]"
data[host.hostname][(key % (cluster.name, service.type.lower(), role.type.lower()))] = \
self.CM_BOOLEAN_MAPPING[str(role.configStalenessStatus)]
key = "hadoop.cm.role[%s,%s,%s,healthSummary]"
data[host.hostname][(key % (cluster.name, service.type.lower(), role.type.lower()))] = \
self.CM_HEALTH_MAPPING[str(role.healthSummary)]
key = "hadoop.cm.role[%s,%s,%s,maintenanceMode]"
data[host.hostname][(key % (cluster.name, service.type.lower(), role.type.lower()))] = \
self.CM_BOOLEAN_MAPPING[str(role.maintenanceMode)]
key = "hadoop.cm.role[%s,%s,%s,roleState]"
data[host.hostname][(key % (cluster.name, service.type.lower(), role.type.lower()))] = \
self.CM_SERVICE_MAPPING[str(role.roleState)]
for check in role.healthChecks:
key = "hadoop.cm.role.check[%s,%s,%s,%s,checkSummary]"
data[host.hostname][(key % (
cluster.name, service.type.lower(),
role.type.lower(),
check['name'].lower())
)] = self.CM_HEALTH_MAPPING[check['summary']]
mgmt_service = self.cdh_api.get_cloudera_manager().get_service()
key = "hadoop.cm.service[%s,%s,serviceState]"
data[mgmt_hostname][(key % (cluster.name, mgmt_service.type.lower()))] = self.CM_SERVICE_MAPPING[
mgmt_service.serviceState]
key = "hadoop.cm.service[%s,%s,healthSummary]"
data[mgmt_hostname][(key % (cluster.name, mgmt_service.type.lower()))] = self.CM_HEALTH_MAPPING[
mgmt_service.healthSummary]
key = "hadoop.cm.service[%s,%s,configStalenessStatus]"
data[mgmt_hostname][(key % (cluster.name, mgmt_service.type.lower()))] = self.CM_BOOLEAN_MAPPING[
str(mgmt_service.configStalenessStatus)]
key = "hadoop.cm.service[%s,%s,maintenanceMode]"
data[mgmt_hostname][(key % (cluster.name, mgmt_service.type.lower()))] = self.CM_BOOLEAN_MAPPING[
str(mgmt_service.maintenanceMode)]
for check in mgmt_service.healthChecks:
key = "hadoop.cm.service.check[%s,%s,%s,checkSummary]"
data[mgmt_hostname][(key % (cluster.name, mgmt_service.type.lower(), check['name'].lower()))] = \
self.CM_HEALTH_MAPPING[check['summary']]
data[mgmt_hostname]['hadoop.cm.zbx_version'] = self.__version__
return data
if __name__ == '__main__':
ret = CDH().run()
print ret
sys.exit(ret)
5. Zabbix Template模板下载

下载: zbx_cdh_hadoop_templates.zip

订阅
提醒
4 评论
在线反馈
查看全部评论