Hi I have the following code running on the same box as a neo4j database (to rule out network issues)
the box has 12 cores and 60 GiG memory ::
public class Repository {
Driver driverSupplier;
SessionConfig sessionConfig = SessionConfig.builder().withDatabase(DB_NAME)
.withDefaultAccessMode(AccessMode.WRITE)
.build();
public Repository(@Autowired @Qualifier("pulseNeo4jDriver") Driver driverSupplier){
this.driverSupplier = driverSupplier;
}
private Supplier<RxSession> getRxSession() {
return () -> driverSupplier.rxSession(sessionConfig);
}
public Mono<ResultSummary> save(String query) {
return Mono.usingWhen(
Mono.fromSupplier(getRxSession()),
session -> Mono.from(session.beginTransaction())
.flatMap(tx -> Mono.from(tx.run(query).consume())
.flatMap(summary -> Mono.from(tx.commit()).thenReturn(summary))
.onErrorResume(error -> Mono.from(tx.rollback())
.then(Mono.error(error)))
),
RxSession::close
)
.publishOn(Schedulers.boundedElastic())
.doOnNext(e -> log.info("pushing data doOnNext " + e))
.doOnSuccess(resultSummary -> log.info(resultSummary.toString()))
.onErrorResume(e -> {
log.error("Error occurred: ", e);
return Mono.error(e);
});
}
public Mono<ResultSummary> saveNoCommit(String query) {
return Mono.usingWhen(
Mono.fromSupplier(getRxSession()),
session -> Mono.from(session.writeTransaction(tx -> tx.run(query).consume())),
RxSession::close
)
.publishOn(Schedulers.boundedElastic())
.doOnNext(e -> log.info("pushing data doOnNext " + e))
.doOnSuccess(resultSummary -> log.info(resultSummary.toString()))
.onErrorResume(e -> {
log.error("Error occurred: ", e);
return Mono.error(e);
});
}
}
the class recivices kafka messages and writes to the neo4j db using an rxSession. it's has a very hi volume of messages to process
at the moment using the saveNoCommit(String query) method i'm only processing around 600 messages a minute but nothing is committed
even though i'm using writeTransaction (ok no biggie). when using the save(String query) method this drops to around 30 messages a minute!!
I need the system to process a few 1000 a minute is this possible ?
i'm pooling sessions like so :
Config config = Config.builder()
.withMaxConnectionLifetime(30, TimeUnit.MINUTES)
.withConnectionAcquisitionTimeout(5, TimeUnit.MINUTES)
.withConnectionTimeout(15, TimeUnit.MINUTES)
.withMaxConnectionPoolSize(100)
.withLeakedSessionsLogging().build();
and changes in neo4j.conf is :
#*****************************************************************
# Neo4j configuration
#
# For more details and a complete list of settings, please see
# https://neo4j.com/docs/operations-manual/current/reference/configuration-settings/
#*****************************************************************
# The name of the default database
initial.dbms.default_database=pulse
# Paths of directories in the installation.
dbms.security.auth_enabled=false
#********************************************************************
# Memory Settings
#********************************************************************
server.memory.heap.initial_size=20G
server.memory.heap.max_size=20G
db.memory.transaction.max=5G
#*****************************************************************
# Network connector configuration
server.bolt.enabled=true
server.http.enabled=true
# Number of Neo4j worker threads.
server.threads.worker_count=10
dbms.security.procedures.unrestricted=*
# set lenientcy for chypher.
dbms.cypher.lenient_create_relationship = true
#********************************************************************
# JVM Parameters
#********************************************************************
# G1GC generally strikes a good balance between throughput and tail
# latency, without too much tuning.
server.jvm.additional=-XX:+UseG1GC
server.jvm.additional=-XX:-OmitStackTraceInFastThrow
server.jvm.additional=-XX:+AlwaysPreTouch
server.jvm.additional=-XX:+UnlockExperimentalVMOptions
server.jvm.additional=-XX:+TrustFinalNonStaticFields
server.jvm.additional=-XX:+DisableExplicitGC
server.jvm.additional=-Djdk.nio.maxCachedBufferSize=1024
# More efficient buffer allocation in Netty by allowing direct no cleaner buffers.
server.jvm.additional=-Dio.netty.tryReflectionSetAccessible=true
server.jvm.additional=-Djdk.tls.ephemeralDHKeySize=2048
# This mitigates a DDoS vector.
server.jvm.additional=-Djdk.tls.rejectClientInitiatedRenegotiation=true
# Open modules for neo4j to allow internal access
server.jvm.additional=--add-opens=java.base/java.nio=ALL-UNNAMED
server.jvm.additional=--add-opens=java.base/java.io=ALL-UNNAMED
server.jvm.additional=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
db.tx_state.memory_allocation=OFF_HEAP
Version: 5.15.0
Edition: Community
why am i getting such bad performance ?
org.neo4j.driver.exceptions.ClientException: Unable to acquire connection from the pool within configured maximum time of 300000ms
at org.neo4j.driver.internal.async.pool.ConnectionPoolImpl.processAcquisitionError(ConnectionPoolImpl.java:222) ~[pulse-common-0.0.1-SNAPSHOT.jar!/:na]
at org.neo4j.driver.internal.async.pool.ConnectionPoolImpl.lambda$acquire$0(ConnectionPoolImpl.java:130) ~[pulse-common-0.0.1-SNAPSHOT.jar!/:na]