diff --git a/packages/gmail/src/Adaptor.js b/packages/gmail/src/Adaptor.js index 23df3375e..ee1aed161 100644 --- a/packages/gmail/src/Adaptor.js +++ b/packages/gmail/src/Adaptor.js @@ -2,16 +2,17 @@ import { execute as commonExecute, composeNextState, } from '@openfn/language-common'; - import { normalizeOauthConfig, expandReferences, } from '@openfn/language-common/util'; +import { + createConnection, + fetchMessages, + getContentFromMessage, + removeConnection, +} from './Utils'; -import unzipper from 'unzipper'; -import { google } from 'googleapis'; - -let gmail = undefined; const isTesting = false; export function getContentsFromMessages( @@ -21,6 +22,9 @@ export function getContentsFromMessages( callback = s => s ) { return async state => { + const [resolvedUserId, resolvedQuery, resolvedDesiredContents] = + expandReferences(state, userId, query, desiredContents); + const messageContents = []; const currentIds = []; const previousIds = Array.isArray(state.processedIds) @@ -34,7 +38,11 @@ export function getContentsFromMessages( let nextPageToken = null; do { - const messagesResult = await fetchMessages(userId, query, nextPageToken); + const messagesResult = await fetchMessages( + resolvedUserId, + resolvedQuery, + nextPageToken + ); if (!messagesResult.messages?.length) { console.log('No messages found.'); break; @@ -48,12 +56,12 @@ export function getContentsFromMessages( id => !previousIds.includes(id) ); - for (let messageId of unprocessedIds) { + for (const messageId of unprocessedIds) { const messageContent = { messageId: messageId, }; - for (let hint of desiredContents) { + for (const hint of resolvedDesiredContents) { const desiredContent = typeof hint === 'object' ? hint : { type: hint }; @@ -85,7 +93,7 @@ export function getContentsFromMessages( `Duplicate content name detected: ${desiredContent.name}` ); } - + messageContent[desiredContent.name] ??= content; } @@ -107,170 +115,9 @@ export function getContentsFromMessages( }; } -async function fetchMessages(userId, query, lastPageToken) { - let messagesResponse = null; - - try { - messagesResponse = await gmail.users.messages.list({ - userId: userId, - q: query, - maxResults: 3, - pageToken: lastPageToken, - }); - } catch (error) { - throw new Error('Error fetching messages: ' + error.message); - } - - const { messages, nextPageToken } = messagesResponse.data; - - return { messages, nextPageToken }; -} - -async function getContentFromMessage(userId, messageId, desiredContent) { - const messageResponse = await gmail.users.messages.get({ - userId, - id: messageId, - format: 'full', - }); - - if (desiredContent.type === 'archive') { - const { attachmentId, filename } = getAttachmentInfo( - messageResponse, - desiredContent.archive - ); - - if (!attachmentId) { - return null; - } - - const archive = await getAttachment(userId, messageId, attachmentId); - const file = await getFileFromArchive(archive, desiredContent.file); - return { ...file, archiveFilename: filename }; - } - - if (desiredContent.type === 'file') { - const { attachmentId, filename } = getAttachmentInfo( - messageResponse, - desiredContent.file - ); - - if (!attachmentId) { - return null; - } - - const attachment = await getAttachment(userId, messageId, attachmentId); - const file = await getFileFromAttachment(attachment); - return { content: file, filename }; - } - - if (desiredContent.type === 'body') { - const body = getBodyFromMessage(messageResponse); - return body; - } - - if (desiredContent.type === 'subject') { - const headers = messageResponse.data?.payload?.headers; - const subject = headers?.find(h => h.name === 'Subject')?.value; - return subject; - } - - if (desiredContent.type === 'from') { - const headers = messageResponse.data?.payload?.headers; - const from = headers?.find(h => h.name === 'From')?.value; - return from; - } - - if (desiredContent.type === 'date') { - const headers = messageResponse.data?.payload?.headers; - const rawDate = headers?.find(h => h.name === 'Date')?.value; - const date = rawDate ? new Date(rawDate) : null; - return date; - } - - return `Unsupported content type: ${desiredContent.type}`; -} - -async function getAttachment(userId, messageId, attachmentId) { - return await gmail.users.messages.attachments.get({ - userId, - messageId, - id: attachmentId, - }); -} - -async function getFileFromAttachment(attachment) { - const base64String = attachment?.data?.data; - if (!base64String) { - throw new Error('No data found in file.'); - } - - const fileContent = atob(base64String); - - return isTesting ? fileContent.substring(0, 40) : fileContent; -} - -async function getFileFromArchive(archive, filePattern) { - const base64String = archive?.data?.data; - if (!base64String) { - throw new Error('No data found in zip attachmentResponse.'); - } - - const compressedBuffer = Buffer.from(base64String, 'base64'); - const directory = await unzipper.Open.buffer(compressedBuffer); - - const file = directory?.files.find(f => filePattern.test(f.path)); - - if (!file) { - throw new Error('File not found in the archive.'); - } - - const fileBuffer = await file.buffer(); - const fileString = fileBuffer.toString('base64'); - const fileContent = atob(fileString); - - return { - content: isTesting ? fileContent.substring(0, 40) : fileContent, - filename: file.path, - }; -} - -function getAttachmentInfo(messageResponse, regex) { - const parts = messageResponse?.data?.payload?.parts; - const part = parts?.find(p => regex.test(p.filename)); - - return part - ? { attachmentId: part.body.attachmentId, filename: part.filename } - : { attachmentId: null }; -} - -function getBodyFromMessage(fullMessage) { - const parts = fullMessage?.data?.payload?.parts; - - const bodyPart = parts?.find( - part => part.mimeType === 'multipart/alternative' - ); - - const textBodyPart = bodyPart?.parts.find( - part => part.mimeType === 'text/plain' - ); - - const textBody = textBodyPart?.body?.data; - if (textBody) { - let body = Buffer.from(textBody, 'base64').toString('utf-8'); - return isTesting ? body.substring(0, 40) : body; - } - - return null; -} - /** - * Execute a sequence of oper. + * Execute a sequence of operations. * Wraps `language-common/execute`, and prepends initial state for http. - * @example - * execute( - * create('foo'), - * delete('bar') - * )(state) * @private * @param {Operations} operations - Operations to be performed. * @returns {Operation} @@ -282,8 +129,6 @@ export function execute(...operations) { }; return state => { - // Note: we no longer need `steps` anymore since `commonExecute` - // takes each operation as an argument. return commonExecute( createConnection, ...operations, @@ -296,36 +141,6 @@ export function execute(...operations) { }; } -function createConnection(state) { - const { access_token } = state.configuration; - - const auth = new google.auth.OAuth2(); - auth.credentials = { access_token }; - - gmail = google.gmail({ version: 'v1', auth }); - - return state; -} - -function removeConnection(state) { - gmail = undefined; - return state; -} - -function logError(error) { - console.log('RAW ERROR:', error); - const { code, errors, response } = error; - if (code && errors && response) { - console.error('The API returned an error:', errors); - - const { statusText, config } = response; - const { url, method, body } = config; - const message = `${method} ${url} - ${code}:${statusText} \nbody: ${body}`; - - console.log(message); - } -} - export { alterState, combine, diff --git a/packages/gmail/src/Utils.js b/packages/gmail/src/Utils.js index 9bce9240f..c466ac797 100644 --- a/packages/gmail/src/Utils.js +++ b/packages/gmail/src/Utils.js @@ -1,70 +1,190 @@ -import { composeNextState } from '@openfn/language-common'; -import { - request as commonRequest, - makeBasicAuthHeader, - assertRelativeUrl, -} from '@openfn/language-common/util'; -import nodepath from 'node:path'; - -export const prepareNextState = (state, response, callback = s => s) => { - const { body, ...responseWithoutBody } = response; - - if (!state.references) { - state.references = []; +import unzipper from 'unzipper'; +import { google } from 'googleapis'; + +let gmail = undefined; + +export async function fetchMessages(userId, query, lastPageToken) { + let messagesResponse = null; + + try { + messagesResponse = await gmail.users.messages.list({ + userId: userId, + q: query, + maxResults: 3, + pageToken: lastPageToken, + }); + } catch (error) { + throw new Error('Error fetching messages: ' + error.message); } - const nextState = { - ...composeNextState(state, response.body), - response: responseWithoutBody, - }; + const { messages, nextPageToken } = messagesResponse.data; - return callback(nextState); -}; - -// This helper function will call out to the backend service -// and add authorisation headers -// Refer to the common request function for options and details -export const request = (configuration = {}, method, path, options) => { - // You might want to check that the path is not an absolute URL befor - // appending credentials commonRequest will do this for you if you - // pass a baseURL to it and you don't need to build a path here - // assertRelativeUrl(path); - - // TODO This example adds basic auth from config data - // you may need to support other auth strategies - const { baseUrl, username, password } = configuration; - const headers = makeBasicAuthHeader(username, password); - - // TODO You can define custom error messages here - // The request function will throw if it receives - // an error code (<=400), terminating the workflow - const errors = { - 404: 'Page not found', - }; + return { messages, nextPageToken }; +} + +export async function getContentFromMessage(userId, messageId, desiredContent) { + const messageResponse = await gmail.users.messages.get({ + userId, + id: messageId, + format: 'full', + }); + + if (desiredContent.type === 'archive') { + const { attachmentId, filename } = getAttachmentInfo( + messageResponse, + desiredContent.archive + ); + + if (!attachmentId) { + return null; + } + + const archive = await getAttachment(userId, messageId, attachmentId); + const file = await getFileFromArchive(archive, desiredContent.file); + return { ...file, archiveFilename: filename }; + } + + if (desiredContent.type === 'file') { + const { attachmentId, filename } = getAttachmentInfo( + messageResponse, + desiredContent.file + ); + + if (!attachmentId) { + return null; + } + + const attachment = await getAttachment(userId, messageId, attachmentId); + const file = await getFileFromAttachment(attachment); + return { content: file, filename }; + } + + if (desiredContent.type === 'body') { + const body = getBodyFromMessage(messageResponse); + return body; + } + + if (desiredContent.type === 'subject') { + const headers = messageResponse.data?.payload?.headers; + const subject = headers?.find(h => h.name === 'Subject')?.value; + return subject; + } + + if (desiredContent.type === 'from') { + const headers = messageResponse.data?.payload?.headers; + const from = headers?.find(h => h.name === 'From')?.value; + return from; + } + + if (desiredContent.type === 'date') { + const headers = messageResponse.data?.payload?.headers; + const rawDate = headers?.find(h => h.name === 'Date')?.value; + const date = rawDate ? new Date(rawDate) : null; + return date; + } + + return `Unsupported content type: ${desiredContent.type}`; +} - const opts = { - // Force the response to be parsed as JSON - parseAs: 'json', +export async function getAttachment(userId, messageId, attachmentId) { + return await gmail.users.messages.attachments.get({ + userId, + messageId, + id: attachmentId, + }); +} - // Include the error map - errors, +export async function getFileFromAttachment(attachment) { + const base64String = attachment?.data?.data; + if (!base64String) { + throw new Error('No data found in file.'); + } + + const fileContent = atob(base64String); + + return isTesting ? fileContent.substring(0, 40) : fileContent; +} + +export async function getFileFromArchive(archive, filePattern) { + const base64String = archive?.data?.data; + if (!base64String) { + throw new Error('No data found in zip attachmentResponse.'); + } + + const compressedBuffer = Buffer.from(base64String, 'base64'); + const directory = await unzipper.Open.buffer(compressedBuffer); - // Set the baseUrl from the config object - baseUrl, + const file = directory?.files.find(f => filePattern.test(f.path)); - ...options, + if (!file) { + throw new Error('File not found in the archive.'); + } + + const fileBuffer = await file.buffer(); + const fileString = fileBuffer.toString('base64'); + const fileContent = atob(fileString); - // You can add extra headers here if you want to - headers: { - 'content-type': 'application/json', - ...headers, - }, + return { + content: isTesting ? fileContent.substring(0, 40) : fileContent, + filename: file.path, }; +} + +export function getAttachmentInfo(messageResponse, regex) { + const parts = messageResponse?.data?.payload?.parts; + const part = parts?.find(p => regex.test(p.filename)); + + return part + ? { attachmentId: part.body.attachmentId, filename: part.filename } + : { attachmentId: null }; +} + +export function getBodyFromMessage(fullMessage) { + const parts = fullMessage?.data?.payload?.parts; + + const bodyPart = parts?.find( + part => part.mimeType === 'multipart/alternative' + ); - // TODO you may want to add a prefix to the path - // use path.join to build the path safely - const safePath = nodepath.join(path); + const textBodyPart = bodyPart?.parts.find( + part => part.mimeType === 'text/plain' + ); - // Make the actual request - return commonRequest(method, safePath, options); -}; + const textBody = textBodyPart?.body?.data; + if (textBody) { + let body = Buffer.from(textBody, 'base64').toString('utf-8'); + return isTesting ? body.substring(0, 40) : body; + } + + return null; +} + +export function createConnection(state) { + const { access_token } = state.configuration; + + const auth = new google.auth.OAuth2(); + auth.credentials = { access_token }; + + gmail = google.gmail({ version: 'v1', auth }); + + return state; +} + +export function removeConnection(state) { + gmail = undefined; + return state; +} + +export function logError(error) { + console.log('RAW ERROR:', error); + const { code, errors, response } = error; + if (code && errors && response) { + console.error('The API returned an error:', errors); + + const { statusText, config } = response; + const { url, method, body } = config; + const message = `${method} ${url} - ${code}:${statusText} \nbody: ${body}`; + + console.log(message); + } +} diff --git a/packages/gmail/test/Adaptor.test.js b/packages/gmail/test/Adaptor.test.js index d12340805..306a7f4c0 100644 --- a/packages/gmail/test/Adaptor.test.js +++ b/packages/gmail/test/Adaptor.test.js @@ -1,75 +1 @@ -import { expect } from 'chai'; -import { enableMockClient } from '@openfn/language-common/util'; - -import { request, dataValue } from '../src/Adaptor.js'; - -// This creates a mock client which acts like a fake server. -// It enables pattern-matching on the request object and custom responses -// For the full mock API see -// https://undici.nodejs.org/#/docs/api/MockPool?id=mockpoolinterceptoptions -const testServer = enableMockClient('https://fake.server.com'); - -describe('request', () => { - it('makes a post request to the right endpoint', async () => { - // Setup a mock endpoint - testServer - .intercept({ - path: '/api/patients', - method: 'POST', - headers: { - Authorization: 'Basic aGVsbG86dGhlcmU=', - }, - }) - // Set the reply from this endpoint - // The body will be returned to state.data - .reply(200, { id: 7, fullName: 'Mamadou', gender: 'M' }); - - const state = { - configuration: { - baseUrl: 'https://fake.server.com', - username: 'hello', - password: 'there', - }, - data: { - fullName: 'Mamadou', - gender: 'M', - }, - }; - - const finalState = await request('POST', 'patients', { - name: state.data.fullName, - gender: state.data.gender, - })(state); - - expect(finalState.data).to.eql({ - fullName: 'Mamadou', - gender: 'M', - id: 7, - }); - }); - - it('throws an error if the service returns 403', async () => { - testServer - .intercept({ - path: '/api/noAccess', - method: 'POST', - }) - .reply(403); - - const state = { - configuration: { - baseUrl: 'https://fake.server.com', - username: 'hello', - password: 'there', - }, - }; - - const error = await request('POST', 'noAccess', { name: 'taylor' })( - state - ).catch(error => { - return error; - }); - - expect(error.statusMessage).to.eql('Forbidden'); - }); -}); +describe.skip('getContentsFromMessages', () => {});