Connect to OpenSearch Serverless using Python

Keywords: AWS, Amazon, OpenSearch, OS, OSS, Serverless.

This guide introduces two methods for quickly setting up a publicly accessible OpenSearch Serverless collection (access still requires IAM permissions) and connecting to it using Python.

Method 1

Use the boto3 SDK to create a collection, configure its security policies, and connect to it.

use-boto3/requirements.txt

boto3>=1.28.0
opensearch-py
requests_aws4auth
rich

use-boto3/create_and_configure.py

  1# -*- coding: utf-8 -*-
  2
  3"""
  4A simple script to create and configure an OpenSearch serverless collection,
  5then you can create index, index data, search documents.
  6
  7Requirements: see ``requirements.txt``
  8
  9Reference:
 10
 11- Using the AWS SDKs to interact with Amazon OpenSearch Serverless: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-sdk.html#serverless-sdk-python
 12"""
 13
 14import typing as T
 15import json
 16import time
 17import dataclasses
 18
 19import boto3
 20import botocore.exceptions
 21from opensearchpy import OpenSearch, RequestsHttpConnection
 22from requests_aws4auth import AWS4Auth
 23
 24
 25def create_encryption_policy(
 26    oss_client,
 27    collection_name: str,
 28    verbose: bool = True,
 29) -> T.Optional[dict]:
 30    """
 31    Creates an encryption policy that matches the given collection.
 32    """
 33    try:
 34        response = oss_client.create_security_policy(
 35            name=f"{collection_name}-policy",
 36            description=f"Encryption policy for {collection_name} collection",
 37            type="encryption",
 38            policy=json.dumps(
 39                {
 40                    "Rules": [
 41                        {
 42                            "ResourceType": "collection",
 43                            "Resource": [
 44                                f"collection/{collection_name}",
 45                            ],
 46                        }
 47                    ],
 48                    "AWSOwnedKey": True,
 49                }
 50            ),
 51        )
 52        if verbose:
 53            print("Encryption policy created:")
 54            print(response)
 55        return response
 56    except botocore.exceptions.ClientError as error:
 57        if error.response["Error"]["Code"] == "ConflictException":
 58            if verbose:
 59                print(
 60                    "[ConflictException] "
 61                    "The policy name or rules conflict with an existing policy."
 62                )
 63            return None
 64        else:
 65            raise error
 66
 67
 68def create_network_policy(
 69    oss_client,
 70    collection_name: str,
 71    verbose: bool = True,
 72) -> T.Optional[dict]:
 73    """
 74    Creates a network policy that matches the given collection.
 75    The dashboard and collection are both public accessible.
 76    """
 77    try:
 78        response = oss_client.create_security_policy(
 79            name=f"{collection_name}-policy",
 80            description=f"Network policy for {collection_name} collections",
 81            type="network",
 82            policy=json.dumps(
 83                [
 84                    {
 85                        "Description": f"Public access for {collection_name} collection",
 86                        "Rules": [
 87                            {
 88                                "ResourceType": "dashboard",
 89                                "Resource": [
 90                                    f"collection/{collection_name}",
 91                                ],
 92                            },
 93                            {
 94                                "ResourceType": "collection",
 95                                "Resource": [
 96                                    f"collection/{collection_name}",
 97                                ],
 98                            },
 99                        ],
100                        "AllowFromPublic": True,
101                    },
102                ]
103            ),
104        )
105        if verbose:
106            print("Network policy created:")
107            print(response)
108        return response
109    except botocore.exceptions.ClientError as error:
110        if error.response["Error"]["Code"] == "ConflictException":
111            if verbose:
112                print(
113                    "[ConflictException] "
114                    "A network policy with this name already exists."
115                )
116            return None
117        else:
118            raise error
119
120
@dataclasses.dataclass
class IamUser:
    """
    An IAM user, identified by its user name.
    """

    name: str  # the IAM user name used in the ARN

    def to_arn(self, aws_account_id: str):
        """Build ``arn:aws:iam::<aws_account_id>:user/<name>``."""
        return "arn:aws:iam::{}:user/{}".format(aws_account_id, self.name)
