Source code for soso.strategies.spase.spase

"""The SPASE strategy module."""

import atexit
import json
import re
import os
import tempfile
import importlib.resources
from datetime import datetime, timedelta
from pathlib import Path
from typing import Union, List, Dict
from urllib.parse import urlparse
import requests
from lxml import etree
from urllib.parse import urlparse
from soso.interface import StrategyInterface
from soso.utilities import delete_null_values


# create temp file which holds problematic records encountered during script
# Create a named temporary file which is deleted via garbage collection
temp_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
temp_file_path = temp_file.name
# print("Temp file exists?: " + str(os.path.exists(temp_file_path)) + ':' + temp_file_path)


def cleanup_temp_file():
    """Cleanup the temporary file on exit."""
    if not temp_file.closed:
        temp_file.close()


atexit.register(cleanup_temp_file)


[docs]class SPASE(StrategyInterface): """Define the conversion strategy for SPASE (Space Physics Archive Search and Extract). Attributes: file: The path to the metadata file. This should be an XML file in SPASE format. schema_version: The version of the SPASE schema used in the metadata file. kwargs: Additional keyword arguments for handling unmappable properties. See the Notes section below for details. Notes: Some properties of this metadata standard don't directly map to SOSO. However, these properties can still be included by inputting the information as `kwargs`. Keys should match the property name, and values should be the desired value. For a deeper understanding of each SOSO property, refer to the `SOSO guidelines <https://github.com/ESIPFed/science-on-schema.org/blob/master/guides/Dataset.md>`_. Below are unmappable properties for this strategy: - includedInDataCatalog - is_accessible_for_free - version - expires - provider :ref:`A shared conversion script <spase_HowToConvert>` is available for this standard. It is designed for repositories that supplement SPASE metadata with shared infrastructure, using the ancillary information to generate a richer SOSO record. """ def __init__(self, file: str, **kwargs: dict): """Initialize the strategy.""" file = str(file) # incase file is a Path object if not file.endswith(".xml"): # file should be XML raise ValueError(file + " must be an XML file.") super().__init__(metadata=etree.parse(file)) self.file = file self.schema_version = get_schema_version(self.metadata) self.kwargs = kwargs self.root = self.metadata.getroot() namespace = "" for ns in list(self.root.nsmap.values()): if "spase-group" in ns: namespace = ns self.namespaces = {"spase": namespace} # find element in tree to iterate over for elt in self.root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Observatory") or elt.tag.endswith("Instrument") or elt.tag.endswith("Collection") or elt.tag.endswith("Catalog") ): self.desired_root = elt # if want to see entire xml file as a string # print(etree.tostring(self.desired_root, pretty_print = True).decode(), end=' ')
[docs] def get_id(self) -> str: # Mapping: schema:identifier = spase:ResourceHeader/spase:DOI # OR spase-metadata.org landing page for the SPASE record url = self.get_url() if url: spase_id = url else: spase_id = None return delete_null_values(spase_id)
[docs] def get_name(self) -> str: # Mapping: schema:name = spase:ResourceHeader/spase:ResourceName desired_tag = self.desired_root.tag.split("}") spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:ResourceHeader/spase:ResourceName" ) name = self.metadata.findtext( spase_location, namespaces=self.namespaces, ) return delete_null_values(name)
[docs] def get_description(self) -> Union[List, str]: # Mapping: schema:description = spase:ResourceHeader/spase:Description desired_tag = self.desired_root.tag.split("}") spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:ResourceHeader/spase:Description" ) description = self.metadata.findtext( spase_location, namespaces=self.namespaces, ) # print(len(description)) # add check for very long descriptions (>5000 chars) and split these up if description: if len(description) > 5000: # print("Long description found.") split_desc = [] splits_required = (len(description) // 5000) + 1 # print(str(splits_required)) for _ in range(splits_required): # print("Splitting description") split_desc.append(description[:5000]) description = description[5000:] description = split_desc return delete_null_values(description)
[docs] def get_url(self) -> str: # Mapping: schema:url = spase:ResourceHeader/spase:DOI # (or https://spase-metadata.org landing page, if no DOI) desired_tag = self.desired_root.tag.split("}") spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:ResourceHeader/spase:DOI" ) url = self.metadata.findtext( spase_location, namespaces=self.namespaces, ) if delete_null_values(url) is None: resource_id = get_resource_id(self.metadata, self.namespaces) if resource_id: url = resource_id.replace("spase://", "https://spase-metadata.org/") return delete_null_values(url)
[docs] def get_same_as(self) -> Union[List, None]: # Mapping: schema:sameAs = spase:ResourceHeader/spase:PriorID same_as = [] # traverse xml to extract needed info for child in self.desired_root.iter(tag=etree.Element): if child.tag.endswith("PriorID"): same_as.append(child.text) if not same_as: same_as = None elif len(same_as) == 1: same_as = same_as[0] return delete_null_values(same_as)
[docs] def get_version(self) -> None: version = None return delete_null_values(version)
# commented out partial code that was put on hold due to licenses being added to SPASE soon
[docs] def get_is_accessible_for_free(self) -> None: # free = None # """schema:description: spase:AccessInformation/AccessRights""" is_accessible_for_free = None # local vars needed # access = "" # iterate thru to find AccessInfo # for child in self.desired_root: # if access == "Open": # break # if child.tag.endswith("AccessInformation"): # target_child = child # iterate thru to find AccessRights # for child in target_child: # if child.tag.endswith("AccessRights"): # access = child.text # if access == "Open": # is_accessible_for_free = True # else: # is_accessible_for_free = False return delete_null_values(is_accessible_for_free)
[docs] def get_keywords(self) -> Union[List, None]: # Mapping: schema:keywords = spase:Keyword keywords = [] # traverse xml to extract needed info for child in self.desired_root.iter(tag=etree.Element): if child.tag.endswith("Keyword"): keywords.append(child.text) if not keywords: keywords = None return delete_null_values(keywords)
[docs] def get_identifier(self) -> Union[Dict, List[Dict], None]: # Mapping: schema:identifier = spase:ResourceHeader/spase:DOI # (or https://spase-metadata.org landing page, if no DOI) # Each item is: {@id: URL, @type: schema:PropertyValue, # propertyID: URI for identifier scheme, value: identifier value, url: URL} # Uses identifier scheme URI, provided at: https://schema.org/identifier # OR schema:PropertyValue, provided at: https://schema.org/PropertyValue url = self.get_url() spase_id = get_resource_id(self.metadata, self.namespaces) if url: # if SPASE record has a DOI if "doi" in url: landing_page_url = spase_id.replace( "spase://", "https://spase-metadata.org/" ) temp = url.split("/") value = "doi:" + "/".join(temp[3:]) identifier = { "@list": [ { "@type": "PropertyValue", "propertyID": "https://registry.identifiers.org/registry/doi", "value": value, "url": url, "name": value.replace("doi:", "DOI: "), }, { "@type": "PropertyValue", "propertyID": "SPASE", "value": spase_id, "url": landing_page_url, }, ] } # if SPASE record only has landing page instead else: identifier = { "@type": "PropertyValue", "propertyID": "SPASE", "url": url, "value": spase_id, } else: identifier = None return delete_null_values(identifier)
[docs] def get_citation(self) -> Union[List[Dict], None]: # Mapping: schema:citation = spase:ResourceHeader/spase:InformationURL citation = [] information_url = get_information_url(self.metadata) if information_url: for each in information_url: # most basic citation item entry = { "@id": each["url"], "@type": "CreativeWork", "url": each["url"], "identifier": each["url"], } if "name" in each.keys(): entry["name"] = each["name"] if "description" in each.keys(): entry["description"] = each["description"] citation.append(entry) else: citation = None return delete_null_values(citation)
[docs] def get_variable_measured(self) -> Union[List[Dict], None]: # Mapping: schema:variable_measured = spase:Parameters/spase:Name, # Description, Units, ParameterKey # Each object is: # {"@type": schema:PropertyValue, "name": Name, # "description": Description, "unitText": Units, "alternateName": ParameterKey} # Following schema:PropertyValue found at: https://schema.org/PropertyValue variable_measured = [] # minVal = "" # maxVal = "" param_desc = "" param_name = "" units_found = [] key = "" i = 0 # traverse xml to extract needed info for child in self.desired_root.iter(tag=etree.Element): if child.tag.endswith("Parameter"): target_child = child for child in target_child: units_found.append("") try: if child.tag.endswith("Name"): param_name = child.text elif child.tag.endswith("Description"): substring = child.text.split("\n", 1) param_desc = substring[0] elif child.tag.endswith("Units"): unit = child.text units_found[i] = unit elif child.tag.endswith("ParameterKey"): key = child.text # elif child.tag.endswith("ValidMin"): # minVal = child.text # elif child.tag.endswith("ValidMax"): # maxVal = child.text except AttributeError: continue # most basic entry for variable measured entry = {"@type": "PropertyValue", "name": param_name} # "minValue": f"{minVal}", # "maxValue": f"{maxVal}"}) if param_desc: entry["description"] = param_desc if units_found[i]: entry["unitText"] = units_found[i] if key: entry["alternateName"] = key i += 1 variable_measured.append(entry) if len(variable_measured) == 0: variable_measured = None return delete_null_values(variable_measured)
[docs] def get_included_in_data_catalog(self) -> None: included_in_data_catalog = None return delete_null_values(included_in_data_catalog)
[docs] def get_subject_of(self, *moreLicenseInfo) -> Union[Dict, None]: # Mapping: schema:subjectOf = {http://www.w3.org/2001/XMLSchema-instance}MetadataRights # AND spase:ResourceHeader/spase:ReleaseDate # Following type:DataDownload found at: https://schema.org/DataDownload date_modified = self.get_date_modified() metadata_license = get_metadata_license(self.metadata) content_url = self.get_id() doi = False if "doi" in content_url: doi = True resource_id = get_resource_id(self.metadata, self.namespaces) content_url = resource_id.replace("spase://", "https://spase-metadata.org/") # small lookup table for commonly used licenses in SPASE # (CC0 for NASA, CC-BY-NC-3.0 for ESA, etc) common_licenses = [ { "fullName": "Creative Commons Zero v1.0 Universal", "identifier": "CC0-1.0", "url": "https://spdx.org/licenses/CC0-1.0.html", }, { "fullName": "Creative Commons Attribution Non Commercial 3.0 Unported", "identifier": "CC-BY-NC-3.0", "url": "https://spdx.org/licenses/CC-BY-NC-3.0.html", }, { "fullName": "Creative Commons Attribution 1.0 Generic", "identifier": "CC-BY-1.0", "url": "https://spdx.org/licenses/CC-BY-1.0.html", }, ] # add additional licensing info provided by the user to the lookup table if moreLicenseInfo: if "https://spdx.org/licenses/" in moreLicenseInfo[2]: addition = { "fullName": moreLicenseInfo[0], "identifier": moreLicenseInfo[1], "url": moreLicenseInfo[2], } common_licenses.append(addition) else: raise ValueError( "Improper URL provided: Ensure that the URL" "is pulled from the SPDX repo at" "https://github.com/spdx/license-list-data/tree/main" "and that it contains the text 'https://spdx.org/licenses/'" ) if content_url: # basic format for item entry = { "@type": "DataDownload", "name": "SPASE metadata for dataset", "description": "The SPASE metadata describing the indicated dataset.", "encodingFormat": "application/xml", "contentUrl": content_url, "identifier": content_url, } # if spase-metadata.org landing page not used as top-level @id, include here as @id if doi: entry["@id"] = content_url if metadata_license: # find URL associated w license found in top-level SPASE line license_url = [] for meta_license in metadata_license: for each in common_licenses: if each["fullName"] == meta_license: license_url.append(each["url"]) # if license is not in lookup table if not license_url: # find license info from SPDX data file at # https://github.com/spdx/license-list-data/tree/main # and add to common_licenses dictionary OR provide the # fullName, identifier, and URL (in that order) as arguments # to the conversion function. Then rerun script for those that failed. pass else: entry["license"] = license_url # if date modified is available, add it if date_modified: entry["dateModified"] = date_modified subject_of = entry else: subject_of = None return delete_null_values(subject_of)
[docs] def get_distribution(self) -> Union[List[Dict], None]: # Mapping: schema:distribution = /spase:AccessInformation/spase:AccessURL/spase:URL # (if URL is a direct link to download data) # AND /spase:AccessInformation/spase:Format # Each object is: # {"@type": schema:DataDownload, "content_url": URL, "encodingFormat": Format} # Following schema:DataDownload found at: https://schema.org/DataDownload distribution = [] data_downloads, _ = get_access_urls(self.metadata) for k, v in data_downloads.items(): entry = {"@type": "DataDownload", "contentUrl": k, "encodingFormat": v[0]} # if AccessURL has a name if v[1]: entry["name"] = v[1] distribution.append(entry) if len(distribution) != 0: if len(distribution) == 1: distribution = distribution[0] else: distribution = None return delete_null_values(distribution)
[docs] def get_potential_action(self) -> Union[List[Dict], None]: # Mapping: schema:potentialAction = /spase:AccessInformation/spase:AccessURL/spase:URL # (if URL is not a direct link to download data) # AND /spase:AccessInformation/spase:Format # Following schema:potentialAction found at: https://schema.org/potentialAction potential_action_list = [] start_sent = "" end_sent = "" _, potential_actions = get_access_urls(self.metadata) temp_covg = self.get_temporal_coverage() if temp_covg is not None: # obtain trial start and stop times for use in entry description start_sent, end_sent = make_trial_start_and_stop(temp_covg) # potential_actions[url] = [encoding, {"keys": [], "name": ""}] # loop thru all AccessURLs for k, v in potential_actions.items(): prod_keys = v[1]["keys"] name = v[1]["name"] encoding = v[0] # regex pattern for DateTime objects pattern = ( "(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-" "(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9])" ":([0-5][0-9])(.[0-9]+)?(Z)?" ) multiple = False # most basic format for a potentialAction item entry = { "@type": "SearchAction", "target": { "@type": "EntryPoint", "contentType": encoding, "url": k, "description": f"Download dataset data as {encoding} file at this URL", }, } # if link has no prod_key if prod_keys == []: # if not an ftp link, include url as @id if "ftp" not in k: entry["target"]["@id"] = k entry["target"]["identifier"] = k # if name available, add it if name: entry["target"]["name"] = name potential_action_list.append(entry) else: # if name available, add it if name: entry["target"]["name"] = name # find if multiple product keys if len(prod_keys) > 1: multiple = True # let user know of product key names in description # unneeded for HelioCloud API since productKey already in URL if "api.heliocloud.org/cloudcatalog" not in k: entry["target"]["description"] += ( f" using these product key(s): {str(prod_keys)}" ) # if link is a hapi link, provide the hapi interface # web service to download data if "/hapi" in k: # additions needed for each hapi link query_format = [ { "@type": "PropertyValueSpecification", "valueName": "start", "description": f"A UTC ISO DateTime. {start_sent}", "valueRequired": False, "valuePattern": f"{pattern}", }, { "@type": "PropertyValueSpecification", "valueName": "end", "description": f"A UTC ISO DateTime. {end_sent}", "valueRequired": False, "valuePattern": f"{pattern}", }, ] if "url" in entry["target"].keys(): entry["target"].pop("url") # if multiple product keys, keep track of all of them if multiple: entry["target"]["urlTemplate"] = [] for prod_key in prod_keys: prod_key = prod_key.replace('"', "") entry["target"]["urlTemplate"].append( f"{k}/data?id={prod_key}&time.min={{start}}&time.max={{end}}" ) else: prod_keys[0] = prod_keys[0].replace('"', "") entry["target"]["urlTemplate"] = ( f"{k}/data?id={prod_keys[0]}&time.min={{start}}&time.max={{end}}" ) entry["target"]["description"] = ( "Download dataset labeled by id in CSV format based on " "the requested start and end dates" ) entry["target"]["httpMethod"] = "GET" entry["query-input"] = query_format # if not ftp link, include url as @id if "ftp" not in k: entry["target"]["@id"] = k entry["target"]["identifier"] = k potential_action_list.append(entry) if len(potential_action_list) != 0: potential_action = potential_action_list else: potential_action = None return delete_null_values(potential_action)
[docs] def get_date_created(self) -> Union[str, None]: # Mapping: schema:dateCreated = spase:ResourceHeader/ # spase:PublicationInfo/spase:PublicationDate # OR spase:ResourceHeader/spase:RevisionHistory/spase:ReleaseDate # Using schema:DateTime as defined in: https://schema.org/DateTime date_created = self.get_date_published() # release, revisions = get_dates(self.metadata) # if revisions == []: # date_created = str(release).replace(" ", "T") # find earliest date in revision history # else: # print("RevisionHistory found!") # date_created = str(revisions[0]) # if len(revisions) > 1: # for i in range(1, len(revisions)): # if (revisions[i] < revisions[i-1]): # date_created = str(revisions[i]) # date_created = date_created.replace(" ", "T") return delete_null_values(date_created)
[docs] def get_date_modified(self) -> Union[str, None]: # Mapping: schema:dateModified = spase:ResourceHeader/spase:ReleaseDate # Using schema:DateTime as defined in: https://schema.org/DateTime # trigger = False release, _ = get_dates(self.metadata) date_modified = str(release).replace(" ", "T") # date_created = date_modified # confirm that ReleaseDate is the latest date in the record # if revisions != []: # print("RevisionHistory found!") # find latest date in revision history # date_created = str(revisions[0]) # if len(revisions) > 1: # for i in range(1, len(revisions)): # if (revisions[i] > revisions[i-1]): # date_created = str(revisions[i]) # print(date_created) # print(date_modified) # raise Error if releaseDate is not the latest in RevisionHistory # if datetime.strptime(date_created, "%Y-%m-%d %H:%M:%S") != release: # raise ValueError("ReleaseDate is not the latest date in the record!") # trigger = True return delete_null_values(date_modified)
[docs] def get_date_published(self) -> Union[str, None]: # Mapping: schema:datePublished = spase:ResourceHeader/ # spase:PublicationInfo/spase:PublicationDate # OR spase:ResourceHeader/spase:RevisionHistory/spase:ReleaseDate # Using schema:DateTime as defined in: https://schema.org/DateTime _, _, pub_date, _, _, _, _, _ = get_authors(self.metadata) date_published = None _, revisions = get_dates(self.metadata) if pub_date == "": if revisions: # find earliest date in revision history date_published = str(revisions[0]) if len(revisions) > 1: for i in range(1, len(revisions)): if revisions[i] < revisions[i - 1]: date_published = str(revisions[i]) date_published = date_published.replace(" ", "T") date_published = date_published.replace("Z", "") else: date_published = pub_date.replace(" ", "T") date_published = date_published.replace("Z", "") return delete_null_values(date_published)
[docs] def get_expires(self) -> None: expires = None return delete_null_values(expires)
[docs] def get_temporal_coverage(self) -> Union[str, Dict, None]: # Mapping: schema:temporal_coverage = spase:TemporalDescription/spase:TimeSpan/* # Each object is: # {temporalCoverage: StartDate and StopDate|RelativeStopDate} # Result is either schema:Text or schema:DateTime, # found at https://schema.org/Text and https://schema.org/DateTime # Using format as defined in: 'https://github.com/ESIPFed/science-on-schema # .org/blob/main/guides/Dataset.md#temporal-coverage' desired_tag = self.desired_root.tag.split("}") spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:TemporalDescription/spase:TimeSpan/spase:StartDate" ) start = self.metadata.findtext( spase_location, namespaces=self.namespaces, ) spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:TemporalDescription/spase:TimeSpan/spase:StopDate" ) stop = self.metadata.findtext( spase_location, namespaces=self.namespaces, ) if start: if stop: # temporal_coverage = { # "@type": "DateTime", # "temporalCoverage": f"{start.strip()}/{stop.strip()}", # } temporal_coverage = f"{start.strip()}/{stop.strip()}" # in case there is a RelativeStopDate else: temporal_coverage = f"{start}/.." else: temporal_coverage = None return delete_null_values(temporal_coverage)
[docs] def get_spatial_coverage(self) -> Union[List[Dict], None]: # Mapping: schema:spatial_coverage = list of spase:NumericalData/spase:ObservedRegion spatial_coverage = [] desired_tag = self.desired_root.tag.split("}") spase_location = ".//spase:" + f"{desired_tag[1]}/spase:ObservedRegion" all_regions = self.metadata.findall(spase_location, namespaces=self.namespaces) for item in all_regions: # Split string on '.' pretty_name = item.text.replace(".", " ") # most basic entry for spatialCoverage entry = { "@type": "Place", "keywords": { "@type": "DefinedTerm", "inDefinedTermSet": { "@id": "https://spase-group.org/data/" + "model/spase-latest/spase-latest_xsd.htm#Region" }, "termCode": item.text, }, "name": pretty_name, } # if this is the first item added, add additional info for DefinedTermSet if all_regions.index(item) == 0: entry["keywords"]["inDefinedTermSet"]["@type"] = "DefinedTermSet" entry["keywords"]["inDefinedTermSet"]["name"] = "SPASE Region" entry["keywords"]["inDefinedTermSet"]["url"] = ( "https://spase-group.org/data/model/spase-latest" "/spase-latest_xsd.htm#Region" ) spatial_coverage.append(entry) if len(spatial_coverage) == 0: spatial_coverage = None return delete_null_values(spatial_coverage)
[docs] def get_creator(self) -> Union[List[Dict], None]: # Mapping: schema:creator = spase:ResourceHeader/spase:PublicationInfo/spase:Authors # OR schema:creator = spase:ResourceHeader/spase:Contact/spase:PersonID # Each item is: # {@type: Role, roleName: Contact Role, creator: # {@type: Person, name: Author Name, givenName: # First Name, familyName: Last Name}} # plus the additional properties if available: affiliation and identifier (ORCiD ID), # which are pulled from SMWG Person SPASE records # Using schema:Creator as defined in: https://schema.org/creator creator = [] multiple = False matching_contact = False given_name = "" family_name = "" home_dir = str(Path.home()).replace("\\", "/") ( author, author_role, *_, contacts_list, ) = get_authors(self.metadata, self.file.replace(f"{home_dir}/", "")) author_str = str(author).replace("[", "").replace("]", "") if author: # if creators were found in Contact/PersonID if "Person/" in author_str: # if multiple found, split them and iterate thru one by one if "'," in author_str: multiple = True for person in author: if multiple: # keep track of position so roles will match index = author.index(person) else: index = 0 # split text from Contact into properly formatted name fields author_str, given_name, family_name = name_splitter(person) # get additional info (if any) # uncomment if making snapshot and also add '**kwargs: dict' as parameter # if not kwargs: orcid_id, affiliation, ror = get_orcid_and_affiliation( person, self.file ) """else: orcid_id = "" ror = "" affiliation = """ # create the dictionary entry for that person and append to list creator_entry = person_format( "creator", author_role[index], author_str, given_name, family_name, affiliation, orcid_id, ror, ) creator.append(creator_entry) # if creators were found in PublicationInfo/Authors else: # if there are multiple authors if len(author) > 1: # get rid of extra quotations for num, each in enumerate(author): if "'" in each: author[num] = each.replace("'", "") # iterate over each person in author string for person in author: matching_contact = False index = author.index(person) family_name, _, given_name = person.partition(", ") # find matching person in contacts, if any, to retrieve # affiliation and ORCiD for key, val in contacts_list.items(): if not matching_contact: if person == val: matching_contact = True # uncomment if making snapshot # if not kwargs: orcid_id, affiliation, ror = ( get_orcid_and_affiliation(key, self.file) ) """else: orcid_id = "" ror = "" affiliation = """ creator_entry = person_format( "creator", author_role[index], person, given_name, family_name, affiliation, orcid_id, ror, ) if not matching_contact: creator_entry = person_format( "creator", author_role[index], person, given_name, family_name, ) creator.append(creator_entry) # if there is only one author listed else: # get rid of extra quotations person = author_str.replace('"', "") person = author_str.replace("'", "") # determine if creator is a consortium with open( importlib.resources.files("soso.strategies.spase").joinpath( "spase-ignoreCreatorSplit.txt" ), "r", encoding="utf-8", ) as f: do_not_split = f.read() if (", " in person) and ("Consortium" not in person): # if file is not in list of ones to not have their creators split if self.file.replace(home_dir, "") not in do_not_split: family_name, _, given_name = person.partition(", ") # find matching person in contacts, if any, to get affiliation and ORCiD for key, val in contacts_list.items(): if not matching_contact: if person == val: matching_contact = True # uncomment if making snapshot # if not kwargs: orcid_id, affiliation, ror = ( get_orcid_and_affiliation(key, self.file) ) """else: orcid_id = "" ror = "" affiliation = """ creator_entry = person_format( "creator", author_role[0], person, given_name, family_name, affiliation, orcid_id, ror, ) if not matching_contact: creator_entry = person_format( "creator", author_role[0], person, given_name, family_name, ) creator.append(creator_entry) # no comma OR has 'Consortium' = organization = no givenName and familyName else: creator_entry = person_format( "creator", author_role[0], person, "", "" ) creator.append(creator_entry) # preserve order of elements if len(creator) != 0: if len(creator) > 1: creator = {"@list": creator} else: creator = None return delete_null_values(creator)
[docs] def get_contributor(self) -> Union[List[Dict], None]: # Mapping: schema:contributor = spase:ResourceHeader/spase:Contact/spase:PersonID # Each item is: # {@type: Role, roleName: Contributor or curator role, # contributor: {@type: Person, name: Author Name, # givenName: First Name, familyName: Last Name}} # plus the additional properties if available: affiliation and identifier (ORCiD ID), # which are pulled from SMWG Person SPASE records # Using schema:Person as defined in: https://schema.org/Person *_, contributors, _, backups, contacts_list = get_authors(self.metadata) contributor = [] first_contrib = True # holds role values that are not initially considered for contributor var curator_roles = [ "HostContact", "GeneralContact", "DataProducer", "MetadataContact", "TechnicalContact", ] # Step 1: check for ppl w author roles that were not found in PubInfo for key, val in contacts_list.items(): # used so that DefinedTermSet info not repeated in output if contributor: first_contrib = False if "." not in val: # split contact into name, first name, and last name contributor_str, given_name, family_name = name_splitter(key) # attempt to get ORCiD and affiliation orcid_id, affiliation, ror = get_orcid_and_affiliation(key, self.file) # if contact has more than 1 role if len(val) > 1: individual = person_format( "contributor", val, contributor_str, given_name, family_name, affiliation, orcid_id, ror, first_contrib, ) else: individual = person_format( "contributor", val[0], contributor_str, given_name, family_name, affiliation, orcid_id, ror, first_contrib, ) contributor.append(individual) # Step 2a: check for non-author role contributors found in Contacts if contributors: for person in contributors: # used so that DefinedTermSet info not repeated in output if contributor: first_contrib = False # split contact into name, first name, and last name contributor_str, given_name, family_name = name_splitter(person) # add call to get ORCiD and affiliation orcid_id, affiliation, ror = get_orcid_and_affiliation( person, self.file ) individual = person_format( "contributor", "Contributor", contributor_str, given_name, family_name, affiliation, orcid_id, ror, first_contrib, ) contributor.append(individual) # Step 2b: if no non-author role contributor is found, use backups (editors/curators) else: found = False i = 0 # while a curator is not found while not found and i < len(curator_roles): # used so that DefinedTermSet info not repeated in output if contributor: first_contrib = False # search for roles in backups that match curator_roles (in order of priority) keys = [key for key, val in backups.items() if curator_roles[i] in val] if keys != []: for key in keys: # split contact into name, first name, and last name editor_str, given_name, family_name = name_splitter(key) # add call to get ORCiD and affiliation orcid_id, affiliation, ror = get_orcid_and_affiliation( key, self.file ) individual = person_format( "contributor", curator_roles[i], editor_str, given_name, family_name, affiliation, orcid_id, ror, first_contrib, ) contributor.append(individual) found = True i += 1 # preserve order of elements if len(contributor) != 0: if len(contributor) > 1: contributor = {"@list": contributor} else: contributor = None return delete_null_values(contributor)
[docs] def get_provider(self) -> None: provider = None return delete_null_values(provider)
[docs] def get_publisher(self) -> Union[Dict, None]: # Mapping: schema:publisher = spase:ResourceHeader/spase:Contacts # OR spase:ResourceHeader/spase:PublicationInfo/spase:PublishedBy # Each item is: # {@type: Organization, name: PublishedBy OR Contact (if Role = Publisher)} # Using schema:Organization as defined in: https://schema.org/Organization ( *_, publisher, _, _, _, _, ) = get_authors(self.metadata) # ror = None # commented out ROR for now until capability added in SPASE """if 'spase://' in publisher: ORCiD, affil, ror = get_orcid_and_affiliation(publisher) else: # add full SPASE path to publisher name # how to do that??? ORCiD, affil, ror = get_orcid_and_affiliation(publisher) if ror: publisher = {"@id": ror, "@type": "Organization", "name": publisher, "identifier": ror} else:""" if publisher == "": publisher = None else: publisher = {"@type": "Organization", "name": publisher} return delete_null_values(publisher)
[docs] def get_funding(self) -> Union[List[Dict], None]: # Mapping: schema:funding = spase:ResourceHeader/spase:Funding/spase:Agency # AND spase:ResourceHeader/spase:Funding/spase:Project # AND spase:ResourceHeader/spase:Funding/spase:AwardNumber # Each item is: # {@type: MonetaryGrant, funder: {@type: Organization, name: Agency}, name: Project} # Using schema:MonetaryGrant as defined in: https://schema.org/MonetaryGrant funding = [] agency = [] project = [] award = [] # ror = None # iterate thru to find all info related to funding for child in self.desired_root.iter(tag=etree.Element): if child.tag.endswith("Funding"): target_child = child for child in target_child: if child.tag.endswith("Agency"): agency.append(child.text) elif child.tag.endswith("Project"): project.append(child.text) elif child.tag.endswith("AwardNumber"): award.append(child.text) # if funding info was found if agency: i = 0 # ror = get_ROR(agency) for funder in agency: # basic format for funding item entry = { "@type": "MonetaryGrant", "funder": {"@type": "Organization", "name": funder}, "name": project[i], } if award: entry["identifier"] = award[i] """if ror: entry["funder"]["@id"] = ror entry["funder"]["identifier"] = ror""" funding.append(entry) i += 1 if len(funding) != 0: if len(funding) == 1: funding = funding[0] else: funding = None return delete_null_values(funding)
[docs] def get_license(self) -> Union[List, None]: # Mapping: schema:license = spase:AccessInformation/spase:RightsList/spase:Rights # Using schema:license as defined in: https://schema.org/license licenses = [] """<RightsList> <Rights> <SchemeURI>https://spdx.org/licenses/</SchemeURI> <RightsIdentifierScheme>SPDX</RightsIdentifierScheme> <RightsIdentifier>CC0-1.0</RightsIdentifier> <RightsURI>https://spdx.org/licenses/CC0-1.0.html</RightsURI> <RightsName>Creative Commons Zero v1.0 Universal</RightsName> <Note>CC0 1.0 Universal is the Creative Commons license applicable to all publicly available NASA Heliophysics data products</Note> </Rights> </RightsList>""" desired_tag = self.desired_root.tag.split("}") rights_uri = None spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:AccessInformation/spase:RightsList/spase:Rights" ) for item in self.metadata.findall( spase_location, namespaces=self.namespaces, ): for child in item.iter(tag=etree.Element): if child.tag.endswith("RightsURI"): rights_uri = child.text if rights_uri not in licenses: licenses.append(rights_uri) if not licenses: licenses = None # elif len(licenses) == 1: # licenses = licenses[0] return delete_null_values(licenses)
[docs] def get_was_revision_of(self) -> Union[List[Dict], Dict, None]: # Mapping: prov:wasRevisionOf = spase:Association/spase:AssociationID # (if spase:AssociationType is "RevisionOf") # prov:wasRevisionOf found at https://www.w3.org/TR/prov-o/#wasRevisionOf was_revision_of = get_relation(self.desired_root, ["RevisionOf"], self.file) return delete_null_values(was_revision_of)
[docs] def get_was_derived_from(self) -> Union[Dict, None]: # Mapping: schema:wasDerivedFrom = spase:Association/spase:AssociationID # (if spase:AssociationType is "DerivedFrom" or "ChildEventOf") # schema:wasDerivedFrom found at https://www.w3.org/TR/prov-o/#wasDerivedFrom was_derived_from = None # same mapping as is_based_on was_derived_from = self.get_is_based_on() return delete_null_values(was_derived_from)
[docs] def get_is_based_on(self) -> Union[List[Dict], Dict, None]: # Mapping: schema:isBasedOn = spase:Association/spase:AssociationID # (if spase:AssociationType is "DerivedFrom" or "ChildEventOf") # schema:isBasedOn found at https://schema.org/isBasedOn is_based_on = get_relation( self.desired_root, ["ChildEventOf", "DerivedFrom"], self.file ) return delete_null_values(is_based_on)
[docs] def get_was_generated_by(self) -> Union[List[Dict], None]: # Mapping: prov:wasGeneratedBy = spase:InstrumentID/spase:ResourceID # and spase:InstrumentID/spase:ResourceHeader/spase:ResourceName # AND spase:InstrumentID/spase:ObservatoryID/spase:ResourceID # and spase:InstrumentID/spase:ObservatoryID/spase:ResourceHeader/spase:ResourceName # AND spase:InstrumentID/spase:ObservatoryID/spase:ObservatoryGroupID/spase:ResourceID # and spase:InstrumentID/spase:ObservatoryID/spase:ObservatoryGroupID/ # spase:ResourceHeader/spase:ResourceName # prov:wasGeneratedBy found at https://www.w3.org/TR/prov-o/#wasGeneratedBy # commenting out observatories because of the email with Baptiste and Donny instruments = get_instrument(self.metadata, self.file) # only uncomment if trying to generate snapshot spase.json # instruments = get_instrument( # self.metadata, self.file, **{"testing": "soso-spase/tests/data/spase/"} # ) # observatories = get_observatory(self.metadata, self.file) was_generated_by = [] # if observatories: # for each in observatories: # was_generated_by.append({"@type": ["ResearchProject", "prov:Activity"], # "prov:used": each}) if instruments: for each in instruments: was_generated_by.append( {"@type": ["ResearchProject", "prov:Activity"], "prov:used": each} ) if not was_generated_by: was_generated_by = None return delete_null_values(was_generated_by)
# Below are utility functions for the SPASE strategy. def get_schema_version(metadata: etree.ElementTree) -> str: """ :param metadata: The SPASE metadata object as an XML tree. :returns: The version of the SPASE schema used in the metadata record. """ namespace = "" for ns in list(metadata.getroot().nsmap.values()): if "spase-group" in ns: namespace = ns schema_version = metadata.findtext(f"{{{namespace}}}Version") return schema_version def get_authors( metadata: etree.ElementTree, file="PlaceholderText" ) -> tuple[List, List, str, str, List, str, Dict, Dict]: """ Takes an XML tree and scrapes the desired authors (with their roles), publication date, publisher, contributors, and publication title. Also scraped are the names and roles of the backups, which are any Contacts found that are not considered authors. It then returns these items, with the author, author roles, and contributors as lists and the rest as strings, except for the backups which is a dictionary. :param metadata: The SPASE metadata object as an XML tree. :param file: The absolute path of the SPASE record being scraped. :returns: The highest priority authors found within the SPASE record as a list as well as a list of their roles, the publication date, publisher, contributors, and the title of the publication. It also returns any contacts found, along with their role(s) in two separate dictionaries: ones that are not considered for the author role and ones that are. """ # local vars needed author = [] contacts_list = {} author_role = [] pub_date = "" pub = "" contributor = [] dataset = "" backups = {} pi_child = None desired_root = None root = metadata.getroot() if file: file = file.replace("\\", "/") for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Catalog") or elt.tag.endswith("Collection") ): desired_root = elt # traverse xml to extract needed info # iterate thru to find ResourceHeader if desired_root is not None: for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ResourceHeader"): target_child = child # iterate thru to find PublicationInfo for child in target_child: try: if child.tag.endswith("PublicationInfo"): pi_child = child elif child.tag.endswith("Contact"): c_child = child # iterate thru Contact to find PersonID and Role for child in c_child: try: # find PersonID if child.tag.endswith("PersonID"): # store PersonID person_id = child.text.strip() backups[person_id] = [] contacts_list[person_id] = [] # find Role elif child.tag.endswith("Role"): # backup author if ( ("PrincipalInvestigator" in child.text) or ("PI" in child.text) or ("CoInvestigator" in child.text) or ("Author" in child.text) ): if person_id not in author: author.append(person_id) author_role.append(child.text.strip()) else: index = author.index(person_id) author_role[index] = [ author_role[index], child.text.strip(), ] # store author roles found here in case PubInfo present contacts_list[person_id] += [ child.text.strip() ] # preferred contributor elif child.text == "Contributor": contributor.append(person_id) # backup publisher (none found in SPASE currently) elif child.text == "Publisher": pub = child.text.strip() else: # use list for values in case one person # has multiple roles # store contacts w non-author roles for # use in contributors backups[person_id] += [child.text.strip()] except AttributeError: continue except AttributeError: continue if pi_child is not None: for child in pi_child.iter(tag=etree.Element): # collect preferred author if child.tag.endswith("Authors"): author = [child.text.strip()] author_role = ["Author"] # collect preferred publication date elif child.tag.endswith("PublicationDate"): pub_date = child.text.strip() # collect preferred publisher elif child.tag.endswith("PublishedBy"): pub = child.text.strip() # collect preferred dataset elif child.tag.endswith("Title"): dataset = child.text.strip() # remove contacts w/o role values contacts_copy = {} for contact, role in contacts_list.items(): if role: contacts_copy[contact] = role # compare author and contacts_list to add author roles # from contacts_list for matching people found in PubInfo # also formats the author list correctly for use in get_creator author, author_role, contacts_list = process_authors( author, author_role, contacts_copy, file ) return ( author, author_role, pub_date, pub, contributor, dataset, backups, contacts_list, ) def get_access_urls(metadata: etree.ElementTree) -> tuple[Dict, Dict]: """ Splits the SPASE AccessURLs present in the record into either the distribution or potentialAction schema.org properties. :param metadata: The SPASE metadata object as an XML tree. :returns: The AccessURLs found in the SPASE record, separated into two dictionaries, data_downloads and potential_actions, depending on if they are a direct link to data or not. These dictionaries are setup to have the keys as the url and the values to be a list containing their data format(s), name, and product key (if applicable). """ # needed local vars data_downloads = {} potential_actions = {} access_urls = {} encoding = [] encoder = [] i = 0 j = 0 desired_root = None root = metadata.getroot() for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Catalog") or elt.tag.endswith("Collection") ): desired_root = elt # get Formats before iteration due to order of elements in SPASE record desired_tag = desired_root.tag.split("}") spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:AccessInformation/spase:Format" ) namespace = "" for ns in list(root.nsmap.values()): if "spase-group" in ns: namespace = ns for item in metadata.findall(spase_location, namespaces={"spase": namespace}): encoding.append(item.text) # traverse xml to extract needed info # iterate thru children to locate Access Information for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("AccessInformation"): target_child = child # iterate thru children to locate AccessURL and Format for child in target_child: if child.tag.endswith("AccessURL"): target_child = child name = "" # iterate thru children to locate URL for child in target_child: if child.tag.endswith("URL"): url = child.text # provide "NULL" value in case no keys are found access_urls[url] = {"keys": [], "name": name} # append an encoder for each URL encoder.append(encoding[j]) # check if URL has a product key elif child.tag.endswith("ProductKey"): prod_key = child.text # if only one prod_key exists if access_urls[url]["keys"] == []: access_urls[url]["keys"] = [prod_key] # if multiple prod_keys exist else: access_urls[url]["keys"] += [prod_key] elif child.tag.endswith("Name"): name = child.text j += 1 for k, v in access_urls.items(): # if URL has no access key if not v["keys"]: # non_data_file_ext = ["html", "com", "gov", "edu", "org", "eu", "int"] data_file_ext = [ "csv", "cdf", "fits", "txt", "nc", "jpeg", "png", "gif", "tar", "netcdf3", "netcdf4", "hdf5", "zarr", "asdf", "zip", ] substring = k.split("://") domain = substring[1] domain, _, download_file = domain.rpartition("/") download_file, _, ext = download_file.rpartition(".") # see if file extension is one associated w data files if ext not in data_file_ext: downloadable = False else: downloadable = True # if URL is direct link to download data, add to the data_downloads dictionary if downloadable: if v["name"]: data_downloads[k] = [encoder[i], v["name"]] else: data_downloads[k] = [encoder[i]] else: potential_actions[k] = [encoder[i], v] # if URL has access key, add to the potential_actions dictionary else: potential_actions[k] = [encoder[i], v] i += 1 return data_downloads, potential_actions def get_dates( metadata: etree.ElementTree, ) -> Union[tuple[datetime, List[datetime]], tuple[str, List]]: """ Scrapes the ReleaseDate and RevisionHistory:ReleaseDate(s) SPASE properties for use in the dateModified, dateCreated, and datePublished schema.org properties. :param metadata: The SPASE metadata object as an XML tree. :returns: The ReleaseDate and a list of all the dates found in RevisionHistory """ desired_root = None root = metadata.getroot() for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Collection") or elt.tag.endswith("Catalog") ): desired_root = elt revision_history = [] release_date = "" # traverse xml to extract needed info for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ResourceHeader"): target_child = child for child in target_child: # find ReleaseDate and construct datetime object from the string try: if child.tag.endswith("ReleaseDate"): date, _, time_str = child.text.partition("T") if "Z" in child.text: time_str = time_str.replace("Z", "") if "." in child.text: time_str, _, _ = time_str.partition(".") dt_string = date + " " + time_str dt_obj = datetime.strptime(dt_string, "%Y-%m-%d %H:%M:%S") release_date = dt_obj elif child.tag.endswith("RevisionHistory"): rev_hist_child = child for child in rev_hist_child: rev_ev_child = child for child in rev_ev_child: if child.tag.endswith("ReleaseDate"): date, _, time_str = child.text.partition("T") if "Z" in child.text: time_str = time_str.replace("Z", "") if "." in child.text: time_str, _, _ = time_str.partition(".") dt_string = date + " " + time_str try: dt_obj = datetime.strptime( dt_string, "%Y-%m-%d %H:%M:%S" ) # catch error when RevisionHistory is not formatted w time except ValueError: dt_obj = datetime.strptime( dt_string.strip(), "%Y-%m-%d" ).date() finally: revision_history.append(dt_obj) except AttributeError: continue return release_date, revision_history def person_format( person_type: str, role_name: Union[str, List], name: str, given_name: str, family_name: str, affiliation: str = "", orcid_id: str = "", ror: str = "", first_entry: bool = False, ) -> Dict: """ Groups up all available metadata associated with a given contact into a dictionary following the SOSO guidelines. :param person_type: The type of person being formatted. Values can be either: contributor or creator. :param role_name: The value found in the Role field associated with this Contact :param name: The full name of the Contact, as formatted in the SPASE record :param given_name: The first name/initial and middle name/initial of the Contact :param family_name: The last name of the Contact :param affiliation: The organization this Contact is affiliated with. :param orcid_id: The ORCiD identifier for this Contact :param ror: The ROR ID for the associated affiliation :param first_entry: Boolean signifying if this person is the first entry into its respective property result. :returns: The entry in the correct format to append to the contributor or creator dictionary """ *_, orcid_val = orcid_id.rpartition("/") entry = None if name: # add check for organization if ( ", " in name or ". " in name or (given_name and family_name) or "_" in name ) and ("Consortium" not in name): item_type = "Person" else: item_type = "Organization" # most basic format for creator item if person_type == "creator": entry = {"@type": item_type, "name": name} if (given_name and family_name) and item_type == "Person": entry["familyName"] = family_name.strip() entry["givenName"] = given_name.strip() elif person_type == "contributor": if isinstance(role_name, list): pretty_name = [] for role in role_name: # Split string on uppercase characters res = re.split(r"(?=[A-Z])", role) # prevent 'PI' from turning into 'P I' if "PI" in role: first, sep, _ = role.partition("PI") if "Co" in first: separated_name = first + "-" + sep else: separated_name = first + " " + sep # Remove empty strings and join with space or hypen depending on role elif "Co" in role: pattern = r"{}(?=[A-Z])".format(re.escape("Co")) if bool(re.search(pattern, role)): separated_name = "-".join(filter(None, res)) else: separated_name = " ".join(filter(None, res)) else: separated_name = " ".join(filter(None, res)) pretty_name.append(separated_name.strip()) else: # Split string on uppercase characters res = re.split(r"(?=[A-Z])", role_name) # prevent 'PI' from turning into 'P I' if "PI" in role_name: first, sep, _ = role_name.partition("PI") if "Co" in first: pretty_name = first + "-" + sep else: pretty_name = first + " " + sep # Remove empty strings and join with space or hypen depending on role_name elif "Co" in role_name: pattern = r"{}(?=[A-Z])".format(re.escape("Co")) if bool(re.search(pattern, role_name)): pretty_name = "-".join(filter(None, res)) else: pretty_name = " ".join(filter(None, res)) else: pretty_name = " ".join(filter(None, res)) pretty_name = pretty_name.strip() # most basic format for contributor item entry = { "@type": ["Role", "DefinedTerm"], "contributor": {"@type": item_type, "name": name}, "inDefinedTermSet": { "@id": "https://spase-group.org/data/model/spase-latest/" + "spase-latest_xsd.htm#Role" }, "roleName": pretty_name, "termCode": role_name, } if (given_name and family_name) and item_type == "Person": entry["contributor"]["familyName"] = family_name.strip() entry["contributor"]["givenName"] = given_name.strip() if first_entry: entry["inDefinedTermSet"]["@type"] = "DefinedTermSet" entry["inDefinedTermSet"]["name"] = "SPASE Role" entry["inDefinedTermSet"]["url"] = ( "https://spase-group.org/data/model/spase-latest/spase-latest_xsd.htm#Role" ) if item_type == "Person": if orcid_id: if person_type == "contributor": entry[f"{person_type}"]["identifier"] = { "@id": f"https://orcid.org/{orcid_id}", "@type": "PropertyValue", "propertyID": "https://registry.identifiers.org/registry/orcid", "url": f"https://orcid.org/{orcid_id}", "value": f"orcid:{orcid_val}", } entry[f"{person_type}"]["@id"] = f"https://orcid.org/{orcid_id}" else: entry["identifier"] = { "@id": f"https://orcid.org/{orcid_id}", "@type": "PropertyValue", "propertyID": "https://registry.identifiers.org/registry/orcid", "url": f"https://orcid.org/{orcid_id}", "value": f"orcid:{orcid_val}", } entry["@id"] = f"https://orcid.org/{orcid_id}" if affiliation: if person_type == "contributor": if ror: entry["contributor"]["affiliation"] = { "@type": "Organization", "name": affiliation, "identifier": { "@id": f"https://ror.org/{ror}", "@type": "PropertyValue", "propertyID": "https://registry.identifiers.org/registry/ror", "url": f"https://ror.org/{ror}", "value": f"ror:{ror}", }, } else: entry["contributor"]["affiliation"] = { "@type": "Organization", "name": affiliation, } else: if ror: entry["affiliation"] = { "@type": "Organization", "name": affiliation, "identifier": { "@id": f"https://ror.org/{ror}", "@type": "PropertyValue", "propertyID": "https://registry.identifiers.org/registry/ror", "url": f"https://ror.org/{ror}", "value": f"ror:{ror}", }, } else: entry["affiliation"] = { "@type": "Organization", "name": affiliation, } return entry def name_splitter(person: str) -> tuple[str, str, str]: """ Splits the given PersonID found in the SPASE Contacts container into three separate strings holding their full name, first name (and middle initial), and last name. :param person: The string found in the Contacts field as is formatted in the SPASE record. :returns: The string containing the full name of the Contact, the string containing the first name/initial of the Contact, and the string containing the last name of the Contact """ if person: *_, name_str = person.partition("Person/") # get rid of extra quotations name_str = name_str.replace("'", "") if "." in name_str: given_name, _, family_name = name_str.partition(".") # if first name is also initial if len(given_name) == 1: given_name += "." # if person has a generational suffix if ( family_name.endswith(".II") or family_name.endswith(".III") or family_name.endswith(".Jr") or family_name.endswith(".Sr") ): family_name, _, suffix = family_name.rpartition(".") family_name = family_name + " " + suffix # if name has initial(s) while "." in family_name: initial, _, family_name = family_name.partition(".") if len(initial) > 1: initial = initial[0] given_name = given_name + " " + initial + "." name_str = given_name + " " + family_name name_str = name_str.replace('"', "") else: given_name = "" family_name = "" else: raise ValueError( "This function only takes a nonempty string as an argument. Try again." ) return name_str, given_name, family_name def get_information_url(metadata: etree.ElementTree) -> Union[List[Dict], None]: """ Returns all relevant information from the SPASE informationURL(s) property for use within the schema.org citation property. :param metadata: The SPASE metadata object as an XML tree. :returns: The name, description, and url(s) for all InformationURL sections found in the ResourceHeader, formatted as a list of dictionaries. """ root = metadata.getroot() information_url = [] name = "" description = "" url = "" desired_root = None for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Observatory") or elt.tag.endswith("Instrument") or elt.tag.endswith("Collection") ): desired_root = elt # traverse xml to extract needed info for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ResourceHeader"): target_child = child # iterate thru children to locate AccessURL and Format for child in target_child: try: if child.tag.endswith("InformationURL"): target_child = child # iterate thru children to locate URL for child in target_child: if child.tag.endswith("Name"): name = child.text elif child.tag.endswith("URL"): url = child.text elif child.tag.endswith("Description"): description = child.text if name: if description: information_url.append( { "name": name, "url": url, "description": description, } ) else: information_url.append({"name": name, "url": url}) else: information_url.append({"url": url}) except AttributeError: continue if not information_url: information_url = None return information_url def get_instrument( metadata: etree.ElementTree, path: str, **kwargs: dict ) -> Union[List[Dict], None]: """ Attempts to retrieve all relevant information associated with all InstrumentID fields found in the SPASE record in order to be used in the prov-o wasGeneratedBy property. :param metadata: The SPASE metadata object as an XML tree. :param path: The absolute file path of the XML file the user wishes to pull info from. :returns: The name, url, and ResourceID for each instrument found in the InstrumentID section, formatted as a list of dictionaries. """ # Mapping: schema:IndividualProduct, prov:Entity, and sosa:System = spase:InstrumentID # schema:IndividualProduct found at https://schema.org/IndividualProduct # prov:Entity found at https://www.w3.org/TR/prov-o/#Entity # sosa:System found at https://w3c.github.io/sdw-sosa-ssn/ssn/#SOSASystem root = metadata.getroot() desired_root = None instrument = [] instrument_ids = {} if path: path = path.replace("\\", "/") for elt in root.iter(tag=etree.Element): if elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData"): desired_root = elt for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("InstrumentID"): instrument_ids[child.text] = {} if not instrument_ids: instrument = None else: # if called by testing function, only test first link if kwargs: for key, val in instrument_ids.items(): if key == "spase://SMWG/Instrument/MMS/4/FIELDS/FGM": instrument_ids = {key: val} # follow link provided by instrumentID to instrument page # from there grab name and url for item in instrument_ids.keys(): instrument_ids[item]["name"] = "" instrument_ids[item]["URL"] = "" # get home directory home_dir = str(Path.home()) home_dir = home_dir.replace("\\", "/") # get current working directory cwd = str(Path.cwd()).replace("\\", "/") # split path into needed substrings if "src/soso/strategies/spase/" in path: abs_path, _, after = path.partition("src/soso/strategies/spase/") else: _, abs_path, after = path.partition(f"{home_dir}/") repo_name, _, after = after.partition("/") # add original SPASE repo to log file that holds name of repos needed update_log(cwd, repo_name, "requiredRepos") # add SPASE repo that contains instruments also repo_name, _, after = item.replace("spase://", "").partition("/") update_log(cwd, repo_name, "requiredRepos") # format record if "src/soso/strategies/spase/" in path: # being called by testing function = change directory to xml file in tests folder # only uncomment these lines if using snapshot creation script # if "soso-spase/" in path: # record = abs_path + item.replace("spase://", "") + ".xml" # else: # if called by CI *_, file_name = item.rpartition("/") record = abs_path + "tests/data/spase/" + f"spase-{file_name}" + ".xml" # to ensure correct file path used for those not found in tests/data if not os.path.isfile(record): if "soso-spase/" in path: abs_path, _, _ = path.partition("soso-spase/") record = abs_path + item.replace("spase://", "") + ".xml" else: record = abs_path + item.replace("spase://", "") + ".xml" record = record.replace("'", "") if os.path.isfile(record): test_spase = SPASE(record) root = test_spase.metadata.getroot() instrument_ids[item]["name"] = test_spase.get_name() instrument_ids[item]["URL"] = test_spase.get_url() else: # add file to log containing problematic records/files if os.path.exists(temp_file_path): temp_file.seek(0) if temp_file.read(): temp_file.write(f", {record}") else: temp_file.write(f"{record}") for k in instrument_ids.keys(): if instrument_ids[k]["URL"]: instrument.append( { "@id": instrument_ids[k]["URL"], "@type": ["IndividualProduct", "prov:Entity", "sosa:System"], "identifier": { "@id": instrument_ids[k]["URL"], "@type": "PropertyValue", "propertyID": "SPASE Resource ID", "value": k, }, "name": instrument_ids[k]["name"], "url": instrument_ids[k]["URL"], } ) return instrument def get_observatory(metadata: etree.ElementTree, path: str) -> Union[List[Dict], None]: """ Uses the get_instrument function to attempt to retrieve all relevant information associated with any ObservatoryID (and ObservatoryGroupID) fields found in their related SPASE records in order to be used in the prov-o wasGeneratedBy property. :param metadata: The SPASE metadata object as an XML tree. :param path: The absolute file path of the XML file the user wishes to pull info from. :returns: The name, url, and ResourceID for each observatory related to this dataset, formatted as a list of dictionaries. """ # Mapping: schema:ResearchProject, prov:Entity, and sosa:Platform = # spase:InstrumentID/spase:ObservatoryID # AND spase:InstrumentID/spase:ObservatoryID/spase:ObservatoryGroupID if available # schema:ResearchProject found at https://schema.org/ResearchProject # prov:Entity found at https://www.w3.org/TR/prov-o/#Entity # sosa:Platform found at https://w3c.github.io/sdw-sosa-ssn/ssn/#SOSAPlatform instrument = get_instrument(metadata, path) if instrument is not None: observatory = [] observatory_group_id = "" observatory_id = "" recorded_ids = [] instrument_ids = [] if path: path = path.replace("\\", "/") for each in instrument: instrument_ids.append(each["identifier"]["value"]) for item in instrument_ids: # get home directory home_dir = str(Path.home()) home_dir = home_dir.replace("\\", "/") # get current working directory cwd = str(Path.cwd()).replace("\\", "/") # split path into needed substrings if "src/soso/strategies/spase/" in path: abs_path, _, after = path.partition("src/soso/strategies/spase/") else: _, abs_path, after = path.partition(f"{home_dir}/") repo_name, _, after = after.partition("/") # add original SPASE repo to log file that holds name of repos needed update_log(cwd, repo_name, "requiredRepos") if "src/soso/strategies/spase/" in path: # being called by testing function = change directory # to xml file in tests folder *_, file_name = item.rpartition("/") record = abs_path + "tests/data/spase/" + f"spase-{file_name}" + ".xml" else: record = abs_path + item.replace("spase://", "") + ".xml" record = record.replace("'", "") # follow link provided by instrument to instrument page, # from there grab ObservatoryID if os.path.isfile(record): test_spase = SPASE(record) root = test_spase.metadata.getroot() for elt in root.iter(tag=etree.Element): if elt.tag.endswith("Instrument"): desired_root = elt for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ObservatoryID"): observatory_id = child.text # add SPASE repo that contains observatories to log file also repo_name, _, after = observatory_id.replace("spase://", "").partition( "/" ) update_log(cwd, repo_name, "requiredRepos") # use observatory_id as record to get observatory_group_id and other info if "src/soso/strategies/spase/" in path: # being called by test function = change directory to xml file in tests folder *_, file_name = observatory_id.rpartition("/") record = ( abs_path + "tests/data/spase/" + f"spase-MMS-{file_name}" + ".xml" ) else: record = abs_path + observatory_id.replace("spase://", "") + ".xml" record = record.replace("'", "") if os.path.isfile(record): url = "" test_spase = SPASE(record) root = test_spase.metadata.getroot() for elt in root.iter(tag=etree.Element): if elt.tag.endswith("Observatory"): desired_root = elt for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ObservatoryGroupID"): observatory_group_id = child.text name = test_spase.get_name() url = test_spase.get_url() # finally, follow that link to grab name and url from there if observatory_group_id: # add SPASE repo that contains observatory group to log file also repo_name, _, after = observatory_group_id.replace( "spase://", "" ).partition("/") update_log(cwd, repo_name, "requiredRepos") # format record if "src/soso/strategies/spase/" in path: # being called by test function = change directory to xml file in tests # folder *_, file_name = observatory_group_id.rpartition("/") record = ( abs_path + "tests/data/spase/" + f"spase-{file_name}" + ".xml" ) else: record = ( abs_path + observatory_group_id.replace("spase://", "") + ".xml" ) record = record.replace("'", "") if os.path.isfile(record): group_url = "" test_spase = SPASE(record) group_name = test_spase.get_name() group_url = test_spase.get_url() if group_url: if observatory_group_id not in recorded_ids: observatory.append( { "@type": [ "ResearchProject", "prov:Entity", "sosa:Platform", ], "@id": group_url, "name": group_name, "identifier": { "@id": group_url, "@type": "PropertyValue", "propertyID": "SPASE Resource ID", "value": observatory_group_id, }, "url": group_url, } ) recorded_ids.append(observatory_group_id) else: # add obsGrp to log file containing problematic records/files if os.path.exists(temp_file_path): temp_file.seek(0) if temp_file.read(): temp_file.write(f", {record}") else: temp_file.write(f"{record}") if url and (observatory_id not in recorded_ids): observatory.append( { "@type": [ "ResearchProject", "prov:Entity", "sosa:Platform", ], "@id": url, "name": name, "identifier": { "@id": url, "@type": "PropertyValue", "propertyID": "SPASE Resource ID", "value": observatory_id, }, "url": url, } ) recorded_ids.append(observatory_id) else: if os.path.exists(temp_file_path): temp_file.seek(0) if temp_file.read(): temp_file.write(f", {record}") else: temp_file.write(f"{record}") else: observatory = None return observatory def get_alternate_name(metadata: etree.ElementTree) -> Union[str, None]: """ :param metadata: The SPASE metadata object as an XML tree. :returns: The alternate name of the dataset as a string. """ root = metadata.getroot() alternate_name = None desired_root = None for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Collection") ): desired_root = elt for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ResourceHeader"): target_child = child # iterate thru children to locate AlternateName for dataset for child in target_child: try: if child.tag.endswith("AlternateName"): alternate_name = child.text except AttributeError: continue return alternate_name def get_cadence_context(cadence: str) -> Union[str, None]: """ Returns a more human friendly explanation of the ISO 8601 formatted value found in the TemporalDescription:Cadence field in SPASE. :param cadence: The value found in the Cadence field of the TemporalDescription section :returns: A string description of what this value represents/means. """ # takes cadence/repeatFreq and returns an explanation for what it means # ISO 8601 Format = PTHH:MM:SS.sss # P1D, P1M, and P1Y represent time cadences of one day, one month, and one year, respectively context = "The time series is periodic with a " if cadence is not None: start, _, end = cadence.partition("P") # cadence is in hrs, min, or sec if "T" in end: start, _, time_str = end.partition("T") if "H" in time_str: # hrs start, _, end = time_str.partition("H") context += start + " hour cadence" elif "M" in time_str: # min start, _, end = time_str.partition("M") context += start + " minute cadence" elif "S" in time_str: # sec start, _, end = time_str.partition("S") context += start + " second cadence" # one of the 3 base cadences else: if "D" in end: # days start, _, end = end.partition("D") context += start + " day cadence" elif "M" in end: # months start, _, end = end.partition("M") context += start + " month cadence" elif "Y" in end: # yrs start, _, end = end.partition("Y") context += start + " year cadence" if context == "The time series is periodic with a ": context = None return context def get_mentions( metadata: etree.ElementTree, file: str, **kwargs: dict ) -> Union[List[Dict], Dict, None]: """ Scrapes any AssociationIDs with the AssociationType "Other" and formats them as dictionaries using the get_relation function. :param metadata: The SPASE metadata object as an XML tree. :param file: The file path of the SPASE record being scraped. :param **kwargs: Allows for additional parameters to be passed (only to be used for testing). :returns: The ID's of other SPASE records related to this one in some way. """ # Mapping: schema:mentions = spase:Association/spase:AssociationID # (if spase:AssociationType is "Other") # schema:mentions found at https://schema.org/mentions root = metadata.getroot() desired_root = None for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Collection") ): desired_root = elt mentions = get_relation(desired_root, ["Other"], file, **kwargs) return mentions def get_is_part_of( metadata: etree.ElementTree, file: str, **kwargs: dict ) -> Union[List[Dict], Dict, None]: """ Scrapes any AssociationIDs with the AssociationType "PartOf" and formats them as dictionaries using the get_relation function. :param metadata: The SPASE metadata object as an XML tree. :param file: The file path of the SPASE record being scraped. :param **kwargs: Allows for additional parameters to be passed (only to be used for testing). :returns: The ID(s) of the larger resource this SPASE record is a portion of, as a dictionary. """ # Mapping: schema:isBasedOn = spase:Association/spase:AssociationID # (if spase:AssociationType is "PartOf") # schema:isPartOf found at https://schema.org/isPartOf root = metadata.getroot() desired_root = None for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Collection") ): desired_root = elt is_part_of = get_relation(desired_root, ["PartOf"], file, **kwargs) return is_part_of def get_orcid_and_affiliation(spase_id: str, file: str) -> tuple[str, str, str]: """ Uses the given PersonID to scrape the ORCiD and affiliation (and its ROR ID if provided) associated with this contact. :param spase_id: The SPASE ID linking the page with the Person's or Repository's info. :param file: The absolute path of the original xml file scraped. :returns: The ORCiD ID and organization name (with its ROR ID, if found) this Contact is affiliated with, as strings. """ # takes spase_id and follows its link to get ORCIdentifier, OrganizationName, and RORIdentifier orcid_id = "" affiliation = "" ror = "" desired_root = None if file: file = file.replace("\\", "/") if (spase_id is not None) and (file is not None): # get home directory home_dir = str(Path.home()).replace("\\", "/") # get current working directory cwd = str(Path.cwd()).replace("\\", "/") # split record into needed substrings if "src/soso/strategies/spase/" in file: abs_path, _, after = file.partition("src/soso/strategies/spase/") else: _, abs_path, after = file.partition(f"{home_dir}/") repo_name, _, after = after.partition("/") # add original SPASE repo to log file that holds name of repos needed update_log(cwd, repo_name, "requiredRepos") # add SPASE repo that contains Person descriptions to log file also repo_name, _, after = spase_id.replace("spase://", "").partition("/") update_log(cwd, repo_name, "requiredRepos") # format record name if "src/soso/strategies/spase/" in file: # being called by testing function = change directory to xml file in tests folder *_, file_name = spase_id.rpartition("/") record = abs_path + "tests/data/spase/" + f"spase-{file_name}" + ".xml" # to ensure correct file path used for those not found in tests/data # comment these lines out if using snapshot creation script if not os.path.isfile(record): if "soso-spase/" in file: abs_path, _, _ = file.partition("soso-spase/") record = abs_path + spase_id.replace("spase://", "") + ".xml" else: record = abs_path + spase_id.replace("spase://", "") + ".xml" record = record.replace("'", "") if os.path.isfile(record): test_spase = SPASE(record) root = test_spase.metadata.getroot() # iterate thru xml to get desired info for elt in root.iter(tag=etree.Element): if elt.tag.endswith("Person") or elt.tag.endswith("Repository"): desired_root = elt for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("ORCIdentifier"): orcid_id = child.text elif child.tag.endswith("OrganizationName"): affiliation = child.text elif child.tag.endswith("RORIdentifier"): ror = child.text else: # add file to log containing problematic records/files if os.path.exists(temp_file_path): temp_file.seek(0) if temp_file.read(): temp_file.write(f", {record}") else: temp_file.write(f"{record}") return orcid_id, affiliation, ror def get_temporal(metadata: etree.ElementTree, namespaces: Dict) -> Union[List, None]: """ Scrapes the TemporalDescription:Cadence field in SPASE for use in the schema.org temporal property. :param metadata: The SPASE metadata object as an XML tree. :param namespaces: The SPASE namespaces used in the form of a dictionary. :returns: The cadence or common time interval between the start of successive measurements, given in its ISO 8601 formatting as well as a explanation sentence. """ # Mapping: schema:temporal = spase:TemporalDescription/spase:Cadence # Each object is: # [ explanation (string explaining meaning of cadence), Cadence] # Schema found at https://schema.org/temporal root = metadata.getroot() desired_root = None for elt in root.iter(tag=etree.Element): if elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData"): desired_root = elt desired_tag = desired_root.tag.split("}") spase_location = ( ".//spase:" + f"{desired_tag[1]}/spase:TemporalDescription/spase:Cadence" ) repeat_frequency = metadata.findtext( spase_location, namespaces=namespaces, ) explanation = "" if repeat_frequency: explanation = get_cadence_context(repeat_frequency) temporal = [explanation, repeat_frequency] else: temporal = None return delete_null_values(temporal) def get_metadata_license(metadata: etree.ElementTree) -> Union[str, None]: """ :param metadata: The metadata object as an XML tree. :returns: The metadata license(s) of the SPASE record. """ """<MetadataRightsList> <Rights> <SchemeURI>https://spdx.org/licenses/</SchemeURI> <RightsIdentifierScheme>SPDX</RightsIdentifierScheme> <RightsIdentifier>CC0-1.0</RightsIdentifier> <RightsURI>https://spdx.org/licenses/CC0-1.0.html</RightsURI> <RightsName>Creative Commons Zero v1.0 Universal</RightsName> <Note>CC0 1.0 Universal is the Creative Commons license applicable to all publicly available SPASE metadata descriptions</Note> </Rights> </MetadataRightsList>""" metadata_license = [] desired_root = None root = metadata.getroot() for elt in root.iter(tag=etree.Element): if elt.tag.endswith("MetadataRightsList"): desired_root = elt if desired_root is not None: for elt in desired_root.iter(tag=etree.Element): if elt.tag.endswith("Rights"): target_child = elt for child in target_child: if child.tag.endswith("RightsName"): metadata_license.append(child.text) if not metadata_license: metadata_license = None else: metadata_license = None return metadata_license def process_authors( author: List, author_role: List, contacts_list: Dict, file="PlaceholderText" ) -> tuple[List, List, Dict]: """ Groups any contact names from the SPASE Contacts container with their matching names, if found, in PubInfo:Authors, and adds any additional author roles (such as PI) to their corresponding entry in the author_roles list. Any contact with an author role not listed in PubInfo:Authors is added to the contacts_list with the rest of the non-matching contacts for use in get_contributors. :param author: The list of names found in SPASE record to be used in get_creator :param author_role: The list of roles associated with each person found in author list :param contacts_list: The dictionary containing the names of people considered to be authors as formatted in the Contacts container in the SPASE record, as well as their roles :returns: The updated author, author_roles, and contacts_list items after merging any author roles from Contacts with the roles associated with them if found in PubInfo. """ # loop thru all contacts to find any that match authors, unless no PubInfo was found # if matches found, add roles to author_roles and remove them from contacts_list # if no match found for person(s), leave in contacts_list for use in get_contributors author_str = str(author).replace("[", "").replace("]", "") if file: file = file.replace("\\", "/") # if creators were found in Contact/PersonID (no PubInfo) # remove author roles from contacts_list so not duplicated in contributors # (since already in author list) if "Person/" in author_str: contacts_copy = {} for person, val in contacts_list.items(): contacts_copy[person] = [] for role in val: # if role is not considered for author, add to acceptable roles # list for use in contributors if ( ("PrincipalInvestigator" not in role) and ("PI" not in role) and ("CoInvestigator" not in role) and ("Author" not in role) ): contacts_copy[person].append(role) # if no acceptable roles were found, remove that author from contributor consideration if contacts_copy[person] == []: contacts_copy.pop(person) return author, author_role, contacts_copy # if all creators were found in PublicationInfo/Authors else: # determine if authors are a consortium with open( importlib.resources.files("soso.strategies.spase").joinpath( "spase-ignoreCreatorSplit.txt" ), "r", encoding="utf-8", ) as f: do_not_split = f.read() # if file is not in list of ones to not have their creators split # and there are multiple authors if ( ( ("; " in author_str) or ("., " in author_str) or (" and " in author_str) or (" & " in author_str) ) and (file not in do_not_split) and ("Consortium" not in author_str) ): if ";" in author_str: author = author_str.split("; ") elif ".," in author_str: author = author_str.split("., ") elif " and " in author_str: author = author_str.split(" and ") else: author = author_str.split(" & ") # fix num of roles while len(author_role) < len(author): author_role += ["Author"] # get rid of extra quotations for num, each in enumerate(author): if "'" in each: author[num] = each.replace("'", "") # iterate over each person in author string for person in author: matching_contact = None index = author.index(person) # if first name doesnt have a period, check if it is an initial if not person.endswith("."): # if first name is an initial w/o a period, add one grp = re.search(r"[\.\s]{1}[\w]{1}$", person) if grp is not None: person += "." # remove 'and' from name if "and " in person: person = person.replace("and ", "") # continued formatting fixes if ", " in person: family_name, _, given_name = person.partition(", ") else: given_name, _, family_name = person.partition(". ") given_name += "." if "," in given_name: given_name = given_name.replace(",", "") # iterate thru contacts to find one that matches the current person for contact in contacts_list.keys(): if matching_contact is None: initial = None first_name, _, last_name = contact.rpartition(".") first_name, _, initial = first_name.partition(".") *_, first_name = first_name.rpartition("/") if len(first_name) == 1: first_name = first_name[0] + "." # Assumption: if first name initial, middle initial, and last name # match = same person # remove <f"{first_name[0]}."> in the lines below if this assumption # is no longer accurate # if no middle name if not initial: if ( (f"{first_name[0]}." in person) or (first_name in person) ) and (last_name in person): matching_contact = contact # if middle name is not initialized, check whole string elif len(initial) > 1: if ( ( (f"{first_name[0]}." in person) or (first_name in person) ) and (initial in person) and (last_name in person) ): matching_contact = contact else: if ( ( (f"{first_name[0]}." in person) or (first_name in person) ) and (f"{initial}." in person) and (last_name in person) ): matching_contact = contact # if match is found, add role to author_role and replace role with formatted # person name in contacts_list if matching_contact is not None: if author_role[index] != contacts_list[matching_contact]: author_role[index] = [author_role[index]] + contacts_list[ matching_contact ] if not initial: contacts_list[matching_contact] = f"{last_name}, {first_name}" elif len(initial) > 1: contacts_list[matching_contact] = ( f"{last_name}, {first_name} {initial}" ) else: contacts_list[matching_contact] = ( f"{last_name}, {first_name} {initial}." ) author[index] = (f"{family_name}, {given_name}").strip() # if there is only one author listed or file has consortium else: matching_contact = None # get rid of extra quotations person = author_str.replace('"', "") person = author_str.replace("'", "") if author_role == ["Author"]: # if author is a person (assuming names contain a comma) if ( (", " in person) and (file not in do_not_split) and ("Consortium" not in person) ): family_name, _, given_name = person.partition(", ") # also used when there are 3+ comma separated orgs # listed as authors - not intended (how to fix?) if "," in given_name: given_name = given_name.replace(",", "") # iterate thru contacts to find one that matches the current person contacts_list, author_role = find_match( contacts_list, person, author_role ) author[0] = (f"{family_name}, {given_name}").strip() else: # handle case when assumption 'names have commas' fails if ( (". " in person) and (file not in do_not_split) and ("Consortium" not in person) ): given_name, _, family_name = person.partition(". ") if " " in family_name: initial, _, family_name = family_name.partition(" ") given_name = given_name + ". " + initial[0] + "." # iterate thru contacts to find one that matches the current person contacts_list, author_role = find_match( contacts_list, person, author_role ) author[0] = (f"{family_name}, {given_name}").strip() # author is an organization, so no splitting is needed else: author[0] = person.strip() return author, author_role, contacts_list def _is_spase_metadata_host(url: str) -> bool: """ Return True if the URL's hostname is spase-metadata.org or a subdomain of it. """ parsed = urlparse(url) host = parsed.hostname if not host: return False return host == "spase-metadata.org" or host.endswith(".spase-metadata.org") def verify_type(url: str) -> tuple[bool, bool, dict]: """ Verifies that the link found in AssociationID is to a dataset or journal article and acquires more information if a dataset is not hosted by NASA. :param url: The link provided as an Associated work/reference for the SPASE record :returns: Boolean values signifying if the link is a Dataset/ScholarlyArticle. Also a dictionary with additional info about the related Dataset acquired from DataCite API if it is not hosted by NASA. """ # tests SPASE records to make sure they are datasets or a journal article is_dataset = False is_article = False non_spase_info = {} if url is not None: if _is_spase_metadata_host(url): if "Data" in url: is_dataset = True # case where url provided is a DOI else: link = requests.head(url, timeout=30) # check to make sure doi resolved to an spase-metadata.org page location = link.headers.get("location", "") parsed = urlparse(location) host = parsed.hostname if host == "spase-metadata.org": if "Data" in location: is_dataset = True # if not, call DataCite API to check resourceTypeGeneral # property associated w the record else: *_, doi = url.partition("doi.org/") # dataciteLink = f"https://api.datacite.org/dois/{doi}" # headers = {"accept": "application/vnd.api+json"} # response = requests.get(dataciteLink, headers=headers) response = requests.get( f"https://api.datacite.org/application/vnd.datacite.datacite+json/{doi}", timeout=30, ) if response.raise_for_status() is None: datacite_dict = json.loads(response.text) if "resourceType" in datacite_dict["types"].keys(): if datacite_dict["types"]["resourceType"]: if datacite_dict["types"]["resourceType"] == "Dataset": is_dataset = True elif ( datacite_dict["types"]["resourceType"] == "JournalArticle" ): is_article = True else: if ( datacite_dict["types"]["resourceTypeGeneral"] == "Dataset" ): is_dataset = True elif ( datacite_dict["types"]["resourceTypeGeneral"] == "JournalArticle" ): is_article = True else: if datacite_dict["types"]["resourceTypeGeneral"] == "Dataset": is_dataset = True elif ( datacite_dict["types"]["resourceTypeGeneral"] == "JournalArticle" ): is_article = True # if wish to add more checks, simply add more "elif" stmts like above # and adjust provenance/relationship functions to include new type check if is_dataset: # grab name, description, license, and creators non_spase_info["name"] = datacite_dict["titles"][0]["title"] if datacite_dict["descriptions"]: non_spase_info["description"] = datacite_dict[ "descriptions" ][0]["description"] else: non_spase_info["description"] = ( f"No description currently available for {url}." ) if datacite_dict["rightsList"]: non_spase_info["license"] = [] for each in datacite_dict["rightsList"]: non_spase_info["license"].append(each["rightsUri"]) for creator in datacite_dict["creators"]: if ("givenName" in creator.keys()) and ( "familyName" in creator.keys() ): family_name = creator["familyName"] given_name = creator["givenName"] elif ", " in creator["name"]: family_name, _, given_name = creator["name"].partition( ", " ) else: family_name = "" given_name = "" # adjust DataCite format to conform to schema.org format if creator["affiliation"]: non_spase_info["creators"] = person_format( "creator", "", creator["name"], given_name, family_name, creator["affiliation"]["name"], ) else: non_spase_info["creators"] = person_format( "creator", "", creator["name"], given_name, family_name, ) return is_dataset, is_article, non_spase_info def get_resource_id(metadata: etree.ElementTree, namespaces: Dict) -> Union[str, None]: """ :param metadata: The SPASE metadata object as an XML tree. :param namespaces: The SPASE namespaces used in the form of a dictionary. :returns: The ResourceID for the SPASE record. """ root = metadata.getroot() desired_root = None dataset_id = None for elt in root.iter(tag=etree.Element): if ( elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData") or elt.tag.endswith("Observatory") or elt.tag.endswith("Instrument") or elt.tag.endswith("Person") or elt.tag.endswith("Collection") or elt.tag.endswith("Catalog") ): desired_root = elt desired_tag = desired_root.tag.split("}") spase_location = ".//spase:" + f"{desired_tag[1]}/spase:ResourceID" dataset_id = metadata.findtext(spase_location, namespaces=namespaces) return dataset_id def get_measurement_method( metadata: etree.ElementTree, namespaces: Dict ) -> Union[List, None]: """ Scrapes all measurementType fields found in the SPASE record and maps them to the schema.org property measurementMethod. :param metadata: The SPASE metadata object as an XML tree. :param namespaces: The SPASE namespaces used in the form of a dictionary. :returns: The MeasurementType(s) for the SPASE record. """ # Mapping: schema:measurementMethod = spase:MeasurementType # schema:measurementMethod found at https://schema.org/measurementMethod measurement_method = [] desired_root = None root = metadata.getroot() for elt in root.iter(tag=etree.Element): if elt.tag.endswith("NumericalData") or elt.tag.endswith("DisplayData"): desired_root = elt desired_tag = desired_root.tag.split("}") spase_location = ".//spase:" + f"{desired_tag[1]}/spase:MeasurementType" all_measures = metadata.findall(spase_location, namespaces=namespaces) for item in all_measures: # Split string on uppercase characters res = re.split(r"(?=[A-Z])", item.text) # Remove empty strings and join with space pretty_name = " ".join(filter(None, res)) # most basic entry for measurementMethod entry = { "@type": "DefinedTerm", "inDefinedTermSet": { "@id": "https://spase-group.org/data/model/spase-latest/spase-latest_xsd" + ".htm#MeasurementType" }, "name": pretty_name, "termCode": item.text, } # if this is the first item added, add additional info for DefinedTermSet if all_measures.index(item) == 0: entry["inDefinedTermSet"]["@type"] = "DefinedTermSet" entry["inDefinedTermSet"]["name"] = "SPASE MeasurementType" entry["inDefinedTermSet"]["url"] = ( "https://spase-group.org/data/model/spase-latest/spase-latest_xsd." "htm#MeasurementType" ) measurement_method.append(entry) if len(measurement_method) == 0: measurement_method = None elif len(measurement_method) == 1: measurement_method = measurement_method[0] return measurement_method def get_relation( desired_root: etree.Element, association: list[str], file="", **kwargs: dict ) -> Union[List[Dict], Dict, None]: """ Scrapes through the SPASE record and returns the AssociationIDs which have the given AssociationType. These are formatted as dictionaries and use the verify_type function to add the correct type to each entry. :param desired_root: The element in the SPASE metadata tree object we are searching from. :param association: The AssociationType(s) we are searching for in the SPASE record. :param file: The file path of the SPASE record being converted. :param **kwargs: Allows for additional parameters to be passed (only to be used for testing). :returns: The ID's of other SPASE records related to this one in some way. """ relations = [] assoc_id = "" assoc_type = "" relational_records = {} if file: file = file.replace("\\", "/") # iterate thru xml to find desired info if desired_root is not None: for child in desired_root.iter(tag=etree.Element): if child.tag.endswith("Association"): target_child = child for child in target_child: if child.tag.endswith("AssociationID"): assoc_id = child.text elif child.tag.endswith("AssociationType"): assoc_type = child.text for each in association: if assoc_type == each: relations.append(assoc_id) if not relations: relation = None else: i = 0 # try and get DOI instead of SPASE ID for record in relations: # get home directory home_dir = str(Path.home()) home_dir = home_dir.replace("\\", "/") # get current working directory cwd = str(Path.cwd()).replace("\\", "/") # add SPASE repo that contains related SPASE record to log file repo_name, _, _ = record.replace("spase://", "").partition("/") update_log(cwd, repo_name, "requiredRepos") # format record if ("src/soso/strategies/spase/" in file) or kwargs: # being called by test function = change directory to xml file in tests folder *_, file_name = record.rpartition("/") if "src/soso/strategies/spase/" in file: # if called by snapshot creation script if "soso-spase/" in file: record = ( f"{home_dir}/soso-spase/" + "tests/data/spase/" + f"spase-{file_name}" + ".xml" ) # being called by CI workflow else: abs_path, _, _ = file.partition( "src/soso/strategies/spase/" ) record = ( f"{abs_path}" + "tests/data/spase/" + f"spase-{file_name}" + ".xml" ) # print(record) else: record = home_dir + "/" + record.replace("spase://", "") + ".xml" record = record.replace("'", "") if os.path.isfile(record): test_spase = SPASE(record) url = test_spase.get_url() name = test_spase.get_name() description = test_spase.get_description() spase_license = test_spase.get_license() # to ensure snapshot matches when running in local env # uncomment if creating snapshot # if "soso-spase" in file: # creators = test_spase.get_creator( # **{"placeholder": "so that snapshot matches"} # ) # else: creators = test_spase.get_creator() if creators is None: creators = "No creators were found. View record for contacts." relational_records[url] = { "name": name, "description": description, "creators": creators, } if spase_license is not None: relational_records[url]["license"] = spase_license else: if os.path.exists(temp_file_path): temp_file.seek(0) if temp_file.read(): temp_file.write(f", {record}") else: temp_file.write(f"{record}") i += 1 # add correct type if len(relations) > 1: relation = [] # not SPASE records if not relational_records: for each in relations: if "spase" not in each: # most basic entry into relation entry = {"@id": each, "identifier": each, "url": each} is_dataset, is_article, non_spase_info = verify_type(each) if is_dataset: entry["@type"] = "Dataset" entry["name"] = non_spase_info["name"] entry["description"] = non_spase_info["description"] if "license" in non_spase_info.keys(): entry["license"] = non_spase_info["license"] entry["creator"] = non_spase_info["creators"] elif is_article: entry["@type"] = "ScholarlyArticle" if len(relations) > 1: relation.append(entry) else: relation = entry else: for each in relational_records.keys(): # most basic entry into relation entry = {"@id": each, "identifier": each, "url": each} is_dataset, is_article, non_spase_info = verify_type(each) if is_dataset: entry["@type"] = "Dataset" entry["name"] = relational_records[each]["name"] entry["description"] = relational_records[each]["description"] if "license" in relational_records[each].keys(): entry["license"] = relational_records[each]["license"] entry["creator"] = relational_records[each]["creators"] elif is_article: entry["@type"] = "ScholarlyArticle" if len(relations) > 1: relation.append(entry) else: relation = entry else: relation = None return relation def update_log(cwd: str, addition: str, log_file_name: str) -> None: """ Updates a log file with the given addition. Log files currently updated using this method are one containing the SPASE repositories needed for the metadata conversion to work as intended and another containing all of the SPASE records that could not be accessed. :param cwd: The current working directory of your workstation. :param addition: The addition to the log file, such as the name of the repository needed to access the SPASE record or the SPASE record itself. """ if (cwd is not None) and (addition is not None): # create test requiredRepos.txt file for testing suite if os.path.isfile(f"{cwd}/{log_file_name}.txt"): """with open(f"{cwd}/{log_file_name}.txt", "w", encoding="utf-8") as f: f.write("This is placeholder text.")""" with open(f"{cwd}/{log_file_name}.txt", "r", encoding="utf-8") as f: text = f.read() if addition not in text: with open(f"{cwd}/{log_file_name}.txt", "a", encoding="utf-8") as f: f.write(f"\n{addition}") def make_trial_start_and_stop( temp_covg: Union[str, Dict], ) -> Union[tuple[str, str], None]: """ Creates a test end time for the dataset based on the TemporalDescription found in the SPASE record. Returns two sentences describing the start and stop times for use in the description(s) for datasets with HAPI links. :param temp_covg: The value returned from the get_temporal_coverage function :returns: Two sentence descriptions of the start and (newly created) trial stop times """ if temp_covg: start_sent = "" end_sent = "" if isinstance(temp_covg, str): start, _, end = temp_covg.partition("/") else: start, _, end = temp_covg["temporalCoverage"].partition("/") # create test end time date, _, time_str = start.partition("T") time_str = time_str.replace("Z", "") if "." in time_str: substring2 = time_str.split(".", 1) time_str = substring2[0] dt_string = date + " " + time_str dt_obj = datetime.strptime(dt_string, "%Y-%m-%d %H:%M:%S") # make test stop time 1 minute after start time test_end = dt_obj + timedelta(minutes=1) test_end = str(test_end).replace(" ", "T") # set test_end as end time if no end time found in record if end in ("", ".."): end = test_end else: end_sent = f"Data is available up to {end}. " end_sent += f"Use {test_end} as a test end value." start_sent = f"Use {start} as default value." else: start_sent = None end_sent = None return start_sent, end_sent def find_match( contacts_list: dict, person: str, author_role: list, matching_contact: bool = None ) -> tuple[dict, list]: """ Attempts to find a match in the provided dictionary of contacts (with their roles) found in the SPASE record to the given person name. If a match is found, that role is added to corresponding entry in the given list of author roles, and, in the dictionary of contacts, the role value is replaced with the formatted person name. :param contacts_list: The dictionary containing the contacts found in the SPASE record as keys and their roles as values. :param person: The string containing the name of the person you wish to find a match for. :param author_role: The list of author roles. :param matching_contact: The string containing the contact from the contacts_list parameter that matches the person parameter :returns: The updated versions of the given dictionary of contacts and list of author roles. """ if contacts_list and person and author_role: for contact in contacts_list.keys(): if matching_contact is None: initial = None first_name, _, last_name = contact.rpartition(".") first_name, _, initial = first_name.partition(".") *_, first_name = first_name.rpartition("/") if len(first_name) == 1: first_name = first_name[0] + "." # Assumption: if first name initial, middle initial, and last name # match = same person # remove <f"{first_name[0]}."> in the lines below if this assumption # is no longer accurate # if no middle name if not initial: if ((f"{first_name[0]}." in person) or (first_name in person)) and ( last_name in person ): matching_contact = contact # if middle name is not initialized, check whole string elif len(initial) > 1: if ( ((f"{first_name[0]}." in person) or (first_name in person)) and (initial in person) and (last_name in person) ): matching_contact = contact else: if ( ((f"{first_name[0]}." in person) or (first_name in person)) and (f"{initial}." in person) and (last_name in person) ): matching_contact = contact # if match is found, add role to author_role and replace role with # formatted person name in contacts_list if matching_contact is not None: if author_role[0] != contacts_list[matching_contact]: author_role[0] = [author_role[0]] + contacts_list[matching_contact] if not initial: contacts_list[matching_contact] = f"{last_name}, {first_name}" elif len(initial) > 1: contacts_list[matching_contact] = f"{last_name}, {first_name} {initial}" else: contacts_list[matching_contact] = ( f"{last_name}, {first_name} {initial}." ) return contacts_list, author_role def get_problematic_records() -> str: """Saves input from various functions to the temp file containing problematic records found during script, closes the file, and returns the content.""" problematic_records = "" if os.path.exists(temp_file_path): temp_file.seek(0) problematic_records = temp_file.read() # print("Records are: " + problematic_records) temp_file.close() # Close and remove the temp file object return problematic_records