<Python module> kafka-python
KafkaAdminClient is only available starting with kafka-python 1.4.4:
$ pip2.7 install kafka-python==1.4.4
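A minimal sketch for confirming the installed version and constructing the admin client (the broker address is just a placeholder, not part of the original script):

import kafka
from kafka import KafkaAdminClient

print kafka.__version__  # should be 1.4.4 or newer

# replace with your own broker list
admin = KafkaAdminClient(bootstrap_servers=["192.168.21.70:9094"])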
The list_consumer_group_offsets method
returns a dict whose keys are kafka.structs.TopicPartition and whose values are kafka.structs.OffsetAndMetadata.
In the source both are plain namedtuples:
TopicPartition = namedtuple("TopicPartition",
    ["topic", "partition"])

OffsetAndMetadata = namedtuple("OffsetAndMetadata",
    # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
    ["offset", "metadata"])
The docstring of namedtuple from the official source:
def namedtuple(typename, field_names, verbose=False, rename=False):
    """Returns a new subclass of tuple with named fields.

    >>> Point = namedtuple('Point', ['x', 'y'])
    >>> Point.__doc__            # docstring for the new class
    'Point(x, y)'
    >>> p = Point(11, y=22)      # instantiate with positional args or keywords
    >>> p[0] + p[1]              # indexable like a plain tuple
    33
    >>> x, y = p                 # unpack like a regular tuple
    >>> x, y
    (11, 22)
    >>> p.x + p.y                # fields also accessible by name
    33
    >>> d = p._asdict()          # convert to a dictionary
    >>> d['x']
    11
    >>> Point(**d)               # convert from a dictionary
    Point(x=11, y=22)
    >>> p._replace(x=100)        # _replace() is like str.replace() but targets named fields
    Point(x=100, y=22)

    """
I wrote a kafka_export script that pushes Kafka topic offset and lag metrics to Falcon:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = 'why'
from kafka import KafkaAdminClient, SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition
import time
import sys
import urllib2
import base64
import json
import socket
brokers = ["192.168.21.70:9094", "192.168.21.80:9094", "192.168.21.106:9094"]
group_id = "elk"
ts = int(time.time())
p = []
client = SimpleClient(brokers)
consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group_id,)
def get_topic_producer_offset(topic):
    """
    Return the sum of the latest (log-end) offsets across all partitions of a topic.
    :param topic:
    :return:
    """
    partitions = client.topic_partitions[topic]
    # time=-1 requests the latest offset, max_offsets=1 returns one offset per partition
    offset_requests = [OffsetRequestPayload(topic, part, -1, 1) for part in partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)
    return sum([r.offsets[0] for r in offsets_responses])
def get_topic_consumer_offset(topic):
    """
    Return the sum of the offsets a consumer group has committed for a topic.
    :param topic:
    :return:
    """
    pts = [TopicPartition(topic=topic, partition=i) for i in consumer.partitions_for_topic(topic)]
    # fetch_committed_offsets is an internal coordinator call; it returns
    # a dict of TopicPartition -> OffsetAndMetadata
    result = consumer._coordinator.fetch_committed_offsets(pts)
    return sum([r.offset for r in result.values()])
def get_topic_logsize_dic(brokers, group_id):
    """
    Collect producer offset, consumer offset and lag metrics for every topic
    the group has committed offsets for.
    :param brokers:
    :param group_id:
    :return:
    """
    kafka_admin_client = KafkaAdminClient(bootstrap_servers=brokers)
    topic_logsize_dic = {}
    t = kafka_admin_client.list_consumer_group_offsets(group_id=group_id)
    for k, v in t.items():
        # k is a TopicPartition, v is an OffsetAndMetadata
        topic = k._asdict()['topic']
        if topic in topic_logsize_dic:
            # offsets are summed over all partitions, so handle each topic only once
            continue
        topic_producer_offset = get_topic_producer_offset(topic)
        topic_consumer_offset = get_topic_consumer_offset(topic)
        topic_lag = topic_producer_offset - topic_consumer_offset
        topic_logsize_dic[topic] = topic_lag
        pro_dic = {
            "endpoint": "test_kafka",
            "metric": "kafka.topic_producer_offset",
            "timestamp": ts,
            "step": 60,
            "value": topic_producer_offset,
            "counterType": "COUNTER",
            "tags": "topic_name=%s" % topic,
        }
        con_dic = {
            "endpoint": "test_kafka",
            "metric": "kafka.topic_consumer_offset",
            "timestamp": ts,
            "step": 60,
            "value": topic_consumer_offset,
            "counterType": "COUNTER",
            "tags": "topic_name=%s" % topic,
        }
        lag_dic = {
            "endpoint": "test_kafka",
            "metric": "kafka.topic_lag",
            "timestamp": ts,
            "step": 60,
            "value": topic_lag,
            "counterType": "GAUGE",
            "tags": "topic_name=%s" % topic,
        }
        p.append(pro_dic)
        p.append(con_dic)
        p.append(lag_dic)
    kafka_admin_client.close()
    client.close()
    consumer.close()
def postdata2falcon(data):
    """
    POST the collected metrics to the local Falcon agent.
    :param data:
    :return:
    """
    method = "POST"
    handler = urllib2.HTTPHandler()
    opener = urllib2.build_opener(handler)
    url = 'http://127.0.0.1:1988/v1/push'
    request = urllib2.Request(url, data=json.dumps(data))
    request.add_header("Content-Type", 'application/json')
    request.get_method = lambda: method
    try:
        connection = opener.open(request)
    except urllib2.HTTPError, e:
        connection = e
    # check the HTTP status code
    if connection.code == 200:
        print connection.read()
    else:
        print '{"err":1,"msg":"%s"}' % connection
if __name__ == "__main__":
    get_topic_logsize_dic(brokers, group_id)
    postdata2falcon(p)
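Since every metric uses step=60, one way to schedule the exporter (purely an assumption; the script path and log file below are placeholders) is a per-minute cron entry on a host that can reach both the brokers and the local falcon-agent:

* * * * * /usr/bin/python2.7 /path/to/kafka_export.py >> /var/log/kafka_export.log 2>&1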