Skip to content

Commit

Permalink
Merge pull request #18 from zazuko/append-metadata
Browse files Browse the repository at this point in the history
Append metadata operation
  • Loading branch information
cristianvasquez authored Apr 25, 2022
2 parents c8f20e5 + 7b899e7 commit f368ec5
Show file tree
Hide file tree
Showing 21 changed files with 1,153 additions and 12 deletions.
87 changes: 87 additions & 0 deletions packages/rdf/docs/metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@

# Metadata operation

## Append

Say you have a `dataset_description.ttl` file containing:

```turtle
<http://example.org/test> a <http://schema.org/Dataset> .
```

Then a step:

```turtle
@prefix p: <https://pipeline.described.at/> .
@prefix code: <https://code.described.at/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
<attachMetadata>
a p:Step ;
rdfs:label "Attach metadata" ;
code:implementedBy [ a code:EcmaScriptModule ;
code:link <node:barnard59-rdf/metadata.js#append> ] ;
code:arguments [
code:name "input"; code:value "../../metadata/dataset_description.ttl"
],
[
code:name "dateCreated";
code:value "2020-05-30";
],
[
code:name "dateModified";
code:value "TIME_NOW";
] .
```

will append the contents of `dataset_description.ttl` to the stream, with new or updated `schema.dateModified` or `schema.dateCreated` properties.

```turtle
<http://example.org/test> <http://schema.org/dateModified> "2022-04-13T08:55:21.363Z"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
<http://example.org/test> <http://schema.org/dateCreated> "2020-05-30"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
```

### Parameters

#### input

The quads to append. Can be a file, a quad stream or a URL pointing to the resource.

### Optional parameters

#### basepath

Sets the base path used to fetch the file.

#### graph

The named graph used for the incoming metadata quads.

### Dataset Classes

The operation updates subjects with a type that's a 'well known dataset class', currently:

* http://rdfs.org/ns/void#Dataset
* http://www.w3.org/ns/dcat#Dataset

That will add or modify the `dcterms:created` and `dcterms:modified` properties, and:

* http://schema.org/Dataset
* https://cube.link/Cube

that will add or modify the `schema:dateCreated` and `schema:dateModified` properties.

### Named Date Literals

#### TIME_NOW

The current time

#### TIME_FILE_CREATION

The file creation time. Applies only to files

#### TIME_FILE_MODIFICATION

The file modification time. Applies only to files
47 changes: 47 additions & 0 deletions packages/rdf/lib/append.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Transform } from 'readable-stream'
import { localFetch } from './localFetch/localFetch.js'
import { applyOptions } from './metadata/applyOptions.js'

/**
 * Transform stream that passes incoming quads through unchanged and, when
 * the upstream ends, appends the metadata quads fetched from `input`.
 */
class MetadataAppend extends Transform {
  /**
   * @param {object} context - pipeline context (`this` of the operation)
   * @param {string} basePath - base path used to resolve relative file inputs
   * @param {string|Readable} input - filename, URL or readable quad stream
   * @param {object} options - { graph, dateModified, dateCreated }
   */
  constructor (context, basePath, input, options) {
    super({ objectMode: true })
    this.context = context
    this.basePath = basePath
    this.input = input
    this.options = options
  }

  // Pass-through: the metadata is only appended at end-of-stream.
  _transform (chunk, encoding, callback) {
    callback(null, chunk)
  }

  async _flush (callback) {
    try {
      const { quadStream, metadata } = await localFetch(this.input, this.basePath)
      for (const quad of await applyOptions(quadStream, metadata, this.options)) {
        this.push(quad)
      }
      callback()
    } catch (err) {
      // Report the failure through the flush callback instead of destroying
      // the stream and then signalling a successful flush (the original
      // called destroy(err) and still invoked callback() in finally).
      callback(err)
    }
  }
}

/**
 * Factory for the metadata append operation.
 *
 * @param {object} [arg]
 * @param {string|Readable} arg.input - filename, URL or quad stream to append (required)
 * @param {string} [arg.basepath] - base path for resolving relative filenames
 * @param {string} [arg.dateModified] - date string or named date literal (e.g. TIME_NOW)
 * @param {string} [arg.dateCreated] - date string or named date literal
 * @param {string} [arg.graph] - named graph for the appended metadata quads
 * @returns {Promise<MetadataAppend>} transform stream that appends the metadata
 */
async function append ({
  input,
  basepath,
  dateModified = undefined,
  dateCreated = undefined,
  graph = undefined
} = {}) {
  if (!input) {
    throw new Error('Needs input as parameter (url or filename)')
  }

  // Prefer the base path from the pipeline context when one is available.
  const resolvedBasePath = this?.basePath || basepath
  const options = { graph, dateModified, dateCreated }

  return new MetadataAppend(this, resolvedBasePath, input, options)
}
2 changes: 1 addition & 1 deletion packages/rdf/lib/cube/buildCubeShape/Cube.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import TermSet from '@rdfjs/term-set'
import clownface from 'clownface'
import rdf from 'rdf-ext'
import cbdCopy from '../../cbdCopy.js'
import * as ns from '../../namespaces.js'
import Dimension from './Dimension.js'
import * as ns from './namespaces.js'

