# 等忙过这段时间再回头补全笔记，暂时只有消费者部分。
import json
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
class KafkaTopicNotExistException(Exception):
    """Error signalling that a Kafka topic does not exist.

    Attributes:
        message: The text supplied at construction time. The same text is
            handed to ``Exception``, so it is also available through
            ``args`` and ``str()``.
    """

    def __init__(self, message):
        """Store ``message`` on the instance and initialise the base class."""
        self.message = message
        super().__init__(message)
class KfkHelper(KafkaConsumer):
    """``KafkaConsumer`` subclass with JSON decoding and topic validation.

    Every record value is decoded as UTF-8 and parsed with ``json.loads``
    before it reaches the caller. ``subscribe_for`` refuses to subscribe
    when any requested topic reports no partitions.
    """

    def __init__(self, gid, nodes, auto_offset_reset, consumer_timeout_ms=30000):
        """Create the consumer with this helper's fixed tuning values.

        Args:
            gid: Forwarded to ``KafkaConsumer`` as ``group_id``.
            nodes: Forwarded to ``KafkaConsumer`` as ``bootstrap_servers``.
            auto_offset_reset: Forwarded unchanged; the original note lists
                ``'latest'`` and ``'earliest'`` as the intended values.
            consumer_timeout_ms: Accepted for backward compatibility only.
                It is NOT forwarded to ``KafkaConsumer`` (see the note in
                the body), so it has no effect on how long iteration blocks.
        """
        super().__init__(
            group_id=gid,
            bootstrap_servers=nodes,
            value_deserializer=lambda m: json.loads(m.decode('utf8')),
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
            # NOTE(review): consumer_timeout_ms is intentionally left out
            # here, exactly as in the original code where the pass-through
            # was disabled. Forwarding it would make iteration stop after
            # the timeout instead of blocking; confirm with callers of
            # get() before enabling it.
            # request_timeout_ms is kept 500 ms above session_timeout_ms.
            # NOTE(review): presumably to satisfy a client-side ordering
            # check between the two timeouts - confirm against the
            # kafka-python version in use. Also verify that the broker's
            # maximum allowed session timeout admits a value this large.
            request_timeout_ms=1000500,
            session_timeout_ms=1000000,
            heartbeat_interval_ms=5000,
            fetch_min_bytes=1 * 1024 * 1024,    # 1 MiB
            fetch_max_bytes=100 * 1024 * 1024,  # 100 MiB
            fetch_max_wait_ms=500,
        )

    def get(self):
        """Iterate over this consumer and yield each record's decoded value.

        Yields:
            The record's ``value``. When that value is a ``str`` it is
            passed to ``eval`` and the evaluation result is yielded instead.
        """
        for record in self:
            payload = record.value
            if isinstance(payload, str):
                # SECURITY: eval() executes whatever expression the message
                # carries. This is only acceptable when every producer on
                # the topic is trusted; otherwise switch to a safe parser
                # such as ast.literal_eval or json.loads. Left unchanged
                # here so existing payload formats keep working.
                yield eval(payload)
            else:
                yield payload

    def subscribe_for(self, topics):
        """Validate ``topics`` and subscribe to them.

        Args:
            topics: A ``list`` or ``tuple`` (or subclass) of topic names.

        Raises:
            TypeError: If ``topics`` is not a list or tuple.
            KafkaTopicNotExistException: If any topic reports no partitions.
        """
        # An explicit raise is used instead of ``assert`` because asserts
        # are removed when Python runs with -O, which would let a bare
        # string through and iterate it character by character.
        if not isinstance(topics, (tuple, list)):
            raise TypeError(
                f"topics must be a tuple or list, got {type(topics).__name__}")
        self._check_topics(topics)
        self.subscribe(topics=topics)

    def get_partitions(self, topic):
        """Return the result of ``partitions_for_topic`` for ``topic``."""
        return self.partitions_for_topic(topic)

    def _check_topics(self, topics):
        """Raise if any topic in ``topics`` has no partitions.

        Raises:
            KafkaTopicNotExistException: For the first topic whose partition
                lookup returns a falsy value (``None`` or empty).
        """
        for topic in topics:
            if not self.get_partitions(topic):
                raise KafkaTopicNotExistException(
                    f"topic '{topic}' does not exist")
if __name__ == '__main__':
    # No script behaviour is defined: running this file directly does nothing.
    pass
# 0 评论
# 大哥整点话呗~