[Python module] kafka-python

Date: May 6, 2019

KafkaAdminClient is only supported starting with kafka-python 1.4.4:

$ pip2.7 install kafka-python==1.4.4
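To confirm that the installed version is new enough, check kafka.__version__:

$ python2.7 -c "import kafka; print(kafka.__version__)"
1.4.4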

Official documentation: https://kafka-python.readthedocs.io/

The list_consumer_group_offsets method

The return value is a dict whose keys are kafka.structs.TopicPartition and whose values are kafka.structs.OffsetAndMetadata.

In the kafka-python source both are defined as namedtuples:

TopicPartition = namedtuple("TopicPartition",
    ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
    # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
    ["offset", "metadata"])

The docstring from the official namedtuple source code:

def namedtuple(typename, field_names, verbose=False, rename=False):
    """Returns a new subclass of tuple with named fields.

    >>> Point = namedtuple('Point', ['x', 'y'])
    >>> Point.__doc__                   # docstring for the new class
    'Point(x, y)'
    >>> p = Point(11, y=22)             # instantiate with positional args or keywords
    >>> p[0] + p[1]                     # indexable like a plain tuple
    33
    >>> x, y = p                        # unpack like a regular tuple
    >>> x, y
    (11, 22)
    >>> p.x + p.y                       # fields also accessible by name
    33
    >>> d = p._asdict()                 # convert to a dictionary
    >>> d['x']
    11
    >>> Point(**d)                      # convert from a dictionary
    Point(x=11, y=22)
    >>> p._replace(x=100)               # _replace() is like str.replace() but targets named fields
    Point(x=100, y=22)

    """

I wrote a kafka_export script to push Kafka offset metrics to Falcon:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ = 'why'

from kafka import KafkaAdminClient, SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition

import time
import urllib2
import json

brokers = ["192.168.21.70:9094", "192.168.21.80:9094", "192.168.21.106:9094"]
group_id = "elk"
ts = int(time.time())
p = []  # accumulates the metric dicts that get pushed to falcon-agent


client = SimpleClient(brokers)
consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group_id)


def get_topic_producer_offset(topic):
  """
    Return the sum of the latest (log-end) offsets across all partitions of a topic.
  :param topic:
  :return:
  """
  partitions = client.topic_partitions[topic]
  # -1 requests the latest offset of each partition
  offset_requests = [OffsetRequestPayload(topic, partition, -1, 1) for partition in partitions.keys()]
  offsets_responses = client.send_offset_request(offset_requests)
  return sum([r.offsets[0] for r in offsets_responses])


def get_topic_consumer_offset(topic):
  """
    Return the sum of the offsets the consumer group has committed for a topic.
  :param topic:
  :return:
  """
  pts = [TopicPartition(topic=topic, partition=i) for i in consumer.partitions_for_topic(topic)]
  # _coordinator is a private attribute, but it exposes the committed offsets directly
  result = consumer._coordinator.fetch_committed_offsets(pts)
  return sum([r.offset for r in result.values()])


def get_topic_logsize_dic(brokers, group_id):
  """
    Collect producer offset, consumer offset, and lag for every topic the
    group has committed offsets for, appending the metric dicts to the
    global list p.
  :param brokers:
  :param group_id:
  :return:
  """

  kafka_admin_client = KafkaAdminClient(bootstrap_servers=brokers)
  committed = kafka_admin_client.list_consumer_group_offsets(group_id=group_id)
  # the keys are per-partition TopicPartition namedtuples, so deduplicate
  # by topic to avoid reporting each topic once per partition
  for topic in set(tp.topic for tp in committed.keys()):
    topic_producer_offset = get_topic_producer_offset(topic)
    topic_consumer_offset = get_topic_consumer_offset(topic)
    topic_lag = topic_producer_offset - topic_consumer_offset
    pro_dic = {
      "endpoint": "test_kafka",
      "metric": "kafka.topic_producer_offset",
      "timestamp": ts,
      "step": 60,
      "value": topic_producer_offset,
      "counterType": "COUNTER",
      "tags": "topic_name=%s" % topic,
    }
    con_dic = {
      "endpoint": "test_kafka",
      "metric": "kafka.topic_consumer_offset",
      "timestamp": ts,
      "step": 60,
      "value": topic_consumer_offset,
      "counterType": "COUNTER",
      "tags": "topic_name=%s" % topic,
    }
    lag_dic = {
      "endpoint": "test_kafka",
      "metric": "kafka.topic_lag",
      "timestamp": ts,
      "step": 60,
      "value": topic_lag,
      "counterType": "GAUGE",
      "tags": "topic_name=%s" % topic,
    }
    p.append(pro_dic)
    p.append(con_dic)
    p.append(lag_dic)
  kafka_admin_client.close()
  client.close()
  consumer.close()


def postdata2falcon(data):
  """
    POST the collected metrics to the local falcon-agent.
  :param data:
  :return:
  """

  method = "POST"
  handler = urllib2.HTTPHandler()
  opener = urllib2.build_opener(handler)
  url = 'http://127.0.0.1:1988/v1/push'
  request = urllib2.Request(url, data=json.dumps(data))
  request.add_header("Content-Type", 'application/json')
  request.get_method = lambda: method
  try:
    connection = opener.open(request)
  except urllib2.HTTPError, e:
    connection = e
  # Check the HTTP status code
  if connection.code == 200:
    print connection.read()
  else:
    print '{"err":1,"msg":"%s"}' % connection


if __name__ == "__main__":
  get_topic_logsize_dic(brokers, group_id)
  postdata2falcon(p)
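Every data point is reported with step 60, so the script is intended to run once a minute, e.g. from cron (the script path here is a placeholder):

* * * * * python2.7 /path/to/kafka_export.py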