Faust workers crash due to keyerror from aiokafka · Issue #19 · robinhood/aiokafka · GitHub
Faust workers crash due to keyerror from aiokafka #19
Open
@DhruvaPatil98

Description

(I'm not sure whether this should be raised on the faust repo. Since both the issue and the fix may be in aiokafka, I'm raising it here. Let me know if I should move it to the faust repo.)

Steps to reproduce

When the number of Faust workers is changed from around 5 to between 6 and 10 while data is being streamed to the topics that the agents subscribe to, the workers crash with a KeyError.

The following script was used as app.py:

from faust import App

app = App(
    'app_main',
    broker='kafka://kafka:9094',
    store='rocksdb://',
)

PARTITIONS = 10

event_topic = []
event_table = []
for i in range(20):
    event_topic.append(app.topic(
        f'event_topic_write{i}',
        internal=True,
        partitions=PARTITIONS,
    ))

    event_table.append(app.Table(
        f'event_table{i}',
        partitions=PARTITIONS,
    ))

@app.agent(event_topic[0])
async def event_topic_write(streams):
    async for payload in streams.events():
        print(f'Got data: {payload}')
        event_table[0][payload.key] = payload.value


if __name__ == '__main__':
    app.main()

The following script was used to stream events to the topics:

import json
import random
import string
from kafka import KafkaProducer

producer_instance = KafkaProducer(
    bootstrap_servers=['kafka:9094'],
)

event_topic = []
for i in range(20):
    event_topic.append(f'event_topic_write{i}')


def randomString(stringLength=10):
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))


while True:
    key_bytes = bytes(json.dumps(randomString()), encoding='utf-8')
    value_bytes = bytes(json.dumps('test'), encoding='utf-8')

    topic_name = event_topic[random.randint(0, len(event_topic)-1)]

    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()
    print(key_bytes)
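
For anyone reproducing this on a single host, the worker count can be varied by starting several copies of app.py. The helper below is only a sketch of one way to build those command lines; it does not start anything itself. It assumes the standard Faust CLI options `--datadir`, `worker`, `-l`, and `--web-port`; the `/tmp/faust-worker{index}` directory, the base port 6066, and the function names are hypothetical choices, not part of the original setup (which may well have used containers instead).

```python
import sys
from typing import List


def worker_command(index: int, script: str = 'app.py',
                   base_web_port: int = 6066) -> List[str]:
    """Build (but do not run) the command line for one Faust worker."""
    return [
        sys.executable, script,
        # Hypothetical per-worker state directory so RocksDB stores do not clash.
        f'--datadir=/tmp/faust-worker{index}',
        'worker',
        '-l', 'debug',
        # Each worker on the same host needs its own web port.
        f'--web-port={base_web_port + index}',
    ]


def worker_commands(count: int) -> List[List[str]]:
    """Build command lines for `count` workers with distinct ports and data dirs."""
    return [worker_command(i) for i in range(count)]
```

Each command could then be passed to `subprocess.Popen`, for example starting five workers first and a few more while the producer script is running, to trigger the rebalance described above.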

Expected behavior

The rebalance finishes successfully and all workers keep running.

Actual behavior

Some workers crash with the KeyError shown in the traceback below.

Full traceback

[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table1-changelog', partition=0) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table4-changelog', partition=2) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table3-changelog', partition=6) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table13-changelog', partition=6) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table6-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table11-changelog', partition=1) at offset 0 
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table5-changelog', partition=3) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table9-changelog', partition=1) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table14-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,182] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table19-changelog', partition=8) at offset 0 
[2020-02-21 11:04:39,186] [9] [DEBUG] <AIOKafkaConnection host=kafka port=9094> Request 357: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='app_main-event_table18-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576)]), (topic='app_main-event_table6-changelog', partitions=[(partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table0-changelog', partitions=[(partition=6, offset=133, max_bytes=1048576), (partition=0, offset=33, max_bytes=1048576), (partition=4, offset=20, max_bytes=1048576), (partition=5, offset=26, max_bytes=1048576), (partition=2, offset=35, max_bytes=1048576)]), (topic='app_main-event_table2-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table11-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table5-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table7-changelog', partitions=[(partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), (topic='app_main-event_table19-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table15-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table9-changelog', partitions=[(partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table1-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576)]), (topic='app_main-event_table10-changelog', partitions=[(partition=3, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table8-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table13-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table17-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576)]), (topic='app_main-event_table4-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table3-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table16-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table12-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table14-changelog', partitions=[(partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), 
(partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)])]) 
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=8) 
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=7) 
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=9) 
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=9) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=9) 
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=7) 
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=8) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=3) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=1) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=7) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=1) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=7) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=7) 
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=8) 
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=8) 
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=9) 
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=8) 
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=7) 
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=8) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=3) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=1) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=7) 
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=7) 
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=9) 
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=3) 
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=1) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=3) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=9) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=8) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=8) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=3) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=8) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=7) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=7) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=9) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=1) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=1) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=3) 
[2020-02-21 11:04:39,208] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=1) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=9) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=1) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=9) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=8) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=9) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=7) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=1) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=8) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=1) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=3) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=7) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=8) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=8) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=1) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=3) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=8) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=7) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=8) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=9) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=7) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=9) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=8) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=9) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=3) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=1) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=7) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=9) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=7) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=1) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=9) 
[2020-02-21 11:04:39,220] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=3) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=7) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=3) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=9) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=1) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=3) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=3) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=1) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=9) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=9) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=9) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=3) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=1) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=1) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=9) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=8) 
[2020-02-21 11:04:39,229] [9] [ERROR] [^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),) 
Traceback (most recent call last):
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,263] [9] [DEBUG] Timer Recovery.stats woke up - iteration=70 time_spent_sleeping=5.114945699984673 drift=-0.11494569998467341 new_interval=4.9 since_epoch=355.7561254000175 
[2020-02-21 11:04:39,263] [9] [ERROR] [^---Fetcher]: Crashed reason=KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),) 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/application/faust_mod/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
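
Both tracebacks end in a plain dictionary lookup, `self._records[tp]`, for a partition that is not in the dictionary. The following is a simplified stand-in for that pattern, not aiokafka's actual code. It assumes the buffer is a dict keyed by partition and that a rebalance can remove an entry after the caller has already chosen which partitions to read. `RecordBuffer`, `fetched_strict`, and `fetched_tolerant` are hypothetical names used only for illustration, and whether skipping a missing partition is the right fix for aiokafka is not established here.

```python
from typing import Dict, Iterable, List, NamedTuple


class TopicPartition(NamedTuple):
    topic: str
    partition: int


class RecordBuffer:
    """Toy per-partition record buffer (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self._records: Dict[TopicPartition, List[bytes]] = {}

    def add(self, tp: TopicPartition, records: List[bytes]) -> None:
        self._records.setdefault(tp, []).extend(records)

    def revoke(self, tp: TopicPartition) -> None:
        # Simulates a rebalance taking the partition away from this worker.
        self._records.pop(tp, None)

    def fetched_strict(
        self, partitions: Iterable[TopicPartition]
    ) -> Dict[TopicPartition, List[bytes]]:
        # Direct indexing: raises KeyError if a partition was revoked meanwhile.
        return {tp: self._records[tp] for tp in partitions}

    def fetched_tolerant(
        self, partitions: Iterable[TopicPartition]
    ) -> Dict[TopicPartition, List[bytes]]:
        # Skips partitions that are no longer buffered instead of raising.
        result: Dict[TopicPartition, List[bytes]] = {}
        for tp in partitions:
            records = self._records.get(tp)
            if records is not None:
                result[tp] = records
        return result
```

In this toy model, a caller that captured its list of active partitions before `revoke()` ran would hit the same KeyError with `fetched_strict`, while `fetched_tolerant` returns only the partitions that are still buffered.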

Versions

  • Python version - 3.6
  • Faust version - 1.10.1 to 1.10.3 and master with corresponding versions of robinhood aiokafka
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)
