Odysseus

This is the official support forum for Odysseus

Communication with Neo4j graph database

User avatar
Marco Grawunder
Posts: 264
Joined: Tue Jul 29, 2014 10:29 am
Location: Oldenburg, Germany
Contact:

Re: Communication with Neo4j graph database

Postby Marco Grawunder » Thu Oct 20, 2016 11:25 am

Hi Stephan,

good to hear.

Regarding the logging problem. When I use the downloaded product, everything works fine ...

Could you just create a bug report, so I can see your environment? Maybe there is something special and I can try to reproduce your environment?

https://wiki.odysseus.offis.uni-oldenburg.de/display/ODYSSEUS/How+to+report+a+bug

Greetings,

Marco

stefan
Posts: 85
Joined: Tue Jul 12, 2016 1:03 pm

Re: Communication with Neo4j graph database

Postby stefan » Tue Dec 13, 2016 10:41 pm

Hi Marco,

I got here another question.
I use the WSEnrich operator to create new data in my Neo4j Database.

My Code looks like this:

Code: Select all

#PARSER PQL

#DROPALLQUERIES
#DROPALLSINKS
#DROPALLSOURCES

#RUNQUERY
/// Read Mobility Offer CSV Data
moCsvSource = ACCESS({
      source='src_moCsvSource',
      wrapper='GenericPull',
      transport='File',
      protocol='csv',
      dataHandler='Tuple',
      options=[
         ['delimiter',';'],
         ['textDelimiter',"'"],
         ['readfirstline','false'],
         ['delay','5000'],
         ['filename', 'C:\SIMPLe\Sourcen\csv\MobilityOffers.csv']
      ],
      Schema=[
         ['eventTypeIdentifier','String'],
         ...
         a lot of attributes
         ...
         ['destinationSwitchTime','Integer']
      ]
   }
)

moCreateNeo4jMo = WSENRICH(
   {
      SERVICEMETHOD = 'REST',
      METHOD = 'POST_DOCUMENT',
      URL = 'http://localhost:7474/db/data/transaction/commit?',
      HEADER = [['Authorization', 'Basic bmVvNGo6bmVvcGFzcw==']],
      CONTENTTYPE = 'application/json',
      TEMPLATE = '...very long statement...',
      ARGUMENTS = [
         ...
         a lot of arguments
         ...
      ],
      PARSINGMETHOD = 'JSONPATH',
      DATAFIELDS = [
         ['*', 'String'],
         ['$.results[0].data[0].row[1]','Integer'],
         ['$.errors', 'String'],
         ['$.errors[0].code', 'String'],
         ['$.errors[0].message', 'String']
      ]
   },
   moCsvSource
)

moCreated = SENDER({
      protocol = 'simplecsv',
      transport = 'file',
      sink = 'sink_moCreated',
      wrapper = 'GenericPush',
      options = [
         ['filename', 'C:\SIMPLe\Sinks\moCreated.csv'],
         ['csv.delimiter', ';'],
         ['csv.writeheading', 'true'],
         ['nullvaluetext', '<NULL>']
      ]
   },
   moCreateNeo4jMo
)


I used a SOAPUI Service mock to track what is sent out by Odysseus. The statement is as it should be. If I execute this statement by hand (via postman) I get the following responses.

If the statement could be executed:

Code: Select all

{
  "results": [
    {
      "columns": [
        "ID(from)",
        "ID(startNode)",
        "ID(endNode)",
        "ID(to)"
      ],
      "data": [
        {
          "row": [
            32,
            63,
            64,
            33
          ],
          "meta": [
            null,
            null,
            null,
            null
          ]
        }
      ]
    }
  ],
  "errors": []
}


In the error case:

Code: Select all

{
  "results": [],
  "errors": [
    {
      "code": "Neo.ClientError.Schema.ConstraintValidationFailed",
      "message": "Node 55 already exists with label Stop and property \"stopUri\"=[http://.../MO90000000101]"
    }
  ]
}


The above code executes the statement, i. e. the new nodes are created in Neo4j - as it should be.

But I want to add a new attribute to the stream that I can see/track whether it was possible to create the new data in Neo4j. If I execute the above code the result of the operator is empty and I get the following console output:

Code: Select all

1835633 WARN  AbstractEnrichPO  - Empty result for input ... | META | 1481653494746|oo - de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractEnrichPO.process_next(AbstractEnrichPO.java:112)


(The ... represents the attributes)

Therefore I tried the following lines one by one:

Code: Select all

         ['*', 'String'],
         ['$.results[0].data[0].row[1]','Integer'],
         ['$.errors', 'String'],
         ['$.errors[0].code', 'String'],
         ['$.errors[0].message', 'String'],
         ['$.errors', 'KeyValueObject']


If I output ['$.errors', 'String'] the WSEnrich Operator shows the correct information if there was an error. If everything was ok, i. e. the JSON Path expression results in an empty result, I dont get a new tuple and the console output:

Code: Select all

1835633 WARN  AbstractEnrichPO  - Empty result for input ... | META | 1481653494746|oo - de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractEnrichPO.process_next(AbstractEnrichPO.java:112)


Furthermore, if there was an error (no empty result), the csv sink produces an error:

Code: Select all

