Dear community,
when loading a large number of datasets from a MySQL database into a Neo4j database (in batches of 300,000 entries with connected data), the load works fine up to roughly 4.5 million connected nodes and then fails on the next batch with the error shown below. There are 30 more batches of 300,000 connected datasets to load, each dataset resulting in one or more connected nodes.
Database versions:
MySQL: 5.1.47
Neo4J: 3.5.2
Process as follows:
- load data from the nodes table to create the main nodes
- for each main node, create a set of connected subnodes from the nodes_attributes table
- commit every 1000 datasets
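The commit step on its own can be sketched independently of Neo4j as a small counter that fires a callback after every full batch. This is only a minimal sketch under assumptions: BatchCommitter, itemDone and the onBatch callback are hypothetical names, not part of my loader or of the Neo4j API. In the real loader the callback would do what loadData does at each commit point (tx.success(), tx.close(), then graphDB.beginTx()).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts processed items and runs a callback every batchSize items.
public class BatchCommitter {
    private final int batchSize;
    private final Runnable onBatch;
    private long processed = 0;

    public BatchCommitter(int batchSize, Runnable onBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    // Call once per processed dataset; triggers the callback after each full batch.
    public void itemDone() {
        processed++;
        if (processed % batchSize == 0) {
            onBatch.run();
        }
    }

    public long processed() {
        return processed;
    }

    public static void main(String[] args) {
        // Stand-in for the real commit: here we only count how often it would happen.
        AtomicInteger commits = new AtomicInteger();
        BatchCommitter committer = new BatchCommitter(1000, commits::incrementAndGet);
        for (int i = 0; i < 2500; i++) {
            committer.itemDone();
        }
        // prints "2 commits after 2500 items"
        System.out.println(commits.get() + " commits after " + committer.processed() + " items");
    }
}
```

The last, incomplete batch (500 items in the example) does not trigger the callback, so a final commit after the loop is still needed, as loadData does with its closing tx.success().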
Logging:
129000:Committing and Reopening Neo4J-Transaction.
....................................................................................................
130000:Committing and Reopening Neo4J-Transaction.
....................................................................................................
131000:Committing and Reopening Neo4J-Transaction.
....................................................................................................
132000:Committing and Reopening Neo4J-Transaction.
.................................registerShutdownHook....about to close Graph DB
java.lang.Exception
at eu.whitewolf2000.mysql.MySQLDataLoader$1.run(MySQLDataLoader.java:252)
java.lang.Exception
type
ENTITY
Rolling back Neo4J-Transaction.
This database is shutdown. - 132325
org.neo4j.graphdb.DatabaseShutdownException: This database is shutdown.
at org.neo4j.kernel.impl.core.ThreadToStatementContextBridge.assertInUnterminatedTransaction(ThreadToStatementContextBridge.java:96)
at org.neo4j.kernel.impl.core.ThreadToStatementContextBridge.getKernelTransactionBoundToThisThread(ThreadToStatementContextBridge.java:87)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.createNode(GraphDatabaseFacade.java:260)
at eu.whitewolf2000.mysql.MySQLDataLoader.loadData(MySQLDataLoader.java:102)
at eu.whitewolf2000.mysql.MySQLDataLoader.main(MySQLDataLoader.java:403)
Closing Neo4J-Transaction.
Tue Nov 19 05:19:49 CET 2019
Graph database closed.
The following method is invoked by the main method:
MySQLDataLoader.java (class):
package eu.tester.sql; // dummy package name
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.Properties;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.MultipleFoundException;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
...
private void loadData(String tableName, String idColumn, String nameColumn, String labelColumn) {
System.out.println("loadData");
System.out.println(con);
// String query = "SELECT * from " + tableName + " LIMIT 100";
String query = "SELECT * from " + tableName;
System.out.println(query);
String pst_query = "select * from node_properties where node_id = ?";
String pstc3query = "select * from country_codes where country_code = ?";
String pstc2query = "select * from country_codes where country_code_iso2 = ?";
// int i = 0;
String labelName = null;
int j = 0;
Transaction tx = null;
try (Statement st = con.createStatement();
// System.out.println("1");
ResultSet rs = st.executeQuery(query);
// System.out.println("1");
PreparedStatement pstNodeAttribs = con.prepareStatement(pst_query);
PreparedStatement pstc3 = con.prepareStatement(pstc3query);
PreparedStatement pstc2 = con.prepareStatement(pstc2query);
){
tx = graphDB.beginTx();
// System.out.println("1");
ResultSetMetaData rsmd = rs.getMetaData();
while (rs.next()) {
// System.out.println("2");
// TODO create node
String value = rs.getString(idColumn);
//String labelName = rs.getString(labelColumn);
labelName = rs.getString(labelColumn);
if (getNode(idColumn, value, labelName) != null) {
continue;
}
Label l = Label.label(labelName);
Node node = graphDB.createNode(l);
node.setProperty("source", rs.getString("source"));
node.setProperty("processingDate", rs.getString("processing_date"));
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
String columnName = rsmd.getColumnName(i);
String columnValue = rs.getString(columnName);
if (columnValue == null || columnValue.trim().isEmpty()) {
continue;
}
// create properties
String propertyKey = columnName.equals(nameColumn) ? "name"
: columnName.equals(idColumn) ? "node_id" : columnName;
node.setProperty(propertyKey, columnValue);// uncomment
// if (j%10000==0) {
// System.out.println(columnName + ":" + columnValue + ", propertyKey=" +
// propertyKey);
// System.out.println(j+".");
// }
// if (j % 1000 == 0) {
// System.out.println(j+".");
// System.out.print(".");
// }
}
// if (j%10000==0) {
// System.out.println(columnName + ":" + columnValue + ", propertyKey=" +
// propertyKey);
// System.out.println(j+".");
// }
if (j % 1000 == 0 && j!=0) {
System.out.println();
System.out.print(j + ":");
System.out.println("Committing and Reopening Neo4J-Transaction.");
tx.success();
tx.close();
tx = graphDB.beginTx();
}
if (j % 10 == 0) {
System.out.print(".");
}
++j;
pstNodeAttribs.setString(1, rs.getString("node_id"));
ResultSet rsNodeAttribs = pstNodeAttribs.executeQuery();
// Node attribNode = graphDB.createNode(Label.label("LabelAttrib"));
// attribNode.setProperty("source", rs.getString("source"));
// attribNode.setProperty("processingDate", rs.getString("processing_date"));
while (rsNodeAttribs.next()) {
Node attribNode = graphDB.createNode(Label.label("ATTRIBUTE"));
Node attribNode1 = null;
// Compare string contents with equals(); == only compares object references
if ("country_codes".equals(rsNodeAttribs.getString("s_key"))
|| "jurisdiction".equals(rsNodeAttribs.getString("s_key"))
|| "jurisdiction_2".equals(rsNodeAttribs.getString("s_key"))) {
// look up the country via the prepared statement
// and set the result
if ("country_codes".equals(rsNodeAttribs.getString("s_key"))
|| "jurisdiction".equals(rsNodeAttribs.getString("s_key"))) {
pstc3.setString(1, rsNodeAttribs.getString("s_key"));
ResultSet rs3 = pstc3.executeQuery();
while (rs3.next()) {
// node with country key
attribNode.setProperty(rsNodeAttribs.getString("name"),
rsNodeAttribs.getString("s_value"));
attribNode.setProperty(rsNodeAttribs.getString("type"),
rsNodeAttribs.getString("s_key"));
attribNode.setProperty(rsNodeAttribs.getString("country"), rs3.getString("country"));
// node with country name
attribNode1 = graphDB.createNode(Label.label("ATTRIBUTE"));
attribNode1.setProperty("source", rs.getString("source"));
attribNode1.setProperty("processingDate", rs.getString("processing_date"));
attribNode1.setProperty("name", rs3.getString("country"));
}
}
if ("jurisdiction_2".equals(rsNodeAttribs.getString("s_key"))) {
pstc2.setString(1, rsNodeAttribs.getString("s_key"));
ResultSet rs2 = pstc2.executeQuery();
while (rs2.next()) {
// node with country key
attribNode.setProperty(rsNodeAttribs.getString("name"),
rsNodeAttribs.getString("s_value"));
attribNode.setProperty(rsNodeAttribs.getString("type"),
rsNodeAttribs.getString("s_key"));
attribNode.setProperty(rsNodeAttribs.getString("country"), rs2.getString("country"));
// node with country name
attribNode1 = graphDB.createNode(Label.label("ATTRIBUTE"));
attribNode1.setProperty("source", rs.getString("source"));
attribNode1.setProperty("processingDate", rs.getString("processing_date"));
attribNode1.setProperty("name", rs2.getString("country"));
}
}
} else {
attribNode.setProperty("source", rs.getString("source"));
attribNode.setProperty("processingDate", rs.getString("processing_date"));
// attribNode.setProperty(rsNodeAttribs.getString("name"),
// rsNodeAttribs.getString("s_value"));
// attribNode.setProperty(rsNodeAttribs.getString("type"),
// rsNodeAttribs.getString("s_key"));
attribNode.setProperty("name", rsNodeAttribs.getString("s_value"));
attribNode.setProperty("type", rsNodeAttribs.getString("s_key"));
}
if (attribNode != null) {
Relationship in1Path = node.createRelationshipTo(attribNode,
RelationshipType.withName("Attributes"));
in1Path.setProperty("source", rs.getString("source"));
}
if (attribNode1 != null) {
Relationship in2Path = node.createRelationshipTo(attribNode1,
RelationshipType.withName("Attributes"));
in2Path.setProperty("source", rs.getString("source"));
}
}
// System.out.println();
}
System.out.println("After the while loop: "+j);
System.out.println("Committing Neo4J-Transaction.");
tx.success();
} catch (Exception ex) {
if (tx != null) {
System.out.println(labelColumn);
System.out.println(labelName);
//System.out.println(rs.getString(labelColumn));
System.out.println("Rolling back Neo4J-Transaction.");
tx.failure();
}
System.out.println(ex.getMessage()+" - "+j);
ex.printStackTrace(System.out);
} finally {
if (tx != null) {
System.out.println("Closing Neo4J-Transaction.");
tx.close();
}
}
}
...
public static void main(String[] args) throws SQLException {
// TODO Auto-generated method stub
System.out.println(new Date());
String url = null;
String user = null;
String password = null;
try {
InputStream input = new FileInputStream("./config.properties");
Properties prop = new Properties();
prop.load(input);
url = prop.getProperty("url");
user = prop.getProperty("user");
password = prop.getProperty("password");
if (url==null || user==null || password== null) {
throw new IOException();
}
}catch(IOException e) {
System.out.println("config.properties missing or incomplete!");
System.exit(0);
}
if (args != null && args.length > 0) {
for (int i = 0; i < args.length; i++) {
String string = args[i];
System.out.println(i + ": " + string);
}
MySQLDataLoader loader = new MySQLDataLoader();
try {
System.out.println(new Date());
loader.initialize("test2graphDB");
// System.out.println(new Date());
loader.buildConnection(url, user, password);
// System.out.println(new Date());
if (args[0].equalsIgnoreCase("0")){
if (args[1]!=null) {
System.out.println(args[1]);
loader.loadData(args[1], "node_id", "name", "type");
System.out.println(new Date());
}
}
if (args[0].equalsIgnoreCase("1")){
if (args[1]!=null) {
System.out.println(args[1]);
loader.createEdges(args[1]);
System.out.println(new Date());
}
}
// System.out.println(new Date());
// loader.createEdges("relationships");
// System.out.println(new Date());
loader.closeConnection();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Statistics of the data already loaded (count per batch, with ALL as the total):
+----------+-------+
| count(*) | 300K |
+----------+-------+
| 563543 | 300K |
| 566324 | 600K |
| 352020 | 900K |
| 600036 | 1200K |
| 2391745 | 1500K |
| 4473668 | ALL |
+----------+-------+
6 rows in set (7.96 sec)
I commit to Neo4j every 1000 entries, as shown in the if (j % 1000 == 0 && j != 0) block of loadData (line 131 in my source).
Does anyone have an idea how to trace where the error originates?
I wouldn't consider this big data yet.
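Since the log shows the shutdown hook firing while loadData is still creating nodes, one thing that might help with tracing is a hook that reports what every thread is doing and how full the heap is at that moment, instead of only printing the hook's own stack. The following is a minimal sketch using only JDK calls; the class ShutdownDiagnostics and its method names are hypothetical and not part of my loader. A JVM starts its shutdown hooks on a normal exit, on System.exit, or when the process receives a termination signal, so this output would not name the cause directly, but it should show whether some thread was inside System.exit and whether memory was close to the limit.

```java
import java.util.Map;

// Hypothetical diagnostic helper: reports heap usage and all thread stacks at JVM shutdown.
public class ShutdownDiagnostics {

    // Formats the current heap usage in megabytes.
    public static String heapSummary() {
        Runtime rt = Runtime.getRuntime();
        long mb = 1024L * 1024L;
        long used = (rt.totalMemory() - rt.freeMemory()) / mb;
        return "heap used=" + used + "MB, max=" + (rt.maxMemory() / mb) + "MB";
    }

    // Renders the stack trace of every live thread as text.
    public static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append("Thread \"").append(t.getName())
              .append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    // Registers a hook that prints the diagnostics when the JVM begins to shut down.
    public static Thread install() {
        Thread hook = new Thread(() -> {
            System.err.println("JVM shutdown started: " + heapSummary());
            System.err.print(dumpAllThreads());
        }, "shutdown-diagnostics");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        install();
        // Normal work would run here; the hook reports once the JVM starts to exit.
        System.out.println(heapSummary());
    }
}
```

Calling ShutdownDiagnostics.install() at the start of main, and printing heapSummary() at each commit, would show whether memory grows steadily across the batches before the shutdown happens.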
Thank you very much in advance.
Kind regards,
MalteR