127
128
@dataclasses.dataclass
class IamRole:
    """
    An IAM role, identified by its role name.
    """

    name: str  # the IAM role name used in the ARN

    def to_arn(self, aws_account_id: str):
        """Build ``arn:aws:iam::<aws_account_id>:role/<name>``."""
        return "arn:aws:iam::{}:role/{}".format(aws_account_id, self.name)
135
136
def create_access_policy(
    oss_client,
    collection_name: str,
    aws_account_id: str,
    trusted_iam_entity_arns: T.List[str],
    verbose: bool = True,
) -> T.Optional[dict]:
    """
    Create a data access policy for ``collection_name``.

    The policy (named ``<collection_name>-policy``) grants every principal in
    ``trusted_iam_entity_arns``:

    - index permissions (create/delete/update/describe index, read/write
      documents) on ``index/<collection_name>/*``;
    - ``aoss:CreateCollectionItems`` on ``collection/<collection_name>``.

    :param oss_client: a boto3 ``opensearchserverless`` client.
    :param collection_name: the collection the policy applies to.
    :param aws_account_id: not referenced by this function; kept for API
        compatibility.
    :param trusted_iam_entity_arns: ARNs of the principals granted access.
    :param verbose: print progress messages when True.
    :return: the ``create_access_policy`` response, or ``None`` when the
        service answers with ``ConflictException``.
    :raises botocore.exceptions.ClientError: for any other client error.
    """
    index_permissions = [
        "aoss:CreateIndex",
        "aoss:DeleteIndex",
        "aoss:UpdateIndex",
        "aoss:DescribeIndex",
        "aoss:ReadDocument",
        "aoss:WriteDocument",
    ]
    index_rule = {
        "Resource": [f"index/{collection_name}/*"],
        "Permission": index_permissions,
        "ResourceType": "index",
    }
    collection_rule = {
        "Resource": [f"collection/{collection_name}"],
        "Permission": ["aoss:CreateCollectionItems"],
        "ResourceType": "collection",
    }
    statement = {
        "Rules": [index_rule, collection_rule],
        "Principal": trusted_iam_entity_arns,
    }
    try:
        result = oss_client.create_access_policy(
            name=f"{collection_name}-policy",
            description=f"Data access policy for {collection_name} collections",
            type="data",
            policy=json.dumps([statement]),
        )
    except botocore.exceptions.ClientError as err:
        if err.response["Error"]["Code"] != "ConflictException":
            raise err
        if verbose:
            print(
                "[ConflictException] "
                "An access policy with this name already exists."
            )
        return None
    if verbose:
        print("Access policy created:")
        print(result)
    return result
200
201
def create_collection(
    oss_client,
    collection_name: str,
    verbose: bool = True,
    *,
    collection_type: str = "SEARCH",
) -> T.Optional[dict]:
    """
    Create an OpenSearch Serverless collection.

    Call this after the matching security policies have been created
    (``create_and_configure`` creates the policies first, then the collection).

    :param oss_client: a boto3 ``opensearchserverless`` client.
    :param collection_name: the collection name.
    :param verbose: print progress messages when True.
    :param collection_type: value passed as ``type`` to the API; defaults to
        ``"SEARCH"``. See the ``create_collection`` reference below for the
        other accepted values.
    :return: the ``create_collection`` response, or ``None`` when the service
        answers with ``ConflictException`` (the name is already taken).
    :raises botocore.exceptions.ClientError: for any other client error.

    Ref:

    - create_collection: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/create_collection.html
    """
    try:
        response = oss_client.create_collection(
            name=collection_name,
            type=collection_type,
        )
    except botocore.exceptions.ClientError as error:
        if error.response["Error"]["Code"] == "ConflictException":
            if verbose:
                print(
                    "[ConflictException] "
                    "A collection with this name already exists. Try another name."
                )
            return None
        raise error
    # Report success the same way the policy-creating helpers do.
    if verbose:
        print("Collection created:")
        print(response)
    return response
