RAG-based recommendation system on Shopify, using PyAirbyte, LangChain and Pinecone

Learn how to build an end-to-end RAG pipeline: extracting data from Shopify using PyAirbyte, storing it in Pinecone, and using LangChain to perform RAG on the stored data.

This notebook demonstrates an end-to-end Retrieval-Augmented Generation (RAG) pipeline. We will extract data from a Shopify store using PyAirbyte, store it in a Pinecone vector store, and then use LangChain to perform RAG on the stored data. This workflow showcases how to integrate these tools to build a scalable RAG system.

Prerequisites

  1. Shopify:
    • Follow the instructions in the Shopify Connector Docs to set up your Shopify store and obtain the necessary credentials.
  2. Pinecone Account:
    • Create a Pinecone Account: Sign up for an account on the Pinecone website.
    • Obtain Pinecone API Key: Generate a new API key from your Pinecone project settings. For detailed instructions, refer to the Pinecone documentation.
  3. OpenAI API Key:
    • Create an OpenAI Account: Sign up for an account on OpenAI.
    • Generate an API Key: Go to the API section and generate a new API key. For detailed instructions, refer to the OpenAI documentation.

Install PyAirbyte and other dependencies

# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv

# First, we need to install the necessary libraries.
!pip3 install airbyte openai langchain pinecone-client langchain-openai langchain-pinecone python-dotenv langchainhub

Set up the Shopify source with PyAirbyte

The provided code configures an Airbyte source to extract data from a Shopify store.

To configure the source according to your requirements, you can refer to this reference.

Note: The credentials are retrieved securely using the get_secret() method. This will automatically locate a matching Google Colab secret or environment variable, ensuring they are not hard-coded into the notebook. Make sure to add your key to the Secrets section on the left.
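
If you are not running in Colab, one option is to supply these secrets as environment variables, for example via a .env file loaded with python-dotenv (installed above); get_secret() will pick up matching environment variables. A minimal sketch, with hypothetical values:

# Optional: load secrets from a local .env file when not using Colab secrets
from dotenv import load_dotenv

load_dotenv()  # expects a .env file containing, e.g.:
# API_PASSWORD=shpat_xxxxxxxxxxxx
# STORE_NAME=your-store-name
# START_DATE=2024-01-01
# OPENAI_API_KEY=sk-...
# PINECONE_API_KEY=...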

import airbyte as ab

source = ab.get_source(
    "source-shopify",
    config={

        "credentials":{
            # There are two methods available for authentication 'api_password' and 'oauth2.0',
            # Choose one of them (https://docs.airbyte.com/integrations/sources/shopify#airbyte-open-source)
            "auth_method": "api_password",
            "api_password": ab.get_secret("API_PASSWORD")
        },
        "shop":ab.get_secret("STORE_NAME"),
        "start_date": ab.get_secret("START_DATE"),
        "bulk_window_in_days": 30, # change this according to your requirement (Defines what would be a date range per single BULK Job)
        "fetch_transactions_user_id": False
    }
)
source.check()

This is the basic process of fetching data from a source using PyAirbyte and converting it into a format suitable for further processing or analysis.

# List the streams available for the Shopify source
# (see the supported streams at https://docs.airbyte.com/integrations/sources/shopify#supported-streams)
source.get_available_streams()
# Select the streams we are interested in loading to cache
source.select_streams(["products", "product_variants", "collections", "customers"])
cache = ab.get_default_cache()
result = source.read(cache=cache)

product_details = [doc for doc in result["products"].to_documents()]  # Fetching data for products stream only

print(str(product_details[10]))
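
Each item returned by to_documents() exposes LangChain-compatible page_content and metadata attributes, which is what lets the text splitter below operate on these documents directly. A quick sanity check on a single document:

# Inspect the structure of one document before chunking
sample = product_details[0]
print(sample.page_content[:500])  # first 500 characters of the record
print(sample.metadata)            # metadata attached by PyAirbyte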

Use LangChain to build a RAG pipeline

The code uses RecursiveCharacterTextSplitter to break the documents into smaller, overlapping chunks that fit comfortably within embedding and prompt limits. Metadata values within these chunks are then cast to strings, since Pinecone only accepts primitive metadata types (strings, numbers, booleans, and lists of strings).

from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
chunked_docs = splitter.split_documents(product_details)
print(f"Created {len(chunked_docs)} document chunks.")

# Cast all metadata values to strings so Pinecone will accept them
for doc in chunked_docs:
    for md in doc.metadata:
        doc.metadata[md] = str(doc.metadata[md])

import os

from langchain_openai import OpenAIEmbeddings

# OpenAI embeddings (defaults to text-embedding-ada-002)
os.environ['OPENAI_API_KEY'] = ab.get_secret("OPENAI_API_KEY")
embeddings = OpenAIEmbeddings()
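
The default embedding model produces 1536-dimensional vectors, which is why the Pinecone index below is created with dimension=1536. A quick check:

# Verify the embedding dimensionality; it must match the Pinecone index dimension
sample_vector = embeddings.embed_query("sample product description")
print(len(sample_vector))  # 1536 for text-embedding-ada-002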

Setting up Pinecone

Pinecone is a managed vector database service designed for storing, indexing, and querying high-dimensional vector data efficiently.

from pinecone import Pinecone, ServerlessSpec
import os

os.environ['PINECONE_API_KEY'] = ab.get_secret("PINECONE_API_KEY")
pc = Pinecone()
index_name = "shopifyproductsindex" # Replace with your index name


# Uncomment this if you have not created a Pinecone index yet

# spec = ServerlessSpec(cloud="aws", region="us-east-1")  # Replace with your cloud and region
# pc.create_index(
#         name=index_name,
#         dimension=1536,
#         metric='cosine',
#         spec=spec
# )

index = pc.Index(index_name)

index.describe_index_stats()

PineconeVectorStore is a class provided by the LangChain library for interacting with Pinecone vector stores. Its from_documents method embeds the provided documents and creates or updates the corresponding vectors in the Pinecone index.

from langchain_pinecone import PineconeVectorStore

pinecone = PineconeVectorStore.from_documents(
    chunked_docs, embeddings, index_name=index_name
)
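
Before wiring up the full chain, it is worth confirming that the index is queryable with a plain similarity search (the query string here is just a hypothetical example; adjust it to your catalog):

# Sanity-check retrieval directly against the vector store
results = pinecone.similarity_search("t-shirt", k=3)
for doc in results:
    print(doc.page_content[:200], "\n---")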

Now we set up the RAG pipeline with LangChain, combining document retrieval from Pinecone, a prompt pulled from the LangChain Hub, and an OpenAI chat model for response generation.

from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

retriever = pinecone.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print("Langchain RAG pipeline set up successfully.")
print(rag_chain.invoke("What type of products do we sell?"))
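
If an answer looks off, you can inspect the chunks the retriever passes to the prompt by calling it directly:

# Inspect the context documents the retriever supplies to the LLM
retrieved = retriever.invoke("What type of products do we sell?")
print(f"Retrieved {len(retrieved)} chunks")
for doc in retrieved:
    print(doc.page_content[:150], "\n---")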
