Skip to content

Commit

Permalink
Prepare TTrack to receive external func calls
Browse files Browse the repository at this point in the history
  • Loading branch information
luabida committed Jun 14, 2022
1 parent fdbc36f commit 4fc6257
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
*.env*
__pycache__
.vscode/
data_test/.timetrackdb
data_test/.timetrackdb*
127 changes: 102 additions & 25 deletions invoicex/ttrack.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,111 @@
"""
import sqlite3
from humanfriendly import format_timespan
from typing import Any
import pandas as pd
import datetime as dt
import os

TTRACK_DB = 'data_test/.timetrackdb'
TTRACK_DB = "data_test/.timetrackdb"

def _get_list_of_tasks_entries_with_time_in_seconds(sql_database):
conn = sqlite3.connect(sql_database)
cur = conn.cursor()
entries_with_seconds = []
for row in cur.execute(f"SELECT name, (end - start) FROM tasks AS T"
" INNER JOIN tasklog AS L ON T.id=L.task"
" ORDER BY name"):
entries_with_seconds.append(row)
return entries_with_seconds

class TTrack:
def __init__(self, timetrackdb_file) -> os.path:
self.timetrackdb = timetrackdb_file

def _isolate_same_tasks_in_list():
entries = _get_list_of_tasks_entries_with_time_in_seconds(TTRACK_DB)
same = {t:0 for t, s in entries}
for task, secs in entries:
same[task] += secs
result = list(map(tuple, same.items()))
return result
def _conn_point(self):
conn = sqlite3.connect(self.timetrackdb)
cur = conn.cursor()
return cur

def _return_index_of_tasks_in_hours():
tasks = _isolate_same_tasks_in_list()
for t, s in tasks:
print(f'{t} ---- {format_timespan(s)}')
def _get_query(self, task=None):
if task is None:
return (
"SELECT name, start, end FROM tasks AS T"
" INNER JOIN tasklog AS L ON T.id=L.task"
" ORDER BY start"
)
if ", " in task:
tasks = task.split(", ")
other = []
for x in tasks[1:]:
other.append(f" OR name='{x}'")
return (
"SELECT name, start, end FROM tasks AS T"
" INNER JOIN tasklog AS L ON T.id=L.task"
f" WHERE name='{tasks[0]}'"
f" {''.join(other)}"
" ORDER BY start"
)
else:
return (
"SELECT name, start, end FROM tasks AS T"
" INNER JOIN tasklog AS L ON T.id=L.task"
f" WHERE name='{task}'"
" ORDER BY start"
)

def _get_list_of_tasks_entries_in_timestamp(self):
cur = self._conn_point()
entries_in_timestamp = []
for row in cur.execute(
self._get_query()
): # TODO Externalize _get_query() to receive params from func call
entries_in_timestamp.append(row)
return entries_in_timestamp

_return_index_of_tasks_in_hours()
def _format_unix_time_to_datetime_obj_in_lists(self):
entries = self._get_list_of_tasks_entries_in_timestamp()
list_of_entries_with_formated_date = []
for task, start, end in entries:
start_f = dt.datetime.fromtimestamp(start)
end_f = dt.datetime.fromtimestamp(end)
list_of_entries_with_formated_date.append([task, start_f, end_f])
return list_of_entries_with_formated_date

"""
def _prepare_dataframe(self):
raw_data = self._format_unix_time_to_datetime_obj_in_lists()
data = []
for task, start, end in raw_data:
time_worked = end - start
task_dict = {
"task": task,
"date": start.strftime("%Y-%m-%d"),
"time_worked": time_worked,
}
data.append(task_dict)
df = pd.DataFrame(data=data).sort_values(["date"])
return df

def _filter_by_month(self, year_month=None):
df = self._prepare_dataframe()
if year_month is None:
return df
else:
return df[df["date"].str.contains(str(year_month))]

def _group_tasks_remove_duplicates(self, v):
tasks = v.to_string(index=False).split()
unique_tasks = set(tasks)
for t in unique_tasks:
return ", ".join(str(t) for t in unique_tasks)

def _group_time_and_sum(self, v):
return v.sum()

def _group_tasks_and_time(self):
df = self._filter_by_month(
"2022-06"
) # TODO Externalize _filter_by_month to the same function that _get_query()
print(df)
df_grouped = df.groupby("date").aggregate(
lambda v: self._group_tasks_remove_duplicates(v)
if v.name == "task"
else self._group_time_and_sum(v)
)
return df_grouped

def __call__(self, *args: Any, **kwds: Any) -> Any:
return self._group_tasks_and_time()


# e = TTrack(TTRACK_DB)
# print(e())

0 comments on commit 4fc6257

Please sign in to comment.