# Source code for finds.unstructured.unstructured

"""Classes for unstructured and textual datasets

Copyright 2022, Terence Lim

MIT License
"""
from typing import Dict, Iterable, List, Any, Tuple
import re
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from functools import reduce
from finds.database.mongodb import MongoDB, parse_where
_VERBOSE = 1

[docs]class Unstructured(object): """Base class for unstructured datasets Args: mongod: connection to MongoClient where data collection is stored database: name of the database in MongoDB Attributes: db : pymongo.database.Database connection Examples: >>> fomc = Unstructured(mongodb, 'fomc') # connect to client named 'fomc' >>> fomc.show() >>> fomc.select('minutes', where_clause) >>> fomc.delete('minutes', where_clause) >>> fomc.insert('minutes', doc) >>> fomc['minutes'].estimated_document_count() # count docs in collection >>> fomc['minutes', 'field'] Notes: - sudo apt-get install -y mongodb-org # install latest community version - sudo systemctl start mongod # start and stop mongodb server - sudo systemctl status mongod - sudo systemctl restart mongod - sudo systemctl stop mongod """ def __init__(self, mongodb: MongoDB, database: str): self.mongodb = mongodb self.database = database self.db = mongodb.client[database] # make Database operations available #self._c = self.c #self.collections = self._c
[docs] def __getitem__(self, args: Tuple | Any) -> Any: """Access a collection by name, or optionally by field""" if isinstance(args, tuple): return self.get(*args) else: return self.db[args]
[docs] def delete(self, collection: str, where: str | Dict | List) -> int: """Delete all docs in collection satisfying where clause Args: collection: name of collection in database to delete where: where clause describing documents to delete Returns: number of documents deleted, -1 if collection not in database Notes: - str filter (passed on directly to pymongo) - dict of {keys:values} - list of key names (to delete if key name $exists) """ if collection not in self.db.list_collection_names(): return -1 result = self[collection].delete_many(parse_where(where)) return result.deleted_count
[docs] def insert(self, collection: str, doc: Dict, keys: List[str] = []): """Insert one doc; optionally remove existing duplicate document first Args: collection: name of collection in database to insert into doc: dict of {key:value} representing document keys: list of field names, to delete existing docs with same values Returns: number of existing documents (with same key values) deleted """ deleted = self.delete(collection, keys) if [] else 0 self[collection].insert_one({k:v for k,v in doc.items() if k != '_id'}) return deleted
[docs] def get(self, collection: str, field: str) -> Any: """Return value of field of first doc containing key field name Args: collection: name of collection in database to retrieve from field: key field name Returns: value of key field of first document where key field name exists """ return self[collection].find_one({field : {'$exists' : True}})[field]
[docs] def select(self, collection, where: str | List | Dict = [], include_id: bool = False) -> List: """Iterator to retrieve docs in collection satisfying where clause Args: collection: Name of collection in database to delete where: Where clause describing documents to retrieve include_id: If True, then include _id field in return Returns: Document selecting where clause in a list of dict """ include_id = dict() if include_id else {'projection': {'_id': 0}} return self[collection].find(parse_where(where), **include_id)
[docs] def show(self, collection: str = ''): """Return list of collections; or key names in all docs in collection""" if not collection: #: self[c].index_information() return {c for c in self.db.list_collection_names()} else: return reduce(lambda all_keys, rec_keys: all_keys | set(rec_keys), map(lambda d: d.keys(), self[collection].find()), set())
[docs] def load_dataframe(self, collection: str, df: DataFrame, keys: List[str] = [], update: bool = False): """Insert_many records from rows of dataframe to a collection Args: collection: Name of collection in database to delete df: Each row of DataFrame is document, column names as key fields keys: Fields names to update or replace if same values update: If key fields have same value, update if True. Else replace """ if not keys: self[collection].insert_many(df.to_dict(orient='records')) else: for doc in df.to_dict(orient='records'): if update: self[collection].update_one({k: doc[k] for k in keys}, doc, upsert=True) else: self[collection].replace_one({k: doc[k] for k in keys}, doc, upsert=True)
if __name__ == "__main__":
    from pathlib import Path
    import time
    import csv
    import gzip   # used by update_situations for .gz input files
    import io
    from finds.database import MongoDB
    from secret import credentials, paths

    def update_situations(csvfile: str, sep: str = '\t') -> DataFrame:
        """Helper method to parse situations text from key developments file

        Args:
            csvfile: name of delimited text file, may be .gz
            sep: delimiter used in input file

        Returns:
            DataFrame with overflowing 'situation' text corrected, and unique
            'keydevid'

        Raises:
            ValueError: if the file has no lines

        Notes:
            - A line with fewer delimiters than the header line is treated as
              overflow text and appended to the line above it.
            - Non-ASCII characters are dropped from data lines, and '\\x1a'
              and newline characters inside a record become spaces.
        """
        tic = time.time()
        open_ = gzip.open if csvfile.endswith('.gz') else open
        with open_(csvfile, mode="rt", encoding="latin-1") as f:
            lines = f.readlines()   # "ISO-8859-1" "latin-1")
        if not lines:
            raise ValueError(f"{csvfile} is empty")
        header = lines[0]
        nsep = header.count(sep)    # infer number of delimiters from header

        # Merge overflow text from the end: carry short lines upward in `tail`
        # and collect complete records into a new list (avoids O(n^2) `del`)
        records, tail = [], ''
        for raw in reversed(lines[1:]):
            text = raw.encode('ascii', 'ignore').decode('ascii') + tail
            if text.count(sep) < nsep:
                tail = text
            else:
                records.append(text.replace('\x1a', ' ').replace('\n', ' '))
                tail = ''
        header += tail   # any leftover overflow merges into the header line
        records.reverse()
        print(round(time.time() - tic, 0), 'secs', len(records) + 1,
              min(line.count(sep) for line in [header, *records]), header)

        tic = time.time()
        rows = list(csv.reader(io.StringIO("\n".join(records)),
                               quotechar=None, delimiter=sep))
        df = DataFrame(data=rows, columns=header.lower().rstrip().split(sep))
        print(round(time.time() - tic, 0), 'secs', len(df), df.columns)

        # Convert ids to int *before* sorting, so the ordering (and hence which
        # duplicate 'keydevid' row is kept) is numeric, not lexicographic
        df['keydevid'] = df['keydevid'].astype(int)
        df['keydeveventtypeid'] = df['keydeveventtypeid'].astype(int)
        df = (df.sort_values(['keydevid', 'keydeveventtypeid'])
                .drop_duplicates(['keydevid']))   # keep unique 'keydevid'
        df.index = np.arange(len(df))
        return df.loc[:, ['keydeveventtypeid', 'keydevid', 'headline',
                          'situation']]

    def update_keydev():
        """Sample code to load keydev situations text files into MongoDB

        Relies on a global ``mongodb`` connection being defined below.

        Returns:
            Series of document counts in 'events', indexed by keydeveventtypeid
        """
        keydev = Unstructured(mongodb, 'KeyDev')
        keydev['events'].create_index('keydevid', unique=True)
        downloads = Path(paths['downloads']) / 'stocks2020' / 'PSTAT'
        for year in [2018, 2019]:
            tic = time.time()
            csvfile = str(downloads / f"situations{year}.txt.gz")
            df = update_situations(csvfile)
            keydev.load_dataframe('events', df)
            print(time.time() - tic)
        events = keydev['events']
        return Series({t: events.count_documents({'keydeveventtypeid': t})
                       for t in events.distinct('keydeveventtypeid')})

    if False:   # disabled by default: connect before running update_keydev()
        mongodb = MongoDB()
    print("unstructured")