Commit bbf1c7aa authored by ETretyakov's avatar ETretyakov
Browse files

Added GeoJSON

parent 966ec330
from dataclasses import dataclass
from json import dumps
from elasticsearch import Elasticsearch
from datetime import datetime
from collections import defaultdict
# GeoJSON data structure
#
# data = {
#     "type": "FeatureCollection",
#     "features": [
#         {
#             "type": "Feature",
#             "properties": {
#                 "name": "Novosibirsk State U.",
#                 "flows": {
#                     0: 1,  <- reference to another record (0 is that object's index in "features")
#                     1: 1
#                 },
#                 "centroid": [
#                     83.1514854,  <- longitude (lon)
#                     54.8454047,  <- latitude (lat)
#                     0
#                 ]
#             },
#             "geometry": {
#                 "type": "MultiPolygon",  <- how the feature is drawn on the map (as a geometric shape)
#                 "coordinates": [
#                     ...
#                 ]
#             }
#         }
#     ]
# }
class Loader:
    """Stream every document of an Elasticsearch index using the scroll API.

    The connection is created in ``__init__``; ``get_records`` then pages
    through the index ``page_size`` hits at a time and yields each hit.
    """

    def __init__(self, hosts=None, login=None, password=None, index_name="russian_affiliations"):
        """Open the Elasticsearch connection.

        Args:
            hosts: Host list passed to ``Elasticsearch``; defaults to a single
                node at ``127.0.0.1:9200``.
            login: User name; used only when ``password`` is also given.
            password: Password; used only when ``login`` is also given.
            index_name: Name of the index to read.
        """
        if hosts is None:
            hosts = [{"host": "127.0.0.1", "port": 9200}]
        if login and password:
            self.es_connection = Elasticsearch(hosts=hosts, http_auth=(login, password))
        else:
            self.es_connection = Elasticsearch(hosts=hosts)
        self.index_name = index_name
        self.page_size = 250
        self.scroll_timeout = "2m"

    @staticmethod
    def _log(message):
        """Print ``message`` prefixed with the current local timestamp."""
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} -> {message}")

    def do_query(self, scroll_id=None):
        """Fetch one page of results.

        Args:
            scroll_id: ``None`` to start a new ``match_all`` search that opens
                a scroll; otherwise the id of the scroll to continue.

        Returns:
            The response returned by the Elasticsearch client.
        """
        if scroll_id is None:
            return self.es_connection.search(
                index=self.index_name,
                scroll=self.scroll_timeout,
                size=self.page_size,
                body={"query": {"match_all": {}}},
            )
        return self.es_connection.scroll(
            body={"scroll_id": scroll_id, "scroll": self.scroll_timeout}
        )

    def get_records(self):
        """Yield every hit of the index, page by page.

        The scroll opened by the first query is released when iteration ends,
        whether the index was exhausted, the consumer closed the generator
        early, or an error interrupted the loop.

        Yields:
            Each element of ``results["hits"]["hits"]`` in the order received.
        """
        scroll_id = None
        try:
            while True:
                self._log("Querying ES ...")
                results = self.do_query(scroll_id=scroll_id)
                self._log("Results received")
                # Remember the id even when the page is empty so that the
                # cleanup below can release a scroll that returned no hits.
                scroll_id = results.get("_scroll_id", scroll_id)
                hits = results["hits"]["hits"]
                if not hits:
                    self._log("End reached, exiting ...")
                    break
                self._log(f"Scroll id: {scroll_id}")
                yield from hits
        finally:
            # Free the server-side search context instead of leaving it to
            # linger until the scroll timeout expires.
            if scroll_id is not None:
                self.es_connection.clear_scroll(scroll_id=scroll_id)
@dataclass
class Affiliation:
    """An institution together with its position in the GeoJSON feature list.

    ``geojson_id`` stays ``None`` until ``register`` assigns it.
    """

    id: int
    name: str
    location: list = None
    geojson_id: int = None

    def __eq__(self, other):
        """Return True only for another ``Affiliation`` with identical attributes."""
        return isinstance(other, Affiliation) and self.__dict__ == other.__dict__

    def register(self, counter, buffer, bucket):
        """Record this affiliation if its id has not been seen yet.

        Args:
            counter: The next free GeoJSON index.
            buffer: List of ids already registered; the id is appended on success.
            bucket: Mapping ``id -> Affiliation``; this object is stored on success.

        Returns:
            ``(True, counter + 1)`` when the affiliation was newly registered,
            ``(False, counter)`` when its id was already in ``buffer``.
        """
        if self.id in buffer:
            return False, counter
        self.geojson_id = counter
        buffer.append(self.id)
        bucket[self.id] = self
        return True, counter + 1