230
231
232def _get_endpoint_from_connection_details(connection_details: dict) -> str:
233    return connection_details["collectionEndpoint"].replace("https://", "")
234
235
def wait_for_collection_creation(
    oss_client,
    collection_name: str,
    verbose: bool = True,
    *,
    poll_interval: float = 10,
    timeout: T.Optional[float] = None,
) -> str:
    """
    Poll the collection status until it becomes ``ACTIVE``.

    :param oss_client: a boto3 ``opensearchserverless`` client.
    :param collection_name: name of the collection to wait for.
    :param verbose: print progress messages when True.
    :param poll_interval: seconds to sleep between status checks.
    :param timeout: give up after roughly this many seconds while the
        collection is still ``CREATING``; ``None`` (default) waits forever.

    :return: the collection endpoint host (without ``https://``).
    :raises ValueError: if the response lists no collection with this name.
    :raises SystemError: if the status is neither ``CREATING`` nor ``ACTIVE``.
    :raises TimeoutError: if ``timeout`` elapses before the collection is
        ``ACTIVE``.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = oss_client.batch_get_collection(names=[collection_name])
        details_list = response["collectionDetails"]
        # An unknown name yields an empty list; fail with a clear message
        # instead of an IndexError.
        if not details_list:
            raise ValueError(f"collection {collection_name!r} not found!")
        collection_details = details_list[0]
        status = collection_details["status"]
        if status == "ACTIVE":
            if verbose:
                print("Collection successfully created:")
            return _get_endpoint_from_connection_details(collection_details)
        if status != "CREATING":
            raise SystemError(f"status is {status!r}!")
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"collection {collection_name!r} still CREATING after {timeout} seconds"
            )
        if verbose:
            print("Creating collection...")
        time.sleep(poll_interval)
262
263
def create_oss_object_by_collection_endpoint(
    boto_ses: boto3.session.Session,
    collection_endpoint: str,
) -> OpenSearch:
    """
    Create the opensearch-py ``OpenSearch`` client for a collection endpoint.

    Requests are SigV4-signed for the ``aoss`` service with the credentials and
    region of ``boto_ses``. The connection uses HTTPS on port 443 with
    certificate verification and ``timeout=300``.

    :param boto_ses: the boto3 session that supplies credentials and region.
    :param collection_endpoint: the collection host name, without ``https://``.
    :return: a configured ``OpenSearch`` client.
    :raises RuntimeError: if the session has no credentials.
    """
    credentials = boto_ses.get_credentials()
    if credentials is None:
        raise RuntimeError("no AWS credentials found for the boto3 session")
    # Take one consistent snapshot: with refreshable credentials, reading
    # access_key / secret_key / token one by one can mix values from before
    # and after a refresh.
    frozen = credentials.get_frozen_credentials()
    awsauth = AWS4Auth(
        frozen.access_key,
        frozen.secret_key,
        boto_ses.region_name,
        "aoss",
        session_token=frozen.token,
    )
    oss = OpenSearch(
        hosts=[{"host": collection_endpoint, "port": 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=300,
    )
    return oss
288
289
def create_oss_object_by_collection_name(
    boto_ses: boto3.session.Session,
    collection_name: str,
) -> OpenSearch:
    """
    Look up a collection by name and build an ``OpenSearch`` client for it.

    Waits (without printing progress) until the collection is ``ACTIVE``, then
    connects to its endpoint.
    """
    endpoint = wait_for_collection_creation(
        boto_ses.client("opensearchserverless"),
        collection_name,
        verbose=False,
    )
    return create_oss_object_by_collection_endpoint(boto_ses, endpoint)
299
300
def test_oss_connection(
    oss: OpenSearch,
    verbose: bool = True,
):
    """
    Smoke-test an ``OpenSearch`` client by listing the indices.

    :param oss: the client to test.
    :param verbose: print the listed indices and a success message when True.
    """
    if verbose:
        print("test opensearch serverless connection by listing indices ...")
    indices = oss.cat.indices(format="json")
    if not verbose:
        return
    print(indices)
    print("success!")
311
312
def create_and_configure(
    aws_profile: str,
    collection_name: str,
    verbose: bool = True,
) -> OpenSearch:
    """
    Create (or reuse) a collection and return a connected ``OpenSearch`` client.

    Steps:

    1. Look up the caller's AWS account id and ARN via STS.
    2. Create the encryption, network and data access policies. A
       ``ConflictException`` (e.g. an existing policy) is reported and skipped.
    3. Create the collection (skipped on ``ConflictException``) and wait until
       it is ``ACTIVE``.
    4. Build the ``OpenSearch`` client and smoke-test it by listing indices.

    :param aws_profile: the AWS profile used to build the boto3 session.
    :param collection_name: the collection name; also the policy-name prefix.
    :param verbose: print progress messages when True.
    :return: an ``OpenSearch`` client connected to the collection.
    """
    boto_ses = boto3.session.Session(profile_name=aws_profile)
    res = boto_ses.client("sts").get_caller_identity()
    aws_account_id = res["Account"]
    # The calling identity is the only principal in the data access policy.
    # NOTE(review): with an assumed-role profile this is an STS assumed-role
    # ARN; confirm the data access policy accepts it (the IAM role ARN may be
    # required instead).
    iam_entity_arn = res["Arn"]
    oss_client = boto_ses.client("opensearchserverless")
    create_encryption_policy(oss_client, collection_name, verbose)
    create_network_policy(oss_client, collection_name, verbose)
    create_access_policy(
        oss_client,
        collection_name,
        aws_account_id,
        [iam_entity_arn],
        verbose,
    )
    created = create_collection(oss_client, collection_name, verbose)
    # Report creation progress only if the caller asked for output AND this
    # call actually created the collection (create_collection returns None on
    # ConflictException, i.e. the collection already existed).
    show_progress = verbose and created is not None
    collection_endpoint = wait_for_collection_creation(
        oss_client,
        collection_name,
        show_progress,
    )
    oss = create_oss_object_by_collection_endpoint(boto_ses, collection_endpoint)
    test_oss_connection(oss, verbose)
    return oss
345
346
def delete_collection(
    aws_profile: str,
    collection_name: str,
    verbose: bool = True,
):
    """
    Delete the collection named ``collection_name`` if exactly one is found.

    :param aws_profile: the AWS profile used to build the boto3 session.
    :param collection_name: name of the collection to delete.
    :param verbose: print progress messages when True.

    Ref:

    - delete_collection: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/delete_collection.html
    """
    oss_client = boto3.session.Session(profile_name=aws_profile).client(
        "opensearchserverless"
    )
    matches = oss_client.batch_get_collection(names=[collection_name])[
        "collectionDetails"
    ]
    if len(matches) != 1:
        if verbose:
            print("Collection not found")
        return
    # delete_collection is addressed by id, not by name.
    collection_id = matches[0]["id"]
    if verbose:
        print("Deleting collection...")
    oss_client.delete_collection(id=collection_id)
    if verbose:
        print("done")
373
374
if __name__ == "__main__":
    from config import aws_profile, collection_name

    # Create the policies and collection, then smoke-test the connection.
    oss = create_and_configure(aws_profile, collection_name, verbose=True)

    # To clean up afterwards, uncomment:
    # delete_collection(aws_profile, collection_name, verbose=True)

use-boto3/connect_and_test.py

 1# -*- coding: utf-8 -*-
 2
 3"""
 4This script creates a test index, insert a document and search it.
 5Reference:
 6
 7- Low-level Python client: https://opensearch.org/docs/latest/clients/python-low-level/
 8"""
 9
10from datetime import datetime, timezone
11
12import boto3
13from rich import print as rprint
14
15from config import aws_profile, collection_name, index_name
16from create_and_configure import create_oss_object_by_collection_name
17
18
def delete_index():
    """
    Delete the test index ``index_name`` via the module-level ``oss`` client.

    HTTP 400 and 404 responses are ignored (passed as ``ignore``), so a missing
    index does not raise.
    """
    oss.indices.delete(index=index_name, ignore=[400, 404])
22
23
def create_index():
    """
    Create the test index with a ``date`` field ``time`` and a ``text`` field
    ``log``.

    An HTTP 400 response is ignored. Presumably this is what re-creating an
    existing index returns, making the call safe to repeat.
    """
    field_mappings = {
        "properties": {
            "time": {"type": "date"},
            "log": {"type": "text"},
        }
    }
    oss.indices.create(
        index=index_name,
        body={"mappings": field_mappings},
        ignore=400,
    )
38
39
def insert_document():
    """Index one sample log document under the fixed id ``"id-1"``."""
    document = {
        "time": get_utc_now(),
        "log": "login failed, username not found",
    }
    oss.index(index=index_name, id="id-1", body=document)
48    )
49
50
def search_all():
    """Print every document of the test index using a ``match_all`` query."""
    match_everything = {"query": {"match_all": {}}}
    rprint(oss.search(index=index_name, body=match_everything))
57
58
def search_by_fts():
    """Full-text search: print documents whose ``log`` field matches ``failed``."""
    full_text_query = {"query": {"match": {"log": "failed"}}}
    hits = oss.search(index=index_name, body=full_text_query)
    rprint(hits)
65
66
def get_utc_now() -> datetime:
    """
    Return the current time as a timezone-aware ``datetime`` in UTC.

    ``datetime.utcnow()`` is deprecated since Python 3.12 and returns a naive
    value. ``datetime.now(timezone.utc)`` returns the aware value directly.
    """
    return datetime.now(timezone.utc)
69
70
if __name__ == "__main__":
    # ``oss`` is the module-level client that the helper functions above use.
    oss = create_oss_object_by_collection_name(
        boto3.session.Session(profile_name=aws_profile),
        collection_name,
    )

    # Uncomment the steps you want to run:
    # delete_index()
    create_index()
    # insert_document()
    # search_all()
    # search_by_fts()

Method 2

Use the AWS CDK to create a collection, configure its security policies, and connect to it.

use-cdk/requirements.txt

aws-cdk-lib==2.81.0
constructs==10.2.70

use-cdk/app.py

  1# -*- coding: utf-8 -*-
  2
  3"""
  4A simple CDK stack to create and configure an OpenSearch serverless collection.
  5So you can create index, index data, search documents.
  6Please read the doc string in :class:`Stack` for more details.
  7
  8Requirements: see ``requirements.txt``, or do ``pip install -r requirements.txt``.
  9
 10Reference:
 11
 12- https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_opensearchserverless.html
 13"""
 14
 15import json
 16
 17import aws_cdk as cdk
 18import aws_cdk.aws_opensearchserverless as oss
 19from constructs import Construct
 20
 21
 22class Stack(cdk.Stack):
 23    """
 24    We assume that you want to use IAM user to connect to the OpenSearch serverless
 25    collection.
 26
 27    :param collection_name: the collection name, you can use alpha, letter, hyphen
 28        but not underscore.
 29    :param iam_entity_arn: the IAM entity arn that has the permission to manipulate
 30        the collection.
 31    """
 32
 33    def __init__(
 34        self,
 35        scope: Construct,
 36        construct_id: str,
 37        collection_name: str,
 38        iam_entity_arn: str,
 39        **kwargs,
 40    ) -> None:
 41        super().__init__(scope, construct_id, **kwargs)
 42
 43        self.encryption_policy = oss.CfnSecurityPolicy(
 44            self,
 45            "EncryptionPolicy",
 46            name=f"{collection_name}-policy",
 47            description=f"Encryption policy for {collection_name} collection",
 48            type="encryption",
 49            policy=json.dumps(
 50                {
 51                    "Rules": [
 52                        {
 53                            "ResourceType": "collection",
 54                            "Resource": [
 55                                f"collection/{collection_name}",
 56                            ],
 57                        }
 58                    ],
 59                    "AWSOwnedKey": True,
 60                }
 61            ),
 62        )
 63
 64        self.network_policy = oss.CfnSecurityPolicy(
 65            self,
 66            "NetworkPolicy",
 67            name=f"{collection_name}-policy",
 68            description=f"Network policy for {collection_name} collections",
 69            type="network",
 70            policy=json.dumps(
 71                [
 72                    {
 73                        "Description": f"Public access for {collection_name} collection",
 74                        "Rules": [
 75                            {
 76                                "ResourceType": "dashboard",
 77                                "Resource": [
 78                                    f"collection/{collection_name}",
 79                                ],
 80                            },
 81                            {
 82                                "ResourceType": "collection",
 83                                "Resource": [
 84                                    f"collection/{collection_name}",
 85                                ],
 86                            },
 87                        ],
 88                        "AllowFromPublic": True,
 89                    },
 90                ]
 91            ),
 92        )
 93
 94        self.access_policy = oss.CfnAccessPolicy(
 95            self,
 96            "AccessPolicy",
 97            name=f"{collection_name}-policy",
 98            description=f"Data access policy for {collection_name} collections",
 99            type="data",
100            policy=json.dumps(
101                [
102                    {
103                        "Rules": [
104                            {
105                                "Resource": [
106                                    f"index/{collection_name}/*",
107                                ],
108                                "Permission": [
109                                    "aoss:CreateIndex",
110                                    "aoss:DeleteIndex",
111                                    "aoss:UpdateIndex",
112                                    "aoss:DescribeIndex",
113                                    "aoss:ReadDocument",
114                                    "aoss:WriteDocument",
115                                ],
116                                "ResourceType": "index",
117                            },
118                            {
119                                "Resource": [
120                                    f"collection/{collection_name}",
121                                ],
122                                "Permission": [
123                                    "aoss:CreateCollectionItems",
124                                ],
125                                "ResourceType": "collection",
126                            },
127                        ],
128                        "Principal": [
129                            iam_entity_arn,
130                        ],
131                    }
132                ]
133            ),
134        )
135
136        self.collection = oss.CfnCollection(
137            self,
138            "OpenSearchServerlessCollection",
139            name=collection_name,
140            type="SEARCH",
141        )
142        self.collection.apply_removal_policy(cdk.RemovalPolicy.DESTROY)
143
144
if __name__ == "__main__":
    import boto3
    from config import aws_profile, collection_name, stack_name

    app = cdk.App()

    # The identity behind ``aws_profile`` becomes the sole principal of the
    # collection's data access policy.
    sts_client = boto3.session.Session(profile_name=aws_profile).client("sts")
    iam_entity_arn = sts_client.get_caller_identity()["Arn"]

    stack = Stack(
        app,
        construct_id=stack_name,
        collection_name=collection_name,
        iam_entity_arn=iam_entity_arn,
        stack_name=stack_name,
    )

    app.synth()

use-cdk/deploy.py

 1# -*- coding: utf-8 -*-
 2
 3"""
 4Deploy the OpenSearch serverless collection.
 5"""
 6
 7import os
 8import subprocess
 9from pathlib import Path
10
dir_here = Path(__file__).absolute().parent
# Run the cdk CLI from the directory containing this script (where app.py lives).
os.chdir(str(dir_here))

# "--require-approval never" skips the interactive confirmation prompt so the
# deployment can run unattended; check=True raises if the command fails.
cdk_deploy_command = ["cdk", "deploy", "--require-approval", "never"]
subprocess.run(cdk_deploy_command, check=True)

use-cdk/connect_and_test.py

 1# -*- coding: utf-8 -*-
 2
 3import boto3
 4
 5from config import aws_profile, collection_name
 6from create_and_configure import (
 7    create_oss_object_by_collection_name,
 8    test_oss_connection,
 9)
10
11
if __name__ == "__main__":
    # NOTE: ``create_and_configure`` is not part of the use-cdk files listed
    # here; presumably it is the module from the use-boto3 example and must be
    # importable from this directory (e.g. copied next to this script).
    session = boto3.session.Session(profile_name=aws_profile)
    oss = create_oss_object_by_collection_name(session, collection_name)
    test_oss_connection(oss, verbose=True)