from elasticsearch import Elasticsearch
import pandas as pd
import numpy as np
import pprint
es = Elasticsearch([{'host': '', 'port': 9200}])
INDEX_NAME = "russian_affiliations_with_coords"
START_DATE = "2019-05-01T00:00:00.000Z"
END_DATE = "2019-08-01T00:00:00.000Z"
def _match_all_date_limit(start_date, end_date):
return {
"query": {
"bool": {
"must": [
"range": {
"metadata.preprint_date": {
"format": "strict_date_optional_time",
"gte": start_date,
"lte": end_date
def scroller(es, index, request, pagesize=250, scroll_timeout="2m", **kwargs):
Helper to iterate ALL values from a single index
Yields all the documents.
is_first = True
while True:
# Scroll next
if is_first: # Initialize scroll
result =, scroll="2m", size=pagesize, **kwargs, body=request)
is_first = False
result = es.scroll(body={
"scroll_id": scroll_id,
"scroll": scroll_timeout
scroll_id = result["_scroll_id"]
print(f'Scrolling... - page {scroll_id}')
records = []
hits = result["hits"]["hits"]
# Stop after no more docs
if not hits:
# Yield each entry
for item in hits:
yield item
def get_joint_affiliations(es, index, query):
Executes query, reads each records and returns list of affiliations IDs
{recid, recid, recid, ...}, - for the first document
{recid, recid, ...}, - for the next document(s)
:param es:
:param index:
:param query:
joint = []
total_counter = 0 # counter of all records
for entry in scroller(es, INDEX_NAME, _match_all_date_limit(START_DATE, END_DATE)):
total_counter += 1
authors = entry['_source']['metadata']['authors']
aff_list = []
# add each affiliation record from all authors in array
for author in authors:
if 'affiliations' in author:
for aff in author['affiliations']:
if 'record' in aff:
ref_record = aff['record']['$ref']
# append array as a set to get all unique affiliations records
print(f'Total number of documents = {total_counter}')
print(f'Number of documents with affiliations = {len(joint)}')
return joint
def get_unique(list_of_ids):
Get a list of all (unique) affiliations, which were found in all documents in query
:param list_of_ids:
all_affiliations = []
for row in list_of_ids:
for i in row:
return set(all_affiliations)
def get_matrix(joint):
Create zero-matrix with rows and columns as affiliation records.
Shape of the matrix = number of unique affiliation records
:param affiliations:
unique = get_unique(joint)
column_names, row_names = unique, unique
matrix = np.zeros((len(unique), len(unique)))
return pd.DataFrame(matrix, columns=column_names, index=row_names)
def update_matrix(matrix_df, joint):
Update matrix values according with list of joint affiliations
:param matrix_df:
:param affiliations:
for item in joint:
matrix_df.loc[list(item), list(item)] += 1
def get_connections_df(matrix_df):
Convert sparce matrix to dense representation
id | level_0 | level_1 | count
affiliation_record X | affiliation_record_Y | number of records
:param matrix_df:
df = matrix_df.stack().sort_values(ascending=False).reset_index()
df['level_0'] = df['level_0'].astype(np.int64)
df['level_1'] = df['level_1'].astype(np.int64)
df.rename(columns={0: "count"}, inplace=True)
return df
def extend_connections(connections):
Add information from affiliations repository to the connections dataframe
:param connections:
:param repository:
affiliations_repository = pd.read_csv('affiliations_geoloc.csv', index_col=0)
df = pd.merge(connections, affiliations_repository, how='left', left_on=['level_0'], right_on=['id'])
df.rename(columns={"lat": "lat_0", "lon": "lon_0"}, inplace=True)
df_ = pd.merge(df, affiliations_repository, how='left', left_on=['level_1'], right_on=['id'])
df_.rename(columns={"lat": "lat_1", "lon": "lon_1"}, inplace=True)
return df_
def main():
query = _match_all_date_limit(START_DATE, END_DATE)
joint = get_joint_affiliations(es, INDEX_NAME, query)
matrix = get_matrix(joint)
update_matrix(matrix, joint)
connections = get_connections_df(matrix)
result = extend_connections(connections)
if __name__ == '__main__':
\ No newline at end of file
