Description
(I am not sure whether this should be raised on the faust repo instead. Since both the issue and the fix may be in aiokafka, I am raising it here. Let me know if I should move it to the faust repo.)
Steps to reproduce
When the number of faust workers is scaled from around 5 up to 6-10 while data is being streamed to the topics that the agents subscribe to, some workers crash with a KeyError.
The following script is used as app.py:
from faust import App

app = App(
    'app_main',
    broker='kafka://kafka:9094',
    store='rocksdb://',
)

PARTITIONS = 10

# Declare 20 internal topics and 20 tables, each with PARTITIONS partitions.
event_topic = []
event_table = []
for i in range(20):
    event_topic.append(app.topic(
        f'event_topic_write{i}',
        internal=True,
        partitions=PARTITIONS,
    ))
    event_table.append(app.Table(
        f'event_table{i}',
        partitions=PARTITIONS,
    ))


# Only the first topic has an agent; it copies each event into the first table.
@app.agent(event_topic[0])
async def event_topic_write(streams):
    async for payload in streams.events():
        print(f'Got data: {payload}')
        event_table[0][payload.key] = payload.value


if __name__ == '__main__':
    app.main()
The following script was used to stream events to the topics:
import json
import random
import string

from kafka import KafkaProducer

producer_instance = KafkaProducer(
    bootstrap_servers=['kafka:9094'],
)

event_topic = []
for i in range(20):
    event_topic.append(f'event_topic_write{i}')


def randomString(stringLength=10):
    """Generate a random string of fixed length."""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))


# Send a random key with a fixed value to a randomly chosen topic, forever.
while True:
    key_bytes = bytes(json.dumps(randomString()), encoding='utf-8')
    value_bytes = bytes(json.dumps('test'), encoding='utf-8')
    topic_name = event_topic[random.randint(0, len(event_topic)-1)]
    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()
    print(key_bytes)
Expected behavior
The rebalance finishes successfully and all workers keep running.
Actual behavior
Some workers crash during the rebalance with a KeyError raised in aiokafka's fetcher (see the traceback below).
Full traceback
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table1-changelog', partition=0) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=9) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table4-changelog', partition=2) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table3-changelog', partition=6) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table13-changelog', partition=6) at offset 0
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table6-changelog', partition=4) at offset 0
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=4) at offset 0
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=9) at offset 0
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table11-changelog', partition=1) at offset 0
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table5-changelog', partition=3) at offset 0
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table9-changelog', partition=1) at offset 0
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=4) at offset 0
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table14-changelog', partition=9) at offset 0
[2020-02-21 11:04:39,182] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table19-changelog', partition=8) at offset 0
[2020-02-21 11:04:39,186] [9] [DEBUG] <AIOKafkaConnection host=kafka port=9094> Request 357: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='app_main-event_table18-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576)]), (topic='app_main-event_table6-changelog', partitions=[(partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table0-changelog', partitions=[(partition=6, offset=133, max_bytes=1048576), (partition=0, offset=33, max_bytes=1048576), (partition=4, offset=20, max_bytes=1048576), (partition=5, offset=26, max_bytes=1048576), (partition=2, offset=35, max_bytes=1048576)]), (topic='app_main-event_table2-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table11-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table5-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table7-changelog', partitions=[(partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), (topic='app_main-event_table19-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table15-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table9-changelog', partitions=[(partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table1-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576)]), (topic='app_main-event_table10-changelog', partitions=[(partition=3, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table8-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table13-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table17-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576)]), (topic='app_main-event_table4-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table3-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table16-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table12-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table14-changelog', partitions=[(partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), 
(partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)])])
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=8)
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=7)
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=9)
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=9)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=3)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=3)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=3)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=9)
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=7)
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=8)
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=3)
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=1)
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=7)
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=1)
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=7)
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=7)
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=8)
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=8)
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=9)
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=8)
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=7)
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=8)
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=3)
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=1)
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=7)
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=7)
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=9)
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=3)
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=1)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=3)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=9)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=8)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=8)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=3)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=8)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=7)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=7)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=9)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=1)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=1)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=3)
[2020-02-21 11:04:39,208] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=1)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=9)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=1)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=1)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=3)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=8)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=1)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=3)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=1)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=9)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=3)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=7)
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=1)
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=8)
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=1)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=3)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=7)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=8)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=8)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=1)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=3)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=8)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=7)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=8)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=9)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=7)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=8)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=9)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=7)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=8)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=7)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=7)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=8)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=8)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=9)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=3)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=1)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=7)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=9)
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=7)
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=1)
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=9)
[2020-02-21 11:04:39,220] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=3)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=7)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=3)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=9)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=1)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=3)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=3)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=1)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=9)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=9)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=9)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=3)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=1)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=1)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=9)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=8)
[2020-02-21 11:04:39,229] [9] [ERROR] [^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),)
Traceback (most recent call last):
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,263] [9] [DEBUG] Timer Recovery.stats woke up - iteration=70 time_spent_sleeping=5.114945699984673 drift=-0.11494569998467341 new_interval=4.9 since_epoch=355.7561254000175
[2020-02-21 11:04:39,263] [9] [ERROR] [^---Fetcher]: Crashed reason=KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/application/faust_mod/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
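Both tracebacks end at the same line, `res_or_error = self._records[tp]`, so the crash looks like a direct dictionary lookup for a partition that has no entry in the fetcher's buffer. The sketch below only illustrates that pattern and one possible tolerant alternative. It is not aiokafka's code: the function names, the plain-dict buffer, and the idea that the partition list was captured before the rebalance are all assumptions made for illustration.

```python
from collections import namedtuple

# Local stand-in for the TopicPartition key seen in the traceback (not imported from aiokafka).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def fetched_records_unsafe(records, partitions):
    """Hypothetical: index the buffer directly for every requested partition."""
    return {tp: records[tp] for tp in partitions}


def fetched_records_tolerant(records, partitions):
    """Hypothetical: skip partitions that have no buffer entry instead of raising."""
    drained = {}
    for tp in partitions:
        batch = records.get(tp)
        if batch is None:
            # Assumed case: the partition was revoked (or never fetched) after the
            # caller built its partition list, so there is nothing to return for it.
            continue
        drained[tp] = batch
    return drained


tp_kept = TopicPartition("app_main-event_table0-changelog", 0)
tp_missing = TopicPartition("app_main-event_table0-changelog", 9)

# Assumed scenario: the caller still asks for partition 9, but the buffer
# only holds records for partition 0.
requested = [tp_kept, tp_missing]
buffer = {tp_kept: ["record-a", "record-b"]}

try:
    fetched_records_unsafe(buffer, requested)
except KeyError as exc:
    print(f"unsafe lookup raised KeyError: {exc}")

print(fetched_records_tolerant(buffer, requested))
```

Whether skipping the missing partition is the right fix, or whether the caller should instead refresh its partition list after a rebalance, depends on how faust and aiokafka share assignment state; the sketch does not settle that.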
Versions
- Python version - 3.6
- Faust version - 1.10.1 to 1.10.3 and master, each with the corresponding version of the robinhood aiokafka fork
- Operating system
- Kafka version
- RocksDB version (if applicable)