Another "speed up the load" question from a relatively inexperienced Neo4j user

I am trying to load a fairly large set of data. My input file is straightforward but big: it is all Cypher statements.

MERGE (:typeImportOnDemandDeclaration {nodeSeq:4,name:'importjava.io.*;',compileunit:'webgoat.combined.source',type:'typeImportOnDemandDeclaration'});

later in the program are the node connections

MATCH (a:ProgNode),(b:ProgNode) WITH a,b WHERE a.nodeSeq = 4 AND b.nodeSeq = 5 MERGE (a)-[r:Program{compileunit:'webgoat.combined.source', source:'webgoat.combined.source'}]->(b);

All of these are in a single file assembled from multiple sources. When I wrote the original upload, it was fine for a few thousand nodes, but we just got a file with 100M and it's a bit slow. I realize I was not doing it efficiently, so I need to batch things up. That sounded easy. It has NOT been, and the answers scattered around the internet are creating more confusion.

To start, I cannot go back and rewrite for CSV for a variety of reasons, so unless someone can come up with a compelling CSV argument, that's out. It has to be some variant of the code below, where the line variable is a complete Cypher statement like the ones above; the "for line in FI:" loops over the 100M Cypher lines. The label is not the same on each line; it varies.

This version builds a single embedded string (I know, clumsy), but none of my other variants had any better luck. The "payload" statement is the big one.

       **batch_statement = """
   UNWIND {batch} as row**
    MERGE (n:Label {row.id})**
    (ON CREATE) SET n += row.properties
   """
**
    payload = '{batch: ['
    maxcount = 4
    with graphDB_Driver.session() as graphDB_Session:
        start_time = time.time()    
        print("Starting Node load @ %s\n" % time.asctime())
        # Create nodes
        tx = graphDB_Session.begin_transaction()
        for line in FI:
            counter +=1
            if counter >= startrow:
                if (counter % maxcount) == 0:
                   print(counter)
                   payload = payload + payloadstring + "]" + batch_statement
# payload is the string I need to run. 

                   tx.run(payload)
                   tx.commit()
                   print("     line %s was reached" % counter)

                   payload = '{batch: ['
                   time.sleep(3)

                   
                firstword = line.split()[0]    
                if firstword == "MATCH" and matchflag == False:
                    print("  Created %s nodes\n" % counter)
                    print("  Beginning links @ %s\n" % str(time.asctime()))
                    matchflag = True
                elif firstword == "CREATE" and createflag == False:
                    print("  Beginning Node Creation\n")
                    createflag = True
                elif firstword == "//" and postflag == False:
                    print("  %s  @ %s\n" % (line[:-2], str(time.asctime())))
                    postflag = True
                else:
                    print("  %s  @ %s - unknown \n" % (line[:-2], str(time.asctime())))
                 
                if firstword != "//":
                   # break down the cypher into a key and a data 
                    splitstart = line.find("{")
                    splitstop = line.find("}")
                    indexstring = "{id:'"+line[7:splitstart-1].strip()+"',"
                    payloadstring = indexstring + " properties:"+line[splitstart:splitstop]+"}"
                    
                    payload = payload + payloadstring + ","

        FO.close()    

This seems like it should be easy to do, but it's beating me.

Thanks

This bit of the query can be rewritten to your advantage. Don't do this:

MATCH (a:ProgNode),(b:ProgNode) WITH a,b WHERE a.nodeSeq = 4 AND b.nodeSeq = 5

Do this:

MATCH (a:ProgNode { nodeSeq: 4 })
WITH a
MATCH (b:ProgNode { nodeSeq: 5 })
(Merge other stuff here)

For big imports, make sure those fields are indexed and that you have plenty of page cache configured in your database; these changes alone should speed it up quite a bit.
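For example, assuming Neo4j 3.5-era syntax (on 4.x the equivalent is CREATE INDEX FOR (n:ProgNode) ON (n.nodeSeq)), a minimal sketch you could run once from your Python script before the bulk load:

    # One-time setup; graphDB_Driver is the driver object from your script.
    with graphDB_Driver.session() as ses:
        ses.run("CREATE INDEX ON :ProgNode(nodeSeq)")
    # Page cache is not a query: set dbms.memory.pagecache.size in neo4j.conf.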

Thank you - that is easy enough to fix, so I appreciate the pointer. Any thoughts on the node creation itself? We just ran a test and we are about to bump 200M nodes. Clearly one at a time isn't going to cut it!

Thanks again, I'll have that fixed asap.

For the payload, use parameters submitted to the query. I'd make my cypher query something like:

UNWIND $batch as event
/* Do merge based on a single event */

And then I'd submit an array of objects as the batch parameter to the query. Don't try to put all of your data into the Cypher string.
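For example, a rough sketch with the Python driver; the variable names and the two sample rows below are placeholders, not your real data:

    node_statement = """
    UNWIND $batch AS event
    MERGE (n:ProgNode {nodeSeq: event.nodeSeq})
    ON CREATE SET n += event.properties
    """
    # Each element of the list becomes one 'event' inside the UNWIND.
    batch = [
        {"nodeSeq": 4, "properties": {"name": "importjava.io.*;", "compileunit": "webgoat.combined.source"}},
        {"nodeSeq": 5, "properties": {"name": "importjava.net.*;", "compileunit": "webgoat.combined.source"}},
    ]
    with graphDB_Driver.session() as ses:
        # The list travels as a real query parameter, not as text pasted into the Cypher string.
        ses.run(node_statement, batch=batch)

Since your labels vary per line, you would either group the rows by label before sending each batch, or use a procedure such as apoc.merge.node that accepts the label as data.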


David, I'm sorry but you are going to have to be a lot more specific than that! lol.

The line "parameters submitted to the query" doesn't make any sense in this case. There are 120 Million lines with 86 attributes ! Each line is a unique combination. There are no duplicate nodes unless it gets rerun.

I have tried every version of the UNWIND and it's not working. The programs that generate the Cypher have all been distributed, so either I post-process (which is what this program is doing) or I call up all 200 companies and make them rewrite.

Can you look at it again and see if there is a better example? Again, starting from Cypher should be easier, not harder!

Hello @bill_dickenson,

Can we see the CSV format? (a few rows as an example)
Do you have one CSV for nodes and one for relationships?

Regards,
Cobra

There is none. I would have to go back and recreate it or extract it from the cypher.

What format is the incoming data in?

The data is generated by a bunch of different client programs, some I know of, some I do not. There are 200 clients, and each has already done the preprocessing to get it into Cypher format.

You have the output from their programs in the sample. So if I have to write something to untranslate, I will.

They all follow this syntax? MERGE (:typeImportOnDemandDeclaration {nodeSeq:4,name:'importjava.io.*;',compileunit:'webgoat.combined.source',type:'typeImportOnDemandDeclaration'});

I assume you can't directly send requests to your Neo4j database.
The cleanest way would be to translate all of these requests into CSV.
If you are able to produce that, I can propose several queries to load the nodes and relationships from CSV.

Yes, although there are probably 86 more variables in that list. I cut it after type. So whatever technique we use, we can extend it to the others.

"send the request to Neo4j" ? From where ? From each of their sites ? Have all 200 log on ? No.

OK. If I must generate CSV, I will. It will have to be something other than comma-delimited, since commas are used all over the place.

I advise you to make one CSV for the nodes and one for the relationships. When you have them, you can adapt these queries:

  • To load nodes:
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///nodes.csv" AS row
MERGE (p:ProgNode{nodeSeq: row.nodeSeq})
SET p += row
  • To load relationships:
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///relationships.csv" AS row
MATCH (a:ProgNode{nodeSeq: row.start})
MATCH (b:ProgNode{nodeSeq: row.end})
CALL apoc.create.relationship(a, row.RELATIONSHIP, {}, b) YIELD rel
RETURN rel

Moreover, don't forget to add a UNIQUE CONSTRAINT on nodeSeq (if it is unique, of course); it will speed up your queries a lot :slight_smile:
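For example (3.5-era syntax; adjust for your Neo4j version), run once from the driver:

    with graphDB_Driver.session() as ses:
        # The uniqueness constraint also creates the index that backs the MERGE/MATCH lookups.
        ses.run("CREATE CONSTRAINT ON (p:ProgNode) ASSERT p.nodeSeq IS UNIQUE")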

I hope it will help you :slight_smile:

OK - not a good answer, but if it's CSV or nothing, I guess it's CSV. I do consider the inability to bulk update using your own language to be a heck of an unexpected miss.

I'll repost once they finish. I had the canned code from above, but thank you for posting it here as well.

Be back.

Yeah, sorry, but I would like to know who had the "GOOD IDEA" of sending you Cypher requests from 200 clients; it should have been a classic interchange format like JSON or CSV.

Translating into CSV is the best way for you, given the quantity of data you have; it should load everything in a few seconds or minutes, depending on your database :slight_smile:

Don't hesitate to ask if you need anything else.

Regards,
Cobra

Me. Also the person who WILL make the recommendation on Neo4j or your COMPETITION. lol. So if your implication is that Cypher isn't strong enough, we agree. But I applaud your candor.

I agree that in hindsight, JSON would have been better. CSV is old school and very hard to control for real-world applications. For what we needed, it would have been (and may well still be) impossible. We did go with simple. That was my mistake. I won't make that mistake again with Neo4j.

Ok - so now we have this format.

Nodes - PSV (pipe-separated, in this case a double pipe) with headers...

ProgNode||nodeSeq||name||compileunit||type||itd||szAEP||szAFPF||quViolations||quVioDensity||quChange||location||level||szlocs||eieo
compilationUnit||0||'DisplayIncomplete:importjava.net.*;importjava.io.*;importjava.nio.channels.*;importjava.util.Properties;publ'||'webgoat.combined.source'||'compilationUnit'||'data:Writes'||25||''||2||2.0||False||0||'0'||1||False
typeImportOnDemandDeclaration||1||'importjava.net.*;'||'webgoat.combined.source'||'typeImportOnDemandDeclaration'||'data:Reads'||1||''||0||0.0||False||[16, 0, 16, 17]||'code'||2||True

and relations

'a'||'b'||'aunit'||'bunit'
0||1||'webgoat.combined.source'||'webgoat.combined.source'
1||2||'webgoat.combined.source'||'webgoat.combined.source'

This will be called from a Python program, so when you respond, can you fill out enough so it can be used that way?

Thanks

To be honest, I'm not working for Neo4j :smile:, but I have always found a way to do what I want to do; in some cases you will still have to do some processing in Python. Just don't forget that Cypher is not a programming language like Python; Cypher is like SQL :smile:

The easy way is to load the CSV file directly :slight_smile:

Have a look here to configure your database correctly so that it can access your CSV files.

You can directly adapt the code I gave you above. In your case, is ProgNode the common label, or will compilationUnit, for example, also be a label?

Good! I was on the original DB2 product team. (I am old.)

Freelance by any chance?

        if output == 'cypher':
            neostmt = "MERGE (:ProgNode:%s {nodeSeq:%s,name:'%s',compileunit:'%s',type:'%s',kdm:'%s',szAEP:%s,szAFPF:'%s',quViolations:%s,quVioDensity:%s,quChange:%s,location:%s,level:'%s',szlocs:%s,eieo:%s});\n"
            neoout = neostmt % (nodetype,inode,istring,COMPILEUNIT,nodetype,kdm,szaep,szAFPF,quviolations,quVioDensity,changed, location,level, szlocs, eieo)
        else:
            neostmt = "%s||%s||'%s'||'%s'||'%s'||'%s'||%s||'%s'||%s||%s||%s||%s||'%s'||%s||%s\n"
            neoout = neostmt % (nodetype,inode,istring,COMPILEUNIT,nodetype,kdm,szaep,szAFPF,quviolations,quVioDensity,changed, location,level, szlocs, eieo)

No, the node labels should be both nodetype and ProgNode; inode is the unique identifier.

Thanks

I'm so young :see_no_evil:, and no, I'm working for a startup, but we are open to consulting :slight_smile:

So now you need to create batches of data:

import pandas as pd  # assumes graphDB_Driver has already been created with the Neo4j Python driver

BATCH = {'batch': []}


def reset_batch():
    """
    Function to reset the batch.
    """
    BATCH["batch"] = []


def flush_relations():
    """
    Send the current batch of relationships to the database.
    """
    if BATCH['batch']:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row MATCH (a:ProgNode{inode:row.a}) MATCH (b:ProgNode{inode:row.b}) CALL apoc.merge.relationship(a, 'PROGRAM', {}, apoc.map.removeKeys(properties(row), ['a', 'b']), b) YIELD rel RETURN 1", batch=BATCH["batch"])
        reset_batch()


def flush_nodes():
    """
    Send the current batch of nodes to the database.
    """
    if BATCH['batch']:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row CALL apoc.merge.node(['ProgNode', row.nodetype], {inode:row.inode}, apoc.map.removeKeys(properties(row), ['nodetype', 'inode'])) YIELD node RETURN 1", batch=BATCH["batch"])
        reset_batch()


def merge_relation(args):
    """
    Function to add a relation row to the batch, flushing every 1000 rows.
    """
    BATCH['batch'].append(args.to_dict())
    if len(BATCH['batch']) >= 1000:
        flush_relations()


def merge_node(args):
    """
    Function to add a node row to the batch, flushing every 1000 rows.
    """
    BATCH['batch'].append(args.to_dict())
    if len(BATCH['batch']) >= 1000:
        flush_nodes()


# pandas treats a multi-character separator as a regular expression, so the
# double pipe must be escaped and read with the python engine.
nodes = pd.read_csv(filepath_or_buffer='nodes.csv', header=0, sep=r'\|\|', engine='python', encoding='utf-8')
relations = pd.read_csv(filepath_or_buffer='relations.csv', header=0, sep=r'\|\|', engine='python', encoding='utf-8')

nodes.apply(lambda h: merge_node(h), axis=1)
flush_nodes()       # send the last partial batch
relations.apply(lambda h: merge_relation(h), axis=1)
flush_relations()   # send the last partial batch

Don't forget to add the UNIQUE CONSTRAINTS:

CREATE CONSTRAINT constraint_inode ON (p:ProgNode) ASSERT p.inode IS UNIQUE

You also need to install the APOC plugin on your database.

Documentation:

I'm not sure the code works correctly as-is, but the idea is here :slight_smile: I hope it will help you :slight_smile:

Regards,
Cobra

Drop me a note at Bill.dickenson@veriprism.llc and let's talk non-disclosure and rates.

Yes, I get the point on this one. I'll go ahead and make a run at it and see what happens.

And I would never have gotten there from the directions. Thank you

Thank you


Need a plan B. CSV is probably not going to work. I have tried :;,|~` and all of the doubles, and all of the combinations (e.g. || or |;), and it's not getting very far. JSON would be my next choice, as pandas doesn't handle the double delimiter well under Python and no single character will do.

I'll rewrite in JSON, but some help would be good.
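Roughly what I have in mind, building on the UNWIND-with-parameters and apoc.merge.node ideas above (untested; the file name, field names, and batch size are placeholders):

    import json

    # Assumes one JSON object per line, e.g.
    # {"nodetype": "typeImportOnDemandDeclaration", "inode": 4, "name": "importjava.io.*;", ...}
    batch, BATCH_SIZE = [], 10000
    merge_nodes = ("UNWIND $batch AS row "
                   "CALL apoc.merge.node(['ProgNode', row.nodetype], {inode: row.inode}, "
                   "apoc.map.removeKeys(row, ['nodetype', 'inode'])) YIELD node RETURN count(node)")

    def flush(ses):
        if batch:
            ses.run(merge_nodes, batch=batch)
            batch.clear()

    with graphDB_Driver.session() as ses, open('nodes.jsonl', encoding='utf-8') as fi:
        for line in fi:
            batch.append(json.loads(line))
            if len(batch) >= BATCH_SIZE:
                flush(ses)
        flush(ses)  # send the last partial batch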