class Cube {
constructor ({ metadata, observationSet, shape, term }) {
Expand Down
2 changes: 1 addition & 1 deletion packages/rdf/lib/cube/buildCubeShape/Dimension.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import clownface from 'clownface'
import rdf from 'rdf-ext'
import { fromRdf } from 'rdf-literal'
import cbdCopy from '../../cbdCopy.js'
import * as ns from './namespaces.js'
import * as ns from '../../namespaces.js'

const datatypeParsers = new TermMap([
[ns.xsd.byte, fromRdf],
Expand Down
16 changes: 8 additions & 8 deletions packages/rdf/lib/cube/buildCubeShape/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import TermMap from '@rdfjs/term-map'
import TermSet from '@rdfjs/term-set'
import clownface from 'clownface'
import once from 'lodash/once.js'
import rdf from 'rdf-ext'
import $rdf from 'rdf-ext'
import { Transform } from 'readable-stream'
import * as ns from '../../namespaces.js'
import urlJoin from '../../urlJoin.js'
import Cube from './Cube.js'
import * as ns from './namespaces.js'

function defaultCube ({ observationSet }) {
const observationSetIri = observationSet && observationSet.value
Expand All @@ -15,7 +15,7 @@ function defaultCube ({ observationSet }) {
return null
}

return rdf.namedNode(urlJoin(observationSetIri, '..'))
return $rdf.namedNode(urlJoin(observationSetIri, '..'))
}

function defaultShape ({ term }) {
Expand All @@ -25,7 +25,7 @@ function defaultShape ({ term }) {
return null
}

return rdf.namedNode(urlJoin(cubeIri, 'shape'))
return $rdf.namedNode(urlJoin(cubeIri, 'shape'))
}

class CubeShapeBuilder extends Transform {
Expand All @@ -35,7 +35,7 @@ class CubeShapeBuilder extends Transform {
this.options = {
cubes: new TermMap(),
cube: defaultCube,
excludeValuesOf: new TermSet(excludeValuesOf ? excludeValuesOf.map(v => rdf.namedNode(v)) : []),
excludeValuesOf: new TermSet(excludeValuesOf ? excludeValuesOf.map(v => $rdf.namedNode(v)) : []),
metadataStream: metadata,
shape: defaultShape
}
Expand All @@ -45,9 +45,9 @@ class CubeShapeBuilder extends Transform {

async _init () {
if (this.options.metadataStream) {
this.options.metadata = await rdf.dataset().import(this.options.metadataStream)
this.options.metadata = await $rdf.dataset().import(this.options.metadataStream)
} else {
this.options.metadata = rdf.dataset()
this.options.metadata = $rdf.dataset()
}
}

Expand All @@ -58,7 +58,7 @@ class CubeShapeBuilder extends Transform {
return callback(err)
}

const dataset = rdf.dataset([...chunk])
const dataset = $rdf.dataset([...chunk])

const context = {
dataset,
Expand Down
91 changes: 91 additions & 0 deletions packages/rdf/lib/localFetch/localFetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import fsp from 'fs/promises'

import { resolve } from 'path'
import rdfFetch from '@rdfjs/fetch'
import fileFetch from 'file-fetch'
import isStream, { isReadable } from 'isstream'
import protoFetch from 'proto-fetch'
import { getParserByExtension } from './lookupParser.js'

// A value qualifies only if it is a stream and that stream is readable.
function isReadableStream (arg) {
  if (!isStream(arg)) {
    return false
  }
  return isReadable(arg)
}

// True when the string already carries one of the protocols we can fetch.
function isAbsolute (str) {
  const protocols = ['https:', 'http:', 'file:']
  return protocols.some(prefix => str.startsWith(prefix))
}

// Wrap an already-readable quad stream; the constructor name is recorded
// as metadata so callers can tell what kind of source was used.
async function streamWithMetadata (input) {
  const metadata = { type: input.constructor.name }
  return { quadStream: input, metadata }
}

// Fetch quads over http(s); the resolved URL is kept as metadata.
async function fetchHTTPWithMeta (input) {
  const url = new URL(input, import.meta.url)
  const response = await rdfFetch(url)
  const quadStream = await response.quadStream()
  return {
    quadStream,
    metadata: {
      type: url.constructor.name,
      value: url
    }
  }
}

// Resolve a parser from the file extension, failing loudly when none matches.
function guessParserForFile (filePath) {
  const parser = getParserByExtension(filePath)
  if (parser) {
    return parser
  }
  throw new Error(`No parser could be guessed for ${filePath}`)
}

// Read a local file, parse it by extension and attach file stats as metadata.
async function fetchFileWithMeta (input) {
  const fileURL = new URL(input, import.meta.url)
  const response = await fileFetch(fileURL.toString())
  const quadStream = await guessParserForFile(input).import(response.body)
  const stats = await fsp.lstat(fileURL)
  return {
    quadStream,
    metadata: {
      type: fileURL.constructor.name,
      value: fileURL.toString(),
      stats
    }
  }
}

// Tries to fetch or read locally one file.
// Accepts a readable quad stream, an absolute URL (http/https/file) or a
// filename; relative filenames are resolved against basePath when given.
async function localFetch (input, basePath) {
  if (!input) {
    throw new Error('needs input filename or URL')
  }

  if (isReadableStream(input)) {
    return streamWithMetadata(input, basePath)
  }

  if (typeof input !== 'string') {
    throw new Error(`needs input filename or URL, got [${typeof input}]`)
  }

  const fetch = protoFetch({
    file: fetchFileWithMeta,
    http: fetchHTTPWithMeta,
    https: fetchHTTPWithMeta
  })

  let url = input
  if (!isAbsolute(input) && basePath) {
    url = `file://${resolve(basePath, input)}`
  }

  return fetch(url)
}

export { localFetch }
11 changes: 11 additions & 0 deletions packages/rdf/lib/localFetch/lookupParser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import defaultFormats from '@rdfjs/formats-common'
import mime from 'mime-types'

// Derive the media type from the file extension and look up a matching parser.
// Returns undefined when no parser is registered for that media type.
function getParserByExtension (fileUrl) {
  const mediaType = mime.lookup(String(fileUrl))
  return defaultFormats.parsers.get(mediaType)
}

export {
getParserByExtension
}
72 changes: 72 additions & 0 deletions packages/rdf/lib/metadata/applyOptions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import TermSet from '@rdfjs/term-set'
import rdf from 'rdf-ext'
import * as ns from '../namespaces.js'
import { xsd } from '../namespaces.js'
import { wellKnownDatasetClasses, wellKnownDatasetClassesWithDcterms } from './datasetClasses.js'
import { namedDateLiterals } from './namedDateLiterals.js'

// Collect every subject that is rdf:typed with one of the given classes.
function subjectsWithDatasetType (dataset, classes) {
  const subjects = new TermSet()
  for (const quad of dataset) {
    if (quad.predicate.equals(ns.rdf.type) && classes.has(quad.object)) {
      subjects.add(quad.subject)
    }
  }
  return subjects
}

// Replace (or insert) `predicate object` on every subject typed with one of
// the given dataset classes, returning the resulting dataset.
function updateOrInsert (dataset, datasetClasses, predicate, object) {
  const targets = subjectsWithDatasetType(dataset, datasetClasses)

  // Drop any existing statements for this predicate on the target subjects.
  const updated = dataset.filter(
    quad => !(targets.has(quad.subject) && quad.predicate.equals(predicate))
  )

  // Write the fresh value for each target subject.
  for (const subject of targets) {
    updated.add(rdf.quad(subject, predicate, object))
  }

  return updated
}

// Strings become xsd:dateTime literals; RDF terms pass through untouched.
function toDateLiteral (item) {
  if (typeof item !== 'string') {
    return item
  }
  return rdf.literal(item, xsd.dateTime)
}

// Strings become named nodes; RDF terms pass through untouched.
function toNamedNode (item) {
  if (typeof item !== 'string') {
    return item
  }
  return rdf.namedNode(item)
}

// Expand a named date (e.g. TIME_NOW) via its factory, otherwise treat the
// value as a plain date literal.
function resolveNamedDate (value, metadata) {
  if (namedDateLiterals.has(value)) {
    return namedDateLiterals.get(value)(metadata)
  }
  return toDateLiteral(value)
}

/**
 * Materialize the metadata quad stream and apply the configured options:
 * dateModified/dateCreated stamps on well-known dataset classes and an
 * optional target graph for all quads.
 *
 * @param {Stream} quadStream - metadata quads to materialize
 * @param {object} [metadata] - source metadata (e.g. file stats) used by named dates
 * @param {object} [options] - { dateModified, dateCreated, graph }
 * @returns {Promise<Dataset>} the updated dataset
 */
async function applyOptions (quadStream, metadata = {}, options = {}) {
  let dataset = await rdf.dataset().import(quadStream)

  const { dateModified, dateCreated, graph } = options

  if (dateModified) {
    const modifiedLiteral = resolveNamedDate(dateModified, metadata)
    dataset = updateOrInsert(dataset, wellKnownDatasetClassesWithDcterms, ns.dcterms.modified, modifiedLiteral)
    dataset = updateOrInsert(dataset, wellKnownDatasetClasses, ns.schema.dateModified, modifiedLiteral)
  }

  if (dateCreated) {
    const createdLiteral = resolveNamedDate(dateCreated, metadata)
    dataset = updateOrInsert(dataset, wellKnownDatasetClassesWithDcterms, ns.dcterms.created, createdLiteral)
    dataset = updateOrInsert(dataset, wellKnownDatasetClasses, ns.schema.dateCreated, createdLiteral)
  }

  if (graph) {
    // The graph term is constant for every quad, so resolve it once.
    const graphNode = toNamedNode(graph)
    dataset = dataset.map(quad => rdf.quad(quad.subject, quad.predicate, quad.object, graphNode))
  }

  return dataset
}

export { applyOptions }
Loading

0 comments on commit f368ec5

Please sign in to comment.