Working with JSON


Working with JSON

Post by harpreet » Tue Sep 16, 2014 6:42 pm

Hi All,

I am new to Odysseus and to CEP in general.
I have been trying to achieve a simple JSON data aggregation with Odysseus, but without any luck so far. I am using openSUSE Linux 12.3.

I have a JSON file /root/odysseus_workspace/sample.json with the following content:
{"firstName":"John", "lastName":"Doe", "age":"25"},
{"firstName":"Rocky", "lastName":"Geller", "age":"28"}

I have tried to read this data using both PQL and CQL, but without success.
Below are the scripts I have written:

Code:

#PARSER CQL
#RUNQUERY
CREATE STREAM Employee (firstName String, lastName String, age Integer) 
    WRAPPER 'GenericPull'
    PROTOCOL 'JSON'
    TRANSPORT 'File'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'filename' '/root/odysseus_workspace/sample.json')


#PARSER PQL
#RUNQUERY
oneEms:jsonTest::= ACCESS({
    source='oneEms:jsonTest',
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject', 
    options=[['filename','/root/odysseus_workspace/sample.json']],
    schema=[
        ['firstName', 'STRING'],
        ['lastName', 'STRING'],
        ['age', 'INTEGER']
        ]
})
I have tried combinations of the "Tuple" and "KeyValueObject" data handlers, but none of them seems to work.
I don't get any errors while executing these queries, but I also don't get to see the data.
After I execute these queries, no new source is added in the "Source" perspective of Odysseus Studio, but the queries are then available under the "Queries" perspective. From there, when I:
Right-click on the query name > Show Stream Elements > List shows the last 10 elements (no data at all)
Right-click on the query name > Show Stream Elements > Table shows the last 10 elements (only the headings are available)

Can someone please tell me what I am missing? I would be really thankful if someone could give me a good example of reading and displaying data from JSON.

Thanks,
Harpreet


Re: Working with JSON

Post by ckuka » Wed Sep 17, 2014 2:34 am

Hi Harpreet,

First of all, the JSON protocol handler in this configuration expects one JSON document per line in the file, so the "," at the end of your first line causes a parser exception. If you remove it, the file should be fine, like:

Code:

{"firstName":"John", "lastName":"Doe", "age":"25"}
{"firstName":"Rocky", "lastName":"Geller", "age":"28"}
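The line-per-document rule can be checked outside Odysseus with a short Python sketch (illustrative only, not Odysseus code): each line must parse as a complete JSON document on its own, and the stray trailing comma makes a line invalid.

```python
import json

# Each line of the input file must be a complete JSON document on its own.
good = '{"firstName":"John", "lastName":"Doe", "age":"25"}'
bad = '{"firstName":"John", "lastName":"Doe", "age":"25"},'  # trailing comma

record = json.loads(good)  # parses fine

try:
    json.loads(bad)  # the "," leaves extra data after the document
    parsed_bad = True
except json.JSONDecodeError:
    parsed_bad = False
```

The same check applies line by line to the whole file: any line that `json.loads` rejects will also break the JSON protocol handler.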
The next thing is the source configuration. You define a schema; however, a key value object has no schema (it is in the nature of key value objects that you can write whatever you want into them). Anyway, if you prefer to work with a schema, you can transform a key value object into a relational tuple object, as shown in the following source configuration:

Code:

#PARSER PQL
#RUNQUERY

mysource := ACCESS({
    source='sourceName',
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject', 
    options=[['filename','/root/odysseus_workspace/sample.json']]})
    
#PARSER PQL
#ADDQUERY
data = KEYVALUETOTUPLE({schema=[['firstName', 'String'], ['lastName', 'String'], ['age', 'Integer']],
                                  type='sourceName',
                                  keepinput='false'},mysource)
Here are two queries. The first one just registers a source (I called it 'mysource'). This source should also be visible in the source list window after execution, and it produces key value objects.
The second query transforms the key value objects from the first query into relational tuple objects with a schema, using the KEYVALUETOTUPLE operator [1]. To get a chance to see the output, I also set the "ADDQUERY" directive so that the query is added to the list of queries but not started. Thus, you can right-click on the query in the list to display the results and then start the query.
Keep in mind that you can also perform filtering directly on the key value objects, as shown in [2], and do not need to transform them into relational tuples first.

[1] http://odysseus.offis.uni-oldenburg.de: ... e+operator
[2] http://odysseus.offis.uni-oldenburg.de: ... ue+Feature
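As a side note, conceptually the key-value-to-tuple step just projects each free-form key value object onto a fixed attribute order. A rough Python sketch of that idea (illustrative names only, not the Odysseus implementation):

```python
# Conceptual sketch of a key-value-to-tuple projection, not Odysseus code.
# Attribute values stay strings here, since key value attributes are untyped.
schema = ["firstName", "lastName", "age"]

def key_value_to_tuple(obj, schema):
    """Project a free-form key/value object onto a fixed attribute order."""
    return tuple(obj.get(name) for name in schema)

kv = {"firstName": "John", "lastName": "Doe", "age": "25"}
row = key_value_to_tuple(kv, schema)
```

Any key not listed in the schema is simply dropped, and a missing key yields an empty slot, which is why the key value object itself needs no schema.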
kind regards / 祝好
Christian Kuka


Re: Working with JSON

Post by harpreet » Wed Sep 17, 2014 8:46 am

Thanks a lot, ckuka. That was very helpful.
Now I can see the JSON data that is read from the file or from a TCP port, and I would like to perform some aggregation over that data. As you mentioned, the "KeyValueObject" handler is not supposed to have a schema, so I followed your advice and used the KEYVALUETOTUPLE operator to define the schema. But how do I use a field from that schema in my aggregation function?

This is what I have now (stream data from TCP):

Code:

#PARSER PQL
#RUNQUERY
///Your first pql-query here

json:test ::= ACCESS({source='json:test',
    wrapper='GenericPush',
    transport='TCPClient',
    protocol='JSON',
    dataHandler='KeyValueObject',  
    options=[
        ['host', '10.206.88.10'],
        ['port', '5000']
        ]
    })
    
#PARSER PQL
#ADDQUERY
jsonData = KEYVALUETOTUPLE({schema=[['firstName', 'String'], ['lastName', 'String'], ['age', 'Integer']],
                                  type='nexmark:test',
                                  keepinput='false'}, json:test)
                                                     

Now to aggregate :

Code:

#PARSER PQL
#RUNQUERY
///Your first pql-query here

out = AGGREGATE({
          aggregations=[
            ['COUNT', 'age', 'COUNT_age']
          ]               
        },
        jsonData
      )
SENDERtest = SENDER({
    transport='File',
    wrapper='GenericPush',
    protocol='CSV',
    dataHandler='Tuple',
    SINK="SENDERtest",
    options=[
        ['filename','/harpreet/odysseus_workspace/test.txt'],
        ['json.write.metadata','true'],
        ['json.write.starttimestamp','metadata.starttimestamp'],
        ['json.write.endtimestamp','metadata.endtimestamp']
    ]}, out)

Now I am getting the error:
de.uniol.inf.is.odysseus.core.server.planmanagement.QueryParseException: no such operator: data

What exactly should go in place of 'jsonData'? The documentation is not very clear on this.


Re: Working with JSON

Post by ckuka » Wed Sep 17, 2014 8:54 am

Do you have two queries there, or one?
It looks like the aggregation is done in one query and the definition of "jsonData" in another. If you put both into one query, it should work (at least I cannot find any errors there).

Code:

#PARSER PQL
#ADDQUERY

jsonData = KEYVALUETOTUPLE({schema=[['firstName', 'String'], ['lastName', 'String'], ['age', 'Integer']],
                                  type='nexmark:test',
                                  keepinput='false'}, json:test)
out = AGGREGATE({
          aggregations=[
            ['COUNT', 'age', 'COUNT_age']
          ]               
        },
        jsonData
      )

SENDERtest = SENDER({
    transport='File',
    wrapper='GenericPush',
    protocol='CSV',
    dataHandler='Tuple',
    SINK="SENDERtest",
    options=[
        ['filename','/harpreet/odysseus_workspace/test.txt'],
        ['json.write.metadata','true'],
        ['json.write.starttimestamp','metadata.starttimestamp'],
        ['json.write.endtimestamp','metadata.endtimestamp']
    ]}, out)


If that does not help and you get a dialog with "Problem occurred", can you please press the button for "Send bug report" so that I have the complete exception text.
kind regards / 祝好
Christian Kuka

User avatar
Marco Grawunder
Posts: 272
Joined: Tue Jul 29, 2014 10:29 am
Location: Oldenburg, Germany
Contact:

Re: Working with JSON

Post by Marco Grawunder » Wed Sep 17, 2014 9:03 am

Yes, the problem here is that names are only local to a query if they are not defined globally with ":=".

So "jsonData := " should also work.


Re: Working with JSON

Post by harpreet » Thu Sep 18, 2014 8:03 am

Thank you ckuka and Marco Grawunder.

Yes, the issue was that I had the aggregation in another query. Now I have put the aggregation in the same query and I am not getting any error, but I am not getting the desired result either.

So this is the data that I have in sample.json

Code:

{"firstName":"a", "age":"27"}
{"firstName":"b", "age":"28"}
{"firstName":"c", "age":"24"}
{"firstName":"d", "age":"25"}
{"firstName":"e", "age":"10"}
{"firstName":"f", "age":"56"}
{"firstName":"g", "age":"45"}
{"firstName":"h", "age":"36"}
{"firstName":"i", "age":"31"}
{"firstName":"j", "age":"35"}
{"firstName":"x", "age":"25"}
And below is my complete script:

Code:

#PARSER PQL
#RUNQUERY
///Your first pql-query here

mysource := ACCESS({
    source='mysource',
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['filename','/harpreet/odysseus_workspace/sample.json']]})
   
#PARSER PQL
#ADDQUERY
data = KEYVALUETOTUPLE({schema=[['firstName', 'String'], ['age', 'Integer']],
                                  type='sourceName',
                                  keepinput='false'},mysource)


windowed = ELEMENTWINDOW({SIZE=4},data)


out = AGGREGATE({
          aggregations=[
            ['SUM', 'age', 'SUM_age']
          ]         
        },
        windowed
      )
SENDERjson = SENDER({
    transport='File',
    wrapper='GenericPush',
    protocol='CSV',
    dataHandler='Tuple',
    SINK="SENDERjson",
    options=[
        ['filename','/harpreet/odysseus_workspace/test2.txt'],
        ['json.write.metadata','true'],
        ['json.write.starttimestamp','metadata.starttimestamp'],
        ['json.write.endtimestamp','metadata.endtimestamp']
    ]}, out)                                  
So as you can see, I am trying to aggregate some values using ELEMENTWINDOW as a start, though I ultimately want to use TIMEWINDOW.

The above code does not produce any output and the generated "test2.txt" file is empty.

Please help.

Thanks,
Harpreet


Re: Working with JSON

Post by Marco Grawunder » Thu Sep 18, 2014 11:01 am

Ok, I found the problem.

The KEYVALUETOTUPLE operator does not change datatypes, and key value attributes are currently treated as strings. The workaround is in the following query (without the file dump).

Code:

#PARSER PQL
#RUNQUERY
///Your first pql-query here
#IFSRCNDEF mysource
mysource := ACCESS({
    source='mysource',
    wrapper='GenericPull',
    transport='File',
    protocol='JSON',
    dataHandler='KeyValueObject',
    options=[['filename','${WORKSPACEPROJECT}/sample.json']]})
#ENDIF

#PARSER PQL
#ADDQUERY
data_pre = KEYVALUETOTUPLE({schema=[['firstName', 'String'], ['age', 'String']],
                                  type='sourceName',
                                  keepinput='false'},mysource)
                                  
data = MAP({EXPRESSIONS = ['firstname',['toInteger(age)','age']]},data_pre)


windowed = ELEMENTWINDOW({SIZE=4},data)


out = AGGREGATE({
          aggregations=[
            ['SUM', 'age', 'SUM_age']
          ]         
        },
        windowed
      )
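The effect of the workaround (keep age as a String in the schema, then convert it with toInteger in a MAP before aggregating) can be mimicked in plain Python to see why the SUM only works after the conversion. This is only an illustration, not Odysseus code:

```python
# Key value attributes arrive as strings; convert them before aggregating.
rows = [{"firstName": "a", "age": "27"},
        {"firstName": "b", "age": "28"},
        {"firstName": "c", "age": "24"},
        {"firstName": "d", "age": "25"}]

# Rough analogue of MAP({EXPRESSIONS = [..., ['toInteger(age)','age']]}, ...)
converted = [{**r, "age": int(r["age"])} for r in rows]

# Rough analogue of AGGREGATE SUM over a 4-element window
sum_age = sum(r["age"] for r in converted)
```

Summing the raw string values would fail (or, in systems that concatenate strings, produce "27282425"), which matches the empty output seen before the conversion was added.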


Re: Working with JSON

Post by harpreet » Thu Sep 18, 2014 2:34 pm

Thank you so much Marco Grawunder. That worked.

Furthermore, I made the following small changes to my code:

Code:

windowed = ELEMENTWINDOW({SIZE=10}, data)


out = AGGREGATE({
          aggregations=[
            ['COUNT', 'age', 'COUNT_age', 'Integer'],
            ['MAX', 'age', 'MAX_age'],
            ['MIN', 'age', 'MIN_age'],
            ['SUM', 'age', 'SUM_age']
          ]         
        },
        windowed
      )
Now I have the following results in my output file:

Code:

4,13,10,46.0
10,20,11,155.0

Now I have a few questions on this:
1. Looking at the second line of the output, why did Odysseus create the window of 10 elements from the element with age 11 (2nd element in my sample.json) to the element with age 20 (11th element in my sample.json)? Why not from the 1st element to the 10th?
2. By which logic was the second window created with elements from the 1st position to the 4th?
3. In our case, we will actually be processing historical events which have their own timestamps. We can ensure that the incoming events have timestamps in ascending order only. How do we make Odysseus use those timestamps to create the window instead of the system time? Is it even possible with Odysseus? This is very important for our design.

You guys have been very helpful. I hope you can help with these questions too.

Thanks,
Harpreet


Re: Working with JSON

Post by Marco Grawunder » Thu Sep 18, 2014 3:24 pm

1. Looking at the second line of the output, why did Odysseus create the window of 10 elements from the element with age 11 (2nd element in my sample.json) to the element with age 20 (11th element in my sample.json)? Why not from the 1st element to the 10th?
Hmm. Did you change the input data?
I get:

Code:

10,56,10,315
Maybe I did not understand the question correctly?

Here you can find some information on window processing in Odysseus:
http://odysseus.offis.uni-oldenburg.de: ... and+Window
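One way to reason about the numbers: if ELEMENTWINDOW behaves like a sliding window that advances one element at a time (an assumption for this sketch, not a statement about the Odysseus internals), the last full window over the original eleven ages covers elements 2 through 11, which reproduces the 10,56,10,315 row:

```python
from collections import deque

def sliding_element_windows(values, size):
    """Yield (COUNT, MAX, MIN, SUM) for every full window of `size` elements,
    sliding one element at a time. Illustrative sketch, not Odysseus internals."""
    win = deque(maxlen=size)  # oldest element is dropped automatically
    out = []
    for v in values:
        win.append(v)
        if len(win) == size:
            out.append((len(win), max(win), min(win), sum(win)))
    return out

ages = [27, 28, 24, 25, 10, 56, 45, 36, 31, 35, 25]  # the original sample data
windows = sliding_element_windows(ages, 10)
# windows[0]  covers elements 1..10, windows[-1] covers elements 2..11
```

Under this sliding interpretation, a window over elements 2..11 rather than 1..10 is exactly what one would see as the most recent result.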
2. By which logic was the second window created with elements from the 1st position to the 4th?
The output you have seems to be from two different runs of the query: a four-element window and a ten-element window?
3. In our case, we will actually be processing historical events which have their own timestamps. We can ensure that the incoming events have timestamps in ascending order only. How do we make Odysseus use those timestamps to create the window instead of the system time? Is it even possible with Odysseus? This is very important for our design.
Yes, you can state an attribute as the start timestamp.
It is currently not documented, but will be at
http://odysseus.offis.uni-oldenburg.de: ... ue+Feature

As an alternative, you can use the TIMESTAMP operator on tuples:

http://odysseus.offis.uni-oldenburg.de: ... p+operator


Re: Working with JSON

Post by harpreet » Thu Sep 18, 2014 5:16 pm

My bad, I did actually change the data and forgot to mention it in my comment. Sorry about that. These are the contents of sample.json now:

Code:

{"firstName":"a", "age":"10"}
{"firstName":"b", "age":"11"}
{"firstName":"c", "age":"12"}
{"firstName":"d", "age":"13"}
{"firstName":"e", "age":"14"}
{"firstName":"f", "age":"15"}
{"firstName":"g", "age":"16"}
{"firstName":"h", "age":"17"}
{"firstName":"i", "age":"18"}
{"firstName":"j", "age":"19"}
{"firstName":"k", "age":"20"}
Can you please answer my first 2 questions now?

I'll look into the links that you have shared for question #3.

Thanks,
Harpreet
