Rapid Prototyping to split tuple values in multiple streams

stefan
Posts: 85
Joined: Tue Jul 12, 2016 1:03 pm

Post by stefan » Tue Nov 01, 2016 10:41 pm

Hello,

I am trying to get data out of a Neo4j graph database. At the moment I am using the REST API of Neo4j and the WSEnrich operator to get the required data. For each tuple I get a JSON document as the response from Neo4j, which is stored in an attribute of this tuple. This JSON contains several paths through the subgraph. To work with this data, I want to split the result into individual paths and these paths again into their nodes and edges.

I tried the KEYVALUEOUTPUT option of the WSEnrich operator, but I don't know how to do this split operation to generate new streams that look like this:

Path1, Node1, ...Attributes
Path1, Node2, ...Attributes
Path1, Node3, ...Attributes
Path2, Node1, ...Attributes
Path2, Node2, ...Attributes
etc.

and

Path1, Edge1, ...Attributes
Path1, Edge2, ...Attributes
Path1, Edge3, ...Attributes
Path2, Edge1, ...Attributes
Path2, Edge2, ...Attributes

My first idea was to write a user-defined function in Python, but I ran into some problems:
1. I wrote a very simple test in Python that should set the attribute attr2 to 123:

#!/usr/bin/python
$attr2 = 123

I also tried:

#!/usr/bin/python
attr2 = 123

and used it like this:

#PARSER PQL
#RUNQUERY
srcMobilityRequests ::= ACCESS({
source='srcMobilityRequests',
wrapper='GenericPull',
transport='File',
protocol='csv',
dataHandler='Tuple',
options=[
['delimiter',';'],
['textDelimiter',"'"],
['readfirstline','false'],
['delay','3000'],
['filename', 'C:\Sourcen\csv\MobilityRequestInitial2_DEU.csv']
],
Schema=[
['eventTypeIdentifier','String'],
['mobilityRequestId','String'],
['version','Integer'],
['userId','String'],
['mobilityRequestType','String'],
['departureLocation','String'],
['destinationLocation','String'],
['departureTimeFull','String'],
['arrivalTimeFull','String'],
['departureTimeTs','Timestamp'],
['arrivalTimeTs','Timestamp'],
['departureDate','Integer'],
['departureWeekday','String'],
['departureTime','Integer']
]
})

#RUNQUERY
output = SCRIPT({path='C:\GIT\Odysseus\prototyping\script.py'}, srcMobilityRequests)

I know that the script file is being read: if I change the file name or path, I get an error that the file/path does not exist. But the output is empty. I do not get an error message, and there seems to be no error in the log file (extended logging with TRACE is enabled).

My questions are:
1. What am I doing wrong? I could not find any further information or examples. Do I have to use the variables in a different way?
2. Is it possible to create more than one output stream in this way?
3. What alternative approaches exist?

Thank you in advance!

Stefan

Marco Grawunder
Posts: 272
Joined: Tue Jul 29, 2014 10:29 am
Location: Oldenburg, Germany

Re: Rapid Prototyping to split tuple values in multiple streams

Post by Marco Grawunder » Wed Nov 02, 2016 9:15 am

Hi Stefan,

can you give an example of what the output of the WSEnrich operator looks like?

If I understand correctly, you want to retrieve some values from the attribute that contains the result of the Neo4j query? This is currently not possible, but it should be no big deal to add.

Update: I created a MAP function that retrieves sub-values from a KeyValueObject:

get(<KVAttribute>,<pathToValue>)

Because I had no sample input, I could not test it. <pathToValue> uses dot notation: a.b.c

Maybe you could test it.
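To make the intended dot-notation lookup concrete, here is a small Python sketch of how a path such as a.b.c could be resolved against a nested JSON object. It is not the Odysseus get() implementation: the helper name get_by_path, the support for numeric list indices (e.g. results.0.data) and returning None for a missing key are all assumptions made for illustration.

```python
from collections.abc import Mapping
from typing import Any


def get_by_path(obj: Any, path: str) -> Any:
    """Resolve a dot-separated path such as "a.b.c" in nested JSON data.

    Assumptions (not taken from Odysseus): numeric segments index into
    lists, and a missing key or index yields None instead of an error.
    """
    current = obj
    for segment in path.split("."):
        if isinstance(current, Mapping):
            if segment not in current:
                return None
            current = current[segment]
        elif isinstance(current, list) and segment.isdigit():
            index = int(segment)
            if index >= len(current):
                return None
            current = current[index]
        else:
            return None  # cannot descend into a scalar value
    return current


doc = {"results": [{"data": [{"row": [["A", "B"]]}]}]}
print(get_by_path(doc, "results.0.data.0.row"))  # [['A', 'B']]
print(get_by_path(doc, "results.1"))             # None
```

A lookup like this only ever returns one value per input tuple, which is why it does not by itself solve the one-to-many split discussed in the rest of the thread.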

The rapid prototyping feature does not seem to work very well anymore. I guess it is no longer a viable option :-/

Greetings,

Marco

Re: Rapid Prototyping to split tuple values in multiple streams

Post by stefan » Thu Nov 03, 2016 12:01 pm

Hi Marco,

OK, sure. The user can enter some information, e.g. where s/he wants to start, where to go, and at what time. This information is sent to Odysseus as a new tuple in a stream. The response I get from the WSEnrich operator is a JSON document like the following. I had to put it online because of the forum's maximum post length. ;)

https://dl.dropboxusercontent.com/u/280 ... erator.txt

It mainly contains the following information:
- several paths from a point A to a point Z (i.e., multiple possible routes)
- the nodes and edges of each path

At the moment I add this full JSON as one attribute to the corresponding user request. To process this information, I actually need a new stream, so I need to split it up: I need a new tuple for each node and each edge. This means I want to generate a new stream for the nodes from that one attribute:
<path1, node1, ...attributes...>
<path1, node2, ...attributes...>
<path1, node3, ...attributes...>
<path2, node1, ...attributes...>
<path2, node2, ...attributes...>

...and I also want to do this for each edge (relation):
<path1, edge1, ...attributes...>
<path1, edge2, ...attributes...>
<path2, edge1, ...attributes...>

I understand that I can get the values using dot notation, but how can I create several tuples for a new stream out of this one attribute?

I had a look at the documentation, but I still don't know how. Sorry, maybe I just don't know what to look for...

Thank you very much!
Stefan

Re: Rapid Prototyping to split tuple values in multiple streams

Post by Marco Grawunder » Thu Nov 03, 2016 1:31 pm

Hi Stefan,

ok, I see the problem.

It is easy to produce, for each input tuple containing the JSON object, another tuple with elements from that object, but you want some kind of unnesting, i.e., multiple tuples for one input. This would mean query processing over the JSON object.

Can you somehow express (XPath-like) what you want to get? I could imagine a MEP function that transforms the JSON object into a list of tuples, followed by the UNNEST operator. But at the moment it is not clear to me how this operation should be configured.
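The map-then-unnest idea can be illustrated in Python. The hypothetical helper unnest takes one tuple (modelled as a dict) whose attribute holds a list and emits one tuple per list element, copying all other attributes. This only sketches the general semantics of unnesting; it is not the Odysseus UNNEST operator, and the attribute name paths is made up for the example (mobilityRequestId comes from the schema in the first post).

```python
from typing import Any, Dict, Iterator


def unnest(tuple_: Dict[str, Any], list_attr: str) -> Iterator[Dict[str, Any]]:
    """Yield one output tuple per element of tuple_[list_attr].

    Every other attribute is copied into each output tuple, so a request
    with three paths becomes three tuples that share the request's fields.
    """
    for element in tuple_[list_attr]:
        out = {key: value for key, value in tuple_.items() if key != list_attr}
        out[list_attr] = element
        yield out


request = {"mobilityRequestId": "r1", "paths": ["path1", "path2"]}
for t in unnest(request, "paths"):
    print(t)
# {'mobilityRequestId': 'r1', 'paths': 'path1'}
# {'mobilityRequestId': 'r1', 'paths': 'path2'}
```

Applying the same step a second time (paths to nodes, paths to edges) would yield the two-level split Stefan describes.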

Greetings,

Marco

Re: Rapid Prototyping to split tuple values in multiple streams

Post by stefan » Thu Nov 03, 2016 2:43 pm

Hi Marco,

OK, with the current JSON (linked above) it would look like the following. I wrote these expressions in JSONPath, so you can evaluate them easily against my JSON file at http://jsonpath.com/.

--- first path ---
$.results[0].data[0].row[0][0] --> first NODE in first path
$.results[0].data[0].meta[0][0] --> metadata (node or relation) of first node in first path

$.results[0].data[0].row[0][1] --> first EDGE in first path
$.results[0].data[0].meta[0][1] --> metadata (node or relation) of first edge in first path

$.results[0].data[0].row[0][2] --> second NODE in first path
$.results[0].data[0].meta[0][2] --> metadata (node or relation) of second node in first path

--- second path ---
$.results[0].data[1].row[0][0]

etc.
This means:

$.results[0].data[i].row[0][j] --> the j-th element in the i-th path
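The indexing scheme above maps directly onto a small splitter. The Python sketch below walks results[0].data[i].row[0][j], uses the matching meta[0][j] entry to decide whether an element is a node or an edge, and numbers nodes and edges separately per path, producing the <path, node, attributes> and <path, edge, attributes> rows described earlier. It assumes each meta entry has a "type" field equal to "node" for nodes (anything else is treated as an edge); the function name split_paths and the sample properties are hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple

Row = Tuple[int, int, Dict[str, Any]]  # (path number, element number, properties)


def split_paths(response: Dict[str, Any]) -> Tuple[List[Row], List[Row]]:
    """Split path results into separate node rows and edge rows.

    Assumes results[0].data[i].row[0][j] holds the properties of the j-th
    element of path i and meta[0][j]["type"] is "node" for nodes; every
    other element is treated as an edge. Numbering starts at 1 per path.
    """
    nodes: List[Row] = []
    edges: List[Row] = []
    for i, path in enumerate(response["results"][0]["data"], start=1):
        node_no = edge_no = 0
        for props, meta in zip(path["row"][0], path["meta"][0]):
            if meta["type"] == "node":
                node_no += 1
                nodes.append((i, node_no, props))
            else:
                edge_no += 1
                edges.append((i, edge_no, props))
    return nodes, edges


# Hypothetical response with one path A -[platform 1]-> B
sample = json.loads("""
{"results": [{"data": [{
  "row":  [[{"name": "A"}, {"platform": "1"}, {"name": "B"}]],
  "meta": [[{"id": 1, "type": "node"}, {"id": 7, "type": "relationship"},
            {"id": 2, "type": "node"}]]
}]}]}
""")
nodes, edges = split_paths(sample)
print(nodes)  # [(1, 1, {'name': 'A'}), (1, 2, {'name': 'B'})]
print(edges)  # [(1, 1, {'platform': '1'})]
```

Relying on the meta type rather than on even/odd positions keeps the splitter correct even if a query returns elements in a different order.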

But it would be no big deal to change the query so that I get the nodes and edges separately instead of whole paths. This would result in:

$.results[0].data[i].row[0][j] --> the j-th node in the i-th path
$.results[0].data[i].row[1][j].platform --> the j-th edge in the i-th path
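With this alternative query, nodes and edges arrive in separate columns, so no meta lookup is needed. A hypothetical Python variant, assuming row[0] is the node list and row[1] the edge list of each path, and keeping only the platform property of each edge as in the JSONPath expression above:

```python
from typing import Any, Dict, List, Optional, Tuple


def split_separate(response: Dict[str, Any]) -> Tuple[List[tuple], List[tuple]]:
    """Variant for a query that returns nodes and edges in separate columns.

    Assumes results[0].data[i].row[0] is the node list and row[1] the edge
    list of path i; only the "platform" property of each edge is kept.
    """
    nodes: List[Tuple[int, int, Dict[str, Any]]] = []
    edges: List[Tuple[int, int, Optional[Any]]] = []
    for i, entry in enumerate(response["results"][0]["data"], start=1):
        node_list, edge_list = entry["row"][0], entry["row"][1]
        nodes.extend((i, j, props) for j, props in enumerate(node_list, start=1))
        edges.extend((i, j, rel.get("platform")) for j, rel in enumerate(edge_list, start=1))
    return nodes, edges


resp = {"results": [{"data": [{"row": [[{"name": "A"}, {"name": "B"}], [{"platform": "3"}]]}]}]}
print(split_separate(resp))
# ([(1, 1, {'name': 'A'}), (1, 2, {'name': 'B'})], [(1, 1, '3')])
```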

If I can provide additional information, just let me know!

greetings,
Stefan

Re: Rapid Prototyping to split tuple values in multiple streams

Post by Marco Grawunder » Thu Nov 03, 2016 4:42 pm

OK, I understand the problem... I have to think about it ;-)

Greetings,

Marco

Re: Rapid Prototyping to split tuple values in multiple streams

Post by stefan » Thu Nov 03, 2016 8:20 pm

OK, thanks.
A convenient solution in Odysseus would be great. But if there is no good solution at the moment, just let me know. I could simulate an external system (a small Java program or Python script) that splits the data and sends it back to Odysseus. That's not my preferred way, but it would solve the problem without much effort on the Odysseus side.
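For such an external helper, the split rows still have to be handed back to Odysseus in a format a source can parse. One option, sketched below in Python, is to serialize them as ';'-separated lines with ' as the text delimiter, mirroring the delimiter and textDelimiter options of the csv ACCESS operator in the first post. Whether Odysseus reads this output unchanged is untested; the function name rows_to_csv and the column list are assumptions.

```python
import csv
import io
from typing import Any, Dict, Iterable, Sequence, Tuple


def rows_to_csv(rows: Iterable[Tuple[int, int, Dict[str, Any]]],
                columns: Sequence[str]) -> str:
    """Serialize (path, element, properties) rows as ';'-separated lines.

    ';' and "'" mirror the delimiter/textDelimiter options of the csv
    ACCESS operator in the first post. Missing properties become empty fields.
    """
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter=";", quotechar="'", lineterminator="\n")
    for path_no, element_no, props in rows:
        writer.writerow([path_no, element_no] + [props.get(c, "") for c in columns])
    return buf.getvalue()


print(rows_to_csv([(1, 1, {"name": "A"}), (1, 2, {"name": "B;C"})], ["name"]), end="")
# 1;1;A
# 1;2;'B;C'
```

A fixed column list keeps every line aligned with the Schema declared in the receiving ACCESS operator, even when some nodes lack a property.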

Greetings,
Stefan

Re: Rapid Prototyping to split tuple values in multiple streams

Post by Marco Grawunder » Fri Nov 04, 2016 2:58 pm

No, this is a good occasion to redesign our key-value handling ;-)

But this will take some time. I am not sure whether it will be available within the next two weeks. So if your problem is urgent, your workaround may be the better choice.

Greetings,

Marco

Re: Rapid Prototyping to split tuple values in multiple streams

Post by stefan » Fri Nov 04, 2016 5:23 pm

OK, thanks for your response. I will start with my workaround, since I need that functionality now. But please let me know as soon as the functionality is available. Maybe I can test and use it in a later version of my prototype.

greetings,
Stefan

Re: Rapid Prototyping to split tuple values in multiple streams

Post by stefan » Sun Nov 06, 2016 1:46 am

Hi Marco,

I thought about using a message queue and Python to do this job.
I saw the RabbitMQ transport handler in the Odysseus documentation (https://wiki.odysseus.offis.uni-oldenbu ... Id=7111519) and played around with RabbitMQ and Python. Some simple examples worked well, and now I wanted to create an Odysseus source with an ACCESS operator. But I always get the error:

'Parsing and Executing Query' has encountered a problem.
Script Execution Error: transformation failed ; Cannot create transport handler RabbitMQ

If I try to create a sender:
'Parsing and Executing Query' has encountered a problem.
Script Execution Error: transformation failed ; No transport handler RabbitMQ found.

In the output.log I can see:

37185551 ERROR TransportHandlerRegistry  - No handler with name RabbitMQ found! - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:68) 
37185552 DEBUG TransportHandlerRegistry  - Available handler:  - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:70) 
37185552 DEBUG TransportHandlerRegistry  - tcp - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185552 DEBUG TransportHandlerRegistry  - udpclient - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185552 DEBUG TransportHandlerRegistry  - tcpclient1 - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185553 DEBUG TransportHandlerRegistry  - tcpserver1 - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185553 DEBUG TransportHandlerRegistry  - tcpserver2 - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185553 DEBUG TransportHandlerRegistry  - directory - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185553 DEBUG TransportHandlerRegistry  - tcpclient - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185553 DEBUG TransportHandlerRegistry  - timer - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185554 DEBUG TransportHandlerRegistry  - simpleudpreceive - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185554 DEBUG TransportHandlerRegistry  - file - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185554 DEBUG TransportHandlerRegistry  - spatiotemporal - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185554 DEBUG TransportHandlerRegistry  - planmodificationwatcher - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185554 DEBUG TransportHandlerRegistry  - tcpserver - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185555 DEBUG TransportHandlerRegistry  - udpserver - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
37185555 DEBUG TransportHandlerRegistry  - nonblockingtcp - de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.TransportHandlerRegistry.getInstance(TransportHandlerRegistry.java:72) 
...and RabbitMQ is not listed as an available handler.

I used the RabbitMQ examples:
https://www.rabbitmq.com/tutorials/tuto ... ython.html

and the Odysseus Examples for a work queue:
https://wiki.odysseus.offis.uni-oldenbu ... Id=7111519

Is RabbitMQ no longer supported? If not, what would be a good alternative? TCP sockets (nonblockingtcp)? Do the sink and the source need different ports? Thinking of a general architecture, I would expect Odysseus to connect the sink to e.g. port 5555 and receive the results on the same port. But I am sure I read that this is not possible...
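On the port question: a TCP connection itself carries data in both directions, so a helper can read requests and write results over one socket or use two separate connections. Which of these layouts the Odysseus sink and source transport handlers support is not confirmed here. The Python sketch below keeps that choice open: the hypothetical relay function reads newline-delimited JSON from one socket and writes the transformed lines to another, which may be the same socket object. The newline framing is an assumption, and socket pairs stand in for real Odysseus connections.

```python
import json
import socket
from typing import Callable, Iterable


def relay(inbound: socket.socket, outbound: socket.socket,
          transform: Callable[[dict], Iterable[str]]) -> int:
    """Read newline-delimited JSON documents from inbound until EOF and
    write each transformed output line to outbound.

    inbound and outbound may be the same connected socket or two separate
    connections. Returns the number of JSON documents processed.
    """
    reader = inbound.makefile("r", encoding="utf-8", newline="\n")
    writer = outbound.makefile("w", encoding="utf-8", newline="\n")
    count = 0
    try:
        for line in reader:
            if not line.strip():
                continue  # skip blank lines
            for out_line in transform(json.loads(line)):
                writer.write(out_line + "\n")
            writer.flush()
            count += 1
    finally:
        writer.close()  # closes only the file wrapper, not the socket
        reader.close()
    return count


# Local demo: socket pairs stand in for the Odysseus sink and source.
to_helper, helper_in = socket.socketpair()
helper_out, from_helper = socket.socketpair()
to_helper.sendall(b'{"x": 1}\n{"x": 2}\n')
to_helper.shutdown(socket.SHUT_WR)  # signal end of input
n = relay(helper_in, helper_out, lambda doc: [str(doc["x"])])
helper_out.shutdown(socket.SHUT_WR)
print(n, repr(from_helper.makefile("r").read()))  # 2 '1\n2\n'
for s in (to_helper, helper_in, helper_out, from_helper):
    s.close()
```

In a real setup, transform would be a splitter that turns one Neo4j response into one CSV line per node or edge.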

Thanks,
Stefan
