Neo4j Streams consuming messages with GREAT LAG, and sudden bumps

Hi All,

I have set up a Neo4j server with the Kafka Sink/Source integration enabled. The Kafka cluster is a standard managed Kafka service in AWS (3 brokers), and the Neo4j server is a Docker container running on an EC2 instance with enough resources to handle the graph (no lag or delays with everyday queries).


Has it ever happened to you that Neo4j doesn't poll messages for a very long time (e.g. an hour), or polls too few messages at a time, even though you can see through other tools that the messages are actually there? It's as if it's lagging behind, and then all of a sudden it consumes 1000 messages in a row.

The message volume is pretty much stable during business hours, but here you can see the gaps and then the sudden catch-ups:

I recently decreased the max.poll.records value to 1, because I was running into race conditions when consuming the messages. Do you think that could be the cause? I'm quite lost, since I don't know how to debug the Kafka plugin.
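To reason about whether a very small max.poll.records could matter, here is a rough back-of-the-envelope sketch in Python. It assumes each poll pays a fixed overhead (for example a network round trip plus a transaction commit) on top of a per-record cost. The function names and all timings are hypothetical and were not measured on this setup. The model only shows how batch size caps throughput and does not by itself explain hour-long gaps.

```python
# Rough throughput model for a single consumer thread.
# Assumption: every poll has a fixed overhead plus a cost per record.
# All numbers below are hypothetical, not measurements.

def max_throughput(max_poll_records: int,
                   overhead_per_poll_s: float,
                   cost_per_record_s: float) -> float:
    """Upper bound on records consumed per second."""
    batch_time = overhead_per_poll_s + max_poll_records * cost_per_record_s
    return max_poll_records / batch_time


def seconds_to_drain(backlog: int, incoming_per_s: float, throughput: float) -> float:
    """Time to clear a backlog; infinite when the consumer cannot keep up."""
    if throughput <= incoming_per_s:
        return float("inf")
    return backlog / (throughput - incoming_per_s)


if __name__ == "__main__":
    for batch_size in (1, 50):
        rate = max_throughput(batch_size,
                              overhead_per_poll_s=0.2,
                              cost_per_record_s=0.005)
        drain = seconds_to_drain(backlog=1000, incoming_per_s=10.0, throughput=rate)
        print(f"max.poll.records={batch_size}: ~{rate:.1f} records/s, "
              f"drain time {drain:.1f} s")
```

With these made-up timings, a batch size of 1 cannot keep up with the incoming rate at all, while a larger batch spreads the fixed overhead over many records.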

I can see there is a lag for several topics:

I don't know why the plugin isn't polling them, though.
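One way to narrow this down is to sample the consumer group's offsets twice and see which partitions have lag but no progress. The sketch below is a minimal, hypothetical helper in Python. It assumes the committed and log-end offsets have already been collected, for example from the output of Kafka's `kafka-consumer-groups.sh --describe --group neo4j-cs-consumer`. The topic names and numbers are invented for illustration.

```python
from typing import Dict, Tuple

Partition = Tuple[str, int]  # (topic name, partition number)


def consumer_lag(end_offsets: Dict[Partition, int],
                 committed: Dict[Partition, int]) -> Dict[Partition, int]:
    """Lag per partition = log end offset minus the group's committed offset."""
    lag = {}
    for tp, end in end_offsets.items():
        # Assumption: a partition without a committed offset counts as unread from 0.
        lag[tp] = max(end - committed.get(tp, 0), 0)
    return lag


def stalled_partitions(committed_before: Dict[Partition, int],
                       committed_after: Dict[Partition, int],
                       end_offsets_after: Dict[Partition, int]) -> list:
    """Partitions that still have lag but whose committed offset did not move."""
    lag = consumer_lag(end_offsets_after, committed_after)
    return sorted(
        tp for tp, behind in lag.items()
        if behind > 0 and committed_after.get(tp, 0) == committed_before.get(tp, 0)
    )


if __name__ == "__main__":
    # Hypothetical topics and offsets, sampled a few minutes apart.
    before = {("companies", 0): 120, ("users", 0): 300}
    after = {("companies", 0): 120, ("users", 0): 340}
    ends = {("companies", 0): 900, ("users", 0): 340}
    print(consumer_lag(ends, after))
    print(stalled_partitions(before, after, ends))
```

A partition that shows up as stalled across several samples suggests the sink is not committing for that topic at all, rather than just consuming slowly.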

I'm checking the running queries through Halin, and the server doesn't look busy at all.

Any ideas?

Thank you so much in advance.

Neo4j Streams plugin version: neo4j-streams-3.5.6
Kafka brokers version: 2.1.0
Neo4j server: 3.5.14

Below is the redacted docker-compose file:

version: '3'
    image: neo4j:3.5
    container_name: neo4j-cs
    network_mode: "host"
    ports:
    - "9474:9474"
    - "9687:9687"
    ulimits:
      nofile:
        soft: "40000"
        hard: "40000"
    volumes:
    - ./cs-data/neo4j_plugins:/plugins
    - /data/neo4j-cs/data:/data
    - /data/neo4j-cs/logs:/logs
    - ./cs-data/neo4j_conf:/conf
    environment:
      NEO4J_AUTH: neo4j/gsF3SF7VzIIXwo3wi8Yf
      # KAFKA related configuration
      NEO4J_kafka_zookeeper_connect: 'three ZK servers '
      NEO4J_kafka_bootstrap_servers: 'three Kafka brokers'
      NEO4J_kafka_group_id: 'neo4j-cs-consumer'
      NEO4J_kafka_max_poll_records: 1
      NEO4J_kafka_replication: 3
      NEO4J_streams_sink_enabled: 'true'
      NEO4J_streams_source_enabled: 'true'
      NEO4J_streams_sink_errors_tolerance: 'all'
      NEO4J_streams_sink_errors_log_enable: 'true'
      NEO4J_streams_sink_errors_log_include_messages: 'true'
      NEO4J_streams_sink_errors_deadletterqueue_topic_name: 'interkafka4j-dlq-topic'
      NEO4J_streams_sink_errors_deadletterqueue_context_headers_enable: 'true'
      NEO4J_dbms_memory_heap_max__size: 5G
      NEO4J_dbms_memory_heap_initial__size: 5G
      NEO4J_dbms_memory_pagecache_size: 5G
      NEO4J_dbms_connector_bolt_listen__address: ''
      NEO4J_dbms_connector_http_listen__address: ''
      NEO4J_dbms_connector_https_listen__address: ''
      NEO4J_dbms_security_procedures_unrestricted: apoc.*
      NEO4J_apoc_jdbc_moltres_url: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
      NEO4J_dbms_logs_query_enabled: 'true'
      NEO4J_dbms_logs_query_threshold: '20s'
      NEO4J_dbms_logs_query_rotation_size: '40M'
      NEO4J_dbms_tx__log_rotation_retention__policy: '500M size'
      NEO4J_dbms_transaction_timeout: '1800s'
#      NEO4J_dbms_allow__format__migration: 'true'
      NEO4J_dbms_allow__upgrade: 'true'
      NEO4J_metrics_graphite_enabled: 'true'
      NEO4J_metrics_graphite_interval: '3s'
      NEO4J_metrics_prefix: data.interkafka4j.neo4j_cs
      NEO4J_apoc_initializer_cypher_1a: 'CREATE CONSTRAINT ON (n:Company) ASSERT IS UNIQUE;'
      NEO4J_apoc_initializer_cypher_1b: 'CREATE CONSTRAINT ON (n:User) ASSERT IS UNIQUE;'
      NEO4J_apoc_initializer_cypher_1c: 'CREATE CONSTRAINT ON (n:Admin) ASSERT (, n.app_id) IS NODE KEY;'
      NEO4J_apoc_initializer_cypher_1d: 'CREATE CONSTRAINT ON (n:Lead) ASSERT IS UNIQUE;'
      NEO4J_apoc_initializer_cypher_1e: 'CREATE CONSTRAINT ON (n:Bot) ASSERT (, n.app_id) IS NODE KEY;'
      NEO4J_apoc_initializer_cypher_1f: 'CREATE CONSTRAINT ON (n:ExternalConv) ASSERT (, n.app_id) IS NODE KEY;'
      NEO4J_apoc_initializer_cypher_1g: 'CREATE CONSTRAINT ON (n:NubeConv) ASSERT IS UNIQUE;'
      NEO4J_apoc_initializer_cypher_1h: 'CREATE CONSTRAINT ON (n:Part) ASSERT IS UNIQUE;' 
      NEO4J_apoc_initializer_cypher_1i: 'CREATE CONSTRAINT ON (n:Part) ASSERT (n.external_id, n.app_id) IS NODE KEY;'
      NEO4J_apoc_initializer_cypher_1j: 'CREATE CONSTRAINT ON (n:Store) ASSERT IS UNIQUE;'
      NEO4J_apoc_initializer_cypher_1k: 'CREATE CONSTRAINT ON (n:Tag) ASSERT (, n.app_id) IS NODE KEY;'
      NEO4J_apoc_initializer_cypher_1l: 'CREATE CONSTRAINT ON (n:Country) ASSERT IS UNIQUE;'
      NEO4J_apoc_initializer_cypher_1m: 'CREATE INDEX ON :ExternalConv(created_at);'
      NEO4J_apoc_initializer_cypher_1n: 'CREATE INDEX ON :NubeConv(created_at);'
      NEO4J_apoc_initializer_cypher_1o: 'CREATE INDEX ON :Part(created_at);'
      NEO4J_apoc_initializer_cypher_1p: 'CREATE INDEX ON :Part(external_id);'
      NEO4J_apoc_initializer_cypher_2a: 'MERGE (c:Country {id: "AR"}) with c set c += {timezone: "America/Buenos Aires", app_id: "zqzv6i6r", friendly_name: "🇦🇷 AR"};'
      NEO4J_apoc_initializer_cypher_2b: 'MERGE (c:Country {id: "BR"}) with c set c += {timezone: "America/Sao Paulo", app_id: "45v3k8ch", friendly_name: "🇧🇷 BR"};'

Resetting max.poll.records to 50 fixed the lag issue.

However, the race condition error never went away. I've created a separate topic about it: "Is it possible to retry processing a message after a race condition?"