class Converter:
    """Turn records into GeoJSON features linked by co-affiliation "flows".

    Every distinct affiliation becomes one feature. Whenever two different
    affiliations appear in the same record, a flow between them is counted in
    both directions.
    """

    def __init__(self):
        """Start with no affiliations and no flows."""
        self.data = None
        self.counter = 0  # next GeoJSON index to hand out
        self.affiliations = {}  # affiliation id -> Affiliation
        self.flows = defaultdict(list)  # geojson_id -> distinct linked geojson_ids
        self.flows_values = defaultdict(int)  # "<from>-<to>" -> occurrence count
        self.affiliations_integrity = []  # ids already registered

    def _parse_affiliation(self, raw):
        """Return the Affiliation for one raw affiliation entry.

        An id that is already registered is returned from ``self.affiliations``
        without re-reading its name or address.

        Raises:
            KeyError: A required key is missing from the entry.
            ValueError: The last segment of ``$ref`` is not an integer.
        """
        # Index "$ref" directly so that a missing reference surfaces as a
        # KeyError the caller handles rather than an AttributeError on None.
        affiliation_id = int(raw["record"]["$ref"].split("/")[-1])
        known = self.affiliations.get(affiliation_id)
        if known is not None:
            return known
        affiliation_name = raw["value"]
        location = []
        # Only the first address is used.
        for address in raw["record"].get("addresses", []):
            location.append(address["location"].get("lon"))
            location.append(address["location"].get("lat"))
            location.append(0)
            break
        affiliation = Affiliation(affiliation_id, affiliation_name, location)
        registered, self.counter = affiliation.register(
            self.counter, self.affiliations_integrity, self.affiliations
        )
        if registered:
            return affiliation
        return self.affiliations.get(affiliation_id)

    def get_affiliations(self, record):
        """Collect the affiliations of every author of ``record``.

        Malformed entries are reported with a short message and skipped one at
        a time, so a single bad entry does not discard the rest of the record.

        Args:
            record: A hit whose authors are under ``_source.metadata.authors``.

        Returns:
            One Affiliation per usable affiliation entry, in encounter order
            (the same object is repeated when several authors share it).
        """
        try:
            authors = record["_source"]["metadata"]["authors"]
        except KeyError:
            print("KeyError")
            return []
        affiliations = []
        for author in authors:
            for raw in author.get("affiliations") or []:
                try:
                    affiliations.append(self._parse_affiliation(raw))
                except ValueError:
                    print("ValueError")
                except KeyError:
                    print("KeyError")
        return affiliations

    def build_flows(self, affiliations):
        """Count a flow for every ordered pair of unequal affiliations.

        Args:
            affiliations: Affiliations of a single record.
        """
        for affiliation in affiliations:
            for a in affiliations:
                if a != affiliation:
                    key = f"{affiliation.geojson_id}-{a.geojson_id}"
                    # Store each link target once; the number of occurrences
                    # is kept in flows_values.
                    if key not in self.flows_values:
                        self.flows[affiliation.geojson_id].append(a.geojson_id)
                    self.flows_values[key] += 1

    def assemble_feature(self, affiliation):
        """Build the GeoJSON ``Feature`` dict for one affiliation.

        Args:
            affiliation: A registered Affiliation.

        Returns:
            A dict with ``properties`` (name, flows, centroid) and a ``Point``
            geometry made of the first two values of the location.
        """
        # location defaults to None on Affiliation; treat that like "unknown".
        location = affiliation.location if affiliation.location is not None else []
        linked = sorted(self.flows[affiliation.geojson_id])
        flows = {
            geojson_id: self.flows_values[f"{affiliation.geojson_id}-{geojson_id}"]
            for geojson_id in linked
        }
        return {
            "type": "Feature",
            "properties": {
                "name": affiliation.name,
                "flows": flows,
                "centroid": location,
            },
            "geometry": {
                "type": "Point",
                "coordinates": location[:2],
            },
        }

    def convert(self, data):
        """Consume all records, then yield one feature per affiliation.

        Args:
            data: Iterable of records.

        Yields:
            Feature dicts in registration order, so a feature's position
            matches the ``geojson_id`` used as a key in ``flows``.
        """
        for record in data:
            affiliations = self.get_affiliations(record)
            self.build_flows(affiliations)
        for affiliation in self.affiliations.values():
            yield self.assemble_feature(affiliation)
def start():
    """Export the index as a GeoJSON FeatureCollection written to ``geo.json``."""
    loader = Loader()
    records = loader.get_records()
    converter = Converter()
    # Materialise the features before the output file is opened.
    features = list(converter.convert(records))
    geojson = {"type": "FeatureCollection", "features": features}
    with open("geo.json", "w", encoding="utf-8") as out:
        out.write(dumps(geojson))
# Run the export only when this file is executed as a script.
if __name__ == "__main__":
    start()
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment