
OpenSearch indexing script disconnects from server: ConnectionRefusedError #770

Open
lijie123bes opened this issue Jun 27, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@lijie123bes

Describe the bug

I am encountering a ConnectionRefusedError while running an OpenSearch indexing script. Initially, the script worked without issues, indexing approximately 200 files, each containing around 30,000 documents.

However, now it fails with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 198, in _new_conn
    sock = connection.create_connection(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 73, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 264, in perform_request
    response = self.pool.urlopen(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 847, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 445, in increment
    raise reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/util.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 793, in urlopen
    response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 491, in _make_request
    raise new_e
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 467, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 1099, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 616, in connect
    self.sock = sock = self._new_conn()
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 213, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "indexing.py", line 162, in <module>
    create_index(index_name, path, model, batch_size=100)
  File "indexing.py", line 139, in create_index
    client.bulk(body=batch, refresh=True)
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/utils.py", line 181, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/__init__.py", line 462, in bulk
    return self.transport.perform_request(
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 446, in perform_request
    raise e
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 409, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 279, in perform_request
    raise ConnectionError("N/A", str(e), e)
opensearchpy.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused)

When encountering this error, I restart OpenSearch and resume indexing from the exact file where the process stopped. However, upon resuming, the same error occurs again.
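
For context, my resume logic is roughly the following (a sketch; resume_from and the file name are hypothetical placeholders, and path is the directory variable from the script below):

import os

resume_from = "file_0201.json"  # hypothetical: the file where the previous run stopped
filenames = sorted(f for f in os.listdir(path) if f.endswith(".json"))
filenames = filenames[filenames.index(resume_from):]  # skip files indexed before the crash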

Related component

Clients

To Reproduce

The script reads JSON files from a directory and indexes them into an OpenSearch cluster. It uses the opensearchpy library to interact with OpenSearch and the sentence-transformers library for document embeddings. This is the code:

import pandas as pd
import os
import json
import torch
import time
from opensearchpy import OpenSearch
from sentence_transformers import SentenceTransformer

path = "../outputNJSONextracted"  # Directory containing your JSON files
model_card = 'sentence-transformers/msmarco-distilbert-base-tas-b'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device {device}")


host = '127.0.0.1'  # host = '54.93.99.186'
port = 9200
auth = ('admin', 'IVIngi2024!')  # ('admin', 'admin')

client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
    timeout=30, 
    max_retries=10
)
print("Connection opened...")

index_name = 'medline-faiss-hnsw-3' # change the index name

index_body = {
    "settings": {
        "index": {
            "knn": "true",
            "refresh_interval": -1,
            # "default_pipeline": "medline-ingest-pipeline",  # not used: embeddings are computed in this script
            "number_of_shards": 5,
            "number_of_replicas": 0
        }
    },
    "mappings": {
        "properties": {
            "embedding_abstract": {
                "type": "knn_vector",
                "dimension": 768,
                "method": {
                    "engine": "faiss",
                    "name": "hnsw",
                    "space_type": "innerproduct"
                }
            },
            "title": {"type": "text"},
            "abstract": {"type": "text"},
            "pmid": {"type": "keyword"},
            "journal": {"type": "text"},
            "pubdate": {"type": "date"},
            "authors": {"type": "text"}
        }
    }
}

response = client.indices.create(index_name, body=index_body)
print(response)

def create_index(index_name, directory_path, model, batch_size=100):
    j = 0
    documents = set()

    files_number = 0
    for filename in sorted(os.listdir(directory_path)):
        start_time = time.time()
        if filename.endswith(".json"):
            print(f"Starting indexing {filename} ...")
            # Construct the full file path
            file_path = os.path.join(directory_path, filename)
            # Read the JSON file
            with open(file_path, 'r') as file:
                # Initialize an empty list to store dictionaries
                dictionaries = []
                
                # Read the file line by line
                for line in file:
                    # Parse each line as JSON and append it to the list
                    dictionaries.append(json.loads(line))
            # Build a DataFrame and keep only the required columns
            df = pd.DataFrame(dictionaries)
            df = df[['pmid', 'title', 'abstract', 'journal', 'authors', 'pubdate']]
            batch = []
            for i, row in df.iterrows():
                pmid = row["pmid"]
                if pmid in documents:
                    continue  # skip PMIDs that have already been indexed
                documents.add(pmid)
                embedding = model.encode(row["abstract"])
                doc = {
                    "pmid": pmid,
                    "abstract": row["abstract"],
                    "title": row["title"],
                    "authors": row['authors'],
                    "journal": row['journal'],
                    "pubdate": row['pubdate'],
                    "embedding_abstract": embedding
                }
                # bulk format: one action line plus one source line per document
                batch.append({"index": {"_index": index_name, "_id": pmid}})
                batch.append(doc)
                j += 1
                # each document contributes two entries, hence batch_size * 2
                if len(batch) >= batch_size * 2:
                    client.bulk(body=batch, refresh=True)
                    batch = []
                 
            if batch:
                client.bulk(body=batch, refresh=True)
                print("Indexed remaining documents")

            files_number += 1
            print(f"Processed file: {filename} in {time.time() - start_time:.2f}s")
            print(f"Documents indexed so far: {j}")
            if files_number % 100 == 0:
                print("-"*50)
                print(f"Files indexed = {files_number}")
                print()

    print("Total documents inserted = ", j)
    
    

model = SentenceTransformer(model_card)
model.to(device)
print("Creating indexing...")
start = time.time()
create_index(index_name, path, model, batch_size=100)
print(f"Time neeeded {time.time() - start}")

Troubleshooting Steps Taken:

  • Increased the heap size in the jvm.options file to 8 GB (the maximum I can allocate).
  • Attempted to reduce the batch size to mitigate potential HTTP request size issues.

Despite these efforts, the script continues to encounter connection issues.

I suspect the problem may be related to sending large HTTP requests or to a configuration issue on the OpenSearch server, but I am unsure how to diagnose and resolve it.
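
As a mitigation, I am considering a retry wrapper with exponential backoff around the bulk call (a minimal sketch; safe_bulk is a hypothetical helper, client is the client object from the script above, and opensearchpy.exceptions.ConnectionError is the exception raised in the traceback):

import time
from opensearchpy.exceptions import ConnectionError as OSConnectionError

def safe_bulk(client, batch, max_attempts=5, base_delay=2.0):
    # Send one bulk request, retrying with exponential backoff on connection errors.
    for attempt in range(1, max_attempts + 1):
        try:
            return client.bulk(body=batch)
        except OSConnectionError:
            if attempt == max_attempts:
                raise  # give up and surface the original error
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Bulk request failed (attempt {attempt}), retrying in {delay:.0f}s...")
            time.sleep(delay)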

Any insights or suggestions would be greatly appreciated. Thank you!

Expected behavior

A normal, stable connection: the indexing script should run to completion without the server refusing connections.


@lijie123bes lijie123bes added bug Something isn't working untriaged Need triage labels Jun 27, 2024
@peternied peternied removed the untriaged Need triage label Jul 3, 2024
@peternied
Member

[Triage - attendees 1 2 3]
@lijie123bes Thanks for creating this issue with full reproduction steps, moving this to the python client repo for further investigation.

@opensearch-project/admin could you please transfer this issue to the https://github.com/opensearch-project/opensearch-py repository?

@gaiksaya gaiksaya transferred this issue from opensearch-project/OpenSearch Jul 3, 2024
@github-actions github-actions bot added the untriaged Need triage label Jul 3, 2024
@dblock
Member

dblock commented Jul 3, 2024

@lijie123bes Let's narrow this down to a client vs. server problem.

Do you see errors in the server-side log? That is, is the server actually refusing connections because of a circuit breaker or something similar?
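
For example, something like this would show whether any circuit breaker has tripped (a sketch reusing the client object from your script; the same data is available over HTTP via GET _nodes/stats/breaker):

# Fetch per-node circuit breaker statistics and print the tripped counters
stats = client.nodes.stats(metric="breaker")
for node_id, node in stats["nodes"].items():
    for name, breaker in node["breakers"].items():
        print(node_id, name, "tripped:", breaker["tripped"])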

@saimedhi saimedhi removed the untriaged Need triage label Jul 3, 2024