ElasticSearch

Qualified Name | Type | Release
apoc.es.stats: apoc.es.stats(host-or-key,$config) - elastic search statistics | Procedure | Apoc Extended
apoc.es.get: apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - perform a GET operation on elastic search | Procedure | Apoc Extended
apoc.es.query: apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - perform a SEARCH operation on elastic search | Procedure | Apoc Extended
apoc.es.getRaw: apoc.es.getRaw(host-or-key,path,payload-or-null,$config) yield value - perform a raw GET operation on elastic search | Procedure | Apoc Extended
apoc.es.postRaw: apoc.es.postRaw(host-or-key,path,payload-or-null,$config) yield value - perform a raw POST operation on elastic search | Procedure | Apoc Extended
apoc.es.post: apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - perform a POST operation on elastic search | Procedure | Apoc Extended
apoc.es.put: apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - perform a PUT operation on elastic search | Procedure | Apoc Extended
apoc.es.delete: apoc.es.delete(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,$config) yield value - perform a DELETE operation on elastic search | Procedure | Apoc Extended

It is currently not possible to query Elastic 8 via certificate. The available options are disabling SSL with the configuration "xpack.security.http.ssl.enabled=false", using basic authentication via the headers config (see the config parameter below), or (not recommended) disabling security via xpack.security.enabled=false.

Example

call apoc.es.post("localhost","tweets","users",null,{name:"Chris"})
call apoc.es.put("localhost","tweets","users","1",null,{name:"Chris"})
call apoc.es.get("localhost","tweets","users","1",null,null)
call apoc.es.stats("localhost")
call apoc.es.delete("localhost","indexName","typeName","idName")
Pagination

To use the pagination feature of Elasticsearch you have to follow these steps:

  1. Call apoc.es.query to get the first chunk of data together with the scroll_id, which enables pagination.

  2. Do your merge/create etc. operations with the first N hits

  3. Use the range(start,end,step) function to repeat a second call that fetches all the remaining chunks. For example, if you have 1000 documents and want to retrieve 10 documents per request, you can use range(11,1000,10). You start from 11 because the first 10 documents have already been processed. If you don’t know the exact upper bound (the total number of documents), you can set a number that is larger than the real total.

  4. The second call to repeat is apoc.es.get. Remember to set the scroll_id as a parameter.

  5. Then process the result of each chunk of data as the first one.
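
The arithmetic behind step 3 can be checked outside Neo4j. The sketch below is plain Python and not part of APOC; cypher_range is a hypothetical helper that mimics Cypher's range(), whose end bound is inclusive, and the numbers are the ones from the example in step 3.

```python
def cypher_range(start, end, step=1):
    """Mimic Cypher's range(start, end, step), whose end bound is inclusive."""
    return list(range(start, end + 1, step))


# 1000 documents, 10 per request; the first 10 come from apoc.es.query.
follow_ups = cypher_range(11, 1000, 10)

print(follow_ups[:3])   # [11, 21, 31]
print(follow_ups[-1])   # 991
print(len(follow_ups))  # 99 follow-up calls, so 100 requests in total
```

Choosing an upper bound larger than the real total only adds entries to this list, which is why an overestimate is safe when the exact document count is unknown.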

Here is an example:

// It's important to create an index to improve performance
CREATE INDEX FOR (n:Document) ON (n.id);
// First query: get first chunk of data + the scroll_id for pagination
CALL apoc.es.query('localhost','test-index','test-type','q=name:Neo4j&size=1&scroll=5m',null) yield value with value._scroll_id as scrollId, value.hits.hits as hits
// Do something with hits
UNWIND hits as hit
// Here we simply create a document and a relation to a company
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company)
// Then call for the other docs and use the scrollId value from the previous query
// Use a range to count the chunks of data (i.e. I want to get chunks 2 to 10)
WITH range(2,10,1) as list, scrollId
UNWIND list as count
CALL apoc.es.get("localhost","_search","scroll",null,{scroll:"5m",scroll_id:scrollId},null) yield value with value._scroll_id as scrollId, value.hits.hits as nextHits
// Again, do something with hits
UNWIND nextHits as hit
MERGE (doc:Document {id: hit._id, description: hit._source.description, name: hit._source.name})
MERGE (company:Company {name: hit._source.company})
MERGE (doc)-[:IS_FROM]->(company) return scrollId, doc, company

This example was tested on a MacBook Pro with 16GB of RAM. Loading 20000 documents from ES to Neo4j (100 documents per request) took 1 minute.

General Structure and Parameters

call apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value

// GET/PUT/POST url/index/type/id?query -d payload

host-or-key parameter

The parameter can be a direct host or URL string.

For example, by using the apoc.es.stats, we can execute:

CALL apoc.es.stats('http://username:password@host:port')

Moreover, it can be an entry to be looked up in apoc.conf:

  • lookup apoc.es.url

  • lookup apoc.es.host

This takes precedence over the direct string host or url as the first parameter, as above.

For example, with an apoc.conf like this:

apoc.es.url=http://username:password@host:port

or like this:

apoc.es.host=username:password@host:port

we can connect to elastic by putting null as the first parameter.

For example, by using the apoc.es.stats, we can execute:

CALL apoc.es.stats(null)

Furthermore, it can be an entry to be looked up in apoc.conf via a key, where <key> has to be placed in the first parameter:

  • lookup via key to apoc.es.<key>.url

  • lookup via key to apoc.es.<key>.host

For example, with an apoc.conf like this:

apoc.es.custom.url=http://username:password@host:port

or like this:

apoc.es.custom.host=username:password@host:port

we can connect to elastic by putting the key (here, custom) as the first parameter.

For example, by using the apoc.es.stats, we can execute:

CALL apoc.es.stats('custom')
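
The three forms of the first parameter can be summarised as a lookup. The following Python sketch is illustrative only and is not APOC's implementation: resolve_host is a hypothetical helper, the plain dict stands in for apoc.conf, and the order in which the url and host entries are tried is an assumption rather than something stated above.

```python
def resolve_host(host_or_key, conf):
    """Return the configured value for host_or_key, or the value itself.

    Assumed lookup order (illustrative only): apoc.es.<key>.url, then
    apoc.es.<key>.host; for a null first parameter, apoc.es.url, then
    apoc.es.host. Anything not found in conf is treated as a direct
    host or URL string.
    """
    prefix = "apoc.es" if host_or_key is None else f"apoc.es.{host_or_key}"
    for suffix in ("url", "host"):
        value = conf.get(f"{prefix}.{suffix}")
        if value is not None:
            return value
    return host_or_key


# Stand-in for apoc.conf, using the placeholder values from the text.
conf = {
    "apoc.es.url": "http://username:password@host:port",
    "apoc.es.custom.host": "username:password@host:port",
}

print(resolve_host(None, conf))         # http://username:password@host:port
print(resolve_host("custom", conf))     # username:password@host:port
print(resolve_host("localhost", conf))  # localhost
```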

index parameter

Main ES index, sent directly. If null, "_all" is used. Multiple indexes can be separated by commas in the string.

type parameter

Document type, sent directly. If null, "_all" is used. Multiple types can be separated by commas in the string.

id parameter

Document id, will be left off when null.

query parameter

Query can be a map, which is turned into a query string, a direct string, or null, in which case it is left off.

payload parameter

Payload can be a map, which will be turned into a JSON payload, a string, which will be sent directly, or null.
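
Taken together, the index, type, id, query and payload rules describe how a request of the form url/index/type/id?query is assembled. The Python sketch below restates those rules outside Neo4j. It illustrates the rules as written here and is not APOC's implementation; build_request and the host http://localhost:9200 are hypothetical.

```python
import json
from urllib.parse import urlencode


def build_request(host, index=None, doc_type=None, doc_id=None, query=None, payload=None):
    """Sketch of url/index/type/id?query plus body, following the rules above."""
    parts = [host, index or "_all", doc_type or "_all"]  # null becomes "_all"
    if doc_id is not None:          # id is left off when null
        parts.append(doc_id)
    url = "/".join(parts)
    if isinstance(query, dict):     # a map is turned into a query string
        query = urlencode(query)
    if query:                       # a null (or empty) query is left off
        url += "?" + query
    if isinstance(payload, dict):   # a map is turned into a JSON payload
        payload = json.dumps(payload)
    return url, payload


print(build_request("http://localhost:9200", "tweets", "users", "1"))
# ('http://localhost:9200/tweets/users/1', None)
print(build_request("http://localhost:9200", "tweets", None, None, {"size": 1}, {"name": "Chris"}))
# ('http://localhost:9200/tweets/_all?size=1', '{"name": "Chris"}')
```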

config parameter

Config is an optional map, which can have the following entries:

Table 1. Config parameters
name | type | default | description
headers | Map | {content-type: "application/json", method: "<httpMethod>"} | Contains a header map to add to (or replace) the default one. The method: <httpMethod> entry is needed by APOC to determine which HTTP request method to use. By default, it is PUT for apoc.es.put, POST for apoc.es.post and apoc.es.postRaw, and GET in all other cases.
version | String | DEFAULT | Can be DEFAULT or EIGHT, in order to change the REST API endpoint based on the Elastic version. See the Endpoint table below.

For example, by using the apoc.es.stats, we can execute:

CALL apoc.es.stats('custom', { headers: {Authorization: "Basic <Base64Token>"} })

This uses Basic authentication and creates the following HTTP headers:

Authorization: Basic <Base64Token>
method: GET
Content-Type: application/json
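
Under the HTTP Basic scheme, the <Base64Token> placeholder is the Base64 encoding of username:password. A minimal Python sketch for producing it outside Neo4j follows; basic_auth_header is a hypothetical helper name and the credentials are dummy values.

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header value for HTTP Basic authentication."""
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return f"Basic {token}"


print(basic_auth_header("user", "pass"))  # Basic dXNlcjpwYXNz
```

The returned string can then be passed as the Authorization entry of the headers config map.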

Some APIs in Elastic 8 can be called by the procedures without the configuration {version: 'EIGHT'}, for example apoc.es.stats. For several APIs, however, it must be set so that the endpoint is handled properly, for example apoc.es.query.

Table 2. Endpoint
procedure(s) | with version: DEFAULT | with version: EIGHT
apoc.es.stats(host) | <host>/_stats | same as DEFAULT
apoc.es.query(host, index, type, query, payload, $conf) | <host>/<index param>/<type param>/_search?<query param> | <host>/<index param>/_search?<query param>
apoc.es.getRaw/apoc.es.postRaw(host, path, payload, $conf) | <host>/<path param> | same as DEFAULT
the other apoc.es.<name>(host, index, type, id, query, payload, $conf) procedures | <host>/<index param>/<type param>/<id param>?<query param>. By default, the <index param> and <type param> will be populated as _all, while the <id param>, if not present, will be removed from the endpoint. | <host>/<index param>/<type param>/<id param>?<query param>. Note that only one of the three values <index param>, <type param> and <id param> is required; the others, if absent, are excluded from the endpoint.

The type param is usually an underscore string indicating the type of the API, e.g. _doc or _update (whereas it previously indicated the mapping type). This is to allow you to call, for example, this API

For example, by using the apoc.es.query, we can execute a Search API:

CALL apoc.es.query(<$host>, <$index>, <$type>, 'q=name:Neo4j', null, { version: 'EIGHT' })

Update a document in Elastic 8 via the Update API:

CALL apoc.es.put($host,'<indexName>','_doc','<idName>','refresh=true',{name: 'foo'}, {version: 'EIGHT'})

Call the Create Index API in Elastic 8:

CALL apoc.es.put($host,'<indexName>', null, null, null, null, { version: 'EIGHT' })

Results

Results are a stream of maps in value.

Reciprocal Rank Fusion (RRF)

RRF can be performed from Neo4j using ES. For further details, read the official documentation. Note that this API has been supported since version 8.14.x of Elastic.

Here is an example using Neo4j with ES.

Step 1 - Mapping creation

CALL apoc.es.put($host, 'example-index', null, null, null,
{
    mappings: {
        properties: {
            text: {
                type: "text"
            },
            vector: {
                type: "dense_vector",
                dims: 1,
                index: true,
                similarity: "l2_norm"
            },
            integer: {
                type: "integer"
            }
        }
    }
}, $config)

Results

Results are a stream of maps in value.

Step 2 - Put documents

CALL apoc.es.put($host, 'example-index/_doc/1', null, null, null,
{
    text: "rrf",
    vector: [5],
    integer: 1
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/2', null, null, null,
{
    text: "rrf rrf",
    vector: [4],
    integer: 2
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/3', null, null, null,
{
    text: "rrf rrf rrf",
    vector: [3],
    integer: 1
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/4', null, null, null,
{
    text: "rrf rrf rrf rrf",
    integer: 2
}, $config)

CALL apoc.es.put($host, 'example-index/_doc/5', null, null, null,
{
    vector: [0],
    integer: 1
}, $config)

Results

Results are a stream of maps in value.

Step 3 - Refresh index

CALL apoc.es.post($host, 'example-index/_refresh', null, null, '', $config)

Results

Results are a stream of maps in value.

Step 4 - Perform search using rrf retriever

CALL apoc.es.getRaw($host, 'example-index/_search',
{
    retriever: {
        rrf: {
            retrievers: [
                {
                    standard: {
                        query: {
                            term: {
                                text: "rrf"
                            }
                        }
                    }
                },
                {
                    knn: {
                        field: "vector",
                        query_vector: [3],
                        k: 5,
                        num_candidates: 5
                    }
                }
            ],
            window_size: 5,
            rank_constant: 1
        }
    },
    size: 3,
    aggs: {
        int_count: {
            terms: {
                field: "integer"
            }
        }
    }
}, $config) yield value

Results

Results are a stream of maps in value.
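
For intuition about what the rrf retriever in Step 4 computes, the fusion itself can be sketched in a few lines of Python. This is a generic illustration of Reciprocal Rank Fusion, where each document scores the sum of 1 / (rank_constant + rank) over the result lists it appears in. It is not Elasticsearch's or APOC's code, rrf_scores is a hypothetical helper, and the two ranked lists are made-up ids rather than actual search output.

```python
def rrf_scores(rankings, rank_constant):
    """Reciprocal Rank Fusion: sum 1 / (rank_constant + rank) over all rankings.

    rankings is a list of ranked id lists, best hit first (ranks start at 1).
    Returns (id, score) pairs sorted by descending score.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank_constant + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical result lists from two retrievers (not actual Elasticsearch output).
text_hits = ["a", "b", "c"]
knn_hits = ["c", "a"]

fused = rrf_scores([text_hits, knn_hits], rank_constant=1)
print([doc_id for doc_id, _ in fused])  # ['a', 'c', 'b']
```

Document a wins because it ranks high in both lists, while b appears in only one list and falls to the bottom.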