From 4fc6257be958a606ea0b60ea572bd4efaf9ab5ba Mon Sep 17 00:00:00 2001 From: luabida Date: Mon, 13 Jun 2022 21:22:09 -0300 Subject: [PATCH] Prepare TTrack to receive external func calls --- .gitignore | 2 +- invoicex/ttrack.py | 127 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 103 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index bfad38e..efb3ee1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ *.env* __pycache__ .vscode/ -data_test/.timetrackdb +data_test/.timetrackdb* diff --git a/invoicex/ttrack.py b/invoicex/ttrack.py index e98f534..695bea6 100644 --- a/invoicex/ttrack.py +++ b/invoicex/ttrack.py @@ -1,34 +1,111 @@ -""" import sqlite3 -from humanfriendly import format_timespan +from typing import Any +import pandas as pd +import datetime as dt +import os -TTRACK_DB = 'data_test/.timetrackdb' +TTRACK_DB = "data_test/.timetrackdb" -def _get_list_of_tasks_entries_with_time_in_seconds(sql_database): - conn = sqlite3.connect(sql_database) - cur = conn.cursor() - entries_with_seconds = [] - for row in cur.execute(f"SELECT name, (end - start) FROM tasks AS T" - " INNER JOIN tasklog AS L ON T.id=L.task" - " ORDER BY name"): - entries_with_seconds.append(row) - return entries_with_seconds +class TTrack: + def __init__(self, timetrackdb_file) -> os.path: + self.timetrackdb = timetrackdb_file -def _isolate_same_tasks_in_list(): - entries = _get_list_of_tasks_entries_with_time_in_seconds(TTRACK_DB) - same = {t:0 for t, s in entries} - for task, secs in entries: - same[task] += secs - result = list(map(tuple, same.items())) - return result + def _conn_point(self): + conn = sqlite3.connect(self.timetrackdb) + cur = conn.cursor() + return cur -def _return_index_of_tasks_in_hours(): - tasks = _isolate_same_tasks_in_list() - for t, s in tasks: - print(f'{t} ---- {format_timespan(s)}') + def _get_query(self, task=None): + if task is None: + return ( + "SELECT name, start, end FROM tasks AS T" + " INNER JOIN tasklog AS L ON T.id=L.task" + " ORDER BY start" + ) + if ", " in task: + tasks = task.split(", ") + other = [] + for x in tasks[1:]: + other.append(f" OR name='{x}'") + return ( + "SELECT name, start, end FROM tasks AS T" + " INNER JOIN tasklog AS L ON T.id=L.task" + f" WHERE name='{tasks[0]}'" + f" {''.join(other)}" + " ORDER BY start" + ) + else: + return ( + "SELECT name, start, end FROM tasks AS T" + " INNER JOIN tasklog AS L ON T.id=L.task" + f" WHERE name='{task}'" + " ORDER BY start" + ) + def _get_list_of_tasks_entries_in_timestamp(self): + cur = self._conn_point() + entries_in_timestamp = [] + for row in cur.execute( + self._get_query() + ): # TODO Externalize _get_query() to receive params from func call + entries_in_timestamp.append(row) + return entries_in_timestamp -_return_index_of_tasks_in_hours() + def _format_unix_time_to_datetime_obj_in_lists(self): + entries = self._get_list_of_tasks_entries_in_timestamp() + list_of_entries_with_formated_date = [] + for task, start, end in entries: + start_f = dt.datetime.fromtimestamp(start) + end_f = dt.datetime.fromtimestamp(end) + list_of_entries_with_formated_date.append([task, start_f, end_f]) + return list_of_entries_with_formated_date -""" + def _prepare_dataframe(self): + raw_data = self._format_unix_time_to_datetime_obj_in_lists() + data = [] + for task, start, end in raw_data: + time_worked = end - start + task_dict = { + "task": task, + "date": start.strftime("%Y-%m-%d"), + "time_worked": time_worked, + } + data.append(task_dict) + df = pd.DataFrame(data=data).sort_values(["date"]) + return df + + def _filter_by_month(self, year_month=None): + df = self._prepare_dataframe() + if year_month is None: + return df + else: + return df[df["date"].str.contains(str(year_month))] + + def _group_tasks_remove_duplicates(self, v): + tasks = v.to_string(index=False).split() + unique_tasks = set(tasks) + for t in unique_tasks: + return ", ".join(str(t) for t in unique_tasks) + + def _group_time_and_sum(self, v): + return v.sum() + + def _group_tasks_and_time(self): + df = self._filter_by_month( + "2022-06" + ) # TODO Externalize _filter_by_month to the same function that _get_query() + print(df) + df_grouped = df.groupby("date").aggregate( + lambda v: self._group_tasks_remove_duplicates(v) + if v.name == "task" + else self._group_time_and_sum(v) + ) + return df_grouped + + def __call__(self, *args: Any, **kwds: Any) -> Any: + return self._group_tasks_and_time() + + +# e = TTrack(TTRACK_DB) +# print(e())