Qualification tool: Filter based on timestamp in event logs (#2947)
* overall and per-app-name limit

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* add overall filter feature

* checkpoint for overall filter

* code cleanup

* per-app-name filter

* added tests

* addressed review comments

* addressed review comments

* update readme based on review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
nartal1 authored Jul 20, 2021
1 parent d641125 commit c53b025
Showing 8 changed files with 241 additions and 51 deletions.
41 changes: 33 additions & 8 deletions tools/README.md
@@ -89,7 +89,7 @@ Filter event logs to be processed. 10 newest files with filenames containing "loc
```bash
$SPARK_HOME/bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain \
rapids-4-spark-tools_2.12-<version>.jar \
-m "local" -f "10-newest" \
-m "local" -f "10-newest-filesystem" \
/directory/with/eventlogs/
```

@@ -196,12 +196,31 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
of a selection criterion. i.e. Select all
event logs except the ones which have
application name as the input string.
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing.eg: 100-newest (for processing
newest 100 event logs). eg: 100-oldest (for
processing oldest 100 event logs)
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs based on application start
timestamp, unique application name or filesystem
timestamp. Filesystem based filtering happens before any
application based filtering.
For application based filtering, the order in which filters are
applied is: application-name, start-app-time, filter-criteria.
Application based filter-criteria are:
100-newest (for processing newest 100 event logs based on
timestamp of the application inside the eventlog, i.e. application
start time)
100-oldest (for processing oldest 100 event logs based on
timestamp of the application inside the eventlog, i.e. application
start time)
100-newest-per-app-name (select at most 100 newest log files for
each unique application name)
100-oldest-per-app-name (select at most 100 oldest log files for
each unique application name)
Filesystem based filter criteria are:
100-newest-filesystem (for processing newest 100 event
logs based on filesystem timestamp).
100-oldest-filesystem (for processing oldest 100 event logs
based on filesystem timestamp).
-m, --match-event-logs <arg> Filter event logs whose filenames contain the
input string
input string. Filesystem based filtering happens before
any application based filtering.
-n, --num-output-rows <arg> Number of output rows in the summary report.
Default is 1000.
--num-threads <arg> Number of threads to use for parallel
@@ -224,6 +243,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
--report-read-schema Whether to output the read formats and
datatypes to the CSV file. This can be very
long. Default is false.
-s, --start-app-time <arg> Filter event logs whose application start
occurred within the past specified time
period. Valid time periods are
min(minute),h(hours),d(days),w(weeks),m(months).
If a period is not specified it defaults to
days.
-t, --timeout <arg> Maximum time in seconds to wait for the event
logs to be processed. Default is 24 hours
(86400 seconds) and must be greater than 3
@@ -592,8 +617,8 @@ For usage see below:
less than 10 applications). Default is false
-f, --filter-criteria <arg> Filter newest or oldest N event logs for processing.
Supported formats are:
To process 10 recent event logs: --filter-criteria "10-newest"
To process 10 oldest event logs: --filter-criteria "10-oldest"
To process 10 recent event logs: --filter-criteria "10-newest-timestamp"
To process 10 oldest event logs: --filter-criteria "10-oldest-timestamp"
-g, --generate-dot Generate query visualizations in DOT format.
Default is false
-m, --match-event-logs <arg> Filter event log filenames which contain the input string.
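For context, a minimal qualification-tool run that exercises the new application-level options documented above might look like the following sketch. The `QualificationMain` class name, the `7d` time period, and the event-log path are assumptions for illustration; the launch style follows the `java -cp` usage shown in the README.

```bash
# Hypothetical example: keep applications that started within the last 7 days,
# then take the 100 with the newest application start time (not filesystem time).
java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --start-app-time "7d" \
  --filter-criteria "100-newest" \
  /path/to/eventlogs
```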
@@ -170,7 +170,7 @@ object EventLogPathProcessor extends Logging {
logsWithTimestamp.filterKeys(_.eventLog.getName.contains(strMatch))
}.getOrElse(logsWithTimestamp)

val filteredLogs = filterNLogs.map { filter =>
val filteredLogs = if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
val filteredInfo = filterNLogs.get.split("-")
val numberofEventLogs = filteredInfo(0).toInt
val criteria = filteredInfo(1)
@@ -179,15 +179,21 @@
} else if (criteria.equals("oldest")) {
LinkedHashMap(matchedLogs.toSeq.sortWith(_._2 < _._2): _*)
} else {
logError("Criteria should be either newest or oldest")
logError("Criteria should be either newest-filesystem or oldest-filesystem")
Map.empty[EventLogInfo, Long]
}
matched.take(numberofEventLogs)
}.getOrElse(matchedLogs)

} else {
matchedLogs
}
filteredLogs.keys.toSeq
}

def filterByAppCriteria(filterNLogs: Option[String]): Boolean = {
filterNLogs.get.endsWith("-oldest") || filterNLogs.get.endsWith("-newest") ||
filterNLogs.get.endsWith("per-app-name")
}

def logApplicationInfo(app: ApplicationInfo) = {
logInfo(s"============== ${app.appId} (index=${app.index}) ==============")
}
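The path-processing change above keeps the filesystem-timestamp criteria (`*-newest-filesystem` / `*-oldest-filesystem`) separate from the new application-based criteria, and it runs together with `--match-event-logs` before any application-level filtering. A hedged sketch of a run that uses only these pre-filters; the match string, entry-point class name, and directory are illustrative.

```bash
# Hypothetical example: of the event logs whose filenames contain "2021",
# keep the 50 with the newest filesystem timestamps; any application-level
# filters would be applied afterwards.
java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --match-event-logs "2021" \
  --filter-criteria "50-newest-filesystem" \
  /directory/with/eventlogs/
```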
@@ -58,8 +58,8 @@ For usage see below:
val filterCriteria: ScallopOption[String] =
opt[String](required = false,
descr = "Filter newest or oldest N eventlogs for processing." +
"eg: 100-newest (for processing newest 100 event logs). " +
"eg: 100-oldest (for processing oldest 100 event logs)")
"eg: 100-newest-filesystem (for processing newest 100 event logs). " +
"eg: 100-oldest-filesystem (for processing oldest 100 event logs)")
val matchEventLogs: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose filenames contain the input string")
@@ -81,8 +81,10 @@
descr = "Write an SVG graph out for the full application timeline.")

validate(filterCriteria) {
case crit if (crit.endsWith("-newest") || crit.endsWith("-oldest")) => Right(Unit)
case _ => Left("Error, the filter criteria must end with either -newest or -oldest")
case crit if (crit.endsWith("-newest-filesystem") ||
crit.endsWith("-oldest-filesystem")) => Right(Unit)
case _ => Left("Error, the filter criteria must end with either -newest-filesystem " +
"or -oldest-filesystem")
}

verify()
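With the validation above, the profiling tool now accepts only the `-filesystem` forms of `--filter-criteria`. A sketch of the updated invocation, reusing the spark-submit style from the README; the event-log directory is a placeholder.

```bash
# Profiling tool: process the 100 oldest event logs by filesystem timestamp.
$SPARK_HOME/bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain \
 rapids-4-spark-tools_2.12-<version>.jar \
 -f "100-oldest-filesystem" \
 /directory/with/eventlogs/
```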
@@ -42,10 +42,25 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
" eg: s3a://<BUCKET>/eventlog1 /path/to/eventlog2")
val filterCriteria: ScallopOption[String] =
opt[String](required = false,
descr = "Filter newest or oldest N eventlogs for processing." +
"eg: 100-newest (for processing newest 100 event logs). " +
"eg: 100-oldest (for processing oldest 100 event logs). Filesystem " +
"based filtering happens before any application based filtering.")
descr = "Filter newest or oldest N eventlogs based on application start timestamp, " +
"unique application name or filesystem timestamp. Filesystem based filtering " +
"happens before any application based filtering." +
"For application based filtering, the order in which filters are" +
"applied is: application-name, start-app-time, filter-criteria." +
"Application based filter-criteria are:" +
"100-newest (for processing newest 100 event logs based on timestamp inside" +
"the eventlog) i.e application start time) " +
"100-oldest (for processing oldest 100 event logs based on timestamp inside" +
"the eventlog) i.e application start time) " +
"100-newest-per-app-name (select at most 100 newest log files for each unique " +
"application name) " +
"100-oldest-per-app-name (select at most 100 oldest log files for each unique " +
"application name)" +
"Filesystem based filter criteria are:" +
"100-newest-filesystem (for processing newest 100 event logs based on filesystem " +
"timestamp). " +
"100-oldest-filesystem (for processing oldest 100 event logsbased on filesystem " +
"timestamp).")
val applicationName: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application name matches " +
@@ -97,8 +112,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
}

validate(filterCriteria) {
case crit if (crit.endsWith("-newest") || crit.endsWith("-oldest")) => Right(Unit)
case _ => Left("Error, the filter criteria must end with either -newest or -oldest")
case crit if (crit.endsWith("-newest-filesystem") || crit.endsWith("-oldest-filesystem")
|| crit.endsWith("-newest-per-app-name") || crit.endsWith("-oldest-per-app-name")
|| crit.endsWith("-oldest") || crit.endsWith("-newest")) => Right(Unit)
case _ => Left("Error, the filter criteria must end with -newest, -oldest, " +
"-newest-filesystem, -oldest-filesystem, -newest-per-app-name or -oldest-per-app-name")
}

validate(timeout) {
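The expanded help text and validation above also admit the per-app-name forms. A hedged sketch (entry-point class name and path assumed) of selecting a bounded number of runs per distinct application name:

```bash
# Hypothetical example: for every distinct application name, keep at most the
# 3 oldest runs, ordered by the application start time inside each event log.
java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --filter-criteria "3-oldest-per-app-name" \
  /path/to/eventlogs
```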
@@ -94,6 +94,9 @@ object QualificationMain extends Logging {
}

def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
appArgs.applicationName.isSupplied || appArgs.startAppTime.isSupplied
val filterCriteria = appArgs.filterCriteria.toOption
appArgs.applicationName.isSupplied || appArgs.startAppTime.isSupplied ||
(filterCriteria.isDefined && (filterCriteria.get.endsWith("-newest") ||
filterCriteria.get.endsWith("-oldest") || filterCriteria.get.endsWith("-per-app-name")))
}
}
@@ -73,6 +73,8 @@ class AppFilterImpl(
val apps = appsForFiltering.asScala

val filterAppName = appArgs.applicationName.getOrElse("")
val filterCriteria = appArgs.filterCriteria.getOrElse("")

val appNameFiltered = if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) {
val filtered = if (filterAppName.startsWith(NEGATE)) {
// remove ~ before passing it into the containsAppName function
@@ -98,7 +100,38 @@
} else {
appNameFiltered
}
appTimeFiltered.map(_.eventlog).toSeq
val appCriteriaFiltered = if (appArgs.filterCriteria.isSupplied && filterCriteria.nonEmpty) {
if (filterCriteria.endsWith("-newest") || filterCriteria.endsWith("-oldest")) {
val filteredInfo = filterCriteria.split("-")
val numberofEventLogs = filteredInfo(0).toInt
val criteria = filteredInfo(1)
val filtered = if (criteria.equals("oldest")) {
appTimeFiltered.toSeq.sortBy(_.appInfo.get.startTime).take(numberofEventLogs)
} else {
appTimeFiltered.toSeq.sortBy(_.appInfo.get.startTime).reverse.take(numberofEventLogs)
}
filtered
} else if (filterCriteria.endsWith("-per-app-name")) {
val distinctAppNameMap = appTimeFiltered.groupBy(_.appInfo.get.appName)
val filteredInfo = filterCriteria.split("-")
val numberofEventLogs = filteredInfo(0).toInt
val criteria = filteredInfo(1)
val filtered = distinctAppNameMap.map { case (name, apps) =>
val sortedApps = if (criteria.equals("oldest")) {
apps.toSeq.sortBy(_.appInfo.get.startTime).take(numberofEventLogs)
} else {
apps.toSeq.sortBy(_.appInfo.get.startTime).reverse.take(numberofEventLogs)
}
(name, sortedApps)
}
filtered.values.flatMap(x => x)
} else {
appTimeFiltered
}
} else {
appTimeFiltered
}
appCriteriaFiltered.map(_.eventlog).toSeq
}

private def containsAppName(app: AppFilterReturnParameters, filterAppName: String): Boolean = {
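The AppFilterImpl logic above composes the application-level filters in the documented order: application name first, then application start time, then the filter criteria. A hedged end-to-end sketch combining all three; the flag spellings, application name, and path are assumptions for illustration.

```bash
# Hypothetical example: 1) keep apps whose application name matches "etl-job",
# 2) of those, keep apps that started within the last 2 weeks,
# 3) of those, keep the 10 with the newest application start time.
java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --application-name "etl-job" \
  --start-app-time "2w" \
  --filter-criteria "10-newest" \
  /path/to/eventlogs
```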
@@ -41,6 +41,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {

private val expRoot = ToolTestUtils.getTestResourceFile("ProfilingExpectations")
private val logDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
private val qualLogDir = ToolTestUtils.getTestResourcePath("spark-events-qualification")

test("test single event") {
testSqlCompression()
@@ -373,9 +374,9 @@
val appArgs = new ProfileArgs(Array(
"--match-event-logs",
matchFileName,
"src/test/resources/spark-events-qualification/udf_func_eventlog",
"src/test/resources/spark-events-qualification/udf_dataset_eventlog",
"src/test/resources/spark-events-qualification/dataset_eventlog"
s"$qualLogDir/udf_func_eventlog",
s"$qualLogDir/udf_dataset_eventlog",
s"$qualLogDir/dataset_eventlog"
))

val result = EventLogPathProcessor.processAllPaths(appArgs.filterCriteria.toOption,
@@ -399,7 +400,7 @@
tempFile2.setLastModified(12324567) // oldest file
tempFile3.setLastModified(34567891) // second newest file
tempFile4.setLastModified(23456789)
val filterNew = "2-newest"
val filterNew = "2-newest-filesystem"
val appArgs = new ProfileArgs(Array(
"--filter-criteria",
filterNew,
@@ -440,7 +441,7 @@
tempFile3.setLastModified(34567891) // second newest file
tempFile4.setLastModified(23456789)

val filterOld = "3-oldest"
val filterOld = "3-oldest-filesystem"
val matchFileName = "temp"
val appArgs = new ProfileArgs(Array(
"--filter-criteria",
