# Real-Time S3 Processing Arrives on CocoIndex via AWS SQS Integration
CocoIndex now provides native support for Amazon S3 as a data source. Additionally, CocoIndex integrates with AWS Simple Queue Service (SQS), enabling true real-time incremental processing of your S3 data.
## Why incremental processing?
Incremental processing is a data processing approach that only processes new or changed data since the last run, rather than reprocessing the entire dataset each time. This is particularly valuable for large datasets, where reprocessing everything would be inefficient and time-consuming, or when you need low-latency data freshness. In general, incremental processing is preferred under the following conditions:
### High Freshness Requirement
Most user-facing applications need this. For example, when users update their documents, they don’t expect to see stale information in search results. If the search results feed an AI agent, stale data can produce unexpected responses (the LLM generates output based on inaccurate information). This is even more dangerous, since users may accept the incorrect response without noticing.
### High Transformation Cost
The cost of transformation is significantly higher than the cost of reading the data itself. This is especially true when using expensive AI models for embedding or processing.
### Large-Scale Data
When dealing with large-scale datasets, reprocessing the entire dataset can be impractical or even impossible due to:
- Limited computational resources
- Time constraints
- Cost considerations
- Storage limitations
For example, if you have terabytes of data in S3, reprocessing everything every time a new file arrives would be extremely inefficient and costly. Incremental processing allows you to handle these large datasets efficiently by only processing the changes.
Overall, say T is your maximum acceptable staleness. If you don’t want to recompute the whole dataset from scratch every period of T, you will need incremental processing in some form.
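To make this concrete, here is a back-of-envelope comparison with made-up numbers (purely illustrative, not benchmarks):

```python
# Illustrative cost model: full reprocessing every T vs. incremental updates.
corpus_docs = 1_000_000      # documents in the S3 bucket
changed_per_hour = 200       # documents modified per hour
cost_per_doc = 0.0004        # e.g. dollars to embed one document
staleness_t_hours = 1        # acceptable staleness T

full_per_day = (24 / staleness_t_hours) * corpus_docs * cost_per_doc
incremental_per_day = 24 * changed_per_hour * cost_per_doc
print(f"full: ${full_per_day:,.0f}/day vs incremental: ${incremental_per_day:,.2f}/day")
# full: $9,600/day vs incremental: $1.92/day
```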
## How does CocoIndex support incremental processing?
CocoIndex’s incremental processing provides several key technical advantages:
1. Efficient Change Processing: Only processes new or modified S3 files, optimizing computational resource usage.
2. Real-time SQS Integration: Leverages AWS SQS for instant S3 change detection and near real-time processing.
3. Smart Caching System:
   - Automatically manages persisted states with data lineage tracking to ensure data consistency.
   - Optimizes performance through smart caching and minimal recomputation of only changed components.
Caching works at the granularity you control in the transformation flow, beyond the scope of a file. For example, if a file has M chunks and only N of them changed and require embedding, only those N chunks will be recomputed, as long as the transformation logic stays the same.
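This is not CocoIndex’s internal implementation, but a minimal sketch of the idea, assuming a content-addressed cache keyed by chunk hash:

```python
import hashlib

def embed_chunks(chunks, cache, embed):
    """Re-embed only chunks whose content hash is not already cached."""
    embeddings = []
    for text in chunks:
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key not in cache:  # only the N changed chunks hit the model
            cache[key] = embed(text)
        embeddings.append(cache[key])
    return embeddings
```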
## AWS SQS
Amazon SQS (Simple Queue Service) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. It provides a reliable, highly-scalable hosted queue for storing messages as they travel between applications or microservices.
When files are uploaded to or modified in S3, SQS receives notifications about these changes and queues them as messages. Each message contains metadata about the S3 event, such as:
- The type of event (e.g., ObjectCreated, ObjectRemoved)
- The S3 bucket name
- The object key (file path)
- Timestamp of the event
- Other relevant metadata
These messages remain in the queue until they are successfully processed, ensuring no events are lost even if the processing system is temporarily unavailable.
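For illustration, a trimmed-down S3 event notification in an SQS message body looks roughly like this (abbreviated; the bucket and key are placeholders):

```json
{
  "Records": [
    {
      "eventSource": "aws:s3",
      "eventTime": "2024-05-01T12:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "your-bucket-name" },
        "object": { "key": "docs/intro.md", "size": 1024 }
      }
    }
  ]
}
```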
## Live update out of the box with SQS
CocoIndex provides two modes to run your pipeline, one-time update and live update, both of which leverage incremental processing. With AWS SQS in particular, you can use the live update mode, where CocoIndex continuously monitors and reacts to events in SQS, updating the target data in real time. This is ideal for use cases where data freshness is critical.
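As a quick preview (the CLI usage is covered again at the end of this post), the two modes map to:

```sh
# One-time update: process pending changes once, then exit.
cocoindex update main.py

# Live update: keep running and react to SQS events as they arrive.
cocoindex update main.py -L
```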
## How does it work?
Let’s take a look at a simple example of how to build a real-time data transformation pipeline with S3 and CocoIndex. It builds a vector database of text embeddings from markdown files in S3. You can find a similar example that processes local files in this blog.
### S3 bucket and SQS setup
Please follow the documentation here to set up the S3 bucket and SQS queue.
#### S3 bucket
- Create an AWS account.
- Configure IAM permissions.
- Attach policies: you’ll need at least the `AmazonS3ReadOnlyAccess` policy, and, if you want to enable change notifications, the `AmazonSQSFullAccess` policy as well (see the CLI sketch after this list).
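As a sketch, attaching those managed policies with the AWS CLI might look like this (the user name `cocoindex-reader` is a placeholder for whatever identity your CocoIndex process runs as):

```sh
aws iam attach-user-policy \
  --user-name cocoindex-reader \
  --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess

aws iam attach-user-policy \
  --user-name cocoindex-reader \
  --policy-arn arn:aws:iam::aws:policy/AmazonSQSFullAccess
```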
#### SQS queue
For real-time change detection, you’ll need to create an SQS queue and configure it to receive notifications from your S3 bucket.
Please follow the documentation to configure the S3 bucket to send event notifications to the SQS queue.
Particularly, the SQS queue needs a specific access policy that allows S3 to send messages to it.
```json
{
  ...
  "Statement": [
    ...
    {
      "Sid": "__publish_statement",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Resource": "${SQS_QUEUE_ARN}",
      "Action": "SQS:SendMessage",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "${S3_BUCKET_ARN}"
        }
      }
    }
  ]
}
```
Then you can upload your files to the S3 bucket.
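For example, with the AWS CLI:

```sh
# Recursively copy a local folder of markdown files into the bucket.
aws s3 cp ./markdown_files s3://your-bucket-name/ --recursive
```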
### Define Indexing Flow
#### Flow Design
The flow diagram illustrates how we’ll process our documents:
1. Read text files from the Amazon S3 bucket
2. Chunk each document
3. For each chunk, embed it with a text embedding model
4. Store the embeddings in a vector database for retrieval
#### AWS File Ingestion
Define the S3 bucket name and the SQS queue URL in the `.env` file:
```
# Database Configuration
DATABASE_URL=postgresql://localhost:5432/cocoindex

# Amazon S3 Configuration
AMAZON_S3_BUCKET_NAME=your-bucket-name
AMAZON_S3_SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/123456789/S3ChangeNotifications
```
Define indexing flow and ingest from Amazon S3 SQS queue:
```python
import os

import cocoindex


@cocoindex.flow_def(name="AmazonS3TextEmbedding")
def amazon_s3_text_embedding_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
):
    bucket_name = os.environ["AMAZON_S3_BUCKET_NAME"]
    prefix = os.environ.get("AMAZON_S3_PREFIX", None)
    sqs_queue_url = os.environ.get("AMAZON_S3_SQS_QUEUE_URL", None)

    # Read matching text files from the bucket; the SQS queue enables
    # real-time change notifications.
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(
            bucket_name=bucket_name,
            prefix=prefix,
            included_patterns=["*.md", "*.mdx", "*.txt", "*.docx"],
            binary=False,
            sqs_queue_url=sqs_queue_url,
        )
    )
```
This defines a flow that reads text files from the Amazon S3 bucket.
### Rest of the flow
For the rest of the flow, we can follow the tutorial [in this blog](https://cocoindex.io/blog/text-embeddings-101).
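As a condensed sketch of what that tutorial builds (chunking, embedding, and export; API names follow the CocoIndex examples and may differ slightly across versions, so treat the linked post and repo as authoritative):

```python
doc_embeddings = data_scope.add_collector()

with data_scope["documents"].row() as doc:
    # Split each document into chunks.
    doc["chunks"] = doc["content"].transform(
        cocoindex.functions.SplitRecursively(),
        language="markdown", chunk_size=2000, chunk_overlap=500,
    )
    with doc["chunks"].row() as chunk:
        # Embed each chunk with a text embedding model.
        chunk["embedding"] = chunk["text"].transform(
            cocoindex.functions.SentenceTransformerEmbed(
                model="sentence-transformers/all-MiniLM-L6-v2"
            )
        )
        doc_embeddings.collect(
            filename=doc["filename"], location=chunk["location"],
            text=chunk["text"], embedding=chunk["embedding"],
        )

# Export the embeddings to a Postgres table with a vector index.
doc_embeddings.export(
    "doc_embeddings",
    cocoindex.targets.Postgres(),
    primary_key_fields=["filename", "location"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
        )
    ],
)
```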
The entire project is available [here](https://github.com/cocoindex-io/cocoindex/tree/main/examples/amazon_s3_embedding).
## Run the flow with live update
In the `_main()` function, you can add the following code to run the flow with live update:
```python
def _main():
    with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow) as updater:
        # Run queries here in the same process; minimally, block until interrupted.
        updater.wait()
```
and start the flow by running:
```sh
python3 main.py
```
Note that the main purpose of putting the `FlowLiveUpdater` in `main.py` is that queries can run in the same process, so one process handles both live indexing and serving.
Another option for a long-running process that keeps the index updated is the CocoIndex CLI:
```sh
cocoindex update main.py -L
```
This gives you a continuously running process that updates the vector database as new files land in the S3 bucket.
We are constantly improving, and more features and examples are coming soon. If you love this article, please give us a star ⭐ on GitHub at https://github.com/cocoindex-io/cocoindex to help us grow. Thanks for reading!