Kafka的Python客户端封装

  • 分类: Python 数据库
  • 发表日期:2021-07-30 15:13:00
  • 最后修改:2021-07-30 15:13:00

等忙过这段时间再回头补全笔记,暂时只有消费者部分

import json
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

class KafkaTopicNotExistException(Exception):
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message

class KfkHelper(KafkaConsumer):
    def __init__(self, gid, nodes, auto_offset_reset, consumer_timeout_ms=30000):
        KafkaConsumer.__init__(self, 
            group_id = gid,
            bootstrap_servers = nodes,
            value_deserializer = lambda m: json.loads(m.decode('utf8')),
            auto_offset_reset = auto_offset_reset,   # latest earliest
            enable_auto_commit = True, 
            # consumer_timeout_ms = consumer_timeout_ms, 
            request_timeout_ms = 1000500, 
            session_timeout_ms = 1000000,
            heartbeat_interval_ms = 5000,
            fetch_min_bytes = 1*1024*1024,
            fetch_max_bytes = 100*1024*1024,
            fetch_max_wait_ms = 500,
        )

    def get(self):
        for msg in self:
            msg = msg.value
            if isinstance(msg, str):
                yield eval(msg)
            else:
                yield msg

    def subscribe_for(self, topics):
        assert type(topics) in (tuple, list)
        self._check_topics(topics)
        self.subscribe(topics=topics)

    def get_partitions(self, topic):
        return self.partitions_for_topic(topic)

    def _check_topics(self, topics):
        for topic in topics:
            partitions = self.get_partitions(topic)
            if not partitions:
                raise KafkaTopicNotExistException(
                        f"topic '{topic}' does not exist")


if __name__ == '__main__':
    pass

 

post
2020年12月26日 19:59 原创
post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年3月25日 22:09 原创
post
2021年4月15日 20:59 原创
post
2021年4月18日 16:32 原创
post
2021年6月17日 16:16 原创
post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月13日 09:36 原创
post
2021年7月30日 12:01 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:09 原创
post
2021年7月30日 16:02 原创
post
2021年7月30日 16:02 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月25日 16:11
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27
post
2021年11月24日 16:45 原创
post
2023年8月18日 20:39 原创 未公开

本博客部署教程

post
2024年6月13日 09:47 原创
post
2024年7月4日 08:54 原创

0 评论

大哥整点话呗~