Hi all
Hope I can bother you and ask for some assistance. I have a long history with structured (relational) databases, so I understand relationships etc., but I'm just dipping my toes into graph databases.
I have an inbound streaming financial dataset and am trying to model it by splitting one large JSON document into subsets, structured as JSON messages, which go onto various Kafka topics to be sunk into Neo4j.
(Dumping to CSV files and loading the CSVs is not an option.)
These messages will define the Account nodes and Bank nodes, the edges between accounts based on the transactions, and the edges between the banks and the accounts.
Idea for Topics:
AccountHolders -> Nodes/Vertices
Banks -> Nodes/Vertices
CustomerTransaction -> Edges/Links
CustomerBank -> Edges/Links
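A minimal Python sketch of the producer side for these four topics, assuming each topic is keyed by the ID of the entity it describes. TOPIC_KEYS and to_kafka_record are hypothetical names, not part of any Kafka client library. Keying by entity ID means all messages for one account, bank or transaction land on the same partition and are therefore consumed in the order they were produced.

```python
import json

# Hypothetical mapping: topic name -> field used as the Kafka message key.
# Messages with the same key go to the same partition, so all updates for
# one account, bank or transaction are consumed in the order produced.
TOPIC_KEYS = {
    "AccountHolders": "AccountNumber",
    "Banks": "BankID",
    "CustomerBank": "AccountNumber",
    "CustomerTransaction": "TransactionID",
}


def to_kafka_record(topic: str, value: dict) -> tuple[bytes, bytes]:
    """Serialise one message for `topic` as (key, value) UTF-8 JSON bytes."""
    key_field = TOPIC_KEYS[topic]  # raises KeyError for an unknown topic
    key = json.dumps({key_field: value[key_field]}).encode("utf-8")
    payload = json.dumps(value).encode("utf-8")
    return key, payload
```

These are plain JSON bytes. If the sink keeps io.confluent.connect.json.JsonSchemaConverter, the producer would instead need Confluent's JSON Schema serializer (which registers the schema in Schema Registry); for plain JSON the usual choice is org.apache.kafka.connect.json.JsonConverter with schemas.enable set to false.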
Below are some basic example "records".
Please advise on the "best" proposal for the JSON message structures.
If I may also ask for help with the Neo4j sink JSON file (Neo4JSink.json): I'm posting a generic version I found on the site. I will take care of the server details; I need assistance with the "sink" lines. I was thinking everything except transactions should be upserts, and transactions should be straight inserts.
Nodes/Vertices: AccountHolders 1
AccountNumber: aolfyh-45284520934579845
BankID: aolfyh
FullName: John Doe
...
Nodes/Vertices: AccountHolders 2
AccountNumber: ppkyye-45284503744579845
BankID: ppkyye
FullName: Mary Ann
...
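One possible structure for an AccountHolders message, sketched as a Python dataclass that serialises to flat JSON. It keeps the field names from the records above so the sink Cypher can refer to them directly. The prefix check is an assumption drawn only from the two sample records (each AccountNumber starts with its BankID and a hyphen); drop it if that is not a real rule.

```python
import json
from dataclasses import dataclass, asdict


@dataclass(frozen=True)
class AccountHolder:
    AccountNumber: str
    BankID: str
    FullName: str

    def __post_init__(self) -> None:
        # Assumed rule: the account number is prefixed with "<BankID>-".
        if not self.AccountNumber.startswith(self.BankID + "-"):
            raise ValueError(
                f"AccountNumber {self.AccountNumber!r} does not match BankID {self.BankID!r}"
            )

    def to_json(self) -> str:
        """Flat JSON object with the same field names as the sample record."""
        return json.dumps(asdict(self))
```

For example, AccountHolder("ppkyye-45284503744579845", "ppkyye", "Mary Ann").to_json() yields a single flat object with AccountNumber, BankID and FullName.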
AccountHolders 1 (Subject) -> PaysFunds (Transaction 1): Predicate -> AccountHolders 2 (Object)
AccountHolders 1 (Subject) -> CustomerOf: Predicate -> Bank 1 (Object)
AccountHolders 2 (Subject) -> ReceivesFunds (Transaction 2): Predicate -> AccountHolders 1 (Object)
AccountHolders 2 (Subject) -> CustomerOf: Predicate -> Bank 2 (Object)
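In a property graph every relationship is directed but can be traversed in both directions, so "AccountHolders 2 ReceivesFunds AccountHolders 1" is the same kind of edge as "AccountHolders 1 PaysFunds AccountHolders 2" read backwards. A small sketch, assuming each transaction is a separate payment, that rewrites the inverse predicate so only one relationship type needs to be stored. Triple, INVERSES and canonical are hypothetical names.

```python
from typing import NamedTuple


class Triple(NamedTuple):
    subject: str
    predicate: str
    obj: str


# Hypothetical table: inverse predicate -> canonical predicate.
INVERSES = {"ReceivesFunds": "PaysFunds"}


def canonical(t: Triple) -> Triple:
    """Rewrite an inverse predicate so the edge always points payer -> payee."""
    if t.predicate in INVERSES:
        # Swap subject and object and use the canonical predicate.
        return Triple(t.obj, INVERSES[t.predicate], t.subject)
    return t


print(canonical(Triple("AccountHolders 2", "ReceivesFunds", "AccountHolders 1")))
# Triple(subject='AccountHolders 1', predicate='PaysFunds', obj='AccountHolders 2')
```

A query for "who paid Mary Ann" then just follows incoming PaysFunds edges, with no second relationship type to keep in sync.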
Nodes/Vertices: Banks 1
Bank: Silly Pot of Gold
BankID: aolfyh
...
Nodes/Vertices: Banks 2
Bank: Diamond Heist
BankID: ppkyye
...
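Because Banks are upserted (MERGE) on BankID, two different banks arriving with the same BankID would collapse into a single node, and the later name would overwrite the earlier one. A hedged sketch of a producer-side check for that; conflicting_bank_ids is a hypothetical helper.

```python
from collections import defaultdict


def conflicting_bank_ids(banks: list[dict]) -> dict[str, set[str]]:
    """Return every BankID that is attached to more than one bank name."""
    names_by_id: dict[str, set[str]] = defaultdict(set)
    for bank in banks:
        names_by_id[bank["BankID"]].add(bank["Bank"])
    # Keep only IDs that would merge distinct banks into one node.
    return {bank_id: names for bank_id, names in names_by_id.items() if len(names) > 1}
```

An empty result means every BankID is safe to use as the MERGE key.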
Bank/AccountHolders: Edges/Links 1
AccountNumber: aolfyh-45284520934579845
Verb: CustomerOf
BankID: aolfyh
...
Bank/AccountHolders: Edges/Links 2
AccountNumber: ppkyye-45284503744579845
Verb: CustomerOf
BankID: ppkyye
...
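The CustomerBank edge carries nothing that the AccountHolders record does not already contain (AccountNumber and BankID). One option, sketched below under that observation, is to derive the edge message in the producer; another is to skip the topic and MERGE the relationship in the AccountHolders sink Cypher. customer_bank_edge is a hypothetical helper using the CustomerOf predicate from the triples above.

```python
def customer_bank_edge(account_holder: dict) -> dict:
    """Derive the CustomerBank (CustomerOf) edge message from an AccountHolders message."""
    return {
        "AccountNumber": account_holder["AccountNumber"],
        "Verb": "CustomerOf",
        "BankID": account_holder["BankID"],
    }
```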
CustomerTransaction: Edges 1
Verb: PaidFundsTo
TransactionID: 341234-56354634-123412341-56745674-2452345
CustomerAccountNumber: aolfyh-45284520934579845
CustomerBankID: aolfyh
CounterPartyAccountNumber: ppkyye-45284503744579845
CounterPartyBankID: ppkyye
TxnDateTime: DD:MM:YYYY-HH:MM:SS
Amount: 100.00
Currency: ZAR
...
CustomerTransaction: Edges 2
Verb: ReceivedFundsFrom
TransactionID: 341234-56354634-123412341-56745674-2452543
CustomerAccountNumber: ppkyye-45284503744579845
CustomerBankID: ppkyye
CounterPartyAccountNumber: aolfyh-45284520934579845
CounterPartyBankID: aolfyh
TxnDateTime: DD:MM:YYYY-HH:MM:SS
Amount: 100.00
Currency: ZAR
...
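A sketch of normalising a CustomerTransaction message before it is produced, assuming the DD:MM:YYYY-HH:MM:SS layout above is literal (day:month:year). It converts the timestamp to ISO-8601, which Cypher temporal functions such as localdatetime() accept, and converts the amount to integer minor units (cents), because Neo4j has no decimal property type and floats can round. The two-decimal assumption fits ZAR but not every currency; normalize_transaction, to_minor_units and AmountMinorUnits are hypothetical names.

```python
from datetime import datetime
from decimal import Decimal

# Timestamp layout used in the sample records: DD:MM:YYYY-HH:MM:SS.
SOURCE_TS_FORMAT = "%d:%m:%Y-%H:%M:%S"


def to_minor_units(amount) -> int:
    """Convert an amount such as 100.00 to integer cents (2-decimal currency assumed)."""
    value = Decimal(str(amount)) * 100
    if value != value.to_integral_value():
        raise ValueError(f"amount {amount!r} has more than two decimals")
    return int(value)


def normalize_transaction(raw: dict) -> dict:
    """Return a copy of a CustomerTransaction record ready to publish."""
    msg = dict(raw)
    # ISO-8601 string, e.g. "2024-03-01T14:05:09" (no time zone in the source).
    msg["TxnDateTime"] = datetime.strptime(raw["TxnDateTime"], SOURCE_TS_FORMAT).isoformat()
    # Replace the decimal Amount with an exact integer count of cents.
    msg["AmountMinorUnits"] = to_minor_units(msg.pop("Amount"))
    return msg
```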
Neo4JSink.json
I left a gap at the lines where I think I need to define sinks for the above topics.
{
  "name": "Neo4jSinkConnectorCypherJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
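A hedged sketch of the sink lines for the four topics, written in Python so each Cypher string can be commented and the connector JSON generated. It reuses the connector class, converters and neo4j.cypher.* binding settings from the generic file above; the node labels (Account, Bank), relationship types (CUSTOMER_OF, PAID_FUNDS_TO) and the Name property are hypothetical choices. Because bind-value-as is "__value" and bind-value-as-event is false, __value is the flat message value itself, not the __value.event.state.after CDC structure used in the Person/Family example. Upserts use MERGE on the natural key plus SET; the transaction topic uses CREATE, and assumes each message is already in payer (Customer) -> payee (CounterParty) direction.

```python
import json

# Server and binding settings carried over from the generic Neo4JSink.json.
BASE_CONFIG = {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": True,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": True,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": False,
}

# One Cypher statement per topic; __value is the Kafka message value.
CYPHER_BY_TOPIC = {
    # Upsert: MERGE on the natural key, then SET the other properties.
    "AccountHolders": (
        "MERGE (a:Account {AccountNumber: __value.AccountNumber}) "
        "SET a.FullName = __value.FullName, a.BankID = __value.BankID"
    ),
    "Banks": (
        "MERGE (b:Bank {BankID: __value.BankID}) "
        "SET b.Name = __value.Bank"
    ),
    # Upsert the relationship between existing-or-new endpoints.
    "CustomerBank": (
        "MERGE (a:Account {AccountNumber: __value.AccountNumber}) "
        "MERGE (b:Bank {BankID: __value.BankID}) "
        "MERGE (a)-[:CUSTOMER_OF]->(b)"
    ),
    # Straight insert: a new relationship for every transaction message.
    "CustomerTransaction": (
        "MERGE (payer:Account {AccountNumber: __value.CustomerAccountNumber}) "
        "MERGE (payee:Account {AccountNumber: __value.CounterPartyAccountNumber}) "
        "CREATE (payer)-[:PAID_FUNDS_TO {TransactionID: __value.TransactionID, "
        "Amount: __value.Amount, Currency: __value.Currency, "
        "TxnDateTime: __value.TxnDateTime}]->(payee)"
    ),
}


def build_sink_config(name: str = "Neo4jSinkConnectorCypherJSONSchema") -> dict:
    """Assemble the connector JSON: base settings, topic list and per-topic Cypher."""
    config = dict(BASE_CONFIG)
    config["topics"] = ",".join(CYPHER_BY_TOPIC)
    for topic, cypher in CYPHER_BY_TOPIC.items():
        config[f"neo4j.cypher.topic.{topic}"] = cypher
    return {"name": name, "config": config}


if __name__ == "__main__":
    print(json.dumps(build_sink_config(), indent=2))
```

CREATE adds a new relationship each time a message is delivered, so a redelivered message (Kafka Connect sinks are at-least-once) would duplicate the transaction; MERGE (payer)-[t:PAID_FUNDS_TO {TransactionID: __value.TransactionID}]->(payee) followed by SET for the remaining properties would make replays harmless. MERGE also performs best with a uniqueness constraint on each key, for example CREATE CONSTRAINT account_number IF NOT EXISTS FOR (a:Account) REQUIRE a.AccountNumber IS UNIQUE. Property names follow the sample records; adjust them if the message structure changes.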