When you need to resolve many entities, multithreading can be used
to speed things up. Below is an example using the autosuggest method;
the implementation would be similar with any of the other methods from
the knowledge_graph family.
The instance of Bigdata includes a built-in rate limiter to avoid making
too many requests in a short period of time. More information can be
found at threading.
Example
Here the goal is to use threads so that we do not have to wait for the
response of KnowledgeGraph.autosuggest before making the next call. We
create a function, autosuggest_handling_errors, that will be used by
the threads.
Step 1: Create a function that invokes the method and catches any error
it may raise.
from bigdata_client import Bigdata

bigdata = Bigdata()

def autosuggest_handling_errors(bigdata: Bigdata, value: str):
    try:
        return bigdata.knowledge_graph.autosuggest(value)
    except Exception as e:
        return f"Error: {e}"
What to do when an error appears is up to the implementation; in this
case, the error message is returned. You may prefer to log it, store it
in a file, or raise it and stop the execution.
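For instance, a logging variant of the wrapper could look like the
sketch below. It uses a generic callable and a stand-in function
(fake_autosuggest, hypothetical) instead of a live Bigdata connection,
so it runs anywhere; with the real client you would pass
bigdata.knowledge_graph.autosuggest as func.

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("autosuggest")

def call_handling_errors(func, value: str):
    """Log the full traceback and return None instead of an error string.

    `func` stands in for bigdata.knowledge_graph.autosuggest; any callable works.
    """
    try:
        return func(value)
    except Exception:
        logger.exception("autosuggest failed for %r", value)
        return None

# Demo with a stand-in that fails for one input:
def fake_autosuggest(value: str):
    if value == "bad":
        raise ValueError("boom")
    return [value.upper()]

print(call_handling_errors(fake_autosuggest, "tesla"))  # ['TESLA']
print(call_handling_errors(fake_autosuggest, "bad"))    # None (traceback logged)
```

Returning None (instead of a string) makes it easy for the caller to
filter out failed lookups afterwards.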
Step 2: Set up the threads and use the function created in step 1.
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor

def concurrent_autosuggest(
    bigdata: Bigdata, values: list[str], max_concurrency: int
):
    results = {}
    with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
        futures_to_values = {
            executor.submit(autosuggest_handling_errors, bigdata, value): value
            for value in values
        }
        for future in concurrent.futures.as_completed(futures_to_values):
            value_resolved = futures_to_values[future]
            results[value_resolved] = future.result()
    return results
The function above creates a thread pool whose workers call
autosuggest_handling_errors concurrently. The responses are returned as
a dictionary keyed by the original value.
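The futures_to_values dictionary is what lets us map each result back
to its input, because as_completed yields futures in the order they
finish, not the order they were submitted. A minimal, self-contained
sketch (slow_echo is a hypothetical stand-in for the autosuggest
wrapper; no Bigdata connection needed):

```python
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor
import time

def slow_echo(value: str, delay: float) -> str:
    # Stand-in for autosuggest_handling_errors: sleep, then return the input.
    time.sleep(delay)
    return value

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {
        executor.submit(slow_echo, v, d): v
        for v, d in [("tesla", 0.2), ("apple", 0.05)]
    }
    # as_completed yields each future as soon as it finishes.
    completion_order = [futures[f] for f in concurrent.futures.as_completed(futures)]

print(completion_order)  # ['apple', 'tesla']
```

"apple" comes back first even though "tesla" was submitted first, which
is why the results are collected into a dictionary rather than a list.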
The Bigdata instance reuses its connection, so if you plan on creating
many threads, increase the number of simultaneous connections allowed
by configuring
BIGDATA_MAX_PARALLEL_REQUESTS
.
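As a sketch, assuming BIGDATA_MAX_PARALLEL_REQUESTS is read from the
environment when the client is created (check the client's
configuration documentation for the exact mechanism and default), you
would set it before instantiating Bigdata:

```python
import os

# Assumption: the variable is read from the environment at client start-up,
# so it must be set before Bigdata() is instantiated.
os.environ["BIGDATA_MAX_PARALLEL_REQUESTS"] = "10"

# from bigdata_client import Bigdata
# bigdata = Bigdata()
```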
Step 3: Execution
import pprint

if __name__ == "__main__":
    values = ["tesla", "apple"]
    max_concurrency = 2
    results = concurrent_autosuggest(bigdata, values, max_concurrency)
    pprint.PrettyPrinter().pprint(results)
Output:
{'apple': [Company(id='D8442A', name='Apple Inc.', volume=None, description=None, entity_type='COMP', company_type='Public', country='United States', sector='Technology', industry_group='Computer Hardware', industry='Computer Hardware', ticker='AAPL'),
Concept(id='4AD3C9', name='Apple', volume=None, description=None, entity_type='FRTS', entity_type_name='Fruits', concept_level_2=None, concept_level_3=None, concept_level_4=None, concept_level_5=None)],
'tesla': [Company(id='DD3BB1', name='Tesla Inc.', volume=None, description=None, entity_type='COMP', company_type='Public', country='United States', sector='Consumer Goods', industry_group='Automobiles', industry='Automobiles', ticker='TSLA'),
Product(id='C7BCE3', name='Tesla Automobile', volume=None, description=None, entity_type='PROD', product_type='Electric Vehicle', product_owner='Tesla Inc.')]}
Full code
from bigdata_client import Bigdata
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor
import pprint

bigdata = Bigdata()

def autosuggest_handling_errors(bigdata: Bigdata, value: str):
    try:
        return bigdata.knowledge_graph.autosuggest(value)
    except Exception as e:
        return f"Error: {e}"

def concurrent_autosuggest(
    bigdata: Bigdata, values: list[str], max_concurrency: int
):
    results = {}
    with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
        futures_to_values = {
            executor.submit(autosuggest_handling_errors, bigdata, value): value
            for value in values
        }
        for future in concurrent.futures.as_completed(futures_to_values):
            value_resolved = futures_to_values[future]
            results[value_resolved] = future.result()
    return results

values = ["tesla", "apple"]
max_concurrency = 2
results = concurrent_autosuggest(bigdata, values, max_concurrency)
pprint.PrettyPrinter().pprint(results)