diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0735432 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.eggs/ +build/ +dist/ +.pytest_cache/ +.venv/ +venv/ diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..cb4803e --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1 @@ +pytest>=7,<9 diff --git a/tests/test_update_geonames.py b/tests/test_update_geonames.py new file mode 100644 index 0000000..5fb2be1 --- /dev/null +++ b/tests/test_update_geonames.py @@ -0,0 +1,262 @@ +import copy +import json +from importlib import import_module +from unittest import mock + +import pytest +import requests + +ug = import_module("update_address.update_geonames") + +ERROR_BODY = {"status": {"message": "the geoname feature does not exist.", "value": 11}} + +GOOD_BODY = { + "geonameId": 3165185, + "name": "Trieste", + "toponymName": "Trieste", + "lat": "45.64953", + "lng": "13.77678", + "continentCode": "EU", + "countryId": "3175395", + "countryCode": "IT", + "countryName": "Italy", + "adminName1": "Friuli Venezia Giulia", + "adminId1": "3176525", + "adminCode1": "06", + "adminName2": "Trieste", + "adminId2": "3165184", + "adminCode2": "TS", + "adminCodes1": {"ISO3166_2": "36"}, +} + +DEAD_ID = 2110683 +GOOD_ID = 3165185 +ROR_ID = "https://ror.org/01g5y5k24" + + +@pytest.fixture(autouse=True) +def _clear_cache(): + ug.RESPONSE_CACHE.clear() + yield + ug.RESPONSE_CACHE.clear() + + +def _v2_record(geonames_ids): + return { + "id": ROR_ID, + "locations": [ + { + "geonames_id": gid, + "geonames_details": { + "continent_code": None, + "continent_name": None, + "country_code": None, + "country_name": None, + "country_subdivision_code": None, + "country_subdivision_name": None, + "lat": None, + "lng": None, + "name": None, + }, + } + for gid in geonames_ids + ], + } + + +def _v1_record(geonames_id): + return { + "id": ROR_ID, + "country": {"country_code": None, "country_name": None}, + "addresses": [ug.ror_empty_address(geonames_id)], + } + + +def _fake_http_response(body, status=200): + m = mock.Mock() + m.text = json.dumps(body) + m.status_code = status + m.raise_for_status = mock.Mock() + return m + + +def test_get_geonames_response_none_on_http_error(): + resp = _fake_http_response({}, status=404) + resp.raise_for_status.side_effect = requests.exceptions.HTTPError("404") + with mock.patch.object(ug.requests, "get", return_value=resp): + result, msg = ug.get_geonames_response(DEAD_ID) + assert result is None + assert msg and "Http Error" in msg + + +def test_get_geonames_response_detects_error_status_body(): + with mock.patch.object(ug.requests, "get", return_value=_fake_http_response(ERROR_BODY)): + result, msg = ug.get_geonames_response(DEAD_ID) + assert result is None + assert msg and "11" in msg and str(DEAD_ID) in msg + assert DEAD_ID not in ug.RESPONSE_CACHE + + +def test_get_geonames_response_caches_valid_response(): + with mock.patch.object(ug.requests, "get", return_value=_fake_http_response(GOOD_BODY)): + result, msg = ug.get_geonames_response(GOOD_ID) + assert msg is None + assert result["geonameId"] == GOOD_ID + assert GOOD_ID in ug.RESPONSE_CACHE + + +def test_update_geonames_v2_dead_id_raises_specific_error(): + rec = _v2_record([DEAD_ID]) + with mock.patch.object(ug, "get_geonames_response", return_value=(None, "status 11: gone")): + with pytest.raises(ValueError) as exc: + ug.update_geonames_v2(rec) + msg = str(exc.value) + assert str(DEAD_ID) in msg + assert "01g5y5k24" in msg + + +def test_update_geonames_v2_error_body_int_id_no_typeerror(): + rec = _v2_record([DEAD_ID]) + with mock.patch.object(ug, "get_geonames_response", return_value=(ERROR_BODY, None)): + with pytest.raises(ValueError): + ug.update_geonames_v2(rec) + + +def test_update_geonames_v2_happy_path(): + rec = _v2_record([GOOD_ID]) + with mock.patch.object(ug, "get_geonames_response", return_value=(copy.deepcopy(GOOD_BODY), None)): + out = ug.update_geonames_v2(rec) + assert out is not None + det = out["locations"][0]["geonames_details"] + assert det["name"] == "Trieste" + assert det["country_code"] == "IT" + assert det["country_name"] == "Italy" + assert det["continent_code"] == "EU" + assert det["continent_name"] == "Europe" + assert det["country_subdivision_code"] == "36" + + +def test_update_geonames_v2_multi_location_one_bad(): + rec = _v2_record([GOOD_ID, DEAD_ID]) + + def fake(gid): + return (copy.deepcopy(GOOD_BODY), None) if gid == GOOD_ID else (None, "status 11") + + with mock.patch.object(ug, "get_geonames_response", side_effect=fake): + with pytest.raises(ValueError) as exc: + ug.update_geonames_v2(rec) + assert str(DEAD_ID) in str(exc.value) + + +def test_update_geonames_v1_dead_id_raises(): + rec = _v1_record(DEAD_ID) + with mock.patch.object(ug, "get_geonames_response", return_value=(None, "status 11")): + with pytest.raises(ValueError) as exc: + ug.update_geonames(rec) + assert str(DEAD_ID) in str(exc.value) + assert "01g5y5k24" in str(exc.value) + + +def test_update_geonames_v1_happy_path(): + rec = _v1_record(GOOD_ID) + with mock.patch.object(ug, "get_geonames_response", return_value=(copy.deepcopy(GOOD_BODY), None)): + out = ug.update_geonames(rec) + assert out is not None + assert out["country"]["country_code"] == "IT" + assert out["country"]["country_name"] == "Italy" + assert out["addresses"][0]["city"] == "Trieste" + + +def test_new_geonames_dead_id_raises(): + with mock.patch.object(ug, "get_geonames_response", return_value=(None, "status 11")): + with pytest.raises(ValueError) as exc: + ug.new_geonames(DEAD_ID) + assert str(DEAD_ID) in str(exc.value) + + +def test_new_geonames_v2_dead_id_raises(): + with mock.patch.object(ug, "get_geonames_response", return_value=(ERROR_BODY, None)): + with pytest.raises(ValueError) as exc: + ug.new_geonames_v2(DEAD_ID) + assert str(DEAD_ID) in str(exc.value) + + +def test_new_geonames_v2_happy_path(): + with mock.patch.object(ug, "get_geonames_response", return_value=(copy.deepcopy(GOOD_BODY), None)): + out = ug.new_geonames_v2(GOOD_ID) + assert out is not None + det = out["location"]["geonames_details"] + assert det["name"] == "Trieste" + assert det["country_code"] == "IT" + assert det["continent_name"] == "Europe" + + +def _v2_details(body): + rec = _v2_record([GOOD_ID]) + with mock.patch.object(ug, "get_geonames_response", return_value=(copy.deepcopy(body), None)): + return ug.update_geonames_v2(rec)["locations"][0]["geonames_details"] + + +def test_v2_missing_admincodes1_nulls_subdivision_code(): + det = _v2_details({k: v for k, v in GOOD_BODY.items() if k != "adminCodes1"}) + assert det["country_subdivision_code"] is None + assert det["country_subdivision_name"] == "Friuli Venezia Giulia" + assert det["country_code"] == "IT" + assert det["continent_name"] == "Europe" + + +def test_v2_admincodes1_without_iso_key_nulls_subdivision_code(): + det = _v2_details({**GOOD_BODY, "adminCodes1": {}}) + assert det["country_subdivision_code"] is None + assert det["country_subdivision_name"] == "Friuli Venezia Giulia" + + +def test_v2_empty_adminname1_nulls_subdivision_name(): + det = _v2_details({**GOOD_BODY, "adminName1": ""}) + assert det["country_subdivision_name"] is None + assert det["country_subdivision_code"] == "36" + + +def test_v2_missing_continent_code_leaves_continent_unset(): + det = _v2_details({k: v for k, v in GOOD_BODY.items() if k != "continentCode"}) + assert det["continent_code"] is None + assert det["continent_name"] is None + assert det["country_code"] == "IT" + + +def test_v2_decimal_latlng_converts_to_float(): + det = _v2_details(GOOD_BODY) + assert det["lat"] == 45.64953 and isinstance(det["lat"], float) + assert det["lng"] == 13.77678 and isinstance(det["lng"], float) + + +def test_v2_integer_latlng_converts_to_int(): + det = _v2_details({**GOOD_BODY, "lat": "45", "lng": "13"}) + assert det["lat"] == 45 and isinstance(det["lat"], int) + assert det["lng"] == 13 and isinstance(det["lng"], int) + + +def test_v2_details_sorted_by_key(): + det = _v2_details(GOOD_BODY) + assert list(det.keys()) == sorted(det.keys()) + + +def test_v1_maps_admin_codes_and_country_geonames_id(): + rec = _v1_record(GOOD_ID) + with mock.patch.object(ug, "get_geonames_response", return_value=(copy.deepcopy(GOOD_BODY), None)): + out = ug.update_geonames(rec) + addr = out["addresses"][0] + city = addr["geonames_city"] + assert addr["city"] == "Trieste" + assert city["geonames_admin1"]["code"] == "IT.06" + assert city["geonames_admin2"]["code"] == "IT.06.TS" + assert addr["country_geonames_id"] == 3175395 + assert isinstance(addr["country_geonames_id"], int) + + +def test_v1_compare_countries_updates_country(): + rec = _v1_record(GOOD_ID) + rec["country"] = {"country_code": "XX", "country_name": "Wrong"} + with mock.patch.object(ug, "get_geonames_response", return_value=(copy.deepcopy(GOOD_BODY), None)): + out = ug.update_geonames(rec) + assert out["country"] == {"country_code": "IT", "country_name": "Italy"} diff --git a/update_address/update_geonames.py b/update_address/update_geonames.py index 87890e4..e62f4f5 100644 --- a/update_address/update_geonames.py +++ b/update_address/update_geonames.py @@ -160,7 +160,14 @@ def get_geonames_response(id): response = requests.get(url,params=query_params) response.raise_for_status() result = json.loads(response.text) - RESPONSE_CACHE[id] = result + if is_error_response(result): + status = result.get("status", {}) + msg = ("GeoNames error for ID " + str(id) + " (status " + + str(status.get("value")) + "): " + str(status.get("message"))) + print(msg) + result = None + else: + RESPONSE_CACHE[id] = result except requests.exceptions.HTTPError as errh: msg = "Http Error: " + str(errh) print (msg) @@ -176,6 +183,17 @@ def get_geonames_response(id): return result,msg +def is_error_response(result): + return isinstance(result, dict) and "status" in result + +def geonames_error_message(geonames_id, result, msg): + if result is None: + reason = msg or "no response from GeoNames" + return "GeoNames ID " + str(geonames_id) + " could not be resolved (" + reason + ")" + status = result.get("status", {}) + return ("GeoNames ID " + str(geonames_id) + " does not exist (status " + + str(status.get("value")) + "): " + str(status.get("message"))) + def field_types(key, geonames_value): types = { "lat": CONVERT_FLOAT, @@ -309,46 +327,54 @@ def update_geonames(record, alt_id=None): id, ror_address = get_record_address(record) if alt_id: id = alt_id - geonames_response = get_geonames_response(id)[0] + geonames_response, msg = get_geonames_response(id) + if geonames_response is None or is_error_response(geonames_response): + raise ValueError(geonames_error_message(id, geonames_response, msg) + + " for record " + str(record["id"])) try: mapped_fields = ror_geonames_mapping() address = compare_ror_geoname(mapped_fields, ror_address, geonames_response, ror_address) record['addresses'][0] = address record = compare_countries(record, geonames_response) return record - except: - print("Could not update Geonames ID " + str(id) + " for record " + str(record["id"])) + except Exception as e: + raise ValueError("Could not update Geonames ID " + str(id) + + " for record " + str(record["id"]) + ": " + str(e)) def update_geonames_v2(record, alt_id=None): print("Updating Geonames info for record: " + record["id"]) updated_locations = [] mapped_fields = ror_geonames_mapping_v2() - try: - for location in record['locations']: - try: - geonames_response = get_geonames_response(location['geonames_id'])[0] - print("Geonames response:") - print(geonames_response) - updated_location = compare_ror_geoname_v2(mapped_fields, location, geonames_response, location) - if updated_location['geonames_details']['continent_code']: - updated_location['geonames_details']['continent_name'] = CONTINENT_CODES_NAMES[updated_location['geonames_details']['continent_code']] - print("Updated location:") - print(updated_location) - sorted_geonames_details = dict(sorted(updated_location['geonames_details'].items())) - updated_location['geonames_details'] = sorted_geonames_details - updated_locations.append(updated_location) - except: - print("Could not update Geonames ID " + location['geonames_id'] + " for record " + str(record["id"])) + for location in record['locations']: + geonames_id = location['geonames_id'] + geonames_response, msg = get_geonames_response(geonames_id) + print("Geonames response:") + print(geonames_response) + if geonames_response is None or is_error_response(geonames_response): + raise ValueError(geonames_error_message(geonames_id, geonames_response, msg) + + " for record " + str(record["id"])) + try: + updated_location = compare_ror_geoname_v2(mapped_fields, location, geonames_response, location) + if updated_location['geonames_details']['continent_code']: + updated_location['geonames_details']['continent_name'] = CONTINENT_CODES_NAMES[updated_location['geonames_details']['continent_code']] + print("Updated location:") + print(updated_location) + sorted_geonames_details = dict(sorted(updated_location['geonames_details'].items())) + updated_location['geonames_details'] = sorted_geonames_details + updated_locations.append(updated_location) + except Exception as e: + raise ValueError("Could not update Geonames ID " + str(geonames_id) + + " for record " + str(record["id"]) + ": " + str(e)) - record['locations'] = deepcopy(updated_locations) - return record - except: - print("Could not update locations for record " + str(record["id"])) + record['locations'] = deepcopy(updated_locations) + return record def new_geonames(geonames_id): response = {} - print("Getting Geonames info for ID: " + geonames_id) - geonames_response = get_geonames_response(geonames_id)[0] + print("Getting Geonames info for ID: " + str(geonames_id)) + geonames_response, msg = get_geonames_response(geonames_id) + if geonames_response is None or is_error_response(geonames_response): + raise ValueError(geonames_error_message(geonames_id, geonames_response, msg)) ror_address = ror_empty_address(geonames_id) ror_country = ror_empty_country() try: @@ -358,13 +384,16 @@ def new_geonames(geonames_id): response['address'] = address response['country'] = country return response - except: - print("Could not create ROR address for Geonames ID " + str(geonames_id)) + except Exception as e: + raise ValueError("Could not create ROR address for Geonames ID " + + str(geonames_id) + ": " + str(e)) def new_geonames_v2(geonames_id): response = {} - print("Getting Geonames info for ID: " + geonames_id) - geonames_response = get_geonames_response(geonames_id)[0] + print("Getting Geonames info for ID: " + str(geonames_id)) + geonames_response, msg = get_geonames_response(geonames_id) + if geonames_response is None or is_error_response(geonames_response): + raise ValueError(geonames_error_message(geonames_id, geonames_response, msg)) ror_location = ror_empty_location_v2(geonames_id) try: mapped_fields = ror_geonames_mapping_v2() @@ -373,6 +402,7 @@ def new_geonames_v2(geonames_id): location['geonames_details']['continent_name'] = CONTINENT_CODES_NAMES[location['geonames_details']['continent_code']] response['location'] = location return response - except: - print("Could not create ROR address for Geonames ID " + str(geonames_id)) + except Exception as e: + raise ValueError("Could not create ROR address for Geonames ID " + + str(geonames_id) + ": " + str(e))