Connect to OpenSearch Serverless using Python
Keywords: AWS, Amazon, OpenSearch, OS, OSS, Serverless.
This document introduces two methods to quickly set up a publicly accessible collection (access still requires IAM permission) and connect to it using Python.
Method 1
Use the boto3 SDK to create a collection, configure its security policies, and connect to it.
use-boto3/requirements.txt
1boto3>=1.28.0
2opensearch-py
3requests_aws4auth
4rich
use-boto3/create_and_configure.py
1# -*- coding: utf-8 -*-
2
3"""
4A simple script to create and configure an OpenSearch serverless collection,
5then you can create index, index data, search documents.
6
7Requirements: see ``requirements.txt``
8
9Reference:
10
11- Using the AWS SDKs to interact with Amazon OpenSearch Serverless: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-sdk.html#serverless-sdk-python
12"""
13
14import typing as T
15import json
16import time
17import dataclasses
18
19import boto3
20import botocore.exceptions
21from opensearchpy import OpenSearch, RequestsHttpConnection
22from requests_aws4auth import AWS4Auth
23
24
def create_encryption_policy(
    oss_client,
    collection_name: str,
    verbose: bool = True,
) -> T.Optional[dict]:
    """
    Create an encryption policy that matches the given collection.

    The policy is named ``{collection_name}-policy`` and sets
    ``AWSOwnedKey`` to true.

    :param oss_client: boto3 ``opensearchserverless`` client.
    :param collection_name: name of the collection the policy applies to.
    :param verbose: print progress messages when True.

    :return: the ``create_security_policy`` response, or None when the API
        answers with a ``ConflictException``.
    :raises botocore.exceptions.ClientError: for any other API error.
    """
    policy_document = {
        "Rules": [
            {
                "ResourceType": "collection",
                "Resource": [
                    f"collection/{collection_name}",
                ],
            }
        ],
        "AWSOwnedKey": True,
    }
    # Only the API call can raise ClientError, so keep the try body minimal.
    try:
        response = oss_client.create_security_policy(
            name=f"{collection_name}-policy",
            description=f"Encryption policy for {collection_name} collection",
            type="encryption",
            policy=json.dumps(policy_document),
        )
    except botocore.exceptions.ClientError as error:
        if error.response["Error"]["Code"] != "ConflictException":
            raise
        if verbose:
            print(
                "[ConflictException] "
                "The policy name or rules conflict with an existing policy."
            )
        return None
    if verbose:
        print("Encryption policy created:")
        print(response)
    return response
66
67
def create_network_policy(
    oss_client,
    collection_name: str,
    verbose: bool = True,
) -> T.Optional[dict]:
    """
    Create a network policy that matches the given collection.

    Both the dashboard and the collection endpoint are made publicly
    accessible (``AllowFromPublic`` is true).

    :param oss_client: boto3 ``opensearchserverless`` client.
    :param collection_name: name of the collection the policy applies to.
    :param verbose: print progress messages when True.

    :return: the ``create_security_policy`` response, or None when the API
        answers with a ``ConflictException``.
    :raises botocore.exceptions.ClientError: for any other API error.
    """
    resources = [f"collection/{collection_name}"]
    policy_document = [
        {
            "Description": f"Public access for {collection_name} collection",
            "Rules": [
                {"ResourceType": "dashboard", "Resource": list(resources)},
                {"ResourceType": "collection", "Resource": list(resources)},
            ],
            "AllowFromPublic": True,
        },
    ]
    # Only the API call can raise ClientError, so keep the try body minimal.
    try:
        response = oss_client.create_security_policy(
            name=f"{collection_name}-policy",
            description=f"Network policy for {collection_name} collections",
            type="network",
            policy=json.dumps(policy_document),
        )
    except botocore.exceptions.ClientError as error:
        if error.response["Error"]["Code"] != "ConflictException":
            raise
        if verbose:
            print(
                "[ConflictException] "
                "A network policy with this name already exists."
            )
        return None
    if verbose:
        print("Network policy created:")
        print(response)
    return response
119
120
@dataclasses.dataclass
class IamUser:
    """
    An IAM user identified by its user name.
    """

    # the IAM user name (the part after ``user/`` in the ARN)
    name: str

    def to_arn(self, aws_account_id: str) -> str:
        """
        Build the IAM user ARN for the given AWS account id.
        """
        return f"arn:aws:iam::{aws_account_id}:user/{self.name}"
127
128
@dataclasses.dataclass
class IamRole:
    """
    An IAM role identified by its role name.
    """

    # the IAM role name (the part after ``role/`` in the ARN)
    name: str

    def to_arn(self, aws_account_id: str) -> str:
        """
        Build the IAM role ARN for the given AWS account id.
        """
        return f"arn:aws:iam::{aws_account_id}:role/{self.name}"
135
136
def create_access_policy(
    oss_client,
    collection_name: str,
    aws_account_id: str,
    trusted_iam_entity_arns: T.List[str],
    verbose: bool = True,
) -> T.Optional[dict]:
    """
    Create a data access policy that matches the given collection.

    The policy lets the given IAM entities create, delete, update, describe,
    read and write every index in the collection, and create collection
    items.

    :param oss_client: boto3 ``opensearchserverless`` client.
    :param collection_name: name of the collection the policy applies to.
    :param aws_account_id: not used by this function; kept so existing
        callers keep working.
    :param trusted_iam_entity_arns: ARNs placed in the policy ``Principal``.
    :param verbose: print progress messages when True.

    :return: the ``create_access_policy`` response, or None when the API
        answers with a ``ConflictException``.
    :raises botocore.exceptions.ClientError: for any other API error.
    """
    index_rule = {
        "Resource": [
            f"index/{collection_name}/*",
        ],
        "Permission": [
            "aoss:CreateIndex",
            "aoss:DeleteIndex",
            "aoss:UpdateIndex",
            "aoss:DescribeIndex",
            "aoss:ReadDocument",
            "aoss:WriteDocument",
        ],
        "ResourceType": "index",
    }
    collection_rule = {
        "Resource": [
            f"collection/{collection_name}",
        ],
        "Permission": [
            "aoss:CreateCollectionItems",
        ],
        "ResourceType": "collection",
    }
    policy_document = [
        {
            "Rules": [index_rule, collection_rule],
            "Principal": trusted_iam_entity_arns,
        }
    ]
    # Only the API call can raise ClientError, so keep the try body minimal.
    try:
        response = oss_client.create_access_policy(
            name=f"{collection_name}-policy",
            description=f"Data access policy for {collection_name} collections",
            type="data",
            policy=json.dumps(policy_document),
        )
    except botocore.exceptions.ClientError as error:
        if error.response["Error"]["Code"] != "ConflictException":
            raise
        if verbose:
            print(
                "[ConflictException] "
                "An access policy with this name already exists."
            )
        return None
    if verbose:
        print("Access policy created:")
        print(response)
    return response
200
201
def create_collection(
    oss_client,
    collection_name: str,
    verbose: bool = True,
) -> T.Optional[dict]:
    """
    Create a collection of type ``SEARCH``.

    Ref:

    - create_collection: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/create_collection.html

    :param oss_client: boto3 ``opensearchserverless`` client.
    :param collection_name: name of the collection to create.
    :param verbose: print a message when the name is already taken.

    :return: the ``create_collection`` response, or None when the API
        answers with a ``ConflictException``.
    :raises botocore.exceptions.ClientError: for any other API error.
    """
    try:
        return oss_client.create_collection(
            name=collection_name,
            type="SEARCH",
        )
    except botocore.exceptions.ClientError as error:
        if error.response["Error"]["Code"] != "ConflictException":
            raise
        if verbose:
            print(
                "[ConflictException] "
                "A collection with this name already exists. Try another name."
            )
        return None
230
231
232def _get_endpoint_from_connection_details(connection_details: dict) -> str:
233 return connection_details["collectionEndpoint"].replace("https://", "")
234
235
def wait_for_collection_creation(
    oss_client,
    collection_name: str,
    verbose: bool = True,
    delay: float = 10,
) -> str:
    """
    Wait for the collection to become active.

    :param oss_client: boto3 ``opensearchserverless`` client.
    :param collection_name: name of the collection to wait for.
    :param verbose: print progress messages when True.
    :param delay: seconds to sleep between two status checks.

    :return: the collection endpoint (host name without ``https://``).
    :raises IndexError: when no collection with this name is returned.
    :raises SystemError: when the status is neither ``CREATING`` nor
        ``ACTIVE``.
    """
    # Periodically check collection status
    while True:
        response = oss_client.batch_get_collection(names=[collection_name])
        details_list = response["collectionDetails"]
        if not details_list:
            # Same exception type as indexing the empty list would raise,
            # but with a message that says what actually went wrong.
            raise IndexError(f"collection {collection_name!r} not found!")
        collection_details = details_list[0]
        status = collection_details["status"]
        if status == "ACTIVE":
            if verbose:
                print("Collection successfully created:")
            return _get_endpoint_from_connection_details(collection_details)
        if status != "CREATING":
            raise SystemError(f"status is {status!r}!")
        if verbose:
            print("Creating collection...")
        time.sleep(delay)
262
263
def create_oss_object_by_collection_endpoint(
    boto_ses: boto3.session.Session,
    collection_endpoint: str,
) -> OpenSearch:
    """
    Create the opensearch-py SDK OpenSearch object from the boto3 session.

    Requests are SigV4-signed for the ``aoss`` service using the session's
    credentials and region.

    :param boto_ses: boto3 session that provides credentials and region.
    :param collection_endpoint: collection host name, without ``https://``.

    :return: an ``OpenSearch`` client connected to port 443 over TLS.
    """
    # Take one consistent snapshot of the credentials: reading access key,
    # secret key and token one by one from refreshable credentials could mix
    # values from before and after a refresh.
    credentials = boto_ses.get_credentials().get_frozen_credentials()
    awsauth = AWS4Auth(
        credentials.access_key,
        credentials.secret_key,
        boto_ses.region_name,
        "aoss",
        session_token=credentials.token,
    )
    return OpenSearch(
        hosts=[{"host": collection_endpoint, "port": 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=300,
    )
288
289
def create_oss_object_by_collection_name(
    boto_ses: boto3.session.Session,
    collection_name: str,
) -> OpenSearch:
    """
    Create the opensearch-py SDK OpenSearch object for an existing collection.

    Looks up the collection endpoint by name (silently waiting while the
    collection is still being created), then builds the client from it.

    :param boto_ses: boto3 session that provides credentials and region.
    :param collection_name: name of the collection to connect to.
    """
    oss_client = boto_ses.client("opensearchserverless")
    collection_endpoint = wait_for_collection_creation(
        oss_client,
        collection_name,
        False,  # verbose
    )
    return create_oss_object_by_collection_endpoint(boto_ses, collection_endpoint)
299
300
def test_oss_connection(
    oss: OpenSearch,
    verbose: bool = True,
):
    """
    Check the connection by listing the indices of the collection.

    :param oss: the opensearch-py ``OpenSearch`` client to test.
    :param verbose: print the listing result and a success message when True.
    """
    if verbose:
        print("test opensearch serverless connection by listing indices ...")
    # Any connection or permission problem surfaces as an exception here.
    res = oss.cat.indices(format="json")
    if verbose:
        print(res)
        print("success!")
311
312
def create_and_configure(
    aws_profile: str,
    collection_name: str,
    verbose: bool = True,
) -> OpenSearch:
    """
    Create the security policies and the collection, then connect to it.

    The identity returned by STS ``get_caller_identity`` for the given
    profile is the principal granted data access to the collection.

    :param aws_profile: AWS CLI profile name used to create the boto3 session.
    :param collection_name: name of the collection to create.
    :param verbose: print progress messages when True.

    :return: an ``OpenSearch`` client connected to the collection.
    """
    boto_ses = boto3.session.Session(profile_name=aws_profile)
    caller_identity = boto_ses.client("sts").get_caller_identity()
    aws_account_id = caller_identity["Account"]
    iam_entity_arn = caller_identity["Arn"]

    oss_client = boto_ses.client("opensearchserverless")
    create_encryption_policy(oss_client, collection_name, verbose)
    create_network_policy(oss_client, collection_name, verbose)
    create_access_policy(
        oss_client,
        collection_name,
        aws_account_id,
        [iam_entity_arn],
        verbose,
    )

    # None means the collection already exists; then there is no creation
    # progress worth reporting. Otherwise follow the caller's ``verbose``
    # flag (it used to be forced to True, so verbose=False still printed).
    newly_created = create_collection(oss_client, collection_name, verbose) is not None
    collection_endpoint = wait_for_collection_creation(
        oss_client,
        collection_name,
        verbose and newly_created,
    )
    oss = create_oss_object_by_collection_endpoint(boto_ses, collection_endpoint)
    test_oss_connection(oss, verbose)
    return oss
345
346
def delete_collection(
    aws_profile: str,
    collection_name: str,
    verbose: bool = True,
):
    """
    Delete the collection with the given name, if it exists.

    Ref:

    - delete_collection: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/opensearchserverless/client/delete_collection.html

    :param aws_profile: AWS CLI profile name used to create the boto3 session.
    :param collection_name: name of the collection to delete.
    :param verbose: print progress messages when True.
    """
    boto_ses = boto3.session.Session(profile_name=aws_profile)
    oss_client = boto_ses.client("opensearchserverless")
    response = oss_client.batch_get_collection(names=[collection_name])
    collection_details = response["collectionDetails"]
    if len(collection_details) != 1:
        if verbose:
            print("Collection not found")
        return

    # The delete API takes the collection id, not its name.
    collection_id = collection_details[0]["id"]
    if verbose:
        print("Deleting collection...")
    oss_client.delete_collection(
        id=collection_id,
    )
    if verbose:
        print("done")
373
374
if __name__ == "__main__":
    # The AWS profile and collection name are read from the ``config`` module.
    from config import aws_profile, collection_name

    oss = create_and_configure(aws_profile, collection_name, verbose=True)
    # To remove the collection, run this instead:
    # delete_collection(aws_profile, collection_name, verbose=True)
use-boto3/connect_and_test.py
1# -*- coding: utf-8 -*-
2
3"""
4This script creates a test index, insert a document and search it.
5Reference:
6
7- Low-level Python client: https://opensearch.org/docs/latest/clients/python-low-level/
8"""
9
10from datetime import datetime, timezone
11
12import boto3
13from rich import print as rprint
14
15from config import aws_profile, collection_name, index_name
16from create_and_configure import create_oss_object_by_collection_name
17
18
def delete_index():
    """
    Delete the test index; HTTP 400 and 404 responses are ignored.

    Uses the module-level ``oss`` client created in the ``__main__`` section.
    """
    oss.indices.delete(index=index_name, ignore=[400, 404])
22
23
def create_index():
    """
    Create the test index with a ``time`` date field and a ``log`` text
    field; an HTTP 400 response is ignored.

    Uses the module-level ``oss`` client created in the ``__main__`` section.
    """
    index_body = {
        "mappings": {
            "properties": {
                "time": {"type": "date"},
                "log": {"type": "text"},
            }
        }
    }
    oss.indices.create(
        index=index_name,
        body=index_body,
        ignore=400,
    )
38
39
def insert_document():
    """
    Index one sample log document with id ``id-1`` and the current UTC time.

    Uses the module-level ``oss`` client created in the ``__main__`` section.
    """
    document = {
        "time": get_utc_now(),
        "log": "login failed, username not found",
    }
    oss.index(
        index=index_name,
        id="id-1",
        body=document,
    )
49
50
def search_all():
    """
    Run a ``match_all`` query against the test index and pretty-print
    the response.

    Uses the module-level ``oss`` client created in the ``__main__`` section.
    """
    query = {"query": {"match_all": {}}}
    res = oss.search(
        index=index_name,
        body=query,
    )
    rprint(res)
57
58
def search_by_fts():
    """
    Run a full-text ``match`` query for "failed" on the ``log`` field and
    pretty-print the response.

    Uses the module-level ``oss`` client created in the ``__main__`` section.
    """
    query = {"query": {"match": {"log": "failed"}}}
    res = oss.search(
        index=index_name,
        body=query,
    )
    rprint(res)
65
66
def get_utc_now() -> datetime:
    """
    Return the current time as a timezone-aware datetime in UTC.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12; asking for the
    # time in UTC directly yields the same aware value.
    return datetime.now(timezone.utc)
69
70
if __name__ == "__main__":
    boto_ses = boto3.session.Session(profile_name=aws_profile)
    # ``oss`` is the module-level client the functions above rely on.
    oss = create_oss_object_by_collection_name(boto_ses, collection_name)

    # Uncomment the steps you want to run:
    # delete_index()
    create_index()
    # insert_document()
    # search_all()
    # search_by_fts()
Method 2
Use the AWS CDK to create a collection, configure its security policies, and connect to it.
use-cdk/requirements.txt
1aws-cdk-lib==2.81.0
2constructs==10.2.70
use-cdk/app.py
1# -*- coding: utf-8 -*-
2
3"""
4A simple CDK stack to create and configure an OpenSearch serverless collection.
5So you can create index, index data, search documents.
6Please read the doc string in :class:`Stack` for more details.
7
8Requirements: see ``requirements.txt``, or do ``pip install -r requirements.txt``.
9
10Reference:
11
12- https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_opensearchserverless.html
13"""
14
15import json
16
17import aws_cdk as cdk
18import aws_cdk.aws_opensearchserverless as oss
19from constructs import Construct
20
21
class Stack(cdk.Stack):
    """
    CDK stack with an OpenSearch serverless collection and the encryption,
    network and data access policies that match it.

    We assume that you want to use an IAM user to connect to the OpenSearch
    serverless collection.

    :param collection_name: the collection name; use lowercase letters,
        digits and hyphens, but not underscores.
    :param iam_entity_arn: the IAM entity arn that has the permission to
        manipulate the collection.
    """

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        collection_name: str,
        iam_entity_arn: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        policy_name = f"{collection_name}-policy"
        collection_resource = f"collection/{collection_name}"

        self.encryption_policy = oss.CfnSecurityPolicy(
            self,
            "EncryptionPolicy",
            name=policy_name,
            description=f"Encryption policy for {collection_name} collection",
            type="encryption",
            policy=json.dumps(
                {
                    "Rules": [
                        {
                            "ResourceType": "collection",
                            "Resource": [
                                collection_resource,
                            ],
                        }
                    ],
                    "AWSOwnedKey": True,
                }
            ),
        )

        # Dashboard and collection endpoint are both publicly accessible.
        self.network_policy = oss.CfnSecurityPolicy(
            self,
            "NetworkPolicy",
            name=policy_name,
            description=f"Network policy for {collection_name} collections",
            type="network",
            policy=json.dumps(
                [
                    {
                        "Description": f"Public access for {collection_name} collection",
                        "Rules": [
                            {
                                "ResourceType": "dashboard",
                                "Resource": [
                                    collection_resource,
                                ],
                            },
                            {
                                "ResourceType": "collection",
                                "Resource": [
                                    collection_resource,
                                ],
                            },
                        ],
                        "AllowFromPublic": True,
                    },
                ]
            ),
        )

        self.access_policy = oss.CfnAccessPolicy(
            self,
            "AccessPolicy",
            name=policy_name,
            description=f"Data access policy for {collection_name} collections",
            type="data",
            policy=json.dumps(
                [
                    {
                        "Rules": [
                            {
                                "Resource": [
                                    f"index/{collection_name}/*",
                                ],
                                "Permission": [
                                    "aoss:CreateIndex",
                                    "aoss:DeleteIndex",
                                    "aoss:UpdateIndex",
                                    "aoss:DescribeIndex",
                                    "aoss:ReadDocument",
                                    "aoss:WriteDocument",
                                ],
                                "ResourceType": "index",
                            },
                            {
                                "Resource": [
                                    collection_resource,
                                ],
                                "Permission": [
                                    "aoss:CreateCollectionItems",
                                ],
                                "ResourceType": "collection",
                            },
                        ],
                        "Principal": [
                            iam_entity_arn,
                        ],
                    }
                ]
            ),
        )

        self.collection = oss.CfnCollection(
            self,
            "OpenSearchServerlessCollection",
            name=collection_name,
            type="SEARCH",
        )
        # The policies are tied to the collection only by name, so
        # CloudFormation sees no ordering between them. Creating a collection
        # requires a matching encryption policy to exist already; declare the
        # dependencies explicitly so the policies are always deployed first.
        self.collection.add_dependency(self.encryption_policy)
        self.collection.add_dependency(self.network_policy)
        self.collection.add_dependency(self.access_policy)
        self.collection.apply_removal_policy(cdk.RemovalPolicy.DESTROY)
143
144
if __name__ == "__main__":
    import boto3
    from config import aws_profile, collection_name, stack_name

    app = cdk.App()

    # The identity behind the configured AWS profile is the principal that
    # gets data access to the collection.
    boto_ses = boto3.session.Session(profile_name=aws_profile)
    iam_entity_arn = boto_ses.client("sts").get_caller_identity()["Arn"]
    stack = Stack(
        app,
        construct_id=stack_name,
        collection_name=collection_name,
        iam_entity_arn=iam_entity_arn,
        stack_name=stack_name,
    )

    app.synth()
use-cdk/deploy.py
1# -*- coding: utf-8 -*-
2
3"""
4Deploy the OpenSearch serverless collection.
5"""
6
7import os
8import subprocess
9from pathlib import Path
10
# Run ``cdk deploy`` from the directory that contains this script, whatever
# the caller's current working directory is.
dir_here = Path(__file__).absolute().parent
os.chdir(str(dir_here))

args = [
    "cdk",
    "deploy",
    "--require-approval",
    "never",  # never prompt for approval
]
# check=True raises CalledProcessError when the command exits non-zero.
subprocess.run(args, check=True)
use-cdk/connect_and_test.py
1# -*- coding: utf-8 -*-
2
3import boto3
4
5from config import aws_profile, collection_name
6from create_and_configure import (
7 create_oss_object_by_collection_name,
8 test_oss_connection,
9)
10
11
if __name__ == "__main__":
    # Connect to the collection and verify the connection by listing
    # its indices.
    boto_ses = boto3.session.Session(profile_name=aws_profile)
    oss = create_oss_object_by_collection_name(boto_ses, collection_name)
    test_oss_connection(oss, verbose=True)