java.lang.ClassCastException: java.lang.StringBuffer cannot be cast to java.lang.String
   at de.uniol.inf.is.odysseus.core.datahandler.StringHandler.writeData(StringHandler.java:108)
   at de.uniol.inf.is.odysseus.core.datahandler.TupleDataHandler.writeData(TupleDataHandler.java:239)
   at de.uniol.inf.is.odysseus.core.datahandler.AbstractStreamObjectDataHandler.writeData(AbstractStreamObjectDataHandler.java:149)
   at de.uniol.inf.is.odysseus.core.datahandler.AbstractStreamObjectDataHandler.writeCSVData(AbstractStreamObjectDataHandler.java:200)
   at de.uniol.inf.is.odysseus.core.physicaloperator.access.protocol.AbstractCSVHandler.write(AbstractCSVHandler.java:146)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.sink.SenderPO.process_next(SenderPO.java:55)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSink.process(AbstractSink.java:319)
   at de.uniol.inf.is.odysseus.core.physicaloperator.AbstractPhysicalSubscription.sendObject(AbstractPhysicalSubscription.java:90)
   at de.uniol.inf.is.odysseus.core.physicaloperator.ControllablePhysicalSubscription.process_internal(ControllablePhysicalSubscription.java:93)
   at de.uniol.inf.is.odysseus.core.physicaloperator.ControllablePhysicalSubscription.do_process(ControllablePhysicalSubscription.java:81)
   at de.uniol.inf.is.odysseus.core.physicaloperator.AbstractPhysicalSubscription.process(AbstractPhysicalSubscription.java:80)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSource.transfer(AbstractSource.java:488)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSource.transfer(AbstractSource.java:478)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSource.transfer(AbstractSource.java:500)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractEnrichPO.process_next(AbstractEnrichPO.java:108)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractPipe.delegatedProcess(AbstractPipe.java:264)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractPipe.access$6(AbstractPipe.java:263)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractPipe$DelegateSink.process_next(AbstractPipe.java:55)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSink.process(AbstractSink.java:319)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractPipe.process(AbstractPipe.java:260)
   at de.uniol.inf.is.odysseus.core.physicaloperator.AbstractPhysicalSubscription.sendObject(AbstractPhysicalSubscription.java:90)
   at de.uniol.inf.is.odysseus.core.physicaloperator.ControllablePhysicalSubscription.process_internal(ControllablePhysicalSubscription.java:93)
   at de.uniol.inf.is.odysseus.core.physicaloperator.ControllablePhysicalSubscription.do_process(ControllablePhysicalSubscription.java:81)
   at de.uniol.inf.is.odysseus.core.physicaloperator.AbstractPhysicalSubscription.process(AbstractPhysicalSubscription.java:80)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSource.transfer(AbstractSource.java:488)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSource.transfer(AbstractSource.java:478)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.access.push.ReceiverPO.transfer(ReceiverPO.java:122)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.AbstractSource.transfer(AbstractSource.java:500)
   at de.uniol.inf.is.odysseus.core.server.physicaloperator.access.pull.AccessPO.transferNext(AccessPO.java:178)
   at de.uniol.inf.is.odysseus.scheduler.singlethreadscheduler.MultipleSourceExecutor.transfer(MultipleSourceExecutor.java:212)
   at de.uniol.inf.is.odysseus.scheduler.singlethreadscheduler.MultipleSourceExecutor.processSources(MultipleSourceExecutor.java:155)
   at de.uniol.inf.is.odysseus.scheduler.singlethreadscheduler.MultipleSourceExecutor.run(MultipleSourceExecutor.java:63)


I think the reason for that is the KeyValueObject-like representation of the new attribute:

Code: Select all

[, code:Neo.ClientError.Schema.ConstraintValidationFailed, message:Node 77 already exists with label Stop and property \stopUri\=http:\/\/simple.....\/MO9000000016]


This means, I dont get new tuples if there was no error and I can not store the tuple if an error has occured. :)
A workaround would be, that I output just the error code/message of the first error (this would result into a real string) and I join this result to the original stream. Alternatively I could replace the error attribute in a MAP operator by a flag (error = true/flase) and I use the new MAP functionality to create an additional error stream:

Code: Select all

map1 = MAP({EXPRESSIONS = [['path(kv,\'$.errors[*].code\')','codes']]}, tuple)

unnested = UNNEST({ATTRIBUTE = 'codes'}, map1)


The latter is my preferred way at the moment and I will try this tomorrow evening. I just dont think that it is intended to work like this. :) Just let me know if you need additional information.

greetings,
Stefan

User avatar
Marco Grawunder
Posts: 264
Joined: Tue Jul 29, 2014 10:29 am
Location: Oldenburg, Germany
Contact:

Re: Communication with Neo4j graph database

Postby Marco Grawunder » Wed Dec 14, 2016 10:34 am

Hmm. I think, there something else wrong (e.g. an unseen exception)

I added some more logging. Could you please make a run and create a bug report with the output? Please assure, that Logging output from WSEnrichPO is set to TRACE ...
In Project: de.uniol.inf.is.odysseus.slf4j
File: log4j.properties

Greetings,

Marco

stefan
Posts: 85
Joined: Tue Jul 12, 2016 1:03 pm

Re: Communication with Neo4j graph database

Postby stefan » Thu Dec 15, 2016 1:12 pm

Hmm, ok.
Do you mean that I just should enable extended logging (https://wiki.odysseus.offis.uni-oldenburg.de/display/ODYSSEUS/Enable+extended+logging) or can I modify the logging for the WSEnrichPO in a specific way?

Thanks,
Stefan

User avatar
Marco Grawunder
Posts: 264
Joined: Tue Jul 29, 2014 10:29 am
Location: Oldenburg, Germany
Contact:

Re: Communication with Neo4j graph database

Postby Marco Grawunder » Thu Dec 15, 2016 1:46 pm

If you use the downloaded product use the way of the wiki, if you use the source code in eclipse use the file in the given project.

Greetings,

Marco


phpbb 3.1 style demo

Return to “Adapter/Wrapper/Access Framework”

Who is online

Users browsing this forum: No registered users and 1 guest