Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bookmarking and new streams to Slack Tap #8

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,42 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
steps:
- checkout
- run:
name: 'Setup virtual env'
command: |
/root/.pyenv/versions/3.6.9/bin/python -mvenv /usr/local/share/virtualenvs/tap-slack
python3 -mvenv /usr/local/share/virtualenvs/tap-slack

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
python3 -mvenv /usr/local/share/virtualenvs/tap-slack
pyenv global 3.6.9
pyenv rehash
python3 -mvenv /usr/local/share/virtualenvs/tap-slack

source /usr/local/share/virtualenvs/tap-slack/bin/activate
pip install -U 'pip<19.2' setuptools
pip install .[dev]
- add_ssh_keys
- run:
name: 'pylint'
name: 'Integration Tests'
command: |
source /usr/local/share/virtualenvs/tap-slack/bin/activate
pylint tap_slack -d C,R,W,no-member
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-slack \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
workflows:
version: 2
commit:
jobs:
- build
build_daily:
triggers:
- schedule:
cron: "0 0 * * *"
filters:
branches:
only:
- master
jobs:
- build
172 changes: 159 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,81 @@ deactivate
## Setup

The tap requires a [Slack API token](https://github.com/slackapi/python-slackclient/blob/master/documentation_v2/auth.md#tokens--authentication) to interact with your Slack workspace. You can obtain a token for a single workspace by creating a new [Slack App](https://api.slack.com/apps?new_app=1) in your workspace and assigning it the relevant [scopes](https://api.slack.com/docs/oauth-scopes). As of right now, the minimum required scopes for this App are:
- `channels:read`
- `channels:history`
- `channels:join`
- `channels:read`
- `files:read`
- `groups:read`
- `links:read`
- `reactions:read`
- `remote_files:read`
- `remote_files:write`
- `team:read`
- `usergroups:read`
- `users.profile:read`
- `users:read`

Create a config file containing the API token and a start date, e.g.:

```json
{
"token":"xxxx",
"start_date":"2020-05-01T00:00:00"
}
```

### Private channels

Optionally, you can also specify whether you want to sync private channels or not by adding the following to the config:

```json
"private_channels":"false"
```

By default, private channels will be synced.

### Joining Public Channels

By adding the following to your config file you can have the tap auto-join all public channels in your ogranziation.
```json
"join_public_channels":"true"
```
If you do not elect to have the tap join all public channels you must invite the bot to all channels you wish to sync.

### Specify channels to sync

By default, the tap will sync all channels it has been invited to. However, you can limit the tap to sync only the channels you specify by adding their IDs to the config file, e.g.:

```json
"channels":[
"abc123",
"def345"
]
```

Note this needs to be channel ID, not the name, as [recommended by the Slack API](https://api.slack.com/types/conversation#other_attributes). To get the ID for a channel, either use the Slack API or [find it in the URL](https://www.wikihow.com/Find-a-Channel-ID-on-Slack-on-PC-or-Mac).

### Archived Channels

You can control whether or not the tap will sync archived channels by including the following in the tap config:
```json
"exclude_archived": "false"
```
It's important to note that a bot *CANNOT* join an archived channel, so unless the bot was added to the channel prior to it being archived it will not be able to sync the data from that channel.

### Date Windowing

Due to the potentially high volume of data when syncing certain streams (messages, files, threads)
this tap implements date windowing based on a configuration parameter.

including
```json
"date_window_size": "5"
```

Will cause the tap to sync 5 days of data per request, for applicable streams. The default value if
one is not defined is to window requests for 7 days at a time.

## Usage

It is recommended to follow Singer [best practices](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-and-developing-singer-taps-and-targets) when running taps either [on their own](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap) or [with a Singer target](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap-with-a-singer-target).
Expand All @@ -35,36 +106,35 @@ In practice, it will look something like the following:
## Replication

The Slack Conversations API does not natively store last updated timestamp information about a Conversation. In addition, Conversation records are mutable. Thus, `tap-slack` requires a `FULL_TABLE` replication strategy to ensure the most up-to-date data in replicated when replicating the following Streams:
- `Conversations`
- `ConversationMembersStream`
- `ConversationHistoryStream`
- `Channels` (Conversations)
- `Channel Members` (Conversation Members)

The `Users` stream _does_ store information about when a User record was last updated, so `tap-slack` uses that timestamp as a bookmark value and prefers using an `INCREMENTAL` replication strategy.

## Table Schemas

### Conversations
### Channels (Conversations)

- Table Name: `conversations`
- Table Name: `channels`
- Description:
- Primary Key Column: `id`
- Replication Strategy: `FULL_TABLE`
- API Documentation: [Link](https://api.slack.com/methods/conversations.list)

### Conversation Members
### Channel Members (Conversation Members)

- Table Name: `conversation_members`
- Table Name: `channel_members`
- Description:
- Primary Key Column: N/A
- Primary Key Columns: `channel_id`, `user_id`
- Replication Strategy: `FULL_TABLE`
- API Documentation: [Link](https://api.slack.com/methods/conversations.members)

### Conversation History
### Messages (Conversation History)

- Table Name: `conversation_history`
- Table Name: `messages`
- Description:
- Primary Key Column: N/A
- Replication Strategy: `FULL_TABLE`
- Primary Key Columns: `channel_id`, `ts`
- Replication Strategy: `INCREMENTAL`
- API Documentation: [Link](https://api.slack.com/methods/conversations.history)

### Users
Expand All @@ -74,5 +144,81 @@ The `Users` stream _does_ store information about when a User record was last up
- Primary Key Column: `id`
- Replication Strategy: `INCREMENTAL`
- API Documentation: [Link](https://api.slack.com/methods/users.list)

### Threads (Conversation Replies)

- Table Name: `threads`
- Description:
- Primary Key Columns: `channel_id`, `ts`, `thread_ts`
- Replication Strategy: `FULL_TABLE` for each parent `message`
- API Documentation: [Link](https://api.slack.com/methods/conversations.replies)

### User Groups

- Table Name: `user_groups`
- Description:
- Primary Key Column: `id`
- Replication Strategy: `FULL_TABLE`
- API Documentation: [Link](https://api.slack.com/methods/usergroups.list)

### Files

- Table Name: `files`
- Description:
- Primary Key Column: `id`
- Replication Strategy: `INCREMENTAL` query filtered using date windows and lookback window
- API Documentation: [Link](https://api.slack.com/methods/files.list)

### Remote Files

- Table Name: `remote_files`
- Description:
- Primary Key Column: `id`
- Replication Strategy: `INCREMENTAL` query filtered using date windows and lookback window
- API Documentation: [Link](https://api.slack.com/methods/files.remote.list)

## Testing the Tap

While developing the Slack tap, the following utilities were run in accordance with Singer.io best practices:
Pylint to improve [code quality](https://github.com/singer-io/getting-started/blob/master/docs/BEST_PRACTICES.md#code-quality):
```bash
> pylint tap_slack -d missing-docstring -d logging-format-interpolation -d too-many-locals -d too-many-arguments
```
Pylint test resulted in the following score:
```bash
Your code has been rated at 9.72/10

```

To [check the tap](https://github.com/singer-io/singer-tools#singer-check-tap) and verify working:
```bash
> tap-slack --config tap_config.json --catalog catalog.json | singer-check-tap > state.json
> tail -1 state.json > state.json.tmp && mv state.json.tmp state.json
```
Check tap resulted in the following:
```bash
Checking stdin for valid Singer-formatted data
The output is valid.
It contained 3657 messages for 9 streams.

581 schema messages
2393 record messages
683 state messages

Details by stream:
+-----------------+---------+---------+
| stream | records | schemas |
+-----------------+---------+---------+
| threads | 633 | 573 |
| user_groups | 1 | 1 |
| channel_members | 1049 | 1 |
| users | 22 | 1 |
| channels | 0 | 1 |
| remote_files | 3 | 1 |
| messages | 573 | 1 |
| teams | 1 | 1 |
| files | 111 | 1 |
+-----------------+---------+---------+
```
----
Copyright &copy; 2019 Stitch
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
py_modules=['tap_slack'],
install_requires=[
'singer-python==5.9.0',
'slackclient==2.0.1'
'slackclient==2.6.0',
],
extras_require={
'dev': [
Expand Down
84 changes: 70 additions & 14 deletions tap_slack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,80 @@
import json
import singer
from slack import WebClient
from .streams import AVAILABLE_STREAMS
from .catalog import generate_catalog

from tap_slack.client import SlackClient
from tap_slack.streams import AVAILABLE_STREAMS
from tap_slack.catalog import generate_catalog

LOGGER = singer.get_logger()

def discover(webclient):

def auto_join(client, config):

if "channels" in config:
conversations = config.get("channels")

for conversation_id in conversations:
join_response = client.join_channel(channel=conversation_id)
if not join_response.get("ok", False):
error = join_response.get("error", "Unspecified Error")
LOGGER.error('Error joining {}, Reason: {}'.format(conversation_id, error))
raise Exception('{}: {}'.format(conversation_id, error))
else:
response = client.get_all_channels(types="public_channel", exclude_archived="true")
conversations = response.get("channels", [])

for conversation in conversations:
conversation_id = conversation.get("id", None)
conversation_name = conversation.get("name", None)
join_response = client.join_channel(channel=conversation_id)
if not join_response.get("ok", False):
error = join_response.get("error", "Unspecified Error")
LOGGER.error('Error joining {}, Reason: {}'.format(conversation_name, error))
raise Exception('{}: {}'.format(conversation_name, error))


def discover(client):
LOGGER.info('Starting Discovery..')
streams = [stream_class(webclient) for _,stream_class in AVAILABLE_STREAMS.items()]
streams = [stream_class(client) for _, stream_class in AVAILABLE_STREAMS.items()]
catalog = generate_catalog(streams)
json.dump(catalog, sys.stdout, indent=2)
LOGGER.info("Finished Discovery..")


def sync(webclient, config, catalog, state):

def sync(client, config, catalog, state):
LOGGER.info('Starting Sync..')
for catalog_entry in catalog.get_selected_streams(state):
stream = AVAILABLE_STREAMS[catalog_entry.stream](webclient=webclient, config=config, catalog_stream=catalog_entry.stream, state=state)
LOGGER.info('Syncing stream: %s', catalog_entry.stream)
stream.write_schema()
stream.sync(catalog_entry.metadata)
stream.write_state()
selected_streams = catalog.get_selected_streams(state)

streams = []
stream_keys = []
for catalog_entry in selected_streams:
streams.append(catalog_entry)
stream_keys.append(catalog_entry.stream)

if "threads" in stream_keys and "messages" not in stream_keys:
sync_messages = False
streams.append(catalog.get_stream("messages"))
elif "messages" in stream_keys:
sync_messages = True
else:
sync_messages = False

for catalog_entry in streams:
if "threads" != catalog_entry.stream:
if "messages" == catalog_entry.stream:
stream = AVAILABLE_STREAMS[catalog_entry.stream](client=client, config=config,
catalog=catalog,
state=state,
write_to_singer=sync_messages)
else:
stream = AVAILABLE_STREAMS[catalog_entry.stream](client=client, config=config,
catalog=catalog,
state=state)
LOGGER.info('Syncing stream: %s', catalog_entry.stream)
stream.write_schema()
stream.sync(catalog_entry.metadata)
stream.write_state()

LOGGER.info('Finished Sync..')

Expand All @@ -32,11 +84,15 @@ def main():
args = singer.utils.parse_args(required_config_keys=['token', 'start_date'])

webclient = WebClient(token=args.config.get("token"))
client = SlackClient(webclient=webclient, config=args.config)

if args.discover:
discover(webclient=webclient)
discover(client=client)
elif args.catalog:
sync(webclient=webclient, config=args.config, catalog=args.catalog, state=args.state)
if args.config.get("join_public_channels", "false") == "true":
auto_join(client=client, config=args.config)
sync(client=client, config=args.config, catalog=args.catalog, state=args.state)


if __name__ == '__main__':
main()
10 changes: 5 additions & 5 deletions tap_slack/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ def generate_catalog(streams):
'stream': stream.name,
'tap_stream_id': stream.name,
'schema': schema,
'metadata': singer.metadata.get_standard_metadata(schema=schema,
key_properties=stream.key_properties,
valid_replication_keys=stream.valid_replication_keys,
replication_method=stream.replication_method)
'metadata': singer.metadata.get_standard_metadata(
schema=schema,
key_properties=stream.key_properties,
valid_replication_keys=stream.valid_replication_keys,
replication_method=stream.replication_method)
}
# TODO: valid_replication_keys do not get marked with automatic metadata which could break bookmarking
catalog['streams'].append(catalog_entry)

return catalog